延遲任務
最近有一個需求,基于消息隊列對數據消費,并根據多次消費的結果對數據進行重新組裝,如果在指定時間內,需要的數據全部到達,則進行數據組裝以及后續邏輯。簡單的說,設置一個超時時間,如果在該時間內由MQ中消費到完整的數據則直接處理,否則進入其他流程。
針對這種場景使用了延遲任務來實現,以此為契機對延遲任務相關的技術做了個簡單了解...
簡介
延遲任務是一種指定任務在未來某個時間點或一定時間后執行的方式。通常情況下,延遲任務可以通過設置任務的執行時間或延遲時間來實現。
延遲任務可以用于異步操作、定時任務和任務調度等場景。例如,在用戶注冊后發送歡迎郵件或者在用戶下單后發送訂單確認短信,可以通過延遲任務來實現異步操作。定時檢查服務器狀態、定時備份數據等任務,也可以通過延遲任務來實現定時任務。在某個時間點觸發某個任務、在某個時間段內重復執行某個任務等,可以通過延遲任務來實現任務調度。
延遲任務通常使用隊列或者定時器來實現。在隊列中,任務會被添加到一個等待隊列中,等待隊列中的任務會在指定的時間點或延遲時間后被取出執行。在定時器中,任務會被添加到一個定時器中,定時器會在指定的時間點觸發任務執行。
總之,延遲任務是一種非常實用的技術,可以幫助我們更好地管理系統中的異步操作、定時任務和任務調度等場景。
使用場景
異步操作:延遲任務可以用于異步操作,例如在用戶注冊后發送歡迎郵件或者在用戶下單后發送訂單確認短信。通過使用延遲任務,可以將這些操作推遲到后臺處理,從而提高系統的響應速度和并發能力。
定時任務:延遲任務可以用于定時任務,例如定時檢查服務器狀態、定時備份數據等。通過使用延遲任務,可以在指定的時間點自動觸發任務,避免手動操作的繁瑣和容易出錯。
任務調度:延遲任務可以用于任務調度,例如在某個時間點觸發某個任務、在某個時間段內重復執行某個任務等。通過使用延遲任務,可以方便地進行任務調度,提高系統的可靠性和穩定性。
技術實現
- 基于內存,應用重啟(或宕機)會導致任務丟失
- 基于內存存放隊列,不支持集群
- 依據compareTo方法排列隊列,調用take阻塞式的取出第一個任務(不調用則不取出),比較不靈活,會影響時間的準確性
- ScheduledThreadPoolExecutor
- 基于內存,應用重啟(或宕機)會導致任務丟失
- 基于內存存放任務,不支持集群
- 一個任務就要新建一個線程綁定任務的執行,容易造成資源浪費
- Redis過期監聽 基于Redis過期訂閱
- 客戶端斷開后重連會導致所有事件丟失
- 高并發場景下,存在大量的失效key場景會導出失效時間存在延遲
- 若有多個監聽器監聽該key,是會重復消費這個過期事件的,需要特定邏輯判斷
- MQ延遲隊列 基于消息死信隊列實現 支持集群,分布式,高并發場景;缺點:引入額外的消息隊列,增加項目的部署和維護的復雜度。
- HashedWheelTimer 基于Netty提供的工具類HashedWheelTimer HashedWheelTimer 是使用定時輪實現的,定時輪其實就是一種環型的數據結構,可以把它想象成一個時鐘, 分成了許多格子,每個格子代表一定的時間,在這個格子上用一個鏈表來保存要執行的超時任務,同時有一個指針一格一格的走,走到那個格子時就執行格子對應的延遲任務,
其中前三種Timer、DelayQueue、ScheduledThreadPoolExecutor實現比較簡單,只不過只適用于單體應用,任務數據都在內存中,在系統崩潰后數據丟失;后兩張實現相對復雜,并且需要依賴于第三方應用,在系統整體結構上更加復雜且消耗更多資源,但能支持分布式系統,且有較高的容錯性。
示例
定義延遲任務對象:
@Getter
public class DelayTask implements Serializable{
private static final long serialVersionUID = -5062977578344039366L;
private long delaySeconds;
private TaskExecute taskExecute;
public DelayTask(long delaySeconds, TaskExecute taskExecute) {
this.delaySeconds = delaySeconds;
this.taskExecute = taskExecute;
}
/**
*
*/
public void execute(){
taskExecute.run();
}
public interface TaskExecute extends Runnable, Serializable {
}
}
調度器:
public interface ScheduleTrigger {
/**
* 延遲任務調度
* @param delayTask
*/
void schedule(DelayTask delayTask);
}
- Timer
public class JavaTrigger implements ScheduleTrigger{
private Timer timer;
public JavaTimer(){
this.timer = new Timer();
}
/**
*
* @param delayTask
*/
public void schedule(DelayTask delayTask){
timer.schedule(buildTimerTask(delayTask.getTaskExecute()), toMillis(delayTask.getDelaySeconds()));
}
private TimerTask buildTimerTask(Runnable runnable){
return new TimerTask() {
@Override
public void run() {
runnable.run();
}
};
}
}
- DelayQueue
public class DelayQueueTrigger implements ScheduleTrigger{
private DelayQueue< Task > queue = new DelayQueue< >();
public DelayQueueTrigger() {
Thread thread = new Thread(() - > {
while (true) {
try {
Task task = queue.take();
if(task != null)
task.execute();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
thread.setDaemon(true);
thread.start();
}
/**
* @param delayTask
*/
public void schedule(DelayTask delayTask){
if( delayTask instanceof Task ){
queue.put((Task) delayTask);
}
}
}
class Task extends DelayTask implements Delayed{
private long execTime;
public Task(long delaySeconds, TaskExecute taskExecute) {
super(delaySeconds, taskExecute);
this.execTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(delaySeconds);
}
/**
* 輪詢執行該方法判斷是否滿足執行條件(<=0)
* 同時該返回作為等待時長
* @param unit the time unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return this.execTime - System.currentTimeMillis(); // ms
}
public long getExecTime() {
return execTime;
}
@Override
public int compareTo(Delayed other) {
if(this.getExecTime() == ((Task)other).getExecTime()){
return 0;
}
return this.getExecTime() > ((Task)other).getExecTime() ? 1: -1;
}
}
- ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor實現也是基于延遲隊列BlockingQueue實現
public class ScheduledExecutorTrigger implements ScheduleTrigger{
private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
public void schedule(DelayTask delayTask){
executorService.schedule(delayTask.getTaskExecute(), delayTask.getDelaySeconds(), TimeUnit.SECONDS);
}
}
- Redis過期監聽
需要修改redis配置文件:notify-keyspace-events Ex
public class RedisTimer{
private static final String EXPIRATION_KEY = "REDIS_EXPIRATION_KEY";
@Configuration
@Import(RedisAutoConfiguration.class)
public static class Config{
@Bean(name = "redisTemplate")
public RedisTemplate< Object, Object > redisTemplate(RedisConnectionFactory factory) {
RedisTemplate< Object, Object > template = new RedisTemplate< >();
RedisSerializer< String > keySerializer = new StringRedisSerializer();
RedisSerializer< Object > valueSerializer = new ObjectRedisSerializer();
template.setConnectionFactory(factory);
template.setKeySerializer(keySerializer);
template.setValueSerializer(valueSerializer);
return template;
}
/**
* 消息監聽器容器bean
* @param connectionFactory
* @return
*/
@Bean
public RedisMessageListenerContainer container(LettuceConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
@Bean
public RedisKeyExpirationListener redisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer){
RedisKeyExpirationListener redisKeyExpirationListener = new RedisKeyExpirationListener(redisMessageListenerContainer);
redisKeyExpirationListener.setContext(context());
return redisKeyExpirationListener;
}
@Bean
public Context context(){
return new Context();
}
@Bean
public RedisTrigger redisTrigger(RedisTemplate redisTemplate){
return new RedisTrigger(redisTemplate, context());
}
class ObjectRedisSerializer implements RedisSerializer{
@Override
public byte[] serialize(Object o) throws SerializationException {
return SerializeUtils.serialize(o);
}
@Override
public Object deserialize(byte[] bytes) throws SerializationException {
return SerializeUtils.deserialize(bytes);
}
}
}
public static class RedisTrigger implements ScheduleTrigger{
private RedisTemplate redisTemplate;
private Context context;
public RedisTrigger(RedisTemplate redisTemplate, Context context){
this.redisTemplate = redisTemplate;
this.context = context;
}
public void schedule(DelayTask delayTask){
context.put(EXPIRATION_KEY, delayTask);
redisTemplate.opsForValue().set(EXPIRATION_KEY, delayTask, delayTask.getDelaySeconds(), TimeUnit.SECONDS);
}
}
@Slf4j
public static class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
private Context context;
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
/**
* 這里沒法拿到過期值
* @param message never {@literal null}.
*/
@SneakyThrows
@Override
public void doHandleMessage(Message message) {
try {
String topic = new String(message.getChannel(), "utf-8");
String key = new String(message.getBody(), "utf-8");
if (EXPIRATION_KEY.equals(key)) {
Object object = context.get(EXPIRATION_KEY);
if( object instanceof DelayTask ){
log.info("redis key[{}] 過期回調", key);
((DelayTask) object).execute();
}
}
} catch (Exception e) {
log.error("處理Redis延遲任務異常:{}", e.getMessage() ,e);
}
}
public void setContext(Context context) {
this.context = context;
}
}
public static class Context{
private Map< String,Object > context = new ConcurrentHashMap< >();
public void put(String key, Object value){
context.put(key, value);
}
public Object get(String key){
return context.get(key);
}
}
}
- MQ延遲隊列
這里MQ選擇的是RabbitMq,要知道在RabbitMq中是沒有延遲隊列的,但可以通過延遲消息插件rabbitmq_delayed_message_exchange實現,另外一種是基于死信來實現。
什么時候消息進入死信?
- 1)消息消費方調用了basicNack() 或 basicReject(),并且參數都是 requeue = false,則消息會路由進死信隊列
- 2)消息消費過期,過了TTL(消息、或隊列設置超時時間) 存活時間,就是消費方在 TTL 時間之內沒有消費,則消息會路由進死信隊列
- 3)隊列設置了x-max-length 最大消息數量且當前隊列中的消息已經達到了這個數量,再次投遞,消息將被擠掉,被擠掉的消息會路由進死信隊列
public class RabbitTimer{
@Configuration
@Import(RabbitAutoConfiguration.class)
public static class Config{
static final String TTL_EXCHANGE_FOR_SCHEDULE = "TTL_EXCHANGE_FOR_SCHEDULE";
static final String TTL_QUEUE_FOR_SCHEDULE = "TTL_QUEUE_FOR_SCHEDULE";
static final String TTL_ROUTING_KEY_FOR_SCHEDULE = "TTL_ROUTING_KEY_FOR_SCHEDULE";
static final String COMMON_QUEUE_FOR_SCHEDULE = "COMMON_QUEUE_FOR_SCHEDULE";
@Bean
public Queue ttlQueue(){
return QueueBuilder.durable(TTL_QUEUE_FOR_SCHEDULE).build();
}
@Bean
public Exchange ttlExchange(){
return ExchangeBuilder.directExchange(TTL_EXCHANGE_FOR_SCHEDULE).build();
}
@Bean
public Binding ttlBinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with(TTL_ROUTING_KEY_FOR_SCHEDULE).noargs();
}
@Bean
public Queue commonQueue(){
return QueueBuilder.durable(COMMON_QUEUE_FOR_SCHEDULE)
.deadLetterExchange(TTL_EXCHANGE_FOR_SCHEDULE)
.deadLetterRoutingKey(TTL_ROUTING_KEY_FOR_SCHEDULE)
.build();
}
@Bean
public TtlMessageConsumer ttlMessageConsumer(){
return new TtlMessageConsumer();
}
@Bean
public RabbitTrigger rabbitTrigger(RabbitTemplate rabbitTemplate){
return new RabbitTrigger(rabbitTemplate);
}
}
@Slf4j
@RabbitListener(queues=TTL_QUEUE_FOR_SCHEDULE)
public static class TtlMessageConsumer{
@RabbitHandler
public void handle(byte [] message){
Object deserialize = SerializeUtils.deserialize(message);
if( deserialize instanceof DelayTask ){
((DelayTask) deserialize).execute();
}
}
}
public static class RabbitTrigger implements ScheduleTrigger{
@Autowired
private RabbitTemplate rabbitTemplate;
public RabbitTrigger(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void schedule(DelayTask delayTask){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration( String.valueOf(TimeUnit.SECONDS.toMillis(delayTask.getDelaySeconds())));
Message message = new Message(SerializeUtils.serialize(delayTask), messageProperties);
rabbitTemplate.send(COMMON_QUEUE_FOR_SCHEDULE, message);
}
}
}
- HashedWheelTimer
public class NettyTrigger implements ScheduleTrigger {
HashedWheelTimer timer = new HashedWheelTimer(200,
TimeUnit.MILLISECONDS,
100); // 時間輪中的槽數
/**
*
*/
@Override
public void schedule(DelayTask delayTask){
TimerTask task = timeout - > delayTask.execute();
//
timer.newTimeout(task, delayTask.getDelaySeconds(), TimeUnit.SECONDS);
}
}
測試:
ScheduleTrigger.schedule(DelayTask delayTask);
結束語
通過幾個簡單的示例了解延遲隊列的實現方式,可以根據實際業務場景以及應用架構做出合理的選擇。
-
數據
+關注
關注
8文章
7139瀏覽量
89576 -
服務器
+關注
關注
12文章
9303瀏覽量
86061 -
內存
+關注
關注
8文章
3055瀏覽量
74327 -
定時器
+關注
關注
23文章
3255瀏覽量
115368 -
延遲
+關注
關注
1文章
70瀏覽量
13583
發布評論請先 登錄
相關推薦
深度解析數據結構與算法篇之隊列及環形隊列的實現
TencentOS-tiny中環形隊列的實現
QueueForMcu 基于單片機實現的隊列功能模塊
![QueueForMcu 基于單片機<b class='flag-5'>實現</b>的<b class='flag-5'>隊列</b>功能模塊](https://file.elecfans.com/web1/M00/D9/4E/pIYBAF_1ac2Ac0EEAABDkS1IP1s689.png)
RTOS消息隊列的多種用途
![RTOS消息<b class='flag-5'>隊列</b>的多種用途](https://file.elecfans.com/web2/M00/4E/08/poYBAGK7-BCAa0fqAADeuVJW2Rk395.png)
實現一個雙端隊列的步驟簡析
如何用Redis實現延遲隊列呢?
一種異步延遲隊列的實現方式調研
Disruptor高性能隊列的原理
![Disruptor高性能<b class='flag-5'>隊列</b>的原理](https://file1.elecfans.com/web2/M00/8D/D2/wKgZomTAideAYzvFAAAUUBQF6ko228.png)
評論