消息队列
基本概述
-
基于通信协议定义和抽象的更高层次的通信模型,一般都是生产者和消费者模型,又或者说服务端和客户端模型。
-
生产者/消费者模型:一般通过定义生产者和消费者实现消息通信从而屏蔽复杂的底层通信协议。应用于分布式应用系统,而且为之提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量堆积,高吞吐和可靠性重试机制的特性。
核心概念
- 消息主题:Message Topic,一级消息类型,生产者向其发送消息
- 消息生产者:Message Producer,消息发布者,或者消息服务终端,负责生产消息和发送消息到消息主题
- 消息消费者:Message Consumer,消息订阅者,或者消息客户终端,负责从消息主题接收并处理消费消息
- 消息实体:Message Object,消息对象,生产者向消息主题发送并最终传送给消息者的数据和属性的符号以及组合
- 消息属性:Message Attributes,消息内容,生产者对消息进行抽象和定义的相关属性,包含Message Key 和Message Target
- 消息组:Message Group,消息分类组别,通称一类生产者和消费者,通常生产和消费同一类消息,且消息发布和订阅的逻辑基本一致
编程思想
数据结构:消息队列的数据结构采用FIFO方式来定义与实现 设计模式:采用观察者模式
消息流派:
- 有Broker:基于消息中间件,所有消息通过中间件中转,基于Broker把消息推送给消费者[消费者主动轮询] 1.重Topic:Kafka和RocketMq,Activemq,生产者把消息发送Key和数据到Broker,然后由Broker比较Key之后决定推送给具体的消费者[推送消息给消费者] 2.轻Topic:Rabbitmq[AMQP],生产者把生产消息且发送Key和数据,消费者定义订阅的队列,Broker收到请求之后,按照一定的逻辑和规则计算出相应的Key对应的队列,然后把消息推送给具体的消费者[消费者向生产者订阅消息,消费者主动拉取消息]
- 无Broker:基于Socket网络[传输层]直连通信,代表作ZeroMQ
基本概述
NameServer:名称服务器[MQ命名空间服务器],大致相当于 jndi技术,更新和发现 broker服务。用于保存Broker相关元信息,并给生产者和消费者查找Broker消息。每个Broker在启动都会在名称服务器[NameServer]注册,生产者在发送消息前会根据消息主题到名称服务器查询获取Broker路由消息,消费者也会定时获取主题的路由消息。
Broker:消息存储中心[消息中转角色],负责存储和转发消息。接收来自生产者的消息并进行存储,消费者从这拉取消息。存储与消息相关的元数据,主要包括用户组,消息进度偏移量,队列消息等。其中Broker分为Master和Slave节点:
- Master节点:可读可写
- Slave节点:只可读不可写 其部署方式:
- 单机Master:Broker重启和宕机之后服务不可用,不建议生产使用
- 多机Master:所有消息服务器均是Master,没有Slave。 优点:配置简单,单个Master重启和宕机维护对应用无影响 缺点:单机重启和宕机期间,机器上未被消费的消息在机器恢复期间不可订阅,消息实时性会受影响
- 多机Master多机Slave[同步双写]: 优点:数据和服务无单点故障,Master重启和宕机消息无延迟,数据和服务可用高 缺点:相对异步复制性能较低,发送消息的延时较高
- 多机Master多机Slave[异步复制]: 优点:消息丢失较少,实时性高,Master重启和宕机可继续从Slave消费 缺点:Master重启和宕机后在磁盘损坏会导致消息丢失,但是情况比较少
Producer:消息生产者->负责生产消息,生产者向消息服务器发送业务应用程序生成的消息。主要有同步发送和异步发送方式两种,其中:
- 同步发送:消息发送方发出数据后,在消息接收方发送响应之后再发送下一个数据。一般适用于重要消息通知场景[重要通知邮件,营销短信推送]
- 异步发送:消息发送方发出数据后,不用等待接收方发回响应,接着发送下一个数据。一般适用于链路耗时较长,而对响应时间敏感的业务场景[视频上传通知 启动自动转码服务处理通知]
- 单向发送:负责发送消息而不等待服务器回应且没有回调函数触发。一般适用于对可靠性要求不高的业务场景[日志收集]
Consumer:消息消费者 负责消费消息,从消息服务器拉取消息并将其输入用户应用程序中。主要分为拉取型消费者和推送型消费者: 拉取型消费者:Pull Consumer->主动从消息服务器拉取消息,只要批量拉取消息,用户就会启动消费过程 推送型消费者:Push Consumer->封装消息的拉取,消费进度和其它内部维护工作,消息到达之后便执行回调接口留给用户应用程序来实现。属于被动消费类型,Push拉取时需要注册消息费者监听器,当监听器被触发之后开始消费消息。
部署Rocketmq
拉取rocketmq镜像:docker pull foxiswho/rocketmq
1.查询镜像images:docker search rocketmq
[root@centos-meteor ~]# docker search rocketmq NAME DESCRIPTION STARS OFFICIAL AUTOMATED styletang/rocketmq-console-ng rocketmq-console-ng 20 rocketmqinc/rocketmq Image repository for Apache RocketMQ 17 foxiswho/rocketmq rocketmq 14 laoyumi/rocketmq 10 [OK] xlxwhy/rocketmq alibaba's rocketmq 4 huanwei/rocketmq-broker 2 2019liurui/rocketmq-broker RocketMQ broker image for RocketMQ-Operator 1 2019liurui/rocketmq-namesrv RocketMQ name service image for RocketMQ-Ope… 1 apacherocketmq/rocketmq Docker Image for Apache RocketMQ 1 rocketmqinc/rocketmq-operator The Kubernetes operator for RocketMQ 0 2019liurui/rocketmq-operator Kubernetes Operator for RocketMQ ! 0 apacherocketmq/rocketmq-operator RocketMQ Operator is to manage RocketMQ serv… 0 coder4/rocketmq rocketmq 0 [OK] rocketmqinc/rocketmq-namesrv Customized RocketMQ Name Server Image for Ro… 0 rocketmqinc/rocketmq-broker Customized RocketMQ Broker Image for RocketM… 0 slpcat/rocketmq-console-ng 0 huanwei/rocketmq 0 huanwei/rocketmq-broker-k8s 0 king019/rocketmq rocketmq 0 pengzu/rocketmq-console-ng web console for rocketmq ,this code is from … 0 fengzt/rocketmq-broker apache rocketmq 4.2.0 broker server(官方文档… 0 huanwei/rocketmq-operator 0 slpcat/rocketmq 0 fengzt/rocketmq-nameserver apache rocketmq 4.2.0 nameserver 0 icyblazek/rocketmq RocketMQ 0 [root@centos-meteor ~]#
2.执行:docker pull foxiswho/rocketmq
3.创建docker存储根目录并且授权: mkdir docker && chmod -R 777 docker/
[root@centos-meteor /]# cd docker/ [root@centos-meteor docker]# pwd /docker [root@centos-meteor docker]#
4.部署名称服务器rocketmq-namesrv-server[9876]: #rocketmq-namesrv-server docker run -itd --restart=always --privileged=true -p 9876:9876 --name rocketmq-namesrv-server -v /docker/rocketmq/namesrv/logs:/root/rocketmq/logs -v /docker/rocketmq/namesrv/store:/root/rocketmq/store -e "MAX_POSSIBLE_HEAP=100000000" -e "JAVA_OPTS=-Duser.home=/opt" -e "JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m" rocketmqinc/rocketmq sh mqnamesrv
docker run -itd --restart=always --privileged=true -p 9876:9876 --name rocketmq-namesrv-server -v /docker/rocketmq/namesrv/logs:/root/rocketmq/logs -v /docker/rocketmq/namesrv/store:/root/rocketmq/store -e "MAX_POSSIBLE_HEAP=100000000" -e "JAVA_OPTS=-Duser.home=/opt" -e "JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m" rocketmqinc/rocketmq sh mqnamesrv
5.部署消息服务器rocketmq-broker-server[10911]: #rocketmq-broker-server docker run -itd --restart=always --privileged=true -p 10909:10909 -p 10911:10911 -p 10912:10912 --name rocketmq-broker-server --link rocketmq-namesrv-server:namesrv -v /docker/rocketmq/broker/logs:/root/rocketmq/logs -v /docker/rocketmq/broker/store:/root/rocketmq/store -v /docker/rocketmq/broker/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" -e "JAVA_OPTS=-Duser.home=/opt" -e "JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m" rocketmqinc/rocketmq sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
docker run -itd --restart=always --privileged=true -p 10909:10909 -p 10911:10911 -p 10912:10912 --name rocketmq-broker-server --link rocketmq-namesrv-server:namesrv -v /docker/rocketmq/broker/logs:/root/rocketmq/logs -v /docker/rocketmq/broker/store:/root/rocketmq/store -v /docker/rocketmq/broker/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" -e "JAVA_OPTS=-Duser.home=/opt" -e "JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m" rocketmqinc/rocketmq sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
brokerClusterName = rocketmq-cluster brokerName = broker-server brokerId = 0 deleteWhen = 04 fileReservedTime = 48 # Broker 的角色 # - ASYNC_MASTER 异步复制Master # - SYNC_MASTER 同步双写Master # - SLAVE brokerRote=ASYNC_MASTER brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH # 如果是本地程序调用云主机 mq,这个需要设置成 云主机 IP brokerIP1=Server-IP #限制的消息大小 maxMessageSize=65536 # 检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 mapedFileSizeCommitLog=1073741824 mapedFileSizeConsumeQueue=50000000 #并发send线程数,多线程来发送消息可能会出现broker busy sendMessageThreadPoolNums=128 useReentrantLockWhenPutMessage=true #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=8 highSpeedMode=false commercialBaseCount=1 maxErrorRateOfBloomFilter=20 accessMessageInMemoryMaxRatio=40 #无读写客户端存活时间 clientChannelMaxIdleTimeSeconds=120 flushDelayOffsetInterval=10000 serverSocketRcvBufSize=131072 #单次 Pull 消息(内存)传输的 最大字节数 maxTransferBytesOnMessageInMemory=262144 clientManageThreadPoolNums=32 serverChannelMaxIdleTimeSeconds=120 serverCallbackExecutorThreads=0 enablePropertyFilter=false transientStorePoolSize=5 enableConsumeQueueExt=false #rocketmq server config serverPooledByteBufAllocatorEnable=true serverSocketRcvBufSize=131072 #rocketmq client config
6.部署控制后台rocketmq-consloe-server[8082]: #rocketmq-consloe-server docker run -itd -p 8082:8080 --restart=always --privileged=true --name rocketmq-console-server -v /docker/rocketmq/console/data:/tmp -e "JAVA_OPTS=-Drocketmq.namesrv.addr=47.104.22.10:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -e "JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m -Duser.home=/opt" styletang/rocketmq-console-ng:latest
docker run -itd -p 8082:8080 --restart=always --privileged=true --name rocketmq-console-server -v /docker/rocketmq/console/data:/tmp -e "JAVA_OPTS=-Drocketmq.namesrv.addr=Server-IP:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -e "JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m -Duser.home=/opt" styletang/rocketmq-console-ng:latest
7.最终部署结果:
整合Rocketmq开发实战
1.配置Rocketmq的Maven依赖:
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency>
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.1</version> </dependency>
2.创建pivotal-cloud-queue工程:
3.封装rocketmq属性配置类:
添加spring-boot-configuration-processor依赖:
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency>
编写RocketmqProperties属性配置类:
package com.pivotal.cloud.queue.properties; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; /** * @className: com.pivotal.cloud.queue.properties.RocketmqProperties * @title: RocketmqProperties * @description: 封装Pivotal项目RocketmqProperties类 * @content: PivotalCloud项目系统RocketmqProperties自定义属性配置类 * @author: marklin * @datetime: 2020-07-07 01:32 * @version: 1.0.0 * @copyright: Copyright © 2018-2020 Pivotal Systems Incorporated. All rights reserved. */ @Data @Configuration @ConfigurationProperties(prefix = "pivotal.cloud.rocketmq") public class RocketmqProperties { /** * rocketmq消息队列生产者-Producer */ private final Producer producer = new Producer(); /** * rocketmq消息队列消费者-Consumer */ private final Consumer consumer = new Consumer(); @Data public static class Producer { } @Data public static class Consumer { } }
package com.pivotal.cloud.queue.configuration; import com.pivotal.cloud.queue.properties.RocketmqProperties; import lombok.AllArgsConstructor; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Configuration; /** * @className: com.pivotal.cloud.queue.configuration.RocketmqConfiguration * @title: RocketmqConfiguration * @description: 封装Pivotal项目RocketmqConfiguration类 * @content: //TODO * @author: marklin * @datetime: 2020-07-07 02:25 * @version: 1.0.0 * @copyright: Copyright © 2018-2020 Pivotal Systems Incorporated. All rights reserved. */ @AllArgsConstructor @Configuration(proxyBeanMethods = false) @EnableConfigurationProperties({RocketmqProperties.class}) public class RocketmqConfiguration { }
编写RocketmqTemplate生产者模板类:
package com.pivotal.cloud.queue.template; /** * @className: com.pivotal.cloud.queue.template.RocketmqTemplate * @title: RocketmqTemplate * @description: 封装Pivotal项目RocketmqTemplate类 * @content: PivotalCloud项目系统RocketmqTemplate生产者模板类 * @author: marklin * @datetime: 2020-07-07 02:48 * @version: 1.0.0 * @copyright: Copyright © 2018-2020 Pivotal Systems Incorporated. All rights reserved. */ public class RocketmqTemplate { }
编写RocketmqListener消费者监听器类:
package com.pivotal.cloud.queue.listener; /** * @className: com.pivotal.cloud.queue.listener.RocketmqListener * @title: RocketmqListener * @description: 封装Pivotal项目RocketmqListener类 * @content: PivotalCloud项目系统RocketmqListener消费者监听器类 * @author: marklin * @datetime: 2020-07-07 02:52 * @version: 1.0.0 * @copyright: Copyright © 2018-2020 Pivotal Systems Incorporated. All rights reserved. */ public interface RocketmqListener { }
编写META-INF/spring.factories工厂类:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.pivotal.cloud.queue.configuration.RocketmqConfiguration,\ com.pivotal.cloud.queue.configuration.RabbitmqConfiguration,\ com.pivotal.cloud.queue.configuration.ActivemqConfiguration,\ com.pivotal.cloud.queue.configuration.KafkaConfiguration,
版权声明:本文为博主原创文章,遵循相关版权协议,如若转载或者分享请附上原文出处链接和链接来源。