吴忠躺衫网络科技有限公司

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

延遲隊列的實現方式

科技綠洲 ? 來源:Java技術指北 ? 作者:Java技術指北 ? 2023-09-30 11:17 ? 次閱讀

延遲任務

最近有一個需求,基于消息隊列對數據消費,并根據多次消費的結果對數據進行重新組裝,如果在指定時間內,需要的數據全部到達,則進行數據組裝以及后續邏輯。簡單的說,設置一個超時時間,如果在該時間內由MQ中消費到完整的數據則直接處理,否則進入其他流程。

針對這種場景使用了延遲任務來實現,以此為契機對延遲任務相關的技術做了個簡單了解...

簡介

延遲任務是一種指定任務在未來某個時間點或一定時間后執行的方式。通常情況下,延遲任務可以通過設置任務的執行時間或延遲時間來實現。

延遲任務可以用于異步操作、定時任務和任務調度等場景。例如,在用戶注冊后發送歡迎郵件或者在用戶下單后發送訂單確認短信,可以通過延遲任務來實現異步操作。定時檢查服務器狀態、定時備份數據等任務,也可以通過延遲任務來實現定時任務。在某個時間點觸發某個任務、在某個時間段內重復執行某個任務等,可以通過延遲任務來實現任務調度。

延遲任務通常使用隊列或者定時器來實現。在隊列中,任務會被添加到一個等待隊列中,等待隊列中的任務會在指定的時間點或延遲時間后被取出執行。在定時器中,任務會被添加到一個定時器中,定時器會在指定的時間點觸發任務執行。

總之,延遲任務是一種非常實用的技術,可以幫助我們更好地管理系統中的異步操作、定時任務和任務調度等場景。

使用場景

異步操作:延遲任務可以用于異步操作,例如在用戶注冊后發送歡迎郵件或者在用戶下單后發送訂單確認短信。通過使用延遲任務,可以將這些操作推遲到后臺處理,從而提高系統的響應速度和并發能力。

定時任務:延遲任務可以用于定時任務,例如定時檢查服務器狀態、定時備份數據等。通過使用延遲任務,可以在指定的時間點自動觸發任務,避免手動操作的繁瑣和容易出錯。

任務調度:延遲任務可以用于任務調度,例如在某個時間點觸發某個任務、在某個時間段內重復執行某個任務等。通過使用延遲任務,可以方便地進行任務調度,提高系統的可靠性和穩定性。

技術實現

  • Timer 基于java基礎類庫java.util.Timer實現
  • DelayQueue
    基于延時隊列實現
  1. 基于內存,應用重啟(或宕機)會導致任務丟失
  2. 基于內存存放隊列,不支持集群
  3. 依據compareTo方法排列隊列,調用take阻塞式的取出第一個任務(不調用則不取出),比較不靈活,會影響時間的準確性
  • ScheduledThreadPoolExecutor
    1. 基于內存,應用重啟(或宕機)會導致任務丟失
    2. 基于內存存放任務,不支持集群
    3. 一個任務就要新建一個線程綁定任務的執行,容易造成資源浪費
  • Redis過期監聽 基于Redis過期訂閱
    1. 客戶端斷開后重連會導致所有事件丟失
    2. 高并發場景下,存在大量的失效key場景會導出失效時間存在延遲
    3. 若有多個監聽器監聽該key,是會重復消費這個過期事件的,需要特定邏輯判斷
  • MQ延遲隊列 基于消息死信隊列實現 支持集群,分布式,高并發場景;缺點:引入額外的消息隊列,增加項目的部署和維護的復雜度。
  • HashedWheelTimer 基于Netty提供的工具類HashedWheelTimer HashedWheelTimer 是使用定時輪實現的,定時輪其實就是一種環型的數據結構,可以把它想象成一個時鐘, 分成了許多格子,每個格子代表一定的時間,在這個格子上用一個鏈表來保存要執行的超時任務,同時有一個指針一格一格的走,走到那個格子時就執行格子對應的延遲任務,

其中前三種Timer、DelayQueue、ScheduledThreadPoolExecutor實現比較簡單,只不過只適用于單體應用,任務數據都在內存中,在系統崩潰后數據丟失;后兩張實現相對復雜,并且需要依賴于第三方應用,在系統整體結構上更加復雜且消耗更多資源,但能支持分布式系統,且有較高的容錯性。

示例

定義延遲任務對象:

@Getter
public class DelayTask implements Serializable{

