#!/usr/bin/env python3

"""
Practical Work Manager (pwm)
Steven Liatti, Florent Gluck, Adrien Lescourt
2020-2025

Command line helper built on the GitLab REST API: create a group with one
repository per YAML entry, clone every repository of a group, and list the
projects of a group or the members of a project.
"""

from dataclasses import dataclass
from requests.models import Response
from typing import Any
import argparse
import json
import os
import requests
import subprocess
import sys
import yaml

BASE_URL: str = "https://gitedu.hesge.ch"
BASE_API_URL: str = BASE_URL + "/api/v4"
TOKEN_URL: str = BASE_URL + "/-/user_settings/personal_access_tokens"


@dataclass
class User:
    """A user, identified by the login string looked up through the API."""

    user_login_aai: str


@dataclass
class Group:
    """One repository to create: its name and the users to add to it."""

    name: str
    user_ids: list[User]


@dataclass
class DataSource:
    """Repositories description loaded from a YAML file."""

    groups: list[Group]

    @classmethod
    def from_yaml(cls, yaml_file_path: str) -> "DataSource":
        """
        Build a DataSource from a YAML list of repository entries.

        Each entry may have a "name" and/or a "users_login_aai" list. When
        "name" is missing, the repository name is the logins joined with "-"
        (so a single-user entry is named after that user). When
        "users_login_aai" is missing, the repository gets no users.

        Prints an error and exits with status 1 if the file is not valid.
        """
        if not cls._validate_yaml(yaml_file_path):
            print("Syntax error in YAML file.\nAborted.")
            sys.exit(1)
        # NOTE(review): FullLoader is kept as in the original; the entries are
        # plain data, so yaml.safe_load would be a stricter choice if the file
        # can come from an untrusted source.
        with open(yaml_file_path) as f:
            repos = yaml.load(f, Loader=yaml.FullLoader)
        groups = []
        for repo in repos:
            logins = cls._user_logins(repo)
            if "name" in repo:
                name = repo["name"]
            else:
                # Group.name is a str: never pass the raw list of logins on.
                name = "-".join(str(login) for login in logins)
            groups.append(Group(name, [User(login) for login in logins]))
        return cls(groups)

    @staticmethod
    def _user_logins(repo: dict[str, Any]) -> list:
        """Return the logins of one YAML entry; a lone string counts as one login."""
        logins = repo.get("users_login_aai") or []
        if isinstance(logins, str):
            # Iterating a bare string would yield one "user" per character.
            return [logins]
        return list(logins)

    @staticmethod
    def _validate_yaml(yaml_file_path: str) -> bool:
        """
        Return True if the file is a YAML list whose entries are all mappings
        holding at least one of the keys "name" or "users_login_aai".

        Malformed YAML, an empty document or a non-list document return False.
        Errors opening the file are not caught here.
        """
        try:
            with open(yaml_file_path) as f:
                repos = yaml.load(f, Loader=yaml.FullLoader)
        except yaml.YAMLError:
            return False
        if not isinstance(repos, list):
            return False
        return all(
            isinstance(repo, dict) and ("name" in repo or "users_login_aai" in repo)
            for repo in repos
        )


