Skip to main content

Dependency Injection

Dekaf plays nicely with ASP.NET Core's DI container. Here's how to wire it up.

Installation

dotnet add package Dekaf.Extensions.DependencyInjection

Basic Registration

Use AddDekaf to register producers and consumers:

using Dekaf.Extensions.DependencyInjection;

// Create the application builder and keep a handle on its configuration.
var builder = WebApplication.CreateBuilder(args);
var configuration = builder.Configuration;

builder.Services.AddDekaf(kafka =>
{
    // String-keyed, string-valued producer configured with ForReliability().
    kafka.AddProducer<string, string>(p =>
        p.WithBootstrapServers(configuration["Kafka:BootstrapServers"]!)
         .ForReliability());

    // String-keyed, string-valued consumer in the "my-service" group.
    kafka.AddConsumer<string, string>(c =>
        c.WithBootstrapServers(configuration["Kafka:BootstrapServers"]!)
         .WithGroupId("my-service"));
});

Using in Services

Inject the registered interfaces (IKafkaProducer<TKey, TValue> and IKafkaConsumer<TKey, TValue>) through the constructor:

/// <summary>
/// Publishes orders to Kafka using the producer supplied by the DI container.
/// </summary>
public class OrderService
{
    private readonly IKafkaProducer<string, string> _producer;

    /// <summary>Captures the injected string/string producer.</summary>
    /// <param name="producer">Producer resolved from the DI container.</param>
    public OrderService(IKafkaProducer<string, string> producer) => _producer = producer;

    /// <summary>
    /// Serializes <paramref name="order"/> to JSON and produces it to the
    /// "orders" topic, using the order's Id as the message key.
    /// </summary>
    /// <param name="order">The order to publish.</param>
    public async Task PublishOrderAsync(Order order)
    {
        var key = order.Id;
        var payload = JsonSerializer.Serialize(order);

        await _producer.ProduceAsync("orders", key, payload);
    }
}

Multiple Producers/Consumers

Register multiple producers or consumers with different type parameters:

// Registers two producers that differ only in their value type; each is
// resolved by its own IKafkaProducer<TKey, TValue> closed generic type.
builder.Services.AddDekaf(dekaf =>
{
    // The full "Kafka:BootstrapServers" key is read from the root
    // configuration, exactly as in the basic registration example.
    dekaf.AddProducer<string, string>(producer => producer
        .WithBootstrapServers(builder.Configuration["Kafka:BootstrapServers"]!)
        .ForReliability());

    dekaf.AddProducer<string, byte[]>(producer => producer
        .WithBootstrapServers(builder.Configuration["Kafka:BootstrapServers"]!)
        .ForHighThroughput());
});

Global Interceptors

Register cross-cutting interceptors (tracing, metrics, audit logging) that apply to all producers or consumers automatically:

builder.Services.AddDekaf(kafka =>
{
    const string bootstrapServers = "localhost:9092";

    // Registered once here, applied to every producer / consumer below.
    kafka.AddGlobalProducerInterceptor(typeof(TracingInterceptor<,>));
    kafka.AddGlobalConsumerInterceptor(typeof(MetricsInterceptor<,>));

    kafka.AddProducer<string, string>(p =>
        p.WithBootstrapServers(bootstrapServers));

    kafka.AddConsumer<string, string>(c =>
        c.WithBootstrapServers(bootstrapServers)
         .WithGroupId("my-service"));
});

Global interceptors execute before per-instance interceptors, in registration order. They are constructed via ActivatorUtilities, so their dependencies are resolved from the DI container.

Open generic interceptor types (e.g., typeof(TracingInterceptor<,>)) are automatically closed with the producer's or consumer's TKey/TValue type arguments. Concrete types that implement the interface for specific type combinations are also supported.

Per-Instance Interceptors

You can also add interceptors to individual producers or consumers:

builder.Services.AddDekaf(kafka =>
{
    // Global interceptor: applies to every registered producer.
    kafka.AddGlobalProducerInterceptor(typeof(TracingInterceptor<,>));

    // Per-instance interceptor: attached to this producer only, and
    // executed after the global interceptors.
    kafka.AddProducer<string, string>(p =>
        p.WithBootstrapServers("localhost:9092")
         .AddInterceptor(new AuditInterceptor()));
});

Background Consumer Service

Create a hosted service for continuous consumption:

/// <summary>
/// Hosted service that reads from the injected Kafka consumer for as long as
/// the host is running and logs the key of each message.
/// </summary>
public class OrderConsumerService : BackgroundService
{
    private readonly IKafkaConsumer<string, string> _consumer;
    private readonly ILogger<OrderConsumerService> _logger;

    /// <summary>Captures the consumer and logger supplied by the DI container.</summary>
    public OrderConsumerService(
        IKafkaConsumer<string, string> consumer,
        ILogger<OrderConsumerService> logger)
        => (_consumer, _logger) = (consumer, logger);

