Nov 13th, 2020 - written by Kimserey
Offsets are a big part of Kafka. They indicate the position of a record in the partition of a topic, allowing consumers to know what to read and from where to start. In today's post we will look at how consumers manage offsets, store and commit them, and how brokers maintain them so that a consumer group can survive the failure of a consumer.
We start first by setting up Kafka in Docker:
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    build: .
    ports:
      - "9094:9094"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://kafka:9094
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://localhost:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_CREATE_TOPICS: "kimtopic:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
This is a simple single-broker setup, with a topic kimtopic containing a single partition. We start it via docker-compose:
docker-compose up -d
For this post, we'll reuse our previous consumer written in C# (.NET):
public static void Main(string[] args)
{
    var conf = new ConsumerConfig
    {
        GroupId = "my-group-1",
        BootstrapServers = "localhost:9094",
        AutoOffsetReset = AutoOffsetReset.Earliest,
        EnableAutoCommit = false,
        EnableAutoOffsetStore = false
    };

    var consumerBuilder = new ConsumerBuilder<Ignore, string>(conf);
    using var c = consumerBuilder.Build();
    c.Subscribe("kimtopic");

    try
    {
        while (true)
        {
            try
            {
                var cr = c.Consume();
                Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                // c.StoreOffset(cr);
                // c.Commit();
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error occurred: {e.Error.Reason}");
            }
        }
    }
    catch (OperationCanceledException)
    {
        c.Close();
    }
}
This consumer starts an infinite loop which pulls messages out of the topic and writes them to the console. In our consumer configuration, we have disabled both the automatic storing of offsets and the automatic committing of offsets in order to illustrate how offsets are used:
EnableAutoCommit = false,
EnableAutoOffsetStore = false
Now if we start our consumer and inspect the broker (via the Docker CLI, as explained in our previous post):
/ # kafka-consumer-groups.sh --bootstrap-server kafka:9092 --all-groups all-topics --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-group-1 kimtopic 0 - 0 - rdkafka-1c17f9cd-d55d-4510-8e17-3008f3a9a608 /172.21.0.1 rdkafka
We have one topic kimtopic with a single partition 0, one consumer group, and a single consumer within that group. The consumer is given the member ID rdkafka-1c17f9cd-d55d-4510-8e17-3008f3a9a608 by the broker.
We can see that there are no messages in the topic, hence the current-offset is unknown, the lag is also unknown, and the log-end-offset is at 0.
Now if we start a producer with kafkacat -P and add one message:
❯ kafkacat -P -b localhost:9094 -t kimtopic
hello world!
We then look at the consumer group:
/ # kafka-consumer-groups.sh --bootstrap-server kafka:9092 --all-groups all-topics --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-group-1 kimtopic 0 - 1 - rdkafka-71367ca0-7364-49a6-a3ce-a99796a9bb6f /172.21.0.1 rdkafka
As we continue to produce messages, the log-end-offset will continue to increase and we should see the messages being consumed by our consumer, but the current-offset and the lag will remain unknown because we have disabled auto-commit, hence no offsets are ever committed.
The side effect of not committing any offset is that if we restart our consumer, the AutoOffsetReset configuration will be checked and, based on whether it is set to Earliest or Latest, the consumer will read back from the beginning or start reading from the end of the topic. So even though we aren't committing our current offset back to the broker, the consumer keeps track of where it is in memory and pulls messages from the last message it handled.
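As a minimal sketch, assuming a partition for which the group has no committed offset, the two behaviors look like this on the config:

AutoOffsetReset = AutoOffsetReset.Earliest // replay the partition from the beginning
// AutoOffsetReset = AutoOffsetReset.Latest // only read messages produced from now on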
But there is a point where we want to commit offsets to the broker in order to survive failures. When a consumer is no longer available, the broker will rebalance its partitions onto the other consumers that are part of the same consumer group. If no offset has been committed, the new consumer receiving the rebalanced partition will start consuming it based on the AutoOffsetReset setting. On the other hand, if the last offset was committed, it will be seen under current-offset, and the new consumer will start back from where the failed consumer left off (or more precisely, from the last committed offset).
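To observe a rebalance as it happens, we can hook handlers onto the consumer builder. A minimal sketch, assuming Confluent.Kafka's SetPartitionsAssignedHandler and SetPartitionsRevokedHandler, logging partitions as they move between consumers:

var consumerBuilder = new ConsumerBuilder<Ignore, string>(conf)
    // Called when the broker assigns partitions to this consumer,
    // e.g. on startup or after another member leaves the group.
    .SetPartitionsAssignedHandler((c, partitions) =>
        Console.WriteLine($"Assigned: {string.Join(", ", partitions)}"))
    // Called when partitions are revoked from this consumer,
    // e.g. when a new member joins and the group rebalances.
    .SetPartitionsRevokedHandler((c, partitions) =>
        Console.WriteLine($"Revoked: {string.Join(", ", partitions)}"));

Starting a second consumer with the same GroupId should then trigger a revocation followed by a fresh assignment.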
Now that we understand the purpose of committing offsets, we can look at how to do so. The most common (and default) way of committing offsets is via auto-commit. To re-enable auto-commit, we either remove EnableAutoCommit (default is true) or explicitly mark it as true, and remove EnableAutoOffsetStore (default is true) or explicitly mark it as true:
EnableAutoCommit = true,
EnableAutoOffsetStore = true
The consumer will not commit offsets after each consumption, as that would be too inefficient. Instead, it stores the last offsets of handled messages per partition and periodically commits them in the background. This is the reason why EnableAutoOffsetStore has to be true, to mark offsets as "ready to be committed", and EnableAutoCommit has to be true, to enable the periodic background commit.
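The interval of that periodic commit can itself be tuned. A minimal sketch, assuming the AutoCommitIntervalMs setting on Confluent.Kafka's ConsumerConfig (which defaults to 5000 ms):

var conf = new ConsumerConfig
{
    GroupId = "my-group-1",
    BootstrapServers = "localhost:9094",
    EnableAutoCommit = true,      // enable the periodic background commit
    EnableAutoOffsetStore = true, // offsets stored automatically upon delivery
    AutoCommitIntervalMs = 1000   // commit the stored offsets every second
};

A shorter interval reduces the number of messages replayed after a failure, at the cost of more frequent commit traffic to the broker.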
Once we have re-enabled both, we can see the current-offset and the lag change as we produce more messages:
/ # kafka-consumer-groups.sh --bootstrap-server kafka:9092 --all-groups all-topics --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-group-1 kimtopic 0 11 11 0 rdkafka-cb5b1a81-77b2-4cf6-8506-aff53e7032fc /172.21.0.1 rdkafka
As offsets are made available to commit as soon as messages are delivered to the consumer, a failure within the consumer might still end up committing the offset of a message that was never fully processed. In order to prevent that, we can delay storing the offset until the end of our processing by disabling the auto-offset store and manually marking offsets using StoreOffset:
public static void Main(string[] args)
{
    var conf = new ConsumerConfig
    {
        GroupId = "my-group-1",
        BootstrapServers = "localhost:9094",
        AutoOffsetReset = AutoOffsetReset.Earliest,
        EnableAutoCommit = true,
        EnableAutoOffsetStore = false
    };

    var consumerBuilder = new ConsumerBuilder<Ignore, string>(conf);
    using var c = consumerBuilder.Build();
    c.Subscribe("kimtopic");

    try
    {
        while (true)
        {
            try
            {
                var cr = c.Consume();
                Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                c.StoreOffset(cr); // here we manually store offsets as we have disabled `EnableAutoOffsetStore`.
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error occurred: {e.Error.Reason}");
            }
        }
    }
    catch (OperationCanceledException)
    {
        c.Close();
    }
}
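For full control, we can go one step further and disable auto-commit entirely, committing synchronously ourselves, as the commented-out c.Commit() in the first example hinted at. A minimal sketch of the loop body under EnableAutoCommit = false:

var cr = c.Consume();
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
// Synchronously commit this message's offset back to the broker.
// Safer, but slower: each commit blocks on a network round trip.
c.Commit(cr);

This trades throughput for a stronger guarantee, since no offset is committed before the message has been fully handled.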
So far we've seen what offsets represent and how to commit them, and we have been inspecting the current state of our consumer group using kafka-consumer-groups.sh. Another important aspect is figuring out when our consumer has reached the end of the partition, meaning it has caught up with all the messages. The way a consumer can figure this out is via the PARTITION_EOF event.
In order to enable PARTITION_EOF, we need to set EnablePartitionEof on the config:
EnablePartitionEof = true
and check the IsPartitionEOF flag on the consume result:
public static void Main(string[] args)
{
    var conf = new ConsumerConfig
    {
        GroupId = "my-group-1",
        BootstrapServers = "localhost:9094",
        AutoOffsetReset = AutoOffsetReset.Earliest,
        EnableAutoCommit = true,
        EnableAutoOffsetStore = false,
        EnablePartitionEof = true
    };

    var consumerBuilder = new ConsumerBuilder<Ignore, string>(conf);
    using var c = consumerBuilder.Build();
    c.Subscribe("kimtopic");

    try
    {
        while (true)
        {
            try
            {
                var cr = c.Consume();

                if (cr.IsPartitionEOF)
                {
                    Console.WriteLine($"Consumer reached end of partition '{cr.TopicPartition}'");
                    continue;
                }

                Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                c.StoreOffset(cr); // here we manually store offsets as we have disabled `EnableAutoOffsetStore`.
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error occurred: {e.Error.Reason}");
            }
        }
    }
    catch (OperationCanceledException)
    {
        c.Close();
    }
}
This gives us an indication of the health of the consumer, as it is able to reach the end of the partition. Because the end of the partition is a moving target, the log line will be shown as many times as the target moves and the consumer catches up.
The other indication of the health of a consumer is the consumer lag. The consumer lag is computed as the difference between the LOG-END-OFFSET and the CURRENT-OFFSET, which corresponds to the number of messages ahead of the current position of the consumer; for example, with a log-end-offset of 11 and a current-offset of 8, the lag is 3 messages. The consumer lag is a good way to figure out how far behind the consumer is.
Another way to measure how far behind the consumer is, in terms of time rather than messages, is to use the timestamp of the message:
var lagInMilliseconds =
    (DateTime.UtcNow - cr.Message.Timestamp.UtcDateTime).TotalMilliseconds;
Console.WriteLine($"Consumed message '{cr.Message.Value}' at: '{cr.TopicPartitionOffset}' - lag '{lagInMilliseconds}'.");

Note that whether this timestamp is the producer's create time or the broker's log append time depends on the topic's message.timestamp.type configuration.
Instead of using the Docker CLI to interrogate the broker, we can also retrieve the stored offset, the committed offset, and the consumer lag directly from the consumer via statistics. To enable statistics, we add a statistics interval on the config, for example 5 seconds:
StatisticsIntervalMs = 5000
and add a handler on the consumer builder:
var consumerBuilder = new ConsumerBuilder<Null, string>(conf)
    .SetStatisticsHandler((c, json) =>
    {
        var stats = JsonConvert.DeserializeObject<ConsumerStatistics>(json);

        foreach (var (topic, topicStats) in stats.Topics)
        {
            foreach (var (partition, partitionStats) in topicStats.Partitions.Where(p => p.Key != "-1"))
            {
                // Only record partition metrics for the partitions assigned to this consumer.
                if (!c.Assignment.Any(
                    a => a.Topic == topic
                    && a.Partition.Value.ToString() == partition))
                    continue;

                Console.WriteLine($"[{DateTimeOffset.FromUnixTimeSeconds(stats.Time)}] "
                    + $"MemberId: {c.MemberId}\n"
                    + $"Topic: {topic} - Partition: {partition}\n"
                    + $"Lag {partitionStats.ConsumerLag}\n"
                    + $"Hi Offset {partitionStats.HiOffset}\n"
                    + $"Stored Offset {partitionStats.StoredOffset}\n"
                    + $"Committed Offset {partitionStats.CommittedOffset}\n");
            }
        }
    });
SetStatisticsHandler will emit the statistics in JSON format, which we can deserialize by following the documentation:
public class ConsumerStatistics
{
    [JsonProperty(PropertyName = "time")]
    public long Time { get; set; }

    [JsonProperty(PropertyName = "topics")]
    public Dictionary<string, TopicStatistics> Topics { get; set; }
}

public class TopicStatistics
{
    [JsonProperty(PropertyName = "partitions")]
    public Dictionary<string, PartitionStatistics> Partitions { get; set; }
}

public class PartitionStatistics
{
    [JsonProperty(PropertyName = "consumer_lag")]
    public long ConsumerLag { get; set; }

    [JsonProperty(PropertyName = "committed_offset")]
    public long CommittedOffset { get; set; }

    [JsonProperty(PropertyName = "stored_offset")]
    public long StoredOffset { get; set; }

    [JsonProperty(PropertyName = "hi_offset")]
    public long HiOffset { get; set; }
}
which we can then print to the console:
[11/01/2020 17:18:50 +00:00] MemberId: rdkafka-2a9e513e-65bc-4547-a88d-9f4c6602fc39
Topic: kimtopic - Partition: 0
Lag 0
Hi Offset 14
Stored Offset 14
Committed Offset 14
From the statistics, we look at the high watermark offset. The high watermark offset is the latest offset which has been replicated to all replicas of the partition. It is the last offset of the partition ready to be consumed, and it is the offset actually used to compute the lag (as opposed to the log end offset).
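We can also query the watermarks directly from within the consumer. A minimal sketch, assuming Confluent.Kafka's QueryWatermarkOffsets, which asks the broker for the low and high watermarks of a partition:

// Ask the broker for the low/high watermarks of the partition we just consumed from.
var watermarks = c.QueryWatermarkOffsets(cr.TopicPartition, TimeSpan.FromSeconds(5));
// Lag = high watermark minus the next offset we would consume (cr.Offset + 1).
var lag = watermarks.High.Value - (cr.Offset.Value + 1);
Console.WriteLine($"Low: {watermarks.Low}, High: {watermarks.High}, Lag: {lag}");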
And that concludes today’s post!
In today's post, we looked at what offsets mean in Kafka, starting with an explanation of how Kafka manages them. We then looked at the purpose of committing offsets and some variations of storing them. We completed the post by looking at how we can figure out where the consumer is at, using the partition EOF event, the consumer lag, and the broker statistics to monitor our consumer. I hope you liked this post and I'll see you in the next one!