
Java: Implementing a Delay Queue with the RabbitMQ Plugin


Spring Boot project, Windows environment


Environment setup

RabbitMQ 3.5.7 and later support a plugin (rabbitmq-delayed-message-exchange) that implements delay queue functionality. The plugin also requires Erlang/OTP 18.0 or later.


Plugin download page: http://www.rabbitmq.com/community-plugins.html


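In the rabbitmq_delayed_message_exchange row, download the plugin build that matches your RabbitMQ version, place it in the plugins directory under the RabbitMQ installation directory, and rename it to rabbitmq_delayed_message_exchange-0.0.1.ez or rabbitmq_delayed_message_exchange-3.7.7.ez (my version is 3.7.7).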


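Stop the RabbitMQ service, open a command prompt, change to the sbin directory, and run the following command (adjust the path to match your installation directory):


"{RabbitMQ install directory}\sbin\rabbitmq-plugins.bat" enable rabbitmq_delayed_message_exchange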


Then start the service again. The environment setup is now complete.


Code implementation



<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1. Create the exchange and queue



import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import org.junit.Test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Creates the queue and the exchange.
 */
public class RabbitConfig {

    static final String exchangeName = "test_exchange";
    static final String queueName = "test_queue";
    static final String routingKey = "test_queue";

    /**
     * Creates the exchange and the queue, then binds them.
     */
    @Test
    public void binding() throws IOException, TimeoutException {

        // Open a connection and create a channel
        ConnectionFactory fc = new ConnectionFactory();
        fc.setHost("localhost");
        fc.setPort(5672);
        fc.setUsername("guest");
        fc.setPassword("guest");

        Connection conn = fc.newConnection();
        Channel channel = conn.createChannel();

        channel.exchangeDelete(exchangeName); // delete the exchange

        // x-delayed-type selects the routing behavior applied once the delay has elapsed
        Map<String, Object> map = new HashMap<>();
        map.put("x-delayed-type", "direct");
        channel.exchangeDeclare(exchangeName, "x-delayed-message", false, false, map); // create the exchange

        channel.queueDelete(queueName); // delete the queue
        channel.queueDeclare(queueName, true, false, false, null); // create the queue

        channel.queueBind(queueName, exchangeName, routingKey); // bind the queue to the exchange

        channel.close();
        conn.close();
    }
}

An alternative way to create them



import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;

/**
 *
 * @author lyb
 * November 14, 2019, 4:39:05 PM
 */
@SpringBootConfiguration
public class RabbitMQConfig {

    static final String exchangeName = "test_exchange";
    static final String queueName = "test_queue";
    static final String routingKey = "test_queue";

    /**
     * Creates the queue that receives the delayed messages.
     */
    @Bean
    public Queue createQueue() {
        return QueueBuilder.durable(queueName).build();
    }

    /**
     * Creates the exchange.
     */
    @Bean
    public CustomExchange createExchange() {
        Map<String, Object> map = new HashMap<>();
        map.put("x-delayed-type", "direct");
        return new CustomExchange(
                exchangeName, "x-delayed-message", true, false, map);
    }

    /**
     * Binds the queue to the exchange.
     */
    @Bean
    public Binding exchangeBindingQueue() {
        return BindingBuilder.bind(createQueue())
                .to(createExchange()).with(routingKey).noargs();
    }

}
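
Both creation styles above pass the same arguments map, whose only entry is "x-delayed-type". As a minimal sketch, that map could be built in one place by a small helper. The class name DelayedExchangeArgs and the restriction to the four built-in exchange types are assumptions made for illustration; they are not part of the original code, and the plugin itself may accept other exchange types.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper (not from the original post) that builds the exchange arguments. */
final class DelayedExchangeArgs {

    // Assumption for this sketch: only the four built-in exchange types are allowed.
    private static final Set<String> ROUTING_TYPES =
            Set.of("direct", "topic", "fanout", "headers");

    /** Returns a map containing the single "x-delayed-type" argument. */
    static Map<String, Object> of(String routingType) {
        if (!ROUTING_TYPES.contains(routingType)) {
            throw new IllegalArgumentException("Unsupported x-delayed-type: " + routingType);
        }
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", routingType);
        return args;
    }

    public static void main(String[] args) {
        // Prints the arguments map for a direct-style delayed exchange
        System.out.println(of("direct"));
    }
}
```

The returned map could then be passed as the last argument of channel.exchangeDeclare(...) or of the CustomExchange constructor in place of the hand-built map.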

2. Message producer



import java.time.LocalDateTime;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Publisher of delayed messages.
 */
@Component
public class DelayProducer {

    @Autowired
    RabbitTemplate rabbitTemplate;

    public void sendMsg() {

        String msg = "测试延时de消息|" + LocalDateTime.now();

        // Lambda form: the x-delay header carries the delay in milliseconds
        rabbitTemplate.convertAndSend(RabbitConfig.exchangeName, RabbitConfig.routingKey, msg, (message) -> {
            message.getMessageProperties().setHeader("x-delay", 9000); // delay of 9 seconds
            return message;
        });

        // Alternative form: an anonymous MessagePostProcessor
        rabbitTemplate.convertAndSend(RabbitMQConfig.exchangeName, RabbitMQConfig.routingKey, msg, new MessagePostProcessor() {

            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setHeader("x-delay", 5000); // delay of 5 seconds
                return message;
            }
        });

    }

}
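
The producer sets the "x-delay" header to a hard-coded number of milliseconds. The following is a minimal sketch of converting a java.time.Duration into that header value. The class DelayHeader and its method name are hypothetical, and the upper bound of Integer.MAX_VALUE is an assumption chosen only because the code above passes an int.

```java
import java.time.Duration;

/** Hypothetical helper (not from the original post) for computing the x-delay value. */
final class DelayHeader {

    /** Converts a delay into whole milliseconds suitable for an int-valued header. */
    static int toXDelayMillis(Duration delay) {
        if (delay.isNegative()) {
            throw new IllegalArgumentException("Delay must not be negative: " + delay);
        }
        long millis = delay.toMillis();
        if (millis > Integer.MAX_VALUE) {
            // Assumption for this sketch: keep the value within int range
            throw new IllegalArgumentException("Delay too long: " + delay);
        }
        return (int) millis;
    }

    public static void main(String[] args) {
        System.out.println(toXDelayMillis(Duration.ofSeconds(9))); // prints 9000
    }
}
```

With such a helper, the header could be set as setHeader("x-delay", DelayHeader.toXDelayMillis(Duration.ofSeconds(9))), which states the unit explicitly instead of relying on a comment.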

3. Message consumer



import java.time.LocalDateTime;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer.
 */
@Component
public class DelayConsumer {

    // Prints the message body followed by the time it was received
    @RabbitListener(queues = "test_queue")
    public void receive(String msg) {
        System.out.println("接收到的消息:"+msg +"||"+LocalDateTime.now());
    }
}

4. Test result:


接收到的消息:测试延时de消息|2018-07-26T19:22:49.895||2018-07-26T19:22:58.920
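
The output line carries two timestamps: the send time embedded in the message and the receive time appended by the consumer. As a small worked check, the sketch below computes the gap between them and compares it with the 9000 ms delay set by the producer. The class name ObservedDelay is hypothetical; the two timestamps are copied from the output above.

```java
import java.time.Duration;
import java.time.LocalDateTime;

/** Hypothetical helper (not from the original post) that measures the observed delay. */
final class ObservedDelay {

    /** Returns the milliseconds elapsed between two ISO-8601 local timestamps. */
    static long millisBetween(String sentAt, String receivedAt) {
        return Duration.between(
                LocalDateTime.parse(sentAt),
                LocalDateTime.parse(receivedAt)).toMillis();
    }

    public static void main(String[] args) {
        long observed = millisBetween("2018-07-26T19:22:49.895", "2018-07-26T19:22:58.920");
        long requested = 9000; // the x-delay value set by the producer
        // Prints: observed 9025 ms, 25 ms over the requested delay
        System.out.println("observed " + observed + " ms, "
                + (observed - requested) + " ms over the requested delay");
    }
}
```

The message therefore arrived about 9.025 seconds after it was sent, which is consistent with the 9-second delay plus a small delivery overhead.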


