ars_noise_measurement/thrust_stand.py

96 lines
2.4 KiB
Python

import asyncio
from asyncio.futures import Future
from asyncio.tasks import Task
from time import time
from typing import Optional, cast
from msp import MSPSlave, PollResponse
# NOTE(review): not referenced by any code visible in this part of the file.
HIST_SIZE = 100

# 9.80665 is the conventional value of standard gravity in m/s^2.
GRAVITY_CONST = 9.80665
# Presumably the hinge lever-arm length in metres -- TODO confirm.
HINGE_DISTANCE = 0.07492

# Scale factors applied to calibrated readings by raw_thrust() / raw_torque().
# NOTE(review): the `1000 * 5 / 5` and `1000 * 2 / 5` terms look like
# sensor range/gain factors -- confirm against the hardware before editing.
# The evaluation order is left as written because it fixes the float rounding.
THRUST_CONST = 1000 * 5 / 5 * GRAVITY_CONST
TORQUE_CONST = 1000 * 2 / 5 * GRAVITY_CONST * HINGE_DISTANCE / 2

# Calibration coefficients (presumably determined empirically -- TODO confirm).
CAL_HINGE_LEFT = 1.2100092475098374
CAL_HINGE_RIGHT = 1.2590952216896254
CAL_LEFT = 0.9663293361785854
CAL_RIGHT = -0.9575068323376389
CAL_THRUST = 0.9516456828857573

# Combined per-side coefficients used by raw_torque(). CAL_RIGHT is negative,
# so CAL_TORQUE_RIGHT is negative as well.
CAL_TORQUE_LEFT = CAL_LEFT * CAL_HINGE_LEFT
CAL_TORQUE_RIGHT = CAL_RIGHT * CAL_HINGE_RIGHT
def raw_torque(val_left: float, val_right: float) -> float:
    """Convert the left and right raw readings into a torque value.

    Each reading is weighted by its side's calibration coefficient, the left
    contribution is subtracted from the right one, and the difference is
    scaled by ``TORQUE_CONST``.
    """
    right_term = val_right * CAL_TORQUE_RIGHT
    left_term = val_left * CAL_TORQUE_LEFT
    return (right_term - left_term) * TORQUE_CONST
def raw_thrust(val: float) -> float:
    """Convert a raw reading into a thrust value.

    The reading is multiplied by the ``CAL_THRUST`` calibration coefficient
    and then by the ``THRUST_CONST`` scale factor.
    """
    calibrated = val * CAL_THRUST
    return calibrated * THRUST_CONST
class ThrustStandMeasurement:
    """Placeholder type; it currently defines no fields or behaviour."""
class ThrustStand:
    """Poll a thrust stand through an MSP link and let coroutines await samples.

    A background task started by :meth:`ensure_running` repeatedly calls
    ``msp.do_poll`` with the current ``mot_pwm`` and appends every response to
    ``samples``.  Coroutines synchronise with that task through
    :meth:`next_sample`, :meth:`wait_samples` and :meth:`stabilize_rpm`.
    """

    mot_pwm: int  # value handed to ``msp.do_poll`` on every poll; starts at 1000
    msp: MSPSlave  # link object used for polling
    samples: list[PollResponse]  # every response received so far, oldest first
    # Shared future resolved when the next sample arrives; ``None`` while
    # nobody has asked for one.
    _next_sample_future: Optional[Future]
    _poller_task: Optional[Task]  # background poller, ``None`` until started
    # The attributes below are declared but not assigned by ``__init__``.
    sample_number: int
    tare_thrust: float
    tare_torque: float
    _closed: bool

    def __init__(self, msp: MSPSlave):
        """Store the MSP link and set the initial motor command to 1000.

        No asyncio objects are created here, so an instance can safely be
        constructed before an event loop is running.
        """
        self.msp = msp
        self.mot_pwm = 1000
        self._poller_task = None
        self.samples = []
        # Created lazily in next_sample() on the loop that is actually running.
        # A bare ``Future()`` here would bind to whatever loop
        # ``get_event_loop()`` returns at construction time.
        self._next_sample_future = None

    async def _poller(self):
        """Poll forever, with each cycle lasting at least 30 ms."""
        while True:
            # gather() makes a cycle take max(0.03 s, duration of the poll).
            await asyncio.gather(
                asyncio.sleep(0.03),
                self._do_poll(),
            )

    async def _do_poll(self):
        """Perform one poll, record the response and wake the current waiters."""
        res = await self.msp.do_poll(self.mot_pwm)
        self.samples.append(res)
        # Detach the shared future before resolving it, so a waiter that calls
        # next_sample() again is given a fresh future for the following sample.
        resolved = self._next_sample_future
        self._next_sample_future = None
        if resolved is not None and not resolved.done():
            resolved.set_result(None)

    async def ensure_running(self):
        """Start the background poller unless it has already been started."""
        if self._poller_task is None:
            self._poller_task = asyncio.create_task(self._poller())

    async def wait_samples(self, n):
        """Wait until ``n`` further samples have been recorded."""
        for _ in range(n):
            await self.next_sample()

    async def stabilize_rpm(self, window: int, tolerance: float):
        """Return once ``rot_e`` varies by less than ``tolerance`` over ``window`` samples.

        Waits for ``window`` new samples first, then re-checks the spread
        (max - min) of ``rot_e`` over the latest ``window`` samples after each
        additional sample.

        Raises:
            ValueError: if ``window`` is smaller than 1.
        """
        if window < 1:
            # ``samples[-0:]`` would select the whole history (or make max()
            # fail on an empty list), so reject the value up front.
            raise ValueError("window must be at least 1")
        await self.wait_samples(window)
        while True:
            recent = self.samples[-window:]
            readings = [sample.rot_e for sample in recent]
            if max(readings) - min(readings) < tolerance:
                return
            await self.next_sample()

    def next_sample(self) -> Future:
        """Return a future that completes when the next sample is recorded.

        The shared future is wrapped in ``asyncio.shield`` so that cancelling
        one waiter does not cancel it for the others.  Must be called while an
        event loop is running.
        """
        if self._next_sample_future is None:
            self._next_sample_future = asyncio.get_running_loop().create_future()
        return asyncio.shield(self._next_sample_future)