diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d9cac3..9d3534d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Catch and report internal worker errors - Implement adaptive random fuzzing parameter selection (#24) +- Splicing mutation (#23) ### Fixed diff --git a/cobrafuzz/mutator.py b/cobrafuzz/mutator.py index f3becff..512a653 100644 --- a/cobrafuzz/mutator.py +++ b/cobrafuzz/mutator.py @@ -21,7 +21,11 @@ def update(self, success: bool = False) -> None: rand.update(success=success) -def _mutate_remove_range_of_bytes(res: bytearray, rand: Params) -> None: +def _mutate_remove_range_of_bytes( + res: bytearray, + rand: Params, + _inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None, +) -> None: if len(res) < 2: raise common.OutOfDataError assert isinstance(rand.length, util.AdaptiveRange) @@ -30,7 +34,11 @@ def _mutate_remove_range_of_bytes(res: bytearray, rand: Params) -> None: util.remove(data=res, start=rand.start.sample(0, len(res) - length), length=length) -def _mutate_insert_range_of_bytes(res: bytearray, rand: Params) -> None: +def _mutate_insert_range_of_bytes( + res: bytearray, + rand: Params, + _inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None, +) -> None: assert isinstance(rand.length, util.AdaptiveRange) assert isinstance(rand.start, util.AdaptiveRange) assert isinstance(rand.data, util.AdaptiveRange) @@ -39,7 +47,11 @@ def _mutate_insert_range_of_bytes(res: bytearray, rand: Params) -> None: util.insert(data=res, start=rand.start.sample(0, len(res)), data_to_insert=data) -def _mutate_duplicate_range_of_bytes(res: bytearray, rand: Params) -> None: +def _mutate_duplicate_range_of_bytes( + res: bytearray, + rand: Params, + _inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None, +) -> None: if len(res) < 2: raise common.OutOfDataError assert isinstance(rand.src_pos, util.AdaptiveRange) @@ -51,7 +63,11 @@ def _mutate_duplicate_range_of_bytes(res: bytearray, rand: Params) -> None: util.insert(res, dst_pos, res[src_pos : src_pos + length]) -def _mutate_copy_range_of_bytes(res: bytearray, rand: Params) -> None: +def _mutate_copy_range_of_bytes( + res: bytearray, + rand: Params, + _inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None, +) -> None: if len(res) < 2: raise common.OutOfDataError assert isinstance(rand.src_pos, util.AdaptiveRange) @@ -63,7 +79,11 @@ def _mutate_copy_range_of_bytes(res: bytearray, rand: Params) -> None: util.copy(res, src_pos, dst_pos, length) -def _mutate_bit_flip(res: bytearray, rand: Params) -> None: +def _mutate_bit_flip( + res: bytearray, + rand: Params, + _inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None, +) -> None: if len(res) < 1: raise common.OutOfDataError assert isinstance(rand.byte_pos, util.AdaptiveRange) @@ -73,7 +93,11 @@ def _mutate_bit_flip(res: bytearray, rand: Params) -> None: res[byte_pos] ^= 1 << bit_pos -def _mutate_flip_random_bits_of_random_byte(res: bytearray, rand: Params) -> None: +def _mutate_flip_random_bits_of_random_byte( + res: bytearray, + rand: Params, + _inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None, +) -> None: if len(res) < 1: raise common.OutOfDataError assert isinstance(rand.pos, util.AdaptiveRange) @@ -82,7 +106,11 @@ def _mutate_flip_random_bits_of_random_byte(res: bytearray, rand: Params) -> Non res[pos] ^= rand.value.sample(0, 255) -def _mutate_swap_two_bytes(res: bytearray, rand: Params) -> None: +def _mutate_swap_two_bytes( + res: bytearray, + rand: Params, + _inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None, +) -> None: if len(res) < 2: raise common.OutOfDataError assert isinstance(rand.first_pos, util.AdaptiveRange) @@ -92,7 +120,11 @@ def _mutate_swap_two_bytes(res: bytearray, rand: Params) -> None: res[first_pos], res[second_pos] = res[second_pos], res[first_pos] -def _mutate_add_subtract_from_a_byte(res: bytearray, rand: Params) -> None: +def _mutate_add_subtract_from_a_byte( + res: bytearray, + rand: Params, + _inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None, +) -> None: if len(res) < 1: raise common.OutOfDataError assert isinstance(rand.pos, util.AdaptiveRange) @@ -102,7 +134,11 @@ def _mutate_add_subtract_from_a_byte(res: bytearray, rand: Params) -> None: res[pos] = (res[pos] + v_int) % 256 -def _mutate_add_subtract_from_a_uint16(res: bytearray, rand: Params) -> None: +def _mutate_add_subtract_from_a_uint16( + res: bytearray, + rand: Params, + _inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None, +) -> None: if len(res) < 2: raise common.OutOfDataError assert isinstance(rand.pos, util.AdaptiveRange) @@ -116,7 +152,11 @@ def _mutate_add_subtract_from_a_uint16(res: bytearray, rand: Params) -> None: res[pos + 1] = (res[pos + 1] + v[1]) % 256 -def _mutate_add_subtract_from_a_uint32(res: bytearray, rand: Params) -> None: +def _mutate_add_subtract_from_a_uint32( + res: bytearray, + rand: Params, + _inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None, +) -> None: if len(res) < 4: raise common.OutOfDataError assert isinstance(rand.pos, util.AdaptiveRange) @@ -131,7 +171,11 @@ def _mutate_add_subtract_from_a_uint32(res: bytearray, rand: Params) -> None: res[pos + 3] = (res[pos + 3] + v[3]) % 256 -def _mutate_add_subtract_from_a_uint64(res: bytearray, rand: Params) -> None: +def _mutate_add_subtract_from_a_uint64( + res: bytearray, + rand: Params, + _inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None, +) -> None: if len(res) < 8: raise common.OutOfDataError assert isinstance(rand.pos, util.AdaptiveRange) @@ -150,7 +194,11 @@ def _mutate_add_subtract_from_a_uint64(res: bytearray, rand: Params) -> None: res[pos + 7] = (res[pos + 7] + v[7]) % 256 -def _mutate_replace_a_byte_with_an_interesting_value(res: bytearray, rand: Params) -> None: +def _mutate_replace_a_byte_with_an_interesting_value( + res: bytearray, + rand: Params, + _inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None, +) -> None: if len(res) < 1: raise common.OutOfDataError assert isinstance(rand.pos, util.AdaptiveRange) @@ -159,7 +207,11 @@ def _mutate_replace_a_byte_with_an_interesting_value(res: bytearray, rand: Param res[pos] = rand.interesting_8.sample() -def _mutate_replace_an_uint16_with_an_interesting_value(res: bytearray, rand: Params) -> None: +def _mutate_replace_an_uint16_with_an_interesting_value( + res: bytearray, + rand: Params, + _inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None, +) -> None: if len(res) < 2: raise common.OutOfDataError assert isinstance(rand.pos, util.AdaptiveRange) @@ -172,7 +224,11 @@ def _mutate_replace_an_uint16_with_an_interesting_value(res: bytearray, rand: Pa res[pos + 1] = v[1] % 256 -def _mutate_replace_an_uint32_with_an_interesting_value(res: bytearray, rand: Params) -> None: +def _mutate_replace_an_uint32_with_an_interesting_value( + res: bytearray, + rand: Params, + _inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None, +) -> None: if len(res) < 4: raise common.OutOfDataError assert isinstance(rand.pos, util.AdaptiveRange) @@ -187,7 +243,11 @@ def _mutate_replace_an_uint32_with_an_interesting_value(res: bytearray, rand: Pa res[pos + 3] = v[3] % 256 -def _mutate_replace_an_ascii_digit_with_another_digit(res: bytearray, rand: Params) -> None: +def _mutate_replace_an_ascii_digit_with_another_digit( + res: bytearray, + rand: Params, + _inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None, +) -> None: digits_present = [i for i in range(len(res)) if ord("0") <= res[i] <= ord("9")] if len(digits_present) < 1: raise common.OutOfDataError @@ -197,6 +257,24 @@ def _mutate_replace_an_ascii_digit_with_another_digit(res: bytearray, rand: Para res[pos] = rand.digits.sample() +def _mutate_splice( + res: bytearray, + rand: Params, + inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None, +) -> None: + if len(res) < 1: + raise common.OutOfDataError + assert inputs is not None + right = inputs.sample() + if len(right) < 1: + raise common.OutOfDataError + assert isinstance(rand.left_pos, util.AdaptiveRange) + assert isinstance(rand.right_pos, util.AdaptiveRange) + res[rand.left_pos.sample(0, len(res) - 1) + 1 :] = right[ + rand.right_pos.sample(0, len(right) - 1) : + ] + + class Mutator: def __init__( self, @@ -210,7 +288,7 @@ def __init__( self._max_modifications = max_modifications self._modifications = util.AdaptiveRange(adaptive=adaptive) self._mutators: util.AdaptiveChoiceBase[ - tuple[Callable[[bytearray, Params], None], Params] + tuple[Callable[[bytearray, Params, util.AdaptiveChoiceBase[bytearray]], None], Params] ] = util.AdaptiveChoiceBase( population=[ ( @@ -350,6 +428,13 @@ def __init__( ), ), ), + ( + _mutate_splice, + Params( + left_pos=util.AdaptiveRange(adaptive=adaptive), + right_pos=util.AdaptiveRange(adaptive=adaptive), + ), + ), ], ) self._last_rands: Optional[Params] = None @@ -360,7 +445,7 @@ def _mutate(self, buf: bytearray) -> bytearray: while nm: modify, self._last_rands = self._mutators.sample() try: - modify(res, self._last_rands) + modify(res, self._last_rands, self._inputs) except common.OutOfDataError: pass else: diff --git a/tests/unit/test_mutator.py b/tests/unit/test_mutator.py index e9404e0..f161993 100644 --- a/tests/unit/test_mutator.py +++ b/tests/unit/test_mutator.py @@ -37,7 +37,7 @@ def sample(self) -> int: def test_mutate(monkeypatch: pytest.MonkeyPatch) -> None: - def modify(data: bytearray, _: mutator.Params) -> None: + def modify(data: bytearray, _m: mutator.Params, _i: util.AdaptiveChoiceBase[bytearray]) -> None: data.insert(0, ord("a")) data.append(ord("b")) @@ -51,7 +51,7 @@ def modify(data: bytearray, _: mutator.Params) -> None: def test_mutate_unmodified(monkeypatch: pytest.MonkeyPatch) -> None: m = mutator.Mutator() - def modify(data: bytearray, _: mutator.Params) -> None: + def modify(data: bytearray, _m: mutator.Params, _i: util.AdaptiveChoiceBase[bytearray]) -> None: if data[0] != 0: data[0] = 0 @@ -64,7 +64,11 @@ def modify(data: bytearray, _: mutator.Params) -> None: def test_mutate_truncated(monkeypatch: pytest.MonkeyPatch) -> None: m = mutator.Mutator(max_input_size=4) with monkeypatch.context() as mp: - mp.setattr(m, "_mutators", util.AdaptiveChoiceBase([(lambda _data, _: None, None)])) + mp.setattr( + m, + "_mutators", + util.AdaptiveChoiceBase([(lambda _data, _m, _i: (None, None, None), None)]), + ) assert m._mutate(bytearray(b"0123456789")) == bytearray(b"0123") @@ -611,6 +615,50 @@ def test_mutate_replace_an_ascii_digit_with_another_digit_success( assert tmp == expected +def test_mutate_splice_fail_left() -> None: + res = bytearray() + with pytest.raises(common.OutOfDataError): + mutator._mutate_splice(res, mutator.Params()) + + +def test_mutate_splice_fail_right() -> None: + res = bytearray(b"deadbeef") + with pytest.raises(common.OutOfDataError): + mutator._mutate_splice( + res, + mutator.Params(), + util.AdaptiveChoiceBase(population=[bytearray()]), + ) + + +@pytest.mark.parametrize( + ("left", "left_pos", "right", "right_pos", "expected"), + [ + (b"0123456789", 9, b"ABCDEFGHIJ", 0, b"0123456789ABCDEFGHIJ"), + (b"0123456789", 5, b"ABCDEFGHIJ", 5, b"012345FGHIJ"), + (b"0123456789", 0, b"ABCDEFGHIJ", 9, b"0J"), + ], +) +def test_mutate_splice( + left: bytes, + left_pos: int, + right: bytes, + right_pos: int, + expected: bytes, +) -> None: + tmp = bytearray(left) + + mutator._mutate_splice( + tmp, + mutator.Params( + left_pos=StaticRand(left_pos), + right_pos=StaticRand(right_pos), + ), + util.AdaptiveChoiceBase(population=[bytearray(right)]), + ) + assert tmp == bytearray(expected) + + def test_params_invalid() -> None: p = mutator.Params() with pytest.raises(AttributeError, match="^'Params' object has no attribute '_invalid'$"): @@ -637,7 +685,11 @@ def test_params_update() -> None: def test_mutator_detect_out_of_data_error(monkeypatch: pytest.MonkeyPatch) -> None: fail = True - def raise_out_of_data(_res: bytearray, _params: mutator.Params) -> None: + def raise_out_of_data( + _res: bytearray, + _params: mutator.Params, + _i: util.AdaptiveChoiceBase[bytearray], + ) -> None: nonlocal fail if fail: fail = False @@ -657,7 +709,11 @@ def raise_out_of_data(_res: bytearray, _params: mutator.Params) -> None: def test_mutator_update(monkeypatch: pytest.MonkeyPatch) -> None: - def mutate_noop(_res: bytearray, _params: mutator.Params) -> None: + def mutate_noop( + _res: bytearray, + _params: mutator.Params, + _i: util.AdaptiveChoiceBase[bytearray], + ) -> None: pass with monkeypatch.context() as mp: