Skip to content

Commit

Permalink
[FABC-804] Create DB Metric Options
Browse files Browse the repository at this point in the history
Created DB Metric Options

Change-Id: I84248b82c91d472385e9995d34ad8895be1d87cd
Signed-off-by: Saad Karim <skarim@us.ibm.com>
  • Loading branch information
Saad Karim committed Feb 26, 2019
1 parent 8493ed8 commit 7b71097
Show file tree
Hide file tree
Showing 41 changed files with 1,648 additions and 886 deletions.
4 changes: 2 additions & 2 deletions lib/ca_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,9 +655,9 @@ func TestServerMigration(t *testing.T) {
assert.NoError(t, err, "failed to create database")

util.FatalError(t, err, "Failed to create db")
_, err = db.Exec("INSERT INTO users (id, token, type, affiliation, attributes, state, max_enrollments, level) VALUES ('registrar', '', 'user', 'org2', '[{\"name\":\"hf.Registrar.Roles\",\"value\":\"user,peer,client\"}]', '0', '-1', '0')")
_, err = db.Exec("", "INSERT INTO users (id, token, type, affiliation, attributes, state, max_enrollments, level) VALUES ('registrar', '', 'user', 'org2', '[{\"name\":\"hf.Registrar.Roles\",\"value\":\"user,peer,client\"}]', '0', '-1', '0')")
assert.NoError(t, err, "Failed to insert user 'registrar' into database")
_, err = db.Exec("INSERT INTO users (id, token, type, affiliation, attributes, state, max_enrollments, level) VALUES ('notregistrar', '', 'user', 'org2', '[{\"name\":\"hf.Revoker\",\"value\":\"true\"}]', '0', '-1', '0')")
_, err = db.Exec("", "INSERT INTO users (id, token, type, affiliation, attributes, state, max_enrollments, level) VALUES ('notregistrar', '', 'user', 'org2', '[{\"name\":\"hf.Revoker\",\"value\":\"true\"}]', '0', '-1', '0')")
assert.NoError(t, err, "Failed to insert user 'notregistrar' into database")

server := TestGetServer2(false, rootPort, dir, "", -1, t)
Expand Down
14 changes: 7 additions & 7 deletions lib/certdbaccessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (d *CertDBAccessor) InsertCertificate(cr certdb.CertificateRecord) error {
},
}

res, err := d.db.NamedExec(insertSQL, record)
res, err := d.db.NamedExec("InsertCertificate", insertSQL, record)
if err != nil {
return errors.Wrap(err, "Failed to insert record into database")
}
Expand All @@ -140,7 +140,7 @@ func (d *CertDBAccessor) GetCertificatesByID(id string) (crs []db.CertRecord, er
return nil, err
}

err = d.db.Select(&crs, fmt.Sprintf(d.db.Rebind(selectSQLbyID), sqlstruct.Columns(db.CertRecord{})), id)
err = d.db.Select("GetCertificatesByID", &crs, fmt.Sprintf(d.db.Rebind(selectSQLbyID), sqlstruct.Columns(db.CertRecord{})), id)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -168,7 +168,7 @@ func (d *CertDBAccessor) GetCertificateWithID(serial, aki string) (crs db.CertRe
return crs, err
}

