Mirroring High-Throughput Topics with Kafka MirrorMaker 2

Klarrio
9 min readNov 5, 2024

By Dominique Chanet
Lead Architect at Klarrio

Kafka MirrorMaker is a tool, included with the Apache Kafka distribution, that allows for the replication of Kafka topic messages across different Kafka clusters. Initially a standalone tool, as of version 2.0, it’s built on top of the Kafka Connect framework.

We recently faced a situation where we needed to replicate a high-throughput (roughly 2500 msg/s, good for about 5MB/s) topic across two geographically separate locations.

There was ample bandwidth available: we could comfortably transmit up to 1Gb/s (128MB/s), so replicating a 5MB/s topic should be easy peasy, right?

Wrong!

Of course, one does not set and forget a replication job. Our operations team, tasked with monitoring the overall health of the platforms, was keeping an eye on the topic replication performance as well.

Much to our dismay, they reported steadily increasing replication latencies, creeping up into the hours range. The source topic has a time-based retention of 24 hours, so we were coming close to losing data because the source topic would drop day-old data before the replication job could even start to process it.

Why does MirrorMaker fail to keep up with a 5MB/s topic when there’s ample bandwidth available? To answer that question, we need to make a quick detour into the innards of the Kafka protocol.

Kafka Clients Are Sequential Pollers

Kafka is conceptually a publish-subscribe system. A naive observer might be inclined to think that Kafka clients (producers and consumers in Kafka parlance) simply open a TCP pipe to the broker, and stream data at line rate over that pipe. That is quite far from the truth, however:

Producers batch messages for a topic partition, and transmit entire batches in one ProduceRequest. They then wait for acknowledgement from the broker before sending the next batch of records for the same topic partition.

While you can configure a producer to have multiple record batches in flight (i.e., sent-but-unacknowledged) in parallel, this does away with ordering guarantees: if one batch errors out, it may be overtaken by a later batch of messages which get committed to the topic while the earlier batch is being retransmitted.

Unless you don’t care about message ordering, it’s best to configure your producer to have only one batch in flight at a time.

Consumers continuously poll the broker for new messages. It sends a FetchRequest, waits for the broker to respond with new message batches, and then immediately shoots off the next FetchRequest while it delivers the just-received messages to the application thread.

The long and short of it is that Kafka clients don’t stream data at line rate, but rather do sequential round trips to deliver data to, or fetch data from, the brokers. This has a significant effect on the attainable throughput.

Let’s do a little back-of-the-envelope calculation. We’ll disregard protocol overhead and processing time, so the numbers won’t be accurate, but they illustrate the situation well enough.

Assume:

  • a fixed round trip time RTT between client and broker. This is what gamers call the “ping”: the time it takes to send a short packet from client to broker, and receive the response.
  • a fixed data size D that is transferred for each round trip.
  • a fixed bandwidth B between client and broker.

The total time for one request Tr is then easily shown to be RTT + D/B. Requests are sequential, so the next request can only be launched as soon as the previous request has completed. This gives us a simple formula for total throughput TP:

TP = D/Tr

After substitution of Tr and application of some high school math, we end up with:

TP = B*D/(B*RTT + D)

Let’s plug in some numbers to get a feel for what this really means. Going back to our topic replication use case, we’re looking at:

  • B = 128 MiB/s
  • RTT = 0.2 s

For different data sizes, we get the following attainable throughput:

To achieve at least half of the theoretically attainable throughput in our situation, we need batches of 32MiB or more. The flip side of this is, of course, latency; the larger the batches you produce, or the larger the fetches you do as a consumer, the longer it takes to accumulate enough data to send.

Hence, the end-to-end latency (the time it takes for a message to move from producer to consumer) will go up.

Tuning Kafka Clients for High Throughput

The default configuration of Kafka clients favors latency over throughput. Luckily, they offer lots of configuration knobs that allow you to tune this balance for your use case. In fact, there are so many configuration options for Kafka producers and consumers that it might be a bit hard to see the forest for the trees.

We (and many other fine resources on the web) are here to help! These are the most important configuration options that affect the latency/throughput tradeoff:

For Producers

batch.size (default: 16kiB) is the equivalent of the D parameter in our calculations above. It determines the maximum size of per-partition message batches.

linger.ms (default: 0) sets the time the producer waits to fill up a batch. If the batch isn’t full by that time, it’ll send it off anyway. This acts as a cap on the amount of latency you’re willing to tolerate in favor of throughput.

acks determines a reliability-versus-throughput tradeoff:

  • acks=0 does not wait for any broker acknowledgement. That means you can achieve produce throughput close to line rate, at the (very real) risk of losing data. This setting is not recommended.
  • acks=all (the default since Kafka 3.0) delays the broker acknowledgement until the batch is properly received by all replicas. This setting guarantees the highest reliability, but significantly increases RTT and hence weighs heavily on the attainable throughput.
  • acks=1 (the default prior to Kafka 3.0) lets the broker send its acknowledgement as soon as the batch is persisted in the local log. There is a small chance of losing the batch if the broker fails unrecoverably before the batch was replicated to the followers.

