without realsense

This commit is contained in:
Your Name 2023-06-15 11:30:04 +02:00
commit b75357d4f5
31 changed files with 1769 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
install/
log/
build/

53
src/dcaitirobot/.gitignore vendored Normal file
View File

@ -0,0 +1,53 @@
devel/
logs/
install/
build/
bin/
lib/
log/
msg_gen/
srv_gen/
msg/*Action.msg
msg/*ActionFeedback.msg
msg/*ActionGoal.msg
msg/*ActionResult.msg
msg/*Feedback.msg
msg/*Goal.msg
msg/*Result.msg
msg/_*.py
build_isolated/
devel_isolated/
# Generated by dynamic reconfigure
*.cfgc
/cfg/cpp/
/cfg/*.py
# Ignore generated docs
*.dox
*.wikidoc
# eclipse stuff
.project
.cproject
# qcreator stuff
CMakeLists.txt.user
srv/_*.py
*.pcd
*.pyc
qtcreator-*
*.user
/planning/cfg
/planning/docs
/planning/src
*~
# Emacs
.#*
# Catkin custom files
CATKIN_IGNORE

View File

View File

@ -0,0 +1,315 @@
from math import atan2, degrees, sqrt
from struct import Struct
from time import sleep
from typing import List
import rclpy
from rclpy.context import Context
from rclpy.node import Node
from rclpy.parameter import Parameter
from sensor_msgs.msg import Joy
class JSEvent(object):
"""A joystick even class (struct)
struct js_event_t
{
uint32_t time;
int16_t value;
uint8_t type;
uint8_t id;
};
Takes in raw bytes from a file.read(JS_EVENT_SIZE) call.
Also defines AXIS and BUTTON ids
"""
JS_EVENT_SIZE = 8 # 64 bits is 8 bytes
EVENT_BUTTON = 0x01
EVENT_AXIS = 0x02
EVENT_INIT = 0x80
MAX_AXIS_COUNT = 27
MAX_BUTTON_COUNT = 18
AXIS_COUNT = 26
BUTTON_COUNT = 17
AXIS_LEFT_STICK_HORIZONTAL = 0
AXIS_LEFT_STICK_VERTICAL = 1
AXIS_RIGHT_STICK_HORIZONTAL = 2
AXIS_RIGHT_STICK_VERTICAL = 3
AXIS_DPAD_UP = 8
AXIS_DPAD_RIGHT = 9
AXIS_DPAD_DOWN = 10
AXIS_DPAD_LEFT = 11 # who knows what the left value should actually be...
AXIS_LEFT_TRIGGER = 12
AXIS_RIGHT_TRIGGER = 13
AXIS_LEFT_BUMPER = 14
AXIS_RIGHT_BUMPER = 15
AXIS_TRIANGLE = 16
AXIS_CIRCLE = 17
AXIS_X = 18
AXIS_SQUARE = 19
AXIS_ACCEL_X = 23 # note: left is positive, right is negative
AXIS_ACCEL_Y = 24 # note: back is positive, forward is negative
AXIS_ACCEL_Z = 25 # note: can't tell what sign is what
BUTTON_SELECT = 0
BUTTON_LEFT_JOYSTICK = 1
BUTTON_RIGHT_JOYSTICK = 2
BUTTON_START = 3
BUTTON_DPAD_UP = 4
BUTTON_DPAD_RIGHT = 5
BUTTON_DPAD_DOWN = 6
BUTTON_DPAD_LEFT = 7
BUTTON_LEFT_TRIGGER = 8
BUTTON_RIGHT_TRIGGER = 9
BUTTON_LEFT_BUMPER = 10
BUTTON_RIGHT_BUMPER = 11
BUTTON_TRIANGLE = 12
BUTTON_CIRCLE = 13
BUTTON_X = 14
BUTTON_SQUARE = 15
BUTTON_PS3 = 16
def __init__(self, event_struct):
# c.f. https://docs.python.org/3/library/struct.html#format-characters
s = Struct("< I h B B")
self.time, self.value, self.type, self.id = s.unpack(event_struct)
# also c.f. Struct.pack()
# ignore non-input events
self.type = self.type & ~self.EVENT_INIT
def __repr__(self):
struct = "struct js_event_t\n{"
# struct += '\n\tuint32_t time : {}'.format(self.time)
struct += "\n\tint16_t value : {}".format(self.value)
struct += "\n\tuint8_t type : {}".format(hex(self.type))
struct += "\n\tuint8_t id : {}".format(self.id)
struct += "\n};\n"
return struct
class Joystick(object):
def __init__(self, js_file="/dev/input/js1"):
# note names beginning with `__` are enforced as private
self.file = open(js_file, "rb")
# since we're using ID's as indices, account for 0 based indices
self.__axis_values = [0] * (JSEvent.MAX_AXIS_COUNT + 1)
# since we're using ID's as indices, account for 0 based indices
self.__button_values = [0] * (JSEvent.MAX_BUTTON_COUNT + 1)
# TODO: think about adding in previous value lists
self.update_id = -1
self.AxisUpdate = False
self.ButtonUpdate = False
self.event = None
def __del__(self):
self.file.close()
def __enter__(self):
return self
# __enter__ and __exit__ allow the context manager syntax
# with Joystick() as js:
# ...
def __exit__(self, exc_type, exc_value, traceback):
self.file.close()
def Update(self):
self.event = JSEvent(self.file.read(JSEvent.JS_EVENT_SIZE))
if self.event.type == JSEvent.EVENT_AXIS:
self.update_id = self.event.id
self.AxisUpdate = True
self.ButtonUpdate = False
self.__axis_values[self.event.id] = self.event.value
if self.event.type == JSEvent.EVENT_BUTTON:
self.update_id = self.event.id
self.AxisUpdate = False
self.ButtonUpdate = True
self.__button_values[self.event.id] = self.event.value
def getButtonState(self, button_id):
return self.__button_values[button_id]
def getAxisState(self, axis_id):
return self.__axis_values[axis_id]
class JoystickController(Joystick):
"""An implementation specific controller that inherits a basic Joystick. Adds the capability to
check for specific updates that you care about and then do something based on that.
"""
def __init__(self, js_file="/dev/input/js0"):
Joystick.__init__(self, js_file)
# direction gives the direction of travel. Defined in terms of r and theta
self.direction_vector = (0, 0)
# rotation tells the robot to rotate x degrees
self.rotation_vector = (0, 0)
# rotation as direction - rotation
# self.difference_vector = (0, 0)
def hasUpdate(self):
return self.ButtonUpdate or self.AxisUpdate
def performUpdates(self):
care_about_buttons = [
JSEvent.BUTTON_SELECT,
JSEvent.BUTTON_LEFT_JOYSTICK,
JSEvent.BUTTON_RIGHT_JOYSTICK,
JSEvent.BUTTON_START,
JSEvent.BUTTON_DPAD_UP,
JSEvent.BUTTON_DPAD_RIGHT,
JSEvent.BUTTON_DPAD_DOWN,
JSEvent.BUTTON_DPAD_LEFT,
JSEvent.BUTTON_LEFT_TRIGGER,
JSEvent.BUTTON_RIGHT_TRIGGER,
JSEvent.BUTTON_LEFT_BUMPER,
JSEvent.BUTTON_RIGHT_BUMPER,
JSEvent.BUTTON_TRIANGLE,
JSEvent.BUTTON_CIRCLE,
JSEvent.BUTTON_X,
JSEvent.BUTTON_SQUARE,
JSEvent.BUTTON_PS3,
]
care_about_axes = [
JSEvent.AXIS_LEFT_STICK_HORIZONTAL,
JSEvent.AXIS_LEFT_STICK_VERTICAL,
JSEvent.AXIS_RIGHT_STICK_HORIZONTAL,
JSEvent.AXIS_RIGHT_STICK_VERTICAL,
JSEvent.AXIS_DPAD_UP,
JSEvent.AXIS_DPAD_LEFT,
JSEvent.AXIS_DPAD_RIGHT,
JSEvent.AXIS_DPAD_DOWN,
JSEvent.AXIS_LEFT_TRIGGER,
JSEvent.AXIS_RIGHT_TRIGGER,
JSEvent.AXIS_LEFT_BUMPER,
JSEvent.AXIS_RIGHT_BUMPER,
JSEvent.AXIS_TRIANGLE,
JSEvent.AXIS_CIRCLE,
JSEvent.AXIS_X,
JSEvent.AXIS_SQUARE,
JSEvent.AXIS_ACCEL_X,
JSEvent.AXIS_ACCEL_Y,
JSEvent.AXIS_ACCEL_Z,
]
if self.ButtonUpdate and self.update_id in care_about_buttons:
self.__button_update()
if self.AxisUpdate and self.update_id in care_about_axes:
self.__axis_update()
def __button_update(self):
print(
"button: {} \tvalue: {}".format(
self.update_id, self.getButtonState(self.update_id)
)
)
def __axis_update(self):
"""Uses the left and right joysticks to produce a direction vector and a rotation vector.
On a traditional mecanum robot, it can locomote in all directions with no rotation. This is
why they are separate. The vectors are in polar coordinates.
"""
print(f"{self.update_id} {self.getAxisState(self.update_id)}")
# x = self.getAxisState(JSEvent.AXIS_LEFT_STICK_HORIZONTAL)
# # down and right is positive, so flip y axis
# y = -self.getAxisState(JSEvent.AXIS_LEFT_STICK_VERTICAL)
# # TODO: define SHRT_MAX (32767) as as large as the radius can get, so scale appropriately?
# # interestingly enough, we don't have a circular portion of the plane to work with, we can
# # get the largest vectors in the 45 degree direction. Think about how this will effect
# # scaling the vectors' magnitude
# # TODO: Think about headless - rotation being a difference from the current direction
# # or actually what it would take... This is actually a part of the *robots* locomotion
# # code, not the joystick. Do headless stuff there.
# # by headless I mean - there is no sense of front - the current direction of locomotion is
# # forwards.
# # TODO: Think about what kind of coordinate transformations would be useful...
# d = self.__to_polar(x, y)
# rotate_x = self.getAxisState(JSEvent.AXIS_RIGHT_STICK_HORIZONTAL)
# rotate_y = -self.getAxisState(JSEvent.AXIS_RIGHT_STICK_VERTICAL)
# r = self.__to_polar(rotate_x, rotate_y)
# self.direction_vector = d
# self.rotation_vector = r
# # self.difference_vector = (d[0] - r[0], d[1] - r[1])
# print('direction: ({0[0]:.2f}, {0[1]:.2f})'.format(self.direction_vector))
# print('rotation: ({0[0]:.2f}, {0[1]:.2f})'.format(self.rotation_vector))
# print('difference: ({0[0]:.2f}, {0[1]:.2f})\n'.format(self.difference_vector))
def __to_polar(self, x, y):
# since atan2 knows both y and x, it yields the angle in the proper quadrant
return sqrt(x**2 + y**2), degrees(atan2(y, x))
class JoyNode(Node):
def __init__(
self,
node_name: str,
*,
context: Context = None,
cli_args: List[str] = None,
namespace: str = None,
use_global_arguments: bool = True,
enable_rosout: bool = True,
start_parameter_services: bool = True,
parameter_overrides: List[Parameter] = None,
allow_undeclared_parameters: bool = False,
automatically_declare_parameters_from_overrides: bool = False,
) -> None:
super().__init__(
node_name,
context=context,
cli_args=cli_args,
namespace=namespace,
use_global_arguments=use_global_arguments,
enable_rosout=enable_rosout,
start_parameter_services=start_parameter_services,
parameter_overrides=parameter_overrides,
allow_undeclared_parameters=allow_undeclared_parameters,
automatically_declare_parameters_from_overrides=automatically_declare_parameters_from_overrides,
)
self.pub = self.create_publisher(Joy, "/joy", 1)
def run(self):
msg = Joy()
msg.axes = [0.0, 0.0]
with JoystickController("/dev/input/js0") as controller:
while rclpy.ok():
controller.Update()
sleep(0.0001)
if controller.hasUpdate():
hor = controller.getAxisState(JSEvent.AXIS_LEFT_STICK_HORIZONTAL)
ver = controller.getAxisState(JSEvent.AXIS_LEFT_STICK_VERTICAL)
hor /= 2**15
ver /= 2**15
msg.axes[JSEvent.AXIS_LEFT_STICK_HORIZONTAL] = hor
msg.axes[JSEvent.AXIS_LEFT_STICK_VERTICAL] = ver
self.pub.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = JoyNode("test")
node.run()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,270 @@
import multiprocessing as mp
import struct
import threading
import time
from threading import Thread
from typing import List
import rclpy
import serial
from rclpy.client import Client
from rclpy.context import Context
from rclpy.node import Node
from rclpy.parameter import Parameter
from std_msgs.msg import Float32MultiArray
start_time = time.time() * 1000
def read_message_from_serial_stream_and_publish(stream: serial.Serial, speed_publisher):
"""
Read a message from the given serial stream.
:param stream: The serial stream.
:return: The message.
"""
while stream.is_open:
# Read until we find the start of a message.
while stream.read(1) != b"\xAA":
pass
# Read the rest of the message.
frame_type = stream.read(1)
if frame_type.hex() == "00":
message_type = "measurement"
elif frame_type.hex() == "20":
message_type = "command"
elif frame_type.hex() == "40":
message_type = "response"
else:
# print("Invalid frame type: %s" % frame_type)
continue
frame_length = stream.read(1)
frame_component = stream.read(1)
if frame_component.hex() == "00":
component = "default"
elif frame_component.hex() == "10":
component = "left_motor"
elif frame_component.hex() == "20":
component = "right_motor"
elif frame_component.hex() == "30":
component = "both_motor"
else:
# print("Invalid frame component: %s" % frame_component.hex())
continue
timestamp = stream.read(4)
payload = stream.read(int.from_bytes(frame_length, byteorder="big"))
checksum = stream.read(2)
# Check the checksum.
if sum(payload) != int.from_bytes(checksum, byteorder="big"):
# print("Invalid checksum")
# print(message_type,component,timestamp,payload,checksum)
continue
# Handle message contents.
if message_type == "measurement":
if component == "left_motor":
track_speed_left = struct.unpack(">f", payload[0:4])[0]
elif component == "right_motor":
track_speed_right = struct.unpack(">f", payload[0:4])[0]
elif component == "both_motor":
try:
track_speed_right = struct.unpack(">f", payload[0:4])[0]
track_speed_left = struct.unpack(">f", payload[4:8])[0]
except Exception as e:
# print(e)
# print("Invalid payload: %s" % payload.hex())
continue
msg = Float32MultiArray()
msg.data = [track_speed_left, track_speed_right]
# msg.header.stamp = node.get_clock().now().to_msg()
speed_publisher.publish(msg)
elif message_type == "command":
if payload == b"\x02":
print("RESET_TIMESTAMP")
global start_time
start_time = time.time() * 1000
def create_uart_message(
message_type="default", component="default", payload=b"", timestamp=0
):
"""
Create a UART message.
:param message_type: The type of the message.
:param component: The component of the message.
:param payload: The payload of the message.
:param timestamp: The timestamp of the message.
:return: The UART message.
"""
# Create the header.
message_type_bytes = None
if message_type == "measurement":
message_type_bytes = b"\x00"
elif message_type == "command":
message_type_bytes = b"\x20"
elif message_type == "response":
message_type_bytes = b"\x40"
else:
raise ValueError("Invalid message type: %s" % message_type)
component_bytes = None
if component == "default":
component_bytes = b"\x00"
elif component == "left_motor":
component_bytes = b"\x10"
elif component == "right_motor":
component_bytes = b"\x20"
elif component == "both_motor":
component_bytes = b"\x30"
else:
raise ValueError("Invalid component: %s" % component)
# Create the payload.
payload = bytearray(payload)
# Create the timestamp.
timestamp = bytearray(timestamp.to_bytes(4, byteorder="big"))
# Create the checksum.
checksum = bytearray(sum(payload).to_bytes(2, byteorder="little"))
# Create the message.
message = bytearray()
message.append(0xAA)
message.extend(message_type_bytes)
message.extend(len(payload).to_bytes(1, byteorder="big"))
message.extend(component_bytes)
message.extend(timestamp)
message.extend(payload)
message.extend(checksum)
# print(''.join([ "%02x" % b for b in message ]))
return message
class ROS2UARTNode(Node):
def __init__(
self,
node_name: str,
ser: serial.Serial,
*,
context: Context = None,
cli_args: List[str] = None,
namespace: str = None,
use_global_arguments: bool = True,
enable_rosout: bool = True,
start_parameter_services: bool = True,
parameter_overrides: List[Parameter] = None,
allow_undeclared_parameters: bool = False,
automatically_declare_parameters_from_overrides: bool = False
) -> None:
super().__init__(
node_name,
context=context,
cli_args=cli_args,
namespace=namespace,
use_global_arguments=use_global_arguments,
enable_rosout=enable_rosout,
start_parameter_services=start_parameter_services,
parameter_overrides=parameter_overrides,
allow_undeclared_parameters=allow_undeclared_parameters,
automatically_declare_parameters_from_overrides=automatically_declare_parameters_from_overrides,
)
self.create_subscription(Float32MultiArray, "/track_speeds", self.callback, 1)
self.left_speed = 0
self.right_speed = 0
self.ser = ser
def callback(self, msg: Float32MultiArray) -> None:
print(msg.data)
assert len(msg.data) == 2
self.left_speed, self.right_speed = msg.data
self.left_speed /= 1
self.right_speed /= 1
left_byte = struct.pack(">f", self.left_speed)
right_byte = struct.pack(">f", self.right_speed)
self.ser.write(
create_uart_message(
message_type="command",
component="both_motor",
payload=b"\x01" + left_byte + right_byte,
timestamp=0,
)
)
self.ser.flush()
class UART2ROSNode(Node):
def __init__(
self,
node_name: str,
ser: serial.Serial,
*,
context: Context = None,
cli_args: List[str] = None,
namespace: str = None,
use_global_arguments: bool = True,
enable_rosout: bool = True,
start_parameter_services: bool = True,
parameter_overrides: List[Parameter] = None,
allow_undeclared_parameters: bool = False,
automatically_declare_parameters_from_overrides: bool = False
) -> None:
super().__init__(
node_name,
context=context,
cli_args=cli_args,
namespace=namespace,
use_global_arguments=use_global_arguments,
enable_rosout=enable_rosout,
start_parameter_services=start_parameter_services,
parameter_overrides=parameter_overrides,
allow_undeclared_parameters=allow_undeclared_parameters,
automatically_declare_parameters_from_overrides=automatically_declare_parameters_from_overrides,
)
self.speed_publisher = self.create_publisher(Float32MultiArray, "topic", 10)
self.ser = ser
read_message_from_serial_stream_and_publish()
def destroy_node(self) -> bool:
self.process.join()
return super().destroy_node()
def main(args=None) -> None:
with serial.Serial("/dev/ttyUSB1", 115200, timeout=0.001) as ser:
rclpy.init(args=args)
ros2uart = ROS2UARTNode(node_name="ros2uart", ser=ser)
# uart2ros = UART2ROSNode(node_name='uart2ros', ser=ser)
executor = rclpy.executors.MultiThreadedExecutor()
# executor.add_node(uart2ros)
executor.add_node(ros2uart)
executor_thread = threading.Thread(target=executor.spin, daemon=True)
executor_thread.start()
# loop while not interrupted
curr_time = time.time()
rate = ros2uart.create_rate(2)
try:
while rclpy.ok():
print("Help me body, you are my only hope")
rate.sleep()
except KeyboardInterrupt:
pass
rclpy.shutdown()
executor_thread.join()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,99 @@
import math
from typing import List
import numpy as np
import rclpy
from rclpy.context import Context
from rclpy.node import Node
from rclpy.parameter import Parameter
from sensor_msgs.msg import Joy
from std_msgs.msg import Float32MultiArray
def calc_left_power(angle: float) -> float:
angle_deg = math.degrees(angle)
if -180 <= angle_deg < -90:
return 2 / 90 * (angle_deg + 180) - 1
elif -90 <= angle_deg < 0:
return 1
elif 0 <= angle_deg < 90:
return -2 / 90 * angle_deg + 1
elif 90 <= angle_deg <= 180:
return -1
def calc_right_power(angle: float) -> float:
angle_deg = math.degrees(angle)
if -180 <= angle_deg < -90:
return -1
elif -90 <= angle_deg < 0:
return 2 / 90 * (angle_deg + 90) - 1
elif 0 <= angle_deg < 90:
return 1
elif 90 <= angle_deg <= 180:
return -2 / 90 * (angle_deg - 90) + 1
def calc_power_from_euclidean_joystick(x, y):
x, y = y, -x # rotate 90 so that up is 0 degress
r = np.linalg.norm([x, y]) / np.sqrt(2)
angle = np.arctan2(y, x)
return calc_left_power(angle) * r, calc_right_power(angle) * r
class JoystickSpeedDistributorNode(Node):
def __init__(
self,
node_name: str,
*,
context: Context = None,
cli_args: List[str] = None,
namespace: str = None,
use_global_arguments: bool = True,
enable_rosout: bool = True,
start_parameter_services: bool = True,
parameter_overrides: List[Parameter] = None,
allow_undeclared_parameters: bool = False,
automatically_declare_parameters_from_overrides: bool = False
) -> None:
super().__init__(
node_name,
context=context,
cli_args=cli_args,
namespace=namespace,
use_global_arguments=use_global_arguments,
enable_rosout=enable_rosout,
start_parameter_services=start_parameter_services,
parameter_overrides=parameter_overrides,
allow_undeclared_parameters=allow_undeclared_parameters,
automatically_declare_parameters_from_overrides=automatically_declare_parameters_from_overrides,
)
self.l = 0.1
self.create_subscription(Joy, "/joy", self.joy_callback, 1)
self.pub = self.create_publisher(Float32MultiArray, "/track_speeds", 1)
def joy_callback(self, joy_message):
x, y = joy_message.axes[:2]
y = -y
powers = calc_power_from_euclidean_joystick(x, y)
msg = Float32MultiArray()
msg.data = list(map(float, powers))
self.pub.publish(msg)
def main(args=None) -> None:
rclpy.init(args=args)
node = JoystickSpeedDistributorNode("speed_distributor")
rclpy.spin(node)
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,53 @@
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from std_msgs.msg import String
from sensor_msgs.msg import Joy
import numpy as np
def calc_speed_from_joy(x, y):
linear = -y
angular = -x
return linear, angular
class TwistPublisher(Node):
def __init__(self):
super().__init__('twist_publisher')
self.publisher_ = self.create_publisher(Twist, 'cmd_vel', 1)
self.create_subscription(Joy, "/joy", self.joy_callback, 1)
def joy_callback(self, joy_message):
x, y = joy_message.axes[:2]
linear, angular = calc_speed_from_joy(x,y)
msg = Twist()
msg.linear.x = linear
msg.angular.z = angular
self.publisher_.publish(msg)
self.get_logger().info('Publishing: "%s"' % msg)
def main(args=None):
rclpy.init(args=args)
minimal_publisher = TwistPublisher()
rclpy.spin(minimal_publisher)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
minimal_publisher.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

5
src/dcaitirobot/embedded/.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
.pio
.vscode/.browse.c_cpp.db*
.vscode/c_cpp_properties.json
.vscode/launch.json
.vscode/ipch

View File

@ -0,0 +1,10 @@
{
// See http://go.microsoft.com/fwlink/?LinkId=827846
// for the documentation about the extensions.json format
"recommendations": [
"platformio.platformio-ide"
],
"unwantedRecommendations": [
"ms-vscode.cpptools-extension-pack"
]
}

View File

@ -0,0 +1 @@
# DCAITI

View File

@ -0,0 +1,39 @@
This directory is intended for project header files.
A header file is a file containing C declarations and macro definitions
to be shared between several project source files. You request the use of a
header file in your project source file (C, C++, etc) located in `src` folder
by including it, with the C preprocessing directive `#include'.
```src/main.c
#include "header.h"
int main (void)
{
...
}
```
Including a header file produces the same results as copying the header file
into each source file that needs it. Such copying would be time-consuming
and error-prone. With a header file, the related declarations appear
in only one place. If they need to be changed, they can be changed in one
place, and programs that include the header file will automatically use the
new version when next recompiled. The header file eliminates the labor of
finding and changing all the copies as well as the risk that a failure to
find one copy will result in inconsistencies within a program.
In C, the usual convention is to give header files names that end with `.h'.
It is most portable to use only letters, digits, dashes, and underscores in
header file names, and at most one dot.
Read more about using header files in official GCC documentation:
* Include Syntax
* Include Operation
* Once-Only Headers
* Computed Includes
https://gcc.gnu.org/onlinedocs/cpp/Header-Files.html

View File

@ -0,0 +1,54 @@
#ifndef UART_COM_HPP
#define UART_COM_HPP
/**
* @file UARTCom.hpp
* @author your name (you@domain.com)
* @brief class to manage uart communication
* @version 0.1
* @date 2021-07-22
*
* @copyright Copyright (c) 2021
*
*/
#include "Arduino.h"
#include "constants.h"
#include "R2WD.h"
class UARTCom {
private:
/* functions */
public:
/* variables */
static uint8_t data[MAX_PACKET_LENGTH];
/* functions */
/**
* @brief initializes uart communication
*
* @param d data package to be initialized
*/
static void init (uint8_t* d);
/**
* @brief reads uart interface and stores the data into an uint8_t array
*
* @param d array to store the data into
*/
static void read (uint8_t* d);
/**
* @brief assembles and sends a uart package
*
* @param ft frametype of message
* @param pll payload length
* @param c component
* @param pl payload
*/
static void send (U_FrameType ft, uint8_t pll, U_Component c, uint8_t* pl);
};
#endif

