Compare commits

...

15 Commits
master ... gui

Author SHA1 Message Date
jazzpi 08b02a5a5d SoC for stacks 2022-08-12 13:01:44 +02:00
jazzpi 857ca68d83 Display TS state 2022-08-12 01:25:58 +02:00
jazzpi 18d9517fcc More formatting, balancing 2022-08-12 01:14:14 +02:00
jazzpi a3f3b2c23e Parse new logging format 2022-08-10 13:14:43 +02:00
jazzpi 61f652ee8f Errorcodes, SoC, 9 slaves 2022-08-08 01:23:20 +02:00
jazzpi cf4b1763fe rename file 2022-07-22 09:12:02 +02:00
jazzpi 6c17c3a72d Event fixes 2022-07-20 11:17:17 +02:00
Tobias Petrich e8ca613292
correct amount of temperature sensors 2022-07-19 18:14:04 +02:00
Tobias Petrich a2b733a430
add stack detail popups 2022-07-19 17:36:36 +02:00
f.geissler 8eccc36f39 minor changes 2022-07-19 13:38:47 +02:00
f.geissler 17fe582924 minor changes 2022-07-19 13:36:40 +02:00
f.geissler b3bbcf730b gui and extra thread worker implemented 2022-07-19 13:04:30 +02:00
Tobias Petrich a4733d3578
implement class for stack elements and add poetry dependency management 2022-07-19 03:18:17 +02:00
f.geissler 39a7669fa7 gui not working yet 2022-07-18 17:22:43 +02:00
f.geissler cfbd522e88 gui tests 2022-07-18 16:06:19 +02:00
3 changed files with 761 additions and 201 deletions

View File

