"""
Python action to calculate Secure Hash Algorithm SHA256 values for
all workflows.
@name getHashesForWorkflows
@runtime python:3.11
@returns {Properties}
@author Stefan Schnell <mail@stefan-schnell.de>
@license MIT
@version 0.2.0
Checked with Aria Automation 8.17.0, 8.18.1 and
VCF Automation 9.0.0
"""
import hashlib
import json
from util.http import Http
# Module-level HTTP client instance; the request helper functions of this
# action send their calls through http.request().
http = Http()
def getAllWorkflows(
    vcoUrl: str,
    bearerToken: str
) -> dict:
    """ Requests the list of all workflows.

    Calls <vcoUrl>/api/workflows through the module-level http client
    and hands back the response of that call unchanged.

    @param {string} vcoUrl - URL of Aria orchestrator
    @param {string} bearerToken
    @returns {dictionary}
    @raises {Exception} - If anything fails while requesting; the
        original error is chained as the cause
    """

    try:
        workflowsUrl = vcoUrl + "/api/workflows"
        response = http.request(
            url=workflowsUrl,
            bearerToken=bearerToken
        )
    except Exception as err:
        message = "An error occurred at detecting all workflows"
        raise Exception(message) from err

    return response
def getWorkflowDetails(
    workflowHref: str,
    bearerToken: str
) -> dict:
    """ Requests the details of one workflow.

    The hyper reference is used as request URL exactly as passed in;
    the response of the http client is returned unchanged.

    @param {string} workflowHref - Hyper reference of workflow
    @param {string} bearerToken
    @returns {dictionary}
    @raises {Exception} - If anything fails while requesting; the
        original error is chained as the cause
    """

    try:
        details = http.request(
            url=workflowHref,
            bearerToken=bearerToken
        )
    except Exception as err:
        message = "An error occurred at detecting workflow details"
        raise Exception(message) from err

    return details
def getWorkflowContent(
    workflowHref: str,
    bearerToken: str
) -> dict:
    """ Requests the content of one workflow.

    The request URL is the hyper reference with "content/" appended
    directly, without inserting a separator, so workflowHref is
    expected to end with a slash already.

    @param {string} workflowHref - Hyper reference of workflow
    @param {string} bearerToken
    @returns {dictionary}
    @raises {Exception} - If anything fails while requesting; the
        original error is chained as the cause
    """

    try:
        contentUrl = workflowHref + "content/"
        content = http.request(
            url=contentUrl,
            bearerToken=bearerToken
        )
    except Exception as err:
        message = "An error occurred at detecting workflow content"
        raise Exception(message) from err

    return content
def getCategoryPath(
    vcoUrl: str,
    workflowCategoryId: str,
    bearerToken: str
) -> str:
    """ Requests the folder (category path) of a workflow category.

    Calls <vcoUrl>/api/categories/<workflowCategoryId> and returns the
    value stored under the key "path" of the response.

    @param {string} vcoUrl - URL of Aria orchestrator
    @param {string} workflowCategoryId - GUID of category (folder)
    @param {string} bearerToken
    @returns {string}
    @raises {Exception} - If the request fails or the response has no
        "path" entry; the original error is chained as the cause
    """

    try:
        categoryUrl = vcoUrl + "/api/categories/" + workflowCategoryId
        category = http.request(
            url=categoryUrl,
            bearerToken=bearerToken
        )
        categoryPath = category["path"]
    except Exception as err:
        message = "An error occurred at detecting category path"
        raise Exception(message) from err

    return categoryPath
def handler(context: dict, inputs: dict) -> dict:
    """ Aria Automation standard handler, the main function.

    For every workflow listed by getAllWorkflows the details and the
    content are requested. Name, version, description, output
    parameters, input parameters and content are joined with " ~ " (in
    exactly this order) and the SHA256 hex digest of that UTF-8 encoded
    string is stored under the key "<category path>/<workflow name>".

    @param {dictionary} context - Must contain "vcoUrl" and a callable
        "getToken" which delivers the bearer token
    @param {dictionary} inputs - Not read by this action
    @returns {dictionary} - On success "status" is "done", "error" is
        None and "results" holds the hashes sorted by key. If any
        exception occurs "status" is "incomplete", "error" is the repr
        of the exception and "results" is None.
    """

    try:

        vcoUrl: str = context["vcoUrl"]
        bearerToken: str = context["getToken"]()

        workflows: dict = getAllWorkflows(vcoUrl, bearerToken)
        if not workflows:
            raise ValueError("No workflows were detected")

        results: dict[str, str] = {}

        # Many workflows live in the same folder. Remember each resolved
        # category path so that every category is requested only once
        # instead of once per workflow.
        categoryPaths: dict[str, str] = {}

        for workflow in workflows["link"]:

            workflowHref: str = workflow["href"]

            workflowDetails: dict = getWorkflowDetails(
                workflowHref,
                bearerToken
            )
            if not workflowDetails:
                raise ValueError("No workflow details were detected")

            workflowName: str = workflowDetails["name"]
            workflowVersion: str = workflowDetails["version"]

            # Description and parameters are optional in the details;
            # a missing entry contributes an empty string to the hash.
            workflowDescription: str = workflowDetails.get(
                "description", ""
            )
            workflowInputParameters: str = ""
            if "input-parameters" in workflowDetails:
                workflowInputParameters = json.dumps(
                    workflowDetails["input-parameters"]
                )
            workflowOutputParameters: str = ""
            if "output-parameters" in workflowDetails:
                workflowOutputParameters = json.dumps(
                    workflowDetails["output-parameters"]
                )

            contentDetails: dict = getWorkflowContent(
                workflowHref,
                bearerToken
            )
            if not contentDetails:
                raise ValueError("No workflow content were detected")

            # The field order (output before input parameters) and the
            # str() form of the content are part of the hash input and
            # must stay as they are, otherwise existing hashes no
            # longer match.
            hashData: str = " ~ ".join((
                workflowName,
                workflowVersion,
                workflowDescription,
                workflowOutputParameters,
                workflowInputParameters,
                str(contentDetails)
            ))

            categoryId: str = workflowDetails["category-id"]
            if categoryId not in categoryPaths:
                categoryPaths[categoryId] = getCategoryPath(
                    vcoUrl,
                    categoryId,
                    bearerToken
                )
            workflowFolder: str = categoryPaths[categoryId]

            key: str = workflowFolder + "/" + workflowName
            results[key] = hashlib.sha256(
                hashData.encode("utf-8")
            ).hexdigest()

        outputs = {
            "status": "done",
            "error": None,
            "results": dict(sorted(results.items()))
        }

    except Exception as err:

        # Top-level boundary of the action: report the failure in the
        # result instead of letting the exception escape.
        outputs = {
            "status": "incomplete",
            "error": repr(err),
            "results": None
        }

    return outputs
|