    private static final long serialVersionUID = -5062977578344039366L;
    
    private long delaySeconds;
    private TaskExecute taskExecute;

    public DelayTask(long delaySeconds, TaskExecute taskExecute) {
        this.delaySeconds = delaySeconds;
        this.taskExecute = taskExecute;
    }

    /**
     *
     */
    public void execute(){
        taskExecute.run();
    }

    public interface TaskExecute extends Runnable, Serializable {

    }
}

調度器:

public interface ScheduleTrigger {

    /**
     * 延遲任務調度
     * @param delayTask
     */
    void schedule(DelayTask delayTask);
}
  1. Timer
public class JavaTrigger implements ScheduleTrigger{

    private Timer timer;

    public JavaTimer(){
        this.timer = new Timer();
    }
    
    /**
     *
     * @param delayTask
     */
    public void schedule(DelayTask delayTask){
        timer.schedule(buildTimerTask(delayTask.getTaskExecute()), toMillis(delayTask.getDelaySeconds()));
    }

    private TimerTask buildTimerTask(Runnable runnable){
        return new TimerTask() {
            @Override
            public void run() {
                runnable.run();
            }
        };
    }

}
  1. DelayQueue
public class DelayQueueTrigger implements ScheduleTrigger{

    private DelayQueue< Task > queue = new DelayQueue<  >();

    public DelayQueueTrigger() {
        Thread thread = new Thread(() - > {
            while (true) {
                try {
                    Task task = queue.take();
                    if(task != null)
                        task.execute();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        thread.setDaemon(true);
        thread.start();
    }

    /**
     * @param delayTask
     */
    public void schedule(DelayTask delayTask){
        if( delayTask instanceof Task ){
            queue.put((Task) delayTask);
        }
    }

}

class Task extends DelayTask implements Delayed{

    private long execTime;

    public Task(long delaySeconds, TaskExecute taskExecute) {
        super(delaySeconds, taskExecute);
        this.execTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(delaySeconds);
    }

    /**
     * 輪詢執行該方法判斷是否滿足執行條件(<=0)
     * 同時該返回作為等待時長
     * @param unit the time unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return this.execTime - System.currentTimeMillis(); // ms
    }

    public long getExecTime() {
        return execTime;
    }

    @Override
    public int compareTo(Delayed other) {
        if(this.getExecTime() == ((Task)other).getExecTime()){
            return 0;
        }
        return this.getExecTime() > ((Task)other).getExecTime() ? 1: -1;
    }
}
  1. ScheduledThreadPoolExecutor
    ScheduledThreadPoolExecutor實現也是基于延遲隊列BlockingQueue實現
public class ScheduledExecutorTrigger implements ScheduleTrigger{

    private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
    
    public void schedule(DelayTask delayTask){
        executorService.schedule(delayTask.getTaskExecute(), delayTask.getDelaySeconds(), TimeUnit.SECONDS);
    }

}
  1. Redis過期監聽
    需要修改redis配置文件:notify-keyspace-events Ex
public class RedisTimer{

    private static final String EXPIRATION_KEY = "REDIS_EXPIRATION_KEY";

    @Configuration
    @Import(RedisAutoConfiguration.class)
    public static class Config{

        @Bean(name = "redisTemplate")
        public RedisTemplate< Object, Object > redisTemplate(RedisConnectionFactory factory) {
            RedisTemplate< Object, Object > template = new RedisTemplate<  >();
            RedisSerializer< String > keySerializer = new StringRedisSerializer();
            RedisSerializer< Object > valueSerializer = new ObjectRedisSerializer();
            template.setConnectionFactory(factory);
            template.setKeySerializer(keySerializer);
            template.setValueSerializer(valueSerializer);
            return template;
        }

        /**
         * 消息監聽器容器bean
         * @param connectionFactory
         * @return
         */
        @Bean
        public RedisMessageListenerContainer container(LettuceConnectionFactory connectionFactory) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            return container;
        }

        @Bean
        public RedisKeyExpirationListener redisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer){
            RedisKeyExpirationListener redisKeyExpirationListener = new RedisKeyExpirationListener(redisMessageListenerContainer);
            redisKeyExpirationListener.setContext(context());
            return redisKeyExpirationListener;
        }

        @Bean
        public Context context(){
            return new Context();
        }

        @Bean
        public RedisTrigger redisTrigger(RedisTemplate redisTemplate){
            return new RedisTrigger(redisTemplate, context());
        }


        class ObjectRedisSerializer implements RedisSerializer{

            @Override
            public byte[] serialize(Object o) throws SerializationException {
                return SerializeUtils.serialize(o);
            }

            @Override
            public Object deserialize(byte[] bytes) throws SerializationException {
                return SerializeUtils.deserialize(bytes);
            }
        }
    }

    public static class RedisTrigger implements ScheduleTrigger{

        private RedisTemplate redisTemplate;
        private Context context;

        public RedisTrigger(RedisTemplate redisTemplate, Context context){
            this.redisTemplate = redisTemplate;
            this.context = context;
        }
        
        public void schedule(DelayTask delayTask){
            context.put(EXPIRATION_KEY, delayTask);
            redisTemplate.opsForValue().set(EXPIRATION_KEY, delayTask, delayTask.getDelaySeconds(), TimeUnit.SECONDS);
        }
    }

    @Slf4j
    public static class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {

        private Context context;

        public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
            super(listenerContainer);
        }

        /**
         * 這里沒法拿到過期值
         * @param message never {@literal null}.
         */
        @SneakyThrows
        @Override
        public void doHandleMessage(Message message) {
            try {
                String topic = new String(message.getChannel(), "utf-8");
                String key = new String(message.getBody(), "utf-8");
                if (EXPIRATION_KEY.equals(key)) {
                    Object object = context.get(EXPIRATION_KEY);
                    if( object instanceof DelayTask ){
                        log.info("redis key[{}] 過期回調", key);
                        ((DelayTask) object).execute();
                    }
                }
            } catch (Exception e) {
                log.error("處理Redis延遲任務異常:{}", e.getMessage() ,e);
            }
        }

        public void setContext(Context context) {
            this.context = context;
        }
    }

