Rabbitmqchannel参数详解
1、Channel
1.hangeDeclare():
type:有direct、fanout、topic三种:
fanout
fanout类型的Exchange路由规则⾮常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中
direct
direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。
topic
规则就是模糊匹配,可以通过通配符满⾜⼀部分规则就可以传送。它的约定是:
routing key为⼀个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每⼀段独⽴的字符串称为⼀个单词),如“”、
“ny.vmw”、“ange.rabbit” binding key与routing key⼀样也是句点号“. ”分隔的字符串。
binding key中可以存在两种特殊字符“”与“#”,⽤于做模糊匹配,其中“”⽤于匹配⼀个单词,“#”⽤于匹配多个单词(可以是零个)
durable:true、fal true:服务器重启会保留下来Exchange。警告:仅设置此选项,不代表消息持久化。即不保证重启后消息还在。卫玠怎么读
原⽂:true if we are declaring a durable exchange (the exchange will survive a rver restart)
autoDelete:true、ue:当已经没有消费者时,服务器是否可以删除该Exchange。
原⽂1:true if the rver should delete the exchange when it is no longer in u。
/**
* Declare an exchange.
* @e com.rabbitmq.client.AMQP.Exchange.Declare
* @e com.rabbitmq.client.AMQP.Exchange.DeclareOk
* @param exchange the name of the exchange
* @param type the exchange type
* @param durable true if we are declaring a durable exchange (the exchange will survive a rver restart)
* @param autoDelete true if the rver should delete the exchange when it is no longer in u
* @param arguments other properties (construction arguments) for the exchange
* @return a declaration-confirm method to indicate the exchange was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
1.2 chanel.basicQos()
prefetchSize:0
prefetchCount:会告诉RabbitMQ不要同时给⼀个消费者推送多于N个消息,即⼀旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack global:true\fal 是否将上⾯设置应⽤于channel,简单点说,就是上⾯限制是channel级别的还是consumer级别
备注:据说prefetchSize 和global这两项,rabbitmq没有实现,暂且不研究
QoS = quality-of-rvice,顾名思义,服务的质量。通常我们设计系统的时候不能完全排除故障或保证说没有故障,⽽应该设计有完善的异常处理机制。在出现错误的时候知道在哪⾥出现什么样⼦的错误,原因是什么,怎么去恢复或者处理才是真正应该去做的。在接收消息出现故障的时候我们可以通过RabbitMQ重发机制来处理。重发就有重发次数的限制,有些时候你不可能不限次数的重发,这取决于消息的⼤⼩,重要程度和处理⽅式。
甚⾄QoS是在接收端设置的。发送端没有任何变化,接收端的代码也⽐较简单,只需要加如下代码:
channel.BasicQos(0, 1, fal);
代码第⼀个参数是可接收消息的⼤⼩的,但是似乎在客户端2.8.6版本中它必须为0,即使:不受限制。如果不输0,程序会在运⾏到这⼀⾏的时候报错,说还没有实现不为0的情况。第⼆个参数是处理消息最⼤的数量。举个例⼦,如果输⼊1,那如果接收⼀个消息,但是没有应答,则客户端不会收到下⼀个消息,消息只会在队列中阻塞。如果输⼊3,那么可以最多有3个消息不应答,如果到达了3个,则发送端发给这个接收⽅得消息只会在队列中,⽽接收⽅不会有接收到消息的事件产⽣。总结说,就是在下⼀次发送应答消息前,客户端可以收到的消息最⼤数量。第三个参数则设置了是不是针对整个Connection的,因为⼀个Connection可以有多个Channel,如果是fal则说明只是针对于这个Channel的。
这种数量的设置,也为我们在多个客户端监控同⼀个queue的这种负载均衡环境下提供了更多的选择。
/**
* Request specific "quality of rvice" ttings.
*
* The ttings impo limits on the amount of data the rver
* will deliver to consumers before requiring acknowledgements.
* Thus they provide a means of consumer-initiated flow control.
* @e com.rabbitmq.client.AMQP.Basic.Qos
* @param prefetchSize maximum amount of content (measured in
* octets) that the rver will deliver, 0 if unlimited
* @param prefetchCount maximum number of messages that the rver
* will deliver, 0 if unlimited
* @param global true if the ttings should be applied to the
* entire channel rather than each consumer
* @throws java.io.IOException if an error is encountered
*/
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
1.3 channel.basicPublish()
参观routingKey:路由键,#匹配0个或多个单词,*匹配⼀个单词,在topic exchange做消息转发⽤
mandatory:true:如果exchange根据⾃⾝类型和消息routeKey⽆法找到⼀个符合条件的queue,那么会调⽤urn⽅法将消息返还给⽣产者。 fal:出现上述情形broker会直接将消息扔掉
immediate:true:如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放⼊队列中。
当与消息routeKey关联的所有queue(⼀个或多个)都没有消费者时,该消息会通过urn⽅法返还给⽣产者。BasicProperties :需要注意的是BasicProperties.deliveryMode,0:不持久化 1:持久化
这⾥指的是消息的持久化,配合channel(durable=true),queue(durable)可以实现,即使服务器宕机,消息仍然保留
简单来说:mandatory标志告诉服务器⾄少将该消息route到⼀个队列中,否则将消息返还给⽣产者;
immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,
如果所有queue都没有消费者,直接把消息返还给⽣产者,不⽤将消息⼊队列等待消费者了。
/**
* Publish a message.
*
* Publishing to a non-existent exchange will result in a channel-level
* protocol exception, which clos the channel.
*
* Invocations of <code>Channel#basicPublish</code> will eventually block if a
opp袋是什么* <a href="/alarms.html">resource-driven alarm</a> is in effect.
*
* @e com.rabbitmq.client.AMQP.Basic.Publish
* @e <a href="/alarms.html">Resource-driven alarms</a>.
* @param exchange the exchange to publish the message to
* @param routingKey the routing key
* @param mandatory true if the 'mandatory' flag is to be t
* @param immediate true if the 'immediate' flag is to be
* t. Note that the RabbitMQ rver does not support this flag.
* @param props other properties for the message - routing headers etc
* @param body the message body
锅炉受热面
* @throws java.io.IOException if an error is encountered
*/
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
垂直搜索网站throws IOException;
1.4 channel.basicAck();
deliveryTag:该消息的index
multiple:是否批量.true:将⼀次性ack所有⼩于deliveryTag的消息。
/**
* Acknowledge one or veral received
* messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
* or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
* containing the received message being acknowledged.
* @e com.rabbitmq.client.AMQP.Basic.Ack
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
* @param multiple true to acknowledge all messages up to and
* including the supplied delivery tag; fal to acknowledge just
* the supplied delivery tag.
* @throws java.io.IOException if an error is encountered
*/
绿化工void basicAck(long deliveryTag, boolean multiple) throws IOException;
1.5 channel.Envelope().getDeliveryTag(), fal, true);
deliveryTag:该消息的index
multiple:是否批量.true:将⼀次性拒绝所有⼩于deliveryTag的消息。
requeue:被拒绝的是否重新⼊队列
/**
* Reject one or veral received messages.
*
* Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
* or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
* @e com.rabbitmq.client.AMQP.Basic.Nack
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
* @param multiple true to reject all messages up to and including
* the supplied delivery tag; fal to reject just the supplied
* delivery tag.
* @param requeue true if the rejected message(s) should be requeued rather
* than discarded/dead-lettered
* @throws java.io.IOException if an error is encountered
*/
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException;
1.6channel.Envelope().getDeliveryTag(), fal);
deliveryTag:该消息的index
requeue:被拒绝的是否重新⼊队列
channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息,⽽basicReject⼀次只能拒绝⼀条消息
/**
* Reject a message. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
* or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
* containing the received message being rejected.
* @e com.rabbitmq.client.AMQP.Basic.Reject
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
* @param requeue true if the rejected message should be requeued rather than discarded/dead-lettered
* @throws java.io.IOException if an error is encountered
*/
void basicReject(long deliveryTag, boolean requeue) throws IOException;
1.7 channel.basicConsume(QUEUE_NAME, true, consumer);
新鲜罗汉果
autoAck:是否⾃动ack,如果不⾃动ack,需要使⽤channel.ack、channel.nack、channel.basicReject 进⾏消息应答/**
* Start a non-nolocal, non-exclusive consumer, with
* a rver-generated consumerTag.
* @param queue the name of the queue
* @param autoAck true if the rver should consider messages
* acknowledged once delivered; fal if the rver should expect
* explicit acknowledgements
* @param callback an interface to the consumer object
* @return the consumerTag generated by the rver
* @throws java.io.IOException if an error is encountered
* @e com.rabbitmq.client.AMQP.Basic.Consume
* @e com.rabbitmq.client.AMQP.Basic.ConsumeOk
* @e #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
*/
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
1.hangeBind()
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
⽤于通过绑定bindingKey将queue到Exchange,之后便可以进⾏消息接收
推进的近义词/**
* Bind an exchange to an exchange, with no extra arguments.
* @e com.rabbitmq.client.AMQP.Exchange.Bind
* @e com.rabbitmq.client.AMQP.Exchange.BindOk
* @param destination the name of the exchange to which messages flow across the binding
* @param source the name of the exchange from which messages flow across the binding
* @param routingKey the routine key to u for the binding
* @return a binding-confirm method if the binding was successfully created
* @throws java.io.IOException if an error is encountered
*/
Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;
1.8 channel.queueDeclare(QUEUE_NAME, fal, fal, fal, null);
durable:true、fal true:在服务器重启时,能够存活
exclusive :是否为当前连接的专⽤队列,在连接断开后,会⾃动删除该队列,⽣产环境中应该很少⽤到吧。autodelete:当没有任何消费者使⽤时,⾃动删除该队列。
/**
* Declare a queue
* @e com.rabbitmq.client.AMQP.Queue.Declare
* @e com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a rver restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (rver will delete it when no longer in u)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
ref: