F[F[S]] Kafka! - Case study: The digital transformation of Santa’s logistical nightmare - Part 3 fs2-kafka

In this post we’ll go through our usage of FS2-Kafka when consuming behaviour reports. FS2-Kafka is a wrapper for Java’s Kafka library that provides a nice level of abstraction for working with Kafka in the Typelevel ecosystem. It integrates with Vulcan for Avro codecs, which makes for a pleasant development experience.

Consumer

We’ll chiefly be looking at BehaviourReportConsumer.scala, which is one of the two Kafka consumers we have in this project. The code demonstrates how to connect to a Kafka cluster, subscribe to a topic, consume and deserialise messages and then finally commit their offsets. Let’s get started!

Deserialisers

Starting from near the top of the class you can see we first instantiate a couple of RecordDeserializer objects. To be initialised, these need the Vulcan codecs we’ve defined (we’ll get into this shortly) and the schema registry settings.

// Key and value deserialisers, built from the Vulcan codecs and the schema registry settings
implicit val fullNameDeserializer: RecordDeserializer[IO, FullName] =
  avroDeserializer[FullName].using(config.avroSettings)

implicit val behaviourReportDeserializer: RecordDeserializer[IO, BehaviourReport] =
  avroDeserializer[BehaviourReport].using(config.avroSettings)

Our Vulcan codecs are located in domain.scala and are derived using Vulcan-Generic. These define how to convert our case classes to Avro and back again, and they also define the Avro schema. Alternatively, you can make your Vulcan codecs more resistant to bugs being accidentally introduced when refactoring code by handwriting them and specifying exactly which property names should be used when serialising these fields.
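
To make that last point a little more concrete, here’s a minimal sketch of what a handwritten codec could look like using Vulcan’s Codec.record. The field names (firstName, lastName) and the namespace are assumptions made purely for illustration; the real FullName in domain.scala may well look different.

```scala
import cats.syntax.all._
import vulcan.Codec

// Hypothetical shape of FullName: the field names here are assumed, not taken from domain.scala.
final case class FullName(firstName: String, lastName: String)

object FullName {
  // The Avro field names are spelled out as string literals, so renaming a
  // case class property during a refactor does not silently change the schema.
  implicit val codec: Codec[FullName] =
    Codec.record[FullName]("FullName", "com.example.santa") { field => // assumed namespace
      (
        field("firstName", _.firstName),
        field("lastName", _.lastName)
      ).mapN(FullName(_, _))
    }
}
```

The trade-off is a bit more boilerplate than the derived version, in exchange for a schema that only changes when you edit the codec on purpose.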

The schema registry settings are required so that our Kafka publishers know where to register new or updated schemas, and so that Kafka consumers know where to find the corresponding schema for a Kafka message. (Avro messages contain the ID of the schema that they were serialised with.)
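
As a rough illustration of that last remark: assuming the standard Confluent wire format (a single zero ‘magic’ byte, then a 4-byte big-endian schema ID, then the Avro payload), the ID could be read off a raw message like this. WireFormat and schemaId are hypothetical names of ours and not part of FS2-Kafka; in practice the Avro deserialiser does this for you.

```scala
import java.nio.ByteBuffer

object WireFormat {
  // Assumes the Confluent wire format: byte 0 is the magic byte (0),
  // bytes 1 to 4 hold the schema ID as a big-endian Int, the rest is Avro.
  def schemaId(message: Array[Byte]): Option[Int] =
    if (message.length >= 5 && message(0) == 0)
      Some(ByteBuffer.wrap(message, 1, 4).getInt) // ByteBuffer is big-endian by default
    else
      None // too short, or not in the expected format
}
```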

Configuration

Moving on, further down in BehaviourReportConsumer.scala we create our consumer configuration. This uses the two deserialisers we defined earlier to know how to deserialise the Kafka record key and value.

private val behaviourConsumerSettings = 
  ConsumerSettings[IO, FullName, BehaviourReport]
    .withAutoOffsetReset(AutoOffsetReset.Earliest)
    .withBootstrapServers(config.bootstrapServers)
    .withGroupId("Rudolph")

The ConsumerSettings need to be configured with bootstrapServers so that the consumer knows how to connect to the Kafka cluster. We also specify an AutoOffsetReset value so that when there are no committed offsets for the consumer group (for example, when this is the first consumer to connect) it knows where in the topic to start consuming from. In this case, we’ve specified that it should start from the Earliest available message.

Finally, we configure it with a consumer group ID. This defines the ‘group’ of consumers that work together to consume Kafka messages from topics, and topic offsets are stored per group. It affects consumer partition balancing, fault tolerance and more.

Streaming from the consumer

Towards the end of BehaviourReportConsumer.scala in .consumeWith we’re going to:

  • Create our consumer
  • Specify what to do with the consumed messages
  • Commit the record offsets after they’ve been processed.

def consumeWith(
    processReport: (FullName, BehaviourReport) => IO[Unit]
): Resource[IO, fs2.Stream[IO, Unit]] =
  KafkaConsumer
    .resource(behaviourConsumerSettings)
    .evalTap(_.subscribeTo(BehaviourReportTopic)) // subscribe once the consumer exists
    .map(
      _.stream
        .evalTap(r => processReport(r.record.key, r.record.value)) // process each record
        .map(_.offset)
        .through(commitBatchWithin(500, 15.seconds)) // commit offsets in batches
    )

As you can see in the snippet above, we first create a consumer resource and then subscribe to our behaviour report topic so that it looks for new messages. We’ve used evalTap for this, and again further down on the stream of records: it’s available on both Cats Effect’s Resource and FS2’s Stream, and it lets us evaluate an F[_] for its side effects while keeping hold of the original value.

In the last step, we get the stream of records using _.stream and tap each message with processReport to save it, before finally passing its offset through to commitBatchWithin.

commitBatchWithin is a helper from FS2-Kafka which returns a Pipe that takes care of committing your offsets in batches (whenever either the max batch size is reached or the max interval elapses). It also ensures that commits are sent in their original order. An FS2 Pipe is simply a function that describes how to transform one FS2 Stream into another FS2 Stream.
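
If Pipe is new to you, here’s a tiny sketch of one that involves no Kafka at all. It groups elements into batches of up to three and emits each batch’s size, which loosely mimics the size-based half of batching. This is only an illustration of the Pipe type and is not meant to show how commitBatchWithin is actually implemented; PipeExample and batchSizes are names we made up.

```scala
import fs2.{Pipe, Pure, Stream}

object PipeExample {
  // Pipe[F, I, O] is a type alias for Stream[F, I] => Stream[F, O],
  // so an ordinary function literal is all we need.
  val batchSizes: Pipe[Pure, Int, Int] =
    _.chunkN(3).map(_.size)

  // Seven elements become batches of 3, 3 and 1.
  val result: List[Int] =
    Stream.range(0, 7).through(batchSizes).toList
}
```

Because a Pipe is just a function, .through(pipe) is the same as applying the pipe to the stream directly.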

Some of you may be wondering why consumeWith returns a Resource[IO, fs2.Stream[IO, Unit]] instead of a flattened Stream, which we could’ve achieved with Stream.resource and flatMap/flatten. This is because we’d like our application’s bootstrapper and tests to be able to identify when consumers have been allocated/started, which would be difficult with a flattened Stream.
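
Here’s a minimal sketch of why that shape is handy, using a stand-in for consumeWith that involves no Kafka and simply logs lifecycle events. fakeConsumer, run and the log messages are all hypothetical; the point is only that everything inside use runs after the consumer has been allocated and before the stream starts.

```scala
import cats.effect.{IO, Resource}
import fs2.Stream

object ConsumerLifecycle {
  // Hypothetical stand-in for consumeWith: acquiring the Resource plays the
  // role of creating and subscribing the consumer.
  def fakeConsumer(log: String => IO[Unit]): Resource[IO, Stream[IO, Unit]] =
    Resource
      .make(log("consumer allocated"))(_ => log("consumer released"))
      .map(_ => Stream.emits(List("a", "b")).evalMap(m => log(s"processed $m")))

  // Inside `use` the consumer is known to be allocated, so a bootstrapper or
  // a test can react (here: log "ready") before the stream is run.
  def run(log: String => IO[Unit]): IO[Unit] =
    fakeConsumer(log).use { stream =>
      log("ready") *> stream.compile.drain
    }
}
```

With a flattened Stream there would be no such hook between allocation and consumption.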

Producer

We will go over our approach to integration tests in a separate post; however, we wanted to show how to publish messages to Kafka here. Have a look at NaughtyNiceReportSpec.scala. We’ll start by looking at how we configure and use our producer.

We first set up our schema registry configuration for Avro and our serialisers, just like we did earlier in the consumer example.

val avroSettings = AvroSettings(
  SchemaRegistryClientSettings[IO](s"http://localhost:$registryPort")
)

Then, we need to create a ProducerSettings instance to capture these serialisers and configure how to connect to the Kafka cluster.

val behaviourReportProducer = 
  ProducerSettings[IO, FullName, BehaviourReport].withBootstrapServers(
    kafkaContainer.bootstrapServers
  )

Finally, we create our Resource[IO, KafkaProducer] using ProducerSettings - ready to be used to produce messages.

KafkaProducer.resource(behaviourReportProducer)

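If you go further down the file you’ll see we’re using an FS2-Kafka method called produceOne_ to publish a ProducerRecord. Its return type is F[F[Unit]], and there’s a lot to unpack here. (produceOne_ is a method on KafkaProducer, so the receiver in the snippet below has to be the producer obtained from the resource above rather than the ProducerSettings value.)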

behaviourReportProducer.produceOne_(
  ProducerRecord(
    BehaviourReportTopic,
    MegachusFullName,
    BehaviourReport(ExpectedScore)
  )
).flatten

There are a handful of methods available that can be used to produce messages but we’re using produceOne_ because:

  • We’re not interested in the metadata returned when the record is published (hence the trailing _ in the method name)
  • We only want to produce a single record (hence produceOne)

This method takes a ProducerRecord as an argument - this type captures the key and value to be produced and the topic to be published to. Finally, let’s break down the F[F[Unit]] response type:

  • The outer F[_] here captures the effect of submitting the record to our producer’s buffer
  • The inner F[_] completes only once the Kafka cluster has acknowledged the record, so evaluating it waits for that acknowledgement.

We’re flattening these two effects in our test case because we just want to make sure that the records are published - but in practice blocking on the inner F[_] one at a time will yield suboptimal performance. It is recommended that multiple inner effects are evaluated in parallel.
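
As a rough sketch of that recommendation, the helper below submits every record first and only then waits for all of the acknowledgements together. It’s written against a plain function with the same shape as produceOne_ so that it doesn’t need a Kafka cluster; BatchProduce and submitThenAwait are hypothetical names, not part of FS2-Kafka.

```scala
import cats.effect.IO
import cats.syntax.all._

object BatchProduce {
  // `produceOne` has the same shape as produceOne_: the outer IO submits the
  // record, the inner IO waits for its acknowledgement.
  def submitThenAwait[A](records: List[A])(produceOne: A => IO[IO[Unit]]): IO[Unit] =
    records
      .traverse(produceOne)    // submit every record first: IO[List[IO[Unit]]]
      .flatMap(_.parSequence_) // then wait for all acknowledgements in parallel
}
```

With a real producer this could be called as submitThenAwait(records)(producer.produceOne_(_)), assuming producer is a KafkaProducer[IO, K, V] and records is a list of ProducerRecord[K, V].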

Conclusion

As we reach the end of this post, it’s worth reflecting on how hard it was to follow the code that connects to a Kafka cluster, subscribes to a topic, consumes and deserialises messages and then commits their offsets. We hope it was simpler than you expected (if not, please let us know, and tell us what you thought of it). This is only possible because we’re leveraging the powerful constructs provided by the Kafka Java library, FS2, Cats Effect and, of course, functional programming. These tools are supposed to make our lives as programmers easier and simpler, and they should make it harder for us to make mistakes. As usual, we’d really like to know what you learned from this, or whatever questions you have after reading this post, so that we can make it better.

You can now read Part 4: http4s.

Cat Tax!

You made it all the way to the end!! wooo! Here is a picture of us! (the authors of this post!!)

[Image: Rockstar Dog]
[Image: Megachu chilling]
