Skip to content

Commit 27b5d41

Browse files
nicolasmorinirjarry
authored andcommitted
Pass extra session info to subscription callbacks that requested so
* For some niche use cases, during the execution of a subscription callback, it is useful to have access to certain session information (e.g., username and NETCONF session ID that triggered the callback, etc.) * Add "extra_info" argument to all "SysrepoSession.subscribe_xxx" methods (with a default value of False) to indicate the desire to receive extra keyword arguments during the callback call * Pass "netconf_id" (int) and "user" (str) keyword arguments to all callbacks which were subscribed with "extra_info=True" * Extend unit tests to include and cover this new feature
1 parent c195117 commit 27b5d41

8 files changed

+288
-27
lines changed

cffi/cdefs.h

+2
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ int sr_session_stop(sr_session_ctx_t *);
8585
int sr_session_switch_ds(sr_session_ctx_t *, sr_datastore_t);
8686
sr_datastore_t sr_session_get_ds(sr_session_ctx_t *);
8787
sr_conn_ctx_t *sr_session_get_connection(sr_session_ctx_t *);
88+
uint32_t sr_session_get_event_nc_id(sr_session_ctx_t *);
89+
const char *sr_session_get_event_user(sr_session_ctx_t *);
8890
int sr_get_error(sr_session_ctx_t *, const sr_error_info_t **);
8991
int sr_set_error(sr_session_ctx_t *, const char *, const char *, ...);
9092

sysrepo/session.py

+84-5
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,30 @@ def set_error(self, xpath: Optional[str], message: str):
121121
lib.sr_set_error, self.cdata, str2c(xpath), str2c("%s"), str2c(message)
122122
)
123123

