os-scrapy-kafka-pipeline 0.0.15

This project provides a pipeline to send Scrapy Items to Kafka in JSON format.
Features:

support configuring default Kafka brokers and topic in the settings.py file
support kafka-python producer init args
support dynamically connecting and sending to other Kafka clusters and topics using item meta
items are sent to Kafka in JSON format; bytes that cannot be utf-8 decoded can be encoded as base64 strings

Install
pip install os-scrapy-kafka-pipeline

You can run the example spider directly from the project root:
scrapy crawl example

Usage
Settings


Enable the pipeline in the project settings.py file:
ITEM_PIPELINES = {
    "os_scrapy_kafka_pipeline.KafkaPipeline": 300,
}



Config default kafka brokers
KAFKA_PRODUCER_BROKERS = ["broker01.kafka:9092", "broker02.kafka:9092"]


brokers in the item meta will override this default value
the pipeline will not be enabled if this setting cannot establish a Kafka connection
an exception will be raised when no brokers are configured



Config default kafka producer
KAFKA_PRODUCER_CONFIGS = {"client_id": "id01", "retries": 1}


this is a global config; dynamic connections will also use it
the bootstrap_servers option will not take effect when KAFKA_PRODUCER_BROKERS is already configured
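
Any kafka-python producer init argument can be supplied here; a sketch with a few common options (the values are illustrative, not recommendations):
KAFKA_PRODUCER_CONFIGS = {
    "client_id": "id01",
    "retries": 1,
    "acks": 1,                   # wait for the leader to acknowledge each record
    "compression_type": "gzip",  # compress record batches before sending
    "linger_ms": 5,              # wait up to 5 ms to batch records together
}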



Config default topic
KAFKA_PRODUCER_TOPIC = "topic01"


the config in item.meta will override this config
an exception will be raised when no topic is configured



Config kafka-python loglevel (default "WARNING")
KAFKA_PRODUCER_LOGLEVEL = "DEBUG"



Config kafka producer close timeout (default: None)
KAFKA_PRODUCER_CLOSE_TIMEOUT = 10



Ensure base64
Bytes-typed item members will be decoded as utf-8; if decoding fails, the pipeline will encode the bytes with base64 when you set:
KAFKA_VALUE_ENSURE_BASE64 = True
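
A minimal sketch of that fallback behavior (illustrative, not the pipeline's actual code):
import base64

def encode_bytes(value: bytes) -> str:
    # try utf-8 first; fall back to base64 when the bytes are not valid utf-8
    try:
        return value.decode("utf-8")
    except UnicodeDecodeError:
        return base64.b64encode(value).decode("ascii")

encode_bytes(b"hello")              # 'hello'
encode_bytes(b"\x89PNG\r\n\x1a\n")  # 'iVBORw0KGgo='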



Filter field
You can filter out item fields so that they are not exported and sent to Kafka:
KAFKA_EXPORT_FILTER = ["filtered_field"]
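
Putting the settings together, a complete settings.py section might look like this (broker addresses, topic, and field names are illustrative):
ITEM_PIPELINES = {
    "os_scrapy_kafka_pipeline.KafkaPipeline": 300,
}
KAFKA_PRODUCER_BROKERS = ["broker01.kafka:9092", "broker02.kafka:9092"]
KAFKA_PRODUCER_CONFIGS = {"client_id": "id01", "retries": 1}
KAFKA_PRODUCER_TOPIC = "topic01"
KAFKA_PRODUCER_LOGLEVEL = "WARNING"
KAFKA_PRODUCER_CLOSE_TIMEOUT = 10
KAFKA_VALUE_ENSURE_BASE64 = True
KAFKA_EXPORT_FILTER = ["filtered_field"]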



Dynamic Kafka Connection with item.meta


you can set topic, key, partition and brokers using item["meta"]


the item must have a meta member whose type is dict


options:
meta = {
    "kafka.topic": "topic01",
    "kafka.key": "key01",
    "kafka.partition": 1,
    "kafka.brokers": "broker01.kafka:9092,broker02.kafka:9092",
}
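
For example, a spider can attach these options to each item it yields (a dict item is assumed here; everything except the meta member is hypothetical):
from scrapy import Spider

class MetaExampleSpider(Spider):
    name = "meta_example"
    start_urls = ["http://example.com"]

    def parse(self, response):
        # the "meta" dict carries the per-item Kafka routing options
        yield {
            "url": response.url,
            "meta": {
                "kafka.topic": "topic01",
                "kafka.key": "key01",
                "kafka.partition": 1,
                "kafka.brokers": "broker01.kafka:9092,broker02.kafka:9092",
            },
        }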



Storage Format
Items are sent to Kafka in JSON format; bytes are encoded to base64.
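For instance, an item with a url string and a non-utf-8 body might arrive in Kafka as (field names hypothetical):
{"url": "http://example.com", "body": "iVBORw0KGgo="}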
Unit Tests
sh scripts/test.sh

License
MIT licensed.
