from abc import abstractmethod import sys import struct import time from dataclasses import dataclass from enum import Enum from .load import Load from .temperatures import N_SENSORS from PySide6.QtCore import QObject, Signal, Slot, Qt from pymodbus.client import ModbusSerialClient MODBUS_BAUDRATE = 115200 VOLTAGE_ADDRESS = 0x00 CURRENT_ADDRESS = 2048 SLAVE_UNIT_ID = 0x02 NUM_CELLS = 3 VOLTAGE_QUANT = 1 / 10000.0 CURRENT_QUANT = 1 / (500 * 72e-6) THRESH_UV = 2.5 THRESH_OV = 4.2 THRESH_UT = 1 THRESH_OT = 60 THRESH_VOLTAGE_TIMEOUT = 1 THRESH_TEMP_TIMEOUT = 1 class BMSError(Enum): NONE = 0 UV = 1 OV = 2 UT = 3 OT = 4 TIMEOUT_V = 5 TIMEOUT_T = 6 def __str__(self): if self == self.NONE: return "" elif self == self.UV: return "Undervoltage" elif self == self.OV: return "Overvoltage" elif self == self.UT: return "Undertemperature" elif self == self.OT: return "Overtemperature" elif self == self.TIMEOUT_V: return "Timeout (voltages)" elif self == self.TIMEOUT_T: return "Timeout (temps)" else: return "Unknown error" @dataclass class BMSData: voltages: list[float] temperatures: list[float] error: BMSError current: float current_integrated: float class BMS(QObject): dataUpdated = Signal(BMSData) _data: BMSData _load: Load _num_cells: int _num_sensors: int _last_voltage_time: float _last_temp_time: float def __init__(self, load: Load, num_cells: int, num_sensors: int): super().__init__(None) self._load = load self._num_cells = num_cells self._num_sensors = num_sensors self._data = BMSData( voltages=[0.0] * num_cells, temperatures=[0.0] * num_sensors, error=BMSError.NONE, current=0, current_integrated=0, ) def _check_for_errors(self): error = BMSError.NONE for v in self._data.voltages: if v > THRESH_OV: error = BMSError.OV break elif v < THRESH_UV: error = BMSError.UV break for t in self._data.temperatures: if t > THRESH_OT: error = BMSError.OT break if t < THRESH_UT: error = BMSError.UT break now = time.time() if now - self._last_voltage_time > THRESH_VOLTAGE_TIMEOUT: error = BMSError.TIMEOUT_V if now - self._last_temp_time > THRESH_TEMP_TIMEOUT: error = BMSError.TIMEOUT_T if error != self._data.error: print(f"Error changed: {error}") self._data.error = error self._load.set_error(error != BMSError.NONE) self.dataUpdated.emit(self._data) @abstractmethod def do_work(self): pass class BMSEvalBoard(BMS): _dev: ModbusSerialClient def __init__(self, uart_path: str, temperaturesUpdated: Signal, load: Load): super().__init__(load, NUM_CELLS, N_SENSORS) self._dev = ModbusSerialClient( method="rtu", port=uart_path, baudrate=MODBUS_BAUDRATE ) self._last_voltage_time = time.time() self._last_temp_time = time.time() temperaturesUpdated.connect( self._updateTemperatures, Qt.ConnectionType.DirectConnection ) def do_work(self): while True: self._check_for_errors() time.sleep(0.1) result = self._dev.read_holding_registers( VOLTAGE_ADDRESS, NUM_CELLS, SLAVE_UNIT_ID ) if result.isError(): print(f"ERROR READING VOLTAGES: {result}", file=sys.stderr) continue self._data.voltages = list( map(lambda v: v * VOLTAGE_QUANT, result.registers) ) self._last_voltage_time = time.time() result = self._dev.read_holding_registers(CURRENT_ADDRESS, 6, SLAVE_UNIT_ID) if result.isError(): print(f"ERROR READING CURRENT: {result}", file=sys.stderr) continue assert len(result.registers) == 6 self._data.current = ( (result.registers[0] << 16) | result.registers[1] ) * CURRENT_QUANT self._data.current_integrated = ( (result.registers[2] << 48) | (result.registers[3] << 32) | (result.registers[4] << 16) | result.registers[5] ) * CURRENT_QUANT self.dataUpdated.emit(self._data) @Slot(list) def _updateTemperatures(self, temps: list[float]): assert len(temps) == N_SENSORS self._data.temperatures = temps self._last_temp_time = time.time() self._check_for_errors() self.dataUpdated.emit(self._data)