Last updated:
0 purchases
kafkabr
Dart Kafka #
Kafka client library written in Dart.
Current status #
This library is a work-in-progress.
Currently all the updates are happening in kafka-0.10 branch:
Support for Kafka 0.10 APIs including Group Membership API
Implementation of HighLevelConsumer capable of automatic load-balancing
and re-distribution of topics/partitions in case of failures.
Better testing framework.
Isolate-based distribution of consumer group members for better utilization
of system resources.
Master branch currently targets 0.8.x versions of Kafka server.
Things that are not supported yet. #
Snappy compression.
Installation #
There is no Pub package yet, but it will be published as soon as APIs are
stable enough.
For now you can use git dependency in your pubspec.yaml:
dependencies:
kafka:
git: https://github.com/armando-couto/kafkabr.git
copied to clipboard
And then import it as usual:
import 'package:kafkabr/kafka.dart';
copied to clipboard
Features #
This library provides several high-level API objects to interact with Kafka:
KafkaSession - responsible for managing connections to Kafka brokers and
coordinating all requests. Also provides access to metadata information.
Producer - publishes messages to Kafka topics
Consumer - consumes messages from Kafka topics and stores it's state (current
offsets). Leverages ConsumerMetadata API via ConsumerGroup.
Fetcher - consumes messages from Kafka without storing state.
OffsetMaster - provides convenience on top of Offset API allowing to easily
retrieve earliest and latest offsets of particular topic-partitions.
ConsumerGroup - provides convenience on top of Consumer Metadata API to easily
fetch or commit consumer offsets.
Producer #
Simple implementation of Kafka producer. Supports auto-detection of leaders for
topic-partitions and creates separate ProduceRequests for each broker.
Requests are sent in parallel and all responses are aggregated in special
ProduceResult object.
// file:produce.dart
import 'dart:io';
import 'package:kafkabr/kafka.dart';
main(List<String> arguments) async {
var host = new ContactPoint('127.0.0.1', 9092);
var session = new KafkaSession([host]);
var producer = new Producer(session, 1, 1000);
var result = await producer.produce([
new ProduceEnvelope('topicName', 0, [new Message('msgForPartition0'.codeUnits)]),
new ProduceEnvelope('topicName', 1, [new Message('msgForPartition1'.codeUnits)])
]);
print(result.hasErrors);
print(result.offsets);
session.close(); // make sure to always close the session when the work is done.
}
copied to clipboard
Result:
$ dart produce.dart
$ false
$ {dartKafkaTest: {0: 213075, 1: 201680}}
copied to clipboard
Consumer #
High-level implementation of Kafka consumer which stores it's state using
Kafka's ConsumerMetadata API.
If you don't want to keep state of consumed offsets take a look at Fetcher
which was designed specifically for this use case.
Consumer returns messages as a Stream, so all standard stream operations
should be applicable. However Kafka topics are ordered streams of messages
with sequential offsets. Consumer implementation allows to preserve order of
messages received from server. For this purpose all messages are wrapped in
special MessageEnvelope object with following methods:
/// Signals to consumer that message has been processed and it's offset can
/// be committed.
void commit(String metadata);
/// Signals that message has been processed and we are ready for
/// the next one. Offset of this message will **not** be committed.
void ack();
/// Signals to consumer to cancel any further deliveries and close the stream.
void cancel();
copied to clipboard
One must call commit() or ack() for each processed message, otherwise
Consumer won't send the next message to the stream.
Simplest example of a consumer:
import 'dart:io';
import 'dart:async';
import 'package:kafkabr/kafka.dart';
void main(List<String> arguments) async {
var host = new ContactPoint('127.0.0.1', 9092);
var session = new KafkaSession([host]);
var group = new ConsumerGroup(session, 'consumerGroupName');
var topics = {
'topicName': [0, 1] // list of partitions to consume from.
};
var consumer = new Consumer(session, group, topics, 100, 1);
await for (MessageEnvelope envelope in consumer.consume(limit: 3)) {
// Assuming that messages were produces by Producer from previous example.
var value = new String.fromCharCodes(envelope.message.value);
print('Got message: ${envelope.offset}, ${value}');
envelope.commit('metadata'); // Important.
}
session.close(); // make sure to always close the session when the work is done.
}
copied to clipboard
It is also possible to consume messages in batches for improved efficiency:
import 'dart:io';
import 'dart:async';
import 'package:kafkabr/kafka.dart';
void main(List<String> arguments) async {
var host = new ContactPoint('127.0.0.1', 9092);
var session = new KafkaSession([host]);
var group = new ConsumerGroup(session, 'consumerGroupName');
var topics = {
'topicName': [0, 1] // list of partitions to consume from.
};
var consumer = new Consumer(session, group, topics, 100, 1);
await for (BatchEnvelope batch in consumer.batchConsume(20)) {
batch.items.forEach((MessageEnvelope envelope) {
// use envelope as usual
});
batch.commit('metadata'); // use batch control methods instead of individual messages.
}
session.close(); // make sure to always close the session when the work is done.
}
copied to clipboard
Consumer offset reset strategy #
Due to the fact that Kafka topics can be configured to delete old messages
periodically, it is possible that your consumer offset may become invalid (
just because there is no such message/offset in Kafka topic anymore).
In such cases Consumer provides configurable strategy with following options:
OffsetOutOfRangeBehavior.throwError
OffsetOutOfRangeBehavior.resetToEarliest (default)
OffsetOutOfRangeBehavior.resetToLatest
By default if it gets OffsetOutOfRange server error it will reset it's offsets
to earliest available in the consumed topic and partitions, which essentially
means consuming all available messages from the beginning.
To modify this behavior simply set onOffsetOutOfRange property of consumer to
one of the above values:
var consumer = new Consumer(session, group, topics, 100, 1);
consumer.onOffsetOutOfRange = OffsetOutOfRangeBehavior.throwError;
copied to clipboard
Supported protocol versions #
Current version targets version 1.1.1 of the Kafka protocol. There is no plans
to support earlier versions.
License #
BSD-2
Publish #
dart pub publish --dry-run
dart pub publish
copied to clipboard
For personal and professional use. You cannot resell or redistribute these repositories in their original state.
There are no reviews.