Oct 30th, 2020 - written by Kimserey
In previous posts, we have seen how to set up Kafka locally and how to write a producer and consumer in dotnet. The topic on which the producer produces and the consumer consumes accepts messages of any type, hence producer and consumer need to agree on a contract so that whatever is being produced can be understood at consumption. To enforce that contract, it is common to use a Schema Registry. In this post, we will look at how to set up and use a schema registry, and how to create an Avro schema to enforce the shape of produced and consumed data.
We start by upgrading our previous docker-compose file with a Schema Registry:
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    build: .
    ports:
      - "9094:9094"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://kafka:9094
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://localhost:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_CREATE_TOPICS: "payment:2:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  schemaregistry:
    image: confluentinc/cp-schema-registry
    depends_on:
      - zookeeper
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schemaregistry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "PLAINTEXT://kafka:9092"
For this post, we changed the name of the topic to payment so that it makes more sense for the rest of the post. On the schema registry, we expose the default port 8081 used to access the REST API, and SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS is specified to point to the Kafka broker.
Once we run docker-compose up -d, we should be able to check that our registry is up by doing a GET request:
curl http://localhost:8081/subjects
In the schema registry, a schema can be specified for either the key or the value of a topic. A subject is the name by which the schema can be identified. The default naming strategy is {topic}-key to define the schema for the key of a topic and {topic}-value for the value. This will be important when we create our schema.
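For example, for our payment topic, the value schema will live under the subject payment-value. As a quick sketch (not part of the original setup), the CachedSchemaRegistryClient from the Confluent.SchemaRegistry package, which we will use later in this post, can list the registered subjects:
var registry = new CachedSchemaRegistryClient(
    new[] { new KeyValuePair<string, string>("schema.registry.url", "localhost:8081") });
var subjects = await registry.GetAllSubjectsAsync();
// With the default naming strategy, we expect to find payment-value here once the schema is uploaded.
Console.WriteLine(string.Join(", ", subjects));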
Now that we have the schema registry, we can upload our first Avro schema, Payment.V1.avsc.
{
    "type": "record",
    "namespace": "payment",
    "name": "Payment",
    "fields": [
        {
            "name": "name",
            "type": "string"
        },
        {
            "name": "price",
            "type": "double"
        }
    ]
}
An Avro schema is defined in JSON format; it allows us to define the schema of the messages exchanged between producer and consumer. Once we have the schema, we can upload it into the registry. In order to do so, we have to embed it as a string within an object with a schema property:
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{ "schema": "{ \"type\": \"record\", \"namespace\": \"payment\", \"name\": \"Payment\", \"fields\": [{ \"name\": \"name\", \"type\": \"string\" }, { \"name\": \"price\", \"type\": \"double\" }]}" }' \
http://localhost:8081/subjects/payment-value/versions
We POST it to /subjects/payment-value/versions to create a new version. We can then do a GET on /subjects/payment-value/versions/1 to retrieve it:
curl localhost:8081/subjects/payment-value/versions/1
{"subject":"payment-value","version":1,"id":2,"schema":"{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"payment\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"price\",\"type\":\"double\"}]}"}
By default, the schema registry ensures that any new version of a schema has to be backward compatible with the previous version.
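To illustrate, and only as a sketch with a hypothetical schema change, the registry client exposes a compatibility check (reusing the registry client from above): adding a required description field with no default would not be backward compatible, so registering it as a new version of payment-value would be rejected.
// Hypothetical V2 of the schema adding a required field with no default value.
var candidate = @"{
    ""type"": ""record"", ""namespace"": ""payment"", ""name"": ""Payment"",
    ""fields"": [
        { ""name"": ""name"", ""type"": ""string"" },
        { ""name"": ""price"", ""type"": ""double"" },
        { ""name"": ""description"", ""type"": ""string"" }
    ]
}";
// Asks the registry whether the candidate schema is compatible with the subject's latest version.
var compatible = await registry.IsCompatibleAsync("payment-value", candidate);
Console.WriteLine(compatible); // False: not backward compatible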
Now that we have uploaded our schema onto the registry, we can generate a C# class from our original schema which we can then use in our code. We do so by installing the Confluent.Apache.Avro.AvroGen dotnet tool:
dotnet tool install -g Confluent.Apache.Avro.AvroGen
And then generate the class using:
avrogen -s Payment.V1.avsc .
which will produce the following class:
// ------------------------------------------------------------------------------
// <auto-generated>
// Generated by avrogen, version 1.7.7.5
// Changes to this file may cause incorrect behavior and will be lost if code
// is regenerated
// </auto-generated>
// ------------------------------------------------------------------------------
namespace payment
{
using System;
using System.Collections.Generic;
using System.Text;
using global::Avro;
using global::Avro.Specific;
public partial class Payment : ISpecificRecord
{
public static Schema _SCHEMA = Schema.Parse("{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"payment\",\"fields\":[{\"name\":\"name\"," +
"\"type\":\"string\"},{\"name\":\"price\",\"type\":\"double\"}]}");
private string _name;
private double _price;
public virtual Schema Schema
{
get
{
return Payment._SCHEMA;
}
}
public string name
{
get
{
return this._name;
}
set
{
this._name = value;
}
}
public double price
{
get
{
return this._price;
}
set
{
this._price = value;
}
}
public virtual object Get(int fieldPos)
{
switch (fieldPos)
{
case 0: return this.name;
case 1: return this.price;
default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
};
}
public virtual void Put(int fieldPos, object fieldValue)
{
switch (fieldPos)
{
case 0: this.name = (System.String)fieldValue; break;
case 1: this.price = (System.Double)fieldValue; break;
default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
};
}
}
}
We will then use this class to serialize and deserialize messages.
For the producer and consumer, we install the package Confluent.SchemaRegistry.Serdes.Avro which provides serialization and deserialization to and from the Avro schema using the AvroSerializer and AvroDeserializer.
var config = new[] {new KeyValuePair<string, string>("schema.registry.url", "localhost:8081")};
var registry = new CachedSchemaRegistryClient(config);
var serializer = new AvroSerializer<Payment>(registry);
For the serializer, we provide a cached schema registry client which will pull the schema and cache it, making sure the object being produced respects the contract. We also specify Payment, the type we generated from the Avro contract, as the type to serialize.
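Note that, equivalently (a small sketch rather than the snippet used here), the client also accepts the typed SchemaRegistryConfig, and the serializer takes an optional AvroSerializerConfig; setting AutoRegisterSchemas to false forces it to rely on the schema we uploaded instead of registering one itself:
var registry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = "localhost:8081" });
// AutoRegisterSchemas = false: the serializer looks up the existing schema and fails if the subject does not hold it.
var serializer = new AvroSerializer<Payment>(registry, new AvroSerializerConfig { AutoRegisterSchemas = false });
With the serializer in place, we can produce a message: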
var topic = "payment";
using var producer = new ProducerBuilder<int, byte[]>(new ProducerConfig { BootstrapServers = "localhost:9094" }).Build();
// faker is a Bogus Faker instance used to generate sample data
var faker = new Faker();
var payment = new Payment
{
    name = faker.Company.Bs(),
    price = faker.Random.Double(0, 1000)
};
var data = await serializer.SerializeAsync(payment, new SerializationContext(MessageComponentType.Value, topic));
var deliveryResult = await producer.ProduceAsync(topic, new Message<int, byte[]> { Key = 1, Value = data });
Here, the topic being payment, the serializer will refer to the subject payment-value which we have uploaded.
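As a side note, a sketch of an alternative wiring (not the approach used above): the serializer can also be plugged directly into the ProducerBuilder, so the message value is typed as Payment and the serialization happens inside ProduceAsync:
// Alternative: let the producer invoke the Avro serializer for us.
using var typedProducer = new ProducerBuilder<int, Payment>(new ProducerConfig { BootstrapServers = "localhost:9094" })
    .SetValueSerializer(new AvroSerializer<Payment>(registry))
    .Build();
await typedProducer.ProduceAsync(topic, new Message<int, Payment> { Key = 1, Value = payment });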
On the consumer side, the deserializer also needs the registry.
var config = new[] {new KeyValuePair<string, string>("schema.registry.url", "localhost:8081")};
var registry = new CachedSchemaRegistryClient(config);
var deserializer = new AvroDeserializer<Payment>(registry);
We can then deserialize the message from the producer:
// the consumer needs the bootstrap servers and a group id; the group name here is arbitrary
var conf = new ConsumerConfig { BootstrapServers = "localhost:9094", GroupId = "payment-consumer" };
var topic = "payment";
using var consumer = new ConsumerBuilder<int, byte[]>(conf).Build();
consumer.Subscribe(topic);
var cr = consumer.Consume();
var payment = await deserializer.DeserializeAsync(cr.Message.Value, false, new SerializationContext(MessageComponentType.Value, topic));
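Similarly, as a sketch only, the deserializer can be attached to the ConsumerBuilder; because Consume is synchronous, the async Avro deserializer has to be wrapped with the AsSyncOverAsync extension from the Confluent.Kafka.SyncOverAsync namespace:
// Alternative: the consumer deserializes straight into Payment (requires using Confluent.Kafka.SyncOverAsync).
using var typedConsumer = new ConsumerBuilder<int, Payment>(conf)
    .SetValueDeserializer(new AvroDeserializer<Payment>(registry).AsSyncOverAsync())
    .Build();
typedConsumer.Subscribe(topic);
Payment received = typedConsumer.Consume().Message.Value;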
One thing to point out is that the schema registry does not prevent a producer from producing the wrong format on the topic. For example, if we use kafkacat and produce a plain string message on the payment topic, nothing would stop us:
❯ kafkacat -P -b localhost:9094 -t payment
test
So it is up to the producer and the consumer, as owners of the topic, to ensure that the contract is respected.
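Since nothing guarantees that the bytes on the topic are valid Avro, a consumer that owns the contract may want to guard the deserialization; a small sketch, with a generic catch because the exact exception type depends on the library version:
try
{
    var payment = await deserializer.DeserializeAsync(
        cr.Message.Value, false, new SerializationContext(MessageComponentType.Value, topic));
    // handle the valid payment here
}
catch (Exception ex)
{
    // The payload was not produced with our Avro contract (e.g. the plain string sent with kafkacat).
    Console.WriteLine($"Skipping malformed message at {cr.TopicPartitionOffset}: {ex.Message}");
}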
And that concludes today’s post!
In today’s post we looked into the Schema Registry for Kafka. We started by adding a schema registry to our local Kafka setup, then we created an Avro schema, generated a C# class from it, and used it from a producer and a consumer. I hope you liked this post, and see you in the next one!