# Conversion_Kitchen_Code/kitchen_counter/scriptAudit/utils.py
# (498 lines, 16 KiB, Python, executable file)
from multiprocessing import Process
import os
from centralisedFileSystem.models import Script
from scriptAudit.models import ScriptAuditModel, States
from scriptAudit.mnf_script_audit import NeutralAudit
from datetime import datetime
from django.core.files.base import ContentFile
from utils.filesystem import new_screenplay, create_script_docx,get_file_path,new_screenplay_without_audit_in_background
from .mnf_script_audit import NeutralAudit
from time import sleep
import time
import pandas as pd
import re
import uuid
from django.conf import settings
from django.template.loader import render_to_string
from django.utils.html import strip_tags
from django.core.mail import EmailMultiAlternatives
from PyPDF2 import PdfReader, PdfWriter
def update_audit_status(script_id: str, status: str) -> None:
    """Persist *status* on the audit row for the script with *script_id*.

    Creates the ScriptAuditModel row if it does not exist yet.
    Raises Script.DoesNotExist when the script id is unknown.
    """
    script = Script.objects.get(id=script_id)
    ScriptAuditModel.objects.update_or_create(
        script=script,
        defaults={"status": status},
    )
def audit_in_background(script_id: str) -> None:
    """Run the neutral audit for *script_id* in a child process (non-celery path).

    Marks the audit STARTED before launching; if that status write fails,
    the status is set to FAILURE but the run is still attempted. Blocks
    until the child process completes.
    """
    try:
        update_audit_status(script_id, States.STARTED)
    except Exception:
        # Narrowed from a bare ``except:`` so SystemExit/KeyboardInterrupt
        # are not swallowed.
        update_audit_status(script_id, States.FAILURE)

    audit = NeutralAudit(script_id)
    # BUG FIX: pass the bound method itself, not its call result. The original
    # ``Process(target=audit.audit_in_background())`` invoked the audit
    # synchronously in the parent and handed its return value to Process,
    # so the child process ran nothing.
    worker = Process(target=audit.audit_in_background)
    worker.start()
    worker.join()

    # ---------------------------------------------------
    # for running audit with celery (uncomment only on AWS):
    # NeutralAuditTask().delay(script_id=script_id)
    # ---------------------------------------------------
def generate_script_id_for_pitchdeck(path, request):
    """Register the file at *path* as a new screenplay for the requesting user.

    The screenplay name is the source filename stem plus the current
    timestamp, so repeated uploads do not collide. The upload is audited via
    new_screenplay(). Returns the id of the created script record.
    """
    stem = os.path.splitext(os.path.basename(path))[0]
    screenplay_name = stem + "_" + str(datetime.now())
    extension = path.split(".")[-1]
    stored_name = screenplay_name + "." + extension
    print(stored_name)

    with open(path, "rb") as source:
        upload = ContentFile(source.read(), stored_name)

    result = new_screenplay(
        request.user,
        "mynextfilm-user",   # author
        screenplay_name,
        upload,
        "script-original",
        "en",                # language
    )
    script_id = result.get("script", {}).get("id")
    print("\n\n\n\nSCRIPT____ID :", script_id, "\n\n\n\n")
    return script_id
def check_status_and_trigger_vector(script_id, v_id):
    """Poll the audit status of *script_id* and, on SUCCESS, build the audited
    docx and hand it to vector generation under vector id *v_id*.

    Currently a stub: the polling / vector-trigger implementation is disabled
    and the function does nothing.

    Parameters:
        script_id -- audit (script) id
        v_id      -- vector id
    """
    # The original polling loop (status check -> create_script_docx ->
    # vector_gen) is disabled; reinstate when the vector pipeline is wired up.
    pass
def generate_script_id(path, request, id):
    """Create a screenplay record from the file at *path*, run the neutral
    audit on it synchronously, and return the new script id.

    The audit status row is kept current: STARTED before the run, then
    SUCCESS or FAILURE depending on the outcome.

    Parameters:
        path    -- filesystem path of the uploaded script file
        request -- Django request (only request.user is used)
        id      -- suffix appended to the screenplay name to keep it unique
    """
    stem = os.path.splitext(os.path.basename(path))[0]
    screenplay_name = stem + "_" + str(id)
    script_ext = path.split(".")[-1]
    script_file_name = screenplay_name + "." + script_ext
    print(script_file_name)

    with open(path, "rb") as source:
        file = ContentFile(source.read(), script_file_name)

    result = new_screenplay_without_audit_in_background(
        request.user,
        "mynextfilm-user",   # author
        screenplay_name,
        file,
        "script-original",
        "en",                # language
    )
    script_id = result.get("script", {}).get("id")
    print("\n\n\n\nSCRIPT____ID :", script_id, "\n\n\n\n")

    try:
        update_audit_status(script_id, States.STARTED)
    except Exception:
        # Narrowed from a bare ``except:``.
        update_audit_status(script_id, States.FAILURE)

    try:
        NeutralAudit(script_id).audit()
        # Reuse the module's helper instead of duplicating update_or_create
        # inline (consistency with update_audit_status above).
        update_audit_status(script_id, "SUCCESS")
    except Exception:
        # Narrowed from a bare ``except:``; an audit failure must not kill
        # the request — it is recorded on the status row instead.
        update_audit_status(script_id, "FAILURE")
    return script_id
"""
this below function is called when you want call audit + vector by giving the vectors
"""
def audit_vector_integration(path, v_id, request):
    """Audit the script at *path* and, on success, feed the audited docx into
    vector generation under vector id *v_id*.

    generate_script_id() runs the audit synchronously, so the status row is
    final by the time it is read here (the original unused ``audit_completed``
    flag has been removed).
    """
    print("vector_id:", v_id)
    script_id = generate_script_id(path, request, v_id)

    audit = ScriptAuditModel.objects.get(
        script=Script.objects.get(id=script_id)
    )
    if audit.status == "SUCCESS":
        try:
            a_path = get_file_path(str(script_id), "script-docx")
        except Exception:
            # Docx not generated yet — build it, then fetch the path again.
            create_script_docx(script_id)
            a_path = get_file_path(str(script_id), "script-docx")
        print("Audited script path is fetched")
        # NOTE(review): vector_gen is neither imported nor defined in the
        # visible part of this module — confirm it exists at runtime.
        vector_gen(a_path, v_id)
    elif audit.status == "FAILURE":
        print("Audit Failed")
# def audit_vector_integration_(path,id,request):
# print("vector_id:",id)
# script_id = generate_script_id(path,request,id)
# audit_completed = False
# while not audit_completed:
# scripts = Script.objects.filter(screenplay__user=request.user)
# for script in scripts:
# if ScriptAuditModel.objects.filter(script=script).exists():
# audit_status_objects = ScriptAuditModel.objects.filter(script=script)
# for audit_status_object in audit_status_objects:
# script_audit_status = audit_status_object.status
# if script_audit_status == States.SUCCESS:
# try:
# a_path = get_file_path(str(script_id), "script-docx")
# except:
# create_script_docx(script_id)
# a_path = get_file_path(str(script_id), "script-docx")
# print("Audited script path is fetched")
# vector_gen(a_path,id)
# elif script_audit_status == States.FAILURE:
# raise Exception("Script Audit procress failed")
# break
# elif script_audit_status == States.STARTED or script_audit_status == States.PENDING:
# # Wait for some time before checking the audit status again
# sleep(10)
# else:
# raise Exception("Unexpected script audit status")
# # If the loop breaks without returning, it means the audit status is FAILURE
# raise Exception("Script audit failed")
def send_email_to_user(user, screenplay_name, subject, message):
    """Render the 'audit/coree_email.html' template and email it to *user*.

    Sends a multipart message (plain-text fallback plus HTML) from
    settings.EMAIL_HOST_USER. A trailing period is appended to *subject*.
    """
    full_subject = subject + "."
    sender = settings.EMAIL_HOST_USER
    recipient = user.email

    html_body = render_to_string(
        "audit/coree_email.html",
        {
            "Name": user,
            "story_name": screenplay_name,
            "message": message,
        },
    )
    plain_body = strip_tags(html_body)

    email = EmailMultiAlternatives(full_subject, plain_body, sender, [recipient])
    email.attach_alternative(html_body, "text/html")
    email.send()
def split_pdf_into_chunks(input_pdf_path, chunk_size=3, output_root="/content"):
    """Split a PDF into sequential chunks of *chunk_size* pages each.

    Chunks are written to a uniquely named folder under *output_root*
    (default "/content", the previously hard-coded location — now a
    parameter for reuse outside that environment). Returns the list of
    chunk file paths in page order.
    """
    chunk_folder = os.path.join(output_root, f"chunk_{uuid.uuid4().hex[:8]}")
    os.makedirs(chunk_folder, exist_ok=True)

    reader = PdfReader(input_pdf_path)
    total_pages = len(reader.pages)

    file_paths = []
    for start in range(0, total_pages, chunk_size):
        writer = PdfWriter()
        for page_num in range(start, min(start + chunk_size, total_pages)):
            writer.add_page(reader.pages[page_num])
        chunk_path = os.path.join(chunk_folder, f"chunk_{start // chunk_size + 1}.pdf")
        with open(chunk_path, "wb") as f:
            writer.write(f)
        file_paths.append(chunk_path)
    return file_paths
def split_text_file_by_lines(input_txt_path, lines_per_chunk=45, max_chunks=10,
                             output_root="/content"):
    """Split a UTF-8 text file into chunk files of *lines_per_chunk* lines each.

    At most *max_chunks* chunk files are written (default 10, matching the
    original silent hard-coded cap; pass None for no cap). Chunks go into a
    uniquely named folder under *output_root* (default "/content", the
    previously hard-coded location). Returns the chunk file paths in order.
    """
    chunk_folder = os.path.join(output_root, f"chunk_{uuid.uuid4().hex[:8]}")
    os.makedirs(chunk_folder, exist_ok=True)

    with open(input_txt_path, "r", encoding="utf-8") as src:
        lines = src.readlines()

    file_paths = []
    for i in range(0, len(lines), lines_per_chunk):
        chunk_path = os.path.join(chunk_folder, f"chunk_{i // lines_per_chunk + 1}.txt")
        with open(chunk_path, "w", encoding="utf-8") as dst:
            dst.writelines(lines[i:i + lines_per_chunk])
        file_paths.append(chunk_path)
        if max_chunks is not None and len(file_paths) == max_chunks:
            break

    print(f"✅ Created {len(file_paths)} chunks in {chunk_folder}")
    return file_paths
def extract_labeled_lines(response_text):
    """Parse ``content {label}`` pairs from a newline-separated response string.

    Each line may contain several pairs; every match yields a
    ``[content, label]`` list with both parts stripped of whitespace.
    NOTE: shadowed by a later same-named definition in this module that
    accepts a list of lines instead of a single string.
    """
    labelled = re.compile(r"(.*?)(?:\s*)\{(.*?)\}")
    pairs = []
    for raw in response_text.strip().split("\n"):
        for body, tag in labelled.findall(raw.strip()):
            pairs.append([body.strip(), tag.strip()])
    return pairs
def remove_empty_content(df):
    """Drop rows whose 'content' cell is NaN or only whitespace.

    Returns the filtered DataFrame (original index retained).
    """
    non_null = df.dropna(subset=['content'])
    return non_null[non_null['content'].str.strip() != '']
def remove_leading_numbers(df: pd.DataFrame) -> pd.DataFrame:
    """Strip a leading enumeration prefix like ``"12. "`` from each string in
    the 'content' column; non-string values pass through unchanged.

    Operates on a copy so the caller's DataFrame is not mutated (the
    original version wrote through to its argument). Returns the cleaned
    DataFrame.
    """
    def _strip_prefix(text):
        if isinstance(text, str):
            return re.sub(r'^\s*\d+\.\s*', '', text)
        return text

    df = df.copy()
    df['content'] = df['content'].apply(_strip_prefix)
    return df
def remove_numeric_only_content(df: pd.DataFrame) -> pd.DataFrame:
    """Drop rows whose 'content' is nothing but an integer, optionally padded
    with whitespace (e.g. stray page numbers). Returns a re-indexed frame.
    """
    def _digits_only(value):
        return isinstance(value, str) and re.fullmatch(r'\s*\d+\s*', value) is not None

    keep = ~df['content'].apply(_digits_only)
    return df[keep].reset_index(drop=True)
def remove_emptyline_rows(df: pd.DataFrame) -> pd.DataFrame:
    """Drop rows whose 'content' consists solely of one or more
    ``(empty line)`` markers (case-insensitive). Returns a re-indexed frame.
    """
    marker = re.compile(r'(\s*\(empty line\)\s*)+', flags=re.IGNORECASE)

    def _is_marker(value):
        if not isinstance(value, str):
            return False
        return marker.fullmatch(value.strip()) is not None

    return df[~df['content'].apply(_is_marker)].reset_index(drop=True)
def merge_consecutive_action_lines(df: pd.DataFrame) -> pd.DataFrame:
    """Collapse runs of consecutive 'action' rows into a single row.

    Adjacent action rows have their contents joined with a single space;
    all other rows pass through untouched. Returns a new DataFrame with a
    fresh 0..n-1 index.
    """
    collapsed = []
    pending = None  # row being accumulated, not yet emitted
    for _, current in df.iterrows():
        both_action = (
            pending is not None
            and current['script_element'] == 'action'
            and pending['script_element'] == 'action'
        )
        if both_action:
            pending['content'] = pending['content'] + " " + current['content']
        else:
            if pending is not None:
                collapsed.append(pending)
            pending = current.copy()
    if pending is not None:
        collapsed.append(pending)
    return pd.DataFrame(collapsed).reset_index(drop=True)
def merge_consecutive_action_lines_new(df: pd.DataFrame) -> pd.DataFrame:
    """Join consecutive 'action' rows while the accumulated text looks unfinished.

    Two neighbouring action rows merge only when the earlier one does not
    already end in sentence punctuation ('.', '!' or '?'); merged text is
    joined with a single space and the later row's content is stripped.
    Returns a new DataFrame with a fresh 0..n-1 index.
    """
    result = []
    carry = None
    for _, line in df.iterrows():
        mergeable = (
            carry is not None
            and line['script_element'] == 'action'
            and carry['script_element'] == 'action'
            and not carry['content'].strip().endswith(('.', '!', '?'))
        )
        if mergeable:
            carry['content'] = carry['content'] + ' ' + line['content'].strip()
        else:
            if carry is not None:
                result.append(carry)
            carry = line.copy()
    if carry is not None:
        result.append(carry)
    return pd.DataFrame(result).reset_index(drop=True)
def merge_consecutive_dialogue_lines(df: pd.DataFrame) -> pd.DataFrame:
    """Collapse runs of consecutive 'dialogue' rows into one row each.

    Adjacent dialogue contents are joined with a single space; every other
    row is kept as-is. Returns a new DataFrame with a fresh 0..n-1 index.
    """
    out_rows = []
    acc = None
    for _, r in df.iterrows():
        if acc is not None and acc['script_element'] == 'dialogue' == r['script_element']:
            acc['content'] += " " + r['content']
            continue
        if acc is not None:
            out_rows.append(acc)
        acc = r.copy()
    if acc is not None:
        out_rows.append(acc)
    return pd.DataFrame(out_rows).reset_index(drop=True)
def insert_blank_lines(df: pd.DataFrame) -> pd.DataFrame:
    """Insert a 'blank' spacer row after each slugline/dialogue/action/transition
    row; other element types get no spacer. Returns a re-indexed frame.
    """
    spacer_after = {"slugline", "dialogue", "action", "transition"}
    blank = {'content': '', 'script_element': 'blank'}
    rows = []
    for _, r in df.iterrows():
        if r['script_element'] in spacer_after:
            rows.extend([r, pd.Series(blank)])
        else:
            rows.append(r)
    return pd.DataFrame(rows).reset_index(drop=True)
def add_fade_in_out(df: pd.DataFrame) -> pd.DataFrame:
    """Trim everything before the first slugline and bracket the script with
    FADE IN / FADE OUT transition rows.

    If the frame contains no slugline it is returned unchanged.
    """
    sluglines = df[df['script_element'] == 'slugline']
    if sluglines.empty:
        return df
    body = df.loc[sluglines.index.min():].reset_index(drop=True)
    opener = pd.DataFrame([{'content': 'FADE IN', 'script_element': 'transition'}])
    closer = pd.DataFrame([{'content': 'FADE OUT', 'script_element': 'transition'}])
    return pd.concat([opener, body, closer], ignore_index=True)
def remove_asterisks(df: pd.DataFrame) -> pd.DataFrame:
    """Strip every '*' from the 'content' column (values coerced to str first).

    Writes the cleaned column back onto *df* and returns it.
    """
    cleaned = df['content'].astype(str).str.replace(r'\*+', '', regex=True)
    df['content'] = cleaned
    return df
def merge_consecutive_action_lines_new(df: pd.DataFrame) -> pd.DataFrame:
    """Merge runs of 'action' rows that read as one unfinished sentence.

    Neighbouring action rows are merged only while the accumulated text does
    not yet end in '.', '!' or '?'; the later row's content is stripped and
    appended with a single space. Returns a re-indexed frame.

    NOTE(review): this redefines the identically-named function earlier in
    the module with identical behaviour; the later definition wins at import
    time.
    """
    rows_out = []
    open_row = None
    for _, candidate in df.iterrows():
        if open_row is None:
            open_row = candidate.copy()
            continue
        same_kind = (candidate['script_element'] == 'action'
                     and open_row['script_element'] == 'action')
        unfinished = not open_row['content'].strip().endswith(('.', '!', '?'))
        if same_kind and unfinished:
            open_row['content'] += ' ' + candidate['content'].strip()
        else:
            rows_out.append(open_row)
            open_row = candidate.copy()
    if open_row is not None:
        rows_out.append(open_row)
    return pd.DataFrame(rows_out).reset_index(drop=True)
def extract_labeled_lines(response_lines: list[str]):
    """Parse ``content {label}`` pairs from a list of response lines.

    Every match becomes a ``[content, label]`` list with both parts stripped.
    NOTE(review): redefines the earlier same-named function (which took a
    single string); this list-based version wins at import time.
    """
    labelled = re.compile(r"(.*?)(?:\s*)\{(.*?)\}")
    return [
        [body.strip(), tag.strip()]
        for raw in response_lines
        for body, tag in labelled.findall(raw.strip())
    ]
def remove_trailing_speaker(df: pd.DataFrame) -> pd.DataFrame:
    """Drop the final row when it is a dangling 'speaker' with no dialogue
    after it. Always returns a frame with a fresh 0..n-1 index.
    """
    if df.empty or df.iloc[-1]['script_element'] != 'speaker':
        return df.reset_index(drop=True)
    return df.iloc[:-1].reset_index(drop=True)