Skip to content
Snippets Groups Projects
Select Git revision
  • 561a9b221a37213e724fbe57d18289d06f7da680
  • master default protected
  • refactor
  • repos_file_generator
  • refactoring-args
5 results

pwm

Blame
  • steven.liatti's avatar
    steven.liatti authored
    The token can now be placed in ~/.gitedu_token,
    in the env var $GITEDU_TOKEN, or
    given on the command line with the "-t" or "--token" argument
    561a9b22
    History
    pwm 11.88 KiB
    #!/usr/bin/env python3
    
    """
    Practical Work Manager (pwm)
    Steven Liatti
    2020
    """
    
    from argparse import Namespace
    import os
    from typing import Dict, List, Optional
    import requests
    import subprocess
    import argparse
    import yaml
    
    BASE_URL: str = 'https://gitedu.hesge.ch/api/v4'
    
    
    def create_group(token: str, name: str, visibility: str = 'private') -> str:
        """
        Ask the gitlab API to create a group whose path and name are both `name`,
        with the given visibility. `token` must be a valid api token.
        If the reply contains a 'message' key it is treated as an error: the
        reply is printed and the program exits with status 1.
        Return the id of the created group, as a string.
        """
        response = requests.post(
            BASE_URL + '/groups',
            params={'path': name, 'name': name, 'visibility': visibility},
            headers={'PRIVATE-TOKEN': token})
        group = response.json()

        # A 'message' key in the reply means the creation was refused
        if 'message' in group:
            print('Error in creating group: %s' % group)
            exit(1)

        group_id = str(group['id'])
        print("Group '" + group['name'] + "' with id '" + group_id + "' and visibility '" +
              group['visibility'] + "' available at '" + group['web_url'] + "'")
        return group_id
    
    
    def delete_group(token: str, group_id: str):
        """
        Send a delete request for the group group_id (and so for all its
        subprojects). A reply whose 'message' is '202 Accepted' is reported as a
        success; any other 'message' is printed as an error and the program
        exits with status 1. A reply without 'message' is silently ignored.
        """
        reply = requests.delete(
            BASE_URL + '/groups/' + group_id,
            headers={'PRIVATE-TOKEN': token}).json()

        # Nothing to report when the reply carries no 'message'
        if 'message' not in reply:
            return

        if reply['message'] == '202 Accepted':
            print('Group ' + group_id + ' successfully deleted')
            return

        print('Error in deleting group: %s' % reply)
        exit(1)
    
    
    def emails_to_ids(emails: List[str], headers: Dict[str, str]) -> List[int]:
        """
        Get students ids from their emails.

        Each email is looked up with a '/users' search request sent with the
        given headers; the id of the first user returned is kept. The ids are
        returned in the same order as the emails (an empty list of emails gives
        an empty list of ids, without any request).

        The program exits with status 1 when the API answers with an error or
        when no user matches an email.
        """
        user_ids = []
        for email in emails:
            user_requested = requests.get(
                BASE_URL + '/users', params={'search': email}, headers=headers).json()
            # A successful search is a list of users. A dict is an error reply
            # (e.g. a bad token); indexing it with [0] would raise a KeyError.
            if isinstance(user_requested, dict):
                print('Error retrieving user %s: %s' % (email, user_requested))
                exit(1)
            if not user_requested:
                print('No user %s found, operation aborted' % email)
                exit(1)
            user_ids.append(user_requested[0]['id'])
        return user_ids
    
    
    def create_repository(token: str, group_id: str, emails: List[str], name: str, import_url: Optional[str], expires_at: Optional[str]):
        """
        Create repository in group_id, with members from emails, a name, and
        optional import_url and expiration date.

        Steps, in order:
        - resolve every email to a user id (emails_to_ids exits the program on
          an unknown email, so nothing has been created yet at that point);
        - create a private project called name in the namespace group_id,
          imported from import_url when one is given;
        - protect the 'master' branch so that access level 30 (developer) can
          push and merge;
        - add each user as a member with access level 30, expiring at
          expires_at when one is given.

        An error while creating the project prints the reply and exits with
        status 1. An error while adding one member is only printed and the
        remaining members are still processed.
        """
        headers = {'PRIVATE-TOKEN': token}

        # Get students ids from their emails before creating anything, so that
        # an unknown email does not leave an empty project behind
        user_ids = emails_to_ids(emails, headers)

        # Create project from name, import_url (if given) and group_id
        params = {'name': name, 'namespace_id': group_id, 'visibility': 'private'}
        if import_url:
            params['import_url'] = import_url
        project = requests.post(BASE_URL + '/projects',
                                params=params, headers=headers).json()
        if 'message' in project:
            print('Error in creating project: %s' % project)
            exit(1)
        print("Project '" + project['name'] + "' at '" +
              project['web_url'] + "' created")

        # Allow users with developer access level to push and merge on master
        access_level = 30
        params = {'name': 'master', 'push_access_level': str(
            access_level), 'merge_access_level': str(access_level)}
        requests.post(BASE_URL + '/projects/' +
                      str(project['id']) + '/protected_branches', params=params, headers=headers).json()

        # Add each student as project's developer (level 30)
        for user_id in user_ids:
            params = {'user_id': user_id, 'access_level': access_level}
            if expires_at:
                params['expires_at'] = expires_at
            new_user = requests.post(BASE_URL + '/projects/' + str(
                project['id']) + '/members', params=params, headers=headers).json()
            if 'message' in new_user:
                # Not fatal: keep adding the other students
                print('Error in adding user: %s' % new_user)
            else:
                out = ("Adding '" + new_user['name'] + "' (" + new_user['username'] + ") in '"
                       + project['name'] + "' with access level: " + str(new_user['access_level']))
                if expires_at:
                    out += ", expires at: " + new_user['expires_at']
                print(out)
    
    
    def clone_all(token: str, id: str, directory: str, until_date: Optional[str], source: str = 'group'):
        """
        Clone all repositories (from a group or "forks of") from id (group or
        project id) in directory (created in function).

        directory must not exist yet: if it cannot be created the program exits
        with status 1. A single request with per_page=100 is made, so at most
        one page of repositories is processed.

        For each repository the members with an access level above "Reporter"
        and the web url are printed, then the repository is cloned over ssh
        into directory/<name>, where <name> is the namespace path for forks
        (source == 'forks') and the project path otherwise.

        When until_date is given, each clone is checked out at the last commit
        of 'master' made before that date. A repository whose clone fails, or
        for which no such commit can be found, is reported and skipped instead
        of aborting the whole run.
        """
        try:
            os.mkdir(directory)
        except OSError:
            print("Creation of the directory '%s' failed, exit\n" % directory)
            exit(1)

        params = {'simple': 'true', 'per_page': 100}
        headers = {'PRIVATE-TOKEN': token}

        if source == 'forks':
            url = BASE_URL + '/projects/' + id + '/forks'
        else:
            url = BASE_URL + '/groups/' + id + '/projects'

        repositories = requests.get(url, params=params, headers=headers).json()
        if 'message' in repositories:
            # %s: the message is not guaranteed to be a plain string
            print('Error retrieving repositories: %s' % repositories['message'])
            exit(1)

        for repo in repositories:
            repo_url = BASE_URL + '/projects/' + str(repo['id']) + '/members'
            members = requests.get(repo_url, headers=headers).json()
            if 'message' in members:
                print('Error retrieving members: %s' % members['message'])
                exit(1)

            ssh_url_to_repo = repo['ssh_url_to_repo']
            web_url = repo['web_url']

            # Access level greater than "Reporter"
            members_names = ', '.join(
                member['username'] for member in members if member['access_level'] > 20)

            if source == 'forks':
                repo_local_name = repo['namespace']['path']
            else:
                repo_local_name = repo['path']
            repo_path = directory + '/' + repo_local_name

            print('Members: ' + members_names)
            print('Web url: ' + web_url)
            print('Cloning in "' + repo_path + '"')

            cloned = subprocess.run(
                ["git", "clone", "-q", ssh_url_to_repo, repo_path])
            if cloned.returncode != 0:
                # Without a clone there is no working directory to check out in
                print("Clone of '" + ssh_url_to_repo + "' failed, repository skipped\n")
                continue

            if not until_date:
                print()
                continue

            try:
                # No shell is involved, so the date must not be wrapped in quotes
                commit_id = subprocess.check_output(
                    ["git", "rev-list", "-n", "1", "--before=" + until_date, "master"],
                    cwd=repo_path).decode('utf-8').rstrip()
            except subprocess.CalledProcessError:
                # git rev-list failed (for instance there is no 'master' branch)
                print("Unable to list commits of 'master', checkout skipped\n")
                continue

            if not commit_id:
                print("No commit before " + until_date + ", checkout skipped\n")
                continue

            subprocess.run(["git", "checkout", "-q", commit_id], cwd=repo_path)
            print("Checkout at " + commit_id + "\n")
    
    
    def command_create_group_repos(args):
        """
        Combine create_group and create_repository. For each repository listed in
        given file, create a repo in group.

        The group args.group_name is created first (with args.visibility when
        given). args.repos_file is a YAML file holding a list of entries; each
        entry needs an 'emails' list and may have a 'name'. Without 'name', the
        part before '@' of the first email is used as repository name.

        If the file is not a list, or an entry has no 'emails' list, or has
        neither a 'name' nor a first email to derive one from, the group is
        deleted and the program exits with status 1.
        """
        if args.visibility:
            group_id = create_group(args.token, args.group_name, args.visibility)
        else:
            group_id = create_group(args.token, args.group_name)
        print()

        with open(args.repos_file) as f:
            # NOTE(review): only plain lists/dicts/strings are read from this
            # file; yaml.safe_load would be the stricter loader - confirm that
            # no repos file relies on extra tags before switching.
            repos = yaml.full_load(f)

        # An empty file loads as None and a mapping is not a list of repos
        if not isinstance(repos, list):
            print('YAML file not correct, exit and delete group')
            delete_group(args.token, group_id)
            exit(1)

        for repo in repos:
            emails = repo.get('emails') if isinstance(repo, dict) else None
            # 'emails' is always needed to add the members; when there is no
            # 'name' its first item is also needed to build the name
            if not isinstance(emails, list) or ('name' not in repo and not emails):
                print('YAML file not correct, exit and delete group')
                delete_group(args.token, group_id)
                exit(1)

            if 'name' in repo:
                name = repo['name']
            else:
                name = emails[0].split('@')[0]
            create_repository(
                args.token, group_id, emails, name, args.import_url, args.expires_at)
            print()
    
    
    def command_create_group(args):
        """
        Sub-command entry point: call create_group with the token and group name
        from args, passing the visibility only when one was given.
        """
        group_args = [args.token, args.group_name]
        if args.visibility:
            group_args.append(args.visibility)
        create_group(*group_args)
    
    
    def command_create_repository(args):
        """
        Sub-command entry point: call create_repository with the values from
        args. args.emails is a comma separated string; when no name is given,
        the text before the first '@' of that string is used as project name.
        """
        name = args.name or args.emails.split('@')[0]
        emails = args.emails.split(',')
        create_repository(
            args.token, args.group_id, emails, name,
            args.import_url, args.expires_at)
    
    
    def command_clone_all(args):
        """
        Sub-command entry point: call clone_all with the values from args,
        asking for the forks of a project when args.forks is set and leaving
        the default source otherwise.
        """
        source_args = ('forks',) if args.forks else ()
        clone_all(args.token, args.id, args.directory,
                  args.until_date, *source_args)
    
    
    if __name__ == '__main__':
        # Command line interface: one global "--token" option and four
        # sub-commands (group_repos, group, repo, clone). Each sub-command stores
        # its handler in "func"; without sub-command the help is printed.
        parser = argparse.ArgumentParser(description='Practical Work Manager - \
            Manage students PW - Create group, projects or clone repositories')
        parser.set_defaults(func=lambda _: parser.print_help())
        parser.add_argument("-t", "--token", metavar="TOKEN",
                            help="Create a token here: https://gitedu.hesge.ch/profile/personal_access_tokens")
        # dest records which sub-command was chosen (None when there is none)
        subparsers = parser.add_subparsers(
            metavar='(group_repos | group | repo | clone)', dest='command')

        parser_group_repos = subparsers.add_parser(
            'group_repos', help='Create group and repos associated')
        parser_group_repos.add_argument(
            "group_name", metavar="GROUP_NAME", help="The group name.")
        parser_group_repos.add_argument(
            "repos_file", metavar="REPOS_FILE", help="A file with projects names and students emails.")
        parser_group_repos.add_argument(
            "--visibility", help="Group visibility. By default private.")
        parser_group_repos.add_argument("-i", "--import_url",
                                        help="Import the publicly accessible project by URL given here (optional).")
        parser_group_repos.add_argument("-x", "--expires_at",
                                        help="Expiration date to kick off students from this project, at 00:00:00. YYYY-MM-DD format (optional).")
        parser_group_repos.set_defaults(func=command_create_group_repos)

        parser_group = subparsers.add_parser('group', help='Create gitlab group')
        parser_group.add_argument(
            "group_name", metavar="GROUP_NAME", help="The group name.")
        parser_group.add_argument(
            "--visibility", help="Group visibility. By default private.")
        parser_group.set_defaults(func=command_create_group)

        parser_repo = subparsers.add_parser('repo', help='Create gitlab project')
        parser_repo.add_argument(
            "group_id", metavar="GROUP_ID", help="The group id (int) where to store the created new project.")
        parser_repo.add_argument(
            "emails", metavar="EMAILS", help="Emails list of students working in this project, separated by commas (email1,email2).")
        parser_repo.add_argument(
            "-n", "--name", help="The project name. If blank, take the first student name (from email) as name.")
        parser_repo.add_argument("-i", "--import_url",
                                 help="Import the publicly accessible project by URL given here (optional).")
        parser_repo.add_argument("-x", "--expires_at",
                                 help="Expiration date to kick off students from this project, at 00:00:00. YYYY-MM-DD format (optional).")
        parser_repo.set_defaults(func=command_create_repository)

        parser_clone = subparsers.add_parser(
            'clone', help='Clone the repositories locally')
        group_clone = parser_clone.add_mutually_exclusive_group()
        group_clone.add_argument("-g", "--group", action="store_true",
                                 help="Clone repositories from a group (with group_id) or forks of a project (with project_id) (default behavior).")
        group_clone.add_argument("-f", "--forks", action="store_true",
                                 help="Clone forks of a project (with project_id).")
        parser_clone.add_argument(
            "id", metavar="ID", help="The group_id (int) of the projects or the project_id (int) of the forks.")
        parser_clone.add_argument(
            "directory", metavar="DIRECTORY", help="Local directory where clone all repositories.")
        parser_clone.add_argument(
            "-u", "--until_date", help="Do a git checkout for all repositories at given date, format \"YYYY-MM-DD hh:mm\" (optional).")
        parser_clone.set_defaults(func=command_clone_all)

        args = parser.parse_args()

        # A token is only needed by the sub-commands: printing the help (no
        # sub-command given) must work without one.
        if args.command is not None and not args.token:
            # Token lookup order: ~/.gitedu_token file, then $GITEDU_TOKEN.
            # expanduser also works when $HOME is not set, where
            # os.environ.get('HOME') would return None.
            path = os.path.join(os.path.expanduser('~'), '.gitedu_token')
            if os.path.isfile(path):
                with open(path) as file:
                    args.token = file.read().strip()
            elif os.environ.get('GITEDU_TOKEN'):
                args.token = os.environ.get('GITEDU_TOKEN')
            else:
                print('Error: you must given a token')
                exit(1)

        args.func(args)