"""
@module de.stschnell
@author Stefan Schnell <mail@stefan-schnell.de>
@license MIT
@version 0.2.0
@runtime python:3.10
@param {string} in_operation - Operation to be executed with the file
Operations are: Create, Read, Update or Delete
@param {SecureString} in_privateToken - Token in the Git
@param {string} in_gitRepositoryName - Name of Git repository in vRA
@param {string} in_gitBranch - Branch in the Git repository
@param {string} in_fileName - Name of the file with path
@param {string} in_fileContent - Content of file in case of create or
update operation
@outputType Properties
"""
"""
@example
var operation = "create";
const privateToken = "yourGitLabToken";
const gitRepositoryName = "yourRepositoryNameInVra ";
const gitBranch = "yourBranchNameInGitLab"
var fileName = "test.txt"
var fileContent = "Hello World"
var result = System.getModule("de.stschnell").handleFileInGitLab(
operation,
privateToken,
gitRepositoryName,
gitBranch,
fileName,
fileContent
);
if (result) {
System.log(JSON.stringify(result));
}
@example
var operation = "read";
const privateToken = "yourGitLabToken";
const gitRepositoryName = "yourRepositoryNameInVra ";
const gitBranch = "yourBranchNameInGitLab"
var fileName = "test.txt"
var result = System.getModule("de.stschnell").handleFileInGitLab(
operation,
privateToken,
gitRepositoryName,
gitBranch,
fileName
);
var content = "";
if (result.result.encoding === "base64") {
var byteContent = java.util.Base64.getDecoder().decode(
java.lang.String(result.result.content).getBytes()
);
for (i = 0; i < byteContent.length; i++) {
content += String.fromCharCode(byteContent[i]);
}
System.log(content);
}
if (result) {
System.log(JSON.stringify(result));
}
@example
var operation = "update"
const privateToken = "yourGitLabToken";
const gitRepositoryName = "yourRepositoryNameInVra ";
const gitBranch = "yourBranchNameInGitLab"
var fileName = "test.txt"
var fileContent = "This is new content"
var result = System.getModule("de.stschnell").handleFileInGitLab(
operation,
privateToken,
gitRepositoryName,
gitBranch,
fileName,
fileContent
);
if (result) {
System.log(JSON.stringify(result));
}
@example
var operation = "delete"
const privateToken = "yourGitLabToken";
const gitRepositoryName = "yourRepositoryNameInVra ";
const gitBranch = "yourBranchNameInGitLab"
var fileName = "test.txt"
var result = System.getModule("de.stschnell").handleFileInGitLab(
operation,
privateToken,
gitRepositoryName,
gitBranch,
fileName
);
if (result) {
System.log(JSON.stringify(result));
}
"""
import base64
import json
import ssl
import time
import urllib.parse
import urllib.request
def request(
url: str,
user: str | None = None,
password: str | None = None,
bearerToken: str | None = None,
method: str = "GET",
header: list[list] = [],
body: dict = {},
contentType: str = "application/json;charset=utf-8",
accept: str = "application/json"
) -> dict | bytes | None:
""" Executes a REST request
@param {string} url - URL to execute the request
@param {string} user
@param {string} password
@param {string} bearerToken
@param {string} method - Method of request, e.g. GET, POST, etc
@param {list.<list>} header - Additional header entries
@param {dictionary} body - Body of request
@param {string} contentType - MIME type of the body for the request
@param {string} accept - MIME type of content expect/prefer as response
@returns {dictionary or bytes or None}
"""
returnValue: dict | bytes | None = None
try:
request: urllib.request.Request = urllib.request.Request(
url = url,
method = method,
data = bytes(json.dumps(body).encode("utf-8"))
)
if user and password:
authorization: str = base64.b64encode(
bytes(user + ":" + password, "UTF-8")
).decode("UTF-8")
request.add_header(
"Authorization", "Basic " + authorization
)
if bearerToken:
request.add_header(
"Authorization", "Bearer " + bearerToken
)
request.add_header(
"Content-Type", contentType
)
request.add_header(
"Accept", accept
)
if len(header) > 0:
for key, value in header:
request.add_header(
key, value
)
response: http.client.HTTPResponse = urllib.request.urlopen(
request,
context = ssl._create_unverified_context()
)
responseCode: int = response.status
responseRead: bytes = response.read()
if responseCode == 200 or responseCode == 202:
if len(responseRead) > 0:
if "json" in accept:
returnValue = json.loads(responseRead)
else:
returnValue = responseRead
except Exception as err:
raise Exception(f"An error occurred at request - {err}") \
from err
return returnValue
def createFileInGit(
privateToken: str,
gitUrl: str,
projectId: str,
gitBranch: str,
fileName: str,
fileContent: str
) -> dict | bytes | None:
""" Creates file in Git repository
"""
_fileName: str = urllib.parse.quote(fileName, safe = "")
return request(
url = gitUrl + "/api/v4/projects/" + projectId +
"/repository/files/" + _fileName,
method = "POST",
header = [
["PRIVATE-TOKEN", privateToken]
],
body = {
"branch": gitBranch,
"content": fileContent,
"commit_message": "Create a new file"
}
)
def readFileInGit(
privateToken: str,
gitUrl: str,
projectId: str,
gitBranch: str,
fileName: str
) -> dict | bytes | None:
""" Reades file in Git repository
"""
_fileName: str = urllib.parse.quote(fileName, safe = "")
return request(
url = gitUrl + "/api/v4/projects/" + projectId +
"/repository/files/" + _fileName +
"?ref=" + gitBranch,
header = [
["PRIVATE-TOKEN", privateToken]
]
)
def updateFileInGit(
privateToken: str,
gitUrl: str,
projectId: str,
gitBranch: str,
fileName: str,
fileContent: str
) -> dict | bytes | None:
""" Updates file in Git repository
"""
_fileName: str = urllib.parse.quote(fileName, safe = "")
return request(
url = gitUrl + "/api/v4/projects/" + projectId +
"/repository/files/" + _fileName,
method = "PUT",
header = [
["PRIVATE-TOKEN", privateToken]
],
body = {
"branch": gitBranch,
"content": fileContent,
"commit_message": "Update a file"
}
)
def deleteFileInGit(
privateToken: str,
gitUrl: str,
projectId: str,
gitBranch: str,
fileName: str
) -> dict | bytes | None:
""" Deletes file in Git repository
"""
_fileName: str = urllib.parse.quote(fileName, safe = "")
return request(
url = gitUrl + "/api/v4/projects/" + projectId +
"/repository/files/" + _fileName,
method = "DELETE",
header = [
["PRIVATE-TOKEN", privateToken]
],
body = {
"branch": gitBranch,
"commit_message": "Delete a file"
}
)
def handler(context: dict, inputs: dict) -> dict:
if not inputs["in_operation"]:
raise Exception("in_operation is required")
if not inputs["in_privateToken"]:
raise Exception("in_privateToken is required")
if not inputs["in_gitRepositoryName"]:
raise Exception("in_gitRepositoryName is required")
if not inputs["in_gitBranch"]:
raise Exception("in_gitBranch is required")
if not inputs["in_fileName"]:
raise Exception("in_fileName is required")
operation: str = inputs["in_operation"].lower()
vcoUrl: str = context["vcoUrl"]
bearerToken: str = context["getToken"]()
outputs: dict = {}
try:
result: str | dict | bytes | None = None
# Orchestrator API > Repository(Version Control) Service
contentRepositories: dict = request(
url = vcoUrl + "/api/content-repositories",
bearerToken = bearerToken
)
for attribute in contentRepositories["link"][0]["attributes"]:
if attribute["name"] == "name":
if attribute["value"] == inputs["in_gitRepositoryName"]:
repositoryAttributes: list = \
contentRepositories["link"][0]["attributes"]
if len(repositoryAttributes) > 0:
for attribute in repositoryAttributes:
if attribute["name"] == "remoteUrl":
gitUrlItems: list = attribute["value"].split("/", 3)
# Remove .git extension and URL encoded path
repositoryPath: str = urllib.parse.quote(
gitUrlItems[3].split(".", 2)[0], safe = ""
)
gitUrl: str = gitUrlItems[0] + "//" + gitUrlItems[2]
if gitUrl:
# https://docs.gitlab.com/ee/api/projects.html
projectId: str = str(request(
url = gitUrl + "/api/v4/projects/" + repositoryPath,
header = [
["PRIVATE-TOKEN", inputs["in_privateToken"]]
]
)["id"])
if projectId:
# https://docs.gitlab.com/ee/api/repository_files.html
match operation:
case "create":
result = createFileInGit(
inputs["in_privateToken"],
gitUrl,
projectId,
inputs["in_gitBranch"],
inputs["in_fileName"],
inputs["in_fileContent"]
)
case "read":
result = readFileInGit(
inputs["in_privateToken"],
gitUrl,
projectId,
inputs["in_gitBranch"],
inputs["in_fileName"]
)
# Example how to get file content, it is base64 encoded
# result = result["content"].decode()
case "update":
result = updateFileInGit(
inputs["in_privateToken"],
gitUrl,
projectId,
inputs["in_gitBranch"],
inputs["in_fileName"],
inputs["in_fileContent"]
)
case "delete":
result = deleteFileInGit(
inputs["in_privateToken"],
gitUrl,
projectId,
inputs["in_gitBranch"],
inputs["in_fileName"]
)
case _:
result = "Operation not suported"
print(result)
outputs = {
"status": "done",
"result": result,
"error": None
}
except Exception as err:
print(repr(err))
outputs = {
"status": "incomplete",
"error": repr(err)
}
return outputs
|