Kafka producers

When sending messages using Spring Boot and Kafka, we only need to inject the corresponding type of KafkaTemplate and call the send method, which is very simple.

@Service("kafkaProducerService")
@Slf4j
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        kafkaTemplate.send("test-topic", "test", message);
        log.info("Sent message: {}", message);
    }

}

This simplicity is possible only because a great deal of internal detail is encapsulated away. This article analyzes the Kafka producer in detail.

Message partitioning algorithm

In the basic concepts of Kafka, topics and partitions are mentioned. When a producer sends a message, unless otherwise specified, the message will be sent to a specific partition of the current topic. So, which partition is it sent to?


To draw an analogy with nginx’s load balancing, where an upstream has multiple upstream services and a strategy is used to select the appropriate service for request forwarding, Kafka’s partitioning mechanism also serves a load balancing purpose. Therefore, the message partitioning mechanism is essentially Kafka’s load balancing algorithm.

Corresponding configuration item: spring.kafka.producer.properties.partitioner.class

Round Robin

Round-robin is an option in almost all load balancing algorithms. Its advantages are very obvious: absolute fairness and simple implementation.

Producers only need to maintain a Map whose key is the topic name and whose value is an integer counter starting from 0. Each time a message arrives, the counter for that topic is read and incremented, and the remainder with respect to the current number of partitions selects the target partition.
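
A minimal sketch of that bookkeeping, assuming one counter per topic (the class and method names are illustrative; Kafka ships its own RoundRobinPartitioner built on the same idea):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinCounter {

    // one counter per topic, shared by all sending threads
    private final Map<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    public int nextPartition(String topic, int numPartitions) {
        AtomicInteger counter = counters.computeIfAbsent(topic, t -> new AtomicInteger(0));
        // increment for every record; mask the sign bit so the result stays
        // non-negative even after the counter overflows
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }
}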

Uniform Sticky

Before understanding sticky partitioning, consider this question: if a producer sends a message immediately after it’s generated, the throughput will inevitably be low because sending requires using network protocols. Reducing the number of requests, for example, by merging multiple messages into a single batch, could potentially improve throughput. This idea is also present in HTTP/1.1 performance optimization, where multiple small files, such as CSS and JS files, are retrieved in a single request.

The idea behind this algorithm in Kafka is to maintain a sending queue for each partition. Each element in the queue is a fixed-size space. Messages are first placed on the last unfilled element in the queue until that element is filled or the waiting time is reached.

Assuming each message has a fixed size and each element can hold 5 messages, with 3 partitions and 9 messages, the round-robin algorithm places 3 messages in the last element of each partition’s send queue, so none of them is completely filled and all of the messages must wait for the timeout before they can be sent. The sticky partitioning algorithm, however, lets the last element of the first partition hold 5 messages and the last element of the second partition hold 4, so the batch for the first partition can be sent immediately.
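
A rough sketch of the idea, under the assumption that some caller tells us when a batch has been filled (Kafka implements this internally with a sticky-partition cache refreshed through the partitioner’s onNewBatch callback; the class below is only an illustration):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

public class StickyPartitionSketch {

    // the partition each topic is currently "stuck" to
    private final Map<String, Integer> current = new ConcurrentHashMap<>();

    // called for every keyless record: keep using the sticky partition
    public int partition(String topic, int numPartitions) {
        return current.computeIfAbsent(topic,
                t -> ThreadLocalRandom.current().nextInt(numPartitions));
    }

    // called once the batch for the sticky partition is full or sent:
    // switch to a new partition so load still spreads out over time
    public void onNewBatch(String topic, int numPartitions) {
        current.put(topic, ThreadLocalRandom.current().nextInt(numPartitions));
    }
}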

Message Key Partition

In addition to the two strategies above, Kafka also ships a DefaultPartitioner, which is the strategy used when nothing else is configured.

The `send` method of `kafkaTemplate` has multiple overloads.

    public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
        return this.doSend(producerRecord);
    }

    public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);
        return this.doSend(producerRecord);
    }

The second overloaded method has a generic key. In the default partitioning strategy, if the key is not null, a hash operation will be performed based on the key, and the modulo will be taken with the number of partitions. This ensures that messages with the same key are distributed in the same partition, and Kafka will ensure that messages within the same partition are consumed in an ordered manner.
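
A simplified sketch of that rule (the real DefaultPartitioner hashes the serialized key with murmur2; Arrays.hashCode is used here only to keep the example self-contained):

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class KeyPartitionExample {

    public static int partitionForKey(byte[] keyBytes, int numPartitions) {
        // mask the sign bit so the modulo never yields a negative partition
        int hash = Arrays.hashCode(keyBytes) & 0x7fffffff;
        return hash % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "order-42".getBytes(StandardCharsets.UTF_8);
        // the same key always maps to the same partition
        // (as long as the partition count does not change)
        System.out.println(partitionForKey(key, 3));
    }
}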

If no key is specified, the sticky partitioning strategy will be used for partitioning.

Custom partitioner

Kafka’s partitioning mechanism is exposed as the interface org.apache.kafka.clients.producer.Partitioner; to customize it, you only need to implement this interface.

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class KafkaPartitionerConfig implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }

}

The custom partitioner only takes effect after it is registered through the partitioner.class configuration item shown above!

You can also find the implementation classes of this interface to discover the implementations of other partitioning strategies.


Message caching model

As described under sticky partitioning, messages are not sent immediately: they first enter the last element of the cache queue for the target partition. If that element still has enough space the message is appended; otherwise a new element is created, or a sufficiently large space is allocated, for writing.

In Kafka, the size of this element is a configuration option, with a default value of 16384 (16K).

spring:
  kafka:
    producer:
      bootstrap-servers: ****:9094
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      batch-size: 16384
      buffer-memory: 33554432
      properties:
        partitioner:
          class: com.xsdl.config.KafkaPartitionerConfig

When a message needs to be sent, its size is estimated first, and then enough space is allocated.

if (buffer == null) {
    // Take the larger of batchSize and the estimated message size
    int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(
            RecordBatch.CURRENT_MAGIC_VALUE, compression.type(), key, value, headers));
    
    // This call may block if we exhausted buffer space.
    // Allocate a buffer of sufficient size
    buffer = free.allocate(size, maxTimeToBlock);
}

Kafka backs this with a buffer pool whose total size is buffer-memory (33554432 bytes, i.e. 32 MB, by default). Because 16 KB blocks are requested so frequently, the memory of a batch that has already been sent is not handed back to the garbage collector; it is simply cleared and kept in a free queue of 16 KB blocks, from which it can be handed out directly the next time one is needed.

Each Producer object holds a RecordAccumulator, and each RecordAccumulator maintains a Map that maps each partition of a topic to a Deque of ProducerBatch. The ProducerBatch is the unit of transmission, usually 16 KB, but it can be larger (if a single record is particularly large, a dedicated ProducerBatch of the required size is created) or smaller (if a new message finds that the previous ProducerBatch does not have enough remaining capacity, that batch is closed at less than 16 KB).

Therefore, there are four possible scenarios when creating a ProducerBatch (a simplified sketch of the pooling behavior follows the list):

  1. If the requested size is 16K and there are recycled buffers in the free queue, one of them is handed out directly.
  2. If the requested size is 16K but the free queue is empty, a new 16K buffer is carved out of the remaining memory and given to the message; after use it is cleared and added to the free queue.
  3. If the requested size is greater than 16K and, after deducting the memory held by the free queue, there is still enough remaining space, the buffer is allocated from that remaining space and reclaimed by GC after use.
  4. If the requested size is greater than 16K and the remaining space (excluding the free queue) is insufficient, buffers are released from the free queue one by one until enough memory is available; the space is then allocated and reclaimed by GC after use.
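
A simplified sketch of that pooling behavior, assuming a fixed poolable size and ignoring the blocking and accounting that the real BufferPool performs against buffer-memory:

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

public class SimpleBufferPool {

    private final int batchSize;                         // poolable size, e.g. 16384
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    public SimpleBufferPool(int batchSize) {
        this.batchSize = batchSize;
    }

    public synchronized ByteBuffer allocate(int size) {
        if (size == batchSize && !free.isEmpty()) {
            // scenario 1: reuse a recycled 16 KB buffer directly
            return free.pollFirst();
        }
        // scenarios 2-4: allocate fresh memory (the real pool also enforces the
        // buffer-memory limit and may block or drain the free queue here)
        return ByteBuffer.allocate(size);
    }

    public synchronized void deallocate(ByteBuffer buffer) {
        if (buffer.capacity() == batchSize) {
            buffer.clear();
            free.addLast(buffer);                        // keep 16 KB buffers for reuse
        }
        // oversized buffers are simply dropped and left to the garbage collector
    }
}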

Message acknowledgment mechanism

Kafka’s ability to ensure that messages are not lost is inseparable from its message acknowledgment mechanism.

To summarize in one sentence: Kafka only provides the greatest possible persistence guarantee for committed messages.

  • Committed: If the producer sends asynchronously and treats a message as successful the moment send() returns, the message may still be sitting in the cache described above when the service stops abnormally; it was never actually delivered, so Kafka naturally has no record of it. Similarly, once the request reaches the Broker, if the message is counted as successful as soon as the leader has persisted it, a sudden power failure followed by a replica taking over as leader will also lose the message, because it had not been replicated yet.
  • To the greatest extent possible: For example, high availability in a three-node environment generally means that if one node fails, the system can still function normally. If all three nodes fail, the system will naturally be unable to provide services. The same principle applies to Kafka clusters.

Based on the above analysis, the recommended Kafka producer configuration practices are as follows:

  1. Instead of using producer.send(msg), use the send method with callback notifications.
  2. Set acks = all. acks is a parameter of the Producer that represents your definition of a “committed” message. If set to all, it means that all replica brokers must receive the message for it to be considered “committed.” This is the highest level of “committed” definition.
  3. Set `retries` to a large value. Here, `retries` is also a Producer parameter. When there are momentary network fluctuations, message sending may fail, and a Producer configured with `retries > 0` can automatically retry and avoid losing the message. In recent client versions the default is already `Integer.MAX_VALUE`; the official recommendation is not to lower it, but to bound the total send time with `request.timeout.ms` so that retries cannot go on forever. A sketch combining these producer-side settings follows the list.
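
A plain-client sketch of the three recommendations above; the broker address, topic, and timeout value are placeholders, and with Spring Boot the same settings can be supplied through spring.kafka.producer.*:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ReliableProducerExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");                           // recommendation 2
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);            // recommendation 3
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);             // bound each request

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("test-topic", "test", "hello");
            // recommendation 1: send with a callback instead of fire-and-forget
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("Send failed, handle or re-queue: " + exception);
                } else {
                    System.out.printf("Sent to partition %d at offset %d%n",
                            metadata.partition(), metadata.offset());
                }
            });
            producer.flush();
        }
    }
}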

The Kafka Broker configuration is as follows:

  1. Set `unclean.leader.election.enable = false`. This parameter controls whether replicas that have fallen too far behind are allowed to stand for election as partition leader. If such a lagging broker becomes the new leader, message loss is inevitable, so this parameter is generally set to false to prevent it.
  2. Set `replication.factor` to >= 3. This is also a parameter on the Broker side. The point here is that it’s best to save multiple copies of the message, since redundancy is currently the primary mechanism for preventing message loss.
  3. Set `min.insync.replicas > 1`. This is still a broker-side parameter, controlling the minimum number of replicas a message must be written to before it is considered “committed”. Setting it to a value greater than 1 improves message persistence. In a real-world environment, never use the default value of 1.
  4. Ensure that `replication.factor` > `min.insync.replicas`. If they are equal, the entire partition will fail if even one replica fails. We aim to improve message persistence and prevent data loss without compromising availability. A recommended setting is `replication.factor = min.insync.replicas + 1`.

The Kafka consumer configuration is as follows:

  1. Ensure the message has been fully processed before committing. The Consumer-side parameter `enable.auto.commit` should ideally be set to `false`, with offsets committed manually. This is especially important in single-Consumer, multi-threaded scenarios, where an automatically committed offset could otherwise cause message loss if the multi-threaded processing later fails. A minimal sketch follows.
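
A minimal sketch of that pattern with the plain KafkaConsumer, using placeholder broker, group, and topic names (in Spring Kafka the equivalent is a manual AckMode together with Acknowledgment.acknowledge()):

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualCommitConsumerExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");               // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");          // no auto commit

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("test-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record);          // finish the business logic first
                }
                consumer.commitSync();        // only then commit the offsets
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.println("Consumed: " + record.value());
    }
}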

Producer Interceptor

Configuration item: spring.kafka.producer.properties.interceptor.classes

spring:
  kafka:
    producer:
      bootstrap-servers: ****:9094
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        interceptor:
          classes: com.xsdl.config.KafkaInterceptor

Interceptor classes only need to implement the ProducerInterceptor interface.

import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

@Slf4j
public class KafkaInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        log.info("Message: {} has been sent", record.value());
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            long offset = metadata.offset();
            log.info("The offset of the message is: {}", offset);
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }

}

  • onSend: This method is called before the message is sent.
  • onAcknowledgement: This method is called after a message is successfully submitted or fails to be sent. It is located in the main path of the Producer’s sending process. If heavy business logic is placed in this method, it will seriously affect the message sending speed.

Note: onSend and onAcknowledgement are called in different threads, so pay attention to thread safety.

Idempotence

A familiar example of idempotency is duplicate-payment protection: when you try to buy the same product twice in quick succession on some shopping apps, you are warned that a transaction with the same amount already exists with the same merchant and asked to confirm that you are not paying twice.

If a network error causes the broker to time out while the producer is sending a message and waiting for a response from the broker, the producer will retry based on the retries parameter. If idempotency is not considered, the same message may be sent repeatedly.

The concept of idempotency actually involves reliability guarantees:

  • At most once: The message may be lost, but it will never be sent again.
  • At least once: The message will not be lost, but it may be sent repeatedly.
  • Exactly once: The message will not be lost or sent repeatedly.

If messages are sent at most once, the `acks` parameter can be set to 0 and the producer treats a message as successful the moment it is sent. For at least once, the producer keeps retrying until a send succeeds. Neither is as appealing as exactly once: most users want a message to be delivered exactly one time, neither lost nor duplicated. In other words, even if the producer sends the same message repeatedly, the broker should deduplicate it automatically, so that from the downstream consumer’s perspective there is still only one message.

Kafka ensures idempotency by storing additional fields on the Broker side. When a Producer sends messages with the same field values, the Broker automatically recognizes these messages as duplicates and silently discards them in the background.

[1] Each producer is assigned a PID (producer ID).

[2] The producer maintains a Map<Partition, Seq> whose key is the partition and whose value is the sequence number of the messages sent by the current producer, an incrementing value starting from 0.

[3] The broker maintains a mapping <PID, Partition> → SN, where SN is the sequence number (seq) of the next message the broker expects to receive.

For example, suppose the producer sends a message <1, 100>, the broker writes it successfully and updates the SN it expects for that partition to 101, but the acknowledgment is lost. The producer then retries because of the retries setting. When the broker receives the message again, it sees that seq 100 is less than the expected SN 101, recognizes the message as one that has already been written, and simply returns success without storing it again, thereby ensuring idempotency.
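
The rule can be condensed into a few lines of illustrative Java (a conceptual sketch of the check described above, not actual broker code):

import java.util.HashMap;
import java.util.Map;

public class DedupSketch {

    // key: "pid-partition", value: next expected sequence number
    private final Map<String, Integer> expectedSeq = new HashMap<>();

    public boolean append(long pid, int partition, int seq) {
        String key = pid + "-" + partition;
        int expected = expectedSeq.getOrDefault(key, 0);
        if (seq < expected) {
            // already written: acknowledge again but do not store a duplicate
            return true;
        }
        // new message: "write" it and advance the expected sequence number
        expectedSeq.put(key, seq + 1);
        return true;
    }
}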

The above implementation demonstrates some limitations of idempotency:

  • It can only achieve idempotency within a single session, not across sessions, because a new PID is assigned when the producer starts a new session (for example after a restart).
  • It can only guarantee idempotency on a single partition, meaning an idempotent producer can guarantee that no duplicate messages appear on a single partition of a topic; it cannot achieve idempotency across multiple partitions.

Enabling this is very simple: just set `enable.idempotence` to true in the producer’s properties. The default value is true.

Transactional

The ACID properties will not be elaborated here. Transactional Producers can guarantee that messages are written atomically to multiple partitions.

How to activate:

  • enable.idempotence = true
  • Configure the `transactional.id` parameter on the Producer side, ideally with a meaningful name. In Spring Boot, the transaction ID prefix is set via `spring.kafka.producer.transaction-id-prefix`. A plain-client sketch follows the list.
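
With the plain client, the transactional flow looks roughly like this; the transactional.id, topics, and keys are placeholders (Spring’s KafkaTemplate drives the same underlying calls when a transaction ID prefix is configured):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");             // prerequisite
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-service-tx-1"); // meaningful id

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            try {
                // both records are committed atomically, even across topics/partitions
                producer.send(new ProducerRecord<>("orders", "k1", "order created"));
                producer.send(new ProducerRecord<>("billing", "k1", "invoice created"));
                producer.commitTransaction();
            } catch (Exception e) {
                // neither record becomes visible to read_committed consumers
                producer.abortTransaction();
            }
        }
    }
}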

On the Consumer side, reading messages sent by a transactional Producer also requires some changes; the `isolation.level` parameter needs to be set. Currently, this parameter has two possible values:

  1. `read_uncommitted`: This is the default value, indicating that the Consumer can read any message written to Kafka, regardless of whether the transactional Producer has committed or aborted its transaction. Obviously, if you are using a transactional Producer, the corresponding Consumer should not use this value (messages that were successfully written are visible whether or not the transaction ultimately succeeds).
  2. `read_committed` indicates that the Consumer will only read messages written by transactional Producers that have successfully committed their transactions. Of course, it can also see all messages written by non-transactional Producers.

Serialization and message compression

In addition to the content above, there is also serialization and message compression to consider. If you have used RedisTemplate, you will know that a serializer has to be configured; otherwise Redis falls back to JDK serialization and the keys and values stored in Redis appear as gibberish. The same applies to Kafka: the key and value serialization methods must be configured on both the producer and consumer sides.

Message compression (configuration parameter: compression.type) is a trade-off between bandwidth and CPU consumption. Compression reduces message size, naturally saving network bandwidth, but correspondingly increasing CPU utilization on the client side. Message flow is as follows: Producer compresses, Broker retains, and Consumer decompresses. The Broker respects the Producer’s compression algorithm selection. If the Broker and Producer explicitly specify different compression methods, the Broker’s CPU utilization will increase. The attached diagram analyzes the available compression algorithms:

[Figure: comparison of the available compression algorithms]

If the server bandwidth is average but the client CPU is decent, it is recommended to enable zstd compression, which can greatly save network resource consumption.
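
Enabling it is a one-line change on the producer side; a sketch with the plain client is shown below (in Spring Boot the corresponding property is spring.kafka.producer.compression-type):

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class CompressionConfigExample {

    public static Properties producerProps() {
        Properties props = new Properties();
        // the producer compresses, the broker stores the batch as-is,
        // and the consumer decompresses on read
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
        return props;
    }
}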