Kafka Processor

Event-driven processor that consumes messages from Apache Kafka topics and sends them to external ERP systems.

Purpose

  • Polls the comerzzia database for pending documents

  • Transforms entities to ERP format (JSON/XML)

  • Publishes transformed messages to Kafka topics

  • Consumes messages from Kafka topics (optional, configurable)

  • Sends data to ERP systems via HTTP

  • Supports concurrent processing with configurable consumers

  • Retries failed messages and routes them to a dead-letter queue (DLQ) topic once the maximum number of retries is reached

Prerequisites

This application requires an existing Apache Kafka cluster. It does not include Kafka server setup or topic creation.

Ensure that:

  • The Kafka cluster is accessible

  • The required topics are created (main topic and DLQ topic)

Configuration

Application Settings

See application.yml for full configuration.

Key settings:

  • Kafka bootstrap servers: Connection to the Kafka cluster

  • Consumer activation: Optional; enable or disable via czz-to-erp.consumer.enabled in application.yml

  • Consumer configuration: Base group ID, concurrency, and offset management

  • Processing settings: Timeouts, retries, and acknowledgment

  • Retry configuration: Max attempts

  • Server configuration: Application and actuator ports, context paths

  • OpenAPI/Swagger UI: Interactive API documentation for testing endpoints

  • Actuator endpoints: Health checks, Prometheus metrics, and monitoring dashboard

Entity Configuration

Each entity is configured independently in entities-config.yml, which is provided by the Processor Common Library module.

Per-entity settings:

  • topic: Independent topic for this entity

  • dlqTopic: Independent DLQ topic for failed messages

  • schedulerCron: Independent polling schedule (cron expression)

  • schedulerEnabled: Enable/disable database polling for this entity
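To illustrate how the per-entity settings above fit together, here is a minimal Java sketch. The field names (topic, dlqTopic, schedulerCron, schedulerEnabled) come from the list above. Everything else is an assumption made for illustration: the EntityConfig record, the REGISTRY map, the lookup helper, the entity names, the topic names, and the six-field cron values. The real binding of entities-config.yml may look quite different.

```java
import java.util.Map;
import java.util.Optional;

public class EntityConfigSketch {

    /** Hypothetical holder for the four per-entity settings. */
    record EntityConfig(String topic, String dlqTopic,
                        String schedulerCron, boolean schedulerEnabled) {}

    /** Hypothetical registry keyed by entity type; all names and values are examples only. */
    static final Map<String, EntityConfig> REGISTRY = Map.of(
        "sales", new EntityConfig("erp.sales", "erp.sales.dlq", "0 */5 * * * *", true),
        "items", new EntityConfig("erp.items", "erp.items.dlq", "0 0 * * * *", false));

    /** Returns the configuration for an entity type, or empty if it is not registered. */
    static Optional<EntityConfig> lookup(String entityType) {
        // Map.of(...) rejects null keys on lookup, so guard against null explicitly.
        return entityType == null
            ? Optional.empty()
            : Optional.ofNullable(REGISTRY.get(entityType));
    }

    public static void main(String[] args) {
        lookup("sales").ifPresent(c ->
            System.out.println(c.topic() + " / " + c.dlqTopic()));
    }
}
```

Keeping each entity's main topic and DLQ topic in a single record means that a failed message can be routed to its DLQ with one lookup on the entity type.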

Important Notes

  • Offsets are committed manually, so a message's offset is committed only after the message has been processed successfully

Database Configuration

Database connection is configured via Spring Boot datasource properties in application.yml or profile-specific files (e.g., application-dev.yml).

Kafka Topic Structure

Architecture:

  • Each entity type has its own main topic and DLQ topic

  • Messages include an entityType header for entity identification

  • Failed messages (after max retries) are sent to their entity-specific DLQ topic

  • Each entity uses an independent consumer group for offset management

Error Handling and Retry Logic

Retry Mechanism

  • First failure: Error registered in database, message republished to main topic with retry-count=1

  • Subsequent failures: Message republished with incremented retry-count

  • Max retries reached: Message sent to DLQ topic and ACKed
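The retry steps above can be sketched as a small decision function. This is an illustration only, not the application's actual code: the class, the enum, the record, and the onFailure method are hypothetical, and the sketch assumes that retry-count is carried as a string header and that the limit counts retries after the first attempt. Registering the error in the database on first failure is not modeled.

```java
import java.util.HashMap;
import java.util.Map;

public class RetryDecisionSketch {

    enum Action { REPUBLISH_TO_MAIN, SEND_TO_DLQ }

    /** Hypothetical result: where the message goes next and with which headers. */
    record Decision(Action action, Map<String, String> headers) {}

    static final String RETRY_HEADER = "retry-count";

    /** Decides what to do with a message whose processing has just failed. */
    static Decision onFailure(Map<String, String> headers, int maxRetries) {
        // A message without the header has not been retried yet.
        int retries = Integer.parseInt(headers.getOrDefault(RETRY_HEADER, "0"));

        if (retries >= maxRetries) {
            // Retry budget exhausted: route to the DLQ topic with headers unchanged.
            return new Decision(Action.SEND_TO_DLQ, Map.copyOf(headers));
        }

        // Otherwise republish to the main topic with an incremented counter.
        Map<String, String> next = new HashMap<>(headers);
        next.put(RETRY_HEADER, String.valueOf(retries + 1));
        return new Decision(Action.REPUBLISH_TO_MAIN, Map.copyOf(next));
    }

    public static void main(String[] args) {
        System.out.println(onFailure(Map.of("entityType", "sales"), 3));
    }
}
```

With maxRetries set to 3 under these assumptions, a message is republished with retry-count 1, 2, and 3, and the failure that arrives with retry-count=3 is sent to the DLQ.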

Message Validation

  • Entity type validation against registry

  • Message size validation (default: 10MB max)

  • Timeout protection (default: 120 seconds)
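As a rough illustration of the three checks above, the following Java sketch validates the entity type and payload size and bounds processing time. The defaults (10MB, 120 seconds) come from the list above. The class, the KNOWN_ENTITY_TYPES set, the method names, and the reading of 10MB as 10 × 1024 × 1024 bytes are assumptions, and the application's real validation may be implemented differently.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class MessageValidationSketch {

    // Defaults taken from the text; 1MB is assumed to mean 1024 * 1024 bytes.
    static final int MAX_BYTES = 10 * 1024 * 1024;
    static final long TIMEOUT_SECONDS = 120;

    /** Hypothetical stand-in for the entity registry. */
    static final Set<String> KNOWN_ENTITY_TYPES = Set.of("sales", "items");

    /** True if the entity type is registered and the payload is within the size limit. */
    static boolean isValid(String entityType, byte[] payload) {
        return entityType != null
            && KNOWN_ENTITY_TYPES.contains(entityType)
            && payload != null
            && payload.length <= MAX_BYTES;
    }

    /** Runs a task asynchronously and throws TimeoutException if it does not finish in time. */
    static <T> T runWithTimeout(Supplier<T> task, long timeout, TimeUnit unit)
            throws TimeoutException, ExecutionException, InterruptedException {
        CompletableFuture<T> future = CompletableFuture.supplyAsync(task);
        try {
            return future.get(timeout, unit);
        } finally {
            // No effect if the future already completed; otherwise marks it cancelled.
            // Note that this does not interrupt the thread running the task.
            future.cancel(true);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(isValid("sales", new byte[128]));
        System.out.println(runWithTimeout(() -> "done", TIMEOUT_SECONDS, TimeUnit.SECONDS));
    }
}
```

A message that fails isValid can be rejected before any ERP call is made, while a call that exceeds the timeout surfaces as a TimeoutException that the caller can treat like any other processing failure.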

Processing Flow

[Diagram: processing flow]