Skip to content

Commit

Permalink
feat: support more function in packet and port
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Feb 12, 2025
1 parent a197a1e commit 711a43f
Show file tree
Hide file tree
Showing 20 changed files with 537 additions and 318 deletions.
10 changes: 5 additions & 5 deletions ext/pkg/control/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,15 @@ func (n *CacheNode) forward(proc *process.Process) {
inPayload := inPck.Payload()
if outPayload, ok := n.lru.Load(inPayload); ok {
outPck := packet.New(outPayload)
n.tracer.Transform(inPck, outPck)
n.tracer.Reduce(outPck)
n.tracer.Link(inPck, outPck)
n.tracer.Write(nil, outPck)
} else {
n.tracer.AddHook(inPck, packet.HookFunc(func(backPck *packet.Packet) {
n.tracer.Dispatch(inPck, packet.HookFunc(func(backPck *packet.Packet) {
if _, ok := backPck.Payload().(types.Error); !ok {
n.lru.Store(inPayload, backPck.Payload())
}
n.tracer.Transform(inPck, backPck)
n.tracer.Reduce(backPck)
n.tracer.Link(inPck, backPck)
n.tracer.Write(nil, backPck)
}))

if outWriter == nil {
Expand Down
6 changes: 3 additions & 3 deletions ext/pkg/control/for.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ func (n *ForNode) forward(proc *process.Process) {
for i, outPayload := range outPayloads {
outPck := packet.New(outPayload)
outPcks[i] = outPck
n.tracer.Transform(inPck, outPck)
n.tracer.Link(inPck, outPck)
}

n.tracer.AddHook(inPck, packet.HookFunc(func(backPck *packet.Packet) {
n.tracer.Transform(inPck, backPck)
n.tracer.Dispatch(inPck, packet.HookFunc(func(backPck *packet.Packet) {
n.tracer.Link(inPck, backPck)
if _, ok := backPck.Payload().(types.Error); ok {
if errWriter == nil {
errWriter = n.errPort.Open(proc)
Expand Down
62 changes: 35 additions & 27 deletions ext/pkg/control/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type PipeNodeSpec struct {
type PipeNode struct {
tracer *packet.Tracer
inPort *port.InPort
outPorts []*port.OutPort
outPorts [2]*port.OutPort
errPort *port.OutPort
}

Expand All @@ -39,13 +39,13 @@ func NewPipeNode() *PipeNode {
n := &PipeNode{
tracer: packet.NewTracer(),
inPort: port.NewIn(),
outPorts: []*port.OutPort{port.NewOut(), port.NewOut()},
outPorts: [2]*port.OutPort{port.NewOut(), port.NewOut()},
errPort: port.NewOut(),
}

n.inPort.AddListener(port.ListenFunc(n.forward))
n.outPorts[0].AddListener(n.backward(0))
n.outPorts[1].AddListener(n.backward(1))
n.outPorts[0].AddListener(port.ListenFunc(n.backward0))
n.outPorts[1].AddListener(port.ListenFunc(n.backward1))
n.errPort.AddListener(port.ListenFunc(n.catch))

return n
Expand Down Expand Up @@ -93,42 +93,50 @@ func (n *PipeNode) Close() error {
func (n *PipeNode) forward(proc *process.Process) {
inReader := n.inPort.Open(proc)
var outWriter0 *packet.Writer
var outWriter1 *packet.Writer
var errWriter *packet.Writer

for inPck := range inReader.Read() {
n.tracer.Read(inReader, inPck)

n.tracer.AddHook(inPck, packet.HookFunc(func(backPck *packet.Packet) {
n.tracer.Transform(inPck, backPck)
if _, ok := backPck.Payload().(types.Error); ok {
if errWriter == nil {
errWriter = n.errPort.Open(proc)
}
n.tracer.Write(errWriter, backPck)
} else {
if outWriter1 == nil {
outWriter1 = n.outPorts[1].Open(proc)
}
n.tracer.Write(outWriter1, backPck)
}
}))

if outWriter0 == nil {
outWriter0 = n.outPorts[0].Open(proc)
}
n.tracer.Write(outWriter0, inPck)
}
}

func (n *PipeNode) backward(index int) port.Listener {
return port.ListenFunc(func(proc *process.Process) {
outWriter := n.outPorts[index].Open(proc)
func (n *PipeNode) backward0(proc *process.Process) {
outWriter0 := n.outPorts[0].Open(proc)
var outWriter1 *packet.Writer
var errWriter *packet.Writer

for backPck := range outWriter.Receive() {
n.tracer.Receive(outWriter, backPck)
for backPck := range outWriter0.Receive() {
outPcks := n.tracer.Writes(outWriter0)
if len(outPcks) > 0 {
n.tracer.Link(outPcks[0], backPck)
}
})

if _, ok := backPck.Payload().(types.Error); ok {
if errWriter == nil {
errWriter = n.errPort.Open(proc)
}
n.tracer.Write(errWriter, backPck)
} else {
if outWriter1 == nil {
outWriter1 = n.outPorts[1].Open(proc)
}
n.tracer.Write(outWriter1, backPck)
}

n.tracer.Receive(outWriter0, nil)
}
}

func (n *PipeNode) backward1(proc *process.Process) {
outWriter := n.outPorts[1].Open(proc)

for backPck := range outWriter.Receive() {
n.tracer.Receive(outWriter, backPck)
}
}

func (n *PipeNode) catch(proc *process.Process) {
Expand Down
8 changes: 4 additions & 4 deletions ext/pkg/control/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ func (n *RetryNode) forward(proc *process.Process) {
_, fail := backPck.Payload().(types.Error)
if !fail || count == n.threshold {
attempts.Delete(inPck)
n.tracer.Transform(inPck, backPck)
n.tracer.Reduce(backPck)
n.tracer.Link(inPck, backPck)
n.tracer.Write(nil, backPck)
return
}

Expand All @@ -109,14 +109,14 @@ func (n *RetryNode) forward(proc *process.Process) {
if outWriter == nil {
outWriter = n.outPort.Open(proc)
}
n.tracer.AddHook(inPck, hook)
n.tracer.Dispatch(inPck, hook)
n.tracer.Write(outWriter, inPck)
})

if outWriter == nil {
outWriter = n.outPort.Open(proc)
}
n.tracer.AddHook(inPck, hook)
n.tracer.Dispatch(inPck, hook)
n.tracer.Write(outWriter, inPck)
}
}
Expand Down
6 changes: 3 additions & 3 deletions ext/pkg/control/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,13 @@ func (n *SessionNode) forward(proc *process.Process) {
child := parent.Fork()
outPck := packet.New(types.NewSlice(value, inPck.Payload()))

n.tracer.Transform(inPck, outPck)
n.tracer.Link(inPck, outPck)

children = append(children, child)
outPcks = append(outPcks, outPck)
}

n.tracer.AddHook(inPck, packet.HookFunc(func(backPck *packet.Packet) {
n.tracer.Dispatch(inPck, packet.HookFunc(func(backPck *packet.Packet) {
var err error
if v, ok := backPck.Payload().(types.Error); ok {
err = v.Unwrap()
Expand All @@ -135,7 +135,7 @@ func (n *SessionNode) forward(proc *process.Process) {
n.tracer.Write(outWriter, outPck)
}
if len(outPcks) == 0 {
n.tracer.Reduce(inPck)
n.tracer.Write(nil, inPck)
}
}
}
Expand Down
30 changes: 15 additions & 15 deletions ext/pkg/control/try.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,36 +84,36 @@ func (n *TryNode) Close() error {
func (n *TryNode) forward(proc *process.Process) {
inReader := n.inPort.Open(proc)
var outWriter *packet.Writer
var errWriter *packet.Writer

for inPck := range inReader.Read() {
n.tracer.Read(inReader, inPck)

if outWriter == nil {
outWriter = n.outPort.Open(proc)
}

n.tracer.AddHook(inPck, packet.HookFunc(func(backPck *packet.Packet) {
n.tracer.Transform(inPck, backPck)
if _, ok := backPck.Payload().(types.Error); ok {
if errWriter == nil {
errWriter = n.errPort.Open(proc)
}
n.tracer.Write(errWriter, backPck)
} else {
n.tracer.Reduce(backPck)
}
}))

n.tracer.Write(outWriter, inPck)
}
}

func (n *TryNode) backward(proc *process.Process) {
outWriter := n.outPort.Open(proc)
var errWriter *packet.Writer

for backPck := range outWriter.Receive() {
n.tracer.Receive(outWriter, backPck)
if _, ok := backPck.Payload().(types.Error); ok {
outPcks := n.tracer.Writes(outWriter)
if len(outPcks) > 0 {
n.tracer.Link(outPcks[0], backPck)
}

if errWriter == nil {
errWriter = n.errPort.Open(proc)
}
n.tracer.Write(errWriter, backPck)
n.tracer.Receive(outWriter, nil)
} else {
n.tracer.Receive(outWriter, backPck)
}
}
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/node/manytoone.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,21 +113,21 @@ func (n *ManyToOneNode) forward(index int) port.Listener {
n.tracer.Read(inReader, inPck)

if inPcks := readGroup.Read(inReader, inPck); len(inPcks) < len(n.inPorts) {
n.tracer.Reduce(inPck)
n.tracer.Write(nil, inPck)
} else if outPck, errPck := n.action(proc, inPcks); errPck != nil {
if errWriter == nil {
errWriter = n.errPort.Open(proc)
}
n.tracer.Transform(inPck, errPck)
n.tracer.Link(inPck, errPck)
n.tracer.Write(errWriter, errPck)
} else if outPck != nil {
if outWriter == nil {
outWriter = n.outPort.Open(proc)
}
n.tracer.Transform(inPck, outPck)
n.tracer.Link(inPck, outPck)
n.tracer.Write(outWriter, outPck)
} else {
n.tracer.Reduce(inPck)
n.tracer.Write(nil, inPck)
}
}
})
Expand Down
6 changes: 3 additions & 3 deletions pkg/node/onetomany.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,12 @@ func (n *OneToManyNode) forward(proc *process.Process) {
if errWriter == nil {
errWriter = n.errPort.Open(proc)
}
n.tracer.Transform(inPck, errPck)
n.tracer.Link(inPck, errPck)
n.tracer.Write(errWriter, errPck)
} else {
for i, outPck := range outPcks {
if i < len(outWriters) && outPck != nil {
n.tracer.Transform(inPck, outPck)
n.tracer.Link(inPck, outPck)
}
}

Expand All @@ -125,7 +125,7 @@ func (n *OneToManyNode) forward(proc *process.Process) {
}

if count == 0 {
n.tracer.Reduce(inPck)
n.tracer.Write(nil, inPck)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/node/onetoone.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ func (n *OneToOneNode) forward(proc *process.Process) {
if errWriter == nil {
errWriter = n.errPort.Open(proc)
}
n.tracer.Transform(inPck, errPck)
n.tracer.Link(inPck, errPck)
n.tracer.Write(errWriter, errPck)
} else {
if outWriter == nil {
outWriter = n.outPort.Open(proc)
}
n.tracer.Transform(inPck, outPck)
n.tracer.Link(inPck, outPck)
n.tracer.Write(outWriter, outPck)
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/packet/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Packet struct {
var None = New(nil)

// ErrDroppedPacket is an error indicating a dropped packet.
var ErrDroppedPacket = errors.New("dropped packet")
var ErrDroppedPacket = types.NewError(errors.New("dropped packet"))

// Join combines multiple packets into one, handling errors and payloads.
func Join(pcks ...*Packet) *Packet {
Expand Down
4 changes: 1 addition & 3 deletions pkg/packet/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package packet

import (
"sync"

"github.com/siyul-park/uniflow/pkg/types"
)

// Reader represents a packet reader that manages incoming packets from multiple writers.
Expand Down Expand Up @@ -127,7 +125,7 @@ func (r *Reader) Close() {
return
}

pck := New(types.NewError(ErrDroppedPacket))
pck := New(ErrDroppedPacket)
for _, w := range r.writers {
r.outbounds.Handle(pck)
go w.receive(pck, r)
Expand Down
Loading

0 comments on commit 711a43f

Please sign in to comment.