Skip to content

Commit

Permalink
Add session pool
Browse files Browse the repository at this point in the history
Add tests
  • Loading branch information
Aiee committed Sep 15, 2022
1 parent 47169b9 commit 2c7a7a3
Show file tree
Hide file tree
Showing 8 changed files with 558 additions and 102 deletions.
52 changes: 26 additions & 26 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,19 @@ func logoutAndClose(conn *connection, sessionID int64) {
}

func TestConnection(t *testing.T) {
hostAdress := HostAddress{Host: address, Port: port}
conn := newConnection(hostAdress)
err := conn.open(hostAdress, testPoolConfig.TimeOut, nil)
hostAddress := HostAddress{Host: address, Port: port}
conn := newConnection(hostAddress)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil)
if err != nil {
t.Fatalf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}

authresp, authErr := conn.authenticate(username, password)
authResp, authErr := conn.authenticate(username, password)
if authErr != nil {
t.Fatalf("fail to authenticate, username: %s, password: %s, %s", username, password, authErr.Error())
}

sessionID := authresp.GetSessionID()
sessionID := authResp.GetSessionID()

defer logoutAndClose(conn, sessionID)

Expand Down Expand Up @@ -107,19 +107,19 @@ func TestConnection(t *testing.T) {
}

func TestConnectionIPv6(t *testing.T) {
hostAdress := HostAddress{Host: addressIPv6, Port: port}
conn := newConnection(hostAdress)
err := conn.open(hostAdress, testPoolConfig.TimeOut, nil)
hostAddress := HostAddress{Host: addressIPv6, Port: port}
conn := newConnection(hostAddress)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil)
if err != nil {
t.Fatalf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}

authresp, authErr := conn.authenticate(username, password)
authResp, authErr := conn.authenticate(username, password)
if authErr != nil {
t.Fatalf("fail to authenticate, username: %s, password: %s, %s", username, password, authErr.Error())
}

sessionID := authresp.GetSessionID()
sessionID := authResp.GetSessionID()

defer logoutAndClose(conn, sessionID)

Expand Down Expand Up @@ -151,9 +151,9 @@ func TestConnectionIPv6(t *testing.T) {
}

func TestConfigs(t *testing.T) {
hostAdress := HostAddress{Host: address, Port: port}
hostAddress := HostAddress{Host: address, Port: port}
hostList := []HostAddress{}
hostList = append(hostList, hostAdress)
hostList = append(hostList, hostAddress)

var configList = []PoolConfig{
// default
Expand Down Expand Up @@ -202,7 +202,7 @@ func TestConfigs(t *testing.T) {
username, password, err.Error())
}
defer session.Release()
// Excute a query
// Execute a query
resp, err := tryToExecute(session, "SHOW HOSTS;")
if err != nil {
t.Fatalf(err.Error())
Expand Down Expand Up @@ -231,10 +231,10 @@ func TestAuthentication(t *testing.T) {
password = "nebula"
)

hostAdress := HostAddress{Host: address, Port: port}
hostAddress := HostAddress{Host: address, Port: port}

conn := newConnection(hostAdress)
err := conn.open(hostAdress, testPoolConfig.TimeOut, nil)
conn := newConnection(hostAddress)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil)
if err != nil {
t.Fatalf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
Expand All @@ -245,8 +245,8 @@ func TestAuthentication(t *testing.T) {
}

func TestInvalidHostTimeout(t *testing.T) {
hostAdress := HostAddress{Host: address, Port: port}
hostList := []HostAddress{hostAdress}
hostAddress := HostAddress{Host: address, Port: port}
hostList := []HostAddress{hostAddress}

invalidHostList := []HostAddress{
{Host: "192.168.100.125", Port: 3699}, // Invalid host
Expand All @@ -269,9 +269,9 @@ func TestInvalidHostTimeout(t *testing.T) {
}

func TestServiceDataIO(t *testing.T) {
hostAdress := HostAddress{Host: address, Port: port}
hostAddress := HostAddress{Host: address, Port: port}
hostList := []HostAddress{}
hostList = append(hostList, hostAdress)
hostList = append(hostList, hostAddress)

testPoolConfig = PoolConfig{
TimeOut: 0 * time.Millisecond,
Expand Down Expand Up @@ -558,9 +558,9 @@ func TestServiceDataIO(t *testing.T) {
}

func TestPool_SingleHost(t *testing.T) {
hostAdress := HostAddress{Host: address, Port: port}
hostAddress := HostAddress{Host: address, Port: port}
hostList := []HostAddress{}
hostList = append(hostList, hostAdress)
hostList = append(hostList, hostAddress)

testPoolConfig = PoolConfig{
TimeOut: 0 * time.Millisecond,
Expand Down Expand Up @@ -604,7 +604,7 @@ func TestPool_SingleHost(t *testing.T) {

func TestPool_MultiHosts(t *testing.T) {
hostList := poolAddress
// Minimun pool size < hosts number
// Minimum pool size < hosts number
multiHostsConfig := PoolConfig{
TimeOut: 0 * time.Millisecond,
IdleTime: 0 * time.Millisecond,
Expand Down Expand Up @@ -722,7 +722,7 @@ func TestMultiThreads(t *testing.T) {
assert.Equal(t, 666, pool.getIdleConnCount(), "Total number of idle connections should be 666")
}

func TestLoadbalancer(t *testing.T) {
func TestLoadBalancer(t *testing.T) {
hostList := poolAddress
var loadPerHost = make(map[HostAddress]int)
testPoolConfig := PoolConfig{
Expand Down Expand Up @@ -808,9 +808,9 @@ func TestIdleTimeoutCleaner(t *testing.T) {
}

func TestTimeout(t *testing.T) {
hostAdress := HostAddress{Host: address, Port: port}
hostAddress := HostAddress{Host: address, Port: port}
hostList := []HostAddress{}
hostList = append(hostList, hostAdress)
hostList = append(hostList, hostAddress)

testPoolConfig = PoolConfig{
TimeOut: 1000 * time.Millisecond,
Expand Down
88 changes: 88 additions & 0 deletions configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,91 @@ func openAndReadFile(path string) ([]byte, error) {
}
return b, nil
}

// SessionPoolConf is the configs of a session pool
// Note that the space name is bound to the session pool for its lifetime
type SessionPoolConf struct {
Username string // username for authentication
Password string // password for authentication
ServiceAddrs []HostAddress // service addresses for session pool
hostIndex int // index of the host in ServiceAddrs that the next new session will connect to
SpaceName string // The space name that all sessions in the pool are bound to
sslConfig *tls.Config // Optional SSL config for the connection

// Basic pool configs
// Socket timeout and Socket connection timeout, unit: seconds
TimeOut time.Duration
// The idleTime of the connection, unit: seconds
// If connection's idle time is longer than idleTime, it will be delete
// 0 value means the connection will not expire
IdleTime time.Duration
// The max sessions in pool for all addresses
MaxSize int
// The min sessions in pool for all addresses
MinSize int
}

// TODO(Aiee) add more constructors
// NewSessionPoolConf returns a new SessionPoolConf with given parameters
func NewSessionPoolConf(username, password string, serviceAddrs []HostAddress, spaceName string) (*SessionPoolConf, error) {
newPoolConf := GetDefaultSessionConf()
newPoolConf.Username = username
newPoolConf.Password = password
newPoolConf.ServiceAddrs = serviceAddrs
newPoolConf.SpaceName = spaceName

if err := newPoolConf.checkMandatoryFields(); err != nil {
return nil, err
}

return &newPoolConf, nil
}

// GetDefaultSessionConf returns the default config
func GetDefaultSessionConf() SessionPoolConf {
return SessionPoolConf{
TimeOut: 0 * time.Millisecond,
IdleTime: 0 * time.Millisecond,
MaxSize: 10,
MinSize: 0,
}
}

func (conf *SessionPoolConf) checkMandatoryFields() error {
// Check mandatory fields
if conf.Username == "" {
return fmt.Errorf("invalid session pool config: Username is empty")
}
if conf.Password == "" {
return fmt.Errorf("invalid session pool config: Password is empty")
}
if len(conf.ServiceAddrs) == 0 {
return fmt.Errorf("invalid session pool config: Service address is empty")
}
if conf.SpaceName == "" {
return fmt.Errorf("invalid session pool config: Space name is empty")
}
return nil
}

// checkBasicFields checks the basic fields of the config and
// sets a default value if the given field value is invalid
func (conf *SessionPoolConf) checkBasicFields(log Logger) {
// Check pool related fields, use default value if the given value is invalid
if conf.TimeOut < 0 {
conf.TimeOut = 0 * time.Millisecond
log.Warn("Illegal Timeout value, the default value of 0 second has been applied")
}
if conf.IdleTime < 0 {
conf.IdleTime = 0 * time.Millisecond
log.Warn("Invalid IdleTime value, the default value of 0 second has been applied")
}
if conf.MaxSize < 1 {
conf.MaxSize = 10
log.Warn("Invalid MaxSize value, the default value of 10 has been applied")
}
if conf.MinSize < 0 {
conf.MinSize = 0
log.Warn("Invalid MinSize value, the default value of 0 has been applied")
}
}
39 changes: 23 additions & 16 deletions connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func NewSslConnectionPool(addresses []HostAddress, conf PoolConfig, sslConfig *t

// initPool initializes the connection pool
func (pool *ConnectionPool) initPool() error {
if err := pool.checkAddresses(); err != nil {
if err := checkAddresses(pool.conf.TimeOut, pool.addresses, pool.sslConfig); err != nil {
return fmt.Errorf("failed to open connection, error: %s ", err.Error())
}

Expand Down Expand Up @@ -167,7 +167,7 @@ func (pool *ConnectionPool) getIdleConn() (*connection, error) {

// Create a new connection if there is no idle connection and total connection < pool max size
newConn, err := pool.createConnection()
// TODO: If no idle avaliable, wait for timeout and reconnect
// TODO: If no idle available, wait for timeout and reconnect
return newConn, err
}

Expand All @@ -181,15 +181,9 @@ func (pool *ConnectionPool) release(conn *connection) {
pool.idleConnectionQueue.PushBack(conn)
}

// Ping checks avaliability of host
// Ping checks availability of host
func (pool *ConnectionPool) Ping(host HostAddress, timeout time.Duration) error {
newConn := newConnection(host)
// Open connection to host
if err := newConn.open(newConn.severAddress, timeout, pool.sslConfig); err != nil {
return err
}
newConn.close()
return nil
return pingAddress(host, timeout, pool.sslConfig)
}

// Close closes all connection
Expand Down Expand Up @@ -259,7 +253,7 @@ func removeFromList(l *list.List, conn *connection) {
// Compare total connection number with pool max size and return a connection if capable
func (pool *ConnectionPool) createConnection() (*connection, error) {
totalConn := pool.idleConnectionQueue.Len() + pool.activeConnectionQueue.Len()
// If no idle avaliable and the number of total connection reaches the max pool size, return error/wait for timeout
// If no idle available and the number of total connection reaches the max pool size, return error/wait for timeout
if totalConn >= pool.conf.MaxConnPoolSize {
return nil, fmt.Errorf("failed to get connection: No valid connection" +
" in the idle queue and connection number has reached the pool capacity")
Expand Down Expand Up @@ -342,15 +336,28 @@ func (pool *ConnectionPool) timeoutConnectionList() (closing []*connection) {
return
}

func (pool *ConnectionPool) checkAddresses() error {
// checkAddresses checks addresses availability
// It opens a temporary connection to each address and closes it immediately.
// If no error is returned, the addresses are available.
func checkAddresses(confTimeout time.Duration, addresses []HostAddress, sslConfig *tls.Config) error {
var timeout = 3 * time.Second
if pool.conf.TimeOut != 0 && pool.conf.TimeOut < timeout {
timeout = pool.conf.TimeOut
if confTimeout != 0 && confTimeout < timeout {
timeout = confTimeout
}
for _, address := range pool.addresses {
if err := pool.Ping(address, timeout); err != nil {
for _, address := range addresses {
if err := pingAddress(address, timeout, sslConfig); err != nil {
return err
}
}
return nil
}

func pingAddress(address HostAddress, timeout time.Duration, sslConfig *tls.Config) error {
newConn := newConnection(address)
defer newConn.close()
// Open connection to host
if err := newConn.open(newConn.severAddress, timeout, sslConfig); err != nil {
return err
}
return nil
}
Loading

0 comments on commit 2c7a7a3

Please sign in to comment.