From 1936412d6198cf94c5ab15b05037a962185547b2 Mon Sep 17 00:00:00 2001 From: siyul-park Date: Wed, 12 Feb 2025 00:04:42 +0900 Subject: [PATCH] refactor: change method name in tracer --- ext/pkg/control/cache.go | 10 ++-- ext/pkg/control/for.go | 6 +-- ext/pkg/control/pipe.go | 4 +- ext/pkg/control/retry.go | 8 +-- ext/pkg/control/session.go | 6 +-- ext/pkg/control/try.go | 6 +-- pkg/node/manytoone.go | 8 +-- pkg/node/onetomany.go | 6 +-- pkg/node/onetoone.go | 4 +- pkg/packet/tracer.go | 101 +++++++++++++------------------------ pkg/packet/tracer_test.go | 36 ++----------- 11 files changed, 67 insertions(+), 128 deletions(-) diff --git a/ext/pkg/control/cache.go b/ext/pkg/control/cache.go index 3f936549..5b97d882 100644 --- a/ext/pkg/control/cache.go +++ b/ext/pkg/control/cache.go @@ -92,15 +92,15 @@ func (n *CacheNode) forward(proc *process.Process) { inPayload := inPck.Payload() if outPayload, ok := n.lru.Load(inPayload); ok { outPck := packet.New(outPayload) - n.tracer.Transform(inPck, outPck) - n.tracer.Reduce(outPck) + n.tracer.Link(inPck, outPck) + n.tracer.Write(nil, outPck) } else { - n.tracer.AddHook(inPck, packet.HookFunc(func(backPck *packet.Packet) { + n.tracer.Dispatch(inPck, packet.HookFunc(func(backPck *packet.Packet) { if _, ok := backPck.Payload().(types.Error); !ok { n.lru.Store(inPayload, backPck.Payload()) } - n.tracer.Transform(inPck, backPck) - n.tracer.Reduce(backPck) + n.tracer.Link(inPck, backPck) + n.tracer.Write(nil, backPck) })) if outWriter == nil { diff --git a/ext/pkg/control/for.go b/ext/pkg/control/for.go index a8ec61d8..aa941f33 100644 --- a/ext/pkg/control/for.go +++ b/ext/pkg/control/for.go @@ -112,11 +112,11 @@ func (n *ForNode) forward(proc *process.Process) { for i, outPayload := range outPayloads { outPck := packet.New(outPayload) outPcks[i] = outPck - n.tracer.Transform(inPck, outPck) + n.tracer.Link(inPck, outPck) } - n.tracer.AddHook(inPck, packet.HookFunc(func(backPck *packet.Packet) { - n.tracer.Transform(inPck, backPck) + n.tracer.Dispatch(inPck, packet.HookFunc(func(backPck *packet.Packet) { + n.tracer.Link(inPck, backPck) if _, ok := backPck.Payload().(types.Error); ok { if errWriter == nil { errWriter = n.errPort.Open(proc) diff --git a/ext/pkg/control/pipe.go b/ext/pkg/control/pipe.go index f88dd2f5..99b08dc1 100644 --- a/ext/pkg/control/pipe.go +++ b/ext/pkg/control/pipe.go @@ -99,8 +99,8 @@ func (n *PipeNode) forward(proc *process.Process) { for inPck := range inReader.Read() { n.tracer.Read(inReader, inPck) - n.tracer.AddHook(inPck, packet.HookFunc(func(backPck *packet.Packet) { - n.tracer.Transform(inPck, backPck) + n.tracer.Dispatch(inPck, packet.HookFunc(func(backPck *packet.Packet) { + n.tracer.Link(inPck, backPck) if _, ok := backPck.Payload().(types.Error); ok { if errWriter == nil { errWriter = n.errPort.Open(proc) diff --git a/ext/pkg/control/retry.go b/ext/pkg/control/retry.go index cf13220e..705dd05c 100644 --- a/ext/pkg/control/retry.go +++ b/ext/pkg/control/retry.go @@ -96,8 +96,8 @@ func (n *RetryNode) forward(proc *process.Process) { _, fail := backPck.Payload().(types.Error) if !fail || count == n.threshold { attempts.Delete(inPck) - n.tracer.Transform(inPck, backPck) - n.tracer.Reduce(backPck) + n.tracer.Link(inPck, backPck) + n.tracer.Write(nil, backPck) return } @@ -109,14 +109,14 @@ func (n *RetryNode) forward(proc *process.Process) { if outWriter == nil { outWriter = n.outPort.Open(proc) } - n.tracer.AddHook(inPck, hook) + n.tracer.Dispatch(inPck, hook) n.tracer.Write(outWriter, inPck) }) if outWriter == nil { outWriter = n.outPort.Open(proc) } - n.tracer.AddHook(inPck, hook) + n.tracer.Dispatch(inPck, hook) n.tracer.Write(outWriter, inPck) } } diff --git a/ext/pkg/control/session.go b/ext/pkg/control/session.go index 8e313566..29a1a51e 100644 --- a/ext/pkg/control/session.go +++ b/ext/pkg/control/session.go @@ -112,13 +112,13 @@ func (n *SessionNode) forward(proc *process.Process) { child := parent.Fork() outPck := packet.New(types.NewSlice(value, inPck.Payload())) - n.tracer.Transform(inPck, outPck) + n.tracer.Link(inPck, outPck) children = append(children, child) outPcks = append(outPcks, outPck) } - n.tracer.AddHook(inPck, packet.HookFunc(func(backPck *packet.Packet) { + n.tracer.Dispatch(inPck, packet.HookFunc(func(backPck *packet.Packet) { var err error if v, ok := backPck.Payload().(types.Error); ok { err = v.Unwrap() @@ -135,7 +135,7 @@ func (n *SessionNode) forward(proc *process.Process) { n.tracer.Write(outWriter, outPck) } if len(outPcks) == 0 { - n.tracer.Reduce(inPck) + n.tracer.Write(nil, inPck) } } } diff --git a/ext/pkg/control/try.go b/ext/pkg/control/try.go index 92c3dd00..88e1cca9 100644 --- a/ext/pkg/control/try.go +++ b/ext/pkg/control/try.go @@ -93,15 +93,15 @@ func (n *TryNode) forward(proc *process.Process) { outWriter = n.outPort.Open(proc) } - n.tracer.AddHook(inPck, packet.HookFunc(func(backPck *packet.Packet) { - n.tracer.Transform(inPck, backPck) + n.tracer.Dispatch(inPck, packet.HookFunc(func(backPck *packet.Packet) { + n.tracer.Link(inPck, backPck) if _, ok := backPck.Payload().(types.Error); ok { if errWriter == nil { errWriter = n.errPort.Open(proc) } n.tracer.Write(errWriter, backPck) } else { - n.tracer.Reduce(backPck) + n.tracer.Write(nil, backPck) } })) diff --git a/pkg/node/manytoone.go b/pkg/node/manytoone.go index 8bd86258..f9cbede8 100644 --- a/pkg/node/manytoone.go +++ b/pkg/node/manytoone.go @@ -113,21 +113,21 @@ func (n *ManyToOneNode) forward(index int) port.Listener { n.tracer.Read(inReader, inPck) if inPcks := readGroup.Read(inReader, inPck); len(inPcks) < len(n.inPorts) { - n.tracer.Reduce(inPck) + n.tracer.Write(nil, inPck) } else if outPck, errPck := n.action(proc, inPcks); errPck != nil { if errWriter == nil { errWriter = n.errPort.Open(proc) } - n.tracer.Transform(inPck, errPck) + n.tracer.Link(inPck, errPck) n.tracer.Write(errWriter, errPck) } else if outPck != nil { if outWriter == nil { outWriter = n.outPort.Open(proc) } - n.tracer.Transform(inPck, outPck) + n.tracer.Link(inPck, outPck) n.tracer.Write(outWriter, outPck) } else { - n.tracer.Reduce(inPck) + n.tracer.Write(nil, inPck) } } }) diff --git a/pkg/node/onetomany.go b/pkg/node/onetomany.go index 5d15c6bf..8fe291d1 100644 --- a/pkg/node/onetomany.go +++ b/pkg/node/onetomany.go @@ -104,12 +104,12 @@ func (n *OneToManyNode) forward(proc *process.Process) { if errWriter == nil { errWriter = n.errPort.Open(proc) } - n.tracer.Transform(inPck, errPck) + n.tracer.Link(inPck, errPck) n.tracer.Write(errWriter, errPck) } else { for i, outPck := range outPcks { if i < len(outWriters) && outPck != nil { - n.tracer.Transform(inPck, outPck) + n.tracer.Link(inPck, outPck) } } @@ -125,7 +125,7 @@ func (n *OneToManyNode) forward(proc *process.Process) { } if count == 0 { - n.tracer.Reduce(inPck) + n.tracer.Write(nil, inPck) } } } diff --git a/pkg/node/onetoone.go b/pkg/node/onetoone.go index b890722d..d27c2c31 100644 --- a/pkg/node/onetoone.go +++ b/pkg/node/onetoone.go @@ -79,13 +79,13 @@ func (n *OneToOneNode) forward(proc *process.Process) { if errWriter == nil { errWriter = n.errPort.Open(proc) } - n.tracer.Transform(inPck, errPck) + n.tracer.Link(inPck, errPck) n.tracer.Write(errWriter, errPck) } else { if outWriter == nil { outWriter = n.outPort.Open(proc) } - n.tracer.Transform(inPck, outPck) + n.tracer.Link(inPck, outPck) n.tracer.Write(outWriter, outPck) } } diff --git a/pkg/packet/tracer.go b/pkg/packet/tracer.go index 4159a12a..516fbda2 100644 --- a/pkg/packet/tracer.go +++ b/pkg/packet/tracer.go @@ -20,7 +20,7 @@ type Tracer struct { mu sync.Mutex } -// NewTracer initializes a new Tracer instance. +// NewTracer initializes and returns a new Tracer instance. func NewTracer() *Tracer { return &Tracer{ hooks: make(map[uuid.UUID]Hooks), @@ -33,16 +33,16 @@ func NewTracer() *Tracer { } } -// AddHook adds a Handler to be invoked when a packet completes processing. -func (t *Tracer) AddHook(pck *Packet, hook Hook) { +// Dispatch registers a hook to be executed when a packet completes processing. +func (t *Tracer) Dispatch(pck *Packet, hook Hook) { t.mu.Lock() defer t.mu.Unlock() t.hooks[pck.ID()] = append(t.hooks[pck.ID()], hook) } -// Transform tracks the transformation of a source packet into a target packet. -func (t *Tracer) Transform(source, target *Packet) { +// Link establishes a relationship between a source packet and a transformed target packet. +func (t *Tracer) Link(source, target *Packet) { t.mu.Lock() defer t.mu.Unlock() @@ -55,17 +55,7 @@ func (t *Tracer) Transform(source, target *Packet) { t.receives[source.ID()] = append(t.receives[source.ID()], nil) } -// Reduce processes a packet and its subsequent transformations. -func (t *Tracer) Reduce(pck *Packet) { - t.mu.Lock() - defer t.mu.Unlock() - - t.reduce(pck.ID(), pck) - t.handle(pck.ID()) - t.receive(pck.ID()) -} - -// Read logs a packet being read by a specific reader. +// Read logs that a packet was read by a specific reader. func (t *Tracer) Read(reader *Reader, pck *Packet) { t.mu.Lock() defer t.mu.Unlock() @@ -74,8 +64,7 @@ func (t *Tracer) Read(reader *Reader, pck *Packet) { t.reader[pck.ID()] = reader } -// Write logs a packet being written by a specific writer. If the writer's write -// operation is successful, it updates the tracking maps; otherwise, it processes the packet. +// Write logs a packet write; on failure, it processes the packet immediately. func (t *Tracer) Write(writer *Writer, pck *Packet) { t.mu.Lock() defer t.mu.Unlock() @@ -84,13 +73,12 @@ func (t *Tracer) Write(writer *Writer, pck *Packet) { t.writes[writer] = append(t.writes[writer], pck.ID()) t.receives[pck.ID()] = append(t.receives[pck.ID()], nil) } else { - t.reduce(pck.ID(), pck) - t.handle(pck.ID()) - t.receive(pck.ID()) + t.receive(pck.ID(), pck) + t.resolve(pck.ID()) } } -// Receive handles the receipt of a packet by a writer and processes it further if necessary. +// Receive processes a packet received by a writer and continues tracking it. func (t *Tracer) Receive(writer *Writer, pck *Packet) { t.mu.Lock() defer t.mu.Unlock() @@ -107,12 +95,11 @@ func (t *Tracer) Receive(writer *Writer, pck *Packet) { delete(t.writes, writer) } - t.reduce(write, pck) - t.handle(write) - t.receive(write) + t.receive(write, pck) + t.resolve(write) } -// Close terminates readers and clears internal resources. +// Close releases resources and signals readers with an error before shutting down. func (t *Tracer) Close() { t.mu.Lock() defer t.mu.Unlock() @@ -130,35 +117,35 @@ func (t *Tracer) Close() { t.reader = make(map[uuid.UUID]*Reader) } -func (t *Tracer) reduce(source uuid.UUID, target *Packet) { - targets := t.targets[source] +func (t *Tracer) receive(source uuid.UUID, target *Packet) { receives := t.receives[source] - - offset := 0 - for i := 0; i < len(targets); i++ { - if receives[i+offset] != nil { - i-- - offset++ - } - } - - ok := false - for i := len(targets) + offset; i < len(receives); i++ { + for i := 0; i < len(receives); i++ { if receives[i] == nil { receives[i] = target - ok = true - break + return } } - if !ok { - receives = append(receives, target) - t.receives[source] = receives - } + t.receives[source] = append(receives, target) } -func (t *Tracer) receive(id uuid.UUID) { +func (t *Tracer) resolve(id uuid.UUID) { receives := t.receives[id] + if slices.Contains(receives, nil) { + return + } + + if hooks := t.hooks[id]; len(hooks) > 0 { + join := Join(receives...) + + delete(t.hooks, id) + delete(t.receives, id) + + t.mu.Unlock() + hooks.Handle(join) + t.mu.Lock() + } + receives = t.receives[id] if slices.Contains(receives, nil) { return } @@ -192,8 +179,7 @@ func (t *Tracer) receive(id uuid.UUID) { delete(t.targets, source) } - t.handle(source) - t.receive(source) + t.resolve(source) } } @@ -225,22 +211,3 @@ func (t *Tracer) receive(id uuid.UUID) { delete(t.receives, id) } } - -func (t *Tracer) handle(id uuid.UUID) { - receives := t.receives[id] - - if slices.Contains(receives, nil) { - return - } - - if hooks := t.hooks[id]; len(hooks) > 0 { - join := Join(receives...) - - delete(t.hooks, id) - delete(t.receives, id) - - t.mu.Unlock() - hooks.Handle(join) - t.mu.Lock() - } -} diff --git a/pkg/packet/tracer_test.go b/pkg/packet/tracer_test.go index 5d1274ab..88c9b0e7 100644 --- a/pkg/packet/tracer_test.go +++ b/pkg/packet/tracer_test.go @@ -3,13 +3,10 @@ package packet import ( "testing" - "github.com/go-faker/faker/v4" - "github.com/siyul-park/uniflow/pkg/types" - "github.com/stretchr/testify/assert" ) -func TestTracer_AddHook(t *testing.T) { +func TestTracer_Dispatch(t *testing.T) { w1 := NewWriter() defer w1.Close() @@ -42,7 +39,7 @@ func TestTracer_AddHook(t *testing.T) { w2.Receive() count := 0 - tr.AddHook(pck1, HookFunc(func(pck *Packet) { + tr.Dispatch(pck1, HookFunc(func(pck *Packet) { count++ })) @@ -50,7 +47,7 @@ func TestTracer_AddHook(t *testing.T) { assert.Equal(t, 1, count) } -func TestTracer_Transform(t *testing.T) { +func TestTracer_Link(t *testing.T) { w1 := NewWriter() defer w1.Close() @@ -77,7 +74,7 @@ func TestTracer_Transform(t *testing.T) { <-r1.Read() tr.Read(r1, pck1) - tr.Transform(pck1, pck2) + tr.Link(pck1, pck2) tr.Write(w2, pck2) <-r2.Read() @@ -91,31 +88,6 @@ func TestTracer_Transform(t *testing.T) { assert.Equal(t, pck3, pck4) } -func TestTracer_Reduce(t *testing.T) { - w1 := NewWriter() - defer w1.Close() - - r1 := NewReader() - defer r1.Close() - - w1.Link(r1) - - tr := NewTracer() - defer tr.Close() - - pck1 := New(types.NewString(faker.UUIDHyphenated())) - - w1.Write(pck1) - <-r1.Read() - - tr.Read(r1, pck1) - tr.Reduce(pck1) - - pck2, ok := <-w1.Receive() - assert.True(t, ok) - assert.Equal(t, pck1.Payload(), pck2.Payload()) -} - func TestTracer_ReadAndWriteAndReceive(t *testing.T) { w1 := NewWriter() defer w1.Close()