package rabbitmq.publisherConfirms;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.RabbitMQConstant;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;
/**
* Created with IntelliJ IDEA.
* Description:
* User: wuyulin
* Date: 2024-12-18
* Time: 16:47
*/
public class PublisherConfirmsDemo {
public static Connection createConnection() throws IOException, TimeoutException {
// 1. 创建连接⼯⼚
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost(RabbitMQConstant.HOST);//ip 默认值localhost
factory.setPort(RabbitMQConstant.PORT); //默认值 5672
factory.setVirtualHost(RabbitMQConstant.VIRTUALHOST);//虚拟机名称, 默认 /
factory.setUsername(RabbitMQConstant.USERNAME);//⽤户名,默认guest
factory.setPassword(RabbitMQConstant.PASSWORD);//密码, 默认guest
//3. 创建连接Connection
Connection connection = factory.newConnection();
return connection;
}
/**
* 单独确认模式(发送一条消息就确认一条消息)
* */
public static void publishMessagesIndividually() throws Exception{
try (Connection connection=createConnection()){
//开启信道
Channel ch = connection.createChannel();
//开启信道确认模式
ch.confirmSelect();
//声明队列
ch.queueDeclare(RabbitMQConstant.PUBLISHER_CONFIRMS_QUEUE_NAME1, true, false, true, null);
long start = System.currentTimeMillis();
//循环发送消息
for (int i = 0; i < RabbitMQConstant.MESSAGE_COUNT; i++) {
String body = "消息"+ i;
//发布消息
ch.basicPublish("", RabbitMQConstant.PUBLISHER_CONFIRMS_QUEUE_NAME1, null, body.getBytes());
//等待确认消息.只要消息被确认,这个⽅法就会被返回
//如果超时过期, 则抛出TimeoutException。如果任何消息被nack(丢失), waitForConfirmsOrDie将抛出IOException。
ch.waitForConfirmsOrDie(5000);
}
long end = System.currentTimeMillis();
System.out.format("单独确认模式消息条数 %d 所用时间 %d ms\n", RabbitMQConstant.MESSAGE_COUNT, end - start);
}
}
/**
* 批量确认模式(发满多条消息再一起确认)
* */
public static void publishMessagesInBatch() throws Exception{
try (Connection connection=createConnection()){
//开启信道
Channel ch = connection.createChannel();
//开启信道确认模式
ch.confirmSelect();
//声明队列
ch.queueDeclare(RabbitMQConstant.PUBLISHER_CONFIRMS_QUEUE_NAME2, true, false, true, null);
long start = System.currentTimeMillis();
//未确认消息数量
int outstandingMessageCount=0;
//循环发送消息
for (int i = 0; i < RabbitMQConstant.MESSAGE_COUNT; i++) {
String body = "消息"+ i;
//发布消息
ch.basicPublish("", RabbitMQConstant.PUBLISHER_CONFIRMS_QUEUE_NAME2, null, body.getBytes());
outstandingMessageCount++;
//判断未确认消息是否达到批量检查数值
if(outstandingMessageCount>=RabbitMQConstant.BATCH_COUNT){
//等待确认消息.只要消息被确认,这个⽅法就会被返回
//如果超时过期, 则抛出TimeoutException。如果任何消息被nack(丢失), waitForConfirmsOrDie将抛出IOException。
ch.waitForConfirmsOrDie(5000);
outstandingMessageCount=0;
}
//判断是否所有的消息都已经确认
if(outstandingMessageCount>0){
ch.waitForConfirmsOrDie(5000);
outstandingMessageCount=0;
}
}
long end = System.currentTimeMillis();
System.out.format("批量确认模式消息条数 %d 所用时间 %d ms\n", RabbitMQConstant.MESSAGE_COUNT, end - start);
}
}
public static void publishMessagesInAsynchronously()throws Exception{
try (Connection connection=createConnection()){
//开启信道
Channel ch = connection.createChannel();
//开启信道确认模式
ch.confirmSelect();
//声明队列
ch.queueDeclare(RabbitMQConstant.PUBLISHER_CONFIRMS_QUEUE_NAME3, true, false, true, null);
//创建一个线程安全的有序集合,元素按照⾃然顺序进⾏排序,存储未confirm消息序号
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());
//为信道添加一个监听者,可以监听信息是否被 rabbitmq 成功接收
/*
* 异步confirm⽅法的编程实现最为复杂.Channel接⼝提供了⼀个⽅法addConfirmListener.这个⽅法
可以添加 ConfirmListener 回调接⼝.ConfirmListener 接⼝中包含两个⽅法:handleAck(long deliveryTag, boolean multiple)
和handleNack(long deliveryTag, boolean multiple) ,分别对应处理RabbitMQ发送给⽣产者的ack和nack.
deliveryTag 表⽰发送消息的序号.multiple 表⽰是否批量确认.
我们需要为每⼀个 Channel 维护⼀个已发送消息的序号集合.当收到 RabbitMQ 的 confirm 回调时,从集合中删除对应的消息.
当 Channel 开启confirm模式后,channel 上发送消息都会附带⼀个从1开始递增的deliveryTag序号.
我们可以使⽤ SortedSet 的有序性来维护这个已发消息的集合.
1.当收到ack时,从序列中删除该消息的序号.如果为批量确认消息,表⽰⼩于等于当前序号 deliveryTag 的消息都收到了,则清除对应集合
2. 当收到nack时,处理逻辑类似,不过需要结合具体的业务情况,进⾏消息重发等操作.*/
ch.addConfirmListener(new ConfirmListener(){
// deliveryTag 表示是对于哪个信息的确认(信息的编号)
//multiple 是否批量确认
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
//System.out.println("成功接收到deliveryTag:"+deliveryTag+"的信息");
//confirmSet.headSet(n)⽅法返回当前集合中⼩于n的集合
if (multiple){
//批量确认:将集合中⼩于等于当前序号 deliveryTag 元素的集合清除,表⽰这批序号的消息都已经被ack了
confirmSet.headSet(deliveryTag+1).clear();
}else {
//单条确认:将当前的deliveryTag从集合中移除
confirmSet.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
//System.out.println("未接收到deliveryTag:"+deliveryTag+"的信息,请重发");
if (multiple) {
//批量确认:将集合中⼩于等于当前序号deliveryTag元素的集合清除,表⽰这批序号的消息都已经被ack了
confirmSet.headSet(delivery
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论





























收起资源包目录

























































































共 56 条
- 1
资源评论


小林想被监督学习
- 粉丝: 7256
上传资源 快速赚钱
我的内容管理 展开
我的资源 快来上传第一个资源
我的收益
登录查看自己的收益我的积分 登录查看自己的积分
我的C币 登录后查看C币余额
我的收藏
我的下载
下载帮助


最新资源
- 通信工程设计概述.ppt
- 公务员信息化与电子政务考试培训PPT课件.ppt
- 大众点评网网络推广方案.ppt
- 如何做好医疗企业网络营销策划.doc
- 华中科技大学计算机网络课件习题讲解.doc
- 基于51单片机的数字电压表设计.doc
- (源码)基于C语言的嵌入式文件管理与查看系统.zip
- 2023年浙江省计算机二级考试办公自动化高级应用中Excel考试题常用函数.doc
- 网络科技公司创业计划书通用6篇.docx
- 精华版国家开放大学电大《网络系统管理与维护》机考2套真题题库及答案2.pdf
- 外贸企业营销型网站建设技巧-.doc
- (源码)基于Swift框架的iOS自定义模板项目.zip
- (源码)基于Android和ZXing库的二维码条形码扫描系统.zip
- (源码)基于JavaSpring Boot框架的快速开发系统.zip
- 大三上Python大作业,关于AC小说网的网络爬虫,爬取了首页小说的内容等相关信息 网址:https://m.acxsw.com/
- (源码)基于MicroPython的ESP32外设控制项目.zip
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈



安全验证
文档复制为VIP权益,开通VIP直接复制
