VCF Automation Blog

from Stefan Schnell


Get Resource with Python


"""
@module de.stschnell

@version 0.1.0

@runtime python:3.10

@inputType in_userName {string}
@inputType in_password {string}

@outputType Properties
"""


import base64
import json
import ssl
import urllib.request


def request(
    url,
    user = None,
    password = None,
    bearerToken = None,
    method = "GET",
    body = None,
    contentType = "application/json;charset=utf-8",
    accept = "application/json"
):
    """ Executes a REST request

    The body is always serialized as JSON and sent with the request,
    also for GET; without an explicit body an empty JSON object is sent.

    @param {string} url - URL to execute the request
    @param {string} user - User for basic authorization, used only
                           together with password
    @param {string} password - Password for basic authorization
    @param {string} bearerToken - Token for bearer authorization, it is
                                  set after the basic authorization
                                  header and therefore replaces it
    @param {string} method - Method of request, e.g. GET, POST, etc
    @param {dictionary} body - Body of request, None means empty object
    @param {string} contentType - MIME type of the body for the request
    @param {string} accept - MIME type of response content
    @returns {dictionary} Parsed JSON if accept contains "json",
                          otherwise the raw bytes of the response. An
                          empty dictionary if the status code is neither
                          200 nor 202.
    @raises {Exception} If building or executing the request or
                        parsing the response fails
    """

    returnValue = {}

    try:

        # None replaces the former mutable default argument {}, the
        # transmitted payload stays the same.
        payload = {} if body is None else body

        httpRequest = urllib.request.Request(
            url = url,
            method = method,
            data = json.dumps(payload).encode("utf-8")
        )

        if user and password:
            authorization = base64.b64encode(
                bytes(user + ":" + password, "UTF-8")
            ).decode("UTF-8")
            httpRequest.add_header(
                "Authorization", "Basic " + authorization
            )

        if bearerToken:
            httpRequest.add_header(
                "Authorization", "Bearer " + bearerToken
            )

        httpRequest.add_header(
            "Content-Type", contentType
        )

        httpRequest.add_header(
            "Accept", accept
        )

        # NOTE(review): Certificate verification is disabled here. This
        # is kept for compatibility, e.g. with self-signed certificates,
        # but it allows man-in-the-middle attacks - confirm that this is
        # intended.
        sslContext = ssl._create_unverified_context()

        # The with statement closes the connection in any case.
        with urllib.request.urlopen(
            httpRequest,
            context = sslContext
        ) as response:

            if response.getcode() in (200, 202):
                content = response.read()
                if "json" in accept:
                    returnValue = json.loads(content)
                else:
                    returnValue = content

    except Exception as err:
        raise Exception(f"An error occurred at request - {err}") \
          from err

    return returnValue


def getResource(
    vcoUrl,
    bearerToken,
    resourceFolder,
    resourceName,
    mimeType
):
    """ Gets a resource

    Lists all resources, searches the first one whose name attribute
    equals resourceName and whose category path equals resourceFolder,
    and downloads its content.

    @param {string} vcoUrl - URL of Aria orchestrator
    @param {string} bearerToken
    @param {string} resourceFolder - Path of the resource
    @param {string} resourceName - Name of the resource
    @param {string} mimeType - MIME type of the resource
    @returns Content of the resource as delivered by request, or None
             if no resource matches name and folder
    @raises {ValueError} If a request fails or a response does not
                         have the expected structure
    """

    returnValue = None

    try:

        resources = request(
            url = vcoUrl + "/api/resources",
            bearerToken = bearerToken
        )

        # Stays None until a resource matches name AND folder, so that
        # never the id of a non-matching resource is used for download.
        resourceId = None

        for resource in resources["link"]:

            candidateId = None
            nameMatches = False
            for attribute in resource["attributes"]:
                if attribute["name"] == "name" and \
                attribute["value"] == resourceName:
                    nameMatches = True
                if attribute["name"] == "id":
                    candidateId = attribute["value"]

            if not nameMatches or candidateId is None:
                continue

            # Resources with the same name can exist in different
            # folders, therefore the category path is checked too.
            categoryId = request(
                url = vcoUrl + "/api/resources/" + candidateId,
                bearerToken = bearerToken,
                accept = \
                "application/vnd.o11n.resource.metadata+json;charset=UTF-8"
            )["category-id"]

            categoryPath = request(
                url = vcoUrl + "/api/categories/" + categoryId,
                bearerToken = bearerToken
            )["path"]

            if categoryPath == resourceFolder:
                resourceId = candidateId
                break

        if resourceId is not None:

            returnValue = request(
                url = vcoUrl + "/api/resources/" + resourceId,
                bearerToken = bearerToken,
                accept = mimeType
            )

    except Exception as err:
        raise ValueError(f"An error occurred at get resource id - {err}") \
            from err

    return returnValue


def handler(context, inputs):
    """ Aria Automation standard handler, the main function.

    Reads the orchestrator URL and a bearer token from the context,
    gets the resource defined below and delivers its content.

    @param {dictionary} context - Must contain vcoUrl and the callable
                                  getToken
    @param {dictionary} inputs - Not used
    @returns {dictionary} With the keys status ("done" or
                          "incomplete"), error (None or the
                          representation of the exception) and result
                          (content of the resource)
    """

    vcoUrl = context["vcoUrl"]
    bearerToken = context["getToken"]()

    resourceFolder = "Library/VC/Configuration"
    resourceName = "b56969bb-eef7-459c-aad1-13d23ece2f97"
    mimeType = "text/xml"

    # Alternative example, a JSON resource:
    # resourceFolder = "Library/VC/Configuration"
    # resourceName = "properties.json"
    # mimeType = "application/json"

    output = None

    try:

        output = getResource(
            vcoUrl,
            bearerToken,
            resourceFolder,
            resourceName,
            mimeType
        )

        # A missing resource is reported explicitly, instead of failing
        # with an attribute error at decoding None.
        if output is None:
            raise LookupError(
                f"Resource {resourceName} not found in {resourceFolder}"
            )

        # Only raw content is decoded, parsed JSON (dictionary or list)
        # is passed through unchanged.
        if isinstance(output, (bytes, bytearray)):
            output = output.decode("utf-8")

        outputs = {
            "status": "done",
            "error": None,
            "result": output
        }

    except Exception as err:

        outputs = {
            "status": "incomplete",
            "error": repr(err),
            "result": output
        }

    return outputs

vcf automation orchestrator get resource action with result