Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix MySQL panic test #996

Merged
merged 8 commits into from
Feb 19, 2021
67 changes: 65 additions & 2 deletions internal/db/rdb/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ func (m *mySQLClient) Open(ctx context.Context) (err error) {
conn.SetMaxIdleConns(m.maxIdleConns)
conn.SetMaxOpenConns(m.maxOpenConns)

m.session = dbr.NewSession(conn, nil)
if m.session == nil {
m.session = dbr.NewSession(conn, m.eventReceiver)
}
m.connected.Store(true)

return m.Ping(ctx)
Expand All @@ -145,6 +147,11 @@ func (m *mySQLClient) Open(ctx context.Context) (err error) {
// Ping check the connection of MySQL database.
// If the connection is closed, it returns error.
func (m *mySQLClient) Ping(ctx context.Context) (err error) {
if m.session == nil {
err = errors.ErrMySQLSessionNil
m.outputLog(err)
return err
}
pctx, cancel := context.WithTimeout(ctx, m.initialPingTimeLimit)
defer cancel()
tick := time.NewTicker(m.initialPingDuration)
Expand Down Expand Up @@ -172,8 +179,15 @@ func (m *mySQLClient) Ping(ctx context.Context) (err error) {
}

// Close closes the connection of MySQL database.
// If the connection is already closed or closing conncection is failed, it returns error.
// If the connection is already closed or closing connection is failed, it returns error.
func (m *mySQLClient) Close(ctx context.Context) (err error) {
if m.session == nil {
err = errors.ErrMySQLSessionNil
m.outputLog(err)
m.connected.Store(false)
return err
}

if m.connected.Load().(bool) {
kevindiu marked this conversation as resolved.
Show resolved Hide resolved
err = m.session.Close()
if err == nil {
Expand All @@ -189,6 +203,12 @@ func (m *mySQLClient) GetVector(ctx context.Context, uuid string) (Vector, error
return nil, errors.ErrMySQLConnectionClosed
}

if m.session == nil {
err := errors.ErrMySQLSessionNil
m.outputLog(err)
return nil, err
}

var data *data
_, err := m.session.Select(asterisk).From(vectorTableName).Where(m.dbr.Eq(uuidColumnName, uuid)).Limit(1).LoadContext(ctx, &data)
if err != nil {
Expand Down Expand Up @@ -216,6 +236,12 @@ func (m *mySQLClient) GetIPs(ctx context.Context, uuid string) ([]string, error)
return nil, errors.ErrMySQLConnectionClosed
}

if m.session == nil {
err := errors.ErrMySQLSessionNil
m.outputLog(err)
return nil, err
}

var id int64
_, err := m.session.Select(idColumnName).From(vectorTableName).Where(m.dbr.Eq(uuidColumnName, uuid)).Limit(1).LoadContext(ctx, &id)
if err != nil {
Expand Down Expand Up @@ -253,6 +279,12 @@ func (m *mySQLClient) SetVector(ctx context.Context, vec Vector) error {
return errors.ErrMySQLConnectionClosed
}

if m.session == nil {
err := errors.ErrMySQLSessionNil
m.outputLog(err)
return err
}

tx, err := m.session.Begin()
if err != nil {
return err
Expand Down Expand Up @@ -304,6 +336,12 @@ func (m *mySQLClient) SetVectors(ctx context.Context, vecs ...Vector) error {
return errors.ErrMySQLConnectionClosed
}

if m.session == nil {
err := errors.ErrMySQLSessionNil
m.outputLog(err)
return err
}

tx, err := m.session.Begin()
if err != nil {
return err
Expand Down Expand Up @@ -358,6 +396,12 @@ func (m *mySQLClient) deleteVector(ctx context.Context, val string) error {
return errors.ErrMySQLConnectionClosed
}

if m.session == nil {
err := errors.ErrMySQLSessionNil
m.outputLog(err)
return err
}

tx, err := m.session.Begin()
if err != nil {
return err
Expand Down Expand Up @@ -411,6 +455,12 @@ func (m *mySQLClient) SetIPs(ctx context.Context, uuid string, ips ...string) er
return errors.ErrMySQLConnectionClosed
}

if m.session == nil {
err := errors.ErrMySQLSessionNil
m.outputLog(err)
return err
}

tx, err := m.session.Begin()
if err != nil {
return err
Expand Down Expand Up @@ -444,6 +494,12 @@ func (m *mySQLClient) RemoveIPs(ctx context.Context, ips ...string) error {
return errors.ErrMySQLConnectionClosed
}

if m.session == nil {
err := errors.ErrMySQLSessionNil
m.outputLog(err)
return err
}

tx, err := m.session.Begin()
if err != nil {
return err
Expand All @@ -457,3 +513,10 @@ func (m *mySQLClient) RemoveIPs(ctx context.Context, ips ...string) error {

return tx.Commit()
}

func (m *mySQLClient) outputLog(err error) {
log.Errorf(
"err: %v, { host: %s, port: %d, user: %s, name: %s, db: %s, charset: %s, socketPath: %s, network: %s} ",
err, m.host, m.port, m.user, m.name, m.db, m.charset, m.socketPath, m.network,
)
}
Loading