Skip to content

Commit

Permalink
RLink fixes and enhancements
Browse files Browse the repository at this point in the history
  • Loading branch information
DZabavchik committed Aug 28, 2023
1 parent 0089c1e commit 895c4e7
Show file tree
Hide file tree
Showing 10 changed files with 596 additions and 178 deletions.
44 changes: 32 additions & 12 deletions crossbar/router/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def detach(self, session):

for subscription in self._session_to_subscriptions[session]:

was_subscribed, was_last_subscriber = self._subscription_map.drop_observer(session, subscription)
was_subscribed, was_last_subscriber, was_last_local_subscriber = self._subscription_map.drop_observer(session, subscription)
was_deleted = False

# delete it if there are no subscribers and no retained events
Expand All @@ -133,6 +133,10 @@ def detach(self, session):
was_deleted = True
self._subscription_map.delete_observation(subscription)

is_rlink_session = (session._authrole == 'rlink')

exclude_authid = session._authid

# publish WAMP meta events, if we have a service session, but
# not for the meta API itself!
#
Expand All @@ -143,10 +147,16 @@ def detach(self, session):
def _publish(subscription):
service_session = self._router._realm.session

# FIXME: what about exclude_authid as colleced from forward_for? like we do elsewhere in this file!
options = types.PublishOptions(correlation_id=None,
correlation_is_anchor=True,
correlation_is_last=False)
# FIXME: what about exclude_authid as collected from forward_for? like we do elsewhere in this file!
options = types.PublishOptions(
correlation_id=None,
correlation_is_anchor=True,
correlation_is_last=False,
exclude_authid=exclude_authid,
exclude_authrole=['rlink'] if is_rlink_session else None,
eligible_authrole=['rlink'] if was_last_local_subscriber and
not was_last_subscriber else None,
)

if was_subscribed:
service_session.publish(
Expand All @@ -156,7 +166,7 @@ def _publish(subscription):
options=options,
)

if was_deleted:
if was_deleted or was_last_local_subscriber:
options.correlation_is_last = True
service_session.publish(
'wamp.subscription.on_delete',
Expand Down Expand Up @@ -830,12 +840,15 @@ def on_authorize_success(authorization):

# ok, session authorized to subscribe. now get the subscription
#
subscription, was_already_subscribed, is_first_subscriber = self._subscription_map.add_observer(
session, subscribe.topic, subscribe.match, extra=SubscriptionExtra())
subscription, was_already_subscribed, is_first_subscriber, is_first_local_subscriber \
= self._subscription_map.add_observer(session, subscribe.topic,
subscribe.match, extra=SubscriptionExtra())

if not was_already_subscribed:
self._session_to_subscriptions[session].add(subscription)

is_rlink_session = (session._authrole == 'rlink')

# publish WAMP meta events, if we have a service session, but
# not for the meta API itself!
#
Expand All @@ -853,17 +866,20 @@ def on_authorize_success(authorization):
def _publish():
service_session = self._router._realm.session

if exclude_authid or self._router.is_traced:
if exclude_authid or self._router.is_traced or \
is_first_local_subscriber or is_rlink_session:
options = types.PublishOptions(
correlation_id=subscribe.correlation_id,
correlation_is_anchor=False,
correlation_is_last=False,
exclude_authid=exclude_authid,
exclude_authrole=['rlink'] if is_rlink_session else None,
eligible_authrole=['rlink'] if is_first_local_subscriber and not is_first_subscriber else None,
)
else:
options = None

if is_first_subscriber:
if is_first_subscriber or is_first_local_subscriber:
subscription_details = {
'id': subscription.id,
'created': subscription.created,
Expand Down Expand Up @@ -1037,7 +1053,7 @@ def _unsubscribe(self, subscription, session, unsubscribe=None):

# drop session from subscription observers
#
was_subscribed, was_last_subscriber = self._subscription_map.drop_observer(session, subscription)
was_subscribed, was_last_subscriber, was_last_local_subscriber = self._subscription_map.drop_observer(session, subscription)
was_deleted = False

if was_subscribed and was_last_subscriber and not subscription.extra.retained_events:
Expand All @@ -1049,6 +1065,8 @@ def _unsubscribe(self, subscription, session, unsubscribe=None):
if was_subscribed:
self._session_to_subscriptions[session].discard(subscription)

is_rlink_session = (session._authrole == 'rlink')

# publish WAMP meta events, if we have a service session, but
# not for the meta API itself!
#
Expand All @@ -1072,6 +1090,8 @@ def _publish():
correlation_is_anchor=False,
correlation_is_last=False,
exclude_authid=exclude_authid,
exclude_authrole=['rlink'] if is_rlink_session else None,
eligible_authrole=['rlink'] if was_last_local_subscriber and not was_last_subscriber else None,
)
else:
options = None
Expand All @@ -1084,7 +1104,7 @@ def _publish():
options=options,
)

