Skip to content
This repository has been archived by the owner on Apr 3, 2024. It is now read-only.

add sqlite pragma support #30

Merged
merged 11 commits into from
Dec 31, 2021
49 changes: 38 additions & 11 deletions cmd/temporalite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
goLog "log"
"net"
"os"
"strings"

uiserver "github.com/temporalio/ui-server/server"
uiconfig "github.com/temporalio/ui-server/server/config"
Expand Down Expand Up @@ -39,6 +40,7 @@ const (
ipFlag = "ip"
logFormatFlag = "log-format"
namespaceFlag = "namespace"
pragmaFlag = "sqlite-pragma"
)

func init() {
Expand All @@ -56,7 +58,6 @@ func buildCLI() *cli.App {
app.Name = "temporal"
app.Usage = "Temporal server"
app.Version = headers.ServerVersion

app.Commands = []*cli.Command{
{
Name: "start",
Expand All @@ -74,6 +75,13 @@ func buildCLI() *cli.App {
Value: defaultCfg.DatabaseFilePath,
Usage: "file in which to persist Temporal state",
},
&cli.StringSliceFlag{
Name: namespaceFlag,
Aliases: []string{"n"},
Usage: `specify namespaces that should be pre-created`,
EnvVars: nil,
Value: nil,
},
&cli.IntFlag{
Name: portFlag,
Aliases: []string{"p"},
Expand All @@ -85,25 +93,25 @@ func buildCLI() *cli.App {
Usage: "port for the temporal web UI",
DefaultText: fmt.Sprintf("--port + 1000, eg. %d", liteconfig.DefaultFrontendPort+1000),
},
&cli.StringFlag{
Name: ipFlag,
Usage: `IPv4 address to bind the frontend service to instead of localhost`,
EnvVars: nil,
Value: "127.0.0.1",
},
&cli.StringFlag{
Name: logFormatFlag,
Usage: `customize the log formatting (allowed: "json", "pretty")`,
Usage: `customize the log formatting (allowed: ["json" "pretty"])`,
EnvVars: nil,
Value: "json",
},
&cli.StringSliceFlag{
Name: namespaceFlag,
Aliases: []string{"n"},
Usage: `specify namespaces that should be pre-created`,
Name: pragmaFlag,
Aliases: []string{"sp"},
Usage: fmt.Sprintf("specify sqlite pragma statements in pragma=value format (allowed: %q)", liteconfig.GetAllowedPragmas()),
EnvVars: nil,
Value: nil,
},
&cli.StringFlag{
Name: ipFlag,
Usage: `IPv4 address to bind the frontend service to instead of localhost`,
EnvVars: nil,
Value: "127.0.0.1",
},
},
Before: func(c *cli.Context) error {
if c.Args().Len() > 0 {
Expand All @@ -112,6 +120,7 @@ func buildCLI() *cli.App {
if c.IsSet(ephemeralFlag) && c.IsSet(dbPathFlag) {
return cli.Exit(fmt.Sprintf("ERROR: only one of %q or %q flags may be passed at a time", ephemeralFlag, dbPathFlag), 1)
}

switch c.String(logFormatFlag) {
case "json", "pretty":
default:
Expand Down Expand Up @@ -142,11 +151,17 @@ func buildCLI() *cli.App {
EnableUI: true,
}

pragmas, err := getPragmaMap(c.StringSlice(pragmaFlag))
if err != nil {
return err
}

opts := []temporalite.ServerOption{
temporalite.WithFrontendPort(serverPort),
temporalite.WithFrontendIP(ip),
temporalite.WithDatabaseFilePath(c.String(dbPathFlag)),
temporalite.WithNamespaces(c.StringSlice(namespaceFlag)...),
temporalite.WithSQLitePragmas(pragmas),
temporalite.WithUpstreamOptions(
temporal.InterruptOn(temporal.InterruptCh()),
),
Expand Down Expand Up @@ -184,3 +199,15 @@ func buildCLI() *cli.App {

return app
}

func getPragmaMap(input []string) (map[string]string, error) {
result := make(map[string]string)
for _, pragma := range input {
vals := strings.Split(pragma, "=")
if len(vals) != 2 {
return nil, fmt.Errorf("ERROR: pragma statements must be in KEY=VALUE format, got %q", pragma)
}
result[vals[0]] = vals[1]
}
return result, nil
}
21 changes: 21 additions & 0 deletions internal/liteconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"math/rand"
"os"
"path/filepath"
"sort"
"time"

"go.temporal.io/server/common/cluster"
Expand Down Expand Up @@ -49,13 +50,28 @@ type Config struct {
FrontendPort int
DynamicPorts bool
Namespaces []string
SQLitePragmas map[string]string
Logger log.Logger
UpstreamOptions []temporal.ServerOption
portProvider *portProvider
FrontendIP string
UIServer UIServer
}

var SupportedPragmas = map[string]struct{}{
"journal_mode": {},
"synchronous": {},
}

func GetAllowedPragmas() []string {
var allowedPragmaList []string
for k := range SupportedPragmas {
allowedPragmaList = append(allowedPragmaList, k)
}
sort.Strings(allowedPragmaList)
return allowedPragmaList
}

func NewDefaultConfig() (*Config, error) {
userConfigDir, err := os.UserConfigDir()
if err != nil {
Expand All @@ -69,6 +85,7 @@ func NewDefaultConfig() (*Config, error) {
UIServer: noopUIServer{},
DynamicPorts: false,
Namespaces: nil,
SQLitePragmas: nil,
Logger: log.NewZapLogger(log.BuildZapLogger(log.Config{
Stdout: true,
Level: "debug",
Expand Down Expand Up @@ -99,6 +116,10 @@ func Convert(cfg *Config) *config.Config {
sqliteConfig.ConnectAttributes["mode"] = "rwc"
}

for k, v := range cfg.SQLitePragmas {
sqliteConfig.ConnectAttributes["_"+k] = v
}

var metricsPort, pprofPort int
if cfg.DynamicPorts {
if cfg.FrontendPort == 0 {
Expand Down
12 changes: 12 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ func WithNamespaces(namespaces ...string) ServerOption {
})
}

// WithSQLitePragmas applies pragma statements to SQLite on Temporal start.
func WithSQLitePragmas(pragmas map[string]string) ServerOption {
return newApplyFuncContainer(func(cfg *liteconfig.Config) {
if cfg.SQLitePragmas == nil {
cfg.SQLitePragmas = make(map[string]string)
}
for k, v := range pragmas {
cfg.SQLitePragmas[k] = v
}
})
}

// WithUpstreamOptions registers Temporal server options.
func WithUpstreamOptions(options ...temporal.ServerOption) ServerOption {
return newApplyFuncContainer(func(cfg *liteconfig.Config) {
Expand Down
8 changes: 8 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"fmt"
"os"
"strings"
"time"

"go.temporal.io/sdk/client"
Expand Down Expand Up @@ -41,6 +42,13 @@ func NewServer(opts ...ServerOption) (*Server, error) {
for _, opt := range opts {
opt.apply(c)
}

for pragma := range c.SQLitePragmas {
if _, ok := liteconfig.SupportedPragmas[strings.ToLower(pragma)]; !ok {
return nil, fmt.Errorf("ERROR: unsupported pragma %q, %v allowed", pragma, liteconfig.GetAllowedPragmas())
}
}

cfg := liteconfig.Convert(c)
sqlConfig := cfg.Persistence.DataStores[liteconfig.PersistenceStoreName].SQL

Expand Down