Conversion_Kitchen_Code/kitchen_counter/conversion/translation/detection.py

591 lines
23 KiB
Python
Raw Normal View History

2024-04-27 09:33:09 +00:00
from google.cloud import translate_v2 as Translate
from google.cloud import translate
from MNF.settings import BasePath
from requests.exceptions import SSLError
# from .script_writing import default_script
from .translation_variables import code_script
from bs4 import BeautifulSoup
from .script_detector import script_cat
from statistics import mode
from collections import Counter
# import textract
from tqdm import tqdm
import math
import sys
import re
import os
from .script_reading import getRefined, getSlugAndNonSlug, getSpeakers, getScenes
import requests
import uuid
import json
import boto3
from collections import Counter
import pandas as pd
import nltk
from nltk.corpus import stopwords
# nltk.data.clear_cache()
# nltk.download('stopwords')
# nltk.download('punkt')
import re
# Lower-case English stop words; the word-level checks further down skip any
# token whose lower-cased form is in this set before calling the language API.
# (Presumably mirrors NLTK's English stop-word list, given the commented-out
# nltk downloads above -- TODO confirm.)
stop_words = set(['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', "you're", "you've", "you'll", "you'd", 'your', 'yours', 'yourself', 'yourselves', 'he', 'him', 'his', 'himself', 'she', "she's", 'her', 'hers', 'herself', 'it', "it's", 'its', 'itself', 'they', 'them', 'their', 'theirs', 'themselves', 'what', 'which', 'who', 'whom', 'this', 'that', "that'll", 'these', 'those', 'am', 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'having', 'do', 'does', 'did', 'doing', 'a', 'an', 'the', 'and', 'but', 'if', 'or', 'because', 'as', 'until', 'while', 'of', 'at', 'by', 'for', 'with', 'about', 'against', 'between', 'into', 'through', 'during', 'before', 'after', 'above', 'below', 'to', 'from', 'up', 'down', 'in', 'out', 'on', 'off', 'over', 'under', 'again', 'further', 'then', 'once', 'here', 'there', 'when', 'where', 'why', 'how', 'all', 'any', 'both', 'each', 'few', 'more', 'most', 'other', 'some', 'such', 'no', 'nor', 'not', 'only', 'own', 'same', 'so', 'than', 'too', 'very', 's', 't', 'can', 'will', 'just', 'don', "don't", 'should', "should've", 'now', 'd', 'll', 'm', 'o', 're', 've', 'y', 'ain', 'aren', "aren't", 'couldn', "couldn't", 'didn', "didn't", 'doesn', "doesn't", 'hadn', "hadn't", 'hasn', "hasn't", 'haven', "haven't", 'isn', "isn't", 'ma', 'mightn', "mightn't", 'mustn', "mustn't", 'needn', "needn't", 'shan', "shan't", 'shouldn', "shouldn't", 'wasn', "wasn't", 'weren', "weren't", 'won', "won't", 'wouldn', "wouldn't"])
# Project base directory; used below to locate the JSON key files.
basePath = BasePath()
# -> Google Translation API Credentials
# The environment variable is set before the clients are constructed on the
# next two lines. All of this runs at import time.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = rf"{basePath}/MNF/json_keys/authentication.json"
# v2 client: used by language_detector() for detect_language().
translate_client = Translate.Client()
# Second client object; not referenced anywhere else in the code visible here.
client = translate.TranslationServiceClient()
# -> For Detecting language of any text
def language_detector(text, max_ssl_retries=5):
    """Detect the language of *text* with the Google Translate v2 client.

    Args:
        text: The text whose language should be detected.
        max_ssl_retries: Number of extra attempts made when the request
            fails with ``requests.exceptions.SSLError``. The previous
            implementation retried by calling itself recursively with no
            bound; the retry is now a bounded loop.

    Returns:
        The language code reported by ``detect_language``. Codes longer than
        three characters are cut at the first ``-`` so only the leading part
        is returned. If detection fails for any reason (including running out
        of SSL retries) *text* itself is returned unchanged, exactly as the
        original generic ``except`` branch did; callers comparing the result
        against a language code then simply see no match.
    """
    # NOTE(review): the Azure and AWS Comprehend fallback detectors that used
    # to follow the primary detector were unreachable (every path of the
    # primary try/except returns) and embedded a hard-coded Azure
    # subscription key. They have been removed; that key has been exposed in
    # source control and should be rotated.
    attempts_left = max_ssl_retries
    while True:
        try:
            result = translate_client.detect_language(text)
            print("length re:", len(result['language']), text, "sentence recieved")
            language = result['language']
            if len(language) > 3:
                # Keep only the part before the first hyphen.
                return str(language).split("-")[0]
            return language
        except SSLError:
            # Transient SSL failure: try again, but give up eventually.
            if attempts_left <= 0:
                return text
            attempts_left -= 1
        except Exception:
            # Best-effort contract kept from the original: hand the input back.
            return text
# -> For Detecting Script of any text
def script_det(text):
    """Return the script of the first non-punctuation character of *text*.

    The first character of *text* that is not one of the listed punctuation
    marks is passed to ``script_cat`` and the first element of its result is
    returned. If *text* is empty or contains only those punctuation marks,
    ``script_cat`` is called with an empty string, as before.
    """
    # Raw string: the original literal contained the invalid escape "\,",
    # which newer Python versions warn about. The character set is unchanged
    # (it still contains both the backslash and the comma).
    punctuations = r'''!()-[]{};:'"\,<>./?@#$%^&*_~'''
    first_char = next((char for char in text if char not in punctuations), "")
    return script_cat(first_char)[0]
# -> For Detecting presence of different languages in dialogues (whole sentences)
def A_B_C(dialogue_language, non_dial_src_lang):
    """Summarise the languages detected for whole dialogues.

    Args:
        dialogue_language: Sequence with one detected language per dialogue.
        non_dial_src_lang: Language of the action (non-dialogue) lines.

    Returns:
        A tuple ``(A, B, C)`` where

        * ``A`` is the language with the highest number of full dialogues
          (ties go to the language that appears first in the input),
        * ``B`` is the number of dialogues in the action-line language, or 0
          when that language is ``A`` itself or does not occur at all,
        * ``C`` is the number of dialogues in all remaining languages, i.e.
          everything that is neither ``A`` nor counted in ``B``.

    Raises:
        IndexError: if *dialogue_language* is empty.
    """
    print("line 316:dialogue_language", dialogue_language)
    counts = Counter(dialogue_language)
    print("line 319:dict1", dict(counts))
    # most_common() sorts by count (descending) and keeps first-seen order
    # among equal counts -- the same ordering the former nested loop produced.
    ranked = counts.most_common()
    sorted_values = [count for _, count in ranked]
    print("line 321:sorted_values:", sorted_values)
    sources = [lang for lang, _ in ranked]
    print("line 328: sources: ", sources)
    A = sources[0]
    print("Most Prominent Dialogue Language", A)
    if non_dial_src_lang == A or non_dial_src_lang not in sources:
        B = 0
    else:
        B = counts[non_dial_src_lang]
    # Everything except the top language, minus what was attributed to B.
    C = sum(sorted_values[1:]) - B
    return A, B, C
# -> Detects whether a dialogue contains a word in the action-line language
def dial_each_word_lang1(non_dial_src_lang, dial):
    """Report whether any non-stop-word of *dial* is in *non_dial_src_lang*.

    Each whitespace-separated word whose lower-cased form is not in
    ``stop_words`` is sent to ``language_detector``; checking stops at the
    first word whose detected language equals *non_dial_src_lang*.

    Returns:
        The string ``"True"`` if such a word was found, otherwise ``"False"``.
    """
    candidates = (token for token in dial.split()
                  if token.lower() not in stop_words)
    matched = any(language_detector(token) == non_dial_src_lang
                  for token in candidates)
    return "True" if matched else "False"
