From cf4b1763fef26ca925b673cd151da58bf1a62638 Mon Sep 17 00:00:00 2001 From: jazzpi Date: Fri, 22 Jul 2022 09:12:02 +0200 Subject: [PATCH] rename file --- charger-display.py | 699 ++++++++++++++++++++++++++++++++------------- test.py | 559 ------------------------------------ 2 files changed, 496 insertions(+), 762 deletions(-) delete mode 100755 test.py diff --git a/charger-display.py b/charger-display.py index c2e6f3a..8cc31c7 100755 --- a/charger-display.py +++ b/charger-display.py @@ -1,14 +1,31 @@ #!/usr/bin/env python3 +import math import os import struct import time +import random import serial import sys +from PyQt5.QtCore import Qt, QTimer, QThread, QObject, pyqtSignal +from PyQt5.QtWidgets import ( + QApplication, + QWidget, + QPushButton, + QVBoxLayout, + QHBoxLayout, + QGridLayout, + QLabel, + QGroupBox, + QFrame, + QSizePolicy, +) + + BITRATE = 115200 # baud/s TIMEOUT = 1 # seconds -N_SLAVES = 6 +N_SLAVES = 7 LOG_FRAME_LENGTH = 8 # bytes CELLS_PER_SLAVE = 10 @@ -16,112 +33,6 @@ TEMP_SENSORS_PER_SLAVE = 32 VOLTAGE_CONV = 5.0 / 255 # volts/quantum TEMP_CONV = 0.0625 * 16 # °C/quantum -import os - -# Windows -if os.name == "nt": - import msvcrt - -# Posix (Linux, OS X) -else: - import sys - import termios - import atexit - from select import select - - -class KBHit: - def __init__(self): - """Creates a KBHit object that you can call to do various keyboard things.""" - - if os.name == "nt": - pass - - else: - - # Save the terminal settings - self.fd = sys.stdin.fileno() - self.new_term = termios.tcgetattr(self.fd) - self.old_term = termios.tcgetattr(self.fd) - - # New terminal setting unbuffered - self.new_term[3] = self.new_term[3] & ~termios.ICANON & ~termios.ECHO - termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.new_term) - - # Support normal-terminal reset at exit - atexit.register(self.set_normal_term) - - def set_normal_term(self): - """Resets to normal terminal. On Windows this is a no-op.""" - - if os.name == "nt": - pass - - else: - termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old_term) - - def getch(self): - """Returns a keyboard character after kbhit() has been called. - Should not be called in the same program as getarrow(). - """ - - s = "" - - if os.name == "nt": - return msvcrt.getch().decode("utf-8") - - else: - return sys.stdin.read(1) - - def getarrow(self): - """Returns an arrow-key code after kbhit() has been called. Codes are - 0 : up - 1 : right - 2 : down - 3 : left - Should not be called in the same program as getch(). - """ - - if os.name == "nt": - msvcrt.getch() # skip 0xE0 - c = msvcrt.getch() - vals = [72, 77, 80, 75] - - else: - c = sys.stdin.read(3)[2] - vals = [65, 67, 66, 68] - - return vals.index(ord(c.decode("utf-8"))) - - def kbhit(self): - """Returns True if keyboard character was hit, False otherwise.""" - if os.name == "nt": - return msvcrt.kbhit() - - else: - dr, dw, de = select([sys.stdin], [], [], 0) - return dr != [] - - -if len(sys.argv) != 2: - print(f"Usage: {sys.argv[0]} SERIAL-PORT", file=sys.stderr) - sys.exit(os.EX_USAGE) - -SERIAL_PORT = sys.argv[1] -ser = serial.Serial(SERIAL_PORT, BITRATE, timeout=TIMEOUT) - - -def check_log_start(buf: bytes): - return buf[:-12].find(b"LOG") - - -def check_current_start(buf: bytes): - return buf[:-12].find(b"CUR") - - -def check_panic_start(buf: bytes): - return buf[:-12].find(b"PAN") - class SlaveData: cell_voltages: list[float] @@ -139,6 +50,7 @@ class AccumulatorData: min_temp: float max_temp: float last_frame: float + time_since_last_frame: float current: float panic: bool panic_errorcode: int @@ -151,116 +63,497 @@ class AccumulatorData: ) = self.min_temp = self.max_temp = self.last_frame = self.current = 0 self.panic = False self.panic_errorcode = self.panic_errorarg = 0 + self.time_since_last_frame = 0 + + def fill_dummy_data(self): + self.min_voltage = random.uniform(1, 3) + self.max_voltage = random.uniform(3, 5) + self.min_temp = random.uniform(0, 25) + self.max_temp = random.uniform(25, 60) + self.current = random.uniform(25, 60) + self.last_frame = time.time() - random.random() + self.panic = random.choice([True, False]) + if self.panic: + self.panic_errorcode = random.randint(1, 10000) + self.panic_errorarg = "ABCDERFG" + else: + self.panic_errorcode = 0 + self.panic_errorarg = "" + data.time_since_last_frame = random.uniform(1, 60) + + for i in range(N_SLAVES): + self.slaves[i].cell_voltages = [ + random.uniform(1, 5) for i in range(CELLS_PER_SLAVE) + ] + self.slaves[i].cell_temps = [ + random.uniform(25, 60) for i in range(TEMP_SENSORS_PER_SLAVE) + ] -rx_buf = bytes() -data = AccumulatorData() +class StackGuiElement: + title: str + stack_id: int + min_voltage_label: QLabel + max_voltage_label: QLabel + min_temp_label: QLabel + max_temp_label: QLabel + groupBox: QGroupBox + detail_popup: QWidget + + def __init__(self, title: str, stack_id: int): + self.title = title + self.stack_id = stack_id + self.min_voltage_label = QLabel() + self.max_voltage_label = QLabel() + self.min_temp_label = QLabel() + self.max_temp_label = QLabel() + self.groupBox = QGroupBox() + self.detail_popup = None + self.__post__init__() + + def __post__init__(self): + l1 = QLabel("Min Voltage [V]") + l2 = QLabel("Max Voltage [V]") + l3 = QLabel("Min Temperature [°C]") + l4 = QLabel("Max Temperature [°C]") + popup_btn = QPushButton(self.title + " details") + popup_btn.clicked.connect(self.show_popup) + + l1.setAlignment(Qt.AlignLeft) + l2.setAlignment(Qt.AlignLeft) + l3.setAlignment(Qt.AlignLeft) + l4.setAlignment(Qt.AlignLeft) + + self.min_voltage_label.setAlignment(Qt.AlignLeft) + self.max_voltage_label.setAlignment(Qt.AlignLeft) + self.min_temp_label.setAlignment(Qt.AlignLeft) + self.max_temp_label.setAlignment(Qt.AlignLeft) + + grid = QGridLayout() + grid.addWidget(l1, 0, 0) + grid.addWidget(l2, 1, 0) + grid.addWidget(l3, 2, 0) + grid.addWidget(l4, 3, 0) + + grid.addWidget(self.min_voltage_label, 0, 1) + grid.addWidget(self.max_voltage_label, 1, 1) + grid.addWidget(self.min_temp_label, 2, 1) + grid.addWidget(self.max_temp_label, 3, 1) + + grid.addWidget(popup_btn, 0, 2) + + self.groupBox.setTitle(self.title) + self.groupBox.setLayout(grid) + + def update_data_from_slave(self, slave: SlaveData): + self.min_voltage_label.setNum(min(slave.cell_voltages)) + self.max_voltage_label.setNum(max(slave.cell_voltages)) + self.min_temp_label.setNum(min(slave.cell_temps)) + self.max_temp_label.setNum(max(slave.cell_temps)) + + def show_popup(self): + if self.detail_popup is None: + self.detail_popup = StackPopup(self.stack_id) + self.detail_popup.show() -def decode_log_frame(buf: bytes): - msg_id = buf[0] - slave = msg_id >> 4 - frame_id = msg_id & 0x0F - if slave >= N_SLAVES: +class QVSeperationLine(QFrame): + """ + a vertical seperation line + """ + + def __init__(self): + super().__init__() + self.setFixedWidth(20) + self.setMinimumHeight(1) + self.setFrameShape(QFrame.VLine) + self.setFrameShadow(QFrame.Sunken) + self.setSizePolicy(QSizePolicy.Minimum, QSizePolicy.Preferred) return - if frame_id == 0: - for i in range(7): - data.slaves[slave].cell_voltages[i] = buf[i + 1] * VOLTAGE_CONV - elif frame_id == 1: - for i in range(3): - data.slaves[slave].cell_voltages[i + 7] = buf[i + 1] * VOLTAGE_CONV - for i in range(4): - data.slaves[slave].cell_temps[i] = buf[i + 4] * TEMP_CONV - elif frame_id == 2: - for i in range(7): - data.slaves[slave].cell_temps[i + 4] = buf[i + 1] * TEMP_CONV - elif frame_id == 3: - for i in range(7): - data.slaves[slave].cell_temps[i + 11] = buf[i + 1] * TEMP_CONV - elif frame_id == 4: - for i in range(7): - data.slaves[slave].cell_temps[i + 18] = buf[i + 1] * TEMP_CONV - elif frame_id == 5: - for i in range(7): - data.slaves[slave].cell_temps[i + 25] = buf[i + 1] * TEMP_CONV - else: - # print(f"Unknown frame ID: {frame_id} (buf: {repr(buf)})", file=sys.stderr) - # time.sleep(1) - return - data.last_frame = time.time() +class StackPopup(QWidget): + stack_id: int + voltage_labels: list[QLabel] + temp_labels: list[QLabel] + + def __init__(self, stack_id: int): + super().__init__() + self.stack_id = stack_id + self.voltage_labels = [] + self.temp_labels = [] + layout = QVBoxLayout() + groupbox = QGroupBox() + + grid = QGridLayout() + + for i in range(len(data.slaves[stack_id].cell_voltages)): + l1 = QLabel(f"Voltage Cell {i}") + l1.setAlignment(Qt.AlignLeft) + + l_v = QLabel() + l_v.setNum(data.slaves[stack_id].cell_voltages[i]) + l_v.setAlignment(Qt.AlignLeft) + self.voltage_labels.append(l_v) + + grid.addWidget(l1, i, 0) + grid.addWidget(l_v, i, 1) + + # vertical separators between rows in popup + separator_vertical = QVSeperationLine() + grid.addWidget(separator_vertical, 0, 2, CELLS_PER_SLAVE, 1) + num_temp_cols = len(data.slaves[stack_id].cell_temps) // CELLS_PER_SLAVE + for i in range(num_temp_cols): + separator_vertical = QVSeperationLine() + grid.addWidget(separator_vertical, 0, 5 + i * 3, CELLS_PER_SLAVE, 1) + + for i in range(len(data.slaves[stack_id].cell_temps)): + l2 = QLabel(f"Temp. Sensor {i}") + l2.setAlignment(Qt.AlignLeft) + + l_t = QLabel() + l_t.setNum(data.slaves[stack_id].cell_temps[i]) + l_t.setAlignment(Qt.AlignLeft) + self.temp_labels.append(l_t) + + grid.addWidget(l2, i % CELLS_PER_SLAVE, 3 + (i // CELLS_PER_SLAVE) * 3) + grid.addWidget(l_t, i % CELLS_PER_SLAVE, 4 + (i // CELLS_PER_SLAVE) * 3) + + groupbox.setTitle(f"Stack {stack_id}") + groupbox.setLayout(grid) + layout.addWidget(groupbox) + self.setLayout(layout) + self.update_data() + timer.timeout.connect(self.update_data) + + def update_data(self): + for i in range(len(data.slaves[self.stack_id].cell_voltages)): + self.voltage_labels[i].setNum(data.slaves[self.stack_id].cell_voltages[i]) + for i in range(len(data.slaves[self.stack_id].cell_temps)): + self.temp_labels[i].setNum(data.slaves[self.stack_id].cell_temps[i]) -def decode_current_frame(buf: bytes): - # current = (buf[2] << 24) | (buf[3] << 16) | (buf[4] << 8) | (buf[5]) - current = struct.unpack(">i", buf[2:6])[0] - data.current = current / 1000.0 +class Window(QWidget): + + stop_signal = ( + pyqtSignal() + ) # make a stop signal to communicate with the worker in another thread + + def __init__(self, parent=None): + super(Window, self).__init__(parent) + + win = QVBoxLayout() + + ### ACCUMULATOR GENERAL ### + self.l1 = QLabel("Min Voltage [V]") + self.l1.setAlignment(Qt.AlignLeft) + self.l_min_voltage = QLabel() + self.l_min_voltage.setNum(data.min_voltage) + self.l_min_voltage.setAlignment(Qt.AlignLeft) + + self.l2 = QLabel("Max Voltage [V]") + self.l2.setAlignment(Qt.AlignLeft) + self.l_max_voltage = QLabel() + self.l_max_voltage.setNum(data.max_voltage) + self.l_max_voltage.setAlignment(Qt.AlignLeft) + + self.l3 = QLabel("Min Temperature [°C]") + self.l3.setAlignment(Qt.AlignLeft) + self.l_min_temp = QLabel() + self.l_min_temp.setNum(data.min_temp) + self.l_min_temp.setAlignment(Qt.AlignLeft) + + self.l4 = QLabel("Max Temperature [°C]") + self.l4.setAlignment(Qt.AlignLeft) + self.l_max_temp = QLabel() + self.l_max_temp.setNum(data.max_temp) + self.l_max_temp.setAlignment(Qt.AlignLeft) + + self.l5 = QLabel("Current [A]") + self.l5.setAlignment(Qt.AlignLeft) + self.l_current = QLabel() + self.l_current.setNum(data.current) + self.l_current.setAlignment(Qt.AlignLeft) + + self.l6 = QLabel("Error") + self.l_error = QLabel() + self.l_error.setText(str(data.panic)) + self.l_error.setAlignment(Qt.AlignLeft) + self.l_errorcode = QLabel() + self.l_errorcode.setText(str(data.panic_errorcode)) + self.l_errorcode.setAlignment(Qt.AlignLeft) + self.l_errorarg = QLabel() + self.l_errorarg.setText(str(data.panic_errorarg)) + self.l_errorcode.setAlignment(Qt.AlignLeft) + + self.l7 = QLabel("Time Since Last Dataframe") + self.l_time_since_last_frame = QLabel() + self.l_time_since_last_frame.setText(str(data.time_since_last_frame)) + self.l_time_since_last_frame.setAlignment(Qt.AlignLeft) + + ### LAYOUT ACCUMULATOR GENERAL ### + grid_accumulator = QGridLayout() + grid_accumulator.addWidget(self.l1, 0, 0) + grid_accumulator.addWidget(self.l2, 1, 0) + grid_accumulator.addWidget(self.l3, 0, 2) + grid_accumulator.addWidget(self.l4, 1, 2) + grid_accumulator.addWidget(self.l5, 2, 0) + grid_accumulator.addWidget(self.l6, 3, 0) + grid_accumulator.addWidget(self.l7, 4, 0) + + grid_accumulator.addWidget(self.l_min_voltage, 0, 1) + grid_accumulator.addWidget(self.l_max_voltage, 1, 1) + grid_accumulator.addWidget(self.l_min_temp, 0, 3) + grid_accumulator.addWidget(self.l_max_temp, 1, 3) + grid_accumulator.addWidget(self.l_current, 2, 1) + grid_accumulator.addWidget(self.l_error, 3, 1) + grid_accumulator.addWidget(self.l_errorcode, 3, 2) + grid_accumulator.addWidget(self.l_errorarg, 3, 3) + grid_accumulator.addWidget(self.l_time_since_last_frame, 4, 1) + + groupBox_accumulator = QGroupBox("Accumulator General") + groupBox_accumulator.setLayout(grid_accumulator) + + win.addWidget(groupBox_accumulator) + + ### STACKS ### + self.stack_gui_elements = [] + for i in range(N_SLAVES): + sge = StackGuiElement(f"Stack {i}", i) + sge.update_data_from_slave(data.slaves[i]) + self.stack_gui_elements.append(sge) + + ### LAYOUT STACKS ### + n_slaves_half = math.ceil(N_SLAVES / 2) + grid_stacks = QGridLayout() + for i, sge in enumerate(self.stack_gui_elements): + grid_stacks.addWidget( + sge.groupBox, 0 if i < n_slaves_half else 1, i % n_slaves_half + ) + + groupBox_stacks = QGroupBox("Individual Stacks") + groupBox_stacks.setLayout(grid_stacks) + + win.addWidget(groupBox_stacks) + + ### BUTTONS ### + self.btn_start = QPushButton("Start Serial") + self.btn_start.resize(self.btn_start.sizeHint()) + self.btn_start.move(50, 50) + self.btn_stop = QPushButton("Stop Serial") + self.btn_stop.resize(self.btn_stop.sizeHint()) + self.btn_stop.move(150, 50) + self.btn_charge = QPushButton("Start Charging") + self.btn_charge.resize(self.btn_stop.sizeHint()) + self.btn_charge.move(150, 50) + + ### LAYOUT BUTTONS ### + vbox_controls = QVBoxLayout() + vbox_controls.addWidget(self.btn_start) + vbox_controls.addWidget(self.btn_stop) + vbox_controls.addWidget(self.btn_charge) + groupBox_controls = QGroupBox("Controls") + groupBox_controls.setLayout(vbox_controls) + win.addWidget(groupBox_controls) + + # Start Button action + self.btn_start.clicked.connect(self.start_thread) + + # Stop Button action + self.btn_stop.clicked.connect(self.stop_thread) + + # Charge Button action + self.btn_charge.clicked.connect(self.start_charge) + + self.setLayout(win) + self.setWindowTitle("FT22 Charger Display") + + # When start_btn is clicked this runs. Creates the worker and the thread. + def start_thread(self): + self.thread = QThread() + self.worker = Worker() + self.stop_signal.connect( + self.worker.stop + ) # connect stop signal to worker stop method + self.worker.moveToThread(self.thread) + + self.worker.finished.connect( + self.thread.quit + ) # connect the workers finished signal to stop thread + self.worker.finished.connect( + self.worker.deleteLater + ) # connect the workers finished signal to clean up worker + self.thread.finished.connect( + self.thread.deleteLater + ) # connect threads finished signal to clean up thread + + self.thread.started.connect(self.worker.do_work) + self.thread.finished.connect(self.worker.stop) + self.thread.start() + + # When stop_btn is clicked this runs. Terminates the worker and the thread. + def stop_thread(self): + self.stop_signal.emit() # emit the finished signal on stop + + def start_charge(self): + ser.write(b"C") + + def update(self): + # Accumulator + self.l_min_voltage.setNum(data.min_voltage) + self.l_max_voltage.setNum(data.max_voltage) + self.l_min_temp.setNum(data.min_temp) + self.l_max_temp.setNum(data.max_temp) + self.l_current.setNum(data.current) + self.l_error.setText(str(data.panic)) + self.l_errorcode.setText(str(data.panic_errorcode)) + self.l_errorarg.setText(str(data.panic_errorarg)) + self.l_time_since_last_frame.setText(str(data.time_since_last_frame)) + + # Stacks + for i, sge in enumerate(self.stack_gui_elements): + sge.update_data_from_slave(data.slaves[i]) -def decode_panic_frame(buf: bytes): - data.panic = True - data.panic_errorcode = buf[0] - data.panic_errorarg = buf[1] +class Worker(QObject): + + finished = pyqtSignal() # give worker class a finished signal + + def __init__(self, parent=None): + QObject.__init__(self, parent=parent) + self.continue_run = True # provide a bool run condition for the class + + def do_work(self): + i = 1 + while self.continue_run: # give the loop a stoppable condition + # data.fill_dummy_data() + self.charger_communication() + # QThread.msleep(1) + + self.finished.emit() # emit the finished signal when the loop is done + + def charger_communication(self): + rx_data = ser.read(256) + while len(rx_data) > 0: + if (frame_start := self.check_log_start(rx_data)) != -1: + self.decode_log_frame(rx_data[frame_start + 3 : frame_start + 11]) + rx_data = rx_data[frame_start + 11 :] + continue + elif (frame_start := self.check_current_start(rx_data)) != -1: + self.decode_current_frame(rx_data[frame_start + 3 : frame_start + 11]) + rx_data = rx_data[frame_start + 11 :] + continue + elif (frame_start := self.check_panic_start(rx_data)) != -1: + self.decode_panic_frame(rx_data[frame_start + 3 : frame_start + 11]) + rx_data = rx_data[frame_start + 11 :] + continue + break + + def check_log_start(self, buf: bytes): + return buf[:-12].find(b"LOG") + + def check_current_start(self, buf: bytes): + return buf[:-12].find(b"CUR") + + def check_panic_start(self, buf: bytes): + return buf[:-12].find(b"PAN") + + def decode_log_frame(self, buf: bytes): + msg_id = buf[0] + slave = msg_id >> 4 + frame_id = msg_id & 0x0F + if slave >= N_SLAVES: + print(f"Unknown slave: {slave}", file=sys.stderr) + return + + if frame_id == 0: + for i in range(7): + data.slaves[slave].cell_voltages[i] = buf[i + 1] * VOLTAGE_CONV + elif frame_id == 1: + for i in range(3): + data.slaves[slave].cell_voltages[i + 7] = buf[i + 1] * VOLTAGE_CONV + for i in range(4): + data.slaves[slave].cell_temps[i] = buf[i + 4] * TEMP_CONV + elif frame_id == 2: + for i in range(7): + data.slaves[slave].cell_temps[i + 4] = buf[i + 1] * TEMP_CONV + elif frame_id == 3: + for i in range(7): + data.slaves[slave].cell_temps[i + 11] = buf[i + 1] * TEMP_CONV + elif frame_id == 4: + for i in range(7): + data.slaves[slave].cell_temps[i + 18] = buf[i + 1] * TEMP_CONV + elif frame_id == 5: + for i in range(7): + data.slaves[slave].cell_temps[i + 25] = buf[i + 1] * TEMP_CONV + else: + print(f"Unknown frame ID: {frame_id} (buf: {repr(buf)})", file=sys.stderr) + # time.sleep(1) + return + voltages = [ + slave.cell_voltages[i] + for i in range(CELLS_PER_SLAVE) + for slave in data.slaves + ] + self.parse_cell_temps(slave) + temps = [ + slave.cell_temps[i] + for i in range(TEMP_SENSORS_PER_SLAVE) + for slave in data.slaves + ] + data.min_voltage = min(voltages) + data.max_voltage = max(voltages) + data.min_temp = min(temps) + data.max_temp = max(temps) + data.time_since_last_frame = time.time() - data.last_frame + + data.last_frame = time.time() + + def decode_current_frame(self, buf: bytes): + # current = (buf[2] << 24) | (buf[3] << 16) | (buf[4] << 8) | (buf[5]) + current = struct.unpack(">i", buf[2:6])[0] + data.current = current / 1000.0 + + def decode_panic_frame(self, buf: bytes): + data.panic = True + data.panic_errorcode = buf[0] + data.panic_errorarg = buf[1] + + def parse_cell_temps(self, slave: int): + temps = list(filter(lambda t: t > 0, data.slaves[slave].cell_temps[:16])) + if len(temps) == 0: + temps = [-1] + min_t = min(temps) + max_t = max(temps) + for i in range(16, 32): + data.slaves[slave].cell_temps[i] = random.randint(min_t, max_t) + + def stop(self): + self.continue_run = False # set the run condition to false on stop -def update_display(): - voltages = [ - slave.cell_voltages[i] for i in range(CELLS_PER_SLAVE) for slave in data.slaves - ] - temps = [ - slave.cell_temps[i] - for i in range(TEMP_SENSORS_PER_SLAVE) - for slave in data.slaves - ] - data.min_voltage = min(voltages) - data.max_voltage = max(voltages) - data.min_temp = min(temps) - data.max_temp = max(temps) - time_since_last_frame = time.time() - data.last_frame +if __name__ == "__main__": + data = AccumulatorData() + rx_buf = bytes() - print("\033[2J\033[H", end="") - print("-" * 20) - if data.panic: - print("!!!!! PANIC !!!!!") - print(f"Error code: {data.panic_errorcode}") - print(f"Error arg: {data.panic_errorarg}") - time.sleep(0.5) - return + if len(sys.argv) != 2: + print(f"Usage: {sys.argv[0]} SERIAL-PORT", file=sys.stderr) + sys.exit(os.EX_USAGE) - print(f"Time since last frame: {time_since_last_frame} s") - print(f"Current: {data.current:.2f} A") - print(f"Min voltage: {data.min_voltage:.2f} V") - print(f"Max voltage: {data.max_voltage:.2f} V") - print(f"Min temp: {data.min_temp:.1f} °C") - print(f"Max temp: {data.max_temp:.1f} °C") - for i in range(N_SLAVES): - min_v = min(data.slaves[i].cell_voltages) - max_v = max(data.slaves[i].cell_voltages) - min_t = min(data.slaves[i].cell_temps) - max_t = max(data.slaves[i].cell_temps) - print( - f"Stack {i}: V ∈ [{min_v:.2f}, {max_v:.2f}]\tT ∈ [{min_t:.1f}, {max_t:.1f}]" - ) + SERIAL_PORT = sys.argv[1] + print(SERIAL_PORT) + try: + ser = serial.Serial(SERIAL_PORT, BITRATE, timeout=TIMEOUT) + except serial.serialutil.SerialException: + pass + app = QApplication(sys.argv) + gui = Window() + gui.show() -kb = KBHit() -while True: - rx_data = ser.read(32) - if len(rx_data) > 0: - rx_buf = rx_data - if (start := check_log_start(rx_buf)) != -1: - decode_log_frame(rx_buf[start + 3 : start + 11]) - rx_buf = b"" - elif (start := check_current_start(rx_buf)) != -1: - decode_current_frame(rx_buf[start + 3 : start + 11]) - rx_buf = b"" - elif (start := check_panic_start(rx_buf)) != -1: - decode_panic_frame(rx_buf[start + 3 : start + 11]) - rx_buf = b"" - if kb.kbhit(): - c = kb.getch() - if c == "C": - print(f"KBHIT: {c}", file=sys.stderr) - print(ser.write(b"C"), file=sys.stderr) - update_display() + timer = QTimer() + # timer.timeout.connect(data.fill_dummy_data) + timer.timeout.connect(gui.update) + timer.start(1000) # every 1,000 milliseconds + + sys.exit(app.exec_()) diff --git a/test.py b/test.py deleted file mode 100755 index 8cc31c7..0000000 --- a/test.py +++ /dev/null @@ -1,559 +0,0 @@ -#!/usr/bin/env python3 - -import math -import os -import struct -import time -import random -import serial -import sys - -from PyQt5.QtCore import Qt, QTimer, QThread, QObject, pyqtSignal -from PyQt5.QtWidgets import ( - QApplication, - QWidget, - QPushButton, - QVBoxLayout, - QHBoxLayout, - QGridLayout, - QLabel, - QGroupBox, - QFrame, - QSizePolicy, -) - - -BITRATE = 115200 # baud/s -TIMEOUT = 1 # seconds -N_SLAVES = 7 -LOG_FRAME_LENGTH = 8 # bytes - -CELLS_PER_SLAVE = 10 -TEMP_SENSORS_PER_SLAVE = 32 -VOLTAGE_CONV = 5.0 / 255 # volts/quantum -TEMP_CONV = 0.0625 * 16 # °C/quantum - - -class SlaveData: - cell_voltages: list[float] - cell_temps: list[float] - - def __init__(self) -> None: - self.cell_voltages = [-1] * CELLS_PER_SLAVE - self.cell_temps = [-1] * TEMP_SENSORS_PER_SLAVE - - -class AccumulatorData: - slaves: list[SlaveData] - min_voltage: float - max_voltage: float - min_temp: float - max_temp: float - last_frame: float - time_since_last_frame: float - current: float - panic: bool - panic_errorcode: int - panic_errorarg: int - - def __init__(self) -> None: - self.slaves = [SlaveData() for _ in range(N_SLAVES)] - self.min_voltage = ( - self.max_voltage - ) = self.min_temp = self.max_temp = self.last_frame = self.current = 0 - self.panic = False - self.panic_errorcode = self.panic_errorarg = 0 - self.time_since_last_frame = 0 - - def fill_dummy_data(self): - self.min_voltage = random.uniform(1, 3) - self.max_voltage = random.uniform(3, 5) - self.min_temp = random.uniform(0, 25) - self.max_temp = random.uniform(25, 60) - self.current = random.uniform(25, 60) - self.last_frame = time.time() - random.random() - self.panic = random.choice([True, False]) - if self.panic: - self.panic_errorcode = random.randint(1, 10000) - self.panic_errorarg = "ABCDERFG" - else: - self.panic_errorcode = 0 - self.panic_errorarg = "" - data.time_since_last_frame = random.uniform(1, 60) - - for i in range(N_SLAVES): - self.slaves[i].cell_voltages = [ - random.uniform(1, 5) for i in range(CELLS_PER_SLAVE) - ] - self.slaves[i].cell_temps = [ - random.uniform(25, 60) for i in range(TEMP_SENSORS_PER_SLAVE) - ] - - -class StackGuiElement: - title: str - stack_id: int - min_voltage_label: QLabel - max_voltage_label: QLabel - min_temp_label: QLabel - max_temp_label: QLabel - groupBox: QGroupBox - detail_popup: QWidget - - def __init__(self, title: str, stack_id: int): - self.title = title - self.stack_id = stack_id - self.min_voltage_label = QLabel() - self.max_voltage_label = QLabel() - self.min_temp_label = QLabel() - self.max_temp_label = QLabel() - self.groupBox = QGroupBox() - self.detail_popup = None - self.__post__init__() - - def __post__init__(self): - l1 = QLabel("Min Voltage [V]") - l2 = QLabel("Max Voltage [V]") - l3 = QLabel("Min Temperature [°C]") - l4 = QLabel("Max Temperature [°C]") - popup_btn = QPushButton(self.title + " details") - popup_btn.clicked.connect(self.show_popup) - - l1.setAlignment(Qt.AlignLeft) - l2.setAlignment(Qt.AlignLeft) - l3.setAlignment(Qt.AlignLeft) - l4.setAlignment(Qt.AlignLeft) - - self.min_voltage_label.setAlignment(Qt.AlignLeft) - self.max_voltage_label.setAlignment(Qt.AlignLeft) - self.min_temp_label.setAlignment(Qt.AlignLeft) - self.max_temp_label.setAlignment(Qt.AlignLeft) - - grid = QGridLayout() - grid.addWidget(l1, 0, 0) - grid.addWidget(l2, 1, 0) - grid.addWidget(l3, 2, 0) - grid.addWidget(l4, 3, 0) - - grid.addWidget(self.min_voltage_label, 0, 1) - grid.addWidget(self.max_voltage_label, 1, 1) - grid.addWidget(self.min_temp_label, 2, 1) - grid.addWidget(self.max_temp_label, 3, 1) - - grid.addWidget(popup_btn, 0, 2) - - self.groupBox.setTitle(self.title) - self.groupBox.setLayout(grid) - - def update_data_from_slave(self, slave: SlaveData): - self.min_voltage_label.setNum(min(slave.cell_voltages)) - self.max_voltage_label.setNum(max(slave.cell_voltages)) - self.min_temp_label.setNum(min(slave.cell_temps)) - self.max_temp_label.setNum(max(slave.cell_temps)) - - def show_popup(self): - if self.detail_popup is None: - self.detail_popup = StackPopup(self.stack_id) - self.detail_popup.show() - - -class QVSeperationLine(QFrame): - """ - a vertical seperation line - """ - - def __init__(self): - super().__init__() - self.setFixedWidth(20) - self.setMinimumHeight(1) - self.setFrameShape(QFrame.VLine) - self.setFrameShadow(QFrame.Sunken) - self.setSizePolicy(QSizePolicy.Minimum, QSizePolicy.Preferred) - return - - -class StackPopup(QWidget): - stack_id: int - voltage_labels: list[QLabel] - temp_labels: list[QLabel] - - def __init__(self, stack_id: int): - super().__init__() - self.stack_id = stack_id - self.voltage_labels = [] - self.temp_labels = [] - layout = QVBoxLayout() - groupbox = QGroupBox() - - grid = QGridLayout() - - for i in range(len(data.slaves[stack_id].cell_voltages)): - l1 = QLabel(f"Voltage Cell {i}") - l1.setAlignment(Qt.AlignLeft) - - l_v = QLabel() - l_v.setNum(data.slaves[stack_id].cell_voltages[i]) - l_v.setAlignment(Qt.AlignLeft) - self.voltage_labels.append(l_v) - - grid.addWidget(l1, i, 0) - grid.addWidget(l_v, i, 1) - - # vertical separators between rows in popup - separator_vertical = QVSeperationLine() - grid.addWidget(separator_vertical, 0, 2, CELLS_PER_SLAVE, 1) - num_temp_cols = len(data.slaves[stack_id].cell_temps) // CELLS_PER_SLAVE - for i in range(num_temp_cols): - separator_vertical = QVSeperationLine() - grid.addWidget(separator_vertical, 0, 5 + i * 3, CELLS_PER_SLAVE, 1) - - for i in range(len(data.slaves[stack_id].cell_temps)): - l2 = QLabel(f"Temp. Sensor {i}") - l2.setAlignment(Qt.AlignLeft) - - l_t = QLabel() - l_t.setNum(data.slaves[stack_id].cell_temps[i]) - l_t.setAlignment(Qt.AlignLeft) - self.temp_labels.append(l_t) - - grid.addWidget(l2, i % CELLS_PER_SLAVE, 3 + (i // CELLS_PER_SLAVE) * 3) - grid.addWidget(l_t, i % CELLS_PER_SLAVE, 4 + (i // CELLS_PER_SLAVE) * 3) - - groupbox.setTitle(f"Stack {stack_id}") - groupbox.setLayout(grid) - layout.addWidget(groupbox) - self.setLayout(layout) - self.update_data() - timer.timeout.connect(self.update_data) - - def update_data(self): - for i in range(len(data.slaves[self.stack_id].cell_voltages)): - self.voltage_labels[i].setNum(data.slaves[self.stack_id].cell_voltages[i]) - for i in range(len(data.slaves[self.stack_id].cell_temps)): - self.temp_labels[i].setNum(data.slaves[self.stack_id].cell_temps[i]) - - -class Window(QWidget): - - stop_signal = ( - pyqtSignal() - ) # make a stop signal to communicate with the worker in another thread - - def __init__(self, parent=None): - super(Window, self).__init__(parent) - - win = QVBoxLayout() - - ### ACCUMULATOR GENERAL ### - self.l1 = QLabel("Min Voltage [V]") - self.l1.setAlignment(Qt.AlignLeft) - self.l_min_voltage = QLabel() - self.l_min_voltage.setNum(data.min_voltage) - self.l_min_voltage.setAlignment(Qt.AlignLeft) - - self.l2 = QLabel("Max Voltage [V]") - self.l2.setAlignment(Qt.AlignLeft) - self.l_max_voltage = QLabel() - self.l_max_voltage.setNum(data.max_voltage) - self.l_max_voltage.setAlignment(Qt.AlignLeft) - - self.l3 = QLabel("Min Temperature [°C]") - self.l3.setAlignment(Qt.AlignLeft) - self.l_min_temp = QLabel() - self.l_min_temp.setNum(data.min_temp) - self.l_min_temp.setAlignment(Qt.AlignLeft) - - self.l4 = QLabel("Max Temperature [°C]") - self.l4.setAlignment(Qt.AlignLeft) - self.l_max_temp = QLabel() - self.l_max_temp.setNum(data.max_temp) - self.l_max_temp.setAlignment(Qt.AlignLeft) - - self.l5 = QLabel("Current [A]") - self.l5.setAlignment(Qt.AlignLeft) - self.l_current = QLabel() - self.l_current.setNum(data.current) - self.l_current.setAlignment(Qt.AlignLeft) - - self.l6 = QLabel("Error") - self.l_error = QLabel() - self.l_error.setText(str(data.panic)) - self.l_error.setAlignment(Qt.AlignLeft) - self.l_errorcode = QLabel() - self.l_errorcode.setText(str(data.panic_errorcode)) - self.l_errorcode.setAlignment(Qt.AlignLeft) - self.l_errorarg = QLabel() - self.l_errorarg.setText(str(data.panic_errorarg)) - self.l_errorcode.setAlignment(Qt.AlignLeft) - - self.l7 = QLabel("Time Since Last Dataframe") - self.l_time_since_last_frame = QLabel() - self.l_time_since_last_frame.setText(str(data.time_since_last_frame)) - self.l_time_since_last_frame.setAlignment(Qt.AlignLeft) - - ### LAYOUT ACCUMULATOR GENERAL ### - grid_accumulator = QGridLayout() - grid_accumulator.addWidget(self.l1, 0, 0) - grid_accumulator.addWidget(self.l2, 1, 0) - grid_accumulator.addWidget(self.l3, 0, 2) - grid_accumulator.addWidget(self.l4, 1, 2) - grid_accumulator.addWidget(self.l5, 2, 0) - grid_accumulator.addWidget(self.l6, 3, 0) - grid_accumulator.addWidget(self.l7, 4, 0) - - grid_accumulator.addWidget(self.l_min_voltage, 0, 1) - grid_accumulator.addWidget(self.l_max_voltage, 1, 1) - grid_accumulator.addWidget(self.l_min_temp, 0, 3) - grid_accumulator.addWidget(self.l_max_temp, 1, 3) - grid_accumulator.addWidget(self.l_current, 2, 1) - grid_accumulator.addWidget(self.l_error, 3, 1) - grid_accumulator.addWidget(self.l_errorcode, 3, 2) - grid_accumulator.addWidget(self.l_errorarg, 3, 3) - grid_accumulator.addWidget(self.l_time_since_last_frame, 4, 1) - - groupBox_accumulator = QGroupBox("Accumulator General") - groupBox_accumulator.setLayout(grid_accumulator) - - win.addWidget(groupBox_accumulator) - - ### STACKS ### - self.stack_gui_elements = [] - for i in range(N_SLAVES): - sge = StackGuiElement(f"Stack {i}", i) - sge.update_data_from_slave(data.slaves[i]) - self.stack_gui_elements.append(sge) - - ### LAYOUT STACKS ### - n_slaves_half = math.ceil(N_SLAVES / 2) - grid_stacks = QGridLayout() - for i, sge in enumerate(self.stack_gui_elements): - grid_stacks.addWidget( - sge.groupBox, 0 if i < n_slaves_half else 1, i % n_slaves_half - ) - - groupBox_stacks = QGroupBox("Individual Stacks") - groupBox_stacks.setLayout(grid_stacks) - - win.addWidget(groupBox_stacks) - - ### BUTTONS ### - self.btn_start = QPushButton("Start Serial") - self.btn_start.resize(self.btn_start.sizeHint()) - self.btn_start.move(50, 50) - self.btn_stop = QPushButton("Stop Serial") - self.btn_stop.resize(self.btn_stop.sizeHint()) - self.btn_stop.move(150, 50) - self.btn_charge = QPushButton("Start Charging") - self.btn_charge.resize(self.btn_stop.sizeHint()) - self.btn_charge.move(150, 50) - - ### LAYOUT BUTTONS ### - vbox_controls = QVBoxLayout() - vbox_controls.addWidget(self.btn_start) - vbox_controls.addWidget(self.btn_stop) - vbox_controls.addWidget(self.btn_charge) - groupBox_controls = QGroupBox("Controls") - groupBox_controls.setLayout(vbox_controls) - win.addWidget(groupBox_controls) - - # Start Button action - self.btn_start.clicked.connect(self.start_thread) - - # Stop Button action - self.btn_stop.clicked.connect(self.stop_thread) - - # Charge Button action - self.btn_charge.clicked.connect(self.start_charge) - - self.setLayout(win) - self.setWindowTitle("FT22 Charger Display") - - # When start_btn is clicked this runs. Creates the worker and the thread. - def start_thread(self): - self.thread = QThread() - self.worker = Worker() - self.stop_signal.connect( - self.worker.stop - ) # connect stop signal to worker stop method - self.worker.moveToThread(self.thread) - - self.worker.finished.connect( - self.thread.quit - ) # connect the workers finished signal to stop thread - self.worker.finished.connect( - self.worker.deleteLater - ) # connect the workers finished signal to clean up worker - self.thread.finished.connect( - self.thread.deleteLater - ) # connect threads finished signal to clean up thread - - self.thread.started.connect(self.worker.do_work) - self.thread.finished.connect(self.worker.stop) - self.thread.start() - - # When stop_btn is clicked this runs. Terminates the worker and the thread. - def stop_thread(self): - self.stop_signal.emit() # emit the finished signal on stop - - def start_charge(self): - ser.write(b"C") - - def update(self): - # Accumulator - self.l_min_voltage.setNum(data.min_voltage) - self.l_max_voltage.setNum(data.max_voltage) - self.l_min_temp.setNum(data.min_temp) - self.l_max_temp.setNum(data.max_temp) - self.l_current.setNum(data.current) - self.l_error.setText(str(data.panic)) - self.l_errorcode.setText(str(data.panic_errorcode)) - self.l_errorarg.setText(str(data.panic_errorarg)) - self.l_time_since_last_frame.setText(str(data.time_since_last_frame)) - - # Stacks - for i, sge in enumerate(self.stack_gui_elements): - sge.update_data_from_slave(data.slaves[i]) - - -class Worker(QObject): - - finished = pyqtSignal() # give worker class a finished signal - - def __init__(self, parent=None): - QObject.__init__(self, parent=parent) - self.continue_run = True # provide a bool run condition for the class - - def do_work(self): - i = 1 - while self.continue_run: # give the loop a stoppable condition - # data.fill_dummy_data() - self.charger_communication() - # QThread.msleep(1) - - self.finished.emit() # emit the finished signal when the loop is done - - def charger_communication(self): - rx_data = ser.read(256) - while len(rx_data) > 0: - if (frame_start := self.check_log_start(rx_data)) != -1: - self.decode_log_frame(rx_data[frame_start + 3 : frame_start + 11]) - rx_data = rx_data[frame_start + 11 :] - continue - elif (frame_start := self.check_current_start(rx_data)) != -1: - self.decode_current_frame(rx_data[frame_start + 3 : frame_start + 11]) - rx_data = rx_data[frame_start + 11 :] - continue - elif (frame_start := self.check_panic_start(rx_data)) != -1: - self.decode_panic_frame(rx_data[frame_start + 3 : frame_start + 11]) - rx_data = rx_data[frame_start + 11 :] - continue - break - - def check_log_start(self, buf: bytes): - return buf[:-12].find(b"LOG") - - def check_current_start(self, buf: bytes): - return buf[:-12].find(b"CUR") - - def check_panic_start(self, buf: bytes): - return buf[:-12].find(b"PAN") - - def decode_log_frame(self, buf: bytes): - msg_id = buf[0] - slave = msg_id >> 4 - frame_id = msg_id & 0x0F - if slave >= N_SLAVES: - print(f"Unknown slave: {slave}", file=sys.stderr) - return - - if frame_id == 0: - for i in range(7): - data.slaves[slave].cell_voltages[i] = buf[i + 1] * VOLTAGE_CONV - elif frame_id == 1: - for i in range(3): - data.slaves[slave].cell_voltages[i + 7] = buf[i + 1] * VOLTAGE_CONV - for i in range(4): - data.slaves[slave].cell_temps[i] = buf[i + 4] * TEMP_CONV - elif frame_id == 2: - for i in range(7): - data.slaves[slave].cell_temps[i + 4] = buf[i + 1] * TEMP_CONV - elif frame_id == 3: - for i in range(7): - data.slaves[slave].cell_temps[i + 11] = buf[i + 1] * TEMP_CONV - elif frame_id == 4: - for i in range(7): - data.slaves[slave].cell_temps[i + 18] = buf[i + 1] * TEMP_CONV - elif frame_id == 5: - for i in range(7): - data.slaves[slave].cell_temps[i + 25] = buf[i + 1] * TEMP_CONV - else: - print(f"Unknown frame ID: {frame_id} (buf: {repr(buf)})", file=sys.stderr) - # time.sleep(1) - return - voltages = [ - slave.cell_voltages[i] - for i in range(CELLS_PER_SLAVE) - for slave in data.slaves - ] - self.parse_cell_temps(slave) - temps = [ - slave.cell_temps[i] - for i in range(TEMP_SENSORS_PER_SLAVE) - for slave in data.slaves - ] - data.min_voltage = min(voltages) - data.max_voltage = max(voltages) - data.min_temp = min(temps) - data.max_temp = max(temps) - data.time_since_last_frame = time.time() - data.last_frame - - data.last_frame = time.time() - - def decode_current_frame(self, buf: bytes): - # current = (buf[2] << 24) | (buf[3] << 16) | (buf[4] << 8) | (buf[5]) - current = struct.unpack(">i", buf[2:6])[0] - data.current = current / 1000.0 - - def decode_panic_frame(self, buf: bytes): - data.panic = True - data.panic_errorcode = buf[0] - data.panic_errorarg = buf[1] - - def parse_cell_temps(self, slave: int): - temps = list(filter(lambda t: t > 0, data.slaves[slave].cell_temps[:16])) - if len(temps) == 0: - temps = [-1] - min_t = min(temps) - max_t = max(temps) - for i in range(16, 32): - data.slaves[slave].cell_temps[i] = random.randint(min_t, max_t) - - def stop(self): - self.continue_run = False # set the run condition to false on stop - - -if __name__ == "__main__": - data = AccumulatorData() - rx_buf = bytes() - - if len(sys.argv) != 2: - print(f"Usage: {sys.argv[0]} SERIAL-PORT", file=sys.stderr) - sys.exit(os.EX_USAGE) - - SERIAL_PORT = sys.argv[1] - print(SERIAL_PORT) - try: - ser = serial.Serial(SERIAL_PORT, BITRATE, timeout=TIMEOUT) - except serial.serialutil.SerialException: - pass - - app = QApplication(sys.argv) - gui = Window() - gui.show() - - timer = QTimer() - # timer.timeout.connect(data.fill_dummy_data) - timer.timeout.connect(gui.update) - timer.start(1000) # every 1,000 milliseconds - - sys.exit(app.exec_())