119 lines
3.3 KiB
Python
119 lines
3.3 KiB
Python
import asyncio
|
|
from collections.abc import Iterable, Sequence
|
|
from dataclasses import dataclass
|
|
from logging import debug
|
|
import logging
|
|
from typing import Optional
|
|
|
|
from fft import FFTData
|
|
from logger import spr
|
|
from norsonic_parser import parse_report
|
|
import thrust_stand
|
|
import norsonic
|
|
|
|
from norsonic_fetcher import nor_get_reports
|
|
from thrust_stand import ThrustStand, ThrustStandMeasurement
|
|
|
|
@dataclass(frozen=True)
|
|
class OpPointData:
|
|
data_thrust_stand: Sequence[ThrustStandMeasurement]
|
|
raw_nor_report: Optional[bytes]
|
|
pwm_setpoint: int
|
|
|
|
@property
|
|
def nor_report_parsed(self):
|
|
if self.raw_nor_report is None:
|
|
raise ValueError()
|
|
return parse_report(self.raw_nor_report)
|
|
|
|
@property
|
|
def data_accustic(self):
|
|
return self.nor_report_parsed.profile
|
|
|
|
@property
|
|
def data_thrust_stand_avg(self) -> ThrustStandMeasurement:
|
|
return sum(self.data_thrust_stand, ThrustStandMeasurement.zero())/len(self.data_thrust_stand)
|
|
|
|
@property
|
|
def data_accustic_avg(self) -> dict[str, float]:
|
|
return {k: sum(map(float, (row[k] for row in self.data_accustic)))/len(self.data_accustic) for k in self.data_accustic[0].keys() if k != 'Date'}
|
|
|
|
@property
|
|
def data_fft(self) -> FFTData:
|
|
return FFTData(self.nor_report_parsed.glob_fft)
|
|
|
|
|
|
|
|
# import sys
|
|
# async def ainput(string: str) -> str:
|
|
# await asyncio.get_event_loop().run_in_executor(
|
|
# None, lambda s=string: sys.stdout.write(s+' '))
|
|
# return await asyncio.get_event_loop().run_in_executor(
|
|
# None, sys.stdin.readline)
|
|
|
|
|
|
@dataclass
|
|
class ConnectionParams:
|
|
stand_tty: str
|
|
nor_addr: str
|
|
nor_ftp_user: str
|
|
nor_ftp_pass: str
|
|
nor_recordings_dir: str
|
|
|
|
async def meas_series(params: ConnectionParams, pwms: Iterable[int]):
|
|
stand = await ThrustStand.open_connection(params.stand_tty)
|
|
nor = await norsonic.open_connection(params.nor_addr)
|
|
|
|
results_stand = {}
|
|
files_nor = {}
|
|
|
|
# TODO TARA
|
|
sample, = stand.get_samples_raw(1)
|
|
stand.tare_thrust = thrust_stand.raw_thrust(sample.load_thrust)
|
|
stand.tare_torque = thrust_stand.raw_torque(sample.load_left, sample.load_right)
|
|
stand.tare_current = sample.esc_current
|
|
|
|
stand.mot_pwm = 1000
|
|
|
|
####
|
|
# stand.mot_pwm = 1200
|
|
# await ainput("Press Enter to continue...")
|
|
|
|
|
|
for pwm in pwms:
|
|
stand.mot_pwm = pwm
|
|
spr(f'Output: {pwm}PWM')
|
|
await stand.stabilize_rpm(10, 4)
|
|
spr(f'Starting measurement')
|
|
stand_series_pending = stand.start_meas_series()
|
|
#
|
|
files_nor[pwm] = await norsonic.record(nor)
|
|
# await asyncio.sleep(8)
|
|
spr(f'Done')
|
|
results_stand[pwm] = stand.finish_meas_series(stand_series_pending)
|
|
print(results_stand[pwm][0])
|
|
|
|
while stand.mot_pwm > 1000:
|
|
stand.mot_pwm -= 10
|
|
await asyncio.sleep(0.1)
|
|
|
|
stand.mot_pwm = 1000
|
|
|
|
await asyncio.sleep(3)
|
|
|
|
spr('Downlaoding reports')
|
|
|
|
|
|
nor_reports = await nor_get_reports(params.nor_addr, params.nor_ftp_user, params.nor_ftp_pass, [files_nor[pwm] for pwm in pwms])
|
|
spr('Done')
|
|
|
|
ret = {
|
|
pwm: OpPointData(
|
|
data_thrust_stand=results_stand[pwm],
|
|
# raw_nor_report=None
|
|
raw_nor_report=nor_reports[i]
|
|
) for i, pwm in enumerate(pwms)
|
|
}
|
|
return ret
|
|
|