105 lines
2.9 KiB
Python
105 lines
2.9 KiB
Python
import rclpy
|
|
from rclpy.node import Node
|
|
|
|
from geometry_msgs.msg import Twist
|
|
from std_msgs.msg import String
|
|
from sensor_msgs.msg import Joy
|
|
|
|
import numpy as np
|
|
|
|
SCALE_LINEAR_VELOCITY = 0.2
|
|
SCALE_ANGULAR_VELOCITY = 0.7
|
|
PUBLISH_FREQUENCY_HZ = 30
|
|
|
|
SMOOTH_WEIGHTS = np.array([1, 2, 4, 10, 20])
|
|
|
|
|
|
class TwistPublisher(Node):
|
|
def __init__(
|
|
self,
|
|
smooth_weights=np.ones(5),
|
|
linear_scale=1.0,
|
|
angular_scale=1.0,
|
|
publish_frequency_hz=30,
|
|
):
|
|
super().__init__("twist_publisher")
|
|
|
|
self.linear_window = np.zeros(smooth_weights.shape[0])
|
|
self.angular_window = np.zeros(smooth_weights.shape[0])
|
|
self.linear_recent = 0.0
|
|
self.angular_recent = 0.0
|
|
|
|
self.linear_out = 0.0
|
|
self.angular_out = 0.0
|
|
|
|
self.weights = smooth_weights / smooth_weights.sum()
|
|
self.window_size = smooth_weights.shape[0]
|
|
|
|
self.linear_scale = linear_scale
|
|
self.angular_scale = angular_scale
|
|
|
|
self.publisher_ = self.create_publisher(
|
|
Twist, "/diff_cont/cmd_vel_unstamped", 1
|
|
)
|
|
|
|
self.publisher_2 = self.create_publisher(Twist, "/cmd_vel", 1)
|
|
|
|
self.create_subscription(Joy, "/joy", self.joy_callback, 1)
|
|
self.timer = self.create_timer(1 / publish_frequency_hz, self.timer_callback)
|
|
|
|
def joy_callback(self, joy_message):
|
|
x, y = joy_message.axes[:2]
|
|
self.linear_recent = -y * SCALE_LINEAR_VELOCITY
|
|
self.angular_recent = x * SCALE_ANGULAR_VELOCITY
|
|
|
|
def update_smooth_window(self):
|
|
self.linear_window = np.append(self.linear_window, self.linear_recent)[
|
|
-self.window_size :
|
|
]
|
|
self.angular_window = np.append(self.angular_window, self.angular_recent)[
|
|
-self.window_size :
|
|
]
|
|
|
|
def calc_smooth_speed(self):
|
|
self.linear_out = np.average(self.linear_window, weights=self.weights)
|
|
self.angular_out = np.average(self.angular_window, weights=self.weights)
|
|
|
|
def timer_callback(self):
|
|
msg = Twist()
|
|
|
|
self.update_smooth_window()
|
|
self.calc_smooth_speed()
|
|
|
|
msg.linear.x = self.linear_out
|
|
msg.angular.z = self.angular_out
|
|
self.publisher_.publish(msg)
|
|
self.publisher_2.publish(msg)
|
|
self.get_logger().info(
|
|
"Linear_window: {}, Linear_out: {}, Angular_window: {}, Angular_out: {}".format(
|
|
self.linear_window,
|
|
self.linear_out,
|
|
self.angular_window,
|
|
self.angular_out,
|
|
)
|
|
)
|
|
|
|
|
|
def main(args=None):
|
|
rclpy.init(args=args)
|
|
|
|
minimal_publisher = TwistPublisher(
|
|
SMOOTH_WEIGHTS,
|
|
SCALE_LINEAR_VELOCITY,
|
|
SCALE_ANGULAR_VELOCITY,
|
|
PUBLISH_FREQUENCY_HZ,
|
|
)
|
|
|
|
rclpy.spin(minimal_publisher)
|
|
|
|
minimal_publisher.destroy_node()
|
|
rclpy.shutdown()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|