Commit 00ed983b authored by Chude, Chiamaka A (PG/T - Comp Sci & Elec Eng)

Final commit for Advanced Web Project. Added Docker and Docker Compose files for containerisation. Bugs were also fixed after testing.
parent 5442f370
1 merge request: !9 Final commit for User Microservice. Added docker and docker compose files for...
Showing changes with 179 additions and 0 deletions
File added ×11 (contents not rendered in this diff)
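The Docker and Docker Compose files named in the commit message are among these added files, but their contents are not rendered in the diff. As a rough sketch only, files for a Flask + Kafka + pyodbc service like this one might look as follows; the base image, file layout, port, and service names are assumptions, not the committed contents.

# Dockerfile sketch (hypothetical -- the committed file is not visible here)
FROM python:3.9-slim
WORKDIR /app
# pyodbc needs unixODBC plus the Microsoft ODBC Driver 17 for SQL Server;
# the driver's repository setup is elided here for brevity
RUN apt-get update && apt-get install -y --no-install-recommends unixodbc \
    && rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "run.py"]

# docker-compose.yml sketch (service name and port mapping assumed)
version: "3.8"
services:
  order-service:
    build: .
    ports:
      - "5000:5000"
    environment:
      - KAFKA_SERVER=kafka:9092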
import pyodbc
import os

print("Outside db connection function")

# Connect to the database
def connect_db():
    print("In CONNECT DB")
    try:
        server = '34.39.6.180'
        database = 'ordermanagementdatabase'
        username = 'sqlserver'
        password = 'WebTechGroup3'
        driver = 'ODBC Driver 17 for SQL Server'
        connection_string = f'DRIVER={driver};SERVER={server};DATABASE={database};UID={username};PWD={password};Trusted_Connection=no;'
        return pyodbc.connect(connection_string)
    except Exception as e:
        # Connection failed (unreachable server, bad credentials, missing driver, ...)
        print(f"Unexpected error occurred: {e}")
        return False
import pyodbc

from models.database_connection import connect_db

def fetch_order_history(user_id):
    # Initialise these up front so the except block can reference them safely
    connection = None
    cursor = None
    try:
        # Open database connection
        connection = connect_db()
        if not connection:
            # connect_db() returns False on failure
            return {"error": "Could not connect to the database"}
        cursor = connection.cursor()
        # Fetch the order history for the user
        cursor.execute("""
            SELECT og.OrderGroupID, og.CustomerID, og.TransactionDate, og.Location,
                   og.OrdersStatus, og.TotalPrice, oi.OrderItemID, oi.ProductID,
                   oi.UnitPrice, oi.Quantity, oi.TotalItemPrice
            FROM OrderGroup og
            INNER JOIN OrderItem oi ON og.OrderGroupID = oi.OrderGroupID
            WHERE og.CustomerID = ?
            ORDER BY og.TransactionDate DESC;""", (user_id,))
        # Fetch all records and close cursor and connection
        records = cursor.fetchall()
        cursor.close()
        connection.close()
        # Process the fetched data into a structured format for the API response
        order_history = {}
        for record in records:
            # Unpack the fetched row
            (order_group_id, customer_id, transaction_date, location, orders_status,
             total_price, order_item_id, product_id, unit_price, quantity,
             total_item_price) = record
            if order_group_id not in order_history:
                order_history[order_group_id] = {
                    "CustomerID": customer_id,
                    "TransactionDate": transaction_date,
                    "Location": location,
                    "OrdersStatus": orders_status,
                    "TotalPrice": total_price,
                    "Items": []
                }
            # Append this item to the items list for the order group
            order_history[order_group_id]["Items"].append({
                "OrderItemID": order_item_id,
                "ProductID": product_id,
                "UnitPrice": unit_price,
                "Quantity": quantity,
                "TotalItemPrice": total_item_price
            })
        # Convert to a list of order histories
        return list(order_history.values())
    except pyodbc.Error as e:
        # Close cursor and connection in case of error
        if cursor:
            cursor.close()
        if connection:
            connection.close()
        return {"error": str(e)}
File added ×2 (contents not rendered in this diff)
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
from config import KAFKA_SERVER
import json

producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER)

# Creates the topic
def create_Quantity_updated_topic():
    admin_client = KafkaAdminClient(bootstrap_servers=KAFKA_SERVER)
    # Define the topic name
    topic_name = "QuantityUpdateFromOrders"
    num_partitions = 1
    replication_factor = 1
    # Retrieve the list of existing topics
    topic_metadata = admin_client.list_topics()
    # Check if the topic already exists
    if topic_name not in topic_metadata:
        # Define the configuration for the new topic
        new_topic = NewTopic(name=topic_name, num_partitions=num_partitions,
                             replication_factor=replication_factor)
        # Create the new topic
        admin_client.create_topics(new_topics=[new_topic], validate_only=False)

def publish_product_updated_event(event_data):
    # Serialize the event data to JSON
    event_json = json.dumps(event_data)
    # Publish the event to the Kafka topic
    data_to_send = producer.send("QuantityUpdateFromOrders", value=event_json.encode("utf-8"))
    try:
        # Block until the broker acknowledges the send (or the timeout expires)
        record_metadata = data_to_send.get(timeout=10)
        print("Message sent successfully!")
        print("Topic:", record_metadata.topic)
        print("Partition:", record_metadata.partition)
        print("Offset:", record_metadata.offset)
    except Exception as e:
        print("Failed to send message:", e)
    producer.flush()
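A caller in the order workflow would publish a quantity update roughly as follows. The event schema is an assumption: this diff shows only the producer side, so the field names the consuming product service expects are not confirmed here.

# Hypothetical publisher usage -- event schema assumed, not confirmed by this diff
create_Quantity_updated_topic()  # ensure the topic exists before the first send
publish_product_updated_event({
    "product_id": 42,   # product whose stock changed (illustrative value)
    "quantity": 3,      # quantity to deduct after an order, presumably
})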
from app import create_app

app = create_app()

if __name__ == '__main__':
    app.run(debug=app.config['DEBUG'], port=app.config['PORT'])
import sys
print(sys.path)

from threading import Thread
from kafka import KafkaConsumer
import json
import logging
from config import KAFKA_SERVER
#KAFKA_SERVER = "localhost:9092"

TOPIC_NAME = "product_updated_topic"

from models.UpdateProduct import UpdateProduct

def consume_product_updated_event():
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    logging.info("Consuming product updated event...")
    # Create a KafkaConsumer listening on the product_updated_topic
    consumer = KafkaConsumer(TOPIC_NAME, bootstrap_servers=KAFKA_SERVER)
    for message in consumer:
        product_data_str = message.value.decode("utf-8")
        product_data = json.loads(product_data_str)
        logging.info("Received product update: {}".format(product_data))
        # Call the UpdateProduct function, passing the product details
        UpdateProduct(product_id=product_data['product_id'],
                      quantity=product_data['quantity'],
                      price=product_data['price'])

def start_kafka_consumer():
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    logging.info("Starting Kafka consumer...")
    # Start the consumer in a separate daemon thread so it does not block the app
    kafka_thread = Thread(target=consume_product_updated_event)
    kafka_thread.daemon = True
    kafka_thread.start()
print("Subscribers package initialized")
from subscriber.UpdateProductSubscriber import consume_product_updated_event
from subscriber.UpdateProductSubscriber import start_kafka_consumer
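start_kafka_consumer is exported here so it can be invoked during application start-up. The app factory referenced by run.py is not part of this diff; one plausible wiring, with the factory body and config values assumed, is:

# Hypothetical app factory wiring -- create_app's real body is not in this diff
from flask import Flask
from subscriber import start_kafka_consumer

def create_app():
    app = Flask(__name__)
    app.config['DEBUG'] = True   # assumed config values; run.py reads both keys
    app.config['PORT'] = 5000
    start_kafka_consumer()       # begin consuming product updates in the background
    return app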