from abc import abstractmethod import time from dataclasses import dataclass from enum import Enum from .load import Load from .temperatures import N_SENSORS from PySide6.QtCore import QObject, Signal, Slot, Qt from pymodbus.client import ModbusSerialClient MODBUS_BAUDRATE = 115200 VOLTAGE_ADDRESS = 0x00 SLAVE_UNIT_ID = 0x02 NUM_CELLS = 3 VOLTAGE_QUANT = 1 / 10000.0 THRESH_UV = 2.5 THRESH_OV = 4.2 THRESH_UT = 1 THRESH_OT = 60 THRESH_VOLTAGE_TIMEOUT = 1 THRESH_TEMP_TIMEOUT = 1 class BMSError(Enum): NONE = 0 UV = 1 OV = 2 UT = 3 OT = 4 TIMEOUT_V = 5 TIMEOUT_T = 6 def __str__(self): if self == self.NONE: return "" elif self == self.UV: return "Undervoltage" elif self == self.OV: return "Overvoltage" elif self == self.UT: return "Undertemperature" elif self == self.OT: return "Overtemperature" elif self == self.TIMEOUT_V: return "Timeout (voltages)" elif self == self.TIMEOUT_T: return "Timeout (temps)" else: return "Unknown error" @dataclass class BMSData: voltages: list[float] temperatures: list[float] error: BMSError class BMS(QObject): dataUpdated = Signal(BMSData) _data: BMSData _load: Load _num_cells: int _num_sensors: int _last_voltage_time: float _last_temp_time: float def __init__(self, load: Load, num_cells: int, num_sensors: int): super().__init__(None) self._load = load self._num_cells = num_cells self._num_sensors = num_sensors self._data = BMSData([0.0] * num_cells, [0.0] * num_sensors, BMSError.NONE) def _check_for_errors(self): error = BMSError.NONE for v in self._data.voltages: if v > THRESH_OV: error = BMSError.OV break elif v < THRESH_UV: error = BMSError.UV break for t in self._data.temperatures: if t > THRESH_OT: error = BMSError.OT break if t < THRESH_UT: error = BMSError.UT break now = time.time() if now - self._last_voltage_time > THRESH_VOLTAGE_TIMEOUT: error = BMSError.TIMEOUT_V if now - self._last_temp_time > THRESH_TEMP_TIMEOUT: error = BMSError.TIMEOUT_T if error != self._data.error: print(f"Error changed: {error}") self._data.error = error self._load.set_error(error != BMSError.NONE) self.dataUpdated.emit(self._data) @abstractmethod def do_work(self): pass class BMSEvalBoard(BMS): _dev: ModbusSerialClient def __init__(self, uart_path: str, temperaturesUpdated: Signal, load: Load): super().__init__(load, NUM_CELLS, N_SENSORS) self._dev = ModbusSerialClient( method="rtu", port=uart_path, baudrate=MODBUS_BAUDRATE ) self._last_voltage_time = time.time() self._last_temp_time = time.time() temperaturesUpdated.connect( self._updateTemperatures, Qt.ConnectionType.DirectConnection ) def do_work(self): while True: time.sleep(0.1) result = self._dev.read_holding_registers( VOLTAGE_ADDRESS, NUM_CELLS, SLAVE_UNIT_ID ) if result.isError(): print(f"ERROR READING VOLTAGES: {result}") else: self._data.voltages = list( map(lambda v: v * VOLTAGE_QUANT, result.registers) ) self._last_voltage_time = time.time() self.dataUpdated.emit(self._data) self._check_for_errors() @Slot(list) def _updateTemperatures(self, temps: list[float]): assert len(temps) == N_SENSORS self._data.temperatures = temps self._last_temp_time = time.time() self._check_for_errors() self.dataUpdated.emit(self._data)