Select Git revision
pwm.py 15.79 KiB
#!/usr/bin/env python3
"""
Practical Work Manager (pwm)
Steven Liatti, Florent Gluck, Adrien Lescourt
2020-2025
"""
from dataclasses import dataclass
from requests.models import Response
from typing import Any, Dict, List, Optional
import argparse
import json
import os
import requests
import subprocess
import yaml
# Root of the GitLab instance; the API and token-settings URLs derive from it.
BASE_URL: str = "https://gitedu.hesge.ch"
BASE_API_URL: str = f"{BASE_URL}/api/v4"
TOKEN_URL: str = f"{BASE_URL}/-/user_settings/personal_access_tokens"
@dataclass
class User:
    """A GitLab user, identified by the id sent to the API as ``user_id``."""

    # Declared as a dataclass field so the generated __repr__ and __eq__ take
    # it into account; without it every User compared equal and printed as
    # "User()".
    id: str

    # Kept hand-written (dataclass does not replace an existing __init__) so
    # the constructor parameter keeps its original name, ``user_id``.
    def __init__(self, user_id: str) -> None:
        self.id = user_id
@dataclass
class Group:
    """A named set of users; each one becomes a repository with its members."""

    # Declared as dataclass fields so the generated __repr__ and __eq__ use
    # them; without declarations every Group compared equal and printed as
    # "Group()".
    name: str
    user_ids: List["User"]

    # Kept hand-written: dataclass does not replace an existing __init__.
    def __init__(self, name: str, user_ids: List["User"]) -> None:
        self.name = name
        self.user_ids = user_ids
@dataclass
class DataSource:
    """The groups (one per repository to create) described by a YAML file."""

    # Declared as a dataclass field so the generated __repr__ and __eq__ use
    # it; the hand-written __init__ below is kept as-is by @dataclass.
    groups: List["Group"]

    def __init__(self, groups: List["Group"]) -> None:
        self.groups = groups

    @classmethod
    def from_yaml(cls, yaml_file_path: str) -> "DataSource":
        """
        Build a DataSource from the YAML file at yaml_file_path.

        The file must hold a list of mappings. Each mapping provides the
        repository name under "name" (or, failing that, "users_login_aai")
        and the list of user ids under "users".
        Prints an error and exits with status 1 if the file is not valid.
        """
        if not cls._validate_yaml(yaml_file_path):
            print("Syntax error in YAML file.\nAborted.")
            exit(1)
        # Parse the file *content*: handing the path itself to yaml.full_load
        # would parse the path string as a YAML document.
        # NOTE(review): full_load can build more than plain data types;
        # yaml.safe_load is preferable if repos files may be untrusted.
        with open(yaml_file_path) as f:
            repos = yaml.full_load(f)
        groups = []
        for repo in repos:
            # NOTE(review): when "name" is absent the raw "users_login_aai"
            # value is used as the name - confirm this is the intended schema.
            name = repo["name"] if "name" in repo else repo["users_login_aai"]
            # Wrap each id in a User: Gitlab.create_repository reads `user.id`.
            users = [User(user_id) for user_id in repo["users"]]
            groups.append(Group(name, users))
        return cls(groups)

    @staticmethod
    def _validate_yaml(yaml_file_path) -> bool:
        """Return True if the file parses as YAML and has the expected shape."""
        try:
            with open(yaml_file_path) as f:
                repos = yaml.full_load(f)
        except yaml.YAMLError:
            # Malformed YAML is reported like any other invalid file.
            return False
        return DataSource._entries_are_valid(repos)

    @staticmethod
    def _entries_are_valid(repos: Any) -> bool:
        """
        Return True if repos is a list of mappings usable by from_yaml.

        Each entry needs a "name" or "users_login_aai" key, plus a "users"
        list (from_yaml reads it unconditionally).
        """
        if not isinstance(repos, list):
            # Covers an empty file (None) and a top-level scalar or mapping.
            return False
        return all(
            isinstance(repo, dict)
            and ("name" in repo or "users_login_aai" in repo)
            and isinstance(repo.get("users"), list)
            for repo in repos
        )
class Gitlab:
    """
    Small client for the GitLab REST API calls used by pwm.

    Every request is authenticated with the token given at construction,
    sent in the "PRIVATE-TOKEN" header.
    """

    def __init__(self, token: str):
        self.token = token

    def _headers(self) -> Dict[str, str]:
        """Return the authentication header sent with every API request."""
        return {"PRIVATE-TOKEN": self.token}

    @staticmethod
    def _member_names(members: List[Dict[str, Any]]) -> str:
        """Return the comma-separated usernames of members above "Reporter"."""
        return ", ".join(
            member["username"] for member in members if member["access_level"] > 20
        )

    def create_group(self, group: "Group", visibility: str = "private") -> str:
        """
        Create a gitlab group and return the id of the created group.

        group may be a Group object or directly the group name as a string.
        Prints the API answer and exits with status 1 on error.
        """
        # Callers pass either a Group or a bare name: use .name when present.
        group_name = getattr(group, "name", group)
        params = {"path": group_name, "name": group_name, "visibility": visibility}
        created = requests.post(
            BASE_API_URL + "/groups", params=params, headers=self._headers()
        ).json()
        if "message" in created:
            print("Error in creating group: %s" % created)
            exit(1)
        print(
            f"Group '{created['name']}' with id '{created['id']}' "
            f"and visibility '{created['visibility']}' "
            f"available at '{created['web_url']}'"
        )
        return str(created["id"])

    def delete_group(self, group_id: str):
        """
        Delete a group and all subprojects.

        Exits with status 1 if the API answers with a message other than
        "202 Accepted".
        """
        deleted_group = requests.delete(
            f"{BASE_API_URL}/groups/{group_id}", headers=self._headers()
        ).json()
        if "message" in deleted_group:
            if deleted_group["message"] != "202 Accepted":
                print("Error in deleting group: %s" % deleted_group)
                exit(1)
            else:
                print(f"Group {group_id} successfully deleted")

    def create_repository(
        self,
        group_id: str,
        users: List["User"],
        name: str,
        import_url: Optional[str],
        expires_at: Optional[str],
    ):
        """
        Create a private repository called name in group_id, protect its
        master branch and add users as members.

        import_url, if given, is the URL of a project to import.
        expires_at, if given, is the expiration date of the memberships.
        Exits with status 1 if the project cannot be created; a failure to
        add one user is only reported and the remaining users are processed.
        """
        headers = self._headers()

        # Create project from name, import_url (if given) and group_id
        params = {"name": name, "namespace_id": group_id, "visibility": "private"}
        if import_url:
            params["import_url"] = import_url
        project = requests.post(
            BASE_API_URL + "/projects", params=params, headers=headers
        ).json()
        if "message" in project:
            print("Error in creating project: %s" % project)
            exit(1)
        print(f"Project '{project['name']}' at '{project['web_url']}' created")
        project_url = f"{BASE_API_URL}/projects/{project['id']}"

        # Only access level 40 (maintainer, the level given to the students
        # below) may push and merge on master.
        access_level = 40
        params = {
            "name": "master",
            "push_access_level": str(access_level),
            "merge_access_level": str(access_level),
        }
        # The answer is not used: a failure here does not stop the creation.
        requests.post(
            project_url + "/protected_branches", params=params, headers=headers
        )

        # Add each student as maintainer (level 40)
        for user in users:
            params = {"user_id": user.id, "access_level": access_level}
            if expires_at:
                params["expires_at"] = expires_at
            new_user = requests.post(
                project_url + "/members", params=params, headers=headers
            ).json()
            if "message" in new_user:
                print("Error in adding user: %s" % new_user)
                continue
            out = (
                f"Adding '{new_user['name']}' ({new_user['username']}) "
                f"in '{project['name']}' "
                f"with access level: {new_user['access_level']}"
            )
            if expires_at:
                out += f", expires at: {new_user['expires_at']}"
            print(out)

    @staticmethod
    def _paginate_responses(
        url: str, headers: Dict[str, str], params: Dict[str, Any]
    ) -> List["Response"]:
        """
        Manage gitlab pagination, max 100 results by request.

        Returns the successful (status 200) responses, one per page. When a
        request fails its body is printed and fetching stops, so the result
        is empty if the very first page fails.
        """
        # Work on a copy: the "page" key must not leak into the caller's dict.
        page_params = dict(params)
        responses = []
        while True:
            response = requests.get(url, params=page_params, headers=headers)
            if response.status_code != 200:
                print(response.text)
                break
            responses.append(response)
            # An absent or empty X-Next-Page header marks the last page.
            next_page = response.headers.get("X-Next-Page", "")
            if not next_page:
                break
            page_params["page"] = next_page
        return responses

    def get_users_in_repository(self, id: str) -> List:
        """
        Return members list from given project id.
        """
        # BUG: does not work if the repo members are inherited from a group
        # NOTE(review): GitLab's "/members/all" endpoint also lists inherited
        # members; switching to it would change what this method returns.
        url = f"{BASE_API_URL}/projects/{id}/members"
        params = {"simple": "true", "order_by": "name", "sort": "asc", "per_page": 100}
        members = []
        for response in self._paginate_responses(url, self._headers(), params):
            members.extend(response.json())
        return members

    def get_projects_in_group(self, id: str) -> List:
        """
        Return projects list from given group id.
        """
        url = f"{BASE_API_URL}/groups/{id}/projects"
        params = {"simple": "true", "order_by": "name", "sort": "asc", "per_page": 100}
        projects = []
        for response in self._paginate_responses(url, self._headers(), params):
            projects.extend(response.json())
        return projects

    def clone_all(self, id: str, directory: str, until_date: Optional[str]):
        """
        Clone all repositories from a group id in directory (created in function).

        If until_date is given, each clone is checked out at the last commit
        of master made before that date. A repository that cannot be cloned
        or checked out is reported and skipped. Exits with status 1 if
        directory cannot be created or if members cannot be retrieved.
        """
        try:
            os.mkdir(directory)
        except OSError:
            print("Creation of the directory '%s' failed, exit\n" % directory)
            exit(1)

        headers = self._headers()

        # Base URL with the credentials inserted right after the scheme; it is
        # the same for every repository, so compute it once.
        # NOTE: git stores the clone URL, token included, in each clone's
        # .git/config.
        scheme = "https://"
        after_https = BASE_URL.find(scheme) + len(scheme)
        authenticated_base = (
            BASE_URL[:after_https]
            + "{}:{}@".format("gitlab-ci-token", self.token)
            + BASE_URL[after_https:]
        )

        for repo in self.get_projects_in_group(id):
            members = requests.get(
                f"{BASE_API_URL}/projects/{repo['id']}/members", headers=headers
            ).json()
            if "message" in members:
                print(f"Error retrieving members: {members['message']}")
                exit(1)

            web_url = repo["web_url"]
            repo_dir = directory + "/" + repo["path"]
            print("Members: " + self._member_names(members))
            print("Web url: " + web_url)
            print('Cloning in "' + repo_dir + '"')

            clone = subprocess.run(
                [
                    "git",
                    "clone",
                    "-q",
                    web_url.replace(BASE_URL, authenticated_base),
                    repo_dir,
                ]
            )
            if clone.returncode != 0:
                # Without a clone there is nothing to check out; go on with
                # the other repositories instead of crashing below.
                print("Cloning of '" + web_url + "' failed, skipped\n")
                continue

            if not until_date:
                print()
                continue

            try:
                commit_id = (
                    subprocess.check_output(
                        [
                            "git",
                            "rev-list",
                            "-n",
                            "1",
                            # No shell is involved, so no quoting: quote
                            # characters would reach git as part of the date.
                            "--before=" + until_date,
                            "master",
                        ],
                        cwd=repo_dir,
                    )
                    .decode("utf-8")
                    .rstrip()
                )
            except subprocess.CalledProcessError:
                print("No usable 'master' branch in '" + repo_dir + "', skipped\n")
                continue
            if not commit_id:
                print('No commit before "' + until_date + '", checkout skipped\n')
                continue
            subprocess.run(["git", "checkout", "-q", commit_id], cwd=repo_dir)
            print("Checkout at " + commit_id + "\n")
def command_create_group_repos(args):
    """
    For each repository listed in given file, create a group and a repo in group.
    Add users in every repo created
    """
    gl = Gitlab(args.token)
    ds = DataSource.from_yaml(args.repos_file)
    # create_group reads `group.name`, so give it a Group rather than the bare
    # name string (a str has no `.name` attribute).
    group_id = gl.create_group(Group(args.group_name, []), args.visibility)
    for group in ds.groups:
        gl.create_repository(
            group_id,
            group.user_ids,
            group.name,
            args.import_url,
            args.expires_at,
        )
        # Show the ids themselves rather than the repr of the User objects.
        user_ids = [getattr(user, "id", user) for user in group.user_ids]
        print(f"created repo: {group.name} with the users: {user_ids}")
        print()
def command_clone_all(args):
    """Clone every repository of the group given on the command line."""
    gl = Gitlab(args.token)
    # The "clone" sub-parser stores the positional as `group_id`; there is no
    # `id` attribute on the parsed arguments.
    gl.clone_all(args.group_id, args.directory, args.until_date)
def command_list_projects(args):
    """
    Print the projects of the group args.group_id.

    args.show selects the output: "all" dumps the full JSON, "url" and "ssh"
    print the HTTP and SSH clone URLs, anything else (or nothing) prints the
    project names.
    """
    projects = Gitlab(args.token).get_projects_in_group(args.group_id)
    if args.show == "all":
        print(json.dumps(projects, indent=2))
        return
    # Project field printed for each mode; unknown modes fall back to the name.
    field_by_mode = {"url": "http_url_to_repo", "ssh": "ssh_url_to_repo"}
    field = field_by_mode.get(args.show, "name")
    # Collect everything first so nothing is printed if a field is missing.
    values = [project[field] for project in projects]
    for value in values:
        print(value)
