"""Kafka subscriber that applies username updates from profile-updated events.

A background daemon thread consumes the ``profile_updated_topic`` topic,
decodes each message as UTF-8 JSON and passes the result to
``update_username``.
"""

import json
import logging
from threading import Thread

from kafka import KafkaConsumer

from config import KAFKA_SERVER
from models.updateUsername import update_username

logger = logging.getLogger(__name__)

_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
_PROFILE_UPDATED_TOPIC = "profile_updated_topic"

# NOTE(review): the original author states this module's entry point is
# called from the "__init__.py" file in the "subscribers" folder -- confirm
# against that package. Nothing in this module starts the consumer at import
# time; a caller must invoke start_kafka_consumer() explicitly.


def _configure_logging():
    """Install the root logging configuration used by this module.

    ``logging.basicConfig`` does nothing once the root logger already has
    handlers, so calling this more than once is harmless.
    """
    logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)


def _decode_profile_event(raw_value):
    """Decode a raw Kafka message value into a Python object.

    Args:
        raw_value: The message payload as ``bytes``, or ``None`` when the
            message carries no value.

    Returns:
        The object produced by parsing the payload as UTF-8 encoded JSON.

    Raises:
        ValueError: If the payload is missing, is not valid UTF-8, or is not
            valid JSON (``UnicodeDecodeError`` and ``json.JSONDecodeError``
            are both ``ValueError`` subclasses).
    """
    if raw_value is None:
        raise ValueError("message has no value")
    return json.loads(raw_value.decode("utf-8"))


def consume_username_updated_event():
    """Consume profile-updated events and apply each one via ``update_username``.

    Blocks while iterating over the Kafka consumer, so it is intended to run
    on its own thread (see ``start_kafka_consumer``). Messages that cannot be
    decoded, or whose handler raises, are logged and skipped so that a single
    bad event does not terminate the consumer.
    """
    _configure_logging()
    logger.info("Consuming username updated event...")

    consumer = KafkaConsumer(_PROFILE_UPDATED_TOPIC, bootstrap_servers=KAFKA_SERVER)
    try:
        for message in consumer:
            try:
                profile_data = _decode_profile_event(message.value)
            except ValueError:
                logger.exception("Skipping undecodable profile update message")
                continue

            logger.info("Received profile update event: %s", profile_data)

            try:
                update_username(profile_data)
            except Exception:
                # Top-level boundary of the consumer thread: log with the
                # traceback and keep consuming instead of letting one failed
                # update kill the thread.
                logger.exception("Failed to apply username update")
    finally:
        # Release the consumer's network resources if the loop ever exits.
        consumer.close()


def start_kafka_consumer():
    """Start the username-update consumer on a background daemon thread.

    The thread is a daemon so it is killed automatically when the main
    thread exits. Returns ``None``.
    """
    _configure_logging()
    logger.info("Starting Kafka consumer...")

    kafka_thread = Thread(target=consume_username_updated_event, daemon=True)
    kafka_thread.start()