#!/usr/bin/env python3 import math import os import struct import time import random import serial import sys from PyQt5.QtCore import Qt, QTimer, QThread, QObject, pyqtSignal from PyQt5.QtWidgets import ( QApplication, QWidget, QPushButton, QVBoxLayout, QHBoxLayout, QGridLayout, QLabel, QGroupBox, QFrame, QSizePolicy, ) BITRATE = 115200 # baud/s TIMEOUT = 1 # seconds N_SLAVES = 7 LOG_FRAME_LENGTH = 8 # bytes CELLS_PER_SLAVE = 10 TEMP_SENSORS_PER_SLAVE = 32 VOLTAGE_CONV = 5.0 / 255 # volts/quantum TEMP_CONV = 0.0625 * 16 # °C/quantum class SlaveData: cell_voltages: list[float] cell_temps: list[float] def __init__(self) -> None: self.cell_voltages = [-1] * CELLS_PER_SLAVE self.cell_temps = [-1] * TEMP_SENSORS_PER_SLAVE class AccumulatorData: slaves: list[SlaveData] min_voltage: float max_voltage: float min_temp: float max_temp: float last_frame: float time_since_last_frame: float current: float panic: bool panic_errorcode: int panic_errorarg: int def __init__(self) -> None: self.slaves = [SlaveData() for _ in range(N_SLAVES)] self.min_voltage = ( self.max_voltage ) = self.min_temp = self.max_temp = self.last_frame = self.current = 0 self.panic = False self.panic_errorcode = self.panic_errorarg = 0 self.time_since_last_frame = 0 def fill_dummy_data(self): self.min_voltage = random.uniform(1, 3) self.max_voltage = random.uniform(3, 5) self.min_temp = random.uniform(0, 25) self.max_temp = random.uniform(25, 60) self.current = random.uniform(25, 60) self.last_frame = time.time() - random.random() self.panic = random.choice([True, False]) if self.panic: self.panic_errorcode = random.randint(1, 10000) self.panic_errorarg = "ABCDERFG" else: self.panic_errorcode = 0 self.panic_errorarg = "" data.time_since_last_frame = random.uniform(1, 60) for i in range(N_SLAVES): self.slaves[i].cell_voltages = [ random.uniform(1, 5) for i in range(CELLS_PER_SLAVE) ] self.slaves[i].cell_temps = [ random.uniform(25, 60) for i in range(TEMP_SENSORS_PER_SLAVE) ] class StackGuiElement: title: str stack_id: int min_voltage_label: QLabel max_voltage_label: QLabel min_temp_label: QLabel max_temp_label: QLabel groupBox: QGroupBox detail_popup: QWidget def __init__(self, title: str, stack_id: int): self.title = title self.stack_id = stack_id self.min_voltage_label = QLabel() self.max_voltage_label = QLabel() self.min_temp_label = QLabel() self.max_temp_label = QLabel() self.groupBox = QGroupBox() self.detail_popup = None self.__post__init__() def __post__init__(self): l1 = QLabel("Min Voltage [V]") l2 = QLabel("Max Voltage [V]") l3 = QLabel("Min Temperature [°C]") l4 = QLabel("Max Temperature [°C]") popup_btn = QPushButton(self.title + " details") popup_btn.clicked.connect(self.show_popup) l1.setAlignment(Qt.AlignLeft) l2.setAlignment(Qt.AlignLeft) l3.setAlignment(Qt.AlignLeft) l4.setAlignment(Qt.AlignLeft) self.min_voltage_label.setAlignment(Qt.AlignLeft) self.max_voltage_label.setAlignment(Qt.AlignLeft) self.min_temp_label.setAlignment(Qt.AlignLeft) self.max_temp_label.setAlignment(Qt.AlignLeft) grid = QGridLayout() grid.addWidget(l1, 0, 0) grid.addWidget(l2, 1, 0) grid.addWidget(l3, 2, 0) grid.addWidget(l4, 3, 0) grid.addWidget(self.min_voltage_label, 0, 1) grid.addWidget(self.max_voltage_label, 1, 1) grid.addWidget(self.min_temp_label, 2, 1) grid.addWidget(self.max_temp_label, 3, 1) grid.addWidget(popup_btn, 0, 2) self.groupBox.setTitle(self.title) self.groupBox.setLayout(grid) def update_data_from_slave(self, slave: SlaveData): self.min_voltage_label.setNum(min(slave.cell_voltages)) self.max_voltage_label.setNum(max(slave.cell_voltages)) self.min_temp_label.setNum(min(slave.cell_temps)) self.max_temp_label.setNum(max(slave.cell_temps)) def show_popup(self): if self.detail_popup is None: self.detail_popup = StackPopup(self.stack_id) self.detail_popup.show() class QVSeperationLine(QFrame): """ a vertical seperation line """ def __init__(self): super().__init__() self.setFixedWidth(20) self.setMinimumHeight(1) self.setFrameShape(QFrame.VLine) self.setFrameShadow(QFrame.Sunken) self.setSizePolicy(QSizePolicy.Minimum, QSizePolicy.Preferred) return class StackPopup(QWidget): stack_id: int voltage_labels: list[QLabel] temp_labels: list[QLabel] def __init__(self, stack_id: int): super().__init__() self.stack_id = stack_id self.voltage_labels = [] self.temp_labels = [] layout = QVBoxLayout() groupbox = QGroupBox() grid = QGridLayout() for i in range(len(data.slaves[stack_id].cell_voltages)): l1 = QLabel(f"Voltage Cell {i}") l1.setAlignment(Qt.AlignLeft) l_v = QLabel() l_v.setNum(data.slaves[stack_id].cell_voltages[i]) l_v.setAlignment(Qt.AlignLeft) self.voltage_labels.append(l_v) grid.addWidget(l1, i, 0) grid.addWidget(l_v, i, 1) # vertical separators between rows in popup separator_vertical = QVSeperationLine() grid.addWidget(separator_vertical, 0, 2, CELLS_PER_SLAVE, 1) num_temp_cols = len(data.slaves[stack_id].cell_temps) // CELLS_PER_SLAVE for i in range(num_temp_cols): separator_vertical = QVSeperationLine() grid.addWidget(separator_vertical, 0, 5 + i * 3, CELLS_PER_SLAVE, 1) for i in range(len(data.slaves[stack_id].cell_temps)): l2 = QLabel(f"Temp. Sensor {i}") l2.setAlignment(Qt.AlignLeft) l_t = QLabel() l_t.setNum(data.slaves[stack_id].cell_temps[i]) l_t.setAlignment(Qt.AlignLeft) self.temp_labels.append(l_t) grid.addWidget(l2, i % CELLS_PER_SLAVE, 3 + (i // CELLS_PER_SLAVE) * 3) grid.addWidget(l_t, i % CELLS_PER_SLAVE, 4 + (i // CELLS_PER_SLAVE) * 3) groupbox.setTitle(f"Stack {stack_id}") groupbox.setLayout(grid) layout.addWidget(groupbox) self.setLayout(layout) self.update_data() timer.timeout.connect(self.update_data) def update_data(self): for i in range(len(data.slaves[self.stack_id].cell_voltages)): self.voltage_labels[i].setNum(data.slaves[self.stack_id].cell_voltages[i]) for i in range(len(data.slaves[self.stack_id].cell_temps)): self.temp_labels[i].setNum(data.slaves[self.stack_id].cell_temps[i]) class Window(QWidget): stop_signal = ( pyqtSignal() ) # make a stop signal to communicate with the worker in another thread def __init__(self, parent=None): super(Window, self).__init__(parent) win = QVBoxLayout() ### ACCUMULATOR GENERAL ### self.l1 = QLabel("Min Voltage [V]") self.l1.setAlignment(Qt.AlignLeft) self.l_min_voltage = QLabel() self.l_min_voltage.setNum(data.min_voltage) self.l_min_voltage.setAlignment(Qt.AlignLeft) self.l2 = QLabel("Max Voltage [V]") self.l2.setAlignment(Qt.AlignLeft) self.l_max_voltage = QLabel() self.l_max_voltage.setNum(data.max_voltage) self.l_max_voltage.setAlignment(Qt.AlignLeft) self.l3 = QLabel("Min Temperature [°C]") self.l3.setAlignment(Qt.AlignLeft) self.l_min_temp = QLabel() self.l_min_temp.setNum(data.min_temp) self.l_min_temp.setAlignment(Qt.AlignLeft) self.l4 = QLabel("Max Temperature [°C]") self.l4.setAlignment(Qt.AlignLeft) self.l_max_temp = QLabel() self.l_max_temp.setNum(data.max_temp) self.l_max_temp.setAlignment(Qt.AlignLeft) self.l5 = QLabel("Current [A]") self.l5.setAlignment(Qt.AlignLeft) self.l_current = QLabel() self.l_current.setNum(data.current) self.l_current.setAlignment(Qt.AlignLeft) self.l6 = QLabel("Error") self.l_error = QLabel() self.l_error.setText(str(data.panic)) self.l_error.setAlignment(Qt.AlignLeft) self.l_errorcode = QLabel() self.l_errorcode.setText(str(data.panic_errorcode)) self.l_errorcode.setAlignment(Qt.AlignLeft) self.l_errorarg = QLabel() self.l_errorarg.setText(str(data.panic_errorarg)) self.l_errorcode.setAlignment(Qt.AlignLeft) self.l7 = QLabel("Time Since Last Dataframe") self.l_time_since_last_frame = QLabel() self.l_time_since_last_frame.setText(str(data.time_since_last_frame)) self.l_time_since_last_frame.setAlignment(Qt.AlignLeft) ### LAYOUT ACCUMULATOR GENERAL ### grid_accumulator = QGridLayout() grid_accumulator.addWidget(self.l1, 0, 0) grid_accumulator.addWidget(self.l2, 1, 0) grid_accumulator.addWidget(self.l3, 0, 2) grid_accumulator.addWidget(self.l4, 1, 2) grid_accumulator.addWidget(self.l5, 2, 0) grid_accumulator.addWidget(self.l6, 3, 0) grid_accumulator.addWidget(self.l7, 4, 0) grid_accumulator.addWidget(self.l_min_voltage, 0, 1) grid_accumulator.addWidget(self.l_max_voltage, 1, 1) grid_accumulator.addWidget(self.l_min_temp, 0, 3) grid_accumulator.addWidget(self.l_max_temp, 1, 3) grid_accumulator.addWidget(self.l_current, 2, 1) grid_accumulator.addWidget(self.l_error, 3, 1) grid_accumulator.addWidget(self.l_errorcode, 3, 2) grid_accumulator.addWidget(self.l_errorarg, 3, 3) grid_accumulator.addWidget(self.l_time_since_last_frame, 4, 1) groupBox_accumulator = QGroupBox("Accumulator General") groupBox_accumulator.setLayout(grid_accumulator) win.addWidget(groupBox_accumulator) ### STACKS ### self.stack_gui_elements = [] for i in range(N_SLAVES): sge = StackGuiElement(f"Stack {i}", i) sge.update_data_from_slave(data.slaves[i]) self.stack_gui_elements.append(sge) ### LAYOUT STACKS ### n_slaves_half = math.ceil(N_SLAVES / 2) grid_stacks = QGridLayout() for i, sge in enumerate(self.stack_gui_elements): grid_stacks.addWidget( sge.groupBox, 0 if i < n_slaves_half else 1, i % n_slaves_half ) groupBox_stacks = QGroupBox("Individual Stacks") groupBox_stacks.setLayout(grid_stacks) win.addWidget(groupBox_stacks) ### BUTTONS ### self.btn_start = QPushButton("Start Serial") self.btn_start.resize(self.btn_start.sizeHint()) self.btn_start.move(50, 50) self.btn_stop = QPushButton("Stop Serial") self.btn_stop.resize(self.btn_stop.sizeHint()) self.btn_stop.move(150, 50) self.btn_charge = QPushButton("Start Charging") self.btn_charge.resize(self.btn_stop.sizeHint()) self.btn_charge.move(150, 50) ### LAYOUT BUTTONS ### vbox_controls = QVBoxLayout() vbox_controls.addWidget(self.btn_start) vbox_controls.addWidget(self.btn_stop) vbox_controls.addWidget(self.btn_charge) groupBox_controls = QGroupBox("Controls") groupBox_controls.setLayout(vbox_controls) win.addWidget(groupBox_controls) # Start Button action self.btn_start.clicked.connect(self.start_thread) # Stop Button action self.btn_stop.clicked.connect(self.stop_thread) # Charge Button action self.btn_charge.clicked.connect(self.start_charge) self.setLayout(win) self.setWindowTitle("FT22 Charger Display") # When start_btn is clicked this runs. Creates the worker and the thread. def start_thread(self): self.thread = QThread() self.worker = Worker() self.stop_signal.connect( self.worker.stop ) # connect stop signal to worker stop method self.worker.moveToThread(self.thread) self.worker.finished.connect( self.thread.quit ) # connect the workers finished signal to stop thread self.worker.finished.connect( self.worker.deleteLater ) # connect the workers finished signal to clean up worker self.thread.finished.connect( self.thread.deleteLater ) # connect threads finished signal to clean up thread self.thread.started.connect(self.worker.do_work) self.thread.finished.connect(self.worker.stop) self.thread.start() # When stop_btn is clicked this runs. Terminates the worker and the thread. def stop_thread(self): self.stop_signal.emit() # emit the finished signal on stop def start_charge(self): ser.write(b"C") def update(self): # Accumulator self.l_min_voltage.setNum(data.min_voltage) self.l_max_voltage.setNum(data.max_voltage) self.l_min_temp.setNum(data.min_temp) self.l_max_temp.setNum(data.max_temp) self.l_current.setNum(data.current) self.l_error.setText(str(data.panic)) self.l_errorcode.setText(str(data.panic_errorcode)) self.l_errorarg.setText(str(data.panic_errorarg)) self.l_time_since_last_frame.setText(str(data.time_since_last_frame)) # Stacks for i, sge in enumerate(self.stack_gui_elements): sge.update_data_from_slave(data.slaves[i]) class Worker(QObject): finished = pyqtSignal() # give worker class a finished signal def __init__(self, parent=None): QObject.__init__(self, parent=parent) self.continue_run = True # provide a bool run condition for the class def do_work(self): i = 1 while self.continue_run: # give the loop a stoppable condition # data.fill_dummy_data() self.charger_communication() # QThread.msleep(1) self.finished.emit() # emit the finished signal when the loop is done def charger_communication(self): rx_data = ser.read(256) while len(rx_data) > 0: if (frame_start := self.check_log_start(rx_data)) != -1: self.decode_log_frame(rx_data[frame_start + 3 : frame_start + 11]) rx_data = rx_data[frame_start + 11 :] continue elif (frame_start := self.check_current_start(rx_data)) != -1: self.decode_current_frame(rx_data[frame_start + 3 : frame_start + 11]) rx_data = rx_data[frame_start + 11 :] continue elif (frame_start := self.check_panic_start(rx_data)) != -1: self.decode_panic_frame(rx_data[frame_start + 3 : frame_start + 11]) rx_data = rx_data[frame_start + 11 :] continue break def check_log_start(self, buf: bytes): return buf[:-12].find(b"LOG") def check_current_start(self, buf: bytes): return buf[:-12].find(b"CUR") def check_panic_start(self, buf: bytes): return buf[:-12].find(b"PAN") def decode_log_frame(self, buf: bytes): msg_id = buf[0] slave = msg_id >> 4 frame_id = msg_id & 0x0F if slave >= N_SLAVES: print(f"Unknown slave: {slave}", file=sys.stderr) return if frame_id == 0: for i in range(7): data.slaves[slave].cell_voltages[i] = buf[i + 1] * VOLTAGE_CONV elif frame_id == 1: for i in range(3): data.slaves[slave].cell_voltages[i + 7] = buf[i + 1] * VOLTAGE_CONV for i in range(4): data.slaves[slave].cell_temps[i] = buf[i + 4] * TEMP_CONV elif frame_id == 2: for i in range(7): data.slaves[slave].cell_temps[i + 4] = buf[i + 1] * TEMP_CONV elif frame_id == 3: for i in range(7): data.slaves[slave].cell_temps[i + 11] = buf[i + 1] * TEMP_CONV elif frame_id == 4: for i in range(7): data.slaves[slave].cell_temps[i + 18] = buf[i + 1] * TEMP_CONV elif frame_id == 5: for i in range(7): data.slaves[slave].cell_temps[i + 25] = buf[i + 1] * TEMP_CONV else: print(f"Unknown frame ID: {frame_id} (buf: {repr(buf)})", file=sys.stderr) # time.sleep(1) return voltages = [ slave.cell_voltages[i] for i in range(CELLS_PER_SLAVE) for slave in data.slaves ] self.parse_cell_temps(slave) temps = [ slave.cell_temps[i] for i in range(TEMP_SENSORS_PER_SLAVE) for slave in data.slaves ] data.min_voltage = min(voltages) data.max_voltage = max(voltages) data.min_temp = min(temps) data.max_temp = max(temps) data.time_since_last_frame = time.time() - data.last_frame data.last_frame = time.time() def decode_current_frame(self, buf: bytes): # current = (buf[2] << 24) | (buf[3] << 16) | (buf[4] << 8) | (buf[5]) current = struct.unpack(">i", buf[2:6])[0] data.current = current / 1000.0 def decode_panic_frame(self, buf: bytes): data.panic = True data.panic_errorcode = buf[0] data.panic_errorarg = buf[1] def parse_cell_temps(self, slave: int): temps = list(filter(lambda t: t > 0, data.slaves[slave].cell_temps[:16])) if len(temps) == 0: temps = [-1] min_t = min(temps) max_t = max(temps) for i in range(16, 32): data.slaves[slave].cell_temps[i] = random.randint(min_t, max_t) def stop(self): self.continue_run = False # set the run condition to false on stop if __name__ == "__main__": data = AccumulatorData() rx_buf = bytes() if len(sys.argv) != 2: print(f"Usage: {sys.argv[0]} SERIAL-PORT", file=sys.stderr) sys.exit(os.EX_USAGE) SERIAL_PORT = sys.argv[1] print(SERIAL_PORT) try: ser = serial.Serial(SERIAL_PORT, BITRATE, timeout=TIMEOUT) except serial.serialutil.SerialException: pass app = QApplication(sys.argv) gui = Window() gui.show() timer = QTimer() # timer.timeout.connect(data.fill_dummy_data) timer.timeout.connect(gui.update) timer.start(1000) # every 1,000 milliseconds sys.exit(app.exec_())