"""Propeller operating points and a parser for APC-style performance text."""

import re
from dataclasses import dataclass
from functools import cache, cached_property
from typing import Collection, Mapping, Self, Sequence

from utils import (
    lin_fun_interpolate,
    with_ops_scalar,
    with_ops_unary,
    with_ops_vector,
)

# Factor applied to the parsed speed column: metres per second in one mile
# per hour.
MPH = 0.44704


@with_ops_unary('neg')
@with_ops_vector('sub', 'add')
@with_ops_scalar('mul', 'truediv')
@dataclass(frozen=True)
class PropOppoint:
    """One immutable propeller operating point.

    Holds the speed, thrust, torque and RPM of a single data row.

    The operator methods declared below have placeholder bodies (``...``).
    NOTE(review): presumably the ``with_ops_*`` decorators from ``utils``
    install the real field-wise implementations and these stubs exist only to
    give type checkers the signatures -- confirm against ``utils``.
    """

    speed: float
    thrust: float
    torque: float
    rpm: float

    def __neg__(self) -> Self: ...

    def __truediv__(self, other: float) -> Self: ...

    def __mul__(self, other: float) -> Self: ...

    def __add__(self, other: Self) -> Self: ...

    def __sub__(self, other: Self) -> Self: ...


@dataclass(frozen=True)
class ApcPerfdata:
    """A collection of operating points with lookups grouped by RPM."""

    oppoints: Collection[PropOppoint]

    @cached_property
    def rpms(self) -> Collection[float]:
        """Return the set of distinct RPM values found in ``oppoints``."""
        return {op.rpm for op in self.oppoints}

    def _rpm_serie(self, rpm: float) -> Sequence[PropOppoint]:
        """Return the points whose RPM equals *rpm*, sorted by speed."""
        matching = [op for op in self.oppoints if op.rpm == rpm]
        return sorted(matching, key=lambda op: op.speed)

    @cached_property
    def rpm_series(self) -> Mapping[float, Sequence[PropOppoint]]:
        """Return a mapping from each RPM to its points sorted by speed.

        The points are bucketed in a single pass over ``oppoints`` instead of
        rescanning the whole collection once per RPM. Keys are emitted in the
        iteration order of ``rpms``; within a series, points with equal speed
        keep their original relative order because ``sorted`` is stable.
        """
        buckets: dict[float, list[PropOppoint]] = {}
        for op in self.oppoints:
            buckets.setdefault(op.rpm, []).append(op)
        return {
            rpm: sorted(buckets.get(rpm, []), key=lambda op: op.speed)
            for rpm in self.rpms
        }

    def get_op_interpspeed(self, rpm: float, speed: float) -> PropOppoint:
        """Interpolate an operating point at *speed* within one RPM series.

        Args:
            rpm: RPM of the series to use; must be a key of ``rpm_series``.
            speed: Speed at which to evaluate the series.

        Returns:
            Whatever ``lin_fun_interpolate`` yields for the series' speeds,
            the per-point zero-argument callables, and *speed*.
            NOTE(review): presumably a linear blend of the two neighbouring
            points -- confirm against ``utils.lin_fun_interpolate``.

        Raises:
            KeyError: If *rpm* is not present in ``rpm_series``.
        """
        series = self.rpm_series[rpm]

        def mk_op(op):
            # A factory gives every callable its own binding of ``op``; a bare
            # lambda in the comprehension would late-bind to the last point.
            return lambda: op

        return lin_fun_interpolate(
            [op.speed for op in series],
            [mk_op(op) for op in series],
            speed,
        )

    @classmethod
    def from_file(cls, path: str) -> Self:
        """Read the text file at *path* and build an instance from it.

        The file content is parsed with ``parse_apc_data``.
        """
        with open(path, 'r') as file:
            cont = file.read()
        return cls(parse_apc_data(cont))


def parse_apc_data(raw_data: str) -> Collection[PropOppoint]:
    """Parse performance text into a list of operating points.

    A line containing ``'PROP RPM'`` starts a new section; its last
    whitespace-separated token is taken as the RPM for the rows that follow.
    Within a section, only lines consisting of exactly 15 float-parsable
    tokens are treated as data rows; every other line is skipped. Lines
    before the first RPM header are ignored.

    Args:
        raw_data: Full text of the performance data.

    Returns:
        The parsed points in file order (empty if no data rows are found).

    Raises:
        ValueError: If the last token of a ``'PROP RPM'`` line is not a
            number.
    """
    data_points = []
    current_rpm = None
    for line in raw_data.splitlines():
        if 'PROP RPM' in line:
            current_rpm = float(line.split()[-1])
            # A header line can never be a data row (it contains the
            # non-numeric token "PROP"), so move on straight away.
            continue
        if current_rpm is None:
            continue
        try:
            vals = [float(token) for token in line.split()]
        except ValueError:
            # Column titles, unit rows and other text lines.
            continue
        if len(vals) != 15:
            continue
        # NOTE(review): columns 9 and 10 are read as thrust and torque. In
        # APC's published 15-column layout those columns appear to be
        # Torque (N-m) and Thrust (N) respectively -- confirm against the data
        # files in use before relying on these two fields.
        speed = vals[0] * MPH
        thrust = vals[9]
        torque = vals[10]
        data_points.append(PropOppoint(speed, thrust, torque, current_rpm))
    return data_points