diff --git a/charger-display.py b/charger-display.py index c2e6f3a..806d6af 100755 --- a/charger-display.py +++ b/charger-display.py @@ -1,126 +1,74 @@ #!/usr/bin/env python3 +from enum import Enum +import math import os import struct import time +import random import serial import sys +import numpy as np + +from PyQt5.QtCore import Qt, QTimer, QThread, QObject, pyqtSignal +from PyQt5.QtWidgets import ( + QApplication, + QWidget, + QPushButton, + QVBoxLayout, + QHBoxLayout, + QGridLayout, + QLabel, + QGroupBox, + QFrame, + QSizePolicy, +) + + BITRATE = 115200 # baud/s TIMEOUT = 1 # seconds -N_SLAVES = 6 +N_SLAVES = 9 LOG_FRAME_LENGTH = 8 # bytes +PARALLEL_CELLS = 9 CELLS_PER_SLAVE = 10 TEMP_SENSORS_PER_SLAVE = 32 -VOLTAGE_CONV = 5.0 / 255 # volts/quantum -TEMP_CONV = 0.0625 * 16 # °C/quantum +VOLTAGE_CONV = 5.0 / 0xFFFF # volts/quantum +TEMP_CONV = 0.0625 # °C/quantum -import os +ERRORCODE_TIMEOUT_SLAVE = 1 +ERRORCODE_SLAVE_PANIC = 2 +ERRORCODE_TIMEOUT_SLAVE_FRAMES = 3 +ERRORCODE_TOO_FEW_TEMPS = 4 +ERRORCODE_TIMEOUT_SHUNT = 6 +ERRORCODE_MASTER_THRESH = 7 +SLAVE_ERROR_UV = 0 +SLAVE_ERROR_OV = 1 +SLAVE_ERROR_UT = 2 +SLAVE_ERROR_OT = 3 +SLAVE_ERROR_BQ = 4 +SLAVE_ERROR_TMP144 = 5 +MASTER_THRESH_UT = 0 +MASTER_THRESH_OT = 1 +MASTER_THRESH_UV = 2 +MASTER_THRESH_OV = 3 -# Windows -if os.name == "nt": - import msvcrt - -# Posix (Linux, OS X) -else: - import sys - import termios - import atexit - from select import select +INTERNAL_RESISTANCE_CURVE_X = [2.0, 4.12] +INTERNAL_RESISTANCE_CURVE_Y = [0.0528, 0.0294] +SOC_OCV_X = [2.1, 2.9, 3.2, 3.3, 3.4, 3.5, 3.68, 4.0, 4.15, 4.2] +SOC_OCV_Y = [0, 0.023, 0.06, 0.08, 0.119, 0.227, 0.541, 0.856, 0.985, 1.0] -class KBHit: - def __init__(self): - """Creates a KBHit object that you can call to do various keyboard things.""" - - if os.name == "nt": - pass - - else: - - # Save the terminal settings - self.fd = sys.stdin.fileno() - self.new_term = termios.tcgetattr(self.fd) - self.old_term = termios.tcgetattr(self.fd) - - # New terminal setting unbuffered - self.new_term[3] = self.new_term[3] & ~termios.ICANON & ~termios.ECHO - termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.new_term) - - # Support normal-terminal reset at exit - atexit.register(self.set_normal_term) - - def set_normal_term(self): - """Resets to normal terminal. On Windows this is a no-op.""" - - if os.name == "nt": - pass - - else: - termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old_term) - - def getch(self): - """Returns a keyboard character after kbhit() has been called. - Should not be called in the same program as getarrow(). - """ - - s = "" - - if os.name == "nt": - return msvcrt.getch().decode("utf-8") - - else: - return sys.stdin.read(1) - - def getarrow(self): - """Returns an arrow-key code after kbhit() has been called. Codes are - 0 : up - 1 : right - 2 : down - 3 : left - Should not be called in the same program as getch(). - """ - - if os.name == "nt": - msvcrt.getch() # skip 0xE0 - c = msvcrt.getch() - vals = [72, 77, 80, 75] - - else: - c = sys.stdin.read(3)[2] - vals = [65, 67, 66, 68] - - return vals.index(ord(c.decode("utf-8"))) - - def kbhit(self): - """Returns True if keyboard character was hit, False otherwise.""" - if os.name == "nt": - return msvcrt.kbhit() - - else: - dr, dw, de = select([sys.stdin], [], [], 0) - return dr != [] - - -if len(sys.argv) != 2: - print(f"Usage: {sys.argv[0]} SERIAL-PORT", file=sys.stderr) - sys.exit(os.EX_USAGE) - -SERIAL_PORT = sys.argv[1] -ser = serial.Serial(SERIAL_PORT, BITRATE, timeout=TIMEOUT) - - -def check_log_start(buf: bytes): - return buf[:-12].find(b"LOG") - - -def check_current_start(buf: bytes): - return buf[:-12].find(b"CUR") - - -def check_panic_start(buf: bytes): - return buf[:-12].find(b"PAN") +def estimate_soc(voltage: float) -> float: + r_i = np.interp( + [voltage], + INTERNAL_RESISTANCE_CURVE_X, + INTERNAL_RESISTANCE_CURVE_Y, + )[0] + i = data.current / PARALLEL_CELLS + ocv = voltage - i * r_i + return np.interp([ocv], SOC_OCV_X, SOC_OCV_Y)[0] * 100 class SlaveData: @@ -129,138 +77,653 @@ class SlaveData: def __init__(self) -> None: self.cell_voltages = [-1] * CELLS_PER_SLAVE - self.cell_temps = [-1] * TEMP_SENSORS_PER_SLAVE + self.cell_temps = ([-1] * (TEMP_SENSORS_PER_SLAVE // 2)) + ( + [0] * (TEMP_SENSORS_PER_SLAVE // 2) + ) + + +class TSState(Enum): + TS_INACTIVE = 0 + TS_ACTIVE = 1 + TS_PRECHARGE = 2 + TS_DISCHARGE = 3 + TS_ERROR = 4 + TS_CHARGING_CHECK = 5 + TS_CHARGING = 6 + TS_BALANCING = 7 class AccumulatorData: slaves: list[SlaveData] min_voltage: float max_voltage: float + min_soc: float + max_soc: float min_temp: float max_temp: float last_frame: float + time_since_last_frame: float current: float panic: bool panic_errorcode: int panic_errorarg: int + ts_state: TSState def __init__(self) -> None: self.slaves = [SlaveData() for _ in range(N_SLAVES)] self.min_voltage = ( self.max_voltage + ) = ( + self.min_soc + ) = ( + self.max_soc ) = self.min_temp = self.max_temp = self.last_frame = self.current = 0 self.panic = False self.panic_errorcode = self.panic_errorarg = 0 + self.time_since_last_frame = 0 + self.ts_state = TSState.TS_INACTIVE + + def fill_dummy_data(self): + self.min_voltage = random.uniform(1, 3) + self.max_voltage = random.uniform(3, 5) + self.min_temp = random.uniform(0, 25) + self.max_temp = random.uniform(25, 60) + self.current = random.uniform(25, 60) + self.last_frame = time.time() - random.random() + self.panic = random.choice([True, False]) + if self.panic: + self.panic_errorcode = random.randint(1, 10000) + self.panic_errorarg = "ABCDERFG" + else: + self.panic_errorcode = 0 + self.panic_errorarg = "" + data.time_since_last_frame = random.uniform(1, 60) + + for i in range(N_SLAVES): + self.slaves[i].cell_voltages = [ + random.uniform(1, 5) for i in range(CELLS_PER_SLAVE) + ] + self.slaves[i].cell_temps = [ + random.uniform(25, 60) for i in range(TEMP_SENSORS_PER_SLAVE) + ] -rx_buf = bytes() -data = AccumulatorData() +class StackGuiElement: + title: str + stack_id: int + voltage_label: QLabel + soc_label: QLabel + temp_label: QLabel + groupBox: QGroupBox + detail_popup: QWidget + + def __init__(self, title: str, stack_id: int): + self.title = title + self.stack_id = stack_id + self.voltage_label = QLabel() + self.soc_label = QLabel() + self.temp_label = QLabel() + self.groupBox = QGroupBox() + self.detail_popup = None + self.__post__init__() + + def __post__init__(self): + l1 = QLabel("Voltage [V]") + l2 = QLabel("SoC [%]") + l3 = QLabel("Temperature [°C]") + popup_btn = QPushButton("Details") + popup_btn.clicked.connect(self.show_popup) + + l1.setAlignment(Qt.AlignLeft) + l2.setAlignment(Qt.AlignLeft) + l3.setAlignment(Qt.AlignLeft) + + self.voltage_label.setAlignment(Qt.AlignLeft) + self.soc_label.setAlignment(Qt.AlignLeft) + self.temp_label.setAlignment(Qt.AlignLeft) + + grid = QGridLayout() + grid.addWidget(l1, 0, 0) + grid.addWidget(l2, 1, 0) + grid.addWidget(l3, 2, 0) + + grid.addWidget(self.voltage_label, 0, 1) + grid.addWidget(self.soc_label, 1, 1) + grid.addWidget(self.temp_label, 2, 1) + + grid.addWidget(popup_btn, 3, 0) + + self.groupBox.setTitle(self.title) + self.groupBox.setLayout(grid) + + def update_data_from_slave(self, slave: SlaveData): + min_v, max_v = min(slave.cell_voltages), max(slave.cell_voltages) + self.voltage_label.setText(f"[{min_v:.02f}, {max_v:.02f}]") + min_soc, max_soc = estimate_soc(min_v), estimate_soc(max_v) + self.soc_label.setText(f"[{min_soc:02.0f}, {max_soc:02.0f}]") + min_t, max_t = min(slave.cell_temps), max(slave.cell_temps) + self.temp_label.setText(f"[{min_t:.02f}, {max_t:.02f}]") + + def show_popup(self): + if self.detail_popup is None: + self.detail_popup = StackPopup(self.stack_id) + self.detail_popup.show() -def decode_log_frame(buf: bytes): - msg_id = buf[0] - slave = msg_id >> 4 - frame_id = msg_id & 0x0F - if slave >= N_SLAVES: +class QVSeperationLine(QFrame): + """ + a vertical seperation line + """ + + def __init__(self): + super().__init__() + self.setFixedWidth(20) + self.setMinimumHeight(1) + self.setFrameShape(QFrame.VLine) + self.setFrameShadow(QFrame.Sunken) + self.setSizePolicy(QSizePolicy.Minimum, QSizePolicy.Preferred) return - if frame_id == 0: - for i in range(7): - data.slaves[slave].cell_voltages[i] = buf[i + 1] * VOLTAGE_CONV - elif frame_id == 1: - for i in range(3): - data.slaves[slave].cell_voltages[i + 7] = buf[i + 1] * VOLTAGE_CONV - for i in range(4): - data.slaves[slave].cell_temps[i] = buf[i + 4] * TEMP_CONV - elif frame_id == 2: - for i in range(7): - data.slaves[slave].cell_temps[i + 4] = buf[i + 1] * TEMP_CONV - elif frame_id == 3: - for i in range(7): - data.slaves[slave].cell_temps[i + 11] = buf[i + 1] * TEMP_CONV - elif frame_id == 4: - for i in range(7): - data.slaves[slave].cell_temps[i + 18] = buf[i + 1] * TEMP_CONV - elif frame_id == 5: - for i in range(7): - data.slaves[slave].cell_temps[i + 25] = buf[i + 1] * TEMP_CONV - else: - # print(f"Unknown frame ID: {frame_id} (buf: {repr(buf)})", file=sys.stderr) - # time.sleep(1) - return - data.last_frame = time.time() +class StackPopup(QWidget): + stack_id: int + voltage_labels: list[QLabel] + temp_labels: list[QLabel] + + def __init__(self, stack_id: int): + super().__init__() + self.stack_id = stack_id + self.voltage_labels = [] + self.temp_labels = [] + layout = QVBoxLayout() + groupbox = QGroupBox() + + grid = QGridLayout() + + for i in range(len(data.slaves[stack_id].cell_voltages)): + l1 = QLabel(f"Voltage Cell {i}") + l1.setAlignment(Qt.AlignLeft) + + l_v = QLabel() + l_v.setNum(data.slaves[stack_id].cell_voltages[i]) + l_v.setAlignment(Qt.AlignLeft) + self.voltage_labels.append(l_v) + + grid.addWidget(l1, i, 0) + grid.addWidget(l_v, i, 1) + + # vertical separators between rows in popup + separator_vertical = QVSeperationLine() + grid.addWidget(separator_vertical, 0, 2, CELLS_PER_SLAVE, 1) + num_temp_cols = len(data.slaves[stack_id].cell_temps) // CELLS_PER_SLAVE + for i in range(num_temp_cols): + separator_vertical = QVSeperationLine() + grid.addWidget(separator_vertical, 0, 5 + i * 3, CELLS_PER_SLAVE, 1) + + for i in range(len(data.slaves[stack_id].cell_temps)): + l2 = QLabel(f"Temp. Sensor {i}") + l2.setAlignment(Qt.AlignLeft) + + l_t = QLabel() + l_t.setNum(data.slaves[stack_id].cell_temps[i]) + l_t.setAlignment(Qt.AlignLeft) + self.temp_labels.append(l_t) + + grid.addWidget(l2, i % CELLS_PER_SLAVE, 3 + (i // CELLS_PER_SLAVE) * 3) + grid.addWidget(l_t, i % CELLS_PER_SLAVE, 4 + (i // CELLS_PER_SLAVE) * 3) + + groupbox.setTitle(f"Stack {stack_id}") + groupbox.setLayout(grid) + layout.addWidget(groupbox) + self.setLayout(layout) + self.setWindowTitle(f"FT22 Charger Display - Stack {stack_id} Details") + self.update_data() + timer.timeout.connect(self.update_data) + + def update_data(self): + for i in range(len(data.slaves[self.stack_id].cell_voltages)): + self.voltage_labels[i].setNum(data.slaves[self.stack_id].cell_voltages[i]) + for i in range(len(data.slaves[self.stack_id].cell_temps)): + self.temp_labels[i].setNum(data.slaves[self.stack_id].cell_temps[i]) -def decode_current_frame(buf: bytes): - # current = (buf[2] << 24) | (buf[3] << 16) | (buf[4] << 8) | (buf[5]) - current = struct.unpack(">i", buf[2:6])[0] - data.current = current / 1000.0 +class Window(QWidget): + + stop_signal = ( + pyqtSignal() + ) # make a stop signal to communicate with the worker in another thread + + def __init__(self, parent=None): + super(Window, self).__init__(parent) + + win = QVBoxLayout() + + ### ACCUMULATOR GENERAL ### + self.l1 = QLabel("Min Voltage [V]") + self.l1.setAlignment(Qt.AlignLeft) + self.l_min_voltage = QLabel() + self.l_min_voltage.setNum(data.min_voltage) + self.l_min_voltage.setAlignment(Qt.AlignLeft) + + self.l2 = QLabel("Max Voltage [V]") + self.l2.setAlignment(Qt.AlignLeft) + self.l_max_voltage = QLabel() + self.l_max_voltage.setNum(data.max_voltage) + self.l_max_voltage.setAlignment(Qt.AlignLeft) + + self.l3 = QLabel("Min SoC [%]") + self.l3.setAlignment(Qt.AlignLeft) + self.l_min_soc = QLabel() + self.l_min_soc.setNum(data.min_soc) + self.l_min_soc.setAlignment(Qt.AlignLeft) + + self.l4 = QLabel("Max SoC [%]") + self.l4.setAlignment(Qt.AlignLeft) + self.l_max_soc = QLabel() + self.l_max_soc.setNum(data.max_soc) + self.l_max_soc.setAlignment(Qt.AlignLeft) + + self.l5 = QLabel("Min Temperature [°C]") + self.l5.setAlignment(Qt.AlignLeft) + self.l_min_temp = QLabel() + self.l_min_temp.setNum(data.min_temp) + self.l_min_temp.setAlignment(Qt.AlignLeft) + + self.l6 = QLabel("Max Temperature [°C]") + self.l6.setAlignment(Qt.AlignLeft) + self.l_max_temp = QLabel() + self.l_max_temp.setNum(data.max_temp) + self.l_max_temp.setAlignment(Qt.AlignLeft) + + self.l7 = QLabel("Current [A]") + self.l7.setAlignment(Qt.AlignLeft) + self.l_current = QLabel() + self.l_current.setNum(data.current) + self.l_current.setAlignment(Qt.AlignLeft) + + self.l8 = QLabel("State") + self.l_state = QLabel() + self.l_state.setText(data.ts_state.name) + self.l_state.setAlignment(Qt.AlignLeft) + self.l_errorcode = QLabel() + self.l_errorcode.setText("") + self.l_errorcode.setAlignment(Qt.AlignLeft) + self.l_errorarg = QLabel() + self.l_errorarg.setText("") + self.l_errorcode.setAlignment(Qt.AlignLeft) + + self.l9 = QLabel("Time Since Last Dataframe") + self.l_time_since_last_frame = QLabel() + self.l_time_since_last_frame.setText(str(data.time_since_last_frame)) + self.l_time_since_last_frame.setAlignment(Qt.AlignLeft) + + ### LAYOUT ACCUMULATOR GENERAL ### + grid_accumulator = QGridLayout() + grid_accumulator.addWidget(self.l1, 0, 0) + grid_accumulator.addWidget(self.l2, 1, 0) + grid_accumulator.addWidget(self.l3, 2, 0) + grid_accumulator.addWidget(self.l4, 3, 0) + grid_accumulator.addWidget(self.l5, 0, 2) + grid_accumulator.addWidget(self.l6, 1, 2) + grid_accumulator.addWidget(self.l7, 2, 2) + grid_accumulator.addWidget(self.l8, 5, 0) + grid_accumulator.addWidget(self.l9, 3, 2) + + grid_accumulator.addWidget(self.l_min_voltage, 0, 1) + grid_accumulator.addWidget(self.l_max_voltage, 1, 1) + grid_accumulator.addWidget(self.l_min_soc, 2, 1) + grid_accumulator.addWidget(self.l_max_soc, 3, 1) + grid_accumulator.addWidget(self.l_min_temp, 0, 3) + grid_accumulator.addWidget(self.l_max_temp, 1, 3) + grid_accumulator.addWidget(self.l_current, 2, 3) + grid_accumulator.addWidget(self.l_state, 5, 1) + grid_accumulator.addWidget(self.l_errorcode, 5, 2) + grid_accumulator.addWidget(self.l_errorarg, 5, 3) + grid_accumulator.addWidget(self.l_time_since_last_frame, 3, 3) + + groupBox_accumulator = QGroupBox("Accumulator General") + groupBox_accumulator.setLayout(grid_accumulator) + + win.addWidget(groupBox_accumulator) + + ### STACKS ### + self.stack_gui_elements = [] + for i in range(N_SLAVES): + sge = StackGuiElement(f"Stack {i}", i) + sge.update_data_from_slave(data.slaves[i]) + self.stack_gui_elements.append(sge) + + ### LAYOUT STACKS ### + n_slaves_half = math.ceil(N_SLAVES / 2) + grid_stacks = QGridLayout() + for i, sge in enumerate(self.stack_gui_elements): + grid_stacks.addWidget( + sge.groupBox, 0 if i < n_slaves_half else 1, i % n_slaves_half + ) + + groupBox_stacks = QGroupBox("Individual Stacks") + groupBox_stacks.setLayout(grid_stacks) + + win.addWidget(groupBox_stacks) + + ### BUTTONS ### + self.btn_start = QPushButton("Start Serial") + self.btn_start.resize(self.btn_start.sizeHint()) + self.btn_start.move(50, 50) + self.btn_stop = QPushButton("Stop Serial") + self.btn_stop.resize(self.btn_stop.sizeHint()) + self.btn_stop.move(150, 50) + self.btn_charge = QPushButton("Start Charging") + self.btn_charge.resize(self.btn_charge.sizeHint()) + self.btn_charge.move(250, 50) + self.btn_balance = QPushButton("Start Balancing") + self.btn_balance.resize(self.btn_balance.sizeHint()) + self.btn_balance.move(350, 50) + + ### LAYOUT BUTTONS ### + vbox_controls = QVBoxLayout() + vbox_controls.addWidget(self.btn_start) + vbox_controls.addWidget(self.btn_stop) + vbox_controls.addWidget(self.btn_charge) + vbox_controls.addWidget(self.btn_balance) + groupBox_controls = QGroupBox("Controls") + groupBox_controls.setLayout(vbox_controls) + win.addWidget(groupBox_controls) + + # Start Button action + self.btn_start.clicked.connect(self.start_thread) + + # Stop Button action + self.btn_stop.clicked.connect(self.stop_thread) + + # Charge Button action + self.btn_charge.clicked.connect(self.start_charge) + + # Balance Button action + self.btn_balance.clicked.connect(self.start_balance) + + self.setLayout(win) + self.setWindowTitle("FT22 Charger Display") + + # When start_btn is clicked this runs. Creates the worker and the thread. + def start_thread(self): + self.thread = QThread() + self.worker = Worker() + self.stop_signal.connect( + self.worker.stop + ) # connect stop signal to worker stop method + self.worker.moveToThread(self.thread) + + self.worker.finished.connect( + self.thread.quit + ) # connect the workers finished signal to stop thread + self.worker.finished.connect( + self.worker.deleteLater + ) # connect the workers finished signal to clean up worker + self.thread.finished.connect( + self.thread.deleteLater + ) # connect threads finished signal to clean up thread + + self.thread.started.connect(self.worker.do_work) + self.thread.finished.connect(self.worker.stop) + self.thread.start() + + # When stop_btn is clicked this runs. Terminates the worker and the thread. + def stop_thread(self): + self.stop_signal.emit() # emit the finished signal on stop + + def start_charge(self): + ser.write(b"C") + + def start_balance(self): + ser.write(b"B") + + def update(self): + # Accumulator + self.l_min_voltage.setText(f"{data.min_voltage:.02f}") + self.l_max_voltage.setText(f"{data.max_voltage:.02f}") + self.l_min_soc.setText(f"{data.min_soc:02.0f}") + self.l_max_soc.setText(f"{data.max_soc:02.0f}") + self.l_min_temp.setText(f"{data.min_temp:.02f}") + self.l_max_temp.setText(f"{data.max_temp:.02f}") + self.l_current.setText(f"{data.current:.02f}") + self.l_state.setText(data.ts_state.name) + if data.panic: + self.l_errorcode.setText(str(data.panic_errorcode)) + self.l_errorarg.setText(str(data.panic_errorarg)) + for l in (self.l_state, self.l_errorcode, self.l_errorarg): + l.setStyleSheet("color: red; font-weight: bold;") + elif data.ts_state == TSState.TS_CHARGING: + for l in (self.l_state, self.l_errorcode, self.l_errorarg): + l.setStyleSheet("color: green; font-weight: bold;") + else: + self.l_errorcode.setText("") + self.l_errorarg.setText("") + for l in (self.l_state, self.l_errorcode, self.l_errorarg): + l.setStyleSheet("color: black; font-weight: normal;") + last_time = time.time() - data.last_frame + self.l_time_since_last_frame.setText(f"{last_time:.03f}") + + # Stacks + for i, sge in enumerate(self.stack_gui_elements): + sge.update_data_from_slave(data.slaves[i]) -def decode_panic_frame(buf: bytes): - data.panic = True - data.panic_errorcode = buf[0] - data.panic_errorarg = buf[1] +class Worker(QObject): + finished = pyqtSignal() # give worker class a finished signal -def update_display(): - voltages = [ - slave.cell_voltages[i] for i in range(CELLS_PER_SLAVE) for slave in data.slaves - ] - temps = [ - slave.cell_temps[i] - for i in range(TEMP_SENSORS_PER_SLAVE) - for slave in data.slaves - ] - data.min_voltage = min(voltages) - data.max_voltage = max(voltages) - data.min_temp = min(temps) - data.max_temp = max(temps) - time_since_last_frame = time.time() - data.last_frame + def __init__(self, parent=None): + QObject.__init__(self, parent=parent) + self.continue_run = True # provide a bool run condition for the class - print("\033[2J\033[H", end="") - print("-" * 20) - if data.panic: - print("!!!!! PANIC !!!!!") - print(f"Error code: {data.panic_errorcode}") - print(f"Error arg: {data.panic_errorarg}") - time.sleep(0.5) - return + def do_work(self): + i = 1 + while self.continue_run: # give the loop a stoppable condition + # data.fill_dummy_data() + self.charger_communication() + # QThread.msleep(1) - print(f"Time since last frame: {time_since_last_frame} s") - print(f"Current: {data.current:.2f} A") - print(f"Min voltage: {data.min_voltage:.2f} V") - print(f"Max voltage: {data.max_voltage:.2f} V") - print(f"Min temp: {data.min_temp:.1f} °C") - print(f"Max temp: {data.max_temp:.1f} °C") - for i in range(N_SLAVES): - min_v = min(data.slaves[i].cell_voltages) - max_v = max(data.slaves[i].cell_voltages) - min_t = min(data.slaves[i].cell_temps) - max_t = max(data.slaves[i].cell_temps) - print( - f"Stack {i}: V ∈ [{min_v:.2f}, {max_v:.2f}]\tT ∈ [{min_t:.1f}, {max_t:.1f}]" + self.finished.emit() # emit the finished signal when the loop is done + + def charger_communication(self): + rx_data = ser.read(256) + # print(len(rx_data), rx_data) + while len(rx_data) > 0: + if (frame_start := self.check_log_start(rx_data)) != -1: + self.decode_log_frame(rx_data[frame_start + 3 : frame_start + 11]) + rx_data = rx_data[frame_start + 11 :] + continue + elif (frame_start := self.check_current_start(rx_data)) != -1: + self.decode_current_frame(rx_data[frame_start + 3 : frame_start + 11]) + rx_data = rx_data[frame_start + 11 :] + continue + elif (frame_start := self.check_panic_start(rx_data)) != -1: + self.decode_panic_frame(rx_data[frame_start + 3 : frame_start + 11]) + rx_data = rx_data[frame_start + 11 :] + continue + elif (frame_start := self.check_status_start(rx_data)) != -1: + self.decode_status_frame(rx_data[frame_start + 3 : frame_start + 11]) + rx_data = rx_data[frame_start + 11 :] + continue + break + + def check_log_start(self, buf: bytes): + return buf[:-12].find(b"LOG") + + def check_current_start(self, buf: bytes): + return buf[:-12].find(b"CUR") + + def check_panic_start(self, buf: bytes): + return buf[:-12].find(b"PAN") + + def check_status_start(self, buf: bytes): + return buf[:-12].find(b"STA") + + def decode_log_frame(self, buf: bytes): + msg_id = buf[0] + slave = msg_id >> 4 + frame_id = msg_id & 0x0F + if slave >= N_SLAVES: + print(f"Unknown slave: {slave}", file=sys.stderr) + return + + if frame_id == 0: + for i in range(3): + raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2] + data.slaves[slave].cell_voltages[i] = raw * VOLTAGE_CONV + elif frame_id == 1: + for i in range(3): + raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2] + data.slaves[slave].cell_voltages[i + 3] = raw * VOLTAGE_CONV + elif frame_id == 2: + for i in range(3): + raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2] + data.slaves[slave].cell_voltages[i + 6] = raw * VOLTAGE_CONV + elif frame_id == 3: + for i in range(1): + raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2] + data.slaves[slave].cell_voltages[i + 9] = raw * VOLTAGE_CONV + for i in range(2): + raw = (buf[i * 2 + 3] << 8) | buf[i * 2 + 4] + data.slaves[slave].cell_temps[i] = raw * TEMP_CONV + elif frame_id == 4: + for i in range(3): + raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2] + data.slaves[slave].cell_temps[i + 2] = raw * TEMP_CONV + elif frame_id == 5: + for i in range(3): + raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2] + data.slaves[slave].cell_temps[i + 5] = raw * TEMP_CONV + elif frame_id == 6: + for i in range(3): + raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2] + data.slaves[slave].cell_temps[i + 8] = raw * TEMP_CONV + elif frame_id == 7: + for i in range(3): + raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2] + data.slaves[slave].cell_temps[i + 11] = raw * TEMP_CONV + elif frame_id == 8: + for i in range(3): + raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2] + data.slaves[slave].cell_temps[i + 14] = raw * TEMP_CONV + else: + print(f"Unknown frame ID: {frame_id} (buf: {repr(buf)})", file=sys.stderr) + # time.sleep(1) + return + voltages = [ + slave.cell_voltages[i] + for i in range(CELLS_PER_SLAVE) + for slave in data.slaves + ] + self.parse_cell_temps(slave) + temps = [ + slave.cell_temps[i] + for i in range(TEMP_SENSORS_PER_SLAVE) + for slave in data.slaves + ] + data.min_voltage = min(voltages) + data.max_voltage = max(voltages) + data.min_soc = estimate_soc(data.min_voltage) + data.max_soc = estimate_soc(data.max_voltage) + data.min_temp = min(temps) + data.max_temp = max(temps) + data.time_since_last_frame = time.time() - data.last_frame + + data.last_frame = time.time() + + def decode_current_frame(self, buf: bytes): + # current = (buf[2] << 24) | (buf[3] << 16) | (buf[4] << 8) | (buf[5]) + current = struct.unpack(">i", buf[2:6])[0] + data.current = current / 1000.0 + + def decode_panic_frame(self, buf: bytes): + data.panic = True + errorcode = int(buf[0]) + errorargs = [int(buf[1]), int(buf[2])] + if errorcode == ERRORCODE_TIMEOUT_SLAVE: + data.panic_errorcode = "Slave timeout" + data.panic_errorarg = f"Slave ID {errorargs[0]}" + elif errorcode == ERRORCODE_SLAVE_PANIC: + data.panic_errorcode = "Slave panic" + if errorargs[1] == SLAVE_ERROR_UV: + slave_error = "Undervoltage" + elif errorargs[1] == SLAVE_ERROR_OV: + slave_error = "Overvoltage" + elif errorargs[1] == SLAVE_ERROR_UT: + slave_error = "Undertemperature" + elif errorargs[1] == SLAVE_ERROR_OT: + slave_error = "Overtemperature" + elif errorargs[1] == SLAVE_ERROR_BQ: + slave_error = "BQ Communication error" + elif errorargs[1] == SLAVE_ERROR_TMP144: + slave_error = "TMP144 communication error" + else: + slave_error = f"Unknown error ({errorargs[1]})" + data.panic_errorarg = f"Slave ID {errorargs[0]}: {slave_error}" + elif errorcode == ERRORCODE_TIMEOUT_SLAVE_FRAMES: + data.panic_errorcode = "Slave frame timeout" + data.panic_errorarg = f"Slave ID {errorargs[0]}, frame {errorargs[1]}" + elif errorcode == ERRORCODE_TOO_FEW_TEMPS: + data.panic_errorcode = "Too few temperature sensors" + data.panic_errorarg = f"Slave ID {errorargs[0]}" + elif errorcode == ERRORCODE_TIMEOUT_SHUNT: + data.panic_errorcode = "Shunt timeout" + elif errorcode == ERRORCODE_MASTER_THRESH: + data.panic_errorcode = "Master detected threshold" + if errorargs[0] == MASTER_THRESH_UT: + data.panic_errorarg = "Undertemperature" + elif errorargs[0] == MASTER_THRESH_OT: + data.panic_errorarg = "Overtemperature" + elif errorargs[0] == MASTER_THRESH_UV: + data.panic_errorarg = "Undervoltage" + elif errorargs[0] == MASTER_THRESH_OV: + data.panic_errorarg = "Overvoltage" + else: + data.panic_errorarg = f"Unknown threshold ({errorargs[0]})" + else: + data.panic_errorcode = buf[0] + data.panic_errorarg = buf[1] + + def decode_status_frame(self, buf: bytes): + state = buf[0] & 0x7F + data.ts_state = TSState(state) + data.panic = data.ts_state == TSState.TS_ERROR + + def parse_cell_temps(self, slave: int): + temps = list( + filter(lambda t: t > 0 and t < 60, data.slaves[slave].cell_temps[:16]) ) + if len(temps) == 0: + temps = [-1] + min_t = min(temps) + max_t = max(temps) + # for i in range(16, 32): + # data.slaves[slave].cell_temps[i] = random.randint(min_t, max_t) + + def stop(self): + self.continue_run = False # set the run condition to false on stop -kb = KBHit() -while True: - rx_data = ser.read(32) - if len(rx_data) > 0: - rx_buf = rx_data - if (start := check_log_start(rx_buf)) != -1: - decode_log_frame(rx_buf[start + 3 : start + 11]) - rx_buf = b"" - elif (start := check_current_start(rx_buf)) != -1: - decode_current_frame(rx_buf[start + 3 : start + 11]) - rx_buf = b"" - elif (start := check_panic_start(rx_buf)) != -1: - decode_panic_frame(rx_buf[start + 3 : start + 11]) - rx_buf = b"" - if kb.kbhit(): - c = kb.getch() - if c == "C": - print(f"KBHIT: {c}", file=sys.stderr) - print(ser.write(b"C"), file=sys.stderr) - update_display() +if __name__ == "__main__": + data = AccumulatorData() + rx_buf = bytes() + + if len(sys.argv) != 2: + print(f"Usage: {sys.argv[0]} SERIAL-PORT", file=sys.stderr) + sys.exit(os.EX_USAGE) + + SERIAL_PORT = sys.argv[1] + print(SERIAL_PORT) + ser = serial.Serial(SERIAL_PORT, BITRATE, timeout=TIMEOUT) + + app = QApplication(sys.argv) + gui = Window() + gui.show() + + timer = QTimer() + # timer.timeout.connect(data.fill_dummy_data) + timer.timeout.connect(gui.update) + timer.start(1000) # every 1,000 milliseconds + + sys.exit(app.exec_()) diff --git a/poetry.lock b/poetry.lock new file mode 100644 index 0000000..2570341 --- /dev/null +++ b/poetry.lock @@ -0,0 +1,81 @@ +[[package]] +name = "pyqt5" +version = "5.15.7" +description = "Python bindings for the Qt cross platform application toolkit" +category = "main" +optional = false +python-versions = ">=3.7" + +[package.dependencies] +PyQt5-Qt5 = ">=5.15.0" +PyQt5-sip = ">=12.11,<13" + +[[package]] +name = "pyqt5-qt5" +version = "5.15.2" +description = "The subset of a Qt installation needed by PyQt5." +category = "main" +optional = false +python-versions = "*" + +[[package]] +name = "pyqt5-sip" +version = "12.11.0" +description = "The sip module support for PyQt5" +category = "main" +optional = false +python-versions = ">=3.7" + +[[package]] +name = "pyserial" +version = "3.5" +description = "Python Serial Port Extension" +category = "main" +optional = false +python-versions = "*" + +[package.extras] +cp2110 = ["hidapi"] + +[metadata] +lock-version = "1.1" +python-versions = "^3.10" +content-hash = "9d0dcef05f7296031f76c658f997974caf9b45c1efa59678ef5823b803580e43" + +[metadata.files] +pyqt5 = [ + {file = "PyQt5-5.15.7-cp37-abi3-macosx_10_13_x86_64.whl", hash = "sha256:1a793748c60d5aff3850b7abf84d47c1d41edb11231b7d7c16bef602c36be643"}, + {file = "PyQt5-5.15.7-cp37-abi3-manylinux1_x86_64.whl", hash = "sha256:e319c9d8639e0729235c1b09c99afdadad96fa3dbd8392ab561b5ab5946ee6ef"}, + {file = "PyQt5-5.15.7-cp37-abi3-win32.whl", hash = "sha256:08694f0a4c7d4f3d36b2311b1920e6283240ad3b7c09b515e08262e195dcdf37"}, + {file = "PyQt5-5.15.7-cp37-abi3-win_amd64.whl", hash = "sha256:232fe5b135a095cbd024cf341d928fc672c963f88e6a52b0c605be8177c2fdb5"}, + {file = "PyQt5-5.15.7.tar.gz", hash = "sha256:755121a52b3a08cb07275c10ebb96576d36e320e572591db16cfdbc558101594"}, +] +pyqt5-qt5 = [ + {file = "PyQt5_Qt5-5.15.2-py3-none-macosx_10_13_intel.whl", hash = "sha256:76980cd3d7ae87e3c7a33bfebfaee84448fd650bad6840471d6cae199b56e154"}, + {file = "PyQt5_Qt5-5.15.2-py3-none-manylinux2014_x86_64.whl", hash = "sha256:1988f364ec8caf87a6ee5d5a3a5210d57539988bf8e84714c7d60972692e2f4a"}, + {file = "PyQt5_Qt5-5.15.2-py3-none-win32.whl", hash = "sha256:9cc7a768b1921f4b982ebc00a318ccb38578e44e45316c7a4a850e953e1dd327"}, + {file = "PyQt5_Qt5-5.15.2-py3-none-win_amd64.whl", hash = "sha256:750b78e4dba6bdf1607febedc08738e318ea09e9b10aea9ff0d73073f11f6962"}, +] +pyqt5-sip = [ + {file = "PyQt5_sip-12.11.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:f1f9e312ff8284d6dfebc5366f6f7d103f84eec23a4da0be0482403933e68660"}, + {file = "PyQt5_sip-12.11.0-cp310-cp310-manylinux1_x86_64.whl", hash = "sha256:4031547dfb679be309094bfa79254f5badc5ddbe66b9ad38e319d84a7d612443"}, + {file = "PyQt5_sip-12.11.0-cp310-cp310-win32.whl", hash = "sha256:ad21ca0ee8cae2a41b61fc04949dccfab6fe008749627d94e8c7078cb7a73af1"}, + {file = "PyQt5_sip-12.11.0-cp310-cp310-win_amd64.whl", hash = "sha256:3126c84568ab341c12e46ded2230f62a9a78752a70fdab13713f89a71cd44f73"}, + {file = "PyQt5_sip-12.11.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:0f77655c62ec91d47c2c99143f248624d44dd2d8a12d016e7c020508ad418aca"}, + {file = "PyQt5_sip-12.11.0-cp37-cp37m-manylinux1_x86_64.whl", hash = "sha256:ec5e9ef78852e1f96f86d7e15c9215878422b83dde36d44f1539a3062942f19c"}, + {file = "PyQt5_sip-12.11.0-cp37-cp37m-win32.whl", hash = "sha256:d12b81c3a08abf7657a2ebc7d3649852a1f327eb2146ebadf45930486d32e920"}, + {file = "PyQt5_sip-12.11.0-cp37-cp37m-win_amd64.whl", hash = "sha256:b69a1911f768b489846335e31e49eb34795c6b5a038ca24d894d751e3b0b44da"}, + {file = "PyQt5_sip-12.11.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:51e377789d59196213eddf458e6927f33ba9d217b614d17d20df16c9a8b2c41c"}, + {file = "PyQt5_sip-12.11.0-cp38-cp38-manylinux1_x86_64.whl", hash = "sha256:4e5c1559311515291ea0ab0635529f14536954e3b973a7c7890ab7e4de1c2c23"}, + {file = "PyQt5_sip-12.11.0-cp38-cp38-win32.whl", hash = "sha256:9bca450c5306890cb002fe36bbca18f979dd9e5b810b766dce8e3ce5e66ba795"}, + {file = "PyQt5_sip-12.11.0-cp38-cp38-win_amd64.whl", hash = "sha256:f6b72035da4e8fecbb0bc4a972e30a5674a9ad5608dbddaa517e983782dbf3bf"}, + {file = "PyQt5_sip-12.11.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:9356260d4feb60dbac0ab66f8a791a0d2cda1bf98c9dec8e575904a045fbf7c5"}, + {file = "PyQt5_sip-12.11.0-cp39-cp39-manylinux1_x86_64.whl", hash = "sha256:205f3e1b3eea3597d8e878936c1a06e04bd23a59e8b179ee806465d72eea3071"}, + {file = "PyQt5_sip-12.11.0-cp39-cp39-win32.whl", hash = "sha256:686071be054e5be6ca5aaaef7960931d4ba917277e839e2e978c7cbe3f43bb6e"}, + {file = "PyQt5_sip-12.11.0-cp39-cp39-win_amd64.whl", hash = "sha256:42320e7a94b1085ed85d49794ed4ccfe86f1cae80b44a894db908a8aba2bc60e"}, + {file = "PyQt5_sip-12.11.0.tar.gz", hash = "sha256:b4710fd85b57edef716cc55fae45bfd5bfac6fc7ba91036f1dcc3f331ca0eb39"}, +] +pyserial = [ + {file = "pyserial-3.5-py2.py3-none-any.whl", hash = "sha256:c4451db6ba391ca6ca299fb3ec7bae67a5c55dde170964c7a14ceefec02f2cf0"}, + {file = "pyserial-3.5.tar.gz", hash = "sha256:3c77e014170dfffbd816e6ffc205e9842efb10be9f58ec16d3e8675b4925cddb"}, +] diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..1a7d44b --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,16 @@ +[tool.poetry] +name = "charger-display" +version = "0.1.0" +description = "" +authors = ["Your Name "] + +[tool.poetry.dependencies] +python = "^3.10" +PyQt5 = "^5.15.7" +pyserial = "^3.5" + +[tool.poetry.dev-dependencies] + +[build-system] +requires = ["poetry-core>=1.0.0"] +build-backend = "poetry.core.masonry.api"