.. _program_listing_file_include_agnocast_agnocast_subscription.hpp: Program Listing for File agnocast_subscription.hpp ================================================== |exhale_lsh| :ref:`Return to documentation for file ` (``include/agnocast/agnocast_subscription.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp #pragma once #include "agnocast/agnocast_callback_info.hpp" #include "agnocast/agnocast_ioctl.hpp" #include "agnocast/agnocast_mq.hpp" #include "agnocast/agnocast_smart_pointer.hpp" #include "agnocast/agnocast_utils.hpp" #include "rclcpp/rclcpp.hpp" #include "tracetools/tracetools.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace agnocast { void map_read_only_area(const pid_t pid, const uint64_t shm_addr, const uint64_t shm_size); struct SubscriptionOptions { rclcpp::CallbackGroup::SharedPtr callback_group{nullptr}; }; // These are cut out of the class for information hiding. mqd_t open_mq_for_subscription( const std::string & topic_name, const topic_local_id_t subscriber_id, std::pair & mq_subscription); void remove_mq(const std::pair & mq_subscription); rclcpp::CallbackGroup::SharedPtr get_valid_callback_group( const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr & node, const SubscriptionOptions & options); class SubscriptionBase { protected: topic_local_id_t id_; const std::string topic_name_; union ioctl_subscriber_args initialize( const rclcpp::QoS & qos, const bool is_take_sub, const std::string & node_name); public: SubscriptionBase(rclcpp::Node * node, const std::string & topic_name); }; template class Subscription : public SubscriptionBase { std::pair mq_subscription; public: using SharedPtr = std::shared_ptr>; Subscription( rclcpp::Node * node, const std::string & topic_name, const rclcpp::QoS & qos, std::function &)> callback, agnocast::SubscriptionOptions options) : SubscriptionBase(node, topic_name) { union ioctl_subscriber_args subscriber_args = initialize(qos, false, node->get_fully_qualified_name()); id_ = subscriber_args.ret_id; mqd_t mq = open_mq_for_subscription(topic_name_, id_, mq_subscription); auto node_base = node->get_node_base_interface(); rclcpp::CallbackGroup::SharedPtr callback_group = get_valid_callback_group(node_base, options); const bool is_transient_local = qos.durability() == rclcpp::DurabilityPolicy::TransientLocal; [[maybe_unused]] uint32_t callback_info_id = agnocast::register_callback( callback, topic_name_, id_, is_transient_local, mq, callback_group); #ifdef TRACETOOLS_LTTNG_ENABLED uint64_t pid_ciid = (static_cast(getpid()) << 32) | callback_info_id; TRACEPOINT( agnocast_subscription_init, static_cast(this), static_cast(node_base->get_shared_rcl_node_handle().get()), static_cast(&callback), static_cast(callback_group.get()), tracetools::get_symbol(callback), topic_name_.c_str(), qos.depth(), pid_ciid); #endif } ~Subscription() { remove_mq(mq_subscription); } }; template class TakeSubscription : public SubscriptionBase { public: using SharedPtr = std::shared_ptr>; TakeSubscription(rclcpp::Node * node, const std::string & topic_name, const rclcpp::QoS & qos) : SubscriptionBase(node, topic_name) { #ifdef TRACETOOLS_LTTNG_ENABLED auto dummy_cbg = node->get_node_base_interface()->create_callback_group( rclcpp::CallbackGroupType::MutuallyExclusive, false); auto dummy_cb = []() {}; std::string dummy_cb_symbols = "dummy_take" + topic_name; TRACEPOINT( agnocast_subscription_init, static_cast(this), static_cast( node->get_node_base_interface()->get_shared_rcl_node_handle().get()), static_cast(&dummy_cb), static_cast(dummy_cbg.get()), dummy_cb_symbols.c_str(), topic_name_.c_str(), qos.depth(), 0); #endif union ioctl_subscriber_args subscriber_args = initialize(qos, true, node->get_fully_qualified_name()); id_ = subscriber_args.ret_id; } agnocast::ipc_shared_ptr take(bool allow_same_message = false) { union ioctl_take_msg_args take_args; take_args.topic_name = {topic_name_.c_str(), topic_name_.size()}; take_args.subscriber_id = id_; take_args.allow_same_message = allow_same_message; if (ioctl(agnocast_fd, AGNOCAST_TAKE_MSG_CMD, &take_args) < 0) { RCLCPP_ERROR(logger, "AGNOCAST_TAKE_MSG_CMD failed: %s", strerror(errno)); close(agnocast_fd); exit(EXIT_FAILURE); } for (uint32_t i = 0; i < take_args.ret_pub_shm_info.publisher_num; i++) { const pid_t pid = take_args.ret_pub_shm_info.publisher_pids[i]; const uint64_t addr = take_args.ret_pub_shm_info.shm_addrs[i]; const uint64_t size = take_args.ret_pub_shm_info.shm_sizes[i]; map_read_only_area(pid, addr, size); } if (take_args.ret_addr == 0) { return agnocast::ipc_shared_ptr(); } #ifdef TRACETOOLS_LTTNG_ENABLED TRACEPOINT( agnocast_take, static_cast(this), reinterpret_cast(take_args.ret_addr), take_args.ret_entry_id); #endif MessageT * ptr = reinterpret_cast(take_args.ret_addr); return agnocast::ipc_shared_ptr(ptr, topic_name_, id_, take_args.ret_entry_id); } }; // Wrapper of TakeSubscription for Autoware template class PollingSubscriber { typename TakeSubscription::SharedPtr subscriber_; public: using SharedPtr = std::shared_ptr>; explicit PollingSubscriber( rclcpp::Node * node, const std::string & topic_name, const rclcpp::QoS & qos = rclcpp::QoS{1}) { subscriber_ = std::make_shared>(node, topic_name, qos); }; // `takeData()` is remaining for backward compatibility. const agnocast::ipc_shared_ptr takeData() { return subscriber_->take(true); }; const agnocast::ipc_shared_ptr take_data() { return subscriber_->take(true); }; }; } // namespace agnocast