VCF Automation Blog

by Stefan Schnell


Integrations make it possible to connect external systems to VCF Automation. In this way Git repositories can be used so that, for example, templates, workflows and actions are under source control. Among other things, this functionality is important for auditing. In addition, such an integrated Git repository can also be used directly from an action of VCF Automation to store files and thus persist data. This blog post shows how file handling with GitLab can be done from an action.

Handle Files in GitLab with Python


The following Python source code of an action shows the four basic operations to persist data in a GitLab repository: Create, Read, Update and Delete (CRUD).

Hint: A private token must be created in GitLab for the user in advance, and of course the GitLab repository must be connected to VCF Automation. The name of the GitLab repository is required as a parameter so that the further necessary information can be determined.

"""
@module de.stschnell

@author Stefan Schnell <mail@stefan-schnell.de>
@license MIT
@version 0.1.0

@runtime python:3.10

@param {string} in_operation - Operation to be executed with the file
    Operations are: Create, Read, Update or Delete
@param {SecureString} in_privateToken - Token in the Git
@param {string} in_gitRepositoryName - Name of Git repository in vRA
@param {string} in_gitBranch - Branch in the Git repository
@param {string} in_fileName - Name of the file with path
@param {string} in_fileContent - Content of file in case of create or
    update operation

@outputType Properties
"""


import base64
import json
import ssl
import time
import urllib.parse
import urllib.request


def request(
    url,
    user = None,
    password = None,
    bearerToken = None,
    method = "GET",
    header = None,
    body = None,
    contentType = "application/json;charset=utf-8",
    accept = "application/json"
):
    """ Executes a REST request

    @param {string} url - URL to execute the request
    @param {string} user - User for basic authentication, only used
        together with password
    @param {string} password - Password for basic authentication
    @param {string} bearerToken - Token for bearer authentication
    @param {string} method - Method of request, e.g. GET, POST, etc
    @param {list.<list>} header - Additional header entries as
        [name, value] pairs
    @param {dictionary} body - Body of request, sent as JSON
    @param {string} contentType - MIME type of the body for the request
    @param {string} accept - MIME type of content expect/prefer as response
    @returns {dictionary or bytes} Parsed JSON if accept contains "json",
        otherwise the raw response bytes. An empty dictionary is returned
        if the response is not successful or has no JSON content.
    @raises {Exception} If the request can not be built or executed, the
        original error is chained as cause
    """

    returnValue = {}

    try:

        # A plain GET or HEAD request carries no payload unless a body
        # is passed explicitly, all other requests always send JSON.
        withoutPayload = isinstance(method, str) and \
            method.upper() in ("GET", "HEAD")
        payload = None
        if body or not withoutPayload:
            payload = json.dumps(
                body if body is not None else {}
            ).encode("utf-8")

        httpRequest = urllib.request.Request(
            url = url,
            method = method,
            data = payload
        )

        if user and password:
            authorization = base64.b64encode(
                bytes(user + ":" + password, "UTF-8")
            ).decode("UTF-8")
            httpRequest.add_header(
                "Authorization", "Basic " + authorization
            )

        if bearerToken:
            httpRequest.add_header(
                "Authorization", "Bearer " + bearerToken
            )

        httpRequest.add_header(
            "Content-Type", contentType
        )

        httpRequest.add_header(
            "Accept", accept
        )

        for key, value in header or []:
            httpRequest.add_header(
                key, value
            )

        # NOTE(security): The certificate verification is disabled here,
        # the connection is not protected against man-in-the-middle
        # attacks. Use ssl.create_default_context() if the certificates
        # of the target systems are trusted by the runtime.
        with urllib.request.urlopen(
            httpRequest,
            context = ssl._create_unverified_context()
        ) as response:
            status = response.getcode()
            content = response.read()

        # Every 2xx status is a success, e.g. 201 for a created file and
        # 204 (without content) for a deleted file.
        if status is not None and 200 <= status < 300:
            if "json" not in accept:
                returnValue = content
            elif content.strip():
                returnValue = json.loads(content)

    except Exception as err:
        raise Exception(f"An error occurred at request - {err}") \
          from err

    return returnValue


def createFileInGit(
    privateToken,
    gitUrl,
    projectId,
    gitBranch,
    fileName,
    fileContent
):
    """ Creates file in Git repository

    Sends a POST request to the repository files endpoint of the project.

    @param {string} privateToken - Value of the PRIVATE-TOKEN header
    @param {string} gitUrl - Base URL of the Git server
    @param {string} projectId - ID of the project, as string
    @param {string} gitBranch - Branch in which the file is created
    @param {string} fileName - Name of the file with path, it is URL
        encoded before it is added to the request URL
    @param {string} fileContent - Content of the new file
    @returns {dictionary or bytes} Result of the request
    """
    encodedFileName = urllib.parse.quote(fileName, safe = "")
    endpoint = gitUrl + "/api/v4/projects/" + projectId + \
        "/repository/files/" + encodedFileName
    authentication = [
        ["PRIVATE-TOKEN", privateToken]
    ]
    commit = {
        "branch": gitBranch,
        "content": fileContent,
        "commit_message": "Create a new file"
    }
    return request(
        url = endpoint,
        method = "POST",
        header = authentication,
        body = commit
    )


def readFileInGit(
    privateToken,
    gitUrl,
    projectId,
    gitBranch,
    fileName
):
    """ Reads file in Git repository

    Sends a GET request to the repository files endpoint of the project.

    @param {string} privateToken - Value of the PRIVATE-TOKEN header
    @param {string} gitUrl - Base URL of the Git server
    @param {string} projectId - ID of the project, as string
    @param {string} gitBranch - Branch from which the file is read,
        passed as ref query parameter
    @param {string} fileName - Name of the file with path
    @returns {dictionary or bytes} Result of the request
    """
    _fileName = urllib.parse.quote(fileName, safe = "")
    # The branch name is part of the query string and must be encoded,
    # otherwise characters like &, #, + or spaces break the request.
    _gitBranch = urllib.parse.quote(gitBranch, safe = "")
    return request(
        url = gitUrl + "/api/v4/projects/" + projectId +
            "/repository/files/" + _fileName +
            "?ref=" + _gitBranch,
        header = [
            ["PRIVATE-TOKEN", privateToken]
        ]
    )


def updateFileInGit(
    privateToken,
    gitUrl,
    projectId,
    gitBranch,
    fileName,
    fileContent
):
    """ Updates file in Git repository

    Sends a PUT request to the repository files endpoint of the project.

    @param {string} privateToken - Value of the PRIVATE-TOKEN header
    @param {string} gitUrl - Base URL of the Git server
    @param {string} projectId - ID of the project, as string
    @param {string} gitBranch - Branch in which the file is updated
    @param {string} fileName - Name of the file with path, it is URL
        encoded before it is added to the request URL
    @param {string} fileContent - New content of the file
    @returns {dictionary or bytes} Result of the request
    """
    encodedFileName = urllib.parse.quote(fileName, safe = "")
    endpoint = gitUrl + "/api/v4/projects/" + projectId + \
        "/repository/files/" + encodedFileName
    authentication = [
        ["PRIVATE-TOKEN", privateToken]
    ]
    commit = {
        "branch": gitBranch,
        "content": fileContent,
        "commit_message": "Update a file"
    }
    return request(
        url = endpoint,
        method = "PUT",
        header = authentication,
        body = commit
    )


def deleteFileInGit(
    privateToken,
    gitUrl,
    projectId,
    gitBranch,
    fileName
):
    """ Deletes file in Git repository

    Sends a DELETE request to the repository files endpoint of the
    project.

    @param {string} privateToken - Value of the PRIVATE-TOKEN header
    @param {string} gitUrl - Base URL of the Git server
    @param {string} projectId - ID of the project, as string
    @param {string} gitBranch - Branch in which the file is deleted
    @param {string} fileName - Name of the file with path, it is URL
        encoded before it is added to the request URL
    @returns {dictionary or bytes} Result of the request
    """
    encodedFileName = urllib.parse.quote(fileName, safe = "")
    endpoint = gitUrl + "/api/v4/projects/" + projectId + \
        "/repository/files/" + encodedFileName
    authentication = [
        ["PRIVATE-TOKEN", privateToken]
    ]
    commit = {
        "branch": gitBranch,
        "commit_message": "Delete a file"
    }
    return request(
        url = endpoint,
        method = "DELETE",
        header = authentication,
        body = commit
    )


def _findRepositoryAttributes(repositories, repositoryName):
    """ Returns the attributes of the content repository with the given
    name, or an empty list if no repository has this name
    """
    for link in repositories.get("link", []):
        attributes = link.get("attributes", [])
        for attribute in attributes:
            if attribute.get("name") == "name" and \
                    attribute.get("value") == repositoryName:
                return attributes
    return []


