|
2 | 2 | # these are all functions _testcapi exports whose name begins with 'test_'.
|
3 | 3 |
|
4 | 4 | from collections import OrderedDict
|
| 5 | +from contextlib import contextmanager |
5 | 6 | import _thread
|
6 | 7 | import importlib.machinery
|
7 | 8 | import importlib.util
|
@@ -1393,5 +1394,136 @@ def func2(x=None):
|
1393 | 1394 | self.do_test(func2)
|
1394 | 1395 |
|
1395 | 1396 |
|
| 1397 | +class TestDictWatchers(unittest.TestCase): |
| 1398 | + # types of watchers testcapimodule can add: |
| 1399 | + EVENTS = 0 # appends dict events as strings to global event list |
| 1400 | + ERROR = 1 # unconditionally sets and signals a RuntimeException |
| 1401 | + SECOND = 2 # always appends "second" to global event list |
| 1402 | + |
| 1403 | + def add_watcher(self, kind=EVENTS): |
| 1404 | + return _testcapi.add_dict_watcher(kind) |
| 1405 | + |
| 1406 | + def clear_watcher(self, watcher_id): |
| 1407 | + _testcapi.clear_dict_watcher(watcher_id) |
| 1408 | + |
| 1409 | + @contextmanager |
| 1410 | + def watcher(self, kind=EVENTS): |
| 1411 | + wid = self.add_watcher(kind) |
| 1412 | + try: |
| 1413 | + yield wid |
| 1414 | + finally: |
| 1415 | + self.clear_watcher(wid) |
| 1416 | + |
| 1417 | + def assert_events(self, expected): |
| 1418 | + actual = _testcapi.get_dict_watcher_events() |
| 1419 | + self.assertEqual(actual, expected) |
| 1420 | + |
| 1421 | + def watch(self, wid, d): |
| 1422 | + _testcapi.watch_dict(wid, d) |
| 1423 | + |
| 1424 | + def test_set_new_item(self): |
| 1425 | + d = {} |
| 1426 | + with self.watcher() as wid: |
| 1427 | + self.watch(wid, d) |
| 1428 | + d["foo"] = "bar" |
| 1429 | + self.assert_events(["new:foo:bar"]) |
| 1430 | + |
| 1431 | + def test_set_existing_item(self): |
| 1432 | + d = {"foo": "bar"} |
| 1433 | + with self.watcher() as wid: |
| 1434 | + self.watch(wid, d) |
| 1435 | + d["foo"] = "baz" |
| 1436 | + self.assert_events(["mod:foo:baz"]) |
| 1437 | + |
| 1438 | + def test_clone(self): |
| 1439 | + d = {} |
| 1440 | + d2 = {"foo": "bar"} |
| 1441 | + with self.watcher() as wid: |
| 1442 | + self.watch(wid, d) |
| 1443 | + d.update(d2) |
| 1444 | + self.assert_events(["clone"]) |
| 1445 | + |
| 1446 | + def test_no_event_if_not_watched(self): |
| 1447 | + d = {} |
| 1448 | + with self.watcher() as wid: |
| 1449 | + d["foo"] = "bar" |
| 1450 | + self.assert_events([]) |
| 1451 | + |
| 1452 | + def test_del(self): |
| 1453 | + d = {"foo": "bar"} |
| 1454 | + with self.watcher() as wid: |
| 1455 | + self.watch(wid, d) |
| 1456 | + del d["foo"] |
| 1457 | + self.assert_events(["del:foo"]) |
| 1458 | + |
| 1459 | + def test_pop(self): |
| 1460 | + d = {"foo": "bar"} |
| 1461 | + with self.watcher() as wid: |
| 1462 | + self.watch(wid, d) |
| 1463 | + d.pop("foo") |
| 1464 | + self.assert_events(["del:foo"]) |
| 1465 | + |
| 1466 | + def test_clear(self): |
| 1467 | + d = {"foo": "bar"} |
| 1468 | + with self.watcher() as wid: |
| 1469 | + self.watch(wid, d) |
| 1470 | + d.clear() |
| 1471 | + self.assert_events(["clear"]) |
| 1472 | + |
| 1473 | + def test_dealloc(self): |
| 1474 | + d = {"foo": "bar"} |
| 1475 | + with self.watcher() as wid: |
| 1476 | + self.watch(wid, d) |
| 1477 | + del d |
| 1478 | + self.assert_events(["dealloc"]) |
| 1479 | + |
| 1480 | + def test_error(self): |
| 1481 | + d = {} |
| 1482 | + unraisables = [] |
| 1483 | + def unraisable_hook(unraisable): |
| 1484 | + unraisables.append(unraisable) |
| 1485 | + with self.watcher(kind=self.ERROR) as wid: |
| 1486 | + self.watch(wid, d) |
| 1487 | + orig_unraisable_hook = sys.unraisablehook |
| 1488 | + sys.unraisablehook = unraisable_hook |
| 1489 | + try: |
| 1490 | + d["foo"] = "bar" |
| 1491 | + finally: |
| 1492 | + sys.unraisablehook = orig_unraisable_hook |
| 1493 | + self.assert_events([]) |
| 1494 | + self.assertEqual(len(unraisables), 1) |
| 1495 | + unraisable = unraisables[0] |
| 1496 | + self.assertIs(unraisable.object, d) |
| 1497 | + self.assertEqual(str(unraisable.exc_value), "boom!") |
| 1498 | + |
| 1499 | + def test_two_watchers(self): |
| 1500 | + d1 = {} |
| 1501 | + d2 = {} |
| 1502 | + with self.watcher() as wid1: |
| 1503 | + with self.watcher(kind=self.SECOND) as wid2: |
| 1504 | + self.watch(wid1, d1) |
| 1505 | + self.watch(wid2, d2) |
| 1506 | + d1["foo"] = "bar" |
| 1507 | + d2["hmm"] = "baz" |
| 1508 | + self.assert_events(["new:foo:bar", "second"]) |
| 1509 | + |
| 1510 | + def test_watch_non_dict(self): |
| 1511 | + with self.watcher() as wid: |
| 1512 | + with self.assertRaisesRegex(ValueError, r"Cannot watch non-dictionary"): |
| 1513 | + self.watch(wid, 1) |
| 1514 | + |
| 1515 | + def test_watch_out_of_range_watcher_id(self): |
| 1516 | + d = {} |
| 1517 | + with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"): |
| 1518 | + self.watch(-1, d) |
| 1519 | + with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"): |
| 1520 | + self.watch(8, d) # DICT_MAX_WATCHERS = 8 |
| 1521 | + |
| 1522 | + def test_unassigned_watcher_id(self): |
| 1523 | + d = {} |
| 1524 | + with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"): |
| 1525 | + self.watch(1, d) |
| 1526 | + |
| 1527 | + |
1396 | 1528 | if __name__ == "__main__":
|
1397 | 1529 | unittest.main()
|
0 commit comments