Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Bulk insert of messages #1140

Merged
merged 1 commit into from
Jan 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions nats-streaming-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ Streaming Server SQL Store Options:
--sql_source <string> Datasource used when opening an SQL connection to the database
--sql_no_caching <bool> Enable/Disable caching for improved performance
--sql_max_open_conns <int> Maximum number of opened connections to the database
--sql_bulk_insert_limit <int> Maximum number of messages stored with a single SQL "INSERT" statement

Streaming Server TLS Options:
-secure <bool> Use a TLS connection to the NATS server without
Expand Down
6 changes: 6 additions & 0 deletions server/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,11 @@ func parseSQLOptions(itf interface{}, opts *Options) error {
return err
}
opts.SQLStoreOpts.MaxOpenConns = int(v.(int64))
case "bulk_insert_limit":
if err := checkType(name, reflect.Int64, v); err != nil {
return err
}
opts.SQLStoreOpts.BulkInsertLimit = int(v.(int64))
}
}
return nil
Expand Down Expand Up @@ -688,6 +693,7 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp,
defSQLOpts := stores.DefaultSQLStoreOptions()
fs.BoolVar(&sopts.SQLStoreOpts.NoCaching, "sql_no_caching", defSQLOpts.NoCaching, "Enable/Disable caching")
fs.IntVar(&sopts.SQLStoreOpts.MaxOpenConns, "sql_max_open_conns", defSQLOpts.MaxOpenConns, "Max opened connections to the database")
fs.IntVar(&sopts.SQLStoreOpts.BulkInsertLimit, "sql_bulk_insert_limit", 0, "Limit the number of messages inserted in one SQL query")
fs.StringVar(&sopts.SyslogName, "syslog_name", "", "Syslog Name")
fs.BoolVar(&sopts.Encrypt, "encrypt", false, "Specify if server should use encryption at rest")
fs.StringVar(&sopts.EncryptionCipher, "encryption_cipher", stores.CryptoCipherAutoSelect, "Encryption cipher. Supported are AES and CHACHA (default is AES)")
Expand Down
3 changes: 3 additions & 0 deletions server/conf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,9 @@ func TestParseConfig(t *testing.T) {
if opts.SQLStoreOpts.MaxOpenConns != 5 {
t.Fatalf("Expected SQL MaxOpenConns to be 5, got %v", opts.SQLStoreOpts.MaxOpenConns)
}
if opts.SQLStoreOpts.BulkInsertLimit != 1000 {
t.Fatalf("Expected SQL BulkInsertLimit to be 1000, got %v", opts.SQLStoreOpts.BulkInsertLimit)
}
if !opts.Encrypt {
t.Fatal("Expected Encrypt to be true")
}
Expand Down
111 changes: 109 additions & 2 deletions stores/sqlstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ const (
// Limit of number of messages in the cache before message store
// is automatically flushed on a Store() call.
sqlDefaultMsgCacheLimit = 1024

// If bulk insert limit is set, the server will still insert messages
// using tx if the limit is below this threshold.
sqlMinBulkInsertLimit = 5
)

// These are initialized based on the constants that have reasonable values.
Expand Down Expand Up @@ -215,6 +219,11 @@ type SQLStoreOptions struct {
// APIs will cause execution of their respective SQL statements.
NoCaching bool

// If this is non 0, and NoCaching is not enabled, the server will perform
// bulk insert of messages. This is the limit of values added to the SQL statement
// "INSERT INTO Messages (..) VALUES (..)[,(..)*]".
BulkInsertLimit int

// Maximum number of open connections to the database.
// If <= 0, then there is no limit on the number of open connections.
// The default is 0 (unlimited).
Expand All @@ -240,6 +249,14 @@ func SQLNoCaching(noCaching bool) SQLStoreOption {
}
}

// SQLBulkInsertLimit sets the BulkInsertLimit option, the maximum number
// of value tuples packed into a single bulk "INSERT INTO Messages" statement.
func SQLBulkInsertLimit(limit int) SQLStoreOption {
	setLimit := func(opts *SQLStoreOptions) error {
		opts.BulkInsertLimit = limit
		return nil
	}
	return setLimit
}

// SQLMaxOpenConns sets the MaxOpenConns option
func SQLMaxOpenConns(max int) SQLStoreOption {
return func(o *SQLStoreOptions) error {
Expand All @@ -254,6 +271,7 @@ func SQLAllOptions(opts *SQLStoreOptions) SQLStoreOption {
return func(o *SQLStoreOptions) error {
o.NoCaching = opts.NoCaching
o.MaxOpenConns = opts.MaxOpenConns
o.BulkInsertLimit = opts.BulkInsertLimit
return nil
}
}
Expand All @@ -275,6 +293,8 @@ type SQLStore struct {
wg sync.WaitGroup
preparedStmts []*sql.Stmt
ssFlusher *subStoresFlusher
postgres bool
bulkInserts []string
}

type sqlDBLock struct {
Expand Down Expand Up @@ -410,11 +430,20 @@ func NewSQLStore(log logger.Logger, driver, source string, limits *StoreLimits,
db: db,
doneCh: make(chan struct{}),
preparedStmts: make([]*sql.Stmt, 0, len(sqlStmts)),
postgres: driver == driverPostgres,
}
if err := s.init(TypeSQL, log, limits); err != nil {
s.Close()
return nil, err
}
if s.postgres && opts.BulkInsertLimit > 0 {
limit := opts.BulkInsertLimit
s.bulkInserts = make([]string, limit)
for i := 0; i < limit; i++ {
j := i * 5
s.bulkInserts[i] = fmt.Sprintf("($%d,$%d,$%d,$%d,$%d)", j+1, j+2, j+3, j+4, j+5)
}
}
if err := s.createPreparedStmts(); err != nil {
s.Close()
return nil, err
Expand Down Expand Up @@ -1633,6 +1662,9 @@ func (ms *SQLMsgStore) flush() error {
ps *sql.Stmt
)
defer func() {
if ms.limits.MaxAge > 0 && ms.expireTimer == nil {
ms.createExpireTimer()
}
ms.writeCache.transferToFreeList()
if ps != nil {
ps.Close()
Expand All @@ -1641,6 +1673,9 @@ func (ms *SQLMsgStore) flush() error {
tx.Rollback()
}
}()
if limit := ms.sqlStore.opts.BulkInsertLimit; limit >= sqlMinBulkInsertLimit {
return ms.bulkInsert(limit)
}
tx, err := ms.sqlStore.db.Begin()
if err != nil {
return err
Expand All @@ -1664,8 +1699,80 @@ func (ms *SQLMsgStore) flush() error {
return err
}
tx = nil
if ms.limits.MaxAge > 0 && ms.expireTimer == nil {
ms.createExpireTimer()
return nil
}

// bulkInsert persists all messages currently in the write cache using
// multi-row statements of the form
// "INSERT INTO Messages (..) VALUES (..),(..),..".
// This is faster than the per-row inserts done inside a transaction by
// flush(), and is used only when the user configures the BulkInsertLimit
// option (see flush()).
// The cache list is only traversed here, never modified: the caller still
// needs its elements after this returns.
// Lock held on entry.
func (ms *SQLMsgStore) bulkInsert(limit int) error {
	const insertStmt = "INSERT INTO Messages (id, seq, timestamp, size, data) VALUES "
	// MySQL-style placeholder tuple; Postgres uses the pre-computed
	// positional "($1,$2,...)" strings in ms.sqlStore.bulkInserts instead.
	const valArgs = "(?,?,?,?,?)"

	count := ms.writeCache.count

	// Pre-size the builder for the full statement: the statement text plus,
	// roughly, one byte per tuple for the separating "," / trailing ";".
	// NOTE(review): this sizing uses count while the statement below is
	// built with limit tuples, so Grow may under- or over-shoot when
	// count != limit; that only affects allocation, not correctness.
	sb := strings.Builder{}
	size := len(insertStmt) + count // number of "," + last ";"
	if ms.sqlStore.postgres {
		for i := 0; i < limit; i++ {
			size += len(ms.sqlStore.bulkInserts[i])
		}
	} else {
		size += count * len(valArgs)
	}
	sb.Grow(size)
	sb.WriteString(insertStmt)

	// Build, once, the statement with the maximum (limit) number of value
	// tuples. Chunks smaller than limit reuse a truncated copy of it below.
	for i := 0; i < limit; i++ {
		if i > 0 {
			sb.WriteString(",")
		}
		if ms.sqlStore.postgres {
			sb.WriteString(ms.sqlStore.bulkInserts[i])
		} else {
			sb.WriteString(valArgs)
		}
	}
	sb.WriteString(";")
	// Byte copy of the full statement, used to derive the shorter statement
	// for the final partial chunk (by overwriting a "," with ";" and slicing).
	stmtb := []byte(sb.String())

	// 5 bound arguments per message: id, seq, timestamp, size, data.
	args := make([]interface{}, 0, 5*count)
	start := ms.writeCache.head
	for count > 0 {
		args = args[:0]
		i := 0
		// l tracks the statement length that corresponds to the tuples
		// consumed so far, so a partial chunk knows where to truncate.
		l := len(insertStmt)
		// Iterate through the cache, but do not remove elements from the list.
		// They are needed by the caller.
		for cm := start; cm != nil; cm = cm.next {
			if i > 0 {
				l++ // the "," preceding this tuple
			}
			if ms.sqlStore.postgres {
				// Postgres tuples vary in length ($9 vs $10), so add the
				// exact length of tuple i.
				l += len(ms.sqlStore.bulkInserts[i])
			} else {
				l += len(valArgs)
			}
			args = append(args, ms.channelID, cm.msg.Sequence, cm.msg.Timestamp, len(cm.data), cm.data)
			i++
			if i == limit {
				// Chunk full: remember where the next chunk starts.
				start = cm.next
				break
			}
		}
		count -= i
		var stmt string
		if i == limit {
			// Full chunk: use the pre-built statement as-is.
			stmt = sb.String()
		} else {
			// Partial (last) chunk: stmtb[l] is the "," that would precede
			// tuple i; replace it with ";" and keep only the first l+1 bytes.
			l++
			stmtb[l-1] = ';'
			stmt = string(stmtb[:l])
		}
		if _, err := ms.sqlStore.db.Exec(stmt, args[:i*5]...); err != nil {
			return err
		}
	}
	return nil
}
Expand Down
49 changes: 47 additions & 2 deletions stores/sqlstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,9 @@ func TestSQLAllOptions(t *testing.T) {
defer cleanupSQLDatastore(t)

opts := &SQLStoreOptions{
NoCaching: true,
MaxOpenConns: 123,
NoCaching: true,
MaxOpenConns: 123,
BulkInsertLimit: 456,
}
s, err := NewSQLStore(testLogger, testSQLDriver, testSQLSource, nil, SQLAllOptions(opts))
if err != nil {
Expand All @@ -178,6 +179,9 @@ func TestSQLAllOptions(t *testing.T) {
if so.MaxOpenConns != 123 {
t.Fatalf("MaxOpenConns should be 123, got %v", so.MaxOpenConns)
}
if so.BulkInsertLimit != 456 {
t.Fatalf("BulkInsertLimit should be 456, got %v", so.BulkInsertLimit)
}
}

func TestSQLPostgresDriverInit(t *testing.T) {
Expand Down Expand Up @@ -2140,3 +2144,44 @@ func TestSQLMaxAgeForMsgsWithTimestampInPast(t *testing.T) {
}
}
}

// TestSQLBulkInsertLimit stores messages through the write cache with a
// bulk insert limit configured, then verifies that every message can be
// looked up with its expected payload. Storing 126 messages with a limit
// of 10 exercises both full chunks and a final partial chunk.
func TestSQLBulkInsertLimit(t *testing.T) {
	if !doSQL {
		t.SkipNow()
	}

	cleanupSQLDatastore(t)
	defer cleanupSQLDatastore(t)

	// Caching must be enabled for the bulk insert code path to be used.
	s, err := NewSQLStore(testLogger, testSQLDriver, testSQLSource, nil,
		SQLNoCaching(false), SQLBulkInsertLimit(10))
	if err != nil {
		t.Fatalf("Error creating store: %v", err)
	}
	defer s.Close()

	const total = uint64(126)
	payload := func(seq uint64) string { return fmt.Sprintf("%v", seq) }

	cs := storeCreateChannel(t, s, "foo")
	for seq := uint64(1); seq <= total; seq++ {
		m := &pb.MsgProto{
			Sequence:  seq,
			Subject:   "foo",
			Data:      []byte(payload(seq)),
			Timestamp: time.Now().UnixNano(),
		}
		if _, err := cs.Msgs.Store(m); err != nil {
			t.Fatalf("Error storing message: %v", err)
		}
	}
	if err := cs.Msgs.Flush(); err != nil {
		t.Fatalf("Error on flush: %v", err)
	}

	// Each stored message must come back intact.
	for seq := uint64(1); seq <= total; seq++ {
		m := msgStoreLookup(t, cs.Msgs, seq)
		if expected := payload(seq); string(m.Data) != expected {
			t.Fatalf("Expected %q, got %q", expected, m.Data)
		}
	}
}
1 change: 1 addition & 0 deletions test/configs/test_parse.conf
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,6 @@ streaming: {
source: "ivan:pwd@/nss_db"
no_caching: true
max_open_conns: 5
bulk_insert_limit: 1000
}
}