Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add ssl connection pool #10

Merged
merged 2 commits into from
Dec 28, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 155 additions & 82 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nebulagraph

import (
"crypto/tls"
"fmt"
"strconv"
"strings"
Expand All @@ -10,31 +11,78 @@ import (
nebula "github.com/vesoft-inc/nebula-go/v2"
)

type Data []string
type (
// NebulaPool nebula connection pool
NebulaPool struct {
HostList []nebula.HostAddress
Pool *nebula.ConnectionPool
Log nebula.Logger
DataChs []chan Data
OutoptCh chan []string
Version string
csvStrategy csvReaderStrategy
initialized bool
sessions []*nebula.Session
channelBufferSize int
sslconfig *sslConfig
mutex sync.Mutex
}

type Output struct {
TimeStamp int64
NGQL string
Latency int64
ResponseTime int32
IsSucceed bool
Rows int32
ErrorMsg string
}
// NebulaSession a wrapper for nebula session, could read data from DataCh
NebulaSession struct {
Session *nebula.Session
Pool *NebulaPool
DataCh chan Data
}

// Response a wrapper for nebula resultset
Response struct {
*nebula.ResultSet
ResponseTime int32
}

csvReaderStrategy int

sslConfig struct {
rootCAPath string
certPath string
privateKeyPath string
}

// Data data in csv file
Data []string

output struct {
timeStamp int64
nGQL string
latency int64
responseTime int32
isSucceed bool
rows int32
errorMsg string
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need errorCode?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

error code is less useful for user.

just like this
image

)

const (
// AllInOne all the vus use the same DataCh
AllInOne csvReaderStrategy = iota
// Separate each vu has a seprate DataCh
Separate
)

func formatOutput(o *Output) []string {
func formatOutput(o *output) []string {
return []string{
strconv.FormatInt(o.TimeStamp, 10),
o.NGQL,
strconv.Itoa(int(o.Latency)),
strconv.Itoa(int(o.ResponseTime)),
strconv.FormatBool(o.IsSucceed),
strconv.Itoa(int(o.Rows)),
o.ErrorMsg,
strconv.FormatInt(o.timeStamp, 10),
o.nGQL,
strconv.Itoa(int(o.latency)),
strconv.Itoa(int(o.responseTime)),
strconv.FormatBool(o.isSucceed),
strconv.Itoa(int(o.rows)),
o.errorMsg,
}
}

var OutputHeader []string = []string{
var outputHeader []string = []string{
"timestamp",
"nGQL",
"latency",
Expand All @@ -44,37 +92,7 @@ var OutputHeader []string = []string{
"errorMsg",
}

type Response struct {
*nebula.ResultSet
ResponseTime int32
}
type CSVReaderStrategy int

const (
AllInOne CSVReaderStrategy = iota
Separate
)

type NebulaPool struct {
HostList []nebula.HostAddress
Pool *nebula.ConnectionPool
Log nebula.Logger
DataChs []chan Data
OutoptCh chan []string
Version string
csvStrategy CSVReaderStrategy
initialized bool
sessions []*nebula.Session
channelBufferSize int
mutex sync.Mutex
}

type NebulaSession struct {
Session *nebula.Session
Pool *NebulaPool
DataCh chan Data
}

// New for k6 initialization.
func New() *NebulaPool {
return &NebulaPool{
Log: nebula.DefaultLogger{},
Expand All @@ -83,57 +101,105 @@ func New() *NebulaPool {
}
}

// NewSSLConfig return sslConfig
func (np *NebulaPool) NewSSLConfig(rootCAPath, certPath, privateKeyPath string) {
np.sslconfig = &sslConfig{
rootCAPath: rootCAPath,
certPath: certPath,
privateKeyPath: privateKeyPath,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For SelfSign, is password in privateKeyPath file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nebula-go doesn't support password yet ..

}

// Init init nebula pool with address and concurrent, by default the buffersize is 20000
func (np *NebulaPool) Init(address string, concurrent int) (*NebulaPool, error) {
return np.InitWithSize(address, concurrent, 20000)

}

// InitWithSize init nebula pool with channel buffer size
func (np *NebulaPool) InitWithSize(address string, concurrent int, size int) (*NebulaPool, error) {
if np.initialized {
return np, nil
}
np.mutex.Lock()
defer np.mutex.Unlock()
np.Log.Info("begin init the nebula pool")
np.sessions = make([]*nebula.Session, concurrent)
np.channelBufferSize = size
np.OutoptCh = make(chan []string, np.channelBufferSize)
np.initialized = true
var (
sslConfig *tls.Config
err error
pool *nebula.ConnectionPool
)

if np.sslconfig != nil {
sslConfig, err = nebula.GetDefaultSSLConfig(
np.sslconfig.rootCAPath,
np.sslconfig.certPath,
np.sslconfig.privateKeyPath,
)
if err != nil {
return nil, err
}
// skip insecure verification for stress testing.
sslConfig.InsecureSkipVerify = true
}
err = np.initAndVerifyPool(address, concurrent, size)
if err != nil {
return nil, err
}
conf := np.getDefaultConf(concurrent)
if sslConfig != nil {
pool, err = nebula.NewSslConnectionPool(np.HostList, *conf, sslConfig, np.Log)

} else {
pool, err = nebula.NewConnectionPool(np.HostList, *conf, np.Log)
}

if err != nil {
return nil, err
}
np.Pool = pool
np.Log.Info("finish init the pool")
np.initialized = true
return np, nil
}

func (np *NebulaPool) initAndVerifyPool(address string, concurrent int, size int) error {

addrs := strings.Split(address, ",")
var hosts []nebula.HostAddress
for _, addr := range addrs {
hostPort := strings.Split(addr, ":")
if len(hostPort) != 2 {
return nil, fmt.Errorf("Invalid address: %s", addr)
return fmt.Errorf("Invalid address: %s", addr)
}
port, err := strconv.Atoi(hostPort[1])
if err != nil {
return nil, err
return err
}
host := hostPort[0]
hostAddr := nebula.HostAddress{Host: host, Port: port}
hosts = append(hosts, hostAddr)

np.HostList = hosts
}
np.sessions = make([]*nebula.Session, concurrent)
np.channelBufferSize = size
np.OutoptCh = make(chan []string, np.channelBufferSize)
return nil
}

func (np *NebulaPool) getDefaultConf(concurrent int) *nebula.PoolConfig {
conf := nebula.PoolConfig{
TimeOut: 0,
IdleTime: 0,
MaxConnPoolSize: concurrent,
MinConnPoolSize: 1,
}
pool, err := nebula.NewConnectionPool(hosts, conf, np.Log)
if err != nil {
return nil, err
}

np.Log.Info("finish init the pool")
np.Pool = pool
np.initialized = true
return np, nil
return &conf
}

// ConfigCsvStrategy set csv reader strategy
func (np *NebulaPool) ConfigCsvStrategy(strategy int) {
np.csvStrategy = CSVReaderStrategy(strategy)
np.csvStrategy = csvReaderStrategy(strategy)
}

// ConfigCSV config the csv file to be read
func (np *NebulaPool) ConfigCSV(path, delimiter string, withHeader bool) error {
for _, dataCh := range np.DataChs {
reader := NewCsvReader(path, delimiter, withHeader, dataCh)
Expand All @@ -144,15 +210,19 @@ func (np *NebulaPool) ConfigCSV(path, delimiter string, withHeader bool) error {
return nil
}

// ConfigOutput config the output file, would write the execution outputs
func (np *NebulaPool) ConfigOutput(path string) error {
writer := NewCsvWriter(path, ",", OutputHeader, np.OutoptCh)
writer := NewCsvWriter(path, ",", outputHeader, np.OutoptCh)
if err := writer.WriteForever(); err != nil {
return err
}
return nil
}

// Close close the nebula pool
func (np *NebulaPool) Close() error {
np.mutex.Lock()
defer np.mutex.Unlock()
if !np.initialized {
return nil
}
Expand All @@ -162,11 +232,11 @@ func (np *NebulaPool) Close() error {
s.Release()
}
}
np.Pool.Close()
np.initialized = false
return nil
}

// GetSession get the session from pool
func (np *NebulaPool) GetSession(user, password string) (*NebulaSession, error) {
session, err := np.Pool.GetSession(user, password)
if err != nil {
Expand All @@ -176,12 +246,12 @@ func (np *NebulaPool) GetSession(user, password string) (*NebulaSession, error)
defer np.mutex.Unlock()
np.sessions = append(np.sessions, session)
s := &NebulaSession{Session: session, Pool: np}
s.PrepareCsvReader()
s.prepareCsvReader()

return s, nil
}

func (s *NebulaSession) PrepareCsvReader() error {
func (s *NebulaSession) prepareCsvReader() error {
np := s.Pool
if np.csvStrategy == AllInOne {
if len(np.DataChs) == 0 {
Expand All @@ -197,6 +267,7 @@ func (s *NebulaSession) PrepareCsvReader() error {
return nil
}

// GetData get data from csv reader
func (s *NebulaSession) GetData() (Data, error) {
if s.DataCh != nil && len(s.DataCh) != 0 {
if d, ok := <-s.DataCh; ok {
Expand All @@ -206,6 +277,7 @@ func (s *NebulaSession) GetData() (Data, error) {
return nil, fmt.Errorf("no Data at all")
}

// Execute execute nebula query
func (s *NebulaSession) Execute(stmt string) (*Response, error) {
start := time.Now()
rs, err := s.Session.Execute(stmt)
Expand All @@ -217,14 +289,14 @@ func (s *NebulaSession) Execute(stmt string) (*Response, error) {

// output
if s.Pool.OutoptCh != nil && len(s.Pool.OutoptCh) != cap(s.Pool.OutoptCh) {
o := &Output{
TimeStamp: start.Unix(),
NGQL: stmt,
Latency: rs.GetLatency(),
ResponseTime: responseTime,
IsSucceed: rs.IsSucceed(),
Rows: int32(rs.GetRowSize()),
ErrorMsg: rs.GetErrorMsg(),
o := &output{
timeStamp: start.Unix(),
nGQL: stmt,
latency: rs.GetLatency(),
responseTime: responseTime,
isSucceed: rs.IsSucceed(),
rows: int32(rs.GetRowSize()),
errorMsg: rs.GetErrorMsg(),
}
s.Pool.OutoptCh <- formatOutput(o)

Expand All @@ -233,6 +305,7 @@ func (s *NebulaSession) Execute(stmt string) (*Response, error) {
return &Response{ResultSet: rs, ResponseTime: responseTime}, nil
}

// GetResponseTime GetResponseTime
func (r *Response) GetResponseTime() int32 {
return r.ResponseTime
}
24 changes: 24 additions & 0 deletions example/cert/test.ca.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
-----BEGIN CERTIFICATE-----
MIIEGzCCAwOgAwIBAgIUDcmZFpL4PcdCXfLRBK8bR2vb39cwDQYJKoZIhvcNAQEL
BQAwgZwxCzAJBgNVBAYTAkNOMREwDwYDVQQIDAhaaGVqaWFuZzERMA8GA1UEBwwI
SGFuZ3pob3UxFDASBgNVBAoMC1Zlc29mdCBJbmMuMRAwDgYDVQQLDAdzZWN0aW9u
MRYwFAYDVQQDDA1zaHlsb2NrIGh1YW5nMScwJQYJKoZIhvcNAQkBFhhzaHlsb2Nr
Lmh1YW5nQHZlc29mdC5jb20wHhcNMjEwODE5MDkyNDQ3WhcNMjUwODE4MDkyNDQ3
WjCBnDELMAkGA1UEBhMCQ04xETAPBgNVBAgMCFpoZWppYW5nMREwDwYDVQQHDAhI
YW5nemhvdTEUMBIGA1UECgwLVmVzb2Z0IEluYy4xEDAOBgNVBAsMB3NlY3Rpb24x
FjAUBgNVBAMMDXNoeWxvY2sgaHVhbmcxJzAlBgkqhkiG9w0BCQEWGHNoeWxvY2su
aHVhbmdAdmVzb2Z0LmNvbTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEB
AMEAgpamCQHl+8JnUHI6/VmJHjDLYJLTliN/CwpFrhMqIVjJ8wG57WYLpXpn91Lz
eHu52LkVzcikybIJ2a+LOTvnhNFdbmTbqDtrb+s6wM/sO+nF6tU2Av4e5zhyKoeR
LL+rHMk3nymohbdN4djySFmOOU5A1O/4b0bZz4Ylu995kUawdiaEo13BzxxOC7Ik
Gge5RyDcm0uLXZqTAPy5Sjv/zpOyj0AqL1CJUH7XBN9OMRhVU0ZX9nHWl1vgLRld
J6XT17Y9QbbHhCNEdAmFE5kEFgCvZc+MungUYABlkvoj86TLmC/FMV6fWdxQssyd
hS+ssfJFLaTDaEFz5a/Tr48CAwEAAaNTMFEwHQYDVR0OBBYEFK0GVrQx+wX1GCHy
e+6fl4X+prmYMB8GA1UdIwQYMBaAFK0GVrQx+wX1GCHye+6fl4X+prmYMA8GA1Ud
EwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAHqP8P+ZUHmngviHLSSN1ln5
Mx4BCkVeFRUaFx0yFXytV/iLXcG2HpFg3A9rAFoYgCDwi1xpsERnBZ/ShTv/eFOc
IxBY5yggx3/lGi8tAgvUdarhd7mQO67UJ0V4YU3hAkbnZ8grHHXj+4hfgUpY4ok6
yaed6HXwknBb9W8N1jZI8ginhkhjaeRCHdMiF+fBvNCtmeR1bCml1Uz7ailrpcaT
Mf84+5VYuFEnaRZYWFNsWNCOBlJ/6/b3V10vMXzMmYHqz3xgAq0M3fVTFTzopnAX
DLSzorL/dYVdqEDCQi5XI9YAlgWN4VeGzJI+glkLOCNzHxRNP6Qev+YI+7Uxz6I=
-----END CERTIFICATE-----
Loading