#!/usr/bin/env python3 from enum import Enum import math import os import struct import time import random import serial import sys import numpy as np from PyQt5.QtCore import Qt, QTimer, QThread, QObject, pyqtSignal from PyQt5.QtWidgets import ( QApplication, QWidget, QPushButton, QVBoxLayout, QHBoxLayout, QGridLayout, QLabel, QGroupBox, QFrame, QSizePolicy, ) BITRATE = 115200 # baud/s TIMEOUT = 1 # seconds N_SLAVES = 9 LOG_FRAME_LENGTH = 8 # bytes PARALLEL_CELLS = 9 CELLS_PER_SLAVE = 10 TEMP_SENSORS_PER_SLAVE = 32 VOLTAGE_CONV = 5.0 / 0xFFFF # volts/quantum TEMP_CONV = 0.0625 # °C/quantum ERRORCODE_TIMEOUT_SLAVE = 1 ERRORCODE_SLAVE_PANIC = 2 ERRORCODE_TIMEOUT_SLAVE_FRAMES = 3 ERRORCODE_TOO_FEW_TEMPS = 4 ERRORCODE_TIMEOUT_SHUNT = 6 ERRORCODE_MASTER_THRESH = 7 SLAVE_ERROR_UV = 0 SLAVE_ERROR_OV = 1 SLAVE_ERROR_UT = 2 SLAVE_ERROR_OT = 3 SLAVE_ERROR_BQ = 4 SLAVE_ERROR_TMP144 = 5 MASTER_THRESH_UT = 0 MASTER_THRESH_OT = 1 MASTER_THRESH_UV = 2 MASTER_THRESH_OV = 3 INTERNAL_RESISTANCE_CURVE_X = [2.0, 4.12] INTERNAL_RESISTANCE_CURVE_Y = [0.0528, 0.0294] SOC_OCV_X = [2.1, 2.9, 3.2, 3.3, 3.4, 3.5, 3.68, 4.0, 4.15, 4.2] SOC_OCV_Y = [0, 0.023, 0.06, 0.08, 0.119, 0.227, 0.541, 0.856, 0.985, 1.0] def estimate_soc(voltage: float) -> float: r_i = np.interp( [voltage], INTERNAL_RESISTANCE_CURVE_X, INTERNAL_RESISTANCE_CURVE_Y, )[0] i = data.current / PARALLEL_CELLS ocv = voltage - i * r_i return np.interp([ocv], SOC_OCV_X, SOC_OCV_Y)[0] * 100 class SlaveData: cell_voltages: list[float] cell_temps: list[float] def __init__(self) -> None: self.cell_voltages = [-1] * CELLS_PER_SLAVE self.cell_temps = ([-1] * (TEMP_SENSORS_PER_SLAVE // 2)) + ( [0] * (TEMP_SENSORS_PER_SLAVE // 2) ) class TSState(Enum): TS_INACTIVE = 0 TS_ACTIVE = 1 TS_PRECHARGE = 2 TS_DISCHARGE = 3 TS_ERROR = 4 TS_CHARGING_CHECK = 5 TS_CHARGING = 6 TS_BALANCING = 7 class AccumulatorData: slaves: list[SlaveData] min_voltage: float max_voltage: float min_soc: float max_soc: float min_temp: float max_temp: float last_frame: float time_since_last_frame: float current: float panic: bool panic_errorcode: int panic_errorarg: int ts_state: TSState def __init__(self) -> None: self.slaves = [SlaveData() for _ in range(N_SLAVES)] self.min_voltage = ( self.max_voltage ) = ( self.min_soc ) = ( self.max_soc ) = self.min_temp = self.max_temp = self.last_frame = self.current = 0 self.panic = False self.panic_errorcode = self.panic_errorarg = 0 self.time_since_last_frame = 0 self.ts_state = TSState.TS_INACTIVE def fill_dummy_data(self): self.min_voltage = random.uniform(1, 3) self.max_voltage = random.uniform(3, 5) self.min_temp = random.uniform(0, 25) self.max_temp = random.uniform(25, 60) self.current = random.uniform(25, 60) self.last_frame = time.time() - random.random() self.panic = random.choice([True, False]) if self.panic: self.panic_errorcode = random.randint(1, 10000) self.panic_errorarg = "ABCDERFG" else: self.panic_errorcode = 0 self.panic_errorarg = "" data.time_since_last_frame = random.uniform(1, 60) for i in range(N_SLAVES): self.slaves[i].cell_voltages = [ random.uniform(1, 5) for i in range(CELLS_PER_SLAVE) ] self.slaves[i].cell_temps = [ random.uniform(25, 60) for i in range(TEMP_SENSORS_PER_SLAVE) ] class StackGuiElement: title: str stack_id: int voltage_label: QLabel soc_label: QLabel temp_label: QLabel groupBox: QGroupBox detail_popup: QWidget def __init__(self, title: str, stack_id: int): self.title = title self.stack_id = stack_id self.voltage_label = QLabel() self.soc_label = QLabel() self.temp_label = QLabel() self.groupBox = QGroupBox() self.detail_popup = None self.__post__init__() def __post__init__(self): l1 = QLabel("Voltage [V]") l2 = QLabel("SoC [%]") l3 = QLabel("Temperature [°C]") popup_btn = QPushButton("Details") popup_btn.clicked.connect(self.show_popup) l1.setAlignment(Qt.AlignLeft) l2.setAlignment(Qt.AlignLeft) l3.setAlignment(Qt.AlignLeft) self.voltage_label.setAlignment(Qt.AlignLeft) self.soc_label.setAlignment(Qt.AlignLeft) self.temp_label.setAlignment(Qt.AlignLeft) grid = QGridLayout() grid.addWidget(l1, 0, 0) grid.addWidget(l2, 1, 0) grid.addWidget(l3, 2, 0) grid.addWidget(self.voltage_label, 0, 1) grid.addWidget(self.soc_label, 1, 1) grid.addWidget(self.temp_label, 2, 1) grid.addWidget(popup_btn, 3, 0) self.groupBox.setTitle(self.title) self.groupBox.setLayout(grid) def update_data_from_slave(self, slave: SlaveData): min_v, max_v = min(slave.cell_voltages), max(slave.cell_voltages) self.voltage_label.setText(f"[{min_v:.02f}, {max_v:.02f}]") min_soc, max_soc = estimate_soc(min_v), estimate_soc(max_v) self.soc_label.setText(f"[{min_soc:02.0f}, {max_soc:02.0f}]") min_t, max_t = min(slave.cell_temps), max(slave.cell_temps) self.temp_label.setText(f"[{min_t:.02f}, {max_t:.02f}]") def show_popup(self): if self.detail_popup is None: self.detail_popup = StackPopup(self.stack_id) self.detail_popup.show() class QVSeperationLine(QFrame): """ a vertical seperation line """ def __init__(self): super().__init__() self.setFixedWidth(20) self.setMinimumHeight(1) self.setFrameShape(QFrame.VLine) self.setFrameShadow(QFrame.Sunken) self.setSizePolicy(QSizePolicy.Minimum, QSizePolicy.Preferred) return class StackPopup(QWidget): stack_id: int voltage_labels: list[QLabel] temp_labels: list[QLabel] def __init__(self, stack_id: int): super().__init__() self.stack_id = stack_id self.voltage_labels = [] self.temp_labels = [] layout = QVBoxLayout() groupbox = QGroupBox() grid = QGridLayout() for i in range(len(data.slaves[stack_id].cell_voltages)): l1 = QLabel(f"Voltage Cell {i}") l1.setAlignment(Qt.AlignLeft) l_v = QLabel() l_v.setNum(data.slaves[stack_id].cell_voltages[i]) l_v.setAlignment(Qt.AlignLeft) self.voltage_labels.append(l_v) grid.addWidget(l1, i, 0) grid.addWidget(l_v, i, 1) # vertical separators between rows in popup separator_vertical = QVSeperationLine() grid.addWidget(separator_vertical, 0, 2, CELLS_PER_SLAVE, 1) num_temp_cols = len(data.slaves[stack_id].cell_temps) // CELLS_PER_SLAVE for i in range(num_temp_cols): separator_vertical = QVSeperationLine() grid.addWidget(separator_vertical, 0, 5 + i * 3, CELLS_PER_SLAVE, 1) for i in range(len(data.slaves[stack_id].cell_temps)): l2 = QLabel(f"Temp. Sensor {i}") l2.setAlignment(Qt.AlignLeft) l_t = QLabel() l_t.setNum(data.slaves[stack_id].cell_temps[i]) l_t.setAlignment(Qt.AlignLeft) self.temp_labels.append(l_t) grid.addWidget(l2, i % CELLS_PER_SLAVE, 3 + (i // CELLS_PER_SLAVE) * 3) grid.addWidget(l_t, i % CELLS_PER_SLAVE, 4 + (i // CELLS_PER_SLAVE) * 3) groupbox.setTitle(f"Stack {stack_id}") groupbox.setLayout(grid) layout.addWidget(groupbox) self.setLayout(layout) self.setWindowTitle(f"FT22 Charger Display - Stack {stack_id} Details") self.update_data() timer.timeout.connect(self.update_data) def update_data(self): for i in range(len(data.slaves[self.stack_id].cell_voltages)): self.voltage_labels[i].setNum(data.slaves[self.stack_id].cell_voltages[i]) for i in range(len(data.slaves[self.stack_id].cell_temps)): self.temp_labels[i].setNum(data.slaves[self.stack_id].cell_temps[i]) class Window(QWidget): stop_signal = ( pyqtSignal() ) # make a stop signal to communicate with the worker in another thread def __init__(self, parent=None): super(Window, self).__init__(parent) win = QVBoxLayout() ### ACCUMULATOR GENERAL ### self.l1 = QLabel("Min Voltage [V]") self.l1.setAlignment(Qt.AlignLeft) self.l_min_voltage = QLabel() self.l_min_voltage.setNum(data.min_voltage) self.l_min_voltage.setAlignment(Qt.AlignLeft) self.l2 = QLabel("Max Voltage [V]") self.l2.setAlignment(Qt.AlignLeft) self.l_max_voltage = QLabel() self.l_max_voltage.setNum(data.max_voltage) self.l_max_voltage.setAlignment(Qt.AlignLeft) self.l3 = QLabel("Min SoC [%]") self.l3.setAlignment(Qt.AlignLeft) self.l_min_soc = QLabel() self.l_min_soc.setNum(data.min_soc) self.l_min_soc.setAlignment(Qt.AlignLeft) self.l4 = QLabel("Max SoC [%]") self.l4.setAlignment(Qt.AlignLeft) self.l_max_soc = QLabel() self.l_max_soc.setNum(data.max_soc) self.l_max_soc.setAlignment(Qt.AlignLeft) self.l5 = QLabel("Min Temperature [°C]") self.l5.setAlignment(Qt.AlignLeft) self.l_min_temp = QLabel() self.l_min_temp.setNum(data.min_temp) self.l_min_temp.setAlignment(Qt.AlignLeft) self.l6 = QLabel("Max Temperature [°C]") self.l6.setAlignment(Qt.AlignLeft) self.l_max_temp = QLabel() self.l_max_temp.setNum(data.max_temp) self.l_max_temp.setAlignment(Qt.AlignLeft) self.l7 = QLabel("Current [A]") self.l7.setAlignment(Qt.AlignLeft) self.l_current = QLabel() self.l_current.setNum(data.current) self.l_current.setAlignment(Qt.AlignLeft) self.l8 = QLabel("State") self.l_state = QLabel() self.l_state.setText(data.ts_state.name) self.l_state.setAlignment(Qt.AlignLeft) self.l_errorcode = QLabel() self.l_errorcode.setText("") self.l_errorcode.setAlignment(Qt.AlignLeft) self.l_errorarg = QLabel() self.l_errorarg.setText("") self.l_errorcode.setAlignment(Qt.AlignLeft) self.l9 = QLabel("Time Since Last Dataframe") self.l_time_since_last_frame = QLabel() self.l_time_since_last_frame.setText(str(data.time_since_last_frame)) self.l_time_since_last_frame.setAlignment(Qt.AlignLeft) ### LAYOUT ACCUMULATOR GENERAL ### grid_accumulator = QGridLayout() grid_accumulator.addWidget(self.l1, 0, 0) grid_accumulator.addWidget(self.l2, 1, 0) grid_accumulator.addWidget(self.l3, 2, 0) grid_accumulator.addWidget(self.l4, 3, 0) grid_accumulator.addWidget(self.l5, 0, 2) grid_accumulator.addWidget(self.l6, 1, 2) grid_accumulator.addWidget(self.l7, 2, 2) grid_accumulator.addWidget(self.l8, 5, 0) grid_accumulator.addWidget(self.l9, 3, 2) grid_accumulator.addWidget(self.l_min_voltage, 0, 1) grid_accumulator.addWidget(self.l_max_voltage, 1, 1) grid_accumulator.addWidget(self.l_min_soc, 2, 1) grid_accumulator.addWidget(self.l_max_soc, 3, 1) grid_accumulator.addWidget(self.l_min_temp, 0, 3) grid_accumulator.addWidget(self.l_max_temp, 1, 3) grid_accumulator.addWidget(self.l_current, 2, 3) grid_accumulator.addWidget(self.l_state, 5, 1) grid_accumulator.addWidget(self.l_errorcode, 5, 2) grid_accumulator.addWidget(self.l_errorarg, 5, 3) grid_accumulator.addWidget(self.l_time_since_last_frame, 3, 3) groupBox_accumulator = QGroupBox("Accumulator General") groupBox_accumulator.setLayout(grid_accumulator) win.addWidget(groupBox_accumulator) ### STACKS ### self.stack_gui_elements = [] for i in range(N_SLAVES): sge = StackGuiElement(f"Stack {i}", i) sge.update_data_from_slave(data.slaves[i]) self.stack_gui_elements.append(sge) ### LAYOUT STACKS ### n_slaves_half = math.ceil(N_SLAVES / 2) grid_stacks = QGridLayout() for i, sge in enumerate(self.stack_gui_elements): grid_stacks.addWidget( sge.groupBox, 0 if i < n_slaves_half else 1, i % n_slaves_half ) groupBox_stacks = QGroupBox("Individual Stacks") groupBox_stacks.setLayout(grid_stacks) win.addWidget(groupBox_stacks) ### BUTTONS ### self.btn_start = QPushButton("Start Serial") self.btn_start.resize(self.btn_start.sizeHint()) self.btn_start.move(50, 50) self.btn_stop = QPushButton("Stop Serial") self.btn_stop.resize(self.btn_stop.sizeHint()) self.btn_stop.move(150, 50) self.btn_charge = QPushButton("Start Charging") self.btn_charge.resize(self.btn_charge.sizeHint()) self.btn_charge.move(250, 50) self.btn_balance = QPushButton("Start Balancing") self.btn_balance.resize(self.btn_balance.sizeHint()) self.btn_balance.move(350, 50) ### LAYOUT BUTTONS ### vbox_controls = QVBoxLayout() vbox_controls.addWidget(self.btn_start) vbox_controls.addWidget(self.btn_stop) vbox_controls.addWidget(self.btn_charge) vbox_controls.addWidget(self.btn_balance) groupBox_controls = QGroupBox("Controls") groupBox_controls.setLayout(vbox_controls) win.addWidget(groupBox_controls) # Start Button action self.btn_start.clicked.connect(self.start_thread) # Stop Button action self.btn_stop.clicked.connect(self.stop_thread) # Charge Button action self.btn_charge.clicked.connect(self.start_charge) # Balance Button action self.btn_balance.clicked.connect(self.start_balance) self.setLayout(win) self.setWindowTitle("FT22 Charger Display") # When start_btn is clicked this runs. Creates the worker and the thread. def start_thread(self): self.thread = QThread() self.worker = Worker() self.stop_signal.connect( self.worker.stop ) # connect stop signal to worker stop method self.worker.moveToThread(self.thread) self.worker.finished.connect( self.thread.quit ) # connect the workers finished signal to stop thread self.worker.finished.connect( self.worker.deleteLater ) # connect the workers finished signal to clean up worker self.thread.finished.connect( self.thread.deleteLater ) # connect threads finished signal to clean up thread self.thread.started.connect(self.worker.do_work) self.thread.finished.connect(self.worker.stop) self.thread.start() # When stop_btn is clicked this runs. Terminates the worker and the thread. def stop_thread(self): self.stop_signal.emit() # emit the finished signal on stop def start_charge(self): ser.write(b"C") def start_balance(self): ser.write(b"B") def update(self): # Accumulator self.l_min_voltage.setText(f"{data.min_voltage:.02f}") self.l_max_voltage.setText(f"{data.max_voltage:.02f}") self.l_min_soc.setText(f"{data.min_soc:02.0f}") self.l_max_soc.setText(f"{data.max_soc:02.0f}") self.l_min_temp.setText(f"{data.min_temp:.02f}") self.l_max_temp.setText(f"{data.max_temp:.02f}") self.l_current.setText(f"{data.current:.02f}") self.l_state.setText(data.ts_state.name) if data.panic: self.l_errorcode.setText(str(data.panic_errorcode)) self.l_errorarg.setText(str(data.panic_errorarg)) for l in (self.l_state, self.l_errorcode, self.l_errorarg): l.setStyleSheet("color: red; font-weight: bold;") elif data.ts_state == TSState.TS_CHARGING: for l in (self.l_state, self.l_errorcode, self.l_errorarg): l.setStyleSheet("color: green; font-weight: bold;") else: self.l_errorcode.setText("") self.l_errorarg.setText("") for l in (self.l_state, self.l_errorcode, self.l_errorarg): l.setStyleSheet("color: black; font-weight: normal;") last_time = time.time() - data.last_frame self.l_time_since_last_frame.setText(f"{last_time:.03f}") # Stacks for i, sge in enumerate(self.stack_gui_elements): sge.update_data_from_slave(data.slaves[i]) class Worker(QObject): finished = pyqtSignal() # give worker class a finished signal def __init__(self, parent=None): QObject.__init__(self, parent=parent) self.continue_run = True # provide a bool run condition for the class def do_work(self): i = 1 while self.continue_run: # give the loop a stoppable condition # data.fill_dummy_data() self.charger_communication() # QThread.msleep(1) self.finished.emit() # emit the finished signal when the loop is done def charger_communication(self): rx_data = ser.read(256) # print(len(rx_data), rx_data) while len(rx_data) > 0: if (frame_start := self.check_log_start(rx_data)) != -1: self.decode_log_frame(rx_data[frame_start + 3 : frame_start + 11]) rx_data = rx_data[frame_start + 11 :] continue elif (frame_start := self.check_current_start(rx_data)) != -1: self.decode_current_frame(rx_data[frame_start + 3 : frame_start + 11]) rx_data = rx_data[frame_start + 11 :] continue elif (frame_start := self.check_panic_start(rx_data)) != -1: self.decode_panic_frame(rx_data[frame_start + 3 : frame_start + 11]) rx_data = rx_data[frame_start + 11 :] continue elif (frame_start := self.check_status_start(rx_data)) != -1: self.decode_status_frame(rx_data[frame_start + 3 : frame_start + 11]) rx_data = rx_data[frame_start + 11 :] continue break def check_log_start(self, buf: bytes): return buf[:-12].find(b"LOG") def check_current_start(self, buf: bytes): return buf[:-12].find(b"CUR") def check_panic_start(self, buf: bytes): return buf[:-12].find(b"PAN") def check_status_start(self, buf: bytes): return buf[:-12].find(b"STA") def decode_log_frame(self, buf: bytes): msg_id = buf[0] slave = msg_id >> 4 frame_id = msg_id & 0x0F if slave >= N_SLAVES: print(f"Unknown slave: {slave}", file=sys.stderr) return if frame_id == 0: for i in range(3): raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2] data.slaves[slave].cell_voltages[i] = raw * VOLTAGE_CONV elif frame_id == 1: for i in range(3): raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2] data.slaves[slave].cell_voltages[i + 3] = raw * VOLTAGE_CONV elif frame_id == 2: for i in range(3): raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2] data.slaves[slave].cell_voltages[i + 6] = raw * VOLTAGE_CONV elif frame_id == 3: for i in range(1): raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2] data.slaves[slave].cell_voltages[i + 9] = raw * VOLTAGE_CONV for i in range(2): raw = (buf[i * 2 + 3] << 8) | buf[i * 2 + 4] data.slaves[slave].cell_temps[i] = raw * TEMP_CONV elif frame_id == 4: for i in range(3): raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2] data.slaves[slave].cell_temps[i + 2] = raw * TEMP_CONV elif frame_id == 5: for i in range(3): raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2] data.slaves[slave].cell_temps[i + 5] = raw * TEMP_CONV elif frame_id == 6: for i in range(3): raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2] data.slaves[slave].cell_temps[i + 8] = raw * TEMP_CONV elif frame_id == 7: for i in range(3): raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2] data.slaves[slave].cell_temps[i + 11] = raw * TEMP_CONV elif frame_id == 8: for i in range(3): raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2] data.slaves[slave].cell_temps[i + 14] = raw * TEMP_CONV else: print(f"Unknown frame ID: {frame_id} (buf: {repr(buf)})", file=sys.stderr) # time.sleep(1) return voltages = [ slave.cell_voltages[i] for i in range(CELLS_PER_SLAVE) for slave in data.slaves ] self.parse_cell_temps(slave) temps = [ slave.cell_temps[i] for i in range(TEMP_SENSORS_PER_SLAVE) for slave in data.slaves ] data.min_voltage = min(voltages) data.max_voltage = max(voltages) data.min_soc = estimate_soc(data.min_voltage) data.max_soc = estimate_soc(data.max_voltage) data.min_temp = min(temps) data.max_temp = max(temps) data.time_since_last_frame = time.time() - data.last_frame data.last_frame = time.time() def decode_current_frame(self, buf: bytes): # current = (buf[2] << 24) | (buf[3] << 16) | (buf[4] << 8) | (buf[5]) current = struct.unpack(">i", buf[2:6])[0] data.current = current / 1000.0 def decode_panic_frame(self, buf: bytes): data.panic = True errorcode = int(buf[0]) errorargs = [int(buf[1]), int(buf[2])] if errorcode == ERRORCODE_TIMEOUT_SLAVE: data.panic_errorcode = "Slave timeout" data.panic_errorarg = f"Slave ID {errorargs[0]}" elif errorcode == ERRORCODE_SLAVE_PANIC: data.panic_errorcode = "Slave panic" if errorargs[1] == SLAVE_ERROR_UV: slave_error = "Undervoltage" elif errorargs[1] == SLAVE_ERROR_OV: slave_error = "Overvoltage" elif errorargs[1] == SLAVE_ERROR_UT: slave_error = "Undertemperature" elif errorargs[1] == SLAVE_ERROR_OT: slave_error = "Overtemperature" elif errorargs[1] == SLAVE_ERROR_BQ: slave_error = "BQ Communication error" elif errorargs[1] == SLAVE_ERROR_TMP144: slave_error = "TMP144 communication error" else: slave_error = f"Unknown error ({errorargs[1]})" data.panic_errorarg = f"Slave ID {errorargs[0]}: {slave_error}" elif errorcode == ERRORCODE_TIMEOUT_SLAVE_FRAMES: data.panic_errorcode = "Slave frame timeout" data.panic_errorarg = f"Slave ID {errorargs[0]}, frame {errorargs[1]}" elif errorcode == ERRORCODE_TOO_FEW_TEMPS: data.panic_errorcode = "Too few temperature sensors" data.panic_errorarg = f"Slave ID {errorargs[0]}" elif errorcode == ERRORCODE_TIMEOUT_SHUNT: data.panic_errorcode = "Shunt timeout" elif errorcode == ERRORCODE_MASTER_THRESH: data.panic_errorcode = "Master detected threshold" if errorargs[0] == MASTER_THRESH_UT: data.panic_errorarg = "Undertemperature" elif errorargs[0] == MASTER_THRESH_OT: data.panic_errorarg = "Overtemperature" elif errorargs[0] == MASTER_THRESH_UV: data.panic_errorarg = "Undervoltage" elif errorargs[0] == MASTER_THRESH_OV: data.panic_errorarg = "Overvoltage" else: data.panic_errorarg = f"Unknown threshold ({errorargs[0]})" else: data.panic_errorcode = buf[0] data.panic_errorarg = buf[1] def decode_status_frame(self, buf: bytes): state = buf[0] & 0x7F data.ts_state = TSState(state) data.panic = data.ts_state == TSState.TS_ERROR def parse_cell_temps(self, slave: int): temps = list( filter(lambda t: t > 0 and t < 60, data.slaves[slave].cell_temps[:16]) ) if len(temps) == 0: temps = [-1] min_t = min(temps) max_t = max(temps) # for i in range(16, 32): # data.slaves[slave].cell_temps[i] = random.randint(min_t, max_t) def stop(self): self.continue_run = False # set the run condition to false on stop if __name__ == "__main__": data = AccumulatorData() rx_buf = bytes() if len(sys.argv) != 2: print(f"Usage: {sys.argv[0]} SERIAL-PORT", file=sys.stderr) sys.exit(os.EX_USAGE) SERIAL_PORT = sys.argv[1] print(SERIAL_PORT) ser = serial.Serial(SERIAL_PORT, BITRATE, timeout=TIMEOUT) app = QApplication(sys.argv) gui = Window() gui.show() timer = QTimer() # timer.timeout.connect(data.fill_dummy_data) timer.timeout.connect(gui.update) timer.start(1000) # every 1,000 milliseconds sys.exit(app.exec_())