Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add concept of StatelessEventContext #2999

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 29 additions & 74 deletions synapse/events/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from twisted.internet import defer

from frozendict import frozendict


class EventContext(object):
class StatelessEventContext(object):
"""
Attributes:
current_state_ids (dict[(str, str), str]):
The current state map including the current event.
(type, state_key) -> event_id

prev_state_ids (dict[(str, str), str]):
The current state map excluding the current event.
(type, state_key) -> event_id

state_group (int|None): state group id, if the state has been stored
as a state group. This is usually only None if e.g. the event is
an outlier.
Expand All @@ -40,76 +28,48 @@ class EventContext(object):

prev_group (int): Previously persisted state group. ``None`` for an
outlier.
delta_ids (dict[(str, str), str]): Delta from ``prev_group``.
(type, state_key) -> event_id. ``None`` for an outlier.

prev_state_events (?): XXX: is this ever set to anything other than
the empty list?
"""

__slots__ = [
"current_state_ids",
"prev_state_ids",
"state_group",
"rejected",
"prev_group",
"delta_ids",
"prev_state_events",
"app_service",
]

def __init__(self):
# The current state including the current event
self.current_state_ids = None
# The current state excluding the current event
self.prev_state_ids = None
self.state_group = None

self.rejected = False

# A previously persisted state group and a delta between that
# and this state.
self.prev_group = None
self.delta_ids = None

self.prev_state_events = None

self.app_service = None

def serialize(self, event):
def serialize(self):
"""Converts self to a type that can be serialized as JSON, and then
deserialized by `deserialize`

Args:
event (FrozenEvent): The event that this context relates to

Returns:
dict
"""

# We don't serialize the full state dicts, instead they get pulled out
# of the DB on the other side. However, the other side can't figure out
# the prev_state_ids, so if we're a state event we include the event
# id that we replaced in the state.
if event.is_state():
prev_state_id = self.prev_state_ids.get((event.type, event.state_key))
else:
prev_state_id = None

return {
"prev_state_id": prev_state_id,
"event_type": event.type,
"event_state_key": event.state_key if event.is_state() else None,
"state_group": self.state_group,
"rejected": self.rejected,
"prev_group": self.prev_group,
"delta_ids": _encode_state_dict(self.delta_ids),
"prev_state_events": self.prev_state_events,
"app_service_id": self.app_service.id if self.app_service else None
}

@staticmethod
@defer.inlineCallbacks
def deserialize(store, input):
"""Converts a dict that was produced by `serialize` back into a
EventContext.
Expand All @@ -121,52 +81,47 @@ def deserialize(store, input):
Returns:
EventContext
"""
context = EventContext()
context = StatelessEventContext()
context.state_group = input["state_group"]
context.rejected = input["rejected"]
context.prev_group = input["prev_group"]
context.delta_ids = _decode_state_dict(input["delta_ids"])
context.prev_state_events = input["prev_state_events"]

# We use the state_group and prev_state_id stuff to pull the
# current_state_ids out of the DB and construct prev_state_ids.
prev_state_id = input["prev_state_id"]
event_type = input["event_type"]
event_state_key = input["event_state_key"]

context.current_state_ids = yield store.get_state_ids_for_group(
context.state_group,
)
if prev_state_id and event_state_key:
context.prev_state_ids = dict(context.current_state_ids)
context.prev_state_ids[(event_type, event_state_key)] = prev_state_id
else:
context.prev_state_ids = context.current_state_ids

app_service_id = input["app_service_id"]
if app_service_id:
context.app_service = store.get_app_service_by_id(app_service_id)

defer.returnValue(context)
return context


def _encode_state_dict(state_dict):
"""Since dicts of (type, state_key) -> event_id cannot be serialized in
JSON we need to convert them to a form that can.
class EventContext(StatelessEventContext):
"""
if state_dict is None:
return None
Attributes:
current_state_ids (dict[(str, str), str]):
The current state map including the current event.
(type, state_key) -> event_id

return [
(etype, state_key, v)
for (etype, state_key), v in state_dict.iteritems()
]
prev_state_ids (dict[(str, str), str]):
The current state map excluding the current event.
(type, state_key) -> event_id

delta_ids (dict[(str, str), str]): Delta from ``prev_group``.
(type, state_key) -> event_id. ``None`` for an outlier.

def _decode_state_dict(input):
"""Decodes a state dict encoded using `_encode_state_dict` above
"""
if input is None:
return None

return frozendict({(etype, state_key,): v for etype, state_key, v in input})
__slots__ = [
"current_state_ids",
"prev_state_ids",
"delta_ids",
]

def __init__(self):
# The current state including the current event
self.current_state_ids = None
# The current state excluding the current event
self.prev_state_ids = None

self.delta_ids = None

super(EventContext, self).__init__()
14 changes: 12 additions & 2 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.snapshot import EventContext
from synapse.events.validator import EventValidator
from synapse.types import (
UserID, RoomAlias, RoomStreamToken,
Expand Down Expand Up @@ -665,7 +666,7 @@ def handle_new_client_event(
Args:
requester (Requester)
event (FrozenEvent)
context (EventContext)
context (StatelessEventContext)
ratelimit (bool)
extra_users (list(UserID)): Any extra users to notify about event
"""
Expand Down Expand Up @@ -763,9 +764,18 @@ def is_inviter_member_event(e):
e.sender == event.sender
)

# We get the current state at the event. If we have a full
# EventContext, use it, otherwise we hit the DB.
if isinstance(context, EventContext):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isintance is a bit nasty. Can we do something like get_state_ids() which is defined to return None if they are unknown?

current_state_ids = context.current_state_ids
else:
current_state_ids = yield self.store.get_state_ids_for_group(
context.state_group,
)

state_to_include_ids = [
e_id
for k, e_id in context.current_state_ids.iteritems()
for k, e_id in current_state_ids.iteritems()
if k[0] in self.hs.config.room_invite_state_types
or k == (EventTypes.Member, event.sender)
]
Expand Down
10 changes: 6 additions & 4 deletions synapse/replication/http/send_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
SynapseError, MatrixCodeMessageException, CodeMessageException,
)
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext
from synapse.events.snapshot import StatelessEventContext
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.util.async import sleep
from synapse.util.caches.response_cache import ResponseCache
Expand All @@ -44,7 +44,7 @@ def send_event_to_master(client, host, port, requester, event, context,
port (int): port on master listening for HTTP replication
requester (Requester)
event (FrozenEvent)
context (EventContext)
context (StatelessEventContext)
ratelimit (bool)
extra_users (list(UserID)): Any extra users to notify about event
"""
Expand All @@ -56,7 +56,7 @@ def send_event_to_master(client, host, port, requester, event, context,
"event": event.get_pdu_json(),
"internal_metadata": event.internal_metadata.get_dict(),
"rejected_reason": event.rejected_reason,
"context": context.serialize(event),
"context": context.serialize(),
"requester": requester.serialize(),
"ratelimit": ratelimit,
"extra_users": [u.to_string() for u in extra_users],
Expand Down Expand Up @@ -140,7 +140,9 @@ def _handle_request(self, request):
event = FrozenEvent(event_dict, internal_metadata, rejected_reason)

requester = Requester.deserialize(self.store, content["requester"])
context = yield EventContext.deserialize(self.store, content["context"])
context = yield StatelessEventContext.deserialize(
self.store, content["context"],
)

ratelimit = content["ratelimit"]
extra_users = [UserID.from_string(u) for u in content["extra_users"]]
Expand Down
73 changes: 40 additions & 33 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from twisted.internet import defer

from synapse.events import USE_FROZEN_DICTS
from synapse.events.snapshot import EventContext

from synapse.util.async import ObservableDeferred
from synapse.util.logcontext import (
Expand All @@ -42,7 +43,6 @@

# these are only included to make the type annotations work
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -504,61 +504,68 @@ def _get_new_state_after_events(self, room_id, events_context, new_latest_event_
defer.returnValue({})

# map from state_group to ((type, key) -> event_id) state map
state_groups = {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm slightly failing to follow how the new code relates to the old in this file, but it generally looks sane, and a cleanup which may well be worth shipping independently of StatelessEventContext?

missing_event_ids = []
was_updated = False
state_groups_map = {}
for ev, ctx in events_context:
if ctx.state_group is None:
# I don't think this can happen, but let's double-check
raise Exception(
"Context for new extremity event %s has no state "
"group" % (ev.event_id, ),
)

if ctx.state_group in state_groups_map:
continue

if isinstance(ctx, EventContext) and ctx.current_state_ids:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this largely used on the master? Will this do anything on a system with a split-out event creator worker? a comment here might be useful.

state_groups_map[ctx.state_group] = ctx.current_state_ids

# We need to map new_latest_event_ids to their state groups. First, lets
# check if the event is one we're persisting and then we can pull the
# state group from its context.
# Otherwise we need to pull the state group from the database.
missing_event_ids = [] # List of events we need to fetch groups for
state_groups_to_resolve = set() # State groups of new_latest_event_ids
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like it's only a subset of the state groups of new_latest_event_ids?

for event_id in new_latest_event_ids:
# First search in the list of new events we're adding,
# and then use the current state from that
# First search in the list of new events we're adding.
for ev, ctx in events_context:
if event_id == ev.event_id:
if ctx.current_state_ids is None:
raise Exception("Unknown current state")

if ctx.state_group is None:
# I don't think this can happen, but let's double-check
raise Exception(
"Context for new extremity event %s has no state "
"group" % (event_id, ),
)

# If we've already seen the state group don't bother adding
# it to the state sets again
if ctx.state_group not in state_groups:
state_groups[ctx.state_group] = ctx.current_state_ids
if ctx.delta_ids or hasattr(ev, "state_key"):
was_updated = True
state_groups_to_resolve.add(ctx.state_group)
break
else:
# If we couldn't find it, then we'll need to pull
# the state from the database
was_updated = True
missing_event_ids.append(event_id)

if not was_updated:
return

if missing_event_ids:
# Now pull out the state for any missing events from DB
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we now only get the state group ids here

event_to_groups = yield self._get_state_group_for_events(
missing_event_ids,
)
state_groups_to_resolve.update(event_to_groups.itervalues())

groups = set(event_to_groups.itervalues()) - set(state_groups.iterkeys())

if groups:
group_to_state = yield self._get_state_for_groups(groups)
state_groups.update(group_to_state)
# Now that we have calculated state_groups_to_resolve we need to get
# their state so we can resolve to a single state set.
missing_state = state_groups_to_resolve - set(state_groups_map)
if missing_state:
group_to_state = yield self._get_state_for_groups(missing_state)
state_groups_map.update(group_to_state)

if len(state_groups) == 1:
if len(state_groups_to_resolve) == 1:
# If there is only one state group, then we know what the current
# state is.
defer.returnValue(state_groups.values()[0])
defer.returnValue(state_groups_map[state_groups_to_resolve.pop()])

# Ok, we need to defer to the state handler to resolve our state sets.

def get_events(ev_ids):
return self.get_events(
ev_ids, get_prev_content=False, check_redacted=False,
)

state_groups = {
sg: state_groups_map[sg] for sg in state_groups_to_resolve
}

events_map = {ev.event_id: ev for ev, _ in events_context}
logger.debug("calling resolve_state_groups from preserve_events")
res = yield self._state_resolution_handler.resolve_state_groups(
Expand Down