Initial commit
This commit is contained in:
		
							
								
								
									
										2
									
								
								.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										2
									
								
								.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							@ -0,0 +1,2 @@
 | 
			
		||||
.vscode/
 | 
			
		||||
env/
 | 
			
		||||
							
								
								
									
										266
									
								
								charger-display.py
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										266
									
								
								charger-display.py
									
									
									
									
									
										Executable file
									
								
							@ -0,0 +1,266 @@
 | 
			
		||||
#!/usr/bin/env python3
 | 
			
		||||
 | 
			
		||||
import os
 | 
			
		||||
import struct
 | 
			
		||||
import time
 | 
			
		||||
import serial
 | 
			
		||||
import sys
 | 
			
		||||
 | 
			
		||||
BITRATE = 115200  # baud/s
 | 
			
		||||
TIMEOUT = 1  # seconds
 | 
			
		||||
N_SLAVES = 6
 | 
			
		||||
LOG_FRAME_LENGTH = 8  # bytes
 | 
			
		||||
 | 
			
		||||
CELLS_PER_SLAVE = 10
 | 
			
		||||
TEMP_SENSORS_PER_SLAVE = 32
 | 
			
		||||
VOLTAGE_CONV = 5.0 / 255  # volts/quantum
 | 
			
		||||
TEMP_CONV = 0.0625 * 16  # °C/quantum
 | 
			
		||||
 | 
			
		||||
import os
 | 
			
		||||
 | 
			
		||||
# Windows
 | 
			
		||||
if os.name == "nt":
 | 
			
		||||
    import msvcrt
 | 
			
		||||
 | 
			
		||||
# Posix (Linux, OS X)
 | 
			
		||||
else:
 | 
			
		||||
    import sys
 | 
			
		||||
    import termios
 | 
			
		||||
    import atexit
 | 
			
		||||
    from select import select
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class KBHit:
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        """Creates a KBHit object that you can call to do various keyboard things."""
 | 
			
		||||
 | 
			
		||||
        if os.name == "nt":
 | 
			
		||||
            pass
 | 
			
		||||
 | 
			
		||||
        else:
 | 
			
		||||
 | 
			
		||||
            # Save the terminal settings
 | 
			
		||||
            self.fd = sys.stdin.fileno()
 | 
			
		||||
            self.new_term = termios.tcgetattr(self.fd)
 | 
			
		||||
            self.old_term = termios.tcgetattr(self.fd)
 | 
			
		||||
 | 
			
		||||
            # New terminal setting unbuffered
 | 
			
		||||
            self.new_term[3] = self.new_term[3] & ~termios.ICANON & ~termios.ECHO
 | 
			
		||||
            termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.new_term)
 | 
			
		||||
 | 
			
		||||
            # Support normal-terminal reset at exit
 | 
			
		||||
            atexit.register(self.set_normal_term)
 | 
			
		||||
 | 
			
		||||
    def set_normal_term(self):
 | 
			
		||||
        """Resets to normal terminal.  On Windows this is a no-op."""
 | 
			
		||||
 | 
			
		||||
        if os.name == "nt":
 | 
			
		||||
            pass
 | 
			
		||||
 | 
			
		||||
        else:
 | 
			
		||||
            termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old_term)
 | 
			
		||||
 | 
			
		||||
    def getch(self):
 | 
			
		||||
        """Returns a keyboard character after kbhit() has been called.
 | 
			
		||||
        Should not be called in the same program as getarrow().
 | 
			
		||||
        """
 | 
			
		||||
 | 
			
		||||
        s = ""
 | 
			
		||||
 | 
			
		||||
        if os.name == "nt":
 | 
			
		||||
            return msvcrt.getch().decode("utf-8")
 | 
			
		||||
 | 
			
		||||
        else:
 | 
			
		||||
            return sys.stdin.read(1)
 | 
			
		||||
 | 
			
		||||
    def getarrow(self):
 | 
			
		||||
        """Returns an arrow-key code after kbhit() has been called. Codes are
 | 
			
		||||
        0 : up
 | 
			
		||||
        1 : right
 | 
			
		||||
        2 : down
 | 
			
		||||
        3 : left
 | 
			
		||||
        Should not be called in the same program as getch().
 | 
			
		||||
        """
 | 
			
		||||
 | 
			
		||||
        if os.name == "nt":
 | 
			
		||||
            msvcrt.getch()  # skip 0xE0
 | 
			
		||||
            c = msvcrt.getch()
 | 
			
		||||
            vals = [72, 77, 80, 75]
 | 
			
		||||
 | 
			
		||||
        else:
 | 
			
		||||
            c = sys.stdin.read(3)[2]
 | 
			
		||||
            vals = [65, 67, 66, 68]
 | 
			
		||||
 | 
			
		||||
        return vals.index(ord(c.decode("utf-8")))
 | 
			
		||||
 | 
			
		||||
    def kbhit(self):
 | 
			
		||||
        """Returns True if keyboard character was hit, False otherwise."""
 | 
			
		||||
        if os.name == "nt":
 | 
			
		||||
            return msvcrt.kbhit()
 | 
			
		||||
 | 
			
		||||
        else:
 | 
			
		||||
            dr, dw, de = select([sys.stdin], [], [], 0)
 | 
			
		||||
            return dr != []
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if len(sys.argv) != 2:
 | 
			
		||||
    print(f"Usage: {sys.argv[0]} SERIAL-PORT", file=sys.stderr)
 | 
			
		||||
    sys.exit(os.EX_USAGE)
 | 
			
		||||
 | 
			
		||||