View File

@ -0,0 +1,36 @@
#ifndef U_COMMANDS_HPP
#define U_COMMANDS_HPP
/**
* @file UCommands.hpp
* @author your name (you@domain.com)
* @brief implementation of commands according to uart protocol
* @version 0.1
* @date 2021-07-22
*
* @copyright Copyright (c) 2021
*
*/
#include "Arduino.h"
#include "constants.h"
class UCommands {
private:
/* functions */
public:
/* variables */
/* functions */
/**
* @brief sets the desired velocity of a motor
*
* @param d uart data package to read the velocity from
* @param c motor to apply the value to
*/
static void setSetpoint (uint8_t* d, U_Component c);
};
#endif

View File

@ -0,0 +1,43 @@
#ifndef U_VALUE_HPP
#define U_VALUE_HPP
/**
* @file UValue.hpp
* @author your name (you@domain.com)
* @brief class to send out encoder values
* @version 0.1
* @date 2021-07-22
*
* @copyright Copyright (c) 2021
*
*/
#include "constants.h"
class UValue {
private:
/* functions */
public:
/* variables */
/* functions */
/**
* @brief sends encoder value via uart
*
* @param cp component to which the value refers to
* @param value encoder value
*/
static void sendEncoderValue (U_Component cp, float value);
/**
* @brief send both encoder values via uart
*
* @param value_left encoder value left
* @param value_right encoder value right
*/
static void sendBothEncoderValues(float value_left, float value_right);
};
#endif

View File

@ -0,0 +1,95 @@
#ifndef CONSTANTS_H
#define CONSTANTS_H
/**
* @brief all constants and custom data types
*
*/
#define DCAITI_DEBUG 0
/* 1 is on the right, 2 on the left */
/* motors */
#define NUM_MOTORS 2
#define M1_EN_PIN 10
#define M1_DIR_PIN 11
#define M2_EN_PIN 9
#define M2_DIR_PIN 8
#define SPEED_THRESHOLD 8
/* encoders */
#define ENC_1_A 6
#define ENC_1_B 7
#define ENC_2_A 5
#define ENC_2_B 4
#define COUNT_PER_ROTATION 3533
#define CHAIN_LENGTH_M 0.741
#define VEL_UPDATE_RATE_MS 20
#define MAX_SPEED 0.6
/* pid control */
#define KP_1 10
#define KD_1 0.0
#define KI_1 0.0
#define KP_2 10
#define KD_2 0.0
#define KI_2 0.0
#define VEL2TORQUE_RATIO 1.0
#define P_GAIN 1.0
/* misc */
#define PRINT_RATE_MS 500
/* uart */
#define UART_BAUD_RATE 115200
#define MAX_PACKET_LENGTH 50
#define PACKAGE_LENGTH_NO_P 10
#define RX_328_PIN 6
#define TX_328_PIN 7
#define UART_TRANSMIT_MS 10
#define PACKAGE_PAUSE_MS 10
#define START_BYTE_POS 0
#define FRAME_TYPE_POS 1
#define PAYLOAD_LENGTH_POS 2
#define COMPONENT_POS 3
#define TIMESTAMP_POS 4
#define COMMAND_POS 8
#define PAYLOAD_POS 8
#define FRAME_TYPE_STARTBIT 7
#define FRAME_TYPE_ENDBIT 5
#define COMPONENT_STARTBIT 7
#define COMPONENT_ENDBIT 4
#define START_BYTE 0xAA
#define REQUEST_DRIVE_MOTOR 0x01
#define REQUEST_RESET_TIMESTAMP 0x02
#define FT_VALUE_BV 0
#define FT_REQUEST_BV 32
#define FT_RESPONSE_BV 64
#define CP_DEFAULT_BV 0
#define CP_LEFT_M_BV 16
#define CP_RIGHT_M_BV 32
#define CP_ALL_M_BV 48
#define LAST(k,n) ((k) & ((1<<(n))-1))
#define MID(k,m,n) LAST((k)>>(m),((n)-(m)))
typedef enum frametypes {
VALUE, REQUEST, RESPONSE
} U_FrameType;
typedef enum components {
DEFAULT_COMPONENT, LEFT_MOTOR, RIGHT_MOTOR, ALL_MOTORS
} U_Component;
typedef enum requests {
DEFAULT_REQUEST, DRIVE_MOTOR, RESET_TIMESTAMP
} U_Request;
#endif