    public static class Context{
        private Map< String,Object > context = new ConcurrentHashMap<  >();

        public void put(String key, Object value){
            context.put(key, value);
        }

        public Object get(String key){
            return context.get(key);
        }
    }
}
  1. MQ延遲隊列
    這里MQ選擇的是RabbitMq,要知道在RabbitMq中是沒有延遲隊列的,但可以通過延遲消息插件rabbitmq_delayed_message_exchange實現,另外一種是基于死信來實現。

什么時候消息進入死信?

  • 1)消息消費方調用了basicNack() 或 basicReject(),并且參數都是 requeue = false,則消息會路由進死信隊列
  • 2)消息消費過期,過了TTL(消息、或隊列設置超時時間) 存活時間,就是消費方在 TTL 時間之內沒有消費,則消息會路由進死信隊列
  • 3)隊列設置了x-max-length 最大消息數量且當前隊列中的消息已經達到了這個數量,再次投遞,消息將被擠掉,被擠掉的消息會路由進死信隊列
public class RabbitTimer{

    @Configuration
    @Import(RabbitAutoConfiguration.class)
    public static class Config{

        static final String TTL_EXCHANGE_FOR_SCHEDULE = "TTL_EXCHANGE_FOR_SCHEDULE";
        static final String TTL_QUEUE_FOR_SCHEDULE = "TTL_QUEUE_FOR_SCHEDULE";
        static final String TTL_ROUTING_KEY_FOR_SCHEDULE = "TTL_ROUTING_KEY_FOR_SCHEDULE";
        static final String COMMON_QUEUE_FOR_SCHEDULE = "COMMON_QUEUE_FOR_SCHEDULE";

        @Bean
        public Queue ttlQueue(){
            return QueueBuilder.durable(TTL_QUEUE_FOR_SCHEDULE).build();
        }

        @Bean
        public Exchange ttlExchange(){
            return ExchangeBuilder.directExchange(TTL_EXCHANGE_FOR_SCHEDULE).build();
        }

        @Bean
        public Binding ttlBinding(){
            return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with(TTL_ROUTING_KEY_FOR_SCHEDULE).noargs();
        }

        @Bean
        public Queue commonQueue(){
            return QueueBuilder.durable(COMMON_QUEUE_FOR_SCHEDULE)
                    .deadLetterExchange(TTL_EXCHANGE_FOR_SCHEDULE)
                    .deadLetterRoutingKey(TTL_ROUTING_KEY_FOR_SCHEDULE)
                    .build();
        }

        @Bean
        public TtlMessageConsumer ttlMessageConsumer(){
            return new TtlMessageConsumer();
        }
        
        @Bean
        public RabbitTrigger rabbitTrigger(RabbitTemplate rabbitTemplate){
            return new RabbitTrigger(rabbitTemplate);
        }
    }

