From 76634ef316004ef082c07ffef433cc26fc1759b7 Mon Sep 17 00:00:00 2001 From: Elias Rosendahl Date: Wed, 30 Jul 2025 10:58:13 +0200 Subject: [PATCH] add python script for generate can_viwer_node --- .../ft_can_viewer/generate_can_viewer_node.py | 132 ++++++++++++++++++ 1 file changed, 132 insertions(+) create mode 100644 ros2_ws/src/ft_can_viewer/generate_can_viewer_node.py diff --git a/ros2_ws/src/ft_can_viewer/generate_can_viewer_node.py b/ros2_ws/src/ft_can_viewer/generate_can_viewer_node.py new file mode 100644 index 0000000..9535ccf --- /dev/null +++ b/ros2_ws/src/ft_can_viewer/generate_can_viewer_node.py @@ -0,0 +1,132 @@ +import re + +dbc_path = "/workspaces/ft_analytics/ros2_ws/src/ft_can_transceiver_lib/dbc/can/CANdbcFT25/CAN1MainFT25.dbc" + +with open(dbc_path, "r") as f: + dbc_text = f.read() + +msg_pattern = re.compile(r'^BO_ \d+ (\w+):') +sg_pattern = re.compile(r'^\s*SG_ (\w+)\s*:\s*\d+\|(\d+)') +messages = [] +current = None + +for line in dbc_text.splitlines(): + m = msg_pattern.match(line) + if m: + msg_name = m.group(1) + current = {"name": msg_name, "signals": []} + messages.append(current) + elif current: + sg = sg_pattern.match(line) + if sg: + current["signals"].append({ + "name": sg.group(1), + "bit_length": int(sg.group(2)) + }) + +def get_topic(msg, sig): + return f"can/{msg}/{sig}" + +def publisher_map_type(bitlen): + return "bool_publishers_" if bitlen == 1 else "float_publishers_" + +def ros_msg_type(bitlen): + return "std_msgs::msg::Bool" if bitlen == 1 else "std_msgs::msg::Float32" + +def pub_map(bitlen): + return "bool_publishers_" if bitlen == 1 else "float_publishers_" + +def ros_data_type(bitlen): + return "d" if bitlen == 1 else "f" + +# --- HEADER --- +header = '''#include +#include +#include +#include +#include +#include + +class CanViewerNode : public rclcpp::Node { +public: + CanViewerNode() : Node("can_viewer_node"), transceiver_("slcan0", {{}}) { +''' + +# --- PUBLISHER INITIALISIERUNG --- +publisher_init = [] +for msg in messages: + msg_lc = msg["name"].lower() + for sig in msg["signals"]: + topic = get_topic(msg_lc, sig["name"]) + if sig["bit_length"] == 1: + publisher_init.append(f' bool_publishers_["{sig["name"]}"] = this->create_publisher("{topic}", 10);') + else: + publisher_init.append(f' float_publishers_["{sig["name"]}"] = this->create_publisher("{topic}", 10);') + +# --- CALLBACK BLOECKE --- +callback_blocks = [] +for msg in messages: + msg_lc = msg["name"].lower() + struct_decoded = f"canlib::frame::decoded::can1::{msg_lc}_t" + struct_raw = f"can1_{msg_lc}_t" + block = [] + block.append(f' canlib::callback::can1::{msg_lc} = [this]({struct_raw} /*frame_raw*/, {struct_decoded} frame_decoded) {{') + for sig in msg["signals"]: + block.append(f' {ros_msg_type(sig["bit_length"])} msg_{sig["name"].lower()};') + block.append("") + for sig in msg["signals"]: + block.append(f' msg_{sig["name"].lower()}.data = frame_decoded.{sig["name"]};') + block.append("") + for sig in msg["signals"]: + block.append(f' {pub_map(sig["bit_length"])}["{sig["name"]}"]->publish(msg_{sig["name"].lower()});') + block.append("") + log_parts = [f'{sig["name"]}=%{ros_data_type(sig["bit_length"])}' for sig in msg["signals"]] + log_vars = ', '.join([f'msg_{sig["name"].lower()}.data' for sig in msg["signals"]]) + block.append(f' RCLCPP_INFO(this->get_logger(), "{msg["name"]}: {" ".join(log_parts)}", {log_vars});') + block.append(" };") + callback_blocks.append("\n".join(block)) + +# --- THREAD BLOCK --- +thread_block = ''' + // Empfangsthread starten + receiver_thread_ = std::thread([this]() { + while (rclcpp::ok()) { + transceiver_.receive(); + } + }); + receiver_thread_.detach(); + } +''' + +# --- PRIVATE --- +private_block = ''' +private: + canlib::Transceiver transceiver_; + std::thread receiver_thread_; + std::map::SharedPtr> bool_publishers_; + std::map::SharedPtr> float_publishers_; +}; +''' + +# --- MAIN --- +main_block = ''' +int main(int argc, char * argv[]) { + rclcpp::init(argc, argv); + auto node = std::make_shared(); + rclcpp::spin(node); + rclcpp::shutdown(); + return 0; +} +''' + +# --- AUSGABE --- +with open("ros2_ws/src/ft_can_viewer/test/can_viewer_node.cpp", "w") as f: + f.write(header) + f.write('\n'.join(publisher_init)) + f.write("\n\n") + f.write('\n\n'.join(callback_blocks)) + f.write(thread_block) + f.write(private_block) + f.write(main_block) + +print("COMPLETE! -> can_viewer_node.cpp")