"""Detect the languages and writing scripts used in an uploaded screenplay.

The entry point is :func:`getInputs`; the remaining public functions are the
building blocks it uses (single-text detection, per-dialogue statistics and
word-level mixing checks).
"""

import json
import os
import re
import sys
import uuid
from collections import Counter
from statistics import mode

import boto3
import pandas as pd
import requests
from google.cloud import translate
from google.cloud import translate_v2 as Translate
from requests.exceptions import SSLError
from tqdm import tqdm

from MNF.settings import BasePath

from .script_detector import script_cat
from .script_reading import getRefined, getSlugAndNonSlug, getSpeakers, getScenes
from .translation_variables import code_script

basePath = BasePath()

# -> Google Translation API Credentials
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = rf"{basePath}/MNF/json_keys/authentication.json"
translate_client = Translate.Client()
client = translate.TranslationServiceClient()

# How many times the primary detector is retried after an SSL failure before
# the secondary/tertiary detectors are consulted.
_MAX_SSL_RETRIES = 3
# Upper bound (seconds) for the Azure HTTP call so a stalled connection cannot
# block the caller forever.
_REQUEST_TIMEOUT_SECONDS = 30
_AZURE_DETECT_URL = "https://api.cognitive.microsofttranslator.com/detect"
# A word-level finding is reported once MORE than this many dialogues show it.
_MIXED_DIALOGUE_THRESHOLD = 5
# Pause between two polls for the audit CSV.
_AUDIT_POLL_INTERVAL_SECONDS = 1.0
# Characters skipped by script_det() when looking for the first real character.
# Raw string: the backslash is a literal member of the set, not an escape.
_PUNCTUATIONS = r'''!()-[]{};:'"\,<>./?@#$%^&*_~'''


def _detect_with_azure(text):
    """Return the language code Azure Translator reports for ``text``."""
    # SECURITY: the fallback key below is committed to source control. It should
    # be rotated and supplied only through the environment; the literal is kept
    # so existing deployments keep working until that happens.
    subscription_key = os.environ.get(
        "AZURE_TRANSLATOR_KEY", "83ce6233419541929f7ab0d3035fca58")
    headers = {
        'Ocp-Apim-Subscription-Key': subscription_key,
        'Ocp-Apim-Subscription-Region': "eastus",
        'Content-type': 'application/json',
        'X-ClientTraceId': str(uuid.uuid4()),
    }
    response = requests.post(
        _AZURE_DETECT_URL,
        params={'api-version': '3.0'},
        headers=headers,
        json=[{'text': text}],
        timeout=_REQUEST_TIMEOUT_SECONDS,
    )
    return str(response.json()[0]['language'])


def _detect_with_aws(text):
    """Return the dominant language code AWS Comprehend reports for ``text``."""
    with open(rf"{basePath}/MNF/json_keys/keys_aws.json") as f:
        keys1 = json.load(f)
    session = boto3.Session(
        aws_access_key_id=keys1["aws_access_key_id"],
        aws_secret_access_key=keys1["aws_secret_access_key"],
        region_name=keys1["region_name"],
    )
    detect_aws = session.client(
        service_name='comprehend', region_name='us-east-2', use_ssl=True)
    languages = detect_aws.detect_dominant_language(Text=text)['Languages']
    return str(languages[0]["LanguageCode"])


# -> For Detecting language of any text
def language_detector(text):
    """Return the language code detected for ``text``.

    The Google client is asked first. If it keeps failing with ``SSLError``
    (up to ``_MAX_SSL_RETRIES`` attempts), Azure and AWS are asked instead and
    the code reported most often is returned; on a tie the earlier backend
    (Azure) wins.

    Raises:
        IndexError: if Google failed with SSL errors and neither fallback
            produced a result.
        Exception: any non-SSL error raised by the Google client propagates.
    """
    print(text, "sentence recieved")

    # primary language detector
    for _attempt in range(_MAX_SSL_RETRIES):
        try:
            result = translate_client.detect_language(text)
        except SSLError:
            continue  # transient SSL failure: try again, a bounded number of times
        language = result['language']
        print("length re:", len(language))
        # Codes longer than three characters carry a "-suffix"; keep the base code.
        if len(language) > 3:
            return str(language).split("-")[0]
        return language

    # secondary and tertiary detectors (slow, so only used as a fallback)
    lang_detected = []
    for backend_name, detect in (("azure", _detect_with_azure),
                                 ("aws", _detect_with_aws)):
        try:
            lang_detected.append(detect(text))
        except Exception as exc:  # best effort: one broken backend must not stop the other
            print(f"{backend_name} is not working ", exc)

    if not lang_detected:
        raise IndexError("no language detection backend returned a result")
    return Counter(lang_detected).most_common(1)[0][0]


# -> For Detecting Script of any text
def script_det(text):
    """Return the script of the first character of ``text`` that is not punctuation.

    The character is handed to ``script_cat`` and the first element of its
    result is returned. Whitespace is not in the punctuation set, so a leading
    space is the character that gets classified; if every character is
    punctuation an empty string is passed to ``script_cat``.
    NOTE(review): confirm ``script_cat`` copes with whitespace and with "".
    """
    first_char = next((char for char in text if char not in _PUNCTUATIONS), "")
    return script_cat(first_char)[0]


# -> For Detecting presence of different languages in dialogues (whole sentences)
def A_B_C(dialogue_language, non_dial_src_lang):
    """Summarise which languages whole dialogues are written in.

    Args:
        dialogue_language: one language code per dialogue.
        non_dial_src_lang: language code of the action lines.

    Returns:
        A tuple ``(A, B, C)`` where
        A is the language with the highest number of full dialogues (on a tie,
        the one that appears first in ``dialogue_language``),
        B is the number of dialogues in the action-line language (0 when that
        language is A itself or does not occur), and
        C is the number of dialogues in any other language.

    Raises:
        IndexError: if ``dialogue_language`` is empty.
    """
    print("line 316:dialogue_language", dialogue_language)
    counts = Counter(dialogue_language)
    print("line 319:dict1", dict(counts))
    # most_common() sorts by count, descending, and is stable for equal counts.
    ranked = counts.most_common()
    sorted_values = [count for _, count in ranked]
    print("line 321:sorted_values:", sorted_values)
    sources = [language for language, _ in ranked]
    print("line 328: sources: ", sources)
    if not sources:
        raise IndexError("A_B_C needs at least one dialogue language")

    A = sources[0]
    print("Most Prominent Dialogue Language", A)
    # Counter returns 0 for a language that never occurs.
    B = 0 if A == non_dial_src_lang else counts[non_dial_src_lang]
    C = sum(sorted_values[1:]) - B
    return A, B, C


def get1(non_dial_src_lang, word):
    """Return True when ``word`` is detected as ``non_dial_src_lang``."""
    return language_detector(word) == non_dial_src_lang


def get1_wrapper(args, word):
    """Call :func:`get1` with the tuple ``args`` unpacked in front of ``word``."""
    return get1(*args, word)


# -> Detection of Different Lanugages and Scripts in Script
def dial_each_word_lang1(non_dial_src_lang, dial):
    """Return "True" if any word of ``dial`` is in ``non_dial_src_lang``, else "False".

    Words are the whitespace-separated tokens of ``dial``; each one is checked
    in a pool of four worker processes.
    """
    from functools import partial
    import multiprocessing

    words = dial.split()
    if not words:
        return "False"  # nothing to check, so do not pay for a process pool

    # The trailing comma matters: get1_wrapper unpacks its first argument, and
    # a bare string would be unpacked character by character.
    partial_get1 = partial(get1_wrapper, (non_dial_src_lang,))
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(partial_get1, words)
    return "True" if any(results) else "False"


# -> Detection of Different Lanugages and Scripts in Script
def dial_each_word_lang2(non_dial_src_lang, A, dial):
    """Return "True" if any word of ``dial`` is in neither given language.

    A word counts as "other" only when its detected language differs from both
    ``non_dial_src_lang`` and ``A``. Each word is sent to the detector once.
    """
    known_languages = (non_dial_src_lang, A)
    for word in dial.split():
        if language_detector(word) not in known_languages:
            return "True"
    return "False"


def word_with_actionline_other_lang(lines, A, non_dial_src_lang):
    """Look for dialogues in language ``A`` that contain words of other languages.

    Args:
        lines: rows where ``row[3]`` is the element type and ``row[2]`` its
            text. The first row is skipped.
        A: the main dialogue language.
        non_dial_src_lang: the action-line language.

    Returns:
        ``(actionline_lang_output, other_lang_output)``, each the string "True"
        or "False". The first is "True" once more than
        ``_MIXED_DIALOGUE_THRESHOLD`` dialogues contain a word in the
        action-line language (never when that language equals ``A``); the
        second once more than that many contain a word in a third language.
    """
    dials_with_actionline_langs = 0
    dials_with_other_langs = 0
    actionline_lang_output = "False"
    other_lang_output = "False"
    # A real boolean: the string "False" is truthy and would disable the check.
    ignore_actionline_match = A == non_dial_src_lang

    for i, line in enumerate(lines):
        if i == 0 or line[3] != "dialogue":
            continue

        if actionline_lang_output == "True" and other_lang_output == "True":
            print("Found Presence of other Langs in Words")
            return actionline_lang_output, other_lang_output

        print("Still Searching if Words of other langs are present or not...")
        # Only dialogues written in the main dialogue language are inspected.
        if language_detector(line[2]) != A:
            continue

        if actionline_lang_output != "True" and not ignore_actionline_match:
            if dial_each_word_lang1(non_dial_src_lang, line[2]) == "True":
                dials_with_actionline_langs += 1
            if dials_with_actionline_langs > _MIXED_DIALOGUE_THRESHOLD:
                actionline_lang_output = "True"

        if other_lang_output != "True":
            if dial_each_word_lang2(non_dial_src_lang, A, line[2]) == "True":
                dials_with_other_langs += 1
            if dials_with_other_langs > _MIXED_DIALOGUE_THRESHOLD:
                other_lang_output = "True"

    return actionline_lang_output, other_lang_output


def _detect_line_languages(line):
    """Detect language and script for one audit row.

    Defined at module level so ``multiprocessing`` can pickle it. Returns
    ``[action_lang, action_script, dialogue_lang, dialogue_script]``; the pair
    that does not apply to the row's element type is left as empty strings.
    """
    non_dial_src_lang = ""
    non_dial_src_script = ""
    dial_src_lang = ""
    dial_src_script = ""
    if line[3] == "action":
        non_dial_src_lang = language_detector(line[2])
        non_dial_src_script = script_det(line[2])
    elif line[3] == "dialogue":
        dial_src_lang = language_detector(line[2])
        dial_src_script = script_det(line[2])
    return [non_dial_src_lang, non_dial_src_script, dial_src_lang, dial_src_script]


def _collapse_blank_runs(lines):
    """Return ``lines`` with every run of consecutive blank lines cut to one."""
    cleaned_lines = []
    prev_line_blank = False
    for line in lines:
        # A line holding the two characters backslash + "n" also counts as blank.
        is_blank = line.strip() in ('', "\\n")
        if not (is_blank and prev_line_blank):
            cleaned_lines.append(line)
        prev_line_blank = is_blank
    return cleaned_lines


