Compare commits

...

2 Commits

Author SHA1 Message Date
jazzpi 61f652ee8f Errorcodes, SoC, 9 slaves 2022-08-08 01:23:20 +02:00
jazzpi cf4b1763fe rename file 2022-07-22 09:12:02 +02:00
2 changed files with 599 additions and 760 deletions

View File

@ -1,126 +1,57 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import math
import os import os
import struct import struct
import time import time
import random
import serial import serial
import sys import sys
import numpy as np
from PyQt5.QtCore import Qt, QTimer, QThread, QObject, pyqtSignal
from PyQt5.QtWidgets import (
QApplication,
QWidget,
QPushButton,
QVBoxLayout,
QHBoxLayout,
QGridLayout,
QLabel,
QGroupBox,
QFrame,
QSizePolicy,
)
BITRATE = 115200 # baud/s BITRATE = 115200 # baud/s
TIMEOUT = 1 # seconds TIMEOUT = 1 # seconds
N_SLAVES = 6 N_SLAVES = 9
LOG_FRAME_LENGTH = 8 # bytes LOG_FRAME_LENGTH = 8 # bytes
PARALLEL_CELLS = 9
CELLS_PER_SLAVE = 10 CELLS_PER_SLAVE = 10
TEMP_SENSORS_PER_SLAVE = 32 TEMP_SENSORS_PER_SLAVE = 32
VOLTAGE_CONV = 5.0 / 255 # volts/quantum VOLTAGE_CONV = 5.0 / 255 # volts/quantum
TEMP_CONV = 0.0625 * 16 # °C/quantum TEMP_CONV = 0.0625 * 16 # °C/quantum
import os ERRORCODE_TIMEOUT_SLAVE = 1
ERRORCODE_SLAVE_PANIC = 2
# Windows ERRORCODE_TIMEOUT_SLAVE_FRAMES = 3
if os.name == "nt": ERRORCODE_TOO_FEW_TEMPS = 4
import msvcrt ERRORCODE_TIMEOUT_SHUNT = 6
ERRORCODE_MASTER_THRESH = 7
# Posix (Linux, OS X) SLAVE_ERROR_UV = 0
else: SLAVE_ERROR_OV = 1
import sys SLAVE_ERROR_UT = 2
import termios SLAVE_ERROR_OT = 3
import atexit SLAVE_ERROR_BQ = 4
from select import select SLAVE_ERROR_TMP144 = 5
MASTER_THRESH_UT = 0
MASTER_THRESH_OT = 1
class KBHit: MASTER_THRESH_UV = 2
def __init__(self): MASTER_THRESH_OV = 3
"""Creates a KBHit object that you can call to do various keyboard things."""
if os.name == "nt":
pass
else:
# Save the terminal settings
self.fd = sys.stdin.fileno()
self.new_term = termios.tcgetattr(self.fd)
self.old_term = termios.tcgetattr(self.fd)
# New terminal setting unbuffered
self.new_term[3] = self.new_term[3] & ~termios.ICANON & ~termios.ECHO
termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.new_term)
# Support normal-terminal reset at exit
atexit.register(self.set_normal_term)
def set_normal_term(self):
"""Resets to normal terminal. On Windows this is a no-op."""
if os.name == "nt":
pass
else:
termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old_term)
def getch(self):
"""Returns a keyboard character after kbhit() has been called.
Should not be called in the same program as getarrow().
"""
s = ""
if os.name == "nt":
return msvcrt.getch().decode("utf-8")
else:
return sys.stdin.read(1)
def getarrow(self):
"""Returns an arrow-key code after kbhit() has been called. Codes are
0 : up
1 : right
2 : down
3 : left
Should not be called in the same program as getch().
"""
if os.name == "nt":
msvcrt.getch() # skip 0xE0
c = msvcrt.getch()
vals = [72, 77, 80, 75]
else:
c = sys.stdin.read(3)[2]
vals = [65, 67, 66, 68]
return vals.index(ord(c.decode("utf-8")))
def kbhit(self):
"""Returns True if keyboard character was hit, False otherwise."""
if os.name == "nt":
return msvcrt.kbhit()
else:
dr, dw, de = select([sys.stdin], [], [], 0)
return dr != []
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} SERIAL-PORT", file=sys.stderr)
sys.exit(os.EX_USAGE)
SERIAL_PORT = sys.argv[1]
ser = serial.Serial(SERIAL_PORT, BITRATE, timeout=TIMEOUT)
def check_log_start(buf: bytes):
return buf[:-12].find(b"LOG")
def check_current_start(buf: bytes):
return buf[:-12].find(b"CUR")
def check_panic_start(buf: bytes):
return buf[:-12].find(b"PAN")
class SlaveData: class SlaveData:
@ -136,9 +67,12 @@ class AccumulatorData:
slaves: list[SlaveData] slaves: list[SlaveData]
min_voltage: float min_voltage: float
max_voltage: float max_voltage: float
min_soc: float
max_soc: float
min_temp: float min_temp: float
max_temp: float max_temp: float
last_frame: float last_frame: float
time_since_last_frame: float
current: float current: float
panic: bool panic: bool
panic_errorcode: int panic_errorcode: int
@ -148,119 +82,583 @@ class AccumulatorData:
self.slaves = [SlaveData() for _ in range(N_SLAVES)] self.slaves = [SlaveData() for _ in range(N_SLAVES)]
self.min_voltage = ( self.min_voltage = (
self.max_voltage self.max_voltage
) = (
self.min_soc
) = (
self.max_soc
) = self.min_temp = self.max_temp = self.last_frame = self.current = 0 ) = self.min_temp = self.max_temp = self.last_frame = self.current = 0
self.panic = False self.panic = False
self.panic_errorcode = self.panic_errorarg = 0 self.panic_errorcode = self.panic_errorarg = 0
self.time_since_last_frame = 0
def fill_dummy_data(self):
self.min_voltage = random.uniform(1, 3)
self.max_voltage = random.uniform(3, 5)
self.min_temp = random.uniform(0, 25)
self.max_temp = random.uniform(25, 60)
self.current = random.uniform(25, 60)
self.last_frame = time.time() - random.random()
self.panic = random.choice([True, False])
if self.panic:
self.panic_errorcode = random.randint(1, 10000)
self.panic_errorarg = "ABCDERFG"
else:
self.panic_errorcode = 0
self.panic_errorarg = ""
data.time_since_last_frame = random.uniform(1, 60)
for i in range(N_SLAVES):
self.slaves[i].cell_voltages = [
random.uniform(1, 5) for i in range(CELLS_PER_SLAVE)
]
self.slaves[i].cell_temps = [
random.uniform(25, 60) for i in range(TEMP_SENSORS_PER_SLAVE)
]
rx_buf = bytes() class StackGuiElement:
data = AccumulatorData() title: str
stack_id: int
min_voltage_label: QLabel
max_voltage_label: QLabel
min_temp_label: QLabel
max_temp_label: QLabel
groupBox: QGroupBox
detail_popup: QWidget
def __init__(self, title: str, stack_id: int):
self.title = title
self.stack_id = stack_id
self.min_voltage_label = QLabel()
self.max_voltage_label = QLabel()
self.min_temp_label = QLabel()
self.max_temp_label = QLabel()
self.groupBox = QGroupBox()
self.detail_popup = None
self.__post__init__()
def __post__init__(self):
l1 = QLabel("Min Voltage [V]")
l2 = QLabel("Max Voltage [V]")
l3 = QLabel("Min Temperature [°C]")
l4 = QLabel("Max Temperature [°C]")
popup_btn = QPushButton(self.title + " details")
popup_btn.clicked.connect(self.show_popup)
l1.setAlignment(Qt.AlignLeft)
l2.setAlignment(Qt.AlignLeft)
l3.setAlignment(Qt.AlignLeft)
l4.setAlignment(Qt.AlignLeft)
self.min_voltage_label.setAlignment(Qt.AlignLeft)
self.max_voltage_label.setAlignment(Qt.AlignLeft)
self.min_temp_label.setAlignment(Qt.AlignLeft)
self.max_temp_label.setAlignment(Qt.AlignLeft)
grid = QGridLayout()
grid.addWidget(l1, 0, 0)
grid.addWidget(l2, 1, 0)
grid.addWidget(l3, 2, 0)
grid.addWidget(l4, 3, 0)
grid.addWidget(self.min_voltage_label, 0, 1)
grid.addWidget(self.max_voltage_label, 1, 1)
grid.addWidget(self.min_temp_label, 2, 1)
grid.addWidget(self.max_temp_label, 3, 1)
grid.addWidget(popup_btn, 0, 2)
self.groupBox.setTitle(self.title)
self.groupBox.setLayout(grid)
def update_data_from_slave(self, slave: SlaveData):
self.min_voltage_label.setNum(min(slave.cell_voltages))
self.max_voltage_label.setNum(max(slave.cell_voltages))
self.min_temp_label.setNum(min(slave.cell_temps))
self.max_temp_label.setNum(max(slave.cell_temps))
def show_popup(self):
if self.detail_popup is None:
self.detail_popup = StackPopup(self.stack_id)
self.detail_popup.show()
def decode_log_frame(buf: bytes): class QVSeperationLine(QFrame):
msg_id = buf[0] """
slave = msg_id >> 4 a vertical seperation line
frame_id = msg_id & 0x0F """
if slave >= N_SLAVES:
def __init__(self):
super().__init__()
self.setFixedWidth(20)
self.setMinimumHeight(1)
self.setFrameShape(QFrame.VLine)
self.setFrameShadow(QFrame.Sunken)
self.setSizePolicy(QSizePolicy.Minimum, QSizePolicy.Preferred)
return return
if frame_id == 0:
for i in range(7):
data.slaves[slave].cell_voltages[i] = buf[i + 1] * VOLTAGE_CONV
elif frame_id == 1:
for i in range(3):
data.slaves[slave].cell_voltages[i + 7] = buf[i + 1] * VOLTAGE_CONV
for i in range(4):
data.slaves[slave].cell_temps[i] = buf[i + 4] * TEMP_CONV
elif frame_id == 2:
for i in range(7):
data.slaves[slave].cell_temps[i + 4] = buf[i + 1] * TEMP_CONV
elif frame_id == 3:
for i in range(7):
data.slaves[slave].cell_temps[i + 11] = buf[i + 1] * TEMP_CONV
elif frame_id == 4:
for i in range(7):
data.slaves[slave].cell_temps[i + 18] = buf[i + 1] * TEMP_CONV
elif frame_id == 5:
for i in range(7):
data.slaves[slave].cell_temps[i + 25] = buf[i + 1] * TEMP_CONV
else:
# print(f"Unknown frame ID: {frame_id} (buf: {repr(buf)})", file=sys.stderr)
# time.sleep(1)
return
data.last_frame = time.time() class StackPopup(QWidget):
stack_id: int
voltage_labels: list[QLabel]
temp_labels: list[QLabel]
def __init__(self, stack_id: int):
super().__init__()
self.stack_id = stack_id
self.voltage_labels = []
self.temp_labels = []
layout = QVBoxLayout()
groupbox = QGroupBox()
grid = QGridLayout()
for i in range(len(data.slaves[stack_id].cell_voltages)):
l1 = QLabel(f"Voltage Cell {i}")
l1.setAlignment(Qt.AlignLeft)
l_v = QLabel()
l_v.setNum(data.slaves[stack_id].cell_voltages[i])
l_v.setAlignment(Qt.AlignLeft)
self.voltage_labels.append(l_v)
grid.addWidget(l1, i, 0)
grid.addWidget(l_v, i, 1)
# vertical separators between rows in popup
separator_vertical = QVSeperationLine()
grid.addWidget(separator_vertical, 0, 2, CELLS_PER_SLAVE, 1)
num_temp_cols = len(data.slaves[stack_id].cell_temps) // CELLS_PER_SLAVE
for i in range(num_temp_cols):
separator_vertical = QVSeperationLine()
grid.addWidget(separator_vertical, 0, 5 + i * 3, CELLS_PER_SLAVE, 1)
for i in range(len(data.slaves[stack_id].cell_temps)):
l2 = QLabel(f"Temp. Sensor {i}")
l2.setAlignment(Qt.AlignLeft)
l_t = QLabel()
l_t.setNum(data.slaves[stack_id].cell_temps[i])
l_t.setAlignment(Qt.AlignLeft)
self.temp_labels.append(l_t)
grid.addWidget(l2, i % CELLS_PER_SLAVE, 3 + (i // CELLS_PER_SLAVE) * 3)
grid.addWidget(l_t, i % CELLS_PER_SLAVE, 4 + (i // CELLS_PER_SLAVE) * 3)
groupbox.setTitle(f"Stack {stack_id}")
groupbox.setLayout(grid)
layout.addWidget(groupbox)
self.setLayout(layout)
self.update_data()
timer.timeout.connect(self.update_data)
def update_data(self):
for i in range(len(data.slaves[self.stack_id].cell_voltages)):
self.voltage_labels[i].setNum(data.slaves[self.stack_id].cell_voltages[i])
for i in range(len(data.slaves[self.stack_id].cell_temps)):
self.temp_labels[i].setNum(data.slaves[self.stack_id].cell_temps[i])
def decode_current_frame(buf: bytes): class Window(QWidget):
# current = (buf[2] << 24) | (buf[3] << 16) | (buf[4] << 8) | (buf[5])
current = struct.unpack(">i", buf[2:6])[0] stop_signal = (
data.current = current / 1000.0 pyqtSignal()
) # make a stop signal to communicate with the worker in another thread
def __init__(self, parent=None):
super(Window, self).__init__(parent)
win = QVBoxLayout()
### ACCUMULATOR GENERAL ###
self.l1 = QLabel("Min Voltage [V]")
self.l1.setAlignment(Qt.AlignLeft)
self.l_min_voltage = QLabel()
self.l_min_voltage.setNum(data.min_voltage)
self.l_min_voltage.setAlignment(Qt.AlignLeft)
self.l2 = QLabel("Max Voltage [V]")
self.l2.setAlignment(Qt.AlignLeft)
self.l_max_voltage = QLabel()
self.l_max_voltage.setNum(data.max_voltage)
self.l_max_voltage.setAlignment(Qt.AlignLeft)
self.l3 = QLabel("Min SoC [%]")
self.l3.setAlignment(Qt.AlignLeft)
self.l_min_soc = QLabel()
self.l_min_soc.setNum(data.min_soc)
self.l_min_soc.setAlignment(Qt.AlignLeft)
self.l4 = QLabel("Max SoC [%]")
self.l4.setAlignment(Qt.AlignLeft)
self.l_max_soc = QLabel()
self.l_max_soc.setNum(data.max_soc)
self.l_max_soc.setAlignment(Qt.AlignLeft)
self.l5 = QLabel("Min Temperature [°C]")
self.l5.setAlignment(Qt.AlignLeft)
self.l_min_temp = QLabel()
self.l_min_temp.setNum(data.min_temp)
self.l_min_temp.setAlignment(Qt.AlignLeft)
self.l6 = QLabel("Max Temperature [°C]")
self.l6.setAlignment(Qt.AlignLeft)
self.l_max_temp = QLabel()
self.l_max_temp.setNum(data.max_temp)
self.l_max_temp.setAlignment(Qt.AlignLeft)
self.l7 = QLabel("Current [A]")
self.l7.setAlignment(Qt.AlignLeft)
self.l_current = QLabel()
self.l_current.setNum(data.current)
self.l_current.setAlignment(Qt.AlignLeft)
self.l8 = QLabel("Error")
self.l_error = QLabel()
self.l_error.setText(str(data.panic))
self.l_error.setAlignment(Qt.AlignLeft)
self.l_errorcode = QLabel()
self.l_errorcode.setText(str(data.panic_errorcode))
self.l_errorcode.setAlignment(Qt.AlignLeft)
self.l_errorarg = QLabel()
self.l_errorarg.setText(str(data.panic_errorarg))
self.l_errorcode.setAlignment(Qt.AlignLeft)
self.l9 = QLabel("Time Since Last Dataframe")
self.l_time_since_last_frame = QLabel()
self.l_time_since_last_frame.setText(str(data.time_since_last_frame))
self.l_time_since_last_frame.setAlignment(Qt.AlignLeft)
### LAYOUT ACCUMULATOR GENERAL ###
grid_accumulator = QGridLayout()
grid_accumulator.addWidget(self.l1, 0, 0)
grid_accumulator.addWidget(self.l2, 1, 0)
grid_accumulator.addWidget(self.l3, 2, 0)
grid_accumulator.addWidget(self.l4, 3, 0)
grid_accumulator.addWidget(self.l5, 0, 2)
grid_accumulator.addWidget(self.l6, 1, 2)
grid_accumulator.addWidget(self.l7, 2, 2)
grid_accumulator.addWidget(self.l8, 5, 0)
grid_accumulator.addWidget(self.l9, 3, 2)
grid_accumulator.addWidget(self.l_min_voltage, 0, 1)
grid_accumulator.addWidget(self.l_max_voltage, 1, 1)
grid_accumulator.addWidget(self.l_min_soc, 2, 1)
grid_accumulator.addWidget(self.l_max_soc, 3, 1)
grid_accumulator.addWidget(self.l_min_temp, 0, 3)
grid_accumulator.addWidget(self.l_max_temp, 1, 3)
grid_accumulator.addWidget(self.l_current, 2, 3)
grid_accumulator.addWidget(self.l_error, 5, 1)
grid_accumulator.addWidget(self.l_errorcode, 5, 2)
grid_accumulator.addWidget(self.l_errorarg, 5, 3)
grid_accumulator.addWidget(self.l_time_since_last_frame, 3, 3)
groupBox_accumulator = QGroupBox("Accumulator General")
groupBox_accumulator.setLayout(grid_accumulator)
win.addWidget(groupBox_accumulator)
### STACKS ###
self.stack_gui_elements = []
for i in range(N_SLAVES):
sge = StackGuiElement(f"Stack {i}", i)
sge.update_data_from_slave(data.slaves[i])
self.stack_gui_elements.append(sge)
### LAYOUT STACKS ###
n_slaves_half = math.ceil(N_SLAVES / 2)
grid_stacks = QGridLayout()
for i, sge in enumerate(self.stack_gui_elements):
grid_stacks.addWidget(
sge.groupBox, 0 if i < n_slaves_half else 1, i % n_slaves_half
)
groupBox_stacks = QGroupBox("Individual Stacks")
groupBox_stacks.setLayout(grid_stacks)
win.addWidget(groupBox_stacks)
### BUTTONS ###
self.btn_start = QPushButton("Start Serial")
self.btn_start.resize(self.btn_start.sizeHint())
self.btn_start.move(50, 50)
self.btn_stop = QPushButton("Stop Serial")
self.btn_stop.resize(self.btn_stop.sizeHint())
self.btn_stop.move(150, 50)
self.btn_charge = QPushButton("Start Charging")
self.btn_charge.resize(self.btn_stop.sizeHint())
self.btn_charge.move(150, 50)
### LAYOUT BUTTONS ###
vbox_controls = QVBoxLayout()
vbox_controls.addWidget(self.btn_start)
vbox_controls.addWidget(self.btn_stop)
vbox_controls.addWidget(self.btn_charge)
groupBox_controls = QGroupBox("Controls")
groupBox_controls.setLayout(vbox_controls)
win.addWidget(groupBox_controls)
# Start Button action
self.btn_start.clicked.connect(self.start_thread)
# Stop Button action
self.btn_stop.clicked.connect(self.stop_thread)
# Charge Button action
self.btn_charge.clicked.connect(self.start_charge)
self.setLayout(win)
self.setWindowTitle("FT22 Charger Display")
# When start_btn is clicked this runs. Creates the worker and the thread.
def start_thread(self):
self.thread = QThread()
self.worker = Worker()
self.stop_signal.connect(
self.worker.stop
) # connect stop signal to worker stop method
self.worker.moveToThread(self.thread)
self.worker.finished.connect(
self.thread.quit
) # connect the workers finished signal to stop thread
self.worker.finished.connect(
self.worker.deleteLater
) # connect the workers finished signal to clean up worker
self.thread.finished.connect(
self.thread.deleteLater
) # connect threads finished signal to clean up thread
self.thread.started.connect(self.worker.do_work)
self.thread.finished.connect(self.worker.stop)
self.thread.start()
# When stop_btn is clicked this runs. Terminates the worker and the thread.
def stop_thread(self):
self.stop_signal.emit() # emit the finished signal on stop
def start_charge(self):
ser.write(b"C")
def update(self):
# Accumulator
self.l_min_voltage.setText(f"{data.min_voltage:.02f}")
self.l_max_voltage.setText(f"{data.max_voltage:.02f}")
self.l_min_soc.setText(f"{data.min_soc:.01f}")
self.l_max_soc.setText(f"{data.max_soc:.01f}")
self.l_min_temp.setText(f"{data.min_temp:.00f}")
self.l_max_temp.setText(f"{data.max_temp:.00f}")
self.l_current.setText(f"{data.current:.03f}")
self.l_error.setText(str(data.panic))
self.l_errorcode.setText(str(data.panic_errorcode))
self.l_errorarg.setText(str(data.panic_errorarg))
self.l_time_since_last_frame.setText(str(time.time() - data.last_frame))
# Stacks
for i, sge in enumerate(self.stack_gui_elements):
sge.update_data_from_slave(data.slaves[i])
def decode_panic_frame(buf: bytes): class Worker(QObject):
data.panic = True
data.panic_errorcode = buf[0]
data.panic_errorarg = buf[1]
finished = pyqtSignal() # give worker class a finished signal
def update_display(): def __init__(self, parent=None):
voltages = [ QObject.__init__(self, parent=parent)
slave.cell_voltages[i] for i in range(CELLS_PER_SLAVE) for slave in data.slaves self.continue_run = True # provide a bool run condition for the class
]
temps = [
slave.cell_temps[i]
for i in range(TEMP_SENSORS_PER_SLAVE)
for slave in data.slaves
]
data.min_voltage = min(voltages)
data.max_voltage = max(voltages)
data.min_temp = min(temps)
data.max_temp = max(temps)
time_since_last_frame = time.time() - data.last_frame
print("\033[2J\033[H", end="") def do_work(self):
print("-" * 20) i = 1
if data.panic: while self.continue_run: # give the loop a stoppable condition
print("!!!!! PANIC !!!!!") # data.fill_dummy_data()
print(f"Error code: {data.panic_errorcode}") self.charger_communication()
print(f"Error arg: {data.panic_errorarg}") # QThread.msleep(1)
time.sleep(0.5)
return
print(f"Time since last frame: {time_since_last_frame} s") self.finished.emit() # emit the finished signal when the loop is done
print(f"Current: {data.current:.2f} A")
print(f"Min voltage: {data.min_voltage:.2f} V") def charger_communication(self):
print(f"Max voltage: {data.max_voltage:.2f} V") rx_data = ser.read(256)
print(f"Min temp: {data.min_temp:.1f} °C") # print(len(rx_data), rx_data)
print(f"Max temp: {data.max_temp:.1f} °C") while len(rx_data) > 0:
for i in range(N_SLAVES): if (frame_start := self.check_log_start(rx_data)) != -1:
min_v = min(data.slaves[i].cell_voltages) self.decode_log_frame(rx_data[frame_start + 3 : frame_start + 11])
max_v = max(data.slaves[i].cell_voltages) rx_data = rx_data[frame_start + 11 :]
min_t = min(data.slaves[i].cell_temps) continue
max_t = max(data.slaves[i].cell_temps) elif (frame_start := self.check_current_start(rx_data)) != -1:
print( self.decode_current_frame(rx_data[frame_start + 3 : frame_start + 11])
f"Stack {i}: V ∈ [{min_v:.2f}, {max_v:.2f}]\tT ∈ [{min_t:.1f}, {max_t:.1f}]" rx_data = rx_data[frame_start + 11 :]
continue
elif (frame_start := self.check_panic_start(rx_data)) != -1:
self.decode_panic_frame(rx_data[frame_start + 3 : frame_start + 11])
rx_data = rx_data[frame_start + 11 :]
continue
break
def check_log_start(self, buf: bytes):
return buf[:-12].find(b"LOG")
def check_current_start(self, buf: bytes):
return buf[:-12].find(b"CUR")
def check_panic_start(self, buf: bytes):
return buf[:-12].find(b"PAN")
def decode_log_frame(self, buf: bytes):
msg_id = buf[0]
slave = msg_id >> 4
frame_id = msg_id & 0x0F
if slave >= N_SLAVES:
print(f"Unknown slave: {slave}", file=sys.stderr)
return
if frame_id == 0:
for i in range(7):
data.slaves[slave].cell_voltages[i] = buf[i + 1] * VOLTAGE_CONV
elif frame_id == 1:
for i in range(3):
data.slaves[slave].cell_voltages[i + 7] = buf[i + 1] * VOLTAGE_CONV
for i in range(4):
data.slaves[slave].cell_temps[i] = buf[i + 4] * TEMP_CONV
elif frame_id == 2:
for i in range(7):
data.slaves[slave].cell_temps[i + 4] = buf[i + 1] * TEMP_CONV
elif frame_id == 3:
for i in range(7):
data.slaves[slave].cell_temps[i + 11] = buf[i + 1] * TEMP_CONV
elif frame_id == 4:
for i in range(7):
data.slaves[slave].cell_temps[i + 18] = buf[i + 1] * TEMP_CONV
elif frame_id == 5:
for i in range(7):
data.slaves[slave].cell_temps[i + 25] = buf[i + 1] * TEMP_CONV
else:
print(f"Unknown frame ID: {frame_id} (buf: {repr(buf)})", file=sys.stderr)
# time.sleep(1)
return
voltages = [
slave.cell_voltages[i]
for i in range(CELLS_PER_SLAVE)
for slave in data.slaves
]
self.parse_cell_temps(slave)
temps = [
slave.cell_temps[i]
for i in range(TEMP_SENSORS_PER_SLAVE)
for slave in data.slaves
]
data.min_voltage = min(voltages)
data.max_voltage = max(voltages)
data.min_soc = self.calculate_soc(data.min_voltage)
data.max_soc = self.calculate_soc(data.max_voltage)
data.min_temp = min(temps)
data.max_temp = max(temps)
data.time_since_last_frame = time.time() - data.last_frame
data.last_frame = time.time()
def decode_current_frame(self, buf: bytes):
# current = (buf[2] << 24) | (buf[3] << 16) | (buf[4] << 8) | (buf[5])
current = struct.unpack(">i", buf[2:6])[0]
data.current = current / 1000.0
def decode_panic_frame(self, buf: bytes):
data.panic = True
errorcode = int(buf[0])
errorargs = [int(buf[1]), int(buf[2])]
if errorcode == ERRORCODE_TIMEOUT_SLAVE:
data.panic_errorcode = "Slave timeout"
data.panic_errorarg = f"Slave ID {errorargs[0]}"
elif errorcode == ERRORCODE_SLAVE_PANIC:
data.panic_errorcode = "Slave panic"
if errorargs[1] == SLAVE_ERROR_UV:
slave_error = "Undervoltage"
elif errorargs[1] == SLAVE_ERROR_OV:
slave_error = "Overvoltage"
elif errorargs[1] == SLAVE_ERROR_UT:
slave_error = "Undertemperature"
elif errorargs[1] == SLAVE_ERROR_OT:
slave_error = "Overtemperature"
elif errorargs[1] == SLAVE_ERROR_BQ:
slave_error = "BQ Communication error"
elif errorargs[1] == SLAVE_ERROR_TMP144:
slave_error = "TMP144 communication error"
else:
slave_error = f"Unknown error ({errorargs[1]})"
data.panic_errorarg = f"Slave ID {errorargs[0]}: {slave_error}"
elif errorcode == ERRORCODE_TIMEOUT_SLAVE_FRAMES:
data.panic_errorcode = "Slave frame timeout"
data.panic_errorarg = f"Slave ID {errorargs[0]}, frame {errorargs[1]}"
elif errorcode == ERRORCODE_TOO_FEW_TEMPS:
data.panic_errorcode = "Too few temperature sensors"
data.panic_errorarg = f"Slave ID {errorargs[0]}"
elif errorcode == ERRORCODE_TIMEOUT_SHUNT:
data.panic_errorcode = "Shunt timeout"
elif errorcode == ERRORCODE_MASTER_THRESH:
data.panic_errorcode = "Master detected threshold"
if errorargs[0] == MASTER_THRESH_UT:
data.panic_errorarg = "Undertemperature"
elif errorargs[0] == MASTER_THRESH_OT:
data.panic_errorarg = "Overtemperature"
elif errorargs[0] == MASTER_THRESH_UV:
data.panic_errorarg = "Undervoltage"
elif errorargs[0] == MASTER_THRESH_OV:
data.panic_errorarg = "Overvoltage"
else:
data.panic_errorarg = f"Unknown threshold ({errorargs[0]})"
else:
data.panic_errorcode = buf[0]
data.panic_errorarg = buf[1]
INTERNAL_RESISTANCE_CURVE_X = [2.0, 4.12]
INTERNAL_RESISTANCE_CURVE_Y = [0.0528, 0.0294]
SOC_OCV_X = [2.1, 2.9, 3.2, 3.3, 3.4, 3.5, 3.68, 4.0, 4.15, 4.2]
SOC_OCV_Y = [0, 0.023, 0.06, 0.08, 0.119, 0.227, 0.541, 0.856, 0.985, 1.0]
def calculate_soc(self, voltage: float):
r_i = np.interp(
[voltage],
self.INTERNAL_RESISTANCE_CURVE_X,
self.INTERNAL_RESISTANCE_CURVE_Y,
)[0]
# i = data.current / PARALLEL_CELLS
i = 3 / PARALLEL_CELLS
ocv = voltage - i * r_i
return np.interp([ocv], self.SOC_OCV_X, self.SOC_OCV_Y)[0] * 100
def parse_cell_temps(self, slave: int):
temps = list(
filter(lambda t: t > 0 and t < 60, data.slaves[slave].cell_temps[:16])
) )
if len(temps) == 0:
temps = [-1]
min_t = min(temps)
max_t = max(temps)
for i in range(16, 32):
data.slaves[slave].cell_temps[i] = random.randint(min_t, max_t)
def stop(self):
self.continue_run = False # set the run condition to false on stop
kb = KBHit() if __name__ == "__main__":
while True: data = AccumulatorData()
rx_data = ser.read(32) rx_buf = bytes()
if len(rx_data) > 0:
rx_buf = rx_data if len(sys.argv) != 2:
if (start := check_log_start(rx_buf)) != -1: print(f"Usage: {sys.argv[0]} SERIAL-PORT", file=sys.stderr)
decode_log_frame(rx_buf[start + 3 : start + 11]) sys.exit(os.EX_USAGE)
rx_buf = b""
elif (start := check_current_start(rx_buf)) != -1: SERIAL_PORT = sys.argv[1]
decode_current_frame(rx_buf[start + 3 : start + 11]) print(SERIAL_PORT)
rx_buf = b"" ser = serial.Serial(SERIAL_PORT, BITRATE, timeout=TIMEOUT)
elif (start := check_panic_start(rx_buf)) != -1:
decode_panic_frame(rx_buf[start + 3 : start + 11]) app = QApplication(sys.argv)
rx_buf = b"" gui = Window()
if kb.kbhit(): gui.show()
c = kb.getch()
if c == "C": timer = QTimer()
print(f"KBHIT: {c}", file=sys.stderr) # timer.timeout.connect(data.fill_dummy_data)
print(ser.write(b"C"), file=sys.stderr) timer.timeout.connect(gui.update)
update_display() timer.start(1000) # every 1,000 milliseconds
sys.exit(app.exec_())

559
test.py
View File

@ -1,559 +0,0 @@
#!/usr/bin/env python3
import math
import os
import struct
import time
import random
import serial
import sys
from PyQt5.QtCore import Qt, QTimer, QThread, QObject, pyqtSignal
from PyQt5.QtWidgets import (
QApplication,
QWidget,
QPushButton,
QVBoxLayout,
QHBoxLayout,
QGridLayout,
QLabel,
QGroupBox,
QFrame,
QSizePolicy,
)
BITRATE = 115200 # baud/s
TIMEOUT = 1 # seconds
N_SLAVES = 7
LOG_FRAME_LENGTH = 8 # bytes
CELLS_PER_SLAVE = 10
TEMP_SENSORS_PER_SLAVE = 32
VOLTAGE_CONV = 5.0 / 255 # volts/quantum
TEMP_CONV = 0.0625 * 16 # °C/quantum
class SlaveData:
cell_voltages: list[float]
cell_temps: list[float]
def __init__(self) -> None:
self.cell_voltages = [-1] * CELLS_PER_SLAVE
self.cell_temps = [-1] * TEMP_SENSORS_PER_SLAVE
class AccumulatorData:
slaves: list[SlaveData]
min_voltage: float
max_voltage: float
min_temp: float
max_temp: float
last_frame: float
time_since_last_frame: float
current: float
panic: bool
panic_errorcode: int
panic_errorarg: int
def __init__(self) -> None:
self.slaves = [SlaveData() for _ in range(N_SLAVES)]
self.min_voltage = (
self.max_voltage
) = self.min_temp = self.max_temp = self.last_frame = self.current = 0
self.panic = False
self.panic_errorcode = self.panic_errorarg = 0
self.time_since_last_frame = 0
def fill_dummy_data(self):
self.min_voltage = random.uniform(1, 3)
self.max_voltage = random.uniform(3, 5)
self.min_temp = random.uniform(0, 25)
self.max_temp = random.uniform(25, 60)
self.current = random.uniform(25, 60)
self.last_frame = time.time() - random.random()
self.panic = random.choice([True, False])
if self.panic:
self.panic_errorcode = random.randint(1, 10000)
self.panic_errorarg = "ABCDERFG"
else:
self.panic_errorcode = 0
self.panic_errorarg = ""
data.time_since_last_frame = random.uniform(1, 60)
for i in range(N_SLAVES):
self.slaves[i].cell_voltages = [
random.uniform(1, 5) for i in range(CELLS_PER_SLAVE)
]
self.slaves[i].cell_temps = [
random.uniform(25, 60) for i in range(TEMP_SENSORS_PER_SLAVE)
]
class StackGuiElement:
title: str
stack_id: int
min_voltage_label: QLabel
max_voltage_label: QLabel
min_temp_label: QLabel
max_temp_label: QLabel
groupBox: QGroupBox
detail_popup: QWidget
def __init__(self, title: str, stack_id: int):
self.title = title
self.stack_id = stack_id
self.min_voltage_label = QLabel()
self.max_voltage_label = QLabel()
self.min_temp_label = QLabel()
self.max_temp_label = QLabel()
self.groupBox = QGroupBox()
self.detail_popup = None
self.__post__init__()
def __post__init__(self):
l1 = QLabel("Min Voltage [V]")
l2 = QLabel("Max Voltage [V]")
l3 = QLabel("Min Temperature [°C]")
l4 = QLabel("Max Temperature [°C]")
popup_btn = QPushButton(self.title + " details")
popup_btn.clicked.connect(self.show_popup)
l1.setAlignment(Qt.AlignLeft)
l2.setAlignment(Qt.AlignLeft)
l3.setAlignment(Qt.AlignLeft)
l4.setAlignment(Qt.AlignLeft)
self.min_voltage_label.setAlignment(Qt.AlignLeft)
self.max_voltage_label.setAlignment(Qt.AlignLeft)
self.min_temp_label.setAlignment(Qt.AlignLeft)
self.max_temp_label.setAlignment(Qt.AlignLeft)
grid = QGridLayout()
grid.addWidget(l1, 0, 0)
grid.addWidget(l2, 1, 0)
grid.addWidget(l3, 2, 0)
grid.addWidget(l4, 3, 0)
grid.addWidget(self.min_voltage_label, 0, 1)
grid.addWidget(self.max_voltage_label, 1, 1)
grid.addWidget(self.min_temp_label, 2, 1)
grid.addWidget(self.max_temp_label, 3, 1)
grid.addWidget(popup_btn, 0, 2)
self.groupBox.setTitle(self.title)
self.groupBox.setLayout(grid)
def update_data_from_slave(self, slave: SlaveData):
self.min_voltage_label.setNum(min(slave.cell_voltages))
self.max_voltage_label.setNum(max(slave.cell_voltages))
self.min_temp_label.setNum(min(slave.cell_temps))
self.max_temp_label.setNum(max(slave.cell_temps))
def show_popup(self):
if self.detail_popup is None:
self.detail_popup = StackPopup(self.stack_id)
self.detail_popup.show()
class QVSeperationLine(QFrame):
"""
a vertical seperation line
"""
def __init__(self):
super().__init__()
self.setFixedWidth(20)
self.setMinimumHeight(1)
self.setFrameShape(QFrame.VLine)
self.setFrameShadow(QFrame.Sunken)
self.setSizePolicy(QSizePolicy.Minimum, QSizePolicy.Preferred)
return
class StackPopup(QWidget):
stack_id: int
voltage_labels: list[QLabel]
temp_labels: list[QLabel]
def __init__(self, stack_id: int):
super().__init__()
self.stack_id = stack_id
self.voltage_labels = []
self.temp_labels = []
layout = QVBoxLayout()
groupbox = QGroupBox()
grid = QGridLayout()
for i in range(len(data.slaves[stack_id].cell_voltages)):
l1 = QLabel(f"Voltage Cell {i}")
l1.setAlignment(Qt.AlignLeft)
l_v = QLabel()
l_v.setNum(data.slaves[stack_id].cell_voltages[i])
l_v.setAlignment(Qt.AlignLeft)
self.voltage_labels.append(l_v)
grid.addWidget(l1, i, 0)
grid.addWidget(l_v, i, 1)
# vertical separators between rows in popup
separator_vertical = QVSeperationLine()
grid.addWidget(separator_vertical, 0, 2, CELLS_PER_SLAVE, 1)
num_temp_cols = len(data.slaves[stack_id].cell_temps) // CELLS_PER_SLAVE
for i in range(num_temp_cols):
separator_vertical = QVSeperationLine()
grid.addWidget(separator_vertical, 0, 5 + i * 3, CELLS_PER_SLAVE, 1)
for i in range(len(data.slaves[stack_id].cell_temps)):
l2 = QLabel(f"Temp. Sensor {i}")
l2.setAlignment(Qt.AlignLeft)
l_t = QLabel()
l_t.setNum(data.slaves[stack_id].cell_temps[i])
l_t.setAlignment(Qt.AlignLeft)
self.temp_labels.append(l_t)
grid.addWidget(l2, i % CELLS_PER_SLAVE, 3 + (i // CELLS_PER_SLAVE) * 3)
grid.addWidget(l_t, i % CELLS_PER_SLAVE, 4 + (i // CELLS_PER_SLAVE) * 3)
groupbox.setTitle(f"Stack {stack_id}")
groupbox.setLayout(grid)
layout.addWidget(groupbox)
self.setLayout(layout)
self.update_data()
timer.timeout.connect(self.update_data)
def update_data(self):
for i in range(len(data.slaves[self.stack_id].cell_voltages)):
self.voltage_labels[i].setNum(data.slaves[self.stack_id].cell_voltages[i])
for i in range(len(data.slaves[self.stack_id].cell_temps)):
self.temp_labels[i].setNum(data.slaves[self.stack_id].cell_temps[i])
class Window(QWidget):
stop_signal = (
pyqtSignal()
) # make a stop signal to communicate with the worker in another thread
def __init__(self, parent=None):
super(Window, self).__init__(parent)
win = QVBoxLayout()
### ACCUMULATOR GENERAL ###
self.l1 = QLabel("Min Voltage [V]")
self.l1.setAlignment(Qt.AlignLeft)
self.l_min_voltage = QLabel()
self.l_min_voltage.setNum(data.min_voltage)
self.l_min_voltage.setAlignment(Qt.AlignLeft)
self.l2 = QLabel("Max Voltage [V]")
self.l2.setAlignment(Qt.AlignLeft)
self.l_max_voltage = QLabel()
self.l_max_voltage.setNum(data.max_voltage)
self.l_max_voltage.setAlignment(Qt.AlignLeft)
self.l3 = QLabel("Min Temperature [°C]")
self.l3.setAlignment(Qt.AlignLeft)
self.l_min_temp = QLabel()
self.l_min_temp.setNum(data.min_temp)
self.l_min_temp.setAlignment(Qt.AlignLeft)
self.l4 = QLabel("Max Temperature [°C]")
self.l4.setAlignment(Qt.AlignLeft)
self.l_max_temp = QLabel()
self.l_max_temp.setNum(data.max_temp)
self.l_max_temp.setAlignment(Qt.AlignLeft)
self.l5 = QLabel("Current [A]")
self.l5.setAlignment(Qt.AlignLeft)
self.l_current = QLabel()
self.l_current.setNum(data.current)
self.l_current.setAlignment(Qt.AlignLeft)
self.l6 = QLabel("Error")
self.l_error = QLabel()
self.l_error.setText(str(data.panic))
self.l_error.setAlignment(Qt.AlignLeft)
self.l_errorcode = QLabel()
self.l_errorcode.setText(str(data.panic_errorcode))
self.l_errorcode.setAlignment(Qt.AlignLeft)
self.l_errorarg = QLabel()
self.l_errorarg.setText(str(data.panic_errorarg))
self.l_errorcode.setAlignment(Qt.AlignLeft)
self.l7 = QLabel("Time Since Last Dataframe")
self.l_time_since_last_frame = QLabel()
self.l_time_since_last_frame.setText(str(data.time_since_last_frame))
self.l_time_since_last_frame.setAlignment(Qt.AlignLeft)
### LAYOUT ACCUMULATOR GENERAL ###
grid_accumulator = QGridLayout()
grid_accumulator.addWidget(self.l1, 0, 0)
grid_accumulator.addWidget(self.l2, 1, 0)
grid_accumulator.addWidget(self.l3, 0, 2)
grid_accumulator.addWidget(self.l4, 1, 2)
grid_accumulator.addWidget(self.l5, 2, 0)
grid_accumulator.addWidget(self.l6, 3, 0)
grid_accumulator.addWidget(self.l7, 4, 0)
grid_accumulator.addWidget(self.l_min_voltage, 0, 1)
grid_accumulator.addWidget(self.l_max_voltage, 1, 1)
grid_accumulator.addWidget(self.l_min_temp, 0, 3)
grid_accumulator.addWidget(self.l_max_temp, 1, 3)
grid_accumulator.addWidget(self.l_current, 2, 1)
grid_accumulator.addWidget(self.l_error, 3, 1)
grid_accumulator.addWidget(self.l_errorcode, 3, 2)
grid_accumulator.addWidget(self.l_errorarg, 3, 3)
grid_accumulator.addWidget(self.l_time_since_last_frame, 4, 1)
groupBox_accumulator = QGroupBox("Accumulator General")
groupBox_accumulator.setLayout(grid_accumulator)
win.addWidget(groupBox_accumulator)
### STACKS ###
self.stack_gui_elements = []
for i in range(N_SLAVES):
sge = StackGuiElement(f"Stack {i}", i)
sge.update_data_from_slave(data.slaves[i])
self.stack_gui_elements.append(sge)
### LAYOUT STACKS ###
n_slaves_half = math.ceil(N_SLAVES / 2)
grid_stacks = QGridLayout()
for i, sge in enumerate(self.stack_gui_elements):
grid_stacks.addWidget(
sge.groupBox, 0 if i < n_slaves_half else 1, i % n_slaves_half
)
groupBox_stacks = QGroupBox("Individual Stacks")
groupBox_stacks.setLayout(grid_stacks)
win.addWidget(groupBox_stacks)
### BUTTONS ###
self.btn_start = QPushButton("Start Serial")
self.btn_start.resize(self.btn_start.sizeHint())
self.btn_start.move(50, 50)
self.btn_stop = QPushButton("Stop Serial")
self.btn_stop.resize(self.btn_stop.sizeHint())
self.btn_stop.move(150, 50)
self.btn_charge = QPushButton("Start Charging")
self.btn_charge.resize(self.btn_stop.sizeHint())
self.btn_charge.move(150, 50)
### LAYOUT BUTTONS ###
vbox_controls = QVBoxLayout()
vbox_controls.addWidget(self.btn_start)
vbox_controls.addWidget(self.btn_stop)
vbox_controls.addWidget(self.btn_charge)
groupBox_controls = QGroupBox("Controls")
groupBox_controls.setLayout(vbox_controls)
win.addWidget(groupBox_controls)
# Start Button action
self.btn_start.clicked.connect(self.start_thread)
# Stop Button action
self.btn_stop.clicked.connect(self.stop_thread)
# Charge Button action
self.btn_charge.clicked.connect(self.start_charge)
self.setLayout(win)
self.setWindowTitle("FT22 Charger Display")
# When start_btn is clicked this runs. Creates the worker and the thread.
def start_thread(self):
self.thread = QThread()
self.worker = Worker()
self.stop_signal.connect(
self.worker.stop
) # connect stop signal to worker stop method
self.worker.moveToThread(self.thread)
self.worker.finished.connect(
self.thread.quit
) # connect the workers finished signal to stop thread
self.worker.finished.connect(
self.worker.deleteLater
) # connect the workers finished signal to clean up worker
self.thread.finished.connect(
self.thread.deleteLater
) # connect threads finished signal to clean up thread
self.thread.started.connect(self.worker.do_work)
self.thread.finished.connect(self.worker.stop)
self.thread.start()
# When stop_btn is clicked this runs. Terminates the worker and the thread.
def stop_thread(self):
self.stop_signal.emit() # emit the finished signal on stop
def start_charge(self):
ser.write(b"C")
def update(self):
# Accumulator
self.l_min_voltage.setNum(data.min_voltage)
self.l_max_voltage.setNum(data.max_voltage)
self.l_min_temp.setNum(data.min_temp)
self.l_max_temp.setNum(data.max_temp)
self.l_current.setNum(data.current)
self.l_error.setText(str(data.panic))
self.l_errorcode.setText(str(data.panic_errorcode))
self.l_errorarg.setText(str(data.panic_errorarg))
self.l_time_since_last_frame.setText(str(data.time_since_last_frame))
# Stacks
for i, sge in enumerate(self.stack_gui_elements):
sge.update_data_from_slave(data.slaves[i])
class Worker(QObject):
finished = pyqtSignal() # give worker class a finished signal
def __init__(self, parent=None):
QObject.__init__(self, parent=parent)
self.continue_run = True # provide a bool run condition for the class
def do_work(self):
i = 1
while self.continue_run: # give the loop a stoppable condition
# data.fill_dummy_data()
self.charger_communication()
# QThread.msleep(1)
self.finished.emit() # emit the finished signal when the loop is done
def charger_communication(self):
rx_data = ser.read(256)
while len(rx_data) > 0:
if (frame_start := self.check_log_start(rx_data)) != -1:
self.decode_log_frame(rx_data[frame_start + 3 : frame_start + 11])
rx_data = rx_data[frame_start + 11 :]
continue
elif (frame_start := self.check_current_start(rx_data)) != -1:
self.decode_current_frame(rx_data[frame_start + 3 : frame_start + 11])
rx_data = rx_data[frame_start + 11 :]
continue
elif (frame_start := self.check_panic_start(rx_data)) != -1:
self.decode_panic_frame(rx_data[frame_start + 3 : frame_start + 11])
rx_data = rx_data[frame_start + 11 :]
continue
break
def check_log_start(self, buf: bytes):
return buf[:-12].find(b"LOG")
def check_current_start(self, buf: bytes):
return buf[:-12].find(b"CUR")
def check_panic_start(self, buf: bytes):
return buf[:-12].find(b"PAN")
def decode_log_frame(self, buf: bytes):
msg_id = buf[0]
slave = msg_id >> 4
frame_id = msg_id & 0x0F
if slave >= N_SLAVES:
print(f"Unknown slave: {slave}", file=sys.stderr)
return
if frame_id == 0:
for i in range(7):
data.slaves[slave].cell_voltages[i] = buf[i + 1] * VOLTAGE_CONV
elif frame_id == 1:
for i in range(3):
data.slaves[slave].cell_voltages[i + 7] = buf[i + 1] * VOLTAGE_CONV
for i in range(4):
data.slaves[slave].cell_temps[i] = buf[i + 4] * TEMP_CONV
elif frame_id == 2:
for i in range(7):
data.slaves[slave].cell_temps[i + 4] = buf[i + 1] * TEMP_CONV
elif frame_id == 3:
for i in range(7):
data.slaves[slave].cell_temps[i + 11] = buf[i + 1] * TEMP_CONV
elif frame_id == 4:
for i in range(7):
data.slaves[slave].cell_temps[i + 18] = buf[i + 1] * TEMP_CONV
elif frame_id == 5:
for i in range(7):
data.slaves[slave].cell_temps[i + 25] = buf[i + 1] * TEMP_CONV
else:
print(f"Unknown frame ID: {frame_id} (buf: {repr(buf)})", file=sys.stderr)
# time.sleep(1)
return
voltages = [
slave.cell_voltages[i]
for i in range(CELLS_PER_SLAVE)
for slave in data.slaves
]
self.parse_cell_temps(slave)
temps = [
slave.cell_temps[i]
for i in range(TEMP_SENSORS_PER_SLAVE)
for slave in data.slaves
]
data.min_voltage = min(voltages)
data.max_voltage = max(voltages)
data.min_temp = min(temps)
data.max_temp = max(temps)
data.time_since_last_frame = time.time() - data.last_frame
data.last_frame = time.time()
def decode_current_frame(self, buf: bytes):
# current = (buf[2] << 24) | (buf[3] << 16) | (buf[4] << 8) | (buf[5])
current = struct.unpack(">i", buf[2:6])[0]
data.current = current / 1000.0
def decode_panic_frame(self, buf: bytes):
data.panic = True
data.panic_errorcode = buf[0]
data.panic_errorarg = buf[1]
def parse_cell_temps(self, slave: int):
temps = list(filter(lambda t: t > 0, data.slaves[slave].cell_temps[:16]))
if len(temps) == 0:
temps = [-1]
min_t = min(temps)
max_t = max(temps)
for i in range(16, 32):
data.slaves[slave].cell_temps[i] = random.randint(min_t, max_t)
def stop(self):
self.continue_run = False # set the run condition to false on stop
if __name__ == "__main__":
data = AccumulatorData()
rx_buf = bytes()
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} SERIAL-PORT", file=sys.stderr)
sys.exit(os.EX_USAGE)
SERIAL_PORT = sys.argv[1]
print(SERIAL_PORT)
try:
ser = serial.Serial(SERIAL_PORT, BITRATE, timeout=TIMEOUT)
except serial.serialutil.SerialException:
pass
app = QApplication(sys.argv)
gui = Window()
gui.show()
timer = QTimer()
# timer.timeout.connect(data.fill_dummy_data)
timer.timeout.connect(gui.update)
timer.start(1000) # every 1,000 milliseconds
sys.exit(app.exec_())