ROS2探索(⼆)executor
前⾔
在 我们接触了executor这个新东西,发现spin函数其实调⽤的就是executor的接⼝,⼀个executor可以通过add_node来添加多个节点,然后不断循环执⾏准备好了的AnyExecutable对象。
本⽂围绕ros2源码中的executors⽰例,学习下多线程executor,进⼀步分析executor处理subscription的流程。
正⽂
打开ros2/examples/rclcpp/executors/下的multithreaded_executor.cpp,直奔main函数:
int main(int argc,char* argv[])
{
吕蒙传rclcpp::init(argc, argv);
rclcpp::executors::MultiThreadedExecutor executor;
auto pubnode = std::make_shared<PublisherNode>();
auto subnode = std::make_shared<DualThreadedNode>();
executor.add_node(pubnode);
executor.add_node(subnode);
executor.spin();
rclcpp::shutdown();
葱爆猪肝
return0;
}
表达爱意的话
这⾥构造了两个节点,PublisherNode和DualThreadedNode,其中DualThreadedNode从名字来看拥有两个线程⽤来监听话题消息。多线程的节点必须要⽤MultiThreadedExecutor,这是和之前的不⼀样的地⽅。
接下来⽰例给出了⼀个进程拥有多个节点的例⼦,只需要调⽤add_node添加这些节点即可,添加完了
开始spin,不过这⾥不⽤给定节点指针对象,因为直接调⽤的executor.spin:
executor.add_node(pubnode);
executor.add_node(subnode);
executor.spin();
PublisherNode没什么好说的,和前⼀篇⽂章中基本⼀样。直接看DualThreadedNode:
rclcpp::CallbackGroup::SharedPtr callback_group_subscriber1_;
rclcpp::CallbackGroup::SharedPtr callback_group_subscriber2_;
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription1_;
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription2_;
DualThreadedNode拥有两个Subscription成员⽤于监听同⼀个话题以及Subscription对应的回调组(CallbackGroup)。
回调组通过Node::create_callback_group进⾏创建:
callback_group_subscriber1_ =this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
callback_group_subscriber2_ =this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
参数MutuallyExclusive表⽰此组的回调函数是互斥的,⼀次只能同时执⾏⼀个。如果配置成CallbackGroupType::Reentrant则同时可以执⾏多个。
接着创建SubscriptionOptions并设置所属回调组:
auto sub1_opt = rclcpp::SubscriptionOptions();
sub1_opt.callback_group = callback_group_subscriber1_;
auto sub2_opt = rclcpp::SubscriptionOptions()
sub2_opt.callback_group = callback_group_subscriber2_;
然后⽤这些Options创建Subscription,这⾥没有⽤默认参数⽽是传⼊了带回调组的SubscriptionOptions:
subscription1_ =this->create_subscription<std_msgs::msg::String>(
"topic",
rclcpp::QoS(10),
std::bind(&DualThreadedNode::subscriber1_cb,std::placeholders::_1),
sub1_opt);
回调函数与之前的lambda表达式不同,这⾥通过std::bind绑定了成员函数作为作为回调函数,占位符表⽰回调函数待定的输⼊。
我们现在重点来看⼀下rclcpp::executors::MultiThreadedExecutor对象。⾸先多线程executor的spin函数如下:
void MultiThreadedExecutor::spin()
{
...
std::vector<std::thread> threads;
size_t thread_id =0;
{
std::lock_guard<std::mutex>wait_lock(wait_mutex_);
for(; thread_id < number_of_threads_ -1;++thread_id)
{
auto func = std::bind(&MultiThreadedExecutor::run,this, thread_id);
}
}
run(thread_id);
for(auto&thread : threads)
{
thread.join();
}
}
spin函数中⾸先根据number_of_threads_创建该数量个std::thread,每个thread绑定run成员函数。run函数与单线程的spin类似,其中增加了线程安全和线程定时器操作。
number_of_threads由⽤户指定,默认为std::thread::hardware_concurrency()即cpu的核⼼数,我的输出是8个线程:
当使⽤8个线程时,ros2 run examples_rclcpp_multithreaded_executor multithreaded_executor执⾏后输出如下:
可以看出节点线程ID不固定,如果将MultiThreadedExecutor的线程数⽬改为1` :
rclcpp::executors::MultiThreadedExecutor executor(rclcpp::ExecutorOptions(),1);
得到的输出如下:
发现此时三个节点共⽤了同⼀个线程。
⾄此得出以下结论:
executor有MultiThreadedExecutor和SingleThreadedExecutor两种,MultiThreadedExecutor默认开启CPU核⼼数个线程每个线程独⽴地检查executor中已添加节点是否有AnyExecutable,其既可以是publish也可以是subscription
分析完多线程executor的线程创建,我们来看⼀下executor是执⾏订阅消息回调的过程。
⾸先Executor开始spin后每个线程不断循环三个步骤,即下图所⽰:
其中get_next_executable()调⽤get_next_subscription()来检查当前是否收到消息
void get_next_subscription(rclcpp::AnyExecutable &any_exec,
const WeakNodeList &weak_nodes) override
{
auto it = subscription_handles_.begin();
while(it != subscription_handles_.end())
{
auto subscription =get_subscription_by_handle(*it, weak_nodes);
if(subscription)
{
// Find the group for this handle and e if it can be rviced
auto group =get_group_by_subscription(subscription, weak_nodes);
if(!group)
{
// Group was not found, meaning the subscription is
// Remove it from the ready list and continue looking
it = subscription_handles_.era(it);
continue;
}
if(!group->can_be_taken_from().load())
{
// Group is mutually exclusive and is being ud, so skip it for now
/
/ Leave it to be checked next time, but continue arching
++it;
continue;
}
// Otherwi it is safe to t and return the any_exec
any_exec.subscription = subscription;
any_exec.callback_group = group;
de_ba =get_node_by_group(group, weak_nodes);
subscription_handles_.era(it);
return;
}
/
/ El, the subscription is no longer valid, remove it and continue
it = subscription_handles_.era(it);
}乒乓葡萄
}
get_next_subscription遍历当前节点的所有rcl_subscription_t句柄,对每个rcl_subscription_t句柄检查其所在回调组是否为MutuallyExclusive,如果是MutuallyExclusive互斥组那么还得确保当前回调没有其他线程正在调⽤。
得到⼀个包含subscription的AnyExecutable对象之后,我们执⾏execute_any_executable中的execute_subscription函数,这个函数很长⽽且不太容易理解,挑出了关键部分如下:
void Executor::execute_subscription(rclcpp::SubscriptionBa::SharedPtr subscription)
{
rclcpp::MessageInfo message_info;
_rmw_message_info().from_intra_process =fal;
if(subscription->is_rialized())//ca 1
{
// This is the ca where a copy of the rialized message is taken from
// the middleware via inter-process communication.
...
}
el if(subscription->can_loan_messages())//ca 2
{
// This is the ca where a loaned message is taken from the middleware via
// inter-process communication, given to the ur for their callback,
/
/ and then returned.
...
}
el//ca 3
{
// This ca is taking a copy of the message data from the middleware via
// inter-process communication.
std::shared_ptr<void> message = subscription->create_message();
take_and_do_error_handling(
"taking a message from topic",我看朗诵
subscription->get_topic_name(),
[&](){return subscription->take_type_(), message_info);},
[&](){ subscription->handle_message(message, message_info);});
subscription->return_message(message);
}
}
execute_subscription有三个分⽀,他们都是处理经过中间件的消息数据,主要的区别在于:
ca1使⽤的是序列化的数据,这种消息的header和消息payload是⼿⼯设置的,好⽐TCP定制报⽂
ca2是向中间件借⽤消息(loan message),采⽤的共享内存来避免拷贝,性能不错但需要DDS中间件的⽀持。这部分先关注,后⾯⽂章再谈,先挖好坑。
ca3最简单,直接将消息拷贝⼀份传给回调函数
我们现在只分析下ca3,ca3⾥第⼀步开辟消息内存:
std::shared_ptr<void> message = subscription->create_message();
然后Subscription对象操作rcl句柄:
rcl_ret_t ret =rcl_take(
this->get_subscription_handle().get(),
message_out,
&message__rmw_message_info(),
nullptr// rmw_subscription_allocation_t is unud here
);
rcl句柄再往下获取rmw句柄的消息:
rmw_ret_t ret =rmw_take_with_info(
subscription->impl->rmw_handle, ros_message,&taken, message_info_local, allocation);
如果rmw_take_with_info返回RMW_RET_OK的话表⽰rmw有消息,则进⾏handle_message步骤。
使用英语
handle_message⾸先根据模板参数CallbackMessageT将返回数据转换成对应类型:
auto typed_message = std::static_pointer_cast<CallbackMessageT>(message);
然后将带类型的消息数据分发给回调函数进⾏处理:
any_callback_.dispatch(typed_message, message_info);
完整的handle_message代码如下:
void handle_message(std::shared_ptr<void>&message,
const rclcpp::MessageInfo &message_info) override
{
巴山夜雨涨秋池if(matches_any_intra_process_publishers(&_rmw_message_info().publisher_gid))
{
// In this ca, the message will be delivered via intra process and
/
/ we should ignore this copy of the message.
return;
}
auto typed_message = std::static_pointer_cast<CallbackMessageT>(message);
any_callback_.dispatch(typed_message, message_info);
if(subscription_topic_statistics_)
{
const auto nanos = std::chrono::time_point_cast<std::chrono::nanoconds>(
std::chrono::system_clock::now());
const auto time = rclcpp::Time(nanos.time_since_epoch().count());
subscription_topic_statistics_->handle_message(*typed_message, time);
}
}
下⾯画图总结⼀下executor的使⽤与Subscription处理的流程:
总结
本⽂到⽬前了解了ROS2的以下内容:
executor多线程的实现
多线程executor默认开启cpu核⼼数个线程帅气的男生
CallbackGroup实现了对回调函数的互斥与重⼊调⽤
execute_subscription对于中间件消息有三种处理类型:rialized message、loaded message以及普通的拷贝消息executor处理subscription回调的流程
下⼀篇将分析ros2中的rvice实现。