beam-postgres 0.5.0

Creator: codyrutscher

Description:

beam-postgres


Light IO transforms for Postgres read/write in Apache Beam pipelines.
Goal
The project aims to provide highly performant and customizable transforms and is
not intended to support many different SQL database engines.
Features

ReadAllFromPostgres, ReadFromPostgres and WriteToPostgres transforms
Records can be mapped to tuples, dictionaries or dataclasses
Reads and writes are in configurable batches
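To make the record-mapping feature concrete, here is a minimal plain-Python sketch of the three shapes a row can take. It does not use beam-postgres or psycopg; in practice the mapping is selected with a psycopg row factory such as `tuple_row`, `dict_row` or `class_row` from `psycopg.rows`. The column names, the sample row and the helper functions below are hypothetical and exist only for illustration.

```python
from dataclasses import dataclass

# Hypothetical column names and row, standing in for one query result.
COLUMNS = ("id", "data")
ROW = (1, "example1")


@dataclass
class Example:
    id: int
    data: str


def as_tuple(columns, row):
    """Keep the row as a plain tuple (positional access)."""
    return tuple(row)


def as_dict(columns, row):
    """Pair each column name with its value (access by name)."""
    return dict(zip(columns, row))


def as_dataclass(cls, columns, row):
    """Build a dataclass instance whose fields match the column names."""
    return cls(**dict(zip(columns, row)))


print(as_tuple(COLUMNS, ROW))
print(as_dict(COLUMNS, ROW))
print(as_dataclass(Example, COLUMNS, ROW))
```

Tuples are the cheapest form, dictionaries are convenient for ad hoc access by column name, and dataclasses give typed, named fields for downstream transforms.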

Usage
Printing data from the database table:
import apache_beam as beam
from psycopg.rows import dict_row

from beam_postgres.io import ReadAllFromPostgres

with beam.Pipeline() as p:
    data = p | "Reading example records from database" >> ReadAllFromPostgres(
        # Connection string in libpq "key=value" format.
        "host=localhost dbname=examples user=postgres password=postgres",
        "select id, data from source",
        # dict_row returns each record as a dictionary keyed by column name.
        dict_row,
    )
    data | "Writing to stdout" >> beam.Map(print)

Writing data to the database table:
from dataclasses import dataclass

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

from beam_postgres.io import WriteToPostgres


@dataclass
class Example:
    data: str


with beam.Pipeline(options=PipelineOptions()) as p:
    data = p | "Reading example records" >> beam.Create(
        [
            Example("example1"),
            Example("example2"),
        ]
    )
    data | "Writing example records to database" >> WriteToPostgres(
        "host=localhost dbname=examples user=postgres password=postgres",
        # The %(data)s placeholder refers to the "data" field of each record.
        "insert into sink (data) values (%(data)s)",
    )

See the project repository for more examples.
Reading in batches
When a table holds more data than fits into memory, read it in batches
instead of loading everything at once. The project includes example code for
this; that example reads records in batches of 1.
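The general idea behind batched reads can be sketched with the standard DB-API `fetchmany` call. This is not the beam-postgres implementation: it uses an in-memory SQLite database as a stand-in for Postgres so the snippet runs without a server, and the `read_in_batches` helper, the table contents and the batch size are assumptions made for illustration.

```python
import sqlite3
from typing import Iterator, List, Tuple


def read_in_batches(
    conn: sqlite3.Connection, query: str, batch_size: int
) -> Iterator[List[Tuple]]:
    """Yield lists of at most batch_size rows until the result is exhausted."""
    cursor = conn.execute(query)
    while True:
        batch = cursor.fetchmany(batch_size)
        if not batch:
            break
        yield batch


# In-memory SQLite database standing in for Postgres (assumption).
conn = sqlite3.connect(":memory:")
conn.execute("create table source (id integer primary key, data text)")
conn.executemany(
    "insert into source (data) values (?)",
    [(f"example{i}",) for i in range(1, 6)],
)

# Five rows read two at a time: only one batch is held in memory at once
# when the generator is consumed lazily.
batches = list(read_in_batches(conn, "select id, data from source order by id", 2))
for batch in batches:
    print(batch)
```

A larger batch size means fewer round trips to the database but more memory per batch; a batch size of 1 minimizes memory at the cost of throughput.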

License

For personal and professional use. You cannot resell or redistribute these repositories in their original state.