# -> Detects whether a dialogue contains a word in some third language
def dial_each_word_lang2(non_dial_src_lang, A, dial):
    """Report whether *dial* has a non-stop-word in neither known language.

    Args:
        non_dial_src_lang: Language of the action (non-dialogue) lines.
        A: The dominant dialogue language.
        dial: The dialogue text to inspect.

    Returns:
        The string ``"True"`` as soon as one word (ignoring ``stop_words``)
        is detected as a language other than both *non_dial_src_lang* and
        *A*; otherwise ``"False"``.
    """
    # The original condition was ``lang != non_dial_src_lang or lang != A``,
    # which is true for every word whenever the two languages differ. A word
    # counts as "other language" only if it matches neither of them. The
    # detector is also called once per word now instead of twice.
    known_langs = (non_dial_src_lang, A)
    for word in dial.split():
        if word.lower() in stop_words:
            continue
        if language_detector(word) not in known_langs:
            return "True"
    return "False"
# -> Detection of words in lines with different languages
# Scans the dialogue rows of `lines` and reports, as the strings "True"/"False",
# (1) whether dialogues written in language A contain words in the action-line
#     language (non_dial_src_lang), and
# (2) whether they contain words in some other language.
# Each flag flips to "True" once its counter reaches 3 matching dialogues.
# Returns the tuple (actionline_lang_output, other_lang_output).
# NOTE(review): indentation is not visible in this view of the file, so the
# nesting described in the comments below is inferred from the statements
# themselves -- confirm against the properly indented source.
def word_with_actionline_other_lang(lines, A, non_dial_src_lang):
# Number of dialogues (so far) with a word in the action-line language.
dials_with_actionline_langs = 0
# Number of dialogues (so far) flagged by dial_each_word_lang2.
dials_with_other_langs = 0
# NOTE(review): `lineno` is never read in the code visible here.
lineno = 0
actionline_lang_output = "False"
other_lang_output = "False"
# When the dominant dialogue language is the action-line language, the
# action-line word check is skipped for every dialogue.
ignore_actionline_match = False
if A == non_dial_src_lang:
ignore_actionline_match = True
for i, line in enumerate(lines):
# The first row is always skipped.
if i == 0:
continue
# Row layout as used here: line[2] is the text that is language-detected and
# line[3] is compared against "dialogue" (presumably a line-type column --
# verify against the audited CSV schema).
if line[3] == "dialogue":
# [speaker] = line.keys()
# if speaker == 'Transition':
# continue
dial_src_lang = language_detector(line[2])
if actionline_lang_output == "False" or other_lang_output == "False":
print(
"Still Searching if Words of other langs are present or not...")
# Word-level checks are applied to dialogues detected as language A.
if dial_src_lang == A:
if actionline_lang_output != "True" and not ignore_actionline_match:
output = dial_each_word_lang1(
non_dial_src_lang, line[2])
if output == "True":
dials_with_actionline_langs += 1
if dials_with_actionline_langs >= 3:
actionline_lang_output = "True"
if other_lang_output != "True":
output = dial_each_word_lang2(
non_dial_src_lang, A, line[2])
if output == "True":
dials_with_other_langs += 1
if dials_with_other_langs >= 3:
other_lang_output = "True"
# NOTE(review): presumably this `else` pairs with the "still searching" test
# above, i.e. it returns early once both flags are "True" -- confirm.
else:
print("Found Presence of other Langs in Words")
return actionline_lang_output, other_lang_output
return actionline_lang_output, other_lang_output
def convert_to_pdf(input_docx, out_folder):
    """Convert *input_docx* to PDF with headless LibreOffice.

    The ``libreoffice`` executable is started with ``--convert-to pdf`` and
    ``--outdir out_folder``; the function blocks until the process exits.
    The exit status is not inspected and nothing is returned.
    """
    import subprocess

    conversion_args = ["--convert-to", "pdf"]
    command = ["libreoffice", "--headless"]
    command += conversion_args
    command += ["--outdir", out_folder, input_docx]
    process = subprocess.Popen(command)
    print(["--convert-to", "pdf", input_docx])
    # No pipes are attached, so this only waits for the process to finish.
    process.communicate()
# -> Detection of Different Languages and Scripts in Script
# Overview of getInputs(request, filename1, id=None), as far as the visible
# statements show:
#   1. Determine the file extension and a page count (numPages).
#   2. Register the file (or, for non-fdx input, a text sample of its first
#      ~60 cleaned lines) as a new screenplay and run NeutralAudit on it.
#   3. Read the audited "script-csv" file into a list of rows.
#   4. Run detecting_languages over the rows in a 4-process pool and derive
#      the action-line language/script and the dialogue language/script.
#   5. Derive the "Yes"/"No" UI options 3-6.
# Returns a 9-item list:
#   [non_dial_src_lang, dial_src_lang, dial_src_script, non_dial_src_script,
#    UI_option3, UI_option4, UI_option5, UI_option6, numPages]
# or None when the audit raises (a failure e-mail with code 'SB2' is sent).
# NOTE(review): indentation is not visible in this view of the file; comments
# about which branch a statement belongs to are inferred -- confirm against
# the properly indented source.
def getInputs(request, filename1, id=None):
from centralisedFileSystem.models import Script
from scriptAudit.models import ScriptAuditModel
from utils import filesystem
from scriptAudit.sa_functions import conv_to_txt, convert_txt_to_docx, fdx_to_audited_df
from django.core.files.base import ContentFile
from auto_email.views import sendmail
import time
from scriptAudit.mnf_script_audit import NeutralAudit
from scriptAudit.models import States
import PyPDF2
from utils import utilities
from django.contrib.auth.models import User
# stdout is saved here and re-assigned further down; the redirect to a log
# file that this was paired with is commented out.
original_stdout = sys.stdout
# f = open(f"{basePath}/log/debu77.log", "w")
# sys.stdout = original_stdout
print("Detecting Languages and Scripts present in Script")
# is_fdx = False
# if ((((filename1).split("/"))[-1]).split("."))[-1] == "fdx":
# is_fdx = True
#
# if is_fdx:
# filer = open(filename1, 'r')
# text = fdx_to_txt(filer)
# filename = rf"{basePath}/conversion/translation/file_lines.txt"
# f = open(filename, 'w')
# f.write(text)
# f.close()
# else:
# text = textract.process(filename1, encoding="utf8", errors='ignore')
# filename = rf"{basePath}/conversion/translation/file_lines.txt"
# f = open(filename, 'wb')
# f.write(text)
# f.close()
"""
new code for getting text from any type of file
"""
# Extension = text after the last "." of the last path component.
ext = (((filename1.split("/"))[-1]).split("."))[-1]
if ext == "fdx":
print("fdx part")
# The whole .fdx file is wrapped as the upload handed to the audit.
# NOTE(review): the file object opened on the next statement is never closed.
file = ContentFile(
open(filename1, "rb").read(),
(filename1.split("/"))[-1],
)
print(file,"<-this is the file")
with open(filename1, 'r') as file111:
xml_data = file111.read()
soup = BeautifulSoup(xml_data, "xml")
paragraphs = soup.find_all("Paragraph")
# numPages ends up as the 'Page' attribute of the last Paragraph that has a
# SceneProperties element. It is taken from attrs as-is (not converted), and
# it is not assigned at all if no such element exists.
for para in paragraphs:
elem = para.find("SceneProperties")
if elem is not None:
numPages = elem.attrs['Page']
# df = fdx_to_audited_df(filename1)
# pd.set_option('display.max_rows', None)
# pd.set_option('display.max_columns', None)
# print("step 6")
# print("step 7")
# dataframe = df
# list_of_lists = dataframe.values.tolist()
#
# last_line_not_upper = 60
# for idx in range(60, 50, -1):
# if (str(list_of_lists[idx]['content']).strip() == "" or
# (str(list_of_lists[idx]['content']).isupper() and len(list_of_lists[idx]['content']) > 0) or
# ")" in str(list_of_lists[idx]['content']).strip() or
# "(" in str(list_of_lists[idx]['content']).strip()):
# pass
# else:
# last_line_not_upper = idx
# break
# list_of_lists = list_of_lists[:last_line_not_upper + 1]
else:
# Non-fdx input: convert to text, collapse runs of blank lines, then build a
# sample of the first ~60 lines for the audit.
# `filee` is the path with its last extension removed (note: any other "."
# in the path is dropped too, because the pieces are joined with "").
filee = "".join(((filename1.split("."))[:-1]))
conv_to_txt(filename1, rf"{filee}_file_lines.docx",
rf"{filee}_file_lines.txt")
with open(rf"{filee}_file_lines.txt", 'r') as file:
lines = file.readlines()
cleaned_lines = []
prev_line_blank = False
# Keep only the first line of every run of blank lines.
for line in lines:
if line.strip() == '' or line.strip() == "\\n":
if not prev_line_blank:
cleaned_lines.append(line)
prev_line_blank = True
else:
cleaned_lines.append(line)
prev_line_blank = False
with open(rf"{filee}_cleaned_file.txt", 'w') as file:
file.writelines(cleaned_lines)
with open(rf"{filee}_cleaned_file.txt") as file:
cleaned_lines_data = file.readlines()
total_lines = len(cleaned_lines_data)
print("name of file", filename1)
# Page count: estimated as 50 cleaned lines per page for docx (minimum 1),
# read from the PDF itself for pdf.
# NOTE(review): no branch below assigns numPages for other extensions, yet it
# is part of the return value -- confirm which inputs can reach this point.
if ext == "docx":
numPages = math.ceil(total_lines / 50) if math.ceil(total_lines / 50) > 0 else 1
elif ext == "pdf":
# NOTE(review): this file object is not closed in the code visible here.
file = open(filename1, 'rb')
pdfReader = PyPDF2.PdfReader(file)
numPages = len(pdfReader.pages)
# elif ext == "fdx":
# with open(filename1, 'r') as file:
# xml_data = file.read()
# soup = BeautifulSoup(xml_data, "xml")
# paragraphs = soup.find_all("Paragraph")
# for para in paragraphs:
# elem = para.find("SceneProperties")
# if elem is not None:
# numPages = elem.attrs['Page']
# fdx_to_docx = "".join(((filename1.split("."))[:-1])) + ".docx"
# convert_txt_to_docx(rf"{filee}_file_lines.txt", fdx_to_docx)
# convert_to_pdf(fdx_to_docx, rf"{basePath}/media/scripts/")
# time.sleep(5)
# os.chmod("".join(((filename1.split("."))[:-1])) + ".pdf", 0o777)
# file = open("".join(((filename1.split("."))[:-1])) + ".pdf", 'rb')
#
# pdfReader = PyPDF2.PdfReader(file)
# numPages = len(pdfReader.pages)
# numPages = math.ceil(total_lines / 50) if math.ceil(total_lines / 50) > 0 else 1
# divison_factor = 50 if ext == "docx" or ext == "pdf" else 18
# numPages = math.ceil(total_lines / divison_factor) if math.ceil(total_lines / divison_factor) > 0 else 1
# Build the audit sample: the whole text if it has fewer than 60 lines,
# otherwise lines 0..k where k is found by walking back from index 60 to 51
# and stopping at the first line that is not blank, not upper-case and has no
# parenthesis (k stays 60 if none qualifies).
# NOTE(review): with exactly 60 lines, index 60 is out of range.
initial_lines = 60
if total_lines < initial_lines:
txt_file_200 = "".join((cleaned_lines_data)[:total_lines])
else:
last_line_not_upper = 60
for idx in range(60, 50, -1):
if (str(cleaned_lines_data[idx]).strip() == "" or
(str(cleaned_lines_data[idx]).isupper() and len(cleaned_lines_data[idx]) > 0) or
")" in str(cleaned_lines_data[idx]).strip() or
"(" in str(cleaned_lines_data[idx]).strip()):
pass
else:
last_line_not_upper = idx
break
print(cleaned_lines_data)
txt_file_200 = "".join((cleaned_lines_data)[0:last_line_not_upper+1])
# The sample is written to disk and read back as bytes for the upload.
language_selection_audit_inp = rf"{filee}_file_lines_2003.txt"
f = open(language_selection_audit_inp, 'w')
f.write(txt_file_200)
f.close()
# NOTE(review): the file object opened on the next statement is never closed.
audit_inp = open(language_selection_audit_inp, 'rb').read()
file = ContentFile(
audit_inp,
(language_selection_audit_inp.split("/"))[-1],
)
# The screenplay is always registered with language code "en" at this stage.
language_code = "en"
# Script name = file name up to its first "." plus "_language-audit".
name_script = str((((filename1.split("/"))[-1]).split("."))[0]) + "_language-audit"
# The screenplay is created under the user with id 1, while the requesting
# user's username is passed separately.
user = User.objects.get(id=1)
result = filesystem.new_screenplay_without_audit_in_background(
user,
request.user.username,
str(name_script),
file,
"script-original",
language_code,
)
print("already called", result)
# NOTE(review): audit_id is None if result has no script id; the lookups
# below use it unchecked.
audit_id = result.get("script", {}).get("id")
ScriptAuditModel.objects.update_or_create(
script=Script.objects.get(
id=audit_id
),
defaults={"status": States.STARTED}
)
audit = NeutralAudit(audit_id)
status = ScriptAuditModel.objects.get(
script=Script.objects.get(
id=audit_id
)
)
print("audit will start")
# Run the audit here (not in the background) and record the outcome on the
# status row. Any exception marks the audit as failed, e-mails the requesting
# user and ends the function with None.
try:
if ext == "fdx":
audit.audit_fdx()
else:
audit.audit()
status.status = "SUCCESS"
status.save()
except Exception as e:
print("Error of Audit is:", e)
status.status = "FAILURE"
status.save()
to_email = [request.user.email]
email_code = 'SB2'
sendmail(to_email=to_email, email_code=email_code)
return None
# Re-assigns the stdout object saved at the top (no redirect is active in the
# code visible here).
sys.stdout = original_stdout
# result = filesystem.new_screenplay(
# user,
# request.user.username,
# name_script,
# file,
# "script-original",
# language_code,
# )
# audit_id = result.get("script", {}).get("id")
# while True:
# try:
# status = ScriptAuditModel.objects.get(
# script=Script.objects.get(id=audit_id)).status
# if status == "SUCCESS":
# break
# elif status == "FAILURE":
# to_email = [request.user.email]
# email_code = 'SB2'
# sendmail(to_email=to_email, email_code=email_code)
# return None
# elif status == "STARTED" or status == "PENDING":
# pass
# except:
# pass
# time.sleep(1)
print("step 3")
print("audit id -> ", audit_id)
file_path_ = filesystem.get_file_path(
audit_id, "script-csv")
print("step 4")
# The audited CSV is read as UTF-8, falling back to UTF-16 on a decode error.
try:
print("step 5")
df = pd.read_csv(file_path_, encoding="utf-8")
except UnicodeError:
df = pd.read_csv(file_path_, encoding="utf-16")
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
print("step 6")
# When a MNFScriptDatabase id is supplied, remember which audit belongs to it.
if id is not None:
from mnfapp.models import MNFScriptDatabase
script_get = MNFScriptDatabase.objects.get(id=id)
script_get.language_audit_id = audit_id
script_get.save()
print("step 7")
dataframe = df
# One list per CSV row; later code reads index 2 as the line text.
list_of_lists = dataframe.values.tolist()
print("step 8")
# sys.stdout = original_stdout
# Print the resulting list
print("Audited Df ->", list_of_lists)
# For fdx the whole file was audited, so the rows are cut down here with the
# same "walk back from index 60" rule used for the text sample above.
if ext == "fdx":
total_lines = len(list_of_lists)
initial_lines = 60
if total_lines < initial_lines:
last_line_not_upper = total_lines
else:
last_line_not_upper = 60
for idx in range(60, 50, -1):
if (str(list_of_lists[idx][2]).strip() == "" or
(str(list_of_lists[idx][2]).isupper() and len(list_of_lists[idx][2]) > 0) or
")" in str(list_of_lists[idx][2]).strip() or
"(" in str(list_of_lists[idx][2]).strip()):
pass
else:
last_line_not_upper = idx
break
list_of_lists = list_of_lists[:last_line_not_upper+1]
from .multiprocessing_funcs import detecting_languages
import multiprocessing
# Each row is processed by detecting_languages in a pool of 4 worker
# processes. The result items are indexed 0..3 below: action-line language,
# action-line script, dialogue language, dialogue script (falsy entries are
# filtered out).
num_processes = 4
with multiprocessing.Pool(processes=num_processes) as pool:
results = pool.map(detecting_languages, list_of_lists)
print("ALL Results -> ", results)
# for result in results:
# if result[0] != "" and result[1] != "":
# non_dial_src_lang = result[0]
# non_dial_src_script = result[1]
# break
# print("step 4", non_dial_src_lang, non_dial_src_script)
languages_of_actionlines = [result[0] for result in results if result[0]]
script_of_actionlines = [result[1] for result in results if result[1]]
# Most frequent action-line language and script.
# NOTE(review): most_common(1)[0] raises IndexError if no action line yielded
# a language/script.
non_dial_src_lang = Counter(languages_of_actionlines).most_common(1)[0][0]
non_dial_src_script = Counter(script_of_actionlines).most_common(1)[0][0]
language_of_all_dialogues = [result[2] for result in results if result[2]]
# new code ends
script_of_all_dialogues = [result[3] for result in results if result[3]]
# -> For Detecting presence of different languages in dialogues (whole sentences)
A, B, C = A_B_C(language_of_all_dialogues, non_dial_src_lang)
print("A,B,C", A, B, C)
totaldials = len(language_of_all_dialogues)
# Dialogue script = most frequent script; "Common" is the fallback.
# NOTE(review): the fallback loop iterates Counter(...).values(), i.e. the
# counts rather than the script names, so `script` is an int there -- looks
# unintended, confirm.
try:
dial_src_script = mode(script_of_all_dialogues)
except:
dial_src_script = "Common"
for script in list(Counter(script_of_all_dialogues).values()):
if script != "Common":
dial_src_script = script
break
dial_src_lang = A
# NOTE(review): code_script[A] is an unchecked lookup, and one_step_process
# is not used again in the code visible here.
one_step_process = "Yes" if dial_src_script == code_script[A] else "Can_not_say"
# word_lang_with_actionline = word_with_actionline(scenes, A, non_dial_src_lang)
# word_lang_with_other = word_with_other(scenes, A, non_dial_src_lang)
# -> For Detecting presence of different languages in dialogues (words)
word_lang_with_actionline, word_lang_with_other = word_with_actionline_other_lang(
list_of_lists, A, non_dial_src_lang)
print("A = {} B = {} C = {}".format(A, B, C))
print("dial_language", A)
print("dial_src_script", dial_src_script)
# UI option 3: more than 15% of the dialogues are in the action-line language.
# NOTE(review): both ratios below divide by totaldials without a zero check.
if round(B / totaldials, 2) > 0.15:
print("UI option3 - yes")
UI_option3 = "Yes"
else:
print("UI option3 - no")
UI_option3 = "No"
# UI option 4: more than 20% of the dialogues are in other languages.
if round(C / totaldials, 2) > 0.20:
print("UI option4 - yes")
UI_option4 = "Yes"
else:
print("UI option4 - no")
UI_option4 = "No"
# UI options 5 and 6 mirror the two word-level flags computed above.
if word_lang_with_actionline == "True":
print("UI option5 - Yes")
UI_option5 = "Yes"
else:
print("UI_option5 - NO")
UI_option5 = "No"
print("checking other lang", word_lang_with_other)
if word_lang_with_other == "True":
print("UI option6 - Yes")
UI_option6 = "Yes"
else:
print("UI option6 - No")
UI_option6 = "No"
print("*******************------------Detection------------***********************")
print(UI_option3, UI_option4, UI_option5, UI_option6, non_dial_src_script)
# f.close()
return [non_dial_src_lang, dial_src_lang, dial_src_script, non_dial_src_script, UI_option3, UI_option4, UI_option5,
UI_option6, numPages]