ThingsBoard⼆次开发之源码分析3-启动分析2
thingsboard聚集地
欢迎⼤家加⼊thingsboard ⼆次开发讨论群:69998183
ThingsBoard源码分析3-启动分析2
以下的分析环境基于内存消息队列和默认配置
1. DefaultTransportService
分析初始化⽅法:
吊唁@PostConstruct
青少年活动
public void init() {
/
/根据配置判断是否创建限流
if (rateLimitEnabled) {
//Just checking the configuration parameters
new TbRateLimits(perTenantLimitsConf);
new TbRateLimits(perDevicesLimitsConf);
}25的英文怎么写
this.schedulerExecutor = wSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("transport-scheduler"));
this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, new Random().nextInt((int) ssionReportTimeout), ssionReportTimeout, TimeUnit.MILLISECONDS);
/
/transportApiRequestTemplate的创建见下分析①,transportApiRequestTemplate中包含了
//⼀个⽣产者producerTemplate(requestTemplate) topic:tb_spons ②
//和⼀个消费者consumerTemplate (responTemplate) topic:tb_spons.localHostName ③
transportApiRequestTemplate = ateTransportApiRequestTemplate();
//此处的producerProvider bean的创建是按照配置⽂件的值创建的,TbQueueProducerProvider有三个实现类,使⽤ConditionalOnExpression注解,读取pe的值(默认monolith),所以该Bean的实现类是TbCoreQueueProducerProvider,此 // 1.toTbCore topic:tb_core
// 2.toTransport topic:ifications
// 3.toRuleEngine topic:tb_rule_engine
// 4.toRuleEngineNotifications topic:tb_rule_engine
// 5.toTbCoreNotifications topic:tb_core
ruleEngineMsgProducer = RuleEngineMsgProducer();
tbCoreMsgProducer = TbCoreMsgProducer();
transportNotificationsConsumer = ateTransportNotificationsConsumer();
//fullTopic = topic:ifications.localHostName
TopicPartitionInfo tpi = NotificationsTopic(ServiceType.TB_TRANSPORT, ServiceId());
transportNotificationsConsumer.subscribe(Collections.singleton(tpi));
//见④分析
transportApiRequestTemplate.init();
网上游戏while (!stopped) {
try {
List<TbProtoQueueMsg<ToTransportMsg>> records = transportNotificationsConsumer.poll(notificationsPollDuration);
if (records.size() == 0) {
continue;
好听的网名女生}
records.forEach(record -> {
try {
Value());
} catch (Throwable e) {
log.warn("Failed to process the notification.", e);
}
});
} catch (Exception e) {
if (!stopped) {
log.warn("Failed to obtain messages from queue.", e);
try {
Thread.sleep(notificationsPollDuration);
} catch (InterruptedException e2) {
}
}
}
}
});
亲的成语}
①createTransportApiRequestTemplate In InMemoryTbTransportQueueFactory,因为我们没有启⽤相应的消息队列中间件,我们分析InMemoryTbTransportQueueFactory:
public TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponMsg>> createTransportApiRequestTemplate() {
合拢的反义词//根据配置⽂件值quests_topic获取到的topic是tb_quests创建了⽣产者
InMemoryTbQueueProducer<TbProtoQueueMsg<TransportApiRequestMsg>> producerTemplate =
new InMemoryTbQueueProducer<>(RequestsTopic());
//根据配置⽂件值spons_topic获取到的topic是tb_spons
/
/加上rviceId(我们在第⼆篇分析中提到,本机的HostName作为rviceId,其topic就是tb_spons.localHostName
InMemoryTbQueueConsumer<TbProtoQueueMsg<TransportApiResponMsg>> consumerTemplate =
new InMemoryTbQueueConsumer<>(ResponsTopic() + "." + ServiceId());
//使⽤建造者模式返回了TbQueueRequestTemplate实例,其中包含了⼀个消费者和⼀个⽣产者
DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder();
templateBuilder.queueAdmin(new TbQueueAdmin() {
@Override
public void createTopicIfNotExists(String topic) {}
@Override
public void destroy() {}
});
templateBuilder.MaxPendingRequests());
templateBuilder.MaxRequestsTimeout());
templateBuilder.ResponPollInterval());
return templateBuilder.build();
}
④init() in DefaultTbQueueRequestTemplate:
public void init() {
//按照是使⽤的中间件,实现不同的初始化⽅法,Inmemory该⽅法体为空
tickTs = System.currentTimeMillis();
//见③,订阅主题为 tb_spons.localHostName
responTemplate.subscribe();
executor.submit(() -> {
long nextCleanupMs = 0L;
while (!stopped) {
try {
/
/从消息队列⾥⾯获取消息
List<Respon> respons = responTemplate.poll(pollInterval);
...........
2.TbCoreTransportApiService
PostConstruct注解⽅法:
@PostConstruct
public void init() {
//topic是配置⽂件spons_topic的值默认为:tb_spons ⑤
TbQueueProducer<TbProtoQueueMsg> producer = ateTransportApiResponProducer();
//topic是配置⽂件quests_topic的值,默认为:tb_quests ⑥
TbQueueConsumer<TbProtoQueueMsg> consumer = ateTransportApiRequestConsumer();
DefaultTbQueueResponTemplate.DefaultTbQueueResponTemplateBuilder
<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponMsg>> builder = DefaultTbQueueResponTemplate.builder();
builder.maxPendingRequests(maxPendingRequests);
builder.pollInterval(responPollDuration);
水墨宏村utor(transportCallbackExecutor);
builder.handler(transportApiService);
transportApiTemplate = builder.build();
- `@EventListener(ApplicationReadyEvent.class)`注解⽅法,调⽤了`transportApiTemplate.init(transportApiService);``transportApiTemplate`即上⼀步创建的`DefaultTbQueueResponTemplate`对象`init()`⽅法为:
```java
@Override
public void init(TbQueueHandler<Request, Respon> handler) {
//按照是使⽤的中间件,实现不同的初始化⽅法,Inmemory该⽅法体为空
//见⑥,订阅主题为tb_quests
requestTemplate.subscribe();
loopExecutor.submit(() -> {
while (!stopped) {
try {
while (() >= maxPendingRequests) {
try {
Thread.sleep(pollInterval);
} catch (InterruptedException e) {
}
}
List<Request> requests = requestTemplate.poll(pollInterval);
.
..........
总结
DefaultTransportService和TbCoreTransportApiService⽅法的启动并不是很复杂,我们需要将主要的关注点放在两个Bean初始化消费者和⽣产者的topic上⾯,thingsboard将使⽤中间件将消息解耦,如果按照传统的调试⽅法很容易找不到消息的流向,此时我们将topic作为关键的切⼊点,⽅便后⾯整个数据流的分析。