上一次我们对DDS有了基础的了解,这一次我们对FastDDS关键特性和典型场景的实现机制进行进一步的了解。
一、机制
1.传输
传输层提供 DDS 实体之间的通信服务,负责通过物理传输实际发送和接收消息。DDS 层将此服务用于用户数据和发现流量通信。
不同传输方式之间的比较:
- UDP/TCP传输:默认UDPv4 传输。相比于TCP,UDP具有更好的性能。
- SHM传输:在同一主机上运行的实体之间的共享内存通信。 默认情况下会在新的DomainParticipant上创建此传输。
- Data-Sharing交付:通过共享内存与DataReader共享DataWriter的历史记录,从而加快同一机器内实体之间的通信。这避免了传输层中涉及的任何开销,有效地避免了 DataWriter 和 DataReader 之间的任何数据复制。与共享内存传输的不同之处在于避免了传输的数据从 DataWriter 历史记录复制到传输,然后从传输复制到 DataReader的副本。
- 进程内交付:发布者直接调用订阅者的接收函数。完全避免传输的复制或发送操作,同时避免了确认机制。
-
白名单:可以通过将 IP 地址添加到
interfaceWhiteList
字段来实现的TCP/UDP的访问白名单控制。
2.持久化
默认QoS时,DataWriterHistory、DataReaderHistory未持久化,意外退出时数据将会丢失。为此FastDDS提供了DurabilityQosPolicy持久化策略,可以将数据持久化到数据库中,并在DataWriters、DataReaders重启时自动加载恢复,以实现在意外关闭等情况下增加应用程序的稳健性。
FastDDS采用SQLite3实现持久化存贮,通过配置DurabilityQosPolicyKind为 TRANSIENT_DURABILITY_QOS开启持久化策略。为避免因并发访问 SQLite3 数据库而导致不必要的延迟,建议为每个 DataWriter 和 DataReader 指定不同的数据库文件。
3.自发现
自发现主要通过多播通信实现,允许跨域参与者Participant自动查找和匹配DataWriters和DataReader。
3.1.发现阶段
-
Participant Discovery Phase 参与者发现阶段(PDP):在此阶段,
DomainParticipants
确认彼此的存在。为此,每个 DomainParticipant 都会侦听并定期发送公告消息到元数据和用户数据流量的单播地址(IP 和端口)。当两个给定的 DomainParticipants 存在于同一个 DDS 域中时,它们将发现对方。默认情况下,通知消息使用众所周知的多播地址和端口(使用DomainId
计算)发送。此外,可以指定地址列表以使用单播发送通知(参见初始对等点)。此外,还可以配置此类公告的周期(请参阅 发现配置)。 -
Endpoint Discovery Phase 端点发现阶段(EDP):在这个阶段,
DataWriters
和DataReaders
相互确认。为此,DomainParticipants 使用 PDP 期间建立的通信通道相互共享有关其 DataWriters 和 DataReader 的信息。除其他外,此信息包含Topic
和 数据类型。对于要匹配的两个端点,它们的主题和数据类型必须一致。一旦 DataWriter 和 DataReader 匹配,它们就可以发送/接收用户数据。
3.2.发现机制
- 简单发现:这是默认机制。它支持 PDP 和 EDP 的RTPS 标准,因此提供与任何其他 DDS 和 RTPS 实现的兼容性。
- 静态发现:此机制使用简单参与者发现协议 (SPDP) 进行 PDP 阶段,但允许在所有数据写入器和数据读取器的 IP 和端口、数据类型和主题是事先已知的。
- 发现服务器:此发现机制使用集中式发现架构,其中域参与者(称为服务器)充当元流量发现的中心。
-
手动发现:此机制仅与 RTPS 层兼容。它禁用 PDP,让用户手动匹配和取消匹配
RTPSParticipants
,RTPSReaders
,并RTPSWriters
使用其选择的任何外部元信息通道。因此,用户必须访问由DomainParticipant实现的RTPSParticipant,并直接匹配RTPS Entities。
3.3.发现设置
- 常规设置:指定发现协议(类型同上发现机制)、发现过滤条件、端点判活条件、心跳间隔。
- 简单发现设置:每个RTPSParticipant都会同时与多播和单播两个端口监听PDP协议数据,DomainParticipant也通过此方式发送自身节点PDP数据。(发现流量单播侦听端口计算公式:7400 + 250 * domainID + 10 + 2 * participantID,例如7400 + 250 * 0 + 10 + 2 * 1 = 7412)
DomainParticipantQos qos;
// configure an initial peer on host 192.168.10.13.
// The port number corresponds to the well-known port for metatraffic unicast
// on participant ID `1` and domain `0`.
Locator_t initial_peer;
IPLocator::setIPv4(initial_peer, "192.168.10.13");
initial_peer.port = 7412;
qos.wire_protocol().builtin.initialPeersList.push_back(initial_peer);
- 静态EDP设置:直接通过xml配置DataWriter、DataReader节点的entityID、Topic、TopicType、IP、端口、QoS等信息,用于主动发现。
- 发现服务器设置:由一个或多个服务器实现节点发现。发现服务器域参与者可能是客户端或服务器。它们之间的唯一区别在于它们如何处理发现流量。用户流量,即他们创建的 DataWriters 和 DataReader 之间的流量,是与角色无关的。发现服务器示例:
// Get default participant QoS
DomainParticipantQos server_qos = PARTICIPANT_QOS_DEFAULT;
// Set participant as SERVER
server_qos.wire_protocol().builtin.discovery_config.discoveryProtocol = DiscoveryProtocol_t::SERVER;
// Set SERVER's GUID prefix
std::istringstream("44.53.00.5f.45.50.52.4f.53.49.4d.41") >> server_qos.wire_protocol().prefix;
// Set SERVER's listening locator for PDP
Locator_t locator;
IPLocator::setIPv4(locator, 127, 0, 0, 1);
locator.port = 11811;
server_qos.wire_protocol().builtin.metatrafficUnicastLocatorList.push_back(locator);
/* Add a remote serve to which this server will connect */
// Set remote SERVER's GUID prefix
RemoteServerAttributes remote_server_att;
remote_server_att.ReadguidPrefix("44.53.01.5f.45.50.52.4f.53.49.4d.41");
// Set remote SERVER's listening locator for PDP
Locator_t remote_locator;
IPLocator::setIPv4(remote_locator, 127, 0, 0, 1);
remote_locator.port = 11812;
remote_server_att.metatrafficUnicastLocatorList.push_back(remote_locator);
// Add remote SERVER to SERVER's list of SERVERs
server_qos.wire_protocol().builtin.discovery_config.m_DiscoveryServers.push_back(remote_server_att);
// Create SERVER
DomainParticipant* server =
DomainParticipantFactory::get_instance()->create_participant(0, server_qos);
if (nullptr == server)
{
// Error
return;
}
- 发现回调:实现DomainParticipantListener 的以下回调即可监听到发现相关事件: on_participant_discovery()、 on_subscriber_discovery()、 on_publisher_discovery()、 on_type_discovery()。
4.安全
DDS 安全规范包括五个安全内置插件:
- 身份验证插件:DDS:Auth:PKI-DH。此插件使用受信任的证书颁发机构(CA)为加入 DDS 域 的每个域参与者提供身份验证。支持DomainParticipants之间的相互认证,建立共享秘密。
- 访问控制插件:DDS:Access:Permissions。此插件为执行受保护操作的 DomainParticipants 提供访问控制。
- 加密插件:DDS:Crypto:AES-GCM-GMAC。此插件使用AES中的高级加密标准 (AES-GCM) 提供经过身份验证的加密。可以对整个RTPS消息、特定实体的子消息或特定Writer的payload进行加密。
- 日志插件:DDS:Logging:DDS_LogTopic。此插件记录DomainParticipant的所有安全事件数据并将它们保存在本地文件中。
- 数据标记:DDS:标记:DDS_Discovery。该插件可以为数据添加安全标签。因此,可以指定数据的分类级别。在 DDS 上下文中,它可以作为访问控制的补充,创建基于数据标记的访问控制;用于消息优先级;并防止它被中间件使用而被应用程序或服务使用(FastDDS未实现此插件)。
默认情况下Fast DDS 不编译任何安全支持,但可以在 CMake环节添加 -DSECURITY=ON
配置打开。
二、典型场景
1.大数据传输
当pub-sub传输数据量过大时,需要考虑对网络和cpu负载的影响,并进行控制:
1.1.增加socket缓冲区大小
传输的数据量在处理之前填满了套接字缓冲区时,网络包可能会被丢弃,需要增加缓冲区大小:
# 查看操作系统socket缓冲区最大值
sudo sysctl -a | grep net.core.wmem_max # 发送
sudo sysctl -a | grep net.core.rmem_max # 接收
# 修改操作系统socket缓冲区最大值
sudo sysctl -w net.core.wmem_max=12582912 # 发送
sudo sysctl -w net.core.rmem_max=12582912 # 接收
# 修改dds默认socket buffer size
DomainParticipantQos participant_qos;
participant_qos.transport().send_socket_buffer_size = 1048576; // sending buffer size
participant_qos.transport().listen_socket_buffer_size = 4194304; // receiving buffer size
1.2.增加接口的传输队列长度
传输队列长度 ( txqueuelen
) 是 TCP/UDP/IP 堆栈网络接口值。此值设置网络接口设备的每个内核传输队列允许的数据包数。默认情况下,在 Linux 中
txqueuelen
以太网接口的值为1000
。这个值对于大多数千兆网络设备来说已经足够了。但是,在某些特定情况下,txqueuelen
应增加该设置以避免丢弃数据包的溢出。同样,选择太大的值可能会导致额外的开销,从而导致更高的网络延迟。
# 查看指定网卡的传输队列长度设置
ifconfig wlo1 | grep txqueuelen
# 修改
ifconfig ${interface} txqueuelen ${size}
1.3.流量控制器
Fast DDS提供了一种机制来限制 DataWriter 发送数据的速率。这些控制器应该在使用FlowControllersQos创建 DomainParticipant 时注册,然后在使用PublishModeQosPolicy创建 DataWriter 时引用。
异步 DataWriter 第一次引用流控制器时会产生一个新线程。该线程将负责仲裁引用同一流控制器的所有 DataWriters 正在传输的样本的网络输出。
应该为流控制器命名,以便以后可以由 DataWriters 引用它们。默认的、无限制的FIFO
流量控制器为 FASTDDS_FLOW_CONTROLLER_DEFAULT
。
调度策略:
根据使用的调度策略,有不同种类的流控制器,它们仅在决定样本发送顺序的方式上有所不同。所有流控制器都将限制发送到网络的字节数在period_ms
毫秒内不超过max_bytes_per_period
字节。
- FIFO:以先到先得的顺序输出样本。
- ROUND_ROBIN:以循环顺序从每个 DataWriter 输出一个样本。
- HIGH_PRIORITY:首先从具有最高优先级的 DataWriters 输出样本。使用 property 配置 DataWriter 的优先级fastdds.sfc.priority( 最高-10到最低10,默认最低)。具有相同优先级的 DataWriter 的样本按 FIFO 顺序处理。
- PRIORITY_WITH_RESERVATION:与前一个一样工作,但允许 DataWriters 保留部分输出带宽。这是通过属性完成的fastdds.sfc.bandwidth_reservation(从 0 到 100,表示总流量控制器限制的百分比,默认0)。预留带宽消耗完后,剩下的样本将按照HIGH_PRIORITY规则处理。
配置示例:
// 限制300kb/s
static const char* flow_controller_name = "example_flow_controller";
auto flow_control_300k_per_sec = std::make_shared<eprosima::fastdds::rtps::FlowControllerDescriptor>();
flow_control_300k_per_sec->name = flow_controller_name;
flow_control_300k_per_sec->scheduler = eprosima::fastdds::rtps::FlowControllerSchedulerPolicy::FIFO;
flow_control_300k_per_sec->max_bytes_per_period = 300 * 1000;
flow_control_300k_per_sec->period_ms = 1000;
// 为域成员策略 配置流控制器
DomainParticipantQos participant_qos;
participant_qos.flow_controllers().push_back(flow_control_300k_per_sec);
// .... 创建域成员和发布者
// 为DataWriter策略关联流控制器
DataWriterQos qos;
qos.publish_mode().kind = ASYNCHRONOUS_PUBLISH_MODE;
qos.publish_mode().flow_controller_name = flow_controller_name;
1.4.调整心跳周期
在RELIABLE_RELIABILITY_QOS
(强可靠性策略) 上,RTPS 协议可以检测哪些消息已丢失并重传。它基于 DataWriters 和 DataReaders 之间交换的元流量信息,即 Heartbeat 和 Ack/Nack 消息。
较小的 Heartbeat 周期会增加 CPU 和网络开销,但会在丢失数据时加快系统响应速度。因此,用户可以根据自己的需要自定义心跳周期。这可以通过 DataWriterQos 完成。
DataWriterQos qos;
qos.reliable_writer_qos().times.heartbeatPeriod.seconds = 0;
qos.reliable_writer_qos().times.heartbeatPeriod.nanosec = 500000000; //500 ms
1.5.使用非严格可靠性
当HistoryQosPolicyKind设置为KEEP_ALL_HISTORY_QOS
时,所有订阅者都必须接收所有样本(并Ack),然后样本才能被 DataWriter 覆盖。如果消息速率高且网络不可靠(即大量数据包丢失),DataWriter 的历史记录会被填满,阻止新消息的发布,直到所有旧消息被所有订阅者确认。
如果实际场景不需要这种严格性,可以将HistoryQosPolicyKind设置为KEEP_LAST_HISTORY_QOS
。在这种情况下,当 DataWriter 的历史记录已满时,尚未完全Ack的最旧消息将被新消息覆盖。如果任何订阅者都没有收到被丢弃的消息,发布者将发送 GAP 消息通知订阅者该消息将永远丢失。
1.6.示例-大文件
示例场景:
- Publisher 发送一个大小为 9.9 MB 的文件。
- Publisher 和 Subscriber 连接的网络带宽为 100 MB/s 。
对于 64 kB 的片段大小,发布者必须发送大约 1100 个片段才能发送整个文件。
推荐方案:
- 使用
RELIABLE_RELIABILITY_QOS
, 因为丢失单个片段将意味着丢失整个文件。 - 减少心跳周期,以增加发布者的反应性。
- 使用Flow Controller流控制器限制数据速率,以避免这种传输蚕食整个带宽。此应用程序的合理速率可能是 5 MB/s,仅占总带宽的 5%。
其他:
- 如使用SHM共享内存传输,对片段大小的唯一限制是可用内存。因此,可以通过增加共享缓冲区的大小来避免 SHM 碎片。
1.7.示例-视频流
示例场景:
- 应用程序以 50 fps 在发布者和订阅者之间传输视频流。
推荐方案:
- 在实时音频或视频传输中,通常优选具有高稳定的数据速率馈送,即使以丢失一些样本为代价。
- 以 50 fps 每秒丢失1-2个样本比冻结视频等待重新传输丢失的样本更可接受。因此,在这种情况下采用
BEST_EFFORT_RELIABILITY_QOS
策略(丢失后不重发)。
2.有很多订阅者的Topic
默认情况下,每次DataWriter在Topic上发布数据时 ,都会为订阅该 Topic的每个 DataReader发送一条单播消息。如果订阅了多个 DataReader,建议使用多播而不是单播。这样,每个样本只会发送一个网络包。这将提高 CPU 和网络的使用率。
此解决方案可以使用UDP 传输或共享内存传输(SHM) 来实现。SHM 传输默认是多播的,但仅在同一台机器上的 DataWriters 和 DataReader 之间可用。UDP 传输需要一些额外的配置。下面的示例显示如何设置DataReaderQos以将 DataReader 配置为在 UDP 上使用多播传输。
// 添加一个多播定位器(指定IP和端口列表)
eprosima::fastrtps::rtps::Locator_t new_multicast_locator;
eprosima::fastrtps::rtps::IPLocator::setIPv4(new_multicast_locator, "239.255.0.4");
new_multicast_locator.port = 7900;
qos.endpoint().multicast_locator_list.push_back(new_multicast_locator);
3.实时性
实时应用程序对数据处理时间有非常严格的限制,为了确保在指定时间内响应,我们可以采取以下措施:
- 在实体初始化期间分配所有需要的内存,以便所有数据处理任务无堆分配。
- 如果达到指定超时时间,强制从阻塞函数返回。
3.1.内存分配
分配和释放内存意味着一些非确定性的耗时操作。因此,大多数实时系统需要在应用程序初始化期间以所有动态内存分配的方式运行,避免在主循环中进行内存管理操作。
如果用户为DDS内部保留的数据和集合提供最大大小,则可以在实体初始化期间为这些数据和集合预分配内存。为了选择正确的大小值,用户必须了解整个域的拓扑。具体来说, 在设置配置时必须知道DomainParticipants、 DataWriters和DataReaders的数量。
我们以下方拓扑的系统为例,说明如何进行内存的预分配:
拓扑关系分析:
- DomainParticipants 的总数为 3。
- 每个 DomainParticipant 的最大 DataWriter 数为 1
- 每个 DomainParticipant 的最大 DataReader 数为 2。
- Topic 1 的 DataWriter 与 3 个 DataReader 匹配。
- Topic 2 的 DataWriter 与 2 个 DataReader 匹配。
- 所有 DataReader 都与 1 个 DataWriter 完全匹配。
假设没有使用内容过滤,并且限制策略参数的大小:
- 最大PartitionQosPolicy大小:256
- 最大UserDataQosPolicy大小:256
- 最大PropertyPolicyQos大小:512
将以上信息进行预配置,启动时将自动提前分配相关内存,以避免相关的堆分配。
3.2.非阻塞调用
当操作竞争资源时, Fast DDS API的部分函数可能会被阻塞一段不确定的时间。在获得控制权的操作完成之前,被阻塞的函数不能继续,从而阻塞调用线程。
实时应用程序需要可预测的行为,包括从调用函数到返回控制权的可预测的最长时间。为了遵守这个限制,需要限制这些功能的最大阻塞时间。如果超过阻塞时间限制,请求的操作将被中止并终止函数,将控制权返回给调用者。
此机制需要两个步骤:
-
-DSTRICT_REALTIME=ON
在应用程序编译期间设置 CMake 选项。 - 在QoS策略中配置函数的最大阻塞时间参数reliability().max_blocking_time。
4.降低内存占用
大量现代系统对可用内存有严格的限制,因此将内存使用量降至最低至关重要。减少DDS应用程序的内存消耗可以通过多种方法来实现,主要是通过应用程序的架构重组,但也可以通过限制中间件使用的资源以及避免静态分配来实现。
4.1.限制资源
ResourceLimitsQosPolicy控制策略可以对使用的资源进行限制。它根据以下参数限制每个DataWriter或 DataReader分配的内存量:
- max_samples:配置 DataWriter 或 DataReader 可以与其关联的所有实例的最大样本数,即它表示中间件可以为 DataReader 或 DataWriter 存储的最大样本数。
- max_instances:配置 DataWriter 或 DataReader 可以管理的最大实例数。
- max_samples_per_instance:控制 DataWriter 或 DataReader 的单实例中的最大样本数。
- allocated_samples:说明将在初始化时分配的样本数。
所有这些参数都可以根据需要降低,以减少内存消耗,将资源限制为应用程序的需要。
4.2.设置动态分配
默认情况下MemoryManagementPolicy设置为PREALLOCATED_MEMORY_MODE
预分配内存模式,这意味着配置的ResourceLimitsQosPolicy所需的内存量将在初始化时分配。
使用RTPSEndpointQos的动态设置将防止不必要的分配。DYNAMIC_RESERVE_MEMORY_MODE
动态持有模式以更高的分配计数为代价实现了最低的占用空间,在这种模式下,内存在需要时分配,并在停止使用时立即释放。为了以较小的内存成本获得更高的确定性,DYNAMIC_REUSABLE_MEMORY_MODE
动态复用模式一旦分配了更多内存,它就不会被释放并重新用于未来的消息。
RTPSEndpointQos endpoint;
endpoint.history_memory_policy = eprosima::fastrtps::rtps::DYNAMIC_REUSABLE_MEMORY_MODE;
5.零拷贝通信
零拷贝通信允许在同一台机器上的应用程序之间传输数据,而无需将数据复制到内存中,从而节省时间和资源。为了实现这一点,它使用DataWriter 和DataReader之间的数据共享传递,以及应用程序和DDS之间的数据缓冲区借用。
5.1.概述
Data-Sharing使用共享内存在 DataWriter 和 DataReader 之间提供通信通道。因此,它不需要复制样本数据来传输它。
DataWriter 样本借用 是一个DDS扩展,它允许应用程序为 DataWriter 发布的样本借用缓冲区。样本可以直接在这个缓冲区上构建,之后无需将其复制到 DataWriter 中。这可以防止在发布应用程序和 DataWriter 之间复制数据。如果使用Data-Sharing,则借出的数据缓冲区将位于共享内存本身中。
DataReader读取数据也可以通过的借用缓冲区来完成。应用程序获取接收到的样本作为对接收队列本身的引用。这可以防止将数据从 DataReader 复制到接收应用程序。同样,如果使用Data-Sharing,则借出的数据将在共享内存中,并且确实与 DataWriter 历史记录中使用的内存缓冲区相同。
结合这三个特性,我们可以实现发布应用和订阅应用之间的零拷贝通信。
5.2.实现
启用零拷贝的步骤:
A、在 IDL 文件中定义一个普通的有界类型,并生成相应的源代码
struct LoanableHelloWorld {
unsigned long index;
char message[256];
};
B、DataWriter 特殊处理
DataWriter启用DataSharing:
// 打开data_sharing的writer
DataWriterQos wqos = publisher_->get_default_datawriter_qos();
wqos.history().depth = 10;
wqos.durability().kind = TRANSIENT_LOCAL_DURABILITY_QOS;
wqos.data_sharing().automatic();
writer_ = publisher_->create_datawriter(topic_, wqos, &listener_);
使用loan_sample()借用缓冲区存储pub数据:
void* sample = nullptr; // 借用缓冲区存储pub数据
if (ReturnCode_t::RETCODE_OK == writer_->loan_sample(sample)){
std::cout << "Preparing sample at address " << sample << std::endl;
LoanableHelloWorld* data = static_cast<LoanableHelloWorld*>(sample);
data->index() = msgsent + 1;
memcpy(data->message().data(), "LoanableHelloWorld ", 20);
writer_->write(sample); // write后归还缓冲区的持有权。如果loan_sample()后不write,需要使用discard_loan()释放缓冲区
}
C、DataReader 特殊处理:
DataReader 启用 DataSharing:
//CREATE THE READER
DataReaderQos rqos = subscriber_->get_default_datareader_qos();
rqos.history().depth = 10;
rqos.reliability().kind = RELIABLE_RELIABILITY_QOS;
rqos.durability().kind = TRANSIENT_LOCAL_DURABILITY_QOS;
rqos.data_sharing().automatic();
reader_ = subscriber_->create_datareader(topic_, rqos, &listener_);
DataReader 通过接收队列获得样本数据的引用,使用完毕后归还持有权:
void LoanableHelloWorldSubscriber::SubListener::on_data_available(DataReader* reader)
{
FASTDDS_CONST_SEQUENCE(DataSeq, LoanableHelloWorld);
DataSeq data;
SampleInfoSeq infos;
while (ReturnCode_t::RETCODE_OK == reader->take(data, infos))
{
for (LoanableCollection::size_type i = 0; i < infos.length(); ++i)
{
if (infos[i].valid_data)
{
// 通过接收队列获得样本数据的引用
const LoanableHelloWorld& sample = data[i];
std::cout << "Sample received (count=" << samples << ") at address " << &sample
<< (reader->is_sample_valid(&sample, &infos[i]) ? " is valid" : " was replaced" ) << std::endl
<< " index=" << sample.index() << std::endl
<< " message=" << sample.message().data() << std::endl;
}
}
// 归还缓冲区持有权
reader->return_loan(data, infos);
}
}
6.性能指标统计
打开dds编译统计模块选项:
cd ~/project/Fast-DDS
vim src/fastrtps/CMakeLists.txt
option(FASTDDS_STATISTICS "Enable Fast DDS Statistics Module" ON)
colcon build
配置要统计的Topic类型:
# 统计DataWriters时延
export FASTDDS_STATISTICS="HISTORY_LATENCY_TOPIC;NETWORK_LATENCY_TOPIC"
监控统计数据的应用示例:
cd ~/project/FastDDS
vim fastrtps.repos
fastdds_statistics_backend:
type: git
url: https://github.com/eProsima/Fast-DDS-statistics-backend.git
version: main
# download code
vcs import src < fastrtps.repos
# build
colcon build --cmake-target install
or
cd src/fastdds_statistics_backend/build
cmake ..
sudo cmake --build . --target install
# example
cd src/fastdds_statistics_backend/examples/cpp/HelloWorldExample/build
cmake ..
cmake --build .
# 统计pub/sub的通信的延迟和吞吐量
./HelloWorldExample publisher
./HelloWorldExample subcriber
./HelloWorldExample monitor
Fast DDS Latency of HelloWorld topic: [ 206.079 μs]
Publication throughput of Participant Participant_pub: [ 229.257 B/s]
监控的实现逻辑:
#include <fastdds_statistics_backend/StatisticsBackend.hpp>
using namespace eprosima::statistics_backend;
...
# 初始化监控
monitor_id_ = StatisticsBackend::init_monitor(domain);
if (!monitor_id_.is_valid()){
std::cout << "Error creating monitor" << std::endl;
return 1;
}
StatisticsBackend::set_physical_listener(&physical_listener_);
...
# 获取要监控的topicid
std::vector<StatisticsData> latency_data{};
std::vector<EntityId> topics = StatisticsBackend::get_entities(EntityKind::TOPIC);
EntityId helloworld_topic_id = -1;
Info topic_info;
for (auto topic_id : topics)
{
topic_info = StatisticsBackend::get_info(topic_id);
if (topic_info["name"] == "HelloWorldTopic" && topic_info["data_type"] == "HelloWorld")
{
helloworld_topic_id = topic_id;
}
}
if (helloworld_topic_id < 0)
{
return latency_data;
}
# 通过topic id获得对应的DataWriter和DataReader
std::vector<EntityId> topic_datawriters = StatisticsBackend::get_entities(EntityKind::DATAWRITER, helloworld_topic_id);
std::vector<EntityId> topic_datareaders = StatisticsBackend::get_entities(EntityKind::DATAREADER, helloworld_topic_id);
# 获得指定DataWriters和DataReaders间的传输时延统计数据
std::vector<StatisticsData> latency_data{};
std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
latency_data = StatisticsBackend::get_data(
DataKind::FASTDDS_LATENCY, // DataKind
topic_datawriters, // Source entities
topic_datareaders, // Target entities
1, // Number of bins
now - std::chrono::seconds(5), // t_from
now, // t_to
StatisticKind::MEAN); // Statistic
for (auto latency : latency_data){
std::cout << "Fast DDS Latency of HelloWorld topic: ["
<< timestamp_to_string(latency.first) << ", " << latency.second / 1000 << " μs]" << std::endl;
}
三、总结
这次我们对FastDDS的传输、自发现以及高实时、大文件等特殊应用场景的实现机制进行了了解,但还不够深入,本文先到这里,后续再针对某个具体的点进行深入展开分析和讲解。
yan 22.9.3
参考: