Messaging

The two services gossip-syncer and gossip-post-processor run independently. There can be situations where one of them has an outage or, more generally, where their gossip message processing speeds differ. Introducing an event streaming platform like Kafka brings several advantages:

  • Services publish to and consume from Kafka at their own processing speed.
  • Separation of concerns: messages are filtered and then consumed from the Kafka instance by the rest of the ln-history platform.
  • Messages that were processed incorrectly can be kept in Kafka for manual handling (see the sketch below), while the platform continues to run without issues.

These benefits come at the cost of additional complexity in the architecture.
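
The last bullet point is essentially a dead-letter pattern. Below is a minimal sketch of the idea using kafka-python; the topic names and the plain localhost broker are hypothetical placeholders, the actual services would use the authenticated connection settings shown later in this guide.

import json
from kafka import KafkaConsumer, KafkaProducer

# Hypothetical topic names and broker address, for illustration only
consumer = KafkaConsumer(
    "gossip-messages",
    bootstrap_servers="localhost:9092",
    group_id="gossip-post-processor",
)
producer = KafkaProducer(bootstrap_servers="localhost:9092")


def process(raw: bytes) -> None:
    """Placeholder for the actual gossip message processing."""
    json.loads(raw)  # e.g. fails on malformed payloads


for message in consumer:
    try:
        process(message.value)
    except Exception:
        # Keep the failed payload in Kafka for manual inspection instead of
        # dropping it; the consumer continues with the next message.
        producer.send("gossip-messages.dead-letter", message.value)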

We will use Bitnami's Kafka Docker image.

Setup

We will start by setting up the directory structure and then continue with the Docker container.

Create directory

From the project root we create the kafka directory with the following command.

ln-history@host:~/ln-history$ mkdir kafka && cd kafka

Certs (authentication)

We run Kafka with authentication enabled. For that we need to create our own kafka.keystore.jks and kafka.truststore.jks. The official Bitnami README provides the kafka-generate-ssl.sh script for this.

We run this script and get the custom kafka.keystore.jks and kafka.truststore.jks, which we place in a certs directory together with client.properties.

Environment variables (authentication)

We create a .env file and fill in the credentials. KAFKA_CERTIFICATE_PASSWORD has to match the password that was entered previously when running the kafka-generate-ssl.sh script.

SERVER_IP_ADDRESS=YOUR_SERVER_IP
HOSTNAME=YOUR_URL
KAFKA_USER=YOUR_USER_NAME
KAFKA_PASSWORD=YOUR_KAFKA_PASSWORD
KAFKA_CONTROLLER_USER=YOUR_CONTROLLER_USER_NAME
KAFKA_CONTROLLER_PASSWORD=YOUR_CONTROLLER_USER_PASSWORD
KAFKA_INTER_BROKER_USER=YOUR_INTER_BROKER_USER_NAME
KAFKA_INTER_BROKER_PASSWORD=YOUR_INTER_BROKER_PASSWORD
KAFKA_CERTIFICATE_PASSWORD=YOUR_CERTIFICATE_PASSWORD

🔐 Important: Never commit the .env, kafka.keystore.jks, kafka.truststore.jks and client.properties files to version control.

🐳 Docker container

Create the docker-compose.yml and paste the following content into it.

services:
  kafka:
    image: bitnami/kafka:latest
    container_name: kafka
    hostname: ${HOSTNAME}
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      - BITNAMI_DEBUG=yes

      # KRaft
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093

      # Listeners
      - KAFKA_CFG_LISTENERS=SASL_SSL://:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
      - KAFKA_CFG_ADVERTISED_LISTENERS=SASL_SSL://${SERVER_IP_ADDRESS}:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_SSL
      - KAFKA_CLIENT_LISTENER_NAME=SASL_SSL

      # SASL
      - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      - KAFKA_CONTROLLER_USER=${KAFKA_CONTROLLER_USER}
      - KAFKA_CONTROLLER_PASSWORD=${KAFKA_CONTROLLER_PASSWORD}
      - KAFKA_INTER_BROKER_USER=${KAFKA_INTER_BROKER_USER}
      - KAFKA_INTER_BROKER_PASSWORD=${KAFKA_INTER_BROKER_PASSWORD}
      - KAFKA_CLIENT_USERS=${KAFKA_USER}
      - KAFKA_CLIENT_PASSWORDS=${KAFKA_PASSWORD}

      # SSL
      - KAFKA_TLS_TYPE=JKS
      - KAFKA_CERTIFICATE_PASSWORD=${KAFKA_CERTIFICATE_PASSWORD}

      # RAM allocation for Java Heap
      - KAFKA_HEAP_OPTS=-Xms1g -Xmx2g
    volumes:
      - kafka_data:/bitnami/kafka
      - "./certs:/bitnami/kafka/config/certs:ro"
      - "./certs/client.properties:/opt/bitnami/kafka/config/client.properties:ro"
    restart: unless-stopped

volumes:
  kafka_data:

Folder structure

Ultimately, the folder structure should look like this:

kafka/
├── .env                        # Environment variables for the service
├── certs/                      # Certs for authentication
│   ├── client.properties
│   ├── kafka.keystore.jks
│   └── kafka.truststore.jks
└── docker-compose.yml          # Docker setup for this service

Run

We start the container by using docker compose up -d (the flag -d stands for detached, meaning the container runs in the background).

ln-history@host:~/ln-history/kafka$ docker compose up -d

Verify

Logs

Check the logs of the Docker container. Kafka logs a lot of information; make sure that after some time the latest entries are at log level INFO.

ln-history@host:~/ln-history/kafka$ docker compose logs -f --tail=100

kafka | [timestamp] INFO [GroupCoordinator id=0] Finished loading of
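
You can additionally check that the container is up

ln-history@host:~/ln-history/kafka$ docker compose ps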

Test

Create a new directory kafka-test.

ln-history@host:~/ln-history/kafka$ mkdir kafka-test && cd kafka-test

Setup Virtual Environment

Create a requirements.txt with the dependencies for the virtual Python environment

kafka-python
python-dotenv

Create the virtual environment

ln-history@host:~/ln-history/kafka/kafka-test$ python -m venv .venv

Activate the virtual environment

ln-history@host:~/ln-history/kafka/kafka-test$ source .venv/bin/activate
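
Install the dependencies from requirements.txt into the activated environment

ln-history@host:~/ln-history/kafka/kafka-test$ pip install -r requirements.txt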

Environment variables (authentication)

Create a .env file and fill in the values.

SSL_PASSWORD=YOUR_SSL_PASSWORD
SASL_PLAIN_USERNAME=YOUR_SASL_USER_NAME
SASL_PLAIN_PASSWORD=YOUR_SASL_PASSWORD
SERVER_IP_ADDRESS=YOUR_SERVER_IP
SERVER_PORT=9092
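
The test scripts below load kafka.truststore.pem and kafka.keystore.pem from the kafka-test directory. These PEM files are not produced directly by kafka-generate-ssl.sh; one possible way to derive them from the JKS files is sketched here (the CA alias is assumed to be CARoot, verify with keytool -list):

ln-history@host:~/ln-history/kafka/kafka-test$ keytool -importkeystore -srckeystore ../certs/kafka.keystore.jks -destkeystore kafka.keystore.p12 -deststoretype PKCS12
ln-history@host:~/ln-history/kafka/kafka-test$ openssl pkcs12 -in kafka.keystore.p12 -nodes -out kafka.keystore.pem
ln-history@host:~/ln-history/kafka/kafka-test$ keytool -exportcert -alias CARoot -keystore ../certs/kafka.truststore.jks -rfc -file kafka.truststore.pem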

Create two files: test-kafka_producer.py and test-kafka_consumer.py:

Paste this into test-kafka_producer.py:

import json
import os
import time
import logging
from kafka import KafkaProducer
from dotenv import load_dotenv

load_dotenv(".env")

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def create_kafka_producer() -> KafkaProducer:
    """Create and return a configured Kafka producer."""
    bootstrap_servers = f"{os.getenv('SERVER_IP_ADDRESS')}:{os.getenv('SERVER_PORT')}"

    return KafkaProducer(
        bootstrap_servers=[bootstrap_servers],
        client_id="test-python-producer",
        security_protocol='SASL_SSL',
        ssl_cafile='./kafka.truststore.pem',
        ssl_certfile='./kafka.keystore.pem',
        ssl_keyfile='./kafka.keystore.pem',
        ssl_password=os.getenv("SSL_PASSWORD"),
        sasl_mechanism='SCRAM-SHA-512',
        sasl_plain_username=os.getenv("SASL_PLAIN_USERNAME"),
        sasl_plain_password=os.getenv("SASL_PLAIN_PASSWORD"),
        ssl_check_hostname=False,
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )


def send_message(producer: KafkaProducer, topic: str, message: dict):
    """Send a message to a Kafka topic."""
    try:
        future = producer.send(topic, value=message)
        record_metadata = future.get(timeout=10)
        logger.info(
            f"Message sent successfully to {record_metadata.topic} "
            f"partition {record_metadata.partition} offset {record_metadata.offset}"
        )
    except Exception as e:
        logger.error(f"Failed to send message: {e}")


def main():
    topic = "test"
    producer = create_kafka_producer()

    test_message = {
        "timestamp": time.time(),
        "message": "Hello from Python test producer!"
    }

    send_message(producer, topic, test_message)
    producer.close()


if __name__ == "__main__":
    main()

And paste this into test-kafka_consumer.py:

import os
import json
import logging
from kafka import KafkaConsumer
from dotenv import load_dotenv

load_dotenv(".env")

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def create_kafka_consumer(topic: str) -> KafkaConsumer:
    """Create and return a configured Kafka consumer."""
    bootstrap_servers = f"{os.getenv('SERVER_IP_ADDRESS')}:{os.getenv('SERVER_PORT')}"

    return KafkaConsumer(
        topic,
        bootstrap_servers=[bootstrap_servers],
        api_version="3",
        client_id="test-python-consumer",
        security_protocol='SASL_SSL',
        ssl_cafile='./kafka.truststore.pem',
        ssl_certfile='./kafka.keystore.pem',
        ssl_keyfile='./kafka.keystore.pem',
        ssl_password=os.getenv("SSL_PASSWORD"),
        sasl_mechanism='SCRAM-SHA-512',
        sasl_plain_username=os.getenv("SASL_PLAIN_USERNAME"),
        sasl_plain_password=os.getenv("SASL_PLAIN_PASSWORD"),
        ssl_check_hostname=False,
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        group_id='python-consumer-group',
        value_deserializer=lambda v: json.loads(v.decode('utf-8'))
    )


def consume_messages(consumer: KafkaConsumer):
    logger.info("Listening for messages...")
    for message in consumer:
        logger.info(f"Received message: {message.value}")


def main():
    topic = "test"
    consumer = create_kafka_consumer(topic)
    consume_messages(consumer)


if __name__ == "__main__":
    main()
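
Kafka brokers usually auto-create a topic on the first write, so the test topic normally does not need to be created by hand. If topic auto-creation is disabled on your broker, the following minimal sketch creates it explicitly with kafka-python's admin client, reusing the same connection settings and PEM files as the test scripts above:

import os
from dotenv import load_dotenv
from kafka.admin import KafkaAdminClient, NewTopic

load_dotenv(".env")

# Same SASL_SSL settings as the producer and consumer above
admin = KafkaAdminClient(
    bootstrap_servers=[f"{os.getenv('SERVER_IP_ADDRESS')}:{os.getenv('SERVER_PORT')}"],
    client_id="test-python-admin",
    security_protocol="SASL_SSL",
    ssl_cafile="./kafka.truststore.pem",
    ssl_certfile="./kafka.keystore.pem",
    ssl_keyfile="./kafka.keystore.pem",
    ssl_password=os.getenv("SSL_PASSWORD"),
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username=os.getenv("SASL_PLAIN_USERNAME"),
    sasl_plain_password=os.getenv("SASL_PLAIN_PASSWORD"),
    ssl_check_hostname=False,
)

# Single partition and replication factor 1, matching the single-broker setup
admin.create_topics([NewTopic(name="test", num_partitions=1, replication_factor=1)])
admin.close()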

Run test-kafka_producer.py with the Python interpreter of the virtual environment (.venv/bin/python):

ln-history@host:~/ln-history/kafka/kafka-test$ .venv/bin/python test-kafka_producer.py
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <connecting> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: connecting to YOUR_IP_ADDRESS:9092 [('YOUR_IP_ADDRESS', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL CA from ./kafka.truststore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL Cert from ./kafka.keystore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL Key from ./kafka.keystore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <checking_api_versions_recv> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Broker version identified as 2.6
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <authenticating> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Authenticated as YOUR_KAFKA_USER via SASL / SCRAM-SHA-512
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <connected> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Connection complete.
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=0 host=YOUR_IP_ADDRESS:9092 <connecting> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: connecting to YOUR_IP_ADDRESS:9092 [('YOUR_IP_ADDRESS', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL CA from ./kafka.truststore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL Cert from ./kafka.keystore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL Key from ./kafka.keystore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=0 host=YOUR_IP_ADDRESS:9092 <authenticating> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Authenticated as YOUR_KAFKA_USER via SASL / SCRAM-SHA-512
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=0 host=YOUR_IP_ADDRESS:9092 <connected> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Connection complete.
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <connected> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Closing connection. 
INFO:__main__:Message sent successfully to test partition 0 offset 1
INFO:kafka.producer.kafka:Closing the Kafka producer with 9223372036.0 secs timeout.
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=0 host=YOUR_IP_ADDRESS:9092 <connected> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Closing connection. 

After that, open a new terminal session and run test-kafka_consumer.py.

ln-history@host:~/ln-history/kafka/kafka-test$ .venv/bin/python test-kafka_consumer.py
WARNING:kafka.consumer.group:use api_version=(3,) [tuple] -- "3" as str is deprecated
WARNING:kafka.client:Configured api_version (3,) is ambiguous; using (3, 0)
INFO:kafka.consumer.subscription_state:Updating subscribed topics to: ('test',)
INFO:__main__:Listening for messages...
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <connecting> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: connecting to YOUR_IP_ADDRESS:9092 [('YOUR_IP_ADDRESS', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL CA from ./kafka.truststore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL Cert from ./kafka.keystore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL Key from ./kafka.keystore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <authenticating> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Authenticated as YOUR_KAFKA_USER via SASL / SCRAM-SHA-512
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <connected> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Connection complete.
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=0 host=YOUR_IP_ADDRESS:9092 <connecting> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: connecting to YOUR_IP_ADDRESS:9092 [('YOUR_IP_ADDRESS', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL CA from ./kafka.truststore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL Cert from ./kafka.keystore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL Key from ./kafka.keystore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=0 host=YOUR_IP_ADDRESS:9092 <authenticating> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Authenticated as YOUR_KAFKA_USER via SASL / SCRAM-SHA-512
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=0 host=YOUR_IP_ADDRESS:9092 <connected> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Connection complete.
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <connected> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Closing connection. 
INFO:kafka.cluster:Group coordinator for python-consumer-group is BrokerMetadata(nodeId='coordinator-0', host='YOUR_IP_ADDRESS', port=9092, rack=None)
INFO:kafka.coordinator:Discovered coordinator coordinator-0 for group python-consumer-group
INFO:kafka.coordinator:Starting new heartbeat thread
INFO:kafka.coordinator.consumer:Revoking previously assigned partitions set() for group python-consumer-group
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=coordinator-0 host=YOUR_IP_ADDRESS:9092 <connecting> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: connecting to YOUR_IP_ADDRESS:9092 [('YOUR_IP_ADDRESS', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=coordinator-0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL CA from ./kafka.truststore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=coordinator-0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL Cert from ./kafka.keystore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=coordinator-0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL Key from ./kafka.keystore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=coordinator-0 host=YOUR_IP_ADDRESS:9092 <authenticating> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Authenticated as YOUR_KAFKA_USER via SASL / SCRAM-SHA-512
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=coordinator-0 host=YOUR_IP_ADDRESS:9092 <connected> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Connection complete.
INFO:kafka.coordinator:(Re-)joining group python-consumer-group
INFO:kafka.coordinator:Elected group leader -- performing partition assignments using range
INFO:kafka.coordinator:Successfully joined group python-consumer-group with generation 5
INFO:kafka.consumer.subscription_state:Updated partition assignment: [TopicPartition(topic='test', partition=0)]
INFO:kafka.coordinator.consumer:Setting newly assigned partitions {TopicPartition(topic='test', partition=0)} for group python-consumer-group
INFO:__main__:Received message: {'timestamp': 1754742448.652868, 'message': 'Hello from Python test producer!'}

After the message has been received, indicated by the last log line, you can stop the program by pressing CTRL + C.

The Kafka instance is now set up correctly.