Kafka Schema Registry With Avro

Oct 30th, 2020 - written by Kimserey

In previous posts, we have seen how to set up Kafka locally and how to write a producer and a consumer in dotnet. The topic on which the producer produces and the consumer consumes accepts messages of any type, so the producer and consumer need to agree on a contract to ensure that whatever is produced can be understood at consumption. To enforce that contract, it is common to use a Schema Registry. In this post, we will look at how to set up and use a schema registry, and how to create an Avro schema to validate the data being produced and consumed.

Docker Setup

We start by upgrading the Docker Compose file from our previous post with a schema registry:

version: '2'
services:

  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"

  kafka:
    build: .
    ports:
      - "9094:9094"
    environment:      
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://kafka:9094
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://localhost:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_CREATE_TOPICS: "payment:2:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  
  schemaregistry:
    image: confluentinc/cp-schema-registry
    depends_on:
      - zookeeper
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schemaregistry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "PLAINTEXT://kafka:9092"

For this post, we changed the name of the topic to payment so that it makes more sense for the rest of the post. On the schema registry, we expose the default port 8081 used to access the REST API, and we set SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS to point to the Kafka broker.

Once we run docker-compose up -d, we should be able to check that our registry is up with a GET request:

curl http://localhost:8081/subjects

In the schema registry, a schema can be specified for either the key or the value of a message. A subject is the name under which a schema is registered. The default naming strategy is {topic}-key for the schema of a topic's key, and {topic}-value for the schema of its value. This will be important when we create our schema.
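
We can also list the registered subjects directly from C# rather than with curl. Below is a minimal sketch, assuming the Confluent.SchemaRegistry package; it is equivalent to the curl call above:

using System;
using Confluent.SchemaRegistry;

// A sketch assuming the Confluent.SchemaRegistry package;
// equivalent to `curl http://localhost:8081/subjects`.
var config = new SchemaRegistryConfig { Url = "localhost:8081" };
using var registry = new CachedSchemaRegistryClient(config);

// Lists all registered subjects, e.g. payment-value once our schema is uploaded.
var subjects = await registry.GetAllSubjectsAsync();
Console.WriteLine(string.Join(", ", subjects));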

Avro Schema

Now that we have the schema registry, we can upload our first Avro schema Payment.V1.avsc.

{
  "type": "record",
  "namespace": "payment",
  "name": "Payment",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "price",
      "type": "double"
    }
  ]
}

An Avro schema is defined in JSON format and allows us to define the shape of the messages exchanged between producer and consumer. Once we have the schema, we can upload it to the registry. To do so, we have to embed it as a string within an object under a schema property:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{ "schema": "{ \"type\": \"record\", \"namespace\": \"payment\", \"name\": \"Payment\", \"fields\": [{ \"name\": \"name\", \"type\": \"string\" }, { \"name\": \"price\", \"type\": \"double\" }]}" }' \
    http://localhost:8081/subjects/payment-value/versions

We POST it to /subjects/payment-value/versions to create a new version, and we can then do a GET on /subjects/payment-value/versions/1 to retrieve it:

curl localhost:8081/subjects/payment-value/versions/1

{"subject":"payment-value","version":1,"id":2,"schema":"{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"payment\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"price\",\"type\":\"double\"}]}"}

By default, the schema registry ensures that any newly uploaded version of a schema is backward compatible with the previous version.
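
For example, a hypothetical Payment.V2.avsc adding a currency field with a default value would be accepted as backward compatible, while adding a mandatory field without a default would be rejected:

{
  "type": "record",
  "namespace": "payment",
  "name": "Payment",
  "fields": [
    { "name": "name", "type": "string" },
    { "name": "price", "type": "double" },
    { "name": "currency", "type": "string", "default": "USD" }
  ]
}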

Generate C# Class from Avro Schema

Now that we have uploaded our schema to the registry, we can generate a C# class from our original schema file, which we can then use in our code. We do so by installing the Confluent.Apache.Avro.AvroGen dotnet tool:

dotnet tool install -g Confluent.Apache.Avro.AvroGen

And then generate the class using:

avrogen -s Payment.V1.avsc .

which will produce the following class:

// ------------------------------------------------------------------------------
// <auto-generated>
//    Generated by avrogen, version 1.7.7.5
//    Changes to this file may cause incorrect behavior and will be lost if code
//    is regenerated
// </auto-generated>
// ------------------------------------------------------------------------------
namespace payment
{
	using System;
	using System.Collections.Generic;
	using System.Text;
	using global::Avro;
	using global::Avro.Specific;
	
	public partial class Payment : ISpecificRecord
	{
		public static Schema _SCHEMA = Schema.Parse("{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"payment\",\"fields\":[{\"name\":\"name\"," +
				"\"type\":\"string\"},{\"name\":\"price\",\"type\":\"double\"}]}");
		private string _name;
		private double _price;
		public virtual Schema Schema
		{
			get
			{
				return Payment._SCHEMA;
			}
		}
		public string name
		{
			get
			{
				return this._name;
			}
			set
			{
				this._name = value;
			}
		}
		public double price
		{
			get
			{
				return this._price;
			}
			set
			{
				this._price = value;
			}
		}
		public virtual object Get(int fieldPos)
		{
			switch (fieldPos)
			{
			case 0: return this.name;
			case 1: return this.price;
			default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
			};
		}
		public virtual void Put(int fieldPos, object fieldValue)
		{
			switch (fieldPos)
			{
			case 0: this.name = (System.String)fieldValue; break;
			case 1: this.price = (System.Double)fieldValue; break;
			default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
			};
		}
	}
}

We will then use this class to serialize and deserialize messages.

Use Avro Schema from Producer

For the producer and consumer, we install the package Confluent.SchemaRegistry.Serdes.Avro which provides serialization and deserialization to and from the Avro schema through AvroSerializer and AvroDeserializer.

var config = new[] {new KeyValuePair<string, string>("schema.registry.url", "localhost:8081")};
var registry = new CachedSchemaRegistryClient(config);
var serializer = new AvroSerializer<Payment>(registry);

For the serializer, we provide a cached schema registry client which pulls the schema and caches it, making sure that what is being produced respects the contract. We also specify Payment, the type we generated from the Avro contract, as the type to serialize.

var topic = "payment";
using var producer = new ProducerBuilder<int, byte[]>(new ProducerConfig {BootstrapServers = "localhost:9094"}).Build();

var faker = new Faker(); // faker assumed to come from the Bogus package
var payment = new Payment
{
    name = faker.Company.Bs(),
    price = faker.Random.Double(0, 1000)
};
var data =  await serializer.SerializeAsync(payment, new SerializationContext(MessageComponentType.Value, topic));

var deliveryResult = await producer.ProduceAsync(topic, new Message<int, byte[]> {Key = 1, Value = data});

The topic being payment, the serializer refers to the subject payment-value which we uploaded earlier.
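
This mapping comes from the default subject name strategy, which derives the subject from the topic name. As a sketch, recent versions of the package let us make this explicit through AvroSerializerConfig (availability of this option depends on the package version):

using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

// A sketch reusing the registry client created earlier; SubjectNameStrategy.Topic
// is the default and maps the payment topic to the payment-value subject for values.
var serializerConfig = new AvroSerializerConfig
{
    SubjectNameStrategy = SubjectNameStrategy.Topic
};
var serializer = new AvroSerializer<Payment>(registry, serializerConfig);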

Use Avro Schema from Consumer

On the consumer side, the deserializer also needs the registry.

var config = new[] {new KeyValuePair<string, string>("schema.registry.url", "localhost:8081")};
var registry = new CachedSchemaRegistryClient(config);
var deserializer = new AvroDeserializer<Payment>(registry);

We can then deserialize the messages coming from the producer:

var conf = new ConsumerConfig { BootstrapServers = "localhost:9094", GroupId = "payment-consumer" }; // GroupId is required; name assumed
var consumerBuilder = new ConsumerBuilder<int, byte[]>(conf);
var topic = "payment";

using var consumer = consumerBuilder.Build();
consumer.Subscribe(topic);

var cr = consumer.Consume();
var payment = await deserializer.DeserializeAsync(cr.Message.Value, false, new SerializationContext(MessageComponentType.Value, topic));

One thing to point out is that the schema registry does not prevent a producer from producing messages in the wrong format on the topic. For example, if we use kafkacat to produce a plain string message on the payment topic, nothing stops us:

❯ kafkacat -P -b localhost:9094 -t payment
test

It is therefore up to the producer and consumer owning the topic to ensure that the contracts are respected.
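
On the consumer side, one defensive option is to guard the deserialization. Below is a minimal sketch extending the consumer snippet above; the exact exception type thrown for an invalid payload depends on the library version, so we catch broadly:

try
{
    var payment = await deserializer.DeserializeAsync(cr.Message.Value, false, new SerializationContext(MessageComponentType.Value, topic));
    Console.WriteLine($"{payment.name}: {payment.price}");
}
catch (Exception ex)
{
    // The message does not respect the contract: skip it or route it to a dead letter topic.
    Console.WriteLine($"Invalid payload at offset {cr.Offset}: {ex.Message}");
}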

And that concludes today’s post!

Conclusion

In today’s post we looked into the Schema Registry for Kafka. We started by adding a schema registry to our local Kafka setup, then used Avro to create a schema, generated a C# class from it, and used it from a producer and a consumer. I hope you liked this post and I see you on the next one!

Kafka Posts

  1. Introduction to Kafkacat CLI for Kafka
  2. Local Kafka Docker Setup
  3. Kafka Consumer and Producer in dotnet
  4. Kafka Schema Registry with Avro
  5. Kafka Topics, Partitions and Consumer Groups
  6. Kafka Offsets and Statistics
  7. Kafka Log Compaction

