Skip to content

Commit

Permalink
merge (#175)
Browse files Browse the repository at this point in the history
* add request time for rpc

* change totle time

* update

* update

* update

* grpc design docs (#163)

* add trace log

* change log path

* update

* sequencer redis (#152)

* update

* update

* [fix] Sequencer redis (#164)

fix : for GetSegment method, redis component only check support when size=0.

* update

* update

* update

* update

* add unittest for runtime and grpc (#169)

* update

* remove trace time prints

* update

Co-authored-by: 文徐 <wangwenxue.wwx@alibaba-inc.com>
Co-authored-by: 永鹏 <neji_bupt@163.com>
Co-authored-by: ZLBer <1098294815@qq.com>
Co-authored-by: tianjipeng <tianjipeng@outlook.com>
  • Loading branch information
5 people authored and 铭渊 committed Aug 12, 2021
1 parent f426342 commit e15a60f
Show file tree
Hide file tree
Showing 26 changed files with 2,558 additions and 28 deletions.
3 changes: 1 addition & 2 deletions cmd/layotto/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ import (
lock_zookeeper "mosn.io/layotto/components/lock/zookeeper"
runtime_lock "mosn.io/layotto/pkg/runtime/lock"

// Sequencer

// Actuator
_ "mosn.io/layotto/pkg/actuator"
"mosn.io/layotto/pkg/actuator/health"
Expand Down Expand Up @@ -250,6 +248,7 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp
}),
),
)

// 4. check if unhealthy
if err != nil {
actuator.GetRuntimeReadinessIndicator().SetUnhealthy(err.Error())
Expand Down
3 changes: 1 addition & 2 deletions components/rpc/invoker/mosn/channel/connpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,15 +142,14 @@ func (p *connPool) Put(c *wrapConn, close bool) {

func (p *connPool) readloop(c *wrapConn) {
var err error

defer func() {
c.close()
if p.cleanupFunc != nil {
p.cleanupFunc(c, err)
}
}()

c.buf = buffer.NewIoBuffer(16 * 1024)
c.buf = buffer.NewIoBuffer(128)
for {
n, readErr := c.buf.ReadOnce(c)
if readErr != nil {
Expand Down
9 changes: 4 additions & 5 deletions components/rpc/invoker/mosn/channel/xchannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func newXChannel(config ChannelConfig) (rpc.Channel, error) {
m.pool = newConnPool(
config.Size,
func() (net.Conn, error) {
if _, _, err := net.SplitHostPort(config.Listener); err == nil {
return net.DialTimeout("tcp", config.Listener, time.Second)
}

local, remote := net.Pipe()
localTcpConn := &fakeTcpConn{c: local}
remoteTcpConn := &fakeTcpConn{c: remote}
Expand Down Expand Up @@ -93,9 +97,7 @@ func (m *xChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) {
if err != nil {
return nil, err
}

xstate := conn.state.(*xstate)

// encode request
frame := m.proto.ToFrame(req)
id := atomic.AddUint32(&xstate.reqid, 1)
Expand All @@ -105,7 +107,6 @@ func (m *xChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) {
m.pool.Put(conn, false)
return nil, common.Error(common.InternalCode, encErr.Error())
}

callChan := make(chan call, 1)
// set timeout
deadline, _ := ctx.Deadline()
Expand All @@ -117,15 +118,13 @@ func (m *xChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) {
xstate.mu.Lock()
xstate.calls[id] = callChan
xstate.mu.Unlock()

// write packet
if _, err := conn.Write(buf.Bytes()); err != nil {
m.removeCall(xstate, id)
m.pool.Put(conn, true)
return nil, common.Error(common.UnavailebleCode, err.Error())
}
m.pool.Put(conn, false)

select {
case res := <-callChan:
if res.err != nil {
Expand Down
22 changes: 21 additions & 1 deletion components/rpc/invoker/mosn/mosninvoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"encoding/json"
"errors"
"fmt"
"os/user"
"strconv"
"time"

"mosn.io/layotto/components/rpc"
"mosn.io/layotto/components/rpc/callback"
Expand All @@ -33,6 +36,8 @@ const (
Name = "mosn"
)

var LayottoStatLogger *log.Logger

type mosnInvoker struct {
channel rpc.Channel
cb rpc.Callback
Expand Down Expand Up @@ -73,6 +78,12 @@ func (m *mosnInvoker) Init(conf rpc.RpcConfig) error {
return err
}
m.channel = channel
usr, err := user.Current()
logRoot := usr.HomeDir + "/logs/tracelog/mosn/"
LayottoStatLogger, err = log.GetOrCreateLogger(logRoot+"layotto-client-stat.log", &log.Roller{MaxTime: 24 * 60 * 60})
if err != nil {
return err
}
return nil
}

Expand All @@ -88,13 +99,13 @@ func (m *mosnInvoker) Invoke(ctx context.Context, req *rpc.RPCRequest) (resp *rp
req.Timeout = 3000
}
req.Ctx = ctx
startTime := time.Now()
log.DefaultLogger.Debugf("[runtime][rpc]request %+v", req)
req, err = m.cb.BeforeInvoke(req)
if err != nil {
log.DefaultLogger.Errorf("[runtime][rpc]before filter error %s", err.Error())
return nil, err
}

resp, err = m.channel.Do(req)
if err != nil {
log.DefaultLogger.Errorf("[runtime][rpc]error %s", err.Error())
Expand All @@ -106,5 +117,14 @@ func (m *mosnInvoker) Invoke(ctx context.Context, req *rpc.RPCRequest) (resp *rp
if err != nil {
log.DefaultLogger.Errorf("[runtime][rpc]after filter error %s", err.Error())
}
afterInvokeTime := time.Now()
rpcId := req.Header.Get("rpc_trace_context.sofaRpcId")
traceId := req.Header.Get("rpc_trace_context.sofaTraceId")

LayottoStatLogger.Printf("%+v,%v,%+v",
rpcId,
traceId,
strconv.FormatInt(afterInvokeTime.Sub(startTime).Nanoseconds()/1000, 10),
)
return resp, err
}
110 changes: 110 additions & 0 deletions components/sequencer/redis/standalone_redis_sequencer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package redis

import (
"context"
"github.com/go-redis/redis/v8"
"mosn.io/layotto/components/pkg/utils"
"mosn.io/layotto/components/sequencer"
"mosn.io/pkg/log"
)

type StandaloneRedisSequencer struct {
client *redis.Client
metadata utils.RedisMetadata
biggerThan map[string]int64

logger log.ErrorLogger

ctx context.Context
cancel context.CancelFunc
}

// NewStandaloneRedisSequencer returns a new redis sequencer
func NewStandaloneRedisSequencer(logger log.ErrorLogger) *StandaloneRedisSequencer {
s := &StandaloneRedisSequencer{
logger: logger,
}
return s
}

/*
1. exists and >= biggerThan, no operation required, return 0
2. not exists or < biggthan, reset val, return 1
3. lua script occur error, such as tonumer(string), return error
*/
const initScript = `
if redis.call('exists', KEYS[1])==1 and tonumber(redis.call('get', KEYS[1])) >= tonumber(ARGV[1]) then
return 0
else
redis.call('set', KEYS[1],ARGV[1])
return 1
end
`

func (s *StandaloneRedisSequencer) Init(config sequencer.Configuration) error {
m, err := utils.ParseRedisMetadata(config.Properties)
if err != nil {
return err
}
//init
s.metadata = m
s.biggerThan = config.BiggerThan

// construct client
s.client = utils.NewRedisClient(m)
s.ctx, s.cancel = context.WithCancel(context.Background())

//check biggerThan, initialize if not satisfied
for k, needV := range s.biggerThan {
if needV <= 0 {
continue
}

eval := s.client.Eval(s.ctx, initScript, []string{k}, needV)
err = eval.Err()
//occur error, such as value is string type
if err != nil {
return err
}
//As long as there is no error, the initialization is successful
//It may be a reset value or it may be satisfied before
}
return nil
}

func (s *StandaloneRedisSequencer) GetNextId(req *sequencer.GetNextIdRequest) (*sequencer.GetNextIdResponse, error) {

incr := s.client.Incr(s.ctx, req.Key)

err := incr.Err()
if err != nil {
return nil, err
}

return &sequencer.GetNextIdResponse{
NextId: incr.Val(),
}, nil
}

func (s *StandaloneRedisSequencer) GetSegment(req *sequencer.GetSegmentRequest) (bool, *sequencer.GetSegmentResponse, error) {

// size=0 only check support
if req.Size == 0 {
return true, nil, nil
}

by := s.client.IncrBy(s.ctx, req.Key, int64(req.Size))
err := by.Err()
if err != nil {
return true, nil, err
}

return true, &sequencer.GetSegmentResponse{
From: by.Val() - int64(req.Size) + 1,
To: by.Val(),
}, nil
}
func (s *StandaloneRedisSequencer) Close() error {
s.cancel()
return s.client.Close()
}
136 changes: 136 additions & 0 deletions components/sequencer/redis/standalone_redis_sequencer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package redis

import (
"fmt"
"github.com/alicebob/miniredis/v2"
"github.com/stretchr/testify/assert"
"mosn.io/layotto/components/sequencer"
"mosn.io/pkg/log"
"testing"
)

const key = "resource_xxx"

func TestStandaloneRedisSequencer(t *testing.T) {
s, err := miniredis.Run()
assert.NoError(t, err)
defer s.Close()
// construct component
comp := NewStandaloneRedisSequencer(log.DefaultLogger)
cfg := sequencer.Configuration{
Properties: make(map[string]string),
}
cfg.Properties["redisHost"] = s.Addr()
cfg.Properties["redisPassword"] = ""
// init
err = comp.Init(cfg)
assert.NoError(t, err)
//first request
id, err := comp.GetNextId(&sequencer.GetNextIdRequest{
Key: key,
})
assert.NoError(t, err)
assert.Equal(t, int64(1), id.NextId)

//again
id, err = comp.GetNextId(&sequencer.GetNextIdRequest{
Key: key,
})
assert.NoError(t, err)
assert.Equal(t, int64(2), id.NextId)
}

func TestStandaloneRedisSequencer_biggerThan_success(t *testing.T) {
s, err := miniredis.Run()
assert.NoError(t, err)
defer s.Close()
// construct component
comp := NewStandaloneRedisSequencer(log.DefaultLogger)
cfg := sequencer.Configuration{
Properties: make(map[string]string),
}
defalutVal := int64(20)
//init kv
s.Set(key, fmt.Sprint(defalutVal))
cfg.Properties["redisHost"] = s.Addr()
cfg.Properties["redisPassword"] = ""
cfg.BiggerThan = map[string]int64{
key: defalutVal,
}

// init
err = comp.Init(cfg)
assert.NoError(t, err)
//first request
id, err := comp.GetNextId(&sequencer.GetNextIdRequest{
Key: key,
})
assert.NoError(t, err)
assert.Equal(t, defalutVal+1, id.NextId)

//again
id, err = comp.GetNextId(&sequencer.GetNextIdRequest{
Key: key,
})
assert.NoError(t, err)
assert.Equal(t, defalutVal+2, id.NextId)
}

func TestStandaloneRedisSequencer_biggerThan_fail_reset(t *testing.T) {
s, err := miniredis.Run()
assert.NoError(t, err)
defer s.Close()
// construct component
comp := NewStandaloneRedisSequencer(log.DefaultLogger)
cfg := sequencer.Configuration{
Properties: make(map[string]string),
}
defalutVal := int64(20)
//init kv
s.Set(key, fmt.Sprint(defalutVal))
// init config
cfg.Properties["redisHost"] = s.Addr()
cfg.Properties["redisPassword"] = ""
cfg.BiggerThan = map[string]int64{
key: defalutVal + 5,
}
err = comp.Init(cfg)
assert.NoError(t, err)

//first request
id, err := comp.GetNextId(&sequencer.GetNextIdRequest{
Key: key,
})
assert.NoError(t, err)
assert.Equal(t, defalutVal+5+1, id.NextId)

}

func TestStandaloneRedisSequencer_segment(t *testing.T) {
s, err := miniredis.Run()
assert.NoError(t, err)
defer s.Close()
// construct component
comp := NewStandaloneRedisSequencer(log.DefaultLogger)
cfg := sequencer.Configuration{
Properties: make(map[string]string),
}
defalutVal := int64(20)
//init kv
s.Set(key, fmt.Sprint(defalutVal))
cfg.Properties["redisHost"] = s.Addr()
cfg.Properties["redisPassword"] = ""

// init
err = comp.Init(cfg)
assert.NoError(t, err)
//first request
_, id, err := comp.GetSegment(&sequencer.GetSegmentRequest{
Key: key,
Size: 6,
})
assert.NoError(t, err)
assert.Equal(t, defalutVal+1, id.From)
assert.Equal(t, defalutVal+6, id.To)

}
3 changes: 2 additions & 1 deletion components/sequencer/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ type GetSegmentRequest struct {
}

type GetSegmentResponse struct {
Segment []int64
From int64
To int64
}

type Configuration struct {
Expand Down
Loading

0 comments on commit e15a60f

Please sign in to comment.