博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【并发】9、借助redis 实现生产消费,消息订阅发布模式队列
阅读量:4710 次
发布时间:2019-06-10

本文共 4197 字,大约阅读时间需要 13 分钟。

这个就是一个消息可以被多次消费的范例了

其实这个实现的方式可以参考我之前的设计模式,观察者模式

 

不过有一点需要注意一下啊,这个消息发布的时候,好像是不支持字节数据的,里面好像会对字节进行转换,这样的结果就是导致我最后无法吧相应的字节转换成我之前序列化的对象

不知道是不是ObjectInputStream和ObjectOutputStream实现不是很好的原因,还是什么,反正反序列化的时候,有些不可见的字符应该是被截掉了

 

消息发布者

package queue.redisQueue;import queue.fqueue.vo.TempVo;import redis.clients.jedis.Jedis;import java.io.ByteArrayOutputStream;import java.io.ObjectOutputStream;import java.util.UUID;/** * @ProjectName: cutter-point * @Package: queue.redisQueue * @ClassName: RedisQueueProducter3 * @Author: xiaof * @Description: 订阅,发布模式 发布消息 * @Date: 2019/6/12 16:47 * @Version: 1.0 */public class RedisQueueProducter3 implements Runnable {    private Jedis jedis;    private String queueKey;    public RedisQueueProducter3(Jedis jedis, String queueKey) {        this.jedis = jedis;        this.queueKey = queueKey;    }    public void putMessage() {        try {            Thread.sleep((long) (Math.random() * 1000));            //不存在则创建,存在则直接插入            //向redis队列中存放数据            //生成数据            TempVo tempVo = new TempVo();            tempVo.setName(Thread.currentThread().getName() + ",time is:" + UUID.randomUUID());            try {                int i = 0;                while(i < 10) {                    //反馈订阅的数量                    long num = jedis.publish(queueKey.getBytes(), tempVo.toString().getBytes());                    if(num > 0) {                        System.out.println("成功!num:" + num);                        break;                    }                    ++i;                }            } catch (Exception e) {                System.out.println("失败!");            }        } catch (Exception e) {            e.printStackTrace();        }    }    @Override    public void run() {        while(true) {            putMessage();        }    }}

 

消息消费者

package queue.redisQueue;import queue.fqueue.vo.EventVo;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub;import redis.clients.util.SafeEncoder;import java.io.ByteArrayInputStream;import java.io.IOException;import java.io.ObjectInputStream;import java.io.UnsupportedEncodingException;/** * @ProjectName: cutter-point * @Package: queue.redisQueue * @ClassName: RedisQueueConsume3 * @Author: xiaof * @Description: 发布订阅消息,订阅线程 * @Date: 2019/6/12 16:53 * @Version: 1.0 */public class RedisQueueConsume3 implements Runnable {    private Jedis jedis;    private String queueKey;    class myJedisPubSub extends JedisPubSub {        /** JedisPubSub类是一个没有抽象方法的抽象类,里面方法都是一些空实现         * 所以可以选择需要的方法覆盖,这儿使用的是SUBSCRIBE指令,所以覆盖了onMessage         * 如果使用PSUBSCRIBE指令,则覆盖onPMessage方法         * 当然也可以选择BinaryJedisPubSub,同样是抽象类,但方法参数为byte[]         **/        @Override        public void onMessage(String channel, String message) {            System.out.println(Thread.currentThread().getName()+"-接收到消息:channel=" + channel + ",message=" + message);            //接收到exit消息后退出            System.out.println(message);        }    }    public RedisQueueConsume3(Jedis jedis, String queueKey) {        this.jedis = jedis;        this.queueKey = queueKey;    }    public void consumerMessage() {        jedis.subscribe(new myJedisPubSub(), queueKey);    }    @Override    public void run() {        while (true) {            consumerMessage();        }    }}

 

测试代码:

@Test    public void test4() throws InterruptedException {        //读写取数据        for(int i = 0; i < 2; ++i) {            System.out.println("输出测试" + i);            RedisQueueProducter3 producter = new RedisQueueProducter3(jedisPool.getResource(), "xiaof");            Thread t = new Thread(producter);            t.start();        }        while(true) {            Thread.sleep(1000);        }    }    @Test    public void test5() throws InterruptedException {        //读写取数据        for(int i = 0; i < 5; ++i) {            System.out.println("输出测试" + i);            //切记一定要重新获取Resource,不然无法并发操作            RedisQueueConsume3 fqueueConsume = new RedisQueueConsume3(jedisPool.getResource(), "xiaof");            Thread t = new Thread(fqueueConsume);            t.setDaemon(true);            t.start();        }        while(true) {            Thread.sleep(1000);        }    }

 

效果展示

 

同一消息被多个订阅者同步消费

 

转载于:https://www.cnblogs.com/cutter-point/p/11011122.html

你可能感兴趣的文章
Markdown的使用
查看>>
销售系统学习.mdl
查看>>
触发器
查看>>
mysql配置默认字符集为UTF8mb4
查看>>
WPF实现3D翻转的动画效果
查看>>
自定义圆环进度条
查看>>
UILayer
查看>>
复杂对象写入文件
查看>>
k8s-高级调度方式-二十一
查看>>
[HDU3555]Bomb
查看>>
基于dubbo的分布式系统(一)安装docker
查看>>
Recursion
查看>>
66. Plus One
查看>>
COMP30023 Computer Systems 2019
查看>>
CSS选择器分类
查看>>
Kali学习笔记39:SQL手工注入(1)
查看>>
C# MD5加密
查看>>
Codeforces Round #329 (Div. 2)D LCA+并查集路径压缩
查看>>
移动应用开发测试工具Bugtags集成和使用教程
查看>>
Java GC、新生代、老年代
查看>>