Skip to content

Commit

Permalink
add meta api (#78)
Browse files Browse the repository at this point in the history
  • Loading branch information
kqzh authored Jan 18, 2022
1 parent e6c6fbd commit ce0f101
Show file tree
Hide file tree
Showing 13 changed files with 359 additions and 1 deletion.
44 changes: 43 additions & 1 deletion ccore/nebula/client_meta.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
package nebula

import "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types"
import (
"github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types"
)

type (
MetaClient interface {
Open() error
AddHosts(endpoints []string) error
DropHosts(endpoints []string) error
ListSpaces() (types.Spaces, error)
BalanceData(space string) (types.Balancer, error)
BalanceLeader(space string) (types.Balancer, error)
BalanceDataRemove(space string, endpoints []string) (types.Balancer, error)
Close() error
}

Expand All @@ -31,6 +39,40 @@ func (c *defaultMetaClient) Close() error {
return c.meta.close()
}

func (c *defaultMetaClient) AddHosts(endpoints []string) error {
return c.meta.AddHosts(endpoints)
}

func (c *defaultMetaClient) DropHosts(endpoints []string) error {
return c.meta.DropHosts(endpoints)
}

func (c *defaultMetaClient) ListSpaces() (types.Spaces, error) {
return c.meta.ListSpaces()
}

func (c *defaultMetaClient) BalanceData(space string) (types.Balancer, error) {
return c.meta.Balance(types.BalanceReq{
Cmd: types.BalanceData,
Space: space,
})
}

func (c *defaultMetaClient) BalanceLeader(space string) (types.Balancer, error) {
return c.meta.Balance(types.BalanceReq{
Cmd: types.BalanceLeader,
Space: space,
})
}

func (c *defaultMetaClient) BalanceDataRemove(space string, endpoints []string) (types.Balancer, error) {
return c.meta.Balance(types.BalanceReq{
Cmd: types.BalanceDataRemove,
Space: space,
HostsToRemove: endpoints,
})
}

func (c *defaultMetaClient) defaultClient() *defaultClient {
return (*defaultClient)(c)
}
1 change: 1 addition & 0 deletions ccore/nebula/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ var (
ErrUnsupportedVersion = errors.New("unsupported version")
ErrUnsupported = errors.New("unsupported")
ErrNoEndpoints = errors.New("no endpoints")
ErrNoJobStats = errors.New("no job stats")
)
1 change: 1 addition & 0 deletions ccore/nebula/internal/driver/v2_5/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v2_5

import (
"github.com/facebook/fbthrift/thrift/lib/go/thrift"

nthrift "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v2_5"
"github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types"
)
Expand Down
30 changes: 30 additions & 0 deletions ccore/nebula/internal/driver/v2_5/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package v2_5

import (
"github.com/facebook/fbthrift/thrift/lib/go/thrift"

nerrors "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/errors"
"github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v2_5/meta"
"github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types"
)
Expand Down Expand Up @@ -39,3 +41,31 @@ func (c *defaultMetaClient) Close() error {
}
return nil
}

func (c *defaultMetaClient) AddHosts(endpoints []string) error {
return nerrors.ErrUnsupported
}

func (c *defaultMetaClient) DropHosts(endpoints []string) error {
return nerrors.ErrUnsupported
}

func (c *defaultMetaClient) ListSpaces() (types.Spaces, error) {
req := meta.NewListSpacesReq()

resp, err := c.meta.ListSpaces(req)
if err != nil {
return nil, err
}

if err := codeErrorIfHappened(resp.Code, nil); err != nil {
return nil, err
}

return newSpacesWrapper(resp.Spaces), nil
}

func (c *defaultMetaClient) Balance(req types.BalanceReq) (types.Balancer, error) {
// TODO: add 2.5 Balance logic
return nil, nerrors.ErrUnsupported
}
17 changes: 17 additions & 0 deletions ccore/nebula/internal/driver/v2_5/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
nerrors "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/errors"
nthrift "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v2_5"
"github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v2_5/graph"
"github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v2_5/meta"
"github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types"
)

Expand Down Expand Up @@ -1136,3 +1137,19 @@ func newPlanNodeBranchInfoWrapper(planNodeBranchInfo *graph.PlanNodeBranchInfo)
func (w planNodeBranchInfoWrapper) Unwrap() interface{} {
return w.PlanNodeBranchInfo
}

type spaceWrapper struct {
Space *meta.IdName
}

func newSpacesWrapper(spaces []*meta.IdName) types.Spaces {
s := make([]types.Space, 0, len(spaces))
for _, space := range spaces {
s = append(s, spaceWrapper{Space: space})
}
return s
}

func (w spaceWrapper) GetName() string {
return string(w.Space.GetName())
}
1 change: 1 addition & 0 deletions ccore/nebula/internal/driver/v2_6/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v2_6

import (
"github.com/facebook/fbthrift/thrift/lib/go/thrift"

nthrift "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v2_6"
"github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types"
)
Expand Down
30 changes: 30 additions & 0 deletions ccore/nebula/internal/driver/v2_6/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package v2_6

import (
"github.com/facebook/fbthrift/thrift/lib/go/thrift"

nerrors "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/errors"
"github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v2_6/meta"
"github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types"
)
Expand Down Expand Up @@ -43,3 +45,31 @@ func (c *defaultMetaClient) Close() error {
}
return nil
}

func (c *defaultMetaClient) AddHosts(endpoints []string) error {
return nerrors.ErrUnsupported
}

func (c *defaultMetaClient) DropHosts(endpoints []string) error {
return nerrors.ErrUnsupported
}

func (c *defaultMetaClient) ListSpaces() (types.Spaces, error) {
req := meta.NewListSpacesReq()

resp, err := c.meta.ListSpaces(req)
if err != nil {
return nil, err
}

if err := codeErrorIfHappened(resp.Code, nil); err != nil {
return nil, err
}

return newSpacesWrapper(resp.Spaces), nil
}

func (c *defaultMetaClient) Balance(req types.BalanceReq) (types.Balancer, error) {
// TODO: add 2.6 Balance logic
return nil, nerrors.ErrUnsupported
}
17 changes: 17 additions & 0 deletions ccore/nebula/internal/driver/v2_6/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
nerrors "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/errors"
nthrift "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v2_6"
"github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v2_6/graph"
"github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v2_6/meta"
"github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types"
)

Expand Down Expand Up @@ -1236,3 +1237,19 @@ func newPlanNodeBranchInfoWrapper(planNodeBranchInfo *graph.PlanNodeBranchInfo)
func (w planNodeBranchInfoWrapper) Unwrap() interface{} {
return w.PlanNodeBranchInfo
}

type spaceWrapper struct {
Space *meta.IdName
}

func newSpacesWrapper(spaces []*meta.IdName) types.Spaces {
s := make([]types.Space, 0, len(spaces))
for _, space := range spaces {
s = append(s, spaceWrapper{Space: space})
}
return s
}

func (w spaceWrapper) GetName() string {
return string(w.Space.GetName())
}
1 change: 1 addition & 0 deletions ccore/nebula/internal/driver/v3_0/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v3_0

import (
"github.com/facebook/fbthrift/thrift/lib/go/thrift"

nthrift "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v3_0"
"github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types"
)
Expand Down
117 changes: 117 additions & 0 deletions ccore/nebula/internal/driver/v3_0/meta.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
package v3_0

import (
"net"
"strconv"

"github.com/facebook/fbthrift/thrift/lib/go/thrift"

nerrors "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/errors"
nthrift "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v3_0"
"github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v3_0/meta"
"github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types"
)
Expand Down Expand Up @@ -43,3 +49,114 @@ func (c *defaultMetaClient) Close() error {
}
return nil
}

func (c *defaultMetaClient) AddHosts(endpoints []string) error {
hostsToAdd := make([]*nthrift.HostAddr, 0, len(endpoints))
for _, ep := range endpoints {
host, portStr, err := net.SplitHostPort(ep)
if err != nil {
return err
}

port, err := strconv.Atoi(portStr)
if err != nil {
return err
}

hostsToAdd = append(hostsToAdd, &nthrift.HostAddr{
Host: host,
Port: nthrift.Port(port),
})
}

req := &meta.AddHostsReq{
Hosts: hostsToAdd,
}
resp, err := c.meta.AddHosts(req)
if err != nil {
return err
}
return codeErrorIfHappened(resp.Code, nil)
}

func (c *defaultMetaClient) DropHosts(endpoints []string) error {
hostsToDrop := make([]*nthrift.HostAddr, 0, len(endpoints))
for _, ep := range endpoints {
host, portStr, err := net.SplitHostPort(ep)
if err != nil {
return err
}

port, err := strconv.Atoi(portStr)
if err != nil {
return err
}

hostsToDrop = append(hostsToDrop, &nthrift.HostAddr{
Host: host,
Port: nthrift.Port(port),
})
}

req := &meta.DropHostsReq{
Hosts: hostsToDrop,
}
resp, err := c.meta.DropHosts(req)
if err != nil {
return err
}
return codeErrorIfHappened(resp.Code, nil)
}

func (c *defaultMetaClient) ListSpaces() (types.Spaces, error) {
req := meta.NewListSpacesReq()

resp, err := c.meta.ListSpaces(req)
if err != nil {
return nil, err
}

if err := codeErrorIfHappened(resp.Code, nil); err != nil {
return nil, err
}

return newSpacesWrapper(resp.Spaces), nil
}

func (c *defaultMetaClient) Balance(req types.BalanceReq) (types.Balancer, error) {
paras := make([][]byte, 0)

var cmd meta.AdminCmd
switch req.Cmd {
case types.BalanceLeader:
cmd = meta.AdminCmd_LEADER_BALANCE
case types.BalanceData:
cmd = meta.AdminCmd_DATA_BALANCE
case types.BalanceDataRemove:
cmd = meta.AdminCmd_DATA_BALANCE
for _, ep := range req.HostsToRemove {
paras = append(paras, []byte(ep))
}
default:
return nil, nerrors.ErrUnsupported
}

paras = append(paras, []byte(req.Space))

metaReq := &meta.AdminJobReq{
Op: meta.AdminJobOp_ADD,
Cmd: cmd,
Paras: paras,
}

resp, err := c.meta.RunAdminJob(metaReq)
if err != nil {
return nil, err
}

if err := codeErrorIfHappened(resp.Code, nil); err != nil {
return nil, err
}

return newBalancerWrap(c.meta, req.Space, resp.Result_.JobID), nil
}
Loading

0 comments on commit ce0f101

Please sign in to comment.