Compare commits

..

No commits in common. "gui" and "master" have entirely different histories.
gui ... master

3 changed files with 210 additions and 770 deletions

View File

@ -1,74 +1,126 @@
#!/usr/bin/env python3
from enum import Enum
import math
import os
import struct
import time
import random
import serial
import sys
import numpy as np
from PyQt5.QtCore import Qt, QTimer, QThread, QObject, pyqtSignal
from PyQt5.QtWidgets import (
QApplication,
QWidget,
QPushButton,
QVBoxLayout,
QHBoxLayout,
QGridLayout,
QLabel,
QGroupBox,
QFrame,
QSizePolicy,
)
BITRATE = 115200 # baud/s
TIMEOUT = 1 # seconds
N_SLAVES = 9
N_SLAVES = 6
LOG_FRAME_LENGTH = 8 # bytes
PARALLEL_CELLS = 9
CELLS_PER_SLAVE = 10
TEMP_SENSORS_PER_SLAVE = 32
VOLTAGE_CONV = 5.0 / 0xFFFF # volts/quantum
TEMP_CONV = 0.0625 # °C/quantum
VOLTAGE_CONV = 5.0 / 255 # volts/quantum
TEMP_CONV = 0.0625 * 16 # °C/quantum
ERRORCODE_TIMEOUT_SLAVE = 1
ERRORCODE_SLAVE_PANIC = 2
ERRORCODE_TIMEOUT_SLAVE_FRAMES = 3
ERRORCODE_TOO_FEW_TEMPS = 4
ERRORCODE_TIMEOUT_SHUNT = 6
ERRORCODE_MASTER_THRESH = 7
SLAVE_ERROR_UV = 0
SLAVE_ERROR_OV = 1
SLAVE_ERROR_UT = 2
SLAVE_ERROR_OT = 3
SLAVE_ERROR_BQ = 4
SLAVE_ERROR_TMP144 = 5
MASTER_THRESH_UT = 0
MASTER_THRESH_OT = 1
MASTER_THRESH_UV = 2
MASTER_THRESH_OV = 3
import os
INTERNAL_RESISTANCE_CURVE_X = [2.0, 4.12]
INTERNAL_RESISTANCE_CURVE_Y = [0.0528, 0.0294]
SOC_OCV_X = [2.1, 2.9, 3.2, 3.3, 3.4, 3.5, 3.68, 4.0, 4.15, 4.2]
SOC_OCV_Y = [0, 0.023, 0.06, 0.08, 0.119, 0.227, 0.541, 0.856, 0.985, 1.0]
# Windows
if os.name == "nt":
import msvcrt
# Posix (Linux, OS X)
else:
import sys
import termios
import atexit
from select import select
def estimate_soc(voltage: float) -> float:
r_i = np.interp(
[voltage],
INTERNAL_RESISTANCE_CURVE_X,
INTERNAL_RESISTANCE_CURVE_Y,
)[0]
i = data.current / PARALLEL_CELLS
ocv = voltage - i * r_i
return np.interp([ocv], SOC_OCV_X, SOC_OCV_Y)[0] * 100
class KBHit:
def __init__(self):
"""Creates a KBHit object that you can call to do various keyboard things."""
if os.name == "nt":
pass
else:
# Save the terminal settings
self.fd = sys.stdin.fileno()
self.new_term = termios.tcgetattr(self.fd)
self.old_term = termios.tcgetattr(self.fd)
# New terminal setting unbuffered
self.new_term[3] = self.new_term[3] & ~termios.ICANON & ~termios.ECHO
termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.new_term)
# Support normal-terminal reset at exit
atexit.register(self.set_normal_term)
def set_normal_term(self):
"""Resets to normal terminal. On Windows this is a no-op."""
if os.name == "nt":
pass
else:
termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old_term)
def getch(self):
"""Returns a keyboard character after kbhit() has been called.
Should not be called in the same program as getarrow().
"""
s = ""
if os.name == "nt":
return msvcrt.getch().decode("utf-8")
else:
return sys.stdin.read(1)
def getarrow(self):
"""Returns an arrow-key code after kbhit() has been called. Codes are
0 : up
1 : right
2 : down
3 : left
Should not be called in the same program as getch().
"""
if os.name == "nt":
msvcrt.getch() # skip 0xE0
c = msvcrt.getch()
vals = [72, 77, 80, 75]
else:
c = sys.stdin.read(3)[2]
vals = [65, 67, 66, 68]
return vals.index(ord(c.decode("utf-8")))
def kbhit(self):
"""Returns True if keyboard character was hit, False otherwise."""
if os.name == "nt":
return msvcrt.kbhit()
else:
dr, dw, de = select([sys.stdin], [], [], 0)
return dr != []
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} SERIAL-PORT", file=sys.stderr)
sys.exit(os.EX_USAGE)
SERIAL_PORT = sys.argv[1]
ser = serial.Serial(SERIAL_PORT, BITRATE, timeout=TIMEOUT)
def check_log_start(buf: bytes):
return buf[:-12].find(b"LOG")
def check_current_start(buf: bytes):
return buf[:-12].find(b"CUR")
def check_panic_start(buf: bytes):
return buf[:-12].find(b"PAN")
class SlaveData:
@ -77,653 +129,138 @@ class SlaveData:
def __init__(self) -> None:
self.cell_voltages = [-1] * CELLS_PER_SLAVE
self.cell_temps = ([-1] * (TEMP_SENSORS_PER_SLAVE // 2)) + (
[0] * (TEMP_SENSORS_PER_SLAVE // 2)
)
class TSState(Enum):
TS_INACTIVE = 0
TS_ACTIVE = 1
TS_PRECHARGE = 2
TS_DISCHARGE = 3
TS_ERROR = 4
TS_CHARGING_CHECK = 5
TS_CHARGING = 6
TS_BALANCING = 7
self.cell_temps = [-1] * TEMP_SENSORS_PER_SLAVE
class AccumulatorData:
slaves: list[SlaveData]
min_voltage: float
max_voltage: float
min_soc: float
max_soc: float
min_temp: float
max_temp: float
last_frame: float
time_since_last_frame: float
current: float
panic: bool
panic_errorcode: int
panic_errorarg: int
ts_state: TSState
def __init__(self) -> None:
self.slaves = [SlaveData() for _ in range(N_SLAVES)]
self.min_voltage = (
self.max_voltage
) = (
self.min_soc
) = (
self.max_soc
) = self.min_temp = self.max_temp = self.last_frame = self.current = 0
self.panic = False
self.panic_errorcode = self.panic_errorarg = 0
self.time_since_last_frame = 0
self.ts_state = TSState.TS_INACTIVE
def fill_dummy_data(self):
self.min_voltage = random.uniform(1, 3)
self.max_voltage = random.uniform(3, 5)
self.min_temp = random.uniform(0, 25)
self.max_temp = random.uniform(25, 60)
self.current = random.uniform(25, 60)
self.last_frame = time.time() - random.random()
self.panic = random.choice([True, False])
if self.panic:
self.panic_errorcode = random.randint(1, 10000)
self.panic_errorarg = "ABCDERFG"
else:
self.panic_errorcode = 0
self.panic_errorarg = ""
data.time_since_last_frame = random.uniform(1, 60)
for i in range(N_SLAVES):
self.slaves[i].cell_voltages = [
random.uniform(1, 5) for i in range(CELLS_PER_SLAVE)
]
self.slaves[i].cell_temps = [
random.uniform(25, 60) for i in range(TEMP_SENSORS_PER_SLAVE)
]
class StackGuiElement:
title: str
stack_id: int
voltage_label: QLabel
soc_label: QLabel
temp_label: QLabel
groupBox: QGroupBox
detail_popup: QWidget
def __init__(self, title: str, stack_id: int):
self.title = title
self.stack_id = stack_id
self.voltage_label = QLabel()
self.soc_label = QLabel()
self.temp_label = QLabel()
self.groupBox = QGroupBox()
self.detail_popup = None
self.__post__init__()
def __post__init__(self):
l1 = QLabel("Voltage [V]")
l2 = QLabel("SoC [%]")
l3 = QLabel("Temperature [°C]")
popup_btn = QPushButton("Details")
popup_btn.clicked.connect(self.show_popup)
l1.setAlignment(Qt.AlignLeft)
l2.setAlignment(Qt.AlignLeft)
l3.setAlignment(Qt.AlignLeft)
self.voltage_label.setAlignment(Qt.AlignLeft)
self.soc_label.setAlignment(Qt.AlignLeft)
self.temp_label.setAlignment(Qt.AlignLeft)
grid = QGridLayout()
grid.addWidget(l1, 0, 0)
grid.addWidget(l2, 1, 0)
grid.addWidget(l3, 2, 0)
grid.addWidget(self.voltage_label, 0, 1)
grid.addWidget(self.soc_label, 1, 1)
grid.addWidget(self.temp_label, 2, 1)
grid.addWidget(popup_btn, 3, 0)
self.groupBox.setTitle(self.title)
self.groupBox.setLayout(grid)
def update_data_from_slave(self, slave: SlaveData):
min_v, max_v = min(slave.cell_voltages), max(slave.cell_voltages)
self.voltage_label.setText(f"[{min_v:.02f}, {max_v:.02f}]")
min_soc, max_soc = estimate_soc(min_v), estimate_soc(max_v)
self.soc_label.setText(f"[{min_soc:02.0f}, {max_soc:02.0f}]")
min_t, max_t = min(slave.cell_temps), max(slave.cell_temps)
self.temp_label.setText(f"[{min_t:.02f}, {max_t:.02f}]")
def show_popup(self):
if self.detail_popup is None:
self.detail_popup = StackPopup(self.stack_id)
self.detail_popup.show()
rx_buf = bytes()
data = AccumulatorData()
class QVSeperationLine(QFrame):
"""
a vertical seperation line
"""
def __init__(self):
super().__init__()
self.setFixedWidth(20)
self.setMinimumHeight(1)
self.setFrameShape(QFrame.VLine)
self.setFrameShadow(QFrame.Sunken)
self.setSizePolicy(QSizePolicy.Minimum, QSizePolicy.Preferred)
def decode_log_frame(buf: bytes):
msg_id = buf[0]
slave = msg_id >> 4
frame_id = msg_id & 0x0F
if slave >= N_SLAVES:
return
class StackPopup(QWidget):
stack_id: int
voltage_labels: list[QLabel]
temp_labels: list[QLabel]
def __init__(self, stack_id: int):
super().__init__()
self.stack_id = stack_id
self.voltage_labels = []
self.temp_labels = []
layout = QVBoxLayout()
groupbox = QGroupBox()
grid = QGridLayout()
for i in range(len(data.slaves[stack_id].cell_voltages)):
l1 = QLabel(f"Voltage Cell {i}")
l1.setAlignment(Qt.AlignLeft)
l_v = QLabel()
l_v.setNum(data.slaves[stack_id].cell_voltages[i])
l_v.setAlignment(Qt.AlignLeft)
self.voltage_labels.append(l_v)
grid.addWidget(l1, i, 0)
grid.addWidget(l_v, i, 1)
# vertical separators between rows in popup
separator_vertical = QVSeperationLine()
grid.addWidget(separator_vertical, 0, 2, CELLS_PER_SLAVE, 1)
num_temp_cols = len(data.slaves[stack_id].cell_temps) // CELLS_PER_SLAVE
for i in range(num_temp_cols):
separator_vertical = QVSeperationLine()
grid.addWidget(separator_vertical, 0, 5 + i * 3, CELLS_PER_SLAVE, 1)
for i in range(len(data.slaves[stack_id].cell_temps)):
l2 = QLabel(f"Temp. Sensor {i}")
l2.setAlignment(Qt.AlignLeft)
l_t = QLabel()
l_t.setNum(data.slaves[stack_id].cell_temps[i])
l_t.setAlignment(Qt.AlignLeft)
self.temp_labels.append(l_t)
grid.addWidget(l2, i % CELLS_PER_SLAVE, 3 + (i // CELLS_PER_SLAVE) * 3)
grid.addWidget(l_t, i % CELLS_PER_SLAVE, 4 + (i // CELLS_PER_SLAVE) * 3)
groupbox.setTitle(f"Stack {stack_id}")
groupbox.setLayout(grid)
layout.addWidget(groupbox)
self.setLayout(layout)
self.setWindowTitle(f"FT22 Charger Display - Stack {stack_id} Details")
self.update_data()
timer.timeout.connect(self.update_data)
def update_data(self):
for i in range(len(data.slaves[self.stack_id].cell_voltages)):
self.voltage_labels[i].setNum(data.slaves[self.stack_id].cell_voltages[i])
for i in range(len(data.slaves[self.stack_id].cell_temps)):
self.temp_labels[i].setNum(data.slaves[self.stack_id].cell_temps[i])
class Window(QWidget):
stop_signal = (
pyqtSignal()
) # make a stop signal to communicate with the worker in another thread
def __init__(self, parent=None):
super(Window, self).__init__(parent)
win = QVBoxLayout()
### ACCUMULATOR GENERAL ###
self.l1 = QLabel("Min Voltage [V]")
self.l1.setAlignment(Qt.AlignLeft)
self.l_min_voltage = QLabel()
self.l_min_voltage.setNum(data.min_voltage)
self.l_min_voltage.setAlignment(Qt.AlignLeft)
self.l2 = QLabel("Max Voltage [V]")
self.l2.setAlignment(Qt.AlignLeft)
self.l_max_voltage = QLabel()
self.l_max_voltage.setNum(data.max_voltage)
self.l_max_voltage.setAlignment(Qt.AlignLeft)
self.l3 = QLabel("Min SoC [%]")
self.l3.setAlignment(Qt.AlignLeft)
self.l_min_soc = QLabel()
self.l_min_soc.setNum(data.min_soc)
self.l_min_soc.setAlignment(Qt.AlignLeft)
self.l4 = QLabel("Max SoC [%]")
self.l4.setAlignment(Qt.AlignLeft)
self.l_max_soc = QLabel()
self.l_max_soc.setNum(data.max_soc)
self.l_max_soc.setAlignment(Qt.AlignLeft)
self.l5 = QLabel("Min Temperature [°C]")
self.l5.setAlignment(Qt.AlignLeft)
self.l_min_temp = QLabel()
self.l_min_temp.setNum(data.min_temp)
self.l_min_temp.setAlignment(Qt.AlignLeft)
self.l6 = QLabel("Max Temperature [°C]")
self.l6.setAlignment(Qt.AlignLeft)
self.l_max_temp = QLabel()
self.l_max_temp.setNum(data.max_temp)
self.l_max_temp.setAlignment(Qt.AlignLeft)
self.l7 = QLabel("Current [A]")
self.l7.setAlignment(Qt.AlignLeft)
self.l_current = QLabel()
self.l_current.setNum(data.current)
self.l_current.setAlignment(Qt.AlignLeft)
self.l8 = QLabel("State")
self.l_state = QLabel()
self.l_state.setText(data.ts_state.name)
self.l_state.setAlignment(Qt.AlignLeft)
self.l_errorcode = QLabel()
self.l_errorcode.setText("")
self.l_errorcode.setAlignment(Qt.AlignLeft)
self.l_errorarg = QLabel()
self.l_errorarg.setText("")
self.l_errorcode.setAlignment(Qt.AlignLeft)
self.l9 = QLabel("Time Since Last Dataframe")
self.l_time_since_last_frame = QLabel()
self.l_time_since_last_frame.setText(str(data.time_since_last_frame))
self.l_time_since_last_frame.setAlignment(Qt.AlignLeft)
### LAYOUT ACCUMULATOR GENERAL ###
grid_accumulator = QGridLayout()
grid_accumulator.addWidget(self.l1, 0, 0)
grid_accumulator.addWidget(self.l2, 1, 0)
grid_accumulator.addWidget(self.l3, 2, 0)
grid_accumulator.addWidget(self.l4, 3, 0)
grid_accumulator.addWidget(self.l5, 0, 2)
grid_accumulator.addWidget(self.l6, 1, 2)
grid_accumulator.addWidget(self.l7, 2, 2)
grid_accumulator.addWidget(self.l8, 5, 0)
grid_accumulator.addWidget(self.l9, 3, 2)
grid_accumulator.addWidget(self.l_min_voltage, 0, 1)
grid_accumulator.addWidget(self.l_max_voltage, 1, 1)
grid_accumulator.addWidget(self.l_min_soc, 2, 1)
grid_accumulator.addWidget(self.l_max_soc, 3, 1)
grid_accumulator.addWidget(self.l_min_temp, 0, 3)
grid_accumulator.addWidget(self.l_max_temp, 1, 3)
grid_accumulator.addWidget(self.l_current, 2, 3)
grid_accumulator.addWidget(self.l_state, 5, 1)
grid_accumulator.addWidget(self.l_errorcode, 5, 2)
grid_accumulator.addWidget(self.l_errorarg, 5, 3)
grid_accumulator.addWidget(self.l_time_since_last_frame, 3, 3)
groupBox_accumulator = QGroupBox("Accumulator General")
groupBox_accumulator.setLayout(grid_accumulator)
win.addWidget(groupBox_accumulator)
### STACKS ###
self.stack_gui_elements = []
for i in range(N_SLAVES):
sge = StackGuiElement(f"Stack {i}", i)
sge.update_data_from_slave(data.slaves[i])
self.stack_gui_elements.append(sge)
### LAYOUT STACKS ###
n_slaves_half = math.ceil(N_SLAVES / 2)
grid_stacks = QGridLayout()
for i, sge in enumerate(self.stack_gui_elements):
grid_stacks.addWidget(
sge.groupBox, 0 if i < n_slaves_half else 1, i % n_slaves_half
)
groupBox_stacks = QGroupBox("Individual Stacks")
groupBox_stacks.setLayout(grid_stacks)
win.addWidget(groupBox_stacks)
### BUTTONS ###
self.btn_start = QPushButton("Start Serial")
self.btn_start.resize(self.btn_start.sizeHint())
self.btn_start.move(50, 50)
self.btn_stop = QPushButton("Stop Serial")
self.btn_stop.resize(self.btn_stop.sizeHint())
self.btn_stop.move(150, 50)
self.btn_charge = QPushButton("Start Charging")
self.btn_charge.resize(self.btn_charge.sizeHint())
self.btn_charge.move(250, 50)
self.btn_balance = QPushButton("Start Balancing")
self.btn_balance.resize(self.btn_balance.sizeHint())
self.btn_balance.move(350, 50)
### LAYOUT BUTTONS ###
vbox_controls = QVBoxLayout()
vbox_controls.addWidget(self.btn_start)
vbox_controls.addWidget(self.btn_stop)
vbox_controls.addWidget(self.btn_charge)
vbox_controls.addWidget(self.btn_balance)
groupBox_controls = QGroupBox("Controls")
groupBox_controls.setLayout(vbox_controls)
win.addWidget(groupBox_controls)
# Start Button action
self.btn_start.clicked.connect(self.start_thread)
# Stop Button action
self.btn_stop.clicked.connect(self.stop_thread)
# Charge Button action
self.btn_charge.clicked.connect(self.start_charge)
# Balance Button action
self.btn_balance.clicked.connect(self.start_balance)
self.setLayout(win)
self.setWindowTitle("FT22 Charger Display")
# When start_btn is clicked this runs. Creates the worker and the thread.
def start_thread(self):
self.thread = QThread()
self.worker = Worker()
self.stop_signal.connect(
self.worker.stop
) # connect stop signal to worker stop method
self.worker.moveToThread(self.thread)
self.worker.finished.connect(
self.thread.quit
) # connect the workers finished signal to stop thread
self.worker.finished.connect(
self.worker.deleteLater
) # connect the workers finished signal to clean up worker
self.thread.finished.connect(
self.thread.deleteLater
) # connect threads finished signal to clean up thread
self.thread.started.connect(self.worker.do_work)
self.thread.finished.connect(self.worker.stop)
self.thread.start()
# When stop_btn is clicked this runs. Terminates the worker and the thread.
def stop_thread(self):
self.stop_signal.emit() # emit the finished signal on stop
def start_charge(self):
ser.write(b"C")
def start_balance(self):
ser.write(b"B")
def update(self):
# Accumulator
self.l_min_voltage.setText(f"{data.min_voltage:.02f}")
self.l_max_voltage.setText(f"{data.max_voltage:.02f}")
self.l_min_soc.setText(f"{data.min_soc:02.0f}")
self.l_max_soc.setText(f"{data.max_soc:02.0f}")
self.l_min_temp.setText(f"{data.min_temp:.02f}")
self.l_max_temp.setText(f"{data.max_temp:.02f}")
self.l_current.setText(f"{data.current:.02f}")
self.l_state.setText(data.ts_state.name)
if data.panic:
self.l_errorcode.setText(str(data.panic_errorcode))
self.l_errorarg.setText(str(data.panic_errorarg))
for l in (self.l_state, self.l_errorcode, self.l_errorarg):
l.setStyleSheet("color: red; font-weight: bold;")
elif data.ts_state == TSState.TS_CHARGING:
for l in (self.l_state, self.l_errorcode, self.l_errorarg):
l.setStyleSheet("color: green; font-weight: bold;")
else:
self.l_errorcode.setText("")
self.l_errorarg.setText("")
for l in (self.l_state, self.l_errorcode, self.l_errorarg):
l.setStyleSheet("color: black; font-weight: normal;")
last_time = time.time() - data.last_frame
self.l_time_since_last_frame.setText(f"{last_time:.03f}")
# Stacks
for i, sge in enumerate(self.stack_gui_elements):
sge.update_data_from_slave(data.slaves[i])
class Worker(QObject):
finished = pyqtSignal() # give worker class a finished signal
def __init__(self, parent=None):
QObject.__init__(self, parent=parent)
self.continue_run = True # provide a bool run condition for the class
def do_work(self):
i = 1
while self.continue_run: # give the loop a stoppable condition
# data.fill_dummy_data()
self.charger_communication()
# QThread.msleep(1)
self.finished.emit() # emit the finished signal when the loop is done
def charger_communication(self):
rx_data = ser.read(256)
# print(len(rx_data), rx_data)
while len(rx_data) > 0:
if (frame_start := self.check_log_start(rx_data)) != -1:
self.decode_log_frame(rx_data[frame_start + 3 : frame_start + 11])
rx_data = rx_data[frame_start + 11 :]
continue
elif (frame_start := self.check_current_start(rx_data)) != -1:
self.decode_current_frame(rx_data[frame_start + 3 : frame_start + 11])
rx_data = rx_data[frame_start + 11 :]
continue
elif (frame_start := self.check_panic_start(rx_data)) != -1:
self.decode_panic_frame(rx_data[frame_start + 3 : frame_start + 11])
rx_data = rx_data[frame_start + 11 :]
continue
elif (frame_start := self.check_status_start(rx_data)) != -1:
self.decode_status_frame(rx_data[frame_start + 3 : frame_start + 11])
rx_data = rx_data[frame_start + 11 :]
continue
break
def check_log_start(self, buf: bytes):
return buf[:-12].find(b"LOG")
def check_current_start(self, buf: bytes):
return buf[:-12].find(b"CUR")
def check_panic_start(self, buf: bytes):
return buf[:-12].find(b"PAN")
def check_status_start(self, buf: bytes):
return buf[:-12].find(b"STA")
def decode_log_frame(self, buf: bytes):
msg_id = buf[0]
slave = msg_id >> 4
frame_id = msg_id & 0x0F
if slave >= N_SLAVES:
print(f"Unknown slave: {slave}", file=sys.stderr)
return
if frame_id == 0:
for i in range(3):
raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2]
data.slaves[slave].cell_voltages[i] = raw * VOLTAGE_CONV
elif frame_id == 1:
for i in range(3):
raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2]
data.slaves[slave].cell_voltages[i + 3] = raw * VOLTAGE_CONV
elif frame_id == 2:
for i in range(3):
raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2]
data.slaves[slave].cell_voltages[i + 6] = raw * VOLTAGE_CONV
elif frame_id == 3:
for i in range(1):
raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2]
data.slaves[slave].cell_voltages[i + 9] = raw * VOLTAGE_CONV
for i in range(2):
raw = (buf[i * 2 + 3] << 8) | buf[i * 2 + 4]
data.slaves[slave].cell_temps[i] = raw * TEMP_CONV
elif frame_id == 4:
for i in range(3):
raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2]
data.slaves[slave].cell_temps[i + 2] = raw * TEMP_CONV
elif frame_id == 5:
for i in range(3):
raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2]
data.slaves[slave].cell_temps[i + 5] = raw * TEMP_CONV
elif frame_id == 6:
for i in range(3):
raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2]
data.slaves[slave].cell_temps[i + 8] = raw * TEMP_CONV
elif frame_id == 7:
for i in range(3):
raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2]
data.slaves[slave].cell_temps[i + 11] = raw * TEMP_CONV
elif frame_id == 8:
for i in range(3):
raw = (buf[i * 2 + 1] << 8) | buf[i * 2 + 2]
data.slaves[slave].cell_temps[i + 14] = raw * TEMP_CONV
else:
print(f"Unknown frame ID: {frame_id} (buf: {repr(buf)})", file=sys.stderr)
# time.sleep(1)
return
voltages = [
slave.cell_voltages[i]
for i in range(CELLS_PER_SLAVE)
for slave in data.slaves
]
self.parse_cell_temps(slave)
temps = [
slave.cell_temps[i]
for i in range(TEMP_SENSORS_PER_SLAVE)
for slave in data.slaves
]
data.min_voltage = min(voltages)
data.max_voltage = max(voltages)
data.min_soc = estimate_soc(data.min_voltage)
data.max_soc = estimate_soc(data.max_voltage)
data.min_temp = min(temps)
data.max_temp = max(temps)
data.time_since_last_frame = time.time() - data.last_frame
data.last_frame = time.time()
def decode_current_frame(self, buf: bytes):
# current = (buf[2] << 24) | (buf[3] << 16) | (buf[4] << 8) | (buf[5])
current = struct.unpack(">i", buf[2:6])[0]
data.current = current / 1000.0
def decode_panic_frame(self, buf: bytes):
data.panic = True
errorcode = int(buf[0])
errorargs = [int(buf[1]), int(buf[2])]
if errorcode == ERRORCODE_TIMEOUT_SLAVE:
data.panic_errorcode = "Slave timeout"
data.panic_errorarg = f"Slave ID {errorargs[0]}"
elif errorcode == ERRORCODE_SLAVE_PANIC:
data.panic_errorcode = "Slave panic"
if errorargs[1] == SLAVE_ERROR_UV:
slave_error = "Undervoltage"
elif errorargs[1] == SLAVE_ERROR_OV:
slave_error = "Overvoltage"
elif errorargs[1] == SLAVE_ERROR_UT:
slave_error = "Undertemperature"
elif errorargs[1] == SLAVE_ERROR_OT:
slave_error = "Overtemperature"
elif errorargs[1] == SLAVE_ERROR_BQ:
slave_error = "BQ Communication error"
elif errorargs[1] == SLAVE_ERROR_TMP144:
slave_error = "TMP144 communication error"
else:
slave_error = f"Unknown error ({errorargs[1]})"
data.panic_errorarg = f"Slave ID {errorargs[0]}: {slave_error}"
elif errorcode == ERRORCODE_TIMEOUT_SLAVE_FRAMES:
data.panic_errorcode = "Slave frame timeout"
data.panic_errorarg = f"Slave ID {errorargs[0]}, frame {errorargs[1]}"
elif errorcode == ERRORCODE_TOO_FEW_TEMPS:
data.panic_errorcode = "Too few temperature sensors"
data.panic_errorarg = f"Slave ID {errorargs[0]}"
elif errorcode == ERRORCODE_TIMEOUT_SHUNT:
data.panic_errorcode = "Shunt timeout"
elif errorcode == ERRORCODE_MASTER_THRESH:
data.panic_errorcode = "Master detected threshold"
if errorargs[0] == MASTER_THRESH_UT:
data.panic_errorarg = "Undertemperature"
elif errorargs[0] == MASTER_THRESH_OT:
data.panic_errorarg = "Overtemperature"
elif errorargs[0] == MASTER_THRESH_UV:
data.panic_errorarg = "Undervoltage"
elif errorargs[0] == MASTER_THRESH_OV:
data.panic_errorarg = "Overvoltage"
else:
data.panic_errorarg = f"Unknown threshold ({errorargs[0]})"
else:
data.panic_errorcode = buf[0]
data.panic_errorarg = buf[1]
def decode_status_frame(self, buf: bytes):
state = buf[0] & 0x7F
data.ts_state = TSState(state)
data.panic = data.ts_state == TSState.TS_ERROR
def parse_cell_temps(self, slave: int):
temps = list(
filter(lambda t: t > 0 and t < 60, data.slaves[slave].cell_temps[:16])
if frame_id == 0:
for i in range(7):
data.slaves[slave].cell_voltages[i] = buf[i + 1] * VOLTAGE_CONV
elif frame_id == 1:
for i in range(3):
data.slaves[slave].cell_voltages[i + 7] = buf[i + 1] * VOLTAGE_CONV
for i in range(4):
data.slaves[slave].cell_temps[i] = buf[i + 4] * TEMP_CONV
elif frame_id == 2:
for i in range(7):
data.slaves[slave].cell_temps[i + 4] = buf[i + 1] * TEMP_CONV
elif frame_id == 3:
for i in range(7):
data.slaves[slave].cell_temps[i + 11] = buf[i + 1] * TEMP_CONV
elif frame_id == 4:
for i in range(7):
data.slaves[slave].cell_temps[i + 18] = buf[i + 1] * TEMP_CONV
elif frame_id == 5:
for i in range(7):
data.slaves[slave].cell_temps[i + 25] = buf[i + 1] * TEMP_CONV
else:
# print(f"Unknown frame ID: {frame_id} (buf: {repr(buf)})", file=sys.stderr)
# time.sleep(1)
return
data.last_frame = time.time()
def decode_current_frame(buf: bytes):
# current = (buf[2] << 24) | (buf[3] << 16) | (buf[4] << 8) | (buf[5])
current = struct.unpack(">i", buf[2:6])[0]
data.current = current / 1000.0
def decode_panic_frame(buf: bytes):
data.panic = True
data.panic_errorcode = buf[0]
data.panic_errorarg = buf[1]
def update_display():
voltages = [
slave.cell_voltages[i] for i in range(CELLS_PER_SLAVE) for slave in data.slaves
]
temps = [
slave.cell_temps[i]
for i in range(TEMP_SENSORS_PER_SLAVE)
for slave in data.slaves
]
data.min_voltage = min(voltages)
data.max_voltage = max(voltages)
data.min_temp = min(temps)
data.max_temp = max(temps)
time_since_last_frame = time.time() - data.last_frame
print("\033[2J\033[H", end="")
print("-" * 20)
if data.panic:
print("!!!!! PANIC !!!!!")
print(f"Error code: {data.panic_errorcode}")
print(f"Error arg: {data.panic_errorarg}")
time.sleep(0.5)
return
print(f"Time since last frame: {time_since_last_frame} s")
print(f"Current: {data.current:.2f} A")
print(f"Min voltage: {data.min_voltage:.2f} V")
print(f"Max voltage: {data.max_voltage:.2f} V")
print(f"Min temp: {data.min_temp:.1f} °C")
print(f"Max temp: {data.max_temp:.1f} °C")
for i in range(N_SLAVES):
min_v = min(data.slaves[i].cell_voltages)
max_v = max(data.slaves[i].cell_voltages)
min_t = min(data.slaves[i].cell_temps)
max_t = max(data.slaves[i].cell_temps)
print(
f"Stack {i}: V ∈ [{min_v:.2f}, {max_v:.2f}]\tT ∈ [{min_t:.1f}, {max_t:.1f}]"
)
if len(temps) == 0:
temps = [-1]
min_t = min(temps)
max_t = max(temps)
# for i in range(16, 32):
# data.slaves[slave].cell_temps[i] = random.randint(min_t, max_t)
def stop(self):
self.continue_run = False # set the run condition to false on stop
if __name__ == "__main__":
data = AccumulatorData()
rx_buf = bytes()
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} SERIAL-PORT", file=sys.stderr)
sys.exit(os.EX_USAGE)
SERIAL_PORT = sys.argv[1]
print(SERIAL_PORT)
ser = serial.Serial(SERIAL_PORT, BITRATE, timeout=TIMEOUT)
app = QApplication(sys.argv)
gui = Window()
gui.show()
timer = QTimer()
# timer.timeout.connect(data.fill_dummy_data)
timer.timeout.connect(gui.update)
timer.start(1000) # every 1,000 milliseconds
sys.exit(app.exec_())
kb = KBHit()
while True:
rx_data = ser.read(32)
if len(rx_data) > 0:
rx_buf = rx_data
if (start := check_log_start(rx_buf)) != -1:
decode_log_frame(rx_buf[start + 3 : start + 11])
rx_buf = b""
elif (start := check_current_start(rx_buf)) != -1:
decode_current_frame(rx_buf[start + 3 : start + 11])
rx_buf = b""
elif (start := check_panic_start(rx_buf)) != -1:
decode_panic_frame(rx_buf[start + 3 : start + 11])
rx_buf = b""
if kb.kbhit():
c = kb.getch()
if c == "C":
print(f"KBHIT: {c}", file=sys.stderr)
print(ser.write(b"C"), file=sys.stderr)
update_display()

