Don’t leave Apache Flink and Schema Registry alone
Thinking early about how our architecture will evolve can be tough, but it will only get harder and more complex if you decide to tackle this challenge at a later stage.
At first sight, redesigning the architecture to include a Schema Registry may take time that you could invest in developing another feature. But believe me: as the system keeps growing, it becomes necessary to have a catalog of the events, as well as a well-established definition of each one.
Schema Registry is a central repository with a RESTful interface for developers to define standard schemas and register applications to enable compatibility.
Schema Registry solves the challenge of sharing schemas between applications. It allows:
- Evolution of data in a simple way: consumers and producers can use different schema versions and still remain compatible, depending on the compatibility strategy used (backward or forward compatibility).
- Centralized repository/catalog: a central piece that manages the data models gives your architecture a data catalog that knows all the data types in the system.
- Schema enforcement: ensures that the published data matches the defined schema and that its constraints are respected.
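To make the first point concrete, here is a minimal sketch in Java of what backward and forward compatibility mean. It is a deliberately simplified model, not the check Schema Registry actually runs: a schema is reduced to a map from field name to whether that field declares a default value, and field types are ignored. The class and method names are hypothetical.

```java
import java.util.Map;

// Simplified model (an assumption for illustration): a schema is a map of
// field name -> true when the field declares a default value.
final class CompatibilitySketch {

    // A reader can decode data from a writer when every reader field that the
    // writer does not provide has a default value to fall back on.
    static boolean canRead(Map<String, Boolean> reader, Map<String, Boolean> writer) {
        for (Map.Entry<String, Boolean> field : reader.entrySet()) {
            boolean missingInWriter = !writer.containsKey(field.getKey());
            boolean hasDefault = field.getValue();
            if (missingInWriter && !hasDefault) {
                return false;
            }
        }
        return true;
    }

    // Backward: consumers on the NEW schema can read data written with the OLD one.
    static boolean isBackwardCompatible(Map<String, Boolean> newSchema, Map<String, Boolean> oldSchema) {
        return canRead(newSchema, oldSchema);
    }

    // Forward: consumers on the OLD schema can read data written with the NEW one.
    static boolean isForwardCompatible(Map<String, Boolean> newSchema, Map<String, Boolean> oldSchema) {
        return canRead(oldSchema, newSchema);
    }
}
```

In this model, adding a field with a default keeps a schema backward compatible, while removing a field that has no default breaks forward compatibility, because old consumers still expect it.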
This facilitates the implementation of other features such as auditing and GDPR compliance, and it is a natural step towards data quality in our system.
Apache Flink and Schema Registry
This integration uses Apache Kafka, Confluent Schema Registry, and Apache Flink.
Using Avro to define our model
The schema defines the event Record, composed of the fields name, age, date, and city. With the schema defined, it is time to start the Schema Registry; to do that, we will use the images provided in the Confluent repo.
If you want to better understand what is going on with your schema-registry, you can use the visual tool schema-registry-ui to see, edit, and evolve the schemas.
- Kafka Tool: GUI application for managing and using Apache Kafka clusters
- Kafkacat: Generic command line non-JVM Apache Kafka producer and consumer
I advise you to have at least Kafka Tool or kafkacat installed to fully understand the example and to test your wonderful Schema Registry.
Register the new schema
With the Avro format, the schema first needs to be registered before you start using it. You can register it manually through the REST API or use the Maven plugin, which simplifies the process and reduces complexity.
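For the manual route, a minimal sketch in Java of the REST call could look like the following. It only builds the request for the Schema Registry endpoint POST /subjects/{subject}/versions; the registry URL, the subject name, and the class and method names are assumptions for illustration, and the JSON escaping is intentionally minimal (a JSON library would be the safer choice in real code).

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper: builds (but does not send) a schema registration request.
final class RegisterSchemaRequest {

    // The registry expects the Avro schema as an escaped string inside a JSON
    // object of the form {"schema": "..."}.
    static String toRequestBody(String avroSchemaJson) {
        String escaped = avroSchemaJson
                .replace("\\", "\\\\")
                .replace("\"", "\\\"")
                .replace("\n", "\\n")
                .replace("\r", "\\r")
                .replace("\t", "\\t");
        return "{\"schema\": \"" + escaped + "\"}";
    }

    // registryUrl and subject are assumptions, e.g. "http://localhost:8081" and "people-value".
    static HttpRequest build(String registryUrl, String subject, String avroSchemaJson) {
        return HttpRequest.newBuilder()
                .uri(URI.create(registryUrl + "/subjects/" + subject + "/versions"))
                .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                .POST(HttpRequest.BodyPublishers.ofString(toRequestBody(avroSchemaJson)))
                .build();
    }
}
```

Sending the request with java.net.http.HttpClient (for example `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`) should return a small JSON document containing the id assigned to the schema.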
For a quick overview of the basic concepts of the Schema Registry, you can read the Confluent documentation. Assuming you already know what Maven is and how plugins work, what may still be unclear is the concept of subjects used in the plugin configuration.
A subject refers to the name under which the schema is registered.
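As an illustration, the sketch below shows in Java how the three subject naming strategies offered by Confluent derive a subject name. The class and method names are hypothetical; only the resulting naming patterns come from the Confluent documentation, where the default strategy names the subject after the topic.

```java
// Hypothetical helper mirroring the naming patterns of Confluent's
// subject name strategies.
final class SubjectNames {

    // TopicNameStrategy (the default): "<topic>-key" or "<topic>-value".
    static String topicNameStrategy(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    // RecordNameStrategy: the fully qualified name of the record.
    static String recordNameStrategy(String fullyQualifiedRecordName) {
        return fullyQualifiedRecordName;
    }

    // TopicRecordNameStrategy: "<topic>-<fully qualified record name>".
    static String topicRecordNameStrategy(String topic, String fullyQualifiedRecordName) {
        return topic + "-" + fullyQualifiedRecordName;
    }
}
```

With the default strategy, the value schema of a topic called people would therefore be registered under the subject people-value. The record-based strategies matter once a single topic carries more than one event type.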
After registering the schema, you can use your favorite tool to see what was created in the registry. By default, the schemas are stored in the topic _schemas.
To send events, we will use the utility kafka-avro-console-producer to insert valid and invalid events. The schema of each event sent is validated against the Schema Registry running at the endpoint specified in the command arguments.
The command takes the argument value.schema.id=1, which specifies the schema and must match the id of the schema you created.
Inserting a valid message:
Inserting an invalid message:
As the field date is required, the insertion will fail.
Build Flink Job
Apache Flink provides a built-in connector to read and write messages to Apache Kafka; the documentation explains how to integrate the connector into the project.
Besides the Kafka connector, which covers the requirement to consume and produce messages from and to the message log, we need a deserializer that reads data serialized in the Avro format. At least two exist: AvroDeserializationSchema and ConfluentRegistryAvroDeserializationSchema. Our example uses the second one, because it reads the schema registered in the Schema Registry and deserializes the message using the retrieved schema.
Note: deserializing records that do not match the retrieved schema will throw an exception.
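To see how a registry-aware deserializer can know which schema to fetch, it helps to look at the Confluent wire format: each message starts with a magic byte (0), followed by the schema id as a 4-byte big-endian integer, followed by the Avro binary payload. The sketch below decodes that header in plain Java. It is an illustration of the format, not the code Flink uses internally, and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper that splits a Confluent-framed message into its
// schema id and its Avro payload.
final class ConfluentWireFormat {

    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5; // 1 magic byte + 4 bytes of schema id

    // Returns the schema id stored in bytes 1..4 of the message.
    static int schemaId(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Message is too short to carry a schema id");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }

    // Returns the Avro-encoded bytes that follow the 5-byte header.
    static byte[] avroPayload(byte[] message) {
        schemaId(message); // validates the header first
        return Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
    }
}
```

With the id in hand, a deserializer can ask the registry for the writer schema and decode the remaining bytes with it.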
The consumer is simple to create if only one event type is published per topic:
Is it possible to define multiple event types in the same topic?
If you have a very rich system, and perhaps you are using Kafka for event sourcing, the need to have more than one type in a topic will arise.
Bonus: Flink Job reading multiple event types in a single topic
To cope with the requirement of multiple event types in a single topic, we slightly change the previous approach: the interface KafkaDeserializationSchema is implemented so that messages of several event types can be deserialized from the same topic.
After consuming and properly deserializing a valid event, the event types typeA and typeB will be transformed into our internal domain:
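As a rough sketch of such a conversion, written in plain Java without the Flink or Avro dependencies, the deserialized event is modelled here as a record name plus a map of fields. The DomainEvent class and the field names read from each type ("name" for typeA, "city" for typeB) are assumptions for illustration, not the fields of the real events.

```java
import java.util.Map;

// Hypothetical mapping from two external event types to one internal type.
final class DomainMapping {

    // Internal domain object shared by the rest of the job.
    static final class DomainEvent {
        final String origin;
        final String payload;

        DomainEvent(String origin, String payload) {
            this.origin = origin;
            this.payload = payload;
        }
    }

    // Dispatches on the record name of the deserialized event.
    static DomainEvent toDomain(String recordName, Map<String, Object> fields) {
        switch (recordName) {
            case "typeA":
                return new DomainEvent("typeA", String.valueOf(fields.get("name")));
            case "typeB":
                return new DomainEvent("typeB", String.valueOf(fields.get("city")));
            default:
                throw new IllegalArgumentException("Unsupported event type: " + recordName);
        }
    }
}
```

Failing loudly on unknown record names keeps unexpected event types from silently flowing into the rest of the job.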
After the conversion to our internal domain, transformations such as map, filter, reduce, and aggregations can be applied, depending on the requirements of the job.
This example shows the advantages of a well-defined system, which prevents producers and consumers from generating bad events and breaking your architecture. Building a catalog of events isn’t easy, but taking small steps now avoids future headaches.
To test the Flink job, you can produce messages using the same tooling as before, kafka-avro-console-producer, but now you need to identify the type of event you are inserting; otherwise, the message will fail to be produced.
The benefits of having a schema registry in our architecture are huge: it allows event types to evolve and lets us know exactly what’s going on in our system.
If you are interested in knowing more about the JSON schema and the options available to combine multiple event types, I advise you to take a look at the documentation to clear up any doubts you may have, which is normal if this is the first time you are using this definition.
All the source code can be found here