add python script for generate can_viwer_node

This commit is contained in:
Elias Rosendahl 2025-07-30 10:58:13 +02:00
parent be93645b4d
commit 76634ef316

View File

@ -0,0 +1,132 @@
import re
dbc_path = "/workspaces/ft_analytics/ros2_ws/src/ft_can_transceiver_lib/dbc/can/CANdbcFT25/CAN1MainFT25.dbc"
with open(dbc_path, "r") as f:
dbc_text = f.read()
msg_pattern = re.compile(r'^BO_ \d+ (\w+):')
sg_pattern = re.compile(r'^\s*SG_ (\w+)\s*:\s*\d+\|(\d+)')
messages = []
current = None
for line in dbc_text.splitlines():
m = msg_pattern.match(line)
if m:
msg_name = m.group(1)
current = {"name": msg_name, "signals": []}
messages.append(current)
elif current:
sg = sg_pattern.match(line)
if sg:
current["signals"].append({
"name": sg.group(1),
"bit_length": int(sg.group(2))
})
def get_topic(msg, sig):
return f"can/{msg}/{sig}"
def publisher_map_type(bitlen):
return "bool_publishers_" if bitlen == 1 else "float_publishers_"
def ros_msg_type(bitlen):
return "std_msgs::msg::Bool" if bitlen == 1 else "std_msgs::msg::Float32"
def pub_map(bitlen):
return "bool_publishers_" if bitlen == 1 else "float_publishers_"
def ros_data_type(bitlen):
return "d" if bitlen == 1 else "f"
# --- HEADER ---
header = '''#include <memory>
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/float32.hpp>
#include <std_msgs/msg/bool.hpp>
#include </workspaces/ft_analytics/ros2_ws/src/ft_can_transceiver_lib/include/ft_can_transceiver_lib/transceiver.h>
#include </workspaces/ft_analytics/ros2_ws/src/ft_can_transceiver_lib/include/ft_can_transceiver_lib/can1.h>
class CanViewerNode : public rclcpp::Node {
public:
CanViewerNode() : Node("can_viewer_node"), transceiver_("slcan0", {{}}) {
'''
# --- PUBLISHER INITIALISIERUNG ---
publisher_init = []
for msg in messages:
msg_lc = msg["name"].lower()
for sig in msg["signals"]:
topic = get_topic(msg_lc, sig["name"])
if sig["bit_length"] == 1:
publisher_init.append(f' bool_publishers_["{sig["name"]}"] = this->create_publisher<std_msgs::msg::Bool>("{topic}", 10);')
else:
publisher_init.append(f' float_publishers_["{sig["name"]}"] = this->create_publisher<std_msgs::msg::Float32>("{topic}", 10);')
# --- CALLBACK BLOECKE ---
callback_blocks = []
for msg in messages:
msg_lc = msg["name"].lower()
struct_decoded = f"canlib::frame::decoded::can1::{msg_lc}_t"
struct_raw = f"can1_{msg_lc}_t"
block = []
block.append(f' canlib::callback::can1::{msg_lc} = [this]({struct_raw} /*frame_raw*/, {struct_decoded} frame_decoded) {{')
for sig in msg["signals"]:
block.append(f' {ros_msg_type(sig["bit_length"])} msg_{sig["name"].lower()};')
block.append("")
for sig in msg["signals"]:
block.append(f' msg_{sig["name"].lower()}.data = frame_decoded.{sig["name"]};')
block.append("")
for sig in msg["signals"]:
block.append(f' {pub_map(sig["bit_length"])}["{sig["name"]}"]->publish(msg_{sig["name"].lower()});')
block.append("")
log_parts = [f'{sig["name"]}=%{ros_data_type(sig["bit_length"])}' for sig in msg["signals"]]
log_vars = ', '.join([f'msg_{sig["name"].lower()}.data' for sig in msg["signals"]])
block.append(f' RCLCPP_INFO(this->get_logger(), "{msg["name"]}: {" ".join(log_parts)}", {log_vars});')
block.append(" };")
callback_blocks.append("\n".join(block))
# --- THREAD BLOCK ---
thread_block = '''
// Empfangsthread starten
receiver_thread_ = std::thread([this]() {
while (rclcpp::ok()) {
transceiver_.receive();
}
});
receiver_thread_.detach();
}
'''
# --- PRIVATE ---
private_block = '''
private:
canlib::Transceiver transceiver_;
std::thread receiver_thread_;
std::map<std::string, rclcpp::Publisher<std_msgs::msg::Bool>::SharedPtr> bool_publishers_;
std::map<std::string, rclcpp::Publisher<std_msgs::msg::Float32>::SharedPtr> float_publishers_;
};
'''
# --- MAIN ---
main_block = '''
int main(int argc, char * argv[]) {
rclcpp::init(argc, argv);
auto node = std::make_shared<CanViewerNode>();
rclcpp::spin(node);
rclcpp::shutdown();
return 0;
}
'''
# --- AUSGABE ---
with open("ros2_ws/src/ft_can_viewer/test/can_viewer_node.cpp", "w") as f:
f.write(header)
f.write('\n'.join(publisher_init))
f.write("\n\n")
f.write('\n\n'.join(callback_blocks))
f.write(thread_block)
f.write(private_block)
f.write(main_block)
print("COMPLETE! -> can_viewer_node.cpp")