View File

@ -0,0 +1,50 @@
#ifndef UTILS_HPP
#define UTILS_HPP
/**
* @file Utils.hpp
* @author your name (you@domain.com)
* @brief useful converting and computing functions
* @version 0.1
* @date 2021-07-22
*
* @copyright Copyright (c) 2021
*
*/
#include "Arduino.h"
#include "R2WD.h"
class Utils {
private:
/* functions */
public:
/* variables */
/* functions */
/**
* @brief converts 4 bytes to float (big endian)
*
* @param b0 first byte
* @param b1 second byte
* @param b2 third byte
* @param b3 fourth byte
* @return float
*/
static float bytesToFloat (uint8_t b0, uint8_t b1, uint8_t b2, uint8_t b3);
/**
* @brief converts 4 bytes to uint32_t (big endian)
*
* @param b0 first byte
* @param b1 second byte
* @param b2 third byte
* @param b3 fourth byte
* @return uint32_t
*/
static uint32_t bytesToInt (uint8_t b0, uint8_t b1, uint8_t b2, uint8_t b3);
};
#endif

View File

@ -0,0 +1,17 @@
; PlatformIO Project Configuration File
;
; Build options: build flags, source filter
; Upload options: custom upload port, speed and extra flags
; Library options: dependencies, extra library storages
; Advanced options: extra scripting
;
; Please visit documentation for the other options and examples
; https://docs.platformio.org/page/projectconf.html
[env:diecimilaatmega328]
platform = atmelavr
board = diecimilaatmega328
framework = arduino
monitor_speed = 115200
lib_deps =
arduino-libraries