124+
def get_netconf_id(self) -> int:
125+
"""
126+
It can only be called on an implicit sysrepo.Session (i.e., it can only be
127+
called from an event callback)
128+
129+
:returns: the NETCONF session ID set for the event originator sysrepo session
130+
"""
131+
if not self.is_implicit:
132+
raise SysrepoUnsupportedError(
133+
"can only report netconf id on implicit sessions"
134+
)
135+
return lib.sr_session_get_event_nc_id(self.cdata)
136+
137+
def get_user(self) -> str:
138+
"""
139+
It can only be called on an implicit sysrepo.Session (i.e., it can only be
140+
called from an event callback)
141+
142+
:returns: the effective username of the event originator sysrepo session
143+
"""
144+
if not self.is_implicit:
145+
raise SysrepoUnsupportedError("can only report user on implicit sessions")
146+
return c2str(lib.sr_session_get_event_user(self.cdata))
147+
124148
def get_ly_ctx(self) -> libyang.Context:
125149
"""
126150
:returns:
@@ -152,6 +176,13 @@ def get_ly_ctx(self) -> libyang.Context:
152176
have changed.
153177
:arg private_data:
154178
Private context opaque to sysrepo used when subscribing.
179+
:arg kwargs (optional):
180+
If the callback was registered with the argument extra_info=True (see
181+
Session.subscribe_module_change), then extra keyword arguments are passed when
182+
calling the callback:
183+
* netconf_id: the NETCONF session ID set for the event originator
184+
sysrepo session
185+
* user: the effective username of the event originator sysrepo session
155186
156187
When event is one of ("update", "change"), if the callback raises an exception, the
157188
changes will be rejected and the error will be forwarded to the client that made the
@@ -174,7 +205,8 @@ def subscribe_module_change(
174205
enabled: bool = False,
175206
private_data: Any = None,
176207
asyncio_register: bool = False,
177-
include_implicit_defaults: bool = True
208+
include_implicit_defaults: bool = True,
209+
extra_info: bool = False
178210
) -> None:
179211
"""
180212
Subscribe for changes made in the specified module.
@@ -210,6 +242,10 @@ def subscribe_module_change(
210242
monitored read file descriptors. Implies `no_thread=True`.
211243
:arg include_implicit_defaults:
212244
Include implicit default nodes in changes.
245+
:arg extra_info:
246+
When True, the given callback is called with extra keyword arguments
247+
containing extra information of the sysrepo session that gave origin to the
248+
event (see ModuleChangeCallbackType for more details)
213249
"""
214250
if self.is_implicit:
215251
raise SysrepoUnsupportedError("cannot subscribe with implicit sessions")
@@ -220,6 +256,7 @@ def subscribe_module_change(
220256
private_data,
221257
asyncio_register=asyncio_register,
222258
include_implicit_defaults=include_implicit_defaults,
259+
extra_info=extra_info,
223260
)
224261
sub_p = ffi.new("sr_subscription_ctx_t **")
225262

@@ -253,6 +290,13 @@ def subscribe_module_change(
253290
module operational data.
254291
:arg private_data:
255292
Private context opaque to sysrepo used when subscribing.
293+
:arg kwargs (optional):
294+
If the callback was registered with the argument extra_info=True (see
295+
Session.subscribe_module_change), then extra keyword arguments are passed when
296+
calling the callback:
297+
* netconf_id: the NETCONF session ID set for the event originator
298+
sysrepo session
299+
* user: the effective username of the event originator sysrepo session
256300
257301
The callback is expected to return a python dictionary containing the operational
258302
data. The dictionary should be in the libyang "dict" format. It will be parsed to a
@@ -272,7 +316,8 @@ def subscribe_oper_data_request(
272316
no_thread: bool = False,
273317
private_data: Any = None,
274318
asyncio_register: bool = False,
275-
strict: bool = False
319+
strict: bool = False,
320+
extra_info: bool = False
276321
) -> None:
277322
"""
278323
Register for providing operational data at the given xpath.
@@ -296,13 +341,21 @@ def subscribe_oper_data_request(
296341
:arg strict:
297342
Reject the whole data returned by callback if it contains elements without
298343
schema definition.
344+
:arg extra_info:
345+
When True, the given callback is called with extra keyword arguments
346+
containing extra information of the sysrepo session that gave origin to the
347+
event (see OperDataCallbackType for more details)
299348
"""
300349
if self.is_implicit:
301350
raise SysrepoUnsupportedError("cannot subscribe with implicit sessions")
302351
_check_subscription_callback(callback, self.OperDataCallbackType)
303352

304353
sub = Subscription(
305-
callback, private_data, asyncio_register=asyncio_register, strict=strict
354+
callback,
355+
private_data,
356+
asyncio_register=asyncio_register,
357+
strict=strict,
358+
extra_info=extra_info,
306359
)
307360
sub_p = ffi.new("sr_subscription_ctx_t **")
308361

@@ -369,6 +422,13 @@ def subscribe_oper_data_request(
369422
will be called with 'abort'.
370423
:arg private_data:
371424
Private context opaque to sysrepo used when subscribing.
425+
:arg kwargs (optional):
426+
If the callback was registered with the argument extra_info=True (see
427+
Session.subscribe_module_change), then extra keyword arguments are passed when
428+
calling the callback:
429+
* netconf_id: the NETCONF session ID set for the event originator
430+
sysrepo session
431+
* user: the effective username of the event originator sysrepo session
372432
373433
The callback is expected to return a python dictionary containing the RPC output
374434
data. The dictionary should be in the libyang "dict" format and must only contain
@@ -393,7 +453,8 @@ def subscribe_rpc_call(
393453
private_data: Any = None,
394454
asyncio_register: bool = False,
395455
strict: bool = False,
396-
include_implicit_defaults: bool = True
456+
include_implicit_defaults: bool = True,
457+
extra_info: bool = False
397458
) -> None:
398459
"""
399460
Subscribe for the delivery of an RPC/action.
@@ -418,6 +479,10 @@ def subscribe_rpc_call(
418479
schema definition.
419480
:arg include_implicit_defaults:
420481
Include implicit defaults into input parameters passed to callbacks.
482+
:arg extra_info:
483+
When True, the given callback is called with extra keyword arguments
484+
containing extra information of the sysrepo session that gave origin to the
485+
event (see RpcCallbackType for more details)
421486
"""
422487
if self.is_implicit:
423488
raise SysrepoUnsupportedError("cannot subscribe with implicit sessions")
@@ -429,6 +494,7 @@ def subscribe_rpc_call(
429494
asyncio_register=asyncio_register,
430495
strict=strict,
431496
include_implicit_defaults=include_implicit_defaults,
497+
extra_info=extra_info,
432498
)
433499
sub_p = ffi.new("sr_subscription_ctx_t **")
434500

@@ -480,6 +546,13 @@ def subscribe_rpc_call(
480546
Timestamp of the notification as an unsigned 32-bits integer.
481547
:arg private_data:
482548
Private context opaque to sysrepo used when subscribing.
549+
:arg kwargs (optional):
550+
If the callback was registered with the argument extra_info=True (see
551+
Session.subscribe_module_change), then extra keyword arguments are passed when
552+
calling the callback:
553+
* netconf_id: the NETCONF session ID set for the event originator
554+
sysrepo session
555+
* user: the effective username of the event originator sysrepo session
483556
"""
484557

485558
def subscribe_notification(
@@ -492,7 +565,8 @@ def subscribe_notification(
492565
stop_time: int = 0,
493566
no_thread: bool = False,
494567
asyncio_register: bool = False,
495-
private_data: Any = None
568+
private_data: Any = None,
569+
extra_info: bool = False
496570
) -> None:
497571
"""
498572
Subscribe for the delivery of a notification.
@@ -516,6 +590,10 @@ def subscribe_notification(
516590
read file descriptors. Implies no_thread=True.
517591
:arg private_data:
518592
Private context passed to the callback function, opaque to sysrepo.
593+
:arg extra_info:
594+
When True, the given callback is called with extra keyword arguments
595+
containing extra information of the sysrepo session that gave origin to the
596+
event (see RpcCallbackType for more details)
519597
"""
520598

521599
if self.is_implicit:
@@ -526,6 +604,7 @@ def subscribe_notification(
526604
callback,
527605
private_data,
528606
asyncio_register=asyncio_register,
607+
extra_info=extra_info,
529608
)
530609

531610
sub_p = ffi.new("sr_subscription_ctx_t **")

sysrepo/subscription.py

+51-8
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ def __init__(
3333
asyncio_register: bool = False,
3434
strict: bool = False,
3535
include_implicit_defaults: bool = True,
36+
extra_info: bool = False,
3637
):
3738
"""
3839
:arg callback:
@@ -49,6 +50,10 @@ def __init__(
4950
:arg include_implicit_defaults:
5051
If True, include implicit default nodes into Change objects passed to module
5152
change callbacks and into input parameters passed to RPC/action callbacks.
53+
:arg extra_info:
54+
When True, the given callback is called with extra keyword arguments
55+
containing extra information of the sysrepo session that gave origin to the
56+
event
5257
"""
5358
if is_async_func(callback) and not asyncio_register:
5459
raise ValueError(
@@ -59,6 +64,7 @@ def __init__(
5964
self.asyncio_register = asyncio_register
6065
self.strict = strict
6166
self.include_implicit_defaults = include_implicit_defaults
67+
self.extra_info = extra_info
6268
if asyncio_register:
6369
self.loop = asyncio.get_event_loop()
6470
else:
@@ -214,6 +220,13 @@ def module_change_callback(session, module, xpath, event, req_id, priv):
214220
callback = subscription.callback
215221
private_data = subscription.private_data
216222
event_name = EVENT_NAMES[event]
223+
if subscription.extra_info:
224+
extra_info = {
225+
"netconf_id": session.get_netconf_id(),
226+
"user": session.get_user(),
227+
}
228+
else:
229+
extra_info = {}
217230

218231
if is_async_func(callback):
219232
task_id = (event, req_id)
@@ -230,7 +243,7 @@ def module_change_callback(session, module, xpath, event, req_id, priv):
230243
)
231244
)
232245
task = subscription.loop.create_task(
233-
callback(event_name, req_id, changes, private_data)
246+
callback(event_name, req_id, changes, private_data, **extra_info)
234247
)
235248
task.add_done_callback(
236249
functools.partial(subscription.task_done, task_id, event_name)
@@ -257,7 +270,7 @@ def module_change_callback(session, module, xpath, event, req_id, priv):
257270
include_implicit_defaults=subscription.include_implicit_defaults,
258271
)
259272
)
260-
callback(event_name, req_id, changes, private_data)
273+
callback(event_name, req_id, changes, private_data, **extra_info)
261274

262275
return lib.SR_ERR_OK
263276

@@ -328,12 +341,21 @@ def oper_data_callback(session, module, xpath, req_xpath, req_id, parent, priv):
328341
subscription = ffi.from_handle(priv)
329342
callback = subscription.callback
330343
private_data = subscription.private_data
344+
if subscription.extra_info:
345+
extra_info = {
346+
"netconf_id": session.get_netconf_id(),
347+
"user": session.get_user(),
348+
}
349+
else:
350+
extra_info = {}
331351

332352
if is_async_func(callback):
333353
task_id = req_id
334354

335355
if task_id not in subscription.tasks:
336-
task = subscription.loop.create_task(callback(req_xpath, private_data))
356+
task = subscription.loop.create_task(
357+
callback(req_xpath, private_data, **extra_info)
358+
)
337359
task.add_done_callback(
338360
functools.partial(subscription.task_done, task_id, "oper")
339361
)
@@ -349,7 +371,7 @@ def oper_data_callback(session, module, xpath, req_xpath, req_id, parent, priv):
349371
oper_data = task.result()
350372

351373
else:
352-
oper_data = callback(req_xpath, private_data)
374+
oper_data = callback(req_xpath, private_data, **extra_info)
353375

354376
if isinstance(oper_data, dict):
355377
# convert oper_data to a libyang.DNode object
@@ -438,13 +460,20 @@ def rpc_callback(session, xpath, input_node, event, req_id, output_node, priv):
438460
).values()
439461
)
440462
)
463+
if subscription.extra_info:
464+
extra_info = {
465+
"netconf_id": session.get_netconf_id(),
466+
"user": session.get_user(),
467+
}
468+
else:
469+
extra_info = {}
441470

442471
if is_async_func(callback):
443472
task_id = (event, req_id)
444473

445474
if task_id not in subscription.tasks:
446475
task = subscription.loop.create_task(
447-
callback(xpath, input_dict, event_name, private_data)
476+
callback(xpath, input_dict, event_name, private_data, **extra_info)
448477
)
449478
task.add_done_callback(
450479
functools.partial(subscription.task_done, task_id, event_name)
@@ -461,7 +490,9 @@ def rpc_callback(session, xpath, input_node, event, req_id, output_node, priv):
461490
output_dict = task.result()
462491

463492
else:
464-
output_dict = callback(xpath, input_dict, event_name, private_data)
493+
output_dict = callback(
494+
xpath, input_dict, event_name, private_data, **extra_info
495+
)
465496

466497
if event != lib.SR_EV_RPC:
467498
# May happen when there are multiple callback registered for the
@@ -543,15 +574,27 @@ def event_notif_tree_callback(session, notif_type, notif, timestamp, priv):
543574
).values()
544575
)
545576
)
577+
if subscription.extra_info:
578+
extra_info = {
579+
"netconf_id": session.get_netconf_id(),
580+
"user": session.get_user(),
581+
}
582+
else:
583+
extra_info = {}
584+
546585
if is_async_func(callback):
547586
task = subscription.loop.create_task(
548-
callback(xpath, notif_type, notif_dict, timestamp, private_data)
587+
callback(
588+
xpath, notif_type, notif_dict, timestamp, private_data, **extra_info
589+
)
549590
)
550591
task.add_done_callback(
551592
functools.partial(subscription.task_done, None, "notif")
552593
)
553594
else:
554-
callback(xpath, notif_type, notif_dict, timestamp, private_data)
595+
callback(
596+
xpath, notif_type, notif_dict, timestamp, private_data, **extra_info
597+
)
555598

556599
except BaseException:
557600
# ATTENTION: catch all exceptions!

tests/test_session.py

+8
Original file line numberDiff line numberDiff line change
@@ -125,3 +125,11 @@ def iface(name, field):
125125
}
126126
},
127127
)
128+
129+
def test_get_netconf_id_and_get_user_are_only_available_in_implicit_session(self):
130+
with self.conn.start_session("running") as sess:
131+
with self.assertRaises(sysrepo.SysrepoUnsupportedError):
132+
sess.get_netconf_id()
133+
134+
with self.assertRaises(sysrepo.SysrepoUnsupportedError):
135+
sess.get_user()

0 commit comments

Comments
 (0)