#!/usr/bin/env python3 import os import struct import time import serial import sys BITRATE = 115200 # baud/s TIMEOUT = 1 # seconds N_SLAVES = 6 LOG_FRAME_LENGTH = 8 # bytes CELLS_PER_SLAVE = 10 TEMP_SENSORS_PER_SLAVE = 32 VOLTAGE_CONV = 5.0 / 255 # volts/quantum TEMP_CONV = 0.0625 * 16 # °C/quantum import os # Windows if os.name == "nt": import msvcrt # Posix (Linux, OS X) else: import sys import termios import atexit from select import select class KBHit: def __init__(self): """Creates a KBHit object that you can call to do various keyboard things.""" if os.name == "nt": pass else: # Save the terminal settings self.fd = sys.stdin.fileno() self.new_term = termios.tcgetattr(self.fd) self.old_term = termios.tcgetattr(self.fd) # New terminal setting unbuffered self.new_term[3] = self.new_term[3] & ~termios.ICANON & ~termios.ECHO termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.new_term) # Support normal-terminal reset at exit atexit.register(self.set_normal_term) def set_normal_term(self): """Resets to normal terminal. On Windows this is a no-op.""" if os.name == "nt": pass else: termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old_term) def getch(self): """Returns a keyboard character after kbhit() has been called. Should not be called in the same program as getarrow(). """ s = "" if os.name == "nt": return msvcrt.getch().decode("utf-8") else: return sys.stdin.read(1) def getarrow(self): """Returns an arrow-key code after kbhit() has been called. Codes are 0 : up 1 : right 2 : down 3 : left Should not be called in the same program as getch(). """ if os.name == "nt": msvcrt.getch() # skip 0xE0 c = msvcrt.getch() vals = [72, 77, 80, 75] else: c = sys.stdin.read(3)[2] vals = [65, 67, 66, 68] return vals.index(ord(c.decode("utf-8"))) def kbhit(self): """Returns True if keyboard character was hit, False otherwise.""" if os.name == "nt": return msvcrt.kbhit() else: dr, dw, de = select([sys.stdin], [], [], 0) return dr != [] if len(sys.argv) != 2: print(f"Usage: {sys.argv[0]} SERIAL-PORT", file=sys.stderr) sys.exit(os.EX_USAGE) SERIAL_PORT = sys.argv[1] ser = serial.Serial(SERIAL_PORT, BITRATE, timeout=TIMEOUT) def check_log_start(buf: bytes): return buf[:-12].find(b"LOG") def check_current_start(buf: bytes): return buf[:-12].find(b"CUR") def check_panic_start(buf: bytes): return buf[:-12].find(b"PAN") class SlaveData: cell_voltages: list[float] cell_temps: list[float] def __init__(self) -> None: self.cell_voltages = [-1] * CELLS_PER_SLAVE self.cell_temps = [-1] * TEMP_SENSORS_PER_SLAVE class AccumulatorData: slaves: list[SlaveData] min_voltage: float max_voltage: float min_temp: float max_temp: float last_frame: float current: float panic: bool panic_errorcode: int panic_errorarg: int def __init__(self) -> None: self.slaves = [SlaveData() for _ in range(N_SLAVES)] self.min_voltage = ( self.max_voltage ) = self.min_temp = self.max_temp = self.last_frame = self.current = 0 self.panic = False self.panic_errorcode = self.panic_errorarg = 0 rx_buf = bytes() data = AccumulatorData() def decode_log_frame(buf: bytes): msg_id = buf[0] slave = msg_id >> 4 frame_id = msg_id & 0x0F if slave >= N_SLAVES: return if frame_id == 0: for i in range(7): data.slaves[slave].cell_voltages[i] = buf[i + 1] * VOLTAGE_CONV elif frame_id == 1: for i in range(3): data.slaves[slave].cell_voltages[i + 7] = buf[i + 1] * VOLTAGE_CONV for i in range(4): data.slaves[slave].cell_temps[i] = buf[i + 4] * TEMP_CONV elif frame_id == 2: for i in range(7): data.slaves[slave].cell_temps[i + 4] = buf[i + 1] * TEMP_CONV elif frame_id == 3: for i in range(7): data.slaves[slave].cell_temps[i + 11] = buf[i + 1] * TEMP_CONV elif frame_id == 4: for i in range(7): data.slaves[slave].cell_temps[i + 18] = buf[i + 1] * TEMP_CONV elif frame_id == 5: for i in range(7): data.slaves[slave].cell_temps[i + 25] = buf[i + 1] * TEMP_CONV else: # print(f"Unknown frame ID: {frame_id} (buf: {repr(buf)})", file=sys.stderr) # time.sleep(1) return data.last_frame = time.time() def decode_current_frame(buf: bytes): # current = (buf[2] << 24) | (buf[3] << 16) | (buf[4] << 8) | (buf[5]) current = struct.unpack(">i", buf[2:6])[0] data.current = current / 1000.0 def decode_panic_frame(buf: bytes): data.panic = True data.panic_errorcode = buf[0] data.panic_errorarg = buf[1] def update_display(): voltages = [ slave.cell_voltages[i] for i in range(CELLS_PER_SLAVE) for slave in data.slaves ] temps = [ slave.cell_temps[i] for i in range(TEMP_SENSORS_PER_SLAVE) for slave in data.slaves ] data.min_voltage = min(voltages) data.max_voltage = max(voltages) data.min_temp = min(temps) data.max_temp = max(temps) time_since_last_frame = time.time() - data.last_frame print("\033[2J\033[H", end="") print("-" * 20) if data.panic: print("!!!!! PANIC !!!!!") print(f"Error code: {data.panic_errorcode}") print(f"Error arg: {data.panic_errorarg}") time.sleep(0.5) return print(f"Time since last frame: {time_since_last_frame} s") print(f"Current: {data.current:.2f} A") print(f"Min voltage: {data.min_voltage:.2f} V") print(f"Max voltage: {data.max_voltage:.2f} V") print(f"Min temp: {data.min_temp:.1f} °C") print(f"Max temp: {data.max_temp:.1f} °C") for i in range(N_SLAVES): min_v = min(data.slaves[i].cell_voltages) max_v = max(data.slaves[i].cell_voltages) min_t = min(data.slaves[i].cell_temps) max_t = max(data.slaves[i].cell_temps) print( f"Stack {i}: V ∈ [{min_v:.2f}, {max_v:.2f}]\tT ∈ [{min_t:.1f}, {max_t:.1f}]" ) kb = KBHit() while True: rx_data = ser.read(32) if len(rx_data) > 0: rx_buf = rx_data if (start := check_log_start(rx_buf)) != -1: decode_log_frame(rx_buf[start + 3 : start + 11]) rx_buf = b"" elif (start := check_current_start(rx_buf)) != -1: decode_current_frame(rx_buf[start + 3 : start + 11]) rx_buf = b"" elif (start := check_panic_start(rx_buf)) != -1: decode_panic_frame(rx_buf[start + 3 : start + 11]) rx_buf = b"" if kb.kbhit(): c = kb.getch() if c == "C": print(f"KBHIT: {c}", file=sys.stderr) print(ser.write(b"C"), file=sys.stderr) update_display()