View File

@ -0,0 +1,314 @@
#include "communication/UARTCom.hpp"
#include "communication/UCommands.hpp"
#include "misc/Utils.hpp"
#include "constants.h"
uint8_t UARTCom::data[MAX_PACKET_LENGTH];
uint8_t currentParseIndex;
int payloadLength;
uint32_t currentTimestamp = 0;
/**
* @brief extracts frametype from data package
*
* @param d data package
* @return U_FrameType
*/
U_FrameType parseFrameType (uint8_t* d)
{
uint8_t b = d[FRAME_TYPE_POS];
return (U_FrameType) MID(b, FRAME_TYPE_ENDBIT, FRAME_TYPE_STARTBIT + 1);
}
/**
* @brief translates frametype to corresponding byte value
*
* @param ft frametype
* @return uint8_t
*/
uint8_t fromFrameType (U_FrameType ft)
{
switch (ft) {
case VALUE: return FT_VALUE_BV;
case REQUEST: return FT_REQUEST_BV;
case RESPONSE: return FT_RESPONSE_BV;
default: return 1;
}
}
/**
* @brief extract component information
*
* @param d data package
* @return U_Component
*/
U_Component parseComponent (uint8_t* d)
{
uint8_t b = d[COMPONENT_POS];
return (U_Component) MID(b, COMPONENT_ENDBIT, COMPONENT_STARTBIT + 1);
}
/**
* @brief translates component into byte value
*
* @param cp component
* @return uint8_t
*/
uint8_t fromComponent (U_Component cp)
{
switch (cp) {
case DEFAULT_COMPONENT: return CP_DEFAULT_BV;
case LEFT_MOTOR: return CP_LEFT_M_BV;
case RIGHT_MOTOR: return CP_RIGHT_M_BV;
case ALL_MOTORS: return CP_ALL_M_BV;
default: return 1;
}
}
/**
* @brief extracts request information from data package
*
* @param d data package
* @return U_Request
*/
U_Request parseRequest (uint8_t* d)
{
return (U_Request) d[COMMAND_POS];
}
/**
* @brief extracts timestamp from data package
*
* @param d data package
* @return uint32_t
*/
uint32_t parseTimestamp (uint8_t* d)
{
return Utils::bytesToInt(d[TIMESTAMP_POS],
d[TIMESTAMP_POS + 1],
d[TIMESTAMP_POS + 2],
d[TIMESTAMP_POS + 3]);
}
/**
* @brief computes checksum from data package
*
* @param d data package
* @param pl payload length
* @return uint16_t
*/
uint16_t computeChecksum (uint8_t* d, uint8_t pl)
{
uint16_t r = 0;
for (int i = PAYLOAD_POS; i < PAYLOAD_POS + pl; i++) {
r += (uint8_t) d[i];
}
return r;
}
/**
* @brief checks if checksum inside package and computed checksum from package data are the same
*
* @param d data package
* @return int
*/
int correctChecksum (uint8_t* d)
{
uint8_t payloadLength = d[PAYLOAD_LENGTH_POS];
uint16_t checksumRead = d[PACKAGE_LENGTH_NO_P + payloadLength - 2] | d[PACKAGE_LENGTH_NO_P + payloadLength - 1] << 8;
return checksumRead == computeChecksum(d, payloadLength);
}
/**
* @brief debugging function, prints out package data
*
* @param d data package
* @param pl payload length
*/
void showPackage (uint8_t* d, int pl)
{
Serial.println("Package to be handled:");
for (int i = 0; i < PACKAGE_LENGTH_NO_P + pl; i++) {
char tmp[3];
sprintf(tmp, "%02X", d[i]);
Serial.print(tmp);
}
Serial.println();
}
/**
* @brief erases all package data and sets the array to 0
*
* @param d data package
*/
void clearPackage (uint8_t* d)
{
for (int i = 0; i < MAX_PACKET_LENGTH; i++) {
d[i] = 0;
}
currentParseIndex = 0;
payloadLength = -1;
}
/**
* @brief executes commands according to incoming request
*
* @param d data package
*/
void handleRequest (uint8_t* d)
{
switch (parseRequest(d)) {
case DRIVE_MOTOR: {
UCommands::setSetpoint(d, parseComponent(d));
break;
}
case RESET_TIMESTAMP: {
currentTimestamp = 0;
break;
}
default: {
Serial.println("UNKNOWN REQUEST");
break;
}
}
}
/**
* @brief package handler, checks for errors first, then proceeds to
* extract information from the data package
*
* @param d
* @param pl
*/
void handlePackage (uint8_t* d, int pl)
{
if (d[START_BYTE_POS] != START_BYTE || !correctChecksum(d)) {
Serial.println("INCORRECT START BYTE OR CHECKSUM");
return;
}
#ifdef DCAITI_DEBUG
showPackage(d, pl);
#endif
/*
if (parseTimestamp(d) <= currentTimestamp) {
if (parseFrameType(d) == REQUEST && parseRequest(d) == RESET_TIMESTAMP) {
currentTimestamp = 0;
}
else {
return;
}
}
else if (parseTimestamp(d) - currentTimestamp < 10000) {
currentTimestamp = parseTimestamp(d);
}
else return;*/
switch (parseFrameType(d)) {
case VALUE: {
// TODO, currently not used
break;
}
case REQUEST: {
handleRequest(d);
break;
}
case RESPONSE: {
// TODO, currently not used
break;
}
default: {
break;
}
}
}
void UARTCom::init (uint8_t* d)
{
Serial.begin(UART_BAUD_RATE);
clearPackage(d);
}
void UARTCom::read (uint8_t* d)
{
uint8_t byteRead = 0;
while (Serial.available()) {
byteRead = Serial.read();
#ifdef DCAITI_DEBUG
Serial.print("INCOMING: ");
char tmp[3];
sprintf(tmp, "%02X", byteRead);
Serial.println(tmp);
#endif
switch (currentParseIndex) {
case START_BYTE_POS: {
if (byteRead != START_BYTE) {
clearPackage(d);
return;
}
d[currentParseIndex] = START_BYTE;
break;
}
case FRAME_TYPE_POS: {
d[currentParseIndex] = byteRead;
break;
}
case PAYLOAD_LENGTH_POS: {
payloadLength = byteRead;
d[currentParseIndex] = payloadLength;
break;
}
default: {
d[currentParseIndex] = byteRead;
break;
}
}
if (payloadLength != -1 && currentParseIndex == PACKAGE_LENGTH_NO_P + payloadLength - 1) {
handlePackage(d, payloadLength);
clearPackage(d);
}
else currentParseIndex++;
}
}
void UARTCom::send (U_FrameType ft, uint8_t pll, U_Component cp, uint8_t* pl)
{
uint8_t toSend[PACKAGE_LENGTH_NO_P + pll];
for (int i = 0; i < PACKAGE_LENGTH_NO_P + pll; i++) toSend[i] = 0;
toSend[START_BYTE_POS] = START_BYTE;
toSend[FRAME_TYPE_POS] = fromFrameType(ft);
toSend[PAYLOAD_LENGTH_POS] = pll;
toSend[COMPONENT_POS] = fromComponent(cp);
union {
uint32_t t;
uint8_t b[4];
} u;
u.t = (uint32_t) millis();
toSend[TIMESTAMP_POS] = u.b[3];
toSend[TIMESTAMP_POS + 1] = u.b[2];
toSend[TIMESTAMP_POS + 2] = u.b[1];
toSend[TIMESTAMP_POS + 3] = u.b[0];
uint16_t checksum = 0;
for (int i = 0; i < pll; i++) {
toSend[PAYLOAD_POS + i] = pl[i];
checksum += pl[i];
}
union {
uint16_t c;
uint8_t b[2];
} ch;
ch.c = checksum;
toSend[PAYLOAD_POS + pll] = ch.b[1];
toSend[PAYLOAD_POS + pll + 1] = ch.b[0];
Serial.write(toSend, sizeof(toSend));
}

