173 lines
4.9 KiB
Python
173 lines
4.9 KiB
Python
from abc import abstractmethod
|
|
import sys
|
|
import struct
|
|
import time
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
|
|
from .load import Load
|
|
from .temperatures import N_SENSORS
|
|
|
|
from PySide6.QtCore import QObject, Signal, Slot, Qt
|
|
|
|
from pymodbus.client import ModbusSerialClient
|
|
from pymodbus.payload import BinaryPayloadDecoder, Endian
|
|
|
|
MODBUS_BAUDRATE = 115200
|
|
VOLTAGE_ADDRESS = 0x00
|
|
CURRENT_ADDRESS = 2048
|
|
SLAVE_UNIT_ID = 0x02
|
|
|
|
NUM_CELLS = 3
|
|
VOLTAGE_QUANT = 1 / 10000.0
|
|
CURRENT_QUANT = 1 / (10000 * 500 * 72e-6)
|
|
|
|
THRESH_UV = 2.5
|
|
THRESH_OV = 4.2
|
|
THRESH_UT = 1
|
|
THRESH_OT = 60
|
|
THRESH_VOLTAGE_TIMEOUT = 1
|
|
THRESH_TEMP_TIMEOUT = 1
|
|
|
|
|
|
class BMSError(Enum):
|
|
NONE = 0
|
|
UV = 1
|
|
OV = 2
|
|
UT = 3
|
|
OT = 4
|
|
TIMEOUT_V = 5
|
|
TIMEOUT_T = 6
|
|
|
|
def __str__(self):
|
|
if self == self.NONE:
|
|
return ""
|
|
elif self == self.UV:
|
|
return "Undervoltage"
|
|
elif self == self.OV:
|
|
return "Overvoltage"
|
|
elif self == self.UT:
|
|
return "Undertemperature"
|
|
elif self == self.OT:
|
|
return "Overtemperature"
|
|
elif self == self.TIMEOUT_V:
|
|
return "Timeout (voltages)"
|
|
elif self == self.TIMEOUT_T:
|
|
return "Timeout (temps)"
|
|
else:
|
|
return "Unknown error"
|
|
|
|
|
|
@dataclass
|
|
class BMSData:
|
|
voltages: list[float]
|
|
temperatures: list[float]
|
|
error: BMSError
|
|
current: float
|
|
current_integrated: float
|
|
|
|
|
|
class BMS(QObject):
|
|
dataUpdated = Signal(BMSData)
|
|
|
|
_data: BMSData
|
|
_load: Load
|
|
_num_cells: int
|
|
_num_sensors: int
|
|
_last_voltage_time: float
|
|
_last_temp_time: float
|
|
|
|
def __init__(self, load: Load, num_cells: int, num_sensors: int):
|
|
super().__init__(None)
|
|
self._load = load
|
|
self._num_cells = num_cells
|
|
self._num_sensors = num_sensors
|
|
self._data = BMSData(
|
|
voltages=[0.0] * num_cells,
|
|
temperatures=[0.0] * num_sensors,
|
|
error=BMSError.NONE,
|
|
current=0,
|
|
current_integrated=0,
|
|
)
|
|
|
|
def _check_for_errors(self):
|
|
error = BMSError.NONE
|
|
for v in self._data.voltages:
|
|
if v > THRESH_OV:
|
|
error = BMSError.OV
|
|
break
|
|
elif v < THRESH_UV:
|
|
error = BMSError.UV
|
|
break
|
|
for t in self._data.temperatures:
|
|
if t > THRESH_OT:
|
|
error = BMSError.OT
|
|
break
|
|
if t < THRESH_UT:
|
|
error = BMSError.UT
|
|
break
|
|
now = time.time()
|
|
if now - self._last_voltage_time > THRESH_VOLTAGE_TIMEOUT:
|
|
error = BMSError.TIMEOUT_V
|
|
if now - self._last_temp_time > THRESH_TEMP_TIMEOUT:
|
|
error = BMSError.TIMEOUT_T
|
|
|
|
if error != self._data.error:
|
|
print(f"Error changed: {error}")
|
|
self._data.error = error
|
|
self._load.set_error(error != BMSError.NONE)
|
|
self.dataUpdated.emit(self._data)
|
|
|
|
@abstractmethod
|
|
def do_work(self):
|
|
pass
|
|
|
|
|
|
class BMSEvalBoard(BMS):
|
|
_dev: ModbusSerialClient
|
|
|
|
def __init__(self, uart_path: str, temperaturesUpdated: Signal, load: Load):
|
|
super().__init__(load, NUM_CELLS, N_SENSORS)
|
|
self._dev = ModbusSerialClient(
|
|
method="rtu", port=uart_path, baudrate=MODBUS_BAUDRATE
|
|
)
|
|
self._last_voltage_time = time.time()
|
|
self._last_temp_time = time.time()
|
|
temperaturesUpdated.connect(
|
|
self._updateTemperatures, Qt.ConnectionType.DirectConnection
|
|
)
|
|
|
|
def do_work(self):
|
|
while True:
|
|
self._check_for_errors()
|
|
time.sleep(0.1)
|
|
result = self._dev.read_holding_registers(
|
|
VOLTAGE_ADDRESS, NUM_CELLS, SLAVE_UNIT_ID
|
|
)
|
|
if result.isError():
|
|
print(f"ERROR READING VOLTAGES: {result}", file=sys.stderr)
|
|
continue
|
|
self._data.voltages = list(
|
|
map(lambda v: v * VOLTAGE_QUANT, result.registers)
|
|
)
|
|
self._last_voltage_time = time.time()
|
|
result = self._dev.read_holding_registers(CURRENT_ADDRESS, 6, SLAVE_UNIT_ID)
|
|
if result.isError():
|
|
print(f"ERROR READING CURRENT: {result}", file=sys.stderr)
|
|
continue
|
|
assert len(result.registers) == 6
|
|
dec = BinaryPayloadDecoder.fromRegisters(result.registers, Endian.Big)
|
|
self._data.current = dec.decode_32bit_int() * CURRENT_QUANT
|
|
self._data.current_integrated = (
|
|
dec.decode_64bit_int() * CURRENT_QUANT * 0.05 / 3600
|
|
)
|
|
self.dataUpdated.emit(self._data)
|
|
|
|
@Slot(list)
|
|
def _updateTemperatures(self, temps: list[float]):
|
|
assert len(temps) == N_SENSORS
|
|
self._data.temperatures = temps
|
|
self._last_temp_time = time.time()
|
|
self._check_for_errors()
|
|
self.dataUpdated.emit(self._data)
|