"""Audit pipeline helpers: screenplay registration, audit execution,
audit-status bookkeeping, and the audit -> vector-generation bridge."""

import os
import re
import time
import uuid
from datetime import datetime
from multiprocessing import Process
from time import sleep

import pandas as pd
from PyPDF2 import PdfReader, PdfWriter

from django.conf import settings
from django.core.files.base import ContentFile
from django.core.mail import EmailMultiAlternatives
from django.template.loader import render_to_string
from django.utils.html import strip_tags

from centralisedFileSystem.models import Script
from scriptAudit.models import ScriptAuditModel, States
from scriptAudit.mnf_script_audit import NeutralAudit
from utils.filesystem import (
    create_script_docx,
    get_file_path,
    new_screenplay,
    new_screenplay_without_audit_in_background,
)

# NOTE(review): duplicate import kept from the original file; this relative
# import re-binds the NeutralAudit name imported absolutely above.
from .mnf_script_audit import NeutralAudit


def update_audit_status(script_id: str, status: str) -> None:
    """Create or update the ScriptAuditModel row for *script_id* with *status*.

    Raises Script.DoesNotExist when no Script with that id exists.
    """
    ScriptAuditModel.objects.update_or_create(
        script=Script.objects.get(id=script_id),
        defaults={"status": status},
    )


def audit_in_background(script_id: str) -> None:
    """Run the neutral audit for *script_id* in a child process (non-celery path).

    Marks the audit STARTED (FAILURE if even that bookkeeping raises), then
    executes NeutralAudit.audit_in_background in a separate process and blocks
    until it finishes.
    """
    try:
        update_audit_status(script_id, States.STARTED)
    except Exception:
        update_audit_status(script_id, States.FAILURE)

    audit = NeutralAudit(script_id)
    # BUG FIX: the original passed ``target=audit.audit_in_background()`` —
    # the trailing () ran the audit synchronously in *this* process and gave
    # Process a None target. Pass the bound method itself instead.
    worker = Process(target=audit.audit_in_background)
    worker.start()
    worker.join()

    # Celery alternative (uncomment only on AWS):
    # NeutralAuditTask().delay(script_id=script_id)


def generate_script_id_for_pitchdeck(path, request):
    """Register the file at *path* as a new screenplay (with audit) and return its id.

    The screenplay name is the file's base name suffixed with the current
    timestamp so repeated uploads stay unique.
    """
    base_name = os.path.splitext(os.path.basename(path))[0]
    screenplay_name = base_name + "_" + str(datetime.now())
    author = "mynextfilm-user"
    language = "en"

    script_ext = path.split(".")[-1]
    script_file_name = screenplay_name + "." + script_ext
    print(script_file_name)

    with open(path, "rb") as fh:
        file = ContentFile(fh.read(), script_file_name)

    result = new_screenplay(
        request.user,
        author,
        screenplay_name,
        file,
        "script-original",
        language,
    )
    script_id = result.get("script", {}).get("id")
    print("\n\n\n\nSCRIPT____ID :", script_id, "\n\n\n\n")
    return script_id


def check_status_and_trigger_vector(script_id, v_id):
    """Poll the audit status for *script_id* and, on SUCCESS, generate the
    audited docx and hand it to vector generation under vector id *v_id*.

    Currently a no-op: the original polling implementation was commented out.
    """
    pass


def generate_script_id(path, request, id):
    """Register *path* as a screenplay, run the audit synchronously and return
    the new script id.

    The audit outcome is recorded as SUCCESS/FAILURE on the script's
    ScriptAuditModel row. *id* suffixes the screenplay name (parameter name
    kept for caller compatibility even though it shadows the builtin).
    """
    base_name = os.path.splitext(os.path.basename(path))[0]
    screenplay_name = base_name + "_" + str(id)
    author = "mynextfilm-user"
    language = "en"

    script_ext = path.split(".")[-1]
    script_file_name = screenplay_name + "." + script_ext
    print(script_file_name)

    with open(path, "rb") as fh:
        file = ContentFile(fh.read(), script_file_name)

    result = new_screenplay_without_audit_in_background(
        request.user,
        author,
        screenplay_name,
        file,
        "script-original",
        language,
    )
    script_id = result.get("script", {}).get("id")
    print("\n\n\n\nSCRIPT____ID :", script_id, "\n\n\n\n")

    try:
        update_audit_status(script_id, States.STARTED)
    except Exception:
        update_audit_status(script_id, States.FAILURE)

    try:
        NeutralAudit(script_id).audit()
        update_audit_status(script_id, "SUCCESS")
    except Exception:
        update_audit_status(script_id, "FAILURE")
    return script_id


