VCF Automation Blog

by Stefan Schnell


Get Configuration Variable Value with Python


"""
@module de.stschnell

@author Stefan Schnell <mail@stefan-schnell.de>
@license MIT
@version 0.4.0

@runtime python:3.10

@memoryLimit 128000000

@inputType in_userName {string}
@inputType in_password {string}

@outputType Properties
"""


import base64
import json
import ssl
import urllib.request


def request(
    url,
    user = None,
    password = None,
    bearerToken = None,
    method = "GET",
    header = [],
    body = {},
    contentType = "application/json;charset=utf-8",
    accept = "application/json"
):
    """ Executes a REST request

    The body is always serialized as JSON and sent with the request,
    regardless of the method. If both user/password and bearerToken
    are passed, the bearer token wins, because the second
    Authorization header replaces the first.

    @param {string} url - URL to execute the request
    @param {string} user
    @param {string} password
    @param {string} bearerToken
    @param {string} method - Method of request, e.g. GET, POST, etc
    @param {list.<list>} header - Additional header entries as
                                  key / value pairs
    @param {dictionary} body - Body of request
    @param {string} contentType - MIME type of the body for the request
    @param {string} accept - MIME type of content expect/prefer as response
    @returns {dictionary or bytes} - Parsed JSON if accept contains
                                     "json", otherwise the raw bytes.
                                     An empty dictionary if the status
                                     code is neither 200 nor 202.
    @raises {Exception} - Wraps any error raised while building or
                          executing the request
    """

    # The header and body defaults are never mutated here, so sharing
    # them between calls is harmless.

    returnValue = {}

    try:

        # Named restRequest so that it does not shadow this function.
        restRequest = urllib.request.Request(
            url = url,
            method = method,
            data = json.dumps(body).encode("utf-8")
        )

        if user and password:
            authorization = base64.b64encode(
                bytes(user + ":" + password, "UTF-8")
            ).decode("UTF-8")
            restRequest.add_header(
                "Authorization", "Basic " + authorization
            )

        if bearerToken:
            restRequest.add_header(
                "Authorization", "Bearer " + bearerToken
            )

        restRequest.add_header(
            "Content-Type", contentType
        )

        restRequest.add_header(
            "Accept", accept
        )

        for key, value in header:
            restRequest.add_header(
                key, value
            )

        # NOTE(security): certificate verification is disabled here,
        # which allows man-in-the-middle attacks. Kept to preserve the
        # existing behavior; replace with ssl.create_default_context()
        # if the target presents a trusted certificate.
        sslContext = ssl._create_unverified_context()

        # The with statement closes the response, and with it the
        # underlying connection, even if reading or parsing fails.
        with urllib.request.urlopen(
            restRequest,
            context = sslContext
        ) as response:

            statusCode = response.getcode()

            if statusCode in (200, 202):
                if "json" in accept:
                    returnValue = json.loads(response.read())
                else:
                    returnValue = response.read()

    except Exception as err:
        raise Exception(
            f"An error occurred at request - {err}"
        ) from err

    return returnValue


def getConfigurationVariableValue(
    vcoUrl,
    bearerToken,
    configurationFolder,
    configurationName,
    configurationVariableName,
    configurationVariableType
):
    """ Gets a value of a variable from a configuration

    Lists all configurations, picks the first one whose name equals
    configurationName and whose category path equals
    configurationFolder, and reads the requested variable from it.

    @param {string} vcoUrl - URL of Aria orchestrator
    @param {string} bearerToken
    @param {string} configurationFolder - Path of the configuration
    @param {string} configurationName - Name of the configuration
    @param {string} configurationVariableName - Name of the variable
    @param {string} configurationVariableType - Type of the variable,
                    used in lower case as key into the value of the
                    variable
    @returns - Value of the variable, or None if the configuration or
               the variable was not found
    @raises {ValueError} - Wraps any error raised while requesting or
                           evaluating the data
    """

    returnValue = None

    try:

        configurations = request(
            url = vcoUrl + "/api/configurations",
            bearerToken = bearerToken
        )

        # Only set when name AND folder match. This prevents reading
        # the variable from the last listed configuration when
        # nothing matched.
        matchingConfiguration = None

        for configuration in configurations["link"]:

            configurationId = None
            nameMatches = False
            for attribute in configuration["attributes"]:
                if attribute["name"] == "name" and \
                attribute["value"] == configurationName:
                    nameMatches = True
                if attribute["name"] == "id":
                    configurationId = attribute["value"]

            if not nameMatches or configurationId is None:
                continue

            candidate = request(
                url = vcoUrl + "/api/configurations/" + configurationId,
                bearerToken = bearerToken
            )

            categoryPath = request(
                url = vcoUrl + "/api/categories/" + \
                candidate["category-id"],
                bearerToken = bearerToken
            )["path"]

            # Configurations with the same name can exist in different
            # folders, therefore the path must match too.
            if categoryPath == configurationFolder:
                # The candidate already contains the attributes, a
                # second request for the same configuration is not
                # necessary.
                matchingConfiguration = candidate
                break

        if matchingConfiguration is not None:

            for attribute in matchingConfiguration["attributes"]:
                if attribute["name"] == configurationVariableName:
                    returnValue = \
                    attribute["value"][configurationVariableType.lower()]
                    break

    except Exception as err:
        raise ValueError(
            f"An error occurred at get variable value - {err}"
        ) from err

    return returnValue


def handler(context, inputs):
    """ Aria Automation standard handler, the main function.

    Reads the orchestrator URL and a bearer token from the context,
    looks up one configuration variable and reports the outcome.

    @param {dictionary} context - Must provide the key vcoUrl and the
                                  callable getToken
    @param {dictionary} inputs - Not used by this handler
    @returns {dictionary} - With the keys status, error and result
    """

    orchestratorUrl = context["vcoUrl"]
    token = context["getToken"]()

    # @example
    # In the HOL, this delivers local.
    # Types are: string, boolean, number, date, secure-string, array,
    # sdk-object, properties, encrypted-string, mime-attachment and
    # regex.
    # Hint: Do not use a leading slash for the folder variable.
    folder = "Configurations"
    name = "Host Security Hardening Options"
    variableName = "authSource"
    variableType = "string"

    try:

        value = getConfigurationVariableValue(
            vcoUrl = orchestratorUrl,
            bearerToken = token,
            configurationFolder = folder,
            configurationName = name,
            configurationVariableName = variableName,
            configurationVariableType = variableType
        )

    except Exception as err:

        # On failure the result is delivered as an empty dictionary.
        return {
            "status": "incomplete",
            "error": repr(err),
            "result": {}
        }

    return {
        "status": "done",
        "error": None,
        "result": value
    }

VCF Automation Orchestrator Python action to get a configuration variable value