Conversion_Kitchen_Code/kitchen_counter/MNF/utils/filesystem.py

316 lines
9.4 KiB
Python
Raw Normal View History

2024-04-27 09:33:09 +00:00
import os
import tempfile
from datetime import datetime
from io import BytesIO

import pandas as pd
from django.core.files.base import ContentFile
from django.core.files.base import File as DjangoFile
from rest_framework.serializers import ReturnDict, ValidationError

from centralisedFileSystem.models import File, ScreenPlay, Script
from centralisedFileSystem.serializers import (FileSerializer,
                                               ScreenPlayScriptSerializer,
                                               ScriptSerializer,
                                               ScreenPlaySerializer)
from scriptpage.serializers import ScriptRelatedDataSerializer
from utils import utilities


def new_version(screenplay_name: str, user, file, file_type: str, language: str, raise_exception: bool = False) -> ReturnDict:
    """
    Use this function for creating a new version. This creates a script in the
    existing ScreenPlay object.

    Args:
        screenplay_name (str): name of the screenplay you want to add another script to
        user : user owning the screenplay; also forwarded to the serializer as "user"
        file : File object
        file_type (str): Type of file you want to upload
        language (str): language of the script you are uploading
        raise_exception (bool): default = False
            Forwarded to the serializer's ``is_valid``. When True, invalid data
            raises instead of being reported through the returned error dictionary.

    Returns:
        ReturnDict: the serialised dictionary of the new script when the data is
        valid, otherwise the serializer's error dictionary.

    Raises:
        ScreenPlay.DoesNotExist: presumably raised by ``ScreenPlay.objects.get``
            when no screenplay matches ``screenplay_name`` and ``user``
            (standard Django manager behaviour -- confirm the manager is not customised).
    """
    screenplay = ScreenPlay.objects.get(name=screenplay_name, user=user)
    data = {
        "screenplay": screenplay.id,
        "file": {
            "file": file,
            "type": file_type,
        },
        "language": language,
        "user": user,
    }
    serializer = ScriptSerializer(data=data)
    # raise_exception must be passed by keyword: recent Django REST framework
    # releases declare it keyword-only, so a positional argument raises TypeError.
    if serializer.is_valid(raise_exception=raise_exception):
        serializer.save()
        return serializer.data
    return serializer.errors


def new_screenplay(user, author: str, name: str, file, file_type: str, language: str, raise_exception: bool = False) -> ReturnDict:
    """
    Create a new screenplay together with its first script and uploaded file.

    Args:
        user : user that is attached to the serializer and passed to ``save``
        author (str): name of the author of the screenplay
        name (str): name of the screenplay
        file : file object
        file_type (str): type of the file being uploaded
        language (str): language of the script being uploaded
        raise_exception (bool): default = False
            Accepted for interface compatibility but currently ignored:
            validation is always run with ``raise_exception=True``.

    Returns:
        ReturnDict: serialised data of the saved screenplay/script/file. The
        error dictionary is only returned if ``is_valid`` reports failure
        without raising.
    """
    print("test00", user)

    # Build the nested payload from the inside out.
    file_payload = {
        "file": file,
        "type": file_type,
    }
    script_payload = {
        "file": file_payload,
        "language": language,
    }
    payload = {
        "author": author,
        "name": name,
        "script": script_payload,
    }

    serializer = ScreenPlayScriptSerializer(data=payload)
    serializer.user = user

    # Validation is deliberately forced to raise, regardless of the
    # raise_exception argument.
    if not serializer.is_valid(raise_exception=True):
        return serializer.errors

    serializer.save(user=user)
    return serializer.data


def update_file(script_id: str, file_type: str, file) -> ReturnDict:
    """
    Replace the stored file of an existing File record.

    Args:
        script_id (str): Id of the Script the file belongs to
        file_type (str): type of the file to replace
        file : the file object that replaces the currently stored one

    Returns:
        ReturnDict: serialised dictionary of the updated File record.
    """
    target = File.objects.get(script=script_id, type=file_type)
    target.file = file
    target.save()

    serialized = FileSerializer(target)
    return serialized.data


