Skip to content

Commit

Permalink
refactor: change method name in tracer
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Feb 11, 2025
1 parent a197a1e commit bc1c871
Show file tree
Hide file tree
Showing 11 changed files with 69 additions and 131 deletions.
10 changes: 5 additions & 5 deletions ext/pkg/control/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,15 @@ func (n *CacheNode) forward(proc *process.Process) {
inPayload := inPck.Payload()
if outPayload, ok := n.lru.Load(inPayload); ok {
outPck := packet.New(outPayload)
n.tracer.Transform(inPck, outPck)
n.tracer.Reduce(outPck)
n.tracer.Link(inPck, outPck)
n.tracer.Write(nil, outPck)
} else {
n.tracer.AddHook(inPck, packet.HookFunc(func(backPck *packet.Packet) {
n.tracer.Dispatch(inPck, packet.HookFunc(func(backPck *packet.Packet) {
if _, ok := backPck.Payload().(types.Error); !ok {
n.lru.Store(inPayload, backPck.Payload())
}
n.tracer.Transform(inPck, backPck)
n.tracer.Reduce(backPck)
n.tracer.Link(inPck, backPck)
n.tracer.Write(nil, backPck)
}))

if outWriter == nil {
Expand Down
6 changes: 3 additions & 3 deletions ext/pkg/control/for.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ func (n *ForNode) forward(proc *process.Process) {
for i, outPayload := range outPayloads {
outPck := packet.New(outPayload)
outPcks[i] = outPck
n.tracer.Transform(inPck, outPck)
n.tracer.Link(inPck, outPck)
}

n.tracer.AddHook(inPck, packet.HookFunc(func(backPck *packet.Packet) {
n.tracer.Transform(inPck, backPck)
n.tracer.Dispatch(inPck, packet.HookFunc(func(backPck *packet.Packet) {
n.tracer.Link(inPck, backPck)
if _, ok := backPck.Payload().(types.Error); ok {
if errWriter == nil {
errWriter = n.errPort.Open(proc)
Expand Down
4 changes: 2 additions & 2 deletions ext/pkg/control/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ func (n *PipeNode) forward(proc *process.Process) {
for inPck := range inReader.Read() {
n.tracer.Read(inReader, inPck)

n.tracer.AddHook(inPck, packet.HookFunc(func(backPck *packet.Packet) {
n.tracer.Transform(inPck, backPck)
n.tracer.Dispatch(inPck, packet.HookFunc(func(backPck *packet.Packet) {
n.tracer.Link(inPck, backPck)
if _, ok := backPck.Payload().(types.Error); ok {
if errWriter == nil {
errWriter = n.errPort.Open(proc)
Expand Down
8 changes: 4 additions & 4 deletions ext/pkg/control/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ func (n *RetryNode) forward(proc *process.Process) {
_, fail := backPck.Payload().(types.Error)
if !fail || count == n.threshold {
attempts.Delete(inPck)
n.tracer.Transform(inPck, backPck)
n.tracer.Reduce(backPck)
n.tracer.Link(inPck, backPck)
n.tracer.Write(nil, backPck)
return
}

Expand All @@ -109,14 +109,14 @@ func (n *RetryNode) forward(proc *process.Process) {
if outWriter == nil {
outWriter = n.outPort.Open(proc)
}
n.tracer.AddHook(inPck, hook)
n.tracer.Dispatch(inPck, hook)
n.tracer.Write(outWriter, inPck)
})

if outWriter == nil {
outWriter = n.outPort.Open(proc)
}
n.tracer.AddHook(inPck, hook)
n.tracer.Dispatch(inPck, hook)
n.tracer.Write(outWriter, inPck)
}
}
Expand Down
6 changes: 3 additions & 3 deletions ext/pkg/control/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,13 @@ func (n *SessionNode) forward(proc *process.Process) {
child := parent.Fork()
outPck := packet.New(types.NewSlice(value, inPck.Payload()))

n.tracer.Transform(inPck, outPck)
n.tracer.Link(inPck, outPck)

children = append(children, child)
outPcks = append(outPcks, outPck)
}

n.tracer.AddHook(inPck, packet.HookFunc(func(backPck *packet.Packet) {
n.tracer.Dispatch(inPck, packet.HookFunc(func(backPck *packet.Packet) {
var err error
if v, ok := backPck.Payload().(types.Error); ok {
err = v.Unwrap()
Expand All @@ -135,7 +135,7 @@ func (n *SessionNode) forward(proc *process.Process) {
n.tracer.Write(outWriter, outPck)
}
if len(outPcks) == 0 {
n.tracer.Reduce(inPck)
n.tracer.Write(nil, inPck)
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions ext/pkg/control/try.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,15 @@ func (n *TryNode) forward(proc *process.Process) {
outWriter = n.outPort.Open(proc)
}

n.tracer.AddHook(inPck, packet.HookFunc(func(backPck *packet.Packet) {
n.tracer.Transform(inPck, backPck)
n.tracer.Dispatch(inPck, packet.HookFunc(func(backPck *packet.Packet) {
n.tracer.Link(inPck, backPck)
if _, ok := backPck.Payload().(types.Error); ok {
if errWriter == nil {
errWriter = n.errPort.Open(proc)
}
n.tracer.Write(errWriter, backPck)
} else {
n.tracer.Reduce(backPck)
n.tracer.Write(nil, backPck)
}
}))

Expand Down
8 changes: 4 additions & 4 deletions pkg/node/manytoone.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,21 +113,21 @@ func (n *ManyToOneNode) forward(index int) port.Listener {
n.tracer.Read(inReader, inPck)

if inPcks := readGroup.Read(inReader, inPck); len(inPcks) < len(n.inPorts) {
n.tracer.Reduce(inPck)
n.tracer.Write(nil, inPck)
} else if outPck, errPck := n.action(proc, inPcks); errPck != nil {
if errWriter == nil {
errWriter = n.errPort.Open(proc)
}
n.tracer.Transform(inPck, errPck)
n.tracer.Link(inPck, errPck)
n.tracer.Write(errWriter, errPck)
} else if outPck != nil {
if outWriter == nil {
outWriter = n.outPort.Open(proc)
}
n.tracer.Transform(inPck, outPck)
n.tracer.Link(inPck, outPck)
n.tracer.Write(outWriter, outPck)
} else {
n.tracer.Reduce(inPck)
n.tracer.Write(nil, inPck)
}
}
})
Expand Down
6 changes: 3 additions & 3 deletions pkg/node/onetomany.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,12 @@ func (n *OneToManyNode) forward(proc *process.Process) {
if errWriter == nil {
errWriter = n.errPort.Open(proc)
}
n.tracer.Transform(inPck, errPck)
n.tracer.Link(inPck, errPck)
n.tracer.Write(errWriter, errPck)
} else {
for i, outPck := range outPcks {
if i < len(outWriters) && outPck != nil {
n.tracer.Transform(inPck, outPck)
n.tracer.Link(inPck, outPck)
}
}

Expand All @@ -125,7 +125,7 @@ func (n *OneToManyNode) forward(proc *process.Process) {
}

if count == 0 {
n.tracer.Reduce(inPck)
n.tracer.Write(nil, inPck)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/node/onetoone.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ func (n *OneToOneNode) forward(proc *process.Process) {
if errWriter == nil {
errWriter = n.errPort.Open(proc)
}
n.tracer.Transform(inPck, errPck)
n.tracer.Link(inPck, errPck)
n.tracer.Write(errWriter, errPck)
} else {
if outWriter == nil {
outWriter = n.outPort.Open(proc)
}
n.tracer.Transform(inPck, outPck)
n.tracer.Link(inPck, outPck)
n.tracer.Write(outWriter, outPck)
}
}
Expand Down
106 changes: 36 additions & 70 deletions pkg/packet/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type Tracer struct {
mu sync.Mutex
}

// NewTracer initializes a new Tracer instance.
// NewTracer initializes and returns a new Tracer instance.
func NewTracer() *Tracer {
return &Tracer{
hooks: make(map[uuid.UUID]Hooks),
Expand All @@ -33,16 +33,16 @@ func NewTracer() *Tracer {
}
}

// AddHook adds a Handler to be invoked when a packet completes processing.
func (t *Tracer) AddHook(pck *Packet, hook Hook) {
// Dispatch registers a hook to be executed when a packet completes processing.
func (t *Tracer) Dispatch(pck *Packet, hook Hook) {
t.mu.Lock()
defer t.mu.Unlock()

t.hooks[pck.ID()] = append(t.hooks[pck.ID()], hook)
}

// Transform tracks the transformation of a source packet into a target packet.
func (t *Tracer) Transform(source, target *Packet) {
// Link establishes a relationship between a source packet and a transformed target packet.
func (t *Tracer) Link(source, target *Packet) {
t.mu.Lock()
defer t.mu.Unlock()

Expand All @@ -55,17 +55,7 @@ func (t *Tracer) Transform(source, target *Packet) {
t.receives[source.ID()] = append(t.receives[source.ID()], nil)
}

// Reduce processes a packet and its subsequent transformations.
func (t *Tracer) Reduce(pck *Packet) {
t.mu.Lock()
defer t.mu.Unlock()

t.reduce(pck.ID(), pck)
t.handle(pck.ID())
t.receive(pck.ID())
}

// Read logs a packet being read by a specific reader.
// Read logs that a packet was read by a specific reader.
func (t *Tracer) Read(reader *Reader, pck *Packet) {
t.mu.Lock()
defer t.mu.Unlock()
Expand All @@ -74,8 +64,7 @@ func (t *Tracer) Read(reader *Reader, pck *Packet) {
t.reader[pck.ID()] = reader
}

// Write logs a packet being written by a specific writer. If the writer's write
// operation is successful, it updates the tracking maps; otherwise, it processes the packet.
// Write logs a packet write; on failure, it processes the packet immediately.
func (t *Tracer) Write(writer *Writer, pck *Packet) {
t.mu.Lock()
defer t.mu.Unlock()
Expand All @@ -84,13 +73,12 @@ func (t *Tracer) Write(writer *Writer, pck *Packet) {
t.writes[writer] = append(t.writes[writer], pck.ID())
t.receives[pck.ID()] = append(t.receives[pck.ID()], nil)
} else {
t.reduce(pck.ID(), pck)
t.handle(pck.ID())
t.receive(pck.ID())
t.receive(pck.ID(), pck)
t.resolve(pck.ID())
}
}

// Receive handles the receipt of a packet by a writer and processes it further if necessary.
// Receive processes a packet received by a writer and continues tracking it.
func (t *Tracer) Receive(writer *Writer, pck *Packet) {
t.mu.Lock()
defer t.mu.Unlock()
Expand All @@ -107,12 +95,11 @@ func (t *Tracer) Receive(writer *Writer, pck *Packet) {
delete(t.writes, writer)
}

t.reduce(write, pck)
t.handle(write)
t.receive(write)
t.receive(write, pck)
t.resolve(write)
}

// Close terminates readers and clears internal resources.
// Close releases resources and signals readers with an error before shutting down.
func (t *Tracer) Close() {
t.mu.Lock()
defer t.mu.Unlock()
Expand All @@ -130,35 +117,34 @@ func (t *Tracer) Close() {
t.reader = make(map[uuid.UUID]*Reader)
}

func (t *Tracer) reduce(source uuid.UUID, target *Packet) {
targets := t.targets[source]
receives := t.receives[source]

offset := 0
for i := 0; i < len(targets); i++ {
if receives[i+offset] != nil {
i--
offset++
func (t *Tracer) receive(source uuid.UUID, target *Packet) {
for i, receive := range t.receives[source] {
if receive == nil {
t.receives[source][i] = target
return
}
}

ok := false
for i := len(targets) + offset; i < len(receives); i++ {
if receives[i] == nil {
receives[i] = target
ok = true
break
}
}
if !ok {
receives = append(receives, target)
t.receives[source] = receives
}
t.receives[source] = append(t.receives[source], target)
}

func (t *Tracer) receive(id uuid.UUID) {
func (t *Tracer) resolve(id uuid.UUID) {
receives := t.receives[id]
if slices.Contains(receives, nil) {
return
}

if hooks := t.hooks[id]; len(hooks) > 0 {
join := Join(receives...)

delete(t.hooks, id)
delete(t.receives, id)

t.mu.Unlock()
hooks.Handle(join)
t.mu.Lock()
}

receives = t.receives[id]
if slices.Contains(receives, nil) {
return
}
Expand Down Expand Up @@ -192,8 +178,7 @@ func (t *Tracer) receive(id uuid.UUID) {
delete(t.targets, source)
}

t.handle(source)
t.receive(source)
t.resolve(source)
}
}

Expand Down Expand Up @@ -225,22 +210,3 @@ func (t *Tracer) receive(id uuid.UUID) {
delete(t.receives, id)
}
}

func (t *Tracer) handle(id uuid.UUID) {
receives := t.receives[id]

if slices.Contains(receives, nil) {
return
}

if hooks := t.hooks[id]; len(hooks) > 0 {
join := Join(receives...)

delete(t.hooks, id)
delete(t.receives, id)

t.mu.Unlock()
hooks.Handle(join)
t.mu.Lock()
}
}
Loading

0 comments on commit bc1c871

Please sign in to comment.