In return, you’ll get faster acknowledgements, which means a lower RTT,
which increases the attainable throughput. Unless you have extremely
strict reliability requirements, this is the sweet spot for higher
throughput topics.

compression.type (default: none) trades off processor time for throughput. If you enable compression (valid values are lz4, snappy, gzip and zstd), the producer requires more CPU time to perform the compression, but it can cram more messages in the same batch size, which results in higher overall throughput.

For Consumers

fetch.max.bytes (default: 50MiB) is roughly the equivalent of the D parameter in our calculations above. It sets the maximum size of the fetch response, which may include fetches across different topic partitions.

max.partition.fetch.bytes (default: 1MiB) is the maximum number of bytes that will be fetched from a single topic partition in one request. Depending on the number of topics you’re subscribed to, this is another contributing factor in the D parameter.

fetch.min.bytes (default: 1) is the minimum number of bytes to be returned by a fetch request. On a high-throughput topic, this won’t contribute much to D as there will always be sufficient data available to fetch. Increasing this minimum on other consumers might reduce the strain on a heavily loaded broker, which, in turn, might result in better throughput for your high-volume topic.

fetch.max.wait.ms (default: 500) is the maximum time a broker will wait for fetch.min.bytes of data before it answers the fetch request anyway. This puts a cap on the latency the consumer is willing to tolerate in exchange for reducing the load on the broker.

Finding the right values for all of these settings is a bit of a dark art, so be sure to experiment and measure for your specific use case.

For our topic replication case, experimentation showed that we needed to tune both the consumer (reading from the source cluster) and the producer (writing to the target cluster) to achieve acceptable throughput.

On the consumer side, each consumer was fetching from a single topic partition, so it doesn’t make sense to have max.partition.fetch.bytes be lower than fetch.max.bytes.

Setting max.partition.fetch.bytes to 50MiB ensured we could pull sizable batches from the source cluster. On the producer side, the default batch size of 16kiB is just ridiculously low. We increased it to the same 50MiB cap, to ensure that the entire batch that was fetched from the source cluster could be pushed to the target cluster in one go.

Tuning Kafka Clients for MirrorMaker 2

Armed with all of the knowledge above, we can now turn our attention back to MirrorMaker. How do we tell it that we’d like to tune the Kafka client settings for better throughput?

Due to the fact that MirrorMaker 2 is built on top of Kafka Connect, the answer to this question is slightly more convoluted and confusing than you’d expect. Allow me to take you on a quick conceptual tour of Kafka Connect…

Kafka Connect is designed to bridge data between a Kafka cluster and a non-Kafka data system. It can function either as a Source, where it receives data from the foreign system and publishes it on Kafka, or as a Sink, where it takes data from Kafka and persists it in the foreign system.

If Kafka Connect functions as a Source, it uses a built-in producer to write data to Kafka. If it functions as a Sink, it uses a built-in consumer to read data from Kafka.

MirrorMaker 2 is a Kafka Connect plugin that defines the “foreign system” as “just another Kafka cluster”. It can act in either Source or Sink mode. When acting as Source, it uses its own consumer to read from the “foreign” Kafka cluster, and the built-in producer to write to the “local” Kafka.

When acting as Sink, the situation is flipped around: the built-in consumer reads from the “local” Kafka, and MirrorMaker’s own producer writes to the “foreign” Kafka. And therein lies the rub: the built-in producer and consumer need to be configured in a different way from MirrorMaker’s own producer and consumer.

First off, you need to change the Connect cluster’s base configuration. Add the following line (on all cluster nodes) to connect.properties:

connector.client.config.override.policy=All

This allows individual connectors to override the default client configuration of the Connect worker nodes. Without this line, you won’t be able to tune Connect’s built-in producer and consumer settings.

With this configuration change in place, you can add your tuned configuration settings to the connector configuration, but you need to prefix them correctly. The table below gives the correct prefixes for source and sink mode:

Hence, with MirrorMaker working in Source mode, you’d add connector properties
like producer.override.linger.ms and source.consumer.fetch.max.bytes.

Conclusion

Kafka clients are by default tuned to prioritize latency over throughput. Especially when round trip times are high, it’s important to correctly tune the clients to achieve good throughput. If you’re using Kafka Connect or MirrorMaker, it may be confusing to figure out exactly how this is done. We hope that this article can help you on the way.

About Klarrio

At Klarrio, we design cloud native, cloud agnostic software solutions to empower our customers to control their data, limit cloud costs, and optimize performance. We ensure flexibility for scalable platform building across various cloud and on-premises infrastructures, prioritizing privacy, security, and resilience by design.

We are platform pioneers at heart, with a proven track record in building self-service data platforms, Internal Developer Platforms, log aggregation platforms, and other innovative software solutions in various domains: from Telecom,Transportation & Logistics, Manufacturing, Public Sector, Healthcare to Entertainment.

Beyond technology, we actively collaborate and share knowledge, both in-house and together with our customers. True impact is achieved together.

--

--

Klarrio
Klarrio

Written by Klarrio

Klarrio empowers you with tailor-made, scalable data platforms & microservices for real-time data processing across various cloud & on-premises infrastructures.

No responses yet