import time

from django.contrib.auth import get_user_model
from web3 import Web3
from web3.exceptions import TimeExhausted,TransactionNotFound

from .models import FailedTransactions

# Concrete user model configured for this Django project.
User = get_user_model()

# SECURITY(review): this signing key is committed in source. Anyone with read
# access to the repository can sign transactions as this account. Consider
# loading it from the environment or a secret store and rotating the key.
PRIVATE_KEY = "a632f28406579d96879cfc49d55411bb1424dbbae60bf4dd93ad0710b63d6b80"
# PRIVATE_KEY = "6f06e1108b833b1918067042e13e60eda262705b80385a02d0330ce0db31d3ad"

# Disabled alternate deployment (rpc-amoy endpoint, chain id 80002).
# RPC = 'https://rpc-amoy.polygon.technology/'
# CONTRACT_ADDRESS = '0xbE91e2294D12fa78Ce64657fD9Ee137cE3e99881'
# CHAIN_ID = 80002
# ABI = '[ { "inputs": [], "stateMutability": "nonpayable", "type": "constructor" }, { "anonymous": false, "inputs": [ { "indexed": false, "internalType": "uint256", "name": "user_id", "type": "uint256" }, { "indexed": false, "internalType": "string", "name": "project", "type": "string" }, { "indexed": false, "internalType": "string", "name": "data", "type": "string" } ], "name": "conversion", "type": "event" }, { "inputs": [ { "internalType": "address", "name": "pubKey", "type": "address" }, { "internalType": "uint256", "name": "user_id", "type": "uint256" }, { "internalType": "string", "name": "project", "type": "string" }, { "internalType": "string", "name": "_data", "type": "string" } ], "name": "UploadScriptAuditData", "outputs": [], "stateMutability": "nonpayable", "type": "function" }, { "inputs": [ { "internalType": "uint256", "name": "user_id", "type": "uint256" }, { "internalType": "string", "name": "project", "type": "string" } ], "name": "deleteScriptAudit", "outputs": [], "stateMutability": "nonpayable", "type": "function" }, { "inputs": [ { "internalType": "uint256", "name": "userId", "type": "uint256" } ], "name": "getProjectId", "outputs": [ { "internalType": "string[]", "name": "", "type": "string[]" } ], "stateMutability": "view", "type": "function" }, { "inputs": [ { "internalType": "uint256", "name": "user_id", "type": "uint256" }, { "internalType": "string", "name": "project", "type": "string" } ], "name": "getScriptAudit", "outputs": [ { "components": [ { "internalType": "address", "name": "owner", "type": "address" }, { "internalType": "string", "name": "mydata", "type": "string" }, { "internalType": "bool", "name": "Write", "type": "bool" } ], "internalType": "struct ScriptAuditDataBase.privateData", "name": "", "type": "tuple" } ], "stateMutability": "view", "type": "function" } ]'

# mainnet
RPC = 'https://polygon-rpc.com'
# NOTE(review): the disabled endpoint below embeds what looks like a provider
# project key; treat it as a secret as well.
# RPC = 'https://polygon-mainnet.infura.io/v3/017957497a0d4e9c80750c18a431ac1e'
CONTRACT_ADDRESS = '0x3a5d033CdF38aC4a9fe116CE88EBA2E93260044c'
CHAIN_ID = 137
# Contract ABI as a JSON string; kept on one line so the value is unchanged.
ABI = '[ { "inputs": [], "stateMutability": "nonpayable", "type": "constructor" }, { "anonymous": false, "inputs": [ { "indexed": false, "internalType": "uint256", "name": "user_id", "type": "uint256" }, { "indexed": false, "internalType": "string", "name": "project", "type": "string" }, { "indexed": false, "internalType": "string", "name": "data", "type": "string" } ], "name": "conversion", "type": "event" }, { "inputs": [ { "internalType": "address", "name": "pubKey", "type": "address" }, { "internalType": "uint256", "name": "user_id", "type": "uint256" }, { "internalType": "string", "name": "project", "type": "string" }, { "internalType": "string", "name": "_data", "type": "string" } ], "name": "UploadScriptAuditData", "outputs": [], "stateMutability": "nonpayable", "type": "function" }, { "inputs": [ { "internalType": "uint256", "name": "user_id", "type": "uint256" }, { "internalType": "string", "name": "project", "type": "string" } ], "name": "deleteScriptAudit", "outputs": [], "stateMutability": "nonpayable", "type": "function" }, { "inputs": [ { "internalType": "uint256", "name": "userId", "type": "uint256" } ], "name": "getProjectId", "outputs": [ { "internalType": "string[]", "name": "", "type": "string[]" } ], "stateMutability": "view", "type": "function" }, { "inputs": [ { "internalType": "uint256", "name": "user_id", "type": "uint256" }, { "internalType": "string", "name": "project", "type": "string" } ], "name": "getScriptAudit", "outputs": [ { "components": [ { "internalType": "address", "name": "owner", "type": "address" }, { "internalType": "string", "name": "mydata", "type": "string" }, { "internalType": "bool", "name": "Write", "type": "bool" } ], "internalType": "struct ScriptAuditDataBase.privateData", "name": "", "type": "tuple" } ], "stateMutability": "view", "type": "function" } ]'

