JSON Serialization
JSON is the easy choice when you're working with complex objects. Install the package and you're off.
Installation
dotnet add package Dekaf.Serialization.Json
Basic Usage
Producer
using Dekaf.Serialization.Json;
var producer = await Kafka.CreateProducer<string, Order>()
.WithBootstrapServers("localhost:9092")
.WithValueSerializer(new JsonSerializer<Order>())
.BuildAsync();
var order = new Order
{
Id = "order-123",
CustomerId = "customer-456",
Total = 99.99m,
Items = new[] { "item1", "item2" }
};
await producer.ProduceAsync("orders", order.Id, order);
Consumer
using Dekaf.Serialization.Json;
var consumer = await Kafka.CreateConsumer<string, Order>()
.WithBootstrapServers("localhost:9092")
.WithGroupId("order-processors")
.WithValueDeserializer(new JsonDeserializer<Order>())
.SubscribeTo("orders")
.BuildAsync();
await foreach (var message in consumer.ConsumeAsync(ct))
{
Order order = message.Value;
Console.WriteLine($"Processing order {order.Id} for ${order.Total}");
}
Custom JsonSerializerOptions
Configure System.Text.Json behavior:
using Dekaf;
var options = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
WriteIndented = false,
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull
};
var producer = await Kafka.CreateProducer<string, Order>()
.WithBootstrapServers("localhost:9092")
.WithValueSerializer(new JsonSerializer<Order>(options))
.BuildAsync();
Both Key and Value
Serialize both key and value as JSON:
using Dekaf;
var producer = await Kafka.CreateProducer<OrderKey, OrderEvent>()
.WithBootstrapServers("localhost:9092")
.WithKeySerializer(new JsonSerializer<OrderKey>())
.WithValueSerializer(new JsonSerializer<OrderEvent>())
.BuildAsync();
await producer.ProduceAsync("order-events",
new OrderKey { TenantId = "acme", OrderId = "123" },
new OrderCreated { Amount = 99.99m }
);
Error Handling
JSON deserialization errors throw SerializationException:
try
{
await foreach (var message in consumer.ConsumeAsync(ct))
{
ProcessOrder(message.Value);
}
}
catch (SerializationException ex)
{
_logger.LogError(ex, "Failed to deserialize message");
// Handle malformed JSON
}
Polymorphic Serialization
For polymorphic types, configure the serializer:
var options = new JsonSerializerOptions
{
TypeInfoResolver = new DefaultJsonTypeInfoResolver()
};
// With .NET 7+ polymorphism attributes
[JsonDerivedType(typeof(OrderCreated), "created")]
[JsonDerivedType(typeof(OrderShipped), "shipped")]
public abstract class OrderEvent { }
public class OrderCreated : OrderEvent { public decimal Amount { get; set; } }
public class OrderShipped : OrderEvent { public string TrackingId { get; set; } }
Performance Considerations
- JSON serialization adds overhead compared to binary formats
- Consider using source generators for better performance:
[JsonSerializable(typeof(Order))]
public partial class OrderJsonContext : JsonSerializerContext { }
var options = new JsonSerializerOptions
{
TypeInfoResolver = OrderJsonContext.Default
};
var serializer = new JsonSerializer<Order>(options);
Complete Example
using Dekaf;
public record Order(
string Id,
string CustomerId,
decimal Total,
DateTimeOffset CreatedAt,
IReadOnlyList<OrderItem> Items
);
public record OrderItem(string ProductId, int Quantity, decimal Price);
// Producer
var jsonOptions = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};
await using var producer = await Kafka.CreateProducer<string, Order>()
.WithBootstrapServers("localhost:9092")
.WithValueSerializer(new JsonSerializer<Order>(jsonOptions))
.BuildAsync();
var order = new Order(
Id: "order-123",
CustomerId: "cust-456",
Total: 149.99m,
CreatedAt: DateTimeOffset.UtcNow,
Items: new[]
{
new OrderItem("prod-1", 2, 49.99m),
new OrderItem("prod-2", 1, 50.01m)
}
);
await producer.ProduceAsync("orders", order.Id, order);
// Consumer
await using var consumer = await Kafka.CreateConsumer<string, Order>()
.WithBootstrapServers("localhost:9092")
.WithGroupId("order-processors")
.WithValueDeserializer(new JsonDeserializer<Order>(jsonOptions))
.SubscribeTo("orders")
.BuildAsync();
await foreach (var msg in consumer.ConsumeAsync(ct))
{
Console.WriteLine($"Order {msg.Value.Id}: ${msg.Value.Total}");
}