diff --git a/bash_builtins.py b/bash_builtins.py index 152416c..4af8891 100644 --- a/bash_builtins.py +++ b/bash_builtins.py @@ -1,8 +1,8 @@ import re import os +import sys import argparse import threading -import signal import globals @@ -76,6 +76,7 @@ def wc_function(args, stdin, stdout): else: # READ STDIN fin = open(stdin, "r", closefd=False) + with fin: lines = fin.readlines() words_count = sum([len(line.split()) for line in lines]) @@ -108,12 +109,69 @@ def exit_function(args, stdin, stdout): globals.set_should_exit(True) +def grep_function(args, stdin, stdout): + """ + Match lines in FILE or stdin with PATTERN + :param args: arguments + :param stdin: input file descriptor + :param stdout: output file descriptor + :return: nothing + """ + parser = argparse.ArgumentParser(prog="grep", description='print lines matching a pattern') + parser.add_argument("-i", action='store_true', + help='Ignore case distinctions, so that characters that differ only in case match each other.') + parser.add_argument("-w", action='store_true', + help='Select only those lines containing matches that form whole words.') + parser.add_argument("-A", nargs='?', action='store', default=0, type=int, + help='Print A lines after matched lines') + parser.add_argument("PATTERN", help='Regular expression to be matched in line') + parser.add_argument("FILE", nargs='?', help='Path to file to scan for') + parsed_args = vars(parser.parse_args(args)) + + fin = None + if parsed_args.get('FILE'): + try: + fin = open(parsed_args.get('FILE'), 'r', closefd=True) + except FileNotFoundError: + print("File is not accessible", file=sys.stderr) + return + else: + fin = open(stdin, 'r', closefd=False) + + fout = open(stdout, 'w', closefd=False) + + with fin, fout: + + after_parameter = int(parsed_args.get('A')) + + if after_parameter < 0: + fout.write("grep error: -A parameter shouldn't be negative") + else: + pattern = parsed_args.get("PATTERN") + if parsed_args.get("w"): + pattern = r'\b' + pattern + r'\b' + flags = 0 + if parsed_args.get('i'): + flags |= re.IGNORECASE + + regexp = re.compile(pattern, flags) + + remaining_after = 0 + for line in fin: + if regexp.search(line): + remaining_after = after_parameter + 1 + if remaining_after > 0: + remaining_after -= 1 + fout.write(line) + + command_to_function = { "cat": cat_function, "echo": echo_function, "wc": wc_function, "pwd": pwd_function, - "exit": exit_function + "exit": exit_function, + "grep": grep_function } diff --git a/unittests.py b/unittests.py index 2c0daeb..61fa33b 100644 --- a/unittests.py +++ b/unittests.py @@ -48,8 +48,8 @@ def test_wc(self): file = tempfile.NamedTemporaryFile("w", delete=False) filename = file.name file.write(self.TEST_STRING) - read_pipe, write_pipe = os.pipe() file.close() + read_pipe, write_pipe = os.pipe() thread = bash_builtins.simple_interpret_single_builtin_command(["wc", file.name], stdin=sys.stdin.fileno(), @@ -86,6 +86,112 @@ def test_pwd(self): with open(read_pipe, "r") as fin: self.assertEqual(fin.read(), os.getcwd()) + def test_grep(self): + read_stdin_pipe, write_stdin_pipe = os.pipe() + read_pipe, write_pipe = os.pipe() + thread = bash_builtins.simple_interpret_single_builtin_command(["grep", "abc"], + stdin=read_stdin_pipe, + stdout=write_pipe) + with open(write_stdin_pipe, "w") as finout: + finout.writelines( + ["a\n", "ab\n", "abc\n", "abcd\n", "dcba\n", "aab\n", "aabcd\n", "aabc\n", "abacbca\n", "abc"]) + thread.wait() + os.close(read_stdin_pipe) + os.close(write_pipe) + with open(read_pipe, "r") as fin: + self.assertListEqual(fin.readlines(), ['abc\n', 'abcd\n', 'aabcd\n', 'aabc\n', 'abc']) + + def test_grep_with_file(self): + file = tempfile.NamedTemporaryFile("w", delete=False) + filename = file.name + file.writelines(["a\n", "ab\n", "abc\n", "abcd\n", "dcba\n", "aab\n", "aabcd\n", "aabc\n", "abacbca\n", "abc"]) + file.close() + read_pipe, write_pipe = os.pipe() + thread = bash_builtins.simple_interpret_single_builtin_command(["grep", "abc", filename], + stdin=sys.stdin.fileno(), + stdout=write_pipe) + thread.wait() + os.close(write_pipe) + with open(read_pipe, "r") as fin: + self.assertListEqual(fin.readlines(), ['abc\n', 'abcd\n', 'aabcd\n', 'aabc\n', 'abc']) + os.remove(filename) + + def test_grep_i(self): + read_stdin_pipe, write_stdin_pipe = os.pipe() + read_pipe, write_pipe = os.pipe() + thread = bash_builtins.simple_interpret_single_builtin_command(["grep", "-i", "aBc"], + stdin=read_stdin_pipe, + stdout=write_pipe) + with open(write_stdin_pipe, "w") as finout: + finout.writelines( + ["a\n", "ab\n", "abc\n", "ABcd\n", "dCbA\n", "aaB\n", "aabCd\n", "aABC\n", "aBacBcA\n", "AbC"]) + thread.wait() + os.close(read_stdin_pipe) + os.close(write_pipe) + with open(read_pipe, "r") as fin: + self.assertListEqual(fin.readlines(), ['abc\n', 'ABcd\n', 'aabCd\n', 'aABC\n', 'AbC']) + + def test_grep_w(self): + read_stdin_pipe, write_stdin_pipe = os.pipe() + read_pipe, write_pipe = os.pipe() + thread = bash_builtins.simple_interpret_single_builtin_command(["grep", "-w", "abc def ghi"], + stdin=read_stdin_pipe, + stdout=write_pipe) + with open(write_stdin_pipe, "w") as finout: + finout.writelines( + ["a\n", "abc def ghi\n", "abc def ghij\n", "abc abc def ghi ghi\n", "aabc def ghi\n", + " abc def ghi \n"]) + thread.wait() + os.close(read_stdin_pipe) + os.close(write_pipe) + with open(read_pipe, "r") as fin: + self.assertListEqual(fin.readlines(), ['abc def ghi\n', 'abc abc def ghi ghi\n', ' abc def ghi \n']) + + def test_grep_A1(self): + read_stdin_pipe, write_stdin_pipe = os.pipe() + read_pipe, write_pipe = os.pipe() + thread = bash_builtins.simple_interpret_single_builtin_command(["grep", "-A", "1", "abc"], + stdin=read_stdin_pipe, + stdout=write_pipe) + with open(write_stdin_pipe, "w") as finout: + finout.writelines( + ["a\n", "ab\n", "abc\n", "abcd\n", "dcba\n", "aab\n", "aabcd\n", "aabc\n", "abacbca\n", "abc"]) + thread.wait() + os.close(read_stdin_pipe) + os.close(write_pipe) + with open(read_pipe, "r") as fin: + self.assertListEqual(fin.readlines(), + ['abc\n', 'abcd\n', 'dcba\n', 'aabcd\n', 'aabc\n', 'abacbca\n', 'abc']) + + def test_grep_A2(self): + read_stdin_pipe, write_stdin_pipe = os.pipe() + read_pipe, write_pipe = os.pipe() + thread = bash_builtins.simple_interpret_single_builtin_command(["grep", "-A", "2", "abc"], + stdin=read_stdin_pipe, + stdout=write_pipe) + with open(write_stdin_pipe, "w") as finout: + finout.writelines( + ["a\n", "ab\n", "abc\n", "abcd\n", "dcba\n", "aab\n", "aabcd\n", "aabc\n", "abacbca\n", "abc"]) + thread.wait() + os.close(read_stdin_pipe) + os.close(write_pipe) + with open(read_pipe, "r") as fin: + self.assertListEqual(fin.readlines(), + ['abc\n', 'abcd\n', 'dcba\n', 'aab\n', 'aabcd\n', 'aabc\n', 'abacbca\n', 'abc']) + + def test_grep_incorrect_parameters(self): + read_stdin_pipe, write_stdin_pipe = os.pipe() + read_pipe, write_pipe = os.pipe() + thread = bash_builtins.simple_interpret_single_builtin_command( + ["grep", "-A", "-1", "param", "bash_builtins.py"], + stdin=read_stdin_pipe, + stdout=write_pipe) + thread.wait() + os.close(read_stdin_pipe) + os.close(write_pipe) + with open(read_pipe, "r") as fin: + self.assertListEqual(fin.readlines(), ["grep error: -A parameter shouldn't be negative"]) + class TestTokenize(unittest.TestCase): def test_shlex_tokenize(self):