
Message Headers

Headers allow you to attach metadata to messages without including it in the message value. Common uses include trace IDs, message types, routing information, and timestamps.

Adding Headers

Using the Headers Builder

The fluent Headers API makes it easy to build headers:

var headers = Headers.Create()
    .Add("correlation-id", correlationId)
    .Add("source-service", "order-api")
    .Add("message-type", "OrderCreated");

await producer.ProduceAsync(new ProducerMessage<string, string>
{
    Topic = "orders",
    Key = orderId,
    Value = orderJson,
    Headers = headers
});

Quick Creation

Create headers with a single key-value pair:

var headers = Headers.Create("trace-id", traceId);

Extension Method

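The ProduceAsync extension method in Dekaf.Producer takes the topic, key, value, and headers directly, so you can skip constructing a ProducerMessage: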

using Dekaf.Producer;

await producer.ProduceAsync("orders", orderId, orderJson, headers);

Conditional Headers

Add headers only when certain conditions are met:

var headers = Headers.Create()
    .Add("request-id", requestId)
    .AddIfNotNull("user-id", userId)                        // Only if userId != null
    .AddIfNotNullOrEmpty("tenant", tenantId)                // Only if not null or empty
    .AddIf(isRetry, "retry-count", retryCount.ToString());  // Only if condition is true

This is cleaner than:

// ❌ Verbose alternative
var headers = Headers.Create().Add("request-id", requestId);
if (userId != null) headers.Add("user-id", userId);
if (!string.IsNullOrEmpty(tenantId)) headers.Add("tenant", tenantId);
if (isRetry) headers.Add("retry-count", retryCount.ToString());

Adding Multiple Headers

Add headers from a dictionary or collection:

var metadata = new Dictionary<string, string>
{
    ["version"] = "1.0",
    ["encoding"] = "utf-8",
    ["schema-id"] = "12345"
};

var headers = Headers.Create().AddRange(metadata);

Binary Headers

Headers can contain binary data:

// Add binary header
var signature = ComputeSignature(messageBody);
headers.Add("signature", signature); // byte[]

// Or from existing memory
headers.Add("checksum", checksumBytes.AsMemory());
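
ComputeSignature is not defined on this page. As one possible shape, the sketch below computes an HMAC-SHA256 over the UTF-8 bytes of the message body using only the .NET base class library. The MessageSigner class, its key parameter, and the choice of HMAC-SHA256 are assumptions for illustration and are not part of Dekaf.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper for illustration; not part of Dekaf.
public static class MessageSigner
{
    // Returns the 32-byte HMAC-SHA256 of the UTF-8 encoded body.
    public static byte[] ComputeSignature(string messageBody, byte[] key)
    {
        byte[] bodyBytes = Encoding.UTF8.GetBytes(messageBody);
        using var hmac = new HMACSHA256(key);
        return hmac.ComputeHash(bodyBytes);
    }

    // Recomputes the signature and compares it in constant time,
    // which is what a consumer would do with the received header bytes.
    public static bool Verify(string messageBody, byte[] key, ReadOnlySpan<byte> signature)
    {
        byte[] expected = ComputeSignature(messageBody, key);
        return CryptographicOperations.FixedTimeEquals(expected, signature);
    }
}
```

The returned byte[] can be passed to headers.Add("signature", signature) exactly as in the snippet above.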

Reading Headers (Consumer Side)

When consuming messages, access headers from the ConsumeResult:

await foreach (var message in consumer.ConsumeAsync(ct))
{
    // Headers is IReadOnlyList<RecordHeader>
    if (message.Headers != null)
    {
        foreach (var header in message.Headers)
        {
            Console.WriteLine($"{header.Key}: {header.GetValueString()}");
        }
    }

    // Or get a specific header (FirstOrDefault requires using System.Linq)
    var traceId = message.Headers?.FirstOrDefault(h => h.Key == "trace-id")?.GetValueString();
}
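
GetValueString() yields text, so numeric or timestamp headers (for example a retry-count written with ToString()) still need parsing on the consumer side. The following is a minimal sketch of tolerant parsing that returns null for missing or malformed values instead of throwing. The HeaderParsing class and its method names are hypothetical and use only the .NET base class library.

```csharp
using System;
using System.Globalization;

// Hypothetical helper for illustration; not part of Dekaf.
public static class HeaderParsing
{
    // Parses a decimal integer header value; null when missing or malformed.
    public static int? ParseInt32(string? value)
    {
        return int.TryParse(value, NumberStyles.Integer, CultureInfo.InvariantCulture, out int n)
            ? (int?)n
            : null;
    }

    // Parses a timestamp written with DateTimeOffset.ToString("O");
    // null when missing or malformed.
    public static DateTimeOffset? ParseRoundTripTimestamp(string? value)
    {
        return DateTimeOffset.TryParseExact(
                value, "O", CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind, out var ts)
            ? (DateTimeOffset?)ts
            : null;
    }
}
```

A consumer could then write HeaderParsing.ParseInt32(header.GetValueString()) ?? 0 to treat an absent retry-count as zero.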

Common Header Patterns

Distributed Tracing

var headers = Headers.Create()
    .Add("trace-id", Activity.Current?.TraceId.ToString() ?? Guid.NewGuid().ToString())
    .Add("span-id", Activity.Current?.SpanId.ToString() ?? "")
    .Add("parent-span-id", Activity.Current?.ParentSpanId.ToString() ?? "");
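
With the empty-string fallbacks above, span-id and parent-span-id headers are still written when no Activity is active. One alternative, sketched here with System.Diagnostics only, is to collect just the IDs that actually exist. TraceHeaderValues and Collect are hypothetical names, and the sketch assumes the activity uses the W3C ID format.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Hypothetical helper for illustration; not part of Dekaf.
public static class TraceHeaderValues
{
    private const string EmptySpanId = "0000000000000000";

    public static Dictionary<string, string> Collect(Activity? activity)
    {
        var values = new Dictionary<string, string>();
        if (activity is null)
        {
            // No ambient activity: fall back to a fresh ID, as the snippet above does.
            values["trace-id"] = Guid.NewGuid().ToString();
            return values;
        }

        values["trace-id"] = activity.TraceId.ToString(); // 32 hex characters in W3C format
        values["span-id"] = activity.SpanId.ToString();   // 16 hex characters in W3C format

        // A root activity reports an all-zero parent span ID; omit it in that case.
        string parent = activity.ParentSpanId.ToString();
        if (parent != EmptySpanId)
        {
            values["parent-span-id"] = parent;
        }
        return values;
    }
}
```

The resulting dictionary could be passed to AddRange, as shown under Adding Multiple Headers: Headers.Create().AddRange(TraceHeaderValues.Collect(Activity.Current)).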

Message Typing

var headers = Headers.Create()
    .Add("message-type", typeof(OrderCreatedEvent).FullName!)
    .Add("message-version", "2")
    .Add("content-type", "application/json");

Routing

var headers = Headers.Create()
    .Add("tenant-id", tenantId)
    .Add("region", region)
    .Add("priority", priority.ToString());

Dead Letter Queue Context

var headers = Headers.Create()
    .Add("original-topic", originalTopic)
    .Add("original-partition", partition.ToString())
    .Add("original-offset", offset.ToString())
    .Add("failure-reason", exception.Message)
    .Add("failure-time", DateTimeOffset.UtcNow.ToString("O"));
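
Exception messages can be arbitrarily long, and header bytes count toward the message size limit. A minimal sketch of capping failure-reason at a fixed number of UTF-8 bytes without splitting a character follows. HeaderText.TruncateUtf8 is a hypothetical helper, and any byte budget you pick (256 in the usage note) is an assumption rather than a Dekaf or Kafka setting.

```csharp
using System;
using System.Text;

// Hypothetical helper for illustration; not part of Dekaf.
public static class HeaderText
{
    // Returns the longest prefix of value whose UTF-8 encoding fits in maxBytes.
    public static string TruncateUtf8(string value, int maxBytes)
    {
        if (Encoding.UTF8.GetByteCount(value) <= maxBytes)
        {
            return value;
        }

        int bytes = 0;
        int chars = 0;
        // Walk whole Unicode scalar values so surrogate pairs are never split.
        foreach (Rune rune in value.EnumerateRunes())
        {
            if (bytes + rune.Utf8SequenceLength > maxBytes)
            {
                break;
            }
            bytes += rune.Utf8SequenceLength;
            chars += rune.Utf16SequenceLength;
        }
        return value.Substring(0, chars);
    }
}
```

In the snippet above this would read .Add("failure-reason", HeaderText.TruncateUtf8(exception.Message, 256)).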

Performance Considerations

  • Headers are included in the message size limit
  • String values are UTF-8 encoded
  • Headers are compressed along with the message body
  • Keep header keys short and meaningful

// ✅ Good - short, meaningful keys
.Add("tid", traceId)
.Add("src", "order-svc")

// ❌ Avoid - verbose keys waste bytes
.Add("x-correlation-trace-identifier", traceId)
.Add("originating-service-name", "order-service")
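
To put a rough number on the difference, the sketch below sums the UTF-8 byte counts of header keys and values. HeaderSize.EstimatePayloadBytes is a hypothetical helper. It ignores the small per-header length prefixes of the wire format and any compression, so treat the result as an approximation.

```csharp
using System.Collections.Generic;
using System.Text;

// Hypothetical helper for illustration; not part of Dekaf.
public static class HeaderSize
{
    // Sums UTF-8 byte counts of all keys and values.
    // Does not include length prefixes or account for compression.
    public static int EstimatePayloadBytes(IEnumerable<KeyValuePair<string, string>> headers)
    {
        int total = 0;
        foreach (var (key, value) in headers)
        {
            total += Encoding.UTF8.GetByteCount(key) + Encoding.UTF8.GetByteCount(value);
        }
        return total;
    }
}
```

For a three-character value, the key "tid" contributes 6 bytes in total, while "x-correlation-trace-identifier" contributes 33. Because headers are compressed along with the message body, the on-wire difference may be smaller than this raw count suggests.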

Complete Example

using System.Text.Json;
using System.Threading.Tasks;

public class OrderEventPublisher
{
    private readonly IKafkaProducer<string, string> _producer;

    // The producer is injected so the readonly field is always assigned.
    public OrderEventPublisher(IKafkaProducer<string, string> producer)
    {
        _producer = producer;
    }

    // userId is nullable: the user-id header is skipped when it is null.
    public async Task PublishOrderCreatedAsync(Order order, string? userId, string traceId)
    {
        var headers = Headers.Create()
            // Tracing
            .Add("trace-id", traceId)
            .Add("timestamp", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString())

            // Event metadata
            .Add("event-type", "OrderCreated")
            .Add("event-version", "1")

            // Context
            .AddIfNotNull("user-id", userId)
            .Add("source", "order-api");

        var message = new ProducerMessage<string, string>
        {
            Topic = "order-events",
            Key = order.Id, // assumes Order.Id is a string
            Value = JsonSerializer.Serialize(order),
            Headers = headers
        };

        await _producer.ProduceAsync(message);
    }
}