
[pipelines] Create signal/slot declaration system #60

Open
sneakers-the-rat opened this issue Nov 7, 2024 · 0 comments

The input and output idea in #51 immediately shows that more is needed for the kinds of graphs we want to process.

  • We want to be able to have multiple outputs per node
    • they should be able to have independent sets of nodes attached to them (rather than always passing a header and a frame together)
    • they might have different types - the header of a frame vs the array of a frame
    • they might have different emission frequencies - emitting buffer headers 5 times per frame
  • We want to have multiple inputs per node
    • after a split with 5x headers per frame, we might want to have a merge node that then combines the header metadata and the frame after it's assembled from buffers
  • We want to add metadata to the type of output/input
    • e.g. push vs pull nodes
    • "gather inputs from slot A, and then emit all gathered items from slot A whenever an item is received in slot B"
    • "await until we receive a value in slot A and B"
  • We want to keep the means of communication between nodes abstract
    • direct synchronous argument passing
    • multiprocess queues
    • socket-based queues
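For concreteness, here is a minimal sketch of what a `Signal` could look like with the means of communication abstracted behind a transport callable: the default is direct synchronous argument passing, but a multiprocess queue put or a socket send could be swapped in without changing any node code. All names here are hypothetical, not a settled API.

```python
from typing import Callable, Generic, List, Optional, TypeVar

T = TypeVar("T")

class Signal(Generic[T]):
    """A typed signal that fans values out to its connected slots.

    `transport` decides *how* a value reaches a slot; the default is a
    direct synchronous call, but e.g. `lambda slot, v: queue.put((slot, v))`
    could be substituted for multiprocess delivery.
    """

    def __init__(
        self,
        transport: Optional[Callable[[Callable[[T], None], T], None]] = None,
    ) -> None:
        self._transport = transport or (lambda slot, value: slot(value))
        self._slots: List[Callable[[T], None]] = []

    def connect(self, slot: Callable[[T], None]) -> None:
        self._slots.append(slot)

    def emit(self, value: T) -> None:
        # deliver to every connected slot through the transport
        for slot in self._slots:
            self._transport(slot, value)

# usage: one output fanning out to two independent downstream slots
received = []
sig: Signal[int] = Signal()
sig.connect(lambda v: received.append(("a", v)))
sig.connect(lambda v: received.append(("b", v)))
sig.emit(1)
# received == [("a", 1), ("b", 1)]
```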

So I am thinking of borrowing some of the UX from Qt without all the overhead. Something like this:

class ParseHeader(Node):

    header: Signal[HeaderModel]
    buffers: Signal[list[np.ndarray]]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._buffers = []

    def process(self, value):
        header, buffer = parse_header(value)
        self.header.emit(header)

        self._buffers.append(buffer)
        if self.new_frame(header):
            self.buffers.emit(self._buffers)
            self._buffers = []

Then we get into something like a Petri net with a bipartite graph structure that goes Node -> Scheduler -> Node -> ..., where the Scheduler can be one of several methods for distributing events from a signal to a slot.
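A scheduler in that bipartite graph could be as small as one class per policy. As a sketch, here is the "gather inputs from slot A, emit all gathered items whenever an item is received in slot B" behavior from the list above; the class and slot names are illustrative only, not a settled design.

```python
from typing import Any, Callable, List, Tuple

class GatherScheduler:
    """Sits between nodes: accumulates tokens arriving on slot A and
    fires the whole batch downstream when a token arrives on slot B."""

    def __init__(self, downstream: Callable[[List[Any], Any], None]) -> None:
        self._gathered: List[Any] = []
        self._downstream = downstream

    def slot_a(self, item: Any) -> None:
        # gather silently; nothing is emitted yet
        self._gathered.append(item)

    def slot_b(self, trigger: Any) -> None:
        # flush: emit everything gathered so far, plus the trigger
        batch, self._gathered = self._gathered, []
        self._downstream(batch, trigger)

# usage: two buffers gathered, then flushed when a frame trigger arrives
out: List[Tuple[List[Any], Any]] = []
sched = GatherScheduler(lambda batch, trig: out.append((batch, trig)))
sched.slot_a(1)
sched.slot_a(2)
sched.slot_b("frame")
# out == [([1, 2], "frame")]
```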

This is sort of an open-ended implementation without a firm target that I'll be exploring while converting the SDCard (async) and streaming (sync) pipelines.

@sneakers-the-rat sneakers-the-rat added this to the pipelines milestone Nov 7, 2024
@sneakers-the-rat sneakers-the-rat moved this to Ready in pipelines Nov 7, 2024