From 8cb7ee7613b5b68e8800ccd50b744b619270bff3 Mon Sep 17 00:00:00 2001 From: Manfred Touron <94029+moul@users.noreply.github.com> Date: Tue, 29 Dec 2020 13:46:06 +0100 Subject: [PATCH 1/2] feat: add Future --- pattern.go | 16 ++++++++++++++++ pattern_test.go | 18 ++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/pattern.go b/pattern.go index 878ba4d..a9835aa 100644 --- a/pattern.go +++ b/pattern.go @@ -18,3 +18,19 @@ func CheckErr(err error) { panic(err) } } + +// Future starts running the given function in background and return a chan that will return the result of the execution. +func Future(fn func() (interface{}, error)) <-chan FutureRet { + c := make(chan FutureRet, 1) + go func() { + ret, err := fn() + c <- FutureRet{Ret: ret, Err: err} + }() + return c +} + +// FutureRet is a generic struct returned by Future. +type FutureRet struct { + Ret interface{} + Err error +} diff --git a/pattern_test.go b/pattern_test.go index 29aeaa2..f347bc6 100644 --- a/pattern_test.go +++ b/pattern_test.go @@ -3,6 +3,7 @@ package u_test import ( "fmt" "net/http" + "time" "moul.io/u" ) @@ -19,3 +20,20 @@ func ExampleCheckErr() { _, err := http.Get("http://foo.bar") u.CheckErr(err) // panic } + +func ExampleFuture() { + future := u.Future(func() (interface{}, error) { + time.Sleep(100 * time.Millisecond) + return "foobar", nil + }) + + // here, we can do some stuff + + ret := <-future + fmt.Println("Ret:", ret.Ret) + fmt.Println("Err:", ret.Err) + + // Output: + // Ret: foobar + // Err: +} From d3a4840096286653af184eba350adeae609d5fb3 Mon Sep 17 00:00:00 2001 From: Manfred Touron <94029+moul@users.noreply.github.com> Date: Tue, 29 Dec 2020 14:10:02 +0100 Subject: [PATCH 2/2] feat: add FanIn --- README.md | 17 +++++++++++++++++ concurrency.go | 31 ++++++++++++++++++++++++++++++- concurrency_test.go | 45 +++++++++++++++++++++++++++++++++++++++++++++ depaware.txt | 2 -- 4 files changed, 92 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 84724e1..574bb6d 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,9 @@ func CaptureStdoutAndStderr() (func() string, error) CaptureStdoutAndStderr temporarily pipes os.Stdout and os.Stderr into a buffer. +func CheckErr(err error) + CheckErr panics if the passed error is not nil. + func CombineFuncs(left func(), right ...func()) func() CombineFuncs create a chain of functions. This can be particularly useful for creating cleanup function progressively. It solves the infinite loop you @@ -64,9 +67,16 @@ func ExecStandaloneOutputs(cmd *exec.Cmd) ([]byte, []byte, error) standard error. func ExpandUser(path string) (string, error) +func FanIn(chans ...<-chan interface{}) <-chan interface{} + FanIn merges multiple input chans events into one. + func FileExists(path string) bool FileExists checks whether a path exists and is a regular file. +func Future(fn func() (interface{}, error)) <-chan FutureRet + Future starts running the given function in background and return a chan + that will return the result of the execution. + func JSON(input interface{}) string JSON returns a JSON representation of the passed input. @@ -134,6 +144,12 @@ func WaitForCtrlC() TYPES +type FutureRet struct { + Ret interface{} + Err error +} + FutureRet is a generic struct returned by Future. + type MutexMap struct { // Has unexported fields. } @@ -154,6 +170,7 @@ type UniqueChild interface { auto-kill itself when its context is done. func NewUniqueChild(ctx context.Context) UniqueChild + NewUniqueChild instantiates and returns a UniqueChild manager. ``` diff --git a/concurrency.go b/concurrency.go index cbf5536..9d7a0ea 100644 --- a/concurrency.go +++ b/concurrency.go @@ -1,6 +1,9 @@ package u -import "context" +import ( + "context" + "sync" +) // UniqueChild is a goroutine manager (parent) that can only have one child at a time. // When you call UniqueChild.SetChild(), UniqueChild cancels the previous child context (if any), then run a new child. @@ -10,6 +13,7 @@ type UniqueChild interface { CloseChild() } +// NewUniqueChild instantiates and returns a UniqueChild manager. func NewUniqueChild(ctx context.Context) UniqueChild { return &uniqueChild{ctx: ctx} } type uniqueChild struct { @@ -33,3 +37,28 @@ func (parent *uniqueChild) CloseChild() { parent.lastChildCancelFn() } } + +// FanIn merges multiple input chans events into one. +func FanIn(chans ...<-chan interface{}) <-chan interface{} { + merged := make(chan interface{}) + var wg sync.WaitGroup + wg.Add(len(chans)) + + output := func(c <-chan interface{}) { + for item := range c { + merged <- item + } + wg.Done() + } + + for _, ch := range chans { + go output(ch) + } + + go func() { + wg.Wait() + close(merged) + }() + + return merged +} diff --git a/concurrency_test.go b/concurrency_test.go index 1d0c3c5..07ce4b5 100644 --- a/concurrency_test.go +++ b/concurrency_test.go @@ -3,6 +3,8 @@ package u_test import ( "context" "fmt" + "sort" + "strings" "time" "moul.io/u" @@ -81,3 +83,46 @@ func ExampleUniqueChild_CloseChild() { // Output: A } + +func ExampleFanIn() { + ch1 := make(chan interface{}) + ch2 := make(chan interface{}) + ch3 := make(chan interface{}) + merged := u.FanIn(ch1, ch2, ch3) + done := make(chan bool) + received := []string{} + + go func() { + for item := range merged { + fmt.Println("tick") + received = append(received, fmt.Sprintf("%v", item)) + } + done <- true + }() + + ch1 <- 1 + ch2 <- 2 + ch3 <- 3 + close(ch1) + ch2 <- 4 + ch2 <- 5 + ch3 <- 6 + close(ch2) + ch3 <- 7 + close(ch3) + + <-done + + sort.Strings(received) + fmt.Println(strings.Join(received, ", ")) + + // Output: + // tick + // tick + // tick + // tick + // tick + // tick + // tick + // 1, 2, 3, 4, 5, 6, 7 +} diff --git a/depaware.txt b/depaware.txt index e50d7be..6078c6f 100644 --- a/depaware.txt +++ b/depaware.txt @@ -28,7 +28,6 @@ moul.io/u dependencies: (generated by github.com/tailscale/depaware) path from archive/zip path/filepath from io/ioutil+ reflect from encoding/binary+ - LD runtime/cgo sort from compress/flate+ strconv from compress/flate+ strings from archive/zip+ @@ -39,4 +38,3 @@ moul.io/u dependencies: (generated by github.com/tailscale/depaware) unicode from bytes+ unicode/utf16 from encoding/json+ unicode/utf8 from archive/zip+ - unsafe from hash/crc32+