#!/usr/bin/env python3 import math import os import struct import time import random import serial import sys import numpy as np from PyQt5.QtCore import Qt, QTimer, QThread, QObject, pyqtSignal from PyQt5.QtWidgets import ( QApplication, QWidget, QPushButton, QVBoxLayout, QHBoxLayout, QGridLayout, QLabel, QGroupBox, QFrame, QSizePolicy, ) BITRATE = 115200 # baud/s TIMEOUT = 1 # seconds N_SLAVES = 9 LOG_FRAME_LENGTH = 8 # bytes PARALLEL_CELLS = 9 CELLS_PER_SLAVE = 10 TEMP_SENSORS_PER_SLAVE = 32 VOLTAGE_CONV = 5.0 / 255 # volts/quantum TEMP_CONV = 0.0625 * 16 # °C/quantum ERRORCODE_TIMEOUT_SLAVE = 1 ERRORCODE_SLAVE_PANIC = 2 ERRORCODE_TIMEOUT_SLAVE_FRAMES = 3 ERRORCODE_TOO_FEW_TEMPS = 4 ERRORCODE_TIMEOUT_SHUNT = 6 ERRORCODE_MASTER_THRESH = 7 SLAVE_ERROR_UV = 0 SLAVE_ERROR_OV = 1 SLAVE_ERROR_UT = 2 SLAVE_ERROR_OT = 3 SLAVE_ERROR_BQ = 4 SLAVE_ERROR_TMP144 = 5 MASTER_THRESH_UT = 0 MASTER_THRESH_OT = 1 MASTER_THRESH_UV = 2 MASTER_THRESH_OV = 3 class SlaveData: cell_voltages: list[float] cell_temps: list[float] def __init__(self) -> None: self.cell_voltages = [-1] * CELLS_PER_SLAVE self.cell_temps = [-1] * TEMP_SENSORS_PER_SLAVE class AccumulatorData: slaves: list[SlaveData] min_voltage: float max_voltage: float min_soc: float max_soc: float min_temp: float max_temp: float last_frame: float time_since_last_frame: float current: float panic: bool panic_errorcode: int panic_errorarg: int def __init__(self) -> None: self.slaves = [SlaveData() for _ in range(N_SLAVES)] self.min_voltage = ( self.max_voltage ) = ( self.min_soc ) = ( self.max_soc ) = self.min_temp = self.max_temp = self.last_frame = self.current = 0 self.panic = False self.panic_errorcode = self.panic_errorarg = 0 self.time_since_last_frame = 0 def fill_dummy_data(self): self.min_voltage = random.uniform(1, 3) self.max_voltage = random.uniform(3, 5) self.min_temp = random.uniform(0, 25) self.max_temp = random.uniform(25, 60) self.current = random.uniform(25, 60) self.last_frame = time.time() - random.random() self.panic = random.choice([True, False]) if self.panic: self.panic_errorcode = random.randint(1, 10000) self.panic_errorarg = "ABCDERFG" else: self.panic_errorcode = 0 self.panic_errorarg = "" data.time_since_last_frame = random.uniform(1, 60) for i in range(N_SLAVES): self.slaves[i].cell_voltages = [ random.uniform(1, 5) for i in range(CELLS_PER_SLAVE) ] self.slaves[i].cell_temps = [ random.uniform(25, 60) for i in range(TEMP_SENSORS_PER_SLAVE) ] class StackGuiElement: title: str stack_id: int min_voltage_label: QLabel max_voltage_label: QLabel min_temp_label: QLabel max_temp_label: QLabel groupBox: QGroupBox detail_popup: QWidget def __init__(self, title: str, stack_id: int): self.title = title self.stack_id = stack_id self.min_voltage_label = QLabel() self.max_voltage_label = QLabel() self.min_temp_label = QLabel() self.max_temp_label = QLabel() self.groupBox = QGroupBox() self.detail_popup = None self.__post__init__() def __post__init__(self): l1 = QLabel("Min Voltage [V]") l2 = QLabel("Max Voltage [V]") l3 = QLabel("Min Temperature [°C]") l4 = QLabel("Max Temperature [°C]") popup_btn = QPushButton(self.title + " details") popup_btn.clicked.connect(self.show_popup) l1.setAlignment(Qt.AlignLeft) l2.setAlignment(Qt.AlignLeft) l3.setAlignment(Qt.AlignLeft) l4.setAlignment(Qt.AlignLeft) self.min_voltage_label.setAlignment(Qt.AlignLeft) self.max_voltage_label.setAlignment(Qt.AlignLeft) self.min_temp_label.setAlignment(Qt.AlignLeft) self.max_temp_label.setAlignment(Qt.AlignLeft) grid = QGridLayout() grid.addWidget(l1, 0, 0) grid.addWidget(l2, 1, 0) grid.addWidget(l3, 2, 0) grid.addWidget(l4, 3, 0) grid.addWidget(self.min_voltage_label, 0, 1) grid.addWidget(self.max_voltage_label, 1, 1) grid.addWidget(self.min_temp_label, 2, 1) grid.addWidget(self.max_temp_label, 3, 1) grid.addWidget(popup_btn, 0, 2) self.groupBox.setTitle(self.title) self.groupBox.setLayout(grid) def update_data_from_slave(self, slave: SlaveData): self.min_voltage_label.setNum(min(slave.cell_voltages)) self.max_voltage_label.setNum(max(slave.cell_voltages)) self.min_temp_label.setNum(min(slave.cell_temps)) self.max_temp_label.setNum(max(slave.cell_temps)) def show_popup(self): if self.detail_popup is None: self.detail_popup = StackPopup(self.stack_id) self.detail_popup.show() class QVSeperationLine(QFrame): """ a vertical seperation line """ def __init__(self): super().__init__() self.setFixedWidth(20) self.setMinimumHeight(1) self.setFrameShape(QFrame.VLine) self.setFrameShadow(QFrame.Sunken) self.setSizePolicy(QSizePolicy.Minimum, QSizePolicy.Preferred) return class StackPopup(QWidget): stack_id: int voltage_labels: list[QLabel] temp_labels: list[QLabel] def __init__(self, stack_id: int): super().__init__() self.stack_id = stack_id self.voltage_labels = [] self.temp_labels = [] layout = QVBoxLayout() groupbox = QGroupBox() grid = QGridLayout() for i in range(len(data.slaves[stack_id].cell_voltages)): l1 = QLabel(f"Voltage Cell {i}") l1.setAlignment(Qt.AlignLeft) l_v = QLabel() l_v.setNum(data.slaves[stack_id].cell_voltages[i]) l_v.setAlignment(Qt.AlignLeft) self.voltage_labels.append(l_v) grid.addWidget(l1, i, 0) grid.addWidget(l_v, i, 1) # vertical separators between rows in popup separator_vertical = QVSeperationLine() grid.addWidget(separator_vertical, 0, 2, CELLS_PER_SLAVE, 1) num_temp_cols = len(data.slaves[stack_id].cell_temps) // CELLS_PER_SLAVE for i in range(num_temp_cols): separator_vertical = QVSeperationLine() grid.addWidget(separator_vertical, 0, 5 + i * 3, CELLS_PER_SLAVE, 1) for i in range(len(data.slaves[stack_id].cell_temps)): l2 = QLabel(f"Temp. Sensor {i}") l2.setAlignment(Qt.AlignLeft) l_t = QLabel() l_t.setNum(data.slaves[stack_id].cell_temps[i]) l_t.setAlignment(Qt.AlignLeft) self.temp_labels.append(l_t) grid.addWidget(l2, i % CELLS_PER_SLAVE, 3 + (i // CELLS_PER_SLAVE) * 3) grid.addWidget(l_t, i % CELLS_PER_SLAVE, 4 + (i // CELLS_PER_SLAVE) * 3) groupbox.setTitle(f"Stack {stack_id}") groupbox.setLayout(grid) layout.addWidget(groupbox) self.setLayout(layout) self.update_data() timer.timeout.connect(self.update_data) def update_data(self): for i in range(len(data.slaves[self.stack_id].cell_voltages)): self.voltage_labels[i].setNum(data.slaves[self.stack_id].cell_voltages[i]) for i in range(len(data.slaves[self.stack_id].cell_temps)): self.temp_labels[i].setNum(data.slaves[self.stack_id].cell_temps[i]) class Window(QWidget): stop_signal = ( pyqtSignal() ) # make a stop signal to communicate with the worker in another thread def __init__(self, parent=None): super(Window, self).__init__(parent) win = QVBoxLayout() ### ACCUMULATOR GENERAL ### self.l1 = QLabel("Min Voltage [V]") self.l1.setAlignment(Qt.AlignLeft) self.l_min_voltage = QLabel() self.l_min_voltage.setNum(data.min_voltage) self.l_min_voltage.setAlignment(Qt.AlignLeft) self.l2 = QLabel("Max Voltage [V]") self.l2.setAlignment(Qt.AlignLeft) self.l_max_voltage = QLabel() self.l_max_voltage.setNum(data.max_voltage) self.l_max_voltage.setAlignment(Qt.AlignLeft) self.l3 = QLabel("Min SoC [%]") self.l3.setAlignment(Qt.AlignLeft) self.l_min_soc = QLabel() self.l_min_soc.setNum(data.min_soc) self.l_min_soc.setAlignment(Qt.AlignLeft) self.l4 = QLabel("Max SoC [%]") self.l4.setAlignment(Qt.AlignLeft) self.l_max_soc = QLabel() self.l_max_soc.setNum(data.max_soc) self.l_max_soc.setAlignment(Qt.AlignLeft) self.l5 = QLabel("Min Temperature [°C]") self.l5.setAlignment(Qt.AlignLeft) self.l_min_temp = QLabel() self.l_min_temp.setNum(data.min_temp) self.l_min_temp.setAlignment(Qt.AlignLeft) self.l6 = QLabel("Max Temperature [°C]") self.l6.setAlignment(Qt.AlignLeft) self.l_max_temp = QLabel() self.l_max_temp.setNum(data.max_temp) self.l_max_temp.setAlignment(Qt.AlignLeft) self.l7 = QLabel("Current [A]") self.l7.setAlignment(Qt.AlignLeft) self.l_current = QLabel() self.l_current.setNum(data.current) self.l_current.setAlignment(Qt.AlignLeft) self.l8 = QLabel("Error") self.l_error = QLabel() self.l_error.setText(str(data.panic)) self.l_error.setAlignment(Qt.AlignLeft) self.l_errorcode = QLabel() self.l_errorcode.setText(str(data.panic_errorcode)) self.l_errorcode.setAlignment(Qt.AlignLeft) self.l_errorarg = QLabel() self.l_errorarg.setText(str(data.panic_errorarg)) self.l_errorcode.setAlignment(Qt.AlignLeft) self.l9 = QLabel("Time Since Last Dataframe") self.l_time_since_last_frame = QLabel() self.l_time_since_last_frame.setText(str(data.time_since_last_frame)) self.l_time_since_last_frame.setAlignment(Qt.AlignLeft) ### LAYOUT ACCUMULATOR GENERAL ### grid_accumulator = QGridLayout() grid_accumulator.addWidget(self.l1, 0, 0) grid_accumulator.addWidget(self.l2, 1, 0) grid_accumulator.addWidget(self.l3, 2, 0) grid_accumulator.addWidget(self.l4, 3, 0) grid_accumulator.addWidget(self.l5, 0, 2) grid_accumulator.addWidget(self.l6, 1, 2) grid_accumulator.addWidget(self.l7, 2, 2) grid_accumulator.addWidget(self.l8, 5, 0) grid_accumulator.addWidget(self.l9, 3, 2) grid_accumulator.addWidget(self.l_min_voltage, 0, 1) grid_accumulator.addWidget(self.l_max_voltage, 1, 1) grid_accumulator.addWidget(self.l_min_soc, 2, 1) grid_accumulator.addWidget(self.l_max_soc, 3, 1) grid_accumulator.addWidget(self.l_min_temp, 0, 3) grid_accumulator.addWidget(self.l_max_temp, 1, 3) grid_accumulator.addWidget(self.l_current, 2, 3) grid_accumulator.addWidget(self.l_error, 5, 1) grid_accumulator.addWidget(self.l_errorcode, 5, 2) grid_accumulator.addWidget(self.l_errorarg, 5, 3) grid_accumulator.addWidget(self.l_time_since_last_frame, 3, 3) groupBox_accumulator = QGroupBox("Accumulator General") groupBox_accumulator.setLayout(grid_accumulator) win.addWidget(groupBox_accumulator) ### STACKS ### self.stack_gui_elements = [] for i in range(N_SLAVES): sge = StackGuiElement(f"Stack {i}", i) sge.update_data_from_slave(data.slaves[i]) self.stack_gui_elements.append(sge) ### LAYOUT STACKS ### n_slaves_half = math.ceil(N_SLAVES / 2) grid_stacks = QGridLayout() for i, sge in enumerate(self.stack_gui_elements): grid_stacks.addWidget( sge.groupBox, 0 if i < n_slaves_half else 1, i % n_slaves_half ) groupBox_stacks = QGroupBox("Individual Stacks") groupBox_stacks.setLayout(grid_stacks) win.addWidget(groupBox_stacks) ### BUTTONS ### self.btn_start = QPushButton("Start Serial") self.btn_start.resize(self.btn_start.sizeHint()) self.btn_start.move(50, 50) self.btn_stop = QPushButton("Stop Serial") self.btn_stop.resize(self.btn_stop.sizeHint()) self.btn_stop.move(150, 50) self.btn_charge = QPushButton("Start Charging") self.btn_charge.resize(self.btn_stop.sizeHint()) self.btn_charge.move(150, 50) ### LAYOUT BUTTONS ### vbox_controls = QVBoxLayout() vbox_controls.addWidget(self.btn_start) vbox_controls.addWidget(self.btn_stop) vbox_controls.addWidget(self.btn_charge) groupBox_controls = QGroupBox("Controls") groupBox_controls.setLayout(vbox_controls) win.addWidget(groupBox_controls) # Start Button action self.btn_start.clicked.connect(self.start_thread) # Stop Button action self.btn_stop.clicked.connect(self.stop_thread) # Charge Button action self.btn_charge.clicked.connect(self.start_charge) self.setLayout(win) self.setWindowTitle("FT22 Charger Display") # When start_btn is clicked this runs. Creates the worker and the thread. def start_thread(self): self.thread = QThread() self.worker = Worker() self.stop_signal.connect( self.worker.stop ) # connect stop signal to worker stop method self.worker.moveToThread(self.thread) self.worker.finished.connect( self.thread.quit ) # connect the workers finished signal to stop thread self.worker.finished.connect( self.worker.deleteLater ) # connect the workers finished signal to clean up worker self.thread.finished.connect( self.thread.deleteLater ) # connect threads finished signal to clean up thread self.thread.started.connect(self.worker.do_work) self.thread.finished.connect(self.worker.stop) self.thread.start() # When stop_btn is clicked this runs. Terminates the worker and the thread. def stop_thread(self): self.stop_signal.emit() # emit the finished signal on stop def start_charge(self): ser.write(b"C") def update(self): # Accumulator self.l_min_voltage.setText(f"{data.min_voltage:.02f}") self.l_max_voltage.setText(f"{data.max_voltage:.02f}") self.l_min_soc.setText(f"{data.min_soc:.01f}") self.l_max_soc.setText(f"{data.max_soc:.01f}") self.l_min_temp.setText(f"{data.min_temp:.00f}") self.l_max_temp.setText(f"{data.max_temp:.00f}") self.l_current.setText(f"{data.current:.03f}") self.l_error.setText(str(data.panic)) self.l_errorcode.setText(str(data.panic_errorcode)) self.l_errorarg.setText(str(data.panic_errorarg)) self.l_time_since_last_frame.setText(str(time.time() - data.last_frame)) # Stacks for i, sge in enumerate(self.stack_gui_elements): sge.update_data_from_slave(data.slaves[i]) class Worker(QObject): finished = pyqtSignal() # give worker class a finished signal def __init__(self, parent=None): QObject.__init__(self, parent=parent) self.continue_run = True # provide a bool run condition for the class def do_work(self): i = 1 while self.continue_run: # give the loop a stoppable condition # data.fill_dummy_data() self.charger_communication() # QThread.msleep(1) self.finished.emit() # emit the finished signal when the loop is done def charger_communication(self): rx_data = ser.read(256) # print(len(rx_data), rx_data) while len(rx_data) > 0: if (frame_start := self.check_log_start(rx_data)) != -1: self.decode_log_frame(rx_data[frame_start + 3 : frame_start + 11]) rx_data = rx_data[frame_start + 11 :] continue elif (frame_start := self.check_current_start(rx_data)) != -1: self.decode_current_frame(rx_data[frame_start + 3 : frame_start + 11]) rx_data = rx_data[frame_start + 11 :] continue elif (frame_start := self.check_panic_start(rx_data)) != -1: self.decode_panic_frame(rx_data[frame_start + 3 : frame_start + 11]) rx_data = rx_data[frame_start + 11 :] continue break def check_log_start(self, buf: bytes): return buf[:-12].find(b"LOG") def check_current_start(self, buf: bytes): return buf[:-12].find(b"CUR") def check_panic_start(self, buf: bytes): return buf[:-12].find(b"PAN") def decode_log_frame(self, buf: bytes): msg_id = buf[0] slave = msg_id >> 4 frame_id = msg_id & 0x0F if slave >= N_SLAVES: print(f"Unknown slave: {slave}", file=sys.stderr) return if frame_id == 0: for i in range(7): data.slaves[slave].cell_voltages[i] = buf[i + 1] * VOLTAGE_CONV elif frame_id == 1: for i in range(3): data.slaves[slave].cell_voltages[i + 7] = buf[i + 1] * VOLTAGE_CONV for i in range(4): data.slaves[slave].cell_temps[i] = buf[i + 4] * TEMP_CONV elif frame_id == 2: for i in range(7): data.slaves[slave].cell_temps[i + 4] = buf[i + 1] * TEMP_CONV elif frame_id == 3: for i in range(7): data.slaves[slave].cell_temps[i + 11] = buf[i + 1] * TEMP_CONV elif frame_id == 4: for i in range(7): data.slaves[slave].cell_temps[i + 18] = buf[i + 1] * TEMP_CONV elif frame_id == 5: for i in range(7): data.slaves[slave].cell_temps[i + 25] = buf[i + 1] * TEMP_CONV else: print(f"Unknown frame ID: {frame_id} (buf: {repr(buf)})", file=sys.stderr) # time.sleep(1) return voltages = [ slave.cell_voltages[i] for i in range(CELLS_PER_SLAVE) for slave in data.slaves ] self.parse_cell_temps(slave) temps = [ slave.cell_temps[i] for i in range(TEMP_SENSORS_PER_SLAVE) for slave in data.slaves ] data.min_voltage = min(voltages) data.max_voltage = max(voltages) data.min_soc = self.calculate_soc(data.min_voltage) data.max_soc = self.calculate_soc(data.max_voltage) data.min_temp = min(temps) data.max_temp = max(temps) data.time_since_last_frame = time.time() - data.last_frame data.last_frame = time.time() def decode_current_frame(self, buf: bytes): # current = (buf[2] << 24) | (buf[3] << 16) | (buf[4] << 8) | (buf[5]) current = struct.unpack(">i", buf[2:6])[0] data.current = current / 1000.0 def decode_panic_frame(self, buf: bytes): data.panic = True errorcode = int(buf[0]) errorargs = [int(buf[1]), int(buf[2])] if errorcode == ERRORCODE_TIMEOUT_SLAVE: data.panic_errorcode = "Slave timeout" data.panic_errorarg = f"Slave ID {errorargs[0]}" elif errorcode == ERRORCODE_SLAVE_PANIC: data.panic_errorcode = "Slave panic" if errorargs[1] == SLAVE_ERROR_UV: slave_error = "Undervoltage" elif errorargs[1] == SLAVE_ERROR_OV: slave_error = "Overvoltage" elif errorargs[1] == SLAVE_ERROR_UT: slave_error = "Undertemperature" elif errorargs[1] == SLAVE_ERROR_OT: slave_error = "Overtemperature" elif errorargs[1] == SLAVE_ERROR_BQ: slave_error = "BQ Communication error" elif errorargs[1] == SLAVE_ERROR_TMP144: slave_error = "TMP144 communication error" else: slave_error = f"Unknown error ({errorargs[1]})" data.panic_errorarg = f"Slave ID {errorargs[0]}: {slave_error}" elif errorcode == ERRORCODE_TIMEOUT_SLAVE_FRAMES: data.panic_errorcode = "Slave frame timeout" data.panic_errorarg = f"Slave ID {errorargs[0]}, frame {errorargs[1]}" elif errorcode == ERRORCODE_TOO_FEW_TEMPS: data.panic_errorcode = "Too few temperature sensors" data.panic_errorarg = f"Slave ID {errorargs[0]}" elif errorcode == ERRORCODE_TIMEOUT_SHUNT: data.panic_errorcode = "Shunt timeout" elif errorcode == ERRORCODE_MASTER_THRESH: data.panic_errorcode = "Master detected threshold" if errorargs[0] == MASTER_THRESH_UT: data.panic_errorarg = "Undertemperature" elif errorargs[0] == MASTER_THRESH_OT: data.panic_errorarg = "Overtemperature" elif errorargs[0] == MASTER_THRESH_UV: data.panic_errorarg = "Undervoltage" elif errorargs[0] == MASTER_THRESH_OV: data.panic_errorarg = "Overvoltage" else: data.panic_errorarg = f"Unknown threshold ({errorargs[0]})" else: data.panic_errorcode = buf[0] data.panic_errorarg = buf[1] INTERNAL_RESISTANCE_CURVE_X = [2.0, 4.12] INTERNAL_RESISTANCE_CURVE_Y = [0.0528, 0.0294] SOC_OCV_X = [2.1, 2.9, 3.2, 3.3, 3.4, 3.5, 3.68, 4.0, 4.15, 4.2] SOC_OCV_Y = [0, 0.023, 0.06, 0.08, 0.119, 0.227, 0.541, 0.856, 0.985, 1.0] def calculate_soc(self, voltage: float): r_i = np.interp( [voltage], self.INTERNAL_RESISTANCE_CURVE_X, self.INTERNAL_RESISTANCE_CURVE_Y, )[0] # i = data.current / PARALLEL_CELLS i = 3 / PARALLEL_CELLS ocv = voltage - i * r_i return np.interp([ocv], self.SOC_OCV_X, self.SOC_OCV_Y)[0] * 100 def parse_cell_temps(self, slave: int): temps = list( filter(lambda t: t > 0 and t < 60, data.slaves[slave].cell_temps[:16]) ) if len(temps) == 0: temps = [-1] min_t = min(temps) max_t = max(temps) for i in range(16, 32): data.slaves[slave].cell_temps[i] = random.randint(min_t, max_t) def stop(self): self.continue_run = False # set the run condition to false on stop if __name__ == "__main__": data = AccumulatorData() rx_buf = bytes() if len(sys.argv) != 2: print(f"Usage: {sys.argv[0]} SERIAL-PORT", file=sys.stderr) sys.exit(os.EX_USAGE) SERIAL_PORT = sys.argv[1] print(SERIAL_PORT) ser = serial.Serial(SERIAL_PORT, BITRATE, timeout=TIMEOUT) app = QApplication(sys.argv) gui = Window() gui.show() timer = QTimer() # timer.timeout.connect(data.fill_dummy_data) timer.timeout.connect(gui.update) timer.start(1000) # every 1,000 milliseconds sys.exit(app.exec_())