Skip to content
Snippets Groups Projects
Commit c7f5ae47 authored by Chude, Chiamaka A (PG/T - Comp Sci & Elec Eng)'s avatar Chude, Chiamaka A (PG/T - Comp Sci & Elec Eng)
Browse files

Merge branch 'Chiamaka_Chude' into 'main'

Created a function to publish product code, price, and quantity to the kafka...

See merge request !6
parents 345af56b 32b11caf
No related branches found
No related tags found
1 merge request!6Created a function to publish product code, price, and quantity to the kafka...
Showing with 192 additions and 2 deletions
...@@ -6,7 +6,8 @@ import requests ...@@ -6,7 +6,8 @@ import requests
add_review_bp = Blueprint("addReview",__name__) add_review_bp = Blueprint("addReview",__name__)
#This connects to the user microservice to get username when a user wants to add a review
#It uses a get method to get the product id
@add_review_bp.route("/product/<int:productID>/addReview", methods=["GET"]) @add_review_bp.route("/product/<int:productID>/addReview", methods=["GET"])
def get_username_from_user_microservice(productID): def get_username_from_user_microservice(productID):
......
from flask import Blueprint, jsonify, request, json, session
from models.getReviews import get_reviews
from kafka import KafkaProducer
#This gets reviews posted by a certain user
get_review_bp = Blueprint("getReview",__name__)
@get_review_bp.route("/product/getReview", methods=["POST"])
def get_review():
user_id = session.get("user_id")
if user_id:
if request.method == 'POST':
#data = request.get_json()
#review = data.get("review")
get_reviews_by_user = get_reviews(user_id)
send_review_message(get_reviews_by_user)
return({"reviews" : get_reviews_by_user})
else:
return {"error" : "null"}
else:
return {"error" : "You need to be logged in to add a review"}
producer = KafkaProducer(bootstrap_servers = "localhost:9092")
def send_review_message(reviews):
metadata =producer.send("customer_reviews", json.dumps(reviews).encode("utf_8"))
try:
record_metadata = metadata.get(timeout=10)
print("Message sent successfully!")
print("Topic:", record_metadata.topic)
print("Partition:", record_metadata.partition)
print("Offset:", record_metadata.offset)
except Exception as e:
print("Failed to send message:", e)
\ No newline at end of file
from flask import Blueprint, jsonify, request, json, session
from models.updateProduct import update_product_info
from publishers.kafkaPublishers import publish_product_updated_event
import requests
update_product_bp = Blueprint("updateProduct",__name__)
@update_product_bp.route("/product/updateProduct", methods=["POST"])
def update_product():
user_id = session.get("user_id")
if user_id:
if request.method == 'POST':
data = request.get_json()
price = data.get("price")
quantity = data.get("quantity")
product_id = data.get("product_id")
#username = session.get('username')
if isinstance(data.get("price"), (int, float)):
if isinstance(data.get("quantity"), int):
info = {
"quantity" : quantity,
"price" : price,
"product_id" : product_id
}
update = update_product_info(info)
if "message" in update:
event_data = {"quantity" : quantity, "price" : price, "product_id" : product_id}
publish_product_updated_event(event_data)
return {"Update Status": update}
else:
return {"error" : "error"}
else:
return {"error" : "Quantity should be int"}
else:
return{"error" : "Price should be a number"}
else:
return {"error" : "null"}
else:
return {"error" : "You need to be logged in to add a review"}
\ No newline at end of file
...@@ -11,7 +11,8 @@ import subscribers ...@@ -11,7 +11,8 @@ import subscribers
from controllers.getProductController import display_product_bp from controllers.getProductController import display_product_bp
from controllers.addReviewController import add_review_bp from controllers.addReviewController import add_review_bp
from controllers.productHomeController import product_home_bp from controllers.productHomeController import product_home_bp
from controllers.getReviews import get_review_bp from controllers.getReviewsController import get_review_bp
from controllers.updateProductController import update_product_bp
...@@ -49,6 +50,7 @@ app.register_blueprint(display_product_bp) ...@@ -49,6 +50,7 @@ app.register_blueprint(display_product_bp)
app.register_blueprint(add_review_bp) app.register_blueprint(add_review_bp)
app.register_blueprint(product_home_bp) app.register_blueprint(product_home_bp)
app.register_blueprint(get_review_bp) app.register_blueprint(get_review_bp)
app.register_blueprint(update_product_bp)
......
File added
import pyodbc
from flask import jsonify
from models.database_connection import connect_db
def update_product_info(data):
try: #error handling
connection = connect_db()
cursor = connection.cursor()
#insert data into reviews and ratings table
update_product_query = '''UPDATE dbo.ProductCatalog
SET
Price = CASE WHEN ? IS NOT NULL THEN ? ELSE Price END,
StockQuantity = CASE WHEN ? IS NOT NULL THEN ? ELSE StockQuantity END
WHERE
ProductID = ?;'''
cursor.execute(update_product_query, (data["price"], data["price"], data["quantity"], data["quantity"], data["product_id"]))
#commit changes to database if no errors
connection.commit()
return {"message" : "Product updated successfully"}
except pyodbc.Error as e: #more error handling
print(f"Database error in add_review: {e}")
connection.rollback()
return {"Error" : "Database error"}
except Exception as e: #more error handling
print(f"Unexpected error occured in add_review: {e}")
connection.rollback()
return {"Error" : "Unexpected error"}
finally:
if cursor:
cursor.close()
if connection:
connection.close()
\ No newline at end of file
File added
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
import json
producer = KafkaProducer(bootstrap_servers="localhost:9092")
#Creates the topic
def create_product_updated_topic():
# Create KafkaAdminClient instance
admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")
# Define the topic name and configuration
topic_name = "product_updated_topic"
num_partitions = 1
replication_factor = 1
# Retrieve the list of existing topics
topic_metadata = admin_client.list_topics()
# Check if the topic already exists
if topic_name not in topic_metadata:
# Define the configuration for the new topic
new_topic = NewTopic(name=topic_name, num_partitions=num_partitions, replication_factor=replication_factor)
# Create the new topic
admin_client.create_topics(new_topics=[new_topic], validate_only=False)
#Function is called in updateProfileControllers.py
def publish_product_updated_event(event_data):
# Serialize the event data to JSON
event_json = json.dumps(event_data)
# Publish the event to the Kafka topic
data_to_send = producer.send("product_updated_topic", value=event_json.encode("utf-8"))
try:
record_metadata = data_to_send.get(timeout=10)
print("Message sent successfully!")
print("Topic:", record_metadata.topic)
print("Partition:", record_metadata.partition)
print("Offset:", record_metadata.offset)
except Exception as e:
print("Failed to send message:", e)
producer.flush()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment