VCF Automation Blog

from Stefan Schnell

Calculate SHA256 Hashes for Workflows


This Python source code shows how to built hash values of workflows. It uses the name, version, description, input-parameters, output-parameters and content of the workflow. This hash value can be used to compare a workflow in different stages, like development, integration and production. On this way it can be ensured that a change of a workflow can be made visible, if the hash values in the stages differ. The proof that a workflow has been transferred unchanged to another stage is a very important factor in security audits.

"""
Python action to calculate Secure Hash Algorithm SHA256 values for
all workflows.

@name getHashesForWorkflows
@returns {Properties}

@author Stefan Schnell <mail@stefan-schnell.de>
@license MIT
@version 0.1.0

Checked with Aria Automation 8.17.0
"""

import hashlib
import json
import ssl
import urllib.request



def getAllWorkflows(vcoUrl, bearerToken):
    """
    Get all workflows.

    @param {string} vcoUrl - URL of Aria orchestrator
    @param {string} bearerToken
    @returns {dictionary}
    """

    returnValue = {}

    try:

        requestWorkflows = urllib.request.Request(
            url = vcoUrl + "/api/workflows"
        )
        requestWorkflows.add_header(
          "Authorization", "Bearer " + bearerToken
        )
        requestWorkflows.add_header(
          "Content-Type", "application/json"
        )

        responseWorkflows = urllib.request.urlopen(
            requestWorkflows,
            context = ssl._create_unverified_context()
        )

        if responseWorkflows.getcode() == 200:
            returnValue = json.loads(responseWorkflows.read())

    except Exception as err:
        raise Exception("An error occurred at detecting all workflows") \
            from err

    return returnValue



def getWorkflowDetails(workflowHref, bearerToken):
    """
    Gets the details of the given workflow.

    @param {string} workflowHref - Horizontal reference of workflow
    @param {string} bearerToken
    @returns {dictionary}
    """

    returnValue = {}

    try:

        requestWorkflow = urllib.request.Request(
            url = workflowHref
        )
        requestWorkflow.add_header(
            "Authorization", "Bearer " + bearerToken
        )
        requestWorkflow.add_header(
            "Content-Type", "application/json"
        )

        responseWorkflow = urllib.request.urlopen(
            requestWorkflow,
            context = ssl._create_unverified_context()
        )

        if responseWorkflow.getcode() == 200:
            returnValue = json.loads(responseWorkflow.read())

    except Exception as err:
        raise Exception("An error occurred at detecting workflow details") \
            from err

    return returnValue



def getWorkflowContent(workflowHref, bearerToken):
    """
    Gets the content of the given workflow.

    @param {string} workflowHref - Horizontal reference of workflow
    @param {string} bearerToken
    @returns {dictionary}
    """

    returnValue = {}

    try:

        requestContent = urllib.request.Request(
            url = workflowHref + "content/"
        )
        requestContent.add_header(
            "Authorization", "Bearer " + bearerToken
        )
        requestContent.add_header(
            "Content-Type", "application/json"
        )

        responseContent = urllib.request.urlopen(
            requestContent,
            context = ssl._create_unverified_context()
        )

        if responseContent.getcode() == 200:
            returnValue = json.loads(responseContent.read())

    except Exception as err:
        raise Exception("An error occurred at detecting workflow content") \
            from err

    return returnValue



def getCategoryPath(vcoUrl, workflowCategoryId, bearerToken):
    """
    Gets the folder (category path) of the given workflow.

    @param {string} vcoUrl - URL of Aria orchestrator
    @param {string} workflowCategoryId - GUID of category (folder)
    @param {string} bearerToken
    @returns {dictionary}
    """

    returnValue = ""

    try:

        requestCategory = urllib.request.Request(
            url = vcoUrl + "/api/categories/" + workflowCategoryId
        )
        requestCategory.add_header(
            "Authorization", "Bearer " + bearerToken
        )
        requestCategory.add_header(
            "Content-Type", "application/json"
        )

        responseCategory = urllib.request.urlopen(
            requestCategory,
            context = ssl._create_unverified_context()
        )

        if responseCategory.getcode() == 200:
            categoryDetails = json.loads(responseCategory.read())
            returnValue = categoryDetails["path"]

    except Exception as err:
        raise Exception("An error occurred at detecting category path") \
            from err

    return returnValue



def handler(context, inputs):
    """
    Aria Automation standard handler, the main function.
    """

    results = {}

    try:

        vcoUrl = context["vcoUrl"]
        bearerToken = context["getToken"]()

        workflows = getAllWorkflows(
            vcoUrl,
            bearerToken
        )
        if not workflows:
            raise ValueError("No workflows were detected")

        for workflow in workflows["link"]:

            workflowDetails = getWorkflowDetails(
                workflow["href"],
                bearerToken
            )
            if not workflowDetails:
                raise ValueError("No workflow details were detected")

            workflowName = workflowDetails["name"]
            workflowVersion = workflowDetails["version"]
            if "description" in workflowDetails:
                workflowDescription = workflowDetails["description"]
            else:
                workflowDescription = ""
            if "input-parameters" in workflowDetails:
                workflowInputParameters = \
                    workflowDetails["input-parameters"]
            else:
                workflowInputParameters = ""
            if "output-parameters" in workflowDetails:
                workflowOutputParameters = \
                    workflowDetails["output-parameters"]
            else:
                workflowOutputParameters = ""

            contentDetails = getWorkflowContent(
                workflow["href"],
                bearerToken
            )
            if not contentDetails:
                raise ValueError("No workflow content were detected")

            hashData = workflowName + " ~ " + \
                workflowVersion + " ~ " + \
                workflowDescription + " ~ " + \
                str(workflowOutputParameters) + " ~ " + \
                str(workflowInputParameters) + " ~ " + \
                str(contentDetails)

            workflowFolder = getCategoryPath(
                vcoUrl,
                workflowDetails["category-id"],
                bearerToken
            )

            key = workflowFolder + "/" + workflowName
            value = hashlib.sha256(
                hashData.encode("utf-8")
            ).hexdigest()

            results[key] = value

        results = dict(sorted(results.items()))

        for key in results:
            print(key, results[key])

        outputs = {
            "status": "done",
            "results": results
        }

    except Exception as err:

        outputs = {
            "status": "incomplete",
            "error": repr(err),
            "results": results
        }

    return outputs