metaq使用java语言编写,可以在多种平台部署,客户端支持c++和java,单台服务可支持1w以上各消息队列,事实上测试可能在4-6千之间,可横向扩展,队列持久化,队列长度无限(取决于磁盘大小),可从队列任何位置开始消费
通常在磁盘中的二进制文件id为8位定长,通常情况下还需要有int,4个字节,数据字节,此时如果有个数据为11个字节,如下在id,int date就看做一个消息体,如下图:
metaq特点:
生产者和服务器和消费者都可分布式,消息存储通常安装顺序写,他的性能也非常高,吞吐量达,并且支持消息顺序,客户pull,随机读,批量拉数据,数据迁移,以及扩展并且对用户同名,最后,消费状态保持在客户端总体架构,如下图:
metaq术语
topic:消息的主题,由用户定义并在服务器端配置,producer发送消息到某个topic,consumer从某个topic下的消费消息offset:消息在broker上的每个分区都是组织成一个文件列表,消费者拉取数据需要指定数据在文件中的偏移量,这个偏移就是所谓的offset,offset是绝对偏移量,服务器会将offset转化为具体文件的相对偏移量broker:meta的服务器或者书服务器,在消息中间件中通常称为brokerpartition(分区):同一个topic下面还分为多个分区,如meta-test这个topic我们可以分为10个分区,分别有两台服务器提供,那么可能每台服务器提供5个分区,假设服务器id分别是0和1,则所有分区为0-0,0-1,0-2,0-3,0-4,1-0,1-1,1-2,1-3,1-4
metaq主要配置
zk.zkEnable=true 是否注册到zk,默认为truezk.zkConnect=localhost:2181 zk的服务器列表zk.zkSessionTimeoutMs=30000 zk心跳超市,单位毫秒,默认30秒zk.zkConnection TImeoutMs=30000 zk链接超时时间,单位毫秒,默认30秒brokerld 服务器id,必须唯一,0-1024之间 serverprot 服务端口hostName 默认将取本机IP,如果多机网卡需要指明dataLogPath 日志存放路径,默认和datapath一样datapath 指定默认数据存储路径daletepolicy=delete,168 数据删除策略,默认超过7天则删除,这里是168是小时,10s表示秒,10m表示分钟,10h表示小时,默认为小时deleteWhen 何时执行删除策略的cron表达式,默认是0 0 6,18 ?,也就是每天的早晚执行处理策略deletewhen 删除策略的执行时间,cron表达式
flushTxLogAtCommit=1 事务日志的同步设置,0表示让操作系统决定,1表示每次commit都同步,2表示每隔1秒同步一次,不过会影响事务性能,可根据需要的性能和可靠性直接权衡做出一个合理的选择。通常设置为2,表示每隔1秒刷一次,也就最多丢失一秒内的运行时事务,最安全的设置为1,但是严重影响性能,而0的安全级别最低,安全级别上1>=2>0 而性能则是0>=2>1
unflushthreshold 每隔多少条消息做一次磁盘的sync,强制将更改的数据刷入磁盘,默认1000,也就是说在掉电的情况下,最多允许丢失1000条消息,可设置为0,强制每次写如都sync,在设置0的情况下,服务器会自动启用group commit技术,将多个消息合并成一次sync来提高io性能,而group commit情况下消息发送者的tps没有收到太大影响,但是服务端的负责会上升很多
unflushinterval 间隔多少毫秒定期做一个磁盘的sync,默认是10秒,也就是说服务器掉电情况下,最多丢失10秒内发送来的消息,不可设置为小于或者等于0metaq集群:所有broker注册到zookeeper,客户端链接zookeeper并返回可用的broker列表,选择一个broker并发送消息
RabbitMQ
在AMQP协议标准基础上完整的,可复用的企业消息系统,遵循Mozilla Public License开源协议,采用Erlang实现的工业级的消息队列MQ服务器AMQP高级消息队列协议,是一个一步消息传递所使用的应用层协议规范,作为线路层协议,而不是API,例如JMS,AMQP客户端能够无视消息的来源任意发送和接受信息。AMQP的原始用途只是为金融界提供一个可以彼此协作的消息协议,而现在的目标则是为通用消息队列架构提供通用构建工具。因此,面向消息的中间件MOM系统,例如发布/订阅队列,没有作为基本元素实现,反而通过发送简化的AMQ实体,用户被赋予 了构建这些实体的能力,这些实体也是规范的一部分,形成了在线路层协议顶端的一个层级,AMQP模型,这个模型同意了消息模式,例如之前提到的发布订阅,队列,事务以及流数据,并且添加了额外的特性,例如更易于扩展,基于内容的路由
RabbitMQ整体架构,如下图:
运行原理图
RabbitMQ核心组件分别是exchange和queue,如下图:
RabbitMQ术语
Server(broker) 接收客户端链接,实现AMQP消息队列和路由功能的进程Virtual Host 一个虚拟概念,类似权限控制组,一个virtual host里面可有多个exchange和queue,但是权限控制 的最小粒度是virtual hostexchange 接收生产者发送的消息,并根据binding规则将消息路由给服务器中的队列,exchangeType决定了exchnage路由消息的行为,如,在RabbitMQ中,exchangeType有Direct,fanout和topic三种,不同类型的exchange路由的行为是不一样的message queue 消息队列,用于存储还未被消费者消费的消息message 由header和body组成,header是由生产者添加的各种属性的集合,包括message是否被持久化,由那个message queue接受,优先级是多少等,而body是真正需要传输的app数据bindingkey 所谓绑定就是将一个特定的exchange和一个特定的queue绑定起来,绑定关键字成为bindingkey
Exchange分类
直接式交换器类型Direct Exchange 直接交互式处理路由键,需要将一个队列绑定到交换机上,需要该消息与一个特定的路由键完全匹配,这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键“dog",则只有被标记为“dog"的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog,如下图:
广播式交换器类型
Fanout Exchange 广播式路由键,只需要简单的将队列绑定到交换机上,一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上,很像子网广播,每台子网内的主机都获得了一份复制的消息,fanout交换机转发消息是最快,如下图:
主题式交换器类型
Topic exchange,主题式交换器,通过消息的路由关键字和绑定关键字的模式匹配,将消息路由到被绑定的队列中,这种路由器类型可以被用来支持经典的发布,订阅消息传输模型,使用主题名字空间作为消息寻址模式,将消息传递给那些部分或者全部匹配主题模式的多个消费者,主题交换机类型的工资方式如下:绑定关键字用零个或多个标记构成,每一个标记之间用"."字符分隔,绑定关键字必须用这种形式明确说明,并支持通配符“”匹配一个词组,“#”零个或多个词组,因此绑定关键字“ *。stock.#" 匹配路由关键字”usd.stock“和”eur.stock.db",但是不匹配“stock.nasdaq”,如下图:
配置
配置文件存放:/etc/rabbitmq/rabbitmq-env.conf
RABBITMQ_NODE_IP_ADDRESS:IP地址
RABBITMQ_NODE_PORT:端口号
RABBITMQ_CONFIG_FILE:默认路径
ABBITMQ_LOG_BASE:日志路径
rabbitmq.config是标准的erlang配置文件,必须符合erlang配置标准,erlangtuple,结构为{key,value},key为atom类型,value为一个term,其中关键参数为
tcp_listerners设置rabimq监听端口,默认5672
disk_free_limit磁盘低水位线,若磁盘容量低于指定值则停止接收数据
vm_memory_high_watermark,设置内存低水位线,则开启流控机制,默认值是0.4,即内存总量的40%
安装
[root@LinuxEA ~]# yum install epel-release
[root@LinuxEA ~]# yum install rabbitmq-server -y
[root@LinuxEA ~]# /usr/lib/rabbitmq/bin/rabbitmq-plugins list
[ ] amqp_client 3.3.5
[ ] cowboy 0.5.0-rmq3.3.5-git4b93c2d
[ ] eldap 3.3.5-gite309de4
[ ] mochiweb 2.7.0-rmq3.3.5-git680dba8
[ ] rabbitmq_amqp1_0 3.3.5
[ ] rabbitmq_auth_backend_ldap 3.3.5
[ ] rabbitmq_auth_mechanism_ssl 3.3.5
[ ] rabbitmq_consistent_hash_exchange 3.3.5
[ ] rabbitmq_federation 3.3.5
[ ] rabbitmq_federation_management 3.3.5
[ ] rabbitmq_management 3.3.5
[ ] rabbitmq_management_agent 3.3.5
[ ] rabbitmq_management_visualiser 3.3.5
[ ] rabbitmq_mqtt 3.3.5
[ ] rabbitmq_shovel 3.3.5
[ ] rabbitmq_shovel_management 3.3.5
[ ] rabbitmq_stomp 3.3.5
[ ] rabbitmq_test 3.3.5
[ ] rabbitmq_tracing 3.3.5
[ ] rabbitmq_web_dispatch 3.3.5
[ ] rabbitmq_web_stomp 3.3.5
[ ] rabbitmq_web_stomp_examples 3.3.5
[ ] sockjs 0.3.4-rmq3.3.5-git3132eb9
[ ] webmachine 1.10.3-rmq3.3.5-gite9359c7
[root@LinuxEA ~]#
[root@LinuxEA ~]# /usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_management
The following plugins have been enabled:
mochiweb
webmachine
rabbitmq_web_dispatch
amqp_client
rabbitmq_management_agent
rabbitmq_management
Plugin configuration has changed. Restart RabbitMQ for changes to take effect.
[root@LinuxEA ~]#
[root@LinuxEA ~]# systemctl restart rabbitmq-server
创建vhost
[root@LinuxEA ~]# rabbitmqctl add_vhost linuxea
Creating vhost "linuxea" ...
...done.
[root@LinuxEA ~]#
[root@LinuxEA ~]# cat LinuxEA
#!/bin/bash
###www.#linu#xea#.com
#The source address of this article:http://www.lin#uxea.com/1514.html
查看vhosts
[root@LinuxEA ~]# rabbitmqctl list_vhosts
Listing vhosts ...
/
linuxea
...done.
[root@LinuxEA ~]#
删除vhosts:rabbitmqctl delete_vhosts vhostname
添加用户名和密码:
[root@LinuxEA ~]# rabbitmqctl add_user linuxea linuxea
Creating user "linuxea" ...
...done.
[root@LinuxEA ~]#
修改用户名密码:rabbitmqctl change_password username newpassword
授权读写权限:
[root@LinuxEA ~]# rabbitmqctl set_permissions -p linuxea linuxea ".*" ".*" ".*"
Setting permissions for user "linuxea" in vhost "linuxea" ...
...done.
[root@LinuxEA ~]#
查看:-p 即可
[root@LinuxEA ~]# rabbitmqctl list_queues -p linuxea
Listing queues ...
...done.
[root@LinuxEA ~]#