View File

@ -0,0 +1,34 @@
#include "communication/UCommands.hpp"
#include "misc/Utils.hpp"
#include "R2WD.h"
extern R2WD _2WD;
void UCommands::setSetpoint (uint8_t* d, U_Component c)
{
float speed1 = Utils::bytesToFloat(d[COMMAND_POS + 1],
d[COMMAND_POS + 2],
d[COMMAND_POS + 3],
d[COMMAND_POS + 4]);
switch (c) {
case ALL_MOTORS: {
float speed2 = Utils::bytesToFloat(d[COMMAND_POS + 5],
d[COMMAND_POS + 6],
d[COMMAND_POS + 7],
d[COMMAND_POS + 8]);
_2WD.wheelLeftSetSpeedMMPS(abs(speed1*1000), speed1 > 0 ? DIR_BACKOFF : DIR_ADVANCE);
_2WD.wheelRightSetSpeedMMPS(abs(speed2*1000), speed2 > 0 ? DIR_ADVANCE : DIR_BACKOFF);
break;
}
case RIGHT_MOTOR: {
_2WD.wheelLeftSetSpeedMMPS(abs(speed1*1000), speed1 > 0 ? DIR_BACKOFF : DIR_ADVANCE);
break;
}
case LEFT_MOTOR: {
_2WD.wheelRightSetSpeedMMPS(abs(speed1*1000), speed1 > 0 ? DIR_ADVANCE : DIR_BACKOFF);
break;
}
default: {
break;
}
}
}

