Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: don't store duplicate events in the notice queue #1372

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
6 changes: 6 additions & 0 deletions ops/_private/harness.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,12 @@ def __init__(
self._meta,
self._model,
juju_debug_at=self._juju_context.debug_at,
# Harness tests will often have defer() usage without 'purging' the
# deferred handler with reemit(), but still expect the next emit()
# to result in a call, so we can't safely skip duplicate events.
# When behaviour matching production is required, Scenario tests
# should be used instead.
skip_duplicate_events=False,
)

warnings.warn(
Expand Down
63 changes: 57 additions & 6 deletions ops/framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,7 @@ def __init__(
model: 'Model',
event_name: Optional[str] = None,
juju_debug_at: Optional[Set[str]] = None,
skip_duplicate_events: bool = True,
):
super().__init__(self, None)

Expand All @@ -624,6 +625,7 @@ def __init__(
self._event_name = event_name
self.meta = meta
self.model = model
self.skip_duplicate_events = skip_duplicate_events
# [(observer_path, method_name, parent_path, event_key)]
self._observers: _ObserverPath = []
# {observer_path: observing Object}
Expand Down Expand Up @@ -719,15 +721,15 @@ def register_type(
self._type_registry[parent_path, kind_] = cls
self._type_known.add(cls)

def save_snapshot(self, value: Union['StoredStateData', 'EventBase']):
"""Save a persistent snapshot of the provided value."""
def _validate_snapshot_data(
self, value: Union['StoredStateData', 'EventBase'], data: Dict[str, Any]
):
if type(value) not in self._type_known:
raise RuntimeError(
f'cannot save {type(value).__name__} values before registering that type'
)
data = value.snapshot()

# Use marshal as a validator, enforcing the use of simple types, as we later the
# Use marshal as a validator, enforcing the use of simple types, as later the
# information is really pickled, which is too error-prone for future evolution of the
# stored data (e.g. if the developer stores a custom object and later changes its
# class name; when unpickling the original class will not be there and event
Expand All @@ -738,6 +740,10 @@ def save_snapshot(self, value: Union['StoredStateData', 'EventBase']):
msg = 'unable to save the data for {}, it must contain only simple types: {!r}'
raise ValueError(msg.format(value.__class__.__name__, data)) from None

def save_snapshot(self, value: Union['StoredStateData', 'EventBase']):
"""Save a persistent snapshot of the provided value."""
data = value.snapshot()
self._validate_snapshot_data(value, data)
self._storage.save_snapshot(value.handle.path, data)

def load_snapshot(self, handle: Handle) -> Serializable:
Expand Down Expand Up @@ -831,6 +837,35 @@ def _next_event_key(self) -> str:
self._stored['event_count'] += 1
return str(self._stored['event_count'])

def _event_is_in_storage(
    self, observer_path: str, method_name: str, event_path: str, event_data: Dict[str, Any]
) -> bool:
    """Check if there is already a notice with the same snapshot in the storage.

    A stored notice counts as a duplicate when it targets the same observer
    and method, refers to the same event source (the event path with its
    trailing ``[id]`` removed), and its saved snapshot equals ``event_data``.

    Args:
        observer_path: Path of the observer the new notice would be for.
        method_name: Name of the observer method that would be called.
        event_path: Path of the event being emitted, including its ``[id]``.
        event_data: Snapshot data of the event being emitted.

    Returns:
        ``True`` if an equivalent notice is already queued, otherwise ``False``.
    """
    # Only the trailing [id] distinguishes two emissions of the same event.
    # Parent objects may have bracketed keys of their own earlier in the
    # path (for example 'Parent[key]/kind[id]'), so strip from the *last*
    # '[' only. If a path somehow has no '[', rsplit leaves it unchanged and
    # the comparison is simply on the whole path.
    event_source = event_path.rsplit('[', 1)[0]

    # Check all the notices to see if there is one that is the same other
    # than the event ID.
    for (
        existing_event_path,
        existing_observer_path,
        existing_method_name,
    ) in self._storage.notices():
        if (
            existing_observer_path != observer_path
            or existing_method_name != method_name
            or existing_event_path.rsplit('[', 1)[0] != event_source
        ):
            continue
        # Check if the snapshot for this notice is the same.
        try:
            existing_event_data = self._storage.load_snapshot(existing_event_path)
        except NoSnapshotError:
            # A notice with no saved snapshot is compared as empty data.
            existing_event_data = {}
        if event_data == existing_event_data:
            return True
    return False

def _emit(self, event: EventBase):
"""See BoundEvent.emit for the public way to call this."""
saved = False
Expand All @@ -839,17 +874,33 @@ def _emit(self, event: EventBase):
parent = event.handle.parent
assert isinstance(parent, Handle), 'event handle must have a parent'
parent_path = parent.path
this_event_data = event.snapshot()
self._validate_snapshot_data(event, this_event_data)
# TODO Track observers by (parent_path, event_kind) rather than as a list of
# all observers. Avoiding linear search through all observers for every event
# all observers. Avoiding linear search through all observers for every event
for observer_path, method_name, _parent_path, _event_kind in self._observers:
if _parent_path != parent_path:
continue
if _event_kind and _event_kind != event_kind:
continue
if self.skip_duplicate_events and self._event_is_in_storage(
observer_path, method_name, event_path, this_event_data
):
logger.info(
'Skipping notice (%s/%s/%s) - already in the queue.',
event_path,
observer_path,
method_name,
)
# We don't need to save a new notice and snapshot, but we do
# want the event to run, because it has been saved previously
# and not completed.
saved = True
continue
if not saved:
# Save the event for all known observers before the first notification
# takes place, so that either everyone interested sees it, or nobody does.
self.save_snapshot(event)
self._storage.save_snapshot(event.handle.path, this_event_data)
saved = True
# Again, only commit this after all notices are saved.
self._storage.save_notice(event_path, observer_path, method_name)
Expand Down
127 changes: 121 additions & 6 deletions test/test_framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,15 @@ def test_defer_and_reemit(self, request: pytest.FixtureRequest):
framework = create_framework(request)

class MyEvent(ops.EventBase):
pass
def __init__(self, handle: ops.Handle, data: str):
super().__init__(handle)
self.data: str = data

def restore(self, snapshot: typing.Dict[str, typing.Any]):
self.data = typing.cast(str, snapshot['data'])

def snapshot(self) -> typing.Dict[str, typing.Any]:
return {'data': self.data}

class MyNotifier1(ops.Object):
a = ops.EventSource(MyEvent)
Expand Down Expand Up @@ -404,18 +412,18 @@ def on_any(self, event: ops.EventBase):
framework.observe(pub1.b, obs2.on_any)
framework.observe(pub2.c, obs2.on_any)

pub1.a.emit()
pub1.b.emit()
pub2.c.emit()
pub1.a.emit('a')
pub1.b.emit('b')
pub2.c.emit('c')

# Events remain stored because they were deferred.
# Events remain stored because they were deferred (and distinct).
ev_a_handle = ops.Handle(pub1, 'a', '1')
framework.load_snapshot(ev_a_handle)
ev_b_handle = ops.Handle(pub1, 'b', '2')
framework.load_snapshot(ev_b_handle)
ev_c_handle = ops.Handle(pub2, 'c', '3')
framework.load_snapshot(ev_c_handle)
# make sure the objects are gone before we reemit them
# Make sure the objects are gone before we reemit them.
gc.collect()

framework.reemit()
Expand All @@ -439,6 +447,113 @@ def on_any(self, event: ops.EventBase):
pytest.raises(NoSnapshotError, framework.load_snapshot, ev_b_handle)
pytest.raises(NoSnapshotError, framework.load_snapshot, ev_c_handle)

def test_repeated_defer(self, request: pytest.FixtureRequest):
framework = create_framework(request)

class MyEvent(ops.EventBase):
data: typing.Optional[str] = None

class MyDataEvent(MyEvent):
def __init__(self, handle: ops.Handle, data: str):
super().__init__(handle)
self.data: typing.Optional[str] = data

def restore(self, snapshot: typing.Dict[str, typing.Any]):
self.data = typing.cast(typing.Optional[str], snapshot['data'])

def snapshot(self) -> typing.Dict[str, typing.Any]:
return {'data': self.data}

class ReleaseEvent(ops.EventBase):
pass

class MyNotifier(ops.Object):
n = ops.EventSource(MyEvent)
d = ops.EventSource(MyDataEvent)
r = ops.EventSource(ReleaseEvent)

class MyObserver(ops.Object):
def __init__(self, parent: ops.Object, key: str):
super().__init__(parent, key)
self.defer_all = True

def stop_deferring(self, _: MyEvent):
self.defer_all = False

def on_any(self, event: MyEvent):
if self.defer_all:
event.defer()

pub = MyNotifier(framework, 'n')
obs1 = MyObserver(framework, '1')
obs2 = MyObserver(framework, '2')

framework.observe(pub.n, obs1.on_any)
framework.observe(pub.n, obs2.on_any)
framework.observe(pub.d, obs1.on_any)
framework.observe(pub.d, obs2.on_any)
framework.observe(pub.r, obs1.stop_deferring)

# Emit an event, which will be deferred.
pub.d.emit('foo')
notices = tuple(framework._storage.notices())
assert len(notices) == 2 # One per observer.
assert framework._storage.load_snapshot(notices[0][0]) == {'data': 'foo'}

# Emit the same event, and we'll still have just a single notice per observer.
pub.d.emit('foo')
assert len(tuple(framework._storage.notices())) == 2

# Emit the same event kind but with a different snapshot, and we'll get a new notice.
pub.d.emit('bar')
notices = tuple(framework._storage.notices())
assert len(notices) == 4
assert framework._storage.load_snapshot(notices[2][0]) == {'data': 'bar'}

# Emit a totally different event, and we'll get a new notice.
pub.n.emit()
notices = tuple(framework._storage.notices())
assert len(notices) == 6
assert framework._storage.load_snapshot(notices[2][0]) == {'data': 'bar'}
assert framework._storage.load_snapshot(notices[4][0]) == {}

# Even though these events are far back in the queue, since they're
# duplicates, they will get skipped.
pub.d.emit('foo')
pub.d.emit('bar')
pub.n.emit()
assert len(tuple(framework._storage.notices())) == 6

def notices_for_observer(n: int):
return [
notice for notice in framework._storage.notices() if notice[1].endswith(f'[{n}]')
]

# Stop deferring on the first observer, and all those events will be
# completed and the notices removed, while the second observer will
# still have them queued.
pub.r.emit()
assert len(tuple(framework._storage.notices())) == 6
pub.n.emit()
framework.reemit()
assert len(notices_for_observer(1)) == 0
assert len(notices_for_observer(2)) == 3

# Without the defer active, the first observer always ends up with an
# empty queue, while the second observer's queue continues to skip
# duplicates and add new events.
pub.d.emit('foo')
pub.d.emit('foo')
pub.d.emit('bar')
pub.n.emit()
pub.d.emit('foo')
pub.d.emit('bar')
pub.n.emit()
pub.d.emit('baz')
framework.reemit()
assert len(notices_for_observer(1)) == 0
assert len(notices_for_observer(2)) == 4
Comment on lines +497 to +555
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please consider refactoring the setup and splitting this flow into separate tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem here is that what this is testing is the behaviour over multiple types of events, some deferred, some not, some running previously run deferred events. It's all about checking whether the state is correct over a long sequence of changes.

I could split it - for example, each comment could be the start of a new test. But the problem there is that each test would either have to do everything in the previous test to get the database into the correct state (which means it'd just be the same plus multiple shorter versions as well) or I'd need to manually set up the database in all but the first test, to have the expected data. That seems worse and more fragile to me.

I could split it into two tests, one for each type of observer, but part of the test is making sure that when there are two observers, they are properly handled separately, and it feels like splitting the test would make it easier to break that.

I definitely agree that the test is very long, and I don't like that.

Would it help if there were some extra helper functions so that something like:

pub.d.emit('foo')
pub.d.emit('foo')
pub.d.emit('foo')
pub.n.emit()

was written something like this?

emit([(d, 'foo')], repeat=3)
emit([(n)])

It would reduce the number of lines, but my feeling is that it would end up less understandable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not what I was trying to point out.

How about something like below?

def test_foo():
        pub.d.emit('foo')
        notices = tuple(framework._storage.notices())
        assert len(notices) == 2  # One per observer.
        assert framework._storage.load_snapshot(notices[0][0]) == {'data': 'foo'}


def test_bar():
        some.reused.setup()
        # Emit the same event, and we'll still just have the single notice.
        pub.d.emit('foo')
        assert len(tuple(framework._storage.notices())) == 2


def test_custom_event_data(self, request: pytest.FixtureRequest):
framework = create_framework(request)

Expand Down
Loading