def _splitRemoteUrl(remoteUrl):
    """ Splits the remote URL of a repository into the base URL of the
    Git server and the URL encoded project path without .git extension
    """
    parts = urllib.parse.urlsplit(remoteUrl)
    path = parts.path.strip("/")
    # Only the trailing extension is removed, the project path itself
    # may contain dots.
    if path.endswith(".git"):
        path = path[:-len(".git")]
    return (
        parts.scheme + "://" + parts.netloc,
        urllib.parse.quote(path, safe = "")
    )


def handler(context, inputs):
    """ Entry point of the action, executes a file operation in the
    GitLab repository which is connected as content repository

    The remote URL of the repository is read from the content
    repositories of the Orchestrator, then the project ID is detected
    in GitLab and finally the file operation is executed.

    @param {dictionary} context - Context of the action, vcoUrl and
        getToken are used
    @param {dictionary} inputs - Inputs of the action, in_operation,
        in_privateToken, in_gitRepositoryName, in_gitBranch and
        in_fileName are required, in_fileContent is used for the create
        and update operation
    @returns {dictionary} Properties with status, result and error.
        The status is done if no error occurred, otherwise incomplete.
    @raises {Exception} If a required input is missing or empty
    """

    for name in (
        "in_operation",
        "in_privateToken",
        "in_gitRepositoryName",
        "in_gitBranch",
        "in_fileName"
    ):
        if not inputs.get(name):
            raise Exception(name + " is required")

    operation = inputs["in_operation"].lower()
    privateToken = inputs["in_privateToken"]
    repositoryName = inputs["in_gitRepositoryName"]
    gitBranch = inputs["in_gitBranch"]
    fileName = inputs["in_fileName"]
    # The content is optional, without content an empty file is written
    fileContent = inputs.get("in_fileContent") or ""

    vcoUrl = context["vcoUrl"]
    bearerToken = context["getToken"]()

    try:

        # Orchestrator API > Repository(Version Control) Service
        repositories = request(
            url = vcoUrl + "/api/content-repositories",
            bearerToken = bearerToken
        )

        # All repositories are checked, not only the first one
        repositoryAttributes = _findRepositoryAttributes(
            repositories, repositoryName
        )
        if not repositoryAttributes:
            raise Exception(
                "Git repository " + repositoryName + " not found"
            )

        remoteUrl = None
        for attribute in repositoryAttributes:
            if attribute.get("name") == "remoteUrl":
                remoteUrl = attribute.get("value")
        if not remoteUrl:
            raise Exception(
                "Git repository " + repositoryName + " has no remoteUrl"
            )

        gitUrl, repositoryPath = _splitRemoteUrl(remoteUrl)

        # https://docs.gitlab.com/ee/api/projects.html
        project = request(
            url = gitUrl + "/api/v4/projects/" + repositoryPath,
            header = [
                ["PRIVATE-TOKEN", privateToken]
            ]
        )
        if "id" not in project:
            raise Exception(
                "Project ID of Git repository " + repositoryName +
                " not detected"
            )
        projectId = str(project["id"])

        # https://docs.gitlab.com/ee/api/repository_files.html
        if operation == "create":
            result = createFileInGit(
                privateToken,
                gitUrl,
                projectId,
                gitBranch,
                fileName,
                fileContent
            )
        elif operation == "read":
            result = readFileInGit(
                privateToken,
                gitUrl,
                projectId,
                gitBranch,
                fileName
            )
            # Example how to get file content, it is base64 encoded
            # base64.b64decode(result["content"]).decode("utf-8")
        elif operation == "update":
            result = updateFileInGit(
                privateToken,
                gitUrl,
                projectId,
                gitBranch,
                fileName,
                fileContent
            )
        elif operation == "delete":
            result = deleteFileInGit(
                privateToken,
                gitUrl,
                projectId,
                gitBranch,
                fileName
            )
        else:
            result = "Operation not suported"

        print(result)

        outputs = {
            "status": "done",
            "result": result,
            "error": None
        }

    except Exception as err:

        print(repr(err))

        # Same keys as in case of success, so that the result can be
        # evaluated in the same way.
        outputs = {
            "status": "incomplete",
            "result": None,
            "error": repr(err)
        }

    return outputs