Conversion_Kitchen_Code/kitchen_counter/conversion/translation/detectiongood2.py

468 lines
17 KiB
Python
Raw Permalink Normal View History

2024-04-27 09:33:09 +00:00
from google.cloud import translate_v2 as Translate
from google.cloud import translate
from MNF.settings import BasePath
from requests.exceptions import SSLError
# from .script_writing import default_script
from .translation_variables import code_script
from .script_detector import script_cat
from statistics import mode
from collections import Counter
# import textract
from tqdm import tqdm
import sys
import re
import os
from .script_reading import getRefined, getSlugAndNonSlug, getSpeakers, getScenes
import requests
import uuid
import json
import boto3
from collections import Counter
import pandas as pd
# Base path used to build every absolute path in this module
# (presumably the project root; confirm against MNF.settings.BasePath).
basePath = BasePath()
# -> Google Translation API Credentials
# The Google client libraries read the service-account key file from this
# environment variable, so it must be set before the clients are created.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = rf"{basePath}/MNF/json_keys/authentication.json"
# Translation v2 client; language_detector() calls detect_language() on it.
translate_client = Translate.Client()
# Translation service client from google.cloud.translate; it is not referenced
# by any function visible in this part of the file.
client = translate.TranslationServiceClient()
# -> For Detecting language of any text
def language_detector(text):
    """Detect the language of ``text`` and return its language code.

    Google Cloud Translation (``translate_client``) is asked first and its
    answer is returned directly; when the returned code is longer than three
    characters only the part before the first "-" is kept.

    Only if Google keeps failing with ``SSLError`` are the fallback detectors
    (Azure Translator, then AWS Comprehend) consulted; the most frequent of
    their answers is returned, ties going to the detector asked first.

    Args:
        text: The text whose language should be detected.

    Returns:
        The detected language code as a string.

    Raises:
        IndexError: If no detector produced a result.
    """
    # The original retried by calling itself recursively without returning
    # the result, which could recurse without bound and threw away a
    # successful retry.  A small bounded loop keeps the retry intent.
    max_google_attempts = 3

    lang_detected = []
    print(text,"sentence recieved")

    # primary language detector
    for _attempt in range(max_google_attempts):
        try:
            result = translate_client.detect_language(text)
        except SSLError:
            continue
        language = result['language']
        print("length re:",len(language))
        if len(language) > 3:
            return str(language).split("-")[0]
        return language

    # secondary translation_detection (reached only when Google failed)
    try:
        # NOTE(review): this credential is hard-coded in source; it should be
        # moved to configuration/secret storage and rotated.
        subscription_key = "83ce6233419541929f7ab0d3035fca58"
        location = "eastus"
        headers = {
            'Ocp-Apim-Subscription-Key': subscription_key,
            'Ocp-Apim-Subscription-Region': location,
            'Content-type': 'application/json',
            'X-ClientTraceId': str(uuid.uuid4())
        }
        # api-version is passed through ``params`` only; the original also
        # embedded it in the URL, which sent the parameter twice.
        params = {'api-version': '3.0'}
        body = [{'text': text}]
        response = requests.post(
            "https://api.cognitive.microsofttranslator.com/detect",
            params=params, headers=headers, json=body).json()
        lang_detected.append(str(response[0]['language']))
    except Exception as exc:
        # Best-effort fallback: report the failure and try the next service.
        print("azure is not working ", exc)

    # tertiary translation_detection
    try:
        aws_json_path = basePath + "/MNF/json_keys"
        with open(rf"{aws_json_path}/keys_aws.json") as f:
            keys1 = json.load(f)
        session = boto3.Session(aws_access_key_id=keys1["aws_access_key_id"],
                                aws_secret_access_key=keys1["aws_secret_access_key"],
                                region_name=keys1["region_name"])
        detect_aws = session.client(service_name='comprehend', region_name='us-east-2', use_ssl=True)
        pred_3 = detect_aws.detect_dominant_language(Text=text)['Languages'][0]["LanguageCode"]
        lang_detected.append(str(pred_3))
    except Exception as exc:
        # Best-effort fallback: report the failure and carry on.
        print("aws is not working ", exc)

    if not lang_detected:
        # Same exception type the original raised via ``sources[0]``, but
        # with a message that says what actually went wrong.
        raise IndexError("no language detection service returned a result")

    # most_common() orders by count (descending) and keeps first-seen order
    # among ties, matching the original hand-written ranking.
    return Counter(lang_detected).most_common(1)[0][0]
