
Consumer Options

Complete reference for all consumer configuration options.

Connection Settings

WithBootstrapServers

Kafka broker addresses, given as a single host:port or a comma-separated list:

.WithBootstrapServers("localhost:9092")
.WithBootstrapServers("broker1:9092,broker2:9092")

WithClientId

Client identifier reported to the brokers (default: "dekaf-consumer"):

.WithClientId("order-processor")

Consumer Group Settings

WithGroupId

Consumer group identifier (required for group consumption):

.WithGroupId("order-processors")

WithGroupInstanceId

Static membership ID. A consumer that restarts with the same ID can rejoin the group without triggering a full rebalance:

.WithGroupInstanceId("instance-1")
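
One way to apply this, sketched as a fragment of the builder chain in the same style as the snippets above: derive the instance ID from something that is stable across restarts but unique per instance. Using the machine name is an assumption for illustration (it suits deployments with stable host names), and the note about the session timeout reflects Kafka's static membership protocol rather than anything stated on this page.

```csharp
// Fragment of a consumer builder chain; not a complete program.
    .WithGroupId("order-processors")
    // Assumption: the host name is stable across restarts and unique per instance.
    .WithGroupInstanceId(System.Environment.MachineName)
    // A restarted static member must rejoin within the session timeout to keep its partitions.
    .WithSessionTimeout(TimeSpan.FromSeconds(45))
```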

Offset Management

WithOffsetCommitMode

How offsets are committed (matches Kafka's enable.auto.commit):

.WithOffsetCommitMode(OffsetCommitMode.Auto)   // Automatic commit in background (default)
.WithOffsetCommitMode(OffsetCommitMode.Manual) // You call CommitAsync() explicitly

WithAutoCommitInterval

Control how often offsets are committed in Auto mode:

.WithAutoCommitInterval(5000)                    // Milliseconds: commit every 5 seconds (default)
.WithAutoCommitInterval(TimeSpan.FromSeconds(5)) // Same, using TimeSpan

WithAutoOffsetReset

Where to start when no committed offset exists:

.WithAutoOffsetReset(AutoOffsetReset.Latest)   // New messages only (default)
.WithAutoOffsetReset(AutoOffsetReset.Earliest) // From the beginning of the partition
.WithAutoOffsetReset(AutoOffsetReset.None)     // Throw an exception if no committed offset exists
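
For at-least-once processing, a common pairing is manual commits with an Earliest reset, so that nothing is skipped on first start and offsets advance only after your code has handled a message. A minimal fragment, using only options from this page; where and how often to call CommitAsync() is left to the application:

```csharp
// Fragment of a consumer builder chain; not a complete program.
    .WithGroupId("order-processors")
    .WithOffsetCommitMode(OffsetCommitMode.Manual)  // Offsets advance only when CommitAsync() is called
    .WithAutoOffsetReset(AutoOffsetReset.Earliest)  // A group with no committed offset starts from the beginning
```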

Fetch Settings

WithMaxPollRecords

Maximum number of messages returned by a single poll:

.WithMaxPollRecords(500)  // Default: 500

Fetch Tuning (via presets or internal settings)

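Control how data is fetched from brokers. These values are not set directly; they are applied by presets:

  • FetchMinBytes - Minimum amount of data the broker should return for a fetch; if less is available, the broker waits for more to arrive
  • FetchMaxWaitMs - Maximum time the broker waits for FetchMinBytes to accumulate before responding anyway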

Session Settings

WithSessionTimeout

How long the consumer can go without sending a heartbeat before the group considers it dead and reassigns its partitions:

.WithSessionTimeout(45000)                    // Milliseconds: 45 seconds (default)
.WithSessionTimeout(TimeSpan.FromSeconds(45)) // Same, using TimeSpan

Subscription

SubscribeTo

Subscribe to topics during build:

.SubscribeTo("orders")
.SubscribeTo("orders", "payments", "notifications")

Rebalancing

WithRebalanceListener

Get notified when partitions are assigned to or revoked from this consumer during a rebalance:

.WithRebalanceListener(new MyRebalanceListener())

WithPartitionAssignmentStrategy

Partition assignment algorithm:

.WithPartitionAssignmentStrategy(PartitionAssignmentStrategy.CooperativeSticky)  // Default
.WithPartitionAssignmentStrategy(PartitionAssignmentStrategy.Range)
.WithPartitionAssignmentStrategy(PartitionAssignmentStrategy.RoundRobin)

Security

UseTls

Enable TLS with default settings or a custom TLS configuration, or use mutual TLS with a client certificate:

.UseTls()
.UseTls(tlsConfig)
.UseMutualTls(caCert, clientCert, clientKey)

SASL Authentication

Authenticate with a username and password using PLAIN or SCRAM:

.WithSaslPlain("username", "password")
.WithSaslScramSha256("username", "password")
.WithSaslScramSha512("username", "password")
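
SASL credentials are normally sent over TLS so they do not cross the network in clear text, and they are better read from configuration than hard-coded. A hedged sketch: the environment variable names KAFKA_USERNAME and KAFKA_PASSWORD are hypothetical, and combining UseTls with a SASL method in one chain is an assumption not confirmed by this page.

```csharp
// Fragment of a consumer builder chain; not a complete program.
    .UseTls()
    .WithSaslScramSha512(
        // Hypothetical variable names; fail fast if either is missing.
        Environment.GetEnvironmentVariable("KAFKA_USERNAME")
            ?? throw new InvalidOperationException("KAFKA_USERNAME is not set"),
        Environment.GetEnvironmentVariable("KAFKA_PASSWORD")
            ?? throw new InvalidOperationException("KAFKA_PASSWORD is not set"))
```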

Serialization

WithKeyDeserializer / WithValueDeserializer

Custom deserializers for message keys and values:

.WithKeyDeserializer(new JsonDeserializer<OrderKey>())
.WithValueDeserializer(new JsonDeserializer<Order>())

Advanced Settings

WithPartitionEof

Receive a notification when the consumer reaches the end of a partition:

.WithPartitionEof(true)

WithIsolationLevel

Controls whether messages from uncommitted transactions are visible to the consumer:

.WithIsolationLevel(IsolationLevel.ReadCommitted)   // Only committed messages
.WithIsolationLevel(IsolationLevel.ReadUncommitted) // All messages (default)

Observability

WithLoggerFactory

Logger factory used for the consumer's log output:

.WithLoggerFactory(loggerFactory)

WithStatisticsInterval / WithStatisticsHandler

How often statistics are emitted, and the callback that receives them:

.WithStatisticsInterval(TimeSpan.FromSeconds(30))
.WithStatisticsHandler(stats => { /* ... */ })

All Options Reference

| Method | Default | Description |
|---|---|---|
| WithBootstrapServers | (required) | Broker addresses |
| WithClientId | "dekaf-consumer" | Client identifier |
| WithGroupId | null | Consumer group ID |
| WithGroupInstanceId | null | Static membership ID |
| WithOffsetCommitMode | Auto | Offset management mode |
| WithAutoCommitInterval | 5000ms | Auto-commit interval |
| WithAutoOffsetReset | Latest | Start position |
| WithMaxPollRecords | 500 | Max messages per poll |
| WithSessionTimeout | 45000ms | Session timeout |
| SubscribeTo | (none) | Topics to subscribe |
| WithRebalanceListener | null | Rebalance callbacks |
| WithPartitionEof | false | EOF notifications |
| UseTls | false | Enable TLS |