Producer Options
Complete reference for all producer configuration options.
Connection Settings
WithBootstrapServers
Kafka broker addresses for initial connection:
// Single server
.WithBootstrapServers("localhost:9092")
// Multiple servers (comma-separated)
.WithBootstrapServers("broker1:9092,broker2:9092,broker3:9092")
// Multiple servers (params)
.WithBootstrapServers("broker1:9092", "broker2:9092", "broker3:9092")
WithClientId
Identifier sent to brokers for logging and metrics:
.WithClientId("order-service-producer")
Delivery Settings
WithAcks
Controls when the broker considers a message delivered:
.WithAcks(Acks.All) // Wait for all in-sync replicas (safest)
.WithAcks(Acks.Leader) // Wait for leader only (faster)
.WithAcks(Acks.None) // Don't wait (fastest, may lose messages)
EnableIdempotence
Prevents duplicate messages during retries:
.EnableIdempotence()
Automatically sets Acks.All and enables sequence numbers.
Batching Settings
WithLingerMs / WithLinger
Time to wait before sending a batch:
.WithLingerMs(5) // Wait up to 5ms
.WithLinger(TimeSpan.FromMilliseconds(5)) // Same, using TimeSpan
Higher values = more batching, higher latency.
WithBatchSize
Maximum batch size in bytes:
.WithBatchSize(65536) // 64KB batches
Compression
UseCompression / Specific Methods
Enable message compression:
.UseLz4Compression() // Fast, good ratio (recommended)
.UseZstdCompression() // Best ratio, more CPU
.UseSnappyCompression() // Very fast, lower ratio
.UseGzipCompression() // Compatible, slower
// Or specify directly
.UseCompression(CompressionType.Lz4)
Partitioning
WithPartitioner
Control how messages are assigned to partitions:
.WithPartitioner(PartitionerType.Default) // Hash key or round-robin
.WithPartitioner(PartitionerType.Sticky) // Stick to partition for batching
.WithPartitioner(PartitionerType.RoundRobin) // Even distribution
Transactions
WithTransactionalId
Enable transactional producer:
.WithTransactionalId("my-service-tx-1")
Must be unique per producer instance.
Security
UseTls
Enable TLS encryption:
.UseTls() // Basic TLS
.UseTls(tlsConfig) // Custom TLS config
.UseMutualTls(caCert, clientCert, clientKey) // mTLS
SASL Authentication
.WithSaslPlain("username", "password")
.WithSaslScramSha256("username", "password")
.WithSaslScramSha512("username", "password")
.WithGssapi(gssapiConfig)
.WithOAuthBearer(oauthConfig)
Serialization
WithKeySerializer / WithValueSerializer
Custom serializers:
.WithKeySerializer(new JsonSerializer<OrderKey>())
.WithValueSerializer(new JsonSerializer<Order>())
Networking
WithConnectionsPerBroker
Number of TCP connections to each broker:
.WithConnectionsPerBroker(3) // 3 parallel connections per broker
Default: 1. Must be 1 for idempotent producers (partition affinity requires a fixed connection).
WithAdaptiveConnections
Configure adaptive connection scaling. When sustained buffer backpressure is detected, the producer automatically adds connections per broker to increase drain throughput:
// Use defaults (max 10 connections per broker)
.WithAdaptiveConnections()
// Custom maximum
.WithAdaptiveConnections(maxConnections: 5)
Adaptive scaling is enabled by default for non-idempotent producers. It monitors three signals before scaling up:
- Pressure delta: at least 100 buffer-full events since the last check
- Utilization: buffer is over 80% full
- Cooldown: at least 30 seconds since the last scale-up
Connections are only scaled up, never down. Connections added during a traffic spike persist for the lifetime of the producer. Idempotent producers ignore this setting.
WithoutAdaptiveConnections
Disable adaptive scaling and use a fixed connection count:
.WithoutAdaptiveConnections()
WithBufferMemory
Maximum memory the producer uses for buffering unsent messages:
.WithBufferMemory(256 * 1024 * 1024) // 256MB
Default: 2GB. When the buffer is full, ProduceAsync and Send block until space is freed (controlled by WithMaxBlockMs). Increase if profiling shows significant time in backpressure waits; decrease in memory-constrained environments.
WithSocketSendBufferBytes / WithSocketReceiveBufferBytes
TCP socket buffer sizes:
.WithSocketSendBufferBytes(1_048_576) // 1MB send buffer
.WithSocketReceiveBufferBytes(1_048_576) // 1MB receive buffer
Observability
WithLoggerFactory
Enable logging:
.WithLoggerFactory(loggerFactory)
WithStatisticsInterval / WithStatisticsHandler
Enable periodic statistics:
.WithStatisticsInterval(TimeSpan.FromSeconds(30))
.WithStatisticsHandler(stats =>
{
Console.WriteLine($"Messages sent: {stats.TotalMessagesSent}");
})
All Options Reference
| Method | Default | Description |
|---|---|---|
WithBootstrapServers | (required) | Broker addresses |
WithClientId | "dekaf-producer" | Client identifier |
WithAcks | All | Acknowledgment mode |
WithLingerMs | 5 | Batch wait time (ms) |
WithBatchSize | 16384 | Max batch size (bytes) |
EnableIdempotence | false | Prevent duplicates |
WithTransactionalId | null | Transaction ID |
UseCompression | None | Compression codec |
WithPartitioner | Default | Partition strategy |
WithConnectionsPerBroker | 1 | TCP connections per broker |
WithAdaptiveConnections | enabled (max 10) | Auto-scale connections under load |
WithoutAdaptiveConnections | — | Disable adaptive scaling |
WithBufferMemory | 2GB | Max buffer for unsent messages |
WithSocketSendBufferBytes | (OS default) | TCP send buffer size |
WithSocketReceiveBufferBytes | (OS default) | TCP receive buffer size |
UseTls | false | Enable TLS |
WithKeySerializer | (auto) | Key serializer |
WithValueSerializer | (auto) | Value serializer |