ars_noise_measurement/visualizer.py

149 lines
5.1 KiB
Python

from dataclasses import fields
from typing import Callable, cast
from bokeh.io import curdoc
from bokeh.layouts import column, row
from bokeh.models import ColumnDataSource, MultiChoice, Select, Slider, TextInput
from bokeh.plotting import figure
import pickle
from fft import FFTData
from interpolation import interp_keyed
from measurement_station import OpPointData
import math
from bokeh.server.server import Server
from bokeh.application import Application
from bokeh.application.handlers.function import FunctionHandler
import sys
import itertools
def oppoint_data_src(p: OpPointData) -> dict[str, float]:
    """Flatten one operating point into a single name -> value mapping.

    The dataclass fields of ``p.data_thrust_stand_avg`` are inserted first,
    then the entries of ``p.data_accustic_avg``; an entry from the latter
    replaces a thrust-stand field that carries the same name.
    """
    merged = dict(field_entries(p.data_thrust_stand_avg))
    merged.update(p.data_accustic_avg.items())
    return merged
def field_entries(datacls) -> dict[str, float]:
    """Return the dataclass fields of ``datacls`` as a name -> value mapping.

    Entries appear in the order reported by ``dataclasses.fields``.
    """
    return {spec.name: getattr(datacls, spec.name) for spec in fields(datacls)}
def read_series(fname: str) -> dict[int, OpPointData]:
    """Load a pickled measurement series from the file ``fname``.

    The file is opened in binary mode and the first pickled object it
    contains is returned unchanged.

    NOTE(review): ``pickle.load`` can execute arbitrary code while loading,
    so ``fname`` must only ever point at trusted files.
    """
    with open(fname, 'rb') as stream:
        series = pickle.load(stream)
    return series
def series_dataclosrc(series: dict[int, OpPointData]) -> ColumnDataSource:
    """Turn a measurement series into a column-oriented Bokeh data source.

    Every operating point is flattened with ``oppoint_data_src``. The column
    names are taken from the first operating point, and each column lists
    that entry for every point in the iteration order of ``series``.

    Raises:
        IndexError: if ``series`` is empty.
        KeyError: if a later point lacks a column of the first point.
    """
    rows = [oppoint_data_src(point) for point in series.values()]
    columns = {}
    for name in rows[0]:
        columns[name] = [entry[name] for entry in rows]
    return ColumnDataSource(columns)
def fft_datasrc(fft: FFTData) -> ColumnDataSource:
    """Convert an FFT result into a plottable Bokeh data source.

    Returns a ``ColumnDataSource`` with two equally long columns:
    ``"FREQ"`` holds the keys of ``fft.data`` in ascending order and
    ``"POWER"`` holds the value stored under each of those keys.
    """
    spectrum = fft.data
    freqs = sorted(spectrum.keys())
    powers = [spectrum[freq] for freq in freqs]
    # Wrap the columns so the result matches the declared return type and the
    # sibling series_dataclosrc(); glyph methods accept it as ``source=``.
    return ColumnDataSource({"FREQ": freqs, "POWER": powers})
def interp_fft(oppoints: dict[int, OpPointData], key: Callable[[OpPointData], float], x: float) -> FFTData:
    """Look up the FFT data of a series at position ``x`` via ``interp_keyed``.

    The operating points are ordered by ascending dictionary key and handed
    to ``interp_keyed`` together with ``key`` (maps a point to its position),
    an accessor for each point's ``data_fft`` and the requested ``x``.

    NOTE(review): presumably ``interp_keyed`` interpolates between the
    neighbouring points around ``x`` - confirm in the interpolation module.
    """
    ordered = [oppoints[index] for index in sorted(oppoints)]
    return interp_keyed(ordered, key, lambda point: point.data_fft, x)
