Skip to content

Commit

Permalink
add cloud core basic framework
Browse files Browse the repository at this point in the history
  • Loading branch information
veezhang committed Dec 13, 2021
1 parent cce0022 commit 0407eef
Show file tree
Hide file tree
Showing 105 changed files with 370,403 additions and 0 deletions.
5 changes: 5 additions & 0 deletions ccore/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module github.com/vesoft-inc/nebula-http-gateway/ccore

go 1.16

require github.com/facebook/fbthrift v0.31.1-0.20210223140454-614a73a42488
2 changes: 2 additions & 0 deletions ccore/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
github.com/facebook/fbthrift v0.31.1-0.20210223140454-614a73a42488 h1:A4KCT0mvTBkvb93gGN+efLPkrgTqmqMeaLDG51KVhMM=
github.com/facebook/fbthrift v0.31.1-0.20210223140454-614a73a42488/go.mod h1:2tncLx5rmw69e5kMBv/yJneERbzrr1yr5fdlnTbu8lU=
18 changes: 18 additions & 0 deletions ccore/nebula/alias.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package nebula

import (
"github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types"
)

type (
Version = types.Version
AuthResponse = types.AuthResponse
ExecutionResponse = types.ExecutionResponse
)

var (
V2_0_0 = types.V2_0_0
V2_5_0 = types.V2_5_0
V2_5_1 = types.V2_5_1
V2_6_0 = types.V2_6_0
)
70 changes: 70 additions & 0 deletions ccore/nebula/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package nebula

import "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types"

type (
// TODO: add client pool management
// TODO: add version auto recognize

Client interface {
Graph() GraphClient
Meta() MetaClient
StorageAdmin() StorageAdminClient
}

ConnectionInfo struct {
GraphEndpoints []string
MetaEndpoints []string
StorageAdminEndpoints []string
GraphAccount Account
}

Account struct {
Username string
Password string
}

defaultClient struct {
o Options
driver types.Driver
graph *driverGraph
meta *driverMeta
storageAdmin *driverStorageAdmin
}
)

func NewClient(info ConnectionInfo, opts ...Option) (Client, error) {
o := defaultOptions()
for _, opt := range opts {
opt(&o)
}
o.complete()
if err := o.validate(); err != nil {
return nil, err
}

driver, err := types.GetDriver(o.version)
if err != nil {
return nil, err
}

return &defaultClient{
o: o,
driver: driver,
graph: newDriverGraph(info.GraphEndpoints, info.GraphAccount.Username, info.GraphAccount.Password, &o.graph),
meta: newDriverMeta(info.MetaEndpoints, &o.meta),
storageAdmin: newDriverStorageAdmin(info.StorageAdminEndpoints, &o.storageAdmin),
}, nil
}

func (c *defaultClient) Graph() GraphClient {
return (*defaultGraphClient)(c)
}

func (c *defaultClient) Meta() MetaClient {
return (*defaultMetaClient)(c)
}

func (c *defaultClient) StorageAdmin() StorageAdminClient {
return (*defaultStorageAdminClient)(c)
}
46 changes: 46 additions & 0 deletions ccore/nebula/client_graph.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package nebula

type (
GraphClient interface {
Open() error
Execute(stmt []byte) (ExecutionResponse, error)
ExecuteJson(stmt []byte) ([]byte, error)
Close() error
}

defaultGraphClient defaultClient
)

func NewGraphClient(endpoints []string, username, password string, opts ...Option) (GraphClient, error) {
c, err := NewClient(ConnectionInfo{
GraphEndpoints: endpoints,
GraphAccount: Account{
Username: username,
Password: password,
},
}, opts...)
if err != nil {
return nil, err
}
return c.Graph(), nil
}

func (c *defaultGraphClient) Open() error {
return c.graph.open(c.driver)
}

func (c *defaultGraphClient) Execute(stmt []byte) (ExecutionResponse, error) {
return c.graph.Execute(c.graph.sessionId, stmt)
}

func (c *defaultGraphClient) ExecuteJson(stmt []byte) ([]byte, error) {
if err := c.graph.open(c.driver); err != nil {
return nil, err
}

return c.graph.ExecuteJson(c.graph.sessionId, stmt)
}

func (c *defaultGraphClient) Close() error {
return c.graph.close()
}
28 changes: 28 additions & 0 deletions ccore/nebula/client_meta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package nebula

type (
MetaClient interface {
Open() error
Close() error
}

defaultMetaClient defaultClient
)

func NewMetaClient(endpoints []string, opts ...Option) (MetaClient, error) {
c, err := NewClient(ConnectionInfo{
MetaEndpoints: endpoints,
}, opts...)
if err != nil {
return nil, err
}
return c.Meta(), nil
}

func (c *defaultMetaClient) Open() error {
return c.meta.close()
}

func (c *defaultMetaClient) Close() error {
return c.meta.open(c.driver)
}
28 changes: 28 additions & 0 deletions ccore/nebula/client_storage_admin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package nebula

type (
StorageAdminClient interface {
Open() error
Close() error
}

defaultStorageAdminClient defaultClient
)

func NewStorageAdminClient(endpoints []string, opts ...Option) (StorageAdminClient, error) {
c, err := NewClient(ConnectionInfo{
StorageAdminEndpoints: endpoints,
}, opts...)
if err != nil {
return nil, err
}
return c.StorageAdmin(), nil
}

func (c *defaultStorageAdminClient) Open() error {
return c.storageAdmin.open(c.driver)
}

func (c *defaultStorageAdminClient) Close() error {
return c.storageAdmin.close()
}
178 changes: 178 additions & 0 deletions ccore/nebula/driver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package nebula

import (
"sync"

"github.com/facebook/fbthrift/thrift/lib/go/thrift"
nerrors "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/errors"
_ "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/driver/v2_0_0"
_ "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/driver/v2_5_0"
_ "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/driver/v2_5_1"
_ "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/driver/v2_6_0"
"github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types"
)

type (
driverGraph struct {
types.GraphClientDriver
connection *connectionMu
username string
password string
sessionId int64
}

driverMeta struct {
types.MetaClientDriver
connection *connectionMu
}

driverStorageAdmin struct {
types.StorageAdminClientDriver
connection *connectionMu
}

connectionMu struct {
o *socketOptions
mu sync.Mutex
endpoints []string
}
)

func newDriverGraph(endpoints []string, username, password string, o *socketOptions) *driverGraph {
return &driverGraph{
connection: newConnectionMu(endpoints, o),
username: username,
password: password,
}
}

func newDriverMeta(endpoints []string, o *socketOptions) *driverMeta {
return &driverMeta{
connection: newConnectionMu(endpoints, o),
}
}

func newDriverStorageAdmin(endpoints []string, o *socketOptions) *driverStorageAdmin {
return &driverStorageAdmin{
connection: newConnectionMu(endpoints, o),
}
}

func newConnectionMu(endpoints []string, o *socketOptions) *connectionMu {
return &connectionMu{
o: o,
endpoints: endpoints,
}
}

func (d *driverGraph) open(driver types.Driver) error {
if d.GraphClientDriver != nil {
return nil
}

transport, pf, err := d.connection.connect()
if err != nil {
return err
}

d.GraphClientDriver = driver.NewGraphClientDriver(transport, pf)

if err = d.GraphClientDriver.Open(); err != nil {
return err
}

resp, err := d.GraphClientDriver.Authenticate(d.username, d.password)
if err != nil {
return err
}
if errorCode := resp.ErrorCode(); errorCode != nerrors.ErrorCode_SUCCEEDED {
return nerrors.NewCodeError(errorCode, resp.ErrorMsg())
}
sessionId := resp.SessionID()
if sessionId == nil {
panic("sessionId can not be nil after authenticate")
}
d.sessionId = *sessionId
return nil
}

func (d *driverGraph) close() error {
if d.GraphClientDriver != nil {
if err := d.GraphClientDriver.Close(); err != nil {
return err
}
}
return nil
}

func (d *driverMeta) open(driver types.Driver) error {
if d.MetaClientDriver != nil {
return nil
}

transport, pf, err := d.connection.connect()
if err != nil {
return err
}

d.MetaClientDriver = driver.NewMetaClientDriver(transport, pf)
return nil
}

func (d *driverMeta) close() error {
if d.MetaClientDriver != nil {
if err := d.MetaClientDriver.Close(); err != nil {
return err
}
}
return nil
}

func (d *driverStorageAdmin) open(driver types.Driver) error {
if d.StorageAdminClientDriver != nil {
return nil
}

transport, pf, err := d.connection.connect()
if err != nil {
return err
}

d.StorageAdminClientDriver = driver.NewStorageClientDriver(transport, pf)
return nil
}

func (d *driverStorageAdmin) close() error {
if d.StorageAdminClientDriver != nil {
if err := d.StorageAdminClientDriver.Close(); err != nil {
return err
}
}
return nil
}

func (c *connectionMu) connect() (thrift.Transport, thrift.ProtocolFactory, error) {
// TODO: automatically open until success, only the first endpoints is supported now.
if len(c.endpoints) == 0 {
return nil, nil, nerrors.ErrNoEndpoints
}
return c.buildThriftTransport(c.endpoints[0])
}

func (c *connectionMu) buildThriftTransport(endpoint string) (thrift.Transport, thrift.ProtocolFactory, error) {
transport, err := func() (thrift.Transport, error) {
if c.o.tlsConfig == nil {
return thrift.NewSocket(thrift.SocketTimeout(c.o.timeout), thrift.SocketAddr(endpoint))
}
return thrift.NewSSLSocketTimeout(endpoint, c.o.tlsConfig, c.o.timeout)
}()
if err != nil {
return nil, nil, err
}

bufferedTranFactory := thrift.NewBufferedTransportFactory(c.o.bufferSize)
transport = thrift.NewFramedTransportMaxLength(bufferedTranFactory.GetTransport(transport), c.o.frameMaxLength)
pf := thrift.NewBinaryProtocolFactoryDefault()

return transport, pf, nil
}
1 change: 1 addition & 0 deletions ccore/nebula/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package nebula
Loading

0 comments on commit 0407eef

Please sign in to comment.