"""
@module de.stschnell
@author Stefan Schnell <mail@stefan-schnell.de>
@license MIT
@version 0.3.0
@runtime python:3.11
@memoryLimit 128000000
@inputType in_userName {string}
@inputType in_password {SecureString}
@outputType Properties
"""
import json
import time
from util.http import Http
http = Http()
def getActionId(
vcoUrl: str,
bearerToken: str,
actionModule: str,
actionName: str
) -> str:
""" Gets the ID of an action.
@param {string} vcoUrl - URL of Aria orchestrator
@param {string} bearerToken
@param {string} actionModule - Module of the action
@param {string} actionName - Name of the action
@returns {string}
"""
returnValue: str = ""
try:
actions: dict = http.request(
url = vcoUrl + "/api/actions",
bearerToken = bearerToken
)
found: bool = False
for action in actions["link"]:
for attribute in action["attributes"]:
if attribute["name"] == "fqn" and \
attribute["value"] == actionModule + "/" + actionName:
for attribute in action["attributes"]:
if attribute["name"] == "id":
returnValue = attribute["value"]
found = True
if found:
break
except Exception as err:
raise ValueError(f"An error occurred at get Action ID - {err}") \
from err
return returnValue
def executeAction(
vcoUrl: str,
bearerToken: str,
actionId: str,
parameters: dict = {}
) -> dict:
""" Executes an action.
@param {string} vcoUrl - URL of Aria orchestrator
@param {string} bearerToken
@param {string} actionId - ID of the action
@param {dictionary} parameters - Parameters of the action
@returns {dictionary}
"""
returnValue: dict = {}
try:
returnValue = http.request(
url = vcoUrl + "/api/actions/" + actionId + "/executions",
bearerToken = bearerToken,
method = "POST",
body = parameters
)
except Exception as err:
raise ValueError(f"An error occurred at action executing - {err}") \
from err
return returnValue
def getActionLog(
vcoUrl: str,
bearerToken: str,
executionId: str
) -> dict:
""" Delivers the action log.
@param {string} vcoUrl - URL of Aria orchestrator
@param {string} bearerToken
@param {string} executionId - ID of the execution, from executeAction
@returns {dictionary}
"""
returnValue: dict = {}
try:
returnValue = http.request(
url = vcoUrl + "/api/actions/" + executionId + \
"/logs?maxResult=2147483647",
bearerToken = bearerToken
)
except Exception as err:
raise ValueError(f"An error occurred at get action log - {err}") \
from err
return returnValue
def callAction(
vcoUrl: str,
bearerToken: str,
actionModule: str,
actionName: str,
parameters: list
) -> dict:
""" Calls an action
@param {string} vcoUrl - URL of Aria orchestrator
@param {string} bearerToken
@param {string} actionModule - Module of the action
@param {string} actionName - Name of the action
@param {list} parameters - Parameters of the action
@returns {dictionary}
"""
returnValue: dict = {}
try:
actionID: str = getActionId(
vcoUrl,
bearerToken,
actionModule,
actionName
)
_parameters: dict = {
"async-execution": False,
"parameters": parameters
}
executionResult: dict = executeAction(
vcoUrl,
bearerToken,
actionID,
_parameters
)
returnValue["executionResult"] = executionResult
executionId: str = executionResult["execution-id"]
time.sleep(2.5)
actionLog: dict = getActionLog(vcoUrl, bearerToken, executionId)
returnValue["actionLog"] = actionLog
except Exception as err:
raise ValueError(f"An error occurred at call action - {err}") \
from err
return returnValue
def handler(context: dict, inputs: dict) -> dict:
""" Aria Automation standard handler, the main function.
"""
vcoUrl: str = context["vcoUrl"]
bearerToken: str = context["getToken"]()
outputs: dict = {}
try:
# Begin of action call -----------------------------------------
# The setting of the parameters is described in the
# REST API Calling Conventions.
# @example
# actionModule: str = "com.vmware.constants"
# actionName: str = "getDefaultCompanyName"
# parameters: list = []
# @example
actionModule: str = "com.vmware.basic"
actionName: str = "getFileName"
parameters: list = [
{
"name": "fileName",
"type": "Path",
"value": {
"string": {
"value": "/stefan/test/bambi"
}
}
}
]
output: dict = callAction(
vcoUrl, bearerToken, actionModule, actionName, parameters
)
# print(output["executionResult"]["value"]["string"]["value"])
# delivers VMware Inc. with the first example and
# bambi with the second example
# End of action call -------------------------------------------
outputs = {
"status": "done",
"error": None,
"result": output
}
except Exception as err:
outputs = {
"status": "incomplete",
"error": repr(err),
"result": None
}
return outputs
|