ThingsBoard二次开发之源码分析3-启动分析2

更新时间:2023-07-17 13:37:40 阅读: 评论:0

ThingsBoard⼆次开发之源码分析3-启动分析2
thingsboard聚集地
欢迎⼤家加⼊thingsboard ⼆次开发讨论群:69998183
ThingsBoard源码分析3-启动分析2
以下的分析环境基于内存消息队列和默认配置
1. DefaultTransportService
分析初始化⽅法:
吊唁@PostConstruct
青少年活动
public void init() {
/
/根据配置判断是否创建限流
if (rateLimitEnabled) {
//Just checking the configuration parameters
new TbRateLimits(perTenantLimitsConf);
new TbRateLimits(perDevicesLimitsConf);
}25的英文怎么写
this.schedulerExecutor = wSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("transport-scheduler"));
this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, new Random().nextInt((int) ssionReportTimeout), ssionReportTimeout, TimeUnit.MILLISECONDS);
/
/transportApiRequestTemplate的创建见下分析①,transportApiRequestTemplate中包含了
//⼀个⽣产者producerTemplate(requestTemplate)    topic:tb_spons  ②
//和⼀个消费者consumerTemplate (responTemplate)  topic:tb_spons.localHostName ③
transportApiRequestTemplate = ateTransportApiRequestTemplate();
//此处的producerProvider bean的创建是按照配置⽂件的值创建的,TbQueueProducerProvider有三个实现类,使⽤ConditionalOnExpression注解,读取pe的值(默认monolith),所以该Bean的实现类是TbCoreQueueProducerProvider,此  // 1.toTbCore                    topic:tb_core
// 2.toTransport                topic:ifications
// 3.toRuleEngine                topic:tb_rule_engine
// 4.toRuleEngineNotifications  topic:tb_rule_engine
// 5.toTbCoreNotifications      topic:tb_core
ruleEngineMsgProducer = RuleEngineMsgProducer();
tbCoreMsgProducer = TbCoreMsgProducer();
transportNotificationsConsumer = ateTransportNotificationsConsumer();
//fullTopic = topic:ifications.localHostName
TopicPartitionInfo tpi = NotificationsTopic(ServiceType.TB_TRANSPORT, ServiceId());
transportNotificationsConsumer.subscribe(Collections.singleton(tpi));
//见④分析
transportApiRequestTemplate.init();
网上游戏while (!stopped) {
try {
List<TbProtoQueueMsg<ToTransportMsg>> records = transportNotificationsConsumer.poll(notificationsPollDuration);
if (records.size() == 0) {
continue;
好听的网名女生}
records.forEach(record -> {
try {
Value());
} catch (Throwable e) {
log.warn("Failed to process the notification.", e);
}
});
} catch (Exception e) {
if (!stopped) {
log.warn("Failed to obtain messages from queue.", e);
try {
Thread.sleep(notificationsPollDuration);
} catch (InterruptedException e2) {
}
}
}
}
});
亲的成语}
①createTransportApiRequestTemplate In InMemoryTbTransportQueueFactory,因为我们没有启⽤相应的消息队列中间件,我们分析InMemoryTbTransportQueueFactory:
public TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponMsg>> createTransportApiRequestTemplate() {
合拢的反义词//根据配置⽂件值quests_topic获取到的topic是tb_quests创建了⽣产者
InMemoryTbQueueProducer<TbProtoQueueMsg<TransportApiRequestMsg>> producerTemplate =
new InMemoryTbQueueProducer<>(RequestsTopic());
//根据配置⽂件值spons_topic获取到的topic是tb_spons
/
/加上rviceId(我们在第⼆篇分析中提到,本机的HostName作为rviceId,其topic就是tb_spons.localHostName
InMemoryTbQueueConsumer<TbProtoQueueMsg<TransportApiResponMsg>> consumerTemplate =
new InMemoryTbQueueConsumer<>(ResponsTopic() + "." + ServiceId());
//使⽤建造者模式返回了TbQueueRequestTemplate实例,其中包含了⼀个消费者和⼀个⽣产者
DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder();
templateBuilder.queueAdmin(new TbQueueAdmin() {
@Override
public void createTopicIfNotExists(String topic) {}
@Override
public void destroy() {}
});
templateBuilder.MaxPendingRequests());
templateBuilder.MaxRequestsTimeout());
templateBuilder.ResponPollInterval());
return templateBuilder.build();
}
④init() in DefaultTbQueueRequestTemplate:
public void init() {
//按照是使⽤的中间件,实现不同的初始化⽅法,Inmemory该⽅法体为空
tickTs = System.currentTimeMillis();
//见③,订阅主题为 tb_spons.localHostName
responTemplate.subscribe();
executor.submit(() -> {
long nextCleanupMs = 0L;
while (!stopped) {
try {
/
/从消息队列⾥⾯获取消息
List<Respon> respons = responTemplate.poll(pollInterval);
...........
2.TbCoreTransportApiService
PostConstruct注解⽅法:
@PostConstruct
public void init() {
//topic是配置⽂件spons_topic的值默认为:tb_spons ⑤
TbQueueProducer<TbProtoQueueMsg> producer = ateTransportApiResponProducer();
//topic是配置⽂件quests_topic的值,默认为:tb_quests ⑥
TbQueueConsumer<TbProtoQueueMsg> consumer = ateTransportApiRequestConsumer();
DefaultTbQueueResponTemplate.DefaultTbQueueResponTemplateBuilder
<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponMsg>> builder = DefaultTbQueueResponTemplate.builder();
builder.maxPendingRequests(maxPendingRequests);
builder.pollInterval(responPollDuration);
水墨宏村utor(transportCallbackExecutor);
builder.handler(transportApiService);
transportApiTemplate = builder.build();
- `@EventListener(ApplicationReadyEvent.class)`注解⽅法,调⽤了`transportApiTemplate.init(transportApiService);``transportApiTemplate`即上⼀步创建的`DefaultTbQueueResponTemplate`对象`init()`⽅法为:
```java
@Override
public void init(TbQueueHandler<Request, Respon> handler) {
//按照是使⽤的中间件,实现不同的初始化⽅法,Inmemory该⽅法体为空
//见⑥,订阅主题为tb_quests
requestTemplate.subscribe();
loopExecutor.submit(() -> {
while (!stopped) {
try {
while (() >= maxPendingRequests) {
try {
Thread.sleep(pollInterval);
} catch (InterruptedException e) {
}
}
List<Request> requests = requestTemplate.poll(pollInterval);
.
..........
总结
DefaultTransportService和TbCoreTransportApiService⽅法的启动并不是很复杂,我们需要将主要的关注点放在两个Bean初始化消费者和⽣产者的topic上⾯,thingsboard将使⽤中间件将消息解耦,如果按照传统的调试⽅法很容易找不到消息的流向,此时我们将topic作为关键的切⼊点,⽅便后⾯整个数据流的分析。

本文发布于:2023-07-17 13:37:40,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/89/1085117.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:分析   消息   队列   配置   创建   中间件   消费者   返回
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图