def get_file(script_id: str, file_type: str, *args, **kwargs):
    """
    Look up the File record of the given type for a Script and return its file.

    Args:
        script_id (str): Id of the Script
        file_type (str): type of the file wanted
        *args, **kwargs: accepted but not used by this function

    Returns:
        file object: the ``file`` attribute of the matching File record.
    """
    record = File.objects.get(script__id=script_id, type=file_type)
    return record.file


def get_file_path(script_id: str, file_type: str) -> str:
    """
    Return the path of a Script's file, looked up by script_id and file_type.

    Args:
        script_id (str): Id of the Script
        file_type (str): type of the file whose path is wanted

    Returns:
        str: the ``path`` attribute of the file returned by ``get_file``.
    """
    stored_file = get_file(script_id, file_type)
    return stored_file.path


def create_script_docx(script_id: str, *args, **kwargs):
    """
    Create the docx file from the script-csv of a Script and save it in the CFS.

    The "script-csv" file is parsed with pandas, converted through
    ``utilities.csv_to_docx`` and stored as the Script's "script-docx" File
    record (created, or updated if one already exists).

    Args:
        script_id (str): Id of the Script
        *args, **kwargs: accepted but not used by this function

    Returns:
        file object: the ``file`` attribute of the created/updated File record.
    """
    print("apple 102.1 creating docx")

    # Open the stored CSV explicitly so its handle is closed once pandas has
    # parsed it, instead of leaving it open for the rest of the process.
    # NOTE(review): assumes get_file returns a Django FieldFile/File, whose
    # open() returns a context manager -- confirm against the File model.
    with get_file(script_id, "script-csv").open("rb") as csv_file:
        df = pd.read_csv(csv_file)

    docx = utilities.csv_to_docx(df)

    # The in-memory buffer is released even if saving the document fails.
    # ContentFile receives its own copy of the bytes, so the buffer can be
    # closed before the record is written.
    with BytesIO() as buffer:
        docx.save(buffer)
        docx_file = ContentFile(
            buffer.getvalue(),
            "from_audited_csv_to_document.docx",
        )

    new_file, _ = File.objects.update_or_create(
        script=Script.objects.get(id=script_id),
        type="script-docx",
        defaults={
            "file": docx_file,
        },
    )
    print("apple 102.2 docx created")
    return new_file.file


def create_script_pdf(script_id: str, *args, **kwargs):
    """
    Create the pdf file of a Script from its docx file and save it in the CFS.

    The existing "script-docx" file is used when present; otherwise it is
    first generated with ``create_script_docx``. The docx is converted through
    ``utilities.docx_to_pdf`` and the result is stored as the Script's
    "script-pdf" File record (created, or updated if one already exists).

    Args:
        script_id (str): Id of the Script
        *args, **kwargs: accepted but not used by this function

    Returns:
        file object: the ``file`` attribute of the created/updated File record.
    """
    try:
        docx_file = get_file(script_id, "script-docx")
    except File.DoesNotExist:
        docx_file = create_script_docx(script_id)

    # The context manager removes the scratch directory deterministically,
    # including when the conversion or the save fails.
    with tempfile.TemporaryDirectory() as temp_dir:
        pdf_file_path = utilities.docx_to_pdf(docx_file.path, temp_dir)
        # The record must be saved while the source file is still open,
        # because its content is read during the save.
        with open(pdf_file_path, "rb") as temp_pdf:
            pdf_file = DjangoFile(
                temp_pdf,
                # basename copes with any separator and with a bare file name.
                os.path.basename(pdf_file_path),
            )
            new_file, _ = File.objects.update_or_create(
                script=Script.objects.get(id=script_id),
                type="script-pdf",
                defaults={
                    "file": pdf_file,
                },
            )
    return new_file.file


# File types that get_script accepts as script_type.
VALID_SCRIPT_TYPES = ("script-docx", "script-csv", "script-pdf")


def get_script(script_id: str, script_type: str, create: bool = False, *args, **kwargs):
    """
    Get a script file by script id. If the requested script_type is not
    present, the create flag can be set to True so that the file is created
    and then returned.

    Args:
        script_id (str): Id of the script
        script_type (str): script type, one of VALID_SCRIPT_TYPES
        create (bool): default = False
            Whether to create the file when it cannot be fetched. Only
            "script-docx" and "script-pdf" can be created.
        *args, **kwargs: forwarded to ``get_file`` and to the create helpers

    Returns:
        file object: the stored file, or the newly created one.

    Raises:
        ValueError: if script_type is not in VALID_SCRIPT_TYPES.
        File.DoesNotExist: if the file cannot be fetched and either create is
            False or the requested type ("script-csv") cannot be generated.
    """
    if script_type not in VALID_SCRIPT_TYPES:
        raise ValueError(f"{script_type} is not a valid type. Valid types : {VALID_SCRIPT_TYPES}.")

    try:
        print("apple trying")
        return get_file(script_id, script_type, *args, **kwargs)
    # NOTE(review): any failure of get_file is treated as "file missing", not
    # only File.DoesNotExist. The broad catch is kept as found -- confirm
    # whether it can be narrowed.
    except Exception as exp:
        print("apple exception")
        if not create:
            raise File.DoesNotExist(f"{script_type} does not exist. If you want to create and then return one, make sure create flag is set True. If you are looking for the audited script, why not use script-csv?") from exp
        if script_type == "script-docx":
            print("apple 101")
            return create_script_docx(script_id, *args, **kwargs)
        if script_type == "script-pdf":
            return create_script_pdf(script_id, *args, **kwargs)
        # Only script-csv reaches this point. It has no generator, so fail
        # loudly instead of silently returning None.
        raise File.DoesNotExist(f"{script_type} does not exist and cannot be created from the other script files.") from exp


def get_script_data(script_id: str) -> ReturnDict:
    """
    Collect the serialised data of a script together with its screenplay.

    Args:
        script_id (str): Id of the Script

    Returns:
        ReturnDict: the serialised screenplay data, with the script's related
        data stored under the "script" key.
    """
    script = Script.objects.get(id=script_id)

    screenplay_serializer = ScreenPlaySerializer(script.screenplay)
    related_serializer = ScriptRelatedDataSerializer(script)

    # Start from the screenplay data and nest the script data inside it.
    result = screenplay_serializer.data
    result["script"] = related_serializer.data
    return result


def new_screenplay_without_audit_in_background(user, author: str, name: str, file, file_type: str, language: str, raise_exception: bool = False) -> ReturnDict:
    """
    Create a new screenplay together with its first script and uploaded file,
    marking the file payload with ``"skip_post_save": True``.

    NOTE(review): going by the function name, the flag presumably stops the
    background audit from being triggered after the file is saved -- confirm
    in the serializer / signal handler that consumes ``skip_post_save``.

    Args:
        user : user that is attached to the serializer and passed to ``save``
        author (str): name of the author of the screenplay
        name (str): name of the screenplay
        file : file object
        file_type (str): type of the file being uploaded
        language (str): language of the script being uploaded
        raise_exception (bool): default = False
            Accepted for interface compatibility but currently ignored:
            validation is always run with ``raise_exception=True``.

    Returns:
        ReturnDict: serialised data of the saved screenplay/script/file. The
        error dictionary is only returned if ``is_valid`` reports failure
        without raising.
    """
    print("test00", user)
    print("heheh",file)

    # Build the nested payload from the inside out.
    file_payload = {
        "file": file,
        "type": file_type,
        "skip_post_save": True,
    }
    script_payload = {
        "file": file_payload,
        "language": language,
    }
    payload = {
        "author": author,
        "name": name,
        "script": script_payload,
    }
    print("DATA : ",payload)

    serializer = ScreenPlayScriptSerializer(data=payload)
    serializer.user = user

    # Validation is deliberately forced to raise, regardless of the
    # raise_exception argument.
    if not serializer.is_valid(raise_exception=True):
        return serializer.errors

    serializer.save(user=user)
    return serializer.data