RabbitMQ 配置 SimpleRabbitListenerContainerFactory

更新时间:2023-07-01 12:00:53 阅读: 评论:0

RabbitMQ 配置 SimpleRabbitListenerContainerFactory
代码
@Configuration
@EnableRabbit
山西过油肉的做法public class RabbitMqConfigurer {
// 在容器中注⼊  SimpleRabbitListenerContainerFactory
睡觉用英语怎么说@Bean("customContainerFactory")
public SimpleRabbitListenerContainerFactory customContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory =new SimpleRabbitListenerContainerFactory();
factory.tConcurrentConsumers(4);//设置线程数
factory.tMaxConcurrentConsumers(4);//最⼤线程数
return factory;
}
}
rabbitmq 配置 containerFactory 属性
/**
 * Listener method bound to the queue(s) named by the {@code custom.queue} property.
 * The {@code containerFactory} attribute selects the bean named
 * {@code customContainerFactory} to create this listener's container.
 * The body is intentionally left empty in this example.
 *
 * @param dto the payload handed to the listener
 */
@RabbitListener(queues ="${custom.queue}", containerFactory ="customContainerFactory")
public void publish(Entity dto){
}
RabbitMQ 执行流程
RabbitListenerEndpointRegistrar 实现了 InitializingBean 接口,容器启动时会自动调用
org.springframework.beans.factory.InitializingBean#afterPropertiesSet 方法
/**
 * Initialization callback: delegates straight to {@code registerAllEndpoints()}.
 */
@Override
public void afterPropertiesSet() {
    this.registerAllEndpoints();
}
注册所有的端点
protected void registerAllEndpoints(){
Asrt.dpointRegistry !=null,"No registry available");
dpointDescriptors){
for(AmqpListenerEndpointDescriptor descriptor :dpointDescriptors){
dpoint instanceof MultiMethodRabbitListenerEndpoint &&this.validator !=null){
((MultiMethodRabbitListenerEndpoint) dpoint).tValidator(this.validator);
}
resolveContainerFactory(descriptor));
}
this.startImmediately =true;// trigger immediate startup
}
}
org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar#resolveContainerFactory
private RabbitListenerContainerFactory<?>resolveContainerFactory(AmqpListenerEndpointDescriptor descriptor){
ainerFactory !=null){
ainerFactory;
}
el ainerFactory !=null){
ainerFactory;
}
el ainerFactoryBeanName !=null){
Asrt.state(this.beanFactory !=null,"BeanFactory must be t to obtain container factory by bean name");
// 得到注⼊容器中的  containerFactory
ainerFactory;// Consider changing this if live change of the factory is required
}
el{
throw new IllegalStateException("Could not resolve the "+
SimpleName()+" to u for ["+
}
}
Spring 执行到 org.springframework.context.support.AbstractApplicationContext#finishRefresh 方法
/**
 * Final step of the context refresh: clears resource caches, initializes the lifecycle
 * processor, propagates the refresh to it, and publishes a {@code ContextRefreshedEvent}.
 */
protected void finishRefresh() {
    // Clear context-level resource caches (such as ASM metadata from scanning).
    clearResourceCaches();

    // Initialize lifecycle processor for this context.
    initLifecycleProcessor();

    // This is the step that matters for the listener startup flow.
    // Propagate refresh to lifecycle processor first.
    getLifecycleProcessor().onRefresh();

    // Publish the final event.
    publishEvent(new ContextRefreshedEvent(this));

    // Participate in LiveBeansView MBean, if active.
    if (!NativeDetector.inNativeImage()) {
        LiveBeansView.registerApplicationContext(this);
    }
}
org.springframework.context.support.DefaultLifecycleProcessor#startBeans
private void startBeans(boolean autoStartupOnly) {
Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();
Map<Integer, LifecycleGroup> phas = new TreeMap<>();
lifecycleBeans.forEach((beanName, bean) -> {
if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {
int pha = getPha(bean);
pha,
p -> new LifecycleGroup(pha, this.timeoutPerShutdownPha, lifecycleBeans, autoStartupOnly)私人
).add(beanName, bean);
}
});
if (!phas.isEmpty()) {
//执⾏start
phas.values().forEach(LifecycleGroup::start);
}
}
org.springframework.context.support.DefaultLifecycleProcessor.LifecycleGroup#start
public void start(){
bers.isEmpty()){
return;
}
if(logger.isDebugEnabled()){
logger.debug("Starting beans in pha "+this.pha);
}
Collections.bers);
for(LifecycleGroupMember member :bers){
doStart(this.lifecycleBeans, member.name,this.autoStartupOnly);
}
}
org.springframework.context.support.DefaultLifecycleProcessor#doStart
/**
 * Starts the named bean after recursively starting the beans it depends on.
 *
 * <p>The bean is removed from {@code lifecycleBeans} up front, so a bean reached again
 * through another dependency path is not processed a second time.
 *
 * @param lifecycleBeans  map of bean name to lifecycle bean; entries are removed as processed
 * @param beanName        name of the bean to start
 * @param autoStartupOnly when {@code true}, a {@code SmartLifecycle} bean is started only if
 *                        its {@code isAutoStartup()} returns {@code true}
 * @throws ApplicationContextException if the bean's {@code start()} throws
 */
