How to Optimize your Kafka Streams
This post is an excerpt of my full article on my blog: https://www.sderosiaux.com/articles/2019/08/07/kafka-streams-topology-and-optimizations/. It’s quite long but has many insights, explanations, diagrams, and code examples.
Why?
Optimizations reduce the load on our Kafka cluster by avoiding the creation of unnecessary internal topics, and they simplify the Topology our Kafka Streams applications use. They improve performance and reduce memory/network pressure on Kafka Streams applications: they process less data and avoid unnecessary or redundant operations on our streams.
Despite a few subtleties (explained in the full article) and exceptions we can get when starting the Kafka Streams application, we should always build our Topology with optimizations enabled. As we upgrade our dependencies over time, more and more optimizations will become available without us having to do anything.
KafkaStreams, StreamsBuilder, Topology
First, let’s talk about the low-level API (which builds the Physical Plan) versus the high-level DSL (which builds the Logical Plan).
Here is a simple object diagram of the link between our friends:
- Kafka Streams runs a Topology.
- When we don’t use the high-level DSL, we directly build a Topology (the physical plan, that’s exactly what Kafka Streams will run) that forwards calls to an InternalTopologyBuilder: it is the latter that contains all the data about the real topology underneath.
- When we use the high-level DSL, we pass through the StreamsBuilder (the Logical Plan, that’s going to be converted to a physical plan) that forwards calls to an InternalStreamsBuilder. When we ask the StreamsBuilder to build(), it converts its abstraction to a Topology.
- A Topology talks about Nodes, Processors, StateStores and Strings (to link Nodes children/parent by name).
- A StreamsBuilder is more abstract and talks about StreamsGraphNodes. One StreamsGraphNode can generate multiple Nodes and StateStores.
This Logical Plan, built by the StreamsBuilder, allows Kafka Streams to optimize the Topology it’s going to generate.
The StreamsGraphNodes expose a lot of metadata so that the optimizer can optimize them (move, merge, delete, replace) locally or globally, without altering the behavior, before converting them to a Topology (which is dumb).
A manual merge()
For instance, to do the equivalent of “s1.merge(s2)” using the Processor API, we can build 2 sources and 1 sink, no processor involved:
val t = Topology()
t.addSource(
Topology.AutoOffsetReset.EARLIEST,
"sensor-a", // its name
WallclockTimestampExtractor(),
Serdes.String().deserializer(),
Serdes.String().deserializer(),
"topic-a"
)
t.addSource(
Topology.AutoOffsetReset.EARLIEST,
"sensor-b", // its name
WallclockTimestampExtractor(),
Serdes.String().deserializer(),
Serdes.String().deserializer(),
"topic-b"
)
t.addSink(
"to-the-world", // its name
"output-topic",
Serdes.String().serializer(),
Serdes.String().serializer(),
StreamPartitioner { topic, k: String, v: String, par -> Random.nextInt(par) }, // random partition in [0, par), never negative
"sensor-a", "sensor-b" // its parents
)
Using https://zz85.github.io/kafka-streams-viz/ we can visualize it:
If we use the high-level DSL merge(), a Processor named “KSTREAM-MERGE” will be created in the middle, just doing a passthrough. Thanks to the Processor API here, we can directly plug multiple Sources into a Sink.
This is probably something that can be automatically optimized away when optimizations are on (it does not do it right now).
This leads to the question: which optimizations are possible, which are going to be possible, and how does Kafka Streams find them?
How to enable optimizations?
Kafka Streams won’t optimize our Topology if we go straight to the low-level API. Optimizations are only enabled when writing with the high-level DSL.
Optimizations must be enabled when we build() the Topology: setting the property only in the general KafkaStreams config won’t do a thing.
val sb = StreamsBuilder()
// ...
val topo = sb.build(Properties().apply {
put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE)
})
When we use the high-level DSL, each operator creates a StreamsGraphNode and adds different metadata depending on the function called, the state of the parents, etc.
For instance, when we do a merge(), this adds a StreamsGraphNode to the Logical Plan, whose parents are the StreamsGraphNodes of the 2 original incoming KStreams. It also flags this new node as a mergeNode, and as repartitionRequired if one of the original incoming KStreams already required repartitioning (we’ll see how it is set).
When we build the Topology, each merge node is going to be converted to a Processor with 2 parents, that will simply let data pass through (to its child). All types of nodes generate one or several Processors.
Before that conversion, Kafka Streams tries to optimize its Logical Plan (the StreamsGraphNodes); only then does it build the Physical Plan (the Processors). To do this, it relies on the metadata we’re talking about.
Which Optimizations are possible?
There are 2 kinds of optimizations:
- KTable Source Topics: don’t build a -changelog topic when the source topic can be reused, to avoid duplicating the source topic.
- Repartition Operations: this is the meat. This will try to prevent repartitioning the same topic multiple times.
When we use selectKey, map, flatMap, transform, flatTransform or groupBy(KeyValueMapper) (does a selectKey), the resulting KStream is always flagged as repartitionRequired.
through() and all kinds of *join() (if repartitionRequired) will stop the propagation of repartitionRequired, because they will sink the data into a topic. Hence, no repartition is needed after them: the data has been materialized and this will form a new sub-topology.
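The propagation rule can be modelled in a few lines of plain Kotlin. This is only a toy sketch, not Kafka Streams code: the Op enum and the isRepartitionRequired and mergedFlag functions are names made up for this illustration, and only a handful of operators are covered. It assumes that operators keeping the key (mapValues, filter) simply inherit the flag from their parent.

```kotlin
// Toy model of the repartitionRequired flag. None of these names exist in Kafka Streams.
enum class Op(val changesKey: Boolean, val sinksToTopic: Boolean) {
    SELECT_KEY(changesKey = true, sinksToTopic = false),
    MAP(changesKey = true, sinksToTopic = false),
    FLAT_MAP(changesKey = true, sinksToTopic = false),
    TRANSFORM(changesKey = true, sinksToTopic = false),
    FLAT_TRANSFORM(changesKey = true, sinksToTopic = false),
    // Assumption: operators that keep the key inherit the flag of their parent.
    MAP_VALUES(changesKey = false, sinksToTopic = false),
    FILTER(changesKey = false, sinksToTopic = false),
    // through() and joins write the data to a topic, which resets the flag.
    THROUGH(changesKey = false, sinksToTopic = true),
    JOIN(changesKey = false, sinksToTopic = true),
}

// Walks a linear chain of operators and returns the flag of the resulting stream.
fun isRepartitionRequired(chain: List<Op>): Boolean =
    chain.fold(false) { flag, op ->
        when {
            op.changesKey -> true      // a possible rekey always sets the flag
            op.sinksToTopic -> false   // the data has been materialized
            else -> flag               // otherwise the flag is inherited
        }
    }

// A merge node is flagged as soon as one of its two parents is flagged.
fun mergedFlag(left: Boolean, right: Boolean): Boolean = left || right
```

With this model, isRepartitionRequired(listOf(Op.MAP, Op.FILTER)) is true, while isRepartitionRequired(listOf(Op.MAP, Op.THROUGH, Op.MAP_VALUES)) is false because through() stopped the propagation.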
For example, when joining:
- KStream + KStream: this will create -left-repartition and/or -right-repartition topics if the upstream KStreams are flagged as repartitionRequired.
// creates "applicationId-keeping-only-b-left-repartition" topic
sb.stream("a")
.map { k, v -> KeyValue.pair(k, v) }
.leftJoin(sb.stream("b"), { a, b -> b }, JoinWindows.of(1000), Joined.`as`("keeping-only-b"))
.to("c")
- KStream + KTable: this will create a -repartition topic if the KStream is flagged as repartitionRequired.
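To make the naming of these topics concrete, here is a small sketch in plain Kotlin. The repartitionTopicName function is a hypothetical helper, not a Kafka Streams API: it only mirrors the pattern visible in the comment of the example above, and assumes the join was given a name through Joined.

```kotlin
// Hypothetical helper, not part of Kafka Streams: it only mirrors the naming pattern
// described above for joins that were named through Joined.
// `side` is "left" or "right" for a KStream + KStream join, null for a KStream + KTable join.
fun repartitionTopicName(applicationId: String, joinName: String, side: String? = null): String {
    val sidePart = if (side != null) "-$side" else ""
    return "$applicationId-$joinName$sidePart-repartition"
}
```

For instance, repartitionTopicName("applicationId", "keeping-only-b", "left") gives back the topic name of the KStream + KStream example.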
An example of repartition optimization
To make the optimization appear, we need to build a relatively complex program, using a rekey of some sort followed by a join or an aggregation:
// We build a `repartitionRequired` KStream
val k = sb.stream("a").map { k, v -> KeyValue.pair(k, v) }
// Then we create 2 sub-graphs from this KStream
k.join(sb.table("b"), { x, y -> y }).to("c")
k.groupByKey().count().toStream().to("d")
Here is what’s happening in the Logical Plan, before and after the optimizations:
An OptimizableRepartitionNode hides the repartition logic (create a new topic, sink into it, and consume it).
In the optimized version, we can see that we have only one such node, meaning we have one less repartition topic created! Good.
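The idea behind this rewrite can be sketched with a toy logical plan in plain Kotlin. This is not how Kafka Streams implements it (the real optimizer works on StreamsGraphNodes and uses much more metadata); the Node class and the samplePlan, countRepartitions and mergeRepartitions functions are invented for this illustration.

```kotlin
// Toy logical plan. These names only mimic the idea; they are not Kafka Streams internals.
class Node(val name: String, val isRepartition: Boolean = false) {
    val children = mutableListOf<Node>()
    fun add(vararg nodes: Node): Node {
        children.addAll(nodes)
        return this
    }
}

// Counts the repartition nodes (hence the repartition topics) below and including `node`.
fun countRepartitions(node: Node): Int =
    (if (node.isRepartition) 1 else 0) + node.children.map { countRepartitions(it) }.sum()

// If several direct children of `keyChanger` are repartition nodes, keeps a single one
// and re-attaches the children of all the others under it.
fun mergeRepartitions(keyChanger: Node) {
    val repartitions = keyChanger.children.filter { it.isRepartition }
    if (repartitions.size < 2) return
    val merged = Node("${keyChanger.name}-repartition", isRepartition = true)
    repartitions.forEach { merged.children.addAll(it.children) }
    keyChanger.children.removeAll(repartitions)
    keyChanger.children.add(merged)
}

// The program above: source -> map, then one repartition per sub-graph (join and count).
fun samplePlan(): Node {
    val joinBranch = Node("join-repartition", isRepartition = true)
        .add(Node("join").add(Node("sink-c")))
    val countBranch = Node("count-repartition", isRepartition = true)
        .add(Node("count").add(Node("sink-d")))
    return Node("source-a").add(Node("map").add(joinBranch, countBranch))
}
```

Calling countRepartitions on samplePlan() returns 2; after mergeRepartitions is applied to the map node (the first child of the root), it returns 1, and the join and count nodes both hang under the single remaining repartition node.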
—
If we look at our topics in Conduktor, we are happy to have optimized our topology:
- The unoptimized version has 2 repartition topics and 2 changelog topics (ktable and aggregate).
- The optimized version has 1 repartition topic and 1 changelog topic (the aggregate).
Imagine a real Kafka Streams application computing several aggregations on the same KStream and doing several joins: the gain can be tremendous (fewer topics means less storage, memory, and I/O).
Conclusion
As Kafka Streams developers, we should always check the generated Topology if we use the high-level DSL, to understand what’s going on, see what the Optimizations are doing, and check if we can make manual optimizations in the global picture.
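One way to do this check is to print the description of the Topology with and without optimizations and compare them. The sketch below assumes the kafka-streams library is on the classpath; describeTopology is a hypothetical helper name, the topics "a", "c" and "d" are placeholders, and the property is set through its raw key ("topology.optimization", with the values "all" or "none") rather than through the StreamsConfig constants.

```kotlin
import org.apache.kafka.streams.StreamsBuilder
import java.util.Properties

// Builds the same DSL program with or without optimizations and returns its description.
// Nothing is started here: building and describing a Topology does not need a broker.
fun describeTopology(optimize: Boolean): String {
    val sb = StreamsBuilder()
    // selectKey flags the resulting KStream as repartitionRequired
    val rekeyed = sb.stream<String, String>("a").selectKey { _, v -> v }
    // two aggregations on the same rekeyed KStream
    rekeyed.groupByKey().count().toStream().to("c")
    rekeyed.groupByKey().count().toStream().to("d")
    val props = Properties().apply {
        put("topology.optimization", if (optimize) "all" else "none")
    }
    return sb.build(props).describe().toString()
}
```

Printing describeTopology(false) and describeTopology(true) side by side (or pasting them into kafka-streams-viz) should show the repartition topics of each version, which makes the effect of the optimizations visible before deploying anything.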
This was only a really quick overview of the optimizations in Kafka Streams. Consider reading the full article to get many more details and insights:
https://www.sderosiaux.com/articles/2019/08/07/kafka-streams-topology-and-optimizations/
Thanks!