httpx-sse 0.4.0

Creator: rpa-with-ash

httpx-sse



Consume Server-Sent Event (SSE) messages with HTTPX.
Table of contents

Installation
Quickstart
How-To
API Reference

Installation
NOTE: This is beta software. Please be sure to pin your dependencies.
pip install httpx-sse=="0.4.*"

Quickstart
httpx-sse provides the connect_sse and aconnect_sse helpers for connecting to an SSE endpoint. The resulting EventSource object exposes the .iter_sse() and .aiter_sse() methods to iterate over the server-sent events.
Example usage:
import httpx
from httpx_sse import connect_sse

with httpx.Client() as client:
    with connect_sse(client, "GET", "http://localhost:8000/sse") as event_source:
        for sse in event_source.iter_sse():
            print(sse.event, sse.data, sse.id, sse.retry)

You can try this against this example Starlette server (credit):
# Requirements: pip install uvicorn starlette sse-starlette
import asyncio
import uvicorn
from starlette.applications import Starlette
from starlette.routing import Route
from sse_starlette.sse import EventSourceResponse

async def numbers(minimum, maximum):
    for i in range(minimum, maximum + 1):
        await asyncio.sleep(0.9)
        yield {"data": i}

async def sse(request):
    generator = numbers(1, 5)
    return EventSourceResponse(generator)

routes = [
    Route("/sse", endpoint=sse)
]

app = Starlette(routes=routes)

if __name__ == "__main__":
    uvicorn.run(app)

How-To
Calling into Python web apps
You can call into Python web apps with HTTPX and httpx-sse to test SSE endpoints directly.
Here's an example of calling into a Starlette ASGI app...
import asyncio

import httpx
from httpx_sse import aconnect_sse
from sse_starlette.sse import EventSourceResponse
from starlette.applications import Starlette
from starlette.routing import Route

async def auth_events(request):
    async def events():
        yield {
            "event": "login",
            "data": '{"user_id": "4135"}',
        }

    return EventSourceResponse(events())

app = Starlette(routes=[Route("/sse/auth/", endpoint=auth_events)])

async def main():
    # Route requests to the ASGI app in-process. The `app=` shortcut on
    # AsyncClient was deprecated in HTTPX 0.27 and removed in 0.28.
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport) as client:
        async with aconnect_sse(
            client, "GET", "http://localhost:8000/sse/auth/"
        ) as event_source:
            events = [sse async for sse in event_source.aiter_sse()]
            (sse,) = events
            assert sse.event == "login"
            assert sse.json() == {"user_id": "4135"}

asyncio.run(main())

Handling reconnections (Advanced)
connect_sse and aconnect_sse don't have reconnection built in, because how to perform retries generally depends on your use case. As a result, if the connection breaks while reading from the server, iter_sse() (or aiter_sse()) will raise an httpx.ReadError.
However, httpx-sse does allow implementing reconnection by using the Last-Event-ID and reconnection time (in milliseconds), exposed as sse.id and sse.retry respectively.
Here's how you might achieve this using stamina...
import time
from typing import Iterator

import httpx
from httpx_sse import connect_sse, ServerSentEvent
from stamina import retry

def iter_sse_retrying(client, method, url):
    last_event_id = ""
    reconnection_delay = 0.0

    # `stamina` will apply jitter and exponential backoff on top of
    # the `retry` reconnection delay sent by the server.
    @retry(on=httpx.ReadError)
    def _iter_sse():
        nonlocal last_event_id, reconnection_delay

        time.sleep(reconnection_delay)

        headers = {"Accept": "text/event-stream"}

        if last_event_id:
            headers["Last-Event-ID"] = last_event_id

        with connect_sse(client, method, url, headers=headers) as event_source:
            for sse in event_source.iter_sse():
                last_event_id = sse.id

                if sse.retry is not None:
                    reconnection_delay = sse.retry / 1000

                yield sse

    return _iter_sse()

Usage:
with httpx.Client() as client:
    for sse in iter_sse_retrying(client, "GET", "http://localhost:8000/sse"):
        print(sse.event, sse.data)
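
If you would rather not depend on a retry library, the same idea can be written as a plain loop. This is a rough sketch, not part of httpx-sse: iter_with_reconnect, open_stream, retry_on, max_attempts and sleep are hypothetical names, and open_stream is assumed to be a callable that takes the last seen event ID (or None) and returns an iterable of events exposing .id and .retry.

```python
import time
from typing import Callable, Iterable, Iterator, Optional, Tuple, Type


def iter_with_reconnect(
    open_stream: Callable[[Optional[str]], Iterable],
    retry_on: Tuple[Type[BaseException], ...],
    max_attempts: int = 5,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator:
    """Yield events from open_stream(last_event_id), reopening on failure."""
    last_event_id: Optional[str] = None
    delay = 0.0  # seconds; updated from the server's `retry` field (milliseconds)
    failures = 0

    while True:
        try:
            for sse in open_stream(last_event_id):
                failures = 0  # progress was made, so reset the failure count
                if sse.id:
                    last_event_id = sse.id
                if sse.retry is not None:
                    delay = sse.retry / 1000
                yield sse
            return  # the stream ended cleanly
        except retry_on:
            failures += 1
            if failures >= max_attempts:
                raise
            sleep(delay)
```

With httpx-sse, open_stream could be a generator function that enters connect_sse(...) with a Last-Event-ID header when an ID is given and yields from event_source.iter_sse(), with retry_on=(httpx.ReadError,). Unlike the stamina version, this sketch applies no jitter or exponential backoff.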

API Reference
connect_sse
def connect_sse(
    client: httpx.Client,
    method: str,
    url: Union[str, httpx.URL],
    **kwargs,
) -> ContextManager[EventSource]

Connect to an SSE endpoint and return an EventSource context manager.
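This sets Cache-Control: no-store on the request, as per the SSE spec, as well as Accept: text/event-stream.
If the response Content-Type is not text/event-stream, an SSEError is raised. Since 0.3.0 this check runs when you start iterating with iter_sse() / aiter_sse(), not when the connection is opened (see the Changelog).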
aconnect_sse
async def aconnect_sse(
    client: httpx.AsyncClient,
    method: str,
    url: Union[str, httpx.URL],
    **kwargs,
) -> AsyncContextManager[EventSource]

An async equivalent to connect_sse.
EventSource
def __init__(response: httpx.Response)

Helper for working with an SSE response.
response
The underlying httpx.Response.
iter_sse
def iter_sse() -> Iterator[ServerSentEvent]

Decode the response content and yield corresponding ServerSentEvent.
Example usage:
for sse in event_source.iter_sse():
    ...

aiter_sse
async def aiter_sse() -> AsyncIterator[ServerSentEvent]

An async equivalent to iter_sse.
ServerSentEvent
Represents a server-sent event.

event: str - Defaults to "message".
data: str - Defaults to "".
id: str - Defaults to "".
retry: int | None - Reconnection time in milliseconds. Defaults to None.

Methods:

json() -> Any - Returns sse.data decoded as JSON.
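
To make the field defaults concrete, here is a simplified sketch of how a block of SSE wire text maps onto these fields. It is not the library's decoder: Event and parse_events are hypothetical stand-ins, and the parsing skips details of the SSE spec (for example, an empty id or event value is ignored here).

```python
import json
from dataclasses import dataclass
from typing import Any, Iterator, List, Optional


@dataclass
class Event:
    # Hypothetical stand-in mirroring the documented ServerSentEvent fields.
    event: str = "message"
    data: str = ""
    id: str = ""
    retry: Optional[int] = None

    def json(self) -> Any:
        return json.loads(self.data)


def parse_events(text: str) -> Iterator[Event]:
    """Simplified SSE parsing: one event per blank-line-terminated block."""
    fields: dict = {}
    data_lines: List[str] = []
    for line in text.splitlines() + [""]:
        if line == "":
            # A blank line closes the current block, if it has any content.
            if fields or data_lines:
                yield Event(data="\n".join(data_lines), **fields)
            fields, data_lines = {}, []
            continue
        if line.startswith(":"):
            continue  # comment line
        name, _, value = line.partition(":")
        value = value[1:] if value.startswith(" ") else value
        if name == "data":
            data_lines.append(value)  # repeated data lines are joined with "\n"
        elif name in ("event", "id") and value:
            fields[name] = value
        elif name == "retry" and value.isascii() and value.isdigit():
            fields["retry"] = int(value)


raw = 'event: login\ndata: {"user_id": "4135"}\nid: 7\nretry: 3000\n\ndata: a\ndata: b\n\n'
parsed = list(parse_events(raw))
for item in parsed:
    print(item)
```

The second block carries only data lines, so its event falls back to "message", its id to "" and its retry to None.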

SSEError
An error that occurred while making a request to an SSE endpoint.
Parents:

httpx.TransportError

License
MIT
Changelog
All notable changes to this project will be documented in this file.
The format is based on Keep a Changelog.
0.4.0 - 2023-12-22
Removed

Dropped Python 3.7 support, as it has reached EOL. (Pull #21)

Added

Add official support for Python 3.12. (Pull #21)

Fixed

Allow Content-Type values that contain, but are not strictly equal to, text/event-stream. (Pull #22 by @dbuades)
Improve error message when Content-Type is missing. (Pull #20 by @jamesbraza)

0.3.1 - 2023-06-01
Added

Add __repr__() for ServerSentEvent model, which may help with debugging and other tasks. (Pull #16)

0.3.0 - 2023-04-27
Changed

Raising an SSEError if the response content type is not text/event-stream is now performed as part of iter_sse() / aiter_sse(), instead of connect_sse() / aconnect_sse(). This allows inspecting the response before iterating on server-sent events, such as checking for error responses. (Pull #12)

0.2.0 - 2023-03-27
Changed

connect_sse() and aconnect_sse() now require a method argument: connect_sse(client, "GET", "https://example.org"). This provides support for SSE requests with HTTP verbs other than GET. (Pull #7)

0.1.0 - 2023-02-05
Initial release
Added

Add connect_sse(), aconnect_sse(), ServerSentEvent and SSEError.

License

For personal and professional use. You cannot resell or redistribute these repositories in their original state.