java延时队列使用
原创 echojava 发表于:2017-06-17 14:42:48
  阅读 :147   收藏   编辑

在实际的业务中会遇到如下场景:

1)过1分钟失败任务重试
2)过1小时发送邮件

等等,需要延时一段时间处理,在Java的juc包中给我提供了DelayQueue延时队列处理,过一会该处理的事儿。

DelayQueue,一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的Delayed元素。如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。即使无法使用 take 或 poll 移除未到期的元素,也不会将这些元素作为正常元素对待。例如,size 方法同时返回到期和未到期元素的计数。此队列不允许使用 null 元素。

下面通过代码演示这一场景:

1.新建消息实现Delayed接口

package cn.slimsmart.study.queue;  

import java.util.concurrent.Delayed;  
import java.util.concurrent.TimeUnit;  

/** 
 * 延时消息实体 
 * @author slimina 
 * 
 */  
public class Message implements Delayed{  

    private int id;    
    private String body;  //消息内容  
    private long excuteTime;//执行时间      

    public int getId() {  
        return id;  
    }  

    public void setId(int id) {  
        this.id = id;  
    }  

    public String getBody() {  
        return body;  
    }  

    public void setBody(String body) {  
        this.body = body;  
    }  

    public long getExcuteTime() {  
        return excuteTime;  
    }  

    public void setExcuteTime(long excuteTime) {  
        this.excuteTime = excuteTime;  
    }  

    public Message(int id, String body,long delayTime) {    
        this.id = id;    
        this.body = body;    
        this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();    
    }   

    @Override  
    public int compareTo(Delayed delayed) {  
        Message msg = (Message)delayed;    
        return Integer.valueOf(this.id)>Integer.valueOf(msg.id)?1:( Integer.valueOf(this.id)<Integer.valueOf(msg.id)?-1:0);    
    }  

    @Override  
    public long getDelay(TimeUnit unit) {  
        return  unit.convert(this.excuteTime - System.nanoTime(), TimeUnit.NANOSECONDS);     
    }  

}

2.创建消费者

package cn.slimsmart.study.queue;  

import java.util.concurrent.DelayQueue;  

//消費者  
public class Consumer implements Runnable {  

    // 延时队列  
    private DelayQueue<Message> queue;  

    public Consumer(DelayQueue<Message> queue) {  
        this.queue = queue;  
    }  

    @Override  
    public void run() {  
        while (true) {  
            try {  
                Message take = queue.take();  
                System.out.println("消费消息:" + take.getId() + ":" + take.getBody());  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
}

3.向延时队列发送消息

package cn.slimsmart.study.queue;  

import java.util.concurrent.DelayQueue;  

public class DelayQueueTest {  

    public static void main(String[] args) {  
        // 创建延时队列  
        DelayQueue<Message> queue = new DelayQueue<Message>();  
        // 添加延时消息,m1 延时5s  
        Message m1 = new Message(1, "world", 3000);  
        // 添加延时消息,m2 延时3s  
        Message m2 = new Message(2, "hello", 10000);  
        queue.offer(m2);  
        queue.offer(m1);  
        // 启动消费线程  
        new Thread(new Consumer(queue)).start();  
    }  

}

运行可以看到消息接收的时间间隔。

转载http://blog.csdn.net/zhu_tianwei/article/details/53549653