1326 lines
58 KiB
Python
1326 lines
58 KiB
Python
|
# from .models import AuditStatus
|
||
|
import time,pytz
|
||
|
import mimetypes
|
||
|
from rest_framework_simplejwt.authentication import JWTAuthentication
|
||
|
import tempfile
|
||
|
import requests
|
||
|
import pandas as pd
|
||
|
from io import BytesIO
|
||
|
import json
|
||
|
# from users.models import UserCredentialsForBlockchain
|
||
|
# from Blockchain2.block_user_info import *
|
||
|
# from Blockchain2.decryption import decryptionOfPrivate, decryptionOfUrl, download_file_System,decryptionOflocalUrl,hash_decrypation
|
||
|
# from Blockchain2.DataStorage import uploadDataToIPFSNode
|
||
|
# from Blockchain2.scriptAudit import getUserprojectIds,UploadScriptAuditData,getScriptAudit
|
||
|
# from Blockchain2.blockchainsetting import OWNER_KEY
|
||
|
from django.core import exceptions as django_exception
|
||
|
from django.http import FileResponse, HttpResponseBadRequest,HttpResponse
|
||
|
from django.core.files.base import File as DjangoFile
|
||
|
from django.core.files.base import ContentFile
|
||
|
from django.shortcuts import redirect, render
|
||
|
from rest_framework import status
|
||
|
from rest_framework.authentication import BasicAuthentication, SessionAuthentication
|
||
|
from rest_framework.decorators import authentication_classes, permission_classes
|
||
|
from rest_framework.permissions import IsAuthenticated
|
||
|
from rest_framework.response import Response
|
||
|
from rest_framework.views import APIView
|
||
|
from .utils import audit_in_background, update_audit_status
|
||
|
from utils.scripts_functions import script_id_generator
|
||
|
# from centralisedFileSystem.models import BeatSheet
|
||
|
from centralisedFileSystem.models import Script,File, ScreenPlay
|
||
|
from scriptAudit.mnf_script_audit import NeutralAudit
|
||
|
from utils import filesystem,utilities
|
||
|
from datetime import datetime
|
||
|
from .forms import AuditForm
|
||
|
from .models import ScriptAuditModel, States
|
||
|
from .utils import audit_in_background,send_email_to_user
|
||
|
from scriptAudit import exceptions as spex
|
||
|
import os
|
||
|
from django.contrib.sites.shortcuts import get_current_site
|
||
|
from rest_framework_simplejwt.authentication import JWTAuthentication
|
||
|
# from rest_framework.permissions import IsAuthenticated
|
||
|
from datetime import timedelta
|
||
|
# from lpp.certificate.createCertificate import certificateGenrate
|
||
|
# url = "http://115.245.192.138"
|
||
|
# url = "http://1.6.141.104:4000"
|
||
|
# url = "http://1.6.141.108"
|
||
|
from django.views import View
|
||
|
# url = "https://mynextfilm.ai"
|
||
|
from utils.filesystem import get_file,get_file_path, get_script_data
|
||
|
# from utils.utilities import send_email_to_user
|
||
|
# from narration.vectorcode.code import vector_generation
|
||
|
from auto_email.views import sendmail
|
||
|
# from mnfapp.views import update_juggernaut
|
||
|
# from mnfapp.views import check_juggernaut
|
||
|
# from Pitchdeck.views import multilingual_render
|
||
|
# from centralizePayment.views import callback, create_indent
|
||
|
|
||
|
#api_gateway_url = "https://diybbp-main-ed89647.d2.zuplo.dev"
|
||
|
api_gateway_url = "https://mnf-apigateway-main-6268011.d2.zuplo.dev"
|
||
|
gateway_audit_url = api_gateway_url +"/sify/audit/headless/taj" # /aws for aws
|
||
|
|
||
|
|
||
|
class AuditFacadeView(APIView):
|
||
|
authentication_classes = [BasicAuthentication,SessionAuthentication]
|
||
|
permission_classes = [IsAuthenticated]
|
||
|
|
||
|
def get(self, request):
|
||
|
form = AuditForm()
|
||
|
return render(request, "audit/audit_facade.html", {"form": form})
|
||
|
|
||
|
def post(self, request):
|
||
|
form = AuditForm(request.POST, request.FILES)
|
||
|
if form.is_valid():
|
||
|
screenplay_name = form.cleaned_data["screenplay_name"]
|
||
|
author = form.cleaned_data["author_name"]
|
||
|
language = form.cleaned_data["language"]
|
||
|
script_file = form.cleaned_data["script_file"]
|
||
|
|
||
|
script_ext = script_file.name.split(".")[-1]
|
||
|
script_file_name = screenplay_name + "." + script_ext
|
||
|
|
||
|
file = ContentFile(
|
||
|
script_file.read(),
|
||
|
script_file_name,
|
||
|
)
|
||
|
|
||
|
result = filesystem.new_screenplay_without_audit_in_background(
|
||
|
request.user,
|
||
|
author,
|
||
|
screenplay_name,
|
||
|
file,
|
||
|
"script-original",
|
||
|
language,
|
||
|
)
|
||
|
|
||
|
script_id = result.get("script", {}).get("id")
|
||
|
|
||
|
update_audit_status(script_id,"PENDING")
|
||
|
|
||
|
file_to_original = File.objects.get(
|
||
|
script=script_id,
|
||
|
type="script-original"
|
||
|
|
||
|
)
|
||
|
|
||
|
## call audit gateway url
|
||
|
form_data = {'author_name':'taj_audit',
|
||
|
'screenplay_name':screenplay_name+"_facade",
|
||
|
'language':language,
|
||
|
'facade_script_id': str(script_id)}
|
||
|
with open(file_to_original.file.path, 'rb') as file:
|
||
|
files = {'script_file':file}
|
||
|
res = requests.post(gateway_audit_url, files=files, data= form_data)
|
||
|
print("Facade=============>>>>>>>>>",script_id)
|
||
|
print("Response",res.text)
|
||
|
|
||
|
|
||
|
status = ScriptAuditModel.objects.get(
|
||
|
script = Script.objects.get(
|
||
|
id = script_id
|
||
|
))
|
||
|
print("STATUS AUDIT",status)
|
||
|
|
||
|
|
||
|
# if status.status == "SUCCESS":
|
||
|
# try:
|
||
|
# script = Script.objects.get(id=script_id)
|
||
|
# no_of_pages = script.no_of_pages
|
||
|
# user_id = script.screenplay.user.id
|
||
|
# except Script.DoesNotExist:
|
||
|
# print(f"Script with ID {script_id} does not exist.")
|
||
|
# # Handle the case when the script is not found
|
||
|
# # You might want to return an error response or raise an exception.
|
||
|
|
||
|
# to_email = [request.user.email]
|
||
|
# email_code = 'SB1'
|
||
|
|
||
|
|
||
|
# sendmail(to_email=to_email , email_code=email_code )
|
||
|
|
||
|
# # send_email_to_user(request.user,screenplay_name,"MyNextFilm: Successfully Audited Your Script ","Woohoo ! Your Script is Audited SuccessFully ")
|
||
|
|
||
|
# elif status.status == "FAILURE":
|
||
|
# to_email = [request.user.email]
|
||
|
# email_code = 'SB2'
|
||
|
|
||
|
# sendmail(to_email=to_email , email_code=email_code )
|
||
|
|
||
|
print("script id is", script_id)
|
||
|
|
||
|
return redirect("audit_processing_page")
|
||
|
|
||
|
else :
|
||
|
return HttpResponseBadRequest(render(request, "audit/audit_facade.html", {"form": form,"errors": form.errors}))
|
||
|
|
||
|
## to update status via api
|
||
|
class AuditStatusUpdate(APIView):
|
||
|
def post(self,request):
|
||
|
facade_script_id = request.data['facade_script_id']
|
||
|
status = request.data['status']
|
||
|
try:
|
||
|
update_audit_status(facade_script_id,status)
|
||
|
return Response("ok")
|
||
|
except :
|
||
|
return Response("nok")
|
||
|
|
||
|
## to update audited csv via api
|
||
|
class AuditedFileUpdate(APIView):
|
||
|
def post(self,request):
|
||
|
facade_script_id = request.data['facade_script_id']
|
||
|
script_file = request.data['script_file']
|
||
|
audited_file_name = str(facade_script_id) + ".csv"
|
||
|
req_file = ContentFile(script_file.read(),audited_file_name)
|
||
|
try:
|
||
|
File.objects.create(
|
||
|
script= Script.objects.get(id=facade_script_id),
|
||
|
type="script-csv",
|
||
|
file=req_file,
|
||
|
)
|
||
|
return Response("ok")
|
||
|
except :
|
||
|
return Response("nok")
|
||
|
|
||
|
|
||
|
## to update audited report via api
|
||
|
class AuditedReportUpdate(APIView):
|
||
|
def post(self,request):
|
||
|
facade_script_id = request.data['facade_script_id']
|
||
|
script_file = request.data['script_file']
|
||
|
audited_file_name = str(facade_script_id) + "_report.docx"
|
||
|
req_file = ContentFile(script_file.read(),audited_file_name)
|
||
|
try:
|
||
|
File.objects.create(
|
||
|
script= Script.objects.get(id=facade_script_id),
|
||
|
type="audit-report",
|
||
|
file=req_file,
|
||
|
)
|
||
|
return Response("ok")
|
||
|
except :
|
||
|
return Response("nok")
|
||
|
|
||
|
class AuditHomeView(APIView):
|
||
|
authentication_classes = [BasicAuthentication,SessionAuthentication]
|
||
|
permission_classes = [IsAuthenticated]
|
||
|
def get(self, request):
|
||
|
form = AuditForm()
|
||
|
return render(request, "audit/audit.html", {"form": form})
|
||
|
|
||
|
def post(self, request):
|
||
|
form = AuditForm(request.POST, request.FILES)
|
||
|
if form.is_valid():
|
||
|
screenplay_name = form.cleaned_data["screenplay_name"]
|
||
|
author = form.cleaned_data["author_name"]
|
||
|
language = form.cleaned_data["language"]
|
||
|
script_file = form.cleaned_data["script_file"]
|
||
|
|
||
|
script_ext = script_file.name.split(".")[-1]
|
||
|
script_file_name = screenplay_name + "." + script_ext
|
||
|
# Blockchain_script_id = int(time.time())
|
||
|
# request.session["Blockchain_script_id"] = Blockchain_script_id
|
||
|
|
||
|
file = ContentFile(
|
||
|
script_file.read(),
|
||
|
script_file_name,
|
||
|
)
|
||
|
|
||
|
result = filesystem.new_screenplay(
|
||
|
request.user,
|
||
|
author,
|
||
|
screenplay_name,
|
||
|
file,
|
||
|
"script-original",
|
||
|
language,
|
||
|
)
|
||
|
|
||
|
script_id = result.get("script", {}).get("id")
|
||
|
# print("calling blockchain function")
|
||
|
file_to_original = File.objects.get(
|
||
|
script=script_id,
|
||
|
type="script-original"
|
||
|
|
||
|
)
|
||
|
status = ScriptAuditModel.objects.get(
|
||
|
script = Script.objects.get(
|
||
|
id = script_id
|
||
|
))
|
||
|
if status.status == "SUCCESS":
|
||
|
try:
|
||
|
script = Script.objects.get(id=script_id)
|
||
|
#no_of_pages = script.no_of_pages
|
||
|
user_id = script.screenplay.user.id
|
||
|
except Script.DoesNotExist:
|
||
|
print(f"Script with ID {script_id} does not exist.")
|
||
|
# Handle the case when the script is not found
|
||
|
to_email = [request.user.email]
|
||
|
email_code = 'SB1'
|
||
|
sendmail(to_email=to_email , email_code=email_code )
|
||
|
|
||
|
elif status.status == "FAILURE":
|
||
|
email_code = 'SB2'
|
||
|
sendmail(to_email=to_email , email_code=email_code )
|
||
|
pass
|
||
|
|
||
|
print("script id is", script_id)
|
||
|
return redirect("audit_processing_page")
|
||
|
|
||
|
else :
|
||
|
|
||
|
return HttpResponseBadRequest(render(request, "audit/audit.html", {"form": form,"errors": form.errors}))
|
||
|
|
||
|
# class AuditHomeView(APIView):
|
||
|
# authentication_classes = [BasicAuthentication,SessionAuthentication]
|
||
|
# permission_classes = [IsAuthenticated]
|
||
|
# def get(self, request):
|
||
|
|
||
|
# # context = create_indent(request)
|
||
|
|
||
|
# form = AuditForm()
|
||
|
# # return multilingual_render(request, "audit/audit.html", {"form": form})
|
||
|
# return render(request, "audit/audit.html", {"form": form})
|
||
|
|
||
|
# def post(self, request):
|
||
|
# form = AuditForm(request.POST, request.FILES)
|
||
|
# if form.is_valid():
|
||
|
# screenplay_name = form.cleaned_data["screenplay_name"]
|
||
|
# author = form.cleaned_data["author_name"]
|
||
|
# language = form.cleaned_data["language"]
|
||
|
# script_file = form.cleaned_data["script_file"]
|
||
|
|
||
|
# script_ext = script_file.name.split(".")[-1]
|
||
|
# script_file_name = screenplay_name + "." + script_ext
|
||
|
# Blockchain_script_id = int(time.time())
|
||
|
# request.session["Blockchain_script_id"] = Blockchain_script_id
|
||
|
|
||
|
# file = ContentFile(
|
||
|
# script_file.read(),
|
||
|
# script_file_name,
|
||
|
# )
|
||
|
|
||
|
# result = filesystem.new_screenplay(
|
||
|
# request.user,
|
||
|
# author,
|
||
|
# screenplay_name,
|
||
|
# file,
|
||
|
# "script-original",
|
||
|
# language,
|
||
|
# )
|
||
|
|
||
|
# script_id = result.get("script", {}).get("id")
|
||
|
|
||
|
# print("calling blockchain function")
|
||
|
# file_to_original = File.objects.get(
|
||
|
# script=script_id,
|
||
|
# type="script-original"
|
||
|
|
||
|
# )
|
||
|
# status = ScriptAuditModel.objects.get(
|
||
|
# script = Script.objects.get(
|
||
|
# id = script_id
|
||
|
# ))
|
||
|
# print("STATUS AUDIT",status)
|
||
|
# with open("/home/mnfbeta/mnf/app/Blockchain2/file_stroge.txt","a") as file01:
|
||
|
# file01.write("audit userkey------------------\n")
|
||
|
# # Blockchain
|
||
|
# if UserCredentialsForBlockchain.objects.filter(user=request.user).exists():
|
||
|
# blockchain_obj = UserCredentialsForBlockchain.objects.get(user=request.user)
|
||
|
# script_original= {}
|
||
|
# audit_data={}
|
||
|
# script_original["screenplay_name"] = screenplay_name
|
||
|
# script_original["author"] = author
|
||
|
# script_original["language"] = language
|
||
|
# script_original["script_file_name"] = script_file_name
|
||
|
# script_original["status"] = "STARTED"
|
||
|
# script_original["script_id"] = script_id
|
||
|
# with open(file_to_original.file.path, 'rb') as file:
|
||
|
# hash = uploadDataToIPFSNode(file)
|
||
|
# script_original["script_file"] = hash
|
||
|
# script_original["type"] = "script-original"
|
||
|
# script_original["script_file_path"] = file_to_original.file.path
|
||
|
# script_original["script_file_size"] = file_to_original.file.size
|
||
|
# audit_data["script-original"] = script_original
|
||
|
# userkey= decryptionOfPrivate(blockchain_obj.privateKey)
|
||
|
# print("userkey = ", str(userkey))
|
||
|
# # status,getData = getScriptAudit(userkey.decode('utf-8'),user_id,str(script_id))
|
||
|
# privatekeyfordb = userkey.decode('utf-8')
|
||
|
# print(privatekeyfordb)
|
||
|
# status.bchain_privatekey = privatekeyfordb
|
||
|
# status.save()
|
||
|
# with open("/home/mnfbeta/mnf/app/Blockchain2/file_stroge.txt","a") as file01:
|
||
|
# file01.write("audit userkey-------------------" + str(blockchain_obj.publicKey) + "\n")
|
||
|
# with open("/home/mnfbeta/mnf/app/Blockchain2/file_stroge.txt","a") as file01:
|
||
|
# file01.write("audit public key-------------------" + str(blockchain_obj.privateKey) + "\n")
|
||
|
# print("blockchain_obj.publicKey",blockchain_obj.publicKey)
|
||
|
# print("blockchain_obj.privateKey",blockchain_obj.privateKey)
|
||
|
# if status.status == "SUCCESS":
|
||
|
# file_to_audit = File.objects.get(
|
||
|
# script=script_id,
|
||
|
# type="script-csv"
|
||
|
# )
|
||
|
# file_to_audit_report = File.objects.get(
|
||
|
# script=script_id,
|
||
|
# type="audit-report"
|
||
|
# )
|
||
|
# hash2 = ""
|
||
|
# # with open("/home/mnfbeta/mnf/app/Blockchain2/file_stroge.txt","a") as file01:
|
||
|
# # file01.write("5.index hash2-------------------" + str(file_to_audit_docx) + "\n")
|
||
|
# try:
|
||
|
# file_to_audit_docx = File.objects.get(
|
||
|
# script=script_id,
|
||
|
# type="script-docx"
|
||
|
# )
|
||
|
# script_docx = {}
|
||
|
# script_path1 = file_to_audit_docx.file.path
|
||
|
# script_size = file_to_audit_docx.file.size
|
||
|
# with open(script_path1, 'rb') as _file:
|
||
|
# hash2 = uploadDataToIPFSNode(_file)
|
||
|
# # with open("/home/mnfbeta/mnf/app/Blockchain2/file_stroge.txt","a") as file01:
|
||
|
# # file01.write("5.index hash2-------------------" + str(hash2) + "\n")
|
||
|
# script_docx["script_file_path"] = script_path1
|
||
|
# script_docx["script_file"] = hash2
|
||
|
# script_docx["type"] = "script-docx"
|
||
|
# script_docx["script_file_size"] = script_size
|
||
|
# audit_data["script-docx"] = script_docx
|
||
|
# # with open("/home/mnfbeta/mnf/app/Blockchain2/file_stroge.txt","a") as file01:
|
||
|
# # file01.write("5.index hash2-------------------" + str(script_docx) + "\n")
|
||
|
# except:
|
||
|
# pass
|
||
|
# script_csv = {}
|
||
|
# audit_report ={}
|
||
|
# audit_report_path = file_to_audit_report.file.path
|
||
|
# script_path = file_to_audit.file.path
|
||
|
# script_size = file_to_audit.file.size
|
||
|
|
||
|
# print("script_file_path_is_here",script_path)
|
||
|
# with open(script_path, 'rb') as _file:
|
||
|
# hash1 = uploadDataToIPFSNode(_file)
|
||
|
# script_csv["script_file"] = hash1
|
||
|
# script_csv["script_file_path"] = script_path
|
||
|
# script_csv["type"] = "script-csv"
|
||
|
# script_csv["script_file_size"] = script_size
|
||
|
# with open(audit_report_path, 'rb') as file1:
|
||
|
# hash2 = uploadDataToIPFSNode(file1)
|
||
|
# audit_report["script_file"] = hash2
|
||
|
# script_csv["script_file_path"] = audit_report_path
|
||
|
# script_csv["type"] = "audit-report"
|
||
|
# audit_data["script-csv"]= script_csv
|
||
|
# audit_data["audit-report"]= audit_report
|
||
|
|
||
|
# with open("/home/mnfbeta/mnf/app/Blockchain2/file_stroge.txt","a") as file01:
|
||
|
# file01.write("5.index hash2-------------------" + str(audit_data) + "\n")
|
||
|
# Response = UploadScriptAuditData(OWNER_KEY,blockchain_obj.publicKey,blockchain_obj.user_id,script_id,str(audit_data))
|
||
|
# print("tx_hash",Response)
|
||
|
# transactioni_id = str(Response)
|
||
|
# with open("/home/mnfbeta/mnf/app/Blockchain2/file_stroge.txt","a") as file01:
|
||
|
# file01.write(f"Transaction hash for audit is {str(transactioni_id)}\n")
|
||
|
# current_time = datetime.now()
|
||
|
# formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
|
||
|
# file01.write(f"Transaction hash for audit is {str(type(transactioni_id))}\n")
|
||
|
# file01.write(f" time for the above transaction id {str(formatted_time)}\n")
|
||
|
# # audit_model_obj = ScriptAuditModel.objects.get(
|
||
|
# # script = Script.objects.get(id = script_id))
|
||
|
# status.transaction_hash =str(transactioni_id)
|
||
|
# # audit_model_obj.transaction_hash = str(transactioni_id)
|
||
|
# # audit_model_obj.save()
|
||
|
# status.save()
|
||
|
|
||
|
# user_infos = user_info(tx_hash=Response,service="Script Audit",gas_fee=1234567888882)
|
||
|
# addition_result = user_infos.update_info(request)
|
||
|
# hash = hash_decrypation(hash)
|
||
|
# # certificate = certificateGenrate(request.user.username,"script audit",hash1)
|
||
|
|
||
|
# tx_id = Response
|
||
|
|
||
|
# to_email = [request.user.email]
|
||
|
# email_code = 'BL1'
|
||
|
# key_value = {
|
||
|
# "service":"Audited Script",
|
||
|
# "hash": hash2,
|
||
|
# "public key":blockchain_obj.publicKey,
|
||
|
# "private key":userkey,
|
||
|
# "Transaction Hash": tx_id,
|
||
|
# }
|
||
|
# print("userkey = ", userkey)
|
||
|
# #sendmail(to_email=to_email , email_code=email_code, key_value=key_value, filePath=certificate)
|
||
|
|
||
|
# print("mail send sucessfully:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::")
|
||
|
# else:
|
||
|
# Response = UploadScriptAuditData(OWNER_KEY,blockchain_obj.publicKey,blockchain_obj.user_id,script_id,str(audit_data))
|
||
|
# print("tx_hash",Response)
|
||
|
# hash = hash_decrypation(hash)
|
||
|
# #certificate = certificateGenrate(request.user.username,"script audit",hash)
|
||
|
|
||
|
# tx_id = Response
|
||
|
|
||
|
# to_email = [request.user.email]
|
||
|
# email_code = 'BL1'
|
||
|
# key_value = {
|
||
|
# "service":"Orginal Script Audit",
|
||
|
# "hash": hash,
|
||
|
# "public key":blockchain_obj.publicKey,
|
||
|
# "private key":userkey,
|
||
|
# "Transaction Hash": tx_id,
|
||
|
# }
|
||
|
|
||
|
# print("::::::::::::::",key_value)
|
||
|
# print("userkey = ", userkey)
|
||
|
# # sendmail(to_email=to_email , email_code=email_code,key_value=key_value,filePath=certificate)
|
||
|
|
||
|
|
||
|
# if status.status == "SUCCESS":
|
||
|
# try:
|
||
|
# script = Script.objects.get(id=script_id)
|
||
|
# no_of_pages = script.no_of_pages
|
||
|
# user_id = script.screenplay.user.id
|
||
|
# except Script.DoesNotExist:
|
||
|
# print(f"Script with ID {script_id} does not exist.")
|
||
|
# # Handle the case when the script is not found
|
||
|
# # You might want to return an error response or raise an exception.
|
||
|
|
||
|
# try:
|
||
|
# update_juggernaut(request, service_name='audit', audit_pages=int(no_of_pages))
|
||
|
# except Exception as e:
|
||
|
# print("Failed to update Juggernaut:", e)
|
||
|
# # Handle the case when the update_juggernaut function fails.
|
||
|
# to_email = [request.user.email]
|
||
|
# email_code = 'SB1'
|
||
|
|
||
|
|
||
|
# sendmail(to_email=to_email , email_code=email_code )
|
||
|
|
||
|
# # send_email_to_user(request.user,screenplay_name,"MyNextFilm: Successfully Audited Your Script ","Woohoo ! Your Script is Audited SuccessFully ")
|
||
|
|
||
|
# elif status.status == "FAILURE":
|
||
|
# to_email = [request.user.email]
|
||
|
# email_code = 'SB2'
|
||
|
|
||
|
# sendmail(to_email=to_email , email_code=email_code )
|
||
|
# # send_email_to_user(request.user,screenplay_name,"MyNextFilm: Auditing was Unsuccessfull ","Oops! Something went wrong and Your Script is Not Audited ")
|
||
|
# # ## to be run via scriptpage
|
||
|
# print("script id is", script_id)
|
||
|
# # run_service(script_id,'audit')
|
||
|
# # audit_in_background(script_id)
|
||
|
# # time.sleep(3)
|
||
|
# # return redirect("my_audited_scripts")
|
||
|
# return redirect("audit_processing_page")
|
||
|
# # return multilingual_render(request, "audit/audit_processing.html", None)
|
||
|
# else :
|
||
|
# # return HttpResponseBadRequest(multilingual_render(request, "audit/audit.html", {"form": form,"errors": form.errors}))
|
||
|
# return HttpResponseBadRequest(render(request, "audit/audit.html", {"form": form,"errors": form.errors}))
|
||
|
|
||
|
def audit_processing_view(request):
|
||
|
# multilingual_render the HTML template and return it as a response
|
||
|
# return multilingual_render(request, 'audit/audit_processing.html')
|
||
|
print("<<<<<<<<<<<<<<<<<<<<<<>>>>>>>>>>>>>>>>>>>>>>>>>>")
|
||
|
print(get_current_site(request).domain)
|
||
|
return render(request, 'audit/audit_processing.html')
|
||
|
|
||
|
|
||
|
# class AuditedScriptsView(APIView):
|
||
|
# authentication_classes = [BasicAuthentication,SessionAuthentication]
|
||
|
# permission_classes = [IsAuthenticated]
|
||
|
# def get(self, request):
|
||
|
# display_list = []
|
||
|
# scripts = Script.objects.filter(screenplay__user=request.user).order_by('-created_on')
|
||
|
# # try:
|
||
|
# # print("scripts list:",scripts)
|
||
|
# # except:
|
||
|
# # pass
|
||
|
|
||
|
# for script in scripts:
|
||
|
|
||
|
# if ScriptAuditModel.objects.filter(script=script).exists():
|
||
|
# # audit_status_objects = AuditStatus.objects.filter(script=str(script.id))
|
||
|
# audit_status_objects = ScriptAuditModel.objects.filter(script=script)
|
||
|
# for audit_status_object in audit_status_objects:
|
||
|
# script_audit_status = audit_status_object.status
|
||
|
# if script_audit_status == States.SUCCESS:
|
||
|
# display_list.append(
|
||
|
# {
|
||
|
# "scriptid": str(script.id),
|
||
|
# "scriptName": str(script.screenplay.name),
|
||
|
# "author": str(script.screenplay.author),
|
||
|
# "iscomplete": "S",
|
||
|
# "created_on" : str(script.created_on.astimezone(pytz.timezone('Asia/Kolkata')).strftime("%B %d, %Y %I:%M %p")),
|
||
|
# }
|
||
|
# )
|
||
|
# elif script_audit_status == States.FAILURE:
|
||
|
# display_list.append(
|
||
|
# {
|
||
|
# "scriptid": str(script.id),
|
||
|
# "scriptName": str(script.screenplay.name),
|
||
|
# "author": str(script.screenplay.author),
|
||
|
# "iscomplete": "F",
|
||
|
# "created_on" : str(script.created_on.astimezone(pytz.timezone('Asia/Kolkata')).strftime("%B %d, %Y %I:%M %p")),
|
||
|
# }
|
||
|
# )
|
||
|
# elif script_audit_status == States.STARTED:
|
||
|
# display_list.append(
|
||
|
# {
|
||
|
# "scriptid": str(script.id),
|
||
|
# "scriptName": str(script.screenplay.name),
|
||
|
# "author": str(script.screenplay.author),
|
||
|
# "iscomplete": "R",
|
||
|
# "created_on" : str(script.created_on.astimezone(pytz.timezone('Asia/Kolkata')).strftime("%B %d, %Y %I:%M %p")),
|
||
|
# }
|
||
|
# )
|
||
|
# elif script_audit_status == States.PENDING:
|
||
|
# continue
|
||
|
# print(get_current_site(request).domain)
|
||
|
# return render(
|
||
|
# request,
|
||
|
# "audit/audited.html",
|
||
|
# {"i": display_list, "url": url},
|
||
|
# )
|
||
|
|
||
|
class AuditedScriptsView_kunal(APIView):
|
||
|
authentication_classes = [BasicAuthentication,SessionAuthentication]
|
||
|
permission_classes = [IsAuthenticated]
|
||
|
def get(self, request):
|
||
|
display_list = []
|
||
|
scripts = Script.objects.filter(screenplay__user=request.user).order_by('-created_on')
|
||
|
|
||
|
|
||
|
for script in scripts:
|
||
|
|
||
|
if ScriptAuditModel.objects.filter(script=script).exists():
|
||
|
# audit_status_objects = AuditStatus.objects.filter(script=str(script.id))
|
||
|
audit_status_objects = ScriptAuditModel.objects.filter(script=script)
|
||
|
for audit_status_object in audit_status_objects:
|
||
|
script_audit_status = audit_status_object.status
|
||
|
if script_audit_status == States.SUCCESS:
|
||
|
extimated_time = audit_status_object.expected_duration
|
||
|
no_of_pages = audit_status_object.number_of_pages
|
||
|
screenplay_language = audit_status_object.screenplay_language
|
||
|
dialogue_language = audit_status_object.dialogue_language
|
||
|
# transaction_haash = audit_status_object.transaction_hash
|
||
|
# privatekeycnf = audit_status_object.bchain_privatekey
|
||
|
privatekeycnf = "dhjdjfhd"
|
||
|
print("SCRIPT ID = ", str(script.id))
|
||
|
print("extimated time = ", extimated_time)
|
||
|
print("no of pages = ", no_of_pages)
|
||
|
print("screenplay language = ", screenplay_language)
|
||
|
print("dialogue_language =", dialogue_language)
|
||
|
# print("transaction hash = ", transaction_haash)
|
||
|
print(" private key = ", privatekeycnf)
|
||
|
display_list.append(
|
||
|
{
|
||
|
"scriptid": str(script.id),
|
||
|
"scriptName": str(script.screenplay.name),
|
||
|
"author": str(script.screenplay.author),
|
||
|
"iscomplete": "S",
|
||
|
"created_on" : str(script.created_on.astimezone(pytz.timezone('Asia/Kolkata')).strftime("%B %d, %Y %I:%M %p")),
|
||
|
"extimated_time": str(extimated_time),
|
||
|
"screenplay_language": str(screenplay_language),
|
||
|
"dialogue_language": str(dialogue_language),
|
||
|
"page_number" : str(no_of_pages),
|
||
|
"transaction_hash" : str(1),
|
||
|
"confirmkey": str(privatekeycnf)
|
||
|
}
|
||
|
)
|
||
|
elif script_audit_status == States.FAILURE:
|
||
|
display_list.append(
|
||
|
{
|
||
|
"scriptid": str(script.id),
|
||
|
"scriptName": str(script.screenplay.name),
|
||
|
"author": str(script.screenplay.author),
|
||
|
"iscomplete": "F",
|
||
|
"created_on" : str(script.created_on.astimezone(pytz.timezone('Asia/Kolkata')).strftime("%B %d, %Y %I:%M %p")),
|
||
|
}
|
||
|
)
|
||
|
elif script_audit_status == States.STARTED:
|
||
|
extimated_time = audit_status_object.expected_duration
|
||
|
no_of_pages = audit_status_object.number_of_pages
|
||
|
display_list.append(
|
||
|
{
|
||
|
"scriptid": str(script.id),
|
||
|
"scriptName": str(script.screenplay.name),
|
||
|
"author": str(script.screenplay.author),
|
||
|
"iscomplete": "R",
|
||
|
"created_on" : str(script.created_on.astimezone(pytz.timezone('Asia/Kolkata')).strftime("%B %d, %Y %I:%M %p")),
|
||
|
"extimated_time": str(extimated_time),
|
||
|
}
|
||
|
)
|
||
|
elif script_audit_status == States.PENDING:
|
||
|
continue
|
||
|
url = get_current_site(request).domain
|
||
|
return render(
|
||
|
request,
|
||
|
"audit/audited_copy.html",
|
||
|
{"i": display_list, "url": url},
|
||
|
)
|
||
|
|
||
|
|
||
|
class CheckDomain(APIView):
|
||
|
authentication_classes = [JWTAuthentication]
|
||
|
permission_classes = [IsAuthenticated]
|
||
|
def get(self, request):
|
||
|
domain_name = (get_current_site(request).domain)
|
||
|
return Response(f"Audit started {domain_name}")
|
||
|
|
||
|
class AuditView(APIView):
|
||
|
authentication_classes = [JWTAuthentication]
|
||
|
permission_classes = [IsAuthenticated]
|
||
|
def get(self, request, script_id):
|
||
|
audit_in_background(script_id)
|
||
|
return Response("Audit started")
|
||
|
|
||
|
|
||
|
|
||
|
class CharacterListView(APIView):
|
||
|
authentication_classes = [JWTAuthentication]
|
||
|
permission_classes = [IsAuthenticated]
|
||
|
def get(self, request, script_id):
|
||
|
audit = NeutralAudit(script_id)
|
||
|
char_lst = audit.get_character_list()
|
||
|
# char_sub = audit.get_character_subset(char_lst)
|
||
|
char_sub = audit.get_character_subset(char_lst)
|
||
|
return Response([char_lst, char_sub])
|
||
|
|
||
|
|
||
|
def generate_script_id(path,request):
|
||
|
input_file = os.path.basename(path)
|
||
|
input_file1 = os.path.splitext(input_file)[0]
|
||
|
now = datetime.now()
|
||
|
screenplay_name = input_file1 +"_"+ now.strftime("%d-%m-%Y_%H-%M-%S_")
|
||
|
author = "SELF_DEFINED"
|
||
|
language = "en"
|
||
|
script_file = path
|
||
|
script_ext = script_file.split(".")[-1]
|
||
|
script_file_name = screenplay_name + "." + script_ext
|
||
|
print(script_file_name)
|
||
|
with open(path, "rb") as script_file:
|
||
|
file = ContentFile(script_file.read(),
|
||
|
script_file_name,
|
||
|
)
|
||
|
result = filesystem.new_screenplay(
|
||
|
request.user,
|
||
|
author,
|
||
|
screenplay_name,
|
||
|
file,
|
||
|
"script-original",
|
||
|
language,
|
||
|
)
|
||
|
script_id = result.get("script", {}).get("id")
|
||
|
print("\n\n\n\nSCRIPT____ID :",script_id,"\n\n\n\n")
|
||
|
return script_id
|
||
|
|
||
|
def check_audit_status(request):
|
||
|
scripts = Script.objects.filter(screenplay__user=request.user)
|
||
|
for script in scripts:
|
||
|
# audit_exists = File.objects.filter(script=str(script.id),type='script-csv').exists()
|
||
|
# if AuditStatus.objects.filter(script=str(script.id)).exists():
|
||
|
if ScriptAuditModel.objects.filter(script=script).exists():
|
||
|
# audit_status_objects = AuditStatus.objects.filter(script=str(script.id))
|
||
|
audit_status_objects = ScriptAuditModel.objects.filter(script=script)
|
||
|
for audit_status_object in audit_status_objects:
|
||
|
script_audit_status = audit_status_object.status
|
||
|
while script_audit_status != States.SUCCESS:
|
||
|
if script_audit_status == States.SUCCESS:
|
||
|
return "SUCCESS"
|
||
|
|
||
|
if script_audit_status == States.FAILURE:
|
||
|
return "FAILURE"
|
||
|
|
||
|
|
||
|
def write_dict_to_file(file_path, dictionary):
|
||
|
try:
|
||
|
with open(file_path, 'w') as file:
|
||
|
json.dump(dictionary, file)
|
||
|
print("Dictionary successfully written to the file.")
|
||
|
except IOError:
|
||
|
print("Error: Unable to open the file.")
|
||
|
|
||
|
def audit_vector_gen(request,path,mnf_db_id):
|
||
|
filesystem_id = generate_script_id(path,request)
|
||
|
audit_in_background(filesystem_id)
|
||
|
|
||
|
pass
|
||
|
|
||
|
|
||
|
# class DownloadScriptFromBlockchain(APIView):
|
||
|
# #authentication_classes = [JWTAuthentication]
|
||
|
# authentication_classes = [BasicAuthentication,SessionAuthentication]
|
||
|
# permission_classes = [IsAuthenticated]
|
||
|
|
||
|
# """
|
||
|
# View for downloding scripts of desired type eg- docx, pdf
|
||
|
# """
|
||
|
|
||
|
# VALID_FILE_TYPES = ("script-docx", "script-pdf","script-original","audit-report")
|
||
|
# """
|
||
|
# Files of only above types are allwed to be downloaded.
|
||
|
# """
|
||
|
|
||
|
# def get(self, request) -> FileResponse:
|
||
|
|
||
|
# if not "script_id" in request.query_params:
|
||
|
# raise spex.ScriptIdNotFound("script_id must be provided in body.")
|
||
|
|
||
|
# if not "type" in request.query_params:
|
||
|
# raise spex.ScriptAuditException(
|
||
|
# "type(i.e. type of file) must be provided in body.")
|
||
|
|
||
|
# script_id = request.query_params.get("script_id")
|
||
|
# file_type = request.query_params.get("type")
|
||
|
# privatekey1 = request.query_params.get("privatekey")
|
||
|
|
||
|
|
||
|
# self.user = request.user
|
||
|
# self.Blockchain_script = script_id
|
||
|
# if not request.query_params.get("type") in self.VALID_FILE_TYPES:
|
||
|
# raise spex.IllegalFiletype(file_type, self.VALID_FILE_TYPES)
|
||
|
|
||
|
# try:
|
||
|
# if UserCredentialsForBlockchain.objects.filter(user=request.user).exists():
|
||
|
# blockchain_obj = UserCredentialsForBlockchain.objects.get(user=request.user)
|
||
|
# userkeys = decryptionOfPrivate(blockchain_obj.privateKey)
|
||
|
# with open("/home/mnfbeta/mnf/app/Blockchain2/file_stroge.txt","a") as file01:
|
||
|
# file01.write("1.user private key-------------------" + str(userkeys) + "\n")
|
||
|
# with open("/home/mnfbeta/mnf/app/Blockchain2/file_stroge.txt","a") as file01:
|
||
|
# file01.write("1.user script_id -------------------" + str(script_id) + "\n")
|
||
|
# UserId = blockchain_obj.user_id
|
||
|
# status,getdata = getScriptAudit(privatekey1,UserId,str(script_id))
|
||
|
# print(status,">>>>>>>>>>>>>>>>>>>>")
|
||
|
# if status == True:
|
||
|
# self.auditData = eval(getdata[1])
|
||
|
# else:
|
||
|
# return HttpResponse('External API returned a non-200 status code.', status=500)
|
||
|
# except:
|
||
|
# return HttpResponse('External API returned a non-200 status code.', status=500)
|
||
|
|
||
|
# # self.file_model_objects = File.objects.filter(script=script_id)
|
||
|
# # print(">><<>><<")
|
||
|
# # print(">><<>><<",self.file_model_objects)
|
||
|
|
||
|
|
||
|
# # if not self.file_model_objects.exists():
|
||
|
# # raise spex.ScriptDoesNotExist()
|
||
|
|
||
|
# try:
|
||
|
# # self.query_file = self.file_model_objects.get(type=file_type)
|
||
|
# query_file1 = self.auditData.get(file_type, {})
|
||
|
|
||
|
# except django_exception.ObjectDoesNotExist:
|
||
|
# return HttpResponse('External API returned a non-200 status code.Blockchain uploaded data not found', status=500)
|
||
|
# # try:
|
||
|
# # query_file2 = self.auditData.get("script-csv", {})
|
||
|
# # file_hash1 = query_file2.get('script_file')
|
||
|
# # file_path1 = query_file2.get('script_file_path')
|
||
|
# # download_file_System(file_hash1,file_path1)
|
||
|
# # self.auditData.get("script-docx",{})
|
||
|
# # if file_type == "script-docx":
|
||
|
# # self._check_docx()
|
||
|
|
||
|
# # elif file_type == "script-pdf":
|
||
|
# # self._check_pdf()
|
||
|
|
||
|
# # if UserCredentialsForBlockchain.objects.filter(user=request.user).exists():
|
||
|
# # blockchain_obj = UserCredentialsForBlockchain.objects.get(user=request.user)
|
||
|
# # userkeys = decryptionOfPrivate(blockchain_obj.privateKey)
|
||
|
# # UserId = blockchain_obj.user_id
|
||
|
# # status,getdata = getScriptAudit(userkeys.decode('utf-8'),UserId,str(script_id))
|
||
|
# # if status == True:
|
||
|
# # self.auditData = eval(getdata[1])
|
||
|
# # query_file1 = self.auditData[file_type]
|
||
|
# # except:
|
||
|
# # if file_type == "script-docx":
|
||
|
# # self._check_docx()
|
||
|
|
||
|
# # elif file_type == "script-pdf":
|
||
|
# # self._check_pdf()
|
||
|
|
||
|
# # file_path = self.query_file.file.path
|
||
|
# # file_size = self.query_file.file.size
|
||
|
|
||
|
# # file_path = query_file1.get('script_file_path')
|
||
|
# # file_size = query_file1.get('script_file_size')
|
||
|
# try:
|
||
|
# screenplayName= self.auditData.get("script-original", {})
|
||
|
# file_hash = query_file1.get('script_file')
|
||
|
# file_path= query_file1.get("script_file_path")
|
||
|
# except:
|
||
|
# pass
|
||
|
|
||
|
|
||
|
# # file_path = self.latest_file.file.path
|
||
|
# # file_size = self.latest_file.file.size
|
||
|
# try:
|
||
|
# print("file_path")
|
||
|
# geting_files = decryptionOflocalUrl(file_hash)
|
||
|
# response = requests.get(geting_files, stream=True)
|
||
|
# with open("/home/mnfbeta/mnf/app/Blockchain2/file_stroge.txt","a") as file01:
|
||
|
# file01.write("1.gasfee file_uploading_end-------------------" + str(geting_files) + "\n")
|
||
|
# if response.status_code == 200:
|
||
|
# content_type = response.headers['Content-Type'] # Change to the appropriate format
|
||
|
# response = HttpResponse(response.iter_content(chunk_size=8192), content_type=content_type)
|
||
|
# # response[
|
||
|
# # 'Content-Disposition'] = f"attachment; filename={screenplayName.get("screenplay_name")}.{file_path.rsplit('.',1)[1]}"
|
||
|
# return response
|
||
|
# else:
|
||
|
# return HttpResponse('External API returned a non-200 status code.File CID Not Valid', status=500)
|
||
|
# except:
|
||
|
# return HttpResponse('External API returned a non-200 status code.This File Type Is Not Here', status=500)
|
||
|
|
||
|
|
||
|
#download Audited Script
|
||
|
|
||
|
|
||
|
# class DownloadScript(APIView):
|
||
|
|
||
|
# #authentication_classes = [JWTAuthentication]
|
||
|
# authentication_classes = [BasicAuthentication,SessionAuthentication]
|
||
|
# permission_classes = [IsAuthenticated]
|
||
|
# """
|
||
|
# View for downloding scripts of desired type eg- docx, pdf
|
||
|
# """
|
||
|
|
||
|
# VALID_FILE_TYPES = ("script-docx", "script-pdf","script-original","audit-report")
|
||
|
# """
|
||
|
# Files of only above types are allwed to be downloaded.
|
||
|
# """
|
||
|
|
||
|
|
||
|
# def get(self, request) -> FileResponse:
|
||
|
|
||
|
# if not "script_id" in request.query_params:
|
||
|
# raise spex.ScriptIdNotFound("script_id must be provided in body.")
|
||
|
|
||
|
# if not "type" in request.query_params:
|
||
|
# raise spex.ScriptAuditException(
|
||
|
# "type(i.e. type of file) must be provided in body.")
|
||
|
|
||
|
# script_id = request.query_params.get("script_id")
|
||
|
# file_type = request.query_params.get("type")
|
||
|
|
||
|
# self.user = request.user
|
||
|
# self.Blockchain_script = script_id
|
||
|
# if not request.query_params.get("type") in self.VALID_FILE_TYPES:
|
||
|
# raise spex.IllegalFiletype(file_type, self.VALID_FILE_TYPES)
|
||
|
|
||
|
# try:
|
||
|
# if UserCredentialsForBlockchain.objects.filter(user=request.user).exists():
|
||
|
# blockchain_obj = UserCredentialsForBlockchain.objects.get(user=request.user)
|
||
|
# userkeys = decryptionOfPrivate(blockchain_obj.privateKey)
|
||
|
# UserId = blockchain_obj.user_id
|
||
|
# status,getdata = getScriptAudit(userkeys.decode('utf-8'),UserId,str(script_id))
|
||
|
# print(status,">>>>>>>>>>>>>>>>>>>>")
|
||
|
# if status == True:
|
||
|
# self.auditData = eval(getdata[1])
|
||
|
# except:
|
||
|
# pass
|
||
|
|
||
|
# self.file_model_objects = File.objects.filter(script=script_id)
|
||
|
# print(">><<>><<")
|
||
|
# print(">><<>><<",self.file_model_objects)
|
||
|
|
||
|
|
||
|
# if not self.file_model_objects.exists():
|
||
|
# raise spex.ScriptDoesNotExist()
|
||
|
|
||
|
# try:
|
||
|
# self.query_file = self.file_model_objects.get(type=file_type)
|
||
|
# try:
|
||
|
# query_file1 = self.auditData.get(file_type, {})
|
||
|
# except:
|
||
|
# pass
|
||
|
|
||
|
# except django_exception.ObjectDoesNotExist:
|
||
|
# try:
|
||
|
# query_file2 = self.auditData.get("script-csv", {})
|
||
|
# file_hash1 = query_file2.get('script_file')
|
||
|
# file_path1 = query_file2.get('script_file_path')
|
||
|
# download_file_System(file_hash1,file_path1)
|
||
|
# self.auditData.get("script-docx",{})
|
||
|
# if file_type == "script-docx":
|
||
|
# self._check_docx()
|
||
|
|
||
|
# elif file_type == "script-pdf":
|
||
|
# self._check_pdf()
|
||
|
|
||
|
# if UserCredentialsForBlockchain.objects.filter(user=request.user).exists():
|
||
|
# blockchain_obj = UserCredentialsForBlockchain.objects.get(user=request.user)
|
||
|
# userkeys = decryptionOfPrivate(blockchain_obj.privateKey)
|
||
|
# UserId = blockchain_obj.user_id
|
||
|
# status,getdata = getScriptAudit(userkeys.decode('utf-8'),UserId,str(script_id))
|
||
|
# if status == True:
|
||
|
# self.auditData = eval(getdata[1])
|
||
|
# query_file1 = self.auditData[file_type]
|
||
|
# except:
|
||
|
# if file_type == "script-docx":
|
||
|
# self._check_docx()
|
||
|
|
||
|
# elif file_type == "script-pdf":
|
||
|
# self._check_pdf()
|
||
|
|
||
|
# file_path = self.query_file.file.path
|
||
|
# file_size = self.query_file.file.size
|
||
|
|
||
|
# # file_path = query_file1.get('script_file_path')
|
||
|
# # file_size = query_file1.get('script_file_size')
|
||
|
# try:
|
||
|
# file_hash = query_file1.get('script_file')
|
||
|
# except:
|
||
|
# pass
|
||
|
|
||
|
|
||
|
# # file_path = self.latest_file.file.path
|
||
|
# # file_size = self.latest_file.file.size
|
||
|
# try:
|
||
|
# print("file_path")
|
||
|
# print(file_path)
|
||
|
# geting_files = decryptionOflocalUrl(file_hash)
|
||
|
# response = requests.get(geting_files, stream=True)
|
||
|
# if response.status_code == 200:
|
||
|
# content_type = response.headers['Content-Type'] # Change to the appropriate format
|
||
|
# response = HttpResponse(response.iter_content(chunk_size=8192), content_type=content_type)
|
||
|
# response[
|
||
|
# 'Content-Disposition'] = f"attachment; filename={self.query_file.script.screenplay.name + '_' + str(self.query_file)}.{file_path.rsplit('.',1)[1]}"
|
||
|
# return response
|
||
|
# else:
|
||
|
# return HttpResponse('External API returned a non-200 status code.', status=500)
|
||
|
# except:
|
||
|
# mimetype, _ = mimetypes.guess_type(file_path)
|
||
|
|
||
|
# response = FileResponse(self.query_file.file, content_type=mimetype)
|
||
|
# response['Content-Length'] = file_size
|
||
|
# response[
|
||
|
# 'Content-Disposition'] = f"attachment; filename={self.query_file.script.screenplay.name + '_' + str(self.query_file)}.{file_path.rsplit('.',1)[1]}"
|
||
|
# return response
|
||
|
|
||
|
|
||
|
|
||
|
# def _check_docx(self) -> None:
|
||
|
# """
|
||
|
# When docx not found this function can create docx of script for available formats.
|
||
|
# """
|
||
|
# try:
|
||
|
# audit_file_object = self.file_model_objects.get(type="script-csv")
|
||
|
# except django_exception.ObjectDoesNotExist:
|
||
|
# self._download_original()
|
||
|
# return
|
||
|
|
||
|
# df = pd.read_csv(audit_file_object.file)
|
||
|
# docx = utilities.csv_to_docx(df)
|
||
|
|
||
|
# temp_file_stream = BytesIO()
|
||
|
# docx.save(temp_file_stream)
|
||
|
# temp_file_stream.seek(0)
|
||
|
|
||
|
# docx_file = ContentFile(
|
||
|
# temp_file_stream.getvalue(),
|
||
|
# "from_audited_csv_to_document.docx",
|
||
|
# )
|
||
|
|
||
|
# self.query_file = File.objects.create(
|
||
|
# script=audit_file_object.script,
|
||
|
# file=docx_file,
|
||
|
# type="script-docx",
|
||
|
# )
|
||
|
# try:
|
||
|
# if UserCredentialsForBlockchain.objects.filter(user=self.user).exists():
|
||
|
# blockchain_obj = UserCredentialsForBlockchain.objects.get(user=self.user)
|
||
|
# userkeys = decryptionOfPrivate(blockchain_obj.privateKey)
|
||
|
# UserId = blockchain_obj.user_id
|
||
|
# status,getdata = getScriptAudit(userkeys.decode('utf-8'),UserId,str(self.Blockchain_script))
|
||
|
# if status == True:
|
||
|
# audit_data = eval(getdata[1])
|
||
|
# file_to_audit = File.objects.get(
|
||
|
# script=self.Blockchain_script,
|
||
|
# type="script-docx"
|
||
|
# )
|
||
|
# script_docx = {}
|
||
|
# script_path = file_to_audit.file.path
|
||
|
# script_size = file_to_audit.file.size
|
||
|
# with open(script_path, 'rb') as _file:
|
||
|
# hash = uploadDataToIPFSNode(_file)
|
||
|
# script_docx["script_file_path"] = script_path
|
||
|
# script_docx["script_file"] = hash
|
||
|
# script_docx["type"] = "script-docx"
|
||
|
# script_docx["script_file_size"] = script_size
|
||
|
# audit_data["script-docx"] = script_docx
|
||
|
# Response = UploadScriptAuditData(OWNER_KEY,blockchain_obj.publicKey,blockchain_obj.user_id,self.Blockchain_script,str(audit_data))
|
||
|
# print("tx_hash",Response)
|
||
|
# except:
|
||
|
# pass
|
||
|
|
||
|
# temp_file_stream.close()
|
||
|
|
||
|
# def _check_pdf(self) -> None:
|
||
|
# """
|
||
|
# When pdf not found this function can create docx of script for available formats.
|
||
|
# """
|
||
|
# print("I am printing in _check_pdf method")
|
||
|
# if not self.file_model_objects.filter(type="script-docx").exists():
|
||
|
# self._check_docx()
|
||
|
|
||
|
# try:
|
||
|
# docx_file_object = self.file_model_objects.get(type="script-docx")
|
||
|
# except django_exception.ObjectDoesNotExist:
|
||
|
# self._download_original()
|
||
|
# return
|
||
|
|
||
|
# temp_dir = tempfile.TemporaryDirectory()
|
||
|
# pdf_file_path = utilities.docx_to_pdf(
|
||
|
# docx_file_object.file.path, temp_dir.name)
|
||
|
|
||
|
# with open(pdf_file_path, "rb") as temp_pdf:
|
||
|
|
||
|
# pdf_file = DjangoFile(
|
||
|
# temp_pdf,
|
||
|
# pdf_file_path.rsplit('/', 1)[1],
|
||
|
# )
|
||
|
|
||
|
# self.query_file = File.objects.create(
|
||
|
# script=docx_file_object.script,
|
||
|
# file=pdf_file,
|
||
|
# type="script-pdf",
|
||
|
# )
|
||
|
|
||
|
# try:
|
||
|
# if UserCredentialsForBlockchain.objects.filter(user=self.user).exists():
|
||
|
# blockchain_obj = UserCredentialsForBlockchain.objects.get(user=self.user)
|
||
|
# userkeys = decryptionOfPrivate(blockchain_obj.privateKey)
|
||
|
# UserId = blockchain_obj.user_id
|
||
|
# status,getdata = getScriptAudit(userkeys.decode('utf-8'),UserId,str(self.Blockchain_script))
|
||
|
# if status == True:
|
||
|
# audit_data = eval(getdata[1])
|
||
|
# file_to_audit = File.objects.get(
|
||
|
# script=self.Blockchain_script,
|
||
|
# type="script-pdf"
|
||
|
# )
|
||
|
# script_pdf = {}
|
||
|
# script_path = file_to_audit.file.path
|
||
|
# script_size = file_to_audit.file.size
|
||
|
# with open(script_path, 'rb') as _file:
|
||
|
# hash = uploadDataToIPFSNode(_file)
|
||
|
# script_pdf["script_file_path"] = script_path
|
||
|
# script_pdf["script_file"] = hash
|
||
|
# script_pdf["type"] = "script-pdf"
|
||
|
# script_pdf["script_file_size"] = script_size
|
||
|
# audit_data["script-pdf"] = script_pdf
|
||
|
# Response = UploadScriptAuditData(OWNER_KEY,blockchain_obj.publicKey,blockchain_obj.user_id,self.Blockchain_script,str(audit_data))
|
||
|
# print("tx_hash",Response)
|
||
|
|
||
|
# except:
|
||
|
# pass
|
||
|
# self.latest_file = self.query_file
|
||
|
|
||
|
# def _download_original(self) -> None:
|
||
|
# """
|
||
|
# used for downloafing the script-original when no other type of scripts are found.
|
||
|
# This downloads the unaudited file.
|
||
|
# """
|
||
|
# self.query_file = self.file_model_objects.get(type="script-original")
|
||
|
|
||
|
|
||
|
|
||
|
class DownloadScript(APIView):
|
||
|
#authentication_classes = [JWTAuthentication]
|
||
|
authentication_classes = [BasicAuthentication,SessionAuthentication]
|
||
|
permission_classes = [IsAuthenticated]
|
||
|
"""
|
||
|
View for downloding scripts of desired type eg- docx, pdf
|
||
|
"""
|
||
|
|
||
|
VALID_FILE_TYPES = ("script-docx", "script-pdf","script-original","audit-report")
|
||
|
"""
|
||
|
Files of only above types are allwed to be downloaded.
|
||
|
"""
|
||
|
|
||
|
def get(self, request) -> FileResponse:
|
||
|
if not "script_id" in request.query_params:
|
||
|
# script_id = request.query_params.get("script_id")
|
||
|
# if not "script_id" in request.data.get("script_id"):
|
||
|
raise spex.ScriptidNotFound("script_id must be provided in body.")
|
||
|
|
||
|
if not "type" in request.query_params:
|
||
|
raise spex.ScriptpadException("type(i.e. type of file) must be provided in body.")
|
||
|
|
||
|
script_id = request.query_params.get("script_id")
|
||
|
file_type = request.query_params.get("type")
|
||
|
|
||
|
if not request.query_params.get("type") in self.VALID_FILE_TYPES:
|
||
|
raise spex.IllegalFiletype(file_type, self.VALID_FILE_TYPES)
|
||
|
|
||
|
self.file_model_objects = File.objects.filter(script = script_id)
|
||
|
|
||
|
if not self.file_model_objects.exists():
|
||
|
raise spex.BadQuery()
|
||
|
|
||
|
try:
|
||
|
self.query_file = self.file_model_objects.get(type=file_type)
|
||
|
|
||
|
except django_exception.ObjectDoesNotExist:
|
||
|
|
||
|
if file_type == "script-docx":
|
||
|
self._check_docx()
|
||
|
|
||
|
elif file_type == "script-pdf":
|
||
|
self._check_pdf()
|
||
|
|
||
|
|
||
|
file_path = self.query_file.file.path
|
||
|
file_size = self.query_file.file.size
|
||
|
|
||
|
mimetype, _ = mimetypes.guess_type(file_path)
|
||
|
|
||
|
response = FileResponse(self.query_file.file, content_type=mimetype)
|
||
|
response['Content-Length'] = file_size
|
||
|
response['Content-Disposition'] = f"attachment; filename={self.query_file.script.screenplay.name + '_' + str(self.query_file)}.{file_path.rsplit('.',1)[1]}"
|
||
|
return response
|
||
|
|
||
|
def _check_docx(self) -> None:
|
||
|
"""
|
||
|
When docx not found this function can create docx of script for available formats.
|
||
|
"""
|
||
|
try:
|
||
|
audit_file_object = self.file_model_objects.get(type="script-csv")
|
||
|
|
||
|
except django_exception.ObjectDoesNotExist:
|
||
|
self._download_original()
|
||
|
return
|
||
|
|
||
|
df = pd.read_csv(audit_file_object.file)
|
||
|
docx = utilities.csv_to_docx(df)
|
||
|
|
||
|
temp_file_stream = BytesIO()
|
||
|
docx.save(temp_file_stream)
|
||
|
temp_file_stream.seek(0)
|
||
|
|
||
|
docx_file = ContentFile(
|
||
|
temp_file_stream.getvalue(),
|
||
|
"from_audited_csv_to_document.docx",
|
||
|
)
|
||
|
|
||
|
self.query_file = File.objects.create(
|
||
|
script = audit_file_object.script,
|
||
|
file = docx_file,
|
||
|
type = "script-docx",
|
||
|
)
|
||
|
|
||
|
temp_file_stream.close()
|
||
|
|
||
|
def _check_pdf(self) -> None:
|
||
|
"""
|
||
|
When pdf not found this function can create docx of script for available formats.
|
||
|
"""
|
||
|
|
||
|
if not self.file_model_objects.filter(type="script-docx").exists():
|
||
|
self._check_docx()
|
||
|
|
||
|
try:
|
||
|
docx_file_object = self.file_model_objects.get(type="script-docx")
|
||
|
|
||
|
except django_exception.ObjectDoesNotExist:
|
||
|
self._download_original()
|
||
|
return
|
||
|
|
||
|
temp_dir = tempfile.TemporaryDirectory()
|
||
|
pdf_file_path = utilities.docx_to_pdf(docx_file_object.file.path, temp_dir.name)
|
||
|
|
||
|
with open(pdf_file_path, "rb") as temp_pdf:
|
||
|
|
||
|
pdf_file = DjangoFile(
|
||
|
temp_pdf,
|
||
|
pdf_file_path.rsplit('/',1)[1],
|
||
|
)
|
||
|
|
||
|
self.query_file = File.objects.create(
|
||
|
script = docx_file_object.script,
|
||
|
file = pdf_file,
|
||
|
type = "script-pdf",
|
||
|
)
|
||
|
|
||
|
def _download_original(self) -> None:
|
||
|
"""
|
||
|
used for downloafing the script-original when no other type of scripts are found.
|
||
|
This downloads the unaudited file.
|
||
|
"""
|
||
|
self.query_file = self.file_model_objects.get(type="script-original")
|
||
|
|
||
|
|
||
|
|
||
|
# @authentication_classes([SessionAuthentication, BasicAuthentication])
|
||
|
# @permission_classes([IsAuthenticated])
|
||
|
class DeleteScriptAPIView(APIView):
|
||
|
# authentication_classes = [JWTAuthentication]
|
||
|
# permission_classes = [IsAuthenticated]
|
||
|
authentication_classes = [BasicAuthentication,SessionAuthentication]
|
||
|
permission_classes = [IsAuthenticated]
|
||
|
|
||
|
def delete(self,request,screen_play):
|
||
|
try:
|
||
|
screenplay = ScreenPlay.objects.get(name=screen_play)
|
||
|
# scripts = Script.objects.filter(screenplay=screenplay)
|
||
|
# scripts.delete()
|
||
|
screenplay.delete()
|
||
|
return Response("Scripts deleted successfully.", status=status.HTTP_204_NO_CONTENT)
|
||
|
except ScreenPlay.DoesNotExist:
|
||
|
return Response(f"Screenplay with name '{screen_play}' does not exist.", status=status.HTTP_404_NOT_FOUND)
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
def string_to_audit(request,path):
|
||
|
|
||
|
"""
|
||
|
the function uses:
|
||
|
request for use detail
|
||
|
path : path of the script to be audited
|
||
|
"""
|
||
|
gpt_path = path
|
||
|
|
||
|
u_name = script_id_generator()
|
||
|
screenplay_name = u_name
|
||
|
|
||
|
author = "MNF"
|
||
|
language = "en"
|
||
|
script_file = gpt_path
|
||
|
script_ext = script_file.split(".")[-1]
|
||
|
script_file_name = screenplay_name + "." + script_ext
|
||
|
|
||
|
with open(script_file, "rb") as script_file:
|
||
|
file = ContentFile(script_file.read(),
|
||
|
script_file_name,
|
||
|
)
|
||
|
|
||
|
result = filesystem.new_screenplay_without_audit_in_background(
|
||
|
request.user,
|
||
|
author,
|
||
|
screenplay_name,
|
||
|
file,
|
||
|
"script-original",
|
||
|
language,
|
||
|
|
||
|
)
|
||
|
script_id = result.get("script", {}).get("id")
|
||
|
try:
|
||
|
update_audit_status(script_id, States.STARTED)
|
||
|
except:
|
||
|
update_audit_status(script_id, States.FAILURE)
|
||
|
|
||
|
|
||
|
try:
|
||
|
naudit = NeutralAudit(script_id)
|
||
|
naudit.audit()
|
||
|
ScriptAuditModel.objects.update_or_create(
|
||
|
script = Script.objects.get(
|
||
|
id = script_id
|
||
|
),
|
||
|
defaults={"status" : "SUCCESS"}
|
||
|
)
|
||
|
|
||
|
except:
|
||
|
ScriptAuditModel.objects.update_or_create(
|
||
|
script = Script.objects.get(
|
||
|
id = script_id
|
||
|
),
|
||
|
defaults={"status" : "FAILURE"}
|
||
|
)
|
||
|
|
||
|
return script_id
|
||
|
|
||
|
|
||
|
|