# -> For Detecting Script of any text
def script_det(text):
    """Return the script of the first non-punctuation character of ``text``.

    The first character of ``text`` that is not in the punctuation set below
    is passed to ``script_cat`` and the first element of its result is
    returned.  When ``text`` is empty or consists only of punctuation, the
    empty string is passed to ``script_cat`` (same as the original).

    Args:
        text: The text to inspect.

    Returns:
        ``script_cat(<first non-punctuation character>)[0]``.
    """
    # Raw string: the original literal contained the invalid escape "\,".
    # The runtime value (backslash followed by a comma) is unchanged.
    punctuations = r'''!()-[]{};:'"\,<>./?@#$%^&*_~'''
    first_char = next((char for char in text if char not in punctuations), "")
    return script_cat(first_char)[0]
'''
A. Language of the highest number of full dialogues,
B. Number of dialogues in the action-line language,
C. Number of dialogues in other languages
'''
# -> For Detecting presence of different languages in dialogues (whole sentences)
def A_B_C(dialogue_language, non_dial_src_lang):
    """Summarise which languages the dialogues are written in.

    Args:
        dialogue_language: One language code per dialogue.
        non_dial_src_lang: Language code of the action lines.

    Returns:
        A tuple ``(A, B, C)``:
        A - the language with the most dialogues (first seen wins a tie),
        B - the number of dialogues in the action-line language, or 0 when
            that language is A or does not occur at all,
        C - the number of dialogues in neither A nor the language counted
            in B.
    """
    print("line 316:dialogue_language", dialogue_language)
    tally = Counter(dialogue_language)
    print("line 319:dict1", dict(tally))

    # Languages ranked by dialogue count, highest first; equal counts keep
    # their first-seen order.
    ranked = tally.most_common()
    counts_desc = [count for _, count in ranked]
    print("line 321:sorted_values:", counts_desc)
    sources = [lang for lang, _ in ranked]
    print("line 328: sources: ", sources)

    A = sources[0]
    print("Most Prominent Dialogue Language", A)

    # B is non-zero only when the action-line language occurs among the
    # dialogues and is not itself the dominant language.
    B = 0
    if non_dial_src_lang in sources and A != non_dial_src_lang:
        B = counts_desc[sources.index(non_dial_src_lang)]

    # Everything that is not A, minus what was already counted in B.
    C = sum(counts_desc[1:]) - B
    return A, B, C
# -> Detection of Different Languages and Scripts in Script
def dial_each_word_lang1(non_dial_src_lang, dial):
    """Tell whether any word of ``dial`` is in the action-line language.

    Each whitespace-separated word is passed to ``language_detector`` until
    one is detected as ``non_dial_src_lang``.

    Returns:
        The string "True" if such a word exists, otherwise the string
        "False" (strings, not booleans).
    """
    has_actionline_word = any(
        language_detector(token) == non_dial_src_lang for token in dial.split()
    )
    return "True" if has_actionline_word else "False"
# -> Detection of Different Languages and Scripts in Script
def dial_each_word_lang2(non_dial_src_lang, A, dial):
    """Tell whether any word of ``dial`` is in a third ("other") language.

    A word counts as "other" when its detected language is neither the
    action-line language nor the dominant dialogue language.

    Args:
        non_dial_src_lang: Language code of the action lines.
        A: Dominant dialogue language code.
        dial: The dialogue text; it is split on whitespace.

    Returns:
        The string "True" if such a word exists, otherwise the string
        "False" (strings, not booleans).
    """
    expected_langs = (non_dial_src_lang, A)
    for word in dial.split():
        # The original tested ``!= non_dial_src_lang or != A``, which is true
        # for every word whenever the two languages differ.  "Neither of
        # them" is the intended test; it also needs only one detector call
        # per word instead of two.
        if language_detector(word) not in expected_langs:
            return "True"
    return "False"
# -> Detection of words in lines with different languages
def word_with_actionline_other_lang(lines, A, non_dial_src_lang):
    """Look for foreign words inside dialogues written in language ``A``.

    Only rows whose element type (``line[3]``) is "dialogue" and whose text
    (``line[2]``) is detected as language ``A`` are examined.  A flag turns
    "True" once more than five such dialogues contain a matching word.

    Args:
        lines: Rows of the audited script; index 2 holds the text and index 3
            the element type.  The first row is always skipped, as in the
            original (presumably a header row; confirm against the caller).
        A: Dominant dialogue language code.
        non_dial_src_lang: Language code of the action lines.

    Returns:
        A tuple ``(actionline_lang_output, other_lang_output)`` of the
        strings "True"/"False":
        the first says whether words in the action-line language were found,
        the second whether words in any other language were found.
    """
    dials_with_actionline_langs = 0
    dials_with_other_langs = 0
    actionline_lang_output = "False"
    other_lang_output = "False"
    # A real boolean.  The original stored the strings "True"/"False" here,
    # so ``not ignore_actionline_match`` was always False and the action-line
    # check below could never run.
    ignore_actionline_match = A == non_dial_src_lang

    for i, line in enumerate(lines):
        if i == 0:
            continue
        if line[3] != "dialogue":
            continue

        if actionline_lang_output == "True" and other_lang_output == "True":
            # Both answers are known; stop before spending another
            # detection call on this row.
            print("Found Presence of other Langs in Words")
            return actionline_lang_output, other_lang_output

        dial_src_lang = language_detector(line[2])
        print(
            "Still Searching if Words of other langs are present or not...")
        if dial_src_lang != A:
            continue

        if actionline_lang_output != "True" and not ignore_actionline_match:
            if dial_each_word_lang1(non_dial_src_lang, line[2]) == "True":
                dials_with_actionline_langs += 1
            if dials_with_actionline_langs > 5:
                actionline_lang_output = "True"

        if other_lang_output != "True":
            if dial_each_word_lang2(non_dial_src_lang, A, line[2]) == "True":
                dials_with_other_langs += 1
            if dials_with_other_langs > 5:
                other_lang_output = "True"

    return actionline_lang_output, other_lang_output