View File

@ -0,0 +1,38 @@
#include "communication/UValue.hpp"
#include "communication/UARTCom.hpp"
#include "Arduino.h"
#include "communication/UValue.hpp"
#include "constants.h"
void UValue::sendEncoderValue (U_Component cp, float value)
{
union {
float f;
uint8_t b[4];
} u;
u.f = value;
uint8_t switched_b[4];
for (int i = 0; i < 4; i++) {
switched_b[i] = u.b[3 - i];
}
UARTCom::send(VALUE, 4, cp, switched_b);
}
void UValue::sendBothEncoderValues (float value_right, float value_left)
{
union {
float f[2];
uint8_t b[8];
} u;
u.f[0] = value_right;
u.f[1] = value_left;
uint8_t switched_byte_array[8];
for (int i = 0; i < 4; i++) {
switched_byte_array[i] = u.b[3 - i];
switched_byte_array[i+4] = u.b[7 - i];
}
UARTCom::send(VALUE, 8, ALL_MOTORS, switched_byte_array);
}

View File

@ -0,0 +1,50 @@
#include <Arduino.h>
#include "communication/UARTCom.hpp"
#include "communication/UValue.hpp"
long controlTimer = 0;
long uartTransmitTimer = 0;
int speed = 0;
#include <PinChangeInt.h>
#include <PinChangeIntConfig.h>
#include <PID_Beta6.h>
#include <MotorWheel.h>
#include <R2WD.h>
#ifndef MICROS_PER_SEC
#define MICROS_PER_SEC 1000000
#endif
# define DEBUG true
irqISR(irq1, isr1);
irqISR(irq2, isr2);
MotorWheel wheelLeft(9, 8, 4, 5, &irq1, REDUCTION_RATIO, 300); // Motor PWM:Pin9, DIR:Pin8, Encoder A:Pin6, B:Pin7
MotorWheel wheelRight(10, 11, 6, 7, &irq2, REDUCTION_RATIO, 300); // Motor PWM:Pin9, DIR:Pin8, Encoder A:Pin6, B:Pin7
R2WD _2WD(&wheelLeft, &wheelRight, WHEELSPAN);
void setup(void)
{
TCCR1B = TCCR1B & 0xf8 | 0x01; // Pin9,Pin10 PWM 31250Hz, Silent PWM
_2WD.PIDEnable(0.26,0.02,0.02,10);// Enable PID
//_2WD.PIDEnable(KC, TAUI, TAUD, 10);
UARTCom::init(UARTCom::data);
controlTimer = millis();
uartTransmitTimer = millis();
delay(100);
uint8_t pl[1];
pl[0] = REQUEST_RESET_TIMESTAMP;
UARTCom::send(REQUEST, 1, DEFAULT_COMPONENT, pl);
_2WD.setCarAdvance(50);
}
void loop(void)
{
_2WD.PIDRegulate();
UARTCom::read(UARTCom::data);
if (millis() - uartTransmitTimer > 2 * UART_TRANSMIT_MS) {
UValue::sendBothEncoderValues(_2WD.wheelLeftGetSpeedMMPS()/1000.0, _2WD.wheelRightGetSpeedMMPS()/1000.0);
uartTransmitTimer = millis();
}
}