class Gitlab:
    """Thin client for the GitLab REST API calls needed by pwm."""

    def __init__(self, token: str):
        self.token = token

    @property
    def _headers(self) -> dict[str, str]:
        """Authentication header sent with every API request."""
        return {"PRIVATE-TOKEN": self.token}

    def create_group(self, name: str, visibility: str = "private") -> str:
        """
        Create a gitlab group and return the id of the created group.

        Prints the API answer and exits with status 1 if the creation fails.
        """
        params = {"path": name, "name": name, "visibility": visibility}
        request_res = requests.post(
            BASE_API_URL + "/groups", params=params, headers=self._headers
        )
        if request_res.status_code != 201:
            print(f"Error: {request_res.status_code}")
            print(request_res.text)
            sys.exit(1)
        res = request_res.json()
        print(f"Group created: name: {res['name']}, id: {res['id']}")
        return str(res["id"])

    def delete_group(self, group_id: str):
        """
        Delete a group and all subprojects.

        Exits with status 1 if the API answers with a "message" other than
        "202 Accepted"; otherwise reports the deletion as successful.
        """
        deleted_group = requests.delete(
            BASE_API_URL + "/groups/" + group_id, headers=self._headers
        ).json()
        if "message" in deleted_group and deleted_group["message"] != "202 Accepted":
            print("Error in deleting group: %s" % deleted_group)
            sys.exit(1)
        print("Group " + group_id + " successfully deleted")

    def get_user_id(self, user: User) -> str | None:
        """
        Search the user by login and return its id as a string.

        Returns None (after printing the reason) when the request fails or
        when the search does not match exactly one user.
        """
        res = requests.get(
            BASE_API_URL + "/users",
            params={"search": user.user_login_aai},
            headers=self._headers,
        )
        if res.status_code != 200:
            print(f"Error searching user {user.user_login_aai}")
            print(res.text)
            return None
        matches = res.json()
        if len(matches) == 1:
            return str(matches[0]["id"])
        if len(matches) > 1:
            print(f"ERROR: More than one users found for {user.user_login_aai}")
        else:
            print(f"ERROR: No user found for {user.user_login_aai}")
        return None

    def add_users_to_repository(
        self,
        project_id: str,
        project_name: str,
        users: list[User],
        expires_at: str | None,
    ) -> list[User]:
        """
        Protect the master branch of a repo, then add users as members.

        Returns only the users that were actually added: a user that cannot
        be resolved, or whose membership request is refused, is reported and
        left out. Raises Exception if the branch protection request fails.
        """
        headers = self._headers
        # 40 is the "Maintainer" access level in the GitLab API. It is used
        # both as the level required to push/merge on master and as the level
        # granted to each added user, so added users can push to master.
        access_level = 40
        params = {
            "name": "master",
            "push_access_level": str(access_level),
            "merge_access_level": str(access_level),
        }
        res = requests.post(
            BASE_API_URL + "/projects/" + str(project_id) + "/protected_branches",
            params=params,
            headers=headers,
        )
        if res.status_code != 201:
            raise Exception(
                f"Error setting project ({project_name}) permissions", res.text
            )

        added_users = []
        for user in users:
            user_id = self.get_user_id(user)
            if user_id is None:
                print(
                    f"ERROR: User {user.user_login_aai} will not be added to project: {project_name}"
                )
                continue
            params = {"user_id": user_id, "access_level": access_level}
            if expires_at:
                params["expires_at"] = expires_at
            res = requests.post(
                BASE_API_URL + "/projects/" + str(project_id) + "/members",
                params=params,
                headers=headers,
            )
            if res.status_code != 201:
                print(
                    f"ERROR: User {user.user_login_aai} will not be added to project: {project_name}"
                )
                print(res.text)
                # A refused membership must not be reported as added.
                continue
            added_users.append(user)
        return added_users

    def create_repository(
        self,
        group_id: str,
        name: str,
        import_url: str | None,
    ) -> str:
        """
        Create a private repository in group_id, with a name and optional
        import_url, and return the id of the created project.

        Raises Exception if the API does not answer with status 201.
        """
        params = {"name": name, "namespace_id": group_id, "visibility": "private"}
        if import_url:
            params["import_url"] = import_url
        res = requests.post(
            BASE_API_URL + "/projects", params=params, headers=self._headers
        )
        if res.status_code != 201:
            raise Exception(f"Error creating project {name}", res.text)
        return str(res.json()["id"])

    @staticmethod
    def _paginate_responses(
        url: str, headers: dict[str, str], params: dict[str, Any]
    ) -> list[Response]:
        """
        Manage gitlab pagination, max 100 results by request.

        Follows the "X-Next-Page" response header until it is empty or
        missing. Returns only the successful (status 200) responses: if a
        request fails, its body is printed and the pages fetched so far are
        returned (an empty list when the first request fails). The caller's
        params dict is not modified.
        """
        query = dict(params)  # copy: the "page" key must not leak to the caller
        responses: list[Response] = []
        while True:
            response = requests.get(url, params=query, headers=headers)
            if response.status_code != 200:
                print(response.text)
                break
            responses.append(response)
            next_page = response.headers.get("X-Next-Page", "")
            if not next_page:
                break
            query["page"] = next_page
        return responses

    def get_users_in_repository(self, id: str) -> list:
        """
        Return members list from given project id.
        """
        # BUG: does not work if the repo members are inherited from a group
        url = BASE_API_URL + "/projects/" + id + "/members"
        params = {"simple": "true", "order_by": "name", "sort": "asc", "per_page": 100}
        members = []
        for response in self._paginate_responses(url, self._headers, params):
            members += response.json()
        return members

    def get_projects_in_group(self, id: str) -> list:
        """
        Return projects list from given group id.
        """
        url = BASE_API_URL + "/groups/" + id + "/projects"
        params = {"simple": "true", "order_by": "name", "sort": "asc", "per_page": 100}
        projects = []
        for response in self._paginate_responses(url, self._headers, params):
            projects += response.json()
        return projects

    def clone_all(self, id: str, directory: str, until_date: str | None):
        """
        Clone all repositories from a group id in directory (created in function).

        For each repository, prints its members with an access level above
        20 and its web url, then clones it over https with the token embedded
        in the url. If until_date is given, checks out the last commit of
        master made before that date.

        Exits with status 1 if the directory cannot be created or if the
        members of a repository cannot be retrieved. A repository whose clone
        or commit lookup fails is reported and skipped.
        """
        try:
            os.mkdir(directory)
        except OSError:
            print("Creation of the directory '%s' failed, exit\n" % directory)
            sys.exit(1)

        # BASE_URL with the credentials inserted right after the scheme.
        scheme = "https://"
        authenticated_url = (
            scheme
            + "{}:{}@".format("gitlab-ci-token", self.token)
            + BASE_URL.removeprefix(scheme)
        )

        for repo in self.get_projects_in_group(id):
            members_url = BASE_API_URL + "/projects/" + str(repo["id"]) + "/members"
            members = requests.get(members_url, headers=self._headers).json()
            # A successful answer is a list of members; a dict is an error body.
            if isinstance(members, dict):
                print(
                    "Error retrieving members: " + str(members.get("message", members))
                )
                sys.exit(1)

            web_url = repo["web_url"]
            members_names = "".join(
                member["username"] + ", "
                for member in members
                if member["access_level"] > 20  # Access level greater than "Reporter"
            )
            repo_local_name = repo["path"]
            clone_dir = directory + "/" + repo_local_name

            print("Members: " + members_names)
            print("Web url: " + web_url)
            print('Cloning in "' + clone_dir + '"')

            clone = subprocess.run(
                [
                    "git",
                    "clone",
                    "-q",
                    web_url.replace(BASE_URL, authenticated_url),
                    clone_dir,
                ]
            )
            if clone.returncode != 0:
                print("ERROR: clone of " + web_url + " failed, skipped\n")
                continue

            if not until_date:
                print()
                continue

            try:
                # No shell is involved, so the date must not be wrapped in
                # quotes: they would reach git as part of the value.
                commit_id = (
                    subprocess.check_output(
                        [
                            "git",
                            "rev-list",
                            "-n",
                            "1",
                            "--before=" + until_date,
                            "master",
                        ],
                        cwd=clone_dir,
                    )
                    .decode("utf-8")
                    .rstrip()
                )
            except subprocess.CalledProcessError:
                print("ERROR: cannot list commits of master, checkout skipped\n")
                continue
            if not commit_id:
                print("No commit before " + until_date + ", checkout skipped\n")
                continue
            subprocess.run(["git", "checkout", "-q", commit_id], cwd=clone_dir)
            print("Checkout at " + commit_id + "\n")


def command_create_group_repos(args):
    """
    For each repository listed in given file, create a group and a repo in group.
    Add users in every repo created
    """
    gl = Gitlab(args.token)
    # The file is loaded first so that an invalid file aborts before the
    # group is created.
    ds = DataSource.from_yaml(args.repos_file)
    group_id = gl.create_group(args.group_name, args.visibility)
    for group in ds.groups:
        project_id = gl.create_repository(
            group_id,
            group.name,
            args.import_url,
        )
        added_users = gl.add_users_to_repository(
            project_id, group.name, group.user_ids, args.expires_at
        )
        print(
            f"Created repo: {group.name} with the users: {[u.user_login_aai for u in added_users]}"
        )


def command_clone_all(args):
    """Clone every repository of args.group_id into args.directory."""
    gl = Gitlab(args.token)
    gl.clone_all(args.group_id, args.directory, args.until_date)


def command_list_projects(args):
    """
    Print the projects of args.group_id.

    args.show selects the output: "all" dumps the full JSON, "url" and "ssh"
    print the http / ssh clone urls, anything else (or nothing) the names.
    """
    gl = Gitlab(args.token)
    projects = gl.get_projects_in_group(args.group_id)
    if args.show == "all":
        print(json.dumps(projects, indent=2))
        return
    field = {"url": "http_url_to_repo", "ssh": "ssh_url_to_repo"}.get(
        args.show, "name"
    )
    for project in projects:
        print(project[field])


def command_list_users(args):
    """
    Print the members of args.project_id.

    args.show selects the output: "all" dumps the full JSON, "url" prints
    the web urls, anything else (or nothing) the usernames.
    """
    gl = Gitlab(args.token)
    members = gl.get_users_in_repository(args.project_id)
    if args.show == "all":
        print(json.dumps(members, indent=2))
        return
    field = {"url": "web_url"}.get(args.show, "username")
    for member in members:
        print(member[field])


def main():
    """
    Parse the command line, resolve the API token and run the subcommand.

    The token is taken from -t/--token, else from the GITEDU_TOKEN
    environment variable, else from the file $HOME/.config/gitedu_token.
    Exits with status 1 if no non-empty token is found.
    """
    parser = argparse.ArgumentParser(
        description="Practical Work Manager - "
        "Manage students PW - Create group, projects or clone repositories"
    )
    parser.set_defaults(func=lambda _: parser.print_help())
    parser.add_argument(
        "-t",
        "--token",
        metavar="TOKEN",
        help=f"Create a token here: {TOKEN_URL}. -t is not needed if the env var GITEDU_TOKEN is set.",
    )

    subparsers = parser.add_subparsers(
        metavar="(group_repos | clone | list_projects | list_users)"
    )

    parser_group_repos = subparsers.add_parser(
        "group_repos", help="Create group and repositories from file"
    )
    parser_group_repos.add_argument(
        "group_name", metavar="GROUP_NAME", help="The group name."
    )
    parser_group_repos.add_argument(
        "repos_file",
        metavar="REPOS_FILE",
        help="YAML file with projects names and/or students emails.",
    )
    parser_group_repos.add_argument(
        "--visibility", help="Group visibility. By default private.", default="private"
    )
    parser_group_repos.add_argument(
        "-i",
        "--import_url",
        help="Import the publicly accessible project by URL given here (optional).",
    )
    parser_group_repos.add_argument(
        "-x",
        "--expires_at",
        help="Expiration date to kick off students from this project, at 00:00:00. YYYY-MM-DD format (optional).",
    )
    parser_group_repos.set_defaults(func=command_create_group_repos)

    parser_clone = subparsers.add_parser("clone", help="Clone the repositories locally")
    parser_clone.add_argument(
        "group_id",
        metavar="GROUP_ID",
        help="The group_id (int) of the projects.",
    )
    parser_clone.add_argument(
        "directory",
        metavar="DIRECTORY",
        help="Local directory where clone all repositories.",
    )
    parser_clone.add_argument(
        "-u",
        "--until_date",
        help='Do a git checkout for all repositories at given date, format "YYYY-MM-DD hh:mm" (optional).',
    )
    parser_clone.set_defaults(func=command_clone_all)

    parser_projects = subparsers.add_parser(
        "list_projects", help="List all project in a group"
    )
    parser_projects.add_argument(
        "group_id", metavar="GROUP_ID", help="The group_id (int)."
    )
    parser_projects.add_argument(
        "-s",
        "--show",
        help="Amount of informations (default name) : [all | name | url | ssh]",
    )
    parser_projects.set_defaults(func=command_list_projects)

    parser_users = subparsers.add_parser(
        "list_users", help="List all users in a repository"
    )
    parser_users.add_argument(
        "project_id", metavar="PROJECT_ID", help="The repository project_id (int)."
    )
    parser_users.add_argument(
        "-s",
        "--show",
        help="Amount of informations (default name) : [all | name | url]",
    )
    parser_users.set_defaults(func=command_list_users)

    args = parser.parse_args()

    if not args.token:
        args.token = os.environ.get("GITEDU_TOKEN")
    if not args.token:
        token_file = os.environ.get("HOME", "") + "/.config/gitedu_token"
        if os.path.isfile(token_file):
            with open(token_file) as file:
                args.token = file.read().strip()
    # "not" rather than "is None": an empty token file must be rejected too.
    if not args.token:
        print(
            "Error: you must give a valid api token. Create a token here: " + TOKEN_URL
        )
        sys.exit(1)

    args.func(args)


if __name__ == "__main__":
    main()