QML doesn't have a method for replacing all points at once, which makes it extremely slow when loading large profiles
107 lines
2.8 KiB
Python
107 lines
2.8 KiB
Python
import enum
|
|
import csv
|
|
import time
|
|
|
|
from PySide6.QtCore import QObject, Signal, Slot, QTimer, QPointF
|
|
from PySide6.QtCharts import QLineSeries
|
|
|
|
|
|
class ProfileState(enum.Enum):
|
|
STOPPED = 0
|
|
RUNNING = 1
|
|
PAUSED = 2
|
|
|
|
|
|
class ProfileHandler(QObject):
|
|
currentChanged = Signal(float)
|
|
profileChanged = Signal(list)
|
|
finished = Signal()
|
|
|
|
state: ProfileState
|
|
|
|
_timer: QTimer
|
|
|
|
profile_start: float
|
|
_profile: list[list[float]]
|
|
_pause_time: float
|
|
_last_current: float
|
|
|
|
def __init__(self):
|
|
super().__init__(parent=None)
|
|
self.state = ProfileState.STOPPED
|
|
self._timer = QTimer()
|
|
self._timer.timeout.connect(self._update)
|
|
self._timer.start(100)
|
|
|
|
self._profile = []
|
|
self.profile_start = 0
|
|
self._pause_time = 0
|
|
self._last_current = 0
|
|
|
|
def load_profile(self, path: str):
|
|
with open(path) as fh:
|
|
result = []
|
|
t = 0
|
|
for l in csv.reader(fh):
|
|
i = float(l[0])
|
|
result.append([t, i])
|
|
t += float(l[1])
|
|
result.append([t, i])
|
|
self._profile = result
|
|
self.profileChanged.emit(result)
|
|
return result
|
|
|
|
@Slot(QLineSeries)
|
|
def fill_series(self, series: QLineSeries):
|
|
points = []
|
|
for x, y in self._profile:
|
|
points.append(QPointF(x, y))
|
|
series.replace(points)
|
|
pass
|
|
|
|
def start(self):
|
|
assert self.state == ProfileState.STOPPED
|
|
|
|
self.profile_start = time.time()
|
|
self.state = ProfileState.RUNNING
|
|
current = self._profile[0][1]
|
|
self._last_current = current
|
|
self.currentChanged.emit(current)
|
|
|
|
def pause(self):
|
|
assert self.state == ProfileState.RUNNING
|
|
|
|
self.state = ProfileState.PAUSED
|
|
self.currentChanged.emit(0)
|
|
self._last_current = 0
|
|
self._pause_time = time.time()
|
|
|
|
def resume(self):
|
|
assert self.state == ProfileState.PAUSED
|
|
|
|
dt = time.time() - self._pause_time
|
|
self.profile_start += dt
|
|
self.state = ProfileState.RUNNING
|
|
|
|
def stop(self):
|
|
assert self.state != ProfileState.STOPPED
|
|
|
|
self.state = ProfileState.STOPPED
|
|
self.currentChanged.emit(0)
|
|
self._last_current = 0
|
|
|
|
def _update(self):
|
|
if self.state == ProfileState.RUNNING:
|
|
dt = time.time() - self.profile_start
|
|
try:
|
|
current = next(t[1] for t in self._profile if t[0] >= dt)
|
|
if current != self._last_current:
|
|
self.currentChanged.emit(current)
|
|
self._last_current = current
|
|
except StopIteration:
|
|
self.state = ProfileState.STOPPED
|
|
self.finished.emit()
|
|
else:
|
|
self._last_current = 0
|
|
self.currentChanged.emit(0)
|