View File

@ -0,0 +1,27 @@
#include "misc/Utils.hpp"
float Utils::bytesToFloat (uint8_t b0, uint8_t b1, uint8_t b2, uint8_t b3)
{
union {
float f;
uint8_t b[4];
} u;
u.b[3] = b0;
u.b[2] = b1;
u.b[1] = b2;
u.b[0] = b3;
return u.f;
}
uint32_t Utils::bytesToInt (uint8_t b0, uint8_t b1, uint8_t b2, uint8_t b3)
{
union {
uint32_t i;
uint8_t b[4];
} u;
u.b[3] = b0;
u.b[2] = b1;
u.b[1] = b2;
u.b[0] = b3;
return u.i;
}

View File

@ -0,0 +1,11 @@
This directory is intended for PlatformIO Unit Testing and project tests.
Unit Testing is a software testing method by which individual units of
source code, sets of one or more MCU program modules together with associated
control data, usage procedures, and operating procedures, are tested to
determine whether they are fit for use. Unit testing finds problems early
in the development cycle.
More information about PlatformIO Unit Testing:
- https://docs.platformio.org/page/plus/unit-testing.html

View File

@ -0,0 +1,21 @@
from launch import LaunchDescription
from launch_ros.actions import Node
def generate_launch_description():
return LaunchDescription([
Node(
package='dcaitirobot',
executable='joy',
name='joystick',
),
Node(
package='dcaitirobot',
executable='twistcalc',
name='twistcalc',
),
Node(
package='dcaitirobot',
executable='serial_comms',
name='serial',
)
])

View File

@ -0,0 +1,22 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>dcaitirobot</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="nvidia@todo.todo">nvidia</maintainer>
<license>TODO: License declaration</license>
<depend>rclpy</depend>
<exec_depend>ros2launch</exec_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/dcaitirobot
[install]
install_scripts=$base/lib/dcaitirobot

32
src/dcaitirobot/setup.py Normal file
View File

@ -0,0 +1,32 @@
from setuptools import find_packages, setup
import os
from glob import glob
package_name = 'dcaitirobot'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
(os.path.join('share', package_name), glob('launch/*.launch.py'))
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='nvidia',
maintainer_email='nvidia@todo.todo',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'speedcalc = dcaitirobot.speed:main',
'twistcalc = dcaitirobot.twist:main',
'serial_comms = dcaitirobot.serial_comms:main',
'joy = dcaitirobot.joy:main',
],
},
)

View File

@ -0,0 +1,25 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_copyright.main import main
import pytest
# Remove the `skip` decorator once the source file(s) have a copyright header
@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found errors'

View File

@ -0,0 +1,25 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_flake8.main import main_with_errors
import pytest
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
rc, errors = main_with_errors(argv=[])
assert rc == 0, \
'Found %d code style errors / warnings:\n' % len(errors) + \
'\n'.join(errors)

View File

@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_pep257.main import main
import pytest
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found code style errors / warnings'