SERIAL_PORT = sys.argv[1]
 | 
			
		||||
ser = serial.Serial(SERIAL_PORT, BITRATE, timeout=TIMEOUT)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def check_log_start(buf: bytes):
 | 
			
		||||
    return buf[:-12].find(b"LOG")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def check_current_start(buf: bytes):
 | 
			
		||||
    return buf[:-12].find(b"CUR")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def check_panic_start(buf: bytes):
 | 
			
		||||
    return buf[:-12].find(b"PAN")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SlaveData:
 | 
			
		||||
    cell_voltages: list[float]
 | 
			
		||||
    cell_temps: list[float]
 | 
			
		||||
 | 
			
		||||
    def __init__(self) -> None:
 | 
			
		||||
        self.cell_voltages = [-1] * CELLS_PER_SLAVE
 | 
			
		||||
        self.cell_temps = [-1] * TEMP_SENSORS_PER_SLAVE
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class AccumulatorData:
 | 
			
		||||
    slaves: list[SlaveData]
 | 
			
		||||
    min_voltage: float
 | 
			
		||||
    max_voltage: float
 | 
			
		||||
    min_temp: float
 | 
			
		||||
    max_temp: float
 | 
			
		||||
    last_frame: float
 | 
			
		||||
    current: float
 | 
			
		||||
    panic: bool
 | 
			
		||||
    panic_errorcode: int
 | 
			
		||||
    panic_errorarg: int
 | 
			
		||||
 | 
			
		||||
    def __init__(self) -> None:
 | 
			
		||||
        self.slaves = [SlaveData() for _ in range(N_SLAVES)]
 | 
			
		||||
        self.min_voltage = (
 | 
			
		||||
            self.max_voltage
 | 
			
		||||
        ) = self.min_temp = self.max_temp = self.last_frame = self.current = 0
 | 
			
		||||
        self.panic = False
 | 
			
		||||
        self.panic_errorcode = self.panic_errorarg = 0
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
rx_buf = bytes()
 | 
			
		||||
