diff --git a/pkg/debug/debugger.go b/pkg/debug/debugger.go new file mode 100644 index 00000000..f4d1bd07 --- /dev/null +++ b/pkg/debug/debugger.go @@ -0,0 +1,264 @@ +package debug + +import ( + "sync" + "time" + + "github.com/gofrs/uuid" + "github.com/siyul-park/uniflow/pkg/packet" + "github.com/siyul-park/uniflow/pkg/port" + "github.com/siyul-park/uniflow/pkg/process" + "github.com/siyul-park/uniflow/pkg/symbol" +) + +// Debugger manages symbols, processes, and their listeners. +type Debugger struct { + symbols map[uuid.UUID]*symbol.Symbol + processes map[uuid.UUID]*process.Process + frames map[uuid.UUID][]*Frame + inbounds map[uuid.UUID]map[string]port.Hook + outbounds map[uuid.UUID]map[string]port.Hook + watchers []Watcher + mu sync.RWMutex +} + +var _ symbol.LoadHook = (*Debugger)(nil) +var _ symbol.UnloadHook = (*Debugger)(nil) + +// NewDebugger creates and returns a new Debugger instance. +func NewDebugger() *Debugger { + return &Debugger{ + symbols: make(map[uuid.UUID]*symbol.Symbol), + processes: make(map[uuid.UUID]*process.Process), + frames: make(map[uuid.UUID][]*Frame), + inbounds: make(map[uuid.UUID]map[string]port.Hook), + outbounds: make(map[uuid.UUID]map[string]port.Hook), + } +} + +// AddWatcher adds a watcher to the debugger. Returns false if the watcher already exists. +func (d *Debugger) AddWatcher(watcher Watcher) bool { + d.mu.Lock() + defer d.mu.Unlock() + + for _, w := range d.watchers { + if w == watcher { + return false + } + } + + d.watchers = append(d.watchers, watcher) + return true +} + +// Symbols returns a list of all symbol IDs managed by the debugger. +func (d *Debugger) Symbols() []uuid.UUID { + d.mu.RLock() + defer d.mu.RUnlock() + + ids := make([]uuid.UUID, 0, len(d.symbols)) + for id := range d.symbols { + ids = append(ids, id) + } + return ids +} + +// Symbol retrieves a symbol by its ID. +func (d *Debugger) Symbol(id uuid.UUID) (*symbol.Symbol, bool) { + d.mu.RLock() + defer d.mu.RUnlock() + + sym, exists := d.symbols[id] + return sym, exists +} + +// Processes returns a list of all process IDs managed by the debugger. +func (d *Debugger) Processes() []uuid.UUID { + d.mu.RLock() + defer d.mu.RUnlock() + + ids := make([]uuid.UUID, 0, len(d.processes)) + for id := range d.processes { + ids = append(ids, id) + } + return ids +} + +// Process retrieves a process by its ID. +func (d *Debugger) Process(id uuid.UUID) (*process.Process, bool) { + d.mu.RLock() + defer d.mu.RUnlock() + + proc, exists := d.processes[id] + return proc, exists +} + +// Frames retrieves all frames associated with a specific process ID. +func (d *Debugger) Frames(id uuid.UUID) ([]*Frame, bool) { + d.mu.RLock() + defer d.mu.RUnlock() + + frames, exists := d.frames[id] + return frames, exists +} + +// Load adds a symbol and its associated listeners to the debugger. +func (d *Debugger) Load(sym *symbol.Symbol) error { + d.mu.Lock() + defer d.mu.Unlock() + + inbounds := make(map[string]port.Hook) + outbounds := make(map[string]port.Hook) + + d.symbols[sym.ID()] = sym + d.inbounds[sym.ID()] = inbounds + d.outbounds[sym.ID()] = outbounds + + for _, name := range sym.Ins() { + in := sym.In(name) + hook := port.HookFunc(func(proc *process.Process) { + d.accept(proc) + + inboundHook, outboundHook := d.hooks(proc, sym, in, nil) + + reader := in.Open(proc) + reader.AddInboundHook(inboundHook) + reader.AddOutboundHook(outboundHook) + }) + + in.AddHook(hook) + inbounds[name] = hook + } + + for _, name := range sym.Outs() { + out := sym.Out(name) + hook := port.HookFunc(func(proc *process.Process) { + d.accept(proc) + + inboundHook, outboundHook := d.hooks(proc, sym, nil, out) + + writer := out.Open(proc) + writer.AddInboundHook(inboundHook) + writer.AddOutboundHook(outboundHook) + }) + + out.AddHook(hook) + outbounds[name] = hook + } + + return nil +} + +// Unload removes a symbol and its associated listeners from the debugger. +func (d *Debugger) Unload(sym *symbol.Symbol) error { + d.mu.Lock() + defer d.mu.Unlock() + + for name, hook := range d.inbounds[sym.ID()] { + in := sym.In(name) + in.RemoveHook(hook) + } + for name, hook := range d.outbounds[sym.ID()] { + out := sym.Out(name) + out.RemoveHook(hook) + } + + delete(d.inbounds, sym.ID()) + delete(d.outbounds, sym.ID()) + delete(d.symbols, sym.ID()) + + return nil +} + +func (d *Debugger) accept(proc *process.Process) { + d.mu.Lock() + defer d.mu.Unlock() + + if _, exists := d.processes[proc.ID()]; !exists { + d.processes[proc.ID()] = proc + + proc.AddExitHook(process.ExitFunc(func(err error) { + d.mu.Lock() + defer d.mu.Unlock() + + delete(d.processes, proc.ID()) + delete(d.frames, proc.ID()) + })) + + watchers := d.watchers[:] + + d.mu.Unlock() + + for i := len(watchers) - 1; i >= 0; i-- { + watcher := watchers[i] + watcher.HandleProcess(proc) + } + + d.mu.Lock() + } + + if _, exists := d.frames[proc.ID()]; !exists { + d.frames[proc.ID()] = nil + } +} + +func (d *Debugger) hooks(proc *process.Process, sym *symbol.Symbol, in *port.InPort, out *port.OutPort) (packet.Hook, packet.Hook) { + inboundHook := packet.HookFunc(func(pck *packet.Packet) { + d.mu.Lock() + + frame := &Frame{ + Process: proc, + Symbol: sym, + InPort: in, + OutPort: out, + InPck: pck, + InTime: time.Now(), + } + d.frames[proc.ID()] = append(d.frames[proc.ID()], frame) + + watchers := d.watchers[:] + + d.mu.Unlock() + + for i := len(watchers) - 1; i >= 0; i-- { + watcher := watchers[i] + watcher.HandleFrame(frame) + } + }) + + outboundHook := packet.HookFunc(func(pck *packet.Packet) { + d.mu.Lock() + + var frame *Frame + for _, f := range d.frames[proc.ID()] { + if f.Symbol == sym && (f.InPort == in || f.OutPort == out) && f.OutPck == nil { + f.OutPck = pck + f.OutTime = time.Now() + frame = f + break + } + } + if frame == nil { + frame = &Frame{ + Process: proc, + Symbol: sym, + InPort: in, + OutPort: out, + OutPck: pck, + OutTime: time.Now(), + } + d.frames[proc.ID()] = append(d.frames[proc.ID()], frame) + } + + watchers := d.watchers[:] + + d.mu.Unlock() + + for i := len(watchers) - 1; i >= 0; i-- { + watcher := watchers[i] + watcher.HandleFrame(frame) + } + }) + + return inboundHook, outboundHook +} diff --git a/pkg/debug/debugger_test.go b/pkg/debug/debugger_test.go new file mode 100644 index 00000000..1aaa1417 --- /dev/null +++ b/pkg/debug/debugger_test.go @@ -0,0 +1,127 @@ +package debug + +import ( + "testing" + + "github.com/go-faker/faker/v4" + "github.com/gofrs/uuid" + "github.com/siyul-park/uniflow/pkg/node" + "github.com/siyul-park/uniflow/pkg/packet" + "github.com/siyul-park/uniflow/pkg/port" + "github.com/siyul-park/uniflow/pkg/process" + "github.com/siyul-park/uniflow/pkg/resource" + "github.com/siyul-park/uniflow/pkg/spec" + "github.com/siyul-park/uniflow/pkg/symbol" + "github.com/stretchr/testify/assert" +) + +func TestNewDebugger(t *testing.T) { + d := NewDebugger() + assert.NotNil(t, d) +} + +func TestDebugger_Symbol(t *testing.T) { + d := NewDebugger() + + sym := &symbol.Symbol{ + Spec: &spec.Meta{ + ID: uuid.Must(uuid.NewV7()), + Kind: faker.UUIDHyphenated(), + Namespace: resource.DefaultNamespace, + Name: faker.UUIDHyphenated(), + }, + Node: node.NewOneToOneNode(nil), + } + + d.Load(sym) + defer d.Unload(sym) + + _, ok := d.Symbol(sym.ID()) + assert.True(t, ok) + + ids := d.Symbols() + assert.Contains(t, ids, sym.ID()) +} + +func TestDebugger_Process(t *testing.T) { + d := NewDebugger() + + done := make(chan struct{}) + d.AddWatcher(HandleProcessFunc(func(proc *process.Process) { + defer close(done) + + _, ok := d.Process(proc.ID()) + assert.True(t, ok) + + ids := d.Processes() + assert.Contains(t, ids, proc.ID()) + })) + + sym := &symbol.Symbol{ + Spec: &spec.Meta{ + ID: uuid.Must(uuid.NewV7()), + Kind: faker.UUIDHyphenated(), + Namespace: resource.DefaultNamespace, + Name: faker.UUIDHyphenated(), + }, + Node: node.NewOneToOneNode(nil), + } + + in := sym.In(node.PortIn) + + d.Load(sym) + defer d.Unload(sym) + + proc := process.New() + defer proc.Exit(nil) + + in.Open(proc) + + <-done +} + +func TestDebuffer_Frames(t *testing.T) { + d := NewDebugger() + + count := 0 + d.AddWatcher(HandleFrameFunc(func(frame *Frame) { + frames, ok := d.Frames(frame.Process.ID()) + assert.True(t, ok) + assert.Contains(t, frames, frame) + + count += 1 + })) + + sym := &symbol.Symbol{ + Spec: &spec.Meta{ + ID: uuid.Must(uuid.NewV7()), + Kind: faker.UUIDHyphenated(), + Namespace: resource.DefaultNamespace, + Name: faker.UUIDHyphenated(), + }, + Node: node.NewOneToOneNode(func(_ *process.Process, inPck *packet.Packet) (*packet.Packet, *packet.Packet) { + return inPck, nil + }), + } + + out := port.NewOut() + defer out.Close() + + out.Link(sym.In(node.PortIn)) + + d.Load(sym) + defer d.Unload(sym) + + proc := process.New() + defer proc.Exit(nil) + + writer := out.Open(proc) + + pck := packet.New(nil) + + writer.Write(pck) + assert.Equal(t, 1, count) + + <-writer.Receive() + assert.Equal(t, 2, count) +} diff --git a/pkg/debug/frame.go b/pkg/debug/frame.go new file mode 100644 index 00000000..d5096675 --- /dev/null +++ b/pkg/debug/frame.go @@ -0,0 +1,22 @@ +package debug + +import ( + "time" + + "github.com/siyul-park/uniflow/pkg/packet" + "github.com/siyul-park/uniflow/pkg/port" + "github.com/siyul-park/uniflow/pkg/process" + "github.com/siyul-park/uniflow/pkg/symbol" +) + +// Frame represents a processing unit, containing information about the process, symbol, ports, and packets. +type Frame struct { + Process *process.Process + Symbol *symbol.Symbol + InPort *port.InPort + OutPort *port.OutPort + InPck *packet.Packet + OutPck *packet.Packet + InTime time.Time + OutTime time.Time +} diff --git a/pkg/debug/watcher.go b/pkg/debug/watcher.go new file mode 100644 index 00000000..5068d9df --- /dev/null +++ b/pkg/debug/watcher.go @@ -0,0 +1,38 @@ +package debug + +import "github.com/siyul-park/uniflow/pkg/process" + +// Watcher interface with methods for handling *Frame and *Process. +type Watcher interface { + HandleFrame(*Frame) + HandleProcess(*process.Process) +} + +type watcher struct { + handleFrame func(*Frame) + handleProcess func(*process.Process) +} + +var _ Watcher = (*watcher)(nil) + +// HandleFrameFunc returns a Watcher that handles *Frame. +func HandleFrameFunc(handle func(*Frame)) Watcher { + return &watcher{handleFrame: handle} +} + +// HandleProcessFunc returns a Watcher that handles *Process. +func HandleProcessFunc(handle func(*process.Process)) Watcher { + return &watcher{handleProcess: handle} +} + +func (w *watcher) HandleFrame(frame *Frame) { + if w.handleFrame != nil { + w.handleFrame(frame) + } +} + +func (w *watcher) HandleProcess(process *process.Process) { + if w.handleProcess != nil { + w.handleProcess(process) + } +} diff --git a/pkg/packet/reader.go b/pkg/packet/reader.go index 32b41681..3655ed03 100644 --- a/pkg/packet/reader.go +++ b/pkg/packet/reader.go @@ -40,11 +40,12 @@ func NewReader() *Reader { break } else { pck := New(types.NewError(ErrDroppedPacket)) - if ok := w.receive(pck, r); ok { - for _, hook := range r.outboundHooks { - hook.Handle(pck) - } + + for _, hook := range r.outboundHooks { + hook.Handle(pck) } + + w.receive(pck, r) } } return @@ -110,13 +111,11 @@ func (r *Reader) Receive(pck *Packet) bool { if w := r.writer(); w == nil { return false } else { - ok := w.receive(pck, r) - if ok { - for _, hook := range r.outboundHooks { - hook.Handle(pck) - } + for _, hook := range r.outboundHooks { + hook.Handle(pck) } - return ok + + return w.receive(pck, r) } } @@ -140,13 +139,13 @@ func (r *Reader) write(pck *Packet, writer *Writer) bool { case <-r.done: return false default: - r.writers = append(r.writers, writer) - r.in <- pck - for _, hook := range r.inboundHooks { hook.Handle(pck) } + r.writers = append(r.writers, writer) + r.in <- pck + return true } } diff --git a/pkg/packet/writer.go b/pkg/packet/writer.go index 0502c066..8fb5b0f7 100644 --- a/pkg/packet/writer.go +++ b/pkg/packet/writer.go @@ -144,6 +144,10 @@ func (w *Writer) Write(pck *Packet) int { case <-w.done: return 0 default: + for _, hook := range w.outboundHooks { + hook.Handle(pck) + } + count := 0 receives := make([]*Packet, len(w.readers)) for i, r := range w.readers { @@ -160,10 +164,6 @@ func (w *Writer) Write(pck *Packet) int { if count > 0 { w.receives = append(w.receives, receives) - - for _, hook := range w.outboundHooks { - hook.Handle(pck) - } } return count @@ -218,11 +218,11 @@ func (w *Writer) receive(pck *Packet, reader *Reader) bool { w.receives = w.receives[1:] pck := Merge(receives) - w.in <- pck - for _, hook := range w.inboundHooks { hook.Handle(pck) } + + w.in <- pck } return true diff --git a/pkg/port/hook.go b/pkg/port/hook.go new file mode 100644 index 00000000..4302898b --- /dev/null +++ b/pkg/port/hook.go @@ -0,0 +1,24 @@ +package port + +import "github.com/siyul-park/uniflow/pkg/process" + +// Hook defines an interface for processing packets associated with a process. +type Hook interface { + // Open processes the given process. + Open(*process.Process) +} + +type hook struct { + open func(*process.Process) +} + +var _ Hook = (*hook)(nil) + +// HookFunc creates a new Hook from the provided function. +func HookFunc(open func(*process.Process)) Hook { + return &hook{open: open} +} + +func (h *hook) Open(proc *process.Process) { + h.open(proc) +} diff --git a/pkg/port/inport.go b/pkg/port/inport.go index ce427a9c..c5ed8905 100644 --- a/pkg/port/inport.go +++ b/pkg/port/inport.go @@ -7,68 +7,77 @@ import ( "github.com/siyul-park/uniflow/pkg/process" ) -// InPort represents an input port for receiving data. +// InPort represents an input port used for receiving data. type InPort struct { readers map[*process.Process]*packet.Reader + hooks []Hook listeners []Listener mu sync.RWMutex } -// NewIn creates a new InPort instance. +// NewIn creates and returns a new InPort instance. func NewIn() *InPort { return &InPort{ readers: make(map[*process.Process]*packet.Reader), } } -// AddListener registers the listener to handle incoming data if not already registered. -func (p *InPort) AddListener(listener Listener) bool { +// AddHook adds a hook to the port if it is not already present. +func (p *InPort) AddHook(hook Hook) bool { p.mu.Lock() defer p.mu.Unlock() - for _, l := range p.listeners { - if l == listener { + for _, h := range p.hooks { + if h == hook { return false } } - p.listeners = append(p.listeners, listener) + p.hooks = append(p.hooks, hook) return true } -// RemoveListener unregisters the listener so it no longer handles incoming data. -func (p *InPort) RemoveListener(listener Listener) bool { +// RemoveHook removes a hook from the port if it exists. +func (p *InPort) RemoveHook(hook Hook) bool { p.mu.Lock() defer p.mu.Unlock() - for i, l := range p.listeners { - if l == listener { - p.listeners = append(p.listeners[:i], p.listeners[i+1:]...) + for i, h := range p.hooks { + if h == hook { + p.hooks = append(p.hooks[:i], p.hooks[i+1:]...) return true } } return false } -// Open opens the input port for a given process and returns a reader. -// If the process already has an associated reader, it returns the existing one. -// Otherwise, it creates a new reader and associates it with the process. -func (p *InPort) Open(proc *process.Process) *packet.Reader { +// AddListener adds a listener to the port if it is not already registered. +func (p *InPort) AddListener(listener Listener) bool { p.mu.Lock() defer p.mu.Unlock() - reader, ok := p.readers[proc] - if ok { - return reader + for _, l := range p.listeners { + if l == listener { + return false + } } - reader = packet.NewReader() + p.listeners = append(p.listeners, listener) + return true +} - if proc.Status() == process.StatusTerminated { - reader.Close() +// Open prepares the input port for a given process and returns a reader. +// If a reader for the process already exists, it is returned. Otherwise, a new reader is created. +func (p *InPort) Open(proc *process.Process) *packet.Reader { + p.mu.Lock() + + reader, exists := p.readers[proc] + if exists { + p.mu.Unlock() return reader } + reader = packet.NewReader() p.readers[proc] = reader proc.AddExitHook(process.ExitFunc(func(_ error) { @@ -79,17 +88,25 @@ func (p *InPort) Open(proc *process.Process) *packet.Reader { reader.Close() })) + hooks := p.hooks[:] listeners := p.listeners[:] - go func() { - for i := len(listeners) - 1; i >= 0; i-- { - listeners[i].Accept(proc) - } - }() + + p.mu.Unlock() + + for i := len(hooks) - 1; i >= 0; i-- { + hook := hooks[i] + hook.Open(proc) + } + + for _, listener := range listeners { + listener := listener + go listener.Accept(proc) + } return reader } -// Close closes all readers associated with the input port. +// Close shuts down all readers associated with the input port and clears hooks and listeners. func (p *InPort) Close() { p.mu.Lock() defer p.mu.Unlock() @@ -98,4 +115,6 @@ func (p *InPort) Close() { reader.Close() } p.readers = make(map[*process.Process]*packet.Reader) + p.hooks = nil + p.listeners = nil } diff --git a/pkg/port/inport_test.go b/pkg/port/inport_test.go index 7d703187..e42a78e3 100644 --- a/pkg/port/inport_test.go +++ b/pkg/port/inport_test.go @@ -22,7 +22,7 @@ func TestInPort_Open(t *testing.T) { assert.Equal(t, r1, r2) } -func TestInPort_AddAndRemoveListener(t *testing.T) { +func TestInPort_Hook(t *testing.T) { proc := process.New() defer proc.Exit(nil) @@ -30,14 +30,14 @@ func TestInPort_AddAndRemoveListener(t *testing.T) { defer in.Close() done := make(chan struct{}) - h := ListenFunc(func(proc *process.Process) { + h := HookFunc(func(proc *process.Process) { close(done) }) - ok := in.AddListener(h) + ok := in.AddHook(h) assert.True(t, ok) - ok = in.AddListener(h) + ok = in.AddHook(h) assert.False(t, ok) _ = in.Open(proc) @@ -51,11 +51,41 @@ func TestInPort_AddAndRemoveListener(t *testing.T) { assert.NoError(t, ctx.Err()) } - ok = in.RemoveListener(h) + ok = in.RemoveHook(h) + assert.True(t, ok) + + ok = in.RemoveHook(h) + assert.False(t, ok) +} + +func TestInPort_Listener(t *testing.T) { + proc := process.New() + defer proc.Exit(nil) + + in := NewIn() + defer in.Close() + + done := make(chan struct{}) + h := ListenFunc(func(proc *process.Process) { + close(done) + }) + + ok := in.AddListener(h) assert.True(t, ok) - ok = in.RemoveListener(h) + ok = in.AddListener(h) assert.False(t, ok) + + _ = in.Open(proc) + + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) + defer cancel() + + select { + case <-done: + case <-ctx.Done(): + assert.NoError(t, ctx.Err()) + } } func BenchmarkInPort_Open(b *testing.B) { diff --git a/pkg/port/outport.go b/pkg/port/outport.go index 474be250..60e9e537 100644 --- a/pkg/port/outport.go +++ b/pkg/port/outport.go @@ -12,11 +12,12 @@ import ( type OutPort struct { ins []*InPort writers map[*process.Process]*packet.Writer + hooks []Hook listeners []Listener mu sync.RWMutex } -// Write sends the payload through OutPort, handles errors, and returns the processed result or any encountered error. +// Write sends the payload through the OutPort and returns the result or an error. func Write(out *OutPort, payload types.Value) (types.Value, error) { var err error @@ -41,42 +42,57 @@ func Write(out *OutPort, payload types.Value) (types.Value, error) { return payload, nil } -// NewOut creates a new OutPort instance. +// NewOut creates and returns a new OutPort instance. func NewOut() *OutPort { return &OutPort{ writers: make(map[*process.Process]*packet.Writer), } } -// AddListener registers the listener to handle incoming data if not already registered. -func (p *OutPort) AddListener(listener Listener) bool { +// AddHook adds a hook for packet processing if not already present. +func (p *OutPort) AddHook(hook Hook) bool { p.mu.Lock() defer p.mu.Unlock() - for _, l := range p.listeners { - if l == listener { + for _, h := range p.hooks { + if h == hook { return false } } - p.listeners = append(p.listeners, listener) + p.hooks = append(p.hooks, hook) return true } -// RemoveListener unregisters the listener so it no longer handles incoming data. -func (p *OutPort) RemoveListener(listener Listener) bool { +// RemoveHook removes a hook from the port if present. +func (p *OutPort) RemoveHook(hook Hook) bool { p.mu.Lock() defer p.mu.Unlock() - for i, l := range p.listeners { - if l == listener { - p.listeners = append(p.listeners[:i], p.listeners[i+1:]...) + for i, h := range p.hooks { + if h == hook { + p.hooks = append(p.hooks[:i], p.hooks[i+1:]...) return true } } return false } +// AddListener registers a listener for outgoing data if not already present. +func (p *OutPort) AddListener(listener Listener) bool { + p.mu.Lock() + defer p.mu.Unlock() + + for _, l := range p.listeners { + if l == listener { + return false + } + } + + p.listeners = append(p.listeners, listener) + return true +} + // Links returns the number of input ports this port is connected to. func (p *OutPort) Links() int { p.mu.RLock() @@ -85,7 +101,7 @@ func (p *OutPort) Links() int { return len(p.ins) } -// Link connects the output port to an input port. +// Link connects this output port to an input port. func (p *OutPort) Link(in *InPort) { p.mu.Lock() defer p.mu.Unlock() @@ -93,7 +109,7 @@ func (p *OutPort) Link(in *InPort) { p.ins = append(p.ins, in) } -// Unlink disconnects the output port from an input port. +// Unlink disconnects this output port from an input port. func (p *OutPort) Unlink(in *InPort) { p.mu.Lock() defer p.mu.Unlock() @@ -106,10 +122,8 @@ func (p *OutPort) Unlink(in *InPort) { } } -// Open opens the output port for a given process and returns a writer. -// If the process already has an associated writer, it returns the existing one. -// Otherwise, it creates a new writer and associates it with the process. -// It also connects the writer to all linked input ports and starts data listeners. +// Open opens the output port for the given process and returns a writer. +// It connects the writer to all linked input ports and starts data listeners. func (p *OutPort) Open(proc *process.Process) *packet.Writer { writer, ok := func() (*packet.Writer, bool) { p.mu.Lock() @@ -137,25 +151,30 @@ func (p *OutPort) Open(proc *process.Process) *packet.Writer { if !ok { p.mu.RLock() - defer p.mu.RUnlock() for _, in := range p.ins { reader := in.Open(proc) writer.Link(reader) } + hooks := p.hooks[:] listeners := p.listeners[:] - go func() { - for i := len(listeners) - 1; i >= 0; i-- { - listeners[i].Accept(proc) - } - }() + + p.mu.RUnlock() + + for i := len(hooks) - 1; i >= 0; i-- { + hooks[i].Open(proc) + } + + for _, listener := range listeners { + go listener.Accept(proc) + } } return writer } -// Close closes all writers associated with the output port and clears linked input ports. +// Close closes all writers and clears linked input ports, hooks, and listeners. func (p *OutPort) Close() { p.mu.Lock() defer p.mu.Unlock() @@ -163,6 +182,9 @@ func (p *OutPort) Close() { for _, writer := range p.writers { writer.Close() } + p.writers = make(map[*process.Process]*packet.Writer) p.ins = nil + p.hooks = nil + p.listeners = nil } diff --git a/pkg/port/outport_test.go b/pkg/port/outport_test.go index 51f708c6..3fe8083b 100644 --- a/pkg/port/outport_test.go +++ b/pkg/port/outport_test.go @@ -45,7 +45,7 @@ func TestOutPort_Link(t *testing.T) { assert.Equal(t, 1, out.Links()) } -func TestOutPort_AddAndRemoveListener(t *testing.T) { +func TestOutPort_Hook(t *testing.T) { proc := process.New() defer proc.Exit(nil) @@ -53,14 +53,14 @@ func TestOutPort_AddAndRemoveListener(t *testing.T) { defer out.Close() done := make(chan struct{}) - h := ListenFunc(func(proc *process.Process) { + h := HookFunc(func(proc *process.Process) { close(done) }) - ok := out.AddListener(h) + ok := out.AddHook(h) assert.True(t, ok) - ok = out.AddListener(h) + ok = out.AddHook(h) assert.False(t, ok) _ = out.Open(proc) @@ -73,12 +73,36 @@ func TestOutPort_AddAndRemoveListener(t *testing.T) { case <-ctx.Done(): assert.NoError(t, ctx.Err()) } +} + +func TestOutPort_Listener(t *testing.T) { + proc := process.New() + defer proc.Exit(nil) + + out := NewOut() + defer out.Close() + + done := make(chan struct{}) + h := ListenFunc(func(proc *process.Process) { + close(done) + }) - ok = out.RemoveListener(h) + ok := out.AddListener(h) assert.True(t, ok) - ok = out.RemoveListener(h) + ok = out.AddListener(h) assert.False(t, ok) + + _ = out.Open(proc) + + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) + defer cancel() + + select { + case <-done: + case <-ctx.Done(): + assert.NoError(t, ctx.Err()) + } } func BenchmarkOutPort_Open(b *testing.B) { diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index cb7baab6..7ef5e5b3 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -4,6 +4,7 @@ import ( "context" "github.com/gofrs/uuid" + "github.com/siyul-park/uniflow/pkg/debug" "github.com/siyul-park/uniflow/pkg/hook" "github.com/siyul-park/uniflow/pkg/resource" "github.com/siyul-park/uniflow/pkg/scheme" @@ -14,11 +15,12 @@ import ( // Config defines configuration options for the Runtime. type Config struct { - Namespace string // Namespace defines the isolated execution environment for workflows. - Hook *hook.Hook // Hook is a collection of hook functions for managing symbols. - Scheme *scheme.Scheme // Scheme defines the scheme and behaviors for symbols. - SpecStore spec.Store // SpecStore is responsible for persisting symbols. - SecretStore secret.Store // SpecStore is responsible for persisting symbols. + Namespace string // Namespace defines the isolated execution environment for workflows. + Hook *hook.Hook // Hook is a collection of hook functions for managing symbols. + Scheme *scheme.Scheme // Scheme defines the scheme and behaviors for symbols. + SpecStore spec.Store // SpecStore is responsible for persisting symbols. + SecretStore secret.Store // SpecStore is responsible for persisting symbols. + Debugger *debug.Debugger // Debugger provides debugging capabilities. } // Runtime represents an environment for executing Workflows. @@ -49,9 +51,18 @@ func New(config Config) *Runtime { config.SecretStore = secret.NewStore() } + var loadHooks []symbol.LoadHook + var unloadHooks []symbol.UnloadHook + if config.Debugger != nil { + loadHooks = append(loadHooks, config.Debugger) + unloadHooks = append(unloadHooks, config.Debugger) + } + loadHooks = append(loadHooks, config.Hook) + unloadHooks = append(unloadHooks, config.Hook) + tb := symbol.NewTable(symbol.TableOptions{ - LoadHooks: []symbol.LoadHook{config.Hook}, - UnloadHooks: []symbol.UnloadHook{config.Hook}, + LoadHooks: loadHooks, + UnloadHooks: unloadHooks, }) ld := symbol.NewLoader(symbol.LoaderConfig{