Fire-and-Forget Production

For high-throughput scenarios where you don't need to wait for acknowledgment, Dekaf provides fire-and-forget methods that return immediately.

The Produce() Method

Produce() queues a message for delivery and returns immediately, without waiting for broker acknowledgment:

// Returns immediately - doesn't wait for broker acknowledgment
producer.Produce("my-topic", "key", "value");

Compare this to ProduceAsync():

// Waits for broker acknowledgment
await producer.ProduceAsync("my-topic", "key", "value");
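The practical difference is easiest to see with a tiny self-contained mock (illustrative only; the types below are not Dekaf's implementation): Produce buffers the message and returns at once, while ProduceAsync only completes after a simulated broker round-trip.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Mock producer modeling the two contracts. The method names mirror the
// Dekaf surface shown above, but the internals here are purely illustrative.
public sealed class MockProducer
{
    private readonly ConcurrentQueue<string> _buffer = new();

    // Fire-and-forget: enqueue locally and return at once.
    public void Produce(string topic, string key, string value) =>
        _buffer.Enqueue($"{topic}:{key}={value}");

    // Awaited: simulate a broker round-trip before completing.
    public async Task ProduceAsync(string topic, string key, string value)
    {
        await Task.Delay(5); // stand-in for acknowledgment latency
        _buffer.Enqueue($"{topic}:{key}={value}");
    }

    public int Buffered => _buffer.Count;
}

public static class ContractDemo
{
    public static async Task<(int afterProduce, int afterAsync)> Run()
    {
        var p = new MockProducer();
        p.Produce("my-topic", "key", "value"); // returns immediately
        int afterProduce = p.Buffered;         // already buffered
        await p.ProduceAsync("my-topic", "key2", "value2");
        return (afterProduce, p.Buffered);
    }
}
```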

When to Use Fire-and-Forget

Fire-and-forget is ideal when:

  • High throughput - You're sending many messages per second
  • Eventual delivery is OK - You don't need immediate confirmation
  • Latency matters - You can't afford to wait for network round-trips
  • Logs/metrics - Data that's valuable but not critical

Caution: Fire-and-forget means you won't know if a message fails to deliver. Use it only when losing an occasional message is acceptable.

Ensuring Delivery

Messages sent via Produce() are buffered internally. To ensure all buffered messages are delivered:

// Send many messages quickly
for (int i = 0; i < 10000; i++)
{
    producer.Produce("events", $"event-{i}", eventData);
}

// Wait for all to be delivered
await producer.FlushAsync();

Tip: Always call FlushAsync() before disposing the producer, or use await using, which does this automatically.
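A minimal sketch of why await using is safe (a mock, not Dekaf's internals): disposal flushes whatever is still buffered before resources are released, so nothing queued by Produce() is silently dropped.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Illustrative producer whose DisposeAsync flushes first, mirroring the
// flush-on-dispose behavior the tip above describes.
public sealed class FlushingProducer : IAsyncDisposable
{
    private readonly List<string> _buffer = new();
    public List<string> Delivered { get; } = new();

    public void Produce(string topic, string key, string value) =>
        _buffer.Add($"{topic}:{key}={value}");

    public Task FlushAsync()
    {
        Delivered.AddRange(_buffer); // "deliver" everything still buffered
        _buffer.Clear();
        return Task.CompletedTask;
    }

    public async ValueTask DisposeAsync() => await FlushAsync();
}

public static class FlushDemo
{
    public static async Task<int> Run()
    {
        var producer = new FlushingProducer();
        await using (producer)
        {
            producer.Produce("events", "k", "v");
        } // DisposeAsync runs here and flushes the buffered message

        return producer.Delivered.Count;
    }
}
```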

With Delivery Callbacks

If you need to know about delivery success/failure without blocking, use the callback overload:

producer.Produce(
    ProducerMessage<string, string>.Create("orders", orderId, orderJson),
    (metadata, error) =>
    {
        if (error != null)
        {
            _logger.LogError(error, "Failed to deliver order {OrderId}", orderId);
            // Queue for retry, send to dead letter, etc.
        }
        else
        {
            _logger.LogDebug("Order {OrderId} delivered to partition {Partition}",
                orderId, metadata.Partition);
        }
    }
);

The callback is invoked on a background thread when delivery completes (successfully or not).

Warning: Don't perform blocking operations in the callback; it runs on the producer's internal thread pool.
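If the per-delivery work is slow (logging to disk, retries, dead-lettering), one option is to have the callback merely enqueue the outcome and let a separate task do the heavy lifting. This is an illustrative pattern built on System.Threading.Channels, not a Dekaf API:

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical outcome type for illustration.
public record DeliveryOutcome(string Key, bool Success);

public static class CallbackOffload
{
    public static async Task<int> Run()
    {
        var channel = Channel.CreateUnbounded<DeliveryOutcome>();
        int handled = 0;

        // Consumer task: free to block or do slow I/O.
        var consumer = Task.Run(async () =>
        {
            await foreach (var outcome in channel.Reader.ReadAllAsync())
                handled++; // real code: log, retry, dead-letter, etc.
        });

        // Stands in for the producer's delivery callback: it must return
        // quickly, so it only enqueues the outcome.
        void OnDelivery(string key, Exception? error) =>
            channel.Writer.TryWrite(new DeliveryOutcome(key, error is null));

        OnDelivery("order-1", null);
        OnDelivery("order-2", new TimeoutException());

        channel.Writer.Complete(); // no more outcomes
        await consumer;            // drain before reading the count
        return handled;
    }
}
```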

With Headers

Use the extension method to send with headers:

using Dekaf.Producer;

var headers = Headers.Create()
    .Add("trace-id", traceId)
    .Add("source", "event-generator");

producer.Produce("events", eventKey, eventValue, headers);

Performance Comparison

Here's how the different methods compare:

Method                 | Blocks? | Knows Result? | Throughput
-----------------------|---------|---------------|-----------
await ProduceAsync()   | Yes     | Immediately   | Lower
Produce()              | No      | Never         | Highest
Produce(..., callback) | No      | Via callback  | High
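The table can be made concrete with a rough, self-contained simulation (a mock with an assumed 10 ms broker round-trip; real numbers depend entirely on your network and broker). Awaiting every send pays one round-trip per message, while fire-and-forget pays roughly one round-trip at flush time:

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

// Illustrative mock, not Dekaf: compares per-message awaiting against
// fire-and-forget plus a single flush.
public static class ThroughputSketch
{
    private const int RoundTripMs = 10; // assumed broker latency
    private static int _pending;

    private static void Produce() => _pending++; // no waiting

    private static async Task ProduceAsync()
    {
        await Task.Delay(RoundTripMs); // one round-trip per message
        _pending++;
    }

    private static async Task FlushAsync()
    {
        await Task.Delay(RoundTripMs); // one batched round-trip
        _pending = 0;
    }

    public static async Task<(long awaited, long fireAndForget)> Run(int n)
    {
        var sw = Stopwatch.StartNew();
        for (int i = 0; i < n; i++) await ProduceAsync(); // n round-trips
        long awaited = sw.ElapsedMilliseconds;

        sw.Restart();
        for (int i = 0; i < n; i++) Produce();            // buffered only
        await FlushAsync();                               // one round-trip
        long fireAndForget = sw.ElapsedMilliseconds;

        return (awaited, fireAndForget);
    }
}
```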

Real-World Example: Event Streaming

Here's a pattern for high-throughput event streaming:

public class EventPublisher : IAsyncDisposable
{
    private readonly IKafkaProducer<string, byte[]> _producer;
    private readonly ILogger<EventPublisher> _logger;
    private long _successCount;
    private long _failureCount;

    public EventPublisher(IKafkaProducer<string, byte[]> producer, ILogger<EventPublisher> logger)
    {
        _producer = producer;
        _logger = logger;
    }

    public void Publish(string eventType, byte[] payload)
    {
        var message = new ProducerMessage<string, byte[]>
        {
            Topic = "events",
            Key = eventType,
            Value = payload,
            Headers = Headers.Create()
                .Add("event-type", eventType)
                .Add("timestamp", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString())
        };

        _producer.Produce(message, (metadata, error) =>
        {
            if (error != null)
            {
                Interlocked.Increment(ref _failureCount);
                _logger.LogWarning(error, "Failed to publish {EventType} event", eventType);
            }
            else
            {
                Interlocked.Increment(ref _successCount);
            }
        });
    }

    public (long Success, long Failure) GetStats() =>
        (Interlocked.Read(ref _successCount), Interlocked.Read(ref _failureCount));

    public async ValueTask DisposeAsync()
    {
        await _producer.FlushAsync();
        await _producer.DisposeAsync();

        var (success, failure) = GetStats();
        _logger.LogInformation("EventPublisher shutdown: {Success} succeeded, {Failure} failed",
            success, failure);
    }
}

// Usage
var publisher = new EventPublisher(producer, logger);

// Publish many events quickly
foreach (var evt in events)
{
    publisher.Publish(evt.Type, evt.Serialize());
}

// When shutting down
await publisher.DisposeAsync();

Combining with Batching Settings

For maximum throughput with fire-and-forget, tune the batching settings:

using Dekaf;

var producer = await Kafka.CreateProducer<string, string>()
    .WithBootstrapServers("localhost:9092")
    .ForHighThroughput()   // Preset with good defaults
    .WithLingerMs(10)      // Allow more time for batching
    .WithBatchSize(131072) // Larger batches (128 KB)
    .UseLz4Compression()   // Compress batches
    .BuildAsync();

This configuration:

  • Waits up to 10ms to fill batches
  • Uses larger 128KB batches
  • Compresses data to reduce network usage
  • Uses leader-only acks for speed