Building Event-Driven Architecture: Publishing Keycloak Events to Kafka with Custom SPI

A comprehensive guide to developing and deploying a production-ready Keycloak SPI plugin that streams authentication and admin events to Kafka for real-time processing and analytics.

Tags: keycloak, kafka, spi, event-driven, microservices, java, security, identity-management



In modern distributed systems, capturing and processing identity and access management events in real-time is crucial for security monitoring, analytics, and compliance. This comprehensive guide demonstrates how to build a production-ready Keycloak Service Provider Interface (SPI) plugin that streams events to Apache Kafka.

Understanding Keycloak SPI Architecture

Keycloak's Service Provider Interface (SPI) provides a powerful extension mechanism that lets developers customize and extend Keycloak's functionality without modifying its core codebase. In the Quarkus-based distribution (the default since Keycloak 17), a provider is packaged as a JAR, dropped into the providers/ directory, and wired in during the kc.sh build step, enabling integration with external systems while preserving upgrade compatibility.

Key Benefits of Event Streaming

  • Real-time Security Monitoring: Detect suspicious login patterns and security threats instantly
  • Compliance and Auditing: Maintain comprehensive audit trails for regulatory requirements
  • User Analytics: Track user behavior and authentication patterns for business insights
  • System Integration: Enable downstream systems to react to identity events immediately

Project Setup and Dependencies

Let's start by creating a robust Java project structure with all necessary dependencies.

Enhanced build.gradle Configuration

plugins {
    id 'java'
    id 'maven-publish'
    id 'com.github.johnrengelman.shadow' version '7.1.2'
}

group 'com.enterprise'
version '2.0.0'
// Keycloak 22 requires Java 17 at runtime, so compile to the same level
sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17

ext {
    keycloakVersion = '22.0.1'
    kafkaVersion = '3.5.0'
    jacksonVersion = '2.15.2'
    slf4jVersion = '1.7.36'
}

repositories {
    mavenCentral()
}

dependencies {
    // Keycloak SPI Dependencies
    compileOnly group: 'org.keycloak', name: 'keycloak-server-spi', version: "${keycloakVersion}"
    compileOnly group: 'org.keycloak', name: 'keycloak-server-spi-private', version: "${keycloakVersion}"
    compileOnly group: 'org.keycloak', name: 'keycloak-services', version: "${keycloakVersion}"

    // Kafka Client
    implementation group: 'org.apache.kafka', name: 'kafka-clients', version: "${kafkaVersion}"

    // JSON Processing
    implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: "${jacksonVersion}"
    implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: "${jacksonVersion}"

    // Logging
    compileOnly group: 'org.slf4j', name: 'slf4j-api', version: "${slf4jVersion}"

    // Testing
    testImplementation 'org.junit.jupiter:junit-jupiter:5.9.2'
    testImplementation 'org.mockito:mockito-core:5.3.1'
    testImplementation 'org.testcontainers:kafka:1.18.3'
}

shadowJar {
    archiveBaseName = 'keycloak-kafka-events-spi'
    archiveClassifier = ''

    // Relocate dependencies to avoid conflicts
    relocate 'org.apache.kafka', 'com.enterprise.shaded.kafka'
    relocate 'com.fasterxml.jackson', 'com.enterprise.shaded.jackson'

    // Exclude unnecessary files
    exclude 'META-INF/*.SF'
    exclude 'META-INF/*.DSA'
    exclude 'META-INF/*.RSA'
}

test {
    useJUnitPlatform()
}
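
The multi-stage Dockerfile later in this guide copies a settings.gradle next to build.gradle, which this article otherwise never shows. It only needs to name the project; the name below is an assumption chosen to match the shaded artifact:

rootProject.name = 'keycloak-kafka-events-spi'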

Enhanced Event Listener Factory

The factory class is responsible for initializing the Kafka producer and managing the event listener lifecycle with comprehensive configuration and error handling.

package com.enterprise.keycloak.events;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.keycloak.Config;
import org.keycloak.events.EventListenerProvider;
import org.keycloak.events.EventListenerProviderFactory;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class EnhancedKafkaEventListenerFactory implements EventListenerProviderFactory {

    private static final Logger logger = LoggerFactory.getLogger(EnhancedKafkaEventListenerFactory.class);
    private static final String PROVIDER_ID = "enhanced-kafka-events";

    // Configuration keys
    private static final String KAFKA_BOOTSTRAP_SERVERS = "KAFKA_BOOTSTRAP_SERVERS";
    private static final String KAFKA_TOPIC_EVENTS = "KAFKA_TOPIC_EVENTS";
    private static final String KAFKA_TOPIC_ADMIN_EVENTS = "KAFKA_TOPIC_ADMIN_EVENTS";
    private static final String KAFKA_SSL_ENABLED = "KAFKA_SSL_ENABLED";
    private static final String KAFKA_SASL_ENABLED = "KAFKA_SASL_ENABLED";
    private static final String KAFKA_RETRIES = "KAFKA_RETRIES";
    private static final String KAFKA_BATCH_SIZE = "KAFKA_BATCH_SIZE";
    private static final String KAFKA_LINGER_MS = "KAFKA_LINGER_MS";
    private static final String KAFKA_BUFFER_MEMORY = "KAFKA_BUFFER_MEMORY";
    private static final String KAFKA_COMPRESSION_TYPE = "KAFKA_COMPRESSION_TYPE";

    // Single shared event listener (the map is only used for lazy, atomic creation)
    private final ConcurrentMap<String, EnhancedKafkaEventListener> listenerCache = new ConcurrentHashMap<>();

    private volatile Properties kafkaProperties;
    private volatile String eventsTopicName;
    private volatile String adminEventsTopicName;
    private volatile boolean isInitialized = false;

    @Override
    public EventListenerProvider create(KeycloakSession session) {
        if (!isInitialized) {
            logger.warn("Kafka event listener factory not properly initialized");
            return null;
        }

        // Reuse a single listener (and therefore a single Kafka producer) across
        // sessions; Keycloak closes the provider at the end of every session, so
        // producer teardown lives in the factory's close() instead
        return listenerCache.computeIfAbsent(PROVIDER_ID,
            k -> new EnhancedKafkaEventListener(eventsTopicName, adminEventsTopicName, kafkaProperties));
    }

    @Override
    public void init(Config.Scope config) {
        logger.info("Initializing Enhanced Kafka Event Listener Factory");

        try {
            validateAndLoadConfiguration();
            kafkaProperties = buildKafkaProperties();
            isInitialized = true;
            logger.info("Kafka Event Listener Factory initialized successfully");
        } catch (Exception e) {
            logger.error("Failed to initialize Kafka Event Listener Factory", e);
            throw new RuntimeException("Kafka Event Listener initialization failed", e);
        }
    }

    @Override
    public void postInit(KeycloakSessionFactory factory) {
        logger.info("Post-initialization of Kafka Event Listener Factory completed");
    }

    @Override
    public void close() {
        logger.info("Closing Kafka Event Listener Factory");
        listenerCache.values().forEach(listener -> {
            try {
                listener.shutdown();
            } catch (Exception e) {
                logger.warn("Error shutting down event listener", e);
            }
        });
        listenerCache.clear();
    }

    @Override
    public String getId() {
        return PROVIDER_ID;
    }

    private void validateAndLoadConfiguration() {
        // Load configuration from environment variables
        String bootstrapServers = getRequiredEnvVar(KAFKA_BOOTSTRAP_SERVERS);
        eventsTopicName = getRequiredEnvVar(KAFKA_TOPIC_EVENTS);
        adminEventsTopicName = getEnvVar(KAFKA_TOPIC_ADMIN_EVENTS, eventsTopicName + "-admin");

        boolean sslEnabled = Boolean.parseBoolean(getEnvVar(KAFKA_SSL_ENABLED, "false"));
        boolean saslEnabled = Boolean.parseBoolean(getEnvVar(KAFKA_SASL_ENABLED, "false"));

        logger.info("Kafka Configuration - Bootstrap Servers: {}", bootstrapServers);
        logger.info("Kafka Configuration - Events Topic: {}", eventsTopicName);
        logger.info("Kafka Configuration - Admin Events Topic: {}", adminEventsTopicName);
        logger.info("Kafka Configuration - SSL Enabled: {}", sslEnabled);
        logger.info("Kafka Configuration - SASL Enabled: {}", saslEnabled);

        // Validate SSL configuration
        if (sslEnabled) {
            validateSslConfiguration();
        }

        // Validate SASL configuration
        if (saslEnabled) {
            validateSaslConfiguration();
        }
    }

    private Properties buildKafkaProperties() {
        Properties props = new Properties();

        // Basic producer configuration
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getRequiredEnvVar(KAFKA_BOOTSTRAP_SERVERS));
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Performance optimization
        props.put(ProducerConfig.RETRIES_CONFIG, getEnvVar(KAFKA_RETRIES, "3"));
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, getEnvVar(KAFKA_BATCH_SIZE, "16384"));
        props.put(ProducerConfig.LINGER_MS_CONFIG, getEnvVar(KAFKA_LINGER_MS, "5"));
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, getEnvVar(KAFKA_BUFFER_MEMORY, "33554432"));
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, getEnvVar(KAFKA_COMPRESSION_TYPE, "snappy"));

        // Reliability settings
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");

        // SSL Configuration
        if (Boolean.parseBoolean(getEnvVar(KAFKA_SSL_ENABLED, "false"))) {
            configureSsl(props);
        }

        // SASL Configuration
        if (Boolean.parseBoolean(getEnvVar(KAFKA_SASL_ENABLED, "false"))) {
            configureSasl(props);
        }

        return props;
    }

    private void configureSsl(Properties props) {
        props.put("security.protocol", getEnvVar("KAFKA_SECURITY_PROTOCOL", "SSL"));
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, getRequiredEnvVar("KAFKA_SSL_TRUSTSTORE_LOCATION"));
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, getRequiredEnvVar("KAFKA_SSL_TRUSTSTORE_PASSWORD"));
        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, getRequiredEnvVar("KAFKA_SSL_KEYSTORE_LOCATION"));
        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, getRequiredEnvVar("KAFKA_SSL_KEYSTORE_PASSWORD"));
        props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, getEnvVar("KAFKA_SSL_KEY_PASSWORD", ""));
        // Disables hostname verification (useful for self-signed dev certificates);
        // leave this property unset so it defaults to "https" in production
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
    }

    private void configureSasl(Properties props) {
        props.put(SaslConfigs.SASL_MECHANISM, getEnvVar("KAFKA_SASL_MECHANISM", "PLAIN"));
        props.put(SaslConfigs.SASL_JAAS_CONFIG, getRequiredEnvVar("KAFKA_SASL_JAAS_CONFIG"));

        String currentProtocol = props.getProperty("security.protocol", "PLAINTEXT");
        if (currentProtocol.equals("SSL")) {
            props.put("security.protocol", "SASL_SSL");
        } else {
            props.put("security.protocol", "SASL_PLAINTEXT");
        }
    }

    private void validateSslConfiguration() {
        getRequiredEnvVar("KAFKA_SSL_TRUSTSTORE_LOCATION");
        getRequiredEnvVar("KAFKA_SSL_TRUSTSTORE_PASSWORD");
        getRequiredEnvVar("KAFKA_SSL_KEYSTORE_LOCATION");
        getRequiredEnvVar("KAFKA_SSL_KEYSTORE_PASSWORD");
    }

    private void validateSaslConfiguration() {
        getRequiredEnvVar("KAFKA_SASL_JAAS_CONFIG");
    }

    private String getRequiredEnvVar(String key) {
        String value = System.getenv(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Required environment variable not set: " + key);
        }
        return value.trim();
    }

    private String getEnvVar(String key, String defaultValue) {
        String value = System.getenv(key);
        return (value != null && !value.trim().isEmpty()) ? value.trim() : defaultValue;
    }
}

Enhanced Event Listener with Filtering and Metrics

The event listener handles both user events and admin events with sophisticated filtering, error handling, and performance monitoring.

package com.enterprise.keycloak.events;

import com.enterprise.keycloak.events.model.EnrichedAdminEvent;
import com.enterprise.keycloak.events.model.EnrichedEvent;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.keycloak.events.Event;
import org.keycloak.events.EventListenerProvider;
import org.keycloak.events.EventType;
import org.keycloak.events.admin.AdminEvent;
import org.keycloak.events.admin.OperationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

public class EnhancedKafkaEventListener implements EventListenerProvider {

    private static final Logger logger = LoggerFactory.getLogger(EnhancedKafkaEventListener.class);

    private final EnhancedKafkaProducer kafkaProducer;
    private final ObjectMapper objectMapper;
    private final String eventsTopicName;
    private final String adminEventsTopicName;

    // Metrics
    private final AtomicLong eventsProcessed = new AtomicLong(0);
    private final AtomicLong adminEventsProcessed = new AtomicLong(0);
    private final AtomicLong errorsCount = new AtomicLong(0);

    // Event filtering configuration
    private final EnumSet<EventType> filteredEventTypes;
    private final EnumSet<OperationType> filteredOperationTypes;

    public EnhancedKafkaEventListener(String eventsTopicName, String adminEventsTopicName, Properties kafkaProperties) {
        this.eventsTopicName = eventsTopicName;
        this.adminEventsTopicName = adminEventsTopicName;
        this.kafkaProducer = new EnhancedKafkaProducer(kafkaProperties);

        // Configure JSON mapper
        this.objectMapper = new ObjectMapper();
        this.objectMapper.registerModule(new JavaTimeModule());
        this.objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        this.objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);

        // Initialize event filters
        this.filteredEventTypes = initializeEventTypeFilters();
        this.filteredOperationTypes = initializeOperationTypeFilters();

        logger.info("Enhanced Kafka Event Listener initialized for topics: {} and {}", eventsTopicName, adminEventsTopicName);
    }

    @Override
    public void onEvent(Event event) {
        if (event == null || !shouldProcessEvent(event)) {
            return;
        }

        try {
            EnrichedEvent enrichedEvent = enrichEvent(event);
            String eventJson = objectMapper.writeValueAsString(enrichedEvent);

            String partitionKey = generatePartitionKey(event.getUserId(), event.getRealmId());
            kafkaProducer.sendEvent(eventsTopicName, partitionKey, eventJson);

            eventsProcessed.incrementAndGet();

            if (logger.isDebugEnabled()) {
                logger.debug("Successfully published event: {} for user: {} in realm: {}",
                    event.getType(), event.getUserId(), event.getRealmId());
            }

        } catch (JsonProcessingException e) {
            errorsCount.incrementAndGet();
            logger.error("Failed to serialize event: {} for user: {}", event.getType(), event.getUserId(), e);
        } catch (Exception e) {
            errorsCount.incrementAndGet();
            logger.error("Failed to publish event: {} for user: {}", event.getType(), event.getUserId(), e);
        }
    }

    @Override
    public void onEvent(AdminEvent adminEvent, boolean includeRepresentation) {
        if (adminEvent == null || !shouldProcessAdminEvent(adminEvent)) {
            return;
        }

        try {
            EnrichedAdminEvent enrichedAdminEvent = enrichAdminEvent(adminEvent, includeRepresentation);
            String eventJson = objectMapper.writeValueAsString(enrichedAdminEvent);

            // Auth details may be absent (e.g. service-account operations), so guard the lookup
            String adminUserId = adminEvent.getAuthDetails() != null ? adminEvent.getAuthDetails().getUserId() : null;
            String partitionKey = generatePartitionKey(adminUserId, adminEvent.getRealmId());
            kafkaProducer.sendEvent(adminEventsTopicName, partitionKey, eventJson);

            adminEventsProcessed.incrementAndGet();

            if (logger.isDebugEnabled()) {
                logger.debug("Successfully published admin event: {} for resource: {} by user: {}",
                    adminEvent.getOperationType(), adminEvent.getResourceType(), adminUserId);
            }

        } catch (JsonProcessingException e) {
            errorsCount.incrementAndGet();
            logger.error("Failed to serialize admin event: {} for resource: {}",
                adminEvent.getOperationType(), adminEvent.getResourceType(), e);
        } catch (Exception e) {
            errorsCount.incrementAndGet();
            logger.error("Failed to publish admin event: {} for resource: {}",
                adminEvent.getOperationType(), adminEvent.getResourceType(), e);
        }
    }

    @Override
    public void close() {
        // Called by Keycloak at the end of every session; the shared Kafka
        // producer must stay open, so the real teardown happens in shutdown()
    }

    // Called once by the factory when Keycloak shuts the provider down
    public void shutdown() {
        logger.info("Shutting down Enhanced Kafka Event Listener. Stats - Events: {}, Admin Events: {}, Errors: {}",
            eventsProcessed.get(), adminEventsProcessed.get(), errorsCount.get());

        if (kafkaProducer != null) {
            kafkaProducer.close();
        }
    }

    private boolean shouldProcessEvent(Event event) {
        return filteredEventTypes.contains(event.getType());
    }

    private boolean shouldProcessAdminEvent(AdminEvent adminEvent) {
        return filteredOperationTypes.contains(adminEvent.getOperationType());
    }

    private EnrichedEvent enrichEvent(Event event) {
        EnrichedEvent enriched = new EnrichedEvent();
        enriched.setEventId(java.util.UUID.randomUUID().toString());
        enriched.setTimestamp(Instant.now());
        enriched.setOriginalEvent(event);
        enriched.setSource("keycloak-spi");
        enriched.setVersion("2.0.0");

        // Add custom metadata
        Map<String, Object> metadata = new HashMap<>();
        metadata.put("severity", calculateEventSeverity(event.getType()));
        metadata.put("category", categorizeEvent(event.getType()));
        enriched.setMetadata(metadata);

        return enriched;
    }

    private EnrichedAdminEvent enrichAdminEvent(AdminEvent adminEvent, boolean includeRepresentation) {
        EnrichedAdminEvent enriched = new EnrichedAdminEvent();
        enriched.setEventId(java.util.UUID.randomUUID().toString());
        enriched.setTimestamp(Instant.now());
        enriched.setOriginalEvent(adminEvent);
        enriched.setSource("keycloak-spi");
        enriched.setVersion("2.0.0");
        enriched.setIncludeRepresentation(includeRepresentation);

        // Add custom metadata
        Map<String, Object> metadata = new HashMap<>();
        metadata.put("severity", calculateAdminEventSeverity(adminEvent.getOperationType()));
        metadata.put("category", "admin-operation");
        metadata.put("resourcePath", adminEvent.getResourcePath());
        enriched.setMetadata(metadata);

        return enriched;
    }

    private String generatePartitionKey(String userId, String realmId) {
        // Use userId for better distribution, fallback to realmId if userId is null
        return userId != null ? userId : (realmId != null ? realmId : "unknown");
    }

    private String calculateEventSeverity(EventType eventType) {
        // Note: codes such as invalid_user_credentials live in the event's error
        // field (org.keycloak.events.Errors), not in EventType, so the failure
        // events themselves are classified here
        switch (eventType) {
            case LOGIN_ERROR:
            case REGISTER_ERROR:
            case RESET_PASSWORD_ERROR:
                return "HIGH";
            case LOGIN:
            case LOGOUT:
            case REFRESH_TOKEN:
                return "MEDIUM";
            default:
                return "LOW";
        }
    }

    private String calculateAdminEventSeverity(OperationType operationType) {
        switch (operationType) {
            case DELETE:
                return "HIGH";
            case CREATE:
            case UPDATE:
                return "MEDIUM";
            default:
                return "LOW";
        }
    }

    private String categorizeEvent(EventType eventType) {
        if (eventType.name().contains("LOGIN") || eventType.name().contains("LOGOUT")) {
            return "authentication";
        } else if (eventType.name().contains("REGISTER") || eventType.name().contains("UPDATE_PROFILE")) {
            return "user-management";
        } else if (eventType.name().contains("TOKEN")) {
            return "token-management";
        } else {
            return "general";
        }
    }

    private EnumSet<EventType> initializeEventTypeFilters() {
        // Configure which events to process based on environment variable
        String filterConfig = System.getenv("KAFKA_EVENT_TYPES_FILTER");
        if (filterConfig != null && !filterConfig.trim().isEmpty()) {
            // Parse custom filter configuration
            return parseEventTypeFilter(filterConfig);
        }

        // Default filter - include the most security-relevant events
        return EnumSet.of(
            EventType.LOGIN, EventType.LOGIN_ERROR, EventType.LOGOUT,
            EventType.REGISTER, EventType.REGISTER_ERROR,
            EventType.UPDATE_PROFILE, EventType.UPDATE_PASSWORD,
            EventType.VERIFY_EMAIL, EventType.RESET_PASSWORD,
            EventType.RESET_PASSWORD_ERROR
        );
    }

    private EnumSet<OperationType> initializeOperationTypeFilters() {
        // Configure which admin operations to process
        String filterConfig = System.getenv("KAFKA_ADMIN_OPERATION_TYPES_FILTER");
        if (filterConfig != null && !filterConfig.trim().isEmpty()) {
            return parseOperationTypeFilter(filterConfig);
        }

        // Default filter - include all operations
        return EnumSet.allOf(OperationType.class);
    }

    private EnumSet<EventType> parseEventTypeFilter(String filterConfig) {
        EnumSet<EventType> filter = EnumSet.noneOf(EventType.class);
        String[] types = filterConfig.split(",");
        for (String type : types) {
            try {
                filter.add(EventType.valueOf(type.trim().toUpperCase()));
            } catch (IllegalArgumentException e) {
                logger.warn("Invalid event type in filter configuration: {}", type.trim());
            }
        }
        return filter;
    }

    private EnumSet<OperationType> parseOperationTypeFilter(String filterConfig) {
        EnumSet<OperationType> filter = EnumSet.noneOf(OperationType.class);
        String[] types = filterConfig.split(",");
        for (String type : types) {
            try {
                filter.add(OperationType.valueOf(type.trim().toUpperCase()));
            } catch (IllegalArgumentException e) {
                logger.warn("Invalid operation type in filter configuration: {}", type.trim());
            }
        }
        return filter;
    }

    // Metrics getter methods for monitoring
    public long getEventsProcessed() { return eventsProcessed.get(); }
    public long getAdminEventsProcessed() { return adminEventsProcessed.get(); }
    public long getErrorsCount() { return errorsCount.get(); }
}

Robust Kafka Producer with Circuit Breaker

The Kafka producer implementation includes retry logic, circuit breaker pattern, and comprehensive error handling.

package com.enterprise.keycloak.events;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RetriableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class EnhancedKafkaProducer {

    private static final Logger logger = LoggerFactory.getLogger(EnhancedKafkaProducer.class);
    private static final int DEFAULT_SEND_TIMEOUT_SECONDS = 30;
    private static final int CIRCUIT_BREAKER_FAILURE_THRESHOLD = 5;
    private static final long CIRCUIT_BREAKER_TIMEOUT_MS = 60000; // 1 minute

    private final KafkaProducer<String, String> producer;
    private final AtomicLong successCount = new AtomicLong(0);
    private final AtomicLong failureCount = new AtomicLong(0);

    // Circuit breaker implementation
    private final AtomicInteger consecutiveFailures = new AtomicInteger(0);
    private volatile long lastFailureTime = 0;
    private volatile boolean circuitOpen = false;

    public EnhancedKafkaProducer(Properties kafkaProperties) {
        // Kafka resolves serializer classes via the thread context classloader,
        // which inside Keycloak may not see this provider's (shaded) classes;
        // switch it temporarily and restore it afterwards
        ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
        try {
            this.producer = new KafkaProducer<>(kafkaProperties);
            logger.info("Kafka producer initialized successfully");
        } catch (Exception e) {
            logger.error("Failed to initialize Kafka producer", e);
            throw new RuntimeException("Kafka producer initialization failed", e);
        } finally {
            Thread.currentThread().setContextClassLoader(originalClassLoader);
        }
    }

    public CompletableFuture<RecordMetadata> sendEvent(String topic, String key, String value) {
        if (isCircuitOpen()) {
            CompletableFuture<RecordMetadata> failedFuture = new CompletableFuture<>();
            failedFuture.completeExceptionally(new RuntimeException("Circuit breaker is open"));
            return failedFuture;
        }

        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

        CompletableFuture<RecordMetadata> future = new CompletableFuture<>();

        try {
            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    handleSuccess(metadata, future);
                } else {
                    handleFailure(exception, future, topic, key);
                }
            });
        } catch (Exception e) {
            handleFailure(e, future, topic, key);
        }

        return future;
    }

    private void handleSuccess(RecordMetadata metadata, CompletableFuture<RecordMetadata> future) {
        successCount.incrementAndGet();
        resetCircuitBreaker();
        future.complete(metadata);

        if (logger.isDebugEnabled()) {
            logger.debug("Message sent successfully to topic: {}, partition: {}, offset: {}",
                metadata.topic(), metadata.partition(), metadata.offset());
        }
    }

    private void handleFailure(Exception exception, CompletableFuture<RecordMetadata> future, String topic, String key) {
        failureCount.incrementAndGet();

        if (isFatalError(exception)) {
            logger.error("Fatal producer error sending message to topic: {} with key: {}", topic, key, exception);
        } else if (isRetriableError(exception)) {
            logger.warn("Retriable error sending message to topic: {} with key: {}", topic, key, exception);
        } else {
            logger.error("Non-retriable error sending message to topic: {} with key: {}", topic, key, exception);
        }
        updateCircuitBreakerOnFailure();

        future.completeExceptionally(exception);
    }

    private boolean isRetriableError(Exception exception) {
        return exception instanceof RetriableException ||
               exception.getCause() instanceof RetriableException;
    }

    private boolean isFatalError(Exception exception) {
        return exception instanceof ProducerFencedException ||
               exception instanceof OutOfOrderSequenceException ||
               exception instanceof AuthorizationException;
    }

    private void updateCircuitBreakerOnFailure() {
        int failures = consecutiveFailures.incrementAndGet();
        lastFailureTime = System.currentTimeMillis();

        if (failures >= CIRCUIT_BREAKER_FAILURE_THRESHOLD) {
            circuitOpen = true;
            logger.warn("Circuit breaker opened after {} consecutive failures", failures);
        }
    }

    private void resetCircuitBreaker() {
        if (consecutiveFailures.get() > 0) {
            consecutiveFailures.set(0);
            circuitOpen = false;
            logger.info("Circuit breaker reset after successful send");
        }
    }

    private boolean isCircuitOpen() {
        if (!circuitOpen) {
            return false;
        }

        // Check if timeout has passed and we should try again
        if (System.currentTimeMillis() - lastFailureTime > CIRCUIT_BREAKER_TIMEOUT_MS) {
            logger.info("Circuit breaker timeout expired, attempting to close circuit");
            circuitOpen = false;
            consecutiveFailures.set(0);
            return false;
        }

        return true;
    }

    public void close() {
        logger.info("Closing Kafka producer. Stats - Success: {}, Failures: {}",
            successCount.get(), failureCount.get());

        try {
            if (producer != null) {
                producer.flush();
                // close(long, TimeUnit) was removed in Kafka 3.x; use the Duration overload
                producer.close(Duration.ofSeconds(DEFAULT_SEND_TIMEOUT_SECONDS));
            }
        } catch (Exception e) {
            logger.warn("Error closing Kafka producer", e);
        }
    }

    // Metrics getter methods
    public long getSuccessCount() { return successCount.get(); }
    public long getFailureCount() { return failureCount.get(); }
    public boolean isCircuitBreakerOpen() { return circuitOpen; }
}

Event Data Models

Define enriched event models for better data structure and analytics.

File: EnrichedEvent.java
package com.enterprise.keycloak.events.model;

import com.fasterxml.jackson.annotation.JsonInclude;
import org.keycloak.events.Event;

import java.time.Instant;
import java.util.Map;

@JsonInclude(JsonInclude.Include.NON_NULL)
public class EnrichedEvent {
    private String eventId;
    private Instant timestamp;
    private String source;
    private String version;
    private Event originalEvent;
    private Map<String, Object> metadata;

    // Constructors
    public EnrichedEvent() {}

    // Getters and Setters
    public String getEventId() { return eventId; }
    public void setEventId(String eventId) { this.eventId = eventId; }

    public Instant getTimestamp() { return timestamp; }
    public void setTimestamp(Instant timestamp) { this.timestamp = timestamp; }

    public String getSource() { return source; }
    public void setSource(String source) { this.source = source; }

    public String getVersion() { return version; }
    public void setVersion(String version) { this.version = version; }

    public Event getOriginalEvent() { return originalEvent; }
    public void setOriginalEvent(Event originalEvent) { this.originalEvent = originalEvent; }

    public Map<String, Object> getMetadata() { return metadata; }
    public void setMetadata(Map<String, Object> metadata) { this.metadata = metadata; }
}
File: EnrichedAdminEvent.java
package com.enterprise.keycloak.events.model;

import com.fasterxml.jackson.annotation.JsonInclude;
import org.keycloak.events.admin.AdminEvent;

import java.time.Instant;
import java.util.Map;

@JsonInclude(JsonInclude.Include.NON_NULL)
public class EnrichedAdminEvent {
    private String eventId;
    private Instant timestamp;
    private String source;
    private String version;
    private AdminEvent originalEvent;
    private boolean includeRepresentation;
    private Map<String, Object> metadata;

    // Constructors
    public EnrichedAdminEvent() {}

    // Getters and Setters
    public String getEventId() { return eventId; }
    public void setEventId(String eventId) { this.eventId = eventId; }

    public Instant getTimestamp() { return timestamp; }
    public void setTimestamp(Instant timestamp) { this.timestamp = timestamp; }

    public String getSource() { return source; }
    public void setSource(String source) { this.source = source; }

    public String getVersion() { return version; }
    public void setVersion(String version) { this.version = version; }

    public AdminEvent getOriginalEvent() { return originalEvent; }
    public void setOriginalEvent(AdminEvent originalEvent) { this.originalEvent = originalEvent; }

    public boolean isIncludeRepresentation() { return includeRepresentation; }
    public void setIncludeRepresentation(boolean includeRepresentation) { this.includeRepresentation = includeRepresentation; }

    public Map<String, Object> getMetadata() { return metadata; }
    public void setMetadata(Map<String, Object> metadata) { this.metadata = metadata; }
}

SPI Service Configuration

Create the service configuration file that registers your SPI provider.

File: src/main/resources/META-INF/services/org.keycloak.events.EventListenerProviderFactory

com.enterprise.keycloak.events.EnhancedKafkaEventListenerFactory
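
Deploying the JAR only makes the provider available; each realm must still enable it under Realm settings → Events → Event listeners in the Admin Console. A rough sketch of doing the same programmatically, assuming the org.keycloak:keycloak-admin-client dependency and default local admin credentials (both assumptions, not part of the SPI itself):

package com.enterprise.keycloak.tools;

import org.keycloak.admin.client.Keycloak;
import org.keycloak.representations.idm.RealmEventsConfigRepresentation;

import java.util.Arrays;

public class RegisterEventListener {
    public static void main(String[] args) {
        // Assumed dev-instance coordinates and the built-in admin-cli client
        Keycloak keycloak = Keycloak.getInstance(
            "http://localhost:8080", "master", "admin", "admin", "admin-cli");

        RealmEventsConfigRepresentation config =
            keycloak.realm("master").getRealmEventsConfig();
        config.setEventsEnabled(true);
        config.setAdminEventsEnabled(true);
        // Keep the built-in logger alongside the Kafka listener
        config.setEventsListeners(Arrays.asList("jboss-logging", "enhanced-kafka-events"));
        keycloak.realm("master").updateRealmEventsConfig(config);

        keycloak.close();
    }
}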

Production Deployment Strategies

Enhanced Dockerfile with Multi-stage Build

# Multi-stage build for optimized image size
FROM gradle:7-jdk17 AS builder

WORKDIR /app
COPY build.gradle settings.gradle ./
COPY src ./src

# Build the application
RUN gradle shadowJar --no-daemon

# The Keycloak image is based on ubi-micro and ships no package manager,
# so stage curl for the health check from a full UBI image
FROM registry.access.redhat.com/ubi9 AS ubi-build
RUN mkdir -p /mnt/rootfs && \
    dnf install --installroot /mnt/rootfs curl --releasever 9 \
        --setopt install_weak_deps=false --nodocs -y && \
    dnf --installroot /mnt/rootfs clean all

# Production image
FROM quay.io/keycloak/keycloak:22.0.1

# Add health check dependencies
COPY --from=ubi-build /mnt/rootfs /

# Copy the built SPI jar
COPY --from=builder /app/build/libs/keycloak-kafka-events-spi-2.0.0.jar /opt/keycloak/providers/

# Add custom startup script for health checks
USER root
COPY scripts/health-check.sh /opt/keycloak/bin/
RUN chmod +x /opt/keycloak/bin/health-check.sh

# Switch back to keycloak user
USER 1000

# Build the provider and enable the health endpoints the probes rely on
ENV KC_HEALTH_ENABLED=true
RUN /opt/keycloak/bin/kc.sh build

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
    CMD /opt/keycloak/bin/health-check.sh

EXPOSE 8080 8443

ENTRYPOINT ["/opt/keycloak/bin/kc.sh"]

Kubernetes Deployment with ConfigMap

apiVersion: v1
kind: ConfigMap
metadata:
  name: keycloak-kafka-config
data:
  KAFKA_BOOTSTRAP_SERVERS: "kafka-cluster-kafka-bootstrap:9092"
  KAFKA_TOPIC_EVENTS: "keycloak-events"
  KAFKA_TOPIC_ADMIN_EVENTS: "keycloak-admin-events"
  KAFKA_SSL_ENABLED: "true"
  KAFKA_SASL_ENABLED: "true"
  KAFKA_RETRIES: "5"
  KAFKA_BATCH_SIZE: "32768"
  KAFKA_LINGER_MS: "10"
  KAFKA_COMPRESSION_TYPE: "lz4"
  KAFKA_EVENT_TYPES_FILTER: "LOGIN,LOGIN_ERROR,LOGOUT,REGISTER,UPDATE_PROFILE"
---
apiVersion: v1
kind: Secret
metadata:
  name: keycloak-kafka-secrets
type: Opaque
stringData:
  KAFKA_SSL_TRUSTSTORE_PASSWORD: "truststore-password"
  KAFKA_SSL_KEYSTORE_PASSWORD: "keystore-password"
  KAFKA_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password";'
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: keycloak-with-kafka-events
spec:
  replicas: 2
  selector:
    matchLabels:
      app: keycloak
  template:
    metadata:
      labels:
        app: keycloak
    spec:
      containers:
        - name: keycloak
          image: your-registry/keycloak-kafka:2.0.0
          ports:
            - containerPort: 8080
            - containerPort: 8443
          env:
            - name: KEYCLOAK_ADMIN
              value: admin
            - name: KEYCLOAK_ADMIN_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: keycloak-admin-secret
                  key: password
          envFrom:
            - configMapRef:
                name: keycloak-kafka-config
            - secretRef:
                name: keycloak-kafka-secrets
          volumeMounts:
            - name: kafka-ssl-certs
              mountPath: /opt/keycloak/conf/kafka-ssl
              readOnly: true
          resources:
            requests:
              memory: "1Gi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "1"
          livenessProbe:
            httpGet:
              path: /health/live
              port: 8080
            initialDelaySeconds: 120
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /health/ready
              port: 8080
            initialDelaySeconds: 60
            periodSeconds: 10
          command:
            - /opt/keycloak/bin/kc.sh
            - start
            - --hostname-strict=false
            - --http-enabled=true
            - --import-realm
            - --spi-events-listener-enhanced-kafka-events-enabled=true
      volumes:
        - name: kafka-ssl-certs
          secret:
            secretName: kafka-ssl-certificates

Docker Compose for Development

version: "3.8"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

  keycloak:
    build: .
    ports:
      - "8080:8080"
    environment:
      KEYCLOAK_ADMIN: admin
      KEYCLOAK_ADMIN_PASSWORD: admin
      KAFKA_BOOTSTRAP_SERVERS: kafka:29092
      KAFKA_TOPIC_EVENTS: keycloak-events
      KAFKA_TOPIC_ADMIN_EVENTS: keycloak-admin-events
      KAFKA_SSL_ENABLED: "false"
      KAFKA_SASL_ENABLED: "false"
    depends_on:
      - kafka
    command:
      - start-dev
      - --spi-events-listener-enhanced-kafka-events-enabled=true

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8090:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
    depends_on:
      - kafka

Monitoring and Observability

Health Check Script

File: scripts/health-check.sh

#!/bin/bash

# Health check script for Keycloak with Kafka SPI
set -e

# Check if Keycloak is responding
if ! curl -f -s http://localhost:8080/health/live > /dev/null; then
    echo "Keycloak health check failed"
    exit 1
fi

# Check if Kafka connection is working (basic test)
# This would require additional monitoring endpoint in the SPI
# For now, just check if Keycloak is responding
echo "Health check passed"
exit 0

Prometheus Metrics Integration

To add comprehensive monitoring, you can extend the SPI with Prometheus metrics:

// Add to EnhancedKafkaEventListener (requires io.prometheus:simpleclient):
// import io.prometheus.client.Counter;
// import io.prometheus.client.Histogram;
private final Counter eventsPublishedTotal = Counter.build()
    .name("keycloak_kafka_events_published_total")
    .help("Total number of events published to Kafka")
    .labelNames("event_type", "realm")
    .register();

private final Counter eventsFailedTotal = Counter.build()
    .name("keycloak_kafka_events_failed_total")
    .help("Total number of failed event publications")
    .labelNames("event_type", "realm", "error_type")
    .register();

private final Histogram eventProcessingDuration = Histogram.build()
    .name("keycloak_kafka_event_processing_duration_seconds")
    .help("Time spent processing events")
    .labelNames("event_type")
    .register();
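
These collectors also need an HTTP endpoint before Prometheus can scrape them. A minimal sketch, assuming the simpleclient_httpserver module and an arbitrarily chosen port 9542:

import io.prometheus.client.exporter.HTTPServer;

import java.io.IOException;

public final class MetricsEndpoint {

    private static volatile HTTPServer server;

    private MetricsEndpoint() {}

    // Call once, e.g. from the factory's postInit(); serves the default
    // CollectorRegistry that the counters above register themselves into
    public static synchronized void start() throws IOException {
        if (server == null) {
            server = new HTTPServer(9542);
        }
    }

    public static synchronized void stop() {
        if (server != null) {
            server.stop();
            server = null;
        }
    }
}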

Configuration Management

Environment Variables Reference

| Variable | Description | Default | Required |
|---|---|---|---|
| KAFKA_BOOTSTRAP_SERVERS | Kafka bootstrap servers | - | Yes |
| KAFKA_TOPIC_EVENTS | Topic for user events | - | Yes |
| KAFKA_TOPIC_ADMIN_EVENTS | Topic for admin events | {KAFKA_TOPIC_EVENTS}-admin | No |
| KAFKA_SSL_ENABLED | Enable SSL encryption | false | No |
| KAFKA_SASL_ENABLED | Enable SASL authentication | false | No |
| KAFKA_RETRIES | Number of retries | 3 | No |
| KAFKA_BATCH_SIZE | Batch size in bytes | 16384 | No |
| KAFKA_LINGER_MS | Linger time in milliseconds | 5 | No |
| KAFKA_BUFFER_MEMORY | Producer buffer memory in bytes | 33554432 | No |
| KAFKA_COMPRESSION_TYPE | Compression type | snappy | No |
| KAFKA_EVENT_TYPES_FILTER | Comma-separated event types to process | All important events | No |
| KAFKA_ADMIN_OPERATION_TYPES_FILTER | Comma-separated admin operation types | All operations | No |

SSL Configuration Variables

| Variable | Description | Required when SSL enabled |
|---|---|---|
| KAFKA_SSL_TRUSTSTORE_LOCATION | Path to truststore | Yes |
| KAFKA_SSL_TRUSTSTORE_PASSWORD | Truststore password | Yes |
| KAFKA_SSL_KEYSTORE_LOCATION | Path to keystore | Yes |
| KAFKA_SSL_KEYSTORE_PASSWORD | Keystore password | Yes |
| KAFKA_SSL_KEY_PASSWORD | Key password | No |

SASL Configuration Variables

| Variable | Description | Required when SASL enabled |
|---|---|---|
| KAFKA_SASL_MECHANISM | SASL mechanism (PLAIN, SCRAM-SHA-256, etc.) | No (default: PLAIN) |
| KAFKA_SASL_JAAS_CONFIG | JAAS configuration string | Yes |
| KAFKA_SECURITY_PROTOCOL | Security protocol | No (auto-detected) |

Testing Strategy

Unit Tests

@ExtendWith(MockitoExtension.class)
class EnhancedKafkaEventListenerTest {

    @Mock
    private Event mockEvent;

    @Test
    void shouldPublishEventSuccessfully() {
        // Mockito 5 ships the inline mock maker, so the producer the listener
        // creates internally can be intercepted with a construction mock
        try (MockedConstruction<EnhancedKafkaProducer> mockedProducer =
                 Mockito.mockConstruction(EnhancedKafkaProducer.class)) {

            EnhancedKafkaEventListener listener =
                new EnhancedKafkaEventListener("test-topic", "admin-topic", new Properties());

            // Given
            when(mockEvent.getType()).thenReturn(EventType.LOGIN);
            when(mockEvent.getUserId()).thenReturn("user123");
            when(mockEvent.getRealmId()).thenReturn("realm1");

            // When
            listener.onEvent(mockEvent);

            // Then
            verify(mockedProducer.constructed().get(0))
                .sendEvent(eq("test-topic"), eq("user123"), any(String.class));
        }
    }
}

Integration Tests with Testcontainers

@Testcontainers
class KafkaIntegrationTest {

    @Container
    static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));

    @Test
    void shouldPublishEventsToKafka() throws Exception {
        // Test the complete flow with a real Kafka container; serializers are needed
        // because EnhancedKafkaProducer passes the properties straight through
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        EnhancedKafkaProducer producer = new EnhancedKafkaProducer(props);

        // Test event publishing
        CompletableFuture<RecordMetadata> future = producer.sendEvent("test-topic", "key1", "test-message");
        RecordMetadata metadata = future.get(10, TimeUnit.SECONDS);

        assertEquals("test-topic", metadata.topic());
        producer.close();
    }
}

Performance Tuning and Best Practices

Production Optimization Settings

# Kafka Producer Optimization
KAFKA_BATCH_SIZE=65536
KAFKA_LINGER_MS=20
KAFKA_BUFFER_MEMORY=67108864
KAFKA_COMPRESSION_TYPE=lz4
KAFKA_RETRIES=10

# The backoff/timeout settings below are not read by the factory shown above;
# wire them into buildKafkaProperties() before relying on them
KAFKA_RETRY_BACKOFF_MS=1000
KAFKA_REQUEST_TIMEOUT_MS=30000
KAFKA_DELIVERY_TIMEOUT_MS=120000

# JVM Tuning for Keycloak
JAVA_OPTS=-Xms2g -Xmx4g -XX:+UseG1GC -XX:MaxGCPauseMillis=200

Security Hardening Checklist

  • ✅ Use SSL/TLS encryption for Kafka communication
  • ✅ Implement SASL authentication with strong passwords
  • ✅ Use separate topics for different event types
  • ✅ Implement proper access controls on Kafka topics
  • ✅ Regularly rotate SSL certificates and passwords
  • ✅ Monitor for unauthorized access attempts
  • ✅ Use network policies to restrict Kafka access

Scaling Considerations

  • Horizontal Scaling: Deploy multiple Keycloak instances with load balancing
  • Kafka Partitioning: Use appropriate partition keys for even distribution
  • Consumer Groups: Implement proper consumer groups for event processing (see the sketch after this list)
  • Resource Allocation: Allocate sufficient memory and CPU for both Keycloak and Kafka
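
To make the consumer-group point concrete, here is a minimal downstream consumer sketch. The keycloak-events topic matches the configuration above; the security-monitoring group id, localhost bootstrap address, and print-only handling are illustrative assumptions:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KeycloakEventConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Instances sharing this group id split the topic's partitions between them
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "security-monitoring");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("keycloak-events"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // The key is the partition key the SPI chose (user id or realm id)
                    System.out.printf("key=%s value=%s%n", record.key(), record.value());
                }
            }
        }
    }
}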

Troubleshooting Guide

Common Issues and Solutions

Issue: Events not appearing in Kafka topics

  • Check network connectivity between Keycloak and Kafka
  • Verify Kafka topic configuration and permissions
  • Review Keycloak logs for SSL/SASL authentication errors

Issue: High memory usage in Keycloak

  • Adjust Kafka producer buffer settings
  • Implement event filtering to reduce volume
  • Monitor garbage collection patterns

Issue: Event delivery failures

  • Check Kafka cluster health and availability
  • Verify SSL certificate validity
  • Review network policies and firewall rules

Debugging Commands

# Check Kafka topic contents
kafka-console-consumer --bootstrap-server localhost:9092 --topic keycloak-events --from-beginning

# Monitor Kafka consumer lag
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group keycloak-events-group

# Check SSL connectivity
openssl s_client -connect kafka-server:9093 -servername kafka-server

Conclusion

This enhanced Keycloak SPI provides a robust, production-ready solution for streaming identity events to Kafka. The implementation includes comprehensive error handling, security features, monitoring capabilities, and scalability considerations.

Key benefits of this approach:

  • Real-time Event Processing: Enable immediate response to authentication and authorization events
  • Scalable Architecture: Support for high-volume identity operations with proper partitioning
  • Security First: Comprehensive SSL/SASL support with proper credential management
  • Production Ready: Circuit breaker patterns, retry logic, and comprehensive monitoring
  • Flexible Configuration: Environment-based configuration suitable for containerized deployments

The complete source code and deployment examples are available in the GitHub repository, including Docker compositions for development and Kubernetes manifests for production deployment.

Next Steps

Consider implementing additional features:

  • Schema registry integration for structured event evolution
  • Dead letter queue handling for failed events (a sketch follows this list)
  • Custom event enrichment based on business requirements
  • Integration with observability platforms like Grafana and Prometheus
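
To sketch the dead-letter idea, here is a method that could be added to EnhancedKafkaProducer; the keycloak-events-dlq topic and the header names are assumptions, not part of the implementation above:

// Hypothetical addition to EnhancedKafkaProducer: re-publish a payload that
// failed non-retriably to a side topic so it can be inspected and replayed
private static final String DLQ_TOPIC = "keycloak-events-dlq"; // assumed topic name

private void sendToDeadLetterQueue(String originalTopic, String key, String value, Exception cause) {
    ProducerRecord<String, String> record = new ProducerRecord<>(DLQ_TOPIC, key, value);
    // Preserve provenance in headers for later replay
    record.headers().add("origin-topic", originalTopic.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    record.headers().add("error", String.valueOf(cause.getMessage()).getBytes(java.nio.charset.StandardCharsets.UTF_8));
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            logger.error("Dead-letter publish failed for key: {}", key, exception);
        }
    });
}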

This implementation has been tested with Keycloak 22.x and Kafka 3.5.x. For the latest updates and community contributions, visit the project repository.

Youssef Bentaleb
Senior Full Stack Developer