`

aio_uart 实现

阅读更多

一个简单的aio uart 实现

引用

import contextlib
import copy
import logging
import time

from typing import Dict, Union

import asyncio
import aioserial
import queue


class HubSerial():
    """
    Custom serial class
    Attributes:
        port_config (dict[str, Any]): port configs
    """

    DEFAULT_PORT_CONFIG = {
        'baudrate': 115200,
        'bytesize': 8,
        'parity': 'N',
        'stopbits': 1,
        'timeout': 0.05,  # read timeout
        'xonxoff': False,
        'rtscts': False,
    }

    def __init__(self, port: str, **kwargs):
        """
        Args:
            pexpect_proc: `PexpectProcess` instance
            port: port string or pyserial Serial instance
        """

        if port is None:
            raise ValueError('Please specify port')

        self.port = port
        self.port_config = copy.deepcopy(self.DEFAULT_PORT_CONFIG)
        self.port_config.update(**kwargs)
        self.serial: aioserial.AioSerial = aioserial.AioSerial(port,
                baudrate = self.port_config["baudrate"],
                bytesize = self.port_config["bytesize"],
                parity = self.port_config["parity"],
                stopbits = self.port_config["stopbits"],
                timeout = self.port_config["timeout"],
                xonxoff = self.port_config["xonxoff"],
                rtscts = self.port_config["rtscts"],
                **kwargs
                )
        self.read_queue = queue.Queue()
        self.write_queue = queue.Queue()
        self.stop_serial = False


    async def read_coroutine(self):
        '''
            read_coroutine
            Param:
                f Future: even result of an asynchronous operation.
        '''
        while True:
            data: bytes = await self.serial.read_async()
            # print(data.decode(errors='ignore'), end='', flush=True)
            if self.read_queue.full():
                logging.warning("read queue full for %s", self.port)
            else:
                self.read_queue.put_nowait(data.decode(errors='ignore'))
            if self.stop_serial:
                logging.info("quit read")
                break

    async def write_coroutine(self):
        '''
            write_coroutine
            Param:
                f Future: even result of an asynchronous operation.
        '''
        while True:
            if not self.write_queue.empty():
                _item = self.write_queue.get()
                logging.info('write %s', _item)
                await self.serial.write_async(str.encode(_item))
            if self.stop_serial:
                logging.info("quit write")
                break
            await asyncio.sleep(0.1)

    def stop_coroutine(self):
        '''
            stop_coroutine
        '''
        self.stop_serial = True

    async def start_coroutine(self):
        '''
            start_coroutine
            this shall be called by a async coroutine
        '''
        _f = asyncio.get_running_loop().create_future()

        asyncio.create_task(self.read_coroutine())

        asyncio.create_task(self.write_coroutine())

        return await _f

    def serial_read(self):
        '''
            serial_read
        '''
        _str = ""
        while not self.read_queue.empty():
            _str += self.read_queue.get()

        return _str

    def serial_write(self, data):
        '''
            serial_write
        '''
        if not self.write_queue.full():
            logging.info("add to write queue %s", data)
            self.write_queue.put_nowait(data)


async def main_loop(ser, f):
    asyncio.create_task(ser.start_coroutine())
    await asyncio.sleep(3)
    ser.serial_write("hello world\n")
    await asyncio.sleep(1)
    _str = ser.serial_read()
    logging.info('read data is %s\n', _str)
    await asyncio.sleep(3)
    ser.stop_coroutine()
    f.set_result(1)
    return await f


async def main(ser):
    f = asyncio.get_running_loop().create_future()
    asyncio.create_task(main_loop(ser, f))
    return await f

if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG,
                        format='[%(filename)s:%(lineno)d] %(threadName)s %(message)s')
    loop = asyncio.get_event_loop()
    m_ser = HubSerial(port="COM11")
    asyncio.run(main(m_ser))

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics