diff --git a/.github/workflows/ci-workflow.yml b/.github/workflows/ci-workflow.yml index 6bacf0b..270bb06 100644 --- a/.github/workflows/ci-workflow.yml +++ b/.github/workflows/ci-workflow.yml @@ -7,27 +7,27 @@ jobs: name: lint runs-on: ubuntu-latest steps: - - uses: actions/setup-go@v3 + - uses: actions/setup-go@v5 with: - go-version: 1.21 + go-version: 1.22 - uses: actions/checkout@v3 - name: golangci-lint - uses: golangci/golangci-lint-action@v3 + uses: golangci/golangci-lint-action@v4 with: - version: v1.54.2 + version: v1.56.2 args: --verbose test: strategy: matrix: - go-version: [1.21] + go-version: [1.22] platform: [ubuntu-latest, macos-latest] runs-on: ${{ matrix.platform }} steps: - name: Install Go - uses: actions/setup-go@v3 + uses: actions/setup-go@v5 with: go-version: ${{ matrix.go-version }} diff --git a/.vscode/settings.json b/.vscode/settings.json index e21253a..f1825c1 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -6,12 +6,14 @@ "cSpell.words": [ "Assistable", "bodyclose", + "cmds", "coverpkg", "coverprofile", "cubiest", "deadcode", "depguard", "dogsled", + "dotenv", "dupl", "errcheck", "exportloopref", @@ -27,6 +29,7 @@ "goimports", "golangci", "goleak", + "Gomega", "gomnd", "goreleaser", "gosec", @@ -43,11 +46,15 @@ "nakedret", "nolint", "nolintlint", + "onecontext", "onsi", + "outdir", "passthru", "pixa", "prealloc", "repotoken", + "rxgo", + "samber", "shogo", "sidewalk", "skeletor", @@ -56,12 +63,16 @@ "staticcheck", "structcheck", "stylecheck", + "teivah", + "testcache", "thelper", "tparallel", "typecheck", "unconvert", + "Unmarshaller", "unparam", "varcheck", + "watchv", "watchvc", "watchvi", "wgan", diff --git a/Taskfile.yml b/Taskfile.yml index 3c9a40d..a9a0f29 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -18,6 +18,19 @@ tasks: cmds: - go build ./... + # NB: this clean task is very aggressive. It will not only remove + # old versions of packages, it removes all of them, even the + # current versions in use. You can use the download task to + # refresh the pkg cache. + # + clean: + cmds: + - go clean + + clean-t: + cmds: + - go clean -testcache + # === test ================================================= t: @@ -28,6 +41,14 @@ tasks: cmds: - go test ./i18n + tx: + cmds: + - ginkgo rx/... + + tc: + cmds: + - ginkgo common/... + dry: cmds: - ginkgo -v --dry-run ./... diff --git a/boost/annotated-wait-group_test.go b/boost/annotated-wait-group_test.go index 06eaa4b..ac47cd9 100644 --- a/boost/annotated-wait-group_test.go +++ b/boost/annotated-wait-group_test.go @@ -3,7 +3,7 @@ package boost_test import ( "time" - . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok "github.com/snivilised/lorax/boost" ) diff --git a/boost/boost_suite_test.go b/boost/boost_suite_test.go index db90fea..536988d 100644 --- a/boost/boost_suite_test.go +++ b/boost/boost_suite_test.go @@ -3,8 +3,8 @@ package boost_test import ( "testing" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" + . "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok + . "github.com/onsi/gomega" //nolint:revive // gomega ok ) func TestAsync(t *testing.T) { diff --git a/boost/worker-pool.go b/boost/worker-pool.go index 30cd7ce..3a5f2bf 100644 --- a/boost/worker-pool.go +++ b/boost/worker-pool.go @@ -200,7 +200,9 @@ func (p *WorkerPool[I, O]) run( // closure to forwardChOut, otherwise we end up in a deadlock. // running = false + close(forwardChOut) + p.Logger.Debug("source jobs chan closed", slog.String("source", "worker-pool.run"), ) diff --git a/boost/worker-pool_test.go b/boost/worker-pool_test.go index 039af56..dcad655 100644 --- a/boost/worker-pool_test.go +++ b/boost/worker-pool_test.go @@ -7,8 +7,8 @@ import ( "time" "github.com/fortytw2/leaktest" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" + . "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok + . "github.com/onsi/gomega" //nolint:revive // gomega ok "github.com/samber/lo" "github.com/snivilised/lorax/boost" @@ -135,7 +135,7 @@ func (p *pipeline[I, O]) produce(parentContext context.Context, interval time.Duration, provider helpers.ProviderFunc[I], ) { - p.cancel = func(parentContext context.Context, + p.cancel = func(_ context.Context, parentCancel context.CancelFunc, delay time.Duration, ) { @@ -144,8 +144,8 @@ func (p *pipeline[I, O]) produce(parentContext context.Context, parentCancel, ) } - p.stop = func(parentContext context.Context, - parentCancel context.CancelFunc, + p.stop = func(_ context.Context, + _ context.CancelFunc, delay time.Duration, ) { go helpers.StopProducerAfter( @@ -235,15 +235,15 @@ var ( Eventually(parentContext, pipe.outputsDup.Channel).WithTimeout(time.Second * 5).Should(BeClosed()) Eventually(parentContext, pipe.producer.JobsCh).WithTimeout(time.Second * 5).Should(BeClosed()) } - assertCancelled assertFunc = func(parentContext context.Context, - pipe TestPipeline, + assertCancelled assertFunc = func(_ context.Context, + _ TestPipeline, result *boost.PoolResult, ) { // This needs to be upgraded to check that it is "timeout on send" error Expect(result.Error).Error().NotTo(BeNil()) } - assertOutputChTimeout assertFunc = func(parentContext context.Context, - pipe TestPipeline, + assertOutputChTimeout assertFunc = func(_ context.Context, + _ TestPipeline, result *boost.PoolResult, ) { Expect(result.Error).Error().NotTo(BeNil()) @@ -257,7 +257,8 @@ var ( // } } - assertNoError assertFunc = func(parentContext context.Context, pipe TestPipeline, result *boost.PoolResult) { + assertNoError assertFunc = func(_ context.Context, + _ TestPipeline, result *boost.PoolResult) { Expect(result.Error).Error().To(BeNil()) } @@ -265,6 +266,8 @@ var ( pipe TestPipeline, result *boost.PoolResult, ) { + _, _ = parentContext, pipe + if result.Error == nil { return // This is temporary } diff --git a/go.mod b/go.mod index 4c00ffe..39d4847 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/google/uuid v1.6.0 github.com/nicksnyder/go-i18n/v2 v2.3.0 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/teivah/onecontext v1.3.0 golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect golang.org/x/net v0.20.0 // indirect golang.org/x/sys v0.16.0 // indirect diff --git a/go.sum b/go.sum index c38881e..89b8254 100644 --- a/go.sum +++ b/go.sum @@ -20,8 +20,11 @@ github.com/google/pprof v0.0.0-20230406165453-00490a63f317 h1:hFhpt7CTmR3DX+b4R1 github.com/google/pprof v0.0.0-20230406165453-00490a63f317/go.mod h1:79YE0hCXdHag9sBkw2o+N/YnZtTkXi0UT9Nnixa5eYk= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 h1:RWengNIwukTxcDr9M+97sNutRR1RKhG96O6jWumTTnw= @@ -45,9 +48,13 @@ github.com/samber/lo v1.39.0/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXn github.com/snivilised/extendio v0.6.1 h1:8IIJ4rryKcRv796QxfHcMnuWl4erFMqJF4Rqcj8eY/k= github.com/snivilised/extendio v0.6.1/go.mod h1:LbMvItdNqN8oZI4yUENxM459OkTP+W+hYXxKrc9l8ZQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/teivah/onecontext v1.3.0 h1:tbikMhAlo6VhAuEGCvhc8HlTnpX4xTNPTOseWuhO1J0= +github.com/teivah/onecontext v1.3.0/go.mod h1:hoW1nmdPVK/0jrvGtcx8sCKYs2PiS4z0zzfdeuEVyb0= +go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= @@ -56,21 +63,32 @@ go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= go.uber.org/zap/exp v0.2.0 h1:FtGenNNeCATRB3CmB/yEUnjEFeJWpB/pMcy7e2bKPYs= go.uber.org/zap/exp v0.2.0/go.mod h1:t0gqAIdh1MfKv9EwN/dLwfZnJxe9ITAZN78HEWPFWDQ= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug= golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.17.0 h1:FvmRgNOcs3kOa+T20R1uhfP9F6HgG2mfxDv1vrx1Htc= golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/i18n/i18n_suite_test.go b/i18n/i18n_suite_test.go index 4cc92d2..dac047d 100644 --- a/i18n/i18n_suite_test.go +++ b/i18n/i18n_suite_test.go @@ -3,8 +3,8 @@ package i18n_test import ( "testing" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" + . "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok + . "github.com/onsi/gomega" //nolint:revive // gomega ok ) func TestI18n(t *testing.T) { diff --git a/rx/factory.go b/rx/factory.go new file mode 100644 index 0000000..fb00ff5 --- /dev/null +++ b/rx/factory.go @@ -0,0 +1,29 @@ +package rx + +// Amb takes several Observables, emit all of the items from only the first of these Observables +// to emit an item or notification. +func Amb[I any](observables []Observable[I], opts ...Option[I]) Observable[I] { + _, _ = observables, opts + + panic("Amb: NOT-IMPL") +} + +// FromChannel creates a cold observable from a channel. +func FromChannel[I any](next <-chan Item[I], opts ...Option[I]) Observable[I] { + option := parseOptions(opts...) + ctx := option.buildContext(emptyContext) + + return &ObservableImpl[I]{ + parent: ctx, + iterable: newChannelIterable(next, opts...), + } +} + +func parseOptions[I any](opts ...Option[I]) Option[I] { + o := new(funcOption[I]) + for _, opt := range opts { + opt.apply(o) + } + + return o +} diff --git a/rx/item.go b/rx/item.go new file mode 100644 index 0000000..8945c09 --- /dev/null +++ b/rx/item.go @@ -0,0 +1,99 @@ +package rx + +import ( + "context" + "time" +) + +type ( + // Item is a wrapper having either a value or an error. + // + Item[I any] struct { + V I + E error + } + + // TimestampItem attach a timestamp to an item. + // + TimestampItem[I any] struct { + Timestamp time.Time + V I + } + + // CloseChannelStrategy indicates a strategy on whether to close a channel. + CloseChannelStrategy uint32 +) + +const ( + // LeaveChannelOpen indicates to leave the channel open after completion. + LeaveChannelOpen CloseChannelStrategy = iota + // CloseChannel indicates to close the channel open after completion. + CloseChannel +) + +// Of creates an item from a value. +func Of[I any](v I) Item[I] { + return Item[I]{V: v} +} + +// Error creates an item from an error. +func Error[I any](err error) Item[I] { + return Item[I]{E: err} +} + +// SendItems is an utility function that send a list of items and indicate a +// strategy on whether to close the channel once the function completes. +// This method has been derived from the original SendItems. +// (does not support channels or slice) +func SendItems[I any](ctx context.Context, + ch chan<- Item[I], strategy CloseChannelStrategy, items ...Item[I], +) { + if strategy == CloseChannel { + defer close(ch) + } + + sendItems(ctx, ch, items...) +} + +func sendItems[I any](ctx context.Context, ch chan<- Item[I], items ...Item[I]) { + for _, item := range items { + item.SendContext(ctx, ch) + } +} + +// IsError checks if an item is an error. +func (i Item[I]) IsError() bool { + return i.E != nil +} + +// SendBlocking sends an item and blocks until it is sent. +func (i Item[I]) SendBlocking(ch chan<- Item[I]) { + ch <- i +} + +// SendContext sends an item and blocks until it is sent or a context canceled. +// It returns a boolean to indicate whether the item was sent. +func (i Item[I]) SendContext(ctx context.Context, ch chan<- Item[I]) bool { + select { + case <-ctx.Done(): // Context's done channel has the highest priority + return false + default: + select { + case <-ctx.Done(): + return false + case ch <- i: + return true + } + } +} + +// SendNonBlocking sends an item without blocking. +// It returns a boolean to indicate whether the item was sent. +func (i Item[I]) SendNonBlocking(ch chan<- Item[I]) bool { + select { + default: + return false + case ch <- i: + return true + } +} diff --git a/rx/item_test.go b/rx/item_test.go new file mode 100644 index 0000000..97d73c5 --- /dev/null +++ b/rx/item_test.go @@ -0,0 +1,143 @@ +package rx_test + +import ( + "context" + + "github.com/fortytw2/leaktest" + . "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok + . "github.com/onsi/gomega" //nolint:revive // gomega ok + "github.com/snivilised/lorax/rx" + "github.com/snivilised/lorax/rxa" +) + +var _ = Describe("Item", Ordered, func() { + Context("SendItems", func() { + Context("variadic", func() { + When("no errors in observable", func() { + It("🧪 should: send items without error", func() { + defer leaktest.Check(GinkgoT())() + + ch := make(chan rx.Item[int], 3) + + rx.SendItems(context.Background(), ch, rx.CloseChannel, + rx.Of(1), + rx.Of(2), + rx.Of(3), + ) + + rxa.Assert(context.Background(), + rx.FromChannel(ch), + rxa.HasItems([]int{1, 2, 3}), + rxa.HasNoError[int]()) + }) + }) + + When("error in observable", func() { + It("🧪 should: send items including error", func() { + defer leaktest.Check(GinkgoT())() + + ch := make(chan rx.Item[int], 3) + + rx.SendItems(context.Background(), ch, rx.CloseChannel, + rx.Of(1), + rx.Error[int](errFoo), + rx.Of(3), + ) + + rxa.Assert(context.Background(), + rx.FromChannel(ch), + rxa.HasItems([]int{1, 3}), + rxa.HasAnError[int]()) + }) + }) + + When("specific error in observable", func() { + It("🧪 should: send items including error", func() { + defer leaktest.Check(GinkgoT())() + + ch := make(chan rx.Item[int], 3) + + rx.SendItems(context.Background(), ch, rx.CloseChannel, + rx.Of(1), + rx.Error[int](errFoo), + rx.Of(3), + ) + + rxa.Assert(context.Background(), + rx.FromChannel(ch), + rxa.HasItems([]int{1, 3}), + rxa.HasError[int](errFoo)) + }) + }) + }) + + Context("blocking", func() { + When("no errors in observable", func() { + It("foo", func() { + defer leaktest.Check(GinkgoT())() + + ch := make(chan rx.Item[int], 1) + defer close(ch) + rx.Of[int](5).SendBlocking(ch) + + Expect((<-ch).V).To(Equal(5)) + }) + }) + }) + + Context("context", func() { + When("not cancelled", func() { + It("🧪 should: return true", func() { + defer leaktest.Check(GinkgoT())() + + ch := make(chan rx.Item[int], 1) + defer close(ch) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + Expect(rx.Of(5).SendContext(ctx, ch)).To(BeTrue()) + }) + }) + + When("cancelled", func() { + It("🧪 should: return false", func() { + defer leaktest.Check(GinkgoT())() + + ch := make(chan rx.Item[int], 1) + defer close(ch) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + Expect(rx.Of(5).SendContext(ctx, ch)).To(BeFalse()) + }) + }) + }) + + Context("non-blocking", func() { + When("channel free", func() { + It("🧪 should: send item and return true", func() { + defer leaktest.Check(GinkgoT())() + + ch := make(chan rx.Item[int], 1) + defer close(ch) + + Expect(rx.Of(5).SendNonBlocking(ch)).To(BeTrue()) + }) + }) + + When("channel busy", func() { + It("🧪 should: not send item and return false", func() { + defer leaktest.Check(GinkgoT())() + + ch := make(chan rx.Item[int], 1) + defer close(ch) + + rx.Of(5).SendNonBlocking(ch) + Expect(rx.Of(5).SendNonBlocking(ch)).To(BeFalse()) + }) + }) + }) + }) +}) diff --git a/rx/iterable-channel.go b/rx/iterable-channel.go new file mode 100644 index 0000000..754b3e5 --- /dev/null +++ b/rx/iterable-channel.go @@ -0,0 +1,84 @@ +package rx + +import ( + "context" + "sync" +) + +type channelIterable[I any] struct { + next <-chan Item[I] + opts []Option[I] + subscribers []chan Item[I] + mutex sync.RWMutex + producerAlreadyCreated bool +} + +func newChannelIterable[I any](next <-chan Item[I], opts ...Option[I]) Iterable[I] { + return &channelIterable[I]{ + next: next, + subscribers: make([]chan Item[I], 0), + opts: opts, + } +} + +func (i *channelIterable[I]) Observe(opts ...Option[I]) <-chan Item[I] { + mergedOptions := append(i.opts, opts...) //nolint:gocritic // foo + option := parseOptions(mergedOptions...) + + if !option.isConnectable() { + return i.next + } + + if option.isConnectOperation() { + i.connect(option.buildContext(emptyContext)) + return nil + } + + ch := option.buildChannel() + + i.mutex.Lock() + i.subscribers = append(i.subscribers, ch) + i.mutex.Unlock() + + return ch +} + +func (i *channelIterable[I]) connect(ctx context.Context) { + i.mutex.Lock() + if !i.producerAlreadyCreated { + go i.produce(ctx) + i.producerAlreadyCreated = true + } + i.mutex.Unlock() +} + +func (i *channelIterable[I]) produce(ctx context.Context) { + defer func() { + i.mutex.RLock() + + for _, subscriber := range i.subscribers { + close(subscriber) + } + + i.mutex.RUnlock() + }() + + for { + select { + case <-ctx.Done(): + return + case item, ok := <-i.next: + if !ok { + return + } + + i.mutex.RLock() + + for _, subscriber := range i.subscribers { + subscriber <- item + } + + i.mutex.RUnlock() + } + } +} diff --git a/rx/iterable.go b/rx/iterable.go new file mode 100644 index 0000000..e51b8c0 --- /dev/null +++ b/rx/iterable.go @@ -0,0 +1,6 @@ +package rx + +// Iterable is the basic type that can be observed. +type Iterable[I any] interface { + Observe(opts ...Option[I]) <-chan Item[I] +} diff --git a/rx/observable-operator.go b/rx/observable-operator.go new file mode 100644 index 0000000..c016bc9 --- /dev/null +++ b/rx/observable-operator.go @@ -0,0 +1,5 @@ +package rx + +func (o *ObservableImpl[I]) Observe(opts ...Option[I]) <-chan Item[I] { + return o.iterable.Observe(opts...) +} diff --git a/rx/observable.go b/rx/observable.go new file mode 100644 index 0000000..adc80e2 --- /dev/null +++ b/rx/observable.go @@ -0,0 +1,15 @@ +package rx + +import ( + "context" +) + +type Observable[I any] interface { + Iterable[I] +} + +// ObservableImpl implements Observable. +type ObservableImpl[I any] struct { + parent context.Context + iterable Iterable[I] +} diff --git a/rx/options.go b/rx/options.go new file mode 100644 index 0000000..b0c1a05 --- /dev/null +++ b/rx/options.go @@ -0,0 +1,72 @@ +package rx + +import ( + "context" + + "github.com/teivah/onecontext" +) + +var emptyContext context.Context + +type Option[I any] interface { + apply(*funcOption[I]) + // ... + buildChannel() chan Item[I] + buildContext(parent context.Context) context.Context + // ... + isConnectable() bool + isConnectOperation() bool +} + +type funcOption[I any] struct { + f func(*funcOption[I]) + isBuffer bool + buffer int + ctx context.Context + observation ObservationStrategy + pool int + backPressureStrategy BackPressureStrategy + onErrorStrategy OnErrorStrategy + propagate bool + connectable bool + connectOperation bool + serialized func(I) int +} + +func (fdo *funcOption[I]) isConnectable() bool { + return fdo.connectable +} + +func (fdo *funcOption[I]) isConnectOperation() bool { + return fdo.connectOperation +} + +func (fdo *funcOption[I]) apply(do *funcOption[I]) { + fdo.f(do) +} + +func (fdo *funcOption[I]) buildChannel() chan Item[I] { + if fdo.isBuffer { + return make(chan Item[I], fdo.buffer) + } + + return make(chan Item[I]) +} + +func (fdo *funcOption[I]) buildContext(parent context.Context) context.Context { + if fdo.ctx != nil && parent != nil { + ctx, _ := onecontext.Merge(fdo.ctx, parent) + + return ctx + } + + if fdo.ctx != nil { + return fdo.ctx + } + + if parent != nil { + return parent + } + + return context.Background() +} diff --git a/rx/rx-suite_test.go b/rx/rx-suite_test.go new file mode 100644 index 0000000..172ea4e --- /dev/null +++ b/rx/rx-suite_test.go @@ -0,0 +1,13 @@ +package rx_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok + . "github.com/onsi/gomega" //nolint:revive // gomega ok +) + +func TestRx(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Rx Suite") +} diff --git a/rx/types.go b/rx/types.go new file mode 100644 index 0000000..fe2b63e --- /dev/null +++ b/rx/types.go @@ -0,0 +1,80 @@ +package rx + +import "context" + +type ( + operatorOptions[I any] struct { + stop func() + resetIterable func(Iterable[I]) + } + + // Comparator defines a func that returns an int: + // - 0 if two elements are equals + // - A negative value if the first argument is less than the second + // - A positive value if the first argument is greater than the second + Comparator[I any] func(I, I) int + // ItemToObservable defines a function that computes an observable from an item. + ItemToObservable[I any] func(Item[I]) Observable[I] + // ErrorToObservable defines a function that transforms an observable from an error. + ErrorToObservable[I any] func(error) Observable[I] + // Func defines a function that computes a value from an input value. + Func[I any] func(context.Context, I) (I, error) + // Func2 defines a function that computes a value from two input values. + Func2[I any] func(context.Context, I, I) (I, error) + // FuncN defines a function that computes a value from N input values. + FuncN[I any] func(...I) I + // ErrorFunc defines a function that computes a value from an error. + ErrorFunc[I any] func(error) I + // Predicate defines a func that returns a bool from an input value. + Predicate[I any] func(I) bool + // Marshaller defines a marshaller type (ItemValue[I] to []byte). + Marshaller[I any] func(I) ([]byte, error) + // Unmarshaller defines an unmarshaller type ([]byte to interface). + Unmarshaller[I any] func([]byte, I) error + // Producer defines a producer implementation. + Producer[I any] func(ctx context.Context, next chan<- Item[I]) + // Supplier defines a function that supplies a result from nothing. + Supplier[I any] func(ctx context.Context) Item[I] + // Disposed is a notification channel indicating when an Observable is closed. + Disposed <-chan struct{} + // Disposable is a function to be called in order to dispose a subscription. + Disposable context.CancelFunc + + // NextFunc handles a next item in a stream. + NextFunc[I any] func(I) + // ErrFunc handles an error in a stream. + ErrFunc func(error) + // CompletedFunc handles the end of a stream. + CompletedFunc func() +) + +// BackPressureStrategy is the back-pressure strategy type. +type BackPressureStrategy uint32 + +const ( + // Block blocks until the channel is available. + Block BackPressureStrategy = iota + // Drop drops the message. + Drop +) + +// OnErrorStrategy is the Observable error strategy. +type OnErrorStrategy uint32 + +const ( + // StopOnError is the default error strategy. + // An operator will stop processing items on error. + StopOnError OnErrorStrategy = iota + // ContinueOnError means an operator will continue processing items after an error. + ContinueOnError +) + +// ObservationStrategy defines the strategy to consume from an Observable. +type ObservationStrategy uint32 + +const ( + // Lazy is the default observation strategy, when an Observer subscribes. + Lazy ObservationStrategy = iota + // Eager means consuming as soon as the Observable is created. + Eager +) diff --git a/rx/util_test.go b/rx/util_test.go new file mode 100644 index 0000000..72912d5 --- /dev/null +++ b/rx/util_test.go @@ -0,0 +1,10 @@ +package rx_test + +import ( + "errors" +) + +var ( + errFoo = errors.New("foo") + errBar = errors.New("bar") +) diff --git a/rxa/assert.go b/rxa/assert.go new file mode 100644 index 0000000..7380562 --- /dev/null +++ b/rxa/assert.go @@ -0,0 +1,215 @@ +package rxa + +// rxa (rx assertion) package can use rx + +import ( + "context" + "fmt" + + . "github.com/onsi/ginkgo/v2" //nolint:revive,stylecheck // ginkgo ok + . "github.com/onsi/gomega" //nolint:revive,stylecheck // gomega ok + "github.com/snivilised/lorax/rx" +) + +// AssertPredicate is a custom predicate based on the items. +type AssertPredicate[I any] func(items []I) error + +// RxAssert lists the Observable assertions. +type RxAssert[I any] interface { + apply(*rxAssert[I]) + itemsToBeChecked() (bool, []I) + itemsNoOrderedToBeChecked() (bool, []I) + noItemsToBeChecked() bool + someItemsToBeChecked() bool + raisedErrorToBeChecked() (bool, error) + raisedErrorsToBeChecked() (bool, []error) + raisedAnErrorToBeChecked() (bool, error) + notRaisedErrorToBeChecked() bool + itemToBeChecked() (bool, I) + noItemToBeChecked() (bool, I) + customPredicatesToBeChecked() (bool, []AssertPredicate[I]) +} + +type rxAssert[I any] struct { + f func(*rxAssert[I]) + checkHasItems bool + checkHasNoItems bool + checkHasSomeItems bool + items []I + checkHasItemsNoOrder bool + itemsNoOrder []I + checkHasRaisedError bool + err error + checkHasRaisedErrors bool + errs []error + checkHasRaisedAnError bool + checkHasNotRaisedError bool + checkHasItem bool + item I + checkHasNoItem bool + checkHasCustomPredicate bool + customPredicates []AssertPredicate[I] +} + +func (ass *rxAssert[I]) apply(do *rxAssert[I]) { + ass.f(do) +} + +func (ass *rxAssert[I]) itemsToBeChecked() (b bool, i []I) { + return ass.checkHasItems, ass.items +} + +func (ass *rxAssert[I]) itemsNoOrderedToBeChecked() (b bool, i []I) { + return ass.checkHasItemsNoOrder, ass.itemsNoOrder +} + +func (ass *rxAssert[I]) noItemsToBeChecked() bool { + return ass.checkHasNoItems +} + +func (ass *rxAssert[I]) someItemsToBeChecked() bool { + return ass.checkHasSomeItems +} +func (ass *rxAssert[I]) raisedErrorToBeChecked() (bool, error) { + return ass.checkHasRaisedError, ass.err +} + +func (ass *rxAssert[I]) raisedErrorsToBeChecked() (bool, []error) { + return ass.checkHasRaisedErrors, ass.errs +} + +func (ass *rxAssert[I]) raisedAnErrorToBeChecked() (bool, error) { + return ass.checkHasRaisedAnError, ass.err +} + +func (ass *rxAssert[I]) notRaisedErrorToBeChecked() bool { + return ass.checkHasNotRaisedError +} + +func (ass *rxAssert[I]) itemToBeChecked() (b bool, i I) { + return ass.checkHasNoItem, ass.item +} + +func (ass *rxAssert[I]) noItemToBeChecked() (b bool, i I) { + return ass.checkHasNoItem, ass.item +} + +func (ass *rxAssert[I]) customPredicatesToBeChecked() (bool, []AssertPredicate[I]) { + return ass.checkHasCustomPredicate, ass.customPredicates +} + +func newAssertion[I any](f func(*rxAssert[I])) *rxAssert[I] { + return &rxAssert[I]{ + f: f, + } +} + +func parseAssertions[I any](assertions ...RxAssert[I]) RxAssert[I] { + ass := new(rxAssert[I]) + + for _, assertion := range assertions { + assertion.apply(ass) + } + + return ass +} + +func Assert[I any](ctx context.Context, iterable rx.Iterable[I], assertions ...RxAssert[I]) { + ass := parseAssertions(assertions...) + got := make([]I, 0) + errs := make([]error, 0) + observe := iterable.Observe() + +loop: + for { + select { + case <-ctx.Done(): + break loop + case item, ok := <-observe: + if !ok { + break loop + } + + if item.IsError() { + errs = append(errs, item.E) + } else { + got = append(got, item.V) + } + } + } + + if checked, predicates := ass.customPredicatesToBeChecked(); checked { + for _, predicate := range predicates { + err := predicate(got) + if err != nil { + Fail(err.Error()) + } + } + } + + if checkHasItems, expectedItems := ass.itemsToBeChecked(); checkHasItems { + Expect(got).To(ContainElements(expectedItems)) + } + + if checkHasItemsNoOrder, itemsNoOrder := ass.itemsNoOrderedToBeChecked(); checkHasItemsNoOrder { + m := make(map[interface{}]interface{}) + for _, v := range itemsNoOrder { + m[v] = nil + } + + for _, v := range got { + delete(m, v) + } + + if len(m) != 0 { + Fail(fmt.Sprintf("missing elements: '%v'", got)) + } + } + + if checkHasItem, value := ass.itemToBeChecked(); checkHasItem { + length := len(got) + if length != 1 { + Fail(fmt.Sprintf("wrong number of items, expected 1, got %d", length)) + } + + if length > 0 { + Expect(got[0]).To(Equal(value)) + } + } + + if ass.noItemsToBeChecked() { + Expect(got).To(BeEmpty()) + } + + if ass.someItemsToBeChecked() { + Expect(got).NotTo(BeEmpty()) + } + + if checkHasRaisedError, expectedError := ass.raisedErrorToBeChecked(); checkHasRaisedError { + if expectedError == nil { + Expect(errs).To(BeEmpty()) + } else { + length := len(errs) + + if length == 0 { + Fail(fmt.Sprintf("no error raised; expected: %v", expectedError)) + } + + if length > 0 { + Expect(errs[0]).Error().To(Equal(expectedError)) + } + } + } + + if checkHasRaisedErrors, expectedErrors := ass.raisedErrorsToBeChecked(); checkHasRaisedErrors { + Expect(errs).To(ContainElements(expectedErrors)) + } + + if checkHasRaisedAnError, expectedError := ass.raisedAnErrorToBeChecked(); checkHasRaisedAnError { + Expect(expectedError).Error().To(BeNil()) // this might not be right + } + + if ass.notRaisedErrorToBeChecked() { + Expect(errs).To(BeEmpty()) + } +} diff --git a/rxa/assertions.go b/rxa/assertions.go new file mode 100644 index 0000000..bc34819 --- /dev/null +++ b/rxa/assertions.go @@ -0,0 +1,28 @@ +package rxa + +func HasItems[I any](expectedItems []I) RxAssert[I] { + return newAssertion(func(ra *rxAssert[I]) { + ra.checkHasItems = true + ra.items = expectedItems + }) +} + +func HasError[I any](err error) RxAssert[I] { + return newAssertion(func(a *rxAssert[I]) { + a.checkHasRaisedError = true + a.err = err + }) +} + +// HasAnError checks that the observable has produce an error. +func HasAnError[I any]() RxAssert[I] { + return newAssertion(func(a *rxAssert[I]) { + a.checkHasRaisedAnError = true + }) +} + +func HasNoError[I any]() RxAssert[I] { + return newAssertion(func(ra *rxAssert[I]) { + ra.checkHasNotRaisedError = true + }) +}