def command_list_users(args):
    """
    Print the members of the project args.project_id.

    args.show selects the output: "all" dumps the full JSON, "url" prints the
    members' web URLs, anything else (or nothing) prints the usernames.
    """
    members = Gitlab(args.token).get_users_in_repository(args.project_id)
    if args.show == "all":
        print(json.dumps(members, indent=2))
        return
    # Only "url" selects another field; every other mode prints the username.
    field = "web_url" if args.show == "url" else "username"
    # Collect everything first so nothing is printed if a field is missing.
    values = [member[field] for member in members]
    for value in values:
        print(value)
def main():
    """
    Parse the command line, resolve the API token and run the sub-command.

    The token is taken, in order, from -t/--token, the GITEDU_TOKEN
    environment variable, or the file ~/.config/gitedu_token. Exits with
    status 1 if none of them provides a non-empty token.
    """
    parser = argparse.ArgumentParser(
        description="Practical Work Manager - "
        "Manage students PW - Create group, projects or clone repositories"
    )
    # Without a sub-command, the help is printed.
    parser.set_defaults(func=lambda _: parser.print_help())
    parser.add_argument(
        "-t",
        "--token",
        metavar="TOKEN",
        help=f"Create a token here: {TOKEN_URL}. -t is not needed if the env var GITEDU_TOKEN is set.",
    )

    subparsers = parser.add_subparsers(
        metavar="(group_repos | clone | list_projects | list_users)"
    )

    parser_group_repos = subparsers.add_parser(
        "group_repos", help="Create group and repositories from file"
    )
    parser_group_repos.add_argument(
        "group_name", metavar="GROUP_NAME", help="The group name."
    )
    parser_group_repos.add_argument(
        "repos_file",
        metavar="REPOS_FILE",
        help="YAML file with projects names and/or students emails.",
    )
    parser_group_repos.add_argument(
        "--visibility", help="Group visibility. By default private.", default="private"
    )
    parser_group_repos.add_argument(
        "-i",
        "--import_url",
        help="Import the publicly accessible project by URL given here (optional).",
    )
    parser_group_repos.add_argument(
        "-x",
        "--expires_at",
        help="Expiration date to kick off students from this project, at 00:00:00. YYYY-MM-DD format (optional).",
    )
    parser_group_repos.set_defaults(func=command_create_group_repos)

    parser_clone = subparsers.add_parser("clone", help="Clone the repositories locally")
    parser_clone.add_argument(
        "group_id",
        metavar="GROUP_ID",
        help="The group_id (int) of the projects.",
    )
    parser_clone.add_argument(
        "directory",
        metavar="DIRECTORY",
        help="Local directory where clone all repositories.",
    )
    parser_clone.add_argument(
        "-u",
        "--until_date",
        help='Do a git checkout for all repositories at given date, format "YYYY-MM-DD hh:mm" (optional).',
    )
    # NOTE(review): this flag is accepted but never passed on to
    # Gitlab.clone_all, which always clones over HTTPS with the token.
    parser_clone.add_argument(
        "--use_http",
        help="Use the HTTP client instead of SSH. False by default.",
        action="store_true",
    )
    parser_clone.set_defaults(func=command_clone_all)

    parser_projects = subparsers.add_parser(
        "list_projects", help="List all project in a group"
    )
    parser_projects.add_argument(
        "group_id", metavar="GROUP_ID", help="The group_id (int)."
    )
    parser_projects.add_argument(
        "-s",
        "--show",
        help="Amount of informations (default name) : [all | name | url | ssh]",
    )
    parser_projects.set_defaults(func=command_list_projects)

    parser_users = subparsers.add_parser(
        "list_users", help="List all users in a repository"
    )
    parser_users.add_argument(
        "project_id", metavar="PROJECT_ID", help="The repository project_id (int)."
    )
    # Users have no "ssh" view: command_list_users only knows all/name/url.
    parser_users.add_argument(
        "-s",
        "--show",
        help="Amount of informations (default name) : [all | name | url]",
    )
    parser_users.set_defaults(func=command_list_users)

    args = parser.parse_args()

    if not args.token:
        if os.environ.get("GITEDU_TOKEN"):
            args.token = os.environ.get("GITEDU_TOKEN")
        else:
            # expanduser also works when the HOME variable is not set.
            token_file = os.path.join(
                os.path.expanduser("~"), ".config", "gitedu_token"
            )
            if os.path.isfile(token_file):
                with open(token_file) as file:
                    args.token = file.read().strip()
    # `not` rather than `is None`: an empty token file must not count as a
    # valid token.
    if not args.token:
        print(
            "Error: you must give a valid api token. Create a token here: "
            + TOKEN_URL
        )
        exit(1)

    args.func(args)


if __name__ == "__main__":
    main()