Skip to content
Snippets Groups Projects
Commit 061b6a74 authored by Guillaume Chanel's avatar Guillaume Chanel
Browse files

Refactor code with a Shell and Cmd class

This refactoring allowed to correct several bugs.
TODO: non-blocking I/O
TODO: other tests for the next part of the TP
parent 1ebd0f9f
No related branches found
No related tags found
No related merge requests found
...@@ -15,6 +15,7 @@ import time ...@@ -15,6 +15,7 @@ import time
import psutil import psutil
import re import re
def print_usage(): def print_usage():
print("Usage: {} shell_executable_name".format(sys.argv[0])) print("Usage: {} shell_executable_name".format(sys.argv[0]))
print("") print("")
...@@ -22,145 +23,241 @@ def print_usage(): ...@@ -22,145 +23,241 @@ def print_usage():
print(" if omitted, the script attempt to find it automatically") print(" if omitted, the script attempt to find it automatically")
# TODO: move in test class (failed so far)
test_failed = False
def test(func): def test(func):
def wrapper(self): def wrapper(self):
global test_failed
print(func.__name__ + ": ", end="") print(func.__name__ + ": ", end="")
try: try:
func(self) func(self)
except Exception as e: except Exception as e:
print(Fore.RED + "FAILED" + Fore.RESET) print(Fore.RED + "FAILED" + Fore.RESET)
logging.error(f"{type(e).__name__}:{str(e)}") logging.error(f"{type(e).__name__}:{str(e)}")
test_failed = True
return return
print(Fore.GREEN + "SUCCESS" + Fore.RESET) print(Fore.GREEN + "SUCCESS" + Fore.RESET)
return wrapper return wrapper
class Test: class Cmd:
def __init__(self, shell_exec: str) -> None: def __init__(self, cmd: list[str]) -> None:
self.shell_exec = Path(shell_exec).resolve() self.cmd = cmd
def _get_exit_code_from_stdout(self, stdout: str) -> int:
# Find line with keyword "code"
for line in stdout.splitlines():
if "code" in line:
# Assumes that exit code is after keyword code and that it is the only digits
return int(''.join([c for c in line.split('code')[-1] if c.isdigit()]))
raise AssertionError('No exit code found')
def __str__(self) -> str:
if self.cmd is None:
return str(None)
else:
return ' '.join(self.cmd).strip()
def _execute_shell_command(self, command: list[str], cwd: str = tempfile.gettempdir(), duration_cmd: float = 0.2, timeout: int = 3): def __iter__(self) -> str:
""" Execute a shell command and tries to determine if the command is correctly executed for c in self.cmd:
""" yield c
# Create a string out of a command
str_cmd = ' '.join(command)
class Shell:
def __init__(self, executable: str, cwd: str = tempfile.gettempdir()) -> None:
# Execute the shell and keep track of the process # Execute the shell and keep track of the process
shell_process = subprocess.Popen( self.executable = executable
[self.shell_exec], cwd=cwd, encoding='utf-8', self.shell_process = subprocess.Popen(
[self.executable], cwd=cwd, encoding='utf-8',
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE
) )
shell_ps = psutil.Process(shell_process.pid) self.shell_ps = psutil.Process(self.shell_process.pid)
try:
# Execute the command and wait for it to finish # Keep track of last executed command
shell_process.stdin.write(str_cmd + '\n') self.last_cmd = Cmd(None)
shell_process.stdin.flush()
# No ouputs so far...
self.stdout = None
self.stderr = None
#TODO: we wait a bit here to ensure the child is created ? However it seems is it not needed, how to ensure it ?
# TODO: move out of Shell ?
def check_ophans(self, cmd: Cmd):
""" Check if an orphan process (child of 1) was executed with cmd
cmd: if None the tested command will be the last executed command
"""
if cmd is None:
if self.last_cmd is not None:
cmd = self.last_cmd
else:
return
str_cmd = str(cmd)
init_ps = psutil.Process(1)
for p in init_ps.children():
if p.cmdline() == str_cmd:
raise AssertionError('The command "{}" seem to be a child of process 1'.format(str_cmd))
def wait_children(self, test_zombies: bool = True, timeout: int = 3):
""" Wait for children of the shell to temrinate
test_zombies: if True the command will raise an AssertionError if some children are zombies
timeout: time after which a Timeout exception is raised if there are still children
"""
# Wait for all children and check zombies / timeout
start_time = time.time()
while True:
# Wait a bit before checking
time.sleep(0.1) time.sleep(0.1)
# Wait for children to die and check if some are zombies # Check if some children are zombies
# TODO: it can happen that the child is stuck (e.g. if it becomes a shell cause cannot execve) in that case the code bellow blocks children = self.shell_ps.children()
children = shell_ps.children() if test_zombies:
while(children):
for c in children: for c in children:
if c.status() == psutil.STATUS_ZOMBIE: if c.status() == psutil.STATUS_ZOMBIE:
raise AssertionError('The shell child process {} is a zombie for command "{}"'.format(c, str_cmd)) raise AssertionError('The shell child process {} is a zombie (last command executed: {})'.format(c, self.last_cmd))
time.sleep(0.1)
children = shell_ps.children() # No more children to wait for -> stop looping
if (len(children) == 0):
break
# as the command exceeded the timeout ?
duration = time.time() - start_time
if duration > timeout:
raise psutil.TimeoutExpired('The process took more than the timeout ({}s) to terminate (last command executed: {})'.format(duration, self.last_cmd))
# Check if command process has become a child of process 1 (e.g. by double fork)
init_ps = psutil.Process(1)
for c in init_ps.children():
if c.cmdline() == command:
raise AssertionError('The command "{}" seem to be a child of process 1'.format(str_cmd))
# update current working directory def exec_command(self, command: Cmd, wait_cmd: bool = True, timeout: int = 3):
cwd = shell_ps.cwd() """Execute a command in the shell without existing the shell
wait_cmd: wait for all the command processes to finish and raise an error on zombies
timeout: terminate the command if it last longer than timeout (does not apply if wait_cmd = False)"""
# Execute the command
self.shell_process.stdin.write(str(command) + '\n')
self.shell_process.stdin.flush()
self.last_cmd = command
# Terminate shell by using exit command and returning shell outputs # Wait for the shell childs to finish while checking for zombies
stdout, stderr = shell_process.communicate(input='exit\n', timeout=timeout) if wait_cmd:
self.wait_children(timeout=timeout)
# Test if shell is still running
if shell_ps.is_running():
raise AssertionError('Shell is still running after en of communication, either exit is not working or the shell does not terminate on Ctrl+D')
return stdout, stderr, cwd def exec_commands(self, commands: list[Cmd], wait_cmd: bool = True, timeout: int = 3):
"""Execute a list of commands in the shell without existing the shell and exits
wait_cmd: wait for all the command processes to finish and raise an error on zombies
timeout: terminate the command if it last longer than timeout (does not apply if wait_cmd = False)"""
for cmd in commands:
self.exec_command(cmd, wait_cmd, timeout)
self.exit()
def exit(self):
# We use communicate to be sure that all streams are closed (i.e. process terminated)
timeout = 1
try:
self.stdout, self.stderr = self.shell_process.communicate(input='exit\n', timeout=timeout)
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
shell_process.kill() self.shell_process.kill()
print('Timeout during command {}'.format(str_cmd)) raise subprocess.TimeoutExpired('The exit command did not exit the shell after {}s'.format(timeout))
raise subprocess.TimeoutExpired #TODO: should I check if process is still running at that point (stream closed but process alive ?)
@test
def test_simple_foregroundjob(self):
# sleep is one of the given exemples
self._execute_shell_command(['sleep', '1'], timeout=5)
def read_stdout(self):
if self.stdout is not None:
return self.stdout
else:
raise ValueError('stdout is None probably because exit was not called on the shell before accessing it')
@test
def test_successfull_foregroundjob(self):
# check if command output (stdout, stderr and 0 exit status) is the one expected with ls command
cmd = ['ls', '-l', '--all', '--author', '-h', '-i', '-S']
std_stdout, std_stderr, _ = self._execute_shell_command(cmd)
# get "real" output def read_stderr(self):
real = subprocess.run(cmd, cwd=tempfile.gettempdir(), capture_output=True, encoding='utf-8') if self.stderr is not None:
return self.stderr
else:
raise ValueError('stderr is None probably because exit was not called on the shell before accessing it')
# check standard output
if not real.stdout in std_stdout:
raise AssertionError('The standard output of the command "{}" does not include the following correct result:\n{}\cmd result in shell:\n{}'.format(' '.join(cmd), real.stdout, std_stdout))
# check standard error def get_cwd(self):
if std_stderr: return self.shell_ps.cwd()
raise AssertionError('The standard error of the command "{}" shouldbe empty but contains:\n{}'.format(std_stderr))
# check return code def is_running(self):
std_returncode = self._get_exit_code_from_stdout(std_stdout) return self.shell_process.poll() == None
if std_returncode != real.returncode:
raise AssertionError('The command "{}" should return {} but the shell indicates {}'.format(' '.join(cmd), real.returncode, std_returncode))
def test_error_foregroundjob(self, cmd: list[str]): class Test:
# check if command output (stdout, stderr and 0 exit status) is the one expected with ls command def __init__(self, shell_exec: str) -> None:
std_stdout, std_stderr, _ = self._execute_shell_command(cmd) self.shell_exec = Path(shell_exec).resolve()
self.shell = None # currently no shell run
#TODO: remove self.shell as it should be passed from function to function (or not in case also correct some function that does)
def _get_exit_code_from_stdout(self, stdout: str) -> int:
# Find line with keyword "code"
for line in stdout.splitlines():
if "code" in line:
# Assumes that exit code is after keyword code and that it is the only digits
return int(''.join([c for c in line.split('code')[-1] if c.isdigit()]))
raise AssertionError('No exit code found')
def _test_command_results(self, cmd: Cmd, shell: Shell, test_stdout: str, test_stderr: str, test_return: bool):
""" Test if the results (standard outputs and return code) of a command are the correct ones
test_stdout/test_stderr: a string indicating if the standard output should include the normal output ('include'),
be empty ('empty'), or not tested ('notest' or any other string)
test_return: should the return code be tested (relies on the computation of the return code from the standard output)
"""
# get "real" output # get "real" output
real = subprocess.run(cmd, cwd=tempfile.gettempdir(), capture_output=True, encoding='utf-8') real = subprocess.run(cmd, cwd=tempfile.gettempdir(), capture_output=True, encoding='utf-8')
# check standard output # check standard output
if not real.stderr in std_stderr: # TODO combine the two tests (the one below) in one fonction
raise AssertionError('The standard output of the command "{}" does not include the following correct result:\n{}\cmd result in shell:\n{}'.format(' '.join(cmd), real.stderr, std_stderr)) shell_stdout = shell.read_stdout()
if test_stdout == 'include':
if not real.stdout in shell_stdout:
raise AssertionError('The standard output of the command "{}" does not include the following correct result:\n{}\cmd result in shell:\n{}'.format(' '.join(cmd), real.stdout, shell_stdout))
elif test_stdout == 'empty':
if shell_stdout:
raise AssertionError('The standard error of the command "{}" shouldbe empty but contains:\n{}'.format(shell_stderr))
# do not check if stdout is empty because it will contain return code... # check standard output
shell_stderr = shell.read_stderr()
if test_stderr == 'include':
if not real.stderr in shell_stderr:
raise AssertionError('The standard output of the command "{}" does not include the following correct result:\n{}\cmd result in shell:\n{}'.format(' '.join(cmd), real.stderr, shell_stderr))
elif test_stdout == 'empty':
if shell_stderr:
raise AssertionError('The standard error of the command "{}" shouldbe empty but contains:\n{}'.format(shell_stderr))
# check return code # check return code
std_returncode = self._get_exit_code_from_stdout(std_stdout) if test_return:
if std_returncode != real.returncode: std_returncode = self._get_exit_code_from_stdout(shell_stdout)
raise AssertionError('The command "{}" should return {} but the shell indicates {}'.format(' '.join(cmd), real.returncode, std_returncode)) if std_returncode != real.returncode:
raise AssertionError('The command "{}" should return {} but the shell indicates {}'.format(' '.join(cmd), real.returncode, std_returncode))
@test
def test_simple_foregroundjob(self):
# sleep is one of the given exemples
cmd = Cmd(['sleep', '1'])
shell = Shell(self.shell_exec)
shell.exec_commands([cmd], timeout=5)
@test
def test_successfull_foregroundjob(self):
# check if command output (stdout, stderr and 0 exit status) is the one expected with ls command
cmd = Cmd(['ls', '-l', '--all', '--author', '-h', '-i', '-S'])
shell = Shell(self.shell_exec)
shell.exec_commands([cmd])
self._test_command_results(cmd, shell, test_stdout='include', test_stderr='empty', test_return=True)
def test_error_foregroundjob(self, cmd: Cmd):
# check if command output (stdout, stderr and 0 exit status) is the one expected with ls command
shell = Shell(self.shell_exec)
shell.exec_commands([cmd])
self._test_command_results(cmd, shell, test_stdout='notest', test_stderr='include', test_return=True)
@test @test
def test_wrongcmd(self): def test_wrongcmd(self):
# check if command output (stdout, stderr and 0 exit status) is the one expected with ls command # check if command output (stdout, stderr and 0 exit status) is the one expected with ls command
str_cmd = 'ffof cf ee ewpqe pepfiwqnfe ff pife piwfpef pi efqplc c p fpc fpi fip qepi fpiaef pifipewq ipfqepif e pifeq fipqe pifewq pfiewa' str_cmd = 'ffof cf ee ewpqe pepfiwqnfe ff pife piwfpef pi efqplc c p fpc fpi fip qepi fpiaef pifipewq ipfqepif e pifeq fipqe pifewq pfiewa'
cmd = str_cmd.split(' ') cmd = Cmd(str_cmd.split(' '))
_, std_stderr, _ = self._execute_shell_command(cmd) shell = Shell(self.shell_exec)
shell.exec_commands([cmd])
if not std_stderr: if not shell.read_stderr():
raise AssertionError('The command "{}" should return an error but stderr is empty'.format(str_cmd)) raise AssertionError('The command "{}" should return an error but stderr is empty'.format(str_cmd))
...@@ -174,12 +271,12 @@ class Test: ...@@ -174,12 +271,12 @@ class Test:
@test @test
def test_error_foregroundjob_1(self): def test_error_foregroundjob_1(self):
self.test_error_foregroundjob(['ls', '-l', '--all', '--author', '-h', '-i', '-S', 'thisfileshouldnotexist']) self.test_error_foregroundjob(Cmd(['ls', '-l', '--all', '--author', '-h', '-i', '-S', 'thisfileshouldnotexist']))
test_error_foregroundjob_1(self) test_error_foregroundjob_1(self)
@test @test
def test_error_foregroundjob_2(self): def test_error_foregroundjob_2(self):
self.test_error_foregroundjob(['stat', 'thisfileshouldnotexist']) self.test_error_foregroundjob(Cmd(['stat', 'thisfileshouldnotexist']))
test_error_foregroundjob_2(self) test_error_foregroundjob_2(self)
...@@ -187,21 +284,32 @@ class Test: ...@@ -187,21 +284,32 @@ class Test:
def test_builtin_exit(self): def test_builtin_exit(self):
# Cannot test exit because otherwise the shell will exit before some test on it (see execute_shell_command) # Cannot test exit because otherwise the shell will exit before some test on it (see execute_shell_command)
# An empty command is tested instead since the exit command is tested anyway at the end # An empty command is tested instead since the exit command is tested anyway at the end
self._execute_shell_command([''], timeout=1) shell = Shell(self.shell_exec)
cmd = Cmd(['exit'])
shell.exec_command(cmd)
time.sleep(0.5) # wait to be sure that command was executed
if shell.is_running():
raise AssertionError('Command exit was sent but shell is still running')
shell.exit()
@test @test
def test_builtin_cd(self): def test_builtin_cd(self):
# Test existing directory # Test existing directory
dir = tempfile.TemporaryDirectory() dir = tempfile.TemporaryDirectory()
_, _, cwd = self._execute_shell_command(['cd', dir.name], cwd='.', timeout=1) cmd = Cmd(['cd', dir.name])
if dir.name != cwd: shell = Shell(self.shell_exec, cwd='.')
raise AssertionError('Changing directory failed: the directory shouldbe {} but it is {}'.format(dir, cwd)) shell.exec_command(cmd, timeout=1)
time.sleep(0.5) # to be "sure" that the shell command executed
if dir.name != shell.get_cwd():
raise AssertionError('Changing directory failed: the directory shouldbe {} but it is {}'.format(dir, shell.get_cwd()))
shell.exit()
# Test non-existing directory # Test non-existing directory
cmd = ['cd', 'thisfoldershouldnotexist'] cmd = Cmd(['cd', 'thisfoldershouldnotexist'])
_, stderr, _ = self._execute_shell_command(cmd, timeout=1) shell = Shell(self.shell_exec)
if not stderr: shell.exec_commands([cmd], timeout=1)
if not shell.read_stderr():
raise AssertionError('The command "{}" should return an error but stderr is empty'.format(' '.join(cmd))) raise AssertionError('The command "{}" should return an error but stderr is empty'.format(' '.join(cmd)))
...@@ -221,6 +329,8 @@ if __name__ == "__main__": ...@@ -221,6 +329,8 @@ if __name__ == "__main__":
# Empty command # Empty command
t.test_builtin() t.test_builtin()
t.test_foregroundjobs() t.test_foregroundjobs()
sys.exit(test_failed)
# # Long nonsensical command # # Long nonsensical command
# execute_commandon_shell(tp_dir, tp_shell_name, b'ffof cf ee ewpqe pepfiwqnfe ff pife piwfpef pi efqplc c p fpc fpi fip qepi fpiaef pifipewq ipfqepif e pifeq fipqe pifewq pfiewa') # execute_commandon_shell(tp_dir, tp_shell_name, b'ffof cf ee ewpqe pepfiwqnfe ff pife piwfpef pi efqplc c p fpc fpi fip qepi fpiaef pifipewq ipfqepif e pifeq fipqe pifewq pfiewa')
# # cd without check it works # # cd without check it works
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment