498 lines
16 KiB
Python
Executable File
498 lines
16 KiB
Python
Executable File
from multiprocessing import Process
|
|
import os
|
|
from centralisedFileSystem.models import Script
|
|
from scriptAudit.models import ScriptAuditModel, States
|
|
from scriptAudit.mnf_script_audit import NeutralAudit
|
|
from datetime import datetime
|
|
from django.core.files.base import ContentFile
|
|
from utils.filesystem import new_screenplay, create_script_docx,get_file_path,new_screenplay_without_audit_in_background
|
|
from .mnf_script_audit import NeutralAudit
|
|
|
|
from time import sleep
|
|
import time
|
|
import pandas as pd
|
|
import re
|
|
import uuid
|
|
from django.conf import settings
|
|
from django.template.loader import render_to_string
|
|
from django.utils.html import strip_tags
|
|
from django.core.mail import EmailMultiAlternatives
|
|
from PyPDF2 import PdfReader, PdfWriter
|
|
|
|
|
|
def update_audit_status(script_id: str, status: str) -> None:
    """Upsert the audit-status row for the given script.

    Looks the Script up by primary key, then creates or updates its
    ScriptAuditModel record with ``status``. Propagates
    ``Script.DoesNotExist`` when the id is unknown.
    """
    script = Script.objects.get(id=script_id)
    ScriptAuditModel.objects.update_or_create(
        script=script,
        defaults={"status": status},
    )
|
|
|
|
|
|
def audit_in_background(script_id: str) -> None:
    """Run the neutral audit for ``script_id`` in a child process and wait for it.

    Marks the audit STARTED first (FAILURE when even that bookkeeping fails),
    then executes ``NeutralAudit.audit_in_background`` in a separate process
    and blocks until it finishes. The celery-based variant is deliberately
    disabled (see note at the bottom).
    """
    try:
        update_audit_status(script_id, States.STARTED)
    except Exception:
        # Status bookkeeping failed (e.g. Script row missing): record the
        # failure instead of crashing the caller.
        update_audit_status(script_id, States.FAILURE)

    audit = NeutralAudit(script_id)
    # BUG FIX: the original used target=audit.audit_in_background(), which
    # called the method immediately in the parent process and handed Process
    # its return value (None) as the target. Pass the bound method itself so
    # the audit actually runs in the child process.
    worker = Process(target=audit.audit_in_background)
    worker.start()
    worker.join()

    # ---------------------------------------------------
    # for running audit with celery — uncomment only on AWS
    # NeutralAuditTask().delay(script_id=script_id)
    # ---------------------------------------------------
|
|
|
|
def generate_script_id_for_pitchdeck(path, request):
    """Register the file at ``path`` as a new screenplay and return its script id.

    The screenplay name is the file's base name suffixed with the current
    timestamp; the upload is attributed to the requesting user and audited
    via the normal ``new_screenplay`` flow.
    """
    base_name = os.path.splitext(os.path.basename(path))[0]
    screenplay_name = base_name + "_" + str(datetime.now())
    extension = path.split(".")[-1]
    upload_name = screenplay_name + "." + extension
    print(upload_name)

    with open(path, "rb") as handle:
        file = ContentFile(handle.read(), upload_name)

    result = new_screenplay(
        request.user,
        "mynextfilm-user",
        screenplay_name,
        file,
        "script-original",
        "en",
    )
    script_id = result.get("script", {}).get("id")
    print("\n\n\n\nSCRIPT____ID :", script_id, "\n\n\n\n")
    return script_id
|
|
|
|
|
|
def check_status_and_trigger_vector(script_id, v_id):
    """Placeholder: poll the audit status of ``script_id`` and, on SUCCESS,
    produce the audited .docx and hand it to vector generation for ``v_id``.

    The polling implementation is currently disabled; this function is a
    no-op and always returns None.
    """
    return None
|
|
|
|
|
|
|
|
|
|
def generate_script_id(path, request, id):
    """Register the file at ``path`` as a new screenplay, audit it synchronously,
    and return the new script id.

    The screenplay name is the file's base name suffixed with ``id`` (the
    caller's vector id). Audit progress is tracked in ScriptAuditModel:
    STARTED before the audit runs, then SUCCESS or FAILURE depending on the
    outcome. Exceptions from the audit itself are recorded, not propagated.
    """
    base_name = os.path.splitext(os.path.basename(path))[0]
    screenplay_name = base_name + "_" + str(id)
    script_ext = path.split(".")[-1]
    script_file_name = screenplay_name + "." + script_ext
    print(script_file_name)

    with open(path, "rb") as handle:
        file = ContentFile(handle.read(), script_file_name)

    result = new_screenplay_without_audit_in_background(
        request.user,
        "mynextfilm-user",
        screenplay_name,
        file,
        "script-original",
        "en",
    )
    script_id = result.get("script", {}).get("id")
    print("\n\n\n\nSCRIPT____ID :", script_id, "\n\n\n\n")

    try:
        update_audit_status(script_id, States.STARTED)
    except Exception:
        update_audit_status(script_id, States.FAILURE)

    try:
        NeutralAudit(script_id).audit()
        # Reuse the shared helper instead of duplicating the
        # update_or_create boilerplate for each outcome.
        update_audit_status(script_id, "SUCCESS")
    except Exception:
        # The audit crashed; record the failure so callers can inspect the
        # status row instead of receiving an exception.
        update_audit_status(script_id, "FAILURE")

    return script_id
|
|
|
|
|
|
|
|
|
|
|
|
"""
|
|
this below function is called when you want call audit + vector by giving the vectors
|
|
"""
|
|
def audit_vector_integration(path, v_id, request):
    """Run audit + vector generation for the script file at ``path``.

    ``generate_script_id`` registers and audits the script synchronously;
    afterwards the recorded status decides whether vector generation runs
    (building the audited .docx first when needed).
    """
    print("vector_id:", v_id)
    script_id = generate_script_id(path, request, v_id)
    audit_completed = False  # retained from the original; never read

    audit_record = ScriptAuditModel.objects.get(
        script=Script.objects.get(id=script_id)
    )
    if audit_record.status == "SUCCESS":
        try:
            a_path = get_file_path(str(script_id), "script-docx")
        except:
            # .docx not generated yet — build it, then fetch the path.
            create_script_docx(script_id)
            a_path = get_file_path(str(script_id), "script-docx")
        print("Audited script path is fetched")
        # NOTE(review): vector_gen is not defined or imported in this
        # module — confirm where it is expected to come from.
        vector_gen(a_path, v_id)
    elif audit_record.status == "FAILURE":
        print("Audit Failed")
|
|
|
|
|
|
|
|
|
|
|
|
# def audit_vector_integration_(path,id,request):
|
|
# print("vector_id:",id)
|
|
# script_id = generate_script_id(path,request,id)
|
|
# audit_completed = False
|
|
# while not audit_completed:
|
|
# scripts = Script.objects.filter(screenplay__user=request.user)
|
|
# for script in scripts:
|
|
# if ScriptAuditModel.objects.filter(script=script).exists():
|
|
# audit_status_objects = ScriptAuditModel.objects.filter(script=script)
|
|
# for audit_status_object in audit_status_objects:
|
|
# script_audit_status = audit_status_object.status
|
|
# if script_audit_status == States.SUCCESS:
|
|
# try:
|
|
# a_path = get_file_path(str(script_id), "script-docx")
|
|
# except:
|
|
# create_script_docx(script_id)
|
|
# a_path = get_file_path(str(script_id), "script-docx")
|
|
# print("Audited script path is fetched")
|
|
# vector_gen(a_path,id)
|
|
# elif script_audit_status == States.FAILURE:
|
|
# raise Exception("Script Audit procress failed")
|
|
# break
|
|
# elif script_audit_status == States.STARTED or script_audit_status == States.PENDING:
|
|
# # Wait for some time before checking the audit status again
|
|
# sleep(10)
|
|
# else:
|
|
# raise Exception("Unexpected script audit status")
|
|
|
|
# # If the loop breaks without returning, it means the audit status is FAILURE
|
|
# raise Exception("Script audit failed")
|
|
|
|
|
|
def send_email_to_user(user, screenplay_name, subject, message):
    """Render the 'audit/coree_email.html' template and email it to ``user``.

    Sends a multipart message (plain-text fallback plus HTML alternative)
    from the configured EMAIL_HOST_USER address to ``user.email``.
    """
    full_subject = subject + "."
    sender = settings.EMAIL_HOST_USER
    recipient = user.email

    html_body = render_to_string(
        "audit/coree_email.html",
        {
            "Name": user,
            "story_name": screenplay_name,
            "message": message,
        },
    )
    plain_body = strip_tags(html_body)

    email = EmailMultiAlternatives(full_subject, plain_body, sender, [recipient])
    email.attach_alternative(html_body, "text/html")
    email.send()
|
|
|
|
def split_pdf_into_chunks(input_pdf_path, chunk_size=3):
    """Split a PDF into PDFs of at most ``chunk_size`` pages each.

    Chunks are written to a fresh ``/content/chunk_<random>`` folder as
    chunk_1.pdf, chunk_2.pdf, ... Returns the chunk paths in page order.
    """
    target_dir = os.path.join("/content", f"chunk_{uuid.uuid4().hex[:8]}")
    os.makedirs(target_dir, exist_ok=True)

    reader = PdfReader(input_pdf_path)
    page_count = len(reader.pages)

    chunk_paths = []
    for chunk_index, first_page in enumerate(range(0, page_count, chunk_size), start=1):
        writer = PdfWriter()
        last_page = min(first_page + chunk_size, page_count)
        for page_num in range(first_page, last_page):
            writer.add_page(reader.pages[page_num])

        out_path = os.path.join(target_dir, f"chunk_{chunk_index}.pdf")
        with open(out_path, "wb") as out_handle:
            writer.write(out_handle)
        chunk_paths.append(out_path)

    return chunk_paths
|
|
|
|
def split_text_file_by_lines(input_txt_path, lines_per_chunk=45, max_chunks=10,
                             output_dir="/content"):
    """Split a UTF-8 text file into chunk files of at most ``lines_per_chunk`` lines.

    Chunks are written to a fresh ``chunk_<random>`` folder under
    ``output_dir`` as chunk_1.txt, chunk_2.txt, ... At most ``max_chunks``
    chunks are produced (pass None for no limit); any remaining lines are
    discarded, matching the original hard-coded 10-chunk cap. Both the cap
    and the output directory were previously hard-coded and are now
    backward-compatible keyword parameters.

    Returns the list of chunk file paths in order.
    """
    chunk_folder = os.path.join(output_dir, f"chunk_{uuid.uuid4().hex[:8]}")
    os.makedirs(chunk_folder, exist_ok=True)

    with open(input_txt_path, "r", encoding="utf-8") as src:
        lines = src.readlines()

    file_paths = []
    for i in range(0, len(lines), lines_per_chunk):
        chunk_path = os.path.join(chunk_folder, f"chunk_{i // lines_per_chunk + 1}.txt")
        with open(chunk_path, "w", encoding="utf-8") as dst:
            dst.writelines(lines[i:i + lines_per_chunk])
        file_paths.append(chunk_path)
        if max_chunks is not None and len(file_paths) == max_chunks:
            break

    print(f"✅ Created {len(file_paths)} chunks in {chunk_folder}")
    return file_paths
|
|
|
|
|
|
def extract_labeled_lines(response_text):
    """Parse 'content {label}' pairs out of a block of text.

    Each line of ``response_text`` may carry one or more '... {label}'
    segments; every match becomes a [content, label] pair (both stripped).

    NOTE: shadowed by a later redefinition in this module that takes a
    list of lines instead of a single string.
    """
    labeled = re.compile(r"(.*?)(?:\s*)\{(.*?)\}")
    pairs = []
    for raw_line in response_text.strip().split("\n"):
        for content, label in labeled.findall(raw_line.strip()):
            pairs.append([content.strip(), label.strip()])
    return pairs
|
|
|
|
|
|
|
|
def remove_empty_content(df):
    """Drop rows whose 'content' is NaN or only whitespace.

    Returns the filtered DataFrame; the index is not reset.
    """
    non_null = df.dropna(subset=['content'])
    return non_null[non_null['content'].str.strip() != '']
|
|
|
|
def remove_leading_numbers(df: pd.DataFrame) -> pd.DataFrame:
    """Strip a leading '<digits>. ' prefix from every string in 'content'.

    Non-string values pass through unchanged. Works on a copy so the
    caller's DataFrame is not mutated (the original wrote back into the
    argument in place).
    """
    def _clean(text):
        if isinstance(text, str):
            return re.sub(r'^\s*\d+\.\s*', '', text)
        return text

    df = df.copy()
    df['content'] = df['content'].apply(_clean)
    return df
|
|
|
|
def remove_numeric_only_content(df: pd.DataFrame) -> pd.DataFrame:
    """Remove rows whose 'content' is nothing but digits (optionally padded
    with whitespace). Non-string values are kept. Index is reset."""
    def _keep(value):
        if isinstance(value, str) and re.fullmatch(r'\s*\d+\s*', value):
            return False
        return True

    mask = df['content'].apply(_keep)
    return df[mask].reset_index(drop=True)
|
|
|
|
def remove_emptyline_rows(df: pd.DataFrame) -> pd.DataFrame:
    """Drop rows whose 'content' consists solely of one or more
    '(empty line)' markers (case-insensitive). Index is reset."""
    marker_re = re.compile(r'(\s*\(empty line\)\s*)+', flags=re.IGNORECASE)

    def _keep(value):
        if not isinstance(value, str):
            return True
        return marker_re.fullmatch(value.strip()) is None

    return df[df['content'].apply(_keep)].reset_index(drop=True)
|
|
|
|
|
|
def merge_consecutive_action_lines(df: pd.DataFrame) -> pd.DataFrame:
    """Collapse runs of consecutive 'action' rows into single rows.

    Adjacent rows whose ``script_element`` is 'action' are joined into one
    row whose ``content`` is the space-separated concatenation; all other
    rows pass through unchanged. Returns a new DataFrame with a fresh index.
    """
    out_rows = []
    pending = None

    for _, current in df.iterrows():
        both_action = (
            pending is not None
            and current['script_element'] == 'action'
            and pending['script_element'] == 'action'
        )
        if both_action:
            pending['content'] += " " + current['content']
            continue
        if pending is not None:
            out_rows.append(pending)
        pending = current.copy()

    if pending is not None:
        out_rows.append(pending)

    return pd.DataFrame(out_rows).reset_index(drop=True)
|
|
|
|
def merge_consecutive_action_lines_new(df: pd.DataFrame) -> pd.DataFrame:
    """Merge consecutive 'action' rows, but only while the earlier row does
    not already end in sentence-final punctuation ('.', '!' or '?').

    Returns a new DataFrame with a fresh index.
    """
    collected = []
    carry = None

    for _, row in df.iterrows():
        mergeable = (
            carry is not None
            and row['script_element'] == 'action'
            and carry['script_element'] == 'action'
            and not carry['content'].strip().endswith(('.', '!', '?'))
        )
        if mergeable:
            # continuation of an unfinished action sentence
            carry['content'] += ' ' + row['content'].strip()
        else:
            if carry is not None:
                collected.append(carry)
            carry = row.copy()

    if carry is not None:
        collected.append(carry)

    return pd.DataFrame(collected).reset_index(drop=True)
|
|
|
|
|
|
def merge_consecutive_dialogue_lines(df: pd.DataFrame) -> pd.DataFrame:
    """Collapse runs of consecutive 'dialogue' rows into single rows.

    Adjacent rows whose ``script_element`` is 'dialogue' are joined into
    one row (space-separated content); everything else passes through.
    Returns a new DataFrame with a fresh index.
    """
    out_rows = []
    pending = None

    for _, current in df.iterrows():
        both_dialogue = (
            pending is not None
            and current['script_element'] == 'dialogue'
            and pending['script_element'] == 'dialogue'
        )
        if both_dialogue:
            pending['content'] += " " + current['content']
            continue
        if pending is not None:
            out_rows.append(pending)
        pending = current.copy()

    if pending is not None:
        out_rows.append(pending)

    return pd.DataFrame(out_rows).reset_index(drop=True)
