Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| # import torch | |
| from ultralyticsplus import YOLO, render_result | |
| import cv2 | |
| from email.mime.text import MIMEText | |
| from email.mime.multipart import MIMEMultipart | |
| from email.mime.base import MIMEBase | |
| from email import encoders | |
| import smtplib | |
| import os | |
| from twilio.rest import Client | |
# --- E-mail alert configuration ---------------------------------------------
# Addresses and the password come from the environment so that no secret lives
# in the source; each value is None when its variable is not set.
sender_email = os.environ.get("sender_email")
receiver_email = os.environ.get("receiver_email")
sender_password = os.environ.get("sender_password")

# NOTE(review): Gmail documents ports 465 (SSL) and 587 (STARTTLS) for
# smtp.gmail.com; 2525 is kept exactly as found -- confirm it is reachable
# from the deployment environment before relying on e-mail alerts.
smtp_port = 2525
smtp_server = "smtp.gmail.com"
subject = "Accident detected"

# Class names listed by class id; ids 3-9 are the accident classes that
# check_acc() reports. This was previously written as a bare annotation
# (`classes: [...]`), which binds no name, so the list did not exist at runtime.
# NOTE(review): the spelling 'bike_bike_accidnet' is kept as found; presumably
# it mirrors the label stored in the trained weights -- confirm before fixing.
classes = [
    'car',
    'bike',
    'person',
    'car_car_accident',
    'car_bike_accident',
    'car_person_accident',
    'bike_bike_accidnet',
    'bike_person_accident',
    'car_object_accident',
    'bike_object_accident',
]
def send_email(accident_type):
    """Send an accident alert e-mail with ``./result/res.png`` attached.

    The message uses the module-level ``sender_email``, ``receiver_email``,
    ``sender_password``, ``subject``, ``smtp_server`` and ``smtp_port``.

    Args:
        accident_type: Text used as the plain-text body of the message.

    Raises:
        OSError: If ``./result/res.png`` cannot be opened or read.
        smtplib.SMTPException: If connecting, STARTTLS, login or sending fails.
    """
    msg = MIMEMultipart()
    msg['From'] = sender_email
    msg['To'] = receiver_email
    msg['Subject'] = subject
    msg.attach(MIMEText(accident_type, 'plain'))

    filename = "res.png"
    folder = "./result/"
    full_file_name = folder + filename

    # Read the image inside a context manager so the handle is always closed,
    # even when building the message fails later on.
    attachment_package = MIMEBase('application', 'octet-stream')
    with open(full_file_name, 'rb') as attachment:
        attachment_package.set_payload(attachment.read())
    encoders.encode_base64(attachment_package)
    attachment_package.add_header('Content-Disposition', "attachment; filename= " + filename)
    msg.attach(attachment_package)

    text = msg.as_string()

    print("Connecting to server")
    # The SMTP context manager issues QUIT on exit, so the connection is also
    # closed when starttls/login/sendmail raises.
    with smtplib.SMTP(smtp_server, smtp_port) as gmail_server:
        gmail_server.starttls()
        gmail_server.login(sender_email, sender_password)
        print("Successfully connected to server")
        print()

        print("Sending email to ", receiver_email)
        gmail_server.sendmail(sender_email, receiver_email, text)
        print("Email sent to ", receiver_email)
        print()
def send_sms(accident_type):
    """Report a detected accident; at present this only prints it.

    The Twilio settings are looked up from the environment, but the call that
    would actually send an SMS is disabled, so nothing is transmitted.

    Args:
        accident_type: Message text describing the detected accident.
    """
    # Looked up but not used while dispatch is disabled; a value is None when
    # its environment variable is unset.
    twilio_settings = {
        name: os.environ.get(name)
        for name in (
            "account_sid",
            "auth_token",
            "my_twilio_number",
            "receiver_number",
        )
    }

    # Disabled dispatch, kept for reference:
    # client = Client(twilio_settings["account_sid"], twilio_settings["auth_token"])
    # client.messages.create(body=accident_type,
    #                        from_=twilio_settings["my_twilio_number"],
    #                        to=twilio_settings["receiver_number"])
    print(accident_type)
def check_acc(box):
    """Return the message for the first accident class found in *box*.

    Args:
        box: Object whose ``cls.tolist()`` yields the detected class ids.

    Returns:
        The message for the first id equal to one of 3-9, scanning the ids in
        order, or ``""`` when none of them is an accident class.
    """
    accident_messages = (
        (3, "car car accident detected"),
        (4, "car bike accident detected"),
        (5, "car person accident detected"),
        (6, "bike bike accident detected"),
        (7, "bike person accident detected"),
        (8, "car object accident detected"),
        (9, "bike object accident detected"),
    )

    for class_id in box.cls.tolist():
        # Compare with == (not a dict lookup) so ids of any type are handled
        # exactly as a chain of equality tests would handle them.
        for accident_id, message in accident_messages:
            if class_id == accident_id:
                return message
    return ""
