Skip to content

Commit

Permalink
Add session pool (#222)
Browse files Browse the repository at this point in the history
* Add session pool

Add tests

* Refactor some test helper functions

Fix multithread bug

* Add example

* Add idle session cleanermake

Hide some interfaces

Add space switch cases

* Update the constructor of session pool config

Address comments

* Add benchmark

* Change default session pool config

* Update README

* Address comments

* Fix test
  • Loading branch information
Aiee authored Oct 14, 2022
1 parent 47169b9 commit 82ab6c3
Show file tree
Hide file tree
Showing 14 changed files with 1,411 additions and 152 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ run-examples:
go run basic_example/graph_client_basic_example.go && \
go run basic_example/parameter_example.go && \
go run gorountines_example/graph_client_goroutines_example.go && \
go run json_example/parse_json_example.go
go run json_example/parse_json_example.go && \
go run session_pool_example/session_pool_example.go
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ For example:
[Code Example with Gorountines](https://github.com/vesoft-inc/nebula-go/tree/master/gorountines_example/graph_client_goroutines_example.go)
[Session Pool Example](https://github.com/vesoft-inc/nebula-go/blob/master/session_pool_example/session_pool_example.go)
## Licensing
**Nebula GO** is under [Apache 2.0](https://www.apache.org/licenses/LICENSE-2.0) license, so you can freely download, modify, and deploy the source code to meet your needs. You can also freely deploy **Nebula GO** as a back-end service to support your SaaS deployment.
2 changes: 1 addition & 1 deletion basic_example/graph_client_basic_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func main() {
"CREATE TAG IF NOT EXISTS person(name string, age int);" +
"CREATE EDGE IF NOT EXISTS like(likeness double)"

// Excute a query
// Execute a query
resultSet, err := session.Execute(createSchema)
if err != nil {
fmt.Print(err.Error())
Expand Down
176 changes: 124 additions & 52 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,19 @@ func logoutAndClose(conn *connection, sessionID int64) {
}

func TestConnection(t *testing.T) {
hostAdress := HostAddress{Host: address, Port: port}
conn := newConnection(hostAdress)
err := conn.open(hostAdress, testPoolConfig.TimeOut, nil)
hostAddress := HostAddress{Host: address, Port: port}
conn := newConnection(hostAddress)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil)
if err != nil {
t.Fatalf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}

authresp, authErr := conn.authenticate(username, password)
authResp, authErr := conn.authenticate(username, password)
if authErr != nil {
t.Fatalf("fail to authenticate, username: %s, password: %s, %s", username, password, authErr.Error())
}

sessionID := authresp.GetSessionID()
sessionID := authResp.GetSessionID()

defer logoutAndClose(conn, sessionID)

Expand All @@ -84,20 +84,28 @@ func TestConnection(t *testing.T) {
t.Fatalf(err.Error())
return
}
checkConResp(t, "show hosts", resp)
checkConResp("show hosts", resp)

resp, err = conn.execute(sessionID, "CREATE SPACE client_test(partition_num=1024, replica_factor=1, vid_type = FIXED_STRING(30));")
if err != nil {
t.Error(err.Error())
return
}
checkConResp(t, "create space", resp)
checkConResp("create space", resp)

resp, err = conn.execute(sessionID, "return 1")
if err != nil {
t.Error(err.Error())
return
}
checkConResp("return 1", resp)

resp, err = conn.execute(sessionID, "DROP SPACE client_test;")
if err != nil {
t.Error(err.Error())
return
}
checkConResp(t, "drop space", resp)
checkConResp("drop space", resp)

res := conn.ping()
if res != true {
Expand All @@ -107,19 +115,19 @@ func TestConnection(t *testing.T) {
}

func TestConnectionIPv6(t *testing.T) {
hostAdress := HostAddress{Host: addressIPv6, Port: port}
conn := newConnection(hostAdress)
err := conn.open(hostAdress, testPoolConfig.TimeOut, nil)
hostAddress := HostAddress{Host: addressIPv6, Port: port}
conn := newConnection(hostAddress)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil)
if err != nil {
t.Fatalf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}

authresp, authErr := conn.authenticate(username, password)
authResp, authErr := conn.authenticate(username, password)
if authErr != nil {
t.Fatalf("fail to authenticate, username: %s, password: %s, %s", username, password, authErr.Error())
}

sessionID := authresp.GetSessionID()
sessionID := authResp.GetSessionID()

defer logoutAndClose(conn, sessionID)

Expand All @@ -128,20 +136,20 @@ func TestConnectionIPv6(t *testing.T) {
t.Fatalf(err.Error())
return
}
checkConResp(t, "show hosts", resp)
checkConResp("show hosts", resp)

resp, err = conn.execute(sessionID, "CREATE SPACE client_test(partition_num=1024, replica_factor=1, vid_type = FIXED_STRING(30));")
if err != nil {
t.Error(err.Error())
return
}
checkConResp(t, "create space", resp)
checkConResp("create space", resp)
resp, err = conn.execute(sessionID, "DROP SPACE client_test;")
if err != nil {
t.Error(err.Error())
return
}
checkConResp(t, "drop space", resp)
checkConResp("drop space", resp)

res := conn.ping()
if res != true {
Expand All @@ -151,9 +159,9 @@ func TestConnectionIPv6(t *testing.T) {
}

func TestConfigs(t *testing.T) {
hostAdress := HostAddress{Host: address, Port: port}
hostAddress := HostAddress{Host: address, Port: port}
hostList := []HostAddress{}
hostList = append(hostList, hostAdress)
hostList = append(hostList, hostAddress)

var configList = []PoolConfig{
// default
Expand Down Expand Up @@ -202,7 +210,7 @@ func TestConfigs(t *testing.T) {
username, password, err.Error())
}
defer session.Release()
// Excute a query
// Execute a query
resp, err := tryToExecute(session, "SHOW HOSTS;")
if err != nil {
t.Fatalf(err.Error())
Expand All @@ -219,22 +227,24 @@ func TestConfigs(t *testing.T) {
}
checkResultSet(t, "create space", resp)

dropSpace(t, session, "client_test")
err = dropSpace("client_test")
if err != nil {
t.Fatalf(err.Error())
return
}
}
}

func TestAuthentication(t *testing.T) {
const (
address = "127.0.0.1"
port = 3699
username = "dummy"
password = "nebula"
)

hostAdress := HostAddress{Host: address, Port: port}
hostAddress := HostAddress{Host: address, Port: port}

conn := newConnection(hostAdress)
err := conn.open(hostAdress, testPoolConfig.TimeOut, nil)
conn := newConnection(hostAddress)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil)
if err != nil {
t.Fatalf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
Expand All @@ -245,12 +255,14 @@ func TestAuthentication(t *testing.T) {
}

func TestInvalidHostTimeout(t *testing.T) {
hostAdress := HostAddress{Host: address, Port: port}
hostList := []HostAddress{hostAdress}
hostAddress := HostAddress{Host: address, Port: port}
// invalid host
invalidHostAddress := HostAddress{Host: "192.168.100.125", Port: 3699}
hostList := []HostAddress{hostAddress}

invalidHostList := []HostAddress{
{Host: "192.168.100.125", Port: 3699}, // Invalid host
{Host: "127.0.0.1", Port: 3699},
invalidHostAddress, // Invalid host
hostAddress,
}

// Initialize connection pool
Expand All @@ -269,9 +281,9 @@ func TestInvalidHostTimeout(t *testing.T) {
}

func TestServiceDataIO(t *testing.T) {
hostAdress := HostAddress{Host: address, Port: port}
hostAddress := HostAddress{Host: address, Port: port}
hostList := []HostAddress{}
hostList = append(hostList, hostAdress)
hostList = append(hostList, hostAddress)

testPoolConfig = PoolConfig{
TimeOut: 0 * time.Millisecond,
Expand Down Expand Up @@ -554,13 +566,17 @@ func TestServiceDataIO(t *testing.T) {
}
assert.Equal(t, int8(sessionCreatedTime.Hour()), localTime.GetHour())
}
dropSpace(t, session, "client_test")
err = dropSpace("client_test")
if err != nil {
t.Fatalf(err.Error())
}

}

func TestPool_SingleHost(t *testing.T) {
hostAdress := HostAddress{Host: address, Port: port}
hostAddress := HostAddress{Host: address, Port: port}
hostList := []HostAddress{}
hostList = append(hostList, hostAdress)
hostList = append(hostList, hostAddress)

testPoolConfig = PoolConfig{
TimeOut: 0 * time.Millisecond,
Expand All @@ -584,7 +600,7 @@ func TestPool_SingleHost(t *testing.T) {
username, password, err.Error())
}
defer session.Release()
// Excute a query
// Execute a query
resp, err := tryToExecute(session, "SHOW HOSTS;")
if err != nil {
t.Fatalf(err.Error())
Expand All @@ -599,12 +615,15 @@ func TestPool_SingleHost(t *testing.T) {
}
checkResultSet(t, "create space", resp)

dropSpace(t, session, "client_test")
err = dropSpace("client_test")
if err != nil {
t.Fatalf(err.Error())
}
}

func TestPool_MultiHosts(t *testing.T) {
hostList := poolAddress
// Minimun pool size < hosts number
// Minimum pool size < hosts number
multiHostsConfig := PoolConfig{
TimeOut: 0 * time.Millisecond,
IdleTime: 0 * time.Millisecond,
Expand Down Expand Up @@ -678,7 +697,8 @@ func TestMultiThreads(t *testing.T) {
// Initialize connection pool
pool, err := NewConnectionPool(hostList, testPoolConfig, nebulaLog)
if err != nil {
log.Fatal(fmt.Sprintf("fail to initialize the connection pool, host: %s, port: %d, %s", address, port, err.Error()))
log.Fatal(fmt.Sprintf("fail to initialize the connection pool, host: %s, port: %d, %s",
address, port, err.Error()))
}
defer pool.Close()

Expand Down Expand Up @@ -713,16 +733,13 @@ func TestMultiThreads(t *testing.T) {
assert.Equal(t, 666, pool.getActiveConnCount(), "Total number of active connections should be 666")
assert.Equal(t, 666, len(sessionList), "Total number of sessions should be 666")

// for i := 0; i < len(hostList); i++ {
// assert.Equal(t, 222, pool.GetServerWorkload(i))
// }
for i := 0; i < testPoolConfig.MaxConnPoolSize; i++ {
sessionList[i].Release()
}
assert.Equal(t, 666, pool.getIdleConnCount(), "Total number of idle connections should be 666")
}

func TestLoadbalancer(t *testing.T) {
func TestLoadBalancer(t *testing.T) {
hostList := poolAddress
var loadPerHost = make(map[HostAddress]int)
testPoolConfig := PoolConfig{
Expand Down Expand Up @@ -808,9 +825,9 @@ func TestIdleTimeoutCleaner(t *testing.T) {
}

func TestTimeout(t *testing.T) {
hostAdress := HostAddress{Host: address, Port: port}
hostAddress := HostAddress{Host: address, Port: port}
hostList := []HostAddress{}
hostList = append(hostList, hostAdress)
hostList = append(hostList, hostAddress)

testPoolConfig = PoolConfig{
TimeOut: 1000 * time.Millisecond,
Expand Down Expand Up @@ -888,7 +905,10 @@ func TestTimeout(t *testing.T) {
assert.Contains(t, resultSet.AsStringTable(), []string{"999"})

// Drop space
dropSpace(t, session, "client_test")
err = dropSpace("client_test")
if err != nil {
t.Fatalf(err.Error())
}
}

func TestExecuteJson(t *testing.T) {
Expand Down Expand Up @@ -1222,14 +1242,15 @@ func TestIpLookup(t *testing.T) {

// Method used to check execution response
func checkResultSet(t *testing.T, prefix string, err *ResultSet) {
t.Helper()
if !err.IsSucceed() {
t.Errorf("%s, ErrorCode: %v, ErrorMsg: %s", prefix, err.GetErrorCode(), err.GetErrorMsg())
}
}

func checkConResp(t *testing.T, prefix string, err *graph.ExecutionResponse) {
func checkConResp(prefix string, err *graph.ExecutionResponse) {
if IsError(err) {
t.Errorf("%s, ErrorCode: %v, ErrorMsg: %s", prefix, err.ErrorCode, err.ErrorMsg)
log.Fatalf("%s, ErrorCode: %v, ErrorMsg: %s", prefix, err.ErrorCode, err.ErrorMsg)
}
}

Expand Down Expand Up @@ -1361,12 +1382,63 @@ func loadTestData(t *testing.T, session *Session) {
checkResultSet(t, query, resultSet)
}

func dropSpace(t *testing.T, session *Session, spaceName string) {
// prepareSpace creates a space for test
func prepareSpace(spaceName string) error {
hostAddress := HostAddress{Host: address, Port: port}
conn := newConnection(hostAddress)
testPoolConfig := GetDefaultConf()

err := conn.open(hostAddress, testPoolConfig.TimeOut, nil)
if err != nil {
return fmt.Errorf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}

authResp, authErr := conn.authenticate(username, password)
if authErr != nil {
return fmt.Errorf("fail to authenticate, username: %s, password: %s, %s", username, password, authErr.Error())
}

sessionID := authResp.GetSessionID()

defer logoutAndClose(conn, sessionID)

query := fmt.Sprintf("CREATE SPACE IF NOT EXISTS"+
" %s(partition_num=32, replica_factor=1, vid_type = FIXED_STRING(30));", spaceName)
resp, err := conn.execute(sessionID, query)
if err != nil {
log.Fatalf(err.Error())
}
checkConResp(query, resp)
time.Sleep(5 * time.Second)

return nil
}

// dropSpace drops a space. The space name should be the same as the one created in prepareSpace
func dropSpace(spaceName string) error {
hostAddress := HostAddress{Host: address, Port: port}
conn := newConnection(hostAddress)
testPoolConfig := GetDefaultConf()

err := conn.open(hostAddress, testPoolConfig.TimeOut, nil)
if err != nil {
return fmt.Errorf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}

authResp, authErr := conn.authenticate(username, password)
if authErr != nil {
return fmt.Errorf("fail to authenticate, username: %s, password: %s, %s", username, password, authErr.Error())
}

sessionID := authResp.GetSessionID()

defer logoutAndClose(conn, sessionID)

query := fmt.Sprintf("DROP SPACE IF EXISTS %s;", spaceName)
resultSet, err := tryToExecute(session, query)
resp, err := conn.execute(sessionID, query)
if err != nil {
t.Fatalf(err.Error())
return
return err
}
checkResultSet(t, query, resultSet)
checkConResp(query, resp)
return nil
}
Loading

0 comments on commit 82ab6c3

Please sign in to comment.