VCF Automation Blog

from Stefan Schnell

Calculate SHA256 Hashes for Workflows


This Python source code shows how to build hash values for workflows. It uses the name, version, description, input-parameters, output-parameters and content of the workflow. This hash value can be used to compare a workflow across different stages, such as development, integration and production. In this way, a change to a workflow becomes visible whenever the hash values in the stages differ. The proof that a workflow has been transferred unchanged to another stage is a very important factor in security audits.

"""
Python action to calculate Secure Hash Algorithm SHA256 values for
all workflows.

@name getHashesForWorkflows
@runtime python:3.11
@returns {Properties}

@author Stefan Schnell <mail@stefan-schnell.de>
@license MIT
@version 0.2.0

Checked with Aria Automation 8.17.0, 8.18.1 and
VCF Automation 9.0.0
"""

import hashlib
import json
from util.http import Http

# Shared HTTP client instance; every request helper in this module sends
# its REST call through http.request().
http = Http()


def getAllWorkflows(
    vcoUrl: str,
    bearerToken: str
) -> dict:
    """ Fetch the workflow list from the orchestrator REST API.

    Requests <vcoUrl>/api/workflows through the module-level http
    client and hands back the result of that request unchanged.

    @param {string} vcoUrl - URL of Aria orchestrator
    @param {string} bearerToken - Token passed on to the http client
    @returns {dictionary} Result of the request
    @raises {Exception} If building or sending the request fails; the
        original error is attached as the cause
    """

    try:
        endpoint = vcoUrl + "/api/workflows"
        return http.request(url=endpoint, bearerToken=bearerToken)
    except Exception as cause:
        raise Exception(
            "An error occurred at detecting all workflows"
        ) from cause


def getWorkflowDetails(
    workflowHref: str,
    bearerToken: str
) -> dict:
    """ Fetch the detail record of a single workflow.

    Requests the given hyper reference as-is through the module-level
    http client and hands back the result of that request unchanged.

    @param {string} workflowHref - Hyper reference of workflow
    @param {string} bearerToken - Token passed on to the http client
    @returns {dictionary} Result of the request
    @raises {Exception} If the request fails; the original error is
        attached as the cause
    """

    try:
        return http.request(url=workflowHref, bearerToken=bearerToken)
    except Exception as cause:
        raise Exception(
            "An error occurred at detecting workflow details"
        ) from cause


def getWorkflowContent(
    workflowHref: str,
    bearerToken: str
) -> dict:
    """ Fetch the content of a single workflow.

    Requests <workflowHref>content/ through the module-level http
    client and hands back the result of that request unchanged.

    NOTE(review): "content/" is appended without a separator, so the
    hyper reference presumably has to end with a slash - confirm
    against the hrefs the orchestrator returns.

    @param {string} workflowHref - Hyper reference of workflow
    @param {string} bearerToken - Token passed on to the http client
    @returns {dictionary} Result of the request
    @raises {Exception} If building or sending the request fails; the
        original error is attached as the cause
    """

    try:
        contentUrl = workflowHref + "content/"
        return http.request(url=contentUrl, bearerToken=bearerToken)
    except Exception as cause:
        raise Exception(
            "An error occurred at detecting workflow content"
        ) from cause


def getCategoryPath(
    vcoUrl: str,
    workflowCategoryId: str,
    bearerToken: str
) -> str:
    """ Look up the folder (category path) for a category id.

    Requests <vcoUrl>/api/categories/<workflowCategoryId> through the
    module-level http client and returns the "path" entry of the
    result.

    @param {string} vcoUrl - URL of Aria orchestrator
    @param {string} workflowCategoryId - GUID of category (folder)
    @param {string} bearerToken - Token passed on to the http client
    @returns {string} Value of the "path" entry
    @raises {Exception} If the request fails or the result has no
        "path" entry; the original error is attached as the cause
    """

    try:
        categoryUrl = vcoUrl + "/api/categories/" + workflowCategoryId
        category = http.request(url=categoryUrl, bearerToken=bearerToken)
        return category["path"]
    except Exception as cause:
        raise Exception(
            "An error occurred at detecting category path"
        ) from cause


def handler(context: dict, inputs: dict) -> dict:
    """ Aria Automation standard handler, the main function.

    Calculates one SHA256 hash per workflow. The hash covers the name,
    version, description, output-parameters, input-parameters and
    content of the workflow. Each hash is stored under the key
    "<category path>/<workflow name>" and the result is sorted by key.

    The category path is requested only once per category id and then
    reused for every other workflow in the same category.

    @param {dictionary} context - Must provide "vcoUrl" and a callable
        "getToken" that returns the bearer token
    @param {dictionary} inputs - Not used by this action
    @returns {dictionary} On success "status" is "done", "error" is
        None and "results" maps key to hex digest. On any error
        "status" is "incomplete", "error" is the repr of the exception
        and "results" is None.
    """

    try:

        vcoUrl: str = context["vcoUrl"]
        bearerToken: str = context["getToken"]()

        workflows: dict = getAllWorkflows(
            vcoUrl,
            bearerToken
        )
        if not workflows:
            raise ValueError("No workflows were detected")

        results: dict = {}
        # Category id -> category path, filled on first use so that
        # workflows sharing a folder do not trigger repeated requests.
        categoryPaths: dict = {}

        for workflow in workflows["link"]:

            workflowDetails: dict = getWorkflowDetails(
                workflow["href"],
                bearerToken
            )
            if not workflowDetails:
                raise ValueError("No workflow details were detected")

            workflowName: str = workflowDetails["name"]
            workflowVersion: str = workflowDetails["version"]
            # Description and parameters are optional; a missing entry
            # contributes an empty string to the hash input.
            workflowDescription: str = workflowDetails.get(
                "description", ""
            )
            workflowInputParameters: str = ""
            if "input-parameters" in workflowDetails:
                workflowInputParameters = \
                    json.dumps(workflowDetails["input-parameters"])
            workflowOutputParameters: str = ""
            if "output-parameters" in workflowDetails:
                workflowOutputParameters = \
                    json.dumps(workflowDetails["output-parameters"])

            contentDetails: dict = getWorkflowContent(
                workflow["href"],
                bearerToken
            )
            if not contentDetails:
                raise ValueError("No workflow content were detected")

            # Field order, separator and str() of the content must stay
            # exactly as they are: any change here alters every hash and
            # breaks comparison with hashes calculated earlier.
            hashData: str = " ~ ".join([
                workflowName,
                workflowVersion,
                workflowDescription,
                workflowOutputParameters,
                workflowInputParameters,
                str(contentDetails)
            ])

            categoryId: str = workflowDetails["category-id"]
            if categoryId not in categoryPaths:
                categoryPaths[categoryId] = getCategoryPath(
                    vcoUrl,
                    categoryId,
                    bearerToken
                )
            workflowFolder: str = categoryPaths[categoryId]

            # NOTE: two workflows with the same name in the same folder
            # produce the same key; the later one overwrites the earlier.
            key: str = workflowFolder + "/" + workflowName
            results[key] = hashlib.sha256(
                hashData.encode("utf-8")
            ).hexdigest()

        outputs = {
            "status": "done",
            "error": None,
            "results": dict(sorted(results.items()))
        }

    except Exception as err:

        # Top-level boundary of the action: report the failure in the
        # result instead of letting the exception escape.
        outputs = {
            "status": "incomplete",
            "error": repr(err),
            "results": None
        }

    return outputs

Calculate SHA256 hashes for workflows with Python

References