From 7b3f44c7640146f30eda5b5906b8f1ec10406cba Mon Sep 17 00:00:00 2001
From: jazzpi <jasper@mezzo.de>
Date: Mon, 18 Jul 2022 11:31:30 +0200
Subject: [PATCH] Initial commit

---
 .gitignore         |   2 +
 charger-display.py | 266 +++++++++++++++++++++++++++++++++++++++++++++
 requirements.txt   |   7 ++
 3 files changed, 275 insertions(+)
 create mode 100644 .gitignore
 create mode 100755 charger-display.py
 create mode 100644 requirements.txt

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..7240063
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+.vscode/
+env/
\ No newline at end of file
diff --git a/charger-display.py b/charger-display.py
new file mode 100755
index 0000000..c2e6f3a
--- /dev/null
+++ b/charger-display.py
@@ -0,0 +1,266 @@
+#!/usr/bin/env python3
+
+import os
+import struct
+import time
+import serial
+import sys
+
+BITRATE = 115200  # baud/s
+TIMEOUT = 1  # seconds
+N_SLAVES = 6
+LOG_FRAME_LENGTH = 8  # bytes
+
+CELLS_PER_SLAVE = 10
+TEMP_SENSORS_PER_SLAVE = 32
+VOLTAGE_CONV = 5.0 / 255  # volts/quantum
+TEMP_CONV = 0.0625 * 16  # °C/quantum
+
+import os
+
+# Windows
+if os.name == "nt":
+    import msvcrt
+
+# Posix (Linux, OS X)
+else:
+    import sys
+    import termios
+    import atexit
+    from select import select
+
+
+class KBHit:
+    def __init__(self):
+        """Creates a KBHit object that you can call to do various keyboard things."""
+
+        if os.name == "nt":
+            pass
+
+        else:
+
+            # Save the terminal settings
+            self.fd = sys.stdin.fileno()
+            self.new_term = termios.tcgetattr(self.fd)
+            self.old_term = termios.tcgetattr(self.fd)
+
+            # New terminal setting unbuffered
+            self.new_term[3] = self.new_term[3] & ~termios.ICANON & ~termios.ECHO
+            termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.new_term)
+
+            # Support normal-terminal reset at exit
+            atexit.register(self.set_normal_term)
+
+    def set_normal_term(self):
+        """Resets to normal terminal.  On Windows this is a no-op."""
+
+        if os.name == "nt":
+            pass
+
+        else:
+            termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old_term)
+
+    def getch(self):
+        """Returns a keyboard character after kbhit() has been called.
+        Should not be called in the same program as getarrow().
+        """
+
+        s = ""
+
+        if os.name == "nt":
+            return msvcrt.getch().decode("utf-8")
+
+        else:
+            return sys.stdin.read(1)
+
+    def getarrow(self):
+        """Returns an arrow-key code after kbhit() has been called. Codes are
+        0 : up
+        1 : right
+        2 : down
+        3 : left
+        Should not be called in the same program as getch().
+        """
+
+        if os.name == "nt":
+            msvcrt.getch()  # skip 0xE0
+            c = msvcrt.getch()
+            vals = [72, 77, 80, 75]
+
+        else:
+            c = sys.stdin.read(3)[2]
+            vals = [65, 67, 66, 68]
+
+        return vals.index(ord(c.decode("utf-8")))
+
+    def kbhit(self):
+        """Returns True if keyboard character was hit, False otherwise."""
+        if os.name == "nt":
+            return msvcrt.kbhit()
+
+        else:
+            dr, dw, de = select([sys.stdin], [], [], 0)
+            return dr != []
+
+
+if len(sys.argv) != 2:
+    print(f"Usage: {sys.argv[0]} SERIAL-PORT", file=sys.stderr)
+    sys.exit(os.EX_USAGE)
+
+SERIAL_PORT = sys.argv[1]
+ser = serial.Serial(SERIAL_PORT, BITRATE, timeout=TIMEOUT)
+
+
+def check_log_start(buf: bytes):
+    return buf[:-12].find(b"LOG")
+
+
+def check_current_start(buf: bytes):
+    return buf[:-12].find(b"CUR")
+
+
+def check_panic_start(buf: bytes):
+    return buf[:-12].find(b"PAN")
+
+
+class SlaveData:
+    cell_voltages: list[float]
+    cell_temps: list[float]
+
+    def __init__(self) -> None:
+        self.cell_voltages = [-1] * CELLS_PER_SLAVE
+        self.cell_temps = [-1] * TEMP_SENSORS_PER_SLAVE
+
+
+class AccumulatorData:
+    slaves: list[SlaveData]
+    min_voltage: float
+    max_voltage: float
+    min_temp: float
+    max_temp: float
+    last_frame: float
+    current: float
+    panic: bool
+    panic_errorcode: int
+    panic_errorarg: int
+
+    def __init__(self) -> None:
+        self.slaves = [SlaveData() for _ in range(N_SLAVES)]
+        self.min_voltage = (
+            self.max_voltage
+        ) = self.min_temp = self.max_temp = self.last_frame = self.current = 0
+        self.panic = False
+        self.panic_errorcode = self.panic_errorarg = 0
+
+
+rx_buf = bytes()
+data = AccumulatorData()
+
+
+def decode_log_frame(buf: bytes):
+    msg_id = buf[0]
+    slave = msg_id >> 4
+    frame_id = msg_id & 0x0F
+    if slave >= N_SLAVES:
+        return
+
+    if frame_id == 0:
+        for i in range(7):
+            data.slaves[slave].cell_voltages[i] = buf[i + 1] * VOLTAGE_CONV
+    elif frame_id == 1:
+        for i in range(3):
+            data.slaves[slave].cell_voltages[i + 7] = buf[i + 1] * VOLTAGE_CONV
+        for i in range(4):
+            data.slaves[slave].cell_temps[i] = buf[i + 4] * TEMP_CONV
+    elif frame_id == 2:
+        for i in range(7):
+            data.slaves[slave].cell_temps[i + 4] = buf[i + 1] * TEMP_CONV
+    elif frame_id == 3:
+        for i in range(7):
+            data.slaves[slave].cell_temps[i + 11] = buf[i + 1] * TEMP_CONV
+    elif frame_id == 4:
+        for i in range(7):
+            data.slaves[slave].cell_temps[i + 18] = buf[i + 1] * TEMP_CONV
+    elif frame_id == 5:
+        for i in range(7):
+            data.slaves[slave].cell_temps[i + 25] = buf[i + 1] * TEMP_CONV
+    else:
+        # print(f"Unknown frame ID: {frame_id} (buf: {repr(buf)})", file=sys.stderr)
+        # time.sleep(1)
+        return
+
+    data.last_frame = time.time()
+
+
+def decode_current_frame(buf: bytes):
+    # current = (buf[2] << 24) | (buf[3] << 16) | (buf[4] << 8) | (buf[5])
+    current = struct.unpack(">i", buf[2:6])[0]
+    data.current = current / 1000.0
+
+
+def decode_panic_frame(buf: bytes):
+    data.panic = True
+    data.panic_errorcode = buf[0]
+    data.panic_errorarg = buf[1]
+
+
+def update_display():
+    voltages = [
+        slave.cell_voltages[i] for i in range(CELLS_PER_SLAVE) for slave in data.slaves
+    ]
+    temps = [
+        slave.cell_temps[i]
+        for i in range(TEMP_SENSORS_PER_SLAVE)
+        for slave in data.slaves
+    ]
+    data.min_voltage = min(voltages)
+    data.max_voltage = max(voltages)
+    data.min_temp = min(temps)
+    data.max_temp = max(temps)
+    time_since_last_frame = time.time() - data.last_frame
+
+    print("\033[2J\033[H", end="")
+    print("-" * 20)
+    if data.panic:
+        print("!!!!! PANIC !!!!!")
+        print(f"Error code: {data.panic_errorcode}")
+        print(f"Error arg: {data.panic_errorarg}")
+        time.sleep(0.5)
+        return
+
+    print(f"Time since last frame: {time_since_last_frame} s")
+    print(f"Current: {data.current:.2f} A")
+    print(f"Min voltage: {data.min_voltage:.2f} V")
+    print(f"Max voltage: {data.max_voltage:.2f} V")
+    print(f"Min temp: {data.min_temp:.1f} °C")
+    print(f"Max temp: {data.max_temp:.1f} °C")
+    for i in range(N_SLAVES):
+        min_v = min(data.slaves[i].cell_voltages)
+        max_v = max(data.slaves[i].cell_voltages)
+        min_t = min(data.slaves[i].cell_temps)
+        max_t = max(data.slaves[i].cell_temps)
+        print(
+            f"Stack {i}: V ∈ [{min_v:.2f}, {max_v:.2f}]\tT ∈ [{min_t:.1f}, {max_t:.1f}]"
+        )
+
+
+kb = KBHit()
+while True:
+    rx_data = ser.read(32)
+    if len(rx_data) > 0:
+        rx_buf = rx_data
+        if (start := check_log_start(rx_buf)) != -1:
+            decode_log_frame(rx_buf[start + 3 : start + 11])
+            rx_buf = b""
+        elif (start := check_current_start(rx_buf)) != -1:
+            decode_current_frame(rx_buf[start + 3 : start + 11])
+            rx_buf = b""
+        elif (start := check_panic_start(rx_buf)) != -1:
+            decode_panic_frame(rx_buf[start + 3 : start + 11])
+            rx_buf = b""
+    if kb.kbhit():
+        c = kb.getch()
+        if c == "C":
+            print(f"KBHIT: {c}", file=sys.stderr)
+            print(ser.write(b"C"), file=sys.stderr)
+    update_display()
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..bb6304a
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,7 @@
+black==22.6.0
+click==8.1.3
+mypy-extensions==0.4.3
+pathspec==0.9.0
+platformdirs==2.5.2
+pyserial==3.5
+tomli==2.0.1