Skip to main content

Producer Options

Complete reference for all producer configuration options.

Connection Settings

WithBootstrapServers

Kafka broker addresses for initial connection:

// Single server
.WithBootstrapServers("localhost:9092")

// Multiple servers (comma-separated)
.WithBootstrapServers("broker1:9092,broker2:9092,broker3:9092")

// Multiple servers (params)
.WithBootstrapServers("broker1:9092", "broker2:9092", "broker3:9092")

WithClientId

Identifier sent to brokers for logging and metrics:

.WithClientId("order-service-producer")

Delivery Settings

WithAcks

Controls when the broker considers a message delivered:

.WithAcks(Acks.All)     // Wait for all in-sync replicas (safest)
.WithAcks(Acks.Leader) // Wait for leader only (faster)
.WithAcks(Acks.None) // Don't wait (fastest, may lose messages)

EnableIdempotence

Prevents duplicate messages during retries:

.EnableIdempotence()

Automatically sets Acks.All and enables sequence numbers.

Batching Settings

WithLingerMs / WithLinger

Time to wait before sending a batch:

.WithLingerMs(5)                    // Wait up to 5ms
.WithLinger(TimeSpan.FromMilliseconds(5)) // Same, using TimeSpan

Higher values = more batching, higher latency.

WithBatchSize

Maximum batch size in bytes:

.WithBatchSize(65536)  // 64KB batches

Compression

UseCompression / Specific Methods

Enable message compression:

.UseLz4Compression()    // Fast, good ratio (recommended)
.UseZstdCompression() // Best ratio, more CPU
.UseSnappyCompression() // Very fast, lower ratio
.UseGzipCompression() // Compatible, slower

// Or specify directly
.UseCompression(CompressionType.Lz4)

Partitioning

WithPartitioner

Control how messages are assigned to partitions:

.WithPartitioner(PartitionerType.Default)     // Hash key or round-robin
.WithPartitioner(PartitionerType.Sticky) // Stick to partition for batching
.WithPartitioner(PartitionerType.RoundRobin) // Even distribution

Transactions

WithTransactionalId

Enable transactional producer:

.WithTransactionalId("my-service-tx-1")

Must be unique per producer instance.

Security

UseTls

Enable TLS encryption:

.UseTls()                    // Basic TLS
.UseTls(tlsConfig) // Custom TLS config
.UseMutualTls(caCert, clientCert, clientKey) // mTLS

SASL Authentication

.WithSaslPlain("username", "password")
.WithSaslScramSha256("username", "password")
.WithSaslScramSha512("username", "password")
.WithGssapi(gssapiConfig)
.WithOAuthBearer(oauthConfig)

Serialization

WithKeySerializer / WithValueSerializer

Custom serializers:

.WithKeySerializer(new JsonSerializer<OrderKey>())
.WithValueSerializer(new JsonSerializer<Order>())

Observability

WithLoggerFactory

Enable logging:

.WithLoggerFactory(loggerFactory)

WithStatisticsInterval / WithStatisticsHandler

Enable periodic statistics:

.WithStatisticsInterval(TimeSpan.FromSeconds(30))
.WithStatisticsHandler(stats =>
{
Console.WriteLine($"Messages sent: {stats.TotalMessagesSent}");
})

All Options Reference

MethodDefaultDescription
WithBootstrapServers(required)Broker addresses
WithClientId"dekaf-producer"Client identifier
WithAcksAllAcknowledgment mode
WithLingerMs5Batch wait time (ms)
WithBatchSize16384Max batch size (bytes)
EnableIdempotencefalsePrevent duplicates
WithTransactionalIdnullTransaction ID
UseCompressionNoneCompression codec
WithPartitionerDefaultPartition strategy
UseTlsfalseEnable TLS
WithKeySerializer(auto)Key serializer
WithValueSerializer(auto)Value serializer