# Shared client objects used by every helper in this module.
web3 = Web3(Web3.HTTPProvider(RPC))
# Short aliases kept as module-level names alongside the constants above.
CAddress = CONTRACT_ADDRESS
abi = ABI
contractInst = web3.eth.contract(address=CAddress, abi=abi)

# Legacy implementation with gas-price retries, kept for reference (disabled).
# def UploadScriptAuditData(privatekey,pubkey,user_id,project,data):
#     try:
#         acc1 = web3.eth.account.from_key(PRIVATE_KEY).address
#         def send_transaction( gas_price):
#             nonce = web3.eth.get_transaction_count(acc1,'pending')  # Get nonce including pending transactions
#             uploadData = contractInst.functions.UploadScriptAuditData(pubkey, user_id, project, data).build_transaction({
#                 'gasPrice': gas_price,
#                 'chainId': CHAIN_ID,
#                 'from': acc1,
#                 'nonce': nonce
#             })
#             signed_transaction = web3.eth.account.sign_transaction(uploadData, private_key=PRIVATE_KEY)
#             transaction_hash = web3.eth.send_raw_transaction(signed_transaction.rawTransaction)
#             print("Transaction hash:", transaction_hash.hex())
#             return transaction_hash
#         acc1 = web3.eth.account.from_key(PRIVATE_KEY).address
#         initial_gas_price = web3.eth.gas_price
#         max_retries = 4
#         retries = 0
#         transaction_hash = None
#         try:
#             transaction_hash = send_transaction(initial_gas_price)
#         except ValueError as e:
#             # If the error is due to "replacement transaction underpriced", handle it
#             if "replacement transaction underpriced" in str(e):
#                 print("Initial transaction attempt failed due to 'replacement transaction underpriced'.
# Retrying with higher gas price.")
#                 retries += 1
#                 new_gas_price = int(web3.eth.gas_price * (1 + 0.1 * retries))
#                 transaction_hash = send_transaction(new_gas_price)
#             elif "max fee per gas less than block base fee" in str(e):
#                 print("Initial transaction attempt failed due to 'max fee per gas less than block base fee'. Retrying with higher gas price.")
#                 retries += 1
#                 new_gas_price = int(web3.eth.gas_price * (1 + 0.1 * retries))
#                 transaction_hash = send_transaction(new_gas_price)
#             else:
#                 raise
#         while retries < max_retries and transaction_hash:
#             try:
#                 transaction_receipt = web3.eth.wait_for_transaction_receipt(transaction_hash, timeout=120)
#                 print("Transaction confirmed:")
#                 break
#             except TimeExhausted:
#                 retries += 1
#                 print(f"Transaction not confirmed in {120 * retries} seconds, retrying... ({retries}/{max_retries})")
#                 # Increase the gas price by 10% for each retry
#                 new_gas_price = int(web3.eth.gas_price * (1 + 0.1 * retries))
#                 transaction_hash = send_transaction(new_gas_price)
#         else:
#             if retries == max_retries:
#                 print("Failed to confirm the transaction after multiple attempts.")
#                 raise TimeoutError("Failed to confirm the transaction after multiple attempts.")
#         transaction_fee = transaction_receipt.effectiveGasPrice * transaction_receipt.gasUsed
#         tx_id = transaction_hash.hex()
#         return tx_id, str(transaction_fee)
#     except Exception as e:
#         print({"error":str(e)})
#         return e


def _record_failed_upload(creator, project, data):
    """Store (or refresh) the ``FailedTransactions`` row for a failed upload.

    The row is keyed on ``service_id=project`` and
    ``service_name="ScriptAudit"``, so repeated failures for the same project
    overwrite one record instead of adding new ones.
    """
    record, _ = FailedTransactions.objects.get_or_create(
        service_id=project, service_name="ScriptAudit"
    )
    record.creator = creator
    record.transaction_type = "UploadScriptAuditData"
    # NOTE(review): the hash is stored blank even when the transaction was
    # broadcast and its hash is known; confirm whether it should be recorded.
    record.transaction_hash = ""
    record.transaction_status = "failed"
    record.transaction_error = "Transaction failed"
    record.transaction_data = str(data)
    record.save()


def UploadScriptAuditData(privatekey, pubkey, user_id, project, data):
    """Send an ``UploadScriptAuditData`` contract transaction and wait for it.

    Args:
        privatekey: Accepted for interface compatibility but not used; the
            transaction is signed with the module-level ``PRIVATE_KEY``.
        pubkey: Address passed to the contract as its first argument.
        user_id: Primary key of the Django user; also passed to the contract.
        project: Project identifier passed to the contract and used as the
            ``service_id`` of the failure record.
        data: Payload string passed to the contract.

    Returns:
        ``(tx_hash_hex, fee_as_str)`` when the receipt status is 1, where the
        fee is ``effectiveGasPrice * gasUsed`` from the receipt.
        ``(False, "Transaction failed")`` when the receipt status is not 1.
        ``(False, str(error))`` when any step raises.

    Raises:
        User.DoesNotExist: If no user has ``id == user_id``. The lookup runs
            before the ``try`` block, so this is not converted to a tuple.
    """
    user = User.objects.get(id=user_id)
    try:
        sender = web3.eth.account.from_key(PRIVATE_KEY).address
        # 'pending' makes the nonce account for transactions not yet mined.
        nonce = web3.eth.get_transaction_count(sender, 'pending')
        unsigned_tx = contractInst.functions.UploadScriptAuditData(
            pubkey, user_id, project, data
        ).build_transaction({
            'gasPrice': web3.eth.gas_price,
            'chainId': CHAIN_ID,
            'from': sender,
            'nonce': nonce,
        })
        signed_tx = web3.eth.account.sign_transaction(unsigned_tx, private_key=PRIVATE_KEY)
        tx_hash = web3.eth.send_raw_transaction(signed_tx.rawTransaction)
        print("Transaction hash:", tx_hash.hex())

        receipt = web3.eth.wait_for_transaction_receipt(tx_hash, timeout=120)
        fee = receipt.effectiveGasPrice * receipt.gasUsed
        tx_id = tx_hash.hex()

        # A receipt status of 1 is treated as success; anything else is recorded.
        if receipt.status == 1:
            print("Transaction successful!")
            return tx_id, str(fee)

        print("Transaction failed!")
        _record_failed_upload(user, project, data)
        return False, "Transaction failed"
    except Exception as e:
        print({"error":str(e)})
        _record_failed_upload(user, project, data)
        return False, str(e)


def getScriptAudit(private_key, user_id, project):
    """Read the stored audit entry for ``(user_id, project)`` from the contract.

    The call is made with ``from`` set to the address derived from
    ``private_key``. Deriving that address happens outside the ``try`` block,
    so an error there propagates to the caller.

    Returns:
        ``(True, result)`` on success, or ``(False, exception)`` if the
        contract call raises.
    """
    caller = web3.eth.account.from_key(private_key).address
    try:
        getData = contractInst.functions.getScriptAudit(user_id, project).call({'from': caller})
        print(getData)
        return True, getData
    except Exception as e:
        print("Error calling function:", e)
        return False, e


def getUserprojectIds(user_id):
    """Return the contract's ``getProjectId(user_id)`` result.

    On failure the caught exception object is returned, not raised, so callers
    must check the type of the result.
    """
    try:
        return contractInst.functions.getProjectId(user_id).call()
    except Exception as e:
        return e


def deleteScriptAudit(privatekey, user_id, project):
    """Send a ``deleteScriptAudit`` transaction signed with ``privatekey``.

    Waits for the receipt using the library's default timeout. The receipt
    status is not inspected, so the hash is returned once a receipt arrives.

    Returns:
        The transaction hash as a hex string, or ``None`` if any step raises
        (the error is printed).
    """
    try:
        sender = web3.eth.account.from_key(privatekey).address
        # snake_case names match the rest of this module; the camelCase
        # getTransactionCount/buildTransaction aliases are gone in web3.py v6.
        nonce = web3.eth.get_transaction_count(sender)
        unsigned_tx = contractInst.functions.deleteScriptAudit(user_id, project).build_transaction({
            'gasPrice': web3.eth.gas_price,
            'chainId': CHAIN_ID,
            'from': sender,
            'nonce': nonce,
        })
        signed_tx = web3.eth.account.sign_transaction(unsigned_tx, private_key=privatekey)
        tx_hash = web3.eth.send_raw_transaction(signed_tx.rawTransaction)
        web3.eth.wait_for_transaction_receipt(tx_hash)
        return tx_hash.hex()
    except Exception as e:
        print({"PPT_conversion_delete_error":str(e)})
        return None