async-pyserial 0.2.3

Last updated:

0 purchases

async-pyserial 0.2.3 Image
async-pyserial 0.2.3 Images
Add to Cart

Description:

asyncpyserial 0.2.3

Python bindings for a C++ serial port library providing asynchronous serial communication support.

Features

Non-blocking and blocking read/write operations.
Cross-platform support for Windows, Linux, macOS, and FreeBSD.
Event-driven architecture for handling data events.
Support for gevent, eventlet, asyncio, callback, and synchronous operations.
Uses epoll, iocp, and kqueue for efficient, scalable I/O operations based on the underlying operating system.



Links

Documentation
PyPI page



Installation
You can install the async-pyserial package using either poetry or pip.
### Using Poetry

Install poetry if you haven’t already:

curl -sSL https://install.python-poetry.org | python3 -


Add async-pyserial to your project:

poetry add async-pyserial



### Using pip

Install the package from PyPI:

pip install async-pyserial



### FreeBSD Installation
For FreeBSD, you need to build the package manually:

Clone the repository:

git clone https://github.com/Lei-k/async-pyserial.git


Navigate to the project directory:

cd async-pyserial


Install the dependencies using poetry:

poetry install


Build the package:

python -m build


Install the package:

pip install dist/*.whl





Usage
Here’s a simple example of how to use async-pyserial:
from async_pyserial import SerialPort, SerialPortOptions, SerialPortEvent, SerialPortParity

def on_data(data):
print(f"Received: {data}")

options = SerialPortOptions()
options.baudrate = 9600
options.bytesize = 8
options.stopbits = 1
options.parity = SerialPortParity.NONE # NONE, ODD, EVEN

serial_port = SerialPort('/dev/ttyUSB0', options)
serial_port.on(SerialPortEvent.ON_DATA, on_data)
serial_port.open()

try:
while True:
data_to_send = input("Enter data to send (or 'exit' to quit): ")
if data_to_send.lower() == 'exit':
break
serial_port.write(data_to_send.encode('utf-8'))
finally:
serial_port.close()


API
### SerialPort
A class for serial communication.
#### Methods

__init__(self, port: str, options: SerialPortOptions): Initializes the serial port with the specified parameters.
def write(self, data: bytes, callback: Callable | None = None): Writes data to the serial port. Can be blocking or non-blocking. If a callback is provided, the write will be asynchronous. Supports gevent, eventlet, asyncio, callback, and synchronous operations.
def read(self, bufsize: int = 512, callback: Callable | None = None): Reads data from the serial port. Can be blocking or non-blocking. If a callback is provided, the read will be asynchronous. Supports gevent, eventlet, asyncio, callback, and synchronous operations.
def open(self): Opens the serial port.
def close(self): Closes the serial port.
def on(self, event: SerialPortEvent, callback: Callable[[bytes], None]): Registers a callback for the specified event.
def emit(self, evt: str, *args, **kwargs): Emits an event, triggering all registered callbacks for that event.
def remove_all_listeners(self, evt: str): Removes all listeners for the specified event.
def remove_listener(self, evt: str, listener: Callable): Removes a specific listener for the specified event.
def off(self, evt: str, listener: Callable): Alias for remove_listener.

### SerialPortOptions
A class for specifying serial port options.
#### Attributes

baudrate: int: The baud rate for the serial port.
bytesize: int: The number of data bits.
stopbits: int: The number of stop bits.
parity: int: The parity checking (0: None, 1: Odd, 2: Even).
read_timeout: int: The read timeout in milliseconds.
write_timeout: int: The write timeout in milliseconds.
read_bufsize: int: The read buffer size. Default is 0. When read_bufsize is 0, the internal buffer is not used, and only data received after the read call will be returned. If read_bufsize is not 0, both buffered and new data will be returned.

### SerialPortEvent
An enumeration for serial port events.

ON_DATA: Event triggered when data is received.

### SerialPortError
An exception class for handling serial port errors.

__init__(self, *args: object): Initializes the SerialPortError with the specified arguments.

### PlatformNotSupported
An exception class for handling unsupported platforms.

__init__(self, *args: object): Initializes the PlatformNotSupported exception with the specified arguments.

### set_async_worker
A function for setting the asynchronous worker.

def set_async_worker(w: str, loop = None): Sets the asynchronous worker to gevent, eventlet, or asyncio. Optionally, an event loop can be provided for asyncio.



Examples
The examples directory contains sample scripts demonstrating how to use async-pyserial for various operations. Below are a few examples to help you get started.

Basic read and write operations.
Non-blocking read with asyncio.
Using gevent and eventlet for asynchronous operations.

Example scripts included in the examples directory:

interval_write.py: Demonstrates periodic writing to the serial port.
serialport_read.py: Demonstrates reading from the serial port with different async workers.
serialport_terminal.py: A terminal interface for interacting with the serial port.



Platform Support
Supports Windows, Linux, macOS, and FreeBSD.


Development
To contribute to the project, follow these steps:

Clone the repository:

git clone https://github.com/Lei-k/async-pyserial.git


Navigate to the project directory:

cd async-pyserial


Install the dependencies using poetry:

poetry install


Run the tests:

poetry run pytest





License
This project is licensed under the MIT License. See the LICENSE file for more details.

License:

For personal and professional use. You cannot resell or redistribute these repositories in their original state.

Customer Reviews

There are no reviews.