@ -1,126 +1,74 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from enum import Enum
import math
import os import os
import struct import struct
import time import time
import random
import serial import serial
import sys import sys
import numpy as np
from PyQt5.QtCore import Qt, QTimer, QThread, QObject, pyqtSignal
from PyQt5.QtWidgets import (
QApplication,
QWidget,
QPushButton,
QVBoxLayout,
QHBoxLayout,
QGridLayout,
QLabel,
QGroupBox,
QFrame,
QSizePolicy,
)
BITRATE = 115200 # baud/s BITRATE = 115200 # baud/s
TIMEOUT = 1 # seconds TIMEOUT = 1 # seconds
N_SLAVES = 6 N_SLAVES = 9
LOG_FRAME_LENGTH = 8 # bytes LOG_FRAME_LENGTH = 8 # bytes
PARALLEL_CELLS = 9
CELLS_PER_SLAVE = 10 CELLS_PER_SLAVE = 10
TEMP_SENSORS_PER_SLAVE = 32 TEMP_SENSORS_PER_SLAVE = 32
VOLTAGE_CONV = 5.0 / 255 # volts/quantum VOLTAGE_CONV = 5.0 / 0xFFFF # volts/quantum
TEMP_CONV = 0.0625 * 16 # °C/quantum TEMP_CONV = 0.0625 # °C/quantum
import os ERRORCODE_TIMEOUT_SLAVE = 1
ERRORCODE_SLAVE_PANIC = 2
ERRORCODE_TIMEOUT_SLAVE_FRAMES = 3
ERRORCODE_TOO_FEW_TEMPS = 4
ERRORCODE_TIMEOUT_SHUNT = 6
ERRORCODE_MASTER_THRESH = 7
SLAVE_ERROR_UV = 0
SLAVE_ERROR_OV = 1
SLAVE_ERROR_UT = 2
SLAVE_ERROR_OT = 3
SLAVE_ERROR_BQ = 4
SLAVE_ERROR_TMP144 = 5
MASTER_THRESH_UT = 0
MASTER_THRESH_OT = 1
MASTER_THRESH_UV = 2
MASTER_THRESH_OV = 3
# Windows INTERNAL_RESISTANCE_CURVE_X = [2.0, 4.12]
if os.name == "nt": INTERNAL_RESISTANCE_CURVE_Y = [0.0528, 0.0294]
import msvcrt SOC_OCV_X = [2.1, 2.9, 3.2, 3.3, 3.4, 3.5, 3.68, 4.0, 4.15, 4.2]
SOC_OCV_Y = [0, 0.023, 0.06, 0.08, 0.119, 0.227, 0.541, 0.856, 0.985, 1.0]
# Posix (Linux, OS X)
else:
import sys
import termios
import atexit
from select import select
class KBHit: def estimate_soc(voltage: float) -> float:
def __init__(self): r_i = np.interp(
"""Creates a KBHit object that you can call to do various keyboard things.""" [voltage],
INTERNAL_RESISTANCE_CURVE_X,
if os.name == "nt": INTERNAL_RESISTANCE_CURVE_Y,
pass )[0]
i = data.current / PARALLEL_CELLS
else: ocv = voltage - i * r_i
return np.interp([ocv], SOC_OCV_X, SOC_OCV_Y)[0] * 100
# Save the terminal settings
self.fd = sys.stdin.fileno()
self.new_term = termios.tcgetattr(self.fd)
self.old_term = termios.tcgetattr(self.fd)
# New terminal setting unbuffered
self.new_term[3] = self.new_term[3] & ~termios.ICANON & ~termios.ECHO
termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.new_term)
# Support normal-terminal reset at exit
atexit.register(self.set_normal_term)
def set_normal_term(self):
"""Resets to normal terminal. On Windows this is a no-op."""
if os.name == "nt":
pass
else:
termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old_term)
def getch(self):
"""Returns a keyboard character after kbhit() has been called.
Should not be called in the same program as getarrow().
"""
s = ""
if os.name == "nt":
return msvcrt.getch().decode("utf-8")
else:
return sys.stdin.read(1)
def getarrow(self):
"""Returns an arrow-key code after kbhit() has been called. Codes are
0 : up
1 : right
2 : down
3 : left
Should not be called in the same program as getch().
"""
if os.name == "nt":
msvcrt.getch() # skip 0xE0
c = msvcrt.getch()
vals = [72, 77, 80, 75]
else:
c = sys.stdin.read(3)[2]
vals = [65, 67, 66, 68]
return vals.index(ord(c.decode("utf-8")))
def kbhit(self):
"""Returns True if keyboard character was hit, False otherwise."""
if os.name == "nt":
return msvcrt.kbhit()
else:
dr, dw, de = select([sys.stdin], [], [], 0)
return dr != []
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} SERIAL-PORT", file=sys.stderr)
sys.exit(os.EX_USAGE)
SERIAL_PORT = sys.argv[1]
ser = serial.Serial(SERIAL_PORT, BITRATE, timeout=TIMEOUT)
def check_log_start(buf: bytes):
return buf[:-12].find(b"LOG")
def check_current_start(buf: bytes):
return buf[:-12].find(b"CUR")
def check_panic_start(buf: bytes):
return buf[:-12].find(b"PAN")
class SlaveData: class SlaveData:
@ -129,85 +77,546 @@ class SlaveData:
def __init__(self) -> None: def __init__(self) -> None:
self.cell_voltages = [-1] * CELLS_PER_SLAVE self.cell_voltages = [-1] * CELLS_PER_SLAVE
self.cell_temps = [-1] * TEMP_SENSORS_PER_SLAVE self.cell_temps = ([-1] * (TEMP_SENSORS_PER_SLAVE // 2)) + (
[0] * (TEMP_SENSORS_PER_SLAVE // 2)
)
class TSState(Enum):
TS_INACTIVE = 0
TS_ACTIVE = 1
TS_PRECHARGE = 2
TS_DISCHARGE = 3
TS_ERROR = 4
TS_CHARGING_CHECK = 5
TS_CHARGING = 6
TS_BALANCING = 7
class AccumulatorData: class AccumulatorData:
slaves: list[SlaveData] slaves: list[SlaveData]
min_voltage: float min_voltage: float
max_voltage: float max_voltage: float
min_soc: float
max_soc: float
min_temp: float min_temp: float
max_temp: float max_temp: float
last_frame: float last_frame: float
time_since_last_frame: float
current: float current: float
panic: bool panic: bool
panic_errorcode: int panic_errorcode: int
panic_errorarg: int panic_errorarg: int
ts_state: TSState
def __init__(self) -> None: def __init__(self) -> None:
self.slaves = [SlaveData() for _ in range(N_SLAVES)] self.slaves = [SlaveData() for _ in range(N_SLAVES)]
self.min_voltage = ( self.min_voltage = (
self.max_voltage self.max_voltage
) = (
self.min_soc
) = (
self.max_soc
) = self.min_temp = self.max_temp = self.last_frame = self.current = 0 ) = self.min_temp = self.max_temp = self.last_frame = self.current = 0
self.panic = False self.panic = False
self.panic_errorcode = self.panic_errorarg = 0 self.panic_errorcode = self.panic_errorarg = 0
self.time_since_last_frame = 0
self.ts_state = TSState.TS_INACTIVE
def fill_dummy_data(self):
self.min_voltage = random.uniform(1, 3)
self.max_voltage = random.uniform(3, 5)
self.min_temp = random.uniform(0, 25)
self.max_temp = random.uniform(25, 60)
self.current = random.uniform(25, 60)
self.last_frame = time.time() - random.random()
self.panic = random.choice([True, False])
if self.panic:
self.panic_errorcode = random.randint(1, 10000)
self.panic_errorarg = "ABCDERFG"
else:
self.panic_errorcode = 0
self.panic_errorarg = ""
data.time_since_last_frame = random.uniform(1, 60)
for i in range(N_SLAVES):
self.slaves[i].cell_voltages = [
random.uniform(1, 5) for i in range(CELLS_PER_SLAVE)
]
self.slaves[i].cell_temps = [
random.uniform(25, 60) for i in range(TEMP_SENSORS_PER_SLAVE)
]
rx_buf = bytes() class StackGuiElement:
data = AccumulatorData() title: str
stack_id: int
voltage_label: QLabel
soc_label: QLabel
temp_label: QLabel
groupBox: QGroupBox
detail_popup: QWidget
def __init__(self, title: str, stack_id: int):
self.title = title
self.stack_id = stack_id
self.voltage_label = QLabel()
self.soc_label = QLabel()
self.temp_label = QLabel()
self.groupBox = QGroupBox()
self.detail_popup = None
self.__post__init__()
def __post__init__(self):
l1 = QLabel("Voltage [V]")
l2 = QLabel("SoC [%]")
l3 = QLabel("Temperature [°C]")
popup_btn = QPushButton("Details")
popup_btn.clicked.connect(self.show_popup)
l1.setAlignment(Qt.AlignLeft)
l2.setAlignment(Qt.AlignLeft)
l3.setAlignment(Qt.AlignLeft)
self.voltage_label.setAlignment(Qt.AlignLeft)
self.soc_label.setAlignment(Qt.AlignLeft)
self.temp_label.setAlignment(Qt.AlignLeft)
grid = QGridLayout()
grid.addWidget(l1, 0, 0)
grid.addWidget(l2, 1, 0)
grid.addWidget(l3, 2, 0)
grid.addWidget(self.voltage_label, 0, 1)
grid.addWidget(self.soc_label, 1, 1)
grid.addWidget(self.temp_label, 2, 1)
grid.addWidget(popup_btn, 3, 0)
self.groupBox.setTitle(self.title)
self.groupBox.setLayout(grid)
def update_data_from_slave(self, slave: SlaveData):
min_v, max_v = min(slave.cell_voltages), max(slave.cell_voltages)
self.voltage_label.setText(f"[{min_v:.02f}, {max_v:.02f}]")
min_soc, max_soc = estimate_soc(min_v), estimate_soc(max_v)
self.soc_label.setText(f"[{min_soc:02.0f}, {max_soc:02.0f}]")
min_t, max_t = min(slave.cell_temps), max(slave.cell_temps)
self.temp_label.setText(f"[{min_t:.02f}, {max_t:.02f}]")
def show_popup(self):
if self.detail_popup is None:
self.detail_popup = StackPopup(self.stack_id)
self.detail_popup.show()
def decode_log_frame(buf: bytes): class QVSeperationLine(QFrame):
"""
a vertical seperation line
"""
def __init__(self):
super().__init__()
self.setFixedWidth(20)
self.setMinimumHeight(1)
self.setFrameShape(QFrame.VLine)
self.setFrameShadow(QFrame.Sunken)
self.setSizePolicy(QSizePolicy.Minimum, QSizePolicy.Preferred)
return
class StackPopup(QWidget):
stack_id: int
voltage_labels: list[QLabel]
temp_labels: list[QLabel]
def __init__(self, stack_id: int):
super().__init__()
self.stack_id = stack_id
self.voltage_labels = []
self.temp_labels = []
layout = QVBoxLayout()
groupbox = QGroupBox()
grid = QGridLayout()
for i in range(len(data.slaves[stack_id].cell_voltages)):
l1 = QLabel(f"Voltage Cell {i}")
l1.setAlignment(Qt.AlignLeft)
l_v = QLabel()
l_v.setNum(data.slaves[stack_id].cell_voltages[i])
l_v.setAlignment(Qt.AlignLeft)
self.voltage_labels.append(l_v)
grid.addWidget(l1, i, 0)
grid.addWidget(l_v, i, 1)
# vertical separators between rows in popup
separator_vertical = QVSeperationLine()
grid.addWidget(separator_vertical, 0, 2, CELLS_PER_SLAVE, 1)
num_temp_cols = len(data.slaves[stack_id].cell_temps) // CELLS_PER_SLAVE
for i in range(num_temp_cols):
separator_vertical = QVSeperationLine()
grid.addWidget(separator_vertical, 0, 5 + i * 3, CELLS_PER_SLAVE, 1)
for i in range(len(data.slaves[stack_id].cell_temps)):
l2 = QLabel(f"Temp. Sensor {i}")
l2.setAlignment(Qt.AlignLeft)
l_t = QLabel()
l_t.setNum(data.slaves[stack_id].cell_temps[i])
l_t.setAlignment(Qt.AlignLeft)
self.temp_labels.append(l_t)
grid.addWidget(l2, i % CELLS_PER_SLAVE, 3 + (i // CELLS_PER_SLAVE) * 3)
grid.addWidget(l_t, i % CELLS_PER_SLAVE, 4 + (i // CELLS_PER_SLAVE) * 3)
groupbox.setTitle(f"Stack {stack_id}")
groupbox.setLayout(grid)
layout.addWidget(groupbox)
self.setLayout(layout)
self.setWindowTitle(f"FT22 Charger Display - Stack {stack_id} Details")
self.update_data()
timer.timeout.connect(self.update_data)
def update_data(self):
for i in range(len(data.slaves[self.stack_id].cell_voltages)):
self.voltage_labels[i].setNum(data.slaves[self.stack_id].cell_voltages[i])
for i in range(len(data.slaves[self.stack_id].cell_temps)):
self.temp_labels[i].setNum(data.slaves[self.stack_id].cell_temps[i])
class Window(QWidget):
stop_signal = (
pyqtSignal()
) # make a stop signal to communicate with the worker in another thread
def __init__(self, parent=None):
super(Window, self).__init__(parent)
win = QVBoxLayout()
### ACCUMULATOR GENERAL ###
self.l1 = QLabel("Min Voltage [V]")
self.l1.setAlignment(Qt.AlignLeft)
self.l_min_voltage = QLabel()
self.l_min_voltage.setNum(data.min_voltage)
self.l_min_voltage.setAlignment(Qt.AlignLeft)
self.l2 = QLabel("Max Voltage [V]")
self.l2.setAlignment(Qt.AlignLeft)
self.l_max_voltage = QLabel()
self.l_max_voltage.setNum(data.max_voltage)
self.l_max_voltage.setAlignment(Qt.AlignLeft)
self.l3 = QLabel("Min SoC [%]")
self.l3.setAlignment(Qt.AlignLeft)
self.l_min_soc = QLabel()
self.l_min_soc.setNum(data.min_soc)
self.l_min_soc.setAlignment(Qt.AlignLeft)
self.l4 = QLabel("Max SoC [%]")
self.l4.setAlignment(Qt.AlignLeft)
self.l_max_soc = QLabel()
self.l_max_soc.setNum(data.max_soc)
self.l_max_soc.setAlignment(Qt.AlignLeft)
self.l5 = QLabel("Min Temperature [°C]")
self.l5.setAlignment(Qt.AlignLeft)
self.l_min_temp = QLabel()
self.l_min_temp.setNum(data.min_temp)
self.l_min_temp.setAlignment(Qt.AlignLeft)
self.l6 = QLabel("Max Temperature [°C]")
self.l6.setAlignment(Qt.AlignLeft)
self.l_max_temp = QLabel()
self.l_max_temp.setNum(data.max_temp)
self.l_max_temp.setAlignment(Qt.AlignLeft)
self.l7 = QLabel("Current [A]")
self.l7.setAlignment(Qt.AlignLeft)
self.l_current = QLabel()
self.l_current.setNum(data.current)
self.l_current.setAlignment(Qt.AlignLeft)
self.l8 = QLabel("State")
self.l_state = QLabel()
self.l_state.setText(data.ts_state.name)
self.l_state.setAlignment(Qt.AlignLeft)
self.l_errorcode = QLabel()
self.l_errorcode.setText("")
self.l_errorcode.setAlignment(Qt.AlignLeft)
self.l_errorarg = QLabel()
self.l_errorarg.setText("")
self.l_errorcode.setAlignment(Qt.AlignLeft)
self.l9 = QLabel("Time Since Last Dataframe")
self.l_time_since_last_frame = QLabel()
self.l_time_since_last_frame.setText(str(data.time_since_last_frame))
self.l_time_since_last_frame.setAlignment(Qt.AlignLeft)
### LAYOUT ACCUMULATOR GENERAL ###
grid_accumulator = QGridLayout()
grid_accumulator.addWidget(self.l1, 0, 0)
grid_accumulator.addWidget(self.l2, 1, 0)
grid_accumulator.addWidget(self.l3, 2, 0)
grid_accumulator.addWidget(self.l4, 3, 0)
grid_accumulator.addWidget(self.l5, 0, 2)
grid_accumulator.addWidget(self.l6, 1, 2)
grid_accumulator.addWidget(self.l7, 2, 2)
grid_accumulator.addWidget(self.l8, 5, 0)
grid_accumulator.addWidget(self.l9, 3, 2)
grid_accumulator.addWidget(self.l_min_voltage, 0, 1)
grid_accumulator.addWidget(self.l_max_voltage, 1, 1)
grid_accumulator.addWidget(self.l_min_soc, 2, 1)
grid_accumulator.addWidget(self.l_max_soc, 3, 1)
grid_accumulator.addWidget(self.l_min_temp, 0, 3)
grid_accumulator.addWidget(self.l_max_temp, 1, 3)
grid_accumulator.addWidget(self.l_current, 2, 3)
grid_accumulator.addWidget(self.l_state, 5, 1)
grid_accumulator.addWidget(self.l_errorcode, 5, 2)
grid_accumulator.addWidget(self.l_errorarg, 5, 3)
grid_accumulator.addWidget(self.l_time_since_last_frame, 3, 3)
groupBox_accumulator = QGroupBox("Accumulator General")
groupBox_accumulator.setLayout(grid_accumulator)
win.addWidget(groupBox_accumulator)
### STACKS ###
self.stack_gui_elements = []
for i in range(N_SLAVES):
sge = StackGuiElement(f"Stack {i}", i)
sge.update_data_from_slave(data.slaves[i])
self.stack_gui_elements.append(sge)
### LAYOUT STACKS ###
n_slaves_half = math.ceil(N_SLAVES / 2)
grid_stacks = QGridLayout()
for i, sge in enumerate(self.stack_gui_elements):
grid_stacks.addWidget(
sge.groupBox, 0 if i < n_slaves_half else 1, i % n_slaves_half
)
groupBox_stacks = QGroupBox("Individual Stacks")
groupBox_stacks.setLayout(grid_stacks)
win.addWidget(groupBox_stacks)
### BUTTONS ###
self.btn_start = QPushButton("Start Serial")
self.btn_start.resize(self.btn_start.sizeHint())
self.btn_start.move(50, 50)
self.btn_stop = QPushButton("Stop Serial")
self.btn_stop.resize(self.btn_stop.sizeHint())
self.btn_stop.move(150, 50)
self.btn_charge = QPushButton("Start Charging")
self.btn_charge.resize(self.btn_charge.sizeHint())
self.btn_charge.move(250, 50)
self.btn_balance = QPushButton("Start Balancing")
self.btn_balance.resize(self.btn_balance.sizeHint())
self.btn_balance.move(350, 50)
### LAYOUT BUTTONS ###
vbox_controls = QVBoxLayout()
vbox_controls.addWidget(self.btn_start)
vbox_controls.addWidget(self.btn_stop)
vbox_controls.addWidget(self.btn_charge)
vbox_controls.addWidget(self.btn_balance)
groupBox_controls = QGroupBox("Controls")
groupBox_controls.setLayout(vbox_controls)
win.addWidget(groupBox_controls)
# Start Button action
self.btn_start.clicked.connect(self.start_thread)
# Stop Button action
self.btn_stop.clicked.connect(self.stop_thread)
# Charge Button action
self.btn_charge.clicked.connect(self.start_charge)
# Balance Button action
self.btn_balance.clicked.connect(self.start_balance)
self.setLayout(win)
self.setWindowTitle("FT22 Charger Display")
# When start_btn is clicked this runs. Creates the worker and the thread.
def start_thread(self):
self.thread = QThread()
self.worker = Worker()
self.stop_signal.connect(
self.worker.stop
) # connect stop signal to worker stop method
self.worker.moveToThread(self.thread)
self.worker.finished.connect(
self.thread.quit
) # connect the workers finished signal to stop thread
self.worker.finished.connect(
self.worker.deleteLater
) # connect the workers finished signal to clean up worker
self.thread.finished.connect(
self.thread.deleteLater
) # connect threads finished signal to clean up thread
self.thread.started.connect(self.worker.do_work)
self.thread.finished.connect(self.worker.stop)
self.thread.start()
# When stop_btn is clicked this runs. Terminates the worker and the thread.
def stop_thread(self):
self.stop_signal.emit() # emit the finished signal on stop
def start_charge(self):
ser.write(b"C")
def start_balance(self):
ser.write(b"B")
def update(self):
# Accumulator
self.l_min_voltage.setText(f"{data.min_voltage:.02f}")
self.l_max_voltage.setText(f"{data.max_voltage:.02f}")
self.l_min_soc.setText(f"{data.min_soc:02.0f}")
self.l_max_soc.setText(f"{data.max_soc:02.0f}")
self.l_min_temp.setText(f"{data.min_temp:.02f}")
self.l_max_temp.setText(f"{data.max_temp:.02f}")
self.l_current.setText(f"{data.current:.02f}")
self.l_state.setText(data.ts_state.name)
if data.panic:
self.l_errorcode.setText(str(data.panic_errorcode))
self.l_errorarg.setText(str(data.panic_errorarg))
for l in (self.l_state, self.l_errorcode, self.l_errorarg):
l.setStyleSheet("color: red; font-weight: bold;")
elif data.ts_state == TSState.TS_CHARGING:
for l in (self.l_state, self.l_errorcode, self.l_errorarg):
l.setStyleSheet("color: green; font-weight: bold;")
else:
self.l_errorcode.setText("")
self.l_errorarg.setText("")
for l in (self.l_state, self.l_errorcode, self.l_errorarg):
l.setStyleSheet("color: black; font-weight: normal;")
last_time = time.time() - data.last_frame
self.l_time_since_last_frame.setText(f"{last_time:.03f}")
# Stacks
for i, sge in enumerate(self.stack_gui_elements):
sge.update_data_from_slave(data.slaves[i])
class Worker(QObject):
finished = pyqtSignal() # give worker class a finished signal
def __init__(self, parent=None):
QObject.__init__(self, parent=parent)
self.continue_run = True # provide a bool run condition for the class
def do_work(self):
i = 1
while self.continue_run: # give the loop a stoppable condition
# data.fill_dummy_data()
self.charger_communication()
# QThread.msleep(1)
self.finished.emit() # emit the finished signal when the loop is done
def charger_communication(self):
rx_data = ser.read(256)
# print(len(rx_data), rx_data)
while len(rx_data) > 0:
if (frame_start := self.check_log_start(rx_data)) != -1:
self.decode_log_frame(rx_data[frame_start + 3 : frame_start + 11])
rx_data = rx_data[frame_start + 11 :]
continue
elif (frame_start := self.check_current_start(rx_data)) != -1:
self.decode_current_frame(rx_data[frame_start + 3 : frame_start + 11])
rx_data = rx_data[frame_start + 11 :]
continue
elif (frame_start := self.check_panic_start(rx_data)) != -1:
self.decode_panic_frame(rx_data[frame_start + 3 : frame_start + 11])
rx_data = rx_data[frame_start + 11 :]
continue
elif (frame_start := self.check_status_start(rx_data)) != -1:
self.decode_status_frame(rx_data[frame_start + 3 : frame_start + 11])
rx_data = rx_data[frame_start + 11 :]
continue
break
def check_log_start(self, buf: bytes):
return buf[:-12].find(b"LOG")
def check_current_start(self, buf: bytes):
return buf[:-12].find(b"CUR")
def check_panic_start(self, buf: bytes):
return buf[:-12].find(b"PAN")
def check_status_start(self, buf: bytes):
return buf[:-12].find(b"STA")
def decode_log_frame(self, buf: bytes):
msg_id = buf[0] msg_id = buf[0]
slave = msg_id >> 4 slave = msg_id >> 4
frame_id = msg_id & 0x0F frame_id = msg_id & 0x0F
if slave >= N_SLAVES: if slave >= N_SLAVES:
print(f"Unknown slave: {slave}", file=sys.stderr)
return return
if frame_id == 0: if frame_id == 0:
for i in range(7): for i in range(3):
data.slaves[slave].cell_voltages[i] = buf[i + 1] * VOLTAGE_CONV raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2]
data.slaves[slave].cell_voltages[i] = raw * VOLTAGE_CONV
elif frame_id == 1: elif frame_id == 1:
for i in range(3): for i in range(3):
data.slaves[slave].cell_voltages[i + 7] = buf[i + 1] * VOLTAGE_CONV raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2]
for i in range(4): data.slaves[slave].cell_voltages[i + 3] = raw * VOLTAGE_CONV
data.slaves[slave].cell_temps[i] = buf[i + 4] * TEMP_CONV
elif frame_id == 2: elif frame_id == 2:
for i in range(7): for i in range(3):
data.slaves[slave].cell_temps[i + 4] = buf[i + 1] * TEMP_CONV raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2]
data.slaves[slave].cell_voltages[i + 6] = raw * VOLTAGE_CONV
elif frame_id == 3: elif frame_id == 3:
for i in range(7): for i in range(1):
data.slaves[slave].cell_temps[i + 11] = buf[i + 1] * TEMP_CONV raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2]
data.slaves[slave].cell_voltages[i + 9] = raw * VOLTAGE_CONV
for i in range(2):
raw = (buf[i * 2 + 3] << 8) | buf[i * 2 + 4]
data.slaves[slave].cell_temps[i] = raw * TEMP_CONV
elif frame_id == 4: elif frame_id == 4:
for i in range(7): for i in range(3):
data.slaves[slave].cell_temps[i + 18] = buf[i + 1] * TEMP_CONV raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2]
data.slaves[slave].cell_temps[i + 2] = raw * TEMP_CONV
elif frame_id == 5: elif frame_id == 5:
for i in range(7): for i in range(3):
data.slaves[slave].cell_temps[i + 25] = buf[i + 1] * TEMP_CONV raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2]
data.slaves[slave].cell_temps[i + 5] = raw * TEMP_CONV
elif frame_id == 6:
for i in range(3):
raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2]
data.slaves[slave].cell_temps[i + 8] = raw * TEMP_CONV
elif frame_id == 7:
for i in range(3):
raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2]
data.slaves[slave].cell_temps[i + 11] = raw * TEMP_CONV
elif frame_id == 8:
for i in range(3):
raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2]
data.slaves[slave].cell_temps[i + 14] = raw * TEMP_CONV
else: else:
# print(f"Unknown frame ID: {frame_id} (buf: {repr(buf)})", file=sys.stderr) print(f"Unknown frame ID: {frame_id} (buf: {repr(buf)})", file=sys.stderr)
# time.sleep(1) # time.sleep(1)
return return
data.last_frame = time.time()
def decode_current_frame(buf: bytes):
# current = (buf[2] << 24) | (buf[3] << 16) | (buf[4] << 8) | (buf[5])
current = struct.unpack(">i", buf[2:6])[0]
data.current = current / 1000.0
def decode_panic_frame(buf: bytes):
data.panic = True
data.panic_errorcode = buf[0]
data.panic_errorarg = buf[1]
def update_display():
voltages = [ voltages = [
slave.cell_voltages[i] for i in range(CELLS_PER_SLAVE) for slave in data.slaves slave.cell_voltages[i]
for i in range(CELLS_PER_SLAVE)
for slave in data.slaves
] ]
self.parse_cell_temps(slave)
temps = [ temps = [
slave.cell_temps[i] slave.cell_temps[i]
for i in range(TEMP_SENSORS_PER_SLAVE) for i in range(TEMP_SENSORS_PER_SLAVE)
@ -215,52 +624,106 @@ def update_display():
] ]
data.min_voltage = min(voltages) data.min_voltage = min(voltages)
data.max_voltage = max(voltages) data.max_voltage = max(voltages)
data.min_soc = estimate_soc(data.min_voltage)
data.max_soc = estimate_soc(data.max_voltage)
data.min_temp = min(temps) data.min_temp = min(temps)
data.max_temp = max(temps) data.max_temp = max(temps)
time_since_last_frame = time.time() - data.last_frame data.time_since_last_frame = time.time() - data.last_frame
print("\033[2J\033[H", end="") data.last_frame = time.time()
print("-" * 20)
if data.panic:
print("!!!!! PANIC !!!!!")
print(f"Error code: {data.panic_errorcode}")
print(f"Error arg: {data.panic_errorarg}")
time.sleep(0.5)
return
print(f"Time since last frame: {time_since_last_frame} s") def decode_current_frame(self, buf: bytes):
print(f"Current: {data.current:.2f} A") # current = (buf[2] << 24) | (buf[3] << 16) | (buf[4] << 8) | (buf[5])
print(f"Min voltage: {data.min_voltage:.2f} V") current = struct.unpack(">i", buf[2:6])[0]
print(f"Max voltage: {data.max_voltage:.2f} V") data.current = current / 1000.0
print(f"Min temp: {data.min_temp:.1f} °C")
print(f"Max temp: {data.max_temp:.1f} °C") def decode_panic_frame(self, buf: bytes):
for i in range(N_SLAVES): data.panic = True
min_v = min(data.slaves[i].cell_voltages) errorcode = int(buf[0])
max_v = max(data.slaves[i].cell_voltages) errorargs = [int(buf[1]), int(buf[2])]
min_t = min(data.slaves[i].cell_temps) if errorcode == ERRORCODE_TIMEOUT_SLAVE:
max_t = max(data.slaves[i].cell_temps) data.panic_errorcode = "Slave timeout"
print( data.panic_errorarg = f"Slave ID {errorargs[0]}"
f"Stack {i}: V ∈ [{min_v:.2f}, {max_v:.2f}]\tT ∈ [{min_t:.1f}, {max_t:.1f}]" elif errorcode == ERRORCODE_SLAVE_PANIC:
data.panic_errorcode = "Slave panic"
if errorargs[1] == SLAVE_ERROR_UV:
slave_error = "Undervoltage"
elif errorargs[1] == SLAVE_ERROR_OV:
slave_error = "Overvoltage"
elif errorargs[1] == SLAVE_ERROR_UT:
slave_error = "Undertemperature"
elif errorargs[1] == SLAVE_ERROR_OT:
slave_error = "Overtemperature"
elif errorargs[1] == SLAVE_ERROR_BQ:
slave_error = "BQ Communication error"
elif errorargs[1] == SLAVE_ERROR_TMP144:
slave_error = "TMP144 communication error"
else:
slave_error = f"Unknown error ({errorargs[1]})"
data.panic_errorarg = f"Slave ID {errorargs[0]}: {slave_error}"
elif errorcode == ERRORCODE_TIMEOUT_SLAVE_FRAMES:
data.panic_errorcode = "Slave frame timeout"
data.panic_errorarg = f"Slave ID {errorargs[0]}, frame {errorargs[1]}"
elif errorcode == ERRORCODE_TOO_FEW_TEMPS:
data.panic_errorcode = "Too few temperature sensors"
data.panic_errorarg = f"Slave ID {errorargs[0]}"
elif errorcode == ERRORCODE_TIMEOUT_SHUNT:
data.panic_errorcode = "Shunt timeout"
elif errorcode == ERRORCODE_MASTER_THRESH:
data.panic_errorcode = "Master detected threshold"
if errorargs[0] == MASTER_THRESH_UT:
data.panic_errorarg = "Undertemperature"
elif errorargs[0] == MASTER_THRESH_OT:
data.panic_errorarg = "Overtemperature"
elif errorargs[0] == MASTER_THRESH_UV:
data.panic_errorarg = "Undervoltage"
elif errorargs[0] == MASTER_THRESH_OV:
data.panic_errorarg = "Overvoltage"
else:
data.panic_errorarg = f"Unknown threshold ({errorargs[0]})"
else:
data.panic_errorcode = buf[0]
data.panic_errorarg = buf[1]
def decode_status_frame(self, buf: bytes):
state = buf[0] & 0x7F
data.ts_state = TSState(state)
data.panic = data.ts_state == TSState.TS_ERROR
def parse_cell_temps(self, slave: int):
temps = list(
filter(lambda t: t > 0 and t < 60, data.slaves[slave].cell_temps[:16])
) )
if len(temps) == 0:
temps = [-1]
min_t = min(temps)
max_t = max(temps)
# for i in range(16, 32):
# data.slaves[slave].cell_temps[i] = random.randint(min_t, max_t)
def stop(self):
self.continue_run = False # set the run condition to false on stop
kb = KBHit() if __name__ == "__main__":
while True: data = AccumulatorData()
rx_data = ser.read(32) rx_buf = bytes()
if len(rx_data) > 0:
rx_buf = rx_data if len(sys.argv) != 2:
if (start := check_log_start(rx_buf)) != -1: print(f"Usage: {sys.argv[0]} SERIAL-PORT", file=sys.stderr)
decode_log_frame(rx_buf[start + 3 : start + 11]) sys.exit(os.EX_USAGE)
rx_buf = b""
elif (start := check_current_start(rx_buf)) != -1: SERIAL_PORT = sys.argv[1]
decode_current_frame(rx_buf[start + 3 : start + 11]) print(SERIAL_PORT)
rx_buf = b"" ser = serial.Serial(SERIAL_PORT, BITRATE, timeout=TIMEOUT)
elif (start := check_panic_start(rx_buf)) != -1:
decode_panic_frame(rx_buf[start + 3 : start + 11]) app = QApplication(sys.argv)
rx_buf = b"" gui = Window()
if kb.kbhit(): gui.show()
c = kb.getch()
if c == "C": timer = QTimer()
print(f"KBHIT: {c}", file=sys.stderr) # timer.timeout.connect(data.fill_dummy_data)
print(ser.write(b"C"), file=sys.stderr) timer.timeout.connect(gui.update)
update_display() timer.start(1000) # every 1,000 milliseconds
sys.exit(app.exec_())

