Program Listing for File MqttClient.ros2.hpp
↰ Return to documentation for file (include/mqtt_client/MqttClient.ros2.hpp)
/*
==============================================================================
MIT License
Copyright 2022 Institute for Automotive Engineering of RWTH Aachen University.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
==============================================================================
*/
#pragma once
#include <filesystem>
#include <map>
#include <memory>
#include <optional>
#include <string>
#define FMT_HEADER_ONLY
#include <fmt/format.h>
#include <mqtt/async_client.h>
#include <mqtt_client_interfaces/srv/is_connected.hpp>
#include <mqtt_client_interfaces/srv/new_mqtt2_ros_bridge.hpp>
#include <mqtt_client_interfaces/srv/new_ros2_mqtt_bridge.hpp>
#include <rclcpp/rclcpp.hpp>
#include <rclcpp/serialization.hpp>
#include <rclcpp/qos.hpp>
#include <std_msgs/msg/float64.hpp>
namespace mqtt_client {
class MqttClient : public rclcpp::Node,
public virtual mqtt::callback,
public virtual mqtt::iaction_listener {
public:
explicit MqttClient(const rclcpp::NodeOptions& options);
protected:
struct Ros2MqttInterface;
struct Mqtt2RosInterface;
void loadParameters();
bool loadParameter(const std::string& key, std::string& value);
bool loadParameter(const std::string& key, std::string& value, const std::string& default_value);
template <typename T>
bool loadParameter(const std::string& key, T& value);
template <typename T>
bool loadParameter(const std::string& key, T& value, const T& default_value);
template <typename T>
bool loadParameter(const std::string& key, std::vector<T>& value);
template <typename T>
bool loadParameter(const std::string& key, std::vector<T>& value, const std::vector<T>& default_value);
std::filesystem::path resolvePath(const std::string& path_string);
void setup();
std::optional<rclcpp::QoS> getCompatibleQoS(
const std::string& ros_topic, const rclcpp::TopicEndpointInfo& tei,
const Ros2MqttInterface& ros2mqtt) const;
std::vector<rclcpp::TopicEndpointInfo> getCandidatePublishers(
const std::string& ros_topic, const Ros2MqttInterface& ros2mqtt) const;
void setupSubscriptions();
void setupPublishers();
void setupClient();
void connect();
void ros2mqtt(
const std::shared_ptr<rclcpp::SerializedMessage>& serialized_msg,
const std::string& ros_topic);
void mqtt2ros(mqtt::const_message_ptr mqtt_msg,
const rclcpp::Time& arrival_stamp);
void mqtt2primitive(mqtt::const_message_ptr mqtt_msg);
void mqtt2fixed(mqtt::const_message_ptr mqtt_msg);
void connected(const std::string& cause) override;
void connection_lost(const std::string& cause) override;
bool isConnected();
void isConnectedService(
mqtt_client_interfaces::srv::IsConnected::Request::SharedPtr request,
mqtt_client_interfaces::srv::IsConnected::Response::SharedPtr response);
void newRos2MqttBridge(
mqtt_client_interfaces::srv::NewRos2MqttBridge::Request::SharedPtr request,
mqtt_client_interfaces::srv::NewRos2MqttBridge::Response::SharedPtr response);
void newMqtt2RosBridge(
mqtt_client_interfaces::srv::NewMqtt2RosBridge::Request::SharedPtr request,
mqtt_client_interfaces::srv::NewMqtt2RosBridge::Response::SharedPtr response);
void message_arrived(mqtt::const_message_ptr mqtt_msg) override;
void delivery_complete(mqtt::delivery_token_ptr token) override;
void on_success(const mqtt::token& token) override;
void on_failure(const mqtt::token& token) override;
protected:
struct BrokerConfig {
std::string host;
int port;
std::string user;
std::string pass;
struct {
bool enabled;
std::filesystem::path ca_certificate;
} tls;
};
struct ClientConfig {
std::string id;
struct {
bool enabled;
int size;
std::filesystem::path directory;
} buffer;
struct {
std::string topic;
std::string message;
int qos;
bool retained;
} last_will;
bool clean_session;
double keep_alive_interval;
int max_inflight;
struct {
std::filesystem::path certificate;
std::filesystem::path key;
std::string password;
int version;
bool verify;
std::vector<std::string> alpn_protos;
} tls;
};
struct Ros2MqttInterface {
struct {
rclcpp::GenericSubscription::SharedPtr
subscriber;
std::string msg_type;
int queue_size = 1;
bool is_stale = false;
struct {
// If these are set to nullopt then that part of the QoS is determine automatically based on discovery
std::optional<rclcpp::ReliabilityPolicy> reliability;
std::optional<rclcpp::DurabilityPolicy> durability;
} qos;
} ros;
struct {
std::string topic;
int qos = 0;
bool retained = false;
} mqtt;
bool fixed_type = false;
bool primitive = false;
bool stamped = false;
};
struct Mqtt2RosInterface {
struct {
int qos = 0;
} mqtt;
struct {
std::string topic;
std::string msg_type;
rclcpp::GenericPublisher::SharedPtr publisher;
rclcpp::Publisher<std_msgs::msg::Float64>::SharedPtr
latency_publisher;
int queue_size = 1;
struct {
rclcpp::ReliabilityPolicy reliability = rclcpp::ReliabilityPolicy::SystemDefault;
rclcpp::DurabilityPolicy durability = rclcpp::DurabilityPolicy::SystemDefault;
} qos;
bool latched = false;
bool is_stale = false;
} ros;
bool fixed_type = false;
bool primitive = false;
bool stamped = false;
};
protected:
static const std::string kRosMsgTypeMqttTopicPrefix;
static const std::string kLatencyRosTopicPrefix;
rclcpp::TimerBase::SharedPtr check_subscriptions_timer_;
rclcpp::Service<mqtt_client_interfaces::srv::IsConnected>::SharedPtr
is_connected_service_;
rclcpp::Service<mqtt_client_interfaces::srv::NewRos2MqttBridge>::SharedPtr
new_ros2mqtt_bridge_service_;
rclcpp::Service<mqtt_client_interfaces::srv::NewMqtt2RosBridge>::SharedPtr
new_mqtt2ros_bridge_service_;
bool is_connected_ = false;
BrokerConfig broker_config_;
ClientConfig client_config_;
std::shared_ptr<mqtt::async_client> client_;
mqtt::connect_options connect_options_;
std::map<std::string, Ros2MqttInterface> ros2mqtt_;
std::map<std::string, Mqtt2RosInterface> mqtt2ros_;
uint32_t stamp_length_;
};
template <typename T>
bool MqttClient::loadParameter(const std::string& key, T& value) {
bool found = get_parameter(key, value);
if (found)
RCLCPP_DEBUG(get_logger(), "Retrieved parameter '%s' = '%s'", key.c_str(),
std::to_string(value).c_str());
return found;
}
template <typename T>
bool MqttClient::loadParameter(const std::string& key, T& value,
const T& default_value) {
bool found = get_parameter_or(key, value, default_value);
if (!found)
RCLCPP_WARN(get_logger(), "Parameter '%s' not set, defaulting to '%s'",
key.c_str(), std::to_string(default_value).c_str());
if (found)
RCLCPP_DEBUG(get_logger(), "Retrieved parameter '%s' = '%s'", key.c_str(),
std::to_string(value).c_str());
return found;
}
template <typename T>
bool MqttClient::loadParameter(const std::string& key, std::vector<T>& value) {
const bool found = get_parameter(key, value);
if (found)
RCLCPP_WARN(get_logger(), "Retrieved parameter '%s' = '[%s]'", key.c_str(),
fmt::format("{}", fmt::join(value, ", ")).c_str());
return found;
}
template <typename T>
bool MqttClient::loadParameter(const std::string& key, std::vector<T>& value,
const std::vector<T>& default_value) {
const bool found = get_parameter_or(key, value, default_value);
if (!found)
RCLCPP_WARN(get_logger(), "Parameter '%s' not set, defaulting to '%s'",
key.c_str(), fmt::format("{}", fmt::join(value, ", ")).c_str());
if (found)
RCLCPP_DEBUG(get_logger(), "Retrieved parameter '%s' = '%s'", key.c_str(),
fmt::format("{}", fmt::join(value, ", ")).c_str());
return found;
}
template <typename T>
void serializeRosMessage(const T& msg,
rclcpp::SerializedMessage& serialized_msg) {
rclcpp::Serialization<T> serializer;
serializer.serialize_message(&msg, &serialized_msg);
}
template <typename T>
void deserializeRosMessage(const rclcpp::SerializedMessage& serialized_msg,
T& msg) {
rclcpp::Serialization<T> serializer;
serializer.deserialize_message(&serialized_msg, &msg);
}
} // namespace mqtt_client