Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Use UNLOGGED TABLES on PostgreSQL #14

Merged
merged 4 commits into from
Apr 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 32 additions & 16 deletions backends/sqldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@ const (
dbTypeMysql = "mysql"
)

// Opt represents SQL DB backend's options.
type Opt struct {
DBType string
ResultsTable string
UnloggedTables bool
}

// sqlDB represents the sqlDB backend.
type sqlDB struct {
db *sql.DB
dbType string
resultsTable string
logger *log.Logger
db *sql.DB
opt Opt
logger *log.Logger

// The result schemas (CREATE TABLE ...) are dynamically
// generated everytime queries are executed based on their result columns.
Expand Down Expand Up @@ -53,22 +59,22 @@ type insertSchema struct {

// NewSQLBackend returns a new sqlDB result backend instance.
// It accepts an *sql.DB connection
func NewSQLBackend(db *sql.DB, dbType string, resTable string, l *log.Logger) (ResultBackend, error) {
func NewSQLBackend(db *sql.DB, opt Opt, l *log.Logger) (ResultBackend, error) {
var (
r = sqlDB{
db: db,
dbType: dbType,
opt: opt,
resTableSchemas: make(map[string]insertSchema),
schemaMutex: sync.RWMutex{},
logger: l,
}
)

// Config.
if resTable != "" {
r.resultsTable = resTable
if opt.ResultsTable != "" {
r.opt.ResultsTable = opt.ResultsTable
} else {
r.resultsTable = "results_%s"
r.opt.ResultsTable = "results_%s"
}

return &r, nil
Expand All @@ -87,7 +93,7 @@ func (s *sqlDB) NewResultSet(jobID, taskName string, ttl time.Duration) (ResultS
jobID: jobID,
taskName: taskName,
backend: s,
tbl: fmt.Sprintf(s.resultsTable, jobID),
tbl: fmt.Sprintf(s.opt.ResultsTable, jobID),
tx: tx,
}, nil
}
Expand Down Expand Up @@ -115,7 +121,7 @@ func (w *sqlDBWriter) RegisterColTypes(cols []string, colTypes []*sql.ColumnType
colNameHolder[i] = fmt.Sprintf(`"%s"`, w.cols[i])

// This will be filled by the driver.
if w.backend.dbType == dbTypePostgres {
if w.backend.opt.DBType == dbTypePostgres {
// Postgres placeholders are $1, $2 ...
colValHolder[i] = fmt.Sprintf("$%d", i+1)
} else {
Expand Down Expand Up @@ -223,11 +229,12 @@ func (s *sqlDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) ins
colNameHolder = make([]string, len(cols))
colValHolder = make([]string, len(cols))
)

for i := range cols {
colNameHolder[i] = fmt.Sprintf(`"%s"`, cols[i])

// This will be filled by the driver.
if s.dbType == dbTypePostgres {
if s.opt.DBType == dbTypePostgres {
// Postgres placeholders are $1, $2 ...
colValHolder[i] = fmt.Sprintf("$%d", i+1)
} else {
Expand All @@ -236,9 +243,11 @@ func (s *sqlDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) ins
}

var (
fields = make([]string, len(cols))
typ = ""
fields = make([]string, len(cols))
typ = ""
unlogged = ""
)

for i := 0; i < len(cols); i++ {
typ = colTypes[i].DatabaseTypeName()
switch colTypes[i].DatabaseTypeName() {
Expand All @@ -256,7 +265,7 @@ func (s *sqlDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) ins
case "BOOLEAN": // Postgres, MySQL
typ = "BOOLEAN"
case "JSON", "JSONB": // Postgres
if s.dbType != dbTypePostgres {
if s.opt.DBType != dbTypePostgres {
typ = "TEXT"
}
// _INT4, _INT8, _TEXT represent array types in Postgres
Expand All @@ -277,9 +286,16 @@ func (s *sqlDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) ins
fields[i] = fmt.Sprintf(`"%s" %s`, cols[i], typ)
}

// If the DB is Postgres, optionally create an "unlogged" table that disables
// WAL, improving performance of throw-away cache tables.
// https://www.postgresql.org/docs/9.1/sql-createtable.html
knadh marked this conversation as resolved.
Show resolved Hide resolved
if s.opt.DBType == dbTypePostgres && s.opt.UnloggedTables {
unlogged = "UNLOGGED"
}

return insertSchema{
dropTable: `DROP TABLE IF EXISTS "%s";`,
createTable: fmt.Sprintf(`CREATE TABLE IF NOT EXISTS "%%s" (%s);`, strings.Join(fields, ",")),
createTable: fmt.Sprintf(`CREATE %s TABLE IF NOT EXISTS "%%s" (%s);`, unlogged, strings.Join(fields, ",")),
insertRow: fmt.Sprintf(`INSERT INTO "%%s" (%s) VALUES (%s)`, strings.Join(colNameHolder, ","),
strings.Join(colValHolder, ",")),
}
Expand Down
13 changes: 9 additions & 4 deletions cmd/jobber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,16 @@ func setup() {
// retain result db to perform queries on this db
testResultDB = conn

var (
opt = backends.Opt{
DBType: cfg.Type,
ResultsTable: ko.String(fmt.Sprintf("results.%s.results_table", dbName)),
UnloggedTables: cfg.Unlogged,
}
)

// Create a new backend instance.
backend, err := backends.NewSQLBackend(conn,
cfg.Type,
ko.String(fmt.Sprintf("results.%s.results_table", dbName)),
sysLog)
backend, err := backends.NewSQLBackend(conn, opt, sysLog)
if err != nil {
sysLog.Fatalf("error initializing result backend: %v", err)
}
Expand Down
12 changes: 8 additions & 4 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Jobber struct {
type DBConfig struct {
Type string `mapstructure:"type"`
DSN string `mapstructure:"dsn"`
Unlogged bool `mapstructure:"unlogged"`
MaxIdleConns int `mapstructure:"max_idle"`
MaxActiveConns int `mapstructure:"max_active"`
ConnectTimeout time.Duration `mapstructure:"connect_timeout"`
Expand Down Expand Up @@ -163,11 +164,14 @@ func main() {
log.Fatal(err)
}

opt := backends.Opt{
DBType: cfg.Type,
ResultsTable: ko.String(fmt.Sprintf("results.%s.results_table", dbName)),
UnloggedTables: cfg.Unlogged,
}

// Create a new backend instance.
backend, err := backends.NewSQLBackend(conn,
cfg.Type,
ko.String(fmt.Sprintf("results.%s.results_table", dbName)),
sysLog)
backend, err := backends.NewSQLBackend(conn, opt, sysLog)
if err != nil {
log.Fatalf("error initializing result backend: %v", err)
}
Expand Down
1 change: 1 addition & 0 deletions config.toml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ state_address = "redis://127.0.0.1:6379/1"
# Each job can specify where its results should be stored.
# If there are multiple backends and jobs don't specify
# a particular backend, the results will be saved to a *random* one.
# The optional `unlogged = true` (for postgres) creates faster, unlogged (WAL) tables.
#
# type = "postgres" dsn = "postgres://user:password@host:5432/dbname?sslmode=disable"
# type = "mysql" dsn = "user:password@tcp(host:3306)/dbname"
Expand Down