272 lines
9.0 KiB
Python
Executable File
272 lines
9.0 KiB
Python
Executable File
from multiprocessing import Process
|
|
import os
|
|
from centralisedFileSystem.models import Script
|
|
from scriptAudit.models import ScriptAuditModel, States
|
|
# from scriptAudit.tasks import NeutralAuditTask
|
|
from scriptAudit.mnf_script_audit import NeutralAudit
|
|
from datetime import datetime
|
|
from django.core.files.base import ContentFile
|
|
from utils.filesystem import new_screenplay, create_script_docx,get_file_path,new_screenplay_without_audit_in_background
|
|
from .mnf_script_audit import NeutralAudit
|
|
# from narration.vectorcode.code.vector_generation import vector_gen
|
|
from time import sleep
|
|
import time
|
|
|
|
from django.conf import settings
|
|
from django.template.loader import render_to_string
|
|
from django.utils.html import strip_tags
|
|
from django.core.mail import EmailMultiAlternatives
|
|
|
|
def update_audit_status(script_id: str, status: str) -> None:
    """Store ``status`` on the audit row that belongs to a script.

    The ``Script`` whose ``id`` equals ``script_id`` is fetched first, then
    ``ScriptAuditModel.objects.update_or_create`` is called keyed on that
    script with ``status`` passed through ``defaults``.

    Args:
        script_id: id of the ``Script`` whose audit status is being written.
        status: value to store in the audit row's ``status`` field.
    """
    target_script = Script.objects.get(id=script_id)
    ScriptAuditModel.objects.update_or_create(
        script=target_script,
        defaults={"status": status},
    )
|
|
|
|
|
|
def audit_in_background(script_id: str) -> None:
    """Run the neutral audit for ``script_id`` in a child process and wait for it.

    The audit status is first set to ``States.STARTED``; if that write fails,
    an attempt is made to record ``States.FAILURE`` instead. A ``NeutralAudit``
    is then built for the script and its ``audit_in_background`` method is
    executed in a ``multiprocessing.Process`` which is joined before returning.

    Args:
        script_id: id of the script to audit.
    """
    # ---------------------------------------------------
    # for running audit without celery
    #
    try:
        update_audit_status(script_id, States.STARTED)
    except Exception:
        # Best effort: if the STARTED write failed, try to flag the audit as
        # failed. This second write may raise as well and is not guarded.
        update_audit_status(script_id, States.FAILURE)

    audit = NeutralAudit(script_id)
    # Hand the bound method itself to Process. Calling it here (with "()")
    # would run the whole audit in the parent process and give the child a
    # target of None, i.e. nothing to do.
    # NOTE(review): the child now performs the audit's work; confirm that any
    # database connections it uses are safe to use from a separate process.
    process1 = Process(target=audit.audit_in_background)
    process1.start()
    process1.join()
|
|
# status = ScriptAuditModel.objects.get(
|
|
# script = Script.objects.get(
|
|
# id = script_id
|
|
# ))
|
|
# if status.status == "SUCCESS":
|
|
# to_email = [request.user.email]
|
|
# email_code = 'SB1'
|
|
# sendmail(to_email=to_email , email_code=email_code )
|
|
# elif status.status == "FAILURE":
|
|
# to_email = [request.user.email]
|
|
# email_code = 'SB2'
|
|
# sendmail(to_email=to_email , email_code=email_code )
|
|
# del audit
|
|
# ---------------------------------------------------
|
|
|
|
# ---------------------------------------------------
|
|
# for running audit with celery
|
|
#
|
|
# uncomment only on AWS
|
|
# NeutralAuditTask().delay(
|
|
# script_id = script_id,
|
|
# )
|
|
# ---------------------------------------------------
|
|
|
|
def generate_script_id_for_pitchdeck(path, request):
    """Upload the file at ``path`` as a new screenplay and return its script id.

    The screenplay is named ``<file stem>_<current datetime>`` and is created
    through ``new_screenplay`` for ``request.user`` with the fixed author
    ``"mynextfilm-user"``, type ``"script-original"`` and language ``"en"``.

    Args:
        path: filesystem path of the script file to upload.
        request: object whose ``user`` attribute is passed to ``new_screenplay``.

    Returns:
        The value found at ``result["script"]["id"]`` in the mapping returned
        by ``new_screenplay``, or ``None`` when that key is absent.
    """
    # Derive stem and extension from the final path component only, so a dot
    # in a directory name or a file without an extension cannot corrupt the
    # generated file name. ``extension`` keeps its leading dot (or is empty).
    stem, extension = os.path.splitext(os.path.basename(path))
    screenplay_name = stem + "_" + str(datetime.now())
    author = "mynextfilm-user"
    language = "en"
    script_file_name = screenplay_name + extension
    print(script_file_name)

    # The bytes are copied into the ContentFile, so the handle can be closed
    # before the screenplay is created.
    with open(path, "rb") as script_handle:
        upload = ContentFile(script_handle.read(), script_file_name)

    result = new_screenplay(
        request.user,
        author,
        screenplay_name,
        upload,
        "script-original",
        language,
    )
    script_id = result.get("script", {}).get("id")
    print("\n\n\n\nSCRIPT____ID :", script_id, "\n\n\n\n")
    return script_id
