`

rabbitmq & spring amqp

    博客分类:
  • J2EE
阅读更多
 
My main AMQP post on blogger:
http://wuaner.blogspot.com/2012/10/messaging.html
sudo rabbitmqctl add_user admin admin
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
sudo rabbitmqctl set_user_tags admin administrator


AMQP的好处:
1 network protocal(TCP),P、Broker、C 可以在不同的主机上。
2 作为 P or C 的 client 可以是任意的编程语言。
3 提供队列的持久化机制,保证消息的安全性和可靠性。
4 支持插件,灵活的可扩展性。
5 Two-directional authentication,良好的安全机制。
5 开源,标准开放。


Difference between activeMQ  n rabbitMQ:
http://stackoverflow.com/questions/7044157/switching-from-activemq-to-rabbitmq
activeMQ 是以 Java JSM 标准API 作为默认的消息实现的;而 rabbitMQ 是 AMQP 的实现;比较 activeMQ  n rabbitMQ 的区别,比较 JMS 和 AMQP 就可以大概看出点来,见 wiki:
引用
http://en.wikipedia.org/wiki/AMQP
JMS, the Java messaging service, is often compared to AMQP. However, JMS is an API specification (part of the Java EE specification) that defines how message producers and consumers are implemented. JMS does not guarantee interoperability between implementations, and the JMS-compliant messaging system in use may need to be deployed on both client and server. On the other hand, AMQP is a wire-level protocol specification. In theory AMQP provides interoperability as different AMQP-compliant software can be deployed on the client and server sides. Note that, like HTTP and XMPP, AMQP does not have a standard API.
当然,activeMQ 现如今也加入了对 AMQP 以及其他面向消息协议的支持。



关于 rabbitmq 的 rabbitmq.config & rabbitmq-env.conf:
http://www.rabbitmq.com/configure.html#configuration-file


rabbitMQ 集群:
How To Cluster Rabbit-MQ:
http://www.godlikemouse.com/2010/12/14/how-to-cluster-rabbit-mq/
Clustering Guide:
http://www.rabbitmq.com/clustering.html
Highly Available Queues:
http://www.rabbitmq.com/ha.html
Distributed RabbitMQ brokers:
http://www.rabbitmq.com/distributed.html
RabbitMQ, backing stores, databases and disks:
http://www.rabbitmq.com/blog/2011/01/20/rabbitmq-backing-stores-databases-and-disks/

疑问:
RabbitMQ application 、node 、broker 间的关系?


在 consumer 和 producer(publisher)分离的场景下(即 consumer 和 producer 被分别启动,在不同的 app 下),queue / exchange / binding 的 declare,在哪端做比较合适?
http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2012-February/018105.html



关于 rabbitmq 的 Qos(即spring amqp 中的 prefetchCount):
http://www.rabbitmq.com/blog/2012/05/11/some-queuing-theory-throughput-latency-and-bandwidth/


Dead Letter Exchanges:
http://www.rabbitmq.com/dlx.html
Dead lettering with RabbitMQ – Strategies:
http://blog.craftforge.net/dead-lettering-with-rabbitmq-strategies/


exclusive, auto-delete, and TTL:
http://yabfog.com/blog/2013/04/23/rabbitmq-queue-auto-delete
https://www.rabbitmq.com/ttl.html
引用
auto-delete 和 Queue TTL 在对 queue 做删除时,都是只考虑该 queue 未被 consume 的时间,而不考虑 queue 是否为空。所以,这两项配置是会将一个不会空的 queue 连同其中的 messages 一起被删除的,这点需要注意。他们的不同在于:
queue 的 auto-delete 为 true 时:只在 consumer 的数量减至 0 时,才会删除该 queue;
为 queue 配置了 TTL 时:只要该 queue 未被 consume 的时间达到了 TTL 配置的时间,该 queue 会被删除。



Policy:
http://www.rabbitmq.com/parameters.html#policies
引用
#增加一个名为 expiry 的 policy,功能为配置所有以 a.b.c. 开头的 queue(且后面至少还有一个字符) 的 TTL(since unused,及多长时间未被 consume) 为 60 秒:
$ sudo rabbitmqctl set_policy expiry "^a\.b\.c\..+$" '{"expires":60000}' --apply-to queues



retry that based-on Spring-Retry:
Consumer发生business exception时,可以使用Spring-Retry尝试重发(指定次数)。
retry操作是需要Message有messageId(org.springframework.amqp.core.MessageProperties.messageId)作为retry的依据的;如果你可以控制Producer端Message的生成,那请为生成的Message添加messageId,如果控制不了,可以使用下面的MissingMessageIdAdvice:
https://github.com/garyrussell/spring-amqp/commit/a8f06c6ff1225e5e65a94d16621b6f99db6f8ba6
retry 范例:
http://forum.springsource.org/showthread.php?125717-AMQP-1-0-1-SNAPSHOT-DLQ-and-Retry-Backoff-policy



Performance of rabbitMQ:
http://stackoverflow.com/questions/10030227/maximize-throughput-with-rabbitmq
引用
Use a larger prefetch count. Small values hurt performance.
A topic exchange is slower than a direct or a fanout exchange.
Make sure queues stay short. Longer queues impose more processing overhead.
If you care about latency and message rates then use smaller messages. Use an efficient format (e.g. avoid XML) or compress the payload.
Experiment with HiPE, which helps performance.
Avoid transactions and persistence. Also avoid publishing in immediate or mandatory mode. Avoid HA. Clustering can also impact performance.
You will achieve better throughput on a multi-core system if you have multiple queues and consumers.
Use at least v2.8.1, which introduces flow control. Make sure the memory and disk space alarms never trigger.
Virtualisation can impose a small performance penalty.
Tune your OS and network stack. Make sure you provide more than enough RAM. Provide fast cores and RAM.



RabbitMQ Flow Control:
http://www.rabbitmq.com/memory.html

http://www.cnblogs.com/zhengyun_ustc/archive/2012/08/25/flowcontrol.html
引用
1. Per-Connection Flow Control
是面向每一个连接做的流量控制。
即,RabbitMQ 会主动阻塞(Block)那些发布消息太快的连接(Connections),无需做任何配置。
如果连接被阻塞了,那么它在 rabbitmqctl 控制台上会显示一个blocked的状态。
RabbitMQ 的流量控制机制是基于信用证(Credit)的拥塞控制机制。
2. Memory-Based Flow Control
RabbitMQ 会在启动时检测机器的物理内存数值。默认当 MQ 占用 40% 以上内存时,MQ 会主动抛出一个内存警告并阻塞所有连接(Connections)。
你也可以通过修改 rabbitmq.config 文件来调整内存阈值,默认值是 0.4,如下所示:
[{rabbit, [{vm_memory_high_watermark, 0.4}]}].
当 MQ 启动时,该内存阈值也会写入到  RABBITMQ_NODENAME.log 文件里,如下所示:
Memory limit set to 2048MB.
如果 MQ Server 不能识别你的系统,或者你在用 Windows 系统,那么它会写一个警告信息到 RABBITMQ_NODENAME.log 文件里,如下所示:
=WARNING REPORT==== 29-Oct-2009::17:23:44 ===
Unknown total memory size for your OS {unix,magic_homebrew_os}. Assuming memory size is 1024MB.
3. Disk-Based Flow Control
默认情况,如果剩余磁盘空间在 1GB 以下,RabbitMQ 主动阻塞所有的生产者。这个阈值也是可调的。



Spring AMQP - Reference Documentation:
http://static.springsource.org/spring-amqp/reference/html/index.html
Spring Integration 的 AMQP 支持:
http://static.springsource.org/spring-amqp/reference/html/spring-integration-amqp.html
引用
In Spring Integration, "Channel Adapters" are unidirectional (one-way) whereas "Gateways" are bidirectional (request-reply). We provide an inbound-channel-adapter, outbound-channel-adapter, inbound-gateway, and outbound-gateway.
[Spring-based app] <---> [AMQP Broker]。inbound、outbound 之 进、出,都是站在[Spring-based app]的角度考虑:
amqp:inbound-channel-adapter:To receive AMQP Messages from a Queue
amqp:outbound-channel-adapter:To send AMQP Messages to an Exchange
amqp:inbound-gateway:To receive an AMQP Message from a Queue, and respond to its reply-to address
amqp:outbound-gateway:To send AMQP Messages to an Exchange and receive back a response from a remote client
adapter 和 gateway 的区别,举个例子,以 inbound 为例,如果你得到 message 后只是对其做处理,那就用adapter;如果在处理之外,还会有处理之后的返回值,那针对这个返回值,可以使用 gateway,gateway 的 reply-channel 就是放返回值的地方!



常用插件plugins:
Management Plugin 及  Management Command Line Tool:
http://stackoverflow.com/questions/10709533/is-it-possible-to-view-rabbitmq-message-contents-directly-from-the-command-line
http://www.rabbitmq.com/management.html
http://www.rabbitmq.com/plugins.html
http://www.rabbitmq.com/man/rabbitmq-plugins.1.man.html
#enable management plugin:
$ rabbitmq-plugins enable rabbitmq_management
#lists all plugins, on one line each.
$ rabbitmq-plugins list
#Disables the specified plugins and all plugins that depend on them.
$ rabbitmq-plugins disable amqp_client
http://www.rabbitmq.com/management-cli.html
引用
访问 http://server-name:15672/cli/ 下载 rabbitmq management 命令行工具,将其 cp 到 /usr/local/bin 即可在命令行下使用 rabbitmqadmin 命令。如:
//查看rabbitmqadmin帮助
$ rabbitmqadmin -h (or --help)
//查看可用的子命令
$ rabbitmqadmin help subcommands
//声明一个名为 test 的队列(可指定auto_delete(默认false)、durable(默认true)等参数):
$ rabbitmqadmin  declare queue name=test
//列出所有exchanges:
$ rabbitmqadmin list exchanges;
//发布一条 message 到队列 test(使用的是没有名字的默认exchange):
$ rabbitmqadmin publish exchange=amq.default routing_key=test payload="hello, world 1"
//取出队列 test 中的一条 message(默认 requeue 为 true,即重新放回原队列头(谨记不是requeue入队列尾,而是头;下次还是会取到该message)!)
$ rabbitmqadmin get queue=test requeue=false



spring amqp ReturnCallback的使用:
http://forum.springsource.org/showthread.php?127065-Support-for-ReturnListener-on-RabbitTemplate
关于amqp中消息的 mandatory 和 immediate:
http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish
引用
bit mandatory
This flag tells the server how to react if the message cannot be routed to a queue. If this flag is set, the server will return an unroutable message with a Return method. If this flag is zero, the server silently drops the message.
The server SHOULD implement the mandatory flag.
bit immediate
This flag tells the server how to react if the message cannot be routed to a queue consumer immediately. If this flag is set, the server will return an undeliverable message with a Return method. If this flag is zero, the server will queue the message, but with no guarantee that it will ever be consumed.
http://answerpot.com/showthread.php?1119669-default+queue+for+unroutable+messages
引用
The mandatory flag causes a basic.return if the message wasn't routed to a queue.
The immediate flag causes a basic.return if the message got to some queues but there were no consumers waiting for it.
https://groups.google.com/forum/?fromgroups=#!topic/rabbitmq-discuss/_SSqSLE65vo
引用
mandatory->"unroutable", immediate->"undelivered".
关于 rabbitmq Java API 中 Channel.basicNack() 和 Channel.basicReject 的区别:
http://www.rabbitmq.com/nack.html
引用
The AMQP specification defines the basic.reject method that allows clients to reject individual, delivered messages, instructing the broker to either discard them or requeue them. Unfortunately, basic.reject provides no support for negatively acknowledging messages in bulk.
To solve this, RabbitMQ supports the basic.nack method that provides all the functionality of basic.reject whilst also allowing for bulk processing of messages.
To reject messages in bulk, clients set the multiple flag of the basic.nack method to true. The broker will then reject all unacknowledged, delivered messages up to and including the message specified in the delivery_tag field of the basic.nack method. In this respect, basic.nack complements the bulk acknowledgement semantics of basic.ack.



cluster 环境下的 failover(故障转移) & HA(High Availability):
http://www.rabbitmq.com/ha.html
http://www.rabbitmq.com/clustering.html#clients
Failover with RabbitMQ, the sender's story:
http://blog.zenika.com/index.php?post/2012/03/14/Failover-with-RabbitMQ%2C-the-sender-s-story

"Messaging for Modern Applications" 相关:
http://forum.springsource.org/showthread.php?123977-Connection-Failover
https://github.com/tmccuch

Spring Integration + Spring Retry:
http://forum.springsource.org/showthread.php?120707-Failproof-spring-amqp
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics