SpringbootRabbitMq源码解析之RabbitListener的Message。。。

更新时间:2023-06-18 05:18:32 阅读: 评论:0

SpringbootRabbitMq源码解析之RabbitListener的Message。。。
在中,我们解析了Springboot中是如何根据RabbitListener注解⽣成MessageListenerContainer。
在,我们以SimpleMessageListenerContainer解析了MessageListener是如何消息rabbitmq消息的。对于messageListener类型的监听器,SimpleMessageListenerContainer最终是通过MessagListener#onMessage⽅法进⾏了消息的消费逻辑。
接下来,我们关注的重点是在RabbitListener注解的MessageListener#onMessage处理逻辑。
⼀、⽣成MessageListenerContainer
从上⽂可以看到,在 RabbitListenerEndpointRegistry#registerListenerContainer中的通过MessageListenerContainer container = createListenerContainer(endpoint, factory);⽣成MessageListenerContainer。
1. RabbitListenerEndpointRegistry#createListenerContainer
protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint,
RabbitListenerContainerFactory<?> factory) {
MessageListenerContainer listenerContainer = ateListenerContainer(endpoint);
if (listenerContainer instanceof InitializingBean) {
try {
((InitializingBean) listenerContainer).afterPropertiesSet();
}
catch (Exception ex) {
throw new BeanInitializationException("Failed to initialize message listener container", ex);
}
}
int containerPha = Pha();
if (containerPha < Integer.MAX_VALUE) {  // a custom pha value
if (this.pha < Integer.MAX_VALUE && this.pha != containerPha) {
throw new IllegalStateException("Encountered pha mismatch between container factory definitions: " +
this.pha + " vs " + containerPha);
}
this.pha = Pha();
}
return listenerContainer;
}
点击MessageListenerContainer listenerContainer = ateListenerContainer(endpoint);继续跟踪。
2. AbstractRabbitListenerContainerFactory#createListenerContainer
@Override
public C createListenerContainer(RabbitListenerEndpoint endpoint) {
科学家发现地球上最干净空气
C instance = createContainerInstance();
if (tionFactory != null) {
instance.tionFactory);
}
if (Handler != null) {
instance.Handler);
}
if (ssageConverter != null) {
instance.ssageConverter);
}
if (this.acknowledgeMode != null) {
instance.tAcknowledgeMode(this.acknowledgeMode);
}
if (this.channelTransacted != null) {
instance.tChannelTransacted(this.channelTransacted);
}
if (this.applicationContext != null) {
instance.tApplicationContext(this.applicationContext);
}
if (this.taskExecutor != null) {
instance.tTaskExecutor(this.taskExecutor);
}
if (ansactionManager != null) {
instance.ansactionManager);
}
if (this.prefetchCount != null) {
instance.tPrefetchCount(this.prefetchCount);
}
if (this.defaultRequeueRejected != null) {
instance.tDefaultRequeueRejected(this.defaultRequeueRejected);
}
if (this.adviceChain != null) {
instance.tAdviceChain(this.adviceChain);
}
if (veryBackOff != null) {
instance.veryBackOff);
}
if (this.mismatchedQueuesFatal != null) {
instance.tMismatchedQueuesFatal(this.mismatchedQueuesFatal);
}
if (this.missingQueuesFatal != null) {
instance.tMissingQueuesFatal(this.missingQueuesFatal);
}
if (sumerTagStrategy != null) {
instance.sumerTagStrategy);
}
if (this.idleEventInterval != null) {
instance.tIdleEventInterval(this.idleEventInterval);
}
if (this.failedDeclarationRetryInterval != null) {
instance.tFailedDeclarationRetryInterval(this.failedDeclarationRetryInterval); }
if (this.applicationEventPublisher != null) {
instance.tApplicationEventPublisher(this.applicationEventPublisher);
wode
}
keyshia coleif (AutoStartup() != null) {
instance.AutoStartup());
}
el if (this.autoStartup != null) {
instance.tAutoStartup(this.autoStartup);
}
if (this.pha != null) {
instance.tPha(this.pha);
}
if (this.afterReceivePostProcessors != null) {
instance.tAfterReceivePostProcessors(this.afterReceivePostProcessors); }
instance.Id());
endpoint.tupListenerContainer(instance);
initializeContainer(instance, endpoint);
return instance;
}
AbstractRabbitListenerContainerFactory是MessageListener的抽象⼯⼚类,⽅法中的绝⼤多数内容都只是给对象属性赋值,需要关注的是endpoint.tupListenerContainer(instance);语句。
3. AbstractRabbitListenerEndpoint#tListenerContainer
@Override
public void tupListenerContainer(MessageListenerContainer listenerContainer) {
AbstractMessageListenerContainer container = (AbstractMessageListenerContainer) listenerContainer;
boolean queuesEmpty = getQueues().isEmpty();
boolean queueNamesEmpty = getQueueNames().isEmpty();
if (!queuesEmpty && !queueNamesEmpty) {
throw new IllegalStateException("Queues or queue names must be provided but not both for " + this);
}
if (queuesEmpty) {
Collection<String> names = getQueueNames();
container.Array(new String[names.size()]));
}
el {
Collection<Queue> instances = getQueues();
container.Array(new Queue[instances.size()]));
}
container.tExclusive(isExclusive());
if (getPriority() != null) {
哈利波特txt全集下载Map<String, Object> args = new HashMap<String, Object>();
args.put("x-priority", getPriority());
container.tConsumerArguments(args);
}
if (getAdmin() != null) {
container.tRabbitAdmin(getAdmin());
}
tupMessageListener(listenerContainer);
}
protected abstract MessageListener createMessageListener(MessageListenerContainer container);
private void tupMessageListener(MessageListenerContainer container) {
香港大学研究生专业
MessageListener messageListener = createMessageListener(container);
Asrt.state(messageListener != null, () -> "Endpoint [" + this + "] must provide a non null message listener");
give it a gocontainer.tupMessageListener(messageListener);
}
⼆、⽣成MessageListener
可以看到,上⽂中⽣成MessageListener的关键⽅法AbstractRabbitListenerEndpoint#createMessageListener是⼀个抽象⽅法。AbstractRabbitListenerEndpoint有3个⼦类,分别SimpleRabbitListenerEndpoint,MethodRabbitListenerEndpoint和MultiMethodRabbitListenerEndpoint,其中MultiMethodRabbitListenerEndpoint是MethodRabbitListenerEndpoint的⼦类。
回到上⽂RabbitListenerAnnotationBeanPostProcessor的processAmqpListener和processMultiMethodListeners可以看
到,RabbitListener注解对应的endpoint有两种。
(1)当RabbitListener注解在⽅法上时,对应的endpoint就是MethodRabbitListenerEndpoint。
(2)当RabbitListener注解在类上时,和RabbitHandle注解配合使⽤,对应的endpoint就是MultiMethodRabbitListenerEndpoint。
1. MethodRabbitListenerEndpoint#createMessageListener
@Override
protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container) {
Asrt.ssageHandlerMethodFactory != null,
"Could not create message listener - MessageHandlerMethodFactory not t");
MessagingMessageListenerAdapter messageListener = createMessageListenerInstance();
messageListener.tHandlerMethod(configureListenerAdapter(messageListener));
String replyToAddress = getDefaultReplyToAddress();
if (replyToAddress != null) {
messageListener.tResponAddress(replyToAddress);
}
MessageConverter messageConverter = MessageConverter();
if (messageConverter != null) {
messageListener.tMessageConverter(messageConverter);
藤门留学}
if (getBeanResolver() != null) {
messageListener.tBeanResolver(getBeanResolver());
}
return messageListener;
}
/**
* Create a {@link HandlerAdapter} for this listener adapter.
* @param messageListener the listener adapter.
* @return the handler adapter.
*/
protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter messageListener) {
InvocableHandlerMethod invocableHandlerMethod =
return new HandlerAdapter(invocableHandlerMethod);
}
/**
* Create an empty {@link MessagingMessageListenerAdapter} instance.
* @return the {@link MessagingMessageListenerAdapter} instance.
*/
protected MessagingMessageListenerAdapter createMessageListenerInstance() {
return new MessagingMessageListenerAdapter(this.bean, hod, urnExceptions, Handler);
}
可以看到RabbitListener注解对应的MessageListener类型是MessagingMessageListenerAdapter。
2. MultiMethodRabbitListenerEndpoint#configureListenerAdapter
MultiMethodRabbitListenerEndpoint作为MethodRabbitListenerEndpoint的⼦类,其createMessageListener⽅法与⽗类相同,不过其重写了其中的configureListenerAdapter⽅法。
@Override
protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter messageListener) {
List<InvocableHandlerMethod> invocableHandlerMethods = new ArrayList<InvocableHandlerMethod>();
for (Method method : hods) {
invocableHandlerMethods.add(getMessageHandlerMethodFactory()
.createInvocableHandlerMethod(getBean(), method));
}
this.delegatingHandler = new DelegatingInvocableHandler(invocableHandlerMethods, getBean(), getResolver(),
getBeanExpressionContext());
return new HandlerAdapter(this.delegatingHandler);
}
三. HandlerAdapter类
可以看到MethodRabbitListenerEndpoint和MultiMethodRabbitListenerEndpoint对应的都是MessagingMessageListenerAdapter,彼此的最⼤区别就是handlerMethod不同,为了保持该属性的类型⼀致,特地新建了⼀个适配器类:HandlerAdapter。
HandlerAdapter的属性很简单,只有两个:InvocableHandlerMethod和DelegatingInvocableHandler。
其中InvocableHandlerMethod⽤于MethodRabbitListenerEndpoint,DelegatingInvocableHandler⽤于MultiMethodRabbitListenerEndpoint,彼此之间没有其他联系。
核⼼⽅法是invoke⽅法,执⾏InvocableHandlerMethod或者DelegatingInvocableHandler的invoke⽅法。
public Object invoke(Message<?> message, providedArgs) throws Exception {
if (this.invokerHandlerMethod != null) {
return this.invokerHandlerMethod.invoke(message, providedArgs);
}
el {
return this.delegatingHandler.invoke(message, providedArgs);
}
}
1. InvocableHandlerMethod类
InvocableHandlerMethod类是HandlerMethod类的⼦类,核⼼⽅法是invoke⽅法,基于接收到的Message和参数执⾏rabbit的监听⽅法。swing是什么意思
@Nullable
public Object invoke(Message<?> message, providedArgs) throws Exception {
Object[] args = getMethodArgumentValues(message, providedArgs);
if (logger.isTraceEnabled()) {
"' with arguments " + String(args));
}
Object returnValue = doInvoke(args);
if (logger.isTraceEnabled()) {
"] returned [" + returnValue + "]");
}
return returnValue;
}
2. DelegatingInvocableHandler类
当RabbitListener注解在类上时,可以同时有多个被RabbitHandle注解的⽅法⽤于处理同意queue的消息,因此在DelegatingInvocableHandler类中⼀个属性rivate final List<InvocableHandlerMethod> handlers;为InvocableHandlerMethod的集合,同样的核⼼⽅法也是invoke⽅法,多了⼀个根据消息内容的类型选择对应的InvocableHandlerMethod的逻辑。
天译翻译
public Object invoke(Message<?> message, providedArgs) throws Exception {
guest houClass<? extends Object> payloadClass = Payload().getClass();
InvocableHandlerMethod handler = getHandlerForPayload(payloadClass);
Object result = handler.invoke(message, providedArgs);
if (Headers().get(AmqpHeaders.REPLY_TO) == null) {
Expression replyTo = (handler);
if (replyTo != null) {
result = new AbstractAdaptableMessageListener.ResultHolder(result, replyTo);
}
}
return result;
}
四、MessagingMessageListenerAdapter类

本文发布于:2023-06-18 05:18:32,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/90/148997.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:类型   对应   消息   地球
相关文章
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图