from dataclasses import dataclass
from functools import cache, cached_property
import math
import re
from typing import Collection, Mapping, Self, Sequence

from utils import (
    inrange,
    lin_fun_interpolate,
    with_ops_scalar,
    with_ops_unary,
    with_ops_vector,
)

# Multiplying a speed in miles per hour by MPH yields metres per second.
MPH = 0.44704
# Multiplying a rate in revolutions per minute by RPM yields radians per second.
RPM = 2 * math.pi / 60


@with_ops_unary('neg')
@with_ops_vector('sub', 'add')
@with_ops_scalar('mul', 'truediv')
@dataclass(frozen=True)
class PropOppoint:
    """A single propeller operating point.

    The arithmetic operators are supplied by the ``with_ops_*`` decorators
    from ``utils``.
    NOTE(review): presumably they act field-wise on the dataclass fields,
    which is what makes operating points interpolable -- confirm in ``utils``.
    """

    # Speed; ``parse_apc_data`` stores it as mph * MPH.
    speed: float
    # Thrust; presumably newtons (APC table column 10) -- TODO confirm.
    thrust: float
    # Torque; presumably newton-metres (APC table column 9) -- TODO confirm.
    torque: float
    # Rotational rate in revolutions per minute.
    rpm: float

    @cached_property
    def rot_speed(self) -> float:
        """Return the rotational rate as ``rpm * RPM``."""
        return self.rpm * RPM

    # Signature-only stubs. NOTE(review): presumably the class decorators
    # above install the real implementations over these -- confirm in utils.
    def __neg__(self) -> Self: ...
    def __truediv__(self, other: float) -> Self: ...
    def __mul__(self, other: float) -> Self: ...
    def __add__(self, other: Self) -> Self: ...
    def __sub__(self, other: Self) -> Self: ...


@dataclass(frozen=True)
class ApcPerfdata:
    """Propeller performance table: operating points grouped by RPM."""

    # Every operating point of the table, in no particular order.
    oppoints: Collection[PropOppoint]

    @cached_property
    def rpms(self) -> Sequence[float]:
        """Return the distinct RPM values present in the data, ascending."""
        return sorted({op.rpm for op in self.oppoints})

    def speed_range_at_rpm(self, rpm: float) -> tuple[float, float]:
        """Return ``(min, max)`` of the speeds tabulated at ``rpm``.

        Raises:
            KeyError: if ``rpm`` is not one of ``self.rpms``.
        """
        speeds = [op.speed for op in self.rpm_series[rpm]]
        return min(speeds), max(speeds)

    def rpms_at_speed(self, speed: float) -> Sequence[float]:
        """Return the RPM values whose speed range contains ``speed``.

        Containment is decided by ``utils.inrange``; the result keeps the
        ascending order of ``self.rpms``.
        """
        return [
            rpm
            for rpm in self.rpms
            if inrange(speed, *self.speed_range_at_rpm(rpm))
        ]

    def rpm_range_at_speed(self, speed: float) -> tuple[float, float]:
        """Return ``(min, max)`` of the RPM values available at ``speed``.

        Raises:
            ValueError: if no RPM series covers ``speed``.
        """
        rpms = self.rpms_at_speed(speed)
        if not rpms:
            # min()/max() would also raise ValueError here, but with a
            # message that says nothing about the actual cause.
            raise ValueError(f"no RPM series covers speed {speed!r}")
        return min(rpms), max(rpms)

    def _rpm_serie(self, rpm: float) -> Sequence[PropOppoint]:
        """Return the operating points recorded at ``rpm``, sorted by speed."""
        return sorted(
            [op for op in self.oppoints if op.rpm == rpm],
            key=lambda op: op.speed,
        )

    @cached_property
    def rpm_series(self) -> Mapping[float, Sequence[PropOppoint]]:
        """Map each RPM value to its operating points, sorted by speed."""
        return {rpm: self._rpm_serie(rpm) for rpm in self.rpms}

    def get_op_interpspeed(self, rpm: float, speed: float) -> PropOppoint:
        """Interpolate along speed within the series tabulated at ``rpm``.

        The series' speeds are the nodes and each node value is passed to
        ``utils.lin_fun_interpolate`` as a zero-argument callable.

        Raises:
            KeyError: if ``rpm`` is not one of ``self.rpms``.
        """
        series = self.rpm_series[rpm]

        def mk_op(op):
            # Factory so that every thunk captures its own operating point.
            return lambda: op

        return lin_fun_interpolate(
            [op.speed for op in series],
            [mk_op(op) for op in series],
            speed,
        )

    def get_op_interp(self, rpm: float, speed: float) -> PropOppoint:
        """Interpolate an operating point in both RPM and speed.

        Only RPM series whose speed range contains ``speed`` are used as
        nodes; each node value is the speed-interpolated point of that
        series, wrapped in a zero-argument callable.
        """
        loc_rpms = self.rpms_at_speed(speed)

        def mk_op(_rpm):
            # Factory so that every thunk captures its own RPM value.
            return lambda: self.get_op_interpspeed(_rpm, speed)

        return lin_fun_interpolate(
            loc_rpms,
            [mk_op(_rpm) for _rpm in loc_rpms],
            rpm,
        )

    @classmethod
    def from_file(cls, path: str) -> Self:
        """Build an instance from the performance data file at ``path``."""
        # Explicit encoding keeps loading independent of the platform locale.
        # Undecodable bytes are replaced instead of aborting: they can only
        # make a line non-numeric, and such lines are skipped by the parser.
        with open(path, 'r', encoding='utf-8', errors='replace') as file:
            cont = file.read()
        return cls(parse_apc_data(cont))


def parse_apc_data(raw_data: str) -> Collection[PropOppoint]:
    """Extract operating points from the text of a performance data file.

    A line containing ``'PROP RPM'`` starts a new section; its last
    whitespace-separated token is read as the section's RPM. Within a
    section, every line made of exactly 15 numeric fields becomes one
    operating point: field 0 is the speed in mph (stored multiplied by
    ``MPH``), field 9 the torque and field 10 the thrust. Any other line,
    and any line before the first section header, is ignored.

    Raises:
        ValueError: if the last token of a ``'PROP RPM'`` line is not a
            number.
    """
    data_points = []
    current_rpm = None
    for line in raw_data.splitlines():
        if 'PROP RPM' in line:
            current_rpm = float(line.split()[-1])
            # A header line can never parse as a numeric row.
            continue
        if current_rpm is None:
            # Still in the preamble, before the first RPM section.
            continue
        try:
            vals = list(map(float, line.split()))
        except ValueError:
            # Column titles, unit rows and other text lines.
            continue
        if len(vals) != 15:
            continue
        speed = vals[0] * MPH
        torque = vals[9]
        thrust = vals[10]
        data_points.append(PropOppoint(speed, thrust, torque, current_rpm))
    return data_points