# Loaded models keyed by weights path, so the weights are read once per process
# instead of once per call (extract_frames calls image_predict per frame).
_MODEL_CACHE = {}


def _load_model(model_path):
    """Return the YOLO model for *model_path*, loading it on first use only."""
    model = _MODEL_CACHE.get(model_path)
    if model is None:
        model = YOLO(model_path)
        _MODEL_CACHE[model_path] = model
    return model


def image_predict(image):
    """Run accident detection on one image.

    Args:
        image: Image passed straight to ``model.predict`` and ``render_result``.

    Returns:
        A ``(res, render)`` tuple: ``res`` is the message from ``check_acc``
        (``""`` when no accident class was detected) and ``render`` is the
        output of ``render_result`` for the first prediction result.
    """
    model = _load_model("best.pt")
    results = model.predict(image, conf=0.4, iou=0.6, imgsz=640)
    detection = results[0]

    res = check_acc(detection.boxes)
    print("object Type: ", res)

    render = render_result(model=model, image=image, result=detection)

    # Only an actual accident triggers a notification. Saving the render to
    # ./result/res.png and calling send_email() are currently disabled.
    if res:
        send_sms(res)
    return (res, render)
def extract_frames(video):
    """Scan a video for an accident, checking about five frames per second.

    Args:
        video: Source passed to ``cv2.VideoCapture``.

    Returns:
        The ``(res, render)`` tuple from ``image_predict`` for the first
        sampled frame with a non-empty ``res``, or ``("", None)`` when no
        sampled frame reports an accident or the video cannot be read.
    """
    nof = 5  # target number of frames to analyse per second of video

    vidcap = cv2.VideoCapture(video)
    try:
        fps = vidcap.get(cv2.CAP_PROP_FPS)
        # Analyse every `step`-th frame. When the frame rate is unknown
        # (0, NaN or infinite) fall back to analysing every frame.
        if 0 < fps < float("inf"):
            step = max(1, round(fps / nof))
        else:
            step = 1

        frame_no = 0
        while vidcap.isOpened():
            success, image = vidcap.read()
            if not success:
                break
            if frame_no % step == 0:
                res, render = image_predict(image)
                if res:
                    return (res, render)
            # Without this increment the sampling test is always true and
            # every single frame is sent through the model.
            frame_no += 1
        return ("", None)
    finally:
        # Release the capture on every exit path, including the early return.
        vidcap.release()
def take_input(image, video):
    """Dispatch the submitted input to video or image detection.

    A video takes precedence when both inputs are supplied.

    Args:
        image: Image input, or ``None`` when no image was provided.
        video: Video input, or ``None`` when no video was provided.

    Returns:
        The ``(label, rendered_image)`` tuple from ``extract_frames`` or
        ``image_predict``; ``("", None)`` when neither input was provided.
    """
    # Identity checks: `!= None` on an array-like image would compare
    # element-wise instead of testing for absence.
    if video is not None:
        return extract_frames(video)
    if image is not None:
        return image_predict(image)
    # Nothing submitted: return the empty result rather than feeding None to
    # the model.
    return ("", None)
# Gradio UI: image and video inputs, a text label and an image as outputs, and
# a "Detect" button that runs take_input.
# NOTE(review): indentation was lost in this copy, so which of the components
# below sit inside gr.Row() cannot be determined from here -- confirm against
# the original layout before restructuring.
| with gr.Blocks(title="YOLOS Object Detection - ClassCat", css=".gradio-container {background:lightyellow;}") as demo: | |
| gr.HTML('<h1>Yolo Object Detection</h1>') | |
| gr.HTML("<br>") | |
| with gr.Row(): | |
| input_image = gr.Image(label="Input image") | |
| input_video = gr.Video(label="Input video") | |
| output_label = gr.Text(label="output label") | |
| output_image = gr.Image(label="Output image") | |
| gr.HTML("<br>") | |
| send_btn = gr.Button("Detect") | |
| gr.HTML("<br>") | |
# Clicking the button calls take_input(input_image, input_video); its two
# return values fill output_label and output_image, in that order.
| send_btn.click(fn=take_input, inputs=[input_image, input_video], outputs=[output_label, output_image]) | |
# Start the app.
| demo.launch(debug=True) | |