Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement splicing mutation #40

Merged
merged 1 commit into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Catch and report internal worker errors
- Implement adaptive random fuzzing parameter selection (#24)
- Splicing mutation (#23)

### Fixed

Expand Down
119 changes: 102 additions & 17 deletions cobrafuzz/mutator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ def update(self, success: bool = False) -> None:
rand.update(success=success)


def _mutate_remove_range_of_bytes(res: bytearray, rand: Params) -> None:
def _mutate_remove_range_of_bytes(
res: bytearray,
rand: Params,
_inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None,
) -> None:
if len(res) < 2:
raise common.OutOfDataError
assert isinstance(rand.length, util.AdaptiveRange)
Expand All @@ -30,7 +34,11 @@ def _mutate_remove_range_of_bytes(res: bytearray, rand: Params) -> None:
util.remove(data=res, start=rand.start.sample(0, len(res) - length), length=length)


def _mutate_insert_range_of_bytes(res: bytearray, rand: Params) -> None:
def _mutate_insert_range_of_bytes(
res: bytearray,
rand: Params,
_inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None,
) -> None:
assert isinstance(rand.length, util.AdaptiveRange)
assert isinstance(rand.start, util.AdaptiveRange)
assert isinstance(rand.data, util.AdaptiveRange)
Expand All @@ -39,7 +47,11 @@ def _mutate_insert_range_of_bytes(res: bytearray, rand: Params) -> None:
util.insert(data=res, start=rand.start.sample(0, len(res)), data_to_insert=data)


def _mutate_duplicate_range_of_bytes(res: bytearray, rand: Params) -> None:
def _mutate_duplicate_range_of_bytes(
res: bytearray,
rand: Params,
_inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None,
) -> None:
if len(res) < 2:
raise common.OutOfDataError
assert isinstance(rand.src_pos, util.AdaptiveRange)
Expand All @@ -51,7 +63,11 @@ def _mutate_duplicate_range_of_bytes(res: bytearray, rand: Params) -> None:
util.insert(res, dst_pos, res[src_pos : src_pos + length])


def _mutate_copy_range_of_bytes(res: bytearray, rand: Params) -> None:
def _mutate_copy_range_of_bytes(
res: bytearray,
rand: Params,
_inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None,
) -> None:
if len(res) < 2:
raise common.OutOfDataError
assert isinstance(rand.src_pos, util.AdaptiveRange)
Expand All @@ -63,7 +79,11 @@ def _mutate_copy_range_of_bytes(res: bytearray, rand: Params) -> None:
util.copy(res, src_pos, dst_pos, length)


def _mutate_bit_flip(res: bytearray, rand: Params) -> None:
def _mutate_bit_flip(
res: bytearray,
rand: Params,
_inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None,
) -> None:
if len(res) < 1:
raise common.OutOfDataError
assert isinstance(rand.byte_pos, util.AdaptiveRange)
Expand All @@ -73,7 +93,11 @@ def _mutate_bit_flip(res: bytearray, rand: Params) -> None:
res[byte_pos] ^= 1 << bit_pos


def _mutate_flip_random_bits_of_random_byte(res: bytearray, rand: Params) -> None:
def _mutate_flip_random_bits_of_random_byte(
res: bytearray,
rand: Params,
_inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None,
) -> None:
if len(res) < 1:
raise common.OutOfDataError
assert isinstance(rand.pos, util.AdaptiveRange)
Expand All @@ -82,7 +106,11 @@ def _mutate_flip_random_bits_of_random_byte(res: bytearray, rand: Params) -> Non
res[pos] ^= rand.value.sample(0, 255)


def _mutate_swap_two_bytes(res: bytearray, rand: Params) -> None:
def _mutate_swap_two_bytes(
res: bytearray,
rand: Params,
_inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None,
) -> None:
if len(res) < 2:
raise common.OutOfDataError
assert isinstance(rand.first_pos, util.AdaptiveRange)
Expand All @@ -92,7 +120,11 @@ def _mutate_swap_two_bytes(res: bytearray, rand: Params) -> None:
res[first_pos], res[second_pos] = res[second_pos], res[first_pos]


def _mutate_add_subtract_from_a_byte(res: bytearray, rand: Params) -> None:
def _mutate_add_subtract_from_a_byte(
res: bytearray,
rand: Params,
_inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None,
) -> None:
if len(res) < 1:
raise common.OutOfDataError
assert isinstance(rand.pos, util.AdaptiveRange)
Expand All @@ -102,7 +134,11 @@ def _mutate_add_subtract_from_a_byte(res: bytearray, rand: Params) -> None:
res[pos] = (res[pos] + v_int) % 256


def _mutate_add_subtract_from_a_uint16(res: bytearray, rand: Params) -> None:
def _mutate_add_subtract_from_a_uint16(
res: bytearray,
rand: Params,
_inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None,
) -> None:
if len(res) < 2:
raise common.OutOfDataError
assert isinstance(rand.pos, util.AdaptiveRange)
Expand All @@ -116,7 +152,11 @@ def _mutate_add_subtract_from_a_uint16(res: bytearray, rand: Params) -> None:
res[pos + 1] = (res[pos + 1] + v[1]) % 256


def _mutate_add_subtract_from_a_uint32(res: bytearray, rand: Params) -> None:
def _mutate_add_subtract_from_a_uint32(
res: bytearray,
rand: Params,
_inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None,
) -> None:
if len(res) < 4:
raise common.OutOfDataError
assert isinstance(rand.pos, util.AdaptiveRange)
Expand All @@ -131,7 +171,11 @@ def _mutate_add_subtract_from_a_uint32(res: bytearray, rand: Params) -> None:
res[pos + 3] = (res[pos + 3] + v[3]) % 256


def _mutate_add_subtract_from_a_uint64(res: bytearray, rand: Params) -> None:
def _mutate_add_subtract_from_a_uint64(
res: bytearray,
rand: Params,
_inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None,
) -> None:
if len(res) < 8:
raise common.OutOfDataError
assert isinstance(rand.pos, util.AdaptiveRange)
Expand All @@ -150,7 +194,11 @@ def _mutate_add_subtract_from_a_uint64(res: bytearray, rand: Params) -> None:
res[pos + 7] = (res[pos + 7] + v[7]) % 256


def _mutate_replace_a_byte_with_an_interesting_value(res: bytearray, rand: Params) -> None:
def _mutate_replace_a_byte_with_an_interesting_value(
res: bytearray,
rand: Params,
_inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None,
) -> None:
if len(res) < 1:
raise common.OutOfDataError
assert isinstance(rand.pos, util.AdaptiveRange)
Expand All @@ -159,7 +207,11 @@ def _mutate_replace_a_byte_with_an_interesting_value(res: bytearray, rand: Param
res[pos] = rand.interesting_8.sample()


def _mutate_replace_an_uint16_with_an_interesting_value(res: bytearray, rand: Params) -> None:
def _mutate_replace_an_uint16_with_an_interesting_value(
res: bytearray,
rand: Params,
_inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None,
) -> None:
if len(res) < 2:
raise common.OutOfDataError
assert isinstance(rand.pos, util.AdaptiveRange)
Expand All @@ -172,7 +224,11 @@ def _mutate_replace_an_uint16_with_an_interesting_value(res: bytearray, rand: Pa
res[pos + 1] = v[1] % 256


def _mutate_replace_an_uint32_with_an_interesting_value(res: bytearray, rand: Params) -> None:
def _mutate_replace_an_uint32_with_an_interesting_value(
res: bytearray,
rand: Params,
_inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None,
) -> None:
if len(res) < 4:
raise common.OutOfDataError
assert isinstance(rand.pos, util.AdaptiveRange)
Expand All @@ -187,7 +243,11 @@ def _mutate_replace_an_uint32_with_an_interesting_value(res: bytearray, rand: Pa
res[pos + 3] = v[3] % 256


def _mutate_replace_an_ascii_digit_with_another_digit(res: bytearray, rand: Params) -> None:
def _mutate_replace_an_ascii_digit_with_another_digit(
res: bytearray,
rand: Params,
_inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None,
) -> None:
digits_present = [i for i in range(len(res)) if ord("0") <= res[i] <= ord("9")]
if len(digits_present) < 1:
raise common.OutOfDataError
Expand All @@ -197,6 +257,24 @@ def _mutate_replace_an_ascii_digit_with_another_digit(res: bytearray, rand: Para
res[pos] = rand.digits.sample()


def _mutate_splice(
res: bytearray,
rand: Params,
inputs: Optional[util.AdaptiveChoiceBase[bytearray]] = None,
) -> None:
if len(res) < 1:
raise common.OutOfDataError
assert inputs is not None
right = inputs.sample()
if len(right) < 1:
raise common.OutOfDataError
assert isinstance(rand.left_pos, util.AdaptiveRange)
assert isinstance(rand.right_pos, util.AdaptiveRange)
res[rand.left_pos.sample(0, len(res) - 1) + 1 :] = right[
rand.right_pos.sample(0, len(right) - 1) :
]


class Mutator:
def __init__(
self,
Expand All @@ -210,7 +288,7 @@ def __init__(
self._max_modifications = max_modifications
self._modifications = util.AdaptiveRange(adaptive=adaptive)
self._mutators: util.AdaptiveChoiceBase[
tuple[Callable[[bytearray, Params], None], Params]
tuple[Callable[[bytearray, Params, util.AdaptiveChoiceBase[bytearray]], None], Params]
] = util.AdaptiveChoiceBase(
population=[
(
Expand Down Expand Up @@ -350,6 +428,13 @@ def __init__(
),
),
),
(
_mutate_splice,
Params(
left_pos=util.AdaptiveRange(adaptive=adaptive),
right_pos=util.AdaptiveRange(adaptive=adaptive),
),
),
],
)
self._last_rands: Optional[Params] = None
Expand All @@ -360,7 +445,7 @@ def _mutate(self, buf: bytearray) -> bytearray:
while nm:
modify, self._last_rands = self._mutators.sample()
try:
modify(res, self._last_rands)
modify(res, self._last_rands, self._inputs)
except common.OutOfDataError:
pass
else:
Expand Down
66 changes: 61 additions & 5 deletions tests/unit/test_mutator.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def sample(self) -> int:


def test_mutate(monkeypatch: pytest.MonkeyPatch) -> None:
def modify(data: bytearray, _: mutator.Params) -> None:
def modify(data: bytearray, _m: mutator.Params, _i: util.AdaptiveChoiceBase[bytearray]) -> None:
data.insert(0, ord("a"))
data.append(ord("b"))

Expand All @@ -51,7 +51,7 @@ def modify(data: bytearray, _: mutator.Params) -> None:
def test_mutate_unmodified(monkeypatch: pytest.MonkeyPatch) -> None:
m = mutator.Mutator()

def modify(data: bytearray, _: mutator.Params) -> None:
def modify(data: bytearray, _m: mutator.Params, _i: util.AdaptiveChoiceBase[bytearray]) -> None:
if data[0] != 0:
data[0] = 0

Expand All @@ -64,7 +64,11 @@ def modify(data: bytearray, _: mutator.Params) -> None:
def test_mutate_truncated(monkeypatch: pytest.MonkeyPatch) -> None:
m = mutator.Mutator(max_input_size=4)
with monkeypatch.context() as mp:
mp.setattr(m, "_mutators", util.AdaptiveChoiceBase([(lambda _data, _: None, None)]))
mp.setattr(
m,
"_mutators",
util.AdaptiveChoiceBase([(lambda _data, _m, _i: (None, None, None), None)]),
)
assert m._mutate(bytearray(b"0123456789")) == bytearray(b"0123")


Expand Down Expand Up @@ -611,6 +615,50 @@ def test_mutate_replace_an_ascii_digit_with_another_digit_success(
assert tmp == expected


def test_mutate_splice_fail_left() -> None:
res = bytearray()
with pytest.raises(common.OutOfDataError):
mutator._mutate_splice(res, mutator.Params())


def test_mutate_splice_fail_right() -> None:
res = bytearray(b"deadbeef")
with pytest.raises(common.OutOfDataError):
mutator._mutate_splice(
res,
mutator.Params(),
util.AdaptiveChoiceBase(population=[bytearray()]),
)


@pytest.mark.parametrize(
("left", "left_pos", "right", "right_pos", "expected"),
[
(b"0123456789", 9, b"ABCDEFGHIJ", 0, b"0123456789ABCDEFGHIJ"),
(b"0123456789", 5, b"ABCDEFGHIJ", 5, b"012345FGHIJ"),
(b"0123456789", 0, b"ABCDEFGHIJ", 9, b"0J"),
],
)
def test_mutate_splice(
left: bytes,
left_pos: int,
right: bytes,
right_pos: int,
expected: bytes,
) -> None:
tmp = bytearray(left)

mutator._mutate_splice(
tmp,
mutator.Params(
left_pos=StaticRand(left_pos),
right_pos=StaticRand(right_pos),
),
util.AdaptiveChoiceBase(population=[bytearray(right)]),
)
assert tmp == bytearray(expected)


def test_params_invalid() -> None:
p = mutator.Params()
with pytest.raises(AttributeError, match="^'Params' object has no attribute '_invalid'$"):
Expand All @@ -637,7 +685,11 @@ def test_params_update() -> None:
def test_mutator_detect_out_of_data_error(monkeypatch: pytest.MonkeyPatch) -> None:
fail = True

def raise_out_of_data(_res: bytearray, _params: mutator.Params) -> None:
def raise_out_of_data(
_res: bytearray,
_params: mutator.Params,
_i: util.AdaptiveChoiceBase[bytearray],
) -> None:
nonlocal fail
if fail:
fail = False
Expand All @@ -657,7 +709,11 @@ def raise_out_of_data(_res: bytearray, _params: mutator.Params) -> None:


def test_mutator_update(monkeypatch: pytest.MonkeyPatch) -> None:
def mutate_noop(_res: bytearray, _params: mutator.Params) -> None:
def mutate_noop(
_res: bytearray,
_params: mutator.Params,
_i: util.AdaptiveChoiceBase[bytearray],
) -> None:
pass

with monkeypatch.context() as mp:
Expand Down
Loading