    @Slf4j
    @RabbitListener(queues=TTL_QUEUE_FOR_SCHEDULE)
    public static class TtlMessageConsumer{

        @RabbitHandler
        public void handle(byte [] message){
            Object deserialize = SerializeUtils.deserialize(message);
            if( deserialize instanceof DelayTask ){
                ((DelayTask) deserialize).execute();
            }
        }

    }
    
    public static class RabbitTrigger implements ScheduleTrigger{

        @Autowired
        private RabbitTemplate rabbitTemplate;

        public RabbitTrigger(RabbitTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
        }
        
        public void schedule(DelayTask delayTask){
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setExpiration( String.valueOf(TimeUnit.SECONDS.toMillis(delayTask.getDelaySeconds())));
            Message message = new Message(SerializeUtils.serialize(delayTask), messageProperties);
            rabbitTemplate.send(COMMON_QUEUE_FOR_SCHEDULE, message);
        }

    }

}
  1. HashedWheelTimer
public class NettyTrigger implements ScheduleTrigger {

    HashedWheelTimer timer = new HashedWheelTimer(200,
            TimeUnit.MILLISECONDS,
            100); // 時間輪中的槽數

    /**
     *
     */
    @Override
    public void schedule(DelayTask delayTask){
        TimerTask task = timeout - > delayTask.execute();
        //
        timer.newTimeout(task, delayTask.getDelaySeconds(), TimeUnit.SECONDS);
    }

}

測試:

ScheduleTrigger.schedule(DelayTask delayTask);

結束語

通過幾個簡單的示例了解延遲隊列的實現方式,可以根據實際業務場景以及應用架構做出合理的選擇。

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • 數據
    +關注

    關注

    8

    文章

    7139

    瀏覽量

    89576
  • 服務器
    +關注

    關注

    12

    文章

    9303

    瀏覽量

    86061
  • 內存
    +關注

    關注

    8

    文章

    3055

    瀏覽量

    74327
  • 定時器
    +關注

    關注

    23

    文章

    3255

    瀏覽量

    115368
  • 延遲
    +關注

    關注

    1

    文章

    70

    瀏覽量

    13583
收藏 人收藏

    評論

    相關推薦

    利用CAS技術實現無鎖隊列

    【 導讀 】:本文 主要講解利用CAS技術實現無鎖隊列。 關于無鎖隊列實現,網上有很多文章,雖然本文可能和那些文章有所重復,但是我還是想以我自己的
    的頭像 發表于 01-11 10:52 ?2343次閱讀
    利用CAS技術<b class='flag-5'>實現</b>無鎖<b class='flag-5'>隊列</b>

    深度解析數據結構與算法篇之隊列及環形隊列實現

    的位置。 02 — 環形隊列實現 要想將元素放入隊列我們必須知道對頭和隊尾,在隊列長度不能無限大的條件下我們還要知道隊列的最大容量,我們還
    的頭像 發表于 06-18 10:07 ?2001次閱讀

    TencentOS-tiny中環形隊列實現

    ; 隊尾指針(可變):永遠指向此隊列的最后一個數據元素; 隊列中的數據存儲方式有兩種: ① 基于靜態連續內存(數組)存儲,如圖:② 基于動態內存(鏈表節點)存儲,如圖: ? 后續都使用基于靜態內存存儲的
    的頭像 發表于 10-08 16:30 ?1422次閱讀

    QueueForMcu 基于單片機實現隊列功能模塊

    QueueForMcu基于單片機實現隊列功能模塊,主要用于8位、16位、32位非運行RTOS的單片機應用,兼容大多數單片機平臺。一、特性動態創建隊列對象動態設置隊列數據緩沖區靜態指定
    發表于 12-31 19:35 ?1次下載
    QueueForMcu 基于單片機<b class='flag-5'>實現</b>的<b class='flag-5'>隊列</b>功能模塊

    RTOS消息隊列的多種用途

      消息隊列可以以多種不同的方式使用。事實上,您可以編寫可能只使用消息隊列的相當復雜的應用程序。僅使用消息隊列可以減少代碼的大?。凑加每臻g),因為可以模擬許多其他服務(信號量、時間
    的頭像 發表于 06-29 14:57 ?2604次閱讀
    RTOS消息<b class='flag-5'>隊列</b>的多種用途

    實現一個雙端隊列的步驟簡析

    隊列是非?;A且重要的數據結構,雙端隊列屬于隊列的升級。很多的算法都是基于隊列實現,例如搜索中的bfs,圖論中的spfa,計算幾何中的me
    的頭像 發表于 10-27 18:11 ?1489次閱讀

    什么是消息隊列?消息隊列中間件重要嗎?

    應用解耦:消息隊列減少了服務之間的耦合性,不同的服務可以通過消息隊列進行通信,而不用關心彼此的實現細節。
    的頭像 發表于 11-07 14:55 ?1471次閱讀

    如何用Redis實現延遲隊列呢?

    前段時間有個小項目需要使用延遲任務,談到延遲任務,我腦子第一時間一閃而過的就是使用消息隊列來做,比如RabbitMQ的死信隊列又或者RocketMQ的
    的頭像 發表于 03-16 14:28 ?707次閱讀

    一種異步延遲隊列實現方式調研

    目前系統中有很多需要用到延時處理的功能:支付超時取消、排隊超時、短信、微信等提醒延遲發送、token刷新、會員卡過期等等。
    的頭像 發表于 03-31 10:10 ?658次閱讀

    嵌入式環形隊列和消息隊列實現

    嵌入式環形隊列和消息隊列實現數據緩存和通信的常見數據結構,廣泛應用于嵌入式系統中的通信協議和領域。
    的頭像 發表于 04-14 11:52 ?1623次閱讀

    嵌入式環形隊列和消息隊列是如何去實現的?

    嵌入式環形隊列和消息隊列實現數據緩存和通信的常見數據結構,廣泛應用于嵌入式系統中的通信協議和領域。
    發表于 05-20 14:55 ?1180次閱讀

    單片機消息隊列實現原理和機制

    單片機開發過程中通常會用到“消息隊列”,一般實現的方法有多種。 本文給大家分享一下隊列實現的原理和機制。
    的頭像 發表于 05-26 09:50 ?1643次閱讀
    單片機消息<b class='flag-5'>隊列</b>的<b class='flag-5'>實現</b>原理和機制

    RTOS消息隊列的應用

    基于RTOS的應用中,通常使用隊列機制實現任務間的數據交互,一個應用程序可以有任意數量的消息隊列,每個消息隊列都有自己的用途。
    發表于 05-29 10:49 ?664次閱讀
    RTOS消息<b class='flag-5'>隊列</b>的應用

    Disruptor高性能隊列的原理

    許多應用程序依靠隊列在處理階段之間交換數據。我們的性能測試表明,當以這種方式使用隊列時,其延遲成本與磁盤(基于RAID或SSD的磁盤系統)的IO操作成本處于同一數量級都很慢。如果在一個
    的頭像 發表于 07-26 10:47 ?761次閱讀
    Disruptor高性能<b class='flag-5'>隊列</b>的原理

    嵌入式環形隊列與消息隊列實現原理

    嵌入式環形隊列,也稱為環形緩沖區或循環隊列,是一種先進先出(FIFO)的數據結構,用于在固定大小的存儲區域中高效地存儲和訪問數據。其主要特點包括固定大小的數組和兩個指針(頭指針和尾指針),分別指向隊列的起始位置和結束位置。
    的頭像 發表于 09-02 15:29 ?658次閱讀
    澳门百家乐官网公司| 百家乐事电影| 新浪棋牌竞技风暴| 网上赌百家乐官网有假| 大发扑克官网| 百家乐官网保证赢| 大发888官方下载 银行| 百家乐比赛技巧| 郴州市| 百家乐规律和方法| 博狗百家乐官网真实| 南京百家乐的玩法技巧和规则| 百家乐官网出千赌具| sz全讯网新2xb112| 百家乐官网路单下| 金乡县| 逍遥坊百家乐的玩法技巧和规则 | 博彩百家乐最新优惠| 权威百家乐官网信誉网站| 958棋牌游戏| 凯旋门百家乐娱乐城| 百家乐官网娱乐城棋牌| 天津太阳城橙翠园| 百家乐官网平一直压庄| 荥阳市| 威尼斯人娱乐开户| 百家乐官网游戏筹码| 大发百家乐官网现金网| 威尼斯人娱乐城筹码| 百家乐赌博出千| 大发888 没人举报吗| 百家乐代理占成| 优博百家乐官网现金网平台| 大发888 打法888 大发官网| 百家乐最低投注| 百家乐官网方案| 罗城| 大发888扑克下载| 百家乐投注方法| 网上玩百家乐官网游戏有人挣到钱了吗| 鸿运娱乐城|