Consumer Groups
Consumer groups enable multiple consumer instances to share the work of consuming a topic. Kafka automatically distributes partitions among group members.
How Consumer Groups Work
When multiple consumers share a group ID:
- Kafka assigns each partition to at most one consumer in the group (a single consumer may own several partitions)
- Each message is delivered to only one consumer in the group: the one that owns the message's partition
- If a consumer fails, its partitions are reassigned to others
Topic with 4 partitions:
Group "my-group" with 2 consumers:
Consumer A: [Partition 0] [Partition 1]
Consumer B: [Partition 2] [Partition 3]
Group "my-group" with 4 consumers:
Consumer A: [Partition 0]
Consumer B: [Partition 1]
Consumer C: [Partition 2]
Consumer D: [Partition 3]
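The layouts above can be reproduced with a small simulation. This is only a sketch in plain C#: `AssignmentSketch` is a hypothetical helper written for this page, not part of Dekaf or Kafka, and it uses a simple contiguous-range split rather than any real assignor's algorithm.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Reproduce the diagram: 4 partitions shared by 2 consumers, then by 4.
foreach (var members in new[] { new[] { "A", "B" }, new[] { "A", "B", "C", "D" } })
{
    var assignment = AssignmentSketch.Assign(4, members);
    foreach (var entry in assignment)
    {
        var owned = string.Join(" ", entry.Value.Select(p => $"[Partition {p}]"));
        Console.WriteLine($"Consumer {entry.Key}: {owned}");
    }
    Console.WriteLine();
}

// Hypothetical helper for illustration only; not a Dekaf or Kafka API.
public static class AssignmentSketch
{
    // Splits partitions 0..partitionCount-1 into contiguous ranges, one range per consumer.
    // Every partition has exactly one owner; surplus consumers receive an empty list.
    public static SortedDictionary<string, List<int>> Assign(
        int partitionCount, IReadOnlyList<string> consumers)
    {
        if (consumers.Count == 0)
            throw new ArgumentException("At least one consumer is required.", nameof(consumers));

        var sorted = consumers.OrderBy(c => c, StringComparer.Ordinal).ToList();
        var result = new SortedDictionary<string, List<int>>(StringComparer.Ordinal);
        int baseCount = partitionCount / sorted.Count;
        int remainder = partitionCount % sorted.Count;
        int next = 0;

        for (int i = 0; i < sorted.Count; i++)
        {
            // The first `remainder` consumers take one extra partition.
            int take = baseCount + (i < remainder ? 1 : 0);
            result[sorted[i]] = Enumerable.Range(next, take).ToList();
            next += take;
        }
        return result;
    }
}
```

Calling `Assign(4, ...)` with five consumers leaves the fifth with an empty list, which is the idle-consumer case covered under Maximum Parallelism.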
Creating Consumer Group Members
Each consumer instance needs the same group ID:
using Dekaf;
// Instance 1
var consumer1 = await Kafka.CreateConsumer<string, string>()
    .WithBootstrapServers("localhost:9092")
    .WithGroupId("order-processors") // Same group ID
    .SubscribeTo("orders")
    .BuildAsync();
// Instance 2 (different machine/process)
var consumer2 = await Kafka.CreateConsumer<string, string>()
    .WithBootstrapServers("localhost:9092")
    .WithGroupId("order-processors") // Same group ID
    .SubscribeTo("orders")
    .BuildAsync();
Rebalancing
When the group membership changes, Kafka rebalances partitions:
- A new consumer joins
- A consumer leaves (graceful shutdown)
- A consumer is considered dead (heartbeat timeout)
- Topic partition count changes
Rebalance Listener
Get notified when partitions are assigned or revoked:
using Dekaf;
public class MyRebalanceListener : IRebalanceListener
{
    // These callbacks do no awaiting, so they return a completed ValueTask
    // instead of being marked async (which would trigger compiler warning CS1998).
    public ValueTask OnPartitionsAssignedAsync(
        IEnumerable<TopicPartition> partitions,
        CancellationToken ct)
    {
        Console.WriteLine($"Assigned: {string.Join(", ", partitions)}");
        // Initialize resources for these partitions
        return ValueTask.CompletedTask;
    }
    public ValueTask OnPartitionsRevokedAsync(
        IEnumerable<TopicPartition> partitions,
        CancellationToken ct)
    {
        Console.WriteLine($"Revoked: {string.Join(", ", partitions)}");
        // Commit offsets, clean up resources
        return ValueTask.CompletedTask;
    }
    public ValueTask OnPartitionsLostAsync(
        IEnumerable<TopicPartition> partitions,
        CancellationToken ct)
    {
        Console.WriteLine($"Lost: {string.Join(", ", partitions)}");
        // Partitions were taken away (e.g., due to timeout)
        return ValueTask.CompletedTask;
    }
}
var consumer = await Kafka.CreateConsumer<string, string>()
    .WithBootstrapServers("localhost:9092")
    .WithGroupId("my-group")
    .WithRebalanceListener(new MyRebalanceListener())
    .BuildAsync();
Cooperative Rebalancing
Dekaf uses cooperative (incremental) rebalancing by default, which minimizes disruption:
using Dekaf;
// Default: CooperativeSticky assignor
var consumer = await Kafka.CreateConsumer<string, string>()
    .WithBootstrapServers("localhost:9092")
    .WithGroupId("my-group")
    .BuildAsync();
// Or explicitly set the assignor (use one form or the other, not both)
var explicitConsumer = await Kafka.CreateConsumer<string, string>()
    .WithBootstrapServers("localhost:9092")
    .WithGroupId("my-group")
    .WithPartitionAssignmentStrategy(PartitionAssignmentStrategy.CooperativeSticky)
    .BuildAsync();
With cooperative rebalancing:
- Only affected partitions are revoked
- Other partitions continue processing
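- Processing pauses during a rebalance are shorter than with the eager protocol, where every consumer gives up all of its partitions first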
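To make the difference concrete, the sketch below compares which partitions are revoked when a third consumer joins a group of two. It is a plain C# simulation, not Dekaf code: `RebalanceSketch` and the before/after ownership maps are hypothetical, and the "after" layout is simply assumed to be what a sticky assignor would choose.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Ownership before consumer C joins: A owns 0 and 1, B owns 2 and 3.
var before = new Dictionary<int, string> { [0] = "A", [1] = "A", [2] = "B", [3] = "B" };
// Assumed sticky result after C joins: only partition 3 changes owner.
var after = new Dictionary<int, string> { [0] = "A", [1] = "A", [2] = "B", [3] = "C" };

Console.WriteLine($"Eager revokes: {string.Join(", ", RebalanceSketch.RevokedEager(before))}");
Console.WriteLine($"Cooperative revokes: {string.Join(", ", RebalanceSketch.RevokedCooperative(before, after))}");
// Prints:
// Eager revokes: 0, 1, 2, 3
// Cooperative revokes: 3

// Hypothetical helper for illustration only; not a Dekaf or Kafka API.
public static class RebalanceSketch
{
    // Eager protocol: every consumer gives up everything it owns before reassignment.
    public static List<int> RevokedEager(IReadOnlyDictionary<int, string> before) =>
        before.Keys.OrderBy(p => p).ToList();

    // Cooperative protocol: only partitions whose owner changes are revoked.
    public static List<int> RevokedCooperative(
        IReadOnlyDictionary<int, string> before,
        IReadOnlyDictionary<int, string> after) =>
        before
            .Where(kv => !after.TryGetValue(kv.Key, out var owner) || owner != kv.Value)
            .Select(kv => kv.Key)
            .OrderBy(p => p)
            .ToList();
}
```

In this scenario partitions 0, 1 and 2 are never interrupted under the cooperative protocol.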
Static Membership
To avoid unnecessary rebalances during planned restarts, use static membership:
using Dekaf;
var consumer = await Kafka.CreateConsumer<string, string>()
    .WithBootstrapServers("localhost:9092")
    .WithGroupId("my-group")
    .WithGroupInstanceId("instance-1") // Must be unique within the group
    .BuildAsync();
Benefits:
- Consumer can rejoin and get the same partitions back
- No rebalance if consumer restarts within session timeout
- Great for rolling deployments
Each instance in the group must have a unique GroupInstanceId. If two running instances use the same ID, the broker fences one of them.
Session and Heartbeat Configuration
using Dekaf;
var consumer = await Kafka.CreateConsumer<string, string>()
    .WithBootstrapServers("localhost:9092")
    .WithGroupId("my-group")
    .WithSessionTimeout(TimeSpan.FromSeconds(45)) // Max time before considered dead
    .WithHeartbeatInterval(TimeSpan.FromSeconds(3)) // How often to send heartbeats
    .BuildAsync();
Guidelines:
- SessionTimeout should be > 3x HeartbeatInterval
- Longer timeout = more time for slow consumers, but slower failure detection
- Shorter timeout = faster failure detection, but more spurious rebalances
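The first guideline is easy to check at startup, before the values are handed to the builder. A minimal sketch, assuming you keep both settings as `TimeSpan` values in your own configuration; `TimeoutSketch` is a hypothetical helper, not a Dekaf API.

```csharp
using System;

var sessionTimeout = TimeSpan.FromSeconds(45);
var heartbeatInterval = TimeSpan.FromSeconds(3);

// 45s is greater than 3 x 3s, so this prints "True".
Console.WriteLine(TimeoutSketch.SessionTimeoutIsSafe(sessionTimeout, heartbeatInterval));

// Hypothetical helper for illustration only; not a Dekaf or Kafka API.
public static class TimeoutSketch
{
    // Returns true when the session timeout is more than three heartbeat intervals,
    // so a few missed heartbeats do not immediately evict the consumer.
    public static bool SessionTimeoutIsSafe(TimeSpan sessionTimeout, TimeSpan heartbeatInterval)
    {
        if (heartbeatInterval <= TimeSpan.Zero)
            throw new ArgumentOutOfRangeException(nameof(heartbeatInterval));

        return sessionTimeout.Ticks > heartbeatInterval.Ticks * 3;
    }
}
```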
Consumer Group Metadata
Access information about the group:
// Get member ID
string? memberId = consumer.MemberId;
// Get consumer group metadata (for transactions)
var metadata = consumer.ConsumerGroupMetadata;
Scaling Consumers
Adding Consumers
When you add consumers to a group:
- New consumer joins and triggers rebalance
- Partitions are redistributed
- New consumer starts receiving messages
Removing Consumers
When a consumer leaves gracefully (await using or CloseAsync):
- Consumer commits offsets and leaves group
- Remaining consumers get its partitions
Maximum Parallelism
The maximum number of active consumers in a group equals the number of partitions:
Topic with 4 partitions:
- 1 consumer: processes all 4 partitions
- 2 consumers: each processes 2 partitions
- 4 consumers: each processes 1 partition
- 5 consumers: one consumer is idle!
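The arithmetic behind that list is small enough to sketch directly. `ParallelismSketch` below is a hypothetical helper in plain C#; it assumes the group subscribes to a single topic, as in the example above.

```csharp
using System;

foreach (int consumers in new[] { 1, 2, 4, 5 })
{
    var (active, idle) = ParallelismSketch.ActiveAndIdle(4, consumers);
    Console.WriteLine($"{consumers} consumer(s): {active} active, {idle} idle");
}

// Hypothetical helper for illustration only; not a Dekaf or Kafka API.
public static class ParallelismSketch
{
    // A partition has at most one owner in a group, so at most `partitions`
    // consumers can be doing work; any consumers beyond that are idle.
    public static (int Active, int Idle) ActiveAndIdle(int partitions, int consumers)
    {
        int active = Math.Min(partitions, consumers);
        return (active, consumers - active);
    }
}
```

To use more than four consumers productively in this scenario, the topic would need more partitions.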
Multiple Consumer Groups
Different groups consume the same topic independently:
using Dekaf;
// Analytics group - processes all messages
var analyticsConsumer = await Kafka.CreateConsumer<string, string>()
    .WithBootstrapServers("localhost:9092")
    .WithGroupId("analytics")
    .SubscribeTo("orders")
    .BuildAsync();
// Notification group - also processes all messages
var notificationConsumer = await Kafka.CreateConsumer<string, string>()
    .WithBootstrapServers("localhost:9092")
    .WithGroupId("notifications")
    .SubscribeTo("orders")
    .BuildAsync();
Each group:
- Tracks its own offsets
- Receives all messages
- Scales independently
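The independence of group offsets can be pictured with a toy in-memory model. This is a sketch only: `GroupOffsetsSketch` is hypothetical, the "topic" is a plain array with one partition, and nothing here talks to a broker.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var topic = new[] { "order-1", "order-2", "order-3" };
var offsets = new GroupOffsetsSketch();

// Each group advances its own position; reads by one group do not affect the other.
Console.WriteLine(string.Join(", ", offsets.Read("analytics", topic, 2)));     // order-1, order-2
Console.WriteLine(string.Join(", ", offsets.Read("notifications", topic, 3))); // order-1, order-2, order-3
Console.WriteLine(string.Join(", ", offsets.Read("analytics", topic, 3)));     // order-3

// Hypothetical model for illustration only; not a Dekaf or Kafka API.
public sealed class GroupOffsetsSketch
{
    // Next offset to read, tracked separately per group ID.
    private readonly Dictionary<string, int> _nextOffset = new Dictionary<string, int>();

    public List<string> Read(string groupId, IReadOnlyList<string> topic, int maxMessages)
    {
        // A group that has never read starts at offset 0 (start stays 0 when the key is missing).
        _nextOffset.TryGetValue(groupId, out int start);
        var messages = topic.Skip(start).Take(maxMessages).ToList();
        _nextOffset[groupId] = start + messages.Count;
        return messages;
    }
}
```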
Complete Example
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Dekaf;
using Microsoft.Extensions.Logging;

// Order is an application-defined message type (not shown here).
public class OrderProcessor
{
    private readonly ILogger<OrderProcessor> _logger;

    // Inject the logger; without a constructor the readonly field is never assigned.
    public OrderProcessor(ILogger<OrderProcessor> logger) => _logger = logger;

    public async Task RunAsync(string instanceId, CancellationToken ct)
    {
        await using var consumer = await Kafka.CreateConsumer<string, Order>()
            .WithBootstrapServers("localhost:9092")
            .WithGroupId("order-processors")
            .WithGroupInstanceId(instanceId) // Static membership
            .WithRebalanceListener(new LoggingRebalanceListener(_logger))
            .WithOffsetCommitMode(OffsetCommitMode.Manual)
            .SubscribeTo("orders")
            .BuildAsync();

        _logger.LogInformation(
            "Consumer {InstanceId} started, member ID: {MemberId}",
            instanceId,
            consumer.MemberId
        );

        await foreach (var batch in consumer.ConsumeAsync(ct).Batch(100))
        {
            _logger.LogInformation(
                "Processing batch of {Count} orders from partitions: {Partitions}",
                batch.Count,
                string.Join(", ", batch.Select(m => m.Partition).Distinct())
            );

            foreach (var msg in batch)
            {
                await ProcessOrderAsync(msg.Value);
            }

            // Commit only after every message in the batch has been processed
            await consumer.CommitAsync();
        }
    }

    private class LoggingRebalanceListener : IRebalanceListener
    {
        private readonly ILogger _logger;

        public LoggingRebalanceListener(ILogger logger) => _logger = logger;

        public ValueTask OnPartitionsAssignedAsync(IEnumerable<TopicPartition> partitions, CancellationToken ct)
        {
            _logger.LogInformation("Partitions assigned: {Partitions}", string.Join(", ", partitions));
            return ValueTask.CompletedTask;
        }

        public ValueTask OnPartitionsRevokedAsync(IEnumerable<TopicPartition> partitions, CancellationToken ct)
        {
            _logger.LogInformation("Partitions revoked: {Partitions}", string.Join(", ", partitions));
            return ValueTask.CompletedTask;
        }

        public ValueTask OnPartitionsLostAsync(IEnumerable<TopicPartition> partitions, CancellationToken ct)
        {
            _logger.LogWarning("Partitions lost: {Partitions}", string.Join(", ", partitions));
            return ValueTask.CompletedTask;
        }
    }

    // Placeholder for real order handling
    private Task ProcessOrderAsync(Order order) => Task.Delay(10);
}