private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {
    Lifecycle bean = lifecycleBeans.remove(beanName);
    if (bean != null && bean != this) {
        String[] dependenciesForBean = getBeanFactory().getDependenciesForBean(beanName);
        for (String dependency : dependenciesForBean) {
            doStart(lifecycleBeans, dependency, autoStartupOnly);
        }
        if (!bean.isRunning() &&
                (!autoStartupOnly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle) bean).isAutoStartup())) {
            if (logger.isTraceEnabled()) {
                logger.trace("Starting bean '" + beanName + "' of type [" + bean.getClass().getName() + "]");
            }
            try {
                // Invoke start on the lifecycle bean.
                bean.start();
            }
            catch (Throwable ex) {
                throw new ApplicationContextException("Failed to start bean '" + beanName + "'", ex);
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Successfully started bean '" + beanName + "'");
            }
        }
    }
}
执行 RabbitListenerEndpointRegistry 的 start 方法
public void start(){
for(MessageListenerContainer listenerContainer :getListenerContainers()){
startIfNecessary(listenerContainer);
党风廉政建设工作计划
}
}
org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry#startIfNecessary
private void startIfNecessary(MessageListenerContainer listenerContainer){
tRefreshed || listenerContainer.isAutoStartup()){
// simpleRabbitListenerContainerFactory SimpleMessageListenerContainer
listenerContainer.start();
}
}
SimpleMessageListenerContainer
public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer {
// 执⾏⽗类 AbstractMessageListenerContainer 的start⽅法
public void start(){
if(isRunning()){
return;
}
if(!this.initialized){
synchronized(this.lifecycleMonitor){
if(!this.initialized){
afterPropertiesSet();
}
}
}
try{
logger.debug("Starting Rabbit listener container.");
configureAdminIfNeeded();
checkMismatchedQueues();
相争鹬蚌doStart();
}
catch(Exception ex){
throw convertRabbitAccessException(ex);
}
finally{
this.lazyLoad =fal;
大禹治水课文原文}
}
}
org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer#checkMismatchedQueues
protected void checkMismatchedQueues(){
if(this.mismatchedQueuesFatal &&this.amqpAdmin !=null){
try{
写一件事的作文400字// 初始化
this.amqpAdmin.initialize();
}
catch(AmqpConnectException e){
logger.info("Broker not available; cannot check queue declarations");
}
catch(AmqpIOException e){
if(RabbitUtils.isMismatchedQueueArgs(e)){
throw new FatalListenerStartupException("Mismatched queues", e);
}
el{
logger.info("Failed to get connection during start(): "+ e);
}
}
}
el{
try{
Connection connection =getConnectionFactory().createConnection();// NOSONAR
if(connection !=null){
connection.clo();
}
}
catch(Exception e){
logger.info("Broker not available; cannot force queue declarations during start: "+ e.getMessage());
}
}
}
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#doStart
protected void doStart(){
Asrt.state(!sumerBatchEnabled ||getMessageListener()instanceof BatchMessageListener
||getMessageListener()instanceof ChannelAwareBatchMessageListener,
"When tting 'consumerBatchEnabled' to true, the listener must support batching");
checkListenerContainerAware();
super.doStart();
sumersMonitor){
sumers !=null){
throw new IllegalStateException("A stopped container should not have consumers");
}
int newConsumers =initializeConsumers();
sumers ==null){
logger.info("Consumers were initialized and then cleared "+
"(presumably the container was stopped concurrently)");
return;
}
if(newConsumers <=0){
if(logger.isInfoEnabled()){
logger.info("Consumers are already running");
}
return;
}
Set<AsyncMessageProcessingConsumer> processors =new HashSet<AsyncMessageProcessingConsumer>();
// 关键代码
for(BlockingQueueConsumer consumer :sumers){
//
AsyncMessageProcessingConsumer processor =new AsyncMessageProcessingConsumer(consumer);
processors.add(processor);
// 提交任务执⾏
getTaskExecutor().execute(processor);
if(getApplicationEventPublisher()!=null){
getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
}
}
waitForConsumersToStart(processors);
}
}
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#run
public void run(){// NOSONAR - line count
if(!isActive()){
return;
}
boolean aborted =fal;
String routingLookupKey =getRoutingLookupKey();
if(routingLookupKey !=null){
SimpleResourceHolder.bind(getRoutingConnectionFactory(), routingLookupKey);// NOSONAR both never null
}
QueueCount()<1){
if(logger.isDebugEnabled()){
logger.debug("Consumer stopping; no queues for "+sumer);生日大蛋糕
}
SimpleMessageListenerContainer.sumer);
if(getApplicationEventPublisher()!=null){
getApplicationEventPublisher().publishEvent(
new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.sumer));

本文发布于:2023-07-01 12:00:53,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/89/1063092.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:过油肉   计划   设置   配置   启动   端点   得到   课文
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图