ars_noise_measurement/async_serial.py

71 lines
2.0 KiB
Python

import asyncio
from typing import cast
from serial.serialutil import SerialException
from typing_extensions import Buffer
import serial
class X(asyncio.StreamReaderProtocol):
    """Placeholder subclass of ``asyncio.StreamReaderProtocol``.

    Adds no attributes or methods of its own; every behaviour is inherited
    from the base class.
    """
class SerialTransport(asyncio.Transport):
    """asyncio transport that shuttles bytes between a pyserial port and a protocol.

    Incoming bytes are read without blocking whenever the event loop reports
    the port's file descriptor as readable and are handed to
    ``protocol.data_received``.  Outgoing bytes are written straight to the
    port by ``write``.

    NOTE(review): this transport never calls ``protocol.connection_made``;
    protocols that rely on that callback must be wired up by the caller.
    """

    ser: serial.Serial
    loop: asyncio.BaseEventLoop

    def __init__(self, loop, protocol, serial: serial.Serial):
        """Attach ``serial`` to ``protocol`` on ``loop`` and start reading.

        Args:
            loop: Event loop used to watch the port's file descriptor.
            protocol: Object receiving ``data_received`` / ``connection_lost``.
            serial: Port to wrap.  Its ``timeout`` is set to 0 so reads
                return immediately with whatever is available.

        Raises:
            RuntimeError: If the port exposes no file descriptor to watch.
        """
        # Initialise the base transport so get_extra_info() works.
        super().__init__()
        serial.timeout = 0
        self.loop = loop
        self.protocol = protocol
        self.ser = serial
        self.paused = False
        # True once the connection was lost or close() was requested.
        self._closing = False
        # File descriptor currently registered with the loop, or None.
        self._reader_fd = None
        # Drain anything already buffered, then register for readiness.
        self._read_ready()

    def write(self, data: Buffer):
        """Write ``data`` to the serial port (synchronously)."""
        self.ser.write(data)

    def _start_reading(self):
        """Register ``_read_ready`` for the port's file descriptor."""
        fd = self.ser.fd
        if fd is None:
            raise RuntimeError('Invalid serial fd')
        self.loop.add_reader(cast(int, fd), self._read_ready)
        self._reader_fd = fd

    def _stop_reading(self):
        """Unregister the read callback if one is installed."""
        if self._reader_fd is not None:
            self.loop.remove_reader(self._reader_fd)
            self._reader_fd = None

    def _read_ready(self):
        """Deliver all currently available bytes to the protocol.

        On a ``SerialException`` the read callback is removed and the
        protocol is told exactly once that the connection was lost.
        """
        if self._closing:
            # A readiness event may still be queued after close()/an error.
            return
        READ_BUFF_SIZE = 2048
        while True:
            try:
                data = self.ser.read(READ_BUFF_SIZE)
            except SerialException as e:
                # Stop watching the fd first; otherwise the loop would keep
                # calling us and report the loss over and over.
                self._closing = True
                self._stop_reading()
                self.protocol.connection_lost(e)
                return
            if not data:
                break
            self.protocol.data_received(data)
            if self._closing:
                # The protocol closed the transport while handling the data.
                return
        if self._reader_fd is None:
            # Register once; later invocations are triggered by the loop.
            self._start_reading()

    def is_closing(self) -> bool:
        """Return True once the transport is closing or has been closed."""
        return self._closing

    def close(self):
        """Stop reading, close the serial port and notify the protocol.

        ``protocol.connection_lost(None)`` is scheduled on the loop unless a
        loss was already reported.  Calling ``close`` again is harmless.
        """
        # The fd must be unregistered before the port closes it.
        self._stop_reading()
        if not self._closing:
            self._closing = True
            self.loop.call_soon(self.protocol.connection_lost, None)
        self.ser.close()
# def serial_create_connection(protocol_factory, serial):
# loop = asyncio.get_event_loop()
# protocol = protocol_factory()
# transport = SerialTransport(loop, protocol, serial)
# return transport, protocol
# def serial_open_connection(serial):
# reader = streams.StreamReader()
# protocol = streams.StreamReaderProtocol(reader)
# factory = lambda: protocol
# transport, _ = serial_create_connection(factory, serial)
# writer = streams.StreamWriter(transport, protocol, reader, asyncio.get_running_loop())
# return reader, writer
def wrap_serial(serial: serial.Serial):
    """Wrap a serial port in an asyncio stream reader/writer pair.

    Must be called while an event loop is running, since the running loop
    is the one the transport and writer are bound to.

    Args:
        serial: The port to wrap.

    Returns:
        A ``(StreamReader, StreamWriter)`` tuple backed by a
        ``SerialTransport`` on the running loop.
    """
    running_loop = asyncio.get_running_loop()
    stream_reader = asyncio.StreamReader()
    stream_protocol = asyncio.StreamReaderProtocol(stream_reader)
    port_transport = SerialTransport(running_loop, stream_protocol, serial)
    stream_writer = asyncio.StreamWriter(
        port_transport,
        stream_protocol,
        stream_reader,
        running_loop,
    )
    return stream_reader, stream_writer