|
|
|
|
|
|
def check_status_and_trigger_vector(script_id, v_id, poll_interval=10):
    """Wait for a script audit to finish, then start vector generation.

    The audit row of ``script_id`` is polled until its status is ``"SUCCESS"``
    or ``"FAILURE"``. On success the path of the audited docx is fetched
    (creating the docx first if the lookup fails) and handed to ``vector_gen``
    together with ``v_id``. On failure a message is printed and the function
    returns without generating vectors.

    Args:
        script_id: id of the audited script.
        v_id: vector id forwarded to ``vector_gen``.
        poll_interval: seconds to sleep between two status checks while the
            audit is still running.
    """
    # NOTE(review): the import of ``vector_gen`` is commented out at the top of
    # this file, so the calls below raise NameError until it is restored.
    print("vector_id:", v_id)
    while True:
        status = ScriptAuditModel.objects.get(
            script=Script.objects.get(id=script_id)
        )
        print("waiting for audit to get complete")

        if status.status == "SUCCESS":
            # Only the path lookup is guarded: a failure there means the docx
            # has to be generated first. Errors raised by vector generation
            # itself are no longer swallowed and retried.
            try:
                a_path = get_file_path(str(script_id), "script-docx")
            except Exception:
                create_script_docx(script_id)
                a_path = get_file_path(str(script_id), "script-docx")
                print("Audited script path is fetched")
            vector_gen(a_path, v_id)
            return

        if status.status == "FAILURE":
            print("Audit Failed")
            return

        # Audit still in progress: pause instead of re-querying the database
        # in a tight loop.
        sleep(poll_interval)
|
|
|
|
|
|
|
|
|
|
def generate_script_id(path, request, id):
    """Upload a script file, audit it synchronously and return its script id.

    The screenplay is named ``<file stem>_<id>`` and created through
    ``new_screenplay_without_audit_in_background`` for ``request.user``. The
    audit status is then set to ``States.STARTED``, ``NeutralAudit.audit`` is
    run in the calling process, and the status is finally set to ``"SUCCESS"``
    or, if anything in that step raised, to ``"FAILURE"``.

    Args:
        path: filesystem path of the script file to upload.
        request: object whose ``user`` attribute owns the new screenplay.
        id: value appended to the file stem to build the screenplay name.

    Returns:
        The value found at ``result["script"]["id"]`` in the mapping returned
        by ``new_screenplay_without_audit_in_background``, or ``None`` when
        that key is absent.
    """
    # Derive stem and extension from the final path component only, so a dot
    # in a directory name or a file without an extension cannot corrupt the
    # generated file name. ``extension`` keeps its leading dot (or is empty).
    stem, extension = os.path.splitext(os.path.basename(path))
    screenplay_name = stem + "_" + str(id)
    author = "mynextfilm-user"
    language = "en"
    script_file_name = screenplay_name + extension
    print(script_file_name)

    # The bytes are copied into the ContentFile, so the handle can be closed
    # before the screenplay is created.
    with open(path, "rb") as script_handle:
        upload = ContentFile(script_handle.read(), script_file_name)

    result = new_screenplay_without_audit_in_background(
        request.user,
        author,
        screenplay_name,
        upload,
        "script-original",
        language,
    )
    script_id = result.get("script", {}).get("id")
    print("\n\n\n\nSCRIPT____ID :", script_id, "\n\n\n\n")

    try:
        update_audit_status(script_id, States.STARTED)
    except Exception:
        # Best effort: if the STARTED write failed, try to flag the audit as
        # failed. This second write may raise as well and is not guarded.
        update_audit_status(script_id, States.FAILURE)

    try:
        NeutralAudit(script_id).audit()
        # Kept inside the try so that a failure while recording success is
        # also reported as a failed audit, as before.
        update_audit_status(script_id, "SUCCESS")
    except Exception as exc:
        # Surface the reason instead of discarding it silently.
        print("Audit failed for script", script_id, ":", exc)
        update_audit_status(script_id, "FAILURE")

    return script_id
