一个简单的aio uart 实现
引用
import contextlib
import copy
import logging
import time
from typing import Dict, Union
import asyncio
import aioserial
import queue
class HubSerial():
"""
Custom serial class
Attributes:
port_config (dict[str, Any]): port configs
"""
DEFAULT_PORT_CONFIG = {
'baudrate': 115200,
'bytesize': 8,
'parity': 'N',
'stopbits': 1,
'timeout': 0.05, # read timeout
'xonxoff': False,
'rtscts': False,
}
def __init__(self, port: str, **kwargs):
"""
Args:
pexpect_proc: `PexpectProcess` instance
port: port string or pyserial Serial instance
"""
if port is None:
raise ValueError('Please specify port')
self.port = port
self.port_config = copy.deepcopy(self.DEFAULT_PORT_CONFIG)
self.port_config.update(**kwargs)
self.serial: aioserial.AioSerial = aioserial.AioSerial(port,
baudrate = self.port_config["baudrate"],
bytesize = self.port_config["bytesize"],
parity = self.port_config["parity"],
stopbits = self.port_config["stopbits"],
timeout = self.port_config["timeout"],
xonxoff = self.port_config["xonxoff"],
rtscts = self.port_config["rtscts"],
**kwargs
)
self.read_queue = queue.Queue()
self.write_queue = queue.Queue()
self.stop_serial = False
async def read_coroutine(self):
'''
read_coroutine
Param:
f Future: even result of an asynchronous operation.
'''
while True:
data: bytes = await self.serial.read_async()
# print(data.decode(errors='ignore'), end='', flush=True)
if self.read_queue.full():
logging.warning("read queue full for %s", self.port)
else:
self.read_queue.put_nowait(data.decode(errors='ignore'))
if self.stop_serial:
logging.info("quit read")
break
async def write_coroutine(self):
'''
write_coroutine
Param:
f Future: even result of an asynchronous operation.
'''
while True:
if not self.write_queue.empty():
_item = self.write_queue.get()
logging.info('write %s', _item)
await self.serial.write_async(str.encode(_item))
if self.stop_serial:
logging.info("quit write")
break
await asyncio.sleep(0.1)
def stop_coroutine(self):
'''
stop_coroutine
'''
self.stop_serial = True
async def start_coroutine(self):
'''
start_coroutine
this shall be called by a async coroutine
'''
_f = asyncio.get_running_loop().create_future()
asyncio.create_task(self.read_coroutine())
asyncio.create_task(self.write_coroutine())
return await _f
def serial_read(self):
'''
serial_read
'''
_str = ""
while not self.read_queue.empty():
_str += self.read_queue.get()
return _str
def serial_write(self, data):
'''
serial_write
'''
if not self.write_queue.full():
logging.info("add to write queue %s", data)
self.write_queue.put_nowait(data)
async def main_loop(ser, f):
asyncio.create_task(ser.start_coroutine())
await asyncio.sleep(3)
ser.serial_write("hello world\n")
await asyncio.sleep(1)
_str = ser.serial_read()
logging.info('read data is %s\n', _str)
await asyncio.sleep(3)
ser.stop_coroutine()
f.set_result(1)
return await f
async def main(ser):
f = asyncio.get_running_loop().create_future()
asyncio.create_task(main_loop(ser, f))
return await f
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG,
format='[%(filename)s:%(lineno)d] %(threadName)s %(message)s')
loop = asyncio.get_event_loop()
m_ser = HubSerial(port="COM11")
asyncio.run(main(m_ser))
分享到:
相关推荐
压缩包下载好后进行解压缩,得到一个名为“VisualCppRedist_AIO_x86_x64”的程序文件。运行这个文件,点击“Next”按钮,程序便开始自动安装,安装过程中可能会碰到“已经安装了该产品的另一个版本。无法继续安装此...
MAS_AIO_v2.2.cmd.rar
内含 MSVBCRT_AIO_2018.10.28_X86+X64.exe 和 MSVBCRT_AIO_2018.10.28_X86.exe 两个程序, 微软常用运行库合集32+64位合集 , vc运行库, 解决vc**.dll丢失 ,MSVBCRT,微软VC运行库,VC运行库,运行库,dll丢失 。
windows 提示缺少 dll 文件。微软常用运行库合集 MSVBCRT_AIO_2018.05.13_X86 X64.exe
dotNetRT_WithFix_AIO_downcc.zipdotNetRT_WithFix_AIO_downcc.zipdotNetRT_WithFix_AIO_downcc.zipdotNetRT_WithFix_AIO_downcc.zipdotNetRT_WithFix_AIO_downcc.zip
微软常用运行库合集32+64位合集,MSVBCRT_AIO_2018.07.18_X86+X64和MSVBCRT_AIO_2018.07.18_X86最新版。
windows_repair_aio_setup
Microsoft Visual C++ Redistributable(VisualCppRedist AIO 系统运行库,简称MSVC,VB/VC)是Windows操作系统应用程序的基础类型库组件。用于最新Microsoft Visual C ++可再发行运行时的AIO重新打包,而没有原始...
安装软件或服务报系统缺失某文件,安装此程序可解决系统缺失问题
VC++所有运行库
VisualCppRedist_AIO_x86_x64_v53.exe
3Planesoft_AIO_125_in_1___v8.1.2_By_DFoX
xshell6注册机
MSVBCFJ_AIO_2015.11_X86 x64
Microsoft Runtimes AIO(微软常用运行库合集).exe文件
打开程序时遇到“应用程序并行配置不正确使用命令行sxstrace.exe”问题时,可能是由于系统没有安装Microsoft Visual C++ 20XX(运行库),下载安装运行库后问题可以得到有效解决。例如安装MATLAB、cadence后无法打开
本集合包含下列组件: Visual Basic Virtual Machine(5. 1) Visual Basic Virtus1l Machine (6.... Microsoft C Runtime Libr ary(7.0) Microsoft C Runtime Libr ary(7.10) ...Microsoft Visual C++ 2005 SP1(8.0....
SoftPerfect8合1 注册机. 软件下载地址:https://www.softperfect.com/ SoftPerfect Network Scanner SoftPerfect NetWorx SoftPerfect RAM Disk 等
All in One Runtimes - 快速一键安装全部 Windows 系统必备常用运行库合集
微软常用运行库合集vc++等运行库