From 20f9b792e11d30aa8fec6a4b9e250ec9ae1948a2 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Fri, 8 Jan 2021 14:21:03 -0700 Subject: [PATCH] [IMPROVED] Bulk insert of messages A new configuration `bulk_insert_limit` switches the server from the current insertion of messages within a SQL transaction, to an "INSERT INTO MESSAGES () VALUES (),(),..." which can speed up performance severalfold. The server may still perform a regular insert within a transaction if the limit is deemed too low. This new configuration parameter is not enabled by default. It needs to be explicitly set, either in the configuration file or from the command line with `--sql_bulk_insert_limit <limit>`. Resolves #1132 Signed-off-by: Ivan Kozlovic --- nats-streaming-server.go | 1 + server/conf.go | 6 ++ server/conf_test.go | 3 + stores/sqlstore.go | 111 ++++++++++++++++++++++++++++++++++- stores/sqlstore_test.go | 49 +++++++++++++++- test/configs/test_parse.conf | 1 + 6 files changed, 167 insertions(+), 4 deletions(-) diff --git a/nats-streaming-server.go b/nats-streaming-server.go index 7920b79a..5812490f 100644 --- a/nats-streaming-server.go +++ b/nats-streaming-server.go @@ -91,6 +91,7 @@ Streaming Server SQL Store Options: --sql_source Datasource used when opening an SQL connection to the database --sql_no_caching Enable/Disable caching for improved performance --sql_max_open_conns Maximum number of opened connections to the database + --sql_bulk_insert_limit Maximum number of messages stored with a single SQL "INSERT" statement Streaming Server TLS Options: -secure Use a TLS connection to the NATS server without diff --git a/server/conf.go b/server/conf.go index 9925a069..e8ff6fe4 100644 --- a/server/conf.go +++ b/server/conf.go @@ -595,6 +595,11 @@ func parseSQLOptions(itf interface{}, opts *Options) error { return err } opts.SQLStoreOpts.MaxOpenConns = int(v.(int64)) + case "bulk_insert_limit": + if err := checkType(name, reflect.Int64, v); err != nil { + return err + } + 
opts.SQLStoreOpts.BulkInsertLimit = int(v.(int64)) } } return nil @@ -688,6 +693,7 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp, defSQLOpts := stores.DefaultSQLStoreOptions() fs.BoolVar(&sopts.SQLStoreOpts.NoCaching, "sql_no_caching", defSQLOpts.NoCaching, "Enable/Disable caching") fs.IntVar(&sopts.SQLStoreOpts.MaxOpenConns, "sql_max_open_conns", defSQLOpts.MaxOpenConns, "Max opened connections to the database") + fs.IntVar(&sopts.SQLStoreOpts.BulkInsertLimit, "sql_bulk_insert_limit", 0, "Limit the number of messages inserted in one SQL query") fs.StringVar(&sopts.SyslogName, "syslog_name", "", "Syslog Name") fs.BoolVar(&sopts.Encrypt, "encrypt", false, "Specify if server should use encryption at rest") fs.StringVar(&sopts.EncryptionCipher, "encryption_cipher", stores.CryptoCipherAutoSelect, "Encryption cipher. Supported are AES and CHACHA (default is AES)") diff --git a/server/conf_test.go b/server/conf_test.go index 4a25f928..06f0331d 100644 --- a/server/conf_test.go +++ b/server/conf_test.go @@ -287,6 +287,9 @@ func TestParseConfig(t *testing.T) { if opts.SQLStoreOpts.MaxOpenConns != 5 { t.Fatalf("Expected SQL MaxOpenConns to be 5, got %v", opts.SQLStoreOpts.MaxOpenConns) } + if opts.SQLStoreOpts.BulkInsertLimit != 1000 { + t.Fatalf("Expected SQL BulkInsertLimit to be 1000, got %v", opts.SQLStoreOpts.BulkInsertLimit) + } if !opts.Encrypt { t.Fatal("Expected Encrypt to be true") } diff --git a/stores/sqlstore.go b/stores/sqlstore.go index 9d43a372..493d498f 100644 --- a/stores/sqlstore.go +++ b/stores/sqlstore.go @@ -185,6 +185,10 @@ const ( // Limit of number of messages in the cache before message store // is automatically flushed on a Store() call. sqlDefaultMsgCacheLimit = 1024 + + // If bulk insert limit is set, the server will still insert messages + // using tx if the limit is below this threshold. + sqlMinBulkInsertLimit = 5 ) // These are initialized based on the constants that have reasonable values. 
@@ -215,6 +219,11 @@ type SQLStoreOptions struct { // APIs will cause execution of their respective SQL statements. NoCaching bool + // If this is non 0, and NoCaching is not enabled, the server will perform + // bulk insert of messages. This is the limit of values added to the SQL statement + // "INSERT INTO Messages (..) VALUES (..)[,(..)*]". + BulkInsertLimit int + // Maximum number of open connections to the database. // If <= 0, then there is no limit on the number of open connections. // The default is 0 (unlimited). @@ -240,6 +249,14 @@ func SQLNoCaching(noCaching bool) SQLStoreOption { } } +// SQLBulkInsertLimit sets the BulkInsertLimit option +func SQLBulkInsertLimit(limit int) SQLStoreOption { + return func(o *SQLStoreOptions) error { + o.BulkInsertLimit = limit + return nil + } +} + // SQLMaxOpenConns sets the MaxOpenConns option func SQLMaxOpenConns(max int) SQLStoreOption { return func(o *SQLStoreOptions) error { @@ -254,6 +271,7 @@ func SQLAllOptions(opts *SQLStoreOptions) SQLStoreOption { return func(o *SQLStoreOptions) error { o.NoCaching = opts.NoCaching o.MaxOpenConns = opts.MaxOpenConns + o.BulkInsertLimit = opts.BulkInsertLimit return nil } } @@ -275,6 +293,8 @@ type SQLStore struct { wg sync.WaitGroup preparedStmts []*sql.Stmt ssFlusher *subStoresFlusher + postgres bool + bulkInserts []string } type sqlDBLock struct { @@ -410,11 +430,20 @@ func NewSQLStore(log logger.Logger, driver, source string, limits *StoreLimits, db: db, doneCh: make(chan struct{}), preparedStmts: make([]*sql.Stmt, 0, len(sqlStmts)), + postgres: driver == driverPostgres, } if err := s.init(TypeSQL, log, limits); err != nil { s.Close() return nil, err } + if s.postgres && opts.BulkInsertLimit > 0 { + limit := opts.BulkInsertLimit + s.bulkInserts = make([]string, limit) + for i := 0; i < limit; i++ { + j := i * 5 + s.bulkInserts[i] = fmt.Sprintf("($%d,$%d,$%d,$%d,$%d)", j+1, j+2, j+3, j+4, j+5) + } + } if err := s.createPreparedStmts(); err != nil { s.Close() return nil, err 
@@ -1633,6 +1662,9 @@ func (ms *SQLMsgStore) flush() error { ps *sql.Stmt ) defer func() { + if ms.limits.MaxAge > 0 && ms.expireTimer == nil { + ms.createExpireTimer() + } ms.writeCache.transferToFreeList() if ps != nil { ps.Close() @@ -1641,6 +1673,9 @@ func (ms *SQLMsgStore) flush() error { tx.Rollback() } }() + if limit := ms.sqlStore.opts.BulkInsertLimit; limit >= sqlMinBulkInsertLimit { + return ms.bulkInsert(limit) + } tx, err := ms.sqlStore.db.Begin() if err != nil { return err @@ -1664,8 +1699,80 @@ func (ms *SQLMsgStore) flush() error { return err } tx = nil - if ms.limits.MaxAge > 0 && ms.expireTimer == nil { - ms.createExpireTimer() + return nil +} + +// Insert messages with INSERT INTO MESSAGES () VALUES (),(),()... +// This is faster than the original insert with transactions. +// It is done only if user configures the BulkInsertLimit option. +// Lock held on entry. +func (ms *SQLMsgStore) bulkInsert(limit int) error { + const insertStmt = "INSERT INTO Messages (id, seq, timestamp, size, data) VALUES " + const valArgs = "(?,?,?,?,?)" + + count := ms.writeCache.count + + sb := strings.Builder{} + size := len(insertStmt) + count // number of "," + last ";" + if ms.sqlStore.postgres { + for i := 0; i < limit; i++ { + size += len(ms.sqlStore.bulkInserts[i]) + } + } else { + size += count * len(valArgs) + } + sb.Grow(size) + sb.WriteString(insertStmt) + + for i := 0; i < limit; i++ { + if i > 0 { + sb.WriteString(",") + } + if ms.sqlStore.postgres { + sb.WriteString(ms.sqlStore.bulkInserts[i]) + } else { + sb.WriteString(valArgs) + } + } + sb.WriteString(";") + stmtb := []byte(sb.String()) + + args := make([]interface{}, 0, 5*count) + start := ms.writeCache.head + for count > 0 { + args = args[:0] + i := 0 + l := len(insertStmt) + // Iterate through the cache, but do not remove elements from the list. + // They are needed by the caller. 
+ for cm := start; cm != nil; cm = cm.next { + if i > 0 { + l++ + } + if ms.sqlStore.postgres { + l += len(ms.sqlStore.bulkInserts[i]) + } else { + l += len(valArgs) + } + args = append(args, ms.channelID, cm.msg.Sequence, cm.msg.Timestamp, len(cm.data), cm.data) + i++ + if i == limit { + start = cm.next + break + } + } + count -= i + var stmt string + if i == limit { + stmt = sb.String() + } else { + l++ + stmtb[l-1] = ';' + stmt = string(stmtb[:l]) + } + if _, err := ms.sqlStore.db.Exec(stmt, args[:i*5]...); err != nil { + return err + } } return nil } diff --git a/stores/sqlstore_test.go b/stores/sqlstore_test.go index 0d11fa56..c8edb382 100644 --- a/stores/sqlstore_test.go +++ b/stores/sqlstore_test.go @@ -160,8 +160,9 @@ func TestSQLAllOptions(t *testing.T) { defer cleanupSQLDatastore(t) opts := &SQLStoreOptions{ - NoCaching: true, - MaxOpenConns: 123, + NoCaching: true, + MaxOpenConns: 123, + BulkInsertLimit: 456, } s, err := NewSQLStore(testLogger, testSQLDriver, testSQLSource, nil, SQLAllOptions(opts)) if err != nil { @@ -178,6 +179,9 @@ func TestSQLAllOptions(t *testing.T) { if so.MaxOpenConns != 123 { t.Fatalf("MaxOpenConns should be 123, got %v", so.MaxOpenConns) } + if so.BulkInsertLimit != 456 { + t.Fatalf("BulkInsertLimit should be 456, got %v", so.BulkInsertLimit) + } } func TestSQLPostgresDriverInit(t *testing.T) { @@ -2140,3 +2144,44 @@ func TestSQLMaxAgeForMsgsWithTimestampInPast(t *testing.T) { } } } + +func TestSQLBulkInsertLimit(t *testing.T) { + if !doSQL { + t.SkipNow() + } + + cleanupSQLDatastore(t) + defer cleanupSQLDatastore(t) + + // Create store with caching enabled and bulk insert limit + s, err := NewSQLStore(testLogger, testSQLDriver, testSQLSource, nil, + SQLNoCaching(false), SQLBulkInsertLimit(10)) + if err != nil { + t.Fatalf("Error creating store: %v", err) + } + defer s.Close() + + cs := storeCreateChannel(t, s, "foo") + for seq := uint64(1); seq < 127; seq++ { + msg := &pb.MsgProto{ + Sequence: seq, + Subject: "foo", + Data: 
[]byte(fmt.Sprintf("%v", seq)), + Timestamp: time.Now().UnixNano(), + } + if _, err := cs.Msgs.Store(msg); err != nil { + t.Fatalf("Error storing message: %v", err) + } + } + if err := cs.Msgs.Flush(); err != nil { + t.Fatalf("Error on flush: %v", err) + } + + for seq := uint64(1); seq < 127; seq++ { + m := msgStoreLookup(t, cs.Msgs, seq) + expected := fmt.Sprintf("%v", seq) + if string(m.Data) != expected { + t.Fatalf("Expected %q, got %q", expected, m.Data) + } + } +} diff --git a/test/configs/test_parse.conf b/test/configs/test_parse.conf index 5efce6bd..ef765d3a 100644 --- a/test/configs/test_parse.conf +++ b/test/configs/test_parse.conf @@ -99,5 +99,6 @@ streaming: { source: "ivan:pwd@/nss_db" no_caching: true max_open_conns: 5 + bulk_insert_limit: 1000 } }