Analytics consent

We use Google Analytics for basic traffic stats. kafkahelper stores no cluster data, config input, or personal data in the app.

Back to How To

How To

How to use Kafka queues

Kafka 4.2 introduces real queue semantics through Share Groups. If you want work-queue behavior in Java, this is the feature to use instead of forcing classic consumer groups to imitate a queue.

Kafka 4.2 makes Queues for Kafka production-ready. The feature introduces Share Groups, which are different from classic consumer groups. In a share group, consumers cooperatively process records without assigning each partition to only one consumer, and they acknowledge records individually.

That means this is now a real queue feature, not just a queue-shaped pattern built on top of topics and offsets. It is the right fit when work items are processed one at a time and you want per-record acknowledgement, delivery attempts, and more queue-like behavior than classic consumer groups provide.

What changed in Kafka 4.2

Kafka's 4.2 upgrade notes say Queues for Kafka from KIP-932 are now production-ready. The feature introduces Share Groups as an alternative to traditional consumer groups.

The key behavioral change is that consumers in a share group cooperatively consume records from topics without giving one consumer exclusive ownership of an entire partition. Share groups also add per-record acknowledgement and delivery-attempt counting, which is exactly why they fit queue-style workloads better.

When to use Share Groups

Use Share Groups when records are treated as units of work and you want queue behavior: individual acknowledgement, retries, and worker pools that are not strictly limited by one-consumer-per-partition ownership.

This is a better fit than classic consumer groups when the workload is job processing rather than stream processing. If ordering inside a partition is the main requirement, classic consumer groups still map more directly to that stream-style model.

The actual queue configs that matter

Kafka 4.2 adds share-specific client and group settings. These are the configs that actually make the queue semantics explicit in Java.

  • group.id: still identifies the group, but here it is the share group for the queue workers.
  • share.acknowledgement.mode=explicit: makes the share consumer use explicit acknowledgement via ShareConsumer.acknowledge() instead of implicit ack on the next poll or commit.
  • share.auto.offset.reset=earliest: useful for a new queue group if it should start from the backlog rather than only new records.
  • share.acquire.mode=record_limit: makes max.poll.records a hard limit per poll() instead of allowing larger batch-optimized returns.
  • max.poll.records: controls how much work is handed to the worker on each poll when using record_limit.
  • share.record.lock.duration.ms: how long a record stays acquired before it can be made available again if not acknowledged in time.
  • share.session.timeout.ms and share.heartbeat.interval.ms: liveness settings for share group members.
  • share.isolation.level=read_committed: if the queue should ignore aborted transactional writes.
Properties props = new Properties();
props.put("bootstrap.servers", "broker-1:9092,broker-2:9092");
props.put("group.id", "jobs-queue");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("share.acknowledgement.mode", "explicit");
props.put("share.auto.offset.reset", "earliest");
props.put("share.acquire.mode", "record_limit");
props.put("max.poll.records", "50");
props.put("share.record.lock.duration.ms", "60000");
props.put("share.session.timeout.ms", "45000");
props.put("share.heartbeat.interval.ms", "5000");
props.put("share.isolation.level", "read_committed");

ShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);

The Java API is different from a normal consumer

Kafka's APIs page now lists the Share Consumer API separately from the normal Consumer API. In Java, the type to think about is ShareConsumer<K,V>, implemented by KafkaShareConsumer.

This is the important difference from a classic consumer-group loop: queue-style processing is driven by acknowledgement, not by offset commit alone. You can explicitly tell Kafka whether a record was accepted, should be released for retry, should have its processing window renewed, or should be rejected.

consumer.subscribe(List.of("jobs"));

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

  for (ConsumerRecord<String, String> record : records) {
    try {
      process(record.value());
      consumer.acknowledge(record, AcknowledgeType.ACCEPT);
    } catch (RetryableJobException ex) {
      consumer.acknowledge(record, AcknowledgeType.RELEASE);
    } catch (SlowJobException ex) {
      consumer.acknowledge(record, AcknowledgeType.RENEW);
    } catch (Exception ex) {
      consumer.acknowledge(record, AcknowledgeType.REJECT);
    }
  }

  consumer.commitSync();
}

What the acknowledgement settings mean

The 4.2 release announcement specifically calls out the RENEW acknowledgement type for extended processing times. That matters when a job is still in progress and you do not want the record lock to expire and hand the job to another worker too early.

A practical queue model in Java usually looks like this: ACCEPT after successful processing, RELEASE for retryable failures, RENEW when the handler needs more time, and REJECT when the record is unprocessable and should not keep bouncing forever.

Producer settings still matter

Even with real queue semantics, the producer still decides whether accepted jobs are durable. Queue behavior is only as good as the write path putting work into the queue.

  • Use acks=all so jobs are not treated as accepted after only a leader write.
  • Use enable.idempotence=true so producer retries do not duplicate jobs during transient failures.
  • Keep max.in.flight.requests.per.connection<=5 together with idempotence, which Kafka requires for safe ordering-preserving retries.
Properties props = new Properties();
props.put("bootstrap.servers", "broker-1:9092,broker-2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");
props.put("enable.idempotence", "true");
props.put("retries", Integer.toString(Integer.MAX_VALUE));
props.put("max.in.flight.requests.per.connection", "5");
props.put("compression.type", "lz4");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

What to watch operationally

Kafka 4.2 added more queue-oriented observability for Share Groups, including lag metrics and delivery-attempt behavior. That is important because queue workloads fail differently from stream workloads: the big questions are whether jobs are getting stuck, repeatedly retried, or timing out under lock.

In practice, watch record lock duration, worker processing time, retry volume, and whether max.poll.records is sized to the real concurrency and memory budget of the Java process. If workers are slow, either raise share.record.lock.duration.ms or use RENEW deliberately rather than letting the lock expire accidentally.

One important correction

If you are on Kafka 4.2 and your goal is true queue semantics, do not start from the old 'topic plus classic consumer group plus manual commits' pattern unless you have a specific compatibility reason. That older model is still valid for stream-style work distribution, but Kafka 4.2 now gives you a feature built for queues directly.