Skip to content
Snippets Groups Projects
Commit 32b11caf authored by Chude, Chiamaka A (PG/T - Comp Sci & Elec Eng)'s avatar Chude, Chiamaka A (PG/T - Comp Sci & Elec Eng)
Browse files

Created a function to publish product code, price, and quantity to the kafka...

Created a function to publish product code, price, and quantity to the kafka message broker for the order microservice to receive the message and update the shopping cart table with new quantity or price
parent 42d16ef5
No related branches found
No related tags found
1 merge request!6Created a function to publish product code, price, and quantity to the kafka...
Showing with 192 additions and 2 deletions
...@@ -6,7 +6,8 @@ import requests ...@@ -6,7 +6,8 @@ import requests
add_review_bp = Blueprint("addReview",__name__) add_review_bp = Blueprint("addReview",__name__)
#This connects to the user microservice to get username when a user wants to add a review
#It uses a get method to get the product id
@add_review_bp.route("/product/<int:productID>/addReview", methods=["GET"]) @add_review_bp.route("/product/<int:productID>/addReview", methods=["GET"])
def get_username_from_user_microservice(productID): def get_username_from_user_microservice(productID):
......
from flask import Blueprint, jsonify, request, json, session
from models.getReviews import get_reviews
from kafka import KafkaProducer
#This gets reviews posted by a certain user
get_review_bp = Blueprint("getReview",__name__)
@get_review_bp.route("/product/getReview", methods=["POST"])
def get_review():
user_id = session.get("user_id")
if user_id:
if request.method == 'POST':
#data = request.get_json()
#review = data.get("review")
get_reviews_by_user = get_reviews(user_id)
send_review_message(get_reviews_by_user)
return({"reviews" : get_reviews_by_user})
else:
return {"error" : "null"}
else:
return {"error" : "You need to be logged in to add a review"}
producer = KafkaProducer(bootstrap_servers = "localhost:9092")
def send_review_message(reviews):
metadata =producer.send("customer_reviews", json.dumps(reviews).encode("utf_8"))
try:
record_metadata = metadata.get(timeout=10)
print("Message sent successfully!")
print("Topic:", record_metadata.topic)
print("Partition:", record_metadata.partition)
print("Offset:", record_metadata.offset)
except Exception as e:
print("Failed to send message:", e)
\ No newline at end of file
from flask import Blueprint, jsonify, request, json, session
from models.updateProduct import update_product_info
from publishers.kafkaPublishers import publish_product_updated_event
import requests
update_product_bp = Blueprint("updateProduct",__name__)
@update_product_bp.route("/product/updateProduct", methods=["POST"])
def update_product():
user_id = session.get("user_id")
if user_id:
if request.method == 'POST':
data = request.get_json()
price = data.get("price")
quantity = data.get("quantity")
product_id = data.get("product_id")
#username = session.get('username')
if isinstance(data.get("price"), (int, float)):
if isinstance(data.get("quantity"), int):
info = {
"quantity" : quantity,
"price" : price,
"product_id" : product_id
}
update = update_product_info(info)
if "message" in update:
event_data = {"quantity" : quantity, "price" : price, "product_id" : product_id}
publish_product_updated_event(event_data)
return {"Update Status": update}
else:
return {"error" : "error"}
else:
return {"error" : "Quantity should be int"}
else:
return{"error" : "Price should be a number"}
else:
return {"error" : "null"}
else:
return {"error" : "You need to be logged in to add a review"}
\ No newline at end of file
...@@ -11,7 +11,8 @@ import subscribers ...@@ -11,7 +11,8 @@ import subscribers
from controllers.getProductController import display_product_bp from controllers.getProductController import display_product_bp
from controllers.addReviewController import add_review_bp from controllers.addReviewController import add_review_bp
from controllers.productHomeController import product_home_bp from controllers.productHomeController import product_home_bp
from controllers.getReviews import get_review_bp from controllers.getReviewsController import get_review_bp
from controllers.updateProductController import update_product_bp
...@@ -49,6 +50,7 @@ app.register_blueprint(display_product_bp) ...@@ -49,6 +50,7 @@ app.register_blueprint(display_product_bp)
app.register_blueprint(add_review_bp) app.register_blueprint(add_review_bp)
app.register_blueprint(product_home_bp) app.register_blueprint(product_home_bp)
app.register_blueprint(get_review_bp) app.register_blueprint(get_review_bp)
app.register_blueprint(update_product_bp)
......
File added
import pyodbc
from flask import jsonify
from models.database_connection import connect_db
def update_product_info(data):
try: #error handling
connection = connect_db()
cursor = connection.cursor()
#insert data into reviews and ratings table
update_product_query = '''UPDATE dbo.ProductCatalog
SET
Price = CASE WHEN ? IS NOT NULL THEN ? ELSE Price END,
StockQuantity = CASE WHEN ? IS NOT NULL THEN ? ELSE StockQuantity END
WHERE
ProductID = ?;'''
cursor.execute(update_product_query, (data["price"], data["price"], data["quantity"], data["quantity"], data["product_id"]))
#commit changes to database if no errors
connection.commit()
return {"message" : "Product updated successfully"}
except pyodbc.Error as e: #more error handling
print(f"Database error in add_review: {e}")
connection.rollback()
return {"Error" : "Database error"}
except Exception as e: #more error handling
print(f"Unexpected error occured in add_review: {e}")
connection.rollback()
return {"Error" : "Unexpected error"}
finally:
if cursor:
cursor.close()
if connection:
connection.close()
\ No newline at end of file
File added
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
import json
producer = KafkaProducer(bootstrap_servers="localhost:9092")
#Creates the topic
def create_product_updated_topic():
# Create KafkaAdminClient instance
admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")
# Define the topic name and configuration
topic_name = "product_updated_topic"
num_partitions = 1
replication_factor = 1
# Retrieve the list of existing topics
topic_metadata = admin_client.list_topics()
# Check if the topic already exists
if topic_name not in topic_metadata:
# Define the configuration for the new topic
new_topic = NewTopic(name=topic_name, num_partitions=num_partitions, replication_factor=replication_factor)
# Create the new topic
admin_client.create_topics(new_topics=[new_topic], validate_only=False)
#Function is called in updateProfileControllers.py
def publish_product_updated_event(event_data):
# Serialize the event data to JSON
event_json = json.dumps(event_data)
# Publish the event to the Kafka topic
data_to_send = producer.send("product_updated_topic", value=event_json.encode("utf-8"))
try:
record_metadata = data_to_send.get(timeout=10)
print("Message sent successfully!")
print("Topic:", record_metadata.topic)
print("Partition:", record_metadata.partition)
print("Offset:", record_metadata.offset)
except Exception as e:
print("Failed to send message:", e)
producer.flush()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment