os-scrapy-kafka-pipeline 0.0.15
This project provides a pipeline to send Scrapy items to Kafka in JSON format.
Features:
support configuring default Kafka brokers and topic in the settings.py file
support kafka-python producer init args
support dynamically connecting and sending to other Kafka clusters and topics using item meta
items are sent to Kafka in JSON format; bytes can be encoded to base64 strings if they cannot be UTF-8 decoded
Install
pip install os-scrapy-kafka-pipeline
You can run the example spider directly from the project root:
scrapy crawl example
Usage
Settings
Enable pipeline in the project settings.py file
ITEM_PIPELINES = {
"os_scrapy_kafka_pipeline.KafkaPipeline": 300,
}
Config default kafka brokers
KAFKA_PRODUCER_BROKERS = ["broker01.kafka:9092", "broker02.kafka:9092"]
brokers in the item meta will override this default value
the pipeline will not be enabled if these settings cannot be used to start a Kafka connection
an exception will be raised when no brokers are configured
Config default kafka producer
KAFKA_PRODUCER_CONFIGS = {"client_id": "id01", "retries": 1}
this is the global config; dynamic connections will also use these configs
the bootstrap_servers option will not take effect when KAFKA_PRODUCER_BROKERS is already configured
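As a rough mental model (a sketch only, not the pipeline's internal code), these settings correspond to kafka-python's KafkaProducer init args, with KAFKA_PRODUCER_BROKERS filling the bootstrap_servers slot:

from kafka import KafkaProducer

# Illustrative only: how the settings above map onto kafka-python's producer.
producer_configs = {"client_id": "id01", "retries": 1}    # KAFKA_PRODUCER_CONFIGS
brokers = ["broker01.kafka:9092", "broker02.kafka:9092"]  # KAFKA_PRODUCER_BROKERS

# the configured brokers take the place of any bootstrap_servers in the configs
# (instantiating the producer requires a reachable broker)
producer = KafkaProducer(bootstrap_servers=brokers, **producer_configs)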
Config default topic
KAFKA_PRODUCER_TOPIC = "topic01"
the topic in the item meta will override this config
an exception will be raised when no topic is configured
Config kafka-python loglevel (default "WARNING")
KAFKA_PRODUCER_LOGLEVEL = "DEBUG"
Config kafka producer close timeout (default: None)
KAFKA_PRODUCER_CLOSE_TIMEOUT = 10
Ensure base64
Bytes-typed item members will be decoded as UTF-8; if decoding fails, the pipeline will encode the bytes with base64 when you set:
KAFKA_VALUE_ENSURE_BASE64 = True
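For illustration only (a sketch of the behavior described above, not the pipeline's actual code), the fallback looks roughly like this:

import base64

def encode_bytes(value: bytes) -> str:
    # Try UTF-8 first; fall back to base64 when decoding fails
    # (the fallback applies when KAFKA_VALUE_ENSURE_BASE64 is set).
    try:
        return value.decode("utf-8")
    except UnicodeDecodeError:
        return base64.b64encode(value).decode("ascii")

encode_bytes(b"hello")         # -> "hello"
encode_bytes(b"\xff\xfe\x01")  # -> "//4B"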
Filter field
You can filter out item fields that should not be exported and sent to Kafka
KAFKA_EXPORT_FILTER = ["filtered_field"]
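The effect, roughly (an illustrative sketch with made-up field names, not the exporter's internals):

item = {"url": "http://example.com", "filtered_field": "debug data"}
export_filter = ["filtered_field"]  # KAFKA_EXPORT_FILTER

# fields listed in the filter are dropped before the item is serialized
exported = {k: v for k, v in item.items() if k not in export_filter}
# exported == {"url": "http://example.com"}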
Dynamic Kafka Connection with item.meta
you can set the topic, key, partition, and brokers using item["meta"] (see the spider sketch below)
the item must have a meta member whose type is dict
options:
meta = {
"kafka.topic": "topic01",
"kafka.key": "key01",
"kafka.partition": 1,
"kafka.brokers": "broker01.kafka:9092,broker02.kafka:9092"
}
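For example, a spider might attach these options per item like this (a hedged sketch; the spider and item names here are made up and not part of the project):

import scrapy

class KafkaItem(scrapy.Item):
    url = scrapy.Field()
    meta = scrapy.Field()  # dict read by the pipeline for per-item routing

class KafkaExampleSpider(scrapy.Spider):
    name = "kafka_example"
    start_urls = ["http://example.com"]

    def parse(self, response):
        yield KafkaItem(
            url=response.url,
            meta={
                "kafka.topic": "topic01",
                "kafka.key": "key01",
                "kafka.partition": 1,
                "kafka.brokers": "broker01.kafka:9092,broker02.kafka:9092",
            },
        )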
Storage Format
Items will be sent to Kafka in JSON format; bytes will be encoded to base64.
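For example (with hypothetical field names), an item {"url": "http://example.com", "body": b"\xff\xfe\x01"} would be produced to Kafka roughly as the JSON value {"url": "http://example.com", "body": "//4B"}.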
Unit Tests
sh scripts/test.sh
License
MIT licensed.