Skip to content
Snippets Groups Projects
Select Git revision
  • 004c73dc9961baa52719e7a6f9bcd584b67a583d
  • main default protected
  • jw_sonar
  • v6.0.0 protected
  • interactive-mode-preference
  • bedran_exercise-list
  • add_route_user
  • Jw_sonar_backup
  • exercise_list_filter
  • assignment_filter
  • add_route_assignments
  • move-to-esm-only
  • 6.0.0-dev
  • Pre-alpha
  • 5.0.0
  • Latest
  • 4.2.0
  • 4.1.1
  • 4.1.0
  • 4.0.1
  • 4.0.0
  • 3.5.0
  • 3.4.2
  • 3.4.1
  • 3.3.0
  • 3.2.3
  • 3.2.2
  • 3.2.0
  • 3.1.2
  • 3.1.1
  • 3.1.0
  • 3.0.1
32 results

CommanderApp.ts

Blame
  • pwm.py 15.79 KiB
    #!/usr/bin/env python3
    
    """
    Practical Work Manager (pwm)
    Steven Liatti, Florent Gluck, Adrien Lescourt
    2020-2025
    """
    
    from dataclasses import dataclass
    from requests.models import Response
    from typing import Any, Dict, List, Optional
    import argparse
    import json
    import os
    import requests
    import subprocess
    import yaml
    
    BASE_URL: str = "https://gitedu.hesge.ch"
    BASE_API_URL: str = BASE_URL + "/api/v4"
    TOKEN_URL: str = BASE_URL + "/-/user_settings/personal_access_tokens"
    
    
    @dataclass
    class User:
        def __init__(self, user_id: str) -> None:
            self.id = user_id
    
    
    @dataclass
    class Group:
        def __init__(self, name: str, user_ids: List[User]) -> None:
            self.name = name
            self.user_ids = user_ids
    
    
    @dataclass
    class DataSource:
        def __init__(self, groups: List[Group]) -> None:
            self.groups = groups
    
        @classmethod
        def from_yaml(cls, yaml_file_path: str) -> "DataSource":
            if not cls._validate_yaml(yaml_file_path):
                print("Syntax error in YAML file.\nAborted.")
                exit(1)
            groups = []
            repos = yaml.full_load(yaml_file_path)
            for repo in repos:
                if "name" in repo:
                    name = repo["name"]
                else:
                    name = repo["users_login_aai"]
                groups.append(Group(name, repo["users"]))
            return cls(groups)
    
        @staticmethod
        def _validate_yaml(yaml_file_path) -> bool:
            with open(yaml_file_path) as f:
                repos = yaml.full_load(f)
                for repo in repos:
                    if "name" in repo:
                        pass
                    elif "users_login_aai" in repo:
                        pass
                    else:
                        return False
            return True
    
    
    class Gitlab:
        def __init__(self, token: str):
            self.token = token
    
        def create_group(self, group: Group, visibility: str = "private") -> str:
            """Create gitlab group, Returns group_id created."""
    
            params = {"path": group.name, "name": group.name, "visibility": visibility}
            headers = {"PRIVATE-TOKEN": self.token}
            group_res = requests.post(
                BASE_API_URL + "/group_ress", params=params, headers=headers
            ).json()
            if "message" in group_res:
                print("Error in creating group_res: %s" % group_res)
                exit(1)
    
            print(
                "group_res '"
                + group_res["name"]
                + "' with id '"
                + str(group_res["id"])
                + "' and visibility '"
                + group_res["visibility"]
                + "' available at '"
                + group_res["web_url"]
                + "'"
            )
            return str(group_res["id"])
    
        def delete_group(self, group_id: str):
            """Delete a group and all subprojects."""
            headers = {"PRIVATE-TOKEN": self.token}
            deleted_group = requests.delete(
                BASE_API_URL + "/groups/" + group_id, headers=headers
            ).json()
            if "message" in deleted_group:
                if deleted_group["message"] != "202 Accepted":
                    print("Error in deleting group: %s" % deleted_group)
                    exit(1)
                else:
                    print("Group " + group_id + " successfully deleted")
    
        def create_repository(
            self,
            group_id: str,
            users: List[User],
            name: str,
            import_url: Optional[str],
            expires_at: Optional[str],
        ):
            """
            Create repository in group_id, with members from user_ids, a name, and
            optional import_url and expiration date.
            """
            headers = {"PRIVATE-TOKEN": self.token}
    
            # Create project from name, import_url (if given) and group_id
            params = {"name": name, "namespace_id": group_id, "visibility": "private"}
            if import_url:
                params["import_url"] = import_url
            project = requests.post(
                BASE_API_URL + "/projects", params=params, headers=headers
            ).json()
            if "message" in project:
                print("Error in creating project: %s" % project)
                exit(1)
            print(
                "Project '" + project["name"] + "' at '" + project["web_url"] + "' created"
            )
    
            # Allow users with developer access level to push and merge on master
            access_level = 40
            params = {
                "name": "master",
                "push_access_level": str(access_level),
                "merge_access_level": str(access_level),
            }
            requests.post(
                BASE_API_URL + "/projects/" + str(project["id"]) + "/protected_branches",
                params=params,
                headers=headers,
            ).json()
    
            # Add each student as maintainer (level 40)
            for user in users:
                params = {"user_id": user.id, "access_level": access_level}
                if expires_at:
                    params["expires_at"] = expires_at
                new_user = requests.post(
                    BASE_API_URL + "/projects/" + str(project["id"]) + "/members",
                    params=params,
                    headers=headers,
                ).json()
                if "message" in new_user:
                    print("Error in adding user: %s" % new_user)
                else:
                    out = (
                        "Adding '"
                        + new_user["name"]
                        + "' ("
                        + new_user["username"]
                        + ") in '"
                        + project["name"]
                        + "' with access level: "
                        + str(new_user["access_level"])
                    )
                    if expires_at:
                        out += ", expires at: " + new_user["expires_at"]
                    print(out)
    
        @staticmethod
        def _paginate_responses(
            url: str, headers: Dict[str, str], params: Dict[str, Any]
        ) -> List[Response]:
            """
            Manage gitlab pagination, max 100 results by request
            """
            responses = [requests.get(url, params=params, headers=headers)]
            last_response = responses[len(responses) - 1]
    
            if last_response.status_code != 200:
                print(last_response.text)
                return []
    
            while (
                last_response.status_code == 200
                and len(last_response.headers["X-Next-Page"]) != 0
            ):
                next_page = last_response.headers["X-Next-Page"]
                params["page"] = next_page
                responses.append(requests.get(url, params=params, headers=headers))
                last_response = responses[len(responses) - 1]
    
            return responses
    
        def get_users_in_repository(self, id: str) -> List:
            """
            Return members list from given id
            """
            # BUG: does not work if the repo members are inherited from a group
            url = BASE_API_URL + "/projects/" + id + "/members"
    
            headers = {"PRIVATE-TOKEN": self.token}
            params = {"simple": "true", "order_by": "name", "sort": "asc", "per_page": 100}
            responses = self._paginate_responses(url, headers, params)
    
            members = []
            for r in responses:
                members += r.json()
            return members
    
        def get_projects_in_group(self, id: str) -> List:
            """
            Return projects list from given group id
            """
            url = BASE_API_URL + "/groups/" + id + "/projects"
            headers = {"PRIVATE-TOKEN": self.token}
            params = {"simple": "true", "order_by": "name", "sort": "asc", "per_page": 100}
            responses = self._paginate_responses(url, headers, params)
    
            projects = []
            for r in responses:
                projects += r.json()
            return projects
    
        def clone_all(self, id: str, directory: str, until_date: Optional[str]):
            """
            Clone all repositories from a group id in directory (created in function).
            """
            try:
                os.mkdir(directory)
            except OSError:
                print("Creation of the directory '%s' failed, exit\n" % directory)
                exit(1)
    
            headers = {"PRIVATE-TOKEN": self.token}
            repositories = self.get_projects_in_group(id)
    
            for repo in repositories:
                repo_url = BASE_API_URL + "/projects/" + str(repo["id"]) + "/members"
                members = requests.get(repo_url, headers=headers).json()
                if "message" in members:
                    print("Error retrieving members: " + members["message"])
                    exit(1)
    
                web_url = repo["web_url"]
                members_names = ""
    
                for member in members:
                    if member["access_level"] > 20:  # Access level greater than "Reporter"
                        members_names += member["username"] + ", "
    
                repo_local_name = repo["path"]
    
                print("Members: " + members_names)
                print("Web url: " + web_url)
                print('Cloning in "' + directory + "/" + repo_local_name + '"')
    
                scheme = "https://"
                after_https = BASE_URL.find(scheme) + len(scheme)
    
                url = (
                    BASE_URL[:after_https]
                    + "{}:{}@".format("gitlab-ci-token", self.token)
                    + BASE_URL[after_https:]
                )
                subprocess.run(
                    [
                        "git",
                        "clone",
                        "-q",
                        web_url.replace(BASE_URL, url),
                        directory + "/" + repo_local_name,
                    ]
                )
    
                if until_date:
                    commit_id = (
                        subprocess.check_output(
                            [
                                "git",
                                "rev-list",
                                "-n",
                                "1",
                                '--before="' + until_date + '"',
                                "master",
                            ],
                            cwd=directory + "/" + repo_local_name,
                        )
                        .decode("utf-8")
                        .rstrip()
                    )
                    subprocess.run(
                        ["git", "checkout", "-q", str(commit_id)],
                        cwd=directory + "/" + repo_local_name,
                    )
                    print("Checkout at " + str(commit_id) + "\n")
                else:
                    print()
    
    
    def command_create_group_repos(args):
        """
        For each repository listed in given file, create a group and a repo in group.
        Add users in every repo created
        """
        gl = Gitlab(args.token)
        ds = DataSource.from_yaml(args.repos_file)
        group_id = gl.create_group(args.group_name, args.visibility)
        for group in ds.groups:
            gl.create_repository(
                group_id,
                group.user_ids,
                group.name,
                args.import_url,
                args.expires_at,
            )
            print(f"created repo: {group.name} with the users: {group.user_ids}")
            print()
    
    
    def command_clone_all(args):
        gl = Gitlab(args.token)
        gl.clone_all(args.id, args.directory, args.until_date)
    
    
    def command_list_projects(args):
        gl = Gitlab(args.token)
        projects = gl.get_projects_in_group(args.group_id)
        if args.show:
            if args.show == "all":
                print(json.dumps(projects, indent=2))
            elif args.show == "url":
                results = list(map(lambda p: p["http_url_to_repo"], projects))
                for r in results:
                    print(r)
            elif args.show == "ssh":
                results = list(map(lambda p: p["ssh_url_to_repo"], projects))
                for r in results:
                    print(r)
            else:
                names = list(map(lambda p: p["name"], projects))
                for name in names:
                    print(name)
        else:
            names = list(map(lambda p: p["name"], projects))
            for name in names:
                print(name)
    
    
    def command_list_users(args):
        gl = Gitlab(args.token)
        members = gl.get_users_in_repository(args.project_id)
        if args.show:
            if args.show == "all":
                print(json.dumps(members, indent=2))
            elif args.show == "url":
                results = list(map(lambda p: p["web_url"], members))
                for r in results:
                    print(r)
            else:
                names = list(map(lambda p: p["username"], members))
                for name in names:
                    print(name)
        else:
            names = list(map(lambda p: p["username"], members))
            for name in names:
                print(name)
    
    
    def main():
        parser = argparse.ArgumentParser(
            description="Practical Work Manager - \
            Manage students PW - Create group, projects or clone repositories"
        )
        parser.set_defaults(func=lambda _: parser.print_help())
        parser.add_argument(
            "-t",
            "--token",
            metavar="TOKEN",
            help=f"Create a token here: {TOKEN_URL}. -t is not needed if the env var GITEDU_TOKEN is set.",
        )
        subparsers = parser.add_subparsers(
            metavar="(group_repos | clone | list_project | list_users)"
        )
    
        parser_group_repos = subparsers.add_parser(
            "group_repos", help="Create group and repositories from file"
        )
        parser_group_repos.add_argument(
            "group_name", metavar="GROUP_NAME", help="The group name."
        )
        parser_group_repos.add_argument(
            "repos_file",
            metavar="REPOS_FILE",
            help="YAML file with projects names and/or students emails.",
        )
        parser_group_repos.add_argument(
            "--visibility", help="Group visibility. By default private.", default="private"
        )
        parser_group_repos.add_argument(
            "-i",
            "--import_url",
            help="Import the publicly accessible project by URL given here (optional).",
        )
        parser_group_repos.add_argument(
            "-x",
            "--expires_at",
            help="Expiration date to kick off students from this project, at 00:00:00. YYYY-MM-DD format (optional).",
        )
        parser_group_repos.set_defaults(func=command_create_group_repos)
    
        parser_clone = subparsers.add_parser("clone", help="Clone the repositories locally")
        group_clone = parser_clone.add_mutually_exclusive_group()
        parser_clone.add_argument(
            "group_id",
            metavar="GROUP_ID",
            help="The group_id (int) of the projects.",
        )
        parser_clone.add_argument(
            "directory",
            metavar="DIRECTORY",
            help="Local directory where clone all repositories.",
        )
        parser_clone.add_argument(
            "-u",
            "--until_date",
            help='Do a git checkout for all repositories at given date, format "YYYY-MM-DD hh:mm" (optional).',
        )
        parser_clone.add_argument(
            "--use_http",
            help="Use the HTTP client instead of SSH. False by default.",
            action="store_true",
        )
        parser_clone.set_defaults(func=command_clone_all)
    
        parser_list = subparsers.add_parser(
            "list_projects", help="List all project in a group"
        )
        parser_list.add_argument("group_id", metavar="GROUP_ID", help="The group_id (int).")
        parser_list.add_argument(
            "-s",
            "--show",
            help="Amount of informations (default name) : [all | name | url | ssh]",
        )
        parser_list.set_defaults(func=command_list_projects)
    
        parser_list = subparsers.add_parser(
            "list_users", help="List all users in a repository"
        )
        parser_list.add_argument(
            "project_id", metavar="PROJECT_ID", help="The repository project_id (int)."
        )
        parser_list.add_argument(
            "-s",
            "--show",
            help="Amount of informations (default name) : [all | name | url | ssh]",
        )
        parser_list.set_defaults(func=command_list_users)
    
        args = parser.parse_args()
    
        if not args.token:
            if os.environ.get("GITEDU_TOKEN"):
                args.token = os.environ.get("GITEDU_TOKEN")
            else:
                token_file = os.environ.get("HOME", "") + "/.config/gitedu_token"
                if os.path.isfile(token_file):
                    with open(token_file) as file:
                        args.token = file.read().strip()
            if args.token is None:
                print(
                    "Error: you must give a valid api token. Create a token here: "
                    + TOKEN_URL
                )
                exit(1)
    
        args.func(args)
    
    
    if __name__ == "__main__":
        main()