def make_doc(doc, files):
    """Populate a Bokeh document with the interactive measurement viewer.

    The series to display are named by the ``f0``, ``f1``, ... query
    parameters of the session request, read in order until the first missing
    index. When the request names no file at all, the ``files`` argument is
    used instead.

    Args:
        doc: Bokeh document of the session being set up.
        files: Fallback list of series file names.

    The document receives one column layout holding the source / Y-axis /
    X-axis selectors, the main line plot, the FFT plot and the "fft X" input.
    """
    args = doc.session_context.request.arguments
    print(args)

    # Collect the f0, f1, ... query parameters, stopping at the first gap.
    requested = []
    for i in itertools.count():
        key = f'f{i}'
        if key not in args:
            break
        requested.append(args[key][0].decode())
    # The request wins; without any fN parameter fall back to the caller's
    # list instead of discarding it (which left nothing to plot).
    files = requested or list(files or [])

    # SECURITY(review): file names taken from the URL are unpickled by
    # read_series(); only expose this server to trusted clients.
    series = [read_series(f) for f in files]
    sources = [series_dataclosrc(s) for s in series]
    keys = list(sources[0].data.keys())

    p = figure(title='', x_axis_label='X', y_axis_label='Y', tools=['hover', 'pan', 'xwheel_zoom'])
    p.sizing_mode = 'scale_both'  # type: ignore
    multi_choice = MultiChoice(title="Y Axis", options=keys)
    xsel = Select(title="X axis:", value="", options=keys)
    srcsel = MultiChoice(title="Source:", value=files, options=files)
    colors = ['blue', 'green', 'red', 'yellow', 'orange', 'purple']
    pfft = figure(title='', x_axis_label='X', y_axis_label='Y', tools=['hover', 'pan', 'xwheel_zoom'])
    pfft.sizing_mode = 'scale_both'  # type: ignore
    fftslider = TextInput(title='fft X')

    def clear_plot(plot):
        """Drop all glyph renderers of ``plot`` together with their legend entries."""
        plot.renderers = []  # type: ignore
        # Legend items outlive their renderers; without this, deselected
        # lines would stay listed in the legend.
        for legend in plot.legend:
            legend.items = []

    def update_plot(attr, _, new_values):
        """Redraw both plots from the current widget state (on_change callback)."""
        selected = set(srcsel.value)  # type: ignore
        x_key = xsel.value

        clear_plot(p)
        p.xaxis.axis_label = x_key
        ymin = math.inf
        ymax = -math.inf
        # Cycle the palette so sources beyond the sixth are still drawn.
        for source, fname, color in zip(sources, files, itertools.cycle(colors)):
            if fname not in selected:
                continue
            for y_key in multi_choice.value:  # type: ignore
                p.line(x=x_key, y=y_key, source=source, legend_label=f'{fname} {y_key}', line_width=2, color=color)
                ymin = min(min(source.data[y_key]), ymin)
                ymax = max(max(source.data[y_key]), ymax)
        # Only rescale when something was drawn; otherwise the bounds are
        # still +/-inf and would produce an invalid range.
        if ymin <= ymax:
            # Flat data gives a zero margin; pad by 1 to keep a visible range.
            margin = (ymax - ymin) * 0.1 or 1.0
            p.y_range.start = ymin - margin
            p.y_range.end = ymax + margin

        clear_plot(pfft)
        if not x_key:
            # No X column chosen yet, so there is no axis to interpolate along.
            print('Ommiting FFT')
            return
        try:
            fft_x = float(fftslider.value)
            for serie, fname, color in zip(series, files, itertools.cycle(colors)):
                if fname not in selected:
                    continue
                data = fft_datasrc(interp_fft(serie, lambda op: oppoint_data_src(op)[x_key], fft_x))
                pfft.line(x='FREQ', y='POWER', source=data, legend_label=f'{fname} FFT', line_width=2, color=color)
        except ValueError:
            # Raised for an empty or non-numeric "fft X" input, among others.
            print('Ommiting FFT')

    multi_choice.on_change('value', update_plot)
    xsel.on_change('value', update_plot)
    srcsel.on_change('value', update_plot)
    fftslider.on_change('value', update_plot)

    layout = column(column(srcsel, multi_choice, xsel), p, pfft, fftslider)
    layout.sizing_mode = 'scale_both'  # type: ignore
    doc.add_root(layout)
if __name__ == "__main__":
    # Script entry point: serve the visualizer for the series files named on
    # the command line.
    # Local import: only this entry point needs it.
    from urllib.parse import urlencode

    files = sys.argv[1:]
    if not files:
        print(f'Usage: {sys.argv[0]} [measurement directory/directories]')
        print('Examples:')
        print(f'- {sys.argv[0]} benchmark1/shroud_1m')
        print(f'- {sys.argv[0]} benchmark1/shroud_*')
        print(f'- {sys.argv[0]} benchmark1/*')
        # Missing arguments are a usage error, so report failure to the shell.
        sys.exit(1)

    port = 8051
    apps = {'/': Application(FunctionHandler(lambda doc: make_doc(doc, files)))}
    # NOTE(review): "*" accepts websocket connections from any origin; narrow
    # it if the server is ever reachable by untrusted pages.
    server = Server(apps, address='127.0.0.1', port=port, allow_websocket_origin=["*"])
    # make_doc reads the series names from the f0, f1, ... query parameters,
    # so advertise a URL that already carries them (and the real port).
    query = urlencode({f'f{i}': name for i, name in enumerate(files)})
    print(f'Visualizer started. navigate to http://localhost:{port}/?{query} to continue')
    print('Exit with interrupt (Ctrl+C)')
    server.run_until_shutdown()