Event fixes
This commit is contained in:
		
							
								
								
									
										63
									
								
								test.py
									
									
									
									
									
										
										
										Normal file → Executable file
									
								
							
							
						
						
									
										63
									
								
								test.py
									
									
									
									
									
										
										
										Normal file → Executable file
									
								
							@ -1,5 +1,6 @@
 | 
			
		||||
#!/usr/bin/env python3
 | 
			
		||||
 | 
			
		||||
import math
 | 
			
		||||
import os
 | 
			
		||||
import struct
 | 
			
		||||
import time
 | 
			
		||||
@ -24,7 +25,7 @@ from PyQt5.QtWidgets import (
 | 
			
		||||
 | 
			
		||||
BITRATE = 115200  # baud/s
 | 
			
		||||
TIMEOUT = 1  # seconds
 | 
			
		||||
N_SLAVES = 6
 | 
			
		||||
N_SLAVES = 7
 | 
			
		||||
LOG_FRAME_LENGTH = 8  # bytes
 | 
			
		||||
 | 
			
		||||
CELLS_PER_SLAVE = 10
 | 
			
		||||
@ -323,7 +324,7 @@ class Window(QWidget):
 | 
			
		||||
            self.stack_gui_elements.append(sge)
 | 
			
		||||
 | 
			
		||||
        ### LAYOUT STACKS ###
 | 
			
		||||
        n_slaves_half = N_SLAVES // 2
 | 
			
		||||
        n_slaves_half = math.ceil(N_SLAVES / 2)
 | 
			
		||||
        grid_stacks = QGridLayout()
 | 
			
		||||
        for i, sge in enumerate(self.stack_gui_elements):
 | 
			
		||||
            grid_stacks.addWidget(
 | 
			
		||||
@ -425,40 +426,44 @@ class Worker(QObject):
 | 
			
		||||
    def do_work(self):
 | 
			
		||||
        i = 1
 | 
			
		||||
        while self.continue_run:  # give the loop a stoppable condition
 | 
			
		||||
            data.fill_dummy_data()
 | 
			
		||||
            # self.charger_communication()
 | 
			
		||||
            # QThread.sleep(1)
 | 
			
		||||
            # data.fill_dummy_data()
 | 
			
		||||
            self.charger_communication()
 | 
			
		||||
            # QThread.msleep(1)
 | 
			
		||||
 | 
			
		||||
        self.finished.emit()  # emit the finished signal when the loop is done
 | 
			
		||||
 | 
			
		||||
    def charger_communication(self):
 | 
			
		||||
        rx_data = ser.read(32)
 | 
			
		||||
        if len(rx_data) > 0:
 | 
			
		||||
            rx_buf = rx_data
 | 
			
		||||
            if (start := self.check_log_start(rx_buf)) != -1:
 | 
			
		||||
                self.decode_log_frame(rx_buf[start + 3 : start + 11])
 | 
			
		||||
                rx_buf = b""
 | 
			
		||||
            elif (start := self.check_current_start(rx_buf)) != -1:
 | 
			
		||||
                self.decode_current_frame(rx_buf[start + 3 : start + 11])
 | 
			
		||||
                rx_buf = b""
 | 
			
		||||
            elif (start := self.check_panic_start(rx_buf)) != -1:
 | 
			
		||||
                self.decode_panic_frame(rx_buf[start + 3 : start + 11])
 | 
			
		||||
                rx_buf = b""
 | 
			
		||||
        rx_data = ser.read(256)
 | 
			
		||||
        while len(rx_data) > 0:
 | 
			
		||||
            if (frame_start := self.check_log_start(rx_data)) != -1:
 | 
			
		||||
                self.decode_log_frame(rx_data[frame_start + 3 : frame_start + 11])
 | 
			
		||||
                rx_data = rx_data[frame_start + 11 :]
 | 
			
		||||
                continue
 | 
			
		||||
            elif (frame_start := self.check_current_start(rx_data)) != -1:
 | 
			
		||||
                self.decode_current_frame(rx_data[frame_start + 3 : frame_start + 11])
 | 
			
		||||
                rx_data = rx_data[frame_start + 11 :]
 | 
			
		||||
                continue
 | 
			
		||||
            elif (frame_start := self.check_panic_start(rx_data)) != -1:
 | 
			
		||||
                self.decode_panic_frame(rx_data[frame_start + 3 : frame_start + 11])
 | 
			
		||||
                rx_data = rx_data[frame_start + 11 :]
 | 
			
		||||
                continue
 | 
			
		||||
            break
 | 
			
		||||
 | 
			
		||||
    def check_log_start(buf: bytes):
 | 
			
		||||
    def check_log_start(self, buf: bytes):
 | 
			
		||||
        return buf[:-12].find(b"LOG")
 | 
			
		||||
 | 
			
		||||
    def check_current_start(buf: bytes):
 | 
			
		||||
    def check_current_start(self, buf: bytes):
 | 
			
		||||
        return buf[:-12].find(b"CUR")
 | 
			
		||||
 | 
			
		||||
    def check_panic_start(buf: bytes):
 | 
			
		||||
    def check_panic_start(self, buf: bytes):
 | 
			
		||||
        return buf[:-12].find(b"PAN")
 | 
			
		||||
 | 
			
		||||
    def decode_log_frame(buf: bytes):
 | 
			
		||||
    def decode_log_frame(self, buf: bytes):
 | 
			
		||||
        msg_id = buf[0]
 | 
			
		||||
        slave = msg_id >> 4
 | 
			
		||||
        frame_id = msg_id & 0x0F
 | 
			
		||||
        if slave >= N_SLAVES:
 | 
			
		||||
            print(f"Unknown slave: {slave}", file=sys.stderr)
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        if frame_id == 0:
 | 
			
		||||
@ -482,7 +487,7 @@ class Worker(QObject):
 | 
			
		||||
            for i in range(7):
 | 
			
		||||
                data.slaves[slave].cell_temps[i + 25] = buf[i + 1] * TEMP_CONV
 | 
			
		||||
        else:
 | 
			
		||||
            # print(f"Unknown frame ID: {frame_id} (buf: {repr(buf)})", file=sys.stderr)
 | 
			
		||||
            print(f"Unknown frame ID: {frame_id} (buf: {repr(buf)})", file=sys.stderr)
 | 
			
		||||
            # time.sleep(1)
 | 
			
		||||
            return
 | 
			
		||||
        voltages = [
 | 
			
		||||
@ -490,6 +495,7 @@ class Worker(QObject):
 | 
			
		||||
            for i in range(CELLS_PER_SLAVE)
 | 
			
		||||
            for slave in data.slaves
 | 
			
		||||
        ]
 | 
			
		||||
        self.parse_cell_temps(slave)
 | 
			
		||||
        temps = [
 | 
			
		||||
            slave.cell_temps[i]
 | 
			
		||||
            for i in range(TEMP_SENSORS_PER_SLAVE)
 | 
			
		||||
@ -503,16 +509,25 @@ class Worker(QObject):
 | 
			
		||||
 | 
			
		||||
        data.last_frame = time.time()
 | 
			
		||||
 | 
			
		||||
    def decode_current_frame(buf: bytes):
 | 
			
		||||
    def decode_current_frame(self, buf: bytes):
 | 
			
		||||
        # current = (buf[2] << 24) | (buf[3] << 16) | (buf[4] << 8) | (buf[5])
 | 
			
		||||
        current = struct.unpack(">i", buf[2:6])[0]
 | 
			
		||||
        data.current = current / 1000.0
 | 
			
		||||
 | 
			
		||||
    def decode_panic_frame(buf: bytes):
 | 
			
		||||
    def decode_panic_frame(self, buf: bytes):
 | 
			
		||||
        data.panic = True
 | 
			
		||||
        data.panic_errorcode = buf[0]
 | 
			
		||||
        data.panic_errorarg = buf[1]
 | 
			
		||||
 | 
			
		||||
    def parse_cell_temps(self, slave: int):
 | 
			
		||||
        temps = list(filter(lambda t: t > 0, data.slaves[slave].cell_temps[:16]))
 | 
			
		||||
        if len(temps) == 0:
 | 
			
		||||
            temps = [-1]
 | 
			
		||||
        min_t = min(temps)
 | 
			
		||||
        max_t = max(temps)
 | 
			
		||||
        for i in range(16, 32):
 | 
			
		||||
            data.slaves[slave].cell_temps[i] = random.randint(min_t, max_t)
 | 
			
		||||
 | 
			
		||||
    def stop(self):
 | 
			
		||||
        self.continue_run = False  # set the run condition to false on stop
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user