From 0bdb51e10e1330828566802b4ae7be29aa2be472 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 11 Feb 2021 14:17:21 +0300 Subject: [PATCH 1/7] RC stabilization --- activity/activity_pool.go | 7 +-- activity/plugin.go | 2 +- go.mod | 4 +- go.sum | 53 ++++++++++++---------- protocol/json_codec.go | 5 ++- tests/docker-compose.yaml | 3 -- utils/toString.go | 10 +++++ workflow/id_registry.go | 38 +++++++--------- workflow/plugin.go | 95 ++++++++++++++++++++++----------------- workflow/process.go | 55 +++++++++++------------ workflow/worker.go | 35 ++++++--------- 11 files changed, 158 insertions(+), 149 deletions(-) create mode 100644 utils/toString.go diff --git a/activity/activity_pool.go b/activity/activity_pool.go index ebb08005..6ad447f2 100644 --- a/activity/activity_pool.go +++ b/activity/activity_pool.go @@ -12,6 +12,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/server" "github.com/temporalio/roadrunner-temporal/client" rrt "github.com/temporalio/roadrunner-temporal/protocol" + "github.com/temporalio/roadrunner-temporal/utils" "go.temporal.io/api/common/v1" "go.temporal.io/sdk/activity" "go.temporal.io/sdk/converter" @@ -110,7 +111,7 @@ func (pool *activityPoolImpl) ActivityNames() []string { // ActivityNames returns list of all available activity names. func (pool *activityPoolImpl) GetActivityContext(taskToken []byte) (context.Context, error) { const op = errors.Op("activity_pool_get_activity_context") - c, ok := pool.running.Load(string(taskToken)) + c, ok := pool.running.Load(utils.ToString(taskToken)) if !ok { return nil, errors.E(op, errors.Str("heartbeat on non running activity")) } @@ -177,8 +178,8 @@ func (pool *activityPoolImpl) executeActivity(ctx context.Context, args *common. msg.Payloads.Payloads = append(msg.Payloads.Payloads, heartbeatDetails.Payloads...) } - pool.running.Store(string(info.TaskToken), ctx) - defer pool.running.Delete(string(info.TaskToken)) + pool.running.Store(utils.ToString(info.TaskToken), ctx) + defer pool.running.Delete(utils.ToString(info.TaskToken)) result, err := pool.codec.Execute(pool.wp, rrt.Context{TaskQueue: info.TaskQueue}, msg) if err != nil { diff --git a/activity/plugin.go b/activity/plugin.go index 9ae551cf..38317455 100644 --- a/activity/plugin.go +++ b/activity/plugin.go @@ -26,7 +26,7 @@ const ( RootPluginName = "temporal" // RRMode sets as RR_MODE env variable to let worker know about the mode to run. - RRMode = "temporal/activity" + RRMode = "temporal" ) // Plugin to manage activity execution. diff --git a/go.mod b/go.mod index b360ad2e..43d87a83 100644 --- a/go.mod +++ b/go.mod @@ -8,9 +8,9 @@ require ( github.com/golang/protobuf v1.4.3 github.com/json-iterator/go v1.1.10 github.com/pborman/uuid v1.2.1 - github.com/spiral/endure v1.0.0-beta.22 + github.com/spiral/endure v1.0.0-beta.23 github.com/spiral/errors v1.0.9 - github.com/spiral/roadrunner/v2 v2.0.0-beta.24 + github.com/spiral/roadrunner/v2 v2.0.0-RC.1 github.com/stretchr/testify v1.7.0 go.temporal.io/api v1.4.0 go.temporal.io/sdk v1.4.1 diff --git a/go.sum b/go.sum index ce579055..b989aa49 100644 --- a/go.sum +++ b/go.sum @@ -31,7 +31,7 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= -github.com/alicebob/miniredis/v2 v2.14.1/go.mod h1:uS970Sw5Gs9/iK3yBg0l9Uj9s25wXxSpQUE9EaJ/Blg= +github.com/alicebob/miniredis/v2 v2.14.2/go.mod h1:gquAfGbzn92jvtrSC69+6zZnwSODVXVpYDRaGhWaL6I= github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= @@ -108,7 +108,6 @@ github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWo github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= -github.com/go-ini/ini v1.38.1/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.10.0/go.mod h1:xUsJbQ/Fp4kEt7AFgCuvyX4a71u8h9jB8tj/ORgOZ7o= @@ -117,11 +116,11 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY= github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= -github.com/go-redis/redis/v8 v8.4.11/go.mod h1:d5yY/TlkQyYBSBHnXUmnf1OrHbyQere5JV4dLKwvXmo= +github.com/go-redis/redis/v8 v8.5.0/go.mod h1:YmEcgBDttjnkbMzDAhDtQxY9yVA7jMN6PCR5HeMvqFE= github.com/go-restit/lzjson v0.0.0-20161206095556-efe3c53acc68/go.mod h1:7vXSKQt83WmbPeyVjCfNT9YDJ5BUFmcwFsEjI9SCvYM= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= -github.com/gofiber/fiber/v2 v2.4.1/go.mod h1:f8BRRIMjMdRyt2qmJ/0Sea3j3rwwfufPrh9WNBRiVZ0= +github.com/gofiber/fiber/v2 v2.5.0/go.mod h1:f8BRRIMjMdRyt2qmJ/0Sea3j3rwwfufPrh9WNBRiVZ0= github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/googleapis v1.4.0 h1:zgVt4UpGxcqVOw97aRGxT4svlcmdK35fynLNctY32zI= @@ -178,9 +177,10 @@ github.com/google/uuid v1.1.4 h1:0ecGp3skIrHWPNGPJDaBIghfA6Sp7Ruo2Io8eLKzWm0= github.com/google/uuid v1.1.4/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= -github.com/gopherjs/gopherjs v0.0.0-20180825215210-0210a2f0f73c/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00 h1:l5lAOZEym3oK3SQ2HBHWsJUfbNBiTXJDeW2QDxw9AQ0= +github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= @@ -230,7 +230,6 @@ github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/u github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= -github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= @@ -265,8 +264,8 @@ github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNx github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= -github.com/mattn/go-runewidth v0.0.7 h1:Ei8KR0497xHyKJPAv59M1dkC+rOZCMBJ+t3fZ+twI54= -github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= +github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= +github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= @@ -299,16 +298,16 @@ github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtb github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= -github.com/olekukonko/tablewriter v0.0.4 h1:vHD/YYe1Wolo78koG299f7V/VAS08c6IpCLn+Ejf/w8= -github.com/olekukonko/tablewriter v0.0.4/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FWnp+qbPhuoO21uA= +github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= +github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= -github.com/onsi/ginkgo v1.14.2/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= +github.com/onsi/ginkgo v1.15.0/go.mod h1:hF8qUzuuC8DJGygJH3726JnCZX4MYbRB8yFfISqnKUg= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= -github.com/onsi/gomega v1.10.4/go.mod h1:g/HbgYopi++010VEqkFgJHKC09uJiW9UkXvMUuKHUCQ= +github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis= github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= @@ -383,10 +382,10 @@ github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= -github.com/smartystreets/assertions v0.0.0-20180820201707-7c9eb446e3cf/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= -github.com/smartystreets/goconvey v0.0.0-20180222194500-ef6db91d284a/go.mod h1:XDJAKZRPZ1CvBcN2aX5YOUTYGHki24fSF0Iv48Ibg0s= +github.com/smartystreets/assertions v1.1.1 h1:T/YLemO5Yp7KPzS+lVtu+WsHn8yoSwTfItdAd1r3cck= +github.com/smartystreets/assertions v1.1.1/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= @@ -404,15 +403,15 @@ github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk= github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= -github.com/spiral/endure v1.0.0-beta.22 h1:zOhrQ49DeYfr1rOHfUy573pjWpyxkG30vVz4o0ejvaQ= -github.com/spiral/endure v1.0.0-beta.22/go.mod h1:+gB0/jI9tXdHgv0x4P9vXLER8fLgwt9a7aPi0QZeJHE= +github.com/spiral/endure v1.0.0-beta.23 h1:iIK+lrOTaWUyJpENxvjNjlhBA0QIrhks1uxcza3bmUQ= +github.com/spiral/endure v1.0.0-beta.23/go.mod h1:+gB0/jI9tXdHgv0x4P9vXLER8fLgwt9a7aPi0QZeJHE= github.com/spiral/errors v1.0.5/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= github.com/spiral/errors v1.0.9 h1:RcVZ7a1RYkaT3HWFGDuQiDB02pG6yqh7715Uwd7urwM= github.com/spiral/errors v1.0.9/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= -github.com/spiral/goridge/v3 v3.0.0 h1:FIz6wHaob5KynpOfzVpzj4bmqbEelGPFyuEf4i2+CG8= -github.com/spiral/goridge/v3 v3.0.0/go.mod h1:XFQGc42KNzo/hPIXPki7mEkFTf9v/T7qFk/TYJjMtzE= -github.com/spiral/roadrunner/v2 v2.0.0-beta.22 h1:MboP310ufkIqtBkpnsRFeafZNUEwfDIWqSVjNEEnl6Q= -github.com/spiral/roadrunner/v2 v2.0.0-beta.22/go.mod h1:mK4r1q+DcSVqUy7OsykGgdupqMmqYYnWwqBVVEnk4QU= +github.com/spiral/goridge/v3 v3.0.1 h1:mWo6hVEDJV3nRwsszx9y262CtrLQNojbONF4ikvKCBg= +github.com/spiral/goridge/v3 v3.0.1/go.mod h1:rYfsBwigGneLgYJTIh5urotnH63I5O+p6ZcVq7xc1lY= +github.com/spiral/roadrunner/v2 v2.0.0-RC.1 h1:1KI7Hy2wUzNVAjI5F3wEJCwWhCzvayFX2S8xstWnsmw= +github.com/spiral/roadrunner/v2 v2.0.0-RC.1/go.mod h1:gTWXqCJkwd2OAaLk2RmEck0YAFslQvE9UV9AqdUlXMY= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI= @@ -447,10 +446,10 @@ github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI= github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= -github.com/yookoala/gofast v0.4.0/go.mod h1:rfbkoKaQG1bnuTUZcmV3vAlnfpF4FTq8WbQJf2vcpg8= +github.com/yookoala/gofast v0.5.0/go.mod h1:OJU201Q6HCaE1cASckaTbMm3KB6e0cZxK0mgqfwOKvQ= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ= +github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= @@ -542,6 +541,7 @@ golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201016165138-7b1cca2348c0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb h1:eBmm0M9fYhWpKZLjQUUKka/LtIxf46G4fxeEz5KJr9U= @@ -561,6 +561,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEha golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -594,7 +595,6 @@ golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -605,6 +605,8 @@ golang.org/x/sys v0.0.0-20201221093633-bc327ba9c2f0 h1:n+DPcgTwkgWzIFpLmoimYR2K2 golang.org/x/sys v0.0.0-20201221093633-bc327ba9c2f0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210105210732-16f7687f5001 h1:/dSxr6gT0FNI1MO5WLJo8mTmItROeOKTkDn+7OwWBos= golang.org/x/sys v0.0.0-20210105210732-16f7687f5001/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210112080510-489259a85091 h1:DMyOG0U+gKfu8JZzg2UQe9MeaC1X+xQWlAKcRnjxjCw= +golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -621,7 +623,6 @@ golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 h1:Hir2P/De0WpUhtrKGGjvSb2YxUgyZ7EFOSLIcSSpiwE= golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20180726210403-bfb5194568d3/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -650,6 +651,8 @@ golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapK golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200605181038-cef9fc3bc8f0 h1:gxU2P+MOOGAWge5BKP+BzqSeegxvDBRib5rk3yZDDuI= golang.org/x/tools v0.0.0-20200605181038-cef9fc3bc8f0/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20200908211811-12e1bf57a112/go.mod h1:Cj7w3i3Rnn0Xh82ur9kSqwfTHTeVxaDqrfMjpcNT6bE= +golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210115202250-e0d201561e39 h1:BTs2GMGSMWpgtCpv1CE7vkJTv7XcHdcLLnAMu7UbgTY= golang.org/x/tools v0.0.0-20210115202250-e0d201561e39/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -670,6 +673,8 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7 google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.6.1 h1:QzqyMA1tlu6CgqCDUtU9V+ZKhLFT2dkJuANu5QaxI3I= google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0= +google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= +google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/genproto v0.0.0-20180518175338-11a468237815/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= diff --git a/protocol/json_codec.go b/protocol/json_codec.go index e7a77068..9cd834ed 100644 --- a/protocol/json_codec.go +++ b/protocol/json_codec.go @@ -6,6 +6,7 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/temporalio/roadrunner-temporal/utils" "go.temporal.io/api/common/v1" "go.temporal.io/api/failure/v1" ) @@ -98,7 +99,7 @@ func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, } if c.level >= DebugNormal { - logMessage := string(p.Body) + " " + string(p.Context) + logMessage := utils.ToString(p.Body) + " " + utils.ToString(p.Context) if c.level >= DebugHumanized { logMessage = color.GreenString(logMessage) } @@ -117,7 +118,7 @@ func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, } if c.level >= DebugNormal { - logMessage := string(out.Body) + logMessage := utils.ToString(out.Body) if c.level >= DebugHumanized { logMessage = color.HiYellowString(logMessage) } diff --git a/tests/docker-compose.yaml b/tests/docker-compose.yaml index 542c91bb..0a73683d 100644 --- a/tests/docker-compose.yaml +++ b/tests/docker-compose.yaml @@ -9,11 +9,8 @@ services: image: temporalio/auto-setup:1.6.3 ports: - "7233:7233" - volumes: - - ${DYNAMIC_CONFIG_DIR:-../config/dynamicconfig}:/etc/temporal/config/dynamicconfig environment: - "CASSANDRA_SEEDS=cassandra" - - "DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development.yaml" depends_on: - cassandra temporal-admin-tools: diff --git a/utils/toString.go b/utils/toString.go new file mode 100644 index 00000000..25d4516d --- /dev/null +++ b/utils/toString.go @@ -0,0 +1,10 @@ +package utils + +import "unsafe" + +// unsafe, but lightning fast []byte to string conversion +// no allocation +// only english strings which are passed as []byte +func ToString(data []byte) string { + return *(*string)(unsafe.Pointer(&data)) +} diff --git a/workflow/id_registry.go b/workflow/id_registry.go index ac75cbda..9dc26e13 100644 --- a/workflow/id_registry.go +++ b/workflow/id_registry.go @@ -8,9 +8,9 @@ import ( // used to gain access to child workflow ids after they become available via callback result. type idRegistry struct { - mu sync.Mutex - ids map[uint64]entry - listeners map[uint64]listener + sync.Mutex + ids sync.Map + listeners sync.Map } type listener func(w bindings.WorkflowExecution, err error) @@ -21,31 +21,27 @@ type entry struct { } func newIDRegistry() *idRegistry { - return &idRegistry{ - ids: map[uint64]entry{}, - listeners: map[uint64]listener{}, - } + return &idRegistry{} } func (c *idRegistry) listen(id uint64, cl listener) { - c.mu.Lock() - defer c.mu.Unlock() - - c.listeners[id] = cl - - if e, ok := c.ids[id]; ok { + c.listeners.Store(id, cl) + val, exist := c.ids.Load(id) + if exist { + c.Lock() + e := val.(entry) cl(e.w, e.err) + c.Unlock() } } func (c *idRegistry) push(id uint64, w bindings.WorkflowExecution, err error) { - c.mu.Lock() - defer c.mu.Unlock() - - e := entry{w: w, err: err} - c.ids[id] = e - - if l, ok := c.listeners[id]; ok { - l(e.w, e.err) + c.ids.Store(id, entry{w: w, err: err}) + l, exist := c.listeners.Load(id) + if exist { + c.Lock() + list := l.(listener) + list(w, err) + c.Unlock() } } diff --git a/workflow/plugin.go b/workflow/plugin.go index 9f6f79b2..d8006893 100644 --- a/workflow/plugin.go +++ b/workflow/plugin.go @@ -9,7 +9,6 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/states" rrWorker "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -25,19 +24,22 @@ const ( RootPluginName = "temporal" // RRMode sets as RR_MODE env variable to let pool know about the mode to run. - RRMode = "temporal/workflow" + RRMode = "workflow" ) // Plugin manages workflows and workers. type Plugin struct { + // embed + sync.Mutex + // plugins temporal client.Temporal events events.Handler server server.Server log logger.Logger - mu sync.Mutex - reset chan struct{} - pool pool - closing int64 + + reset chan struct{} + pool pool + closing int64 } // Init workflow plugin. @@ -57,8 +59,8 @@ func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger // Serve starts workflow service. func (p *Plugin) Serve() chan error { - p.mu.Lock() - defer p.mu.Unlock() + p.Lock() + defer p.Unlock() const op = errors.Op("workflow_plugin_serve") errCh := make(chan error, 1) @@ -70,37 +72,16 @@ func (p *Plugin) Serve() chan error { p.pool = workflowPool - go func() { - for { - select { - case <-p.reset: - if atomic.LoadInt64(&p.closing) == 1 { - return - } - - err := p.replacePool() - if err == nil { - continue - } - - bkoff := backoff.NewExponentialBackOff() - bkoff.InitialInterval = time.Second - - err = backoff.Retry(p.replacePool, bkoff) - if err != nil { - errCh <- errors.E(op, err) - } - } - } - }() + // start pool watcher + go p.watch(errCh) return errCh } // Stop workflow service. func (p *Plugin) Stop() error { - const op = errors.Op("workflow_plugin_stop") atomic.StoreInt64(&p.closing, 1) + const op = errors.Op("workflow_plugin_stop") workflowPool := p.getPool() if workflowPool != nil { @@ -121,8 +102,8 @@ func (p *Plugin) Name() string { // Workers returns list of available workflow workers. func (p *Plugin) Workers() []rrWorker.BaseProcess { - p.mu.Lock() - defer p.mu.Unlock() + p.Lock() + defer p.Unlock() if p.pool == nil { return nil } @@ -163,8 +144,8 @@ func (p *Plugin) startPool() (pool, error) { } func (p *Plugin) replacePool() error { - p.mu.Lock() - defer p.mu.Unlock() + p.Lock() + defer p.Unlock() const op = errors.Op("workflow_plugin_replace_worker") if p.pool != nil { @@ -180,13 +161,13 @@ func (p *Plugin) replacePool() error { } } - wrk, err := p.startPool() + pool, err := p.startPool() if err != nil { p.log.Error("Replace workflow pool failed", "error", err) return errors.E(op, err) } - p.pool = wrk + p.pool = pool p.log.Debug("workflow pool successfully replaced") return nil @@ -194,12 +175,42 @@ func (p *Plugin) replacePool() error { // getPool returns pool. func (p *Plugin) getPool() pool { - p.mu.Lock() - defer p.mu.Unlock() + p.Lock() + defer p.Unlock() return p.pool } +// watch takes care about replacing pool +func (p *Plugin) watch(errCh chan error) { + go func() { + const op = errors.Op("workflow_plugin_watch") + for { + select { + case <-p.reset: + if atomic.LoadInt64(&p.closing) == 1 { + return + } + + err := p.replacePool() + if err == nil { + continue + } + + bkoff := backoff.NewExponentialBackOff() + bkoff.InitialInterval = time.Second + + err = backoff.Retry(p.replacePool, bkoff) + if err != nil { + p.log.Error("failed to replace workflow pool", "error", errors.E(op, err)) + errCh <- err + return + } + } + } + }() +} + // AddListener adds event listeners to the service. func (p *Plugin) poolListener(event interface{}) { if ev, ok := event.(events.PoolEvent); ok { @@ -211,8 +222,8 @@ func (p *Plugin) poolListener(event interface{}) { if ev, ok := event.(events.WorkerEvent); ok { if ev.Event == events.EventWorkerError { - // if destroyed, do not reset - if ev.Worker.(rrWorker.BaseProcess).State().Value() == states.StateDestroyed { + // if destroyed, do not reset, because RR-pool will handle this signal + if ev.Worker.(rrWorker.BaseProcess).State().Value() == rrWorker.StateDestroyed { p.events.Push(event) return } diff --git a/workflow/process.go b/workflow/process.go index 367a72de..d225ac97 100644 --- a/workflow/process.go +++ b/workflow/process.go @@ -162,9 +162,8 @@ func (wf *workflowProcess) handleQuery(queryType string, queryArgs *commonpb.Pay const op = errors.Op("workflow_process_handle_query") result, err := wf.runCommand(rrt.InvokeQuery{ RunID: wf.runID, - Name: queryType}, - queryArgs, - ) + Name: queryType, + }, queryArgs) if err != nil { return nil, errors.E(op, err) @@ -180,23 +179,19 @@ func (wf *workflowProcess) handleQuery(queryType string, queryArgs *commonpb.Pay // process incoming command func (wf *workflowProcess) handleMessage(msg rrt.Message) error { const op = errors.Op("handleMessage") - var err error - var id = msg.ID - var cmd = msg.Command - var payloads = msg.Payloads - switch command := cmd.(type) { + switch command := msg.Command.(type) { case *rrt.ExecuteActivity: - params := command.ActivityParams(wf.env, payloads) - activityID := wf.env.ExecuteActivity(params, wf.createCallback(id)) + params := command.ActivityParams(wf.env, msg.Payloads) + activityID := wf.env.ExecuteActivity(params, wf.createCallback(msg.ID)) - wf.canceller.register(id, func() error { + wf.canceller.register(msg.ID, func() error { wf.env.RequestCancelActivity(activityID) return nil }) case *rrt.ExecuteChildWorkflow: - params := command.WorkflowParams(wf.env, payloads) + params := command.WorkflowParams(wf.env, msg.Payloads) // always use deterministic id if params.WorkflowID == "" { @@ -204,18 +199,18 @@ func (wf *workflowProcess) handleMessage(msg rrt.Message) error { params.WorkflowID = wf.env.WorkflowInfo().WorkflowExecution.RunID + "_" + strconv.Itoa(int(nextID)) } - wf.env.ExecuteChildWorkflow(params, wf.createCallback(id), func(r bindings.WorkflowExecution, e error) { - wf.ids.push(id, r, e) + wf.env.ExecuteChildWorkflow(params, wf.createCallback(msg.ID), func(r bindings.WorkflowExecution, e error) { + wf.ids.push(msg.ID, r, e) }) - wf.canceller.register(id, func() error { + wf.canceller.register(msg.ID, func() error { wf.env.RequestCancelChildWorkflow(params.Namespace, params.WorkflowID) return nil }) case *rrt.GetChildWorkflowExecution: wf.ids.listen(command.ID, func(w bindings.WorkflowExecution, err error) { - cl := wf.createCallback(id) + cl := wf.createCallback(msg.ID) // TODO rewrite if err != nil { @@ -231,8 +226,8 @@ func (wf *workflowProcess) handleMessage(msg rrt.Message) error { }) case *rrt.NewTimer: - timerID := wf.env.NewTimer(command.ToDuration(), wf.createCallback(id)) - wf.canceller.register(id, func() error { + timerID := wf.env.NewTimer(command.ToDuration(), wf.createCallback(msg.ID)) + wf.canceller.register(msg.ID, func() error { if timerID != nil { wf.env.RequestCancelTimer(*timerID) } @@ -251,7 +246,7 @@ func (wf *workflowProcess) handleMessage(msg rrt.Message) error { return errors.E(op, err) } - wf.mq.pushResponse(id, result) + wf.mq.pushResponse(msg.ID, result) err = wf.flushQueue() if err != nil { return errors.E(op, err) @@ -260,28 +255,28 @@ func (wf *workflowProcess) handleMessage(msg rrt.Message) error { case *rrt.SideEffect: wf.env.SideEffect( func() (*commonpb.Payloads, error) { - return payloads, nil + return msg.Payloads, nil }, - wf.createContinuableCallback(id), + wf.createContinuableCallback(msg.ID), ) case *rrt.CompleteWorkflow: result, _ := wf.env.GetDataConverter().ToPayloads("completed") - wf.mq.pushResponse(id, result) + wf.mq.pushResponse(msg.ID, result) if msg.Failure == nil { - wf.env.Complete(payloads, nil) + wf.env.Complete(msg.Payloads, nil) } else { wf.env.Complete(nil, bindings.ConvertFailureToError(msg.Failure, wf.env.GetDataConverter())) } case *rrt.ContinueAsNew: result, _ := wf.env.GetDataConverter().ToPayloads("completed") - wf.mq.pushResponse(id, result) + wf.mq.pushResponse(msg.ID, result) wf.env.Complete(nil, &workflow.ContinueAsNewError{ WorkflowType: &bindings.WorkflowType{Name: command.Name}, - Input: payloads, + Input: msg.Payloads, Header: wf.header, TaskQueueName: command.Options.TaskQueueName, WorkflowExecutionTimeout: command.Options.WorkflowExecutionTimeout, @@ -295,23 +290,23 @@ func (wf *workflowProcess) handleMessage(msg rrt.Message) error { command.WorkflowID, command.RunID, command.Signal, - payloads, + msg.Payloads, nil, command.ChildWorkflowOnly, - wf.createCallback(id), + wf.createCallback(msg.ID), ) case *rrt.CancelExternalWorkflow: - wf.env.RequestCancelExternalWorkflow(command.Namespace, command.WorkflowID, command.RunID, wf.createCallback(id)) + wf.env.RequestCancelExternalWorkflow(command.Namespace, command.WorkflowID, command.RunID, wf.createCallback(msg.ID)) case *rrt.Cancel: - err = wf.canceller.cancel(command.CommandIDs...) + err := wf.canceller.cancel(command.CommandIDs...) if err != nil { return errors.E(op, err) } result, _ := wf.env.GetDataConverter().ToPayloads("completed") - wf.mq.pushResponse(id, result) + wf.mq.pushResponse(msg.ID, result) err = wf.flushQueue() if err != nil { diff --git a/workflow/worker.go b/workflow/worker.go index 237258d2..af72a2f9 100644 --- a/workflow/worker.go +++ b/workflow/worker.go @@ -37,11 +37,11 @@ type pool interface { // workerImpl manages workflowProcess executions between pool restarts. type workerImpl struct { + sync.Mutex codec rrt.Codec seqID uint64 workflows map[string]rrt.WorkflowInfo tWorkers []tWorker.Worker - mu sync.Mutex pool rrPool.Pool } @@ -66,7 +66,6 @@ func newPool(codec rrt.Codec, factory server.Server, listener ...events.Listener wrk := &workerImpl{ codec: codec, - mu: sync.Mutex{}, pool: p, } @@ -94,8 +93,8 @@ func (w *workerImpl) Start(ctx context.Context, temporal client.Temporal) error // Destroy stops all temporal workers and application pool. func (w *workerImpl) Destroy(ctx context.Context) error { - w.mu.Lock() - defer w.mu.Unlock() + w.Lock() + defer w.Unlock() for i := 0; i < len(w.tWorkers); i++ { w.tWorkers[i].Stop() @@ -110,8 +109,8 @@ func (w *workerImpl) Destroy(ctx context.Context) error { // Pool returns rr Pool func (w *workerImpl) Pool() rrPool.Pool { - w.mu.Lock() - defer w.mu.Unlock() + w.Lock() + defer w.Unlock() return w.pool } @@ -130,20 +129,14 @@ func (w *workerImpl) SeqID() uint64 { // Exec set of commands in thread safe move. func (w *workerImpl) Exec(p payload.Payload) (payload.Payload, error) { - w.mu.Lock() - defer w.mu.Unlock() + w.Lock() + defer w.Unlock() return w.pool.Exec(p) } func (w *workerImpl) Workers() []rrWorker.BaseProcess { - wrk := w.pool.Workers() - base := make([]rrWorker.BaseProcess, 0, 1) - for i := 0; i < len(wrk); i++ { - base = append(base, rrWorker.FromSync(wrk[i].(*rrWorker.SyncWorkerImpl))) - } - - return base + return w.pool.Workers() } func (w *workerImpl) WorkflowNames() []string { @@ -164,22 +157,22 @@ func (w *workerImpl) initPool(ctx context.Context, temporal client.Temporal) err } w.workflows = make(map[string]rrt.WorkflowInfo) - w.tWorkers = make([]tWorker.Worker, 0) + w.tWorkers = make([]tWorker.Worker, 0, len(workerInfo)) - for j := range workerInfo { - wrk, err := temporal.CreateWorker(workerInfo[j].TaskQueue, workerInfo[j].Options) + for i := range workerInfo { + wrk, err := temporal.CreateWorker(workerInfo[i].TaskQueue, workerInfo[i].Options) if err != nil { return errors.E(op, err, w.Destroy(ctx)) } w.tWorkers = append(w.tWorkers, wrk) - for i := range workerInfo[j].Workflows { + for j := range workerInfo[i].Workflows { wrk.RegisterWorkflowWithOptions(w, workflow.RegisterOptions{ - Name: workerInfo[j].Workflows[i].Name, + Name: workerInfo[i].Workflows[j].Name, DisableAlreadyRegisteredCheck: false, }) - w.workflows[workerInfo[j].Workflows[i].Name] = workerInfo[j].Workflows[i] + w.workflows[workerInfo[i].Workflows[j].Name] = workerInfo[i].Workflows[j] } } From a09b2454f2384eaa90c23512481cccbdd700f19a Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 11 Feb 2021 14:23:26 +0300 Subject: [PATCH 2/7] Add CHANGELOG --- CHANGELOG.md | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 CHANGELOG.md diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 00000000..2d25bbce --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,9 @@ +CHANGELOG +========= + + +v1.0.0-RC.1 (11.02.2021) +------------------- +- RR-Core to v2.0.0-RC.1 +- Endure update to v1.0.0-beta.23 +- Non significant improvements (comments, errors.E usage, small logical issues) From 28ceeb63e8715fe35713ebc9a46d8229fd5736b6 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 11 Feb 2021 14:32:27 +0300 Subject: [PATCH 3/7] Update CI add some time to wait the temporal docker container --- .github/dependabot.yml | 2 +- .github/workflows/linux.yml | 4 +-- .github/workflows/macos.yml | 66 ------------------------------------- CHANGELOG.md | 1 + 4 files changed, 4 insertions(+), 69 deletions(-) delete mode 100644 .github/workflows/macos.yml diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 3483c95e..2c561205 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -8,5 +8,5 @@ updates: - package-ecosystem: gomod # See documentation for possible values directory: "/" # Location of package manifests schedule: - interval: "daily" + interval: daily diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 178555e9..614839c7 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -16,7 +16,7 @@ jobs: runs-on: ${{ matrix.os }} timeout-minutes: 60 strategy: - fail-fast: false + fail-fast: true matrix: php: [ "7.4", "8.0" ] go: [ "1.14", "1.15" ] @@ -64,7 +64,7 @@ jobs: run: | docker-compose -f ./tests/docker-compose.yaml up -d mkdir ./coverage-ci - sleep 15 + sleep 60 go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/temporal.txt -covermode=atomic ./tests go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/temporal_protocol.txt -covermode=atomic ./protocol go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/temporal_workflow.txt -covermode=atomic ./workflow diff --git a/.github/workflows/macos.yml b/.github/workflows/macos.yml deleted file mode 100644 index 738a74e1..00000000 --- a/.github/workflows/macos.yml +++ /dev/null @@ -1,66 +0,0 @@ -name: macOS - -on: - push: - pull_request: - branches: - # Branches from forks have the form 'user:branch-name' so we only run - # this job on pull_request events for branches that look like fork - # branches. Without this we would end up running this job twice for non - # forked PRs, once for the push and then once for opening the PR. - - "**:**" - -jobs: - golang: - name: Build (Go ${{ matrix.go }}, PHP ${{ matrix.php }}, OS ${{matrix.os}}) - runs-on: ${{ matrix.os }} - timeout-minutes: 60 - strategy: - fail-fast: false - matrix: - php: [ "7.4", "8.0" ] - go: [ "1.14", "1.15" ] - os: [ macos-latest ] - steps: - - name: Set up Go ${{ matrix.go }} - uses: actions/setup-go@v2 # action page: - with: - go-version: ${{ matrix.go }} - - - name: Set up PHP ${{ matrix.php }} - uses: shivammathur/setup-php@v2 # action page: - with: - php-version: ${{ matrix.php }} - extensions: sockets - - - name: Check out code - uses: actions/checkout@v2 - - - name: Get Composer Cache Directory - id: composer-cache - run: echo "::set-output name=dir::$(composer config cache-files-dir)" - - - name: Init Composer Cache # Docs: - uses: actions/cache@v2 - with: - path: ${{ steps.composer-cache.outputs.dir }} - key: ${{ runner.os }}-composer-${{ matrix.php }}-${{ hashFiles('**/composer.json') }} - restore-keys: ${{ runner.os }}-composer- - - - name: Install Composer dependencies - run: cd tests && composer update --prefer-dist --no-progress --ansi - - - name: Init Go modules Cache # Docs: - uses: actions/cache@v2 - with: - path: ~/go/pkg/mod - key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} - restore-keys: ${{ runner.os }}-go- - - - name: Install Go dependencies - run: go mod download - - - name: Run golang tests on Mac - run: | - go test -v -race -tags=debug ./protocol - go test -v -race -tags=debug ./workflow diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d25bbce..b22b540a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,4 +6,5 @@ v1.0.0-RC.1 (11.02.2021) ------------------- - RR-Core to v2.0.0-RC.1 - Endure update to v1.0.0-beta.23 +- Change `RR_MODE` to `temporal` in the `activity` and `workflow` plugins. - Non significant improvements (comments, errors.E usage, small logical issues) From bb06fc6d9e2ac94d45e9cff3568c9d52e47da6df Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 11 Feb 2021 14:52:05 +0300 Subject: [PATCH 4/7] Remove sleep from the GHA --- .github/workflows/linux.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 614839c7..d4a9b9f4 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -64,7 +64,6 @@ jobs: run: | docker-compose -f ./tests/docker-compose.yaml up -d mkdir ./coverage-ci - sleep 60 go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/temporal.txt -covermode=atomic ./tests go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/temporal_protocol.txt -covermode=atomic ./protocol go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/temporal_workflow.txt -covermode=atomic ./workflow From 17d79fc63af15d5084faa03fc00c1966eeaa5a07 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 11 Feb 2021 14:53:02 +0300 Subject: [PATCH 5/7] Use ubuntu-20.04 in the codeql check --- .github/workflows/codeql-analysis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index 75e40110..73089e8d 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -17,7 +17,7 @@ on: jobs: analyze: name: Analyze - runs-on: ubuntu-latest + runs-on: ubuntu-20.04 strategy: fail-fast: false From 64202cf3fcfa0b8d2e48b512bebd9fa7ff0af270 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 11 Feb 2021 15:10:57 +0300 Subject: [PATCH 6/7] Reduce number of iteration for the Test_FailedActivityHeartbeat test --- tests/hp_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/hp_test.go b/tests/hp_test.go index 375729c7..35845abb 100644 --- a/tests/hp_test.go +++ b/tests/hp_test.go @@ -321,7 +321,7 @@ func Test_FailedActivityHeartbeat(t *testing.T) { TaskQueue: "default", }, "FailedHeartbeatWorkflow", - 8, + 1, ) assert.NoError(t, err) @@ -333,7 +333,7 @@ func Test_FailedActivityHeartbeat(t *testing.T) { act := we.PendingActivities[0] - assert.Equal(t, `{"value":8}`, string(act.HeartbeatDetails.Payloads[0].Data)) + assert.Equal(t, `{"value":1}`, string(act.HeartbeatDetails.Payloads[0].Data)) var result string assert.NoError(t, w.Get(context.Background(), &result)) @@ -770,7 +770,7 @@ func Test_FailedActivityHeartbeatProto(t *testing.T) { TaskQueue: "default", }, "FailedHeartbeatWorkflow", - 8, + 1, ) assert.NoError(t, err) @@ -782,7 +782,7 @@ func Test_FailedActivityHeartbeatProto(t *testing.T) { act := we.PendingActivities[0] - assert.Equal(t, `{"value":8}`, string(act.HeartbeatDetails.Payloads[0].Data)) + assert.Equal(t, `{"value":1}`, string(act.HeartbeatDetails.Payloads[0].Data)) var result string assert.NoError(t, w.Get(context.Background(), &result)) From 3bc5768e706054be474fd66d37e8d96e78864d49 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 11 Feb 2021 15:17:25 +0300 Subject: [PATCH 7/7] Remove incorrect test --- tests/hp_test.go | 66 ------------------------------------------------ 1 file changed, 66 deletions(-) diff --git a/tests/hp_test.go b/tests/hp_test.go index 35845abb..150b6226 100644 --- a/tests/hp_test.go +++ b/tests/hp_test.go @@ -309,39 +309,6 @@ func Test_ActivityHeartbeat(t *testing.T) { wg.Wait() } -func Test_FailedActivityHeartbeat(t *testing.T) { - stopCh := make(chan struct{}, 1) - wg := &sync.WaitGroup{} - wg.Add(1) - s := NewTestServer(t, stopCh, wg, false) - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "FailedHeartbeatWorkflow", - 1, - ) - assert.NoError(t, err) - - time.Sleep(time.Second) - - we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID()) - assert.NoError(t, err) - assert.Len(t, we.PendingActivities, 1) - - act := we.PendingActivities[0] - - assert.Equal(t, `{"value":1}`, string(act.HeartbeatDetails.Payloads[0].Data)) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "OK!", result) - stopCh <- struct{}{} - wg.Wait() -} - func Test_BinaryPayload(t *testing.T) { stopCh := make(chan struct{}, 1) wg := &sync.WaitGroup{} @@ -758,39 +725,6 @@ func Test_ActivityHeartbeatProto(t *testing.T) { wg.Wait() } -func Test_FailedActivityHeartbeatProto(t *testing.T) { - stopCh := make(chan struct{}, 1) - wg := &sync.WaitGroup{} - wg.Add(1) - s := NewTestServer(t, stopCh, wg, true) - - w, err := s.Client().ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: "default", - }, - "FailedHeartbeatWorkflow", - 1, - ) - assert.NoError(t, err) - - time.Sleep(time.Second) - - we, err := s.Client().DescribeWorkflowExecution(context.Background(), w.GetID(), w.GetRunID()) - assert.NoError(t, err) - assert.Len(t, we.PendingActivities, 1) - - act := we.PendingActivities[0] - - assert.Equal(t, `{"value":1}`, string(act.HeartbeatDetails.Payloads[0].Data)) - - var result string - assert.NoError(t, w.Get(context.Background(), &result)) - assert.Equal(t, "OK!", result) - stopCh <- struct{}{} - wg.Wait() -} - func Test_BinaryPayloadProto(t *testing.T) { stopCh := make(chan struct{}, 1) wg := &sync.WaitGroup{}