os-scrapy-kafka-pipeline 0.0.15

Creator: railscoder56

This project provides a pipeline that sends Scrapy items to Kafka in JSON format.

Supports configuring the default Kafka brokers and topic in the settings.py file
Supports kafka-python producer init arguments
Supports dynamically connecting and sending to other Kafka clusters and topics via item meta
Items are sent to Kafka as JSON; bytes that cannot be decoded as UTF-8 can be encoded as base64 strings

Install
pip install os-scrapy-kafka-pipeline

You can run the example spider directly from the project root:
scrapy crawl example


Enable the pipeline in the project's settings.py file:
ITEM_PIPELINES = {"os_scrapy_kafka_pipeline.KafkaPipeline": 300}

Configure the default Kafka brokers
KAFKA_PRODUCER_BROKERS = ["broker01.kafka:9092", "broker02.kafka:9092"]

Brokers set in the item meta override this default value.
The pipeline is not enabled if a Kafka connection cannot be started with these settings.
An exception is raised when no brokers are configured.
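The broker precedence described above can be sketched in Python. This is a minimal illustration, not the package's code. `resolve_brokers` is a hypothetical helper. Raising `ValueError` is an assumption, because the README only says that an exception is raised. The sketch follows the two formats shown in this README: a list in `KAFKA_PRODUCER_BROKERS` and a comma-separated string in the item's `kafka.brokers` meta key.

```python
from typing import List, Mapping, Optional, Sequence


def resolve_brokers(
    meta: Optional[Mapping[str, object]],
    default_brokers: Optional[Sequence[str]],
) -> List[str]:
    """Pick the brokers for one item (hypothetical helper, not the package API)."""
    raw = (meta or {}).get("kafka.brokers")
    if raw:
        # Item meta wins: it holds a comma-separated "host:port,host:port" string.
        brokers = [b.strip() for b in str(raw).split(",") if b.strip()]
    else:
        # Otherwise fall back to the KAFKA_PRODUCER_BROKERS list from settings.py.
        brokers = list(default_brokers or [])
    if not brokers:
        # The README only says "an exception"; ValueError is an assumption.
        raise ValueError("no Kafka brokers configured")
    return brokers


defaults = ["broker01.kafka:9092", "broker02.kafka:9092"]
print(resolve_brokers({}, defaults))  # ['broker01.kafka:9092', 'broker02.kafka:9092']
print(resolve_brokers({"kafka.brokers": "broker03.kafka:9092"}, defaults))  # ['broker03.kafka:9092']
```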

Configure the default Kafka producer
KAFKA_PRODUCER_CONFIGS = {"client_id": "id01", "retries": 1}

This is a global config; dynamic connections also use these configs.
The bootstrap_servers option has no effect when KAFKA_PRODUCER_BROKERS is already configured.
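You can read the two notes above as a dictionary merge. The sketch below assumes that the pipeline passes the merged dict to kafka-python's `KafkaProducer(**kwargs)`. `build_producer_kwargs` is a hypothetical name used only for illustration.

```python
from typing import Any, Dict, Mapping, Sequence


def build_producer_kwargs(
    producer_configs: Mapping[str, Any],
    brokers: Sequence[str],
) -> Dict[str, Any]:
    """Merge KAFKA_PRODUCER_CONFIGS with the resolved brokers (hypothetical helper)."""
    kwargs = dict(producer_configs)  # copy so the global settings stay untouched
    # The configured brokers always replace any bootstrap_servers entry.
    kwargs["bootstrap_servers"] = list(brokers)
    return kwargs


configs = {"client_id": "id01", "retries": 1, "bootstrap_servers": "ignored:9092"}
kwargs = build_producer_kwargs(configs, ["broker01.kafka:9092"])
print(kwargs)
# {'client_id': 'id01', 'retries': 1, 'bootstrap_servers': ['broker01.kafka:9092']}
```

Copying the settings dict keeps the global config reusable for each dynamic connection, which matches the note that dynamic connections use the same configs.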

Configure the default topic

The topic set in the item meta overrides this config.
An exception is raised when no topic is configured.

Configure the kafka-python log level (default: "WARNING")
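The README does not show how the level is applied. Here is a plausible sketch, assuming that the pipeline sets the level on the `kafka` logger. kafka-python creates its loggers with `logging.getLogger(__name__)`, so every module logger sits under `kafka` and inherits the level set there. `set_kafka_loglevel` is a hypothetical helper.

```python
import logging


def set_kafka_loglevel(level: str = "WARNING") -> None:
    """Apply a level to kafka-python's logger tree (hypothetical helper)."""
    # Logger.setLevel accepts level names such as "WARNING" or "DEBUG".
    logging.getLogger("kafka").setLevel(level.upper())


set_kafka_loglevel("WARNING")
# Child loggers without their own level inherit it from "kafka".
print(logging.getLogger("kafka.producer.kafka").getEffectiveLevel())  # 30
```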

Configure the Kafka producer close timeout (default: None)

Ensure base64
Bytes-typed item members are decoded as UTF-8. If decoding fails, the pipeline falls back to encoding the bytes as a base64 string when this option is enabled.
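The conversion rule can be sketched as follows. This is a minimal illustration under stated assumptions. `encode_bytes` and `to_json` are hypothetical helpers, and the sketch converts only top-level item members. It also re-raises the decode error when the base64 fallback is disabled, which is a case the README does not describe.

```python
import base64
import json
from typing import Any


def encode_bytes(value: bytes, ensure_base64: bool) -> Any:
    """Return a JSON-safe string for a bytes member (sketch of the described rule)."""
    try:
        return value.decode("utf-8")
    except UnicodeDecodeError:
        if ensure_base64:
            # Fallback: base64 output is pure ASCII, so it is always valid JSON text.
            return base64.b64encode(value).decode("ascii")
        raise  # assumption: without the option, the error propagates


def to_json(item: dict, ensure_base64: bool = True) -> str:
    """Serialize an item dict, converting top-level bytes members first (hypothetical)."""
    converted = {
        key: encode_bytes(val, ensure_base64) if isinstance(val, bytes) else val
        for key, val in item.items()
    }
    return json.dumps(converted)


print(to_json({"url": "http://example.com", "body": b"hello", "raw": b"\xff\xfe"}))
# {"url": "http://example.com", "body": "hello", "raw": "//4="}
```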

Filter fields
You can filter out item fields that should not be exported and sent to Kafka:
KAFKA_EXPORT_FILTER = ["filtered_field"]
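Here is a minimal sketch of the filtering step. It assumes that the filter is a plain list of field names removed before serialization. `filter_fields` is a hypothetical helper and is not part of the package's documented API.

```python
from typing import Any, Dict, Iterable, Mapping


def filter_fields(item: Mapping[str, Any], export_filter: Iterable[str]) -> Dict[str, Any]:
    """Drop the fields named in KAFKA_EXPORT_FILTER (hypothetical helper)."""
    excluded = set(export_filter)  # set lookup keeps the check O(1) per field
    return {key: value for key, value in item.items() if key not in excluded}


item = {"url": "http://example.com", "filtered_field": "secret", "title": "Example"}
print(filter_fields(item, ["filtered_field"]))
# {'url': 'http://example.com', 'title': 'Example'}
```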

Dynamic Kafka Connection with item.meta

You can set the topic, key, partition, and brokers using item["meta"].

The item must have a meta member of type dict:

meta = {
    "kafka.topic": "topic01",
    "kafka.key": "key01",
    "kafka.partition": 1,
    "kafka.brokers": "broker01.kafka:9092,broker02.kafka:9092",
}

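The sketch below shows how these meta keys could be turned into arguments for kafka-python's `KafkaProducer.send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)`. Several details are assumptions for illustration: the helper name `send_args_from_meta`, the fallback to a default topic when `kafka.topic` is absent, and the encoding of a string key to UTF-8 bytes (kafka-python expects bytes when no `key_serializer` is configured). The sketch does not cover broker selection.

```python
from typing import Any, Dict, Mapping, Optional


def send_args_from_meta(
    item: Mapping[str, Any], default_topic: Optional[str]
) -> Dict[str, Any]:
    """Collect topic/key/partition for one item (hypothetical helper)."""
    meta = item.get("meta") or {}  # assumption: a missing meta falls back to defaults
    if not isinstance(meta, dict):
        raise TypeError("item['meta'] must be a dict")
    topic = meta.get("kafka.topic", default_topic)  # item meta overrides the default
    if not topic:
        raise ValueError("no Kafka topic configured")
    key = meta.get("kafka.key")
    if isinstance(key, str):
        key = key.encode("utf-8")  # assumption: no key_serializer, so send bytes
    return {"topic": topic, "key": key, "partition": meta.get("kafka.partition")}


item = {"meta": {"kafka.topic": "topic01", "kafka.key": "key01", "kafka.partition": 1}}
print(send_args_from_meta(item, default_topic="default_topic"))
# {'topic': 'topic01', 'key': b'key01', 'partition': 1}
```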
Storage Format
Items are sent to Kafka in JSON format; bytes that cannot be decoded as UTF-8 are encoded to base64.

Unit Tests
sh scripts/test.sh

MIT licensed.