81
poetry.lock generated Normal file
View File

@ -0,0 +1,81 @@
[[package]]
name = "pyqt5"
version = "5.15.7"
description = "Python bindings for the Qt cross platform application toolkit"
category = "main"
optional = false
python-versions = ">=3.7"
[package.dependencies]
PyQt5-Qt5 = ">=5.15.0"
PyQt5-sip = ">=12.11,<13"
[[package]]
name = "pyqt5-qt5"
version = "5.15.2"
description = "The subset of a Qt installation needed by PyQt5."
category = "main"
optional = false
python-versions = "*"
[[package]]
name = "pyqt5-sip"
version = "12.11.0"
description = "The sip module support for PyQt5"
category = "main"
optional = false
python-versions = ">=3.7"
[[package]]
name = "pyserial"
version = "3.5"
description = "Python Serial Port Extension"
category = "main"
optional = false
python-versions = "*"
[package.extras]
cp2110 = ["hidapi"]
[metadata]
lock-version = "1.1"
python-versions = "^3.10"
content-hash = "9d0dcef05f7296031f76c658f997974caf9b45c1efa59678ef5823b803580e43"
[metadata.files]
pyqt5 = [
{file = "PyQt5-5.15.7-cp37-abi3-macosx_10_13_x86_64.whl", hash = "sha256:1a793748c60d5aff3850b7abf84d47c1d41edb11231b7d7c16bef602c36be643"},
{file = "PyQt5-5.15.7-cp37-abi3-manylinux1_x86_64.whl", hash = "sha256:e319c9d8639e0729235c1b09c99afdadad96fa3dbd8392ab561b5ab5946ee6ef"},
{file = "PyQt5-5.15.7-cp37-abi3-win32.whl", hash = "sha256:08694f0a4c7d4f3d36b2311b1920e6283240ad3b7c09b515e08262e195dcdf37"},
{file = "PyQt5-5.15.7-cp37-abi3-win_amd64.whl", hash = "sha256:232fe5b135a095cbd024cf341d928fc672c963f88e6a52b0c605be8177c2fdb5"},
{file = "PyQt5-5.15.7.tar.gz", hash = "sha256:755121a52b3a08cb07275c10ebb96576d36e320e572591db16cfdbc558101594"},
]
pyqt5-qt5 = [
{file = "PyQt5_Qt5-5.15.2-py3-none-macosx_10_13_intel.whl", hash = "sha256:76980cd3d7ae87e3c7a33bfebfaee84448fd650bad6840471d6cae199b56e154"},
{file = "PyQt5_Qt5-5.15.2-py3-none-manylinux2014_x86_64.whl", hash = "sha256:1988f364ec8caf87a6ee5d5a3a5210d57539988bf8e84714c7d60972692e2f4a"},
{file = "PyQt5_Qt5-5.15.2-py3-none-win32.whl", hash = "sha256:9cc7a768b1921f4b982ebc00a318ccb38578e44e45316c7a4a850e953e1dd327"},
{file = "PyQt5_Qt5-5.15.2-py3-none-win_amd64.whl", hash = "sha256:750b78e4dba6bdf1607febedc08738e318ea09e9b10aea9ff0d73073f11f6962"},
]
pyqt5-sip = [
{file = "PyQt5_sip-12.11.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:f1f9e312ff8284d6dfebc5366f6f7d103f84eec23a4da0be0482403933e68660"},
{file = "PyQt5_sip-12.11.0-cp310-cp310-manylinux1_x86_64.whl", hash = "sha256:4031547dfb679be309094bfa79254f5badc5ddbe66b9ad38e319d84a7d612443"},
{file = "PyQt5_sip-12.11.0-cp310-cp310-win32.whl", hash = "sha256:ad21ca0ee8cae2a41b61fc04949dccfab6fe008749627d94e8c7078cb7a73af1"},
{file = "PyQt5_sip-12.11.0-cp310-cp310-win_amd64.whl", hash = "sha256:3126c84568ab341c12e46ded2230f62a9a78752a70fdab13713f89a71cd44f73"},
{file = "PyQt5_sip-12.11.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:0f77655c62ec91d47c2c99143f248624d44dd2d8a12d016e7c020508ad418aca"},
{file = "PyQt5_sip-12.11.0-cp37-cp37m-manylinux1_x86_64.whl", hash = "sha256:ec5e9ef78852e1f96f86d7e15c9215878422b83dde36d44f1539a3062942f19c"},
{file = "PyQt5_sip-12.11.0-cp37-cp37m-win32.whl", hash = "sha256:d12b81c3a08abf7657a2ebc7d3649852a1f327eb2146ebadf45930486d32e920"},
{file = "PyQt5_sip-12.11.0-cp37-cp37m-win_amd64.whl", hash = "sha256:b69a1911f768b489846335e31e49eb34795c6b5a038ca24d894d751e3b0b44da"},
{file = "PyQt5_sip-12.11.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:51e377789d59196213eddf458e6927f33ba9d217b614d17d20df16c9a8b2c41c"},
{file = "PyQt5_sip-12.11.0-cp38-cp38-manylinux1_x86_64.whl", hash = "sha256:4e5c1559311515291ea0ab0635529f14536954e3b973a7c7890ab7e4de1c2c23"},
{file = "PyQt5_sip-12.11.0-cp38-cp38-win32.whl", hash = "sha256:9bca450c5306890cb002fe36bbca18f979dd9e5b810b766dce8e3ce5e66ba795"},
{file = "PyQt5_sip-12.11.0-cp38-cp38-win_amd64.whl", hash = "sha256:f6b72035da4e8fecbb0bc4a972e30a5674a9ad5608dbddaa517e983782dbf3bf"},
{file = "PyQt5_sip-12.11.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:9356260d4feb60dbac0ab66f8a791a0d2cda1bf98c9dec8e575904a045fbf7c5"},
{file = "PyQt5_sip-12.11.0-cp39-cp39-manylinux1_x86_64.whl", hash = "sha256:205f3e1b3eea3597d8e878936c1a06e04bd23a59e8b179ee806465d72eea3071"},
{file = "PyQt5_sip-12.11.0-cp39-cp39-win32.whl", hash = "sha256:686071be054e5be6ca5aaaef7960931d4ba917277e839e2e978c7cbe3f43bb6e"},
{file = "PyQt5_sip-12.11.0-cp39-cp39-win_amd64.whl", hash = "sha256:42320e7a94b1085ed85d49794ed4ccfe86f1cae80b44a894db908a8aba2bc60e"},
{file = "PyQt5_sip-12.11.0.tar.gz", hash = "sha256:b4710fd85b57edef716cc55fae45bfd5bfac6fc7ba91036f1dcc3f331ca0eb39"},
]
pyserial = [
{file = "pyserial-3.5-py2.py3-none-any.whl", hash = "sha256:c4451db6ba391ca6ca299fb3ec7bae67a5c55dde170964c7a14ceefec02f2cf0"},
{file = "pyserial-3.5.tar.gz", hash = "sha256:3c77e014170dfffbd816e6ffc205e9842efb10be9f58ec16d3e8675b4925cddb"},
]

16
pyproject.toml Normal file
View File

@ -0,0 +1,16 @@
[tool.poetry]
name = "charger-display"
version = "0.1.0"
description = ""
authors = ["Your Name <you@example.com>"]
[tool.poetry.dependencies]
python = "^3.10"
PyQt5 = "^5.15.7"
pyserial = "^3.5"
[tool.poetry.dev-dependencies]
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"