Skip to content

Commit

Permalink
Fixes #434 - Added test for the dropping of a single destination in a…
Browse files Browse the repository at this point in the history
…ddress watches. Fixed the bug found by this test.
  • Loading branch information
ted-ross committed May 5, 2022
1 parent 1ccd148 commit 8794208
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 2 deletions.
5 changes: 3 additions & 2 deletions src/router_core/modules/edge_router/addr_proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t *addr
break;

case QDRC_EVENT_ADDR_NO_LONGER_SOURCE :
del_outlink(ap, addr);
if (!addr->watch)
del_outlink(ap, addr);
break;

case QDRC_EVENT_ADDR_TWO_SOURCE :
Expand All @@ -359,7 +360,7 @@ static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t *addr

case QDRC_EVENT_ADDR_ONE_SOURCE :
link_ref = DEQ_HEAD(addr->inlinks);
if (!link_ref || link_ref->link->conn == ap->edge_conn)
if ((!link_ref || link_ref->link->conn == ap->edge_conn) && !addr->watch)
del_outlink(ap, addr);
break;

Expand Down
165 changes: 165 additions & 0 deletions tests/system_tests_address_watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,51 @@ def test_21_dynamic_edge_edge_local(self):
test.run()
self.assertIsNone(test.error)

def test_22_drop_local_same_interior(self):
test = DropOneAddressWatchTest('test_22', self.routers[0], [self.routers[0], self.routers[0]], 2, 0, 1, 0)
test.run()
self.assertIsNone(test.error)

def test_23_drop_local_interior_interior(self):
test = DropOneAddressWatchTest('test_23', self.routers[0], [self.routers[0], self.routers[1]], 1, 1, 0, 1)
test.run()
self.assertIsNone(test.error)

def test_24_drop_remote_interior_interior(self):
test = DropOneAddressWatchTest('test_24', self.routers[0], [self.routers[1], self.routers[0]], 1, 1, 1, 0)
test.run()
self.assertIsNone(test.error)

def test_25_drop_local_edge_interior(self):
test = DropOneAddressWatchTest('test_25', self.routers[2], [self.routers[2], self.routers[0]], 2, 0, 1, 0)
test.run()
self.assertIsNone(test.error)

def test_26_drop_remote_edge_interior(self):
test = DropOneAddressWatchTest('test_26', self.routers[2], [self.routers[0], self.routers[2]], 2, 0, 1, 0)
test.run()
self.assertIsNone(test.error)

def test_27_drop_local_edge_far_interior(self):
test = DropOneAddressWatchTest('test_27', self.routers[2], [self.routers[2], self.routers[1]], 2, 0, 1, 0)
test.run()
self.assertIsNone(test.error)

def test_28_drop_remote_edge_far_interior(self):
test = DropOneAddressWatchTest('test_28', self.routers[2], [self.routers[1], self.routers[2]], 2, 0, 1, 0)
test.run()
self.assertIsNone(test.error)

def test_29_drop_local_edge_edge(self):
test = DropOneAddressWatchTest('test_29', self.routers[2], [self.routers[2], self.routers[3]], 2, 0, 1, 0)
test.run()
self.assertIsNone(test.error)

def test_30_drop_remote_edge_edge(self):
test = DropOneAddressWatchTest('test_30', self.routers[2], [self.routers[3], self.routers[2]], 2, 0, 1, 0)
test.run()
self.assertIsNone(test.error)


class AddressWatchTest(MessagingHandler):
def __init__(self, host_a, host_b, index):
Expand Down Expand Up @@ -228,6 +273,17 @@ def run(self):


class DynamicAddressWatchTest(MessagingHandler):
'''
Set up a watch on the watch_host for the address.
Ensure that a watch callback comes back with 0/0 destinations
Attach consumers for the address at all of the dest_hosts
Ensure that the local and remote counts for the address reach expected levels
Detach the consumers
Ensure that the local and remote counts don't exceed the expected levels
Ensure that the local and remote counts return to 0/0
Unwatch the address
Upon acceptance of the unwatch message delivery, pass the test
'''
def __init__(self, address, watch_host, dest_hosts, expected_local, expected_remote):
super(DynamicAddressWatchTest, self).__init__()
self.address = address
Expand Down Expand Up @@ -322,5 +378,114 @@ def run(self):
Container(self).run()


class DropOneAddressWatchTest(MessagingHandler):
'''
Set up a watch on the watch_host for the address.
Ensure that a watch callback comes back with 0/0 destinations
Attach consumers for the address at all of the dest_hosts
Ensure that the local and remote counts for the address reach expected peak levels
Detach the first consumer in the consumer list
Ensure that the local and remote counts don't exceed the expected levels
Ensure that the local and remote counts return to expected final levels
Unwatch the address
Upon acceptance of the unwatch message delivery, pass the test
'''
def __init__(self, address, watch_host, dest_hosts, peak_local, peak_remote, final_local, final_remote):
super(DropOneAddressWatchTest, self).__init__()
self.address = address
self.watch_host = watch_host
self.dest_hosts = dest_hosts
self.peak_local = peak_local
self.peak_remote = peak_remote
self.final_local = final_local
self.final_remote = final_remote

self.conn_watch = None
self.conn_dests = []
self.error = None
self.sender = None
self.receiver = None
self.dest_receivers = []
self.phase = "START"

def fail(self, error=None):
self.error = error
if self.conn_watch:
self.conn_watch.close()
for conn in self.conn_dests:
conn.close()
self.timer.cancel()

def timeout(self):
self.fail("Timeout Expired - Phase: %s" % self.phase)

def setup_dests(self):
for conn in self.conn_dests:
self.dest_receivers.append(self.container.create_receiver(conn, self.address))

def reduce_dests(self):
rx = self.dest_receivers.pop(0)
rx.close()

def on_start(self, event):
self.container = event.container
self.timer = event.reactor.schedule(10.0, TestTimeout(self))
self.conn_watch = event.container.connect(self.watch_host.addresses[0])
self.receiver = event.container.create_receiver(self.conn_watch, "_local/_testhook/watch_event")
for host in self.dest_hosts:
self.conn_dests.append(self.container.connect(host.addresses[0]))

def on_link_opened(self, event):
if event.receiver == self.receiver:
self.sender = event.container.create_sender(self.conn_watch, "_local/_testhook/address_watch")

def on_sendable(self, event):
if self.phase == "START":
self.phase = "SET-WATCH"
msg = Message(subject='watch', properties={'opcode': 'watch-on', 'address': self.address})
self.sender.send(msg)

def on_accepted(self, event):
if self.phase == "UNWATCH":
self.fail(None)

def on_message(self, event):
msg = event.message
ap = msg.properties
local = ap['local_consumers']
remote = ap['remote_consumers']
addr = ap['address']

if addr != self.address:
self.fail("Received a watch for an unexpected address: Expected %s, got %s" % (self.address, addr))

## print("phase=%s, local=%d, remote=%d" % (self.phase, local, remote))

if self.phase == "SET-WATCH":
if local == 0 and remote == 0:
self.phase = "WATCHING"
self.setup_dests()
else:
self.fail("Expected 0 consumers, got local=%d, remote=%d" % (local, remote))

elif self.phase == "WATCHING":
if local == self.peak_local and remote == self.peak_remote:
self.phase = "REDUCE-DESTS"
self.reduce_dests()

elif self.phase == "REDUCE-DESTS":
if local == self.final_local and remote == self.final_remote:
self.phase = "UNWATCH"
msg = Message(subject='watch', properties={'opcode': 'watch-off', 'address': self.address})
self.sender.send(msg)
elif local < self.final_local or remote < self.final_remote:
self.fail("Undershot final counts: expected l=%d, r=%d; got l=%d, r=%d" % (self.final_local, self.final_remote, local, remote))
elif local > self.peak_local or remote > self.peak_remote:
self.fail("Exceeded expected counts: expected l=%d, r=%d; got l=%d, r=%d" % (self.peak_local, self.peak_remote, local, remote))

def run(self):
Container(self).run()


if __name__ == '__main__':
unittest.main(main_module())

0 comments on commit 8794208

Please sign in to comment.