Kafka Processor

Event-driven processor that consumes messages from Apache Kafka topics, processes them, and persists the results to the comerzzia database.

Purpose

  • Consumes messages from Kafka topics

  • Validates and processes entity data

  • Persists to comerzzia database via integration services

  • Supports concurrent processing with configurable consumers

Prerequisites

This application requires an existing Apache Kafka deployment. It does not include Kafka server setup or topic creation.

Ensure:

  • Apache Kafka service is accessible

  • Required topics are created (main topic and error topic)
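A quick way to check the first prerequisite before starting the processor is a plain TCP connection test against one bootstrap server. The following is a minimal sketch in standard-library Java. The class name BrokerReachability and its methods are hypothetical helpers, not part of the processor, and an open port only shows that something is listening there, not that the broker is healthy or that the topics exist.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical pre-flight helper; not part of the processor itself. */
final class BrokerReachability {

    /** Splits one "host:port" entry of a bootstrap server list. */
    static InetSocketAddress parse(String hostPort) {
        int idx = hostPort.lastIndexOf(':');
        if (idx <= 0 || idx == hostPort.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + hostPort);
        }
        String host = hostPort.substring(0, idx).trim();
        int port = Integer.parseInt(hostPort.substring(idx + 1).trim());
        // Kept unresolved on purpose: the DNS lookup happens at connect time.
        return InetSocketAddress.createUnresolved(host, port);
    }

    /** Returns true if a TCP connection can be opened within the timeout. */
    static boolean isReachable(String hostPort, int timeoutMillis) {
        InetSocketAddress target = parse(hostPort);
        try (Socket socket = new Socket()) {
            // A new InetSocketAddress resolves the host name; failures surface as IOException.
            socket.connect(new InetSocketAddress(target.getHostString(), target.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Falls back to the documented default bootstrap server.
        String server = args.length > 0 ? args[0] : "kafka-server-svc:9092";
        System.out.println(server + " reachable: " + isReachable(server, 2000));
    }
}
```

Checking that the main and error topics exist still requires a Kafka admin tool or client, which this sketch deliberately leaves out.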

Processing Flow

Diagram

Kafka Topic Structure

Messages are consumed from the configured main topic and carry entity metadata in their headers. Messages that fail processing are published to the error topic.
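The routing described above can be modelled without any Kafka dependency. This is only an illustrative sketch: the Message class is a stand-in rather than a Kafka client type, the header name "entity-type" is hypothetical (this README does not list the real header names), and the persister and errorTopic callbacks stand in for the integration services and the error-topic producer.

```java
import java.util.Map;
import java.util.function.Consumer;

/** Illustrative model of main-topic handling with an error-topic fallback. */
final class ErrorTopicRoutingSketch {

    /** Stand-in for a consumed record: headers plus payload. Not a Kafka client type. */
    static final class Message {
        final Map<String, String> headers;
        final String payload;

        Message(Map<String, String> headers, String payload) {
            this.headers = headers;
            this.payload = payload;
        }
    }

    /** Hypothetical header name; the real names are not documented here. */
    static final String ENTITY_TYPE_HEADER = "entity-type";

    /**
     * Validates and persists one message. Returns true on success; on any
     * runtime failure the message is handed to the error sink instead.
     */
    static boolean handle(Message message, Consumer<Message> persister, Consumer<Message> errorTopic) {
        try {
            String entityType = message.headers.get(ENTITY_TYPE_HEADER);
            if (entityType == null || entityType.isEmpty()) {
                throw new IllegalArgumentException("Missing header: " + ENTITY_TYPE_HEADER);
            }
            persister.accept(message);
            return true;
        } catch (RuntimeException e) {
            // Validation and persistence failures both end up on the error topic.
            errorTopic.accept(message);
            return false;
        }
    }
}
```

Passing the two destinations as callbacks keeps the decision logic testable on its own, whatever producer or service sits behind them in the real application.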

Configuration

Application Settings

Key settings:

Spring Property                 Environment var                 Default value
erp-to-czz.main-topic           ERP_TO_CZZ_MAIN_TOPIC           erp-to-czz-input
erp-to-czz.error-topic          ERP_TO_CZZ_ERROR_TOPIC          erp-to-czz-input-errors
erp-to-czz.consumer.group-id    ERP_TO_CZZ_CONSUMER_GROUP_ID    erp-processor-group
spring.kafka.bootstrap-servers  SPRING_KAFKA_BOOTSTRAP_SERVERS  kafka-server-svc:9092
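To make the relationship between environment variables and defaults concrete, here is a minimal plain-Java sketch for the three erp-to-czz settings. The real application binds these values through Spring, so the ProcessorSettings class and its methods are hypothetical and only mirror the documented names and defaults.

```java
import java.util.Map;

/** Hypothetical holder for the erp-to-czz settings; the application itself relies on Spring binding. */
final class ProcessorSettings {
    final String mainTopic;
    final String errorTopic;
    final String consumerGroupId;

    private ProcessorSettings(String mainTopic, String errorTopic, String consumerGroupId) {
        this.mainTopic = mainTopic;
        this.errorTopic = errorTopic;
        this.consumerGroupId = consumerGroupId;
    }

    /** Builds settings from an environment map such as System.getenv(). */
    static ProcessorSettings fromEnvironment(Map<String, String> env) {
        return new ProcessorSettings(
                valueOrDefault(env, "ERP_TO_CZZ_MAIN_TOPIC", "erp-to-czz-input"),
                valueOrDefault(env, "ERP_TO_CZZ_ERROR_TOPIC", "erp-to-czz-input-errors"),
                valueOrDefault(env, "ERP_TO_CZZ_CONSUMER_GROUP_ID", "erp-processor-group"));
    }

    /** Uses the documented default only when the variable is not set at all. */
    private static String valueOrDefault(Map<String, String> env, String name, String fallback) {
        String value = env.get(name);
        return value == null ? fallback : value;
    }

    public static void main(String[] args) {
        ProcessorSettings s = fromEnvironment(System.getenv());
        System.out.println(s.mainTopic + " / " + s.errorTopic + " / " + s.consumerGroupId);
    }
}
```

Running it with no variables set prints the default values from the table; setting, for example, ERP_TO_CZZ_MAIN_TOPIC changes only that one value.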

Important Notes

  • Consumer group ID is used for offset management across restarts

  • Manual offset commit ensures messages are only committed after successful processing
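The practical effect of committing only after successful processing is at-least-once delivery: a message whose processing was interrupted is read again after a restart. The sketch below models this with a list standing in for a topic partition and a field standing in for the consumer group's committed offset. It is a simplified illustration in plain Java, not the processor's code, and every name in it is hypothetical.

```java
import java.util.List;
import java.util.function.Consumer;

/** Simplified model of "process first, commit afterwards". */
final class CommitAfterProcessingSketch {

    /** Offset of the next message the consumer group would read after a restart. */
    private long committedOffset = 0;

    long committedOffset() {
        return committedOffset;
    }

    /**
     * Reads the log from the committed offset onwards. The offset advances only
     * after the handler returns normally; if the handler throws, the exception
     * propagates and the failed message stays uncommitted.
     */
    void run(List<String> log, Consumer<String> handler) {
        for (long offset = committedOffset; offset < log.size(); offset++) {
            handler.accept(log.get((int) offset)); // may throw
            committedOffset = offset + 1;          // "commit" only after success
        }
    }
}
```

If a first call to run fails on the second of three messages, committedOffset() stays at 1, and a later call resumes with that same second message. Because a message can therefore be handled more than once, it helps when persistence is idempotent.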

Database Configuration

Requires comerzzia database configuration through a comerzzia.xml file, located either on the Java classpath or in the COMERZZIA_HOME path.

Docker container execution

docker run -it --rm --name processor-kafka \
  --env-file processor-dev.env \
  --pull=always \
  -p 8080:8080 \
  europe-west3-docker.pkg.dev/czz-devops/comerzzia/erp-to-czz/processor-kafka-application:1.0

Example processor-dev.env file with the environment variables:

SPRING_KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092
ERP_TO_CZZ_MAIN_TOPIC=erp-to-czz-input
ERP_TO_CZZ_ERROR_TOPIC=erp-to-czz-input-errors

uidActividad=MYACTIVITY
uidInstancia=MYINSTANCE

SPRING_PROFILES_ACTIVE=jsonlogs

COMERZZIA_DB_CLASS=com.mysql.jdbc.Driver
COMERZZIA_DB_DRIVER=mysql
COMERZZIA_DB_USER=comerzzia
COMERZZIA_DB_PASS=comerzzia
COMERZZIA_DB_URL=jdbc:mysql://127.0.0.1:3306/comerzzia