81
poetry.lock generated
View File

@ -1,81 +0,0 @@
[[package]]
name = "pyqt5"
version = "5.15.7"
description = "Python bindings for the Qt cross platform application toolkit"
category = "main"
optional = false
python-versions = ">=3.7"
[package.dependencies]
PyQt5-Qt5 = ">=5.15.0"
PyQt5-sip = ">=12.11,<13"
[[package]]
name = "pyqt5-qt5"
version = "5.15.2"
description = "The subset of a Qt installation needed by PyQt5."
category = "main"
optional = false
python-versions = "*"
[[package]]
name = "pyqt5-sip"
version = "12.11.0"
description = "The sip module support for PyQt5"
category = "main"
optional = false
python-versions = ">=3.7"
[[package]]
name = "pyserial"
version = "3.5"
description = "Python Serial Port Extension"
category = "main"
optional = false
python-versions = "*"
[package.extras]
cp2110 = ["hidapi"]
[metadata]
lock-version = "1.1"
python-versions = "^3.10"
content-hash = "9d0dcef05f7296031f76c658f997974caf9b45c1efa59678ef5823b803580e43"
[metadata.files]
pyqt5 = [
{file = "PyQt5-5.15.7-cp37-abi3-macosx_10_13_x86_64.whl", hash = "sha256:1a793748c60d5aff3850b7abf84d47c1d41edb11231b7d7c16bef602c36be643"},
{file = "PyQt5-5.15.7-cp37-abi3-manylinux1_x86_64.whl", hash = "sha256:e319c9d8639e0729235c1b09c99afdadad96fa3dbd8392ab561b5ab5946ee6ef"},
{file = "PyQt5-5.15.7-cp37-abi3-win32.whl", hash = "sha256:08694f0a4c7d4f3d36b2311b1920e6283240ad3b7c09b515e08262e195dcdf37"},
{file = "PyQt5-5.15.7-cp37-abi3-win_amd64.whl", hash = "sha256:232fe5b135a095cbd024cf341d928fc672c963f88e6a52b0c605be8177c2fdb5"},
{file = "PyQt5-5.15.7.tar.gz", hash = "sha256:755121a52b3a08cb07275c10ebb96576d36e320e572591db16cfdbc558101594"},
]
pyqt5-qt5 = [
{file = "PyQt5_Qt5-5.15.2-py3-none-macosx_10_13_intel.whl", hash = "sha256:76980cd3d7ae87e3c7a33bfebfaee84448fd650bad6840471d6cae199b56e154"},
{file = "PyQt5_Qt5-5.15.2-py3-none-manylinux2014_x86_64.whl", hash = "sha256:1988f364ec8caf87a6ee5d5a3a5210d57539988bf8e84714c7d60972692e2f4a"},
{file = "PyQt5_Qt5-5.15.2-py3-none-win32.whl", hash = "sha256:9cc7a768b1921f4b982ebc00a318ccb38578e44e45316c7a4a850e953e1dd327"},
{file = "PyQt5_Qt5-5.15.2-py3-none-win_amd64.whl", hash = "sha256:750b78e4dba6bdf1607febedc08738e318ea09e9b10aea9ff0d73073f11f6962"},
]
pyqt5-sip = [
{file = "PyQt5_sip-12.11.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:f1f9e312ff8284d6dfebc5366f6f7d103f84eec23a4da0be0482403933e68660"},
{file = "PyQt5_sip-12.11.0-cp310-cp310-manylinux1_x86_64.whl", hash = "sha256:4031547dfb679be309094bfa79254f5badc5ddbe66b9ad38e319d84a7d612443"},
{file = "PyQt5_sip-12.11.0-cp310-cp310-win32.whl", hash = "sha256:ad21ca0ee8cae2a41b61fc04949dccfab6fe008749627d94e8c7078cb7a73af1"},
{file = "PyQt5_sip-12.11.0-cp310-cp310-win_amd64.whl", hash = "sha256:3126c84568ab341c12e46ded2230f62a9a78752a70fdab13713f89a71cd44f73"},
{file = "PyQt5_sip-12.11.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:0f77655c62ec91d47c2c99143f248624d44dd2d8a12d016e7c020508ad418aca"},
{file = "PyQt5_sip-12.11.0-cp37-cp37m-manylinux1_x86_64.whl", hash = "sha256:ec5e9ef78852e1f96f86d7e15c9215878422b83dde36d44f1539a3062942f19c"},
{file = "PyQt5_sip-12.11.0-cp37-cp37m-win32.whl", hash = "sha256:d12b81c3a08abf7657a2ebc7d3649852a1f327eb2146ebadf45930486d32e920"},
{file = "PyQt5_sip-12.11.0-cp37-cp37m-win_amd64.whl", hash = "sha256:b69a1911f768b489846335e31e49eb34795c6b5a038ca24d894d751e3b0b44da"},
{file = "PyQt5_sip-12.11.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:51e377789d59196213eddf458e6927f33ba9d217b614d17d20df16c9a8b2c41c"},
{file = "PyQt5_sip-12.11.0-cp38-cp38-manylinux1_x86_64.whl", hash = "sha256:4e5c1559311515291ea0ab0635529f14536954e3b973a7c7890ab7e4de1c2c23"},
{file = "PyQt5_sip-12.11.0-cp38-cp38-win32.whl", hash = "sha256:9bca450c5306890cb002fe36bbca18f979dd9e5b810b766dce8e3ce5e66ba795"},
{file = "PyQt5_sip-12.11.0-cp38-cp38-win_amd64.whl", hash = "sha256:f6b72035da4e8fecbb0bc4a972e30a5674a9ad5608dbddaa517e983782dbf3bf"},
{file = "PyQt5_sip-12.11.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:9356260d4feb60dbac0ab66f8a791a0d2cda1bf98c9dec8e575904a045fbf7c5"},
{file = "PyQt5_sip-12.11.0-cp39-cp39-manylinux1_x86_64.whl", hash = "sha256:205f3e1b3eea3597d8e878936c1a06e04bd23a59e8b179ee806465d72eea3071"},
{file = "PyQt5_sip-12.11.0-cp39-cp39-win32.whl", hash = "sha256:686071be054e5be6ca5aaaef7960931d4ba917277e839e2e978c7cbe3f43bb6e"},
{file = "PyQt5_sip-12.11.0-cp39-cp39-win_amd64.whl", hash = "sha256:42320e7a94b1085ed85d49794ed4ccfe86f1cae80b44a894db908a8aba2bc60e"},
{file = "PyQt5_sip-12.11.0.tar.gz", hash = "sha256:b4710fd85b57edef716cc55fae45bfd5bfac6fc7ba91036f1dcc3f331ca0eb39"},
]
pyserial = [
{file = "pyserial-3.5-py2.py3-none-any.whl", hash = "sha256:c4451db6ba391ca6ca299fb3ec7bae67a5c55dde170964c7a14ceefec02f2cf0"},
{file = "pyserial-3.5.tar.gz", hash = "sha256:3c77e014170dfffbd816e6ffc205e9842efb10be9f58ec16d3e8675b4925cddb"},
]

View File

@ -1,16 +0,0 @@
[tool.poetry]
name = "charger-display"
version = "0.1.0"
description = ""
authors = ["Your Name <you@example.com>"]
[tool.poetry.dependencies]
python = "^3.10"
PyQt5 = "^5.15.7"
pyserial = "^3.5"
[tool.poetry.dev-dependencies]
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"