|
|
|
|
|
|
|
|
|
|
|
|
"""
|
|
The function below is called when you want to run the audit followed by vector generation for a given vector id.
|
|
"""
|
|
def audit_vector_integration(path, v_id, request):
    """Audit the script at ``path`` and, if the audit succeeded, generate vectors.

    ``generate_script_id`` is called with ``path``, ``request`` and ``v_id``;
    the audit row of the resulting script is then read once. When its status
    is ``"SUCCESS"`` the audited docx path is fetched (creating the docx first
    if the lookup fails) and passed to ``vector_gen`` with ``v_id``. When it is
    ``"FAILURE"`` a message is printed. Any other status is ignored.

    Args:
        path: filesystem path of the script file to upload and audit.
        v_id: vector id; also used as the suffix of the screenplay name.
        request: object whose ``user`` attribute owns the new screenplay.
    """
    # NOTE(review): the import of ``vector_gen`` is commented out at the top of
    # this file, so the call below raises NameError until it is restored.
    print("vector_id:", v_id)
    script_id = generate_script_id(path, request, v_id)

    status = ScriptAuditModel.objects.get(
        script=Script.objects.get(id=script_id)
    )
    if status.status == "SUCCESS":
        # A failed lookup is taken to mean the docx has not been generated
        # yet, so it is created and the lookup repeated.
        try:
            a_path = get_file_path(str(script_id), "script-docx")
        except Exception:
            create_script_docx(script_id)
            a_path = get_file_path(str(script_id), "script-docx")
        print("Audited script path is fetched")
        vector_gen(a_path, v_id)
    elif status.status == "FAILURE":
        print("Audit Failed")
|
|
|
|
|
|
|
|
|
|
|
|
# def audit_vector_integration_(path,id,request):
|
|
# print("vector_id:",id)
|
|
# script_id = generate_script_id(path,request,id)
|
|
# audit_completed = False
|
|
# while not audit_completed:
|
|
# scripts = Script.objects.filter(screenplay__user=request.user)
|
|
# for script in scripts:
|
|
# if ScriptAuditModel.objects.filter(script=script).exists():
|
|
# audit_status_objects = ScriptAuditModel.objects.filter(script=script)
|
|
# for audit_status_object in audit_status_objects:
|
|
# script_audit_status = audit_status_object.status
|
|
# if script_audit_status == States.SUCCESS:
|
|
# try:
|
|
# a_path = get_file_path(str(script_id), "script-docx")
|
|
# except:
|
|
# create_script_docx(script_id)
|
|
# a_path = get_file_path(str(script_id), "script-docx")
|
|
# print("Audited script path is fetched")
|
|
# vector_gen(a_path,id)
|
|
# elif script_audit_status == States.FAILURE:
|
|
# raise Exception("Script Audit procress failed")
|
|
# break
|
|
# elif script_audit_status == States.STARTED or script_audit_status == States.PENDING:
|
|
# # Wait for some time before checking the audit status again
|
|
# sleep(10)
|
|
# else:
|
|
# raise Exception("Unexpected script audit status")
|
|
|
|
# # If the loop breaks without returning, it means the audit status is FAILURE
|
|
# raise Exception("Script audit failed")
|
|
|
|
|
|
def send_email_to_user(user, screenplay_name, subject, message):  # removed flag = 1
    """Send an HTML email with a plain-text alternative to ``user``.

    The body is rendered from the ``audit/coree_email.html`` template; the
    plain-text part is the same markup passed through ``strip_tags``. A full
    stop is appended to ``subject`` and the sender is
    ``settings.EMAIL_HOST_USER``.

    Args:
        user: recipient; its ``email`` attribute is the destination address
            and the object itself is given to the template as ``Name``.
        screenplay_name: given to the template as ``story_name``.
        subject: subject line, without the trailing full stop.
        message: given to the template as ``message``.
    """
    subject_line = subject + "."
    sender = settings.EMAIL_HOST_USER
    recipient = user.email

    html_body = render_to_string(
        "audit/coree_email.html",
        {
            "Name": user,
            "story_name": screenplay_name,
            "message": message,
        },
    )
    plain_body = strip_tags(html_body)

    email = EmailMultiAlternatives(subject_line, plain_body, sender, [recipient])
    email.attach_alternative(html_body, "text/html")
    email.send()
|
|
|