Producer Options

Complete reference for all producer configuration options.

Connection Settings

WithBootstrapServers

Kafka broker addresses for initial connection:

// Single server
.WithBootstrapServers("localhost:9092")

// Multiple servers (comma-separated)
.WithBootstrapServers("broker1:9092,broker2:9092,broker3:9092")

// Multiple servers (params)
.WithBootstrapServers("broker1:9092", "broker2:9092", "broker3:9092")

WithClientId

Identifier sent to brokers for logging and metrics:

.WithClientId("order-service-producer")

Delivery Settings

WithAcks

Controls which acknowledgment the producer waits for before it treats a message as delivered:

.WithAcks(Acks.All) // Wait for all in-sync replicas (safest)
.WithAcks(Acks.Leader) // Wait for leader only (faster)
.WithAcks(Acks.None) // Don't wait (fastest, may lose messages)

EnableIdempotence

Prevents duplicate messages during retries:

.EnableIdempotence()

Automatically sets Acks.All and enables sequence numbers.
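How sequence numbers prevent duplicates can be pictured with a small model. The sketch below is an assumption based on the general Kafka idempotence protocol, not Dekaf or broker code; `Append` and `lastAccepted` are hypothetical names. A retried batch reuses its original sequence number, so the receiving side can recognize it and skip the second write.

```csharp
using System;
using System.Collections.Generic;

// Simplified model of broker-side de-duplication for one producer.
// Maps partition -> last sequence number that was accepted.
var lastAccepted = new Dictionary<int, int>();

string Append(int partition, int sequence)
{
    int last = lastAccepted.TryGetValue(partition, out var value) ? value : -1;

    // Already written: acknowledge again, but do not append a second copy.
    if (sequence <= last) return "duplicate";

    // A gap means an earlier batch is missing, so ordering would break.
    if (sequence != last + 1) return "out-of-order";

    lastAccepted[partition] = sequence;
    return "appended";
}

Console.WriteLine(Append(0, 0)); // first batch
Console.WriteLine(Append(0, 0)); // retry of the same batch
Console.WriteLine(Append(0, 1)); // next batch
```

In this model the retry is acknowledged without being written twice, which is why retries are safe once idempotence is enabled.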

Batching Settings

WithLingerMs / WithLinger

Maximum time to wait for more messages before sending a batch:

.WithLingerMs(5) // Wait up to 5ms
.WithLinger(TimeSpan.FromMilliseconds(5)) // Same, using TimeSpan

Higher values allow more messages per batch at the cost of higher latency.

WithBatchSize

Maximum batch size in bytes:

.WithBatchSize(65536) // 64KB batches
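The linger time and the batch size usually work together as "send when the batch is full or the linger time has elapsed, whichever comes first". The sketch below illustrates that rule under the assumption that Dekaf follows the usual Kafka producer semantics; `ShouldSendBatch` is a hypothetical helper, not a library method.

```csharp
using System;

// Hypothetical helper: decides whether a batch is ready to send.
static bool ShouldSendBatch(int batchBytes, int maxBatchBytes, TimeSpan batchAge, TimeSpan linger)
{
    bool full = batchBytes >= maxBatchBytes; // WithBatchSize limit reached
    bool expired = batchAge >= linger;       // WithLingerMs / WithLinger elapsed
    return full || expired;
}

var linger = TimeSpan.FromMilliseconds(5);

// Full batch: sent immediately, even though only 1 ms has passed.
Console.WriteLine(ShouldSendBatch(65536, 65536, TimeSpan.FromMilliseconds(1), linger));

// Small batch, linger elapsed: sent partially filled.
Console.WriteLine(ShouldSendBatch(1024, 65536, TimeSpan.FromMilliseconds(5), linger));

// Small batch, linger not elapsed: keeps waiting for more messages.
Console.WriteLine(ShouldSendBatch(1024, 65536, TimeSpan.FromMilliseconds(1), linger));
```

Under this rule, a larger batch size only helps throughput if the linger time is long enough for batches to fill.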

Compression

UseCompression / Specific Methods

Enable message compression:

.UseLz4Compression() // Fast, good ratio (recommended)
.UseZstdCompression() // Best ratio, more CPU
.UseSnappyCompression() // Very fast, lower ratio
.UseGzipCompression() // Compatible, slower

// Or specify directly
.UseCompression(CompressionType.Lz4)

Partitioning

WithPartitioner

Control how messages are assigned to partitions:

.WithPartitioner(PartitionerType.Default) // Hash key or round-robin
.WithPartitioner(PartitionerType.Sticky) // Stick to partition for batching
.WithPartitioner(PartitionerType.RoundRobin) // Even distribution
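The difference between key hashing and round-robin can be sketched in a few lines. This is an illustration only: the FNV-1a hash is a stand-in (this section does not state which hash Dekaf uses), and `PartitionForKey` and `NextRoundRobin` are hypothetical names. Sticky partitioning is not modelled here.

```csharp
using System;
using System.Text;

// Stand-in hash (FNV-1a, 32-bit). Deterministic across processes,
// unlike string.GetHashCode().
static uint Fnv1a(byte[] data)
{
    uint hash = 2166136261;
    foreach (byte b in data)
    {
        hash ^= b;
        hash = unchecked(hash * 16777619);
    }
    return hash;
}

// Keyed messages: the same key always maps to the same partition.
static int PartitionForKey(string key, int partitionCount) =>
    (int)(Fnv1a(Encoding.UTF8.GetBytes(key)) % (uint)partitionCount);

// Unkeyed messages: rotate through the partitions in order.
int next = 0;
int NextRoundRobin(int partitionCount) => next++ % partitionCount;

// The same key yields the same partition on every call.
Console.WriteLine(PartitionForKey("order-42", 6) == PartitionForKey("order-42", 6));
```

Key hashing preserves per-key ordering because every message for a key lands in one partition; round-robin gives up that guarantee in exchange for an even spread.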

Transactions

WithTransactionalId

Enable transactional producer:

.WithTransactionalId("my-service-tx-1")

The transactional ID must be unique per producer instance.

Security

UseTls

Enable TLS encryption:

.UseTls() // Basic TLS
.UseTls(tlsConfig) // Custom TLS config
.UseMutualTls(caCert, clientCert, clientKey) // mTLS

SASL Authentication

.WithSaslPlain("username", "password")
.WithSaslScramSha256("username", "password")
.WithSaslScramSha512("username", "password")
.WithGssapi(gssapiConfig)
.WithOAuthBearer(oauthConfig)

Serialization

WithKeySerializer / WithValueSerializer

Custom serializers:

.WithKeySerializer(new JsonSerializer<OrderKey>())
.WithValueSerializer(new JsonSerializer<Order>())

Networking

WithConnectionsPerBroker

Number of TCP connections to each broker:

.WithConnectionsPerBroker(3) // 3 parallel connections per broker

Default: 1. Must be 1 for idempotent producers (partition affinity requires a fixed connection).

WithAdaptiveConnections

Configure adaptive connection scaling. When sustained buffer backpressure is detected, the producer automatically adds connections per broker to increase drain throughput:

// Use defaults (max 10 connections per broker)
.WithAdaptiveConnections()

// Custom maximum
.WithAdaptiveConnections(maxConnections: 5)

Adaptive scaling is enabled by default for non-idempotent producers. It monitors three signals before scaling up:

  • Pressure delta: at least 100 buffer-full events since the last check
  • Utilization: buffer is over 80% full
  • Cooldown: at least 30 seconds since the last scale-up

Connections are only scaled up, never down. Connections added during a traffic spike persist for the lifetime of the producer. Idempotent producers ignore this setting.
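The scale-up check described above can be sketched as a single predicate. This is a hypothetical model, not the library's implementation: the function name and parameters are invented for illustration, and it assumes all three signals must hold at the same time and that the configured maximum is a hard cap.

```csharp
using System;

// Hypothetical model of the scale-up decision.
// Thresholds come from the list above: 100 events, 80% utilization, 30 s cooldown.
static bool ShouldAddConnection(
    long bufferFullEventsSinceLastCheck,
    double bufferUtilization,      // fraction of the buffer in use, 0.0 to 1.0
    TimeSpan sinceLastScaleUp,
    int currentConnections,
    int maxConnections)
{
    // Never exceed the configured maximum per broker.
    if (currentConnections >= maxConnections) return false;

    bool pressure = bufferFullEventsSinceLastCheck >= 100;
    bool utilization = bufferUtilization > 0.80;
    bool cooledDown = sinceLastScaleUp >= TimeSpan.FromSeconds(30);

    // Assumption: all three signals are required before adding a connection.
    return pressure && utilization && cooledDown;
}

// Sustained pressure, buffer 90% full, 45 s since the last scale-up: scale up.
Console.WriteLine(ShouldAddConnection(150, 0.90, TimeSpan.FromSeconds(45), 2, 10));

// Same pressure, but only 10 s since the last scale-up: wait for the cooldown.
Console.WriteLine(ShouldAddConnection(150, 0.90, TimeSpan.FromSeconds(10), 2, 10));
```

There is no scale-down branch in the model because connections, once added, persist for the lifetime of the producer.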

WithoutAdaptiveConnections

Disable adaptive scaling and use a fixed connection count:

.WithoutAdaptiveConnections()

WithBufferMemory

Maximum memory the producer uses for buffering unsent messages:

.WithBufferMemory(256 * 1024 * 1024) // 256MB

Default: 2GB. When the buffer is full, ProduceAsync and Send block until space is freed (controlled by WithMaxBlockMs). Increase if profiling shows significant time in backpressure waits; decrease in memory-constrained environments.
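When sizing the buffer, a rough estimate of how long it can absorb a burst is often useful. The sketch below is back-of-the-envelope arithmetic, not a library feature; `TimeUntilBufferFull` is a hypothetical helper, and the rates are made-up inputs you would replace with measured values.

```csharp
using System;

// Hypothetical helper: time until the buffer fills when the application
// produces faster than the producer can drain to the brokers.
static TimeSpan? TimeUntilBufferFull(long bufferBytes, double produceBytesPerSec, double drainBytesPerSec)
{
    double netFillRate = produceBytesPerSec - drainBytesPerSec;
    if (netFillRate <= 0) return null; // draining keeps up: the buffer never fills
    return TimeSpan.FromSeconds(bufferBytes / netFillRate);
}

const long MiB = 1024 * 1024;

// 256 MiB buffer, producing 100 MiB/s while draining 60 MiB/s.
var burst = TimeUntilBufferFull(256 * MiB, 100 * MiB, 60 * MiB);
Console.WriteLine(burst?.TotalSeconds); // about 6.4 seconds for these numbers
```

If typical bursts last longer than the estimate, callers will start to block, which is the signal to raise the buffer size or improve drain throughput.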

WithSocketSendBufferBytes / WithSocketReceiveBufferBytes

TCP socket buffer sizes:

.WithSocketSendBufferBytes(1_048_576) // 1MB send buffer
.WithSocketReceiveBufferBytes(1_048_576) // 1MB receive buffer

Observability

WithLoggerFactory

Enable logging:

.WithLoggerFactory(loggerFactory)

WithStatisticsInterval / WithStatisticsHandler

Enable periodic statistics:

.WithStatisticsInterval(TimeSpan.FromSeconds(30))
.WithStatisticsHandler(stats =>
{
    Console.WriteLine($"Messages sent: {stats.TotalMessagesSent}");
})

All Options Reference

| Method | Default | Description |
|---|---|---|
| WithBootstrapServers | (required) | Broker addresses |
| WithClientId | "dekaf-producer" | Client identifier |
| WithAcks | All | Acknowledgment mode |
| WithLingerMs | 5 | Batch wait time (ms) |
| WithBatchSize | 16384 | Max batch size (bytes) |
| EnableIdempotence | false | Prevent duplicates |
| WithTransactionalId | null | Transaction ID |
| UseCompression | None | Compression codec |
| WithPartitioner | Default | Partition strategy |
| WithConnectionsPerBroker | 1 | TCP connections per broker |
| WithAdaptiveConnections | enabled (max 10) | Auto-scale connections under load |
| WithoutAdaptiveConnections | | Disable adaptive scaling |
| WithBufferMemory | 2GB | Max buffer for unsent messages |
| WithSocketSendBufferBytes | (OS default) | TCP send buffer size |
| WithSocketReceiveBufferBytes | (OS default) | TCP receive buffer size |
| UseTls | false | Enable TLS |
| WithKeySerializer | (auto) | Key serializer |
| WithValueSerializer | (auto) | Value serializer |