if was_deleted:
if was_deleted or was_last_local_subscriber:
if options:
options.correlation_is_last = True

Expand Down
63 changes: 37 additions & 26 deletions crossbar/router/dealer.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def detach(self, session):
invoke.caller._transport.send(reply)

for registration in self._session_to_registrations[session]:
was_registered, was_last_callee = self._registration_map.drop_observer(session, registration)
was_registered, was_last_callee, was_last_local_callee = self._registration_map.drop_observer(session, registration)

if was_registered and was_last_callee:
self._registration_map.delete_observation(registration)
Expand All @@ -250,7 +250,12 @@ def _publish(registration):
service_session = self._router._realm.session

# FIXME: what about exclude_authid as collected from forward_for? like we do elsewhere in this file!
options = types.PublishOptions(correlation_id=None)
options = types.PublishOptions(
correlation_id=None,
exclude_authrole=['rlink'] if is_rlink_session else None,
eligible_authrole=['rlink'] if was_last_local_callee and
not was_last_callee else None,
)

if was_registered:
service_session.publish(
Expand All @@ -260,14 +265,13 @@ def _publish(registration):
options=options,
)

if was_last_callee:
if not is_rlink_session:
service_session.publish(
'wamp.registration.on_delete',
session._session_id,
registration.id,
options=options,
)
if was_last_callee or was_last_local_callee:
service_session.publish(
'wamp.registration.on_delete',
session._session_id,
registration.id,
options=options,
)

# we postpone actual sending of meta events until we return to this client session
self._reactor.callLater(0, _publish, registration)
Expand All @@ -281,7 +285,7 @@ def processRegister(self, session, register):
"""
Implements :func:`crossbar.router.interfaces.IDealer.processRegister`
"""
# check topic URI: for SUBSCRIBE, must be valid URI (either strict or loose), and all
# check topic URI: for REGISTER, must be valid URI (either strict or loose), and all
# URI components must be non-empty other than for wildcard subscriptions
#
is_rlink_session = (session._authrole == "rlink")
Expand Down Expand Up @@ -437,6 +441,7 @@ def on_authorize_success(authorization):
if authorization['allow']:
registration = self._registration_map.get_observation(register.procedure, register.match)
if register.force_reregister and registration:
# TODO handle Unregistered in RLink
for obs in registration.observers:
self._registration_map.drop_observer(obs, registration)
kicked = message.Unregistered(
Expand All @@ -455,7 +460,7 @@ def on_authorize_success(authorization):
#
registration_extra = RegistrationExtra(register.invoke)
registration_callee_extra = RegistrationCalleeExtra(register.concurrency)
registration, was_already_registered, is_first_callee = self._registration_map.add_observer(
registration, was_already_registered, is_first_callee, is_first_local_callee = self._registration_map.add_observer(
session, register.procedure, register.match, registration_extra, registration_callee_extra)

if not was_already_registered:
Expand Down Expand Up @@ -489,29 +494,34 @@ def on_authorize_success(authorization):
def _publish():
service_session = self._router._realm.session

if exclude_authid or self._router.is_traced:
if exclude_authid or self._router.is_traced or \
is_rlink_session or is_first_local_callee:
options = types.PublishOptions(
correlation_id=register.correlation_id,
correlation_is_anchor=False,
correlation_is_last=False,
exclude_authid=exclude_authid,
exclude_authrole=['rlink'] if is_rlink_session else None,
eligible_authrole=['rlink'] if is_first_local_callee and
not is_first_callee else None,

)
else:
options = None

if is_first_callee:
if is_first_callee or is_first_local_callee:
registration_details = {
'id': registration.id,
'created': registration.created,
'uri': registration.uri,
'match': registration.match,
'invoke': registration.extra.invoke,
'forced_reregister': register.force_reregister
}
if not is_rlink_session:
service_session.publish('wamp.registration.on_create',
session._session_id,
registration_details,
options=options)
service_session.publish('wamp.registration.on_create',
session._session_id,
registration_details,
options=options)

if not was_already_registered:
if options:
Expand Down Expand Up @@ -612,7 +622,7 @@ def _unregister(self, registration, session, unregister=None):

# drop session from registration observers
#
was_registered, was_last_callee = self._registration_map.drop_observer(session, registration)
was_registered, was_last_callee, was_last_local_callee = self._registration_map.drop_observer(session, registration)
was_deleted = False
is_rlink_session = (session._authrole == "rlink")

Expand Down Expand Up @@ -652,6 +662,8 @@ def _publish():
correlation_is_anchor=False,
correlation_is_last=False,
exclude_authid=exclude_authid,
exclude_authrole=['rlink'] if is_rlink_session else None,
eligible_authrole=['rlink'] if was_last_local_callee and not was_last_callee else None,
)
else:
options = None
Expand All @@ -662,15 +674,14 @@ def _publish():
registration.id,
options=options)

if was_deleted:
if was_deleted or was_last_local_callee:
if options:
options.correlation_is_last = True

if not is_rlink_session:
service_session.publish('wamp.registration.on_delete',
session._session_id,
registration.id,
options=options)
service_session.publish('wamp.registration.on_delete',
session._session_id,
registration.id,
options=options)

# we postpone actual sending of meta events until we return to this client session
self._reactor.callLater(0, _publish)
Expand Down
22 changes: 19 additions & 3 deletions crossbar/router/observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ def add_observer(self, observer, uri, match="exact", extra=None, observer_extra=
raise Exception("'uri' should be unicode, not {}".format(type(uri).__name__))

is_first_observer = False
is_first_local_observer = False

if match == "exact":

Expand Down Expand Up @@ -218,6 +219,13 @@ def add_observer(self, observer, uri, match="exact", extra=None, observer_extra=
else:
raise Exception("invalid match strategy '{}'".format(match))

is_rlink_observer = observer.authrole == 'rlink'
if is_first_observer:
is_first_local_observer = not is_rlink_observer
else:
is_first_local_observer = not is_rlink_observer and \
next(filter(lambda o: o.authrole != 'rlink', observation.observers), None) is None

# add observer if not already in observation
#
if observer not in observation.observers:
Expand All @@ -232,7 +240,7 @@ def add_observer(self, observer, uri, match="exact", extra=None, observer_extra=
else:
was_already_observed = True

return observation, was_already_observed, is_first_observer
return observation, was_already_observed, is_first_observer, is_first_local_observer

def get_observation(self, uri, match="exact"):
"""
Expand Down Expand Up @@ -383,10 +391,13 @@ def drop_observer(self, observer, observation):
:rtype: tuple
"""
was_last_observer = False
was_last_local_observer = False

if observer in observation.observers:
was_observed = True

is_rlink_observer = observer.authrole == 'rlink'

# remove observer from observation
#
observation.observers.discard(observer)
Expand All @@ -400,12 +411,17 @@ def drop_observer(self, observer, observation):
#
if not observation.observers:
was_last_observer = True

was_last_local_observer = True
else:
was_last_observer = False
was_last_local_observer = not is_rlink_observer and \
next(filter(lambda o: o.authrole != 'rlink', observation.observers),
None) is None
else:
# observer wasn't on this observation
was_observed = False

return was_observed, was_last_observer
return was_observed, was_last_observer, was_last_local_observer

def delete_observation(self, observation):
"""
Expand Down
7 changes: 4 additions & 3 deletions crossbar/router/realmstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,10 @@ def attach_subscription_map(self, subscription_map: UriObservationMap):
for sub in self._config.get('event-history', []):
uri = sub['uri']
match = sub.get('match', 'exact')
observation, was_already_observed, was_first_observer = subscription_map.add_observer(self,
uri=uri,
match=match)
observation, was_already_observed, was_first_observer, was_first_local_observer = \
subscription_map.add_observer(self,
uri=uri,
match=match)
subscription_id = observation.id

# for in-memory history, we just use a double-ended queue
Expand Down
Loading

0 comments on commit 895c4e7

Please sign in to comment.