"""
Python action to calculate Secure Hash Algorithm SHA256 values for
all workflows.
@name getHashesForWorkflows
@returns {Properties}
@author Stefan Schnell <mail@stefan-schnell.de>
@license MIT
@version 0.1.0
Checked with Aria Automation 8.17.0
"""
import hashlib
import json
import ssl
import urllib.request
def getAllWorkflows(vcoUrl, bearerToken):
    """ Get all workflows.

    Sends a GET request to <vcoUrl>/api/workflows and decodes the JSON
    body of the response.

    @param {string} vcoUrl - URL of Aria orchestrator
    @param {string} bearerToken - Token sent as "Bearer" authorization
    @returns {dictionary} Decoded JSON body, or an empty dictionary if
        the response status code is not 200
    @raises {Exception} If building the request, sending it or decoding
        the response fails; the original error is chained as cause
    """

    returnValue = {}

    try:
        requestWorkflows = urllib.request.Request(
            url=vcoUrl + "/api/workflows",
            headers={
                "Authorization": "Bearer " + bearerToken,
                "Content-Type": "application/json",
            },
        )
        # NOTE(review): TLS certificate verification is disabled here via
        # a private ssl helper - presumably for appliances with
        # self-signed certificates; confirm this is intended.
        sslContext = ssl._create_unverified_context()
        # The with statement closes the HTTP response in every case, so
        # the underlying connection is not leaked.
        with urllib.request.urlopen(
            requestWorkflows,
            context=sslContext
        ) as responseWorkflows:
            if responseWorkflows.getcode() == 200:
                returnValue = json.loads(responseWorkflows.read())
    except Exception as err:
        raise Exception(
            "An error occurred at detecting all workflows"
        ) from err

    return returnValue
def getWorkflowDetails(workflowHref, bearerToken):
    """ Gets the details of the given workflow.

    Sends a GET request to the workflow's href and decodes the JSON
    body of the response.

    @param {string} workflowHref - Hypertext reference (URL) of workflow
    @param {string} bearerToken - Token sent as "Bearer" authorization
    @returns {dictionary} Decoded JSON body, or an empty dictionary if
        the response status code is not 200
    @raises {Exception} If building the request, sending it or decoding
        the response fails; the original error is chained as cause
    """

    returnValue = {}

    try:
        requestWorkflow = urllib.request.Request(
            url=workflowHref,
            headers={
                "Authorization": "Bearer " + bearerToken,
                "Content-Type": "application/json",
            },
        )
        # NOTE(review): TLS certificate verification is disabled here via
        # a private ssl helper - presumably for appliances with
        # self-signed certificates; confirm this is intended.
        sslContext = ssl._create_unverified_context()
        # The with statement closes the HTTP response in every case, so
        # the underlying connection is not leaked.
        with urllib.request.urlopen(
            requestWorkflow,
            context=sslContext
        ) as responseWorkflow:
            if responseWorkflow.getcode() == 200:
                returnValue = json.loads(responseWorkflow.read())
    except Exception as err:
        raise Exception(
            "An error occurred at detecting workflow details"
        ) from err

    return returnValue
def getWorkflowContent(workflowHref, bearerToken):
    """ Gets the content of the given workflow.

    Sends a GET request to <workflowHref>/content/ and decodes the JSON
    body of the response.

    @param {string} workflowHref - Hypertext reference (URL) of workflow,
        with or without a trailing slash
    @param {string} bearerToken - Token sent as "Bearer" authorization
    @returns {dictionary} Decoded JSON body, or an empty dictionary if
        the response status code is not 200
    @raises {Exception} If building the request, sending it or decoding
        the response fails; the original error is chained as cause
    """

    returnValue = {}

    try:
        # Add the path separator only when it is missing, so that a
        # reference without trailing slash does not yield "...<id>content/".
        if workflowHref.endswith("/"):
            contentUrl = workflowHref + "content/"
        else:
            contentUrl = workflowHref + "/content/"
        requestContent = urllib.request.Request(
            url=contentUrl,
            headers={
                "Authorization": "Bearer " + bearerToken,
                "Content-Type": "application/json",
            },
        )
        # NOTE(review): TLS certificate verification is disabled here via
        # a private ssl helper - presumably for appliances with
        # self-signed certificates; confirm this is intended.
        sslContext = ssl._create_unverified_context()
        # The with statement closes the HTTP response in every case, so
        # the underlying connection is not leaked.
        with urllib.request.urlopen(
            requestContent,
            context=sslContext
        ) as responseContent:
            if responseContent.getcode() == 200:
                returnValue = json.loads(responseContent.read())
    except Exception as err:
        raise Exception(
            "An error occurred at detecting workflow content"
        ) from err

    return returnValue
def getCategoryPath(vcoUrl, workflowCategoryId, bearerToken):
    """ Gets the folder (category path) of the given workflow.

    Sends a GET request to <vcoUrl>/api/categories/<workflowCategoryId>
    and reads the "path" entry of the decoded JSON body.

    @param {string} vcoUrl - URL of Aria orchestrator
    @param {string} workflowCategoryId - GUID of category (folder)
    @param {string} bearerToken - Token sent as "Bearer" authorization
    @returns {string} Value of "path" from the response, or an empty
        string if the response status code is not 200
    @raises {Exception} If building the request, sending it, decoding
        the response or reading "path" fails; the original error is
        chained as cause
    """

    returnValue = ""

    try:
        requestCategory = urllib.request.Request(
            url=vcoUrl + "/api/categories/" + workflowCategoryId,
            headers={
                "Authorization": "Bearer " + bearerToken,
                "Content-Type": "application/json",
            },
        )
        # NOTE(review): TLS certificate verification is disabled here via
        # a private ssl helper - presumably for appliances with
        # self-signed certificates; confirm this is intended.
        sslContext = ssl._create_unverified_context()
        # The with statement closes the HTTP response in every case, so
        # the underlying connection is not leaked.
        with urllib.request.urlopen(
            requestCategory,
            context=sslContext
        ) as responseCategory:
            if responseCategory.getcode() == 200:
                categoryDetails = json.loads(responseCategory.read())
                returnValue = categoryDetails["path"]
    except Exception as err:
        raise Exception(
            "An error occurred at detecting category path"
        ) from err

    return returnValue
def handler(context, inputs):
    """ Aria Automation standard handler, the main function.

    Detects all workflows and calculates for each one a SHA256 hash
    over its name, version, description, output parameters, input
    parameters and content. Each result is printed as a
    "<key> <hash>" line.

    @param {dictionary} context - Must contain "vcoUrl" and a callable
        "getToken" which delivers the bearer token
    @param {dictionary} inputs - Not used
    @returns {dictionary} With "status" ("done" or "incomplete") and
        "results", which maps "<category path>/<workflow name>" to the
        hexadecimal SHA256 digest. On success the results are sorted by
        key. On any error "status" is "incomplete", "error" contains
        the representation of the exception and "results" holds the
        hashes calculated up to that point.
    """

    results = {}

    try:

        vcoUrl = context["vcoUrl"]
        bearerToken = context["getToken"]()

        workflows = getAllWorkflows(vcoUrl, bearerToken)
        if not workflows:
            raise ValueError("No workflows were detected")

        # Category paths that were already requested in this run, to
        # avoid one HTTP request per workflow for a shared category.
        categoryPaths = {}

        for workflow in workflows["link"]:

            workflowHref = workflow["href"]

            workflowDetails = getWorkflowDetails(workflowHref, bearerToken)
            if not workflowDetails:
                raise ValueError("No workflow details were detected")

            workflowName = workflowDetails["name"]
            workflowVersion = workflowDetails["version"]
            # Optional entries fall back to an empty string.
            workflowDescription = workflowDetails.get("description", "")
            workflowInputParameters = workflowDetails.get(
                "input-parameters", ""
            )
            workflowOutputParameters = workflowDetails.get(
                "output-parameters", ""
            )

            contentDetails = getWorkflowContent(workflowHref, bearerToken)
            if not contentDetails:
                raise ValueError("No workflow content were detected")

            # Order and separator of the fields define the hash value
            # (output parameters come before input parameters), so they
            # must not be changed.
            hashData = " ~ ".join([
                workflowName,
                workflowVersion,
                workflowDescription,
                str(workflowOutputParameters),
                str(workflowInputParameters),
                str(contentDetails),
            ])

            categoryId = workflowDetails["category-id"]
            if categoryId not in categoryPaths:
                categoryPaths[categoryId] = getCategoryPath(
                    vcoUrl,
                    categoryId,
                    bearerToken
                )
            workflowFolder = categoryPaths[categoryId]

            key = workflowFolder + "/" + workflowName
            results[key] = hashlib.sha256(
                hashData.encode("utf-8")
            ).hexdigest()

        results = dict(sorted(results.items()))

        for key, value in results.items():
            print(key, value)

        outputs = {
            "status": "done",
            "results": results
        }

    except Exception as err:

        # Deliberately catches everything at this top-level boundary:
        # the error is reported in the outputs instead of being raised.
        outputs = {
            "status": "incomplete",
            "error": repr(err),
            "results": results
        }

    return outputs
|