def audit_vector_integration(path, v_id, request):
    """Audit the script at *path* and, if the audit succeeds, trigger vector
    generation for it under vector id *v_id*."""
    print("vector_id:", v_id)
    script_id = generate_script_id(path, request, v_id)
    status = ScriptAuditModel.objects.get(
        script=Script.objects.get(id=script_id)
    )
    if status.status == "SUCCESS":
        try:
            a_path = get_file_path(str(script_id), "script-docx")
        except Exception:
            # The audited docx does not exist yet: build it, then re-fetch.
            create_script_docx(script_id)
            a_path = get_file_path(str(script_id), "script-docx")
        print("Audited script path is fetched")
        # NOTE(review): vector_gen is not defined or imported anywhere in this
        # file — confirm it is provided elsewhere before relying on this path.
        vector_gen(a_path, v_id)
    elif status.status == "FAILURE":
        print("Audit Failed")
def send_email_to_user(user, screenplay_name, subject, message):
    """Email *user* the 'coree_email' template rendered with the screenplay
    name and *message*. A trailing period is appended to *subject*.

    Sends a multipart (plain-text + HTML) message via the configured
    EMAIL_HOST_USER sender.
    """
    subject = subject + "."
    from_email = settings.EMAIL_HOST_USER
    to = user.email
    context = {
        "Name": user,
        "story_name": screenplay_name,
        "message": message,
    }
    html_content = render_to_string("audit/coree_email.html", context)
    text_content = strip_tags(html_content)
    msg = EmailMultiAlternatives(subject, text_content, from_email, [to])
    msg.attach_alternative(html_content, "text/html")
    msg.send()


def split_pdf_into_chunks(input_pdf_path, chunk_size=3, output_root="/content"):
    """Split a PDF into consecutive *chunk_size*-page PDF files.

    Chunks are written to a uniquely named folder under *output_root*
    (the default preserves the original hard-coded "/content"). Returns the
    list of chunk file paths in page order.
    """
    chunk_folder = os.path.join(output_root, f"chunk_{uuid.uuid4().hex[:8]}")
    os.makedirs(chunk_folder, exist_ok=True)

    reader = PdfReader(input_pdf_path)
    total_pages = len(reader.pages)

    file_paths = []
    for start in range(0, total_pages, chunk_size):
        writer = PdfWriter()
        for page_num in range(start, min(start + chunk_size, total_pages)):
            writer.add_page(reader.pages[page_num])
        chunk_path = os.path.join(chunk_folder, f"chunk_{start // chunk_size + 1}.pdf")
        with open(chunk_path, "wb") as f:
            writer.write(f)
        file_paths.append(chunk_path)
    return file_paths


def split_text_file_by_lines(input_txt_path, lines_per_chunk=45, max_chunks=10):
    """Split a UTF-8 text file into chunks of *lines_per_chunk* lines each.

    At most *max_chunks* chunk files are written (the original hard-coded a
    cap of 10) to a uniquely named folder under /content. Returns the list
    of chunk file paths.
    """
    chunk_folder = os.path.join("/content", f"chunk_{uuid.uuid4().hex[:8]}")
    os.makedirs(chunk_folder, exist_ok=True)

    with open(input_txt_path, "r", encoding="utf-8") as f:
        lines = f.readlines()

    file_paths = []
    for i in range(0, len(lines), lines_per_chunk):
        chunk_lines = lines[i:i + lines_per_chunk]
        chunk_path = os.path.join(chunk_folder, f"chunk_{i // lines_per_chunk + 1}.txt")
        with open(chunk_path, "w", encoding="utf-8") as f:
            f.writelines(chunk_lines)
        file_paths.append(chunk_path)
        if len(file_paths) == max_chunks:
            break

    print(f"✅ Created {len(file_paths)} chunks in {chunk_folder}")
    return file_paths


# Matches "some content {label}" pairs; compiled once instead of per call.
_LABELED_LINE_RE = re.compile(r"(.*?)(?:\s*)\{(.*?)\}")


def extract_labeled_lines(response_lines):
    """Parse lines of the form ``content {label}`` into [content, label] pairs.

    Accepts either an iterable of lines or a single newline-separated string.
    (The original file defined this function twice — a str variant and a
    list variant, the latter silently shadowing the former; this merged
    version supports both inputs.)
    """
    if isinstance(response_lines, str):
        response_lines = response_lines.strip().split("\n")
    rows = []
    for line in response_lines:
        for content, label in _LABELED_LINE_RE.findall(line.strip()):
            rows.append([content.strip(), label.strip()])
    return rows


def remove_empty_content(df):
    """Drop rows whose 'content' is NaN or whitespace-only."""
    df = df.dropna(subset=['content'])
    df = df[df['content'].str.strip() != '']
    return df


def remove_leading_numbers(df: pd.DataFrame) -> pd.DataFrame:
    """Strip leading "N. " numbering from each 'content' cell.

    Note: reassigns the 'content' column on the passed DataFrame (in place).
    """
    def clean_content(text):
        if isinstance(text, str):
            return re.sub(r'^\s*\d+\.\s*', '', text)
        return text

    df['content'] = df['content'].apply(clean_content)
    return df


