Compare commits

...

2 Commits

Author SHA1 Message Date
jazzpi 61f652ee8f Errorcodes, SoC, 9 slaves 2022-08-08 01:23:20 +02:00
jazzpi cf4b1763fe rename file 2022-07-22 09:12:02 +02:00
2 changed files with 599 additions and 760 deletions

View File

@ -1,126 +1,57 @@
#!/usr/bin/env python3
import math
import os
import struct
import time
import random
import serial
import sys
import numpy as np
from PyQt5.QtCore import Qt, QTimer, QThread, QObject, pyqtSignal
from PyQt5.QtWidgets import (
QApplication,
QWidget,
QPushButton,
QVBoxLayout,
QHBoxLayout,
QGridLayout,
QLabel,
QGroupBox,
QFrame,
QSizePolicy,
)
BITRATE = 115200 # baud/s
TIMEOUT = 1 # seconds
N_SLAVES = 6
N_SLAVES = 9
LOG_FRAME_LENGTH = 8 # bytes
PARALLEL_CELLS = 9
CELLS_PER_SLAVE = 10
TEMP_SENSORS_PER_SLAVE = 32
VOLTAGE_CONV = 5.0 / 255 # volts/quantum
TEMP_CONV = 0.0625 * 16 # °C/quantum
import os
# Windows
if os.name == "nt":
import msvcrt
# Posix (Linux, OS X)
else:
import sys
import termios
import atexit
from select import select
class KBHit:
def __init__(self):
"""Creates a KBHit object that you can call to do various keyboard things."""
if os.name == "nt":
pass
else:
# Save the terminal settings
self.fd = sys.stdin.fileno()
self.new_term = termios.tcgetattr(self.fd)
self.old_term = termios.tcgetattr(self.fd)
# New terminal setting unbuffered
self.new_term[3] = self.new_term[3] & ~termios.ICANON & ~termios.ECHO
termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.new_term)
# Support normal-terminal reset at exit
atexit.register(self.set_normal_term)
def set_normal_term(self):
"""Resets to normal terminal. On Windows this is a no-op."""
if os.name == "nt":
pass
else:
termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old_term)
def getch(self):
"""Returns a keyboard character after kbhit() has been called.
Should not be called in the same program as getarrow().
"""
s = ""
if os.name == "nt":
return msvcrt.getch().decode("utf-8")
else:
return sys.stdin.read(1)
def getarrow(self):
"""Returns an arrow-key code after kbhit() has been called. Codes are
0 : up
1 : right
2 : down
3 : left
Should not be called in the same program as getch().
"""
if os.name == "nt":
msvcrt.getch() # skip 0xE0
c = msvcrt.getch()
vals = [72, 77, 80, 75]
else:
c = sys.stdin.read(3)[2]
vals = [65, 67, 66, 68]
return vals.index(ord(c.decode("utf-8")))
def kbhit(self):
"""Returns True if keyboard character was hit, False otherwise."""
if os.name == "nt":
return msvcrt.kbhit()
else:
dr, dw, de = select([sys.stdin], [], [], 0)
return dr != []
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} SERIAL-PORT", file=sys.stderr)
sys.exit(os.EX_USAGE)
SERIAL_PORT = sys.argv[1]
ser = serial.Serial(SERIAL_PORT, BITRATE, timeout=TIMEOUT)
def check_log_start(buf: bytes):
return buf[:-12].find(b"LOG")
def check_current_start(buf: bytes):
return buf[:-12].find(b"CUR")
def check_panic_start(buf: bytes):
return buf[:-12].find(b"PAN")
ERRORCODE_TIMEOUT_SLAVE = 1
ERRORCODE_SLAVE_PANIC = 2
ERRORCODE_TIMEOUT_SLAVE_FRAMES = 3
ERRORCODE_TOO_FEW_TEMPS = 4
ERRORCODE_TIMEOUT_SHUNT = 6
ERRORCODE_MASTER_THRESH = 7
SLAVE_ERROR_UV = 0
SLAVE_ERROR_OV = 1
SLAVE_ERROR_UT = 2
SLAVE_ERROR_OT = 3
SLAVE_ERROR_BQ = 4
SLAVE_ERROR_TMP144 = 5
MASTER_THRESH_UT = 0
MASTER_THRESH_OT = 1
MASTER_THRESH_UV = 2
MASTER_THRESH_OV = 3
class SlaveData:
@ -136,9 +67,12 @@ class AccumulatorData:
slaves: list[SlaveData]
min_voltage: float
max_voltage: float
min_soc: float
max_soc: float
min_temp: float
max_temp: float
last_frame: float
time_since_last_frame: float
current: float
panic: bool
panic_errorcode: int
@ -148,20 +82,433 @@ class AccumulatorData:
self.slaves = [SlaveData() for _ in range(N_SLAVES)]
self.min_voltage = (
self.max_voltage
) = (
self.min_soc
) = (
self.max_soc
) = self.min_temp = self.max_temp = self.last_frame = self.current = 0
self.panic = False
self.panic_errorcode = self.panic_errorarg = 0
self.time_since_last_frame = 0
def fill_dummy_data(self):
self.min_voltage = random.uniform(1, 3)
self.max_voltage = random.uniform(3, 5)
self.min_temp = random.uniform(0, 25)
self.max_temp = random.uniform(25, 60)
self.current = random.uniform(25, 60)
self.last_frame = time.time() - random.random()
self.panic = random.choice([True, False])
if self.panic:
self.panic_errorcode = random.randint(1, 10000)
self.panic_errorarg = "ABCDERFG"
else:
self.panic_errorcode = 0
self.panic_errorarg = ""
data.time_since_last_frame = random.uniform(1, 60)
for i in range(N_SLAVES):
self.slaves[i].cell_voltages = [
random.uniform(1, 5) for i in range(CELLS_PER_SLAVE)
]
self.slaves[i].cell_temps = [
random.uniform(25, 60) for i in range(TEMP_SENSORS_PER_SLAVE)
]
rx_buf = bytes()
data = AccumulatorData()
class StackGuiElement:
title: str
stack_id: int
min_voltage_label: QLabel
max_voltage_label: QLabel
min_temp_label: QLabel
max_temp_label: QLabel
groupBox: QGroupBox
detail_popup: QWidget
def __init__(self, title: str, stack_id: int):
self.title = title
self.stack_id = stack_id
self.min_voltage_label = QLabel()
self.max_voltage_label = QLabel()
self.min_temp_label = QLabel()
self.max_temp_label = QLabel()
self.groupBox = QGroupBox()
self.detail_popup = None
self.__post__init__()
def __post__init__(self):
l1 = QLabel("Min Voltage [V]")
l2 = QLabel("Max Voltage [V]")
l3 = QLabel("Min Temperature [°C]")
l4 = QLabel("Max Temperature [°C]")
popup_btn = QPushButton(self.title + " details")
popup_btn.clicked.connect(self.show_popup)
l1.setAlignment(Qt.AlignLeft)
l2.setAlignment(Qt.AlignLeft)
l3.setAlignment(Qt.AlignLeft)
l4.setAlignment(Qt.AlignLeft)
self.min_voltage_label.setAlignment(Qt.AlignLeft)
self.max_voltage_label.setAlignment(Qt.AlignLeft)
self.min_temp_label.setAlignment(Qt.AlignLeft)
self.max_temp_label.setAlignment(Qt.AlignLeft)
grid = QGridLayout()
grid.addWidget(l1, 0, 0)
grid.addWidget(l2, 1, 0)
grid.addWidget(l3, 2, 0)
grid.addWidget(l4, 3, 0)
grid.addWidget(self.min_voltage_label, 0, 1)
grid.addWidget(self.max_voltage_label, 1, 1)
grid.addWidget(self.min_temp_label, 2, 1)
grid.addWidget(self.max_temp_label, 3, 1)
grid.addWidget(popup_btn, 0, 2)
self.groupBox.setTitle(self.title)
self.groupBox.setLayout(grid)
def update_data_from_slave(self, slave: SlaveData):
self.min_voltage_label.setNum(min(slave.cell_voltages))
self.max_voltage_label.setNum(max(slave.cell_voltages))
self.min_temp_label.setNum(min(slave.cell_temps))
self.max_temp_label.setNum(max(slave.cell_temps))
def show_popup(self):
if self.detail_popup is None:
self.detail_popup = StackPopup(self.stack_id)
self.detail_popup.show()
def decode_log_frame(buf: bytes):
class QVSeperationLine(QFrame):
"""
a vertical seperation line
"""
def __init__(self):
super().__init__()
self.setFixedWidth(20)
self.setMinimumHeight(1)
self.setFrameShape(QFrame.VLine)
self.setFrameShadow(QFrame.Sunken)
self.setSizePolicy(QSizePolicy.Minimum, QSizePolicy.Preferred)
return
class StackPopup(QWidget):
stack_id: int
voltage_labels: list[QLabel]
temp_labels: list[QLabel]
def __init__(self, stack_id: int):
super().__init__()
self.stack_id = stack_id
self.voltage_labels = []
self.temp_labels = []
layout = QVBoxLayout()
groupbox = QGroupBox()
grid = QGridLayout()
for i in range(len(data.slaves[stack_id].cell_voltages)):
l1 = QLabel(f"Voltage Cell {i}")
l1.setAlignment(Qt.AlignLeft)
l_v = QLabel()
l_v.setNum(data.slaves[stack_id].cell_voltages[i])
l_v.setAlignment(Qt.AlignLeft)
self.voltage_labels.append(l_v)
grid.addWidget(l1, i, 0)
grid.addWidget(l_v, i, 1)
# vertical separators between rows in popup
separator_vertical = QVSeperationLine()
grid.addWidget(separator_vertical, 0, 2, CELLS_PER_SLAVE, 1)
num_temp_cols = len(data.slaves[stack_id].cell_temps) // CELLS_PER_SLAVE
for i in range(num_temp_cols):
separator_vertical = QVSeperationLine()
grid.addWidget(separator_vertical, 0, 5 + i * 3, CELLS_PER_SLAVE, 1)
for i in range(len(data.slaves[stack_id].cell_temps)):
l2 = QLabel(f"Temp. Sensor {i}")
l2.setAlignment(Qt.AlignLeft)
l_t = QLabel()
l_t.setNum(data.slaves[stack_id].cell_temps[i])
l_t.setAlignment(Qt.AlignLeft)
self.temp_labels.append(l_t)
grid.addWidget(l2, i % CELLS_PER_SLAVE, 3 + (i // CELLS_PER_SLAVE) * 3)
grid.addWidget(l_t, i % CELLS_PER_SLAVE, 4 + (i // CELLS_PER_SLAVE) * 3)
groupbox.setTitle(f"Stack {stack_id}")
groupbox.setLayout(grid)
layout.addWidget(groupbox)
self.setLayout(layout)
self.update_data()
timer.timeout.connect(self.update_data)
def update_data(self):
for i in range(len(data.slaves[self.stack_id].cell_voltages)):
self.voltage_labels[i].setNum(data.slaves[self.stack_id].cell_voltages[i])
for i in range(len(data.slaves[self.stack_id].cell_temps)):
self.temp_labels[i].setNum(data.slaves[self.stack_id].cell_temps[i])
class Window(QWidget):
stop_signal = (
pyqtSignal()
) # make a stop signal to communicate with the worker in another thread
def __init__(self, parent=None):
super(Window, self).__init__(parent)
win = QVBoxLayout()
### ACCUMULATOR GENERAL ###
self.l1 = QLabel("Min Voltage [V]")
self.l1.setAlignment(Qt.AlignLeft)
self.l_min_voltage = QLabel()
self.l_min_voltage.setNum(data.min_voltage)
self.l_min_voltage.setAlignment(Qt.AlignLeft)
self.l2 = QLabel("Max Voltage [V]")
self.l2.setAlignment(Qt.AlignLeft)
self.l_max_voltage = QLabel()
self.l_max_voltage.setNum(data.max_voltage)
self.l_max_voltage.setAlignment(Qt.AlignLeft)
self.l3 = QLabel("Min SoC [%]")
self.l3.setAlignment(Qt.AlignLeft)
self.l_min_soc = QLabel()
self.l_min_soc.setNum(data.min_soc)
self.l_min_soc.setAlignment(Qt.AlignLeft)
self.l4 = QLabel("Max SoC [%]")
self.l4.setAlignment(Qt.AlignLeft)
self.l_max_soc = QLabel()
self.l_max_soc.setNum(data.max_soc)
self.l_max_soc.setAlignment(Qt.AlignLeft)
self.l5 = QLabel("Min Temperature [°C]")
self.l5.setAlignment(Qt.AlignLeft)
self.l_min_temp = QLabel()
self.l_min_temp.setNum(data.min_temp)
self.l_min_temp.setAlignment(Qt.AlignLeft)
self.l6 = QLabel("Max Temperature [°C]")
self.l6.setAlignment(Qt.AlignLeft)
self.l_max_temp = QLabel()
self.l_max_temp.setNum(data.max_temp)
self.l_max_temp.setAlignment(Qt.AlignLeft)
self.l7 = QLabel("Current [A]")
self.l7.setAlignment(Qt.AlignLeft)
self.l_current = QLabel()
self.l_current.setNum(data.current)
self.l_current.setAlignment(Qt.AlignLeft)
self.l8 = QLabel("Error")
self.l_error = QLabel()
self.l_error.setText(str(data.panic))
self.l_error.setAlignment(Qt.AlignLeft)
self.l_errorcode = QLabel()
self.l_errorcode.setText(str(data.panic_errorcode))
self.l_errorcode.setAlignment(Qt.AlignLeft)
self.l_errorarg = QLabel()
self.l_errorarg.setText(str(data.panic_errorarg))
self.l_errorcode.setAlignment(Qt.AlignLeft)
self.l9 = QLabel("Time Since Last Dataframe")
self.l_time_since_last_frame = QLabel()
self.l_time_since_last_frame.setText(str(data.time_since_last_frame))
self.l_time_since_last_frame.setAlignment(Qt.AlignLeft)
### LAYOUT ACCUMULATOR GENERAL ###
grid_accumulator = QGridLayout()
grid_accumulator.addWidget(self.l1, 0, 0)
grid_accumulator.addWidget(self.l2, 1, 0)
grid_accumulator.addWidget(self.l3, 2, 0)
grid_accumulator.addWidget(self.l4, 3, 0)
grid_accumulator.addWidget(self.l5, 0, 2)
grid_accumulator.addWidget(self.l6, 1, 2)
grid_accumulator.addWidget(self.l7, 2, 2)
grid_accumulator.addWidget(self.l8, 5, 0)
grid_accumulator.addWidget(self.l9, 3, 2)
grid_accumulator.addWidget(self.l_min_voltage, 0, 1)
grid_accumulator.addWidget(self.l_max_voltage, 1, 1)
grid_accumulator.addWidget(self.l_min_soc, 2, 1)
grid_accumulator.addWidget(self.l_max_soc, 3, 1)
grid_accumulator.addWidget(self.l_min_temp, 0, 3)
grid_accumulator.addWidget(self.l_max_temp, 1, 3)
grid_accumulator.addWidget(self.l_current, 2, 3)
grid_accumulator.addWidget(self.l_error, 5, 1)
grid_accumulator.addWidget(self.l_errorcode, 5, 2)
grid_accumulator.addWidget(self.l_errorarg, 5, 3)
grid_accumulator.addWidget(self.l_time_since_last_frame, 3, 3)
groupBox_accumulator = QGroupBox("Accumulator General")
groupBox_accumulator.setLayout(grid_accumulator)
win.addWidget(groupBox_accumulator)
### STACKS ###
self.stack_gui_elements = []
for i in range(N_SLAVES):
sge = StackGuiElement(f"Stack {i}", i)
sge.update_data_from_slave(data.slaves[i])
self.stack_gui_elements.append(sge)
### LAYOUT STACKS ###
n_slaves_half = math.ceil(N_SLAVES / 2)
grid_stacks = QGridLayout()
for i, sge in enumerate(self.stack_gui_elements):
grid_stacks.addWidget(
sge.groupBox, 0 if i < n_slaves_half else 1, i % n_slaves_half
)
groupBox_stacks = QGroupBox("Individual Stacks")
groupBox_stacks.setLayout(grid_stacks)
win.addWidget(groupBox_stacks)
### BUTTONS ###
self.btn_start = QPushButton("Start Serial")
self.btn_start.resize(self.btn_start.sizeHint())
self.btn_start.move(50, 50)
self.btn_stop = QPushButton("Stop Serial")
self.btn_stop.resize(self.btn_stop.sizeHint())
self.btn_stop.move(150, 50)
self.btn_charge = QPushButton("Start Charging")
self.btn_charge.resize(self.btn_stop.sizeHint())
self.btn_charge.move(150, 50)
### LAYOUT BUTTONS ###
vbox_controls = QVBoxLayout()
vbox_controls.addWidget(self.btn_start)
vbox_controls.addWidget(self.btn_stop)
vbox_controls.addWidget(self.btn_charge)
groupBox_controls = QGroupBox("Controls")
groupBox_controls.setLayout(vbox_controls)
win.addWidget(groupBox_controls)
# Start Button action
self.btn_start.clicked.connect(self.start_thread)
# Stop Button action
self.btn_stop.clicked.connect(self.stop_thread)
# Charge Button action
self.btn_charge.clicked.connect(self.start_charge)
self.setLayout(win)
self.setWindowTitle("FT22 Charger Display")
# When start_btn is clicked this runs. Creates the worker and the thread.
def start_thread(self):
self.thread = QThread()
self.worker = Worker()
self.stop_signal.connect(
self.worker.stop
) # connect stop signal to worker stop method
self.worker.moveToThread(self.thread)
self.worker.finished.connect(
self.thread.quit
) # connect the workers finished signal to stop thread
self.worker.finished.connect(
self.worker.deleteLater
) # connect the workers finished signal to clean up worker
self.thread.finished.connect(
self.thread.deleteLater
) # connect threads finished signal to clean up thread
self.thread.started.connect(self.worker.do_work)
self.thread.finished.connect(self.worker.stop)
self.thread.start()
# When stop_btn is clicked this runs. Terminates the worker and the thread.
def stop_thread(self):
self.stop_signal.emit() # emit the finished signal on stop
def start_charge(self):
ser.write(b"C")
def update(self):
# Accumulator
self.l_min_voltage.setText(f"{data.min_voltage:.02f}")
self.l_max_voltage.setText(f"{data.max_voltage:.02f}")
self.l_min_soc.setText(f"{data.min_soc:.01f}")
self.l_max_soc.setText(f"{data.max_soc:.01f}")
self.l_min_temp.setText(f"{data.min_temp:.00f}")
self.l_max_temp.setText(f"{data.max_temp:.00f}")
self.l_current.setText(f"{data.current:.03f}")
self.l_error.setText(str(data.panic))
self.l_errorcode.setText(str(data.panic_errorcode))
self.l_errorarg.setText(str(data.panic_errorarg))
self.l_time_since_last_frame.setText(str(time.time() - data.last_frame))
# Stacks
for i, sge in enumerate(self.stack_gui_elements):
sge.update_data_from_slave(data.slaves[i])
class Worker(QObject):
finished = pyqtSignal() # give worker class a finished signal
def __init__(self, parent=None):
QObject.__init__(self, parent=parent)
self.continue_run = True # provide a bool run condition for the class
def do_work(self):
i = 1
while self.continue_run: # give the loop a stoppable condition
# data.fill_dummy_data()
self.charger_communication()
# QThread.msleep(1)
self.finished.emit() # emit the finished signal when the loop is done
def charger_communication(self):
rx_data = ser.read(256)
# print(len(rx_data), rx_data)
while len(rx_data) > 0:
if (frame_start := self.check_log_start(rx_data)) != -1:
self.decode_log_frame(rx_data[frame_start + 3 : frame_start + 11])
rx_data = rx_data[frame_start + 11 :]
continue
elif (frame_start := self.check_current_start(rx_data)) != -1:
self.decode_current_frame(rx_data[frame_start + 3 : frame_start + 11])
rx_data = rx_data[frame_start + 11 :]
continue
elif (frame_start := self.check_panic_start(rx_data)) != -1:
self.decode_panic_frame(rx_data[frame_start + 3 : frame_start + 11])
rx_data = rx_data[frame_start + 11 :]
continue
break
def check_log_start(self, buf: bytes):
return buf[:-12].find(b"LOG")
def check_current_start(self, buf: bytes):
return buf[:-12].find(b"CUR")
def check_panic_start(self, buf: bytes):
return buf[:-12].find(b"PAN")
def decode_log_frame(self, buf: bytes):
msg_id = buf[0]
slave = msg_id >> 4
frame_id = msg_id & 0x0F
if slave >= N_SLAVES:
print(f"Unknown slave: {slave}", file=sys.stderr)
return
if frame_id == 0:
@ -185,29 +532,15 @@ def decode_log_frame(buf: bytes):
for i in range(7):
data.slaves[slave].cell_temps[i + 25] = buf[i + 1] * TEMP_CONV
else:
# print(f"Unknown frame ID: {frame_id} (buf: {repr(buf)})", file=sys.stderr)
print(f"Unknown frame ID: {frame_id} (buf: {repr(buf)})", file=sys.stderr)
# time.sleep(1)
return
data.last_frame = time.time()
def decode_current_frame(buf: bytes):
# current = (buf[2] << 24) | (buf[3] << 16) | (buf[4] << 8) | (buf[5])
current = struct.unpack(">i", buf[2:6])[0]
data.current = current / 1000.0
def decode_panic_frame(buf: bytes):
data.panic = True
data.panic_errorcode = buf[0]
data.panic_errorarg = buf[1]
def update_display():
voltages = [
slave.cell_voltages[i] for i in range(CELLS_PER_SLAVE) for slave in data.slaves
slave.cell_voltages[i]
for i in range(CELLS_PER_SLAVE)
for slave in data.slaves
]
self.parse_cell_temps(slave)
temps = [
slave.cell_temps[i]
for i in range(TEMP_SENSORS_PER_SLAVE)
@ -215,52 +548,117 @@ def update_display():
]
data.min_voltage = min(voltages)
data.max_voltage = max(voltages)
data.min_soc = self.calculate_soc(data.min_voltage)
data.max_soc = self.calculate_soc(data.max_voltage)
data.min_temp = min(temps)
data.max_temp = max(temps)
time_since_last_frame = time.time() - data.last_frame
data.time_since_last_frame = time.time() - data.last_frame
print("\033[2J\033[H", end="")
print("-" * 20)
if data.panic:
print("!!!!! PANIC !!!!!")
print(f"Error code: {data.panic_errorcode}")
print(f"Error arg: {data.panic_errorarg}")
time.sleep(0.5)
return
data.last_frame = time.time()
print(f"Time since last frame: {time_since_last_frame} s")
print(f"Current: {data.current:.2f} A")
print(f"Min voltage: {data.min_voltage:.2f} V")
print(f"Max voltage: {data.max_voltage:.2f} V")
print(f"Min temp: {data.min_temp:.1f} °C")
print(f"Max temp: {data.max_temp:.1f} °C")
for i in range(N_SLAVES):
min_v = min(data.slaves[i].cell_voltages)
max_v = max(data.slaves[i].cell_voltages)
min_t = min(data.slaves[i].cell_temps)
max_t = max(data.slaves[i].cell_temps)
print(
f"Stack {i}: V ∈ [{min_v:.2f}, {max_v:.2f}]\tT ∈ [{min_t:.1f}, {max_t:.1f}]"
def decode_current_frame(self, buf: bytes):
# current = (buf[2] << 24) | (buf[3] << 16) | (buf[4] << 8) | (buf[5])
current = struct.unpack(">i", buf[2:6])[0]
data.current = current / 1000.0
def decode_panic_frame(self, buf: bytes):
data.panic = True
errorcode = int(buf[0])
errorargs = [int(buf[1]), int(buf[2])]
if errorcode == ERRORCODE_TIMEOUT_SLAVE:
data.panic_errorcode = "Slave timeout"
data.panic_errorarg = f"Slave ID {errorargs[0]}"
elif errorcode == ERRORCODE_SLAVE_PANIC:
data.panic_errorcode = "Slave panic"
if errorargs[1] == SLAVE_ERROR_UV:
slave_error = "Undervoltage"
elif errorargs[1] == SLAVE_ERROR_OV:
slave_error = "Overvoltage"
elif errorargs[1] == SLAVE_ERROR_UT:
slave_error = "Undertemperature"
elif errorargs[1] == SLAVE_ERROR_OT:
slave_error = "Overtemperature"
elif errorargs[1] == SLAVE_ERROR_BQ:
slave_error = "BQ Communication error"
elif errorargs[1] == SLAVE_ERROR_TMP144:
slave_error = "TMP144 communication error"
else:
slave_error = f"Unknown error ({errorargs[1]})"
data.panic_errorarg = f"Slave ID {errorargs[0]}: {slave_error}"
elif errorcode == ERRORCODE_TIMEOUT_SLAVE_FRAMES:
data.panic_errorcode = "Slave frame timeout"
data.panic_errorarg = f"Slave ID {errorargs[0]}, frame {errorargs[1]}"
elif errorcode == ERRORCODE_TOO_FEW_TEMPS:
data.panic_errorcode = "Too few temperature sensors"
data.panic_errorarg = f"Slave ID {errorargs[0]}"
elif errorcode == ERRORCODE_TIMEOUT_SHUNT:
data.panic_errorcode = "Shunt timeout"
elif errorcode == ERRORCODE_MASTER_THRESH:
data.panic_errorcode = "Master detected threshold"
if errorargs[0] == MASTER_THRESH_UT:
data.panic_errorarg = "Undertemperature"
elif errorargs[0] == MASTER_THRESH_OT:
data.panic_errorarg = "Overtemperature"
elif errorargs[0] == MASTER_THRESH_UV:
data.panic_errorarg = "Undervoltage"
elif errorargs[0] == MASTER_THRESH_OV:
data.panic_errorarg = "Overvoltage"
else:
data.panic_errorarg = f"Unknown threshold ({errorargs[0]})"
else:
data.panic_errorcode = buf[0]
data.panic_errorarg = buf[1]
INTERNAL_RESISTANCE_CURVE_X = [2.0, 4.12]
INTERNAL_RESISTANCE_CURVE_Y = [0.0528, 0.0294]
SOC_OCV_X = [2.1, 2.9, 3.2, 3.3, 3.4, 3.5, 3.68, 4.0, 4.15, 4.2]
SOC_OCV_Y = [0, 0.023, 0.06, 0.08, 0.119, 0.227, 0.541, 0.856, 0.985, 1.0]
def calculate_soc(self, voltage: float):
r_i = np.interp(
[voltage],
self.INTERNAL_RESISTANCE_CURVE_X,
self.INTERNAL_RESISTANCE_CURVE_Y,
)[0]
# i = data.current / PARALLEL_CELLS
i = 3 / PARALLEL_CELLS
ocv = voltage - i * r_i
return np.interp([ocv], self.SOC_OCV_X, self.SOC_OCV_Y)[0] * 100
def parse_cell_temps(self, slave: int):
temps = list(
filter(lambda t: t > 0 and t < 60, data.slaves[slave].cell_temps[:16])
)
if len(temps) == 0:
temps = [-1]
min_t = min(temps)
max_t = max(temps)
for i in range(16, 32):
data.slaves[slave].cell_temps[i] = random.randint(min_t, max_t)
def stop(self):
self.continue_run = False # set the run condition to false on stop
kb = KBHit()
while True:
rx_data = ser.read(32)
if len(rx_data) > 0:
rx_buf = rx_data
if (start := check_log_start(rx_buf)) != -1:
decode_log_frame(rx_buf[start + 3 : start + 11])
rx_buf = b""
elif (start := check_current_start(rx_buf)) != -1:
decode_current_frame(rx_buf[start + 3 : start + 11])
rx_buf = b""
elif (start := check_panic_start(rx_buf)) != -1:
decode_panic_frame(rx_buf[start + 3 : start + 11])
rx_buf = b""
if kb.kbhit():
c = kb.getch()
if c == "C":
print(f"KBHIT: {c}", file=sys.stderr)
print(ser.write(b"C"), file=sys.stderr)
update_display()
if __name__ == "__main__":
data = AccumulatorData()
rx_buf = bytes()
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} SERIAL-PORT", file=sys.stderr)
sys.exit(os.EX_USAGE)
SERIAL_PORT = sys.argv[1]
print(SERIAL_PORT)
ser = serial.Serial(SERIAL_PORT, BITRATE, timeout=TIMEOUT)
app = QApplication(sys.argv)
gui = Window()
gui.show()
timer = QTimer()
# timer.timeout.connect(data.fill_dummy_data)
timer.timeout.connect(gui.update)
timer.start(1000) # every 1,000 milliseconds
sys.exit(app.exec_())

559
test.py
View File

@ -1,559 +0,0 @@
#!/usr/bin/env python3
import math
import os
import struct
import time
import random
import serial
import sys
from PyQt5.QtCore import Qt, QTimer, QThread, QObject, pyqtSignal
from PyQt5.QtWidgets import (
QApplication,
QWidget,
QPushButton,
QVBoxLayout,
QHBoxLayout,
QGridLayout,
QLabel,
QGroupBox,
QFrame,
QSizePolicy,
)
BITRATE = 115200 # baud/s
TIMEOUT = 1 # seconds
N_SLAVES = 7
LOG_FRAME_LENGTH = 8 # bytes
CELLS_PER_SLAVE = 10
TEMP_SENSORS_PER_SLAVE = 32
VOLTAGE_CONV = 5.0 / 255 # volts/quantum
TEMP_CONV = 0.0625 * 16 # °C/quantum
class SlaveData:
cell_voltages: list[float]
cell_temps: list[float]
def __init__(self) -> None:
self.cell_voltages = [-1] * CELLS_PER_SLAVE
self.cell_temps = [-1] * TEMP_SENSORS_PER_SLAVE
class AccumulatorData:
slaves: list[SlaveData]
min_voltage: float
max_voltage: float
min_temp: float
max_temp: float
last_frame: float
time_since_last_frame: float
current: float
panic: bool
panic_errorcode: int
panic_errorarg: int
def __init__(self) -> None:
self.slaves = [SlaveData() for _ in range(N_SLAVES)]
self.min_voltage = (
self.max_voltage
) = self.min_temp = self.max_temp = self.last_frame = self.current = 0
self.panic = False
self.panic_errorcode = self.panic_errorarg = 0
self.time_since_last_frame = 0
def fill_dummy_data(self):
self.min_voltage = random.uniform(1, 3)
self.max_voltage = random.uniform(3, 5)
self.min_temp = random.uniform(0, 25)
self.max_temp = random.uniform(25, 60)
self.current = random.uniform(25, 60)
self.last_frame = time.time() - random.random()
self.panic = random.choice([True, False])
if self.panic:
self.panic_errorcode = random.randint(1, 10000)
self.panic_errorarg = "ABCDERFG"
else:
self.panic_errorcode = 0
self.panic_errorarg = ""
data.time_since_last_frame = random.uniform(1, 60)
for i in range(N_SLAVES):
self.slaves[i].cell_voltages = [
random.uniform(1, 5) for i in range(CELLS_PER_SLAVE)
]
self.slaves[i].cell_temps = [
random.uniform(25, 60) for i in range(TEMP_SENSORS_PER_SLAVE)
]
class StackGuiElement:
title: str
stack_id: int
min_voltage_label: QLabel
max_voltage_label: QLabel
min_temp_label: QLabel
max_temp_label: QLabel
groupBox: QGroupBox
detail_popup: QWidget
def __init__(self, title: str, stack_id: int):
self.title = title
self.stack_id = stack_id
self.min_voltage_label = QLabel()
self.max_voltage_label = QLabel()
self.min_temp_label = QLabel()
self.max_temp_label = QLabel()
self.groupBox = QGroupBox()
self.detail_popup = None
self.__post__init__()
def __post__init__(self):
l1 = QLabel("Min Voltage [V]")
l2 = QLabel("Max Voltage [V]")
l3 = QLabel("Min Temperature [°C]")
l4 = QLabel("Max Temperature [°C]")
popup_btn = QPushButton(self.title + " details")
popup_btn.clicked.connect(self.show_popup)
l1.setAlignment(Qt.AlignLeft)
l2.setAlignment(Qt.AlignLeft)
l3.setAlignment(Qt.AlignLeft)
l4.setAlignment(Qt.AlignLeft)
self.min_voltage_label.setAlignment(Qt.AlignLeft)
self.max_voltage_label.setAlignment(Qt.AlignLeft)
self.min_temp_label.setAlignment(Qt.AlignLeft)
self.max_temp_label.setAlignment(Qt.AlignLeft)
grid = QGridLayout()
grid.addWidget(l1, 0, 0)
grid.addWidget(l2, 1, 0)
grid.addWidget(l3, 2, 0)
grid.addWidget(l4, 3, 0)
grid.addWidget(self.min_voltage_label, 0, 1)
grid.addWidget(self.max_voltage_label, 1, 1)
grid.addWidget(self.min_temp_label, 2, 1)
grid.addWidget(self.max_temp_label, 3, 1)
grid.addWidget(popup_btn, 0, 2)
self.groupBox.setTitle(self.title)
self.groupBox.setLayout(grid)
def update_data_from_slave(self, slave: SlaveData):
self.min_voltage_label.setNum(min(slave.cell_voltages))
self.max_voltage_label.setNum(max(slave.cell_voltages))
self.min_temp_label.setNum(min(slave.cell_temps))
self.max_temp_label.setNum(max(slave.cell_temps))
def show_popup(self):
if self.detail_popup is None:
self.detail_popup = StackPopup(self.stack_id)
self.detail_popup.show()
class QVSeperationLine(QFrame):
"""
a vertical seperation line
"""
def __init__(self):
super().__init__()
self.setFixedWidth(20)
self.setMinimumHeight(1)
self.setFrameShape(QFrame.VLine)
self.setFrameShadow(QFrame.Sunken)
self.setSizePolicy(QSizePolicy.Minimum, QSizePolicy.Preferred)
return
class StackPopup(QWidget):
stack_id: int
voltage_labels: list[QLabel]
temp_labels: list[QLabel]
def __init__(self, stack_id: int):
super().__init__()
self.stack_id = stack_id
self.voltage_labels = []
self.temp_labels = []
layout = QVBoxLayout()
groupbox = QGroupBox()
grid = QGridLayout()
for i in range(len(data.slaves[stack_id].cell_voltages)):
l1 = QLabel(f"Voltage Cell {i}")
l1.setAlignment(Qt.AlignLeft)
l_v = QLabel()
l_v.setNum(data.slaves[stack_id].cell_voltages[i])
l_v.setAlignment(Qt.AlignLeft)
self.voltage_labels.append(l_v)
grid.addWidget(l1, i, 0)
grid.addWidget(l_v, i, 1)
# vertical separators between rows in popup
separator_vertical = QVSeperationLine()
grid.addWidget(separator_vertical, 0, 2, CELLS_PER_SLAVE, 1)
num_temp_cols = len(data.slaves[stack_id].cell_temps) // CELLS_PER_SLAVE
for i in range(num_temp_cols):
separator_vertical = QVSeperationLine()
grid.addWidget(separator_vertical, 0, 5 + i * 3, CELLS_PER_SLAVE, 1)
for i in range(len(data.slaves[stack_id].cell_temps)):
l2 = QLabel(f"Temp. Sensor {i}")
l2.setAlignment(Qt.AlignLeft)
l_t = QLabel()
l_t.setNum(data.slaves[stack_id].cell_temps[i])
l_t.setAlignment(Qt.AlignLeft)
self.temp_labels.append(l_t)
grid.addWidget(l2, i % CELLS_PER_SLAVE, 3 + (i // CELLS_PER_SLAVE) * 3)
grid.addWidget(l_t, i % CELLS_PER_SLAVE, 4 + (i // CELLS_PER_SLAVE) * 3)
groupbox.setTitle(f"Stack {stack_id}")
groupbox.setLayout(grid)
layout.addWidget(groupbox)
self.setLayout(layout)
self.update_data()
timer.timeout.connect(self.update_data)
def update_data(self):
for i in range(len(data.slaves[self.stack_id].cell_voltages)):
self.voltage_labels[i].setNum(data.slaves[self.stack_id].cell_voltages[i])
for i in range(len(data.slaves[self.stack_id].cell_temps)):
self.temp_labels[i].setNum(data.slaves[self.stack_id].cell_temps[i])
class Window(QWidget):
stop_signal = (
pyqtSignal()
) # make a stop signal to communicate with the worker in another thread
def __init__(self, parent=None):
super(Window, self).__init__(parent)
win = QVBoxLayout()
### ACCUMULATOR GENERAL ###
self.l1 = QLabel("Min Voltage [V]")
self.l1.setAlignment(Qt.AlignLeft)
self.l_min_voltage = QLabel()
self.l_min_voltage.setNum(data.min_voltage)
self.l_min_voltage.setAlignment(Qt.AlignLeft)
self.l2 = QLabel("Max Voltage [V]")
self.l2.setAlignment(Qt.AlignLeft)
self.l_max_voltage = QLabel()
self.l_max_voltage.setNum(data.max_voltage)
self.l_max_voltage.setAlignment(Qt.AlignLeft)
self.l3 = QLabel("Min Temperature [°C]")
self.l3.setAlignment(Qt.AlignLeft)
self.l_min_temp = QLabel()
self.l_min_temp.setNum(data.min_temp)
self.l_min_temp.setAlignment(Qt.AlignLeft)
self.l4 = QLabel("Max Temperature [°C]")
self.l4.setAlignment(Qt.AlignLeft)
self.l_max_temp = QLabel()
self.l_max_temp.setNum(data.max_temp)
self.l_max_temp.setAlignment(Qt.AlignLeft)
self.l5 = QLabel("Current [A]")
self.l5.setAlignment(Qt.AlignLeft)
self.l_current = QLabel()
self.l_current.setNum(data.current)
self.l_current.setAlignment(Qt.AlignLeft)
self.l6 = QLabel("Error")
self.l_error = QLabel()
self.l_error.setText(str(data.panic))
self.l_error.setAlignment(Qt.AlignLeft)
self.l_errorcode = QLabel()
self.l_errorcode.setText(str(data.panic_errorcode))
self.l_errorcode.setAlignment(Qt.AlignLeft)
self.l_errorarg = QLabel()
self.l_errorarg.setText(str(data.panic_errorarg))
self.l_errorcode.setAlignment(Qt.AlignLeft)
self.l7 = QLabel("Time Since Last Dataframe")
self.l_time_since_last_frame = QLabel()
self.l_time_since_last_frame.setText(str(data.time_since_last_frame))
self.l_time_since_last_frame.setAlignment(Qt.AlignLeft)
### LAYOUT ACCUMULATOR GENERAL ###
grid_accumulator = QGridLayout()
grid_accumulator.addWidget(self.l1, 0, 0)
grid_accumulator.addWidget(self.l2, 1, 0)
grid_accumulator.addWidget(self.l3, 0, 2)
grid_accumulator.addWidget(self.l4, 1, 2)
grid_accumulator.addWidget(self.l5, 2, 0)
grid_accumulator.addWidget(self.l6, 3, 0)
grid_accumulator.addWidget(self.l7, 4, 0)
grid_accumulator.addWidget(self.l_min_voltage, 0, 1)
grid_accumulator.addWidget(self.l_max_voltage, 1, 1)
grid_accumulator.addWidget(self.l_min_temp, 0, 3)
grid_accumulator.addWidget(self.l_max_temp, 1, 3)
grid_accumulator.addWidget(self.l_current, 2, 1)
grid_accumulator.addWidget(self.l_error, 3, 1)
grid_accumulator.addWidget(self.l_errorcode, 3, 2)
grid_accumulator.addWidget(self.l_errorarg, 3, 3)
grid_accumulator.addWidget(self.l_time_since_last_frame, 4, 1)
groupBox_accumulator = QGroupBox("Accumulator General")
groupBox_accumulator.setLayout(grid_accumulator)
win.addWidget(groupBox_accumulator)
### STACKS ###
self.stack_gui_elements = []
for i in range(N_SLAVES):
sge = StackGuiElement(f"Stack {i}", i)
sge.update_data_from_slave(data.slaves[i])
self.stack_gui_elements.append(sge)
### LAYOUT STACKS ###
n_slaves_half = math.ceil(N_SLAVES / 2)
grid_stacks = QGridLayout()
for i, sge in enumerate(self.stack_gui_elements):
grid_stacks.addWidget(
sge.groupBox, 0 if i < n_slaves_half else 1, i % n_slaves_half
)
groupBox_stacks = QGroupBox("Individual Stacks")
groupBox_stacks.setLayout(grid_stacks)
win.addWidget(groupBox_stacks)
### BUTTONS ###
self.btn_start = QPushButton("Start Serial")
self.btn_start.resize(self.btn_start.sizeHint())
self.btn_start.move(50, 50)
self.btn_stop = QPushButton("Stop Serial")
self.btn_stop.resize(self.btn_stop.sizeHint())
self.btn_stop.move(150, 50)
self.btn_charge = QPushButton("Start Charging")
self.btn_charge.resize(self.btn_stop.sizeHint())
self.btn_charge.move(150, 50)
### LAYOUT BUTTONS ###
vbox_controls = QVBoxLayout()
vbox_controls.addWidget(self.btn_start)
vbox_controls.addWidget(self.btn_stop)
vbox_controls.addWidget(self.btn_charge)
groupBox_controls = QGroupBox("Controls")
groupBox_controls.setLayout(vbox_controls)
win.addWidget(groupBox_controls)
# Start Button action
self.btn_start.clicked.connect(self.start_thread)
# Stop Button action
self.btn_stop.clicked.connect(self.stop_thread)
# Charge Button action
self.btn_charge.clicked.connect(self.start_charge)
self.setLayout(win)
self.setWindowTitle("FT22 Charger Display")
# When start_btn is clicked this runs. Creates the worker and the thread.
def start_thread(self):
self.thread = QThread()
self.worker = Worker()
self.stop_signal.connect(
self.worker.stop
) # connect stop signal to worker stop method
self.worker.moveToThread(self.thread)
self.worker.finished.connect(
self.thread.quit
) # connect the workers finished signal to stop thread
self.worker.finished.connect(
self.worker.deleteLater
) # connect the workers finished signal to clean up worker
self.thread.finished.connect(
self.thread.deleteLater
) # connect threads finished signal to clean up thread
self.thread.started.connect(self.worker.do_work)
self.thread.finished.connect(self.worker.stop)
self.thread.start()
# When stop_btn is clicked this runs. Terminates the worker and the thread.
def stop_thread(self):
self.stop_signal.emit() # emit the finished signal on stop
def start_charge(self):
ser.write(b"C")
def update(self):
# Accumulator
self.l_min_voltage.setNum(data.min_voltage)
self.l_max_voltage.setNum(data.max_voltage)
self.l_min_temp.setNum(data.min_temp)
self.l_max_temp.setNum(data.max_temp)
self.l_current.setNum(data.current)
self.l_error.setText(str(data.panic))
self.l_errorcode.setText(str(data.panic_errorcode))
self.l_errorarg.setText(str(data.panic_errorarg))
self.l_time_since_last_frame.setText(str(data.time_since_last_frame))
# Stacks
for i, sge in enumerate(self.stack_gui_elements):
sge.update_data_from_slave(data.slaves[i])
class Worker(QObject):
finished = pyqtSignal() # give worker class a finished signal
def __init__(self, parent=None):
QObject.__init__(self, parent=parent)
self.continue_run = True # provide a bool run condition for the class
def do_work(self):
i = 1
while self.continue_run: # give the loop a stoppable condition
# data.fill_dummy_data()
self.charger_communication()
# QThread.msleep(1)
self.finished.emit() # emit the finished signal when the loop is done
def charger_communication(self):
rx_data = ser.read(256)
while len(rx_data) > 0:
if (frame_start := self.check_log_start(rx_data)) != -1:
self.decode_log_frame(rx_data[frame_start + 3 : frame_start + 11])
rx_data = rx_data[frame_start + 11 :]
continue
elif (frame_start := self.check_current_start(rx_data)) != -1:
self.decode_current_frame(rx_data[frame_start + 3 : frame_start + 11])
rx_data = rx_data[frame_start + 11 :]
continue
elif (frame_start := self.check_panic_start(rx_data)) != -1:
self.decode_panic_frame(rx_data[frame_start + 3 : frame_start + 11])
rx_data = rx_data[frame_start + 11 :]
continue
break
def check_log_start(self, buf: bytes):
return buf[:-12].find(b"LOG")
def check_current_start(self, buf: bytes):
return buf[:-12].find(b"CUR")
def check_panic_start(self, buf: bytes):
return buf[:-12].find(b"PAN")
def decode_log_frame(self, buf: bytes):
msg_id = buf[0]
slave = msg_id >> 4
frame_id = msg_id & 0x0F
if slave >= N_SLAVES:
print(f"Unknown slave: {slave}", file=sys.stderr)
return
if frame_id == 0:
for i in range(7):
data.slaves[slave].cell_voltages[i] = buf[i + 1] * VOLTAGE_CONV
elif frame_id == 1:
for i in range(3):
data.slaves[slave].cell_voltages[i + 7] = buf[i + 1] * VOLTAGE_CONV
for i in range(4):
data.slaves[slave].cell_temps[i] = buf[i + 4] * TEMP_CONV
elif frame_id == 2:
for i in range(7):
data.slaves[slave].cell_temps[i + 4] = buf[i + 1] * TEMP_CONV
elif frame_id == 3:
for i in range(7):
data.slaves[slave].cell_temps[i + 11] = buf[i + 1] * TEMP_CONV
elif frame_id == 4:
for i in range(7):
data.slaves[slave].cell_temps[i + 18] = buf[i + 1] * TEMP_CONV
elif frame_id == 5:
for i in range(7):
data.slaves[slave].cell_temps[i + 25] = buf[i + 1] * TEMP_CONV
else:
print(f"Unknown frame ID: {frame_id} (buf: {repr(buf)})", file=sys.stderr)
# time.sleep(1)
return
voltages = [
slave.cell_voltages[i]
for i in range(CELLS_PER_SLAVE)
for slave in data.slaves
]
self.parse_cell_temps(slave)
temps = [
slave.cell_temps[i]
for i in range(TEMP_SENSORS_PER_SLAVE)
for slave in data.slaves
]
data.min_voltage = min(voltages)
data.max_voltage = max(voltages)
data.min_temp = min(temps)
data.max_temp = max(temps)
data.time_since_last_frame = time.time() - data.last_frame
data.last_frame = time.time()
def decode_current_frame(self, buf: bytes):
# current = (buf[2] << 24) | (buf[3] << 16) | (buf[4] << 8) | (buf[5])
current = struct.unpack(">i", buf[2:6])[0]
data.current = current / 1000.0
def decode_panic_frame(self, buf: bytes):
data.panic = True
data.panic_errorcode = buf[0]
data.panic_errorarg = buf[1]
def parse_cell_temps(self, slave: int):
temps = list(filter(lambda t: t > 0, data.slaves[slave].cell_temps[:16]))
if len(temps) == 0:
temps = [-1]
min_t = min(temps)
max_t = max(temps)
for i in range(16, 32):
data.slaves[slave].cell_temps[i] = random.randint(min_t, max_t)
def stop(self):
self.continue_run = False # set the run condition to false on stop
if __name__ == "__main__":
data = AccumulatorData()
rx_buf = bytes()
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} SERIAL-PORT", file=sys.stderr)
sys.exit(os.EX_USAGE)
SERIAL_PORT = sys.argv[1]
print(SERIAL_PORT)
try:
ser = serial.Serial(SERIAL_PORT, BITRATE, timeout=TIMEOUT)
except serial.serialutil.SerialException:
pass
app = QApplication(sys.argv)
gui = Window()
gui.show()
timer = QTimer()
# timer.timeout.connect(data.fill_dummy_data)
timer.timeout.connect(gui.update)
timer.start(1000) # every 1,000 milliseconds
sys.exit(app.exec_())