    /// <summary>
    /// Streams messages from the consumer until <paramref name="stoppingToken"/>
    /// is signalled.
    /// </summary>
    /// <param name="stoppingToken">Token passed through to ConsumeAsync.</param>
    // NOTE(review): no topic subscription is visible in this snippet or in the
    // registration examples — confirm where the consumer is subscribed.
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (var record in _consumer.ConsumeAsync(stoppingToken))
        {
            _logger.LogInformation("Received: {Key}", record.Key);
            // Handle the message here.
        }
    }
}

// Program.cs: run the consumer loop as a hosted service.
builder.Services.AddHostedService<OrderConsumerService>();

Configuration from appsettings.json

{
"Kafka": {
"BootstrapServers": "localhost:9092",
"Producer": {
"ClientId": "my-service-producer"
},
"Consumer": {
"ClientId": "my-service-consumer",
"GroupId": "my-service"
}
}
}
// Program.cs: bind the "Kafka" section of the appsettings.json shown above.
builder.Services.AddDekaf(kafka =>
{
    // Keys below are relative to the "Kafka" section.
    var kafkaSection = builder.Configuration.GetSection("Kafka");

    kafka.AddProducer<string, string>(p =>
        p.WithBootstrapServers(kafkaSection["BootstrapServers"]!)
         .WithClientId(kafkaSection["Producer:ClientId"]!));

    kafka.AddConsumer<string, string>(c =>
        c.WithBootstrapServers(kafkaSection["BootstrapServers"]!)
         .WithClientId(kafkaSection["Consumer:ClientId"]!)
         .WithGroupId(kafkaSection["Consumer:GroupId"]!));
});

Lifetime Management

Both producers and consumers are registered as singletons. This is intentional—they're expensive to create, thread-safe, and meant to be reused. They'll be disposed automatically when your app shuts down.

Complete Example

// Program.cs
var builder = WebApplication.CreateBuilder(args);

// "Kafka" section of appsettings.json; keys below are relative to it.
var kafkaConfig = builder.Configuration.GetSection("Kafka");

// Required by app.MapControllers() below: without the controller services
// registered, endpoint mapping fails at startup.
builder.Services.AddControllers();

builder.Services.AddDekaf(dekaf =>
{
    // Register producer
    dekaf.AddProducer<string, Order>(producer => producer
        .WithBootstrapServers(kafkaConfig["BootstrapServers"]!)
        .WithValueSerializer(new JsonSerializer<Order>())
        .ForReliability());

    // Register consumer. The group id lives under Kafka:Consumer:GroupId in
    // the appsettings.json layout used by this guide.
    dekaf.AddConsumer<string, Order>(consumer => consumer
        .WithBootstrapServers(kafkaConfig["BootstrapServers"]!)
        .WithGroupId(kafkaConfig["Consumer:GroupId"]!)
        .WithValueDeserializer(new JsonDeserializer<Order>()));
});

// Register background consumer.
// NOTE(review): OrderProcessorService also takes an IOrderRepository; its
// registration is not shown in this example and must be added by the app.
builder.Services.AddHostedService<OrderProcessorService>();

var app = builder.Build();
app.MapControllers();
app.Run();

// OrdersController.cs
/// <summary>
/// API controller that accepts orders over HTTP and hands them to Kafka.
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{
    private readonly IKafkaProducer<string, Order> _producer;

    /// <summary>Captures the producer supplied by the DI container.</summary>
    public OrdersController(IKafkaProducer<string, Order> producer) => _producer = producer;

    /// <summary>
    /// Produces <paramref name="order"/> to the "orders" topic keyed by its Id,
    /// then responds with 202 Accepted.
    /// </summary>
    /// <param name="order">The order bound from the request.</param>
    [HttpPost]
    public async Task<IActionResult> CreateOrder(Order order)
    {
        var key = order.Id;
        await _producer.ProduceAsync("orders", key, order);

        return Accepted();
    }
}

// OrderProcessorService.cs
/// <summary>
/// Hosted service that reads orders from the injected Kafka consumer and
/// saves each one through the order repository.
/// </summary>
public class OrderProcessorService : BackgroundService
{
    private readonly IKafkaConsumer<string, Order> _consumer;
    private readonly IOrderRepository _repository;

    /// <summary>Captures the consumer and repository supplied by the DI container.</summary>
    public OrderProcessorService(
        IKafkaConsumer<string, Order> consumer,
        IOrderRepository repository)
        => (_consumer, _repository) = (consumer, repository);

    /// <summary>
    /// Streams messages from the consumer until <paramref name="ct"/> is
    /// signalled, saving each message's value as it arrives.
    /// </summary>
    /// <param name="ct">Token passed through to ConsumeAsync.</param>
    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        await foreach (var record in _consumer.ConsumeAsync(ct))
        {
            var order = record.Value;
            await _repository.SaveAsync(order);
        }
    }
}