Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

databases: add Badger #556

Merged
merged 5 commits into from
May 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ os:
- linux
language: go
go:
- 1.8
- "1.10"
sudo: required

services:
Expand Down Expand Up @@ -43,6 +43,10 @@ jobs:
- script: make test-pbs-torque
env:
- n=pbs-torque
- script:
- make test-badger
env:
- n=badger
- script:
- sleep 10
- make test-elasticsearch
Expand Down
27 changes: 26 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ test-mongodb:
@go test ./tests/core/ -funnel-config `pwd`/tests/mongo.config.yml
@go test ./tests/scheduler/ -funnel-config `pwd`/tests/mongo.config.yml

test-badger:
@go test ./tests/core/ -funnel-config `pwd`/tests/badger.config.yml

start-dynamodb:
@docker rm -f funnel-dynamodb-test > /dev/null 2>&1 || echo
@docker run -d --name funnel-dynamodb-test -p 18000:8000 docker.io/dwmkerr/dynamodb:38 -sharedDb > /dev/null
Expand Down
12 changes: 12 additions & 0 deletions cmd/server/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/ohsu-comp-bio/funnel/compute/scheduler"
"github.com/ohsu-comp-bio/funnel/compute/slurm"
"github.com/ohsu-comp-bio/funnel/config"
"github.com/ohsu-comp-bio/funnel/database/badger"
"github.com/ohsu-comp-bio/funnel/database/boltdb"
"github.com/ohsu-comp-bio/funnel/database/datastore"
"github.com/ohsu-comp-bio/funnel/database/dynamodb"
Expand Down Expand Up @@ -73,6 +74,15 @@ func NewServer(ctx context.Context, conf config.Config, log *logger.Logger) (*Se
queue = b
writers = append(writers, b)

case "badger":
b, err := badger.NewBadger(conf.Badger)
if err != nil {
return nil, dberr(err)
}
database = b
reader = b
writers = append(writers, b)

case "datastore":
d, err := datastore.NewDatastore(conf.Datastore)
if err != nil {
Expand Down Expand Up @@ -139,6 +149,8 @@ func NewServer(ctx context.Context, conf config.Config, log *logger.Logger) (*Se
continue
case "boltdb":
writer, err = boltdb.NewBoltDB(conf.BoltDB)
case "badger":
writer, err = badger.NewBadger(conf.Badger)
case "dynamodb":
writer, err = dynamodb.NewDynamoDB(conf.DynamoDB)
case "elastic":
Expand Down
4 changes: 4 additions & 0 deletions cmd/worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ func NewWorker(ctx context.Context, conf config.Config, log *logger.Logger) (*wo
writer = &events.Logger{Log: log}
case "boltdb":
writer, err = events.NewRPCWriter(ctx, conf.Server)
case "badger":
writer, err = events.NewRPCWriter(ctx, conf.Server)
case "dynamodb":
writer, err = dynamodb.NewDynamoDB(conf.DynamoDB)
case "datastore":
Expand Down Expand Up @@ -86,6 +88,8 @@ func NewWorker(ctx context.Context, conf config.Config, log *logger.Logger) (*wo
db, err = mongodb.NewMongoDB(conf.MongoDB)
case "boltdb":
reader, err = worker.NewRPCTaskReader(ctx, conf.Server)
case "badger":
reader, err = worker.NewRPCTaskReader(ctx, conf.Server)
default:
err = fmt.Errorf("unknown database: '%s'", conf.Database)
}
Expand Down
7 changes: 7 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Config struct {
Logger logger.Config
// databases / event handlers
BoltDB BoltDB
Badger Badger
DynamoDB DynamoDB
Elastic Elastic
MongoDB MongoDB
Expand Down Expand Up @@ -153,6 +154,12 @@ type BoltDB struct {
Path string
}

// Badger describes configuration for the Badger embedded database.
type Badger struct {
// Path to database directory.
Path string
}

// MongoDB configures access to an MongoDB database.
type MongoDB struct {
// Addrs holds the addresses for the seed servers.
Expand Down
4 changes: 2 additions & 2 deletions config/default-config.yaml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# The name of the active server database backend
# Available backends: boltdb, dynamodb, elastic, mongodb
# Available backends: boltdb, badger, datastore, dynamodb, elastic, mongodb
Database: boltdb

# The name of the active compute backend
# Available backends: local, htcondor, slurm, pbs, gridengine, manual, aws-batch
Compute: local

# The name of the active event writer backend(s).
# Available backends: log, boltdb, dynamodb, elastic, mongodb, kafka
# Available backends: log, boltdb, badger, datastore, dynamodb, elastic, mongodb, kafka
EventWriters:
- boltdb
- log
Expand Down
3 changes: 3 additions & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ func DefaultConfig() Config {
BoltDB: BoltDB{
Path: path.Join(workDir, "funnel.db"),
},
Badger: Badger{
Path: path.Join(workDir, "funnel.badger.db"),
},
DynamoDB: DynamoDB{
TableBasename: "funnel",
},
Expand Down
115 changes: 115 additions & 0 deletions database/badger/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package badger

import (
"context"
"fmt"
"time"

"github.com/dgraph-io/badger"
proto "github.com/golang/protobuf/proto"
"github.com/ohsu-comp-bio/funnel/events"
"github.com/ohsu-comp-bio/funnel/tes"
"github.com/ohsu-comp-bio/funnel/util"
)

// WriteEvent creates an event for the server to handle.
func (db *Badger) WriteEvent(ctx context.Context, req *events.Event) error {
r := util.Retrier{
InitialInterval: time.Millisecond,
MaxInterval: 10 * time.Millisecond,
MaxElapsedTime: time.Second,
MaxTries: 50,
ShouldRetry: func(err error) bool {
// Don't retry on state transition errors.
if _, ok := err.(*tes.TransitionError); ok {
return false
}
return true
},
}

return r.Retry(ctx, func() error {
return db.writeEvent(ctx, req)
})
}

func (db *Badger) writeEvent(ctx context.Context, req *events.Event) error {

// It's important this error be returned directly without being wrapped,
// because the retrier's ShouldRetry needs to check the error type (above).
return db.db.Update(func(txn *badger.Txn) error {

// If this event creates a new task, we don't need to update logic below,
// just marshal and save the task.
if req.Type == events.Type_TASK_CREATED {
task := req.GetTask()
val, err := proto.Marshal(task)
if err != nil {
return fmt.Errorf("marshaling task to bytes: %s", err)
}

return txn.Set(taskKey(task.Id), val)
}

// The rest of the events below all update a task, so we need to make sure it exists.
task, err := db.getTask(txn, req.Id)
if err != nil {
return err
}

switch req.Type {
case events.Type_TASK_STATE:
from := task.State
to := req.GetState()
if err = tes.ValidateTransition(from, to); err != nil {
return err
}
task.State = to

case events.Type_TASK_START_TIME:
task.GetTaskLog(0).StartTime = req.GetStartTime()

case events.Type_TASK_END_TIME:
task.GetTaskLog(0).EndTime = req.GetEndTime()

case events.Type_TASK_OUTPUTS:
task.GetTaskLog(0).Outputs = req.GetOutputs().Value

case events.Type_TASK_METADATA:
meta := req.GetMetadata().Value
tl := task.GetTaskLog(0)
if tl.Metadata == nil {
tl.Metadata = map[string]string{}
}
for k, v := range meta {
tl.Metadata[k] = v
}

case events.Type_EXECUTOR_START_TIME:
task.GetExecLog(0, int(req.Index)).StartTime = req.GetStartTime()

case events.Type_EXECUTOR_END_TIME:
task.GetExecLog(0, int(req.Index)).EndTime = req.GetEndTime()

case events.Type_EXECUTOR_EXIT_CODE:
task.GetExecLog(0, int(req.Index)).ExitCode = req.GetExitCode()

case events.Type_EXECUTOR_STDOUT:
task.GetExecLog(0, int(req.Index)).Stdout = req.GetStdout()

case events.Type_EXECUTOR_STDERR:
task.GetExecLog(0, int(req.Index)).Stderr = req.GetStderr()

case events.Type_SYSTEM_LOG:
tl := task.GetTaskLog(0)
tl.SystemLogs = append(tl.SystemLogs, req.SysLogString())
}

val, err := proto.Marshal(task)
if err != nil {
return fmt.Errorf("marshaling task to bytes: %s", err)
}

return txn.Set(taskKey(task.Id), val)
})
}
45 changes: 45 additions & 0 deletions database/badger/new.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package badger

import (
"fmt"

"github.com/dgraph-io/badger"
"github.com/ohsu-comp-bio/funnel/config"
"github.com/ohsu-comp-bio/funnel/util/fsutil"
)

// Badger provides a task database based on the Badger embedded database.
type Badger struct {
db *badger.DB
}

// NewBadger creates a new database instance.
func NewBadger(conf config.Badger) (*Badger, error) {
err := fsutil.EnsureDir(conf.Path)
if err != nil {
return nil, fmt.Errorf("creating database directory: %s", err)
}
opts := badger.DefaultOptions
opts.Dir = conf.Path
opts.ValueDir = conf.Path
db, err := badger.Open(opts)
if err != nil {
return nil, fmt.Errorf("opening database: %s", err)
}
return &Badger{db: db}, nil
}

// Init initializes the database.
func (db *Badger) Init() error {
return nil
}

var taskKeyPrefix = []byte("tasks")

func taskKey(id string) []byte {
idb := []byte(id)
key := make([]byte, 0, len(taskKeyPrefix)+len(idb))
key = append(key, taskKeyPrefix...)
key = append(key, idb...)
return key
}
Loading