err = d.db.Get(&crs, fmt.Sprintf(d.db.Rebind(selectSQL), sqlstruct.Columns(db.CertRecord{})), serial, aki)
err = d.db.Get("GetCertificatesByID", &crs, fmt.Sprintf(d.db.Rebind(selectSQL), sqlstruct.Columns(db.CertRecord{})), serial, aki)
if err != nil {
return crs, dbutil.GetError(err, "Certificate")
}
Expand Down Expand Up @@ -207,7 +207,7 @@ func (d *CertDBAccessor) GetRevokedCertificates(expiredAfter, expiredBefore, rev
}
whereClause := strings.Join(whereConds, " AND ")
revokedSQL = strings.Replace(revokedSQL, "WHERE_CLAUSE", whereClause, 1)
err = d.db.Select(&crs, fmt.Sprintf(d.db.Rebind(revokedSQL),
err = d.db.Select("GetRevokedCertificates", &crs, fmt.Sprintf(d.db.Rebind(revokedSQL),
sqlstruct.Columns(certdb.CertificateRecord{})), args...)
if err != nil {
return crs, dbutil.GetError(err, "Certificate")
Expand Down Expand Up @@ -246,12 +246,12 @@ func (d *CertDBAccessor) RevokeCertificatesByID(id string, reasonCode int) (crs
record.ID = id
record.Reason = reasonCode

err = d.db.Select(&crs, d.db.Rebind("SELECT * FROM certificates WHERE (id = ? AND status != 'revoked')"), id)
err = d.db.Select("RevokeCertificatesByID", &crs, d.db.Rebind("SELECT * FROM certificates WHERE (id = ? AND status != 'revoked')"), id)
if err != nil {
return nil, err
}

_, err = d.db.NamedExec(updateRevokeSQL, record)
_, err = d.db.NamedExec("RevokeCertificatesByID", updateRevokeSQL, record)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -389,7 +389,7 @@ func (d *CertDBAccessor) GetCertificates(req cr.CertificateRequest, callersAffil
getCertificateSQL = getCertificateSQL + ";"

log.Debugf("Executing get certificates query: %s, with args: %s", getCertificateSQL, args)
rows, err := d.db.Queryx(d.db.Rebind(getCertificateSQL), args...)
rows, err := d.db.Queryx("GetCertificates", d.db.Rebind(getCertificateSQL), args...)
if err != nil {
return nil, dbutil.GetError(err, "Certificate")
}
Expand Down
2 changes: 1 addition & 1 deletion lib/certdbaccessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func testInsertCertificate(req *certdb.CertificateRecord, id string, ca *CA) err
}

db := ca.GetDB()
res, err := db.NamedExec(insertSQL, record)
res, err := db.NamedExec("", insertSQL, record)
if err != nil {
return errors.Wrap(err, "Failed to insert record into database")
}
Expand Down
24 changes: 12 additions & 12 deletions lib/dbaccessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (d *Accessor) InsertUser(user *cadbuser.Info) error {
}

// Store the user record in the DB
res, err := d.db.NamedExec(insertUser, &cadbuser.Record{
res, err := d.db.NamedExec("InsertUser", insertUser, &cadbuser.Record{
Name: user.Name,
Pass: pwd,
Type: user.Type,
Expand Down Expand Up @@ -234,7 +234,7 @@ func (d *Accessor) UpdateUser(user *cadbuser.Info, updatePass bool) error {
}

// Store the updated user entry
res, err := d.db.NamedExec(updateUser, cadbuser.Record{
res, err := d.db.NamedExec("UpdateUser", updateUser, cadbuser.Record{
Name: user.Name,
Pass: pwd,
Type: user.Type,
Expand Down Expand Up @@ -274,7 +274,7 @@ func (d *Accessor) GetUser(id string, attrs []string) (user.User, error) {
}

var userRec cadbuser.Record
err = d.db.Get(&userRec, d.db.Rebind(getUser), id)
err = d.db.Get("GetUser", &userRec, d.db.Rebind(getUser), id)
if err != nil {
return nil, cadbutil.GetError(err, "User")
}
Expand Down Expand Up @@ -302,7 +302,7 @@ func (d *Accessor) InsertAffiliation(name string, prekey string, level int) erro
return nil
}
}
_, err = d.db.Exec(d.db.Rebind(insertAffiliation), name, prekey, level)
_, err = d.db.Exec("InsertAffiliation", d.db.Rebind(insertAffiliation), name, prekey, level)
if err != nil {
if (!strings.Contains(err.Error(), "UNIQUE constraint failed") && dbType == "sqlite3") || (!strings.Contains(err.Error(), "duplicate key value") && dbType == "postgres") {
return err
Expand Down Expand Up @@ -444,7 +444,7 @@ func (d *Accessor) GetAffiliation(name string) (spi.Affiliation, error) {

var affiliationRecord db.AffiliationRecord

err = d.db.Get(&affiliationRecord, d.db.Rebind(getAffiliationQuery), name)
err = d.db.Get("GetAffiliation", &affiliationRecord, d.db.Rebind(getAffiliationQuery), name)
if err != nil {
return nil, cadbutil.GetError(err, "affiliation")
}
Expand Down Expand Up @@ -511,7 +511,7 @@ func (d *Accessor) GetUserLessThanLevel(level int) ([]user.User, error) {
return []user.User{}, nil
}

rows, err := d.db.Queryx(d.db.Rebind("SELECT * FROM users WHERE (level < ?) OR (level IS NULL)"), level)
rows, err := d.db.Queryx("GetUserLessThanLevel", d.db.Rebind("SELECT * FROM users WHERE (level < ?) OR (level IS NULL)"), level)
if err != nil {
return nil, errors.Wrap(err, "Failed to get identities that need to be updated")
}
Expand All @@ -537,14 +537,14 @@ func (d *Accessor) GetAllAffiliations(name string) (*sqlx.Rows, error) {
}

if name == "" { // Requesting all affiliations
rows, err := d.db.Queryx(d.db.Rebind("SELECT * FROM affiliations"))
rows, err := d.db.Queryx("GetAllAffiliations", d.db.Rebind("SELECT * FROM affiliations"))
if err != nil {
return nil, err
}
return rows, nil
}

rows, err := d.db.Queryx(d.db.Rebind(getAllAffiliationsQuery), name, name+".%")
rows, err := d.db.Queryx("GetAllAffiliations", d.db.Rebind(getAllAffiliationsQuery), name, name+".%")
if err != nil {
return nil, err
}
Expand All @@ -569,7 +569,7 @@ func (d *Accessor) GetFilteredUsers(affiliation, types string) (*sqlx.Rows, erro
if affiliation == "" {
if util.ListContains(types, "*") { // If type is '*', allowed to get back of all types
query := "SELECT * FROM users"
rows, err := d.db.Queryx(d.db.Rebind(query))
rows, err := d.db.Queryx("GetFilteredUsers", d.db.Rebind(query))
if err != nil {
return nil, errors.Wrapf(err, "Failed to execute query '%s' for affiliation '%s' and types '%s'", query, affiliation, types)
}
Expand All @@ -581,7 +581,7 @@ func (d *Accessor) GetFilteredUsers(affiliation, types string) (*sqlx.Rows, erro
if err != nil {
return nil, errors.Wrapf(err, "Failed to construct query '%s' for affiliation '%s' and types '%s'", query, affiliation, types)
}
rows, err := d.db.Queryx(d.db.Rebind(query), args...)
rows, err := d.db.Queryx("GetFilteredUsers", d.db.Rebind(query), args...)
if err != nil {
return nil, errors.Wrapf(err, "Failed to execute query '%s' for affiliation '%s' and types '%s'", query, affiliation, types)
}
Expand All @@ -591,7 +591,7 @@ func (d *Accessor) GetFilteredUsers(affiliation, types string) (*sqlx.Rows, erro
subAffiliation := affiliation + ".%"
if util.ListContains(types, "*") { // If type is '*', allowed to get back of all types for requested affiliation
query := "SELECT * FROM users WHERE ((affiliation = ?) OR (affiliation LIKE ?))"
rows, err := d.db.Queryx(d.db.Rebind(query))
rows, err := d.db.Queryx("GetFilteredUsers", d.db.Rebind(query))
if err != nil {
return nil, errors.Wrapf(err, "Failed to execute query '%s' for affiliation '%s' and types '%s'", query, affiliation, types)
}
Expand All @@ -603,7 +603,7 @@ func (d *Accessor) GetFilteredUsers(affiliation, types string) (*sqlx.Rows, erro
if err != nil {
return nil, errors.Wrapf(err, "Failed to construct query '%s' for affiliation '%s' and types '%s'", query, affiliation, types)
}
rows, err := d.db.Queryx(d.db.Rebind(inQuery), args...)
rows, err := d.db.Queryx("GetFilteredUsers", d.db.Rebind(inQuery), args...)
if err != nil {
return nil, errors.Wrapf(err, "Failed to execute query '%s' for affiliation '%s' and types '%s'", query, affiliation, types)
}
Expand Down
6 changes: 3 additions & 3 deletions lib/dbaccessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func Truncate(db *db.DB) {
if len(strings.TrimSpace(expr)) == 0 {
continue
}
if _, err := db.Exec(expr); err != nil {
if _, err := db.Exec("", expr); err != nil {
panic(err)
}
}
Expand Down Expand Up @@ -137,7 +137,7 @@ func testWithExistingDbAndTablesAndUser(t *testing.T) {
}
db, acc := createSQLiteDB(rootDB, t)

_, err = db.Exec("CREATE TABLE IF NOT EXISTS users (id VARCHAR(64), token bytea, type VARCHAR(64), affiliation VARCHAR(64), attributes VARCHAR(256), state INTEGER, max_enrollments INTEGER, level INTEGER DEFAULT 0)")
_, err = db.Exec("", "CREATE TABLE IF NOT EXISTS users (id VARCHAR(64), token bytea, type VARCHAR(64), affiliation VARCHAR(64), attributes VARCHAR(256), state INTEGER, max_enrollments INTEGER, level INTEGER DEFAULT 0)")
assert.NoError(t, err, "Error creating users table")

srv := TestGetServer2(false, rootPort, rootDir, "", -1, t)
Expand Down Expand Up @@ -178,7 +178,7 @@ func testWithExistingDbAndTable(t *testing.T) {
srv := TestGetServer2(false, rootPort, rootDir, "", -1, t)
srv.CA.Config.DB.Datasource = "fabric_ca.db"

_, err = db.Exec("CREATE TABLE IF NOT EXISTS users (id VARCHAR(64), token bytea, type VARCHAR(64), affiliation VARCHAR(64), attributes VARCHAR(256), state INTEGER, max_enrollments INTEGER, level INTEGER DEFAULT 0)")
_, err = db.Exec("", "CREATE TABLE IF NOT EXISTS users (id VARCHAR(64), token bytea, type VARCHAR(64), affiliation VARCHAR(64), attributes VARCHAR(256), state INTEGER, max_enrollments INTEGER, level INTEGER DEFAULT 0)")
assert.NoError(t, err, "Error creating users table")

err = srv.Start()
Expand Down
70 changes: 56 additions & 14 deletions lib/server/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ package db
import (
"context"
"database/sql"
"time"

"github.com/cloudflare/cfssl/certdb"
"github.com/hyperledger/fabric-ca/lib/server/db/util"
"github.com/hyperledger/fabric/common/metrics/disabled"
"github.com/jmoiron/sqlx"
)

Expand All @@ -20,13 +22,23 @@ import (

// FabricCADB is the interface that wrapper off SqlxDB
type FabricCADB interface {
SqlxDB

IsInitialized() bool
SetDBInitialized(bool)
// BeginTx has same behavior as MustBegin except it returns FabricCATx
// instead of *sqlx.Tx
BeginTx() FabricCATx
DriverName() string

Select(funcName string, dest interface{}, query string, args ...interface{}) error
Exec(funcName, query string, args ...interface{}) (sql.Result, error)
NamedExec(funcName, query string, arg interface{}) (sql.Result, error)
Get(funcName string, dest interface{}, query string, args ...interface{}) error
Queryx(funcName, query string, args ...interface{}) (*sqlx.Rows, error)
Rebind(query string) string
MustBegin() *sqlx.Tx
Close() error
SetMaxOpenConns(n int)
PingContext(ctx context.Context) error
}

//go:generate counterfeiter -o mocks/sqlxDB.go -fake-name SqlxDB . SqlxDB
Expand Down Expand Up @@ -67,12 +79,21 @@ type DB struct {
DB SqlxDB
// Indicates if database was successfully initialized
IsDBInitialized bool

CAName string

Metrics Metrics
}

// New creates an instance of DB
func New(db SqlxDB) *DB {
metricsProvider := &disabled.Provider{}
return &DB{
DB: db,
Metrics: Metrics{
APICounter: metricsProvider.NewCounter(APICounterOpts),
APIDuration: metricsProvider.NewHistogram(APIDurationOpts),
},
}
}

Expand All @@ -89,33 +110,49 @@ func (db *DB) SetDBInitialized(b bool) {
// BeginTx implements BeginTx method of FabricCADB interface
func (db *DB) BeginTx() FabricCATx {
return &TX{
TX: db.DB.MustBegin(),
TX: db.DB.MustBegin(),
Record: db,
}
}

// Select performs select sql statement
func (db *DB) Select(dest interface{}, query string, args ...interface{}) error {
return db.DB.Select(dest, query, args...)
func (db *DB) Select(funcName string, dest interface{}, query string, args ...interface{}) error {
startTime := time.Now()
err := db.DB.Select(dest, query, args...)
db.recordMetric(startTime, funcName, "Select")
return err
}

// Exec executes query
func (db *DB) Exec(query string, args ...interface{}) (sql.Result, error) {
return db.DB.Exec(query, args...)
func (db *DB) Exec(funcName, query string, args ...interface{}) (sql.Result, error) {
startTime := time.Now()
res, err := db.DB.Exec(query, args...)
db.recordMetric(startTime, funcName, "Exec")
return res, err
}

// NamedExec executes query
func (db *DB) NamedExec(query string, args interface{}) (sql.Result, error) {
return db.DB.NamedExec(query, args)
func (db *DB) NamedExec(funcName, query string, args interface{}) (sql.Result, error) {
startTime := time.Now()
res, err := db.DB.NamedExec(query, args)
db.recordMetric(startTime, funcName, "NamedExec")
return res, err
}

// Get executes query
func (db *DB) Get(dest interface{}, query string, args ...interface{}) error {
return db.DB.Get(dest, query, args...)
func (db *DB) Get(funcName string, dest interface{}, query string, args ...interface{}) error {
startTime := time.Now()
err := db.DB.Get(dest, query, args...)
db.recordMetric(startTime, funcName, "Get")
return err
}

// Queryx executes query
func (db *DB) Queryx(query string, args ...interface{}) (*sqlx.Rows, error) {
return db.DB.Queryx(query, args...)
func (db *DB) Queryx(funcName, query string, args ...interface{}) (*sqlx.Rows, error) {
startTime := time.Now()
rows, err := db.DB.Queryx(query, args...)
db.recordMetric(startTime, funcName, "Queryx")
return rows, err
}

// MustBegin starts a transaction
Expand Down Expand Up @@ -148,6 +185,11 @@ func (db *DB) PingContext(ctx context.Context) error {
return db.DB.PingContext(ctx)
}

func (db *DB) recordMetric(startTime time.Time, funcName, dbapiName string) {
db.Metrics.APICounter.With("ca_name", db.CAName, "func_name", funcName, "dbapi_name", dbapiName).Add(1)
db.Metrics.APIDuration.With("ca_name", db.CAName, "func_name", funcName, "dbapi_name", dbapiName).Observe(time.Since(startTime).Seconds())
}

// CurrentDBLevels returns current levels from the database
func CurrentDBLevels(db FabricCADB) (*util.Levels, error) {
var err error
Expand Down Expand Up @@ -188,7 +230,7 @@ func CurrentDBLevels(db FabricCADB) (*util.Levels, error) {
}

func getProperty(db FabricCADB, propName string, val *int) error {
err := db.Get(val, db.Rebind("Select value FROM properties WHERE (property = ?)"), propName)
err := db.Get("GetProperty", val, db.Rebind("Select value FROM properties WHERE (property = ?)"), propName)
if err != nil && err.Error() == "sql: no rows in result set" {
return nil
}
Expand Down
Loading

0 comments on commit 7b71097

Please sign in to comment.