Conversion_Kitchen_Code/kitchen_counter/scriptAudit/utils.py

272 lines
9.0 KiB
Python
Executable File

from multiprocessing import Process
import os
from centralisedFileSystem.models import Script
from scriptAudit.models import ScriptAuditModel, States
# from scriptAudit.tasks import NeutralAuditTask
from scriptAudit.mnf_script_audit import NeutralAudit
from datetime import datetime
from django.core.files.base import ContentFile
from utils.filesystem import new_screenplay, create_script_docx,get_file_path,new_screenplay_without_audit_in_background
from .mnf_script_audit import NeutralAudit
# from narration.vectorcode.code.vector_generation import vector_gen
from time import sleep
import time
from django.conf import settings
from django.template.loader import render_to_string
from django.utils.html import strip_tags
from django.core.mail import EmailMultiAlternatives
def update_audit_status(script_id: str, status: str) -> None:
    """Store ``status`` on the audit row that belongs to the given script.

    The script is fetched by its id first; its ``ScriptAuditModel`` row is
    then updated, or created when the script has no audit row yet.

    Args:
        script_id: id of the ``Script`` whose audit status is being written.
        status: value to store in the audit row's ``status`` field.
    """
    target_script = Script.objects.get(id=script_id)
    ScriptAuditModel.objects.update_or_create(
        script=target_script,
        defaults={"status": status},
    )
def audit_in_background(script_id: str) -> None:
    """Mark the script's audit as started and run the audit in a child process.

    A ``NeutralAudit`` is built for the script and its ``audit_in_background``
    method is executed in a separate ``multiprocessing.Process``. This call
    blocks until that process has exited.

    Args:
        script_id: id of the script to audit.
    """
    # ---------------------------------------------------
    # for running audit without celery
    #
    try:
        update_audit_status(script_id, States.STARTED)
    except Exception:
        # Best-effort fallback kept from the original code: if the STARTED
        # write fails, try to record FAILURE instead.
        update_audit_status(script_id, States.FAILURE)

    audit = NeutralAudit(script_id)
    # Pass the bound method itself as the target. Writing
    # ``target=audit.audit_in_background()`` would run the audit right here in
    # the parent and give Process the method's *return value* as its target,
    # leaving the child process with nothing to do.
    # NOTE(review): the audit now really runs in the child. Confirm that the
    # database connection handling is safe for the deployment's start method
    # (a non-fork start method additionally requires ``audit`` to be picklable).
    process1 = Process(target=audit.audit_in_background)
    process1.start()
    process1.join()
    # status = ScriptAuditModel.objects.get(
    #     script = Script.objects.get(
    #         id = script_id
    #     ))
    # if status.status == "SUCCESS":
    #     to_email = [request.user.email]
    #     email_code = 'SB1'
    #     sendmail(to_email=to_email , email_code=email_code )
    # elif status.status == "FAILURE":
    #     to_email = [request.user.email]
    #     email_code = 'SB2'
    #     sendmail(to_email=to_email , email_code=email_code )
    # del audit
    # ---------------------------------------------------
    # ---------------------------------------------------
    # for running audit with celery
    #
    # uncomment only on AWS
    # NeutralAuditTask().delay(
    #     script_id = script_id,
    # )
    # ---------------------------------------------------
def generate_script_id_for_pitchdeck(path, request):
    """Register the file at ``path`` as a new screenplay and return its script id.

    The screenplay is named ``<file stem>_<current timestamp>`` and is handed
    to ``new_screenplay`` for ``request.user`` with the fixed author
    "mynextfilm-user", type "script-original" and language "en".

    Args:
        path: filesystem path of the script file to upload.
        request: object exposing the uploading user as ``request.user``.

    Returns:
        The ``id`` found under the ``"script"`` entry of the value returned by
        ``new_screenplay``, or ``None`` when that entry is missing.
    """
    # splitext works on the base name only and yields "" for a file without an
    # extension. The previous ``path.split(".")[-1]`` returned the whole path
    # (or a fragment containing directory separators) in that case.
    stem, extension = os.path.splitext(os.path.basename(path))
    screenplay_name = stem + "_" + str(datetime.now())
    author = "mynextfilm-user"
    language = "en"
    script_file_name = screenplay_name + extension
    print(script_file_name)

    # The content is read fully into memory, so the handle can be closed
    # before the upload call.
    with open(path, "rb") as script_handle:
        file = ContentFile(script_handle.read(), script_file_name)

    result = new_screenplay(
        request.user,
        author,
        screenplay_name,
        file,
        "script-original",
        language,
    )
    script_id = result.get("script", {}).get("id")
    print("\n\n\n\nSCRIPT____ID :", script_id, "\n\n\n\n")
    return script_id