|
|
|
|
def insert_blank_lines(df: pd.DataFrame) -> pd.DataFrame:
    """Insert a 'blank' spacer row after every slugline, dialogue, action,
    or transition row. Returns a new DataFrame with a fresh index."""
    spacer_targets = {"slugline", "dialogue", "action", "transition"}
    rows = []

    for _, current in df.iterrows():
        rows.append(current)
        if current['script_element'] in spacer_targets:
            rows.append(pd.Series({'content': '', 'script_element': 'blank'}))

    return pd.DataFrame(rows).reset_index(drop=True)
|
|
|
|
def add_fade_in_out(df: pd.DataFrame) -> pd.DataFrame:
    """Trim everything before the first slugline and bracket the script with
    'FADE IN' / 'FADE OUT' transition rows.

    When the frame has no slugline at all, it is returned unchanged.
    """
    start = df[df['script_element'] == 'slugline'].index.min()
    if pd.isna(start):
        return df

    body = df.loc[start:].reset_index(drop=True)
    fade_in = pd.DataFrame([{'content': 'FADE IN', 'script_element': 'transition'}])
    fade_out = pd.DataFrame([{'content': 'FADE OUT', 'script_element': 'transition'}])
    return pd.concat([fade_in, body, fade_out], ignore_index=True)
|
|
|
|
def remove_asterisks(df: pd.DataFrame) -> pd.DataFrame:
    """Strip every '*' from the 'content' column (values coerced to str).

    Works on a copy so the caller's DataFrame is not mutated (the original
    wrote back into the argument in place).
    """
    df = df.copy()
    df['content'] = df['content'].astype(str).str.replace(r'\*+', '', regex=True)
    return df
|
|
|
|
|
|
# NOTE(review): this is an exact duplicate of the
# merge_consecutive_action_lines_new defined earlier in this module; being
# defined later, this copy is the one that takes effect at import time.
# Consider deleting one of the two.
def merge_consecutive_action_lines_new(df: pd.DataFrame) -> pd.DataFrame:
    """Merge consecutive 'action' rows, but only while the earlier row's
    content does not already end with '.', '!' or '?'.

    Returns a new DataFrame with a fresh index.
    """
    merged_rows = []
    prev_row = None

    for _, row in df.iterrows():
        current_is_action = row['script_element'] == 'action'
        previous_is_action = prev_row is not None and prev_row['script_element'] == 'action'

        if (
            current_is_action and
            previous_is_action and
            not prev_row['content'].strip().endswith(('.', '!', '?'))
        ):
            # Merge into previous action
            prev_row['content'] += ' ' + row['content'].strip()
        else:
            if prev_row is not None:
                merged_rows.append(prev_row)
            prev_row = row.copy()

    if prev_row is not None:
        merged_rows.append(prev_row)

    return pd.DataFrame(merged_rows).reset_index(drop=True)
|
|
|
|
|
|
|
|
def extract_labeled_lines(response_lines: list[str]):
    """Parse 'content {label}' pairs out of a list of lines.

    Each line may carry one or more '... {label}' segments; every match
    becomes a [content, label] pair (both stripped). This redefinition
    shadows the string-input version defined earlier in the module.
    """
    labeled = re.compile(r"(.*?)(?:\s*)\{(.*?)\}")
    pairs = []
    for raw_line in response_lines:
        for content, label in labeled.findall(raw_line.strip()):
            pairs.append([content.strip(), label.strip()])
    return pairs
|
|
|
|
|
|
def remove_trailing_speaker(df: pd.DataFrame) -> pd.DataFrame:
    """Drop the final row when it is an orphaned 'speaker' (i.e. no dialogue
    follows it). Always returns a frame with a fresh index."""
    ends_with_speaker = (not df.empty) and df.iloc[-1]['script_element'] == 'speaker'
    trimmed = df.iloc[:-1] if ends_with_speaker else df
    return trimmed.reset_index(drop=True)