Skip to main content

OAuth / OAUTHBEARER

OAUTHBEARER authentication allows using OAuth 2.0 tokens for Kafka authentication.

Using OAuth Configuration

using Dekaf;

var oauthConfig = new OAuthBearerConfig
{
TokenEndpoint = "https://auth.example.com/oauth2/token",
ClientId = "my-kafka-client",
ClientSecret = "client-secret",
Scope = "kafka"
};

var producer = await Kafka.CreateProducer<string, string>()
.WithBootstrapServers("kafka.example.com:9092")
.UseTls()
.WithOAuthBearer(oauthConfig)
.BuildAsync();

Custom Token Provider

For more control, implement a custom token provider:

using Dekaf;

var producer = await Kafka.CreateProducer<string, string>()
.WithBootstrapServers("kafka.example.com:9092")
.UseTls()
.WithOAuthBearer(async ct =>
{
var token = await GetTokenFromIdentityProviderAsync(ct);
return new OAuthBearerToken
{
Value = token.AccessToken,
ExpiresAt = token.ExpiresAt,
Principal = token.Subject
};
})
.BuildAsync();

Azure AD Example

using Dekaf;

var credential = new DefaultAzureCredential();

var producer = await Kafka.CreateProducer<string, string>()
.WithBootstrapServers("kafka.example.com:9092")
.UseTls()
.WithOAuthBearer(async ct =>
{
var token = await credential.GetTokenAsync(
new TokenRequestContext(new[] { "https://eventhubs.azure.net/.default" }),
ct
);

return new OAuthBearerToken
{
Value = token.Token,
ExpiresAt = token.ExpiresOn
};
})
.BuildAsync();

AWS MSK IAM

For AWS MSK with IAM authentication:

using Dekaf;

var producer = await Kafka.CreateProducer<string, string>()
.WithBootstrapServers("broker.msk.us-east-1.amazonaws.com:9098")
.UseTls()
.WithOAuthBearer(async ct =>
{
// Use AWS SDK to generate IAM token
var token = await GenerateMskIamTokenAsync(ct);
return new OAuthBearerToken
{
Value = token,
ExpiresAt = DateTimeOffset.UtcNow.AddMinutes(5)
};
})
.BuildAsync();

Token Refresh

Dekaf automatically refreshes tokens before they expire. The token provider is called whenever a new token is needed.

.WithOAuthBearer(async ct =>
{
_logger.LogDebug("Refreshing OAuth token");

var token = await _tokenService.GetTokenAsync(ct);

_logger.LogDebug("Token expires at {ExpiresAt}", token.ExpiresAt);

return token;
})

Complete Example

using Dekaf;

public class OAuthKafkaClientFactory
{
private readonly ITokenService _tokenService;
private readonly IConfiguration _config;

public OAuthKafkaClientFactory(ITokenService tokenService, IConfiguration config)
{
_tokenService = tokenService;
_config = config;
}

public async Task<IKafkaProducer<string, string>> CreateProducer()
{
return await Kafka.CreateProducer<string, string>()
.WithBootstrapServers(_config["Kafka:BootstrapServers"])
.UseTls()
.WithOAuthBearer(GetTokenAsync)
.BuildAsync();
}

private async ValueTask<OAuthBearerToken> GetTokenAsync(CancellationToken ct)
{
var token = await _tokenService.GetAccessTokenAsync(
_config["OAuth:Scope"],
ct
);

return new OAuthBearerToken
{
Value = token.AccessToken,
ExpiresAt = token.ExpiresAt,
Principal = token.Claims.Subject
};
}
}