ROS⾯试题汇总
1、ROS中订阅(Subscribe)最新消息以及消息队列相关问题
机器⼈应⽤中难免会遇到运算起来很费时间的操作,⽐如图像的特征提取、点云的匹配等等。有时候,不可避免地,我们需要在ROS 的Subscriber的Callback回调函数中进⾏这些费时的操作。Subscriber所订阅的消息的发布频率可能是很⾼的,⽽这些操作的运算速度肯定达不到消息发布的速度。所以,如果我们要是没有取舍的对于每个消息都调⽤⼀次回调函数,那么势必会导致计算越来越不实时,很有可能当下在处理的还是⼏⼗秒以前的数据。所以,我们希望每次回调函数都处理当前时刻最新的⼀个消息,这就是我们的⽬标。
要达到这个⽬标有三点,第⼀点是要设置Publisher的queue_size等于1;第⼆点是要设置Subscriber的queue_size(消息队列⼤⼩)等于1;第三点⾮常重要,要设置Subscriber的buff_size(缓冲区⼤⼩)⾜够⼤,⼤于⼀个消息的⼤⼩。像这样:
# ROS Python
pcdpub = rospy.Publisher("lidardata", PointCloud, queue_size=1)
rospy.Subscriber("lidardata", PointCloud, lf.pcd_resolve_callback,queue_size=1,buff_size=52428800)
1.1 Subscriber和Publisher的消息队列起什么作⽤,队列的⼤⼩有什么影响?
简单描述⼀下,Publisher的消息队列是为了缓存发布节点发布的消息,⼀旦队列中消息的数量超过了queue_size,那么最先进⼊队列的(最⽼的)消息被舍弃。Subscriber的消息队列是为了缓存节点接收到的信息,⼀旦⾃⼰处理的速度过慢,接收到的消息数量超过了queue_size,那么最先进⼊队列的(最⽼的)消息会被舍弃。所以,我们想只处理最新的消息,实际上只需要把两个queue_size都设置成1,那么系统不会缓存数据,⾃然处理的就是最新的消息。
1.2 Subscriber有消息队列缓存消息了,为什么Publisher还要有消息队列?
在我看来,Publisher的消息队列是⼀定要有的,因为ROS中发布节点往外发送消息是基于Topic发送,⽽不是直接向Subscriber订阅者发送,所以必须要有⼀个消息队列来存放发布的消息,以供订阅者来获取。⽽且这个消息队列的好处是在⽹络差、带宽⼩、延时⾼的时候,保证数据不容易丢失。
1.3 既然Publisher有消息队列了,为什么Subscriber还要有消息队列?
这个问题⽐较难⼀点。我的理解是,由于ROS毕竟是分布式系统,Publisher和Subscriber不⼀定在同⼀台主机上,因此消息需要通过⽹络来交换。但是⽹络的性能时好时坏,如果Subscriber没有消息队列,那么每次运⾏Callback函数前都要先通过⽹络取回消息,然后才能处理。当⽹络很差时,就会让
系统堵塞。⽽有消息队列的话,Subscriber就可以⼀边处理队列中的消息,⼀边通过⽹络缓存新的消息,⽽不⽤每次处理消息前都要临时去读⼀个回来。这样就增加了系统的可靠性。
1.4 为什么要设置缓冲区的⼤⼩?
这个缓冲区的⼤⼩是指消息队列使⽤的缓冲区物理内存空间⼤⼩。如果这个空间⼩于⼀个消息所需要的空间,⽐如消息是⼀副图⽚或者⼀帧点云,数据量超过了缓冲区的⼤⼩。这个时候为了保证通信不发⽣错误,就会触发⽹络通信的保护机制,TCP的Buffer会为你缓存消息。这种机制就会导致每⼀帧消息都被完整的缓存下来,没有消息被丢弃,感觉上就像queue_size被设置成了⽆穷⼤。详细说明请参考:
1.5 消息队列的运⾏机制
这⾥只能说是我的理解了,因为没有看过源代码。
⾸先,发布节点把消息发布,消息进⼊Publisher的消息队列,同时通知订阅了该话题消息的Subscriber来取消息。
其次,Subscriber来Publisher的消息队列⾥取消息,但取⾛的也是最⽼的消息,因为毕竟这是先⼊先出的队列。这也是为什么Publisher的消息队列的⼤⼩也要设置为1。
最后,被取⾛的消息存放⼊了Subscriber的消息队列中,等待被Callback执⾏。如果Callback执⾏很慢,消息越堆越多,最⽼的消息会逐渐被顶替。
当然,这⾥究竟是Subscriber来取消息,还是Publisher直接把消息推给Subscriber,我只是猜测,反正这⾥交换的消息肯定不是最新的消息,⽽是队列⾥最⽼的消息。
2、launch⽂件的写法
<launch>
<!-- 启动mbot -->
<node name="robot_bringup" pkg="robot_bringup" type="robot_bringup" output="screen" />
<!-- 加载机器⼈模型参数 -->
<param name="robot_description" command="$(find xacro)/xacro --inorder '$(find robot_description)/xacro/robot_with_lar.xacro'" />
<!-- 运⾏joint_state_publisher节点,发布机器⼈的关节状态 -->
<node name="joint_state_publisher" pkg="joint_state_publisher" type="joint_state_publisher" />
<!-- 运⾏robot_state_publisher节点,发布tf -->
<node name="state_publisher" pkg="robot_state_publisher" type="robot_state_publisher">
<param name="publish_frequency" type="double" value="50.0" />
</node>
<!-- 运⾏激光雷达驱动 -->
<include file="$(find robot_bringup)/launch/rplidar.launch" />
</launch>
3、ros::spin()和ros::spinonce()的区别
消息发布器(ros::Publisher)在⼀个while循环⾥⼀直循环发布消息(“hello world”)到话题(“chatter”)上。
消息订阅器(ros::Subscriber)⼀直监视话题,⼀旦知道话题上有数据,就会将该话题上的数据(message)作为参数传⼊到对应的回调函数(callback)中,但是这时候回调函数还没有被执⾏,⽽是把callback函数放到了回调函数队列中。所以当消息发布器不断发送message到该话题上时,就会有相应的callback函数存⼊队列中,它们函数名⼀样,只是实参内容不⼀样。(其实,是将传⼊的message 存⼊到队列中,这⾥说是将callback函数存⼊到队列中,应该是为了便于理解)。
那什么时候会执⾏callback函数呢?那就是ros::spin()和ros::spinOnce()的事情了。
当spinOnce()函数被调⽤时,spinOnce()会调⽤回调函数队列中第⼀个callback函数,此时callback函数才被执⾏,然后等到下次spinOnce函数⼜被调⽤时,回调函数队列中第⼆个callback函数就会被调⽤,以此类推。
所以,这会有⼀个问题。因为回调函数队列的长度是有限的,如果发布器发送数据的速度太快,spinOnce函数调⽤的频率太少,就会导致队列溢出,⼀些callback函数就会被挤掉,导致没被执⾏到。
⽽对于spin函数,⼀旦进⼊spin函数,它就不会返回了,相当于它在⾃⼰的函数⾥⾯死循环了。只要回调函数队列⾥⾯有callback函数在,它就会马上去执⾏callback函数。如果没有的话,它就会阻塞,不会占⽤CPU。
3.1 spin()和spinOnce()函数意义
⾸先要知道,这俩兄弟学名叫ROS消息回调处理函数。它俩通常会出现在ROS的主循环中,程序需要不断调⽤ros::spin() 或
ros::spinOnce(),两者区别在于前者调⽤后不会再返回,也就是你的主程序到这⼉就不往下执⾏了,⽽后者在调⽤后还可以继续执⾏之后的程序。
如果你的程序写了相关的消息订阅函数,那么程序在执⾏过程中,除了主程序以外,ROS还会⾃动在后台按照你规定的格式,接受订阅的消息,但是所接到的消息并不是⽴刻就被处理,⽽是必须要等到ros::spin()或ros::spinOnce()执⾏的时候才被调⽤,这就是消息回调函数的原理。
3.2 spin()和spinOnce()的区别
就像上⾯说的,ros::spin() 在调⽤后不会再返回,也就是你的主程序到这⼉就不往下执⾏了,⽽ ros::spinOnce() 后者在调⽤后还可以继续执⾏之后的程序。
其实看函数名也能理解个差不多,⼀个是⼀直调⽤;另⼀个是只调⽤⼀次,如果还想再调⽤,就需要加上循环了。
这⾥⼀定要记住,
ros::spin()函数⼀般不会出现在循环中,因为程序执⾏到spin()后就不调⽤其他语句了,也就是说该循环没有任何意义。
spin()函数后⾯⼀定不能有其他语句(return 0 除外),有也是⽩搭,不会执⾏的。
九阳豆浆机使用说明书
ros::spinOnce()的⽤法相对来说很灵活,但往往需要考虑调⽤消息的时机,调⽤频率,以及消息池的⼤⼩,这些都要根据现实情况协调好,不然会造成数据丢包或者延迟的错误。
3.3 spin()和spinOnce()常见使⽤⽅法
如果你的程序写了相关的消息订阅函数,那千万千万千万不要忘了在相应位置加上ros::spin()或者ros::spinOnce()函数,不然你是永远都得不到另⼀边发出的数据或消息的。
3.3.1 ros::spin()
ros::spin()函数⽤起来⽐较简单,⼀般都在主程序的最后,加⼊该语句就可。例⼦如下:
// 发送端
#include "ros/ros.h"
#include "std_msgs/String.h"
#include <sstream>
int main(int argc, char **argv)
{
ros::init(argc, argv, "talker");
ros::NodeHandle n;
ros::Publisher chatter_pub = n.adverti<std_msgs::String>("chatter", 1000);
ros::Rate loop_rate(10);
int count = 0;
while (ros::ok())
{
std_msgs::String msg;
std::stringstream ss;
ss << "hello world " << count;
msg.data = ss.str();
ROS_INFO("%s", msg.data.c_str());
/**
* 向 Topic: chatter 发送消息, 发送频率为10Hz(1秒发10次);消息池最⼤容量1000。
人员招聘方案*/
chatter_pub.publish(msg);
loop_rate.sleep();
++count;
}
return 0;
}
接收端代码中⽤到spin()函数:
// 接收端
#include "ros/ros.h"
#include "std_msgs/String.h"
void chatterCallback(const std_msgs::String::ConstPtr& msg)
{
ROS_INFO("I heard: [%s]", msg->data.c_str());
}
int main(int argc, char **argv)
{
ros::init(argc, argv, "listener");
ros::NodeHandle n;
cm和弦ros::Subscriber sub = n.subscribe("chatter", 1000, chatterCallback);
/**
* ros::spin() 将会进⼊循环,⼀直调⽤回调函数chatterCallback(),每次调⽤1000个数据。
* 当⽤户输⼊Ctrl+C或者ROS主进程关闭时退出,
*/
ros::spin();
return 0;草排
清理热水器}
3.3.2 ros::spinOnce()
对于ros::spinOnce()的使⽤,虽说⽐ros::spin()更⾃由,可以出现在程序的各个部位,但是需要注意的因素也更多。⽐如:
1. 对于有些传输特别快的消息,尤其需要注意合理控制消息池⼤⼩和ros::spinOnce()执⾏频率; ⽐如消息送达频率为10Hz,
ros::spinOnce()的调⽤频率为5Hz,那么消息池的⼤⼩就⼀定要⼤于2,才能保证数据不丢失,⽆延迟。
// 接收端
#include "ros/ros.h"
#include "std_msgs/String.h"
void chatterCallback(const std_msgs::String::ConstPtr& msg)
燥的成语{
/*...*/
}
int main(int argc, char **argv)
{
ros::init(argc, argv, "listener");
ros::NodeHandle n;
ros::Subscriber sub = n.subscribe("chatter", 2, chatterCallback);
ros::Rate loop_rate(5);
while (ros::ok())
{
/
*...*/
ros::spinOnce();
loop_rate.sleep();
}
return 0;
}
2. ros::spinOnce()⽤法很灵活,也很⼴泛,具体情况需要具体分析。但是对于⽤户⾃定义的周期性的函数,最好和ros::spinOnce并列执⾏,不太建议放在回调函数中;
/*...*/
ros::Rate loop_rate(100);
while (ros::ok())
{
/*...*/
ur_handle_events_timeout(...);
ros::spinOnce();
loop_rate.sleep();
}
4、ros命名空间NodeHandle相关问题
4.1 NodeHandle的定义
NodeHandles节点类,ros::NodeHandle类有两个作⽤:
⾸先,在roscpp程序的内部,它提供了⽅式启动和关闭。
其次,它提供了⼀个额外的层命令空间解决⽅案,可以使组件更容易写。
4.2 NodeHandle的⾃动启动节点和关闭节点
⾃动启动和关闭
ros::NodeHandle管理内部的引⽤计数
开始节点:两半合一
ros::NodeHandle nh;
创建时候,如果内部节点没有开始,ros::NodeHandle会开始节点,ros::NodeHandle实例销毁,节点就会关闭。
4.3 NodeHandle的命名空间
1、句柄可以让你通过构造函数指定命名空间
查阅
龙龙与忠狗
NodeHandle可以指定命名空间给构造器:
ros::NodeHandle nh("my_namespace");
这使得使⽤该句柄的任何相对名字都是相对<node_namespace>/my_namespace,⽽不只是相对<node_namespace>。
也可以指定⽗NodeHandle和命名空间:
ros::NodeHandle nh1("ns1");
ros::NodeHandle nh2(nh1, "ns2");
这个放nh2 进⼊<node_namespace>/ns1/ns2 命名空间。
2、也可以指定全局名字
ros::NodeHandle nh("/my_global_namespace");