# -> Detection of Different Lanugages and Scripts in Script
def getInputs(request, filename1, id, audit_timeout=None):
    """Audit the first lines of a script and report its languages and scripts.

    The script is converted to text, its first 50 cleaned lines are submitted
    as a new screenplay, and the resulting "script-csv" audit file is analysed
    row by row. Diagnostic output produced after the audit is written to
    ``{basePath}/log/debu1414.log``.

    Args:
        request: request whose ``user`` owns the audit screenplay.
        filename1: "/"-separated path of the script; ".fdx" files go through
            ``fdx_to_txt``, everything else through ``textract``.
        id: primary key of the ``MNFScriptDatabase`` row that receives the
            audit id.
        audit_timeout: seconds to wait for the audit CSV; ``None`` (default)
            waits indefinitely.

    Returns:
        ``[non_dial_src_lang, dial_src_lang, dial_src_script,
        non_dial_src_script, UI_option3, UI_option4, UI_option5, UI_option6]``
        where the UI options are "Yes"/"No" strings. The action-line values are
        empty strings when the audit contains no action line.

    Raises:
        TimeoutError: if ``audit_timeout`` elapses before the CSV is available.
        IndexError: if the audit contains no dialogue with a detected language.
    """
    import contextlib
    import multiprocessing
    import time

    from django.core.files.base import ContentFile
    from mnfapp.models import MNFScriptDatabase
    from utils import filesystem
    from utils.utilities import fdx_to_txt

    print("Detecting Languages and Scripts present in Script")

    # NOTE(review): these working files have fixed names, so two concurrent
    # calls would overwrite each other's data.
    work_dir = rf"{basePath}/conversion/translation"
    raw_path = f"{work_dir}/file_lines.txt"
    cleaned_path = f"{work_dir}/cleaned_file.txt"
    sample_path = f"{work_dir}/file_lines_200.txt"
    base_name = filename1.split("/")[-1]

    # Step 1: convert the script to plain text.
    if base_name.split(".")[-1] == "fdx":
        with open(filename1, 'r') as filer:
            text = fdx_to_txt(filer)
        with open(raw_path, 'w', encoding="utf-8") as f:
            f.write(text)
    else:
        # Imported here because the module-level import is disabled.
        import textract
        text = textract.process(filename1, encoding="utf8", errors='ignore')
        with open(raw_path, 'wb') as f:
            f.write(text)

    # Step 2: collapse blank-line runs and keep the first 50 lines as a sample.
    with open(raw_path, 'r', encoding="utf-8") as file:
        li = _collapse_blank_runs(file.readlines())
    with open(cleaned_path, 'w', encoding="utf-8") as file:
        file.writelines(li)
    print("line", li)
    print("total_line", len(li))
    with open(sample_path, 'w', encoding="utf-8") as f:
        f.write("".join(li[:50]))

    # Step 3: submit the sample for audit and wait for its CSV.
    start_time = time.time()
    print("----Auditing----")
    with open(sample_path, 'rb') as sample:
        file = ContentFile(sample.read(), sample_path.split("/")[-1])
    language_code = "en"
    name_script = str(base_name.split(".")[0]) + "_language-audit"
    result = filesystem.new_screenplay(
        request.user,
        request.user.username,
        name_script,
        file,
        "script-original",
        language_code,
    )
    audit_id = result.get("script", {}).get("id")

    deadline = None if audit_timeout is None else time.monotonic() + audit_timeout
    while True:
        try:
            file_path_ = filesystem.get_file_path(audit_id, "script-csv")
            break
        except Exception as exc:  # lookup fails until the audit has produced the CSV
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(
                    f"audit {audit_id} produced no script-csv within "
                    f"{audit_timeout} seconds") from exc
            time.sleep(_AUDIT_POLL_INTERVAL_SECONDS)  # poll instead of spinning
    end_time = time.time()

    # Step 4: analyse the audit. stdout goes to the debug log only inside this
    # block and is restored (and the log closed) even if an error is raised.
    with open(f"{basePath}/log/debu1414.log", "w") as log_file, \
            contextlib.redirect_stdout(log_file):
        print("Time for Audit it took is", str(end_time - start_time))
        try:
            df = pd.read_csv(file_path_, encoding="utf-8")
        except UnicodeError:
            df = pd.read_csv(file_path_, encoding="utf-16")
        pd.set_option('display.max_rows', None)
        pd.set_option('display.max_columns', None)

        script_get = MNFScriptDatabase.objects.get(id=id)
        script_get.language_audit_id = audit_id
        script_get.save()

        list_of_lists = df.values.tolist()
        print(list_of_lists)
        print("step 1")

        print("step 2")
        with multiprocessing.Pool(processes=4) as pool:
            results = pool.map(_detect_line_languages, list_of_lists)

        print("step 3")
        # The last action line with both values set decides the action-line
        # language and script.
        non_dial_src_lang = ""
        non_dial_src_script = ""
        for result in results:
            if result[0] != "" and result[1] != "":
                non_dial_src_lang = result[0]
                non_dial_src_script = result[1]

        print("step 4")
        language_of_all_dialogues = [r[2] for r in results if r[2]]
        script_of_all_dialogues = [r[3] for r in results if r[3]]

        # -> For Detecting presence of different languages in dialogues (whole sentences)
        A, B, C = A_B_C(language_of_all_dialogues, non_dial_src_lang)
        totaldials = len(language_of_all_dialogues)  # > 0, or A_B_C would have raised
        dial_src_script = mode(script_of_all_dialogues) if script_of_all_dialogues else ""
        dial_src_lang = A
        print("step 5")

        # -> For Detecting presence of different languages in dialogues (words)
        word_lang_with_actionline, word_lang_with_other = word_with_actionline_other_lang(
            list_of_lists, A, non_dial_src_lang)

        print("A = {} B = {} C = {}".format(A, B, C))
        print("dial_language", A)
        print("dial_src_script", dial_src_script)

        if round(B / totaldials, 2) > 0.15:
            print("UI option3 - yes")
            UI_option3 = "Yes"
        else:
            print("UI option3 - no")
            UI_option3 = "No"

        if round(C / totaldials, 2) > 0.20:
            print("UI option4 - yes")
            UI_option4 = "Yes"
        else:
            print("UI option4 - no")
            UI_option4 = "No"

        if word_lang_with_actionline == "True":
            print("UI option5 - Yes")
            UI_option5 = "Yes"
        else:
            print("UI_option5 - NO")
            UI_option5 = "No"

        print("checking other lang", word_lang_with_other)
        if word_lang_with_other == "True":
            print("UI option6 - Yes")
            UI_option6 = "Yes"
        else:
            print("UI option6 - No")
            UI_option6 = "No"

        print("*******************------------Detection------------***********************")
        print(UI_option3, UI_option4, UI_option5, UI_option6, non_dial_src_script)

    return [non_dial_src_lang, dial_src_lang, dial_src_script, non_dial_src_script,
            UI_option3, UI_option4, UI_option5, UI_option6]