From b3bbcf730b239e97a04d3b46e68718b6d7aba1cb Mon Sep 17 00:00:00 2001 From: "f.geissler" Date: Tue, 19 Jul 2022 13:04:30 +0200 Subject: [PATCH] gui and extra thread worker implemented --- charger-display copy.py | 179 -------------------------- test.py | 278 +++++++++++++++++++++++++++++++--------- 2 files changed, 215 insertions(+), 242 deletions(-) delete mode 100755 charger-display copy.py diff --git a/charger-display copy.py b/charger-display copy.py deleted file mode 100755 index 4b73017..0000000 --- a/charger-display copy.py +++ /dev/null @@ -1,179 +0,0 @@ -#!/usr/bin/env python3 - -import os -import struct -import time -import serial -import sys - -BITRATE = 115200 # baud/s -TIMEOUT = 1 # seconds -N_SLAVES = 6 -LOG_FRAME_LENGTH = 8 # bytes - -CELLS_PER_SLAVE = 10 -TEMP_SENSORS_PER_SLAVE = 32 -VOLTAGE_CONV = 5.0 / 255 # volts/quantum -TEMP_CONV = 0.0625 * 16 # °C/quantum - - -class SlaveData: - cell_voltages: list[float] - cell_temps: list[float] - - def __init__(self) -> None: - self.cell_voltages = [-1] * CELLS_PER_SLAVE - self.cell_temps = [-1] * TEMP_SENSORS_PER_SLAVE - - -class AccumulatorData: - slaves: list[SlaveData] - min_voltage: float - max_voltage: float - min_temp: float - max_temp: float - last_frame: float - current: float - panic: bool - panic_errorcode: int - panic_errorarg: int - - def __init__(self) -> None: - self.slaves = [SlaveData() for _ in range(N_SLAVES)] - self.min_voltage = ( - self.max_voltage - ) = self.min_temp = self.max_temp = self.last_frame = self.current = 0 - self.panic = False - self.panic_errorcode = self.panic_errorarg = 0 - - -def check_log_start(buf: bytes): - return buf[:-12].find(b"LOG") - - -def check_current_start(buf: bytes): - return buf[:-12].find(b"CUR") - - -def check_panic_start(buf: bytes): - return buf[:-12].find(b"PAN") - - -def decode_log_frame(buf: bytes): - msg_id = buf[0] - slave = msg_id >> 4 - frame_id = msg_id & 0x0F - if slave >= N_SLAVES: - return - - if frame_id == 0: - for i in range(7): - data.slaves[slave].cell_voltages[i] = buf[i + 1] * VOLTAGE_CONV - elif frame_id == 1: - for i in range(3): - data.slaves[slave].cell_voltages[i + 7] = buf[i + 1] * VOLTAGE_CONV - for i in range(4): - data.slaves[slave].cell_temps[i] = buf[i + 4] * TEMP_CONV - elif frame_id == 2: - for i in range(7): - data.slaves[slave].cell_temps[i + 4] = buf[i + 1] * TEMP_CONV - elif frame_id == 3: - for i in range(7): - data.slaves[slave].cell_temps[i + 11] = buf[i + 1] * TEMP_CONV - elif frame_id == 4: - for i in range(7): - data.slaves[slave].cell_temps[i + 18] = buf[i + 1] * TEMP_CONV - elif frame_id == 5: - for i in range(7): - data.slaves[slave].cell_temps[i + 25] = buf[i + 1] * TEMP_CONV - else: - # print(f"Unknown frame ID: {frame_id} (buf: {repr(buf)})", file=sys.stderr) - # time.sleep(1) - return - - data.last_frame = time.time() - - -def decode_current_frame(buf: bytes): - # current = (buf[2] << 24) | (buf[3] << 16) | (buf[4] << 8) | (buf[5]) - current = struct.unpack(">i", buf[2:6])[0] - data.current = current / 1000.0 - - -def decode_panic_frame(buf: bytes): - data.panic = True - data.panic_errorcode = buf[0] - data.panic_errorarg = buf[1] - - -def update_display(): - voltages = [ - slave.cell_voltages[i] for i in range(CELLS_PER_SLAVE) for slave in data.slaves - ] - temps = [ - slave.cell_temps[i] - for i in range(TEMP_SENSORS_PER_SLAVE) - for slave in data.slaves - ] - data.min_voltage = min(voltages) - data.max_voltage = max(voltages) - data.min_temp = min(temps) - data.max_temp = max(temps) - time_since_last_frame = time.time() - data.last_frame - - print("\033[2J\033[H", end="") - print("-" * 20) - if data.panic: - print("!!!!! PANIC !!!!!") - print(f"Error code: {data.panic_errorcode}") - print(f"Error arg: {data.panic_errorarg}") - time.sleep(0.5) - return - - print(f"Time since last frame: {time_since_last_frame} s") - print(f"Current: {data.current:.2f} A") - print(f"Min voltage: {data.min_voltage:.2f} V") - print(f"Max voltage: {data.max_voltage:.2f} V") - print(f"Min temp: {data.min_temp:.1f} °C") - print(f"Max temp: {data.max_temp:.1f} °C") - for i in range(N_SLAVES): - min_v = min(data.slaves[i].cell_voltages) - max_v = max(data.slaves[i].cell_voltages) - min_t = min(data.slaves[i].cell_temps) - max_t = max(data.slaves[i].cell_temps) - print( - f"Stack {i}: V ∈ [{min_v:.2f}, {max_v:.2f}]\tT ∈ [{min_t:.1f}, {max_t:.1f}]" - ) - - -if len(sys.argv) != 2: - print(f"Usage: {sys.argv[0]} SERIAL-PORT", file=sys.stderr) - sys.exit(os.EX_USAGE) - -SERIAL_PORT = sys.argv[1] -ser = serial.Serial(SERIAL_PORT, BITRATE, timeout=TIMEOUT) - -rx_buf = bytes() -data = AccumulatorData() - -while True: - rx_data = ser.read(32) - if len(rx_data) > 0: - rx_buf = rx_data - if (start := check_log_start(rx_buf)) != -1: - decode_log_frame(rx_buf[start + 3 : start + 11]) - rx_buf = b"" - elif (start := check_current_start(rx_buf)) != -1: - decode_current_frame(rx_buf[start + 3 : start + 11]) - rx_buf = b"" - elif (start := check_panic_start(rx_buf)) != -1: - decode_panic_frame(rx_buf[start + 3 : start + 11]) - rx_buf = b"" - - """ - if Button Charge is Pressed - print(f"KBHIT: {c}", file=sys.stderr) - print(ser.write(b"C"), file=sys.stderr) - """ - - update_display() diff --git a/test.py b/test.py index 75c4b76..85391fa 100644 --- a/test.py +++ b/test.py @@ -8,8 +8,9 @@ import random # import serial import sys +from tkinter.tix import CELL -from PyQt5.QtCore import Qt, QTimer +from PyQt5.QtCore import Qt, QTimer, QThread, QObject, pyqtSignal from PyQt5.QtWidgets import ( QApplication, QWidget, @@ -17,9 +18,7 @@ from PyQt5.QtWidgets import ( QVBoxLayout, QHBoxLayout, QGridLayout, - QSlider, QLabel, - QRadioButton, QGroupBox, ) @@ -51,6 +50,7 @@ class AccumulatorData: min_temp: float max_temp: float last_frame: float + time_since_last_frame: float current: float panic: bool panic_errorcode: int @@ -63,6 +63,31 @@ class AccumulatorData: ) = self.min_temp = self.max_temp = self.last_frame = self.current = 0 self.panic = False self.panic_errorcode = self.panic_errorarg = 0 + self.time_since_last_frame = 0 + + def fill_dummy_data(self): + self.min_voltage = random.uniform(1, 3) + self.max_voltage = random.uniform(3, 5) + self.min_temp = random.uniform(0, 25) + self.max_temp = random.uniform(25, 60) + self.current = random.uniform(25, 60) + self.last_frame = time.time() - random.random() + self.panic = random.choice([True, False]) + if self.panic: + self.panic_errorcode = random.randint(1, 10000) + self.panic_errorarg = "ABCDERFG" + else: + self.panic_errorcode = 0 + self.panic_errorarg = "" + data.time_since_last_frame = random.uniform(1, 60) + + for i in range(N_SLAVES): + self.slaves[i].cell_voltages = [ + random.uniform(1, 5) for i in range(CELLS_PER_SLAVE) + ] + self.slaves[i].cell_temps = [ + random.uniform(25, 60) for i in range(CELLS_PER_SLAVE) + ] class StackGuiElement: @@ -119,63 +144,12 @@ class StackGuiElement: self.max_temp_label.setNum(max(slave.cell_temps)) -def fill_dummy_data(): - data.min_voltage = random.uniform(1, 3) - data.max_voltage = random.uniform(3, 5) - data.min_temp = random.uniform(0, 25) - data.max_temp = random.uniform(25, 60) - data.current = random.uniform(25, 60) - data.last_frame = time.time() - random.random() - data.panic = random.choice([True, False]) - if data.panic: - data.panic_errorcode = random.randint(1, 10000) - data.panic_errorarg = "ABCDERFG" - else: - data.panic_errorcode = 0 - data.panic_errorarg = "" - - -def update_display(): - voltages = [ - slave.cell_voltages[i] for i in range(CELLS_PER_SLAVE) for slave in data.slaves - ] - temps = [ - slave.cell_temps[i] - for i in range(TEMP_SENSORS_PER_SLAVE) - for slave in data.slaves - ] - data.min_voltage = min(voltages) - data.max_voltage = max(voltages) - data.min_temp = min(temps) - data.max_temp = max(temps) - time_since_last_frame = time.time() - data.last_frame - - print("\033[2J\033[H", end="") - print("-" * 20) - if data.panic: - print("!!!!! PANIC !!!!!") - print(f"Error code: {data.panic_errorcode}") - print(f"Error arg: {data.panic_errorarg}") - time.sleep(0.5) - return - - print(f"Time since last frame: {time_since_last_frame} s") - print(f"Current: {data.current:.2f} A") - print(f"Min voltage: {data.min_voltage:.2f} V") - print(f"Max voltage: {data.max_voltage:.2f} V") - print(f"Min temp: {data.min_temp:.1f} °C") - print(f"Max temp: {data.max_temp:.1f} °C") - for i in range(N_SLAVES): - min_v = min(data.slaves[i].cell_voltages) - max_v = max(data.slaves[i].cell_voltages) - min_t = min(data.slaves[i].cell_temps) - max_t = max(data.slaves[i].cell_temps) - print( - f"Stack {i}: V ∈ [{min_v:.2f}, {max_v:.2f}]\tT ∈ [{min_t:.1f}, {max_t:.1f}]" - ) - - class Window(QWidget): + + stop_signal = ( + pyqtSignal() + ) # make a stop signal to communicate with the worker in another thread + def __init__(self, parent=None): super(Window, self).__init__(parent) @@ -223,6 +197,12 @@ class Window(QWidget): self.l_errorarg.setText(str(data.panic_errorarg)) self.l_errorcode.setAlignment(Qt.AlignLeft) + self.l7 = QLabel("Time Since Last Dataframe") + self.l_time_since_last_frame = QLabel() + self.l_time_since_last_frame.setText(str(data.time_since_last_frame)) + self.l_time_since_last_frame.setAlignment(Qt.AlignLeft) + + ### LAYOUT ACCUMULATOR GENERAL ### grid_accumulator = QGridLayout() grid_accumulator.addWidget(self.l1, 0, 0) grid_accumulator.addWidget(self.l2, 1, 0) @@ -230,6 +210,7 @@ class Window(QWidget): grid_accumulator.addWidget(self.l4, 1, 2) grid_accumulator.addWidget(self.l5, 2, 0) grid_accumulator.addWidget(self.l6, 3, 0) + grid_accumulator.addWidget(self.l7, 4, 0) grid_accumulator.addWidget(self.l_min_voltage, 0, 1) grid_accumulator.addWidget(self.l_max_voltage, 1, 1) @@ -239,6 +220,7 @@ class Window(QWidget): grid_accumulator.addWidget(self.l_error, 3, 1) grid_accumulator.addWidget(self.l_errorcode, 3, 2) grid_accumulator.addWidget(self.l_errorarg, 3, 3) + grid_accumulator.addWidget(self.l_time_since_last_frame, 4, 1) groupBox_accumulator = QGroupBox("Accumulator General") groupBox_accumulator.setLayout(grid_accumulator) @@ -246,14 +228,13 @@ class Window(QWidget): win.addWidget(groupBox_accumulator) ### STACKS ### - self.stack_gui_elements = [] for i in range(N_SLAVES): sge = StackGuiElement(f"Stack {i}") sge.update_data_from_slave(data.slaves[i]) self.stack_gui_elements.append(sge) - ### Layout Stacks ### + ### LAYOUT STACKS ### n_slaves_half = N_SLAVES // 2 grid_stacks = QGridLayout() for i, sge in enumerate(self.stack_gui_elements): @@ -266,9 +247,68 @@ class Window(QWidget): win.addWidget(groupBox_stacks) + ### BUTTONS ### + self.btn_start = QPushButton("Start Serial") + self.btn_start.resize(self.btn_start.sizeHint()) + self.btn_start.move(50, 50) + self.btn_stop = QPushButton("Stop Serial") + self.btn_stop.resize(self.btn_stop.sizeHint()) + self.btn_stop.move(150, 50) + self.btn_charge = QPushButton("Start Charging") + self.btn_charge.resize(self.btn_stop.sizeHint()) + self.btn_charge.move(150, 50) + + ### LAYOUT BUTTONS ### + vbox_controls = QVBoxLayout() + vbox_controls.addWidget(self.btn_start) + vbox_controls.addWidget(self.btn_stop) + vbox_controls.addWidget(self.btn_charge) + groupBox_controls = QGroupBox("Controls") + groupBox_controls.setLayout(vbox_controls) + win.addWidget(groupBox_controls) + + # Start Button action + self.btn_start.clicked.connect(self.start_thread) + + # Stop Button action + self.btn_stop.clicked.connect(self.stop_thread) + + # Charge Button action + self.btn_charge.clicked.connect(self.start_charge) + self.setLayout(win) self.setWindowTitle("FT22 Charger Display") + # When start_btn is clicked this runs. Creates the worker and the thread. + def start_thread(self): + self.thread = QThread() + self.worker = Worker() + self.stop_signal.connect( + self.worker.stop + ) # connect stop signal to worker stop method + self.worker.moveToThread(self.thread) + + self.worker.finished.connect( + self.thread.quit + ) # connect the workers finished signal to stop thread + self.worker.finished.connect( + self.worker.deleteLater + ) # connect the workers finished signal to clean up worker + self.thread.finished.connect( + self.thread.deleteLater + ) # connect threads finished signal to clean up thread + + self.thread.started.connect(self.worker.do_work) + self.thread.finished.connect(self.worker.stop) + self.thread.start() + + # When stop_btn is clicked this runs. Terminates the worker and the thread. + def stop_thread(self): + self.stop_signal.emit() # emit the finished signal on stop + + def start_charge(self): + ser.write(b"C") + def update(self): # Accumulator self.l_min_voltage.setNum(data.min_voltage) @@ -279,21 +319,133 @@ class Window(QWidget): self.l_error.setText(str(data.panic)) self.l_errorcode.setText(str(data.panic_errorcode)) self.l_errorarg.setText(str(data.panic_errorarg)) + self.l_time_since_last_frame.setText(str(data.time_since_last_frame)) - # Cells + # Stacks for i, sge in enumerate(self.stack_gui_elements): sge.update_data_from_slave(data.slaves[i]) +class Worker(QObject): + + finished = pyqtSignal() # give worker class a finished signal + + def __init__(self, parent=None): + QObject.__init__(self, parent=parent) + self.continue_run = True # provide a bool run condition for the class + + def do_work(self): + i = 1 + while self.continue_run: # give the loop a stoppable condition + data.fill_dummy_data() + # self.charger_communication() + # QThread.sleep(1) + + self.finished.emit() # emit the finished signal when the loop is done + + def charger_communication(self): + rx_data = ser.read(32) + if len(rx_data) > 0: + rx_buf = rx_data + if (start := self.check_log_start(rx_buf)) != -1: + self.decode_log_frame(rx_buf[start + 3 : start + 11]) + rx_buf = b"" + elif (start := self.check_current_start(rx_buf)) != -1: + self.decode_current_frame(rx_buf[start + 3 : start + 11]) + rx_buf = b"" + elif (start := self.check_panic_start(rx_buf)) != -1: + self.decode_panic_frame(rx_buf[start + 3 : start + 11]) + rx_buf = b"" + + def check_log_start(buf: bytes): + return buf[:-12].find(b"LOG") + + def check_current_start(buf: bytes): + return buf[:-12].find(b"CUR") + + def check_panic_start(buf: bytes): + return buf[:-12].find(b"PAN") + + def decode_log_frame(buf: bytes): + msg_id = buf[0] + slave = msg_id >> 4 + frame_id = msg_id & 0x0F + if slave >= N_SLAVES: + return + + if frame_id == 0: + for i in range(7): + data.slaves[slave].cell_voltages[i] = buf[i + 1] * VOLTAGE_CONV + elif frame_id == 1: + for i in range(3): + data.slaves[slave].cell_voltages[i + 7] = buf[i + 1] * VOLTAGE_CONV + for i in range(4): + data.slaves[slave].cell_temps[i] = buf[i + 4] * TEMP_CONV + elif frame_id == 2: + for i in range(7): + data.slaves[slave].cell_temps[i + 4] = buf[i + 1] * TEMP_CONV + elif frame_id == 3: + for i in range(7): + data.slaves[slave].cell_temps[i + 11] = buf[i + 1] * TEMP_CONV + elif frame_id == 4: + for i in range(7): + data.slaves[slave].cell_temps[i + 18] = buf[i + 1] * TEMP_CONV + elif frame_id == 5: + for i in range(7): + data.slaves[slave].cell_temps[i + 25] = buf[i + 1] * TEMP_CONV + else: + # print(f"Unknown frame ID: {frame_id} (buf: {repr(buf)})", file=sys.stderr) + # time.sleep(1) + return + voltages = [ + slave.cell_voltages[i] + for i in range(CELLS_PER_SLAVE) + for slave in data.slaves + ] + temps = [ + slave.cell_temps[i] + for i in range(TEMP_SENSORS_PER_SLAVE) + for slave in data.slaves + ] + data.min_voltage = min(voltages) + data.max_voltage = max(voltages) + data.min_temp = min(temps) + data.max_temp = max(temps) + data.time_since_last_frame = time.time() - data.last_frame + + data.last_frame = time.time() + + def decode_current_frame(buf: bytes): + # current = (buf[2] << 24) | (buf[3] << 16) | (buf[4] << 8) | (buf[5]) + current = struct.unpack(">i", buf[2:6])[0] + data.current = current / 1000.0 + + def decode_panic_frame(buf: bytes): + data.panic = True + data.panic_errorcode = buf[0] + data.panic_errorarg = buf[1] + + def stop(self): + self.continue_run = False # set the run condition to false on stop + + if __name__ == "__main__": data = AccumulatorData() + rx_buf = bytes() + # if len(sys.argv) != 2: + # print(f"Usage: {sys.argv[0]} SERIAL-PORT", file=sys.stderr) + # sys.exit(os.EX_USAGE) + + # SERIAL_PORT = sys.argv[1] + # ser = serial.Serial(SERIAL_PORT, BITRATE, timeout=TIMEOUT) + app = QApplication(sys.argv) gui = Window() gui.show() timer = QTimer() + # timer.timeout.connect(data.fill_dummy_data) timer.timeout.connect(gui.update) - timer.timeout.connect(fill_dummy_data) timer.start(1000) # every 1,000 milliseconds # update_display()