load-controller/load_controller/profile_handler.py

96 lines
2.5 KiB
Python
Raw Normal View History

2023-01-04 00:22:27 +01:00
import enum
import csv
import time
from PyQt6.QtCore import QObject, pyqtSignal, QTimer
class ProfileState(enum.Enum):
STOPPED = 0
RUNNING = 1
PAUSED = 2
class ProfileHandler(QObject):
current_changed = pyqtSignal(float)
finished = pyqtSignal()
state: ProfileState
_timer: QTimer
profile_start: float
_profile: list[tuple[float, float]]
_pause_time: float
_last_current: float
def __init__(self):
super().__init__(parent=None)
self.state = ProfileState.STOPPED
self._timer = QTimer()
self._timer.timeout.connect(self._update)
self._timer.start(100)
self._profile = []
self.profile_start = 0
self._pause_time = 0
self._last_current = 0
def load_profile(self, path: str) -> list[tuple[float, float]]:
with open(path) as fh:
result = []
t = 0
for l in csv.reader(fh):
i = float(l[0])
result.append((t, i))
t += float(l[1])
result.append((t, i))
self._profile = result
return result
def start(self):
assert self.state == ProfileState.STOPPED
self.profile_start = time.time()
self.state = ProfileState.RUNNING
current = self._profile[0][1]
self._last_current = current
self.current_changed.emit(current)
def pause(self):
assert self.state == ProfileState.RUNNING
self.state = ProfileState.PAUSED
self.current_changed.emit(0)
self._last_current = 0
self._pause_time = time.time()
def resume(self):
assert self.state == ProfileState.PAUSED
dt = time.time() - self._pause_time
self.profile_start += dt
self.state = ProfileState.RUNNING
def stop(self):
assert self.state != ProfileState.STOPPED
self.state = ProfileState.STOPPED
self.current_changed.emit(0)
self._last_current = 0
def _update(self):
if self.state == ProfileState.RUNNING:
dt = time.time() - self.profile_start
try:
current = next(t[1] for t in self._profile if t[0] >= dt)
if current != self._last_current:
self.current_changed.emit(current)
self._last_current = current
except StopIteration:
self.state = ProfileState.STOPPED
self.finished.emit()
else:
self._last_current = 0
self.current_changed.emit(0)