Skip to content

Commit

Permalink
feat: rename ChannelMerge to FanIn and add FanOut (#262)
Browse files Browse the repository at this point in the history
  • Loading branch information
samber authored Nov 11, 2022
1 parent 27d8810 commit 34ef81e
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 9 deletions.
20 changes: 16 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ Supported helpers for channels:
- [Generator](#generator)
- [Batch](#batch)
- [BatchWithTimeout](#batchwithtimeout)
- [ChannelMerge](#channelmerge)
- [FanIn](#fanin)
- [FanOut](#fanout)

Supported intersection helpers:

Expand Down Expand Up @@ -1535,16 +1536,27 @@ for i := range children {
}
```

### ChannelMerge
### FanIn

Collects messages from multiple input channels into a single buffered channel. Output messages has no priority.
Collects messages from multiple input channels into a single buffered channel. Output messages has no priority. When all upstream channels reach EOF, downstream channel closes.

```go
stream1 := make(chan int, 42)
stream2 := make(chan int, 42)
stream3 := make(chan int, 42)

all := lo.ChannelMerge(100, stream1, stream2, stream3)
all := lo.FanIn(100, stream1, stream2, stream3)
```

### FanOut

Broadcasts all the upstream messages to multiple downstream channels. When upstream channel reach EOF, downstream channels close. If any downstream channels is full, broadcasting is paused.

```go
stream := make(chan int, 42)

all := lo.FanOut(5, 42, stream)
// [5]<-chan int
```

### Contains
Expand Down
35 changes: 32 additions & 3 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,9 @@ func BatchWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (coll
return buffer, index, time.Since(now), true
}

// ChannelMerge collects messages from multiple input channels into a single buffered channel.
// Output messages has no priority.
func ChannelMerge[T any](channelBufferCap int, upstreams ...<-chan T) <-chan T {
// FanIn collects messages from multiple input channels into a single buffered channel.
// Output messages has no priority. When all upstream channels reach EOF, downstream channel closes.
func FanIn[T any](channelBufferCap int, upstreams ...<-chan T) <-chan T {
out := make(chan T, channelBufferCap)
var wg sync.WaitGroup

Expand All @@ -263,3 +263,32 @@ func ChannelMerge[T any](channelBufferCap int, upstreams ...<-chan T) <-chan T {
}()
return out
}

// ChannelMerge collects messages from multiple input channels into a single buffered channel.
// Output messages has no priority. When all upstream channels reach EOF, downstream channel closes.
// Deprecated: Use lo.FanIn instead.
func ChannelMerge[T any](channelBufferCap int, upstreams ...<-chan T) <-chan T {
return FanIn(channelBufferCap, upstreams...)
}

// FanOut broadcasts all the upstream messages to multiple downstream channels.
// When upstream channel reach EOF, downstream channels close. If any downstream
// channels is full, broadcasting is paused.
func FanOut[T any](count int, channelsBufferCap int, upstream <-chan T) []<-chan T {
downstreams := createChannels[T](count, channelsBufferCap)

go func() {
for msg := range upstream {
for i := range downstreams {
downstreams[i] <- msg
}
}

// Close out once all the output goroutines are done.
for i := range downstreams {
close(downstreams[i])
}
}()

return channelsToReadOnly(downstreams)
}
35 changes: 33 additions & 2 deletions channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func TestBatchWithTimeout(t *testing.T) {
is.False(ok5)
}

func TestChannelMerge(t *testing.T) {
func TestFanIn(t *testing.T) {
t.Parallel()
testWithTimeout(t, 100*time.Millisecond)
is := assert.New(t)
Expand All @@ -332,7 +332,7 @@ func TestChannelMerge(t *testing.T) {
close(upstreams[i])
}(i)
}
out := ChannelMerge(10, roupstreams...)
out := FanIn(10, roupstreams...)
time.Sleep(10 * time.Millisecond)

// check input channels
Expand All @@ -357,3 +357,34 @@ func TestChannelMerge(t *testing.T) {
is.Equal(false, ok0)
is.Equal(0, msg0)
}

func TestFanOut(t *testing.T) {
t.Parallel()
testWithTimeout(t, 100*time.Millisecond)
is := assert.New(t)

upstream := SliceToChannel(10, []int{0, 1, 2, 3, 4, 5})
rodownstreams := FanOut(3, 10, upstream)

time.Sleep(10 * time.Millisecond)

// check output channels
is.Equal(3, len(rodownstreams))

// check channels allocation
for i := range rodownstreams {
is.Equal(6, len(rodownstreams[i]))
is.Equal(10, cap(rodownstreams[i]))
is.Equal([]int{0, 1, 2, 3, 4, 5}, ChannelToSlice(rodownstreams[i]))
}

// check it is closed
time.Sleep(10 * time.Millisecond)

// check channels allocation
for i := range rodownstreams {
msg, ok := <-rodownstreams[i]
is.Equal(false, ok)
is.Equal(0, msg)
}
}

0 comments on commit 34ef81e

Please sign in to comment.