Skip to content

Commit

Permalink
Test RealtimeProvider for parallel group suppoprt
Browse files Browse the repository at this point in the history
  • Loading branch information
mira-merkell committed Oct 30, 2022
1 parent 25ca6df commit b4a97f9
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 2 deletions.
4 changes: 2 additions & 2 deletions supriya/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -766,13 +766,13 @@ def add_group(
target_node=None,
add_action=AddAction.ADD_TO_HEAD,
name: Optional[str] = None,
parallel: bool = False
parallel: bool = False,
) -> GroupProxy:
if not self.moment:
raise ValueError("No current moment")
# TODO: implement (dummy) parallel flag for nonrealtime methods
identifier = self._resolve_target_node(target_node).add_group(
add_action=add_action,
add_action=add_action
)
proxy = GroupProxy(identifier=identifier, provider=self, parallel=parallel)
return proxy
Expand Down
67 changes: 67 additions & 0 deletions tests/providers/test_RealtimeProvider.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ def server(persistent_server):
yield persistent_server


@pytest.fixture
def server_supernova(persistent_server_supernova):
persistent_server_supernova.reset()
persistent_server_supernova.add_synthdef(default)
yield persistent_server_supernova


def test_RealtimeProvider_init_error():
with pytest.raises(ValueError):
RealtimeProvider(23)
Expand Down Expand Up @@ -138,6 +145,66 @@ def test_RealtimeProvider_add_group_1(server):
)


def test_RealtimeProvider_add_group_1_parallel(server):
provider = Provider.from_context(server)
seconds = time.time()
with server.osc_protocol.capture() as transcript:
with provider.at(seconds) as provider_moment:
group_proxy = provider.add_group(parallel=True)
assert group_proxy == GroupProxy(identifier=1000, provider=provider, parallel=True)
assert provider_moment == ProviderMoment(
provider=provider,
seconds=seconds,
bus_settings=[],
node_additions=[(group_proxy, AddAction.ADD_TO_HEAD, server.default_group)],
node_removals=[],
node_reorderings=[],
node_settings=[],
)
assert [entry.message.to_list() for entry in transcript] == [
[seconds + provider.latency, [["/p_new", 1000, 0, 1]]]
]
time.sleep(0.1)
assert str(server.query()) == normalize(
"""
NODE TREE 0 group
1 group
1000 group
"""
)


def test_RealtimeProvider_add_group_1_parallel_supernova(server_supernova):
provider = Provider.from_context(server_supernova)
seconds = time.time()
with server_supernova.osc_protocol.capture() as transcript:
with provider.at(seconds) as provider_moment:
group_proxy = provider.add_group(parallel=True)
assert group_proxy == GroupProxy(identifier=1000, provider=provider, parallel=True)
assert provider_moment == ProviderMoment(
provider=provider,
seconds=seconds,
bus_settings=[],
node_additions=[
(group_proxy, AddAction.ADD_TO_HEAD, server_supernova.default_group)
],
node_removals=[],
node_reorderings=[],
node_settings=[],
)
assert [entry.message.to_list() for entry in transcript] == [
[seconds + provider.latency, [["/p_new", 1000, 0, 1]]]
]
time.sleep(0.1)
assert str(server_supernova.query()) == normalize(
"""
NODE TREE 0 group
1 group
1000 group
"""
)


def test_RealtimeProvider_add_group_2(server):
provider = Provider.from_context(server)
seconds = time.time()
Expand Down

0 comments on commit b4a97f9

Please sign in to comment.