def check_status_and_trigger_vector(script_id, v_id, poll_interval=10):
    """Wait for the script audit to finish, then trigger vector generation.

    The script's ``ScriptAuditModel`` row is polled until its status is
    "SUCCESS" or "FAILURE". On success the "script-docx" file path is looked
    up and passed to ``vector_gen``; if that attempt raises, the docx is
    (re)created with ``create_script_docx`` and ``vector_gen`` is tried once
    more. On failure a message is printed and the function returns.

    Args:
        script_id: id of the script whose audit is being awaited.
        v_id: vector id forwarded to ``vector_gen``.
        poll_interval: seconds to sleep between two status checks while the
            audit has not finished yet.
    """
    print("vector_id:", v_id)
    while True:
        status = ScriptAuditModel.objects.get(
            script=Script.objects.get(id=script_id)
        )
        print("waiting for audit to get complete")
        if status.status == "SUCCESS":
            # NOTE(review): no import of ``vector_gen`` is visible in this
            # module (its import line is commented out) -- confirm it is
            # provided elsewhere, otherwise both calls raise NameError.
            try:
                a_path = get_file_path(str(script_id), "script-docx")
                vector_gen(a_path, v_id)
            except Exception:
                # Fallback: build the audited docx, then retry once. A failure
                # of this second attempt propagates to the caller.
                create_script_docx(script_id)
                a_path = get_file_path(str(script_id), "script-docx")
                print("Audited script path is fetched")
                vector_gen(a_path, v_id)
            return
        if status.status == "FAILURE":
            print("Audit Failed")
            return
        # Audit still running: pause instead of querying the database in a
        # tight loop.
        sleep(poll_interval)
def generate_script_id(path, request, id):
    """Upload the script at ``path``, audit it synchronously and return its id.

    The screenplay is named ``<file stem>_<id>`` and stored through
    ``new_screenplay_without_audit_in_background`` for ``request.user``. The
    audit status is then set to STARTED, ``NeutralAudit.audit`` is run in the
    calling process, and the status is finally set to "SUCCESS" or "FAILURE"
    depending on whether the audit raised.

    Args:
        path: filesystem path of the script file (described as a docx path by
            the original documentation).
        request: object exposing the uploading user as ``request.user``.
        id: value appended to the file stem to build the screenplay name.
            The name shadows the builtin but is kept for caller compatibility.

    Returns:
        The ``id`` found under the ``"script"`` entry of the upload result, or
        ``None`` when that entry is missing.
    """
    # splitext works on the base name only and yields "" for a file without an
    # extension. The previous ``path.split(".")[-1]`` returned the whole path
    # (or a fragment containing directory separators) in that case.
    stem, extension = os.path.splitext(os.path.basename(path))
    screenplay_name = stem + "_" + str(id)
    author = "mynextfilm-user"
    language = "en"
    script_file_name = screenplay_name + extension
    print(script_file_name)

    with open(path, "rb") as script_handle:
        file = ContentFile(script_handle.read(), script_file_name)

    result = new_screenplay_without_audit_in_background(
        request.user,
        author,
        screenplay_name,
        file,
        "script-original",
        language,
    )
    script_id = result.get("script", {}).get("id")
    print("\n\n\n\nSCRIPT____ID :", script_id, "\n\n\n\n")

    try:
        update_audit_status(script_id, States.STARTED)
    except Exception:
        update_audit_status(script_id, States.FAILURE)

    try:
        naudit = NeutralAudit(script_id)
        naudit.audit()
        # Kept inside the try so that a failing SUCCESS write is also
        # recorded as FAILURE, as before.
        update_audit_status(script_id, "SUCCESS")
    except Exception as exc:
        # The audit stays best-effort (the id is still returned), but the
        # reason is no longer discarded silently.
        print("Script audit failed:", exc)
        update_audit_status(script_id, "FAILURE")
    return script_id
"""
The function below is called when you want to run the audit and the vector generation together by passing the vector id.
"""
def audit_vector_integration(path, v_id, request):
    """Upload and audit a script, then pass the audited docx to vector generation.

    ``generate_script_id`` uploads the file and runs the audit. When the
    resulting audit status is "SUCCESS", the "script-docx" path is fetched
    (creating the docx first if the lookup raises) and given to
    ``vector_gen``. When the status is "FAILURE", a message is printed.

    Args:
        path: filesystem path of the script file to process.
        v_id: vector id; used in the screenplay name and forwarded to
            ``vector_gen``.
        request: object exposing the uploading user as ``request.user``.
    """
    print("vector_id:", v_id)
    script_id = generate_script_id(path, request, v_id)
    status = ScriptAuditModel.objects.get(
        script=Script.objects.get(id=script_id)
    )
    if status.status == "SUCCESS":
        try:
            a_path = get_file_path(str(script_id), "script-docx")
        except Exception:
            # No docx available yet: create it and look the path up again.
            create_script_docx(script_id)
            a_path = get_file_path(str(script_id), "script-docx")
        print("Audited script path is fetched")
        # NOTE(review): no import of ``vector_gen`` is visible in this module
        # (its import line is commented out) -- confirm it is provided
        # elsewhere, otherwise this call raises NameError.
        vector_gen(a_path, v_id)
    elif status.status == "FAILURE":
        print("Audit Failed")
# def audit_vector_integration_(path,id,request):
# print("vector_id:",id)
# script_id = generate_script_id(path,request,id)
# audit_completed = False
# while not audit_completed:
# scripts = Script.objects.filter(screenplay__user=request.user)
# for script in scripts:
# if ScriptAuditModel.objects.filter(script=script).exists():
# audit_status_objects = ScriptAuditModel.objects.filter(script=script)
# for audit_status_object in audit_status_objects:
# script_audit_status = audit_status_object.status
# if script_audit_status == States.SUCCESS:
# try:
# a_path = get_file_path(str(script_id), "script-docx")
# except:
# create_script_docx(script_id)
# a_path = get_file_path(str(script_id), "script-docx")
# print("Audited script path is fetched")
# vector_gen(a_path,id)
# elif script_audit_status == States.FAILURE:
# raise Exception("Script Audit procress failed")
# break
# elif script_audit_status == States.STARTED or script_audit_status == States.PENDING:
# # Wait for some time before checking the audit status again
# sleep(10)
# else:
# raise Exception("Unexpected script audit status")
# # If the loop breaks without returning, it means the audit status is FAILURE
# raise Exception("Script audit failed")
def send_email_to_user(user, screenplay_name, subject, message):  # removed flag = 1
    """Send an HTML email (with a plain-text alternative) to ``user.email``.

    The body is rendered from the "audit/coree_email.html" template; the
    plain-text part is the same content with the HTML tags stripped. A
    trailing "." is appended to the subject.

    Args:
        user: recipient; passed to the template as "Name" and read for its
            ``email`` attribute.
        screenplay_name: passed to the template as "story_name".
        subject: subject line, sent with "." appended.
        message: passed to the template as "message".
    """
    dotted_subject = subject + "."
    sender = settings.EMAIL_HOST_USER
    recipient = user.email
    html_body = render_to_string(
        "audit/coree_email.html",
        {
            "Name": user,
            "story_name": screenplay_name,
            "message": message,
        },
    )
    plain_body = strip_tags(html_body)
    email = EmailMultiAlternatives(dotted_subject, plain_body, sender, [recipient])
    email.attach_alternative(html_body, "text/html")
    email.send()