def remove_numeric_only_content(df: pd.DataFrame) -> pd.DataFrame:
    """Drop rows whose 'content' is only digits (e.g. stray page numbers)."""
    def is_numeric_only(text):
        if isinstance(text, str):
            return re.fullmatch(r'\s*\d+\s*', text) is not None
        return False

    return df[~df['content'].apply(is_numeric_only)].reset_index(drop=True)


def remove_emptyline_rows(df: pd.DataFrame) -> pd.DataFrame:
    """Drop rows whose 'content' is one or more "(empty line)" markers."""
    def is_only_empty_line_repeats(text):
        if not isinstance(text, str):
            return False
        return re.fullmatch(
            r'(\s*\(empty line\)\s*)+', text.strip(), flags=re.IGNORECASE
        ) is not None

    return df[~df['content'].apply(is_only_empty_line_repeats)].reset_index(drop=True)


def _merge_consecutive(df: pd.DataFrame, element: str) -> pd.DataFrame:
    """Merge runs of consecutive *element* rows, joining contents with a space.

    Shared implementation for the action/dialogue mergers below (the original
    duplicated this loop verbatim in both).
    """
    merged_rows = []
    prev_row = None
    for _, row in df.iterrows():
        if (
            prev_row is not None
            and row['script_element'] == element
            and prev_row['script_element'] == element
        ):
            # prev_row is a copy, so the caller's DataFrame is not mutated.
            prev_row['content'] += " " + row['content']
        else:
            if prev_row is not None:
                merged_rows.append(prev_row)
            prev_row = row.copy()
    if prev_row is not None:
        merged_rows.append(prev_row)
    return pd.DataFrame(merged_rows).reset_index(drop=True)


def merge_consecutive_action_lines(df: pd.DataFrame) -> pd.DataFrame:
    """Merge consecutive 'action' rows into single rows."""
    return _merge_consecutive(df, 'action')


def merge_consecutive_dialogue_lines(df: pd.DataFrame) -> pd.DataFrame:
    """Merge consecutive 'dialogue' rows into single rows."""
    return _merge_consecutive(df, 'dialogue')


def merge_consecutive_action_lines_new(df: pd.DataFrame) -> pd.DataFrame:
    """Merge a consecutive 'action' row into the previous one only when the
    previous action does not already end a sentence ('.', '!' or '?').

    (The original file contained two byte-identical definitions of this
    function; the dead duplicate was removed.)
    """
    merged_rows = []
    prev_row = None
    for _, row in df.iterrows():
        current_is_action = row['script_element'] == 'action'
        previous_is_action = (
            prev_row is not None and prev_row['script_element'] == 'action'
        )
        if (
            current_is_action
            and previous_is_action
            and not prev_row['content'].strip().endswith(('.', '!', '?'))
        ):
            # Continue the unfinished sentence in the previous action row.
            prev_row['content'] += ' ' + row['content'].strip()
        else:
            if prev_row is not None:
                merged_rows.append(prev_row)
            prev_row = row.copy()
    if prev_row is not None:
        merged_rows.append(prev_row)
    return pd.DataFrame(merged_rows).reset_index(drop=True)


def insert_blank_lines(df: pd.DataFrame) -> pd.DataFrame:
    """Insert a blank row after every slugline/dialogue/action/transition row."""
    insert_after = {"slugline", "dialogue", "action", "transition"}
    new_rows = []
    for _, row in df.iterrows():
        new_rows.append(row)
        if row['script_element'] in insert_after:
            new_rows.append(pd.Series({'content': '', 'script_element': 'blank'}))
    return pd.DataFrame(new_rows).reset_index(drop=True)


def add_fade_in_out(df: pd.DataFrame) -> pd.DataFrame:
    """Trim everything before the first slugline and wrap the script in
    FADE IN / FADE OUT transition rows; returns *df* unchanged when no
    slugline exists."""
    first_slugline_idx = df[df['script_element'] == 'slugline'].index.min()
    if pd.isna(first_slugline_idx):
        return df
    df_trimmed = df.loc[first_slugline_idx:].reset_index(drop=True)
    fade_in_row = pd.DataFrame([{'content': 'FADE IN', 'script_element': 'transition'}])
    df_trimmed = pd.concat([fade_in_row, df_trimmed], ignore_index=True)
    fade_out_row = pd.DataFrame([{'content': 'FADE OUT', 'script_element': 'transition'}])
    df_trimmed = pd.concat([df_trimmed, fade_out_row], ignore_index=True)
    return df_trimmed


def remove_asterisks(df: pd.DataFrame) -> pd.DataFrame:
    """Strip all '*' runs from 'content' (reassigns the column in place)."""
    df['content'] = df['content'].astype(str).str.replace(r'\*+', '', regex=True)
    return df


def remove_trailing_speaker(df: pd.DataFrame) -> pd.DataFrame:
    """Drop the final row when it is a dangling 'speaker' with no dialogue."""
    if not df.empty and df.iloc[-1]['script_element'] == 'speaker':
        return df.iloc[:-1].reset_index(drop=True)
    return df.reset_index(drop=True)