Skip to main content

Topic-Specific Producers

When your application produces to a single topic, you can use ITopicProducer<TKey, TValue> for a cleaner API. It binds to a specific topic at construction time, so you don't need to specify the topic on every call.

Creating a Topic Producer

Direct Creation

The simplest way to create a topic producer:

using Dekaf;

await using var producer = Kafka.CreateTopicProducer<string, string>(
"localhost:9092", "orders");

await producer.ProduceAsync("order-123", orderJson);

From the Builder

Use the builder for more configuration options:

using Dekaf;

await using var producer = await Kafka.CreateProducer<string, string>()
.WithBootstrapServers("localhost:9092")
.WithAcks(Acks.All)
.EnableIdempotence()
.BuildForTopicAsync("orders");

await producer.ProduceAsync("order-123", orderJson);

From an Existing Producer

Create topic producers from a shared base producer. This is useful when you have a few fixed topics but want to share connections and resources:

using Dekaf;

await using var baseProducer = await Kafka.CreateProducer<string, string>()
.WithBootstrapServers("localhost:9092")
.WithAcks(Acks.All)
.BuildAsync();

// Create topic-specific wrappers (they share the base producer's resources)
var ordersProducer = baseProducer.ForTopic("orders");
var eventsProducer = baseProducer.ForTopic("events");

await ordersProducer.ProduceAsync("order-123", orderJson);
await eventsProducer.ProduceAsync("event-456", eventJson);

API Comparison

With a regular producer, you specify the topic on every call:

// Regular producer - topic on every call
await producer.ProduceAsync("orders", "key", "value");
await producer.ProduceAsync("orders", "key2", "value2");

With a topic producer, the topic is implicit:

// Topic producer - no topic parameter
await producer.ProduceAsync("key", "value");
await producer.ProduceAsync("key2", "value2");

Available Methods

ProduceAsync

Send a message and wait for acknowledgment:

// Key and value
var metadata = await producer.ProduceAsync("key", "value");

// With headers
var metadata = await producer.ProduceAsync("key", "value", headers);

// To a specific partition
var metadata = await producer.ProduceAsync(partition: 2, "key", "value");

// Full control with TopicProducerMessage
var metadata = await producer.ProduceAsync(new TopicProducerMessage<string, string>
{
Key = "key",
Value = "value",
Headers = headers,
Partition = 2,
Timestamp = DateTimeOffset.UtcNow
});

Send (Fire-and-Forget)

Send without waiting for acknowledgment:

// Basic fire-and-forget
producer.Produce("key", "value");

// With headers
producer.Produce("key", "value", headers);

// With delivery callback
producer.Produce("key", "value", (metadata, error) =>
{
if (error is not null)
Console.WriteLine($"Failed: {error.Message}");
else
Console.WriteLine($"Delivered to partition {metadata.Partition}");
});

ProduceAllAsync

Send multiple messages and wait for all acknowledgments:

// Simple tuples
var results = await producer.ProduceAllAsync(new[]
{
("key1", "value1"),
("key2", "value2"),
("key3", "value3")
});

// With TopicProducerMessage for full control
var results = await producer.ProduceAllAsync(new[]
{
new TopicProducerMessage<string, string> { Key = "key1", Value = "value1" },
new TopicProducerMessage<string, string> { Key = "key2", Value = "value2", Partition = 0 }
});

FlushAsync

Ensure all pending messages are delivered:

await producer.FlushAsync();

Disposal Semantics

The disposal behavior depends on how the topic producer was created:

Creation MethodOn Dispose
CreateTopicProducer()Disposes underlying producer
BuildForTopic()Disposes underlying producer
ForTopic()Does NOT dispose base producer

This allows safe resource sharing:

using Dekaf;

await using var baseProducer = Kafka.CreateProducer<string, string>("localhost:9092");

var orders = baseProducer.ForTopic("orders");
var events = baseProducer.ForTopic("events");

// Disposing topic producers doesn't affect the base producer
await orders.DisposeAsync();
await events.DisposeAsync();

// Base producer still works
await baseProducer.ProduceAsync("audit", "key", "value");

When to Use Topic Producers

Use topic producers when:

  • Your service produces to one or a few fixed topics
  • You want a cleaner API without repeating topic names
  • You're using dependency injection and want to inject a producer per topic

Use regular producers when:

  • You produce to many different topics dynamically
  • Topic names come from runtime data (e.g., routing based on message content)
  • You want maximum flexibility

Dependency Injection Example

Topic producers work well with DI:

using Dekaf;

// Registration
services.AddSingleton<ITopicProducer<string, OrderEvent>>(sp =>
{
return Kafka.CreateProducer<string, OrderEvent>()
.WithBootstrapServers(config["Kafka:BootstrapServers"])
.WithValueSerializer(new JsonSerializer<OrderEvent>())
.BuildForTopic("orders");
});

// Usage
public class OrderService
{
private readonly ITopicProducer<string, OrderEvent> _producer;

public OrderService(ITopicProducer<string, OrderEvent> producer)
{
_producer = producer;
}

public async Task PlaceOrderAsync(Order order)
{
var @event = new OrderEvent { OrderId = order.Id, Status = "Placed" };
await _producer.ProduceAsync(order.Id, @event);
}
}

Performance

Topic producers have zero overhead - they simply delegate to the underlying IKafkaProducer with the topic embedded. All the performance optimizations (batching, compression, connection pooling) work identically.