Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • master
1 result

Target

Select target project
  • soufiane.elkharmo / TP-shell-student
  • gc_courses / sys-exploit / TP-shell-student
2 results
Select Git revision
  • master
1 result
Show changes

Commits on Source 2

1 file
+ 200
90
Compare changes
  • Side-by-side
  • Inline

Files

+200 −90
Original line number Diff line number Diff line
@@ -15,6 +15,7 @@ import time
import psutil
import re


def print_usage():
    print("Usage: {} shell_executable_name".format(sys.argv[0]))
    print("")
@@ -22,145 +23,241 @@ def print_usage():
    print("                         if omitted, the script attempt to find it automatically")


# TODO: move in test class (failed so far)
test_failed = False
def test(func):
    def wrapper(self):
        global test_failed
        print(func.__name__ + ": ", end="")
        try:
            func(self)
        except Exception as e:
            print(Fore.RED + "FAILED" + Fore.RESET)
            logging.error(f"{type(e).__name__}:{str(e)}")
            test_failed = True
            return
        print(Fore.GREEN + "SUCCESS" + Fore.RESET)

    return wrapper


class Test:
    def __init__(self, shell_exec: str) -> None:
        self.shell_exec = Path(shell_exec).resolve()
class Cmd:
    def __init__(self, cmd: list[str]) -> None:
        self.cmd = cmd


    def _get_exit_code_from_stdout(self, stdout: str) -> int:
        # Find line with keyword "code"
        for line in stdout.splitlines():
            if "code" in line:
                # Assumes that exit code is after keyword code and that it is the only digits
                return int(''.join([c for c in line.split('code')[-1] if c.isdigit()]))
        raise AssertionError('No exit code found')

    def __str__(self) -> str:
        if self.cmd is None:
            return str(None)
        else:
            return ' '.join(self.cmd).strip()

    def _execute_shell_command(self, command: list[str], cwd: str = tempfile.gettempdir(), duration_cmd: float = 0.2, timeout: int = 3):
        """ Execute a shell command and tries to determine if the command is correctly executed
        """
    def __iter__(self) -> str:
        for c in self.cmd:
            yield c

        # Create a string out of a command
        str_cmd = ' '.join(command)

class Shell:
    def __init__(self, executable: str, cwd: str = tempfile.gettempdir()) -> None:
        # Execute the shell and keep track of the process
        shell_process = subprocess.Popen(
            [self.shell_exec], cwd=cwd, encoding='utf-8',
        self.executable = executable
        self.shell_process = subprocess.Popen(
            [self.executable], cwd=cwd, encoding='utf-8',
            stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        shell_ps = psutil.Process(shell_process.pid)
        try:
            # Execute the command and wait for it to finish
            shell_process.stdin.write(str_cmd + '\n')
            shell_process.stdin.flush()
        self.shell_ps = psutil.Process(self.shell_process.pid)

        # Keep track of last executed command
        self.last_cmd = Cmd(None)

            #TODO: we wait a bit here to ensure the child is created ? However it seems is it not needed, how to ensure it ?
        # No ouputs so far...
        self.stdout = None
        self.stderr = None


    # TODO: move out of Shell ?
    def check_ophans(self, cmd: Cmd):
        """ Check if an orphan process (child of 1) was executed with cmd
        cmd: if None the tested command will be the last executed command
        """
        if cmd is None:
            if self.last_cmd is not None:
                cmd = self.last_cmd
            else:
                return
        str_cmd = str(cmd)
        init_ps = psutil.Process(1)
        for p in init_ps.children():
            if p.cmdline() == str_cmd:
                raise AssertionError('The command "{}" seem to be a child of process 1'.format(str_cmd))


    def wait_children(self, test_zombies: bool = True, timeout: int = 3):
        """ Wait for children of the shell to temrinate
        test_zombies: if True the command will raise an AssertionError if some children are zombies
        timeout: time after which a Timeout exception is raised if there are still children
        """
        # Wait for all children and check zombies / timeout
        start_time = time.time()
        while True:
            # Wait a bit before checking
            time.sleep(0.1)

            # Wait for children to die and check if some are zombies
            # TODO: it can happen that the child is stuck (e.g. if it becomes a shell cause cannot execve) in that case the code bellow blocks
            children = shell_ps.children()
            while(children):
            # Check if some children are zombies
            children = self.shell_ps.children()
            if test_zombies:
                for c in children:
                    if c.status() == psutil.STATUS_ZOMBIE:
                        raise AssertionError('The shell child process {} is a zombie for command "{}"'.format(c, str_cmd))
                time.sleep(0.1)
                children = shell_ps.children()
                        raise AssertionError('The shell child process {} is a zombie (last command executed: {})'.format(c, self.last_cmd))

            # No more children to wait for -> stop looping
            if (len(children) == 0):
                break

            # as the command exceeded the timeout ?
            duration = time.time() - start_time
            if duration > timeout:
                raise psutil.TimeoutExpired('The process took more than the timeout ({}s) to terminate (last command executed: {})'.format(duration, self.last_cmd))

            # Check if command process has become a child of process 1 (e.g. by double fork)
            init_ps = psutil.Process(1)
            for c in init_ps.children():
                if c.cmdline() == command:
                    raise AssertionError('The command "{}" seem to be a child of process 1'.format(str_cmd))

            # update current working directory
            cwd = shell_ps.cwd()
    def exec_command(self, command: Cmd, wait_cmd: bool = True, timeout: int = 3):
        """Execute a command in the shell without existing the shell
        wait_cmd: wait for all the command processes to finish and raise an error on zombies
        timeout: terminate the command if it last longer than timeout (does not apply if wait_cmd = False)"""
        # Execute the command
        self.shell_process.stdin.write(str(command) + '\n')
        self.shell_process.stdin.flush()
        self.last_cmd = command

            # Terminate shell by using exit command and returning shell outputs
            stdout, stderr = shell_process.communicate(input='exit\n', timeout=timeout)
        # Wait for the shell childs to finish while checking for zombies
        if wait_cmd:
            self.wait_children(timeout=timeout)

            # Test if shell is still running
            if shell_ps.is_running():
                raise AssertionError('Shell is still running after en of communication, either exit is not working or the shell does not terminate on Ctrl+D')

            return stdout, stderr, cwd
    def exec_commands(self, commands: list[Cmd], wait_cmd: bool = True, timeout: int = 3):
        """Execute a list of commands in the shell without existing the shell and exits
        wait_cmd: wait for all the command processes to finish and raise an error on zombies
        timeout: terminate the command if it last longer than timeout (does not apply if wait_cmd = False)"""
        for cmd in commands:
            self.exec_command(cmd, wait_cmd, timeout)
        self.exit()


    def exit(self):
        # We use communicate to be sure that all streams are closed (i.e. process terminated)
        timeout = 1
        try:
            self.stdout, self.stderr = self.shell_process.communicate(input='exit\n', timeout=timeout)
        except subprocess.TimeoutExpired:
            shell_process.kill()
            print('Timeout during command {}'.format(str_cmd))
            raise subprocess.TimeoutExpired
            self.shell_process.kill()
            raise subprocess.TimeoutExpired('The exit command did not exit the shell after {}s'.format(timeout))
        #TODO: should I check if process is still running at that point (stream closed but process alive ?)

    @test
    def test_simple_foregroundjob(self):
        # sleep is one of the given exemples
        self._execute_shell_command(['sleep', '1'], timeout=5)

    def read_stdout(self):
        if self.stdout is not None:
            return self.stdout
        else:
            raise ValueError('stdout is None probably because exit was not called on the shell before accessing it')

    @test
    def test_successfull_foregroundjob(self):
        # check if command output (stdout, stderr and 0 exit status) is the one expected with ls command
        cmd = ['ls', '-l', '--all', '--author', '-h', '-i', '-S']
        std_stdout, std_stderr, _ = self._execute_shell_command(cmd)

        # get "real" output
        real = subprocess.run(cmd, cwd=tempfile.gettempdir(), capture_output=True, encoding='utf-8')
    def read_stderr(self):
        if self.stderr is not None:
            return self.stderr
        else:
            raise ValueError('stderr is None probably because exit was not called on the shell before accessing it')

        # check standard output
        if not real.stdout in std_stdout:
            raise AssertionError('The standard output of the command "{}" does not include the following correct result:\n{}\cmd result in shell:\n{}'.format(' '.join(cmd), real.stdout, std_stdout))

        # check standard error
        if std_stderr:
            raise AssertionError('The standard error of the command "{}" shouldbe empty but contains:\n{}'.format(std_stderr))
    def get_cwd(self):
        return self.shell_ps.cwd()

        # check return code
        std_returncode = self._get_exit_code_from_stdout(std_stdout)
        if std_returncode != real.returncode:
            raise AssertionError('The command "{}" should return {} but the shell indicates {}'.format(' '.join(cmd), real.returncode, std_returncode))
    def is_running(self):
        return self.shell_process.poll() == None


    def test_error_foregroundjob(self, cmd: list[str]):
        # check if command output (stdout, stderr and 0 exit status) is the one expected with ls command
        std_stdout, std_stderr, _ = self._execute_shell_command(cmd)
class Test:
    def __init__(self, shell_exec: str) -> None:
        self.shell_exec = Path(shell_exec).resolve()
        self.shell = None  # currently no shell run
        #TODO: remove self.shell as it should be passed from function to function (or not in case also correct some function that does)


    def _get_exit_code_from_stdout(self, stdout: str) -> int:
        # Find line with keyword "code"
        for line in stdout.splitlines():
            if "code" in line:
                # Assumes that exit code is after keyword code and that it is the only digits
                return int(''.join([c for c in line.split('code')[-1] if c.isdigit()]))
        raise AssertionError('No exit code found')


    def _test_command_results(self, cmd: Cmd, shell: Shell, test_stdout: str, test_stderr: str, test_return: bool):
        """ Test if the results (standard outputs and return code) of a command are the correct ones
        test_stdout/test_stderr: a string indicating if the standard output should include the normal output ('include'),
                                 be empty ('empty'), or not tested ('notest' or any other string)
        test_return: should the return code be tested (relies on the computation of the return code from the standard output)
        """
        # get "real" output
        real = subprocess.run(cmd, cwd=tempfile.gettempdir(), capture_output=True, encoding='utf-8')

        # check standard output
        if not real.stderr in std_stderr:
            raise AssertionError('The standard output of the command "{}" does not include the following correct result:\n{}\cmd result in shell:\n{}'.format(' '.join(cmd), real.stderr, std_stderr))
        # TODO combine the two tests (the one below) in one fonction
        shell_stdout = shell.read_stdout()
        if test_stdout == 'include':
            if not real.stdout in shell_stdout:
                raise AssertionError('The standard output of the command "{}" does not include the following correct result:\n{}\cmd result in shell:\n{}'.format(' '.join(cmd), real.stdout, shell_stdout))
        elif test_stdout == 'empty':
            if shell_stdout:
                raise AssertionError('The standard error of the command "{}" shouldbe empty but contains:\n{}'.format(shell_stderr))

        # do not check if stdout is empty because it will contain return code...
        # check standard output
        shell_stderr = shell.read_stderr()
        if test_stderr == 'include':
            if not real.stderr in shell_stderr:
                raise AssertionError('The standard output of the command "{}" does not include the following correct result:\n{}\cmd result in shell:\n{}'.format(' '.join(cmd), real.stderr, shell_stderr))
        elif test_stdout == 'empty':
            if shell_stderr:
                raise AssertionError('The standard error of the command "{}" shouldbe empty but contains:\n{}'.format(shell_stderr))

        # check return code
        std_returncode = self._get_exit_code_from_stdout(std_stdout)
        if test_return:
            std_returncode = self._get_exit_code_from_stdout(shell_stdout)
            if std_returncode != real.returncode:
                raise AssertionError('The command "{}" should return {} but the shell indicates {}'.format(' '.join(cmd), real.returncode, std_returncode))


    @test
    def test_simple_foregroundjob(self):
        # sleep is one of the given exemples
        cmd = Cmd(['sleep', '1'])
        shell = Shell(self.shell_exec)
        shell.exec_commands([cmd], timeout=5)


    @test
    def test_successfull_foregroundjob(self):
        # check if command output (stdout, stderr and 0 exit status) is the one expected with ls command
        cmd = Cmd(['ls', '-l', '--all', '--author', '-h', '-i', '-S'])
        shell = Shell(self.shell_exec)
        shell.exec_commands([cmd])
        self._test_command_results(cmd, shell, test_stdout='include', test_stderr='empty', test_return=True)


    def test_error_foregroundjob(self, cmd: Cmd):
        # check if command output (stdout, stderr and 0 exit status) is the one expected with ls command
        shell = Shell(self.shell_exec)
        shell.exec_commands([cmd])
        self._test_command_results(cmd, shell, test_stdout='notest', test_stderr='include', test_return=True)


    @test
    def test_wrongcmd(self):
        # check if command output (stdout, stderr and 0 exit status) is the one expected with ls command
        str_cmd = 'ffof  cf ee ewpqe pepfiwqnfe ff pife piwfpef pi efqplc c p fpc fpi fip qepi fpiaef pifipewq ipfqepif e pifeq fipqe pifewq pfiewa'
        cmd = str_cmd.split(' ')
        _, std_stderr, _ = self._execute_shell_command(cmd)

        if not std_stderr:
        cmd = Cmd(str_cmd.split(' '))
        shell = Shell(self.shell_exec)
        shell.exec_commands([cmd])
        if not shell.read_stderr():
            raise AssertionError('The command "{}" should return an error but stderr is empty'.format(str_cmd))


@@ -174,12 +271,12 @@ class Test:

        @test
        def test_error_foregroundjob_1(self):
            self.test_error_foregroundjob(['ls', '-l', '--all', '--author', '-h', '-i', '-S', 'thisfileshouldnotexist'])
            self.test_error_foregroundjob(Cmd(['ls', '-l', '--all', '--author', '-h', '-i', '-S', 'thisfileshouldnotexist']))
        test_error_foregroundjob_1(self)

        @test
        def test_error_foregroundjob_2(self):
            self.test_error_foregroundjob(['stat', 'thisfileshouldnotexist'])
            self.test_error_foregroundjob(Cmd(['stat', 'thisfileshouldnotexist']))
        test_error_foregroundjob_2(self)


@@ -187,21 +284,32 @@ class Test:
    def test_builtin_exit(self):
        # Cannot test exit because otherwise the shell will exit before some test on it (see execute_shell_command)
        # An empty command is tested instead since the exit command is tested anyway at the end
        self._execute_shell_command([''], timeout=1)
        shell = Shell(self.shell_exec)
        cmd = Cmd(['exit'])
        shell.exec_command(cmd)
        time.sleep(0.5)  # wait to be sure that command was executed
        if shell.is_running():
            raise AssertionError('Command exit was sent but shell is still running')
        shell.exit()


    @test
    def test_builtin_cd(self):
        # Test existing directory
        dir = tempfile.TemporaryDirectory()
        _, _, cwd = self._execute_shell_command(['cd', dir.name], cwd='.', timeout=1)
        if dir.name != cwd:
            raise AssertionError('Changing directory failed: the directory shouldbe {} but it is {}'.format(dir, cwd))
        cmd = Cmd(['cd', dir.name])
        shell = Shell(self.shell_exec, cwd='.')
        shell.exec_command(cmd, timeout=1)
        time.sleep(0.5) # to be "sure" that the shell command executed
        if dir.name != shell.get_cwd():
            raise AssertionError('Changing directory failed: the directory shouldbe {} but it is {}'.format(dir, shell.get_cwd()))
        shell.exit()

        # Test non-existing directory
        cmd = ['cd', 'thisfoldershouldnotexist']
        _, stderr, _ = self._execute_shell_command(cmd, timeout=1)
        if not stderr:
        cmd = Cmd(['cd', 'thisfoldershouldnotexist'])
        shell = Shell(self.shell_exec)
        shell.exec_commands([cmd], timeout=1)
        if not shell.read_stderr():
            raise AssertionError('The command "{}" should return an error but stderr is empty'.format(' '.join(cmd)))


@@ -221,6 +329,8 @@ if __name__ == "__main__":
    # Empty command
    t.test_builtin()
    t.test_foregroundjobs()

    sys.exit(test_failed)
    # # Long nonsensical command
    # execute_commandon_shell(tp_dir, tp_shell_name, b'ffof  cf ee ewpqe pepfiwqnfe ff pife piwfpef pi efqplc c p fpc fpi fip qepi fpiaef pifipewq ipfqepif e pifeq fipqe pifewq pfiewa')
    # # cd without check it works