data = AccumulatorData()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def decode_log_frame(buf: bytes):
 | 
			
		||||
    msg_id = buf[0]
 | 
			
		||||
    slave = msg_id >> 4
 | 
			
		||||
    frame_id = msg_id & 0x0F
 | 
			
		||||
    if slave >= N_SLAVES:
 | 
			
		||||
        return
 | 
			
		||||
 | 
			
		||||
    if frame_id == 0:
 | 
			
		||||
        for i in range(7):
 | 
			
		||||
            data.slaves[slave].cell_voltages[i] = buf[i + 1] * VOLTAGE_CONV
 | 
			
		||||
    elif frame_id == 1:
 | 
			
		||||
        for i in range(3):
 | 
			
		||||
            data.slaves[slave].cell_voltages[i + 7] = buf[i + 1] * VOLTAGE_CONV
 | 
			
		||||
        for i in range(4):
 | 
			
		||||
            data.slaves[slave].cell_temps[i] = buf[i + 4] * TEMP_CONV
 | 
			
		||||
    elif frame_id == 2:
 | 
			
		||||
        for i in range(7):
 | 
			
		||||
            data.slaves[slave].cell_temps[i + 4] = buf[i + 1] * TEMP_CONV
 | 
			
		||||
    elif frame_id == 3:
 | 
			
		||||
        for i in range(7):
 | 
			
		||||
            data.slaves[slave].cell_temps[i + 11] = buf[i + 1] * TEMP_CONV
 | 
			
		||||
    elif frame_id == 4:
 | 
			
		||||
        for i in range(7):
 | 
			
		||||
            data.slaves[slave].cell_temps[i + 18] = buf[i + 1] * TEMP_CONV
 | 
			
		||||
    elif frame_id == 5:
 | 
			
		||||
        for i in range(7):
 | 
			
		||||
            data.slaves[slave].cell_temps[i + 25] = buf[i + 1] * TEMP_CONV
 | 
			
		||||
    else:
 | 
			
		||||
        # print(f"Unknown frame ID: {frame_id} (buf: {repr(buf)})", file=sys.stderr)
 | 
			
		||||
        # time.sleep(1)
 | 
			
		||||
        return
 | 
			
		||||
 | 
			
		||||
    data.last_frame = time.time()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def decode_current_frame(buf: bytes):
 | 
			
		||||
    # current = (buf[2] << 24) | (buf[3] << 16) | (buf[4] << 8) | (buf[5])
 | 
			
		||||
    current = struct.unpack(">i", buf[2:6])[0]
 | 
			
		||||
    data.current = current / 1000.0
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def decode_panic_frame(buf: bytes):
 | 
			
		||||
    data.panic = True
 | 
			
		||||
    data.panic_errorcode = buf[0]
 | 
			
		||||
    data.panic_errorarg = buf[1]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def update_display():
 | 
			
		||||
    voltages = [
 | 
			
		||||
        slave.cell_voltages[i] for i in range(CELLS_PER_SLAVE) for slave in data.slaves
 | 
			
		||||
    ]
 | 
			
		||||
    temps = [
 | 
			
		||||
        slave.cell_temps[i]
 | 
			
		||||
        for i in range(TEMP_SENSORS_PER_SLAVE)
 | 
			
		||||
        for slave in data.slaves
 | 
			
		||||
    ]
 | 
			
		||||
    data.min_voltage = min(voltages)
 | 
			
		||||
    data.max_voltage = max(voltages)
 | 
			
		||||
    data.min_temp = min(temps)
 | 
			
		||||
    data.max_temp = max(temps)
 | 
			
		||||
    time_since_last_frame = time.time() - data.last_frame
 | 
			
		||||
 | 
			
		||||
    print("\033[2J\033[H", end="")
 | 
			
		||||
    print("-" * 20)
 | 
			
		||||
    if data.panic:
 | 
			
		||||
        print("!!!!! PANIC !!!!!")
 | 
			
		||||
        print(f"Error code: {data.panic_errorcode}")
 | 
			
		||||
        print(f"Error arg: {data.panic_errorarg}")
 | 
			
		||||
        time.sleep(0.5)
 | 
			
		||||
        return
 | 
			
		||||
 | 
			
		||||
    print(f"Time since last frame: {time_since_last_frame} s")
 | 
			
		||||
    print(f"Current: {data.current:.2f} A")
 | 
			
		||||
    print(f"Min voltage: {data.min_voltage:.2f} V")
 | 
			
		||||
    print(f"Max voltage: {data.max_voltage:.2f} V")
 | 
			
		||||
    print(f"Min temp: {data.min_temp:.1f} °C")
 | 
			
		||||
    print(f"Max temp: {data.max_temp:.1f} °C")
 | 
			
		||||
    for i in range(N_SLAVES):
 | 
			
		||||
        min_v = min(data.slaves[i].cell_voltages)
 | 
			
		||||
        max_v = max(data.slaves[i].cell_voltages)
 | 
			
		||||
        min_t = min(data.slaves[i].cell_temps)
 | 
			
		||||
        max_t = max(data.slaves[i].cell_temps)
 | 
			
		||||
        print(
 | 
			
		||||
            f"Stack {i}: V ∈ [{min_v:.2f}, {max_v:.2f}]\tT ∈ [{min_t:.1f}, {max_t:.1f}]"
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
kb = KBHit()
 | 
			
		||||
while True:
 | 
			
		||||
    rx_data = ser.read(32)
 | 
			
		||||
    if len(rx_data) > 0:
 | 
			
		||||
        rx_buf = rx_data
 | 
			
		||||
        if (start := check_log_start(rx_buf)) != -1:
 | 
			
		||||
            decode_log_frame(rx_buf[start + 3 : start + 11])
 | 
			
		||||
            rx_buf = b""
 | 
			
		||||
        elif (start := check_current_start(rx_buf)) != -1:
 | 
			
		||||
            decode_current_frame(rx_buf[start + 3 : start + 11])
 | 
			
		||||
            rx_buf = b""
 | 
			
		||||
        elif (start := check_panic_start(rx_buf)) != -1:
 | 
			
		||||
            decode_panic_frame(rx_buf[start + 3 : start + 11])
 | 
			
		||||
            rx_buf = b""
 | 
			
		||||
    if kb.kbhit():
 | 
			
		||||
        c = kb.getch()
 | 
			
		||||
        if c == "C":
 | 
			
		||||
            print(f"KBHIT: {c}", file=sys.stderr)
 | 
			
		||||
            print(ser.write(b"C"), file=sys.stderr)
 | 
			
		||||
    update_display()
 | 
			
		||||
							
								
								
									
										7
									
								
								requirements.txt
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										7
									
								
								requirements.txt
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,7 @@
 | 
			
		||||
black==22.6.0
 | 
			
		||||
click==8.1.3
 | 
			
		||||
mypy-extensions==0.4.3
 | 
			
		||||
pathspec==0.9.0
 | 
			
		||||
platformdirs==2.5.2
 | 
			
		||||
pyserial==3.5
 | 
			
		||||
tomli==2.0.1
 | 
			
		||||
		Reference in New Issue
	
	Block a user