diff --git a/cmd/layotto/main.go b/cmd/layotto/main.go index a2627911f5..8911a702d0 100644 --- a/cmd/layotto/main.go +++ b/cmd/layotto/main.go @@ -82,8 +82,6 @@ import ( lock_zookeeper "mosn.io/layotto/components/lock/zookeeper" runtime_lock "mosn.io/layotto/pkg/runtime/lock" - // Sequencer - // Actuator _ "mosn.io/layotto/pkg/actuator" "mosn.io/layotto/pkg/actuator/health" @@ -250,6 +248,7 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp }), ), ) + // 4. check if unhealthy if err != nil { actuator.GetRuntimeReadinessIndicator().SetUnhealthy(err.Error()) diff --git a/components/rpc/invoker/mosn/channel/connpool.go b/components/rpc/invoker/mosn/channel/connpool.go index d440ec5c23..fcd2c1a97e 100644 --- a/components/rpc/invoker/mosn/channel/connpool.go +++ b/components/rpc/invoker/mosn/channel/connpool.go @@ -142,7 +142,6 @@ func (p *connPool) Put(c *wrapConn, close bool) { func (p *connPool) readloop(c *wrapConn) { var err error - defer func() { c.close() if p.cleanupFunc != nil { @@ -150,7 +149,7 @@ func (p *connPool) readloop(c *wrapConn) { } }() - c.buf = buffer.NewIoBuffer(16 * 1024) + c.buf = buffer.NewIoBuffer(128) for { n, readErr := c.buf.ReadOnce(c) if readErr != nil { diff --git a/components/rpc/invoker/mosn/channel/xchannel.go b/components/rpc/invoker/mosn/channel/xchannel.go index 5dd86d4072..6662cd9e79 100644 --- a/components/rpc/invoker/mosn/channel/xchannel.go +++ b/components/rpc/invoker/mosn/channel/xchannel.go @@ -50,6 +50,10 @@ func newXChannel(config ChannelConfig) (rpc.Channel, error) { m.pool = newConnPool( config.Size, func() (net.Conn, error) { + if _, _, err := net.SplitHostPort(config.Listener); err == nil { + return net.DialTimeout("tcp", config.Listener, time.Second) + } + local, remote := net.Pipe() localTcpConn := &fakeTcpConn{c: local} remoteTcpConn := &fakeTcpConn{c: remote} @@ -93,9 +97,7 @@ func (m *xChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) { if err != nil { return nil, err } - xstate := conn.state.(*xstate) - // encode request frame := m.proto.ToFrame(req) id := atomic.AddUint32(&xstate.reqid, 1) @@ -105,7 +107,6 @@ func (m *xChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) { m.pool.Put(conn, false) return nil, common.Error(common.InternalCode, encErr.Error()) } - callChan := make(chan call, 1) // set timeout deadline, _ := ctx.Deadline() @@ -117,7 +118,6 @@ func (m *xChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) { xstate.mu.Lock() xstate.calls[id] = callChan xstate.mu.Unlock() - // write packet if _, err := conn.Write(buf.Bytes()); err != nil { m.removeCall(xstate, id) @@ -125,7 +125,6 @@ func (m *xChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) { return nil, common.Error(common.UnavailebleCode, err.Error()) } m.pool.Put(conn, false) - select { case res := <-callChan: if res.err != nil { diff --git a/components/rpc/invoker/mosn/mosninvoker.go b/components/rpc/invoker/mosn/mosninvoker.go index 0403c60968..42d67dbe31 100644 --- a/components/rpc/invoker/mosn/mosninvoker.go +++ b/components/rpc/invoker/mosn/mosninvoker.go @@ -21,6 +21,9 @@ import ( "encoding/json" "errors" "fmt" + "os/user" + "strconv" + "time" "mosn.io/layotto/components/rpc" "mosn.io/layotto/components/rpc/callback" @@ -33,6 +36,8 @@ const ( Name = "mosn" ) +var LayottoStatLogger *log.Logger + type mosnInvoker struct { channel rpc.Channel cb rpc.Callback @@ -73,6 +78,12 @@ func (m *mosnInvoker) Init(conf rpc.RpcConfig) error { return err } m.channel = channel + usr, err := user.Current() + logRoot := usr.HomeDir + "/logs/tracelog/mosn/" + LayottoStatLogger, err = log.GetOrCreateLogger(logRoot+"layotto-client-stat.log", &log.Roller{MaxTime: 24 * 60 * 60}) + if err != nil { + return err + } return nil } @@ -88,13 +99,13 @@ func (m *mosnInvoker) Invoke(ctx context.Context, req *rpc.RPCRequest) (resp *rp req.Timeout = 3000 } req.Ctx = ctx + startTime := time.Now() log.DefaultLogger.Debugf("[runtime][rpc]request %+v", req) req, err = m.cb.BeforeInvoke(req) if err != nil { log.DefaultLogger.Errorf("[runtime][rpc]before filter error %s", err.Error()) return nil, err } - resp, err = m.channel.Do(req) if err != nil { log.DefaultLogger.Errorf("[runtime][rpc]error %s", err.Error()) @@ -106,5 +117,14 @@ func (m *mosnInvoker) Invoke(ctx context.Context, req *rpc.RPCRequest) (resp *rp if err != nil { log.DefaultLogger.Errorf("[runtime][rpc]after filter error %s", err.Error()) } + afterInvokeTime := time.Now() + rpcId := req.Header.Get("rpc_trace_context.sofaRpcId") + traceId := req.Header.Get("rpc_trace_context.sofaTraceId") + + LayottoStatLogger.Printf("%+v,%v,%+v", + rpcId, + traceId, + strconv.FormatInt(afterInvokeTime.Sub(startTime).Nanoseconds()/1000, 10), + ) return resp, err } diff --git a/components/sequencer/redis/standalone_redis_sequencer.go b/components/sequencer/redis/standalone_redis_sequencer.go new file mode 100644 index 0000000000..85d200e68e --- /dev/null +++ b/components/sequencer/redis/standalone_redis_sequencer.go @@ -0,0 +1,110 @@ +package redis + +import ( + "context" + "github.com/go-redis/redis/v8" + "mosn.io/layotto/components/pkg/utils" + "mosn.io/layotto/components/sequencer" + "mosn.io/pkg/log" +) + +type StandaloneRedisSequencer struct { + client *redis.Client + metadata utils.RedisMetadata + biggerThan map[string]int64 + + logger log.ErrorLogger + + ctx context.Context + cancel context.CancelFunc +} + +// NewStandaloneRedisSequencer returns a new redis sequencer +func NewStandaloneRedisSequencer(logger log.ErrorLogger) *StandaloneRedisSequencer { + s := &StandaloneRedisSequencer{ + logger: logger, + } + return s +} + +/* + 1. exists and >= biggerThan, no operation required, return 0 + 2. not exists or < biggthan, reset val, return 1 + 3. lua script occur error, such as tonumer(string), return error +*/ +const initScript = ` +if redis.call('exists', KEYS[1])==1 and tonumber(redis.call('get', KEYS[1])) >= tonumber(ARGV[1]) then + return 0 +else + redis.call('set', KEYS[1],ARGV[1]) + return 1 +end +` + +func (s *StandaloneRedisSequencer) Init(config sequencer.Configuration) error { + m, err := utils.ParseRedisMetadata(config.Properties) + if err != nil { + return err + } + //init + s.metadata = m + s.biggerThan = config.BiggerThan + + // construct client + s.client = utils.NewRedisClient(m) + s.ctx, s.cancel = context.WithCancel(context.Background()) + + //check biggerThan, initialize if not satisfied + for k, needV := range s.biggerThan { + if needV <= 0 { + continue + } + + eval := s.client.Eval(s.ctx, initScript, []string{k}, needV) + err = eval.Err() + //occur error, such as value is string type + if err != nil { + return err + } + //As long as there is no error, the initialization is successful + //It may be a reset value or it may be satisfied before + } + return nil +} + +func (s *StandaloneRedisSequencer) GetNextId(req *sequencer.GetNextIdRequest) (*sequencer.GetNextIdResponse, error) { + + incr := s.client.Incr(s.ctx, req.Key) + + err := incr.Err() + if err != nil { + return nil, err + } + + return &sequencer.GetNextIdResponse{ + NextId: incr.Val(), + }, nil +} + +func (s *StandaloneRedisSequencer) GetSegment(req *sequencer.GetSegmentRequest) (bool, *sequencer.GetSegmentResponse, error) { + + // size=0 only check support + if req.Size == 0 { + return true, nil, nil + } + + by := s.client.IncrBy(s.ctx, req.Key, int64(req.Size)) + err := by.Err() + if err != nil { + return true, nil, err + } + + return true, &sequencer.GetSegmentResponse{ + From: by.Val() - int64(req.Size) + 1, + To: by.Val(), + }, nil +} +func (s *StandaloneRedisSequencer) Close() error { + s.cancel() + return s.client.Close() +} diff --git a/components/sequencer/redis/standalone_redis_sequencer_test.go b/components/sequencer/redis/standalone_redis_sequencer_test.go new file mode 100644 index 0000000000..5f05d715cf --- /dev/null +++ b/components/sequencer/redis/standalone_redis_sequencer_test.go @@ -0,0 +1,136 @@ +package redis + +import ( + "fmt" + "github.com/alicebob/miniredis/v2" + "github.com/stretchr/testify/assert" + "mosn.io/layotto/components/sequencer" + "mosn.io/pkg/log" + "testing" +) + +const key = "resource_xxx" + +func TestStandaloneRedisSequencer(t *testing.T) { + s, err := miniredis.Run() + assert.NoError(t, err) + defer s.Close() + // construct component + comp := NewStandaloneRedisSequencer(log.DefaultLogger) + cfg := sequencer.Configuration{ + Properties: make(map[string]string), + } + cfg.Properties["redisHost"] = s.Addr() + cfg.Properties["redisPassword"] = "" + // init + err = comp.Init(cfg) + assert.NoError(t, err) + //first request + id, err := comp.GetNextId(&sequencer.GetNextIdRequest{ + Key: key, + }) + assert.NoError(t, err) + assert.Equal(t, int64(1), id.NextId) + + //again + id, err = comp.GetNextId(&sequencer.GetNextIdRequest{ + Key: key, + }) + assert.NoError(t, err) + assert.Equal(t, int64(2), id.NextId) +} + +func TestStandaloneRedisSequencer_biggerThan_success(t *testing.T) { + s, err := miniredis.Run() + assert.NoError(t, err) + defer s.Close() + // construct component + comp := NewStandaloneRedisSequencer(log.DefaultLogger) + cfg := sequencer.Configuration{ + Properties: make(map[string]string), + } + defalutVal := int64(20) + //init kv + s.Set(key, fmt.Sprint(defalutVal)) + cfg.Properties["redisHost"] = s.Addr() + cfg.Properties["redisPassword"] = "" + cfg.BiggerThan = map[string]int64{ + key: defalutVal, + } + + // init + err = comp.Init(cfg) + assert.NoError(t, err) + //first request + id, err := comp.GetNextId(&sequencer.GetNextIdRequest{ + Key: key, + }) + assert.NoError(t, err) + assert.Equal(t, defalutVal+1, id.NextId) + + //again + id, err = comp.GetNextId(&sequencer.GetNextIdRequest{ + Key: key, + }) + assert.NoError(t, err) + assert.Equal(t, defalutVal+2, id.NextId) +} + +func TestStandaloneRedisSequencer_biggerThan_fail_reset(t *testing.T) { + s, err := miniredis.Run() + assert.NoError(t, err) + defer s.Close() + // construct component + comp := NewStandaloneRedisSequencer(log.DefaultLogger) + cfg := sequencer.Configuration{ + Properties: make(map[string]string), + } + defalutVal := int64(20) + //init kv + s.Set(key, fmt.Sprint(defalutVal)) + // init config + cfg.Properties["redisHost"] = s.Addr() + cfg.Properties["redisPassword"] = "" + cfg.BiggerThan = map[string]int64{ + key: defalutVal + 5, + } + err = comp.Init(cfg) + assert.NoError(t, err) + + //first request + id, err := comp.GetNextId(&sequencer.GetNextIdRequest{ + Key: key, + }) + assert.NoError(t, err) + assert.Equal(t, defalutVal+5+1, id.NextId) + +} + +func TestStandaloneRedisSequencer_segment(t *testing.T) { + s, err := miniredis.Run() + assert.NoError(t, err) + defer s.Close() + // construct component + comp := NewStandaloneRedisSequencer(log.DefaultLogger) + cfg := sequencer.Configuration{ + Properties: make(map[string]string), + } + defalutVal := int64(20) + //init kv + s.Set(key, fmt.Sprint(defalutVal)) + cfg.Properties["redisHost"] = s.Addr() + cfg.Properties["redisPassword"] = "" + + // init + err = comp.Init(cfg) + assert.NoError(t, err) + //first request + _, id, err := comp.GetSegment(&sequencer.GetSegmentRequest{ + Key: key, + Size: 6, + }) + assert.NoError(t, err) + assert.Equal(t, defalutVal+1, id.From) + assert.Equal(t, defalutVal+6, id.To) + +} diff --git a/components/sequencer/store.go b/components/sequencer/store.go index fb7d049e83..f48fc73ece 100644 --- a/components/sequencer/store.go +++ b/components/sequencer/store.go @@ -47,7 +47,8 @@ type GetSegmentRequest struct { } type GetSegmentResponse struct { - Segment []int64 + From int64 + To int64 } type Configuration struct { diff --git a/configs/config_sequencer_redis.json b/configs/config_sequencer_redis.json new file mode 100644 index 0000000000..61bd6f9741 --- /dev/null +++ b/configs/config_sequencer_redis.json @@ -0,0 +1,75 @@ +{ + "servers": [ + { + "default_log_path": "stdout", + "default_log_level": "DEBUG", + "routers": [ + { + "router_config_name": "actuator_dont_need_router" + } + ], + "listeners": [ + { + "name": "grpc", + "address": "127.0.0.1:34904", + "bind_port": true, + "filter_chains": [ + { + "filters": [ + { + "type": "grpc", + "config": { + "server_name": "runtime", + "grpc_config": { + "hellos": { + "helloworld": { + "hello": "greeting" + } + }, + "sequencer": { + "redis": { + "metadata": { + "redisHost": "127.0.0.1:6379", + "redisPassword": "" + } + } + }, + "app": { + "app_id": "app1", + "grpc_callback_port": 9999 + } + } + } + } + ] + } + ] + }, + { + "name": "actuator", + "address": "127.0.0.1:34999", + "bind_port": true, + "filter_chains": [ + { + "filters": [ + { + "type": "proxy", + "config": { + "downstream_protocol": "Http1", + "upstream_protocol": "Http1", + "router_config_name": "actuator_dont_need_router" + } + } + ] + } + ], + "stream_filters": [ + { + "type": "actuator_filter" + } + ] + } + ] + } + ] +} diff --git a/demo/sequencer/redis/client.go b/demo/sequencer/redis/client.go new file mode 100644 index 0000000000..87d7039382 --- /dev/null +++ b/demo/sequencer/redis/client.go @@ -0,0 +1,38 @@ +package main + +import ( + "context" + "fmt" + client "mosn.io/layotto/sdk/go-sdk/client" + runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1" +) + +const ( + key = "key666" + storeName = "redis" +) + +func main() { + + cli, err := client.NewClient() + if err != nil { + panic(err) + } + defer cli.Close() + ctx := context.Background() + fmt.Printf("Try to get next id.Key:%s \n", key) + for i := 0; i < 10; i++ { + id, err := cli.GetNextId(ctx, &runtimev1pb.GetNextIdRequest{ + StoreName: storeName, + Key: key, + Options: nil, + Metadata: nil, + }) + if err != nil { + fmt.Print(err) + return + } + fmt.Printf("Next id:%v \n", id) + } + fmt.Println("Demo success!") +} diff --git a/docs/_sidebar.md b/docs/_sidebar.md index 7b7f008056..56524c577b 100644 --- a/docs/_sidebar.md +++ b/docs/_sidebar.md @@ -47,6 +47,8 @@ - [Apollo](en/component_specs/configuration/apollo.md) - [Sequencer](en/component_specs/sequencer/common.md) - [Etcd](en/component_specs/sequencer/etcd.md) + - [Redis](en/component_specs/sequencer/redis.md) + - Design documents - [Actuator design doc](en/design/actuator/actuator-design-doc.md) - [Configuration API with Apollo](en/design/configuration/configuration-api-with-apollo.md) diff --git a/docs/en/component_specs/sequencer/redis.md b/docs/en/component_specs/sequencer/redis.md new file mode 100644 index 0000000000..ea3fbd5198 --- /dev/null +++ b/docs/en/component_specs/sequencer/redis.md @@ -0,0 +1,47 @@ +# Redis + +## metadata fields +Example: configs/config_sequencer_redis.json + +| Field | Required | Description | +| --- | --- | --- | +| redisHost | Y | redis server address, such as localhost:6380 | +| redisPassword | Y | redis Password | +|maxRetries|N| maximum number of retries before giving upy,default value is 3| +|maxRetryBackoff|N| maximum backoff between each retry,default value is 2s | +|enableTLS |N| controls whether a client verifies the server's certificate chain and host name,default value is false| + +## How to avoid generating duplicate id +Redis components may generate duplicate IDs in the case of data loss. + +In order to avoid data loss and duplicate IDs, you need to use stand-alone redis and [use both persistence methods to get a degree of data safety comparable to what PostgreSQL can provide you.](https://redis.io/topics/persistence) + +## How to start Redis +If you want to run the redis demo, you need to start a Redis server with Docker first. + +command: +```shell +docker pull redis:latest +docker run -itd --name redis-test -p 6379:6379 redis +``` + +## Run layotto + +````shell +cd ${projectpath}/cmd/layotto +go build +```` +>If build reports an error, it can be executed in the root directory of the project `go mod vendor` + +Execute after the compilation is successful: +````shell +./layotto start -c ../../configs/config_sequencer_redis.json +```` + +## Run Demo + +````shell +cd ${projectpath}/demo/sequencer/redis/ + go build -o client + ./client +```` \ No newline at end of file diff --git a/docs/img/actuator/networkfilter-grpc.jpg b/docs/img/actuator/networkfilter-grpc.jpg new file mode 100644 index 0000000000..702120f931 Binary files /dev/null and b/docs/img/actuator/networkfilter-grpc.jpg differ diff --git a/docs/img/actuator/networkfilter.jpg b/docs/img/actuator/networkfilter.jpg new file mode 100644 index 0000000000..da31029b50 Binary files /dev/null and b/docs/img/actuator/networkfilter.jpg differ diff --git a/docs/zh/_sidebar.md b/docs/zh/_sidebar.md index 34de76c253..e40cabbf71 100644 --- a/docs/zh/_sidebar.md +++ b/docs/zh/_sidebar.md @@ -47,8 +47,10 @@ - [Apollo](zh/component_specs/configuration/apollo.md) - [Sequencer](zh/component_specs/sequencer/common.md) - [Etcd](zh/component_specs/sequencer/etcd.md) + - [Redis](zh/component_specs/sequencer/redis.md) - 设计文档 - [Actuator设计文档](zh/design/actuator/actuator-design-doc.md) + - [gRPC框架设计文档](zh/design/actuator/grpc-design-doc.md) - [Configuration API with Apollo](zh/design/configuration/configuration-api-with-apollo.md) - [Pub/Sub API以及与dapr component的兼容性](zh/design/pubsub/pubsub-api-and-compability-with-dapr-component.md) - [RPC设计文档](zh/design/rpc/rpc设计文档.md) @@ -61,4 +63,4 @@ - [Layotto贡献者指南](zh/development/CONTRIBUTING.md) - 博客 - [蚂蚁云原生应用运行时的探索和实践 - ArchSummit 上海](zh/blog/exploration-and-practice-of-antcloud-native-application-runtime-archsummit-shanghai.md) - - [MOSN子项目Layotto:开启服务网格+应用运行时新篇章](zh/blog/mosn-subproject-layotto-opening-a-new-chapter-in-service-grid-application-runtime/index.md) \ No newline at end of file + - [MOSN子项目Layotto:开启服务网格+应用运行时新篇章](zh/blog/mosn-subproject-layotto-opening-a-new-chapter-in-service-grid-application-runtime/index.md) diff --git a/docs/zh/component_specs/sequencer/redis.md b/docs/zh/component_specs/sequencer/redis.md new file mode 100644 index 0000000000..fc9715bbca --- /dev/null +++ b/docs/zh/component_specs/sequencer/redis.md @@ -0,0 +1,44 @@ +# Redis + +## 配置项说明 +示例:configs/config_sequencer_redis.json + +| 字段 | 必填 | 说明 | +| --- | --- | --- | +| redisHost | Y | redis服务器地址,例如localhost:6379 | +| redisPassword | Y | redis密码 | +|maxRetries|N| 放弃前的最大重试次数,默认值为3| +|maxRetryBackoff|N| 每次重试之间的最大退避时间,默认值为2s | +|enableTLS |N| 客户端是否验证服务器的证书链和主机名,默认值为false| + +## 如何避免生成重复id +redis组件在丢数据的情况下可能生成重复id,为了避免重复id需要使用单机redis,[需要特殊配置redis服务器,把两种落盘策略都打开、每次写操作都写磁盘](https://redis.io/topics/persistence) 避免丢数据。 + +## 怎么启动Redis +如果想启动redis的demo,需要先用Docker启动一个Redis +命令: +```shell +docker pull redis:latest +docker run -itd --name redis-test -p 6379:6379 redis +``` + +## 启动 layotto + +````shell +cd ${projectpath}/cmd/layotto +go build +```` +>如果 build 报错,可以在项目根目录执行 `go mod vendor` + +编译成功后执行: +````shell +./layotto start -c ../../configs/config_sequencer_redis.json +```` + +## 运行 Demo + +````shell +cd ${projectpath}/demo/sequencer/redis/ + go build -o client + ./client +```` \ No newline at end of file diff --git a/docs/zh/design/actuator/grpc-design-doc.md b/docs/zh/design/actuator/grpc-design-doc.md new file mode 100644 index 0000000000..c5ce55210d --- /dev/null +++ b/docs/zh/design/actuator/grpc-design-doc.md @@ -0,0 +1,112 @@ +# MOSN gRPC 框架设计文档 + +## 背景 + +MOSN 基于 go grpc server 框架提供一个 GRPC Server 的能力,相比于原生的 go grpc server 框架,可以获得如下能力: + ++ 完全复用 MOSN Sidecar 的部署、升级、运维能力 ++ 可复用 MOSN 中通用的基础能力:热升级、Listener Filter、部分 Network Filter 等 ++ 可复用部分 MOSN 中的 StreamFilter 扩展能力 + +## 设计思路 + ++ MOSN 的 gRPC 能力主要还是基于官方的 gRPC 库进行核心能力的实现,并且尽量减少 gRPC Server 开发者感受到的差异 ++ 基于 NetworkFilter 机制进行实现 + +## 详细设计 + +首先梳理一下 NetworkFilter 机制与处理流程 + +![networkfilter.png](../../../img/actuator/networkfilter.jpg) + ++ 在配置解析时,完成 gRPC Server 的启动,随着 MOSN 的 Listener 监听开始提供服务 ++ 一个连接对应一个 NetworkFilter 对象 + + InitializeReadFilterCallbacks 和 OnNewConnection 也是在连接创建时调用的接口,负责进行连接初始化的工作 + + OnData 是在收到数据以后调用的接口,负责数据的传递 ++ go gRPC Server 库,从“监听”的 Listener 中 Accept 一个连接,然后进行读写,而在 MOSN 框架中,Listener 的监听和连接数据的读写都处理过了,这里需要进行一层处理 + + Listener 和 Conn 都是 interface,可以在 MOSN 的 Filter 中进行处理以后,再把数据传给 gRPC Server,做到 gRPC Server 无感知 + + 在配置解析时,实现 Listener 的封装 + + 在 InitializeReadFilterCallbacks 中实现 Conn 的封装 + + 在 OnNewConnection 中将封装的 Conn 传递给封装的 Listener,触发 Listener.Accept + + 在 OnData 中将读取到的数据传递给封装的 Conn,触发 Conn.Read + +![networkfilter-grpc.png](../../../img/actuator/networkfilter-grpc.jpg) + ++ gRPC Server 的实现 + + 在使用官方 gRPC 框架实现 gRPC Server 的时候,开发者需要基于 proto 文件生成一个.pb.go 文件,同时需要实现一组接口满足 proto 中定义的接口实现,将其注册(Register)到 gRPC Server 框架中 + + MOSN 的 gRPC NetworkFilter 也需要提供类似的注册能力,让开发者只关注对应 gRPC Server 实现逻辑,然后注册到 MOSN 框架中即可 + ++ MOSN GRPC 框架要求开发者实现一个函数,该函数返回一个未调用 Serve 方法的 grpc server。框架会使用自定义的 Listener 去调用 Serve 方法实现对数据的拦截处理 + +```Go +func init() { + mgrpc.RegisterServerHandler("mygrpc", MyFunc) +} +func MyFunc(_ json.RawMessage) *grpc.Server { + s := grpc.NewServer() + // pb 是.pb.go 所在的 package 路径 + // server 是开发者实现的 api 接口 + pb.RegisterGreeterServer(s, &server{}) + return s +} +``` + ++ 预期使用的配置示例 (layotto) + +```json +{ + "servers":[ + { + "default_log_path":"stdout", + "default_log_level": "INFO", + "listeners":[ + { + "name":"grpc", + "address": "0.0.0.0:34904", + "bind_port": true, + "filter_chains": [{ + "filters": [ + { + "type": "grpc", + "config": { + "server_name":"runtime", + "grpc_config": { + "hellos": { + "helloworld": { + "hello": "greeting" + } + }, + "config_stores": { + "etcd": { + "address": ["127.0.0.1:2379"], + "timeout": "10" + } + } + } + } + } + ] + }], + "stream_filters": [ + { + "type": "flowControlFilter", + "config": { + "global_switch": true, + "limit_key_type": "PATH", + "rules": [ + { + "resource": "/spec.proto.runtime.v1.Runtime/SayHello", + "grade": 1, + "threshold": 5 + } + ] + } + } + ] + } + ] + } + ] +} + +``` diff --git a/go.sum b/go.sum index 2c4f55c5c9..a6ba0139a7 100644 --- a/go.sum +++ b/go.sum @@ -455,7 +455,6 @@ github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFU github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= -github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc= github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= diff --git a/pkg/grpc/api.go b/pkg/grpc/api.go index ee4d9f1cfb..37b51a72df 100644 --- a/pkg/grpc/api.go +++ b/pkg/grpc/api.go @@ -26,6 +26,7 @@ import ( "github.com/dapr/components-contrib/state" "github.com/gammazero/workerpool" "github.com/golang/protobuf/ptypes/empty" + "mosn.io/layotto/pkg/converter" runtime_lock "mosn.io/layotto/pkg/runtime/lock" runtime_sequencer "mosn.io/layotto/pkg/runtime/sequencer" @@ -41,6 +42,7 @@ import ( "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/emptypb" + "mosn.io/layotto/components/configstores" "mosn.io/layotto/components/hello" "mosn.io/layotto/components/lock" @@ -764,7 +766,7 @@ func (a *api) Unlock(ctx context.Context, req *runtimev1pb.UnlockRequest) (*runt return newInternalErrorUnlockResponse(), err } if req.LockOwner == "" { - err := status.Errorf(codes.InvalidArgument, messages.ErrResourceIdEmpty, req.StoreName) + err := status.Errorf(codes.InvalidArgument, messages.ErrLockOwnerEmpty, req.StoreName) return newInternalErrorUnlockResponse(), err } // 2. find store component diff --git a/pkg/grpc/api_test.go b/pkg/grpc/api_test.go index 8aa30ea25a..6ed968c44e 100644 --- a/pkg/grpc/api_test.go +++ b/pkg/grpc/api_test.go @@ -20,16 +20,31 @@ import ( "context" "errors" "fmt" + "net" + "testing" + "time" + + "github.com/dapr/components-contrib/pubsub" + "github.com/dapr/components-contrib/state" "github.com/golang/mock/gomock" + "github.com/golang/protobuf/ptypes/any" "github.com/stretchr/testify/assert" + tmock "github.com/stretchr/testify/mock" "google.golang.org/grpc" + "mosn.io/layotto/components/configstores" "mosn.io/layotto/components/hello" + "mosn.io/layotto/components/lock" + "mosn.io/layotto/components/rpc" + mosninvoker "mosn.io/layotto/components/rpc/invoker/mosn" + "mosn.io/layotto/components/sequencer" "mosn.io/layotto/pkg/mock" + mock_invoker "mosn.io/layotto/pkg/mock/components/invoker" + mock_lock "mosn.io/layotto/pkg/mock/components/lock" + mock_pubsub "mosn.io/layotto/pkg/mock/components/pubsub" + mock_sequencer "mosn.io/layotto/pkg/mock/components/sequencer" + "mosn.io/layotto/pkg/mock/components/state" runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1" - "net" - "testing" - "time" ) const ( @@ -133,7 +148,7 @@ func TestGetConfiguration(t *testing.T) { mockConfigStore := mock.NewMockStore(ctrl) api := NewAPI("", nil, map[string]configstores.Store{"mock": mockConfigStore}, nil, nil, nil, nil, nil) mockConfigStore.EXPECT().Get(gomock.Any(), gomock.Any()).Return([]*configstores.ConfigurationItem{ - &configstores.ConfigurationItem{Key: "sofa", Content: "sofa1"}, + {Key: "sofa", Content: "sofa1"}, }, nil).Times(1) res, err := api.GetConfiguration(context.Background(), &runtimev1pb.GetConfigurationRequest{StoreName: "mock", AppId: "mosn", Keys: []string{"sofa"}}) assert.Nil(t, err) @@ -145,19 +160,74 @@ func TestGetConfiguration(t *testing.T) { } func TestSaveConfiguration(t *testing.T) { - ctrl := gomock.NewController(t) - mockConfigStore := mock.NewMockStore(ctrl) - api := NewAPI("", nil, map[string]configstores.Store{"mock": mockConfigStore}, nil, nil, nil, nil, nil) - _, err := api.SaveConfiguration(context.Background(), &runtimev1pb.SaveConfigurationRequest{StoreName: "etcd"}) - assert.Equal(t, err.Error(), "configure store [etcd] don't support now") + t.Run("normal", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockConfigStore := mock.NewMockStore(ctrl) + mockConfigStore.EXPECT().Set(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req *configstores.SetRequest) error { + assert.Equal(t, "appid", req.AppId) + assert.Equal(t, "mock", req.StoreName) + assert.Equal(t, 1, len(req.Items)) + return nil + }) + req := &runtimev1pb.SaveConfigurationRequest{ + StoreName: "mock", + AppId: "appid", + Items: []*runtimev1pb.ConfigurationItem{ + { + Key: "key", + Content: "value", + Group: " ", + Label: " ", + Tags: nil, + Metadata: nil, + }, + }, + Metadata: nil, + } + api := NewAPI("", nil, map[string]configstores.Store{"mock": mockConfigStore}, nil, nil, nil, nil, nil) + _, err := api.SaveConfiguration(context.Background(), req) + assert.Nil(t, err) + }) + + t.Run("unsupport configstore", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockConfigStore := mock.NewMockStore(ctrl) + api := NewAPI("", nil, map[string]configstores.Store{"mock": mockConfigStore}, nil, nil, nil, nil, nil) + _, err := api.SaveConfiguration(context.Background(), &runtimev1pb.SaveConfigurationRequest{StoreName: "etcd"}) + assert.Equal(t, err.Error(), "configure store [etcd] don't support now") + }) + } func TestDeleteConfiguration(t *testing.T) { - ctrl := gomock.NewController(t) - mockConfigStore := mock.NewMockStore(ctrl) - api := NewAPI("", nil, map[string]configstores.Store{"mock": mockConfigStore}, nil, nil, nil, nil, nil) - _, err := api.DeleteConfiguration(context.Background(), &runtimev1pb.DeleteConfigurationRequest{StoreName: "etcd"}) - assert.Equal(t, err.Error(), "configure store [etcd] don't support now") + t.Run("normal", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockConfigStore := mock.NewMockStore(ctrl) + mockConfigStore.EXPECT().Delete(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req *configstores.DeleteRequest) error { + assert.Equal(t, "appid", req.AppId) + assert.Equal(t, 1, len(req.Keys)) + assert.Equal(t, "key", req.Keys[0]) + return nil + }) + req := &runtimev1pb.DeleteConfigurationRequest{ + StoreName: "mock", + AppId: "appid", + Keys: []string{"key"}, + Metadata: nil, + } + api := NewAPI("", nil, map[string]configstores.Store{"mock": mockConfigStore}, nil, nil, nil, nil, nil) + _, err := api.DeleteConfiguration(context.Background(), req) + assert.Nil(t, err) + }) + + t.Run("unsupport configstore", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockConfigStore := mock.NewMockStore(ctrl) + api := NewAPI("", nil, map[string]configstores.Store{"mock": mockConfigStore}, nil, nil, nil, nil, nil) + _, err := api.DeleteConfiguration(context.Background(), &runtimev1pb.DeleteConfigurationRequest{StoreName: "etcd"}) + assert.Equal(t, err.Error(), "configure store [etcd] don't support now") + }) + } func TestSubscribeConfiguration(t *testing.T) { @@ -178,5 +248,726 @@ func TestSubscribeConfiguration(t *testing.T) { err = api.SubscribeConfiguration(grpcServer2) assert.NotNil(t, err) assert.Equal(t, err.Error(), "exit") +} + +type MockInvoker struct { + tmock.Mock +} + +func (m *MockInvoker) Init(config rpc.RpcConfig) error { + args := m.Called(config) + return args.Error(0) +} + +func (m *MockInvoker) Invoke(ctx context.Context, req *rpc.RPCRequest) (*rpc.RPCResponse, error) { + args := m.Called(ctx, req) + return args.Get(0).(*rpc.RPCResponse), args.Error(1) +} + +func TestInvokeService(t *testing.T) { + t.Run("normal", func(t *testing.T) { + resp := &rpc.RPCResponse{ + Header: rpc.RPCHeader{ + "header1": []string{"value1"}, + }, + ContentType: "application/json", + Data: []byte("resp data"), + } + + mockInvoker := mock_invoker.NewMockInvoker(gomock.NewController(t)) + mockInvoker.EXPECT().Invoke(gomock.Any(), gomock.Any()). + DoAndReturn(func(ctx context.Context, req *rpc.RPCRequest) (*rpc.RPCResponse, error) { + assert.Equal(t, "id1", req.Id) + assert.Equal(t, "POST", req.Method) + assert.Equal(t, "application/json", req.ContentType) + return resp, nil + }) + in := &runtimev1pb.InvokeServiceRequest{ + Id: "id1", + Message: &runtimev1pb.CommonInvokeRequest{ + Method: "POST", + Data: &any.Any{}, + ContentType: "application/json", + }, + } + + a := &api{ + rpcs: map[string]rpc.Invoker{ + mosninvoker.Name: mockInvoker, + }, + } + + _, err := a.InvokeService(context.Background(), in) + assert.Nil(t, err) + }) +} + +func TestPublishEvent(t *testing.T) { + t.Run("invalid pubsub name", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockPubSub := mock_pubsub.NewMockPubSub(ctrl) + api := NewAPI("", nil, nil, nil, map[string]pubsub.PubSub{"mock": mockPubSub}, nil, nil, nil) + _, err := api.PublishEvent(context.Background(), &runtimev1pb.PublishEventRequest{}) + assert.Equal(t, "rpc error: code = InvalidArgument desc = pubsub name is empty", err.Error()) + }) + + t.Run("invalid topic", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockPubSub := mock_pubsub.NewMockPubSub(ctrl) + api := NewAPI("", nil, nil, nil, map[string]pubsub.PubSub{"mock": mockPubSub}, nil, nil, nil) + req := &runtimev1pb.PublishEventRequest{ + PubsubName: "abc", + } + _, err := api.PublishEvent(context.Background(), req) + assert.Equal(t, "rpc error: code = InvalidArgument desc = topic is empty in pubsub abc", err.Error()) + }) + + t.Run("component not found", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockPubSub := mock_pubsub.NewMockPubSub(ctrl) + api := NewAPI("", nil, nil, nil, map[string]pubsub.PubSub{"mock": mockPubSub}, nil, nil, nil) + req := &runtimev1pb.PublishEventRequest{ + PubsubName: "abc", + Topic: "abc", + } + _, err := api.PublishEvent(context.Background(), req) + assert.Equal(t, "rpc error: code = InvalidArgument desc = pubsub abc not found", err.Error()) + }) + + t.Run("publish success", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockPubSub := mock_pubsub.NewMockPubSub(ctrl) + mockPubSub.EXPECT().Publish(gomock.Any()).Return(nil) + mockPubSub.EXPECT().Features().Return(nil) + api := NewAPI("", nil, nil, nil, map[string]pubsub.PubSub{"mock": mockPubSub}, nil, nil, nil) + req := &runtimev1pb.PublishEventRequest{ + PubsubName: "mock", + Topic: "abc", + } + _, err := api.PublishEvent(context.Background(), req) + assert.Nil(t, err) + }) + + t.Run("publish net error", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockPubSub := mock_pubsub.NewMockPubSub(ctrl) + mockPubSub.EXPECT().Publish(gomock.Any()).Return(fmt.Errorf("net error")) + mockPubSub.EXPECT().Features().Return(nil) + api := NewAPI("", nil, nil, nil, map[string]pubsub.PubSub{"mock": mockPubSub}, nil, nil, nil) + req := &runtimev1pb.PublishEventRequest{ + PubsubName: "mock", + Topic: "abc", + } + _, err := api.PublishEvent(context.Background(), req) + assert.NotNil(t, err) + assert.Equal(t, "rpc error: code = Internal desc = error when publish to topic abc in pubsub mock: net error", err.Error()) + }) +} + +func TestGetBulkState(t *testing.T) { + t.Run("state store not found", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockStore := mock_state.NewMockStore(ctrl) + mockStore.EXPECT().Features().Return(nil) + api := NewAPI("", nil, nil, nil, nil, map[string]state.Store{"mock": mockStore}, nil, nil) + req := &runtimev1pb.GetBulkStateRequest{ + StoreName: "abc", + } + _, err := api.GetBulkState(context.Background(), req) + assert.Equal(t, "rpc error: code = InvalidArgument desc = state store abc is not found", err.Error()) + }) + + t.Run("get state error", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockStore := mock_state.NewMockStore(ctrl) + mockStore.EXPECT().Features().Return(nil) + mockStore.EXPECT().BulkGet(gomock.Any()).Return(false, nil, fmt.Errorf("net error")) + api := NewAPI("", nil, nil, nil, nil, map[string]state.Store{"mock": mockStore}, nil, nil) + req := &runtimev1pb.GetBulkStateRequest{ + StoreName: "mock", + Keys: []string{"mykey"}, + } + _, err := api.GetBulkState(context.Background(), req) + assert.Equal(t, "net error", err.Error()) + }) + + t.Run("support bulk get", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockStore := mock_state.NewMockStore(ctrl) + mockStore.EXPECT().Features().Return(nil) + + compResp := []state.BulkGetResponse{ + { + Data: []byte("mock data"), + Metadata: nil, + }, + } + mockStore.EXPECT().BulkGet(gomock.Any()).Return(true, compResp, nil) + api := NewAPI("", nil, nil, nil, nil, map[string]state.Store{"mock": mockStore}, nil, nil) + req := &runtimev1pb.GetBulkStateRequest{ + StoreName: "mock", + Keys: []string{"mykey"}, + } + rsp, err := api.GetBulkState(context.Background(), req) + assert.Nil(t, err) + assert.Equal(t, []byte("mock data"), rsp.GetItems()[0].GetData()) + }) + + t.Run("don't support bulk get", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockStore := mock_state.NewMockStore(ctrl) + mockStore.EXPECT().Features().Return(nil) + + resp1 := &state.GetResponse{ + Data: []byte("mock data"), + Metadata: nil, + } + + resp2 := &state.GetResponse{ + Data: []byte("mock data2"), + Metadata: nil, + } + mockStore.EXPECT().BulkGet(gomock.Any()).Return(false, nil, nil) + mockStore.EXPECT().Get(gomock.Any()).Return(resp1, nil) + mockStore.EXPECT().Get(gomock.Any()).Return(resp2, nil) + api := NewAPI("", nil, nil, nil, nil, map[string]state.Store{"mock": mockStore}, nil, nil) + req := &runtimev1pb.GetBulkStateRequest{ + StoreName: "mock", + Keys: []string{"mykey", "mykey2"}, + } + rsp, err := api.GetBulkState(context.Background(), req) + assert.Nil(t, err) + assert.Equal(t, 2, len(rsp.GetItems())) + }) + +} + +func TestGetState(t *testing.T) { + t.Run("state store not found", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockStore := mock_state.NewMockStore(ctrl) + mockStore.EXPECT().Features().Return(nil) + api := NewAPI("", nil, nil, nil, nil, map[string]state.Store{"mock": mockStore}, nil, nil) + req := &runtimev1pb.GetStateRequest{ + StoreName: "abc", + } + _, err := api.GetState(context.Background(), req) + assert.Equal(t, "rpc error: code = InvalidArgument desc = state store abc is not found", err.Error()) + }) + + t.Run("state store not configured", func(t *testing.T) { + api := NewAPI("", nil, nil, nil, nil, nil, nil, nil) + req := &runtimev1pb.GetStateRequest{ + StoreName: "abc", + } + _, err := api.GetState(context.Background(), req) + assert.Equal(t, "rpc error: code = FailedPrecondition desc = state store is not configured", err.Error()) + }) + + t.Run("get modified state key error", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockStore := mock_state.NewMockStore(ctrl) + mockStore.EXPECT().Features().Return(nil) + api := NewAPI("", nil, nil, nil, nil, map[string]state.Store{"mock": mockStore}, nil, nil) + req := &runtimev1pb.GetStateRequest{ + StoreName: "mock", + Key: "mykey||abc", + } + _, err := api.GetState(context.Background(), req) + assert.Equal(t, "input key/keyPrefix 'mykey||abc' can't contain '||'", err.Error()) + }) + + t.Run("get state error", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockStore := mock_state.NewMockStore(ctrl) + mockStore.EXPECT().Features().Return(nil) + mockStore.EXPECT().Get(gomock.Any()).Return(nil, fmt.Errorf("net error")) + api := NewAPI("", nil, nil, nil, nil, map[string]state.Store{"mock": mockStore}, nil, nil) + req := &runtimev1pb.GetStateRequest{ + StoreName: "mock", + Key: "mykey", + } + _, err := api.GetState(context.Background(), req) + assert.Equal(t, "rpc error: code = Internal desc = fail to get mykey from state store mock: net error", err.Error()) + }) + + t.Run("normal", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockStore := mock_state.NewMockStore(ctrl) + mockStore.EXPECT().Features().Return(nil) + + compResp := &state.GetResponse{ + Data: []byte("mock data"), + Metadata: nil, + } + mockStore.EXPECT().Get(gomock.Any()).Return(compResp, nil) + api := NewAPI("", nil, nil, nil, nil, map[string]state.Store{"mock": mockStore}, nil, nil) + req := &runtimev1pb.GetStateRequest{ + StoreName: "mock", + Key: "mykey", + } + rsp, err := api.GetState(context.Background(), req) + assert.Nil(t, err) + assert.Equal(t, []byte("mock data"), rsp.GetData()) + }) } + +func TestSaveState(t *testing.T) { + t.Run("normal", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockStore := mock_state.NewMockStore(ctrl) + mockStore.EXPECT().Features().Return(nil) + mockStore.EXPECT().BulkSet(gomock.Any()).DoAndReturn(func(reqs []state.SetRequest) error { + assert.Equal(t, 1, len(reqs)) + assert.Equal(t, "abc", reqs[0].Key) + assert.Equal(t, []byte("mock data"), reqs[0].Value) + return nil + }) + api := NewAPI("", nil, nil, nil, nil, map[string]state.Store{"mock": mockStore}, nil, nil) + req := &runtimev1pb.SaveStateRequest{ + StoreName: "mock", + States: []*runtimev1pb.StateItem{ + { + Key: "abc", + Value: []byte("mock data"), + }, + }, + } + _, err := api.SaveState(context.Background(), req) + assert.Nil(t, err) + }) + + t.Run("save error", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockStore := mock_state.NewMockStore(ctrl) + mockStore.EXPECT().Features().Return(nil) + mockStore.EXPECT().BulkSet(gomock.Any()).Return(fmt.Errorf("net error")) + api := NewAPI("", nil, nil, nil, nil, map[string]state.Store{"mock": mockStore}, nil, nil) + req := &runtimev1pb.SaveStateRequest{ + StoreName: "mock", + States: []*runtimev1pb.StateItem{ + { + Key: "abc", + Value: []byte("mock data"), + }, + }, + } + _, err := api.SaveState(context.Background(), req) + assert.NotNil(t, err) + assert.Equal(t, "rpc error: code = Internal desc = failed saving state in state store mock: net error", err.Error()) + }) +} + +func TestDeleteState(t *testing.T) { + t.Run("normal", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockStore := mock_state.NewMockStore(ctrl) + mockStore.EXPECT().Features().Return(nil) + mockStore.EXPECT().Delete(gomock.Any()).DoAndReturn(func(req *state.DeleteRequest) error { + assert.Equal(t, "abc", req.Key) + return nil + }) + api := NewAPI("", nil, nil, nil, nil, map[string]state.Store{"mock": mockStore}, nil, nil) + req := &runtimev1pb.DeleteStateRequest{ + StoreName: "mock", + Key: "abc", + } + _, err := api.DeleteState(context.Background(), req) + assert.Nil(t, err) + }) + + t.Run("net error", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockStore := mock_state.NewMockStore(ctrl) + mockStore.EXPECT().Features().Return(nil) + mockStore.EXPECT().Delete(gomock.Any()).Return(fmt.Errorf("net error")) + api := NewAPI("", nil, nil, nil, nil, map[string]state.Store{"mock": mockStore}, nil, nil) + req := &runtimev1pb.DeleteStateRequest{ + StoreName: "mock", + Key: "abc", + } + _, err := api.DeleteState(context.Background(), req) + assert.NotNil(t, err) + assert.Equal(t, "rpc error: code = Internal desc = failed deleting state with key abc: net error", err.Error()) + }) +} + +func TestDeleteBulkState(t *testing.T) { + t.Run("normal", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockStore := mock_state.NewMockStore(ctrl) + mockStore.EXPECT().Features().Return(nil) + mockStore.EXPECT().BulkDelete(gomock.Any()).DoAndReturn(func(reqs []state.DeleteRequest) error { + assert.Equal(t, "abc", reqs[0].Key) + return nil + }) + api := NewAPI("", nil, nil, nil, nil, map[string]state.Store{"mock": mockStore}, nil, nil) + req := &runtimev1pb.DeleteBulkStateRequest{ + StoreName: "mock", + States: []*runtimev1pb.StateItem{ + { + Key: "abc", + }, + }, + } + _, err := api.DeleteBulkState(context.Background(), req) + assert.Nil(t, err) + }) + + t.Run("net error", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockStore := mock_state.NewMockStore(ctrl) + mockStore.EXPECT().Features().Return(nil) + mockStore.EXPECT().BulkDelete(gomock.Any()).Return(fmt.Errorf("net error")) + api := NewAPI("", nil, nil, nil, nil, map[string]state.Store{"mock": mockStore}, nil, nil) + req := &runtimev1pb.DeleteBulkStateRequest{ + StoreName: "mock", + States: []*runtimev1pb.StateItem{ + { + Key: "abc", + }, + }, + } + _, err := api.DeleteBulkState(context.Background(), req) + assert.NotNil(t, err) + assert.Equal(t, "net error", err.Error()) + }) +} + +type MockTxStore struct { + state.Store + state.TransactionalStore +} + +func (m *MockTxStore) Init(metadata state.Metadata) error { + return m.Store.Init(metadata) +} + +func TestExecuteStateTransaction(t *testing.T) { + t.Run("state store not found", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockStore := mock_state.NewMockStore(ctrl) + mockStore.EXPECT().Features().Return(nil) + api := NewAPI("", nil, nil, nil, nil, map[string]state.Store{"mock": mockStore}, nil, nil) + req := &runtimev1pb.ExecuteStateTransactionRequest{ + StoreName: "abc", + } + _, err := api.ExecuteStateTransaction(context.Background(), req) + assert.Equal(t, "rpc error: code = InvalidArgument desc = state store abc is not found", err.Error()) + }) + + t.Run("state store not configured", func(t *testing.T) { + api := NewAPI("", nil, nil, nil, nil, nil, nil, nil) + req := &runtimev1pb.ExecuteStateTransactionRequest{ + StoreName: "abc", + } + _, err := api.ExecuteStateTransaction(context.Background(), req) + assert.Equal(t, "rpc error: code = FailedPrecondition desc = state store is not configured", err.Error()) + }) + + t.Run("normal", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockStore := mock_state.NewMockStore(ctrl) + mockStore.EXPECT().Features().Return([]state.Feature{state.FeatureTransactional}) + + mockTxStore := mock_state.NewMockTransactionalStore(gomock.NewController(t)) + mockTxStore.EXPECT().Multi(gomock.Any()).DoAndReturn(func(req *state.TransactionalStateRequest) error { + assert.Equal(t, 2, len(req.Operations)) + assert.Equal(t, "mosn", req.Metadata["runtime"]) + assert.Equal(t, state.Upsert, req.Operations[0].Operation) + assert.Equal(t, state.Delete, req.Operations[1].Operation) + return nil + }) + + store := &MockTxStore{ + mockStore, + mockTxStore, + } + + api := NewAPI("", nil, nil, nil, nil, map[string]state.Store{"mock": store}, nil, nil) + req := &runtimev1pb.ExecuteStateTransactionRequest{ + StoreName: "mock", + Operations: []*runtimev1pb.TransactionalStateOperation{ + { + OperationType: string(state.Upsert), + Request: &runtimev1pb.StateItem{ + Key: "upsert", + Value: []byte("mock data"), + }, + }, + { + OperationType: string(state.Delete), + Request: &runtimev1pb.StateItem{ + Key: "delete_abc", + }, + }, + { + OperationType: string(state.Delete), + }, + }, + Metadata: map[string]string{ + "runtime": "mosn", + }, + } + _, err := api.ExecuteStateTransaction(context.Background(), req) + assert.Nil(t, err) + }) + + t.Run("net error", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockStore := mock_state.NewMockStore(ctrl) + mockStore.EXPECT().Features().Return([]state.Feature{state.FeatureTransactional}) + + mockTxStore := mock_state.NewMockTransactionalStore(gomock.NewController(t)) + mockTxStore.EXPECT().Multi(gomock.Any()).Return(fmt.Errorf("net error")) + + store := &MockTxStore{ + mockStore, + mockTxStore, + } + api := NewAPI("", nil, nil, nil, nil, map[string]state.Store{"mock": store}, nil, nil) + req := &runtimev1pb.ExecuteStateTransactionRequest{ + StoreName: "mock", + Operations: []*runtimev1pb.TransactionalStateOperation{ + { + OperationType: string(state.Upsert), + Request: &runtimev1pb.StateItem{ + Key: "upsert", + Value: []byte("mock data"), + }, + }, + { + OperationType: string(state.Delete), + Request: &runtimev1pb.StateItem{ + Key: "delete_abc", + }, + }, + { + OperationType: string(state.Delete), + }, + }, + Metadata: map[string]string{ + "runtime": "mosn", + }, + } + _, err := api.ExecuteStateTransaction(context.Background(), req) + assert.NotNil(t, err) + assert.Equal(t, "rpc error: code = Internal desc = error while executing state transaction: net error", err.Error()) + }) +} + +func TestTryLock(t *testing.T) { + t.Run("lock store not configured", func(t *testing.T) { + api := NewAPI("", nil, nil, nil, nil, nil, nil, nil) + req := &runtimev1pb.TryLockRequest{ + StoreName: "abc", + } + _, err := api.TryLock(context.Background(), req) + assert.Equal(t, "rpc error: code = FailedPrecondition desc = lock store is not configured", err.Error()) + }) + + t.Run("resourceid empty", func(t *testing.T) { + mockLockStore := mock_lock.NewMockLockStore(gomock.NewController(t)) + api := NewAPI("", nil, nil, nil, nil, nil, map[string]lock.LockStore{"mock": mockLockStore}, nil) + req := &runtimev1pb.TryLockRequest{ + StoreName: "abc", + } + _, err := api.TryLock(context.Background(), req) + assert.Equal(t, "rpc error: code = InvalidArgument desc = ResourceId is empty in lock store abc", err.Error()) + }) + + t.Run("lock owner empty", func(t *testing.T) { + mockLockStore := mock_lock.NewMockLockStore(gomock.NewController(t)) + api := NewAPI("", nil, nil, nil, nil, nil, map[string]lock.LockStore{"mock": mockLockStore}, nil) + req := &runtimev1pb.TryLockRequest{ + StoreName: "abc", + ResourceId: "resource", + } + _, err := api.TryLock(context.Background(), req) + assert.Equal(t, "rpc error: code = InvalidArgument desc = LockOwner is empty in lock store abc", err.Error()) + }) + + t.Run("lock expire is not positive", func(t *testing.T) { + mockLockStore := mock_lock.NewMockLockStore(gomock.NewController(t)) + api := NewAPI("", nil, nil, nil, nil, nil, map[string]lock.LockStore{"mock": mockLockStore}, nil) + req := &runtimev1pb.TryLockRequest{ + StoreName: "abc", + ResourceId: "resource", + LockOwner: "owner", + } + _, err := api.TryLock(context.Background(), req) + assert.Equal(t, "rpc error: code = InvalidArgument desc = Expire is not positive in lock store abc", err.Error()) + }) + + t.Run("lock store not found", func(t *testing.T) { + mockLockStore := mock_lock.NewMockLockStore(gomock.NewController(t)) + api := NewAPI("", nil, nil, nil, nil, nil, map[string]lock.LockStore{"mock": mockLockStore}, nil) + req := &runtimev1pb.TryLockRequest{ + StoreName: "abc", + ResourceId: "resource", + LockOwner: "owner", + Expire: 1, + } + _, err := api.TryLock(context.Background(), req) + assert.Equal(t, "rpc error: code = InvalidArgument desc = lock store abc not found", err.Error()) + }) + + t.Run("normal", func(t *testing.T) { + mockLockStore := mock_lock.NewMockLockStore(gomock.NewController(t)) + mockLockStore.EXPECT().TryLock(gomock.Any()).DoAndReturn(func(req *lock.TryLockRequest) (*lock.TryLockResponse, error) { + assert.Equal(t, "lock|||resource", req.ResourceId) + assert.Equal(t, "owner", req.LockOwner) + assert.Equal(t, int32(1), req.Expire) + return &lock.TryLockResponse{ + Success: true, + }, nil + }) + api := NewAPI("", nil, nil, nil, nil, nil, map[string]lock.LockStore{"mock": mockLockStore}, nil) + req := &runtimev1pb.TryLockRequest{ + StoreName: "mock", + ResourceId: "resource", + LockOwner: "owner", + Expire: 1, + } + resp, err := api.TryLock(context.Background(), req) + assert.Nil(t, err) + assert.Equal(t, true, resp.Success) + }) + +} + +func TestUnlock(t *testing.T) { + t.Run("lock store not configured", func(t *testing.T) { + api := NewAPI("", nil, nil, nil, nil, nil, nil, nil) + req := &runtimev1pb.UnlockRequest{ + StoreName: "abc", + } + _, err := api.Unlock(context.Background(), req) + assert.Equal(t, "rpc error: code = FailedPrecondition desc = lock store is not configured", err.Error()) + }) + + t.Run("resourceid empty", func(t *testing.T) { + mockLockStore := mock_lock.NewMockLockStore(gomock.NewController(t)) + api := NewAPI("", nil, nil, nil, nil, nil, map[string]lock.LockStore{"mock": mockLockStore}, nil) + req := &runtimev1pb.UnlockRequest{ + StoreName: "abc", + } + _, err := api.Unlock(context.Background(), req) + assert.Equal(t, "rpc error: code = InvalidArgument desc = ResourceId is empty in lock store abc", err.Error()) + }) + + t.Run("lock owner empty", func(t *testing.T) { + mockLockStore := mock_lock.NewMockLockStore(gomock.NewController(t)) + api := NewAPI("", nil, nil, nil, nil, nil, map[string]lock.LockStore{"mock": mockLockStore}, nil) + req := &runtimev1pb.UnlockRequest{ + StoreName: "abc", + ResourceId: "resource", + } + _, err := api.Unlock(context.Background(), req) + assert.Equal(t, "rpc error: code = InvalidArgument desc = LockOwner is empty in lock store abc", err.Error()) + }) + + t.Run("lock store not found", func(t *testing.T) { + mockLockStore := mock_lock.NewMockLockStore(gomock.NewController(t)) + api := NewAPI("", nil, nil, nil, nil, nil, map[string]lock.LockStore{"mock": mockLockStore}, nil) + req := &runtimev1pb.UnlockRequest{ + StoreName: "abc", + ResourceId: "resource", + LockOwner: "owner", + } + _, err := api.Unlock(context.Background(), req) + assert.Equal(t, "rpc error: code = InvalidArgument desc = lock store abc not found", err.Error()) + }) + + t.Run("normal", func(t *testing.T) { + mockLockStore := mock_lock.NewMockLockStore(gomock.NewController(t)) + mockLockStore.EXPECT().Unlock(gomock.Any()).DoAndReturn(func(req *lock.UnlockRequest) (*lock.UnlockResponse, error) { + assert.Equal(t, "lock|||resource", req.ResourceId) + assert.Equal(t, "owner", req.LockOwner) + return &lock.UnlockResponse{ + Status: lock.SUCCESS, + }, nil + }) + api := NewAPI("", nil, nil, nil, nil, nil, map[string]lock.LockStore{"mock": mockLockStore}, nil) + req := &runtimev1pb.UnlockRequest{ + StoreName: "mock", + ResourceId: "resource", + LockOwner: "owner", + } + resp, err := api.Unlock(context.Background(), req) + assert.Nil(t, err) + assert.Equal(t, runtimev1pb.UnlockResponse_SUCCESS, resp.Status) + }) +} + +func TestGetNextId(t *testing.T) { + t.Run("sequencers not configured", func(t *testing.T) { + api := NewAPI("", nil, nil, nil, nil, nil, nil, nil) + req := &runtimev1pb.GetNextIdRequest{ + StoreName: "abc", + } + _, err := api.GetNextId(context.Background(), req) + assert.Equal(t, "rpc error: code = FailedPrecondition desc = Sequencer store is not configured", err.Error()) + }) + + t.Run("seq key empty", func(t *testing.T) { + mockSequencerStore := mock_sequencer.NewMockStore(gomock.NewController(t)) + api := NewAPI("", nil, nil, nil, nil, nil, nil, map[string]sequencer.Store{"mock": mockSequencerStore}) + req := &runtimev1pb.GetNextIdRequest{ + StoreName: "abc", + } + _, err := api.GetNextId(context.Background(), req) + assert.Equal(t, "rpc error: code = InvalidArgument desc = Key is empty in sequencer store abc", err.Error()) + }) + + t.Run("sequencer store not found", func(t *testing.T) { + mockSequencerStore := mock_sequencer.NewMockStore(gomock.NewController(t)) + api := NewAPI("", nil, nil, nil, nil, nil, nil, map[string]sequencer.Store{"mock": mockSequencerStore}) + req := &runtimev1pb.GetNextIdRequest{ + StoreName: "abc", + Key: "next key", + } + _, err := api.GetNextId(context.Background(), req) + assert.Equal(t, "rpc error: code = InvalidArgument desc = Sequencer store abc not found", err.Error()) + }) + + t.Run("auto increment is strong", func(t *testing.T) { + mockSequencerStore := mock_sequencer.NewMockStore(gomock.NewController(t)) + mockSequencerStore.EXPECT().GetNextId(gomock.Any()). + DoAndReturn(func(req *sequencer.GetNextIdRequest) (*sequencer.GetNextIdResponse, error) { + assert.Equal(t, "sequencer|||next key", req.Key) + assert.Equal(t, sequencer.STRONG, req.Options.AutoIncrement) + return &sequencer.GetNextIdResponse{ + NextId: 10, + }, nil + }) + api := NewAPI("", nil, nil, nil, nil, nil, nil, map[string]sequencer.Store{"mock": mockSequencerStore}) + req := &runtimev1pb.GetNextIdRequest{ + StoreName: "mock", + Key: "next key", + Options: &runtimev1pb.SequencerOptions{ + Increment: runtimev1pb.SequencerOptions_STRONG, + }, + } + rsp, err := api.GetNextId(context.Background(), req) + assert.Nil(t, err) + assert.Equal(t, int64(10), rsp.NextId) + }) + + t.Run("net error", func(t *testing.T) { + mockSequencerStore := mock_sequencer.NewMockStore(gomock.NewController(t)) + mockSequencerStore.EXPECT().GetNextId(gomock.Any()).Return(nil, fmt.Errorf("net error")) + api := NewAPI("", nil, nil, nil, nil, nil, nil, map[string]sequencer.Store{"mock": mockSequencerStore}) + req := &runtimev1pb.GetNextIdRequest{ + StoreName: "mock", + Key: "next key", + Options: &runtimev1pb.SequencerOptions{ + Increment: runtimev1pb.SequencerOptions_STRONG, + }, + } + _, err := api.GetNextId(context.Background(), req) + assert.NotNil(t, err) + assert.Equal(t, "net error", err.Error()) + }) +} diff --git a/pkg/mock/components/invoker/invoker.go b/pkg/mock/components/invoker/invoker.go new file mode 100644 index 0000000000..2f4f7492fc --- /dev/null +++ b/pkg/mock/components/invoker/invoker.go @@ -0,0 +1,180 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: components/rpc/types.go + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + rpc "mosn.io/layotto/components/rpc" +) + +// MockInvoker is a mock of Invoker interface. +type MockInvoker struct { + ctrl *gomock.Controller + recorder *MockInvokerMockRecorder +} + +// MockInvokerMockRecorder is the mock recorder for MockInvoker. +type MockInvokerMockRecorder struct { + mock *MockInvoker +} + +// NewMockInvoker creates a new mock instance. +func NewMockInvoker(ctrl *gomock.Controller) *MockInvoker { + mock := &MockInvoker{ctrl: ctrl} + mock.recorder = &MockInvokerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockInvoker) EXPECT() *MockInvokerMockRecorder { + return m.recorder +} + +// Init mocks base method. +func (m *MockInvoker) Init(config rpc.RpcConfig) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Init", config) + ret0, _ := ret[0].(error) + return ret0 +} + +// Init indicates an expected call of Init. +func (mr *MockInvokerMockRecorder) Init(config interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockInvoker)(nil).Init), config) +} + +// Invoke mocks base method. +func (m *MockInvoker) Invoke(ctx context.Context, req *rpc.RPCRequest) (*rpc.RPCResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Invoke", ctx, req) + ret0, _ := ret[0].(*rpc.RPCResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Invoke indicates an expected call of Invoke. +func (mr *MockInvokerMockRecorder) Invoke(ctx, req interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Invoke", reflect.TypeOf((*MockInvoker)(nil).Invoke), ctx, req) +} + +// MockCallback is a mock of Callback interface. +type MockCallback struct { + ctrl *gomock.Controller + recorder *MockCallbackMockRecorder +} + +// MockCallbackMockRecorder is the mock recorder for MockCallback. +type MockCallbackMockRecorder struct { + mock *MockCallback +} + +// NewMockCallback creates a new mock instance. +func NewMockCallback(ctrl *gomock.Controller) *MockCallback { + mock := &MockCallback{ctrl: ctrl} + mock.recorder = &MockCallbackMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockCallback) EXPECT() *MockCallbackMockRecorder { + return m.recorder +} + +// AddAfterInvoke mocks base method. +func (m *MockCallback) AddAfterInvoke(arg0 rpc.CallbackFunc) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddAfterInvoke", arg0) +} + +// AddAfterInvoke indicates an expected call of AddAfterInvoke. +func (mr *MockCallbackMockRecorder) AddAfterInvoke(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddAfterInvoke", reflect.TypeOf((*MockCallback)(nil).AddAfterInvoke), arg0) +} + +// AddBeforeInvoke mocks base method. +func (m *MockCallback) AddBeforeInvoke(arg0 rpc.CallbackFunc) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddBeforeInvoke", arg0) +} + +// AddBeforeInvoke indicates an expected call of AddBeforeInvoke. +func (mr *MockCallbackMockRecorder) AddBeforeInvoke(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBeforeInvoke", reflect.TypeOf((*MockCallback)(nil).AddBeforeInvoke), arg0) +} + +// AfterInvoke mocks base method. +func (m *MockCallback) AfterInvoke(arg0 *rpc.RPCResponse) (*rpc.RPCResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AfterInvoke", arg0) + ret0, _ := ret[0].(*rpc.RPCResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AfterInvoke indicates an expected call of AfterInvoke. +func (mr *MockCallbackMockRecorder) AfterInvoke(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AfterInvoke", reflect.TypeOf((*MockCallback)(nil).AfterInvoke), arg0) +} + +// BeforeInvoke mocks base method. +func (m *MockCallback) BeforeInvoke(arg0 *rpc.RPCRequest) (*rpc.RPCRequest, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BeforeInvoke", arg0) + ret0, _ := ret[0].(*rpc.RPCRequest) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// BeforeInvoke indicates an expected call of BeforeInvoke. +func (mr *MockCallbackMockRecorder) BeforeInvoke(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BeforeInvoke", reflect.TypeOf((*MockCallback)(nil).BeforeInvoke), arg0) +} + +// MockChannel is a mock of Channel interface. +type MockChannel struct { + ctrl *gomock.Controller + recorder *MockChannelMockRecorder +} + +// MockChannelMockRecorder is the mock recorder for MockChannel. +type MockChannelMockRecorder struct { + mock *MockChannel +} + +// NewMockChannel creates a new mock instance. +func NewMockChannel(ctrl *gomock.Controller) *MockChannel { + mock := &MockChannel{ctrl: ctrl} + mock.recorder = &MockChannelMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockChannel) EXPECT() *MockChannelMockRecorder { + return m.recorder +} + +// Do mocks base method. +func (m *MockChannel) Do(arg0 *rpc.RPCRequest) (*rpc.RPCResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Do", arg0) + ret0, _ := ret[0].(*rpc.RPCResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Do indicates an expected call of Do. +func (mr *MockChannelMockRecorder) Do(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Do", reflect.TypeOf((*MockChannel)(nil).Do), arg0) +} diff --git a/pkg/mock/components/lock/lock.go b/pkg/mock/components/lock/lock.go new file mode 100644 index 0000000000..345bf3d076 --- /dev/null +++ b/pkg/mock/components/lock/lock.go @@ -0,0 +1,93 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: components/lock/lock_store.go + +// Package mock_lock is a generated GoMock package. +package mock_lock + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + lock "mosn.io/layotto/components/lock" +) + +// MockLockStore is a mock of LockStore interface. +type MockLockStore struct { + ctrl *gomock.Controller + recorder *MockLockStoreMockRecorder +} + +// MockLockStoreMockRecorder is the mock recorder for MockLockStore. +type MockLockStoreMockRecorder struct { + mock *MockLockStore +} + +// NewMockLockStore creates a new mock instance. +func NewMockLockStore(ctrl *gomock.Controller) *MockLockStore { + mock := &MockLockStore{ctrl: ctrl} + mock.recorder = &MockLockStoreMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockLockStore) EXPECT() *MockLockStoreMockRecorder { + return m.recorder +} + +// Features mocks base method. +func (m *MockLockStore) Features() []lock.Feature { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Features") + ret0, _ := ret[0].([]lock.Feature) + return ret0 +} + +// Features indicates an expected call of Features. +func (mr *MockLockStoreMockRecorder) Features() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Features", reflect.TypeOf((*MockLockStore)(nil).Features)) +} + +// Init mocks base method. +func (m *MockLockStore) Init(metadata lock.Metadata) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Init", metadata) + ret0, _ := ret[0].(error) + return ret0 +} + +// Init indicates an expected call of Init. +func (mr *MockLockStoreMockRecorder) Init(metadata interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockLockStore)(nil).Init), metadata) +} + +// TryLock mocks base method. +func (m *MockLockStore) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TryLock", req) + ret0, _ := ret[0].(*lock.TryLockResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// TryLock indicates an expected call of TryLock. +func (mr *MockLockStoreMockRecorder) TryLock(req interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TryLock", reflect.TypeOf((*MockLockStore)(nil).TryLock), req) +} + +// Unlock mocks base method. +func (m *MockLockStore) Unlock(req *lock.UnlockRequest) (*lock.UnlockResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Unlock", req) + ret0, _ := ret[0].(*lock.UnlockResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Unlock indicates an expected call of Unlock. +func (mr *MockLockStoreMockRecorder) Unlock(req interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unlock", reflect.TypeOf((*MockLockStore)(nil).Unlock), req) +} diff --git a/pkg/mock/components/pubsub/pubsub.go b/pkg/mock/components/pubsub/pubsub.go new file mode 100644 index 0000000000..67c47d121a --- /dev/null +++ b/pkg/mock/components/pubsub/pubsub.go @@ -0,0 +1,105 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/dapr/components-contrib/pubsub (interfaces: PubSub) + +// Package mock is a generated GoMock package. +package mock + +import ( + reflect "reflect" + + pubsub "github.com/dapr/components-contrib/pubsub" + gomock "github.com/golang/mock/gomock" +) + +// MockPubSub is a mock of PubSub interface. +type MockPubSub struct { + ctrl *gomock.Controller + recorder *MockPubSubMockRecorder +} + +// MockPubSubMockRecorder is the mock recorder for MockPubSub. +type MockPubSubMockRecorder struct { + mock *MockPubSub +} + +// NewMockPubSub creates a new mock instance. +func NewMockPubSub(ctrl *gomock.Controller) *MockPubSub { + mock := &MockPubSub{ctrl: ctrl} + mock.recorder = &MockPubSubMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPubSub) EXPECT() *MockPubSubMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockPubSub) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockPubSubMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockPubSub)(nil).Close)) +} + +// Features mocks base method. +func (m *MockPubSub) Features() []pubsub.Feature { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Features") + ret0, _ := ret[0].([]pubsub.Feature) + return ret0 +} + +// Features indicates an expected call of Features. +func (mr *MockPubSubMockRecorder) Features() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Features", reflect.TypeOf((*MockPubSub)(nil).Features)) +} + +// Init mocks base method. +func (m *MockPubSub) Init(arg0 pubsub.Metadata) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Init", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Init indicates an expected call of Init. +func (mr *MockPubSubMockRecorder) Init(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockPubSub)(nil).Init), arg0) +} + +// Publish mocks base method. +func (m *MockPubSub) Publish(arg0 *pubsub.PublishRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Publish", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Publish indicates an expected call of Publish. +func (mr *MockPubSubMockRecorder) Publish(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Publish", reflect.TypeOf((*MockPubSub)(nil).Publish), arg0) +} + +// Subscribe mocks base method. +func (m *MockPubSub) Subscribe(arg0 pubsub.SubscribeRequest, arg1 pubsub.Handler) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Subscribe", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// Subscribe indicates an expected call of Subscribe. +func (mr *MockPubSubMockRecorder) Subscribe(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockPubSub)(nil).Subscribe), arg0, arg1) +} diff --git a/pkg/mock/components/sequencer/sequencer.go b/pkg/mock/components/sequencer/sequencer.go new file mode 100644 index 0000000000..1eaf253849 --- /dev/null +++ b/pkg/mock/components/sequencer/sequencer.go @@ -0,0 +1,80 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: components/sequencer/store.go + +// Package mock_sequencer is a generated GoMock package. +package mock_sequencer + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + sequencer "mosn.io/layotto/components/sequencer" +) + +// MockStore is a mock of Store interface. +type MockStore struct { + ctrl *gomock.Controller + recorder *MockStoreMockRecorder +} + +// MockStoreMockRecorder is the mock recorder for MockStore. +type MockStoreMockRecorder struct { + mock *MockStore +} + +// NewMockStore creates a new mock instance. +func NewMockStore(ctrl *gomock.Controller) *MockStore { + mock := &MockStore{ctrl: ctrl} + mock.recorder = &MockStoreMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStore) EXPECT() *MockStoreMockRecorder { + return m.recorder +} + +// GetNextId mocks base method. +func (m *MockStore) GetNextId(arg0 *sequencer.GetNextIdRequest) (*sequencer.GetNextIdResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetNextId", arg0) + ret0, _ := ret[0].(*sequencer.GetNextIdResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetNextId indicates an expected call of GetNextId. +func (mr *MockStoreMockRecorder) GetNextId(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNextId", reflect.TypeOf((*MockStore)(nil).GetNextId), arg0) +} + +// GetSegment mocks base method. +func (m *MockStore) GetSegment(arg0 *sequencer.GetSegmentRequest) (bool, *sequencer.GetSegmentResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetSegment", arg0) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(*sequencer.GetSegmentResponse) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// GetSegment indicates an expected call of GetSegment. +func (mr *MockStoreMockRecorder) GetSegment(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSegment", reflect.TypeOf((*MockStore)(nil).GetSegment), arg0) +} + +// Init mocks base method. +func (m *MockStore) Init(config sequencer.Configuration) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Init", config) + ret0, _ := ret[0].(error) + return ret0 +} + +// Init indicates an expected call of Init. +func (mr *MockStoreMockRecorder) Init(config interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockStore)(nil).Init), config) +} diff --git a/pkg/mock/components/state/state.go b/pkg/mock/components/state/state.go new file mode 100644 index 0000000000..6e1e8ace60 --- /dev/null +++ b/pkg/mock/components/state/state.go @@ -0,0 +1,201 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/dapr/components-contrib/state (interfaces: Store,TransactionalStore) + +// Package mock_state is a generated GoMock package. +package mock_state + +import ( + reflect "reflect" + + state "github.com/dapr/components-contrib/state" + gomock "github.com/golang/mock/gomock" +) + +// MockStore is a mock of Store interface. +type MockStore struct { + ctrl *gomock.Controller + recorder *MockStoreMockRecorder +} + +// MockStoreMockRecorder is the mock recorder for MockStore. +type MockStoreMockRecorder struct { + mock *MockStore +} + +// NewMockStore creates a new mock instance. +func NewMockStore(ctrl *gomock.Controller) *MockStore { + mock := &MockStore{ctrl: ctrl} + mock.recorder = &MockStoreMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStore) EXPECT() *MockStoreMockRecorder { + return m.recorder +} + +// BulkDelete mocks base method. +func (m *MockStore) BulkDelete(arg0 []state.DeleteRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BulkDelete", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// BulkDelete indicates an expected call of BulkDelete. +func (mr *MockStoreMockRecorder) BulkDelete(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BulkDelete", reflect.TypeOf((*MockStore)(nil).BulkDelete), arg0) +} + +// BulkGet mocks base method. +func (m *MockStore) BulkGet(arg0 []state.GetRequest) (bool, []state.BulkGetResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BulkGet", arg0) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].([]state.BulkGetResponse) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// BulkGet indicates an expected call of BulkGet. +func (mr *MockStoreMockRecorder) BulkGet(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BulkGet", reflect.TypeOf((*MockStore)(nil).BulkGet), arg0) +} + +// BulkSet mocks base method. +func (m *MockStore) BulkSet(arg0 []state.SetRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BulkSet", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// BulkSet indicates an expected call of BulkSet. +func (mr *MockStoreMockRecorder) BulkSet(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BulkSet", reflect.TypeOf((*MockStore)(nil).BulkSet), arg0) +} + +// Delete mocks base method. +func (m *MockStore) Delete(arg0 *state.DeleteRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Delete", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Delete indicates an expected call of Delete. +func (mr *MockStoreMockRecorder) Delete(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockStore)(nil).Delete), arg0) +} + +// Features mocks base method. +func (m *MockStore) Features() []state.Feature { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Features") + ret0, _ := ret[0].([]state.Feature) + return ret0 +} + +// Features indicates an expected call of Features. +func (mr *MockStoreMockRecorder) Features() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Features", reflect.TypeOf((*MockStore)(nil).Features)) +} + +// Get mocks base method. +func (m *MockStore) Get(arg0 *state.GetRequest) (*state.GetResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Get", arg0) + ret0, _ := ret[0].(*state.GetResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Get indicates an expected call of Get. +func (mr *MockStoreMockRecorder) Get(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockStore)(nil).Get), arg0) +} + +// Init mocks base method. +func (m *MockStore) Init(arg0 state.Metadata) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Init", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Init indicates an expected call of Init. +func (mr *MockStoreMockRecorder) Init(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockStore)(nil).Init), arg0) +} + +// Set mocks base method. +func (m *MockStore) Set(arg0 *state.SetRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Set", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Set indicates an expected call of Set. +func (mr *MockStoreMockRecorder) Set(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Set", reflect.TypeOf((*MockStore)(nil).Set), arg0) +} + +// MockTransactionalStore is a mock of TransactionalStore interface. +type MockTransactionalStore struct { + ctrl *gomock.Controller + recorder *MockTransactionalStoreMockRecorder +} + +// MockTransactionalStoreMockRecorder is the mock recorder for MockTransactionalStore. +type MockTransactionalStoreMockRecorder struct { + mock *MockTransactionalStore +} + +// NewMockTransactionalStore creates a new mock instance. +func NewMockTransactionalStore(ctrl *gomock.Controller) *MockTransactionalStore { + mock := &MockTransactionalStore{ctrl: ctrl} + mock.recorder = &MockTransactionalStoreMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTransactionalStore) EXPECT() *MockTransactionalStoreMockRecorder { + return m.recorder +} + +// Init mocks base method. +func (m *MockTransactionalStore) Init(arg0 state.Metadata) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Init", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Init indicates an expected call of Init. +func (mr *MockTransactionalStoreMockRecorder) Init(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockTransactionalStore)(nil).Init), arg0) +} + +// Multi mocks base method. +func (m *MockTransactionalStore) Multi(arg0 *state.TransactionalStateRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Multi", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Multi indicates an expected call of Multi. +func (mr *MockTransactionalStoreMockRecorder) Multi(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Multi", reflect.TypeOf((*MockTransactionalStore)(nil).Multi), arg0) +} diff --git a/pkg/mock/runtime/appcallback/appcallback.go b/pkg/mock/runtime/appcallback/appcallback.go new file mode 100644 index 0000000000..0db0c5874a --- /dev/null +++ b/pkg/mock/runtime/appcallback/appcallback.go @@ -0,0 +1,131 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: spec/proto/runtime/v1/appcallback.pb.go + +// Package mock_appcallback is a generated GoMock package. +package mock_appcallback + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + empty "github.com/golang/protobuf/ptypes/empty" + grpc "google.golang.org/grpc" + runtime "mosn.io/layotto/spec/proto/runtime/v1" +) + +// MockAppCallbackClient is a mock of AppCallbackClient interface. +type MockAppCallbackClient struct { + ctrl *gomock.Controller + recorder *MockAppCallbackClientMockRecorder +} + +// MockAppCallbackClientMockRecorder is the mock recorder for MockAppCallbackClient. +type MockAppCallbackClientMockRecorder struct { + mock *MockAppCallbackClient +} + +// NewMockAppCallbackClient creates a new mock instance. +func NewMockAppCallbackClient(ctrl *gomock.Controller) *MockAppCallbackClient { + mock := &MockAppCallbackClient{ctrl: ctrl} + mock.recorder = &MockAppCallbackClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockAppCallbackClient) EXPECT() *MockAppCallbackClientMockRecorder { + return m.recorder +} + +// ListTopicSubscriptions mocks base method. +func (m *MockAppCallbackClient) ListTopicSubscriptions(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*runtime.ListTopicSubscriptionsResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ListTopicSubscriptions", varargs...) + ret0, _ := ret[0].(*runtime.ListTopicSubscriptionsResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListTopicSubscriptions indicates an expected call of ListTopicSubscriptions. +func (mr *MockAppCallbackClientMockRecorder) ListTopicSubscriptions(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListTopicSubscriptions", reflect.TypeOf((*MockAppCallbackClient)(nil).ListTopicSubscriptions), varargs...) +} + +// OnTopicEvent mocks base method. +func (m *MockAppCallbackClient) OnTopicEvent(ctx context.Context, in *runtime.TopicEventRequest, opts ...grpc.CallOption) (*runtime.TopicEventResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "OnTopicEvent", varargs...) + ret0, _ := ret[0].(*runtime.TopicEventResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// OnTopicEvent indicates an expected call of OnTopicEvent. +func (mr *MockAppCallbackClientMockRecorder) OnTopicEvent(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnTopicEvent", reflect.TypeOf((*MockAppCallbackClient)(nil).OnTopicEvent), varargs...) +} + +// MockAppCallbackServer is a mock of AppCallbackServer interface. +type MockAppCallbackServer struct { + ctrl *gomock.Controller + recorder *MockAppCallbackServerMockRecorder +} + +// MockAppCallbackServerMockRecorder is the mock recorder for MockAppCallbackServer. +type MockAppCallbackServerMockRecorder struct { + mock *MockAppCallbackServer +} + +// NewMockAppCallbackServer creates a new mock instance. +func NewMockAppCallbackServer(ctrl *gomock.Controller) *MockAppCallbackServer { + mock := &MockAppCallbackServer{ctrl: ctrl} + mock.recorder = &MockAppCallbackServerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockAppCallbackServer) EXPECT() *MockAppCallbackServerMockRecorder { + return m.recorder +} + +// ListTopicSubscriptions mocks base method. +func (m *MockAppCallbackServer) ListTopicSubscriptions(arg0 context.Context, arg1 *empty.Empty) (*runtime.ListTopicSubscriptionsResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListTopicSubscriptions", arg0, arg1) + ret0, _ := ret[0].(*runtime.ListTopicSubscriptionsResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListTopicSubscriptions indicates an expected call of ListTopicSubscriptions. +func (mr *MockAppCallbackServerMockRecorder) ListTopicSubscriptions(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListTopicSubscriptions", reflect.TypeOf((*MockAppCallbackServer)(nil).ListTopicSubscriptions), arg0, arg1) +} + +// OnTopicEvent mocks base method. +func (m *MockAppCallbackServer) OnTopicEvent(arg0 context.Context, arg1 *runtime.TopicEventRequest) (*runtime.TopicEventResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "OnTopicEvent", arg0, arg1) + ret0, _ := ret[0].(*runtime.TopicEventResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// OnTopicEvent indicates an expected call of OnTopicEvent. +func (mr *MockAppCallbackServerMockRecorder) OnTopicEvent(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnTopicEvent", reflect.TypeOf((*MockAppCallbackServer)(nil).OnTopicEvent), arg0, arg1) +} diff --git a/pkg/runtime/runtime_test.go b/pkg/runtime/runtime_test.go new file mode 100644 index 0000000000..fdc279da00 --- /dev/null +++ b/pkg/runtime/runtime_test.go @@ -0,0 +1,364 @@ +/* + * Copyright 2021 Layotto Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package runtime + +import ( + "context" + "encoding/json" + "fmt" + "net" + "testing" + + "github.com/dapr/components-contrib/pubsub" + "github.com/dapr/components-contrib/state" + "github.com/golang/mock/gomock" + jsoniter "github.com/json-iterator/go" + "github.com/stretchr/testify/assert" + rawGRPC "google.golang.org/grpc" + "google.golang.org/grpc/test/bufconn" + + "mosn.io/pkg/log" + + "mosn.io/layotto/components/configstores" + "mosn.io/layotto/components/hello" + "mosn.io/layotto/components/lock" + "mosn.io/layotto/components/rpc" + "mosn.io/layotto/components/sequencer" + "mosn.io/layotto/pkg/mock" + mock_invoker "mosn.io/layotto/pkg/mock/components/invoker" + mock_lock "mosn.io/layotto/pkg/mock/components/lock" + mock_pubsub "mosn.io/layotto/pkg/mock/components/pubsub" + mock_sequencer "mosn.io/layotto/pkg/mock/components/sequencer" + mock_state "mosn.io/layotto/pkg/mock/components/state" + mock_appcallback "mosn.io/layotto/pkg/mock/runtime/appcallback" + mlock "mosn.io/layotto/pkg/runtime/lock" + mpubsub "mosn.io/layotto/pkg/runtime/pubsub" + msequencer "mosn.io/layotto/pkg/runtime/sequencer" + mstate "mosn.io/layotto/pkg/runtime/state" + runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1" +) + +func TestNewMosnRuntime(t *testing.T) { + runtimeConfig := &MosnRuntimeConfig{} + rt := NewMosnRuntime(runtimeConfig) + assert.NotNil(t, rt) +} + +func TestMosnRuntime_GetInfo(t *testing.T) { + runtimeConfig := &MosnRuntimeConfig{} + rt := NewMosnRuntime(runtimeConfig) + runtimeInfo := rt.GetInfo() + assert.NotNil(t, runtimeInfo) +} + +func TestMosnRuntime_Run(t *testing.T) { + t.Run("run succ", func(t *testing.T) { + runtimeConfig := &MosnRuntimeConfig{} + rt := NewMosnRuntime(runtimeConfig) + server, err := rt.Run() + assert.Nil(t, err) + assert.NotNil(t, server) + }) + + t.Run("no runtime config", func(t *testing.T) { + rt := NewMosnRuntime(nil) + _, err := rt.Run() + assert.NotNil(t, err) + assert.Equal(t, "[runtime] init error:no runtimeConfig", err.Error()) + }) +} + +func TestMosnRuntime_initAppCallbackConnection(t *testing.T) { + t.Run("init success", func(t *testing.T) { + // prepare app callback grpc server + port := 8888 + listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%v", port)) + assert.Nil(t, err) + defer listener.Close() + svr := rawGRPC.NewServer() + go func() { + svr.Serve(listener) + }() + cfg := &MosnRuntimeConfig{ + AppManagement: AppConfig{ + AppId: "", + GrpcCallbackPort: port, + }, + } + // construct MosnRuntime + m := NewMosnRuntime(cfg) + // test initAppCallbackConnection + err = m.initAppCallbackConnection() + // assert + assert.Nil(t, err) + }) +} + +func TestMosnRuntime_initPubSubs(t *testing.T) { + t.Run("normal", func(t *testing.T) { + // mock callback response + subResp := &runtimev1pb.ListTopicSubscriptionsResponse{ + Subscriptions: []*runtimev1pb.TopicSubscription{ + { + PubsubName: "mock", + Topic: "layotto", + Metadata: nil, + }, + }, + } + // init grpc server + mockAppCallbackServer := mock_appcallback.NewMockAppCallbackServer(gomock.NewController(t)) + mockAppCallbackServer.EXPECT().ListTopicSubscriptions(gomock.Any(), gomock.Any()).Return(subResp, nil) + + lis := bufconn.Listen(1024 * 1024) + s := rawGRPC.NewServer() + runtimev1pb.RegisterAppCallbackServer(s, mockAppCallbackServer) + go func() { + s.Serve(lis) + }() + + // init callback client + callbackClient, err := rawGRPC.DialContext(context.Background(), "bufnet", rawGRPC.WithInsecure(), rawGRPC.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { + return lis.Dial() + })) + assert.Nil(t, err) + + // mock pubsub component + mockPubSub := mock_pubsub.NewMockPubSub(gomock.NewController(t)) + mockPubSub.EXPECT().Init(gomock.Any()).Return(nil) + mockPubSub.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Return(nil) + f := func() pubsub.PubSub { + return mockPubSub + } + + cfg := &MosnRuntimeConfig{ + PubSubManagement: map[string]mpubsub.Config{ + "mock": { + Metadata: map[string]string{ + "target": "layotto", + }, + }, + }, + } + // construct MosnRuntime + m := NewMosnRuntime(cfg) + m.AppCallbackConn = callbackClient + m.errInt = func(err error, format string, args ...interface{}) { + log.DefaultLogger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) + } + // test initPubSubs + err = m.initPubSubs(mpubsub.NewFactory("mock", f)) + // assert result + assert.Nil(t, err) + }) +} + +func TestMosnRuntime_initStates(t *testing.T) { + t.Run("init success", func(t *testing.T) { + // prepare mock + mockStateStore := mock_state.NewMockStore(gomock.NewController(t)) + mockStateStore.EXPECT().Init(gomock.Any()).Return(nil) + f := func() state.Store { + return mockStateStore + } + + cfg := &MosnRuntimeConfig{ + StateManagement: map[string]mstate.Config{ + "mock": { + Metadata: map[string]string{ + "target": "layotto", + }, + }, + }, + } + // construct MosnRuntime + m := NewMosnRuntime(cfg) + m.errInt = func(err error, format string, args ...interface{}) { + log.DefaultLogger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) + } + // test initStates + err := m.initStates(mstate.NewFactory("mock", f)) + // assert result + assert.Nil(t, err) + }) +} + +func TestMosnRuntime_initRpc(t *testing.T) { + t.Run("init success", func(t *testing.T) { + // prepare mock + mockInvoker := mock_invoker.NewMockInvoker(gomock.NewController(t)) + mockInvoker.EXPECT().Init(gomock.Any()).Return(nil) + f := func() rpc.Invoker { + return mockInvoker + } + + cfg := &MosnRuntimeConfig{ + RpcManagement: map[string]rpc.RpcConfig{ + "mock": {}, + }, + } + // construct MosnRuntime + m := NewMosnRuntime(cfg) + m.errInt = func(err error, format string, args ...interface{}) { + log.DefaultLogger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) + } + // test initRpcs method + err := m.initRpcs(rpc.NewRpcFactory("mock", f)) + // assert + assert.Nil(t, err) + }) +} + +func TestMosnRuntime_initConfigStores(t *testing.T) { + t.Run("init success", func(t *testing.T) { + mockStore := mock.NewMockStore(gomock.NewController(t)) + mockStore.EXPECT().Init(gomock.Any()).Return(nil) + f := func() configstores.Store { + return mockStore + } + + cfg := &MosnRuntimeConfig{ + ConfigStoreManagement: map[string]configstores.StoreConfig{ + "mock": {}, + }, + } + m := NewMosnRuntime(cfg) + m.errInt = func(err error, format string, args ...interface{}) { + log.DefaultLogger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) + } + err := m.initConfigStores(configstores.NewStoreFactory("mock", f)) + assert.Nil(t, err) + }) +} + +func TestMosnRuntime_initHellos(t *testing.T) { + t.Run("init success", func(t *testing.T) { + mockHello := mock.NewMockHelloService(gomock.NewController(t)) + mockHello.EXPECT().Init(gomock.Any()).Return(nil) + f := func() hello.HelloService { + return mockHello + } + + cfg := &MosnRuntimeConfig{ + HelloServiceManagement: map[string]hello.HelloConfig{ + "mock": {}, + }, + } + m := NewMosnRuntime(cfg) + m.errInt = func(err error, format string, args ...interface{}) { + log.DefaultLogger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) + } + err := m.initHellos(hello.NewHelloFactory("mock", f)) + assert.Nil(t, err) + }) +} + +func TestMosnRuntime_initSequencers(t *testing.T) { + t.Run("init success", func(t *testing.T) { + mockStore := mock_sequencer.NewMockStore(gomock.NewController(t)) + mockStore.EXPECT().Init(gomock.Any()).Return(nil) + f := func() sequencer.Store { + return mockStore + } + + cfg := &MosnRuntimeConfig{ + SequencerManagement: map[string]sequencer.Config{ + "mock": {}, + }, + } + m := NewMosnRuntime(cfg) + m.errInt = func(err error, format string, args ...interface{}) { + log.DefaultLogger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) + } + err := m.initSequencers(msequencer.NewFactory("mock", f)) + assert.Nil(t, err) + }) +} + +func TestMosnRuntime_initLocks(t *testing.T) { + t.Run("init success", func(t *testing.T) { + mockLockStore := mock_lock.NewMockLockStore(gomock.NewController(t)) + mockLockStore.EXPECT().Init(gomock.Any()).Return(nil) + f := func() lock.LockStore { + return mockLockStore + } + + cfg := &MosnRuntimeConfig{ + LockManagement: map[string]lock.Config{ + "mock": {}, + }, + } + m := NewMosnRuntime(cfg) + m.errInt = func(err error, format string, args ...interface{}) { + log.DefaultLogger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) + } + err := m.initLocks(mlock.NewFactory("mock", f)) + assert.Nil(t, err) + }) +} + +func TestMosnRuntime_publishMessageGRPC(t *testing.T) { + t.Run("publish success", func(t *testing.T) { + subResp := &runtimev1pb.TopicEventResponse{ + Status: runtimev1pb.TopicEventResponse_SUCCESS, + } + // init grpc server + mockAppCallbackServer := mock_appcallback.NewMockAppCallbackServer(gomock.NewController(t)) + mockAppCallbackServer.EXPECT().OnTopicEvent(gomock.Any(), gomock.Any()).Return(subResp, nil) + + lis := bufconn.Listen(1024 * 1024) + s := rawGRPC.NewServer() + runtimev1pb.RegisterAppCallbackServer(s, mockAppCallbackServer) + go func() { + s.Serve(lis) + }() + + // init callback client + callbackClient, err := rawGRPC.DialContext(context.Background(), "bufnet", rawGRPC.WithInsecure(), rawGRPC.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { + return lis.Dial() + })) + assert.Nil(t, err) + + cloudEvent := map[string]interface{}{ + pubsub.IDField: "id", + pubsub.SourceField: "source", + pubsub.DataContentTypeField: "content-type", + pubsub.TypeField: "type", + pubsub.SpecVersionField: "v1.0.0", + pubsub.DataBase64Field: "bGF5b3R0bw==", + } + + data, err := json.Marshal(cloudEvent) + assert.Nil(t, err) + + msg := &pubsub.NewMessage{ + Data: data, + Topic: "layotto", + Metadata: make(map[string]string), + } + + cfg := &MosnRuntimeConfig{} + m := NewMosnRuntime(cfg) + m.errInt = func(err error, format string, args ...interface{}) { + log.DefaultLogger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) + } + m.AppCallbackConn = callbackClient + m.json = jsoniter.ConfigFastest + err = m.publishMessageGRPC(context.Background(), msg) + assert.Nil(t, err) + }) +}