The two services, gossip-syncer and gossip-post-processor, run independently. One of them might suffer an outage, or their gossip message processing speeds may simply differ.
Introducing an event streaming platform like Kafka brings several advantages:
Services publish to and consume from Kafka at their own processing speed, so a slow or temporarily unavailable consumer does not block the producer.
Separation of concerns: messages are filtered and then consumed from the Kafka instance inside the ln-history platform.
Messages that were processed incorrectly can be parked in Kafka for manual handling while the platform continues to run without issues; a minimal sketch of this pattern follows below.
These benefits come at the cost of additional complexity in the architecture.
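To make the error handling idea concrete, here is a minimal, hypothetical sketch of a consumer that routes messages it cannot process to a dead-letter topic. The topic names, the consumer group and the unauthenticated localhost broker are illustrative assumptions only; the actual ln-history connection settings are configured later in this guide.

import json
import logging

from kafka import KafkaConsumer, KafkaProducer

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Hypothetical topic names, for illustration only.
SOURCE_TOPIC = "gossip.raw"
DEAD_LETTER_TOPIC = "gossip.dead-letter"

# Assumption: a local, unauthenticated broker; the real setup below uses SASL_SSL.
consumer = KafkaConsumer(SOURCE_TOPIC, bootstrap_servers=["localhost:9092"],
                         group_id="gossip-post-processor")
producer = KafkaProducer(bootstrap_servers=["localhost:9092"])

for record in consumer:
    try:
        message = json.loads(record.value.decode("utf-8"))
        # ... the actual gossip post-processing would happen here ...
        logger.info("Processed message at offset %d", record.offset)
    except Exception:
        # Park the unprocessable message for manual inspection instead of
        # stopping the pipeline.
        producer.send(DEAD_LETTER_TOPIC, record.value)
        logger.exception("Moved message at offset %d to the dead-letter topic", record.offset)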
We will use Bitnami's Kafka Docker image.
Setup
We will start by setting up the directory structure and then continue with the Docker container.
Create directory
From the project root we create the kafka directory with the following command.
ln-history@host:~/ln-history ₿ mkdir kafka && cd kafka
Certs (authentication)
We run Kafka with authentication enabled, so we need to create our own kafka.keystore.jks and kafka.truststore.jks files.
The official Bitnami README points to the kafka-generate-ssl.sh script.
We run this script and end up with the custom kafka.keystore.jks and kafka.truststore.jks files as well as client.properties.
Environment variables (authentication)
We create a .env file and fill in the credentials that were entered previously when running the kafka-generate-ssl.sh script.
SERVER_IP_ADDRESS=YOUR_SERVER_IP
HOSTNAME=YOUR_URL
KAFKA_USER=YOUR_USER_NAME
KAFKA_PASSWORD=YOUR_KAFKA_PASSWORD
KAFKA_CONTROLLER_USER=YOUR_CONTROLLER_USER_NAME
KAFKA_CONTROLLER_PASSWORD=YOUR_CONTROLLER_USER_PASSWORD
KAFKA_INTER_BROKER_USER=YOUR_INTER_BROKER_USER_NAME
KAFKA_INTER_BROKER_PASSWORD=YOUR_INTER_BROKER_PASSWORD
KAFKA_CERTIFICATE_PASSWORD=YOUR_CERTIFICATE_PASSWORD
🔐 Important: Never commit the .env file, the keystore/truststore files, or client.properties to version control.
🐳 Docker container
Create the docker-compose.yml file and paste the following content into it.
services:
  kafka:
    image: bitnami/kafka:latest
    container_name: kafka
    hostname: ${HOSTNAME}
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      - BITNAMI_DEBUG=yes
      # KRaft
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      # Listeners
      - KAFKA_CFG_LISTENERS=SASL_SSL://:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
      - KAFKA_CFG_ADVERTISED_LISTENERS=SASL_SSL://${SERVER_IP_ADDRESS}:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_SSL
      - KAFKA_CLIENT_LISTENER_NAME=SASL_SSL
      # SASL
      - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      - KAFKA_CONTROLLER_USER=${KAFKA_CONTROLLER_USER}
      - KAFKA_CONTROLLER_PASSWORD=${KAFKA_CONTROLLER_PASSWORD}
      - KAFKA_INTER_BROKER_USER=${KAFKA_INTER_BROKER_USER}
      - KAFKA_INTER_BROKER_PASSWORD=${KAFKA_INTER_BROKER_PASSWORD}
      - KAFKA_CLIENT_USERS=${KAFKA_USER}
      - KAFKA_CLIENT_PASSWORDS=${KAFKA_PASSWORD}
      # SSL
      - KAFKA_TLS_TYPE=JKS
      - KAFKA_CERTIFICATE_PASSWORD=${KAFKA_CERTIFICATE_PASSWORD}
      # RAM allocation for the Java heap
      - KAFKA_HEAP_OPTS=-Xms1g -Xmx2g
    volumes:
      - kafka_data:/bitnami/kafka
      - "./certs:/bitnami/kafka/config/certs:ro"
      - "./certs/client.properties:/opt/bitnami/kafka/config/client.properties:ro"
    restart: unless-stopped

volumes:
  kafka_data:
Folder structure
Ultimately the folder structure should look like this:
kafka/
├── .env                   # Environment variables for the service
├── certs/                 # Certs for authentication
│   ├── client.properties
│   ├── kafka.keystore.jks
│   └── kafka.truststore.jks
└── docker-compose.yml     # Docker setup for this service
Run
We start the container by running docker compose up -d (the -d flag stands for detached, meaning the container runs in the background).
ln-history@host:~/ln-history/kafka ₿ docker compose up -d
Verify
Logs
Check the logs of the Docker container. Kafka logs a lot of information; make sure that after some time the latest entries have the log level INFO.
ln-history@host:~/ln-history/kafka ₿ docker compose logs -f --tail=100
kafka | [timestamp] INFO [GroupCoordinator id=0] Finished loading of ...
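Besides reading the logs, you can quickly check from another machine that the broker port is reachable at all. A minimal Python sketch; the host is a placeholder for the SERVER_IP_ADDRESS from your .env file:

import socket

# Placeholder values; substitute your own server IP.
HOST = "YOUR_SERVER_IP"
PORT = 9092

# Raises an exception if the broker port cannot be reached within 5 seconds.
with socket.create_connection((HOST, PORT), timeout=5):
    print(f"Broker port {PORT} on {HOST} is reachable")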
Test
Create a new directory kafka-test.
ln-history@host:~/ln-history/kafka ₿ mkdir kafka-test && cd kafka-test
Set up the virtual environment
Create a requirements.txt file listing the dependencies for the virtual Python environment:
kafka-python
python-dotenv
Create the virtual environment
ln-history@host:~/ln-history/kafka/kafka-test ₿ python -m venv .venv
Activate the virtual environment
ln-history@host:~/ln-history/kafka/kafka-test ₿ source .venv/bin/activate
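Install the dependencies into the activated environment:
ln-history@host:~/ln-history/kafka/kafka-test ₿ pip install -r requirements.txt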
Environment variables (authentication)
Create a .env file and fill in the values. The test scripts below also reference ./kafka.truststore.pem and ./kafka.keystore.pem, so place those client certificate files in this directory as well.
SSL_PASSWORD=YOUR_SSL_PASSWORD
SASL_PLAIN_USERNAME=YOUR_SASL_USER_NAME
SASL_PLAIN_PASSWORD=YOUR_SASL_PASSWORD
SERVER_IP_ADDRESS=YOUR_SERVER_IP
SERVER_PORT=9092
Create two files, test-kafka_producer.py and test-kafka_consumer.py.
Paste this into test-kafka_producer.py:
import json
import os
import time
import logging

from kafka import KafkaProducer
from dotenv import load_dotenv

load_dotenv(".env")

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def create_kafka_producer() -> KafkaProducer:
    """Create and return a configured Kafka producer."""
    bootstrap_servers = f"{os.getenv('SERVER_IP_ADDRESS')}:{os.getenv('SERVER_PORT')}"
    return KafkaProducer(
        bootstrap_servers=[bootstrap_servers],
        client_id="test-python-producer",
        security_protocol="SASL_SSL",
        ssl_cafile="./kafka.truststore.pem",
        ssl_certfile="./kafka.keystore.pem",
        ssl_keyfile="./kafka.keystore.pem",
        ssl_password=os.getenv("SSL_PASSWORD"),
        sasl_mechanism="SCRAM-SHA-512",
        sasl_plain_username=os.getenv("SASL_PLAIN_USERNAME"),
        sasl_plain_password=os.getenv("SASL_PLAIN_PASSWORD"),
        ssl_check_hostname=False,
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )


def send_message(producer: KafkaProducer, topic: str, message: dict):
    """Send a message to a Kafka topic."""
    try:
        future = producer.send(topic, value=message)
        record_metadata = future.get(timeout=10)
        logger.info(
            f"Message sent successfully to {record_metadata.topic} "
            f"partition {record_metadata.partition} offset {record_metadata.offset}"
        )
    except Exception as e:
        logger.error(f"Failed to send message: {e}")


def main():
    topic = "test"
    producer = create_kafka_producer()
    test_message = {
        "timestamp": time.time(),
        "message": "Hello from Python test producer!"
    }
    send_message(producer, topic, test_message)
    producer.close()


if __name__ == "__main__":
    main()
And paste this into test-kafka_consumer.py:
import os
import json
import logging

from kafka import KafkaConsumer
from dotenv import load_dotenv

load_dotenv(".env")

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def create_kafka_consumer(topic: str) -> KafkaConsumer:
    """Create and return a configured Kafka consumer."""
    bootstrap_servers = f"{os.getenv('SERVER_IP_ADDRESS')}:{os.getenv('SERVER_PORT')}"
    return KafkaConsumer(
        topic,
        bootstrap_servers=[bootstrap_servers],
        api_version="3",
        client_id="test-python-consumer",
        security_protocol="SASL_SSL",
        ssl_cafile="./kafka.truststore.pem",
        ssl_certfile="./kafka.keystore.pem",
        ssl_keyfile="./kafka.keystore.pem",
        ssl_password=os.getenv("SSL_PASSWORD"),
        sasl_mechanism="SCRAM-SHA-512",
        sasl_plain_username=os.getenv("SASL_PLAIN_USERNAME"),
        sasl_plain_password=os.getenv("SASL_PLAIN_PASSWORD"),
        ssl_check_hostname=False,
        auto_offset_reset="earliest",
        enable_auto_commit=True,
        group_id="python-consumer-group",
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    )


def consume_messages(consumer: KafkaConsumer):
    logger.info("Listening for messages...")
    for message in consumer:
        logger.info(f"Received message: {message.value}")


def main():
    topic = "test"
    consumer = create_kafka_consumer(topic)
    consume_messages(consumer)


if __name__ == "__main__":
    main()
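Depending on the broker configuration, the "test" topic used by these scripts may not be created automatically on first use. If you prefer to create it explicitly, here is a sketch using kafka-python's KafkaAdminClient with the same connection settings as the test scripts; it assumes the .env values are set and the PEM files are in the current directory.

import os

from dotenv import load_dotenv
from kafka.admin import KafkaAdminClient, NewTopic

load_dotenv(".env")

admin = KafkaAdminClient(
    bootstrap_servers=[f"{os.getenv('SERVER_IP_ADDRESS')}:{os.getenv('SERVER_PORT')}"],
    client_id="test-python-admin",
    security_protocol="SASL_SSL",
    ssl_cafile="./kafka.truststore.pem",
    ssl_certfile="./kafka.keystore.pem",
    ssl_keyfile="./kafka.keystore.pem",
    ssl_password=os.getenv("SSL_PASSWORD"),
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username=os.getenv("SASL_PLAIN_USERNAME"),
    sasl_plain_password=os.getenv("SASL_PLAIN_PASSWORD"),
    ssl_check_hostname=False,
)

# Create the "test" topic used by the producer and consumer scripts.
admin.create_topics([NewTopic(name="test", num_partitions=1, replication_factor=1)])
admin.close()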
Run test-kafka_producer.py with the Python interpreter of the virtual environment (/path/to/ln-history/kafka/kafka-test/.venv/bin/python):
ln-history@host:/path/to/ln-history/kafka/kafka-test ₿ /path/to/ln-history/kafka/kafka-test/.venv/bin/python test-kafka_producer.py
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <connecting> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: connecting to YOUR_IP_ADDRESS:9092 [('YOUR_IP_ADDRESS', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL CA from ./kafka.truststore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL Cert from ./kafka.keystore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL Key from ./kafka.keystore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <checking_api_versions_recv> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Broker version identified as 2.6
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <authenticating> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Authenticated as YOUR_KAFKA_USER via SASL / SCRAM-SHA-512
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <connected> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Connection complete.
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=0 host=YOUR_IP_ADDRESS:9092 <connecting> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: connecting to YOUR_IP_ADDRESS:9092 [('YOUR_IP_ADDRESS', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL CA from ./kafka.truststore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL Cert from ./kafka.keystore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL Key from ./kafka.keystore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=0 host=YOUR_IP_ADDRESS:9092 <authenticating> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Authenticated as YOUR_KAFKA_USER via SASL / SCRAM-SHA-512
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=0 host=YOUR_IP_ADDRESS:9092 <connected> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Connection complete.
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <connected> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Closing connection.
INFO:__main__:Message sent successfully to test partition 0 offset 1
INFO:kafka.producer.kafka:Closing the Kafka producer with 9223372036.0 secs timeout.
INFO:kafka.conn:<BrokerConnection client_id=test-python-producer, node_id=0 host=YOUR_IP_ADDRESS:9092 <connected> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Closing connection.
After that, open a new terminal session and run test-kafka_consumer.py.
ln-history@host:/path/to/ln-history/kafka/kafka-test ₿ /path/to/ln-history/kafka/kafka-test/.venv/bin/python test-kafka_consumer.py
WARNING:kafka.consumer.group:use api_version=(3,) [tuple] -- "3" as str is deprecated
WARNING:kafka.client:Configured api_version (3,) is ambiguous; using (3, 0)
INFO:kafka.consumer.subscription_state:Updating subscribed topics to: ('test',)
INFO:__main__:Listening for messages...
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <connecting> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: connecting to YOUR_IP_ADDRESS:9092 [('YOUR_IP_ADDRESS', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL CA from ./kafka.truststore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL Cert from ./kafka.keystore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL Key from ./kafka.keystore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <authenticating> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Authenticated as YOUR_KAFKA_USER via SASL / SCRAM-SHA-512
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <connected> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Connection complete.
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=0 host=YOUR_IP_ADDRESS:9092 <connecting> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: connecting to YOUR_IP_ADDRESS:9092 [('YOUR_IP_ADDRESS', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL CA from ./kafka.truststore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL Cert from ./kafka.keystore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL Key from ./kafka.keystore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=0 host=YOUR_IP_ADDRESS:9092 <authenticating> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Authenticated as YOUR_KAFKA_USER via SASL / SCRAM-SHA-512
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=0 host=YOUR_IP_ADDRESS:9092 <connected> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Connection complete.
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=bootstrap-0 host=YOUR_IP_ADDRESS:9092 <connected> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Closing connection.
INFO:kafka.cluster:Group coordinator for python-consumer-group is BrokerMetadata(nodeId='coordinator-0', host='YOUR_IP_ADDRESS', port=9092, rack=None)
INFO:kafka.coordinator:Discovered coordinator coordinator-0 for group python-consumer-group
INFO:kafka.coordinator:Starting new heartbeat thread
INFO:kafka.coordinator.consumer:Revoking previously assigned partitions set() for group python-consumer-group
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=coordinator-0 host=YOUR_IP_ADDRESS:9092 <connecting> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: connecting to YOUR_IP_ADDRESS:9092 [('YOUR_IP_ADDRESS', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=coordinator-0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL CA from ./kafka.truststore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=coordinator-0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL Cert from ./kafka.keystore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=coordinator-0 host=YOUR_IP_ADDRESS:9092 <handshake> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Loading SSL Key from ./kafka.keystore.pem
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=coordinator-0 host=YOUR_IP_ADDRESS:9092 <authenticating> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Authenticated as YOUR_KAFKA_USER via SASL / SCRAM-SHA-512
INFO:kafka.conn:<BrokerConnection client_id=test-python-consumer, node_id=coordinator-0 host=YOUR_IP_ADDRESS:9092 <connected> [IPv4 ('YOUR_IP_ADDRESS', 9092)]>: Connection complete.
INFO:kafka.coordinator:(Re-)joining group python-consumer-group
INFO:kafka.coordinator:Elected group leader -- performing partition assignments using range
INFO:kafka.coordinator:Successfully joined group python-consumer-group with generation 5
INFO:kafka.consumer.subscription_state:Updated partition assignment: [TopicPartition(topic='test', partition=0)]
INFO:kafka.coordinator.consumer:Setting newly assigned partitions {TopicPartition(topic='test', partition=0)} for group python-consumer-group
INFO:__main__:Received message: {'timestamp': 1754742448.652868, 'message': 'Hello from Python test producer!'}
After the message has been received, as indicated by the last line, you can stop the program by pressing CTRL + C.
The Kafka instance is now set up correctly.