F[F[S]] Kafka! - Case study: The digital transformation of Santa’s logistical nightmare - Part 3 fs2-kafka
In this post we’ll go through our usage of FS2-Kafka when consuming behaviour reports. FS2-Kafka is a wrapper for Java’s Kafka library that provides a nice level of abstraction for working with Kafka in the Typelevel ecosystem. It uses Vulcan to create your Avro codecs to provide a pleasant development experience.
Some previous posts in the series:
We’ll chiefly be looking at
BehaviourReportConsumer.scala which is one of the two Kafka consumers we have in this project. The code demonstrates how to connect to a Kafka cluster, subscribe to a topic, consume and deserialise messages and then finally commit their offset. Let’s get started!
Starting from near the top of the class you can see we first instantiate a couple of
These depend on Vulcan codecs we’ve defined (we’ll get into this shortly) and schema registry settings to be initialised.
Our Vulcan codecs are located in
domain.scala and are derived using Vulcan-Generic.
These define how to convert between our case classes and avro and back again - as well as the avro schema. Alternatively, you can make your Vulcan codecs more resistant to bugs being accidentally introduced when refactoring code by handwriting them and specifying exactly which property names should be used when serialising these fields.
The schema registry settings are required so that our kafka publishers know where to register new updated schemas and kafka consumers know where to go and find the corresponding schema for a kafka message. (Avro messages contain the ID of the schema that they were serialized with)
Moving on, further down in
BehaviourReportConsumer.scala we create our consumer configuration - this uses the two serialisers we defined earlier to know how to deserialise the kafka record key and value.
ConsumerSettings need to be configured with
bootstrapServers so that the consumer knows how to connect to the kafka cluster.
We also specify an
AutoOffsetReset value so that in the event that this is the first consumer to connect (or more specifically, there are no offsets for the consumer group) it knows where to start consuming off the topic from.
In this case, we’ve specified that it should be from the
Earliest available message.
Finally, we configure it with a consumer group ID - this defines the ‘group’ of consumers that work together to consume kafka messages off of topics and the topic offsets are stored for the group. It impacts consumer partition balancing, fault tolerance and more.
Streaming from the consumer
Towards the end of
.consumeWith we’re going to:
- Create our consumer
- Specify what to do with the consumed messages
- Commit the record offsets after they’ve been processed.
As you can see in the snippet above, we first create a consumer resource and then subscribe to our behaviour report topic so that it looks for new messages.
We’ve used FS2’s
evalTap - this is a method on
Stream that lets us evaluate an
F[_] for its side effects and keeps hold of original value.
In the last step, we get the stream of records using
_.stream and tap each message with
processReportto save it before finally passing it through to
commitBatchWithin is a helper from FS2-Kafka which returns a
Pipe that takes care of committing your offsets in batches (of either max batch size or after the max interval elapses). It also takes care of ensuring that commits are sent in their original ordering.
An FS2 Pipe is a lambda function that describes how to transform one FS2 Stream into another FS2 Stream.
Some of you may be wondering why
consumeWithin returns a
Resource[IO, fs2.Stream[IO, Unit]] instead of a flattened Stream - which we could’ve achieved with
flatMap/flatten. This is because we’d like our application’s bootstrapper and tests to be able to identify when Consumers have been allocated/started - which would be difficult with a flattened Stream.
We will revise our integration tests approach in a separate post, however, we wanted to show how to publish messages to Kafka.
Have a look at
NaughtyNiceReportSpec.scala. We’ll start by looking at how we configure and use our producer.
We first set up our schema registry configuration for avro and our serialisers just like we did earlier in the consumer example.
Then, we need to create a
ProducerSettings instance to capture these serialisers and configure how to connect to the Kafka cluster.
Finally, we create our
Resource[IO, KafkaProducer] using
ProducerSettings - ready to be used to produce messages.
If you go further down the file you’ll see we’re using an FS2-Kafka method called
produceOne_ to publish a
ProducerRecord, the return type is
F[F[Unit]]. There’s a lot to unpack here.
There are a handful of methods available that can be used to produce messages but we’re using
- We’re not interested in the metadata returned when the record is published (hence the trailing
_in the method name)
- We only want to produce a single record (hence
This method takes a
ProducerRecord as an argument - this type captures the key and value to be produced and the topic to be published to.
Finally, let’s break down the
F[F[Unit]] response type:
- The outer
F[_]here captures the effect of submitting the record to our producer’s buffer
- The inner
F[_]blocks on acknowledgement of the record being received by the kafka cluster.
We’re flattening these two effects in our test case because we just want to make sure that the records are published - but in practice blocking on the inner
F[_] one at a time will yield suboptimal performance. It is recommended that multiple inner effects are evaluated in parallel.
As we reach the end of this post, we reflect on how hard it was to follow the code to connect to a Kafka cluster, subscribe to a topic, consume and deserialise messages and then commit their offset. We hope that it was simpler than you expected (can you let us know if not? and what you did think about it). This is only possible because we are leveraging the powerful constructs provided by the Kafka Java library, FS2, cats effects, etc, and of course functional programming. These tools are supposed to make our lives as programmers easier and simpler, they should make it harder for us to make mistakes. As usual, we would really like to know what you learned from this, or whatever questions you might have after reading this post so that we can make it better.
You can now read Part 4: http4s .
You made it all the way to the end!! wooo! Here is a picture of us! (the authors of this post!!)