# -> Detection of Different Languages and Scripts in Script
def _detect_line_languages(line):
    """Detect language and script for one audited script row.

    Kept at module level (not nested in ``getInputs``) because
    ``multiprocessing.Pool`` must pickle the worker function, and functions
    defined inside another function cannot be pickled.

    Args:
        line: One row of the audit CSV; index 2 holds the text and index 3
            the element type.

    Returns:
        ``[non_dial_src_lang, non_dial_src_script, dial_src_lang,
        dial_src_script]``.  The first pair is filled for "action" rows, the
        second pair for "dialogue" rows; everything else stays "".
    """
    non_dial_src_lang = ""
    non_dial_src_script = ""
    dial_src_lang = ""
    dial_src_script = ""
    if line[3] == "action":
        non_dial_src_lang = language_detector(line[2])
        non_dial_src_script = script_det(line[2])
    elif line[3] == "dialogue":
        dial_src_lang = language_detector(line[2])
        dial_src_script = script_det(line[2])
    return [non_dial_src_lang, non_dial_src_script, dial_src_lang, dial_src_script]


def getInputs(request, filename1, id):
    """Detect the languages and scripts used in a screenplay file.

    Steps:
      1. Convert the file to text (``fdx_to_txt`` for ``.fdx``, ``textract``
         otherwise) and collapse runs of blank lines.
      2. Submit the first 50 cleaned lines as a new screenplay through
         ``filesystem.new_screenplay`` and wait for its "script-csv" file.
      3. Detect language and script of every action and dialogue row of
         that CSV.
      4. Derive the UI options from the share of dialogues, and of words
         inside dialogues, that are in other languages.

    Args:
        request: Request object; ``request.user`` owns the audit screenplay.
        filename1: Path of the uploaded script file.
        id: Primary key of the ``MNFScriptDatabase`` row that receives the
            audit id.

    Returns:
        ``[non_dial_src_lang, dial_src_lang, dial_src_script,
        non_dial_src_script, UI_option3, UI_option4, UI_option5,
        UI_option6]``; the UI options are the strings "Yes"/"No".
        The two ``non_dial_*`` values are "" when the audit found no action
        rows.

    Note:
        When the audit contains no dialogue rows, ``A_B_C`` is called with
        an empty list and fails on its ``sources[0]`` lookup, as it did in
        the original.
    """
    import contextlib
    import multiprocessing
    import time

    from utils import filesystem
    from utils.utilities import fdx_to_txt
    from django.core.files.base import ContentFile
    from mnfapp.models import MNFScriptDatabase

    print("Detecting Languages and Scripts present in Script")
    translation_dir = rf"{basePath}/conversion/translation"
    source_name = filename1.split("/")[-1]
    is_fdx = source_name.split(".")[-1] == "fdx"

    # ---- 1. extract the text and write it to the working file ----------
    raw_path = rf"{translation_dir}/file_lines.txt"
    if is_fdx:
        with open(filename1, 'r') as filer:
            text = fdx_to_txt(filer)
        with open(raw_path, 'w', encoding="utf-8") as out:
            out.write(text)
    else:
        # NOTE(review): ``textract`` is used here but its import is commented
        # out at the top of this file, so this branch raises NameError until
        # that import is restored.
        text = textract.process(filename1, encoding="utf8", errors='ignore')
        with open(raw_path, 'wb') as out:
            out.write(text)

    with open(raw_path, 'r', encoding="utf-8") as file:
        lines = file.readlines()

    # Collapse every run of blank lines (or literal "\n" lines) to one line.
    cleaned_lines = []
    prev_line_blank = False
    for line in lines:
        is_blank = line.strip() in ('', "\\n")
        if not (is_blank and prev_line_blank):
            cleaned_lines.append(line)
        prev_line_blank = is_blank

    with open(rf"{translation_dir}/cleaned_file.txt", 'w', encoding="utf-8") as file:
        file.writelines(cleaned_lines)

    # The original re-read cleaned_file.txt here; the list just written is
    # the same content.
    li = cleaned_lines
    print("line", li)
    total_line = len(li)
    print("total_line", total_line)

    # Only the beginning of the script is audited.
    initial_lines = 50
    sample_path = rf"{translation_dir}/file_lines_200.txt"
    with open(sample_path, 'w', encoding="utf-8") as out:
        out.write("".join(li[:initial_lines]))

    # ---- 2. run the audit with stdout redirected to the log file -------
    name_script = str(source_name.split(".")[0]) + "_language-audit"
    language_code = "en"
    with open(f"{basePath}/log/debu1414.log", "w") as log_file:
        # redirect_stdout restores sys.stdout even when the audit raises;
        # the original left stdout pointing at the log file in that case.
        with contextlib.redirect_stdout(log_file):
            print("----Auditing----")
            with open(sample_path, 'rb') as sample:
                doc = sample.read()
            content_file = ContentFile(
                doc,
                (sample_path.split("/"))[-1],
            )
            result = filesystem.new_screenplay(
                request.user,
                request.user.username,
                name_script,
                content_file,
                "script-original",
                language_code,
            )
            audit_id = result.get("script", {}).get("id")

            # Wait until the audit CSV can be located.  Sleeping between
            # attempts avoids the original's busy loop.
            # NOTE(review): there is still no upper bound on the wait.
            while True:
                try:
                    file_path_ = filesystem.get_file_path(
                        audit_id, "script-csv")
                    break
                except Exception:
                    time.sleep(1)

            try:
                df = pd.read_csv(file_path_, encoding="utf-8")
            except UnicodeError:
                df = pd.read_csv(file_path_, encoding="utf-16")
            pd.set_option('display.max_rows', None)
            pd.set_option('display.max_columns', None)

            script_get = MNFScriptDatabase.objects.get(id=id)
            script_get.language_audit_id = audit_id
            script_get.save()

    # Convert DataFrame to a list of lists
    list_of_lists = df.values.tolist()
    print(list_of_lists)

    # ---- 3. detect language and script per row, in parallel ------------
    num_processes = 4
    print("step 2")
    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.map(_detect_line_languages, list_of_lists)

    print("step 3")
    # Defaults keep these names bound when the audit has no action rows.
    # As in the original loop, the last action row wins.
    non_dial_src_lang = ""
    non_dial_src_script = ""
    for result in results:
        if result[0] != "" and result[1] != "":
            non_dial_src_lang = result[0]
            non_dial_src_script = result[1]

    print("step 4")
    language_of_all_dialogues = [result[2] for result in results if result[2]]
    # The original never filled this list, so mode() always failed on it.
    script_of_all_dialogues = [result[3] for result in results if result[3]]

    # ---- 4. derive the UI options --------------------------------------
    # -> For Detecting presence of different languages in dialogues (whole sentences)
    A, B, C = A_B_C(language_of_all_dialogues, non_dial_src_lang)
    totaldials = len(language_of_all_dialogues)
    dial_src_script = mode(script_of_all_dialogues)
    dial_src_lang = A

    # -> For Detecting presence of different languages in dialogues (words)
    word_lang_with_actionline, word_lang_with_other = word_with_actionline_other_lang(
        list_of_lists, A, non_dial_src_lang)

    print("A = {} B = {} C = {}".format(A, B, C))
    print("dial_language", A)
    print("dial_src_script", dial_src_script)

    # More than 15% of the dialogues are in the action-line language.
    if round(B / totaldials, 2) > 0.15:
        print("UI option3 - yes")
        UI_option3 = "Yes"
    else:
        print("UI option3 - no")
        UI_option3 = "No"

    # More than 20% of the dialogues are in some other language.
    if round(C / totaldials, 2) > 0.20:
        print("UI option4 - yes")
        UI_option4 = "Yes"
    else:
        print("UI option4 - no")
        UI_option4 = "No"

    if word_lang_with_actionline == "True":
        print("UI option5 - Yes")
        UI_option5 = "Yes"
    else:
        print("UI_option5 - NO")
        UI_option5 = "No"

    print("checking other lang", word_lang_with_other)
    if word_lang_with_other == "True":
        print("UI option6 - Yes")
        UI_option6 = "Yes"
    else:
        print("UI option6 - No")
        UI_option6 = "No"

    print("*******************------------Detection------------***********************")
    print(UI_option3, UI_option4, UI_option5, UI_option6, non_dial_src_script)
    return [non_dial_src_lang, dial_src_lang, dial_src_script, non_dial_src_script, UI_option3, UI_option4, UI_option5,
            UI_option6]