|
8 | 8 | import pytest
|
9 | 9 |
|
10 | 10 | from c8y_api import CumulocityApi
|
11 |
| -from c8y_api.model import Device, ManagedObject, Subscription |
| 11 | +from c8y_api.model import Device, ManagedObject, Subscription, Measurement, Value, Event, Alarm, Operation |
12 | 12 | from c8y_tk.notification2 import AsyncListener, Listener
|
| 13 | +from tests.utils import assert_in_any |
13 | 14 |
|
14 | 15 | from util.testing_util import RandomNameGenerator
|
15 | 16 |
|
@@ -47,6 +48,87 @@ def build():
|
47 | 48 | return build
|
48 | 49 |
|
49 | 50 |
|
| 51 | +@pytest.mark.parametrize("api_filters, expected", [ |
| 52 | + ('*', 'M,E,EwC,A,AwC,MO'), |
| 53 | + ('M', 'M'), |
| 54 | + ('E', 'E'), |
| 55 | + ('EwC', 'E,EwC'), |
| 56 | + ('A', 'A'), |
| 57 | + ('AwC', 'A,AwC'), |
| 58 | + ('MO', 'MO'), |
| 59 | +], ids=[ |
| 60 | + "*", |
| 61 | + 'measurements', |
| 62 | + 'events', |
| 63 | + 'events+children', |
| 64 | + 'alarms', |
| 65 | + 'alarms+children', |
| 66 | + 'managedObjects', |
| 67 | +]) |
| 68 | +def test_api_filters(live_c8y: CumulocityApi, sample_object, api_filters, expected): |
| 69 | + """Verify that API filters work as expected. |
| 70 | +
|
| 71 | + This test creates a subscription with selected API filters and performs |
| 72 | + a couple of corresponding changes. It then matches the received |
| 73 | + notifications against expectations. |
| 74 | + """ |
| 75 | + # TODO: Add Operation |
| 76 | + apis = { |
| 77 | + '*': '*', |
| 78 | + 'M': 'measurements', |
| 79 | + 'E': 'events', |
| 80 | + 'A': 'alarms', |
| 81 | + 'EwC': 'eventsWithChildren', |
| 82 | + 'AwC': 'alarmsWithChildren', |
| 83 | + 'O': 'operations', |
| 84 | + 'MO': 'managedobjects', |
| 85 | + } |
| 86 | + expected = [apis[x] for x in expected.split(',')] |
| 87 | + api_filters = [apis[x] for x in api_filters.split(',')] |
| 88 | + |
| 89 | + mo = sample_object |
| 90 | + mo['c8y_IsDevice'] = {} |
| 91 | + mo.update() |
| 92 | + sub = Subscription( |
| 93 | + live_c8y, |
| 94 | + name=f'{mo.name.replace("_", "")}Subscription', |
| 95 | + context=Subscription.Context.MANAGED_OBJECT, |
| 96 | + api_filter=api_filters, |
| 97 | + source_id=mo.id, |
| 98 | + ).create() |
| 99 | + |
| 100 | + notifications = queue.Queue() |
| 101 | + def receive_notification(m:Listener.Message): |
| 102 | + notifications.put(m) |
| 103 | + m.ack() |
| 104 | + |
| 105 | + # (1) Create listener and start listening |
| 106 | + listener = Listener(live_c8y, subscription_name=sub.name) |
| 107 | + listener_thread = threading.Thread(target=listener.listen, args=[receive_notification]) |
| 108 | + listener_thread.start() |
| 109 | + try: |
| 110 | + time.sleep(3) # ensure creation |
| 111 | + |
| 112 | + # (2) apply updates |
| 113 | + m_id = Measurement(live_c8y, source=mo.id, type="c8y_TestMeasurement", metric=Value(1, '')).create() |
| 114 | + e_id = Event(live_c8y, source=mo.id, type="c8y_TestEvent", time='now', text='text').create() |
| 115 | + a_id = Alarm(live_c8y, source=mo.id, type="c8y_TestAlarm", time='now', text='text', severity=Alarm.Severity.WARNING).create() |
| 116 | + # o_id = Operation(live_c8y, device_id=mo.id, c8y_Operation={}).create() |
| 117 | + mo.apply({'some_tag': {}}) |
| 118 | + |
| 119 | + time.sleep(1) |
| 120 | + ns = list(notifications.queue) |
| 121 | + # collect message types from source URL |
| 122 | + types = {n.source.split('/')[2] for n in ns} |
| 123 | + for e in expected: |
| 124 | + assert_in_any(e, *types) |
| 125 | + |
| 126 | + finally: |
| 127 | + # (99) cleanup |
| 128 | + listener.close() |
| 129 | + listener_thread.join() |
| 130 | + |
| 131 | + |
50 | 132 | def test_object_update_and_deletion(live_c8y: CumulocityApi, sample_object):
|
51 | 133 | """Verify that we can subscribe to managed object changes and they are received.
|
52 | 134 |
|
|
0 commit comments