Skip to content
Snippets Groups Projects
Commit fdec3b37 authored by Chude, Chiamaka A (PG/T - Comp Sci & Elec Eng)'s avatar Chude, Chiamaka A (PG/T - Comp Sci & Elec Eng)
Browse files

Update - implemented the event driven architecture with kafka event store and...

Update - implemented the event driven architecture with kafka event store and streaming service for asynchronous communication between the product and user microservice
parent eca07e4b
No related branches found
No related tags found
1 merge request!5Update - implemented the event driven architecture using kafka
Showing
with 194 additions and 1 deletion
No preview for this file type
......@@ -4,4 +4,6 @@ import os
DEBUG = True
SECRET_KEY = "Group3"
PORT = 5001
\ No newline at end of file
PORT = 5001
KAFKA_SERVER= "localhost:9092"
\ No newline at end of file
File added
from flask import Blueprint, jsonify, request, json, session
from models.getReviews import get_reviews
from kafka import KafkaProducer
get_review_bp = Blueprint("getReview",__name__)
@get_review_bp.route("/product/getReview", methods=["POST"])
def get_review():
user_id = session.get("user_id")
if user_id:
if request.method == 'POST':
#data = request.get_json()
#review = data.get("review")
get_reviews_by_user = get_reviews(user_id)
send_review_message(get_reviews_by_user)
return({"reviews" : get_reviews_by_user})
else:
return {"error" : "null"}
else:
return {"error" : "You need to be logged in to add a review"}
producer = KafkaProducer(bootstrap_servers = "localhost:9092")
def send_review_message(reviews):
metadata =producer.send("customer_reviews", json.dumps(reviews).encode("utf_8"))
try:
record_metadata = metadata.get(timeout=10)
print("Message sent successfully!")
print("Topic:", record_metadata.topic)
print("Partition:", record_metadata.partition)
print("Offset:", record_metadata.offset)
except Exception as e:
print("Failed to send message:", e)
\ No newline at end of file
......@@ -6,10 +6,12 @@ from flask import jsonify
from config import DEBUG, SECRET_KEY, PORT
import os
import requests
import subscribers
from controllers.getProductController import display_product_bp
from controllers.addReviewController import add_review_bp
from controllers.productHomeController import product_home_bp
from controllers.getReviews import get_review_bp
......@@ -24,6 +26,9 @@ app.secret_key = SECRET_KEY
# Read user microservice URL from environment variable
USER_MICROSERVICE_URL = os.getenv('USER_MICROSERVICE_URL', 'http://127.0.0.1:5000')
#subscribers.consume_username_updated_event()
@app.route('/product', methods=['POST'])
def get_session_id():
session_id = session.get('user_id')
......@@ -43,8 +48,14 @@ def index():
app.register_blueprint(display_product_bp)
app.register_blueprint(add_review_bp)
app.register_blueprint(product_home_bp)
app.register_blueprint(get_review_bp)
if __name__ == '__main__':
subscribers.start_kafka_consumer()
app.run(debug=DEBUG, port=PORT)
\ No newline at end of file
File added
File added
#from app import db
import pyodbc
from flask import jsonify
from models.database_connection import connect_db
#Function to get user info
def get_reviews(data):
try: #error handling
connection = connect_db()
cursor = connection.cursor()
user_id = data
#Get reviews from database
reviews_query = "SELECT CustomerID, Username, Review, rating, ProductID FROM dbo.ReviewsAndRatings WHERE CustomerID= ?"
cursor.execute(reviews_query, user_id)
reviews = cursor.fetchall()
reviews_data = [{"CustomerID": row[0], "Username": row[1], "Review": row[2], "rating" : row[3], "ProductID" : row[4]} for row in reviews]
#connection.close()
return (reviews_data)
except pyodbc.Error as e: #error handling
print(f"Database error in get_review: {e}")
return None
except Exception as e: #error handling
print(f"Unexpected error occured in get_review: {e}")
return None
finally:
if cursor:
cursor.close()
if connection:
connection.close()
\ No newline at end of file
#from app import db
from flask import jsonify
import pyodbc
from models.database_connection import connect_db
def update_username(data):
try: #error handling
connection = connect_db()
cursor = connection.cursor()
user_id = data["user_id"]
username = data["new_username"]
#insert data into user table
update_user_query = '''UPDATE dbo.ReviewsAndRatings
SET
Username= ?
WHERE
CustomerID= ?'''
cursor.execute(update_user_query, (username, user_id))
#commit changes to database if no errors
connection.commit()
return {"message" : "Username updated successfully"}
except pyodbc.Error as e: #more error handling
print(f"Database error in update_username: {e}")
connection.rollback()
return {"Error" : "Database error"}
except Exception as e: #more error handling
print(f"Unexpected error occured in update_username: {e}")
connection.rollback()
return {"Error" : "Unexpected error"}
finally:
if cursor:
cursor.close()
if connection:
connection.close()
\ No newline at end of file
print("Subscribers package initialized")
from subscribers.updateUsernameSubscriber import consume_username_updated_event
from subscribers.updateUsernameSubscriber import start_kafka_consumer
\ No newline at end of file
File added
from kafka import KafkaConsumer
from config import KAFKA_SERVER
from models.updateUsername import update_username
from threading import Thread
import json
import logging
#This function is called in the "__init__.py" file in the "subscribers" folder
def consume_username_updated_event():
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.info("Consuming username updated event...")
consumer = KafkaConsumer("profile_updated_topic", bootstrap_servers=KAFKA_SERVER)
for message in consumer:
profile_data_str = message.value.decode("utf-8")
profile_data = json.loads(profile_data_str)
print("I am here")
print(profile_data)
update_username(profile_data)
def start_kafka_consumer():
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.info("Starting Kafka consumer...")
print("Hello from kafka consumer")
kafka_thread = Thread(target=consume_username_updated_event)
kafka_thread.daemon = True # Daemonize the thread so it will be automatically killed when the main thread exits
kafka_thread.start()
# Call the start_kafka_consumer function to start the Kafka consumer thread
\ No newline at end of file
File added
import os
DEBUG = True
SECRET_KEY = "Group3"
PORT = 5000
\ No newline at end of file
No preview for this file type
No preview for this file type
No preview for this file type
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment