From ce0f101ad034a6d8f53dd5c4d0a07819b41b142b Mon Sep 17 00:00:00 2001 From: Kenshin <35095889+kqzh@users.noreply.github.com> Date: Tue, 18 Jan 2022 11:30:32 +0800 Subject: [PATCH] add meta api (#78) --- ccore/nebula/client_meta.go | 44 ++++++- ccore/nebula/errors/errors.go | 1 + ccore/nebula/internal/driver/v2_5/driver.go | 1 + ccore/nebula/internal/driver/v2_5/meta.go | 30 +++++ ccore/nebula/internal/driver/v2_5/wrapper.go | 17 +++ ccore/nebula/internal/driver/v2_6/driver.go | 1 + ccore/nebula/internal/driver/v2_6/meta.go | 30 +++++ ccore/nebula/internal/driver/v2_6/wrapper.go | 17 +++ ccore/nebula/internal/driver/v3_0/driver.go | 1 + ccore/nebula/internal/driver/v3_0/meta.go | 117 +++++++++++++++++++ ccore/nebula/internal/driver/v3_0/wrapper.go | 58 +++++++++ ccore/nebula/types/driver.go | 22 ++++ ccore/nebula/types/types.go | 21 ++++ 13 files changed, 359 insertions(+), 1 deletion(-) diff --git a/ccore/nebula/client_meta.go b/ccore/nebula/client_meta.go index ddf146c..eac03fe 100644 --- a/ccore/nebula/client_meta.go +++ b/ccore/nebula/client_meta.go @@ -1,10 +1,18 @@ package nebula -import "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types" +import ( + "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types" +) type ( MetaClient interface { Open() error + AddHosts(endpoints []string) error + DropHosts(endpoints []string) error + ListSpaces() (types.Spaces, error) + BalanceData(space string) (types.Balancer, error) + BalanceLeader(space string) (types.Balancer, error) + BalanceDataRemove(space string, endpoints []string) (types.Balancer, error) Close() error } @@ -31,6 +39,40 @@ func (c *defaultMetaClient) Close() error { return c.meta.close() } +func (c *defaultMetaClient) AddHosts(endpoints []string) error { + return c.meta.AddHosts(endpoints) +} + +func (c *defaultMetaClient) DropHosts(endpoints []string) error { + return c.meta.DropHosts(endpoints) +} + +func (c *defaultMetaClient) ListSpaces() (types.Spaces, error) { + return c.meta.ListSpaces() +} + +func (c *defaultMetaClient) BalanceData(space string) (types.Balancer, error) { + return c.meta.Balance(types.BalanceReq{ + Cmd: types.BalanceData, + Space: space, + }) +} + +func (c *defaultMetaClient) BalanceLeader(space string) (types.Balancer, error) { + return c.meta.Balance(types.BalanceReq{ + Cmd: types.BalanceLeader, + Space: space, + }) +} + +func (c *defaultMetaClient) BalanceDataRemove(space string, endpoints []string) (types.Balancer, error) { + return c.meta.Balance(types.BalanceReq{ + Cmd: types.BalanceDataRemove, + Space: space, + HostsToRemove: endpoints, + }) +} + func (c *defaultMetaClient) defaultClient() *defaultClient { return (*defaultClient)(c) } diff --git a/ccore/nebula/errors/errors.go b/ccore/nebula/errors/errors.go index aaf64be..e0c5c45 100644 --- a/ccore/nebula/errors/errors.go +++ b/ccore/nebula/errors/errors.go @@ -6,4 +6,5 @@ var ( ErrUnsupportedVersion = errors.New("unsupported version") ErrUnsupported = errors.New("unsupported") ErrNoEndpoints = errors.New("no endpoints") + ErrNoJobStats = errors.New("no job stats") ) diff --git a/ccore/nebula/internal/driver/v2_5/driver.go b/ccore/nebula/internal/driver/v2_5/driver.go index 7e33f30..5d6a3b5 100644 --- a/ccore/nebula/internal/driver/v2_5/driver.go +++ b/ccore/nebula/internal/driver/v2_5/driver.go @@ -2,6 +2,7 @@ package v2_5 import ( "github.com/facebook/fbthrift/thrift/lib/go/thrift" + nthrift "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v2_5" "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types" ) diff --git a/ccore/nebula/internal/driver/v2_5/meta.go b/ccore/nebula/internal/driver/v2_5/meta.go index e1bf382..f43a028 100644 --- a/ccore/nebula/internal/driver/v2_5/meta.go +++ b/ccore/nebula/internal/driver/v2_5/meta.go @@ -2,6 +2,8 @@ package v2_5 import ( "github.com/facebook/fbthrift/thrift/lib/go/thrift" + + nerrors "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/errors" "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v2_5/meta" "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types" ) @@ -39,3 +41,31 @@ func (c *defaultMetaClient) Close() error { } return nil } + +func (c *defaultMetaClient) AddHosts(endpoints []string) error { + return nerrors.ErrUnsupported +} + +func (c *defaultMetaClient) DropHosts(endpoints []string) error { + return nerrors.ErrUnsupported +} + +func (c *defaultMetaClient) ListSpaces() (types.Spaces, error) { + req := meta.NewListSpacesReq() + + resp, err := c.meta.ListSpaces(req) + if err != nil { + return nil, err + } + + if err := codeErrorIfHappened(resp.Code, nil); err != nil { + return nil, err + } + + return newSpacesWrapper(resp.Spaces), nil +} + +func (c *defaultMetaClient) Balance(req types.BalanceReq) (types.Balancer, error) { + // TODO: add 2.5 Balance logic + return nil, nerrors.ErrUnsupported +} diff --git a/ccore/nebula/internal/driver/v2_5/wrapper.go b/ccore/nebula/internal/driver/v2_5/wrapper.go index a66d79d..cdff7a7 100644 --- a/ccore/nebula/internal/driver/v2_5/wrapper.go +++ b/ccore/nebula/internal/driver/v2_5/wrapper.go @@ -4,6 +4,7 @@ import ( nerrors "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/errors" nthrift "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v2_5" "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v2_5/graph" + "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v2_5/meta" "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types" ) @@ -1136,3 +1137,19 @@ func newPlanNodeBranchInfoWrapper(planNodeBranchInfo *graph.PlanNodeBranchInfo) func (w planNodeBranchInfoWrapper) Unwrap() interface{} { return w.PlanNodeBranchInfo } + +type spaceWrapper struct { + Space *meta.IdName +} + +func newSpacesWrapper(spaces []*meta.IdName) types.Spaces { + s := make([]types.Space, 0, len(spaces)) + for _, space := range spaces { + s = append(s, spaceWrapper{Space: space}) + } + return s +} + +func (w spaceWrapper) GetName() string { + return string(w.Space.GetName()) +} diff --git a/ccore/nebula/internal/driver/v2_6/driver.go b/ccore/nebula/internal/driver/v2_6/driver.go index ab0562c..88cb864 100644 --- a/ccore/nebula/internal/driver/v2_6/driver.go +++ b/ccore/nebula/internal/driver/v2_6/driver.go @@ -2,6 +2,7 @@ package v2_6 import ( "github.com/facebook/fbthrift/thrift/lib/go/thrift" + nthrift "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v2_6" "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types" ) diff --git a/ccore/nebula/internal/driver/v2_6/meta.go b/ccore/nebula/internal/driver/v2_6/meta.go index 31371d6..b2e73e0 100644 --- a/ccore/nebula/internal/driver/v2_6/meta.go +++ b/ccore/nebula/internal/driver/v2_6/meta.go @@ -2,6 +2,8 @@ package v2_6 import ( "github.com/facebook/fbthrift/thrift/lib/go/thrift" + + nerrors "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/errors" "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v2_6/meta" "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types" ) @@ -43,3 +45,31 @@ func (c *defaultMetaClient) Close() error { } return nil } + +func (c *defaultMetaClient) AddHosts(endpoints []string) error { + return nerrors.ErrUnsupported +} + +func (c *defaultMetaClient) DropHosts(endpoints []string) error { + return nerrors.ErrUnsupported +} + +func (c *defaultMetaClient) ListSpaces() (types.Spaces, error) { + req := meta.NewListSpacesReq() + + resp, err := c.meta.ListSpaces(req) + if err != nil { + return nil, err + } + + if err := codeErrorIfHappened(resp.Code, nil); err != nil { + return nil, err + } + + return newSpacesWrapper(resp.Spaces), nil +} + +func (c *defaultMetaClient) Balance(req types.BalanceReq) (types.Balancer, error) { + // TODO: add 2.6 Balance logic + return nil, nerrors.ErrUnsupported +} diff --git a/ccore/nebula/internal/driver/v2_6/wrapper.go b/ccore/nebula/internal/driver/v2_6/wrapper.go index c1a1a39..12e3732 100644 --- a/ccore/nebula/internal/driver/v2_6/wrapper.go +++ b/ccore/nebula/internal/driver/v2_6/wrapper.go @@ -4,6 +4,7 @@ import ( nerrors "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/errors" nthrift "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v2_6" "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v2_6/graph" + "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v2_6/meta" "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types" ) @@ -1236,3 +1237,19 @@ func newPlanNodeBranchInfoWrapper(planNodeBranchInfo *graph.PlanNodeBranchInfo) func (w planNodeBranchInfoWrapper) Unwrap() interface{} { return w.PlanNodeBranchInfo } + +type spaceWrapper struct { + Space *meta.IdName +} + +func newSpacesWrapper(spaces []*meta.IdName) types.Spaces { + s := make([]types.Space, 0, len(spaces)) + for _, space := range spaces { + s = append(s, spaceWrapper{Space: space}) + } + return s +} + +func (w spaceWrapper) GetName() string { + return string(w.Space.GetName()) +} diff --git a/ccore/nebula/internal/driver/v3_0/driver.go b/ccore/nebula/internal/driver/v3_0/driver.go index 5fa1234..68f5c3b 100644 --- a/ccore/nebula/internal/driver/v3_0/driver.go +++ b/ccore/nebula/internal/driver/v3_0/driver.go @@ -2,6 +2,7 @@ package v3_0 import ( "github.com/facebook/fbthrift/thrift/lib/go/thrift" + nthrift "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v3_0" "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types" ) diff --git a/ccore/nebula/internal/driver/v3_0/meta.go b/ccore/nebula/internal/driver/v3_0/meta.go index 0dcdcd5..734426e 100644 --- a/ccore/nebula/internal/driver/v3_0/meta.go +++ b/ccore/nebula/internal/driver/v3_0/meta.go @@ -1,7 +1,13 @@ package v3_0 import ( + "net" + "strconv" + "github.com/facebook/fbthrift/thrift/lib/go/thrift" + + nerrors "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/errors" + nthrift "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v3_0" "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v3_0/meta" "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types" ) @@ -43,3 +49,114 @@ func (c *defaultMetaClient) Close() error { } return nil } + +func (c *defaultMetaClient) AddHosts(endpoints []string) error { + hostsToAdd := make([]*nthrift.HostAddr, 0, len(endpoints)) + for _, ep := range endpoints { + host, portStr, err := net.SplitHostPort(ep) + if err != nil { + return err + } + + port, err := strconv.Atoi(portStr) + if err != nil { + return err + } + + hostsToAdd = append(hostsToAdd, &nthrift.HostAddr{ + Host: host, + Port: nthrift.Port(port), + }) + } + + req := &meta.AddHostsReq{ + Hosts: hostsToAdd, + } + resp, err := c.meta.AddHosts(req) + if err != nil { + return err + } + return codeErrorIfHappened(resp.Code, nil) +} + +func (c *defaultMetaClient) DropHosts(endpoints []string) error { + hostsToDrop := make([]*nthrift.HostAddr, 0, len(endpoints)) + for _, ep := range endpoints { + host, portStr, err := net.SplitHostPort(ep) + if err != nil { + return err + } + + port, err := strconv.Atoi(portStr) + if err != nil { + return err + } + + hostsToDrop = append(hostsToDrop, &nthrift.HostAddr{ + Host: host, + Port: nthrift.Port(port), + }) + } + + req := &meta.DropHostsReq{ + Hosts: hostsToDrop, + } + resp, err := c.meta.DropHosts(req) + if err != nil { + return err + } + return codeErrorIfHappened(resp.Code, nil) +} + +func (c *defaultMetaClient) ListSpaces() (types.Spaces, error) { + req := meta.NewListSpacesReq() + + resp, err := c.meta.ListSpaces(req) + if err != nil { + return nil, err + } + + if err := codeErrorIfHappened(resp.Code, nil); err != nil { + return nil, err + } + + return newSpacesWrapper(resp.Spaces), nil +} + +func (c *defaultMetaClient) Balance(req types.BalanceReq) (types.Balancer, error) { + paras := make([][]byte, 0) + + var cmd meta.AdminCmd + switch req.Cmd { + case types.BalanceLeader: + cmd = meta.AdminCmd_LEADER_BALANCE + case types.BalanceData: + cmd = meta.AdminCmd_DATA_BALANCE + case types.BalanceDataRemove: + cmd = meta.AdminCmd_DATA_BALANCE + for _, ep := range req.HostsToRemove { + paras = append(paras, []byte(ep)) + } + default: + return nil, nerrors.ErrUnsupported + } + + paras = append(paras, []byte(req.Space)) + + metaReq := &meta.AdminJobReq{ + Op: meta.AdminJobOp_ADD, + Cmd: cmd, + Paras: paras, + } + + resp, err := c.meta.RunAdminJob(metaReq) + if err != nil { + return nil, err + } + + if err := codeErrorIfHappened(resp.Code, nil); err != nil { + return nil, err + } + + return newBalancerWrap(c.meta, req.Space, resp.Result_.JobID), nil +} diff --git a/ccore/nebula/internal/driver/v3_0/wrapper.go b/ccore/nebula/internal/driver/v3_0/wrapper.go index bf9f7bc..357e3cf 100644 --- a/ccore/nebula/internal/driver/v3_0/wrapper.go +++ b/ccore/nebula/internal/driver/v3_0/wrapper.go @@ -4,7 +4,9 @@ import ( nerrors "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/errors" nthrift "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v3_0" "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v3_0/graph" + "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v3_0/meta" "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types" + "strconv" ) type authResponseWrapper struct { @@ -1200,3 +1202,59 @@ func newPlanNodeBranchInfoWrapper(planNodeBranchInfo *graph.PlanNodeBranchInfo) func (w planNodeBranchInfoWrapper) Unwrap() interface{} { return w.PlanNodeBranchInfo } + +type spaceWrapper struct { + Space *meta.IdName +} + +func newSpacesWrapper(spaces []*meta.IdName) types.Spaces { + s := make([]types.Space, 0, len(spaces)) + for _, space := range spaces { + s = append(s, spaceWrapper{Space: space}) + } + return s +} + +func (w spaceWrapper) GetName() string { + return string(w.Space.GetName()) +} + +type balancerWrap struct { + id []byte + space []byte + client *meta.MetaServiceClient +} + +func newBalancerWrap(client *meta.MetaServiceClient, space string, id *int32) types.Balancer { + return balancerWrap{ + id: []byte(strconv.Itoa(int(*id))), + space: []byte(space), + client: client, + } +} + +func (b balancerWrap) GetStats() (types.BalanceStats, error) { + metaReq := &meta.AdminJobReq{ + Op: meta.AdminJobOp_SHOW, + Cmd: meta.AdminCmd_STATS, + Paras: [][]byte{b.id, b.space}, + } + + resp, err := b.client.RunAdminJob(metaReq) + if err != nil { + return "", err + } + + if len(resp.Result_.JobDesc) == 0 { + return "", nerrors.ErrNoJobStats + } + + switch resp.Result_.JobDesc[0].Status { + case meta.JobStatus_FINISHED: + return types.Balanced, nil + case meta.JobStatus_QUEUE, meta.JobStatus_RUNNING: + return types.Balancing, nil + default: + return types.ImBalanced, nil + } +} diff --git a/ccore/nebula/types/driver.go b/ccore/nebula/types/driver.go index c336a55..681d980 100644 --- a/ccore/nebula/types/driver.go +++ b/ccore/nebula/types/driver.go @@ -36,6 +36,10 @@ type ( MetaClientDriver interface { Open() error VerifyClientVersion() error + AddHosts(endpoints []string) error + DropHosts(endpoints []string) error + ListSpaces() (Spaces, error) + Balance(req BalanceReq) (Balancer, error) Close() error } @@ -65,6 +69,16 @@ type ( String() string } + Space interface { + GetName() string + } + + Spaces []Space + + Balancer interface { + GetStats() (BalanceStats, error) + } + FactoryDriver interface { NewValueBuilder() ValueBuilder NewDateBuilder() DateBuilder @@ -76,6 +90,14 @@ type ( } ) +func (s Spaces) GetSpaceNames() []string { + spaces := make([]string, 0, len(s)) + for _, space := range s { + spaces = append(spaces, space.GetName()) + } + return spaces +} + func Register(version Version, driver Driver, factory FactoryDriver) { registerDriver(version, driver) registerFactoryDriver(version, factory) diff --git a/ccore/nebula/types/types.go b/ccore/nebula/types/types.go index 7053717..d9a6ffb 100644 --- a/ccore/nebula/types/types.go +++ b/ccore/nebula/types/types.go @@ -393,3 +393,24 @@ type PlanNodeBranchInfo interface { String() string Unwrap() interface{} } + +type ( + BalanceCmd string + BalanceStats string +) + +const ( + BalanceData = BalanceCmd("Balance Data") + BalanceDataRemove = BalanceCmd("Balance Data Remove") + BalanceLeader = BalanceCmd("Balance Leader") + + Balanced = BalanceStats("Balanced") + ImBalanced = BalanceStats("ImBalanced") + Balancing = BalanceStats("Balancing") +) + +type BalanceReq struct { + Cmd BalanceCmd + Space string + HostsToRemove []string +}