From 32b11cafcb4aaee57df66f7073c887a409d65bc4 Mon Sep 17 00:00:00 2001 From: "cc02503@surrey.ac.uk" <cc02503@surrey.ac.uk> Date: Sat, 6 Apr 2024 20:52:53 +0100 Subject: [PATCH] Created a function to publish product code, price, and quantity to the kafka message broker for the order microservice to receive the message and update the shopping cart table with new quantity or price --- .../updateProductController.cpython-311.pyc | Bin 0 -> 2186 bytes .../app/controllers/addReviewController.py | 3 +- .../app/controllers/getReviewsController.py | 44 ++++++++++++++ .../controllers/updateProductController.py | 54 ++++++++++++++++++ Product_MicroService_Group3/app/index.py | 4 +- .../__pycache__/updateProduct.cpython-311.pyc | Bin 0 -> 2562 bytes .../app/models/updateProduct.py | 45 +++++++++++++++ .../kafkaPublishers.cpython-311.pyc | Bin 0 -> 2336 bytes .../app/publishers/kafkaPublishers.py | 44 ++++++++++++++ 9 files changed, 192 insertions(+), 2 deletions(-) create mode 100644 Product_MicroService_Group3/app/controllers/__pycache__/updateProductController.cpython-311.pyc create mode 100644 Product_MicroService_Group3/app/controllers/getReviewsController.py create mode 100644 Product_MicroService_Group3/app/controllers/updateProductController.py create mode 100644 Product_MicroService_Group3/app/models/__pycache__/updateProduct.cpython-311.pyc create mode 100644 Product_MicroService_Group3/app/models/updateProduct.py create mode 100644 Product_MicroService_Group3/app/publishers/__pycache__/kafkaPublishers.cpython-311.pyc create mode 100644 Product_MicroService_Group3/app/publishers/kafkaPublishers.py diff --git a/Product_MicroService_Group3/app/controllers/__pycache__/updateProductController.cpython-311.pyc b/Product_MicroService_Group3/app/controllers/__pycache__/updateProductController.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..9a851b605b4771ad33e986818477a8d99267d3c2 GIT binary patch literal 2186 zcma(SU2M}v_?$R#k~RrhLhBY<%xEZ~jEXX#f)3U-Eo{=DQ(<UPrpV1*61R>W&UQzu zAo0LMA9z5#G(t$^sW7ogXc7|I(_Z)HAWMrZAyt~tG-(f+5HIXycTSo%k&rl_@4o-L z@4N52dmIb~0BOJ9iRAwA0`MnAwj0|5-aI7027mxEGAJ@Brszt!itd!V$fnq$C*@%% z&n0_{T#9pGx9ltWQ+@|#<v=Rn0*HYg)U)XV;3@f=E!B&9tHEt-_YA#|%QHXuI_07N z4%j4OPurrY5M&oXBJek*Bs{kN3)w&=EGfFpEorJE<yLJDqZ<a&bep9Vo70e{Nve`y z?5I(KqK+~ptU@EJXCx)367o>VSd=BL(55%VU<R!qMZatZoA`WFYprj*r9a+5x7qnC z3s;Fbu88!43bkx|*97p8-%b8E$4E3bfX?hNm)bC7HVOQ+jo$)gfdA9C&xNQgQ5#N! zu4N~#_%EH>ZndShY-ikVF=eLOw!1A&XXx%))Ae3SwoB6eUP(`vB&+xAtodH8uAS(; zJ34x32hO=Vvji`GkoB(J@3!7$?{wK+c9mJ^edxaB(IY!6m%IHUSzGq506efugVUqC zG`P-4kP7jYy%Mtkpdb2MbD%Z%wC3K{9Het1blK(%4dILgF^!__A^Dy~w(o`^D!Qbv z+JSaH!-N~}Au>2j2fA~bjhZIr(OU4Dll;PhF6xG6dl1GdUhDg&Wkt{msv$#R5ecN4 z*7$k4$S$HNC`NG+VVhM9SzbHxooWaQLQv3Esw1oUJV7Ld;v$5CC}6ZAp&t?f%+Y<g z^GL@uQ*CdvK5QSMWSkNWNSdT*x~LGh+_aWGIaw8T?4w@z$&m12ngBLS%2#YU=eX&3 z$kH;lz0G2@15VLqsGvsmg;sm_>&)5oH4;ZHEf!a^h4d8#C9$-E(n&RI6iK0L>3PCc z6;V!KQwRfVB)Ec_6BZ1N(Q0}|HL!++>ndJOPs0_uTPVyFL|H~k9%<C~>u6E9in0Yo zCGn6DvDmt4zLv6BT|jt+1T}LJt48T$S}c{)Syj=oN-U6Vb=)R1ZN~A^svT<bwl63w z5XWdPXtSiIZ-cj95bnFXTuIbIBUWhS_QfqO)BuM)bIhxrkM5qYeEYKJklAzSb#%~* z9;!v-Ry4kztjG9`OZP8TuGeC5D;8g$t@rQSxOx9(HBjpxv--!@FVzQz9`O(O>ap6u zxHT}o{uNng&tPTb(UAv7s%L70W7go<&)Mgvo}_ExvsU<Q1MCfc_BuLLfsYCg3e~mR z@Psuy@qDHhowTBp4G@WZ&TPd$see3LA3f3F*!_JCK!$tCx4mHa#Ism!=(II-`rcf< zU#QNR{YOc!pFH=QVSb-C;|0<yfmi&gXG=AH+Ty28etJt7f7WLnm^AsxEn)P@sCi(* z<R=<lFf?KwK5y>3V8$+N^$(i|&zXHw=ANlW0PNjg$(hl(*&8Qf5#Ah3n&DZ~H~WS} z6C}bk^F(bK(`y#fyVOZpOkPUROygKlg-F(pcN~_JsP<s%86jtRd|Avbi}THAh2TyS z+rHMrq}jeqMiGlBld&V6C!1L;*&dP}I&#kVM}QB~r4Nwdoo|Y)ZOG_6K1xt}nQOn0 zq2Xp2rVgge59@XCsoB}<AZd2?EfBu#d?ZGDhwpt^QJ)lQu`^cej0qxy=8FEo@GlwO m960e}?!~0Z@HHlBG0BGOC^N@203H8z1J^9mMQpMhpZ*6f$~uMs literal 0 HcmV?d00001 diff --git a/Product_MicroService_Group3/app/controllers/addReviewController.py b/Product_MicroService_Group3/app/controllers/addReviewController.py index 1a2b5d35..4f85e35e 100644 --- a/Product_MicroService_Group3/app/controllers/addReviewController.py +++ b/Product_MicroService_Group3/app/controllers/addReviewController.py @@ -6,7 +6,8 @@ import requests add_review_bp = Blueprint("addReview",__name__) - +#This connects to the user microservice to get username when a user wants to add a review +#It uses a get method to get the product id @add_review_bp.route("/product/<int:productID>/addReview", methods=["GET"]) def get_username_from_user_microservice(productID): diff --git a/Product_MicroService_Group3/app/controllers/getReviewsController.py b/Product_MicroService_Group3/app/controllers/getReviewsController.py new file mode 100644 index 00000000..49b5f7c6 --- /dev/null +++ b/Product_MicroService_Group3/app/controllers/getReviewsController.py @@ -0,0 +1,44 @@ +from flask import Blueprint, jsonify, request, json, session +from models.getReviews import get_reviews + +from kafka import KafkaProducer + +#This gets reviews posted by a certain user +get_review_bp = Blueprint("getReview",__name__) + +@get_review_bp.route("/product/getReview", methods=["POST"]) +def get_review(): + + user_id = session.get("user_id") + + if user_id: + if request.method == 'POST': + + #data = request.get_json() + #review = data.get("review") + + get_reviews_by_user = get_reviews(user_id) + + send_review_message(get_reviews_by_user) + + return({"reviews" : get_reviews_by_user}) + + else: + return {"error" : "null"} + + else: + return {"error" : "You need to be logged in to add a review"} + + +producer = KafkaProducer(bootstrap_servers = "localhost:9092") + +def send_review_message(reviews): + metadata =producer.send("customer_reviews", json.dumps(reviews).encode("utf_8")) + try: + record_metadata = metadata.get(timeout=10) + print("Message sent successfully!") + print("Topic:", record_metadata.topic) + print("Partition:", record_metadata.partition) + print("Offset:", record_metadata.offset) + except Exception as e: + print("Failed to send message:", e) \ No newline at end of file diff --git a/Product_MicroService_Group3/app/controllers/updateProductController.py b/Product_MicroService_Group3/app/controllers/updateProductController.py new file mode 100644 index 00000000..70d31a74 --- /dev/null +++ b/Product_MicroService_Group3/app/controllers/updateProductController.py @@ -0,0 +1,54 @@ +from flask import Blueprint, jsonify, request, json, session +from models.updateProduct import update_product_info +from publishers.kafkaPublishers import publish_product_updated_event + +import requests + + +update_product_bp = Blueprint("updateProduct",__name__) + + +@update_product_bp.route("/product/updateProduct", methods=["POST"]) +def update_product(): + + user_id = session.get("user_id") + + if user_id: + if request.method == 'POST': + + data = request.get_json() + price = data.get("price") + quantity = data.get("quantity") + product_id = data.get("product_id") + #username = session.get('username') + + if isinstance(data.get("price"), (int, float)): + + if isinstance(data.get("quantity"), int): + info = { + "quantity" : quantity, + "price" : price, + "product_id" : product_id + } + + update = update_product_info(info) + if "message" in update: + event_data = {"quantity" : quantity, "price" : price, "product_id" : product_id} + publish_product_updated_event(event_data) + + return {"Update Status": update} + else: + return {"error" : "error"} + else: + + return {"error" : "Quantity should be int"} + else: + return{"error" : "Price should be a number"} + + + + else: + return {"error" : "null"} + + else: + return {"error" : "You need to be logged in to add a review"} \ No newline at end of file diff --git a/Product_MicroService_Group3/app/index.py b/Product_MicroService_Group3/app/index.py index 79afca3d..676b86d8 100644 --- a/Product_MicroService_Group3/app/index.py +++ b/Product_MicroService_Group3/app/index.py @@ -11,7 +11,8 @@ import subscribers from controllers.getProductController import display_product_bp from controllers.addReviewController import add_review_bp from controllers.productHomeController import product_home_bp -from controllers.getReviews import get_review_bp +from controllers.getReviewsController import get_review_bp +from controllers.updateProductController import update_product_bp @@ -49,6 +50,7 @@ app.register_blueprint(display_product_bp) app.register_blueprint(add_review_bp) app.register_blueprint(product_home_bp) app.register_blueprint(get_review_bp) +app.register_blueprint(update_product_bp) diff --git a/Product_MicroService_Group3/app/models/__pycache__/updateProduct.cpython-311.pyc b/Product_MicroService_Group3/app/models/__pycache__/updateProduct.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..68e89d2ebd24e81165e2c46e12cb0eb9b6280211 GIT binary patch literal 2562 zcmcguO=ufO6rR;;<*_1Lmh)Ffa=av_ts2*9>q0|fH8_^igu04T*@U>;nDy=`*?6@p z&#r9Cl}ji+q#+Org&aa5r!>Vq^q4~_B|oLdwGmh>6bhw>-c-_n$f<AiV?}lgAt4>j zJiWhp^S(E;v-?d)haW+UeiBN*At3ZS6B^A^XC4j!a~-Kj<#aT~{yjRE@l1I*#`8L# z@lJVx_kND11eHsnsPHG7nv8O8+eOPXv~(U=zhW8&QS51TM&atA*<U*h9>zlz=yee7 zC#<)?umYpjmbn6V^9gH1kI*JRkIF6c1<%a}gr*~K1-<|fdNy=hB)-MgDtVi>v~UaJ zuWYQrA;^v)+0t6GgHj2qPvZ(6x&w0bZgB_2<{1T!?u6)1<(on&v>(~kjjhGL=xeOe zkEp!twSx^AA+@dGy@9^=+=9BUY}B@j1vkB#+~&+wthQ^M;yDY4VcGks_eG+rt+2q^ z-OYUd1(E-`5pDaw5iOua{w!MH_M;b7zXjA-BN|99{Wv)uADKvq>Wn!!PE9qZ*rT#7 z>*jgUpGr)Kei7O@)f6Je#L<ydLOgRKkra=LCsShb)P$Iv92*lSYO2H-m}|a?WPG4H zHD#O1?CG3r*qWVx$`Gw{Ae0~0{4mkTrJCf)I0QR-#1*nEHy6*<mvH^r8j5L6b=xw; zvgGq*DN=_f=CZ166IHZwiUPKDPS^8Gd*gt3Mz)AZC^f058KSJJ)0E6>WZ|&r3JIoO z>TGpedUetui&?nHAYN^vsVF%L(xw1QU7J0le#)L;*9ZQVN!>QGNR*sSKw)Mwn(YdT zZdxE_^CsMqls&>#sD|xgYU=uotjxOYiA9BEZOt^Ig6jof^2Xf?I=YQb+EGJ1-Kff3 zj?lcz5eqJP#P(KvyfS)NnzRVDBsr5;E=Z>g5~tcck>aM3%Mio1q;asChOA4I1{kQt z_R>s?sT`#wFO8ajf{15KIxCH+^Rl5p4Ms1>x=xJq#A4+*LuSMYqFgXcI9VEji1l6i zKvSrhB6J>har!-K=CVVQoXtuZQzg13)v~WU4`%c3_Dy&+BW==`081MTD+YJ)B~<qI z3w?j=?E6ysO1kVXg~IEheQTk8tD*j4sQ<FBj0JI*gCpetY7buPywd6HK5$$8{+;hK zcP4&%<%f@di94r1Dvn9cxfw^v6wet>)-EG7&mHspjnEu7>H%pm?)eK|SqY{B3K~ZJ zsD1l7?peb<t2kW5VF!nQ>*{eLhn?V&@*WiEE+eizTncnw8@n?0Me=I$a<a5-CsX#7 z0=w1&k+nc%HL$lB*y~h#nFmYdQ5a@k9WEnY7-*7Tafj3S*6leU&{y$L5f3?dsD!)M zad-`fS8;C<_d2+@)V15$^O_Sp;NX^C?t0dQDwWEk5DGxzyfD}#)r^Mkgq+S{=&LwZ z#4!iQ{v)FSC-}O9TYA~|92u2EkPKv2fqi|(LRTu*2{=Zdjl6xVAKmQ_CSv^Ep=dnF z--~d--|O`rN9caA1Ni%qXyOq6Kw$a@KBj+gNML-72mWVt@c2PK>Z9!a>I!LHwq_~& zB5?QA4)!2>TG^E|UH=$j4=lS9s%QCI^blA?cLRBhhha)iC-2f;5ZQ~_dLIZJ4UQ|J dj!V^F3H3Owy@cL$T6-D4%=MMgRzf2X?;oYzW(5EM literal 0 HcmV?d00001 diff --git a/Product_MicroService_Group3/app/models/updateProduct.py b/Product_MicroService_Group3/app/models/updateProduct.py new file mode 100644 index 00000000..4ad88367 --- /dev/null +++ b/Product_MicroService_Group3/app/models/updateProduct.py @@ -0,0 +1,45 @@ +import pyodbc +from flask import jsonify +from models.database_connection import connect_db + + + +def update_product_info(data): + + try: #error handling + + connection = connect_db() + cursor = connection.cursor() + + + #insert data into reviews and ratings table + update_product_query = '''UPDATE dbo.ProductCatalog +SET + Price = CASE WHEN ? IS NOT NULL THEN ? ELSE Price END, + StockQuantity = CASE WHEN ? IS NOT NULL THEN ? ELSE StockQuantity END +WHERE + ProductID = ?;''' + cursor.execute(update_product_query, (data["price"], data["price"], data["quantity"], data["quantity"], data["product_id"])) + + + + #commit changes to database if no errors + connection.commit() + + return {"message" : "Product updated successfully"} + + except pyodbc.Error as e: #more error handling + print(f"Database error in add_review: {e}") + connection.rollback() + return {"Error" : "Database error"} + + except Exception as e: #more error handling + print(f"Unexpected error occured in add_review: {e}") + connection.rollback() + return {"Error" : "Unexpected error"} + + finally: + if cursor: + cursor.close() + if connection: + connection.close() \ No newline at end of file diff --git a/Product_MicroService_Group3/app/publishers/__pycache__/kafkaPublishers.cpython-311.pyc b/Product_MicroService_Group3/app/publishers/__pycache__/kafkaPublishers.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..0dc8ca0740bb9e53a5d54a2c55ad96fb36f17dbe GIT binary patch literal 2336 zcma(TTWk|YaL;#VukAPxV)Jk*n1GtvAqj#uf;6c@Ly1I-6B-bTvFaXo>*UD!&e}UC zB(kfBR%$-0suKCk7X^`z_OI$!OMm)#j!sH+5)x9C5B^f+2l3U}^CNMfQpdY<JF~Mp zv$Hd^{wWd(Apqq;M}EqW(BF(`0AC;Mq+#$7DM;ZoRNyik$HqR*SKu>zLC6Rm&1+&t z^dZ72!kYMqL+B~=8f5&6|67y^C;`A&;U`fNzhqNM&W(H~<!7Y{YA7X{(4@~j=rJxU z1yvu_RH9ohPLuhYMp2d1uiG_4mb4kev<5GoyLA2u=XTsR49m2rRLq%#&Jk+Lb*<h< z1kBE{^BbUt$ZFaIXXe&?t#($B!dJlliVr$pMyq?Iz1=+Y2=LP;SrN)yS(xMKS&Qka z1Mqxh9{d2hpEmr6DkALJq#Cm9TNbA|#lIGK<O6x=HDG?7^m-DfDO?Yt0YGl7=()?v zm5PdF5hZ6a@4RHzCV5xTr2=u=^->{Ml&Gazs-c_iAxesxDoc#Y<t5oN=vDA`NGJ2P z<)#~%lQflWm@{;3At_LSDGO_=Y1Kt2JE9wrDPdBcRx{~$+az{dIk|RSA&;M9R?rB@ z({{F~y~ZsNO9B;=8{AdQ4wmrX^)I7?*;{arW>zXJ$TQh-om`>n9LZiW<Wd2G!OTv; ztf5O<_LdG4)Ksm7gpp57mMA3)*-@iJO_I26(An%|Wlqv%q9jIVBuyjwG%*vZp14i! zCT<dWMmIELS|zY!-J91{nHrN2M5;`3W7H@WFJz@+F<UI%1s}{Xt7g4mZPI<k1^0N} z8*jZ6dKAvaGR*u2&=T6}JZ%qLwL8bEonubt*wW|Qesn0gbZs+scqzSkEN%;jfpfTP zBi6ef>#fH6oml@;8d&(y$_YEr4Q(rWd^Pp!m-biGj+)h|=|s)t;0qkxz+LOO>+!<l z*}q~o?yBN(2ans0wjD(HeOSE%W*~WQTr=;pnB3U}Zfl0zYs!_;zD(gP794eWdrT`_ znOof>)sTkF+n0B1d~4iSd_h$9t?tREJ!{{nE*0KlVO9sl-)bp`T4n=f{vOLA4_dYE zlf)p2|CeMCB!mAYsdt}7R<tpT&>2MI6`{qZP+8b-$rNOLMfh6KaqT<L&7=GL6q;}K zdw6%>-BD|Os@0HWI89j?E{Y{9-+K}2t_aCqA}<*esDPywh*7c@<JXC4O4B4^Lb4^y zk}LzAFKODssYU-yR%QklLlcd%GPoER&*x2I4K5zJDybUenq{!1N}^EPXE5333ilvk zT~R3&il&Qec|xcwFge#xblFgd%TE)_6^m4bf-ict!VNadm+Ln?HE!_J`!XrA8s>_5 ztz^z5MK?s|z}B2sRqA6;VH@K>k%D&t^c<i>HmH(o79ft8EWo_FLyy47eYc+Ix&5i^ zDWLrza}v7w4<Jb8j@Xa!m21_GZl|Mr>DpH3I}e9{7`9Jc{5`ZWa$|kuhW*7<btL1A zWWFD+cHVJ1??ATu0^-@t_<I}i)OtMi^Laa-s>TPL_`vdk=Qv`wr>eNe!96za0djd} z<$!avw~G55+-KuH#?n^Co#P)?@qmK|Y&^hN>dHyySbr6tbMQIaYp<S%quYoR&+Z~` z_wL6X&HX{e-u4P$n};Rv-4fcqOFu|gg{UJ$?M5??!KwccFQ54&g`TBCqwV~&4-Sq7 z`9FgqjFUlE^lBqzrNj+;<38_M@Lmz`;h-mAjwTplWzlo!6GYj@-UYnILk0L7pplR1 z1>mvbW6l8r!#HjeowQrqCOU7wt-V0uCGYnld~|u(?)k_L4^_iMPIzbuZ{rSbg4;%W Oa4N#Zw$Yo&v+G|)*jdZ~ literal 0 HcmV?d00001 diff --git a/Product_MicroService_Group3/app/publishers/kafkaPublishers.py b/Product_MicroService_Group3/app/publishers/kafkaPublishers.py new file mode 100644 index 00000000..20018a03 --- /dev/null +++ b/Product_MicroService_Group3/app/publishers/kafkaPublishers.py @@ -0,0 +1,44 @@ +from kafka import KafkaProducer +from kafka.admin import KafkaAdminClient, NewTopic + +import json + +producer = KafkaProducer(bootstrap_servers="localhost:9092") + + +#Creates the topic +def create_product_updated_topic(): + # Create KafkaAdminClient instance + admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092") + + # Define the topic name and configuration + topic_name = "product_updated_topic" + num_partitions = 1 + replication_factor = 1 + + # Retrieve the list of existing topics + topic_metadata = admin_client.list_topics() + + # Check if the topic already exists + if topic_name not in topic_metadata: + # Define the configuration for the new topic + new_topic = NewTopic(name=topic_name, num_partitions=num_partitions, replication_factor=replication_factor) + # Create the new topic + admin_client.create_topics(new_topics=[new_topic], validate_only=False) + + +#Function is called in updateProfileControllers.py +def publish_product_updated_event(event_data): + # Serialize the event data to JSON + event_json = json.dumps(event_data) + # Publish the event to the Kafka topic + data_to_send = producer.send("product_updated_topic", value=event_json.encode("utf-8")) + try: + record_metadata = data_to_send.get(timeout=10) + print("Message sent successfully!") + print("Topic:", record_metadata.topic) + print("Partition:", record_metadata.partition) + print("Offset:", record_metadata.offset) + except Exception as e: + print("Failed to send message:", e) + producer.flush() \ No newline at end of file -- GitLab