Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A client for Nebula Graph Database that support multi versions #59

Merged
merged 1 commit into from
Dec 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ccore/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module github.com/vesoft-inc/nebula-http-gateway/ccore

go 1.16

require github.com/facebook/fbthrift v0.31.1-0.20210223140454-614a73a42488
2 changes: 2 additions & 0 deletions ccore/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
github.com/facebook/fbthrift v0.31.1-0.20210223140454-614a73a42488 h1:A4KCT0mvTBkvb93gGN+efLPkrgTqmqMeaLDG51KVhMM=
github.com/facebook/fbthrift v0.31.1-0.20210223140454-614a73a42488/go.mod h1:2tncLx5rmw69e5kMBv/yJneERbzrr1yr5fdlnTbu8lU=
18 changes: 18 additions & 0 deletions ccore/nebula/alias.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package nebula

import (
"github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types"
)

type (
Version = types.Version
AuthResponse = types.AuthResponse
ExecutionResponse = types.ExecutionResponse
)

var (
V2_0_0 = types.V2_0_0
V2_5_0 = types.V2_5_0
V2_5_1 = types.V2_5_1
V2_6_0 = types.V2_6_0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/vesoft-inc/nebula-go/pull/148/files, I see thrift code use 2.6.0, so we need fix this compile bug too, or we use 2.6.1 thrift code instead

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have fixed.

)
70 changes: 70 additions & 0 deletions ccore/nebula/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package nebula

import "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types"

type (
// TODO: add client pool management
// TODO: add version auto recognize

Client interface {
Graph() GraphClient
Meta() MetaClient
StorageAdmin() StorageAdminClient
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we call storageAdmin storage? for users, they don't know what is storageAdmin, but use it for storage?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is named according to thrift define, it is best not to introduce new terms.

}

ConnectionInfo struct {
GraphEndpoints []string
MetaEndpoints []string
StorageAdminEndpoints []string
GraphAccount Account
}

Account struct {
Username string
Password string
}

defaultClient struct {
o Options
driver types.Driver
graph *driverGraph
meta *driverMeta
storageAdmin *driverStorageAdmin
}
)

func NewClient(info ConnectionInfo, opts ...Option) (Client, error) {
o := defaultOptions()
for _, opt := range opts {
opt(&o)
}
o.complete()
if err := o.validate(); err != nil {
return nil, err
}

driver, err := types.GetDriver(o.version)
if err != nil {
return nil, err
}

return &defaultClient{
o: o,
driver: driver,
graph: newDriverGraph(info.GraphEndpoints, info.GraphAccount.Username, info.GraphAccount.Password, &o.graph),
meta: newDriverMeta(info.MetaEndpoints, &o.meta),
storageAdmin: newDriverStorageAdmin(info.StorageAdminEndpoints, &o.storageAdmin),
}, nil
}

func (c *defaultClient) Graph() GraphClient {
return (*defaultGraphClient)(c)
}

func (c *defaultClient) Meta() MetaClient {
return (*defaultMetaClient)(c)
}

func (c *defaultClient) StorageAdmin() StorageAdminClient {
return (*defaultStorageAdminClient)(c)
}
46 changes: 46 additions & 0 deletions ccore/nebula/client_graph.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package nebula

type (
GraphClient interface {
Open() error
Execute(stmt []byte) (ExecutionResponse, error)
ExecuteJson(stmt []byte) ([]byte, error)
Close() error
}

defaultGraphClient defaultClient
)

func NewGraphClient(endpoints []string, username, password string, opts ...Option) (GraphClient, error) {
c, err := NewClient(ConnectionInfo{
GraphEndpoints: endpoints,
GraphAccount: Account{
Username: username,
Password: password,
},
}, opts...)
if err != nil {
return nil, err
}
return c.Graph(), nil
}

func (c *defaultGraphClient) Open() error {
return c.graph.open(c.driver)
}

func (c *defaultGraphClient) Execute(stmt []byte) (ExecutionResponse, error) {
return c.graph.Execute(c.graph.sessionId, stmt)
}

func (c *defaultGraphClient) ExecuteJson(stmt []byte) ([]byte, error) {
if err := c.graph.open(c.driver); err != nil {
return nil, err
}

return c.graph.ExecuteJson(c.graph.sessionId, stmt)
}

func (c *defaultGraphClient) Close() error {
return c.graph.close()
}
28 changes: 28 additions & 0 deletions ccore/nebula/client_meta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package nebula

type (
MetaClient interface {
Open() error
Close() error
}

defaultMetaClient defaultClient
)

func NewMetaClient(endpoints []string, opts ...Option) (MetaClient, error) {
c, err := NewClient(ConnectionInfo{
MetaEndpoints: endpoints,
}, opts...)
if err != nil {
return nil, err
}
return c.Meta(), nil
}

func (c *defaultMetaClient) Open() error {
return c.meta.close()
}

func (c *defaultMetaClient) Close() error {
return c.meta.open(c.driver)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use c.meta.close()?

}
28 changes: 28 additions & 0 deletions ccore/nebula/client_storage_admin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package nebula

type (
StorageAdminClient interface {
Open() error
Close() error
}

defaultStorageAdminClient defaultClient
)

func NewStorageAdminClient(endpoints []string, opts ...Option) (StorageAdminClient, error) {
c, err := NewClient(ConnectionInfo{
StorageAdminEndpoints: endpoints,
}, opts...)
if err != nil {
return nil, err
}
return c.StorageAdmin(), nil
}

func (c *defaultStorageAdminClient) Open() error {
return c.storageAdmin.open(c.driver)
}

func (c *defaultStorageAdminClient) Close() error {
return c.storageAdmin.close()
}
178 changes: 178 additions & 0 deletions ccore/nebula/driver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package nebula

import (
"sync"

"github.com/facebook/fbthrift/thrift/lib/go/thrift"
nerrors "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/errors"
_ "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/driver/v2_0_0"
_ "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/driver/v2_5_0"
_ "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/driver/v2_5_1"
_ "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/driver/v2_6_0"
"github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types"
)

type (
driverGraph struct {
types.GraphClientDriver
connection *connectionMu
username string
password string
sessionId int64
}

driverMeta struct {
types.MetaClientDriver
connection *connectionMu
}

driverStorageAdmin struct {
types.StorageAdminClientDriver
connection *connectionMu
}

connectionMu struct {
o *socketOptions
mu sync.Mutex
endpoints []string
}
)

func newDriverGraph(endpoints []string, username, password string, o *socketOptions) *driverGraph {
return &driverGraph{
connection: newConnectionMu(endpoints, o),
username: username,
password: password,
}
}

func newDriverMeta(endpoints []string, o *socketOptions) *driverMeta {
return &driverMeta{
connection: newConnectionMu(endpoints, o),
}
}

func newDriverStorageAdmin(endpoints []string, o *socketOptions) *driverStorageAdmin {
return &driverStorageAdmin{
connection: newConnectionMu(endpoints, o),
}
}

func newConnectionMu(endpoints []string, o *socketOptions) *connectionMu {
return &connectionMu{
o: o,
endpoints: endpoints,
}
}

func (d *driverGraph) open(driver types.Driver) error {
if d.GraphClientDriver != nil {
return nil
}

transport, pf, err := d.connection.connect()
if err != nil {
return err
}

d.GraphClientDriver = driver.NewGraphClientDriver(transport, pf)

if err = d.GraphClientDriver.Open(); err != nil {
return err
}

resp, err := d.GraphClientDriver.Authenticate(d.username, d.password)
if err != nil {
return err
}
if errorCode := resp.ErrorCode(); errorCode != nerrors.ErrorCode_SUCCEEDED {
return nerrors.NewCodeError(errorCode, resp.ErrorMsg())
}
sessionId := resp.SessionID()
if sessionId == nil {
panic("sessionId can not be nil after authenticate")
}
d.sessionId = *sessionId
return nil
}

func (d *driverGraph) close() error {
if d.GraphClientDriver != nil {
if err := d.GraphClientDriver.Close(); err != nil {
return err
}
}
return nil
}

func (d *driverMeta) open(driver types.Driver) error {
if d.MetaClientDriver != nil {
return nil
}

transport, pf, err := d.connection.connect()
if err != nil {
return err
}

d.MetaClientDriver = driver.NewMetaClientDriver(transport, pf)
return nil
}

func (d *driverMeta) close() error {
if d.MetaClientDriver != nil {
if err := d.MetaClientDriver.Close(); err != nil {
return err
}
}
return nil
}

func (d *driverStorageAdmin) open(driver types.Driver) error {
if d.StorageAdminClientDriver != nil {
return nil
}

transport, pf, err := d.connection.connect()
if err != nil {
return err
}

d.StorageAdminClientDriver = driver.NewStorageClientDriver(transport, pf)
return nil
}

func (d *driverStorageAdmin) close() error {
if d.StorageAdminClientDriver != nil {
if err := d.StorageAdminClientDriver.Close(); err != nil {
return err
}
}
return nil
}

func (c *connectionMu) connect() (thrift.Transport, thrift.ProtocolFactory, error) {
// TODO: automatically open until success, only the first endpoints is supported now.
if len(c.endpoints) == 0 {
return nil, nil, nerrors.ErrNoEndpoints
}
return c.buildThriftTransport(c.endpoints[0])
}

func (c *connectionMu) buildThriftTransport(endpoint string) (thrift.Transport, thrift.ProtocolFactory, error) {
transport, err := func() (thrift.Transport, error) {
if c.o.tlsConfig == nil {
return thrift.NewSocket(thrift.SocketTimeout(c.o.timeout), thrift.SocketAddr(endpoint))
}
return thrift.NewSSLSocketTimeout(endpoint, c.o.tlsConfig, c.o.timeout)
}()
if err != nil {
return nil, nil, err
}

bufferedTranFactory := thrift.NewBufferedTransportFactory(c.o.bufferSize)
transport = thrift.NewFramedTransportMaxLength(bufferedTranFactory.GetTransport(transport), c.o.frameMaxLength)
pf := thrift.NewBinaryProtocolFactoryDefault()

return transport, pf, nil
}
1 change: 1 addition & 0 deletions ccore/nebula/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package nebula
Loading