
Transactions

Kafka transactions enable exactly-once semantics (EOS) by allowing you to atomically write to multiple partitions and topics. Either all messages in a transaction are committed, or none are.

When to Use Transactions

Transactions are useful when you need to:

  • Atomically write to multiple topics - All succeed or all fail
  • Implement exactly-once processing - Consume, process, produce without duplicates
  • Maintain consistency - Ensure related messages are visible together

Setting Up a Transactional Producer

Create a producer with a transactional ID:

using Dekaf;

await using var producer = await Kafka.CreateProducer<string, string>()
    .WithBootstrapServers("localhost:9092")
    .WithTransactionalId("my-service-instance-1") // Must be unique per instance
    .BuildAsync();

// Initialize transactions (required before first transaction)
await producer.InitTransactionsAsync();

Warning: The transactional ID must be unique per producer instance. If two producers use the same ID, Kafka fences the older one: its transactional calls start failing and it must be recreated.

Basic Transaction Flow

try
{
    // Start the transaction
    await producer.BeginTransactionAsync();

    // Send messages within the transaction
    await producer.ProduceAsync("orders", orderId, orderJson);
    await producer.ProduceAsync("audit-log", orderId, auditEntry);
    await producer.ProduceAsync("notifications", userId, notification);

    // Commit - all messages become visible atomically
    await producer.CommitTransactionAsync();
}
catch (Exception)
{
    // Abort - none of the messages become visible
    await producer.AbortTransactionAsync();
    throw;
}

Exactly-Once Processing (Consume-Transform-Produce)

The most common use case for transactions is exactly-once stream processing:

using Dekaf;

await using var consumer = await Kafka.CreateConsumer<string, string>()
    .WithBootstrapServers("localhost:9092")
    .WithGroupId("processor-group")
    .WithOffsetCommitMode(OffsetCommitMode.Manual) // We'll commit via transaction
    .WithIsolationLevel(IsolationLevel.ReadCommitted) // Only read committed messages
    .SubscribeTo("input-topic")
    .BuildAsync();

await using var producer = await Kafka.CreateProducer<string, string>()
    .WithBootstrapServers("localhost:9092")
    .WithTransactionalId($"processor-{Environment.MachineName}")
    .BuildAsync();

await producer.InitTransactionsAsync();

await foreach (var message in consumer.ConsumeAsync(ct))
{
    try
    {
        await producer.BeginTransactionAsync();

        // Process and produce output
        var result = Process(message.Value);
        await producer.ProduceAsync("output-topic", message.Key, result);

        // Commit offsets within the transaction
        await producer.SendOffsetsToTransactionAsync(
            consumer.ConsumerGroupMetadata,
            new[] { new TopicPartitionOffset(message.Topic, message.Partition, message.Offset + 1) }
        );

        await producer.CommitTransactionAsync();
    }
    catch (Exception ex)
    {
        await producer.AbortTransactionAsync();
        _logger.LogError(ex, "Failed to process message, transaction aborted");
        // The loop now moves on to the next message. To reprocess this one,
        // rewind the consumer to message.Offset or rethrow and restart.
    }
}

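Within Kafka, this pattern guarantees:

  • The result of processing each input message is committed exactly once
  • Output messages become visible to ReadCommitted consumers exactly once
  • Consumer offsets are committed atomically with outputs

Side effects outside Kafka (database writes, HTTP calls) are not part of the transaction and can repeat when a message is reprocessed.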

Transaction Isolation Levels

Consumers can choose whether to read uncommitted messages:

// Read all messages, including uncommitted (default)
.WithIsolationLevel(IsolationLevel.ReadUncommitted)

// Only read committed messages
.WithIsolationLevel(IsolationLevel.ReadCommitted)

Tip: Use ReadCommitted when consuming from topics that receive transactional writes, so that messages from aborted or still-open transactions are never delivered.

Transaction Timeouts

Each transaction has a timeout so that an abandoned transaction cannot stay open indefinitely:

using Dekaf;

await using var producer = await Kafka.CreateProducer<string, string>()
    .WithBootstrapServers("localhost:9092")
    .WithTransactionalId("my-service")
    .WithTransactionTimeout(TimeSpan.FromMinutes(2)) // Default is 1 minute
    .BuildAsync();

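If a transaction isn't committed or aborted within the timeout, Kafka aborts it automatically. The broker rejects a timeout larger than its transaction.max.timeout.ms setting (15 minutes by default).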

Error Handling

Different errors require different handling:

try
{
    await producer.BeginTransactionAsync();
    // ... produce messages ...
    await producer.CommitTransactionAsync();
}
catch (ProducerFencedException)
{
    // Another producer with the same transactional ID took over
    // This producer is no longer valid - must recreate
    throw;
}
catch (TransactionAbortedException)
{
    // Transaction was aborted by Kafka (timeout, etc.)
    // Can retry with a new transaction
    await producer.AbortTransactionAsync();
}
catch (Exception)
{
    // Other errors - abort and possibly retry
    await producer.AbortTransactionAsync();
    throw;
}

Best Practices

1. Keep Transactions Short

Long-running transactions:

  • Increase memory usage on brokers
  • May time out
  • Block ReadCommitted consumers from reading past the open transaction

// ✅ Good - quick transaction
await producer.BeginTransactionAsync();
await producer.ProduceAsync("topic", key, value);
await producer.CommitTransactionAsync();

// ❌ Bad - long-running transaction
await producer.BeginTransactionAsync();
foreach (var item in millionsOfItems) // Too many items!
{
    await producer.ProduceAsync("topic", item.Key, item.Value);
}
await producer.CommitTransactionAsync();
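
When a large data set does not need to become visible all at once, one option is to split it into several short transactions. The following is a minimal sketch of that idea, not Dekaf's own API: ITransactionalProducer is a hypothetical stand-in that mirrors only the method names used on this page, and ChunkedPublisher and chunkSize are illustrative names. Atomicity then holds per chunk, not across the whole data set.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in that mirrors only the producer methods used on this page.
public interface ITransactionalProducer
{
    Task BeginTransactionAsync();
    Task ProduceAsync(string topic, string key, string value);
    Task CommitTransactionAsync();
    Task AbortTransactionAsync();
}

public static class ChunkedPublisher
{
    // Writes items in transactions of at most chunkSize messages each.
    // Returns the number of committed transactions.
    public static async Task<int> PublishAsync(
        ITransactionalProducer producer,
        string topic,
        IEnumerable<KeyValuePair<string, string>> items,
        int chunkSize)
    {
        var committed = 0;

        // Enumerable.Chunk requires .NET 6 or later.
        foreach (var chunk in items.Chunk(chunkSize))
        {
            await producer.BeginTransactionAsync();
            try
            {
                foreach (var item in chunk)
                {
                    await producer.ProduceAsync(topic, item.Key, item.Value);
                }

                await producer.CommitTransactionAsync();
                committed++;
            }
            catch
            {
                // Only the current chunk is rolled back; earlier chunks stay committed.
                await producer.AbortTransactionAsync();
                throw;
            }
        }

        return committed;
    }
}
```

If a failure occurs partway through, the caller has to know which chunks were already committed before retrying, so this approach fits best when the items are idempotent or carry their own position.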

2. Unique Transactional IDs

Use instance-specific IDs to avoid fencing:

// ✅ Good - unique per instance and stable across restarts, so a restarted
// instance fences its predecessor instead of waiting for the timeout
var transactionalId = $"order-processor-{Environment.MachineName}";

// ❌ Bad - will cause fencing when scaled
var transactionalId = "order-processor"; // Same ID for all instances!
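
Kafka fences producers by matching transactional IDs, so an ID that stays the same across restarts of one logical instance lets the restarted producer take over cleanly from a stale one. A minimal sketch of deriving such an ID from a deployment-assigned instance name follows. The TransactionalIds helper and the INSTANCE_ID environment variable are hypothetical names chosen for illustration; neither is part of Dekaf.

```csharp
using System;

public static class TransactionalIds
{
    // Builds "<service>-<instance>", preferring an explicitly assigned instance
    // name and falling back to the machine name when none is configured.
    public static string Build(string serviceName, string? assignedInstance, string machineName)
    {
        if (string.IsNullOrWhiteSpace(serviceName))
        {
            throw new ArgumentException("Service name is required.", nameof(serviceName));
        }

        var instance = string.IsNullOrWhiteSpace(assignedInstance)
            ? machineName
            : assignedInstance.Trim();

        return $"{serviceName}-{instance}";
    }

    // INSTANCE_ID is a hypothetical variable name; use whatever your deployment sets.
    public static string FromEnvironment(string serviceName) =>
        Build(
            serviceName,
            Environment.GetEnvironmentVariable("INSTANCE_ID"),
            Environment.MachineName);
}
```

The result can be passed to the builder shown earlier, for example .WithTransactionalId(TransactionalIds.FromEnvironment("order-processor")).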

3. Idempotent Operations

Even with transactions, make your processing idempotent when possible:

// Use deterministic IDs
var outputKey = $"{inputMessage.Topic}-{inputMessage.Partition}-{inputMessage.Offset}";

// Check if already processed (in your database, cache, etc.)
if (await IsAlreadyProcessedAsync(outputKey))
{
    // Skip - this handles edge cases during recovery
    continue;
}

Complete Example

using Dekaf;
using Microsoft.Extensions.Logging;

public class ExactlyOnceProcessor
{
    private readonly IKafkaConsumer<string, string> _consumer;
    private readonly IKafkaProducer<string, string> _producer;
    private readonly ILogger _logger;

    public ExactlyOnceProcessor(
        IKafkaConsumer<string, string> consumer,
        IKafkaProducer<string, string> producer,
        ILogger logger)
    {
        _consumer = consumer;
        _producer = producer;
        _logger = logger;
    }

    public async Task RunAsync(CancellationToken ct)
    {
        await _producer.InitTransactionsAsync();

        await foreach (var batch in _consumer.ConsumeAsync(ct).Batch(100))
        {
            try
            {
                await _producer.BeginTransactionAsync();

                var offsets = new List<TopicPartitionOffset>();

                foreach (var msg in batch)
                {
                    var result = Transform(msg.Value);
                    await _producer.ProduceAsync("output", msg.Key, result);
                    offsets.Add(new TopicPartitionOffset(msg.Topic, msg.Partition, msg.Offset + 1));
                }

                await _producer.SendOffsetsToTransactionAsync(
                    _consumer.ConsumerGroupMetadata!,
                    offsets
                );

                await _producer.CommitTransactionAsync();
                _logger.LogInformation("Committed batch of {Count} messages", batch.Count);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Transaction failed, aborting");
                await _producer.AbortTransactionAsync();
                // The aborted batch is not redelivered automatically; rewind the
                // consumer or rethrow and restart to reprocess it.
            }
        }
    }

    private string Transform(string input) => input.ToUpperInvariant();
}
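
Kafka stores one committed offset per partition, so a batch only needs to send the next offset to read for each partition it touched, not one entry per message. The sketch below shows one way to collapse a batch down to that form. It uses plain tuples in place of Dekaf's TopicPartitionOffset, and the OffsetTracker name is hypothetical. Whether Dekaf already deduplicates a per-message list is not stated on this page, so treat this as an optional tidy-up, not a required step.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class OffsetTracker
{
    // Returns one entry per (topic, partition): the offset of the next message
    // to read, which is the highest processed offset plus one.
    public static IReadOnlyList<(string Topic, int Partition, long NextOffset)> NextOffsets(
        IEnumerable<(string Topic, int Partition, long Offset)> processed)
    {
        return processed
            .GroupBy(m => (m.Topic, m.Partition))
            .Select(g => (g.Key.Topic, g.Key.Partition, NextOffset: g.Max(m => m.Offset) + 1))
            .OrderBy(x => x.Topic, StringComparer.Ordinal)
            .ThenBy(x => x.Partition)
            .ToList();
    }
}
```

Each resulting tuple could then be mapped to a TopicPartitionOffset before the call to SendOffsetsToTransactionAsync.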