From e5ac64c8489b604632e5b2f13174d16c7ed295d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Sadowski?= Date: Tue, 21 Oct 2025 18:09:18 +0000 Subject: [PATCH] init2 --- Pipfile | 13 +++++++ apc_perf.py | 80 +++++++++++++++++++++++++++++++++++++++ data/PERFILES2 | 1 + test.py | 33 ++++++++++++++++ utils.py | 101 +++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 228 insertions(+) create mode 100644 Pipfile create mode 100644 apc_perf.py create mode 120000 data/PERFILES2 create mode 100644 test.py create mode 100644 utils.py diff --git a/Pipfile b/Pipfile new file mode 100644 index 0000000..c6d9bfd --- /dev/null +++ b/Pipfile @@ -0,0 +1,13 @@ +[[source]] +url = "https://pypi.org/simple" +verify_ssl = true +name = "pypi" + +[packages] +numpy = "*" +matplotlib = "*" + +[dev-packages] + +[requires] +python_version = "3.13" diff --git a/apc_perf.py b/apc_perf.py new file mode 100644 index 0000000..6520cb1 --- /dev/null +++ b/apc_perf.py @@ -0,0 +1,80 @@ +from dataclasses import dataclass +from functools import cache, cached_property +import re +from typing import Collection, Mapping, Self, Sequence + +from utils import lin_fun_interpolate, with_ops_scalar, with_ops_unary, with_ops_vector + +MPH = 0.44704 + +@with_ops_unary('neg') +@with_ops_vector('sub', 'add') +@with_ops_scalar('mul', 'truediv') +@dataclass(frozen=True) +class PropOppoint: + speed: float + thrust: float + torque: float + rpm: float + + def __neg__(self) -> Self: ... + + def __truediv__(self, other: float) -> Self: ... + def __mul__(self, other: float) -> Self: ... + + def __add__(self, other: Self) -> Self: ... + def __sub__(self, other: Self) -> Self: ... + +@dataclass(frozen=True) +class ApcPerfdata: + oppoints: Collection[PropOppoint] + + @cached_property + def rpms(self) -> Collection[float]: + return { op.rpm for op in self.oppoints } + + def _rpm_serie(self, rpm: float) -> Sequence[PropOppoint]: + return sorted([op for op in self.oppoints if op.rpm == rpm], key=lambda op: op.speed) + + @cached_property + def rpm_series(self) -> Mapping[float, Sequence[PropOppoint]]: + return { rpm: self._rpm_serie(rpm) for rpm in self.rpms } + + def get_op_interpspeed(self, rpm: float, speed: float) -> PropOppoint: + s = self.rpm_series[rpm] + def mk_op(op): + return lambda: op + return lin_fun_interpolate([op.speed for op in s], [mk_op(op) for op in s], speed) + + + @classmethod + def from_file(cls, path: str): + with open(path, 'r') as file: + cont = file.read() + data = parse_apc_data(cont) + return cls(data) + + +def parse_apc_data(raw_data: str) -> Collection[PropOppoint]: + data_points = [] + current_rpm = None + + for line in raw_data.splitlines(): + if 'PROP RPM' in line: + current_rpm = float(line.split()[-1]) + elif current_rpm is None: + continue + try: + vals = list(map(float, line.split())) + except ValueError: + continue + if len(vals) != 15: + continue + + speed = vals[0] * MPH + thrust = vals[9] + torque = vals[10] + data_points.append(PropOppoint(speed, thrust, torque, current_rpm)) + + return data_points + diff --git a/data/PERFILES2 b/data/PERFILES2 new file mode 120000 index 0000000..7940315 --- /dev/null +++ b/data/PERFILES2 @@ -0,0 +1 @@ +/home/Downloads/APC_perf/PERFILES_WEB/PERFILES2 \ No newline at end of file diff --git a/test.py b/test.py new file mode 100644 index 0000000..1c0206b --- /dev/null +++ b/test.py @@ -0,0 +1,33 @@ +from apc_perf import ApcPerfdata +import matplotlib.pyplot as plt +from typing import Collection +from dataclasses import dataclass +from collections import defaultdict +import numpy as np + +def plot_prop_op_points(apc_data: ApcPerfdata, rpm: float): + rpm_series = apc_data.rpm_series[rpm] + + speeds = [op.speed for op in rpm_series] + thrusts = [op.thrust for op in rpm_series] + torques = [op.torque for op in rpm_series] + + fig, ax1 = plt.subplots(figsize=(10, 6)) + + ax1.plot(speeds, thrusts, label="Thrust", marker='o', color='tab:blue') + ax1.set_xlabel("Speed") + ax1.set_ylabel("Thrust", color='tab:blue') + ax1.tick_params(axis='y', labelcolor='tab:blue') + + ax2 = ax1.twinx() + ax2.plot(speeds, torques, label="Torque", marker='s', color='tab:orange') + ax2.set_ylabel("Torque", color='tab:orange') + ax2.tick_params(axis='y', labelcolor='tab:orange') + + plt.title(f"PropOppoint Variables vs Speed at RPM {rpm}") + + fig.tight_layout() + plt.show() + +p = ApcPerfdata.from_file('./data/PERFILES2/PER3_10x3.dat') +plot_prop_op_points(p, 1000) diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..446c12c --- /dev/null +++ b/utils.py @@ -0,0 +1,101 @@ +from dataclasses import fields, is_dataclass +from typing import Callable, Protocol, Self, Sequence, TypeVar +import operator + +class Interpolable(Protocol): + def __add__(self: Self, other: Self, /) -> Self: ... + def __mul__(self: Self, other: float, /) -> Self: ... + def __neg__(self: Self, /) -> Self: ... + +EXTRAPOLATE_NONE = 0 +EXTRAPOLATE_CONST = 1 +EXTRAPOLATE_LIN = 2 + +T1 = TypeVar('T1', bound=Interpolable) +def lin_fun_interpolate(x_values: Sequence[float], y_values: Sequence[Callable[[], T1]], x: float, extr_down = EXTRAPOLATE_NONE, extr_up = EXTRAPOLATE_NONE) -> T1: + # x0,x1,y0,y1 = None + if x < x_values[0]: + if extr_down == EXTRAPOLATE_NONE: + raise ValueError("x is outside the interpolation range!") + elif extr_down == EXTRAPOLATE_CONST: + return y_values[0]() + elif extr_down == EXTRAPOLATE_LIN: + pass + else: + raise ValueError('Invalid extrapoltion method') + + for i in range(1, len(x_values)): + x0, x1 = x_values[i-1], x_values[i] + y0, y1 = y_values[i-1](), y_values[i]() + + if x_values[i] > x: + break + else: + if extr_up == EXTRAPOLATE_NONE: + raise ValueError("x is outside the interpolation range!") + elif extr_up == EXTRAPOLATE_CONST: + return y1 + elif extr_up == EXTRAPOLATE_LIN: + pass + else: + raise ValueError('Invalid extrapoltion method') + + return y0 + (y1 - y0) * (x - x0) / (x1 - x0) + + +# Operators + +def map_fields(obj, func): + if not is_dataclass(obj): + raise TypeError("Expected dataclass instance") + cls = type(obj) + return cls(**{ + f.name: func(f.name, getattr(obj, f.name)) + for f in fields(obj) + }) + +def apply_binary_vector_op(a, b, op): + if type(a) is not type(b): + return NotImplemented + return map_fields(a, lambda name, val: op(val, getattr(b, name))) + +def apply_scalar_op(a, scalar, op): + if not isinstance(scalar, (int, float)): + return NotImplemented + return map_fields(a, lambda name, val: op(val, scalar)) + +def apply_rscalar_op(a, scalar, op): + if not isinstance(scalar, (int, float)): + return NotImplemented + return map_fields(a, lambda name, val: op(scalar, val)) + +def apply_unary_op(a, op): + return map_fields(a, lambda name, val: op(val)) + +def _add_ops(cls, op_names, method_name_fn, apply_func): + for name in op_names: + magic = method_name_fn(name) + op = getattr(operator, name) + + def method(self, other=None, *, _op=op, _name=name): + if other is None: + return apply_func(self, _op) + return apply_func(self, other, _op) + + setattr(cls, magic, method) + return cls + +def with_ops_vector(*op_names): + return lambda cls: _add_ops(cls, op_names, lambda name: f"__{name}__", apply_binary_vector_op) + +def with_ops_scalar(*op_names): + return lambda cls: _add_ops(cls, op_names, lambda name: f"__{name}__", apply_scalar_op) + +def with_ops_rscalar(*op_names): + return lambda cls: _add_ops(cls, op_names, lambda name: f"__r{name}__", apply_rscalar_op) + +def with_ops_unary(*op_names): + return lambda cls: _add_ops(cls, op_names, lambda name: f"__{name}__", apply_unary_op) + + +