Oct 23rd, 2020 - written by Kimserey.
Last week we looked at how we could set up Kafka locally in Docker. Continuing along our Kafka series, we will look at how we can create a producer and a consumer using confluent-kafka-dotnet.
As a reminder of our post from last week, here is the Docker Compose file for our local setup:
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    build: .
    ports:
      - "9094:9094"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://kafka:9094
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://localhost:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_CREATE_TOPICS: "kimtopic:2:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
We then start it with docker-compose up -d. This will start a broker reachable on localhost:9094, with a topic kimtopic that has 2 partitions.
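As a quick sanity check, we can also query the broker metadata from .NET. Here is a minimal sketch, assuming the same confluent-kafka-dotnet package we add below is already installed:

using System;
using Confluent.Kafka;

// Minimal sketch: ask the broker for its metadata to confirm that
// kimtopic exists and has the expected number of partitions.
using var admin = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = "localhost:9094" }).Build();
var metadata = admin.GetMetadata("kimtopic", TimeSpan.FromSeconds(5));
var topic = metadata.Topics[0];
Console.WriteLine($"Topic '{topic.Topic}' has {topic.Partitions.Count} partition(s).");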
To create a producer, we start by adding the confluent-kafka-dotnet NuGet package. Then we can create a producer with the ProducerBuilder builder:
using var producer = new ProducerBuilder<int, string>(new ProducerConfig { BootstrapServers = "localhost:9094" }).Build();
The producer takes two type parameters: the key type and the value type. Here we define the key type as int and the value type as string. For the configuration of the producer, we specify the bootstrap server as localhost:9094, as that's the address we advertised.
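For common types like int and string the client falls back to its built-in serializers; if we want to be explicit, or later plug in custom ones, the builder exposes setters for them. A minimal sketch:

// Equivalent producer with the default serializers set explicitly;
// a custom ISerializer<T> can be plugged in the same way.
using var producer = new ProducerBuilder<int, string>(new ProducerConfig { BootstrapServers = "localhost:9094" })
    .SetKeySerializer(Serializers.Int32)
    .SetValueSerializer(Serializers.Utf8)
    .Build();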
Using this producer, we can then produce a message on the Kafka topic:
await producer.ProduceAsync("kimtopic", new Message<int, string> {Key = 1, Value = "hello world"});
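ProduceAsync awaits the broker acknowledgement of each message. For higher throughput, the client also exposes a fire-and-forget Produce overload taking a delivery handler; a sketch of that variant:

// Fire-and-forget variant: Produce queues the message locally and invokes
// the delivery handler once the broker acknowledges it.
producer.Produce("kimtopic", new Message<int, string> { Key = 1, Value = "hello world" }, report =>
{
    if (report.Error.IsError)
        Console.WriteLine($"Delivery failed: {report.Error.Reason}");
    else
        Console.WriteLine($"Delivered to {report.TopicPartitionOffset}");
});

// Block until all outstanding deliveries have completed (or the timeout elapses).
producer.Flush(TimeSpan.FromSeconds(10));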
Here is a producer with a while loop using Bogus to generate random content for the message:
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
using Bogus;

namespace KafkaProducer
{
    class Program
    {
        public static async Task Main(string[] args)
        {
            var faker = new Faker();
            using var p = new ProducerBuilder<Null, string>(new ProducerConfig { BootstrapServers = "localhost:9094" }).Build();

            while (true)
            {
                Console.WriteLine("Press any key to generate a random value.");
                Console.ReadKey();

                try
                {
                    // The key type is Null, so the message carries no key.
                    var dr = await p.ProduceAsync("kimtopic", new Message<Null, string> { Value = faker.Company.Bs() });
                    Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
                }
                catch (ProduceException<Null, string> e)
                {
                    Console.WriteLine($"Delivery failed: {e.Error.Reason}");
                }
            }
        }
    }
}
With this setup, we can use kafkacat to listen to the messages this producer adds to the topic:
kafkacat -C -b localhost:9094 -t kimtopic
Once we have the producer set up, we can move on to creating the consumer. Similarly to the producer, we use a consumer builder:
var conf = new ConsumerConfig
{
    GroupId = "test-consumer-group-1",
    BootstrapServers = "localhost:9094",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoOffsetStore = false
};

using var consumer = new ConsumerBuilder<int, string>(conf).Build();
In the configuration of the consumer, we specify:

- GroupId, which identifies the consumer group that this consumer joins,
- BootstrapServers, set to localhost:9094 as for the producer,
- AutoOffsetReset set to Earliest, which indicates that if the consumer restarts at a committed offset which is no longer valid, it should restart from the earliest offset of the partition,
- EnableAutoOffsetStore set to false; this is a special case where we do not want the offset to be stored right away after a message is delivered to the consumer, but rather we want to mark it as ready for commit once we have successfully processed the message. This is done by calling consumer.StoreOffset(consumerResult). (A fully manual alternative is sketched just below.)
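For comparison, if we wanted full manual control rather than relying on the auto-commit of stored offsets, we could disable auto-commit and commit explicitly after processing. A sketch under that assumption (manualConf and manualConsumer are illustrative names):

// Alternative: disable auto-commit entirely and commit synchronously
// after each message has been processed.
var manualConf = new ConsumerConfig
{
    GroupId = "test-consumer-group-1",
    BootstrapServers = "localhost:9094",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = false
};

using var manualConsumer = new ConsumerBuilder<int, string>(manualConf).Build();
manualConsumer.Subscribe("kimtopic");

var result = manualConsumer.Consume();
// ... process the message ...
manualConsumer.Commit(result); // blocking commit of the offset following this message

The trade-off is that an explicit commit costs a round trip to the broker on every message, which is why the stored-offset approach used in this post is the more common pattern.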
After having created the consumer, we can subscribe to the topic with:
consumer.Subscribe("kimtopic");
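Subscribe lets the consumer group coordinator decide which partitions this consumer reads from. If we ever needed to pin the consumer to a specific partition instead, Assign bypasses group management; a minimal sketch:

// Manual assignment: read partition 0 of kimtopic from the beginning,
// bypassing consumer-group partition balancing.
consumer.Assign(new TopicPartitionOffset("kimtopic", 0, Offset.Beginning));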
And once subscribed, we can consume from it:
var consumerResult = consumer.Consume();
Console.WriteLine($"Consumed message '{consumerResult.Value}' at: '{consumerResult.TopicPartitionOffset}'.");
consumer.StoreOffset(consumerResult);
Here is a complete consumer with a while loop to continue consuming the topic indefinitely:
public static void Main(string[] args)
{
    var conf = new ConsumerConfig
    {
        GroupId = "test-consumer-group-1",
        BootstrapServers = "localhost:9094",
        AutoOffsetReset = AutoOffsetReset.Earliest,
        EnableAutoCommit = true,
        EnableAutoOffsetStore = false
    };

    var consumerBuilder = new ConsumerBuilder<Ignore, string>(conf);
    using var c = consumerBuilder.Build();
    c.Subscribe("kimtopic");

    // Cancel the blocking Consume call on Ctrl+C so the consumer can close cleanly.
    var cts = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

    try
    {
        while (true)
        {
            try
            {
                var cr = c.Consume(cts.Token);
                Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                c.StoreOffset(cr);
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error occurred: {e.Error.Reason}");
            }
        }
    }
    catch (OperationCanceledException)
    {
        c.Close();
    }
}
Upon reception, we look at the message value and the topic partition offset:
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
Using this, we can demonstrate how partitions are reassigned. If we start a single consumer, it will receive messages from both partitions; when we start a second instance of the consumer, a reassignment occurs and each partition is assigned to a separate consumer.
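To observe these reassignments, the builder accepts handlers that fire whenever the coordinator hands partitions to, or takes them from, this consumer. A sketch reusing the conf from above (observedConsumer is an illustrative name):

// Log the partitions handed to or revoked from this consumer on every rebalance.
using var observedConsumer = new ConsumerBuilder<Ignore, string>(conf)
    .SetPartitionsAssignedHandler((c, partitions) =>
        Console.WriteLine($"Assigned: {string.Join(", ", partitions)}"))
    .SetPartitionsRevokedHandler((c, partitions) =>
        Console.WriteLine($"Revoked: {string.Join(", ", partitions)}"))
    .Build();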
In the same way as C#, Python has an equivalent library called kafka-python:
pip install kafka-python
which we can use to get started creating a producer and a consumer. We use KafkaProducer to create a producer:
from kafka import KafkaProducer
from kafka.errors import KafkaError
from faker import Faker
import json

producer = KafkaProducer(
    bootstrap_servers=["localhost:9094"],
    key_serializer=lambda m: json.dumps(m).encode("ascii"),
    value_serializer=lambda m: json.dumps(m).encode("ascii"),
)

fake = Faker()

for i in range(100):
    # Synchronously send to Kafka by blocking on the returned future
    producer.send("kimtopic", value={"value": fake.address()}, key=i).get()
and KafkaConsumer to create a consumer:
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    "kimtopic",
    group_id="my-group",
    bootstrap_servers=["localhost:9094"],
    key_deserializer=lambda m: json.loads(m.decode('ascii')),
    value_deserializer=lambda m: json.loads(m.decode('ascii')),
)

for message in consumer:
    print(
        "%s:%d:%d: key=%s value=%s"
        % (message.topic, message.partition, message.offset, message.key, message.value)
    )
The same configuration for the consumer and producer can be specified directly on the classes as keyword arguments. And we simply run the scripts with py consumer.py and py producer.py to start testing. This showcases how, even though Kafka is written in Java, the usage of it via the consumer and producer is entirely under the control of the user, and any language can be used provided that a library was built to interface with the broker.
And that concludes today’s post!
Today we looked at how we could leverage the local Kafka setup we created in our previous post. We looked at how to create a producer and the configuration involved in that, and we then looked at how to create a consumer and the configuration necessary for it. I hope you liked this post and I'll see you in the next one!