Add "true batch" mode
This increases throughput of adding new events by 2-3x.

Previously the repository added events one by one, making two queries per
event. It was done this way to handle duplicates and to ensure that each
EventRaws row got the same ID as its corresponding Events row.
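
To illustrate, the per-event flow looked roughly like this (a sketch; the
EventRaws statement is taken verbatim from addBatchOneByOne below, while the
exact form of the Events insert is not visible in this diff and is assumed):

    -- query 1, per event: insert the event row; duplicates are caught by the
    -- UNIQUE(host, source, timestamp, offset) constraint
    INSERT INTO Events (host, source, timestamp, offset) VALUES (?, ?, ?, ?);
    -- query 2, per event: insert the raw text under the same rowid as the event
    INSERT INTO EventRaws (rowid, raw, source, host) SELECT LAST_INSERT_ROWID(), ?, ?, ?;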

The new implementation instead performs three queries for the entire batch.
It relies on AUTOINCREMENT + ON CONFLICT IGNORE: when there is a duplicate in
the batch it is ignored by ON CONFLICT IGNORE, but because of AUTOINCREMENT
its ID is still "used up", leaving a hole in the sequence of IDs. After all
the non-duplicate events have been inserted into Events, ALL events are
inserted into EventRaws. Since IDs in the Events table are used up even for
duplicate events, the EventRaws ID sequence should still line up with the
Events sequence. After both inserts are performed, the "garbage" rows in
EventRaws are removed by deleting every EventRaws row which does not have a
corresponding Events row.
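
In SQL terms the batch flow is roughly the following (a sketch adapted from
addBatchTrueBatch below, with the placeholder lists written out for a batch of
two events; the real DELETE is additionally bounded by the EventRaws rowids
touched in this batch and excludes the current MAX(ID) of Events):

    -- query 1: insert the whole batch; duplicates are skipped by the IGNORE
    -- conflict clause but still consume an AUTOINCREMENT id
    INSERT OR IGNORE INTO Events (host, source, timestamp, offset)
        VALUES (?, ?, ?, ?), (?, ?, ?, ?);
    -- query 2: insert ALL raws, duplicates included, so the rowids stay
    -- aligned with the Events ids
    INSERT INTO EventRaws (raw, source, host)
        VALUES (?, ?, ?), (?, ?, ?);
    -- query 3: remove the raws belonging to ignored duplicates, i.e. rows
    -- whose rowid has no matching Events row
    DELETE FROM EventRaws AS er
        WHERE NOT EXISTS (SELECT 1 FROM Events e WHERE e.ID = er.rowid);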

This is much faster, but I am not sure it is robust in all situations, and the
AUTOINCREMENT + ON CONFLICT IGNORE behavior seems like something that might
change in a future version of SQLite. The other drawback is that handling
duplicates is now much slower, since the EventRaws rows must be added and then
removed, whereas before the insert into the Events table would simply hit a
conflict and the raw would be skipped. My thinking is that adding duplicates
should be the exception, so speeding up the common case is worth the tradeoff.

True batch can be disabled by setting "sqlite.trueBatch" to false in the
configuration file.
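
For example, a minimal snippet of the JSON configuration (the "fileName" and
"trueBatch" keys come from logsuck-config.schema.json below; the surrounding
"sqlite" object is implied by the "sqlite.trueBatch" wording above):

    {
      "sqlite": {
        "fileName": "logsuck.db",
        "trueBatch": false
      }
    }
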
JackBister committed Feb 1, 2021
1 parent 5b6c057 commit 82b069b
Showing 10 changed files with 207 additions and 15 deletions.
3 changes: 2 additions & 1 deletion cmd/logsuck/main.go
@@ -52,6 +52,7 @@ var cfg = config.Config{

SQLite: &config.SqliteConfig{
DatabaseFile: "logsuck.db",
TrueBatch: true,
},

Web: &config.WebConfig{
@@ -165,7 +166,7 @@ func main() {
if err != nil {
log.Fatalln(err.Error())
}
repo, err = events.SqliteRepository(db)
repo, err = events.SqliteRepository(db, cfg.SQLite)
if err != nil {
log.Fatalln(err.Error())
}
10 changes: 9 additions & 1 deletion internal/config/FromJSON.go
@@ -44,7 +44,8 @@ type jsonRecipientConfig struct {
}

type jsonSqliteConfig struct {
FileName string `json:fileName`
FileName string `json:fileName`
TrueBatch *bool `json:trueBatch`
}

type jsonWebConfig struct {
@@ -89,6 +90,7 @@ var defaultConfig = Config{

SQLite: &SqliteConfig{
DatabaseFile: "logsuck.db",
TrueBatch: true,
},

Web: &WebConfig{
@@ -243,6 +245,12 @@ func FromJSON(r io.Reader) (*Config, error) {
} else {
sqlite.DatabaseFile = cfg.Sqlite.FileName
}
if cfg.Sqlite.TrueBatch == nil {
log.Println("Using default TrueBatch mode. defaultTrueBatch=true")
sqlite.TrueBatch = true
} else {
sqlite.TrueBatch = *cfg.Sqlite.TrueBatch
}
}

var web *WebConfig
1 change: 1 addition & 0 deletions internal/config/SqliteConfig.go
@@ -16,4 +16,5 @@ package config

type SqliteConfig struct {
DatabaseFile string
TrueBatch bool
}
2 changes: 1 addition & 1 deletion internal/events/EventPublisher.go
@@ -51,7 +51,7 @@ func BatchedRepositoryPublisher(cfg *config.Config, repo Repository) EventPublis
case evt := <-adder:
accumulated = append(accumulated, evt)
if len(accumulated) >= 5000 {
_, err := repo.AddBatch(accumulated)
err := repo.AddBatch(accumulated)
if err != nil {
// TODO: Error handling
log.Println("error when adding events:", err)
2 changes: 1 addition & 1 deletion internal/events/EventRecipient.go
@@ -84,7 +84,7 @@ func (er *EventRecipient) Serve() error {
processed[i].Timestamp = time.Now()
}
}
_, err = er.repo.AddBatch(processed)
err = er.repo.AddBatch(processed)
if err != nil {
http.Error(w, fmt.Sprintf("failed to add events to repository: %v", err), 500)
return
2 changes: 1 addition & 1 deletion internal/events/EventRepository.go
@@ -28,7 +28,7 @@ const (
)

type Repository interface {
AddBatch(events []Event) ([]int64, error)
AddBatch(events []Event) error
FilterStream(srch *search.Search, searchStartTime, searchEndTime *time.Time) <-chan []EventWithId
GetByIds(ids []int64, sortMode SortMode) ([]EventWithId, error)
}
106 changes: 97 additions & 9 deletions internal/events/EventRepositorySqlite.go
@@ -20,8 +20,10 @@ import (
"fmt"
"log"
"strconv"
"strings"
"time"

"github.com/jackbister/logsuck/internal/config"
"github.com/jackbister/logsuck/internal/search"
)

@@ -30,10 +32,12 @@ const filterStreamPageSize = 1000

type sqliteRepository struct {
db *sql.DB

cfg *config.SqliteConfig
}

func SqliteRepository(db *sql.DB) (Repository, error) {
_, err := db.Exec("CREATE TABLE IF NOT EXISTS Events (id INTEGER NOT NULL PRIMARY KEY, host TEXT NOT NULL, source TEXT NOT NULL, timestamp DATETIME NOT NULL, offset BIGINT NOT NULL, UNIQUE(host, source, timestamp, offset));")
func SqliteRepository(db *sql.DB, cfg *config.SqliteConfig) (Repository, error) {
_, err := db.Exec("CREATE TABLE IF NOT EXISTS Events (id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, host TEXT NOT NULL, source TEXT NOT NULL, timestamp DATETIME NOT NULL, offset BIGINT NOT NULL, UNIQUE(host, source, timestamp, offset));")
if err != nil {
return nil, fmt.Errorf("error creating events table: %w", err)
}
@@ -47,16 +51,100 @@ func SqliteRepository(db *sql.DB) (Repository, error) {
return nil, fmt.Errorf("error creating eventraws table: %w", err)
}
return &sqliteRepository{
db: db,
db: db,
cfg: cfg,
}, nil
}

func (repo *sqliteRepository) AddBatch(events []Event) ([]int64, error) {
func (repo *sqliteRepository) AddBatch(events []Event) error {
if repo.cfg.TrueBatch {
return repo.addBatchTrueBatch(events)
} else {
return repo.addBatchOneByOne(events)
}
}

const esbBase = "INSERT OR IGNORE INTO Events (host, source, timestamp, offset) VALUES "
const esbBaseLen = len(esbBase)
const esbPerEvt = "(?, ?, ?, ?)"
const esbPerEvtLen = len(esbPerEvt)
const rsbBase = "INSERT INTO EventRaws (raw, source, host) VALUES "
const rsbBaseLen = len(rsbBase)
const rsbPerEvt = "(?, ?, ?)"
const rsbPerEvtLen = len(rsbPerEvt)

func (repo *sqliteRepository) addBatchTrueBatch(events []Event) error {
startTime := time.Now()
var eventSb strings.Builder
var rawSb strings.Builder
eventSb.Grow(esbBaseLen + esbPerEvtLen*len(events) + len(events))
rawSb.Grow(rsbBaseLen + rsbPerEvtLen*len(events) + len(events))
eventSb.WriteString(esbBase)
rawSb.WriteString(rsbBase)

esbArgs := make([]interface{}, 0, 4*len(events))
rsbArgs := make([]interface{}, 0, 3*len(events))
for i, evt := range events {
eventSb.WriteString(esbPerEvt)
rawSb.WriteString(rsbPerEvt)
if i != len(events)-1 {
eventSb.WriteRune(',')
rawSb.WriteRune(',')
}
esbArgs = append(esbArgs, evt.Host, evt.Source, evt.Timestamp, evt.Offset)
rsbArgs = append(rsbArgs, evt.Raw, evt.Source, evt.Host)
}

tx, err := repo.db.BeginTx(context.TODO(), nil)
if err != nil {
return fmt.Errorf("error starting transaction for adding event batch: %w", err)
}
rows, err := tx.Query("SELECT MAX(rowid) FROM EventRaws;")
if err != nil {
tx.Rollback()
return fmt.Errorf("error adding event batch: failed to get MAX(rowid): %w", err)
}
prevMaxID := 0
if rows.Next() {
rows.Scan(&prevMaxID)
}
eventQ := eventSb.String()
_, err = tx.Exec(eventQ, esbArgs...)
if err != nil {
tx.Rollback()
return fmt.Errorf("error adding event batch to Events table: %w", err)
}
rawQ := rawSb.String()
res, err := tx.Exec(rawQ, rsbArgs...)
if err != nil {
tx.Rollback()
return fmt.Errorf("error adding event batch to EventRaws table: %w", err)
}
newMaxID, err := res.LastInsertId()
if err != nil {
log.Printf("got error when getting new max ID to clean up EventRaws: %v", err)
} else {
res, err = tx.Exec("DELETE FROM EventRaws AS er WHERE NOT EXISTS (SELECT 1 FROM Events e WHERE e.ID = er.rowid) AND er.rowid > ? AND er.rowid <= ? AND er.rowid != (SELECT MAX(ID) FROM Events)", prevMaxID, newMaxID)
if err != nil {
log.Printf("got error when cleaning up EventRaws: %v", err)
} else if deleted, err := res.RowsAffected(); err == nil && deleted > 0 {
log.Printf("Skipped adding numEvents=%v as they appear to be duplicates (same source, offset and timestamp as an existing event)", deleted)
}
}
err = tx.Commit()
if err != nil {
// TODO: Hmm?
}
log.Printf("added numEvents=%v in timeInMs=%v\n", len(events), time.Now().Sub(startTime).Milliseconds())
return nil
}

func (repo *sqliteRepository) addBatchOneByOne(events []Event) error {
startTime := time.Now()
ret := make([]int64, len(events))
tx, err := repo.db.BeginTx(context.TODO(), nil)
if err != nil {
return nil, fmt.Errorf("error starting transaction for adding event: %w", err)
return fmt.Errorf("error starting transaction for adding event: %w", err)
}
numberOfDuplicates := map[string]int64{}
for i, evt := range events {
@@ -68,17 +156,17 @@ func (repo *sqliteRepository) AddBatch(events []Event) ([]int64, error) {
}
if err != nil {
tx.Rollback()
return nil, fmt.Errorf("error executing add statement: %w", err)
return fmt.Errorf("error executing add statement: %w", err)
}
id, err := res.LastInsertId()
if err != nil {
tx.Rollback()
return nil, fmt.Errorf("error getting event id after insert: %w", err)
return fmt.Errorf("error getting event id after insert: %w", err)
}
_, err = tx.Exec("INSERT INTO EventRaws (rowid, raw, source, host) SELECT LAST_INSERT_ROWID(), ?, ?, ?;", evt.Raw, evt.Source, evt.Host)
if err != nil {
tx.Rollback()
return nil, fmt.Errorf("error executing add raw statement: %w", err)
return fmt.Errorf("error executing add raw statement: %w", err)
}
ret[i] = id
}
@@ -90,7 +178,7 @@ func (repo *sqliteRepository) AddBatch(events []Event) ([]int64, error) {
log.Printf("Skipped adding numEvents=%v from source=%v because they appear to be duplicates (same source, offset and timestamp as an existing event)\n", v, k)
}
log.Printf("added numEvents=%v in timeInMs=%v\n", len(events), time.Now().Sub(startTime).Milliseconds())
return ret, nil
return nil
}

func (repo *sqliteRepository) FilterStream(srch *search.Search, searchStartTime, searchEndTime *time.Time) <-chan []EventWithId {
89 changes: 89 additions & 0 deletions internal/events/EventRepositorySqlite_test.go
@@ -0,0 +1,89 @@
// Copyright 2020 The Logsuck Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package events

import (
"database/sql"
"testing"
"time"

"github.com/jackbister/logsuck/internal/config"

_ "github.com/mattn/go-sqlite3"
)

func TestAddBatchTrueBatch(t *testing.T) {
db, err := sql.Open("sqlite3", ":memory:")
if err != nil {
t.Fatalf("TestRexPipelineStep got error when creating in-memory SQLite database: %v", err)
}
repo, err := SqliteRepository(db, &config.SqliteConfig{
DatabaseFile: ":memory:",
TrueBatch: true,
})
if err != nil {
t.Fatalf("TestRexPipelineStep got error when creating events repo: %v", err)
}

repo.AddBatch([]Event{
{
Raw: "2021-02-01 00:00:00 log event",
Timestamp: time.Date(2021, 2, 1, 0, 0, 0, 0, time.UTC),
Host: "localhost",
Source: "log.txt",
Offset: 0,
},
})

evts, err := repo.GetByIds([]int64{0}, SortModeNone)
if err != nil {
t.Fatalf("got error when retrieving event: %v", err)
}
if len(evts) != 1 {
t.Fatalf("got unexpected number of events, expected 1 event but got %v", len(evts))
}
}

func TestAddBatchOneByOne(t *testing.T) {
db, err := sql.Open("sqlite3", ":memory:")
if err != nil {
t.Fatalf("TestRexPipelineStep got error when creating in-memory SQLite database: %v", err)
}
repo, err := SqliteRepository(db, &config.SqliteConfig{
DatabaseFile: ":memory:",
TrueBatch: false,
})
if err != nil {
t.Fatalf("TestRexPipelineStep got error when creating events repo: %v", err)
}

repo.AddBatch([]Event{
{
Raw: "2021-02-01 00:00:00 log event",
Timestamp: time.Date(2021, 2, 1, 0, 0, 0, 0, time.UTC),
Host: "localhost",
Source: "log.txt",
Offset: 0,
},
})

evts, err := repo.GetByIds([]int64{0}, SortModeNone)
if err != nil {
t.Fatalf("got error when retrieving event: %v", err)
}
if len(evts) != 1 {
t.Fatalf("got unexpected number of events, expected 1 event but got %v", len(evts))
}
}
3 changes: 2 additions & 1 deletion internal/pipeline/Utils_test.go
@@ -18,6 +18,7 @@ import (
"database/sql"
"testing"

"github.com/jackbister/logsuck/internal/config"
"github.com/jackbister/logsuck/internal/events"

_ "github.com/mattn/go-sqlite3"
@@ -28,7 +29,7 @@ func newInMemRepo(t *testing.T) events.Repository {
if err != nil {
t.Fatalf("TestRexPipelineStep got error when creating in-memory SQLite database: %v", err)
}
repo, err := events.SqliteRepository(db)
repo, err := events.SqliteRepository(db, &config.SqliteConfig{})
if err != nil {
t.Fatalf("TestRexPipelineStep got error when creating events repo: %v", err)
}
4 changes: 4 additions & 0 deletions logsuck-config.schema.json
@@ -85,6 +85,10 @@
"fileName": {
"description": "The file name which will be used for the SQLite database. Default 'logsuck.db'.",
"type": "string"
},
"trueBatch": {
"description": "Whether Logsuck should use 'true batch' mode or not. True batch is significantly faster at saving events on average, but is slower at handling duplicates and relies on SQLite behavior which may not be guaranteed. Default true.",
"type": "boolean"
}
}
},
