Skip to content
This repository has been archived by the owner on Apr 3, 2024. It is now read-only.

Route temporaltest logs to testing.T logger #7

Merged
merged 1 commit into from
Sep 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/urfave/cli/v2 v2.3.0
go.temporal.io/api v1.5.0
go.temporal.io/sdk v1.10.0
go.temporal.io/server v1.12.0
go.temporal.io/server v1.12.1-0.20210921161622-b20a256e8e4f
go.uber.org/zap v1.19.1
modernc.org/sqlite v1.13.0
)
186 changes: 117 additions & 69 deletions go.sum

Large diffs are not rendered by default.

14 changes: 12 additions & 2 deletions internal/common/persistence/sql/sqlplugin/sqlite/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"go.temporal.io/api/enums/v1"
"go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/persistence/sql"
"go.temporal.io/server/common/persistence/sql/sqlplugin"
Expand All @@ -65,12 +66,20 @@ var (

type plugin struct {
mainDB *db
logger log.Logger
}

var _ sqlplugin.Plugin = (*plugin)(nil)

func RegisterPlugin(pluginName string) {
sql.RegisterPlugin(pluginName, &plugin{})
// Register registers the SQLite plugin with default configuration.
func Register() {
logger := log.NewCLILogger()
RegisterPluginWithLogger(PluginName, logger)
}

// RegisterPluginWithLogger registers the SQLite plugin with a custom logger and name.
func RegisterPluginWithLogger(pluginName string, logger log.Logger) {
sql.RegisterPlugin(pluginName, &plugin{logger: logger})
}

// CreateDB initialize the db object
Expand Down Expand Up @@ -175,6 +184,7 @@ func (p *plugin) createDBConnection(dbKind sqlplugin.DbKind, cfg *config.SQL, _
InitialVersion: "1.0",
Overwrite: false,
DisableVersioning: false,
Logger: p.logger,
}, conn)
}(cfg.DatabaseName); err != nil {
// Ignore error from running migrations twice against the same db.
Expand Down
2 changes: 1 addition & 1 deletion internal/liteconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func Convert(cfg *Config) *config.Config {
if cfg.Ephemeral {
pluginName = fmt.Sprintf("%s_%d", pluginName, rand.Uint32())
}
sqlite.RegisterPlugin(pluginName)
sqlite.RegisterPluginWithLogger(pluginName, cfg.Logger)

sqliteConfig := config.SQL{
PluginName: pluginName,
Expand Down
34 changes: 34 additions & 0 deletions temporaltest/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package temporaltest

import (
"testing"
)

type testLogger struct {
t *testing.T
}

func (tl *testLogger) logLevel(lvl, msg string, keyvals ...interface{}) {
if tl.t == nil {
return
}
args := []interface{}{lvl, msg}
args = append(args, keyvals...)
tl.t.Log(args...)
}

func (tl *testLogger) Debug(msg string, keyvals ...interface{}) {
tl.logLevel("DEBUG", msg, keyvals)
}

func (tl *testLogger) Info(msg string, keyvals ...interface{}) {
tl.logLevel("INFO ", msg, keyvals)
}

func (tl *testLogger) Warn(msg string, keyvals ...interface{}) {
tl.logLevel("WARN ", msg, keyvals)
}

func (tl *testLogger) Error(msg string, keyvals ...interface{}) {
tl.logLevel("ERROR", msg, keyvals)
}
32 changes: 32 additions & 0 deletions temporaltest/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Unless explicitly stated otherwise all files in this repository are licensed under the MIT License.
//
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2021 Datadog, Inc.

package temporaltest

import "testing"

type TestServerOption interface {
apply(*TestServer)
}

// WithT directs all worker and client logs to the test logger.
func WithT(t *testing.T) TestServerOption {
return newApplyFuncContainer(func(server *TestServer) {
server.t = t
})
}

type applyFuncContainer struct {
applyInternal func(*TestServer)
}

func (fso *applyFuncContainer) apply(ts *TestServer) {
fso.applyInternal(ts)
}

func newApplyFuncContainer(apply func(*TestServer)) *applyFuncContainer {
return &applyFuncContainer{
applyInternal: apply,
}
}
60 changes: 49 additions & 11 deletions temporaltest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import (
"context"
"fmt"
"math/rand"
"testing"
"time"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"go.temporal.io/server/common/log"
"go.uber.org/zap"

"github.com/DataDog/temporalite"
)
Expand All @@ -24,6 +25,30 @@ type TestServer struct {
defaultTestNamespace string
defaultClient client.Client
clients []client.Client
workers []worker.Worker
t *testing.T
}

func (ts *TestServer) fatal(err error) {
if ts.t == nil {
panic(err)
}
ts.t.Fatal(err)
}

// Worker registers and starts a Temporal worker on the specified task queue.
func (ts *TestServer) Worker(taskQueue string, registerFunc func(registry worker.Registry)) worker.Worker {
w := worker.New(ts.Client(), taskQueue, worker.Options{
WorkflowPanicPolicy: worker.FailWorkflow,
})
registerFunc(w)
ts.workers = append(ts.workers, w)

if err := w.Start(); err != nil {
ts.fatal(err)
}

return w
}

// Client returns a Temporal client configured for making requests to the server.
Expand All @@ -43,13 +68,16 @@ func (ts *TestServer) NewClientWithOptions(opts client.Options) client.Client {
if opts.Namespace == "" {
opts.Namespace = ts.defaultTestNamespace
}
if opts.Logger == nil {
opts.Logger = &testLogger{ts.t}
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

c, err := ts.server.NewClientWithOptions(ctx, opts)
if err != nil {
panic(fmt.Errorf("error creating client: %w", err))
ts.fatal(fmt.Errorf("error creating client: %w", err))
}

ts.clients = append(ts.clients, c)
Expand All @@ -59,6 +87,9 @@ func (ts *TestServer) NewClientWithOptions(opts client.Options) client.Client {

// Stop closes test clients and shuts down the server.
func (ts *TestServer) Stop() {
for _, w := range ts.workers {
w.Stop()
}
for _, c := range ts.clients {
c.Close()
}
Expand All @@ -67,28 +98,35 @@ func (ts *TestServer) Stop() {

// NewServer starts and returns a new TestServer. The caller should call Stop
// when finished, to shut it down.
func NewServer() *TestServer {
func NewServer(opts ...TestServerOption) *TestServer {
rand.Seed(time.Now().UnixNano())
testNamespace := fmt.Sprintf("temporaltest-%d", rand.Intn(999999))

ts := TestServer{
defaultTestNamespace: testNamespace,
}

// Apply options
for _, opt := range opts {
opt.apply(&ts)
}

s, err := temporalite.NewServer(
temporalite.WithNamespaces(testNamespace),
temporalite.WithNamespaces(ts.defaultTestNamespace),
temporalite.WithPersistenceDisabled(),
temporalite.WithDynamicPorts(),
temporalite.WithLogger(log.NewZapLogger(zap.NewNop())),
temporalite.WithLogger(log.NewNoopLogger()),
)
if err != nil {
panic(fmt.Errorf("error creating server: %w", err))
ts.fatal(fmt.Errorf("error creating server: %w", err))
}
ts.server = s

go func() {
if err := s.Start(); err != nil {
panic(fmt.Errorf("error starting server: %w", err))
ts.fatal(fmt.Errorf("error starting server: %w", err))
}
}()

return &TestServer{
server: s,
defaultTestNamespace: testNamespace,
}
return &ts
}
59 changes: 23 additions & 36 deletions temporaltest/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,32 @@ package temporaltest_test

import (
"context"
"fmt"
"testing"
"time"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"

"github.com/DataDog/temporalite/internal/examples/helloworld"
"github.com/DataDog/temporalite/temporaltest"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)

// to be used in example code
var t *testing.T

func ExampleNewServer_testWorker() {
// Create test Temporal server and client
ts := temporaltest.NewServer()
c := ts.Client()
ts := temporaltest.NewServer(temporaltest.WithT(t))
// Stop server and close clients when tests complete
defer ts.Stop()

// Register a new worker on the `hello_world` task queue
w := worker.New(c, "hello_world", worker.Options{})
helloworld.RegisterWorkflowsAndActivities(w)
// Start worker
if err := w.Start(); err != nil {
t.Fatal(err)
}
// Stop worker when tests complete
defer w.Stop()
ts.Worker("hello_world", func(registry worker.Registry) {
helloworld.RegisterWorkflowsAndActivities(registry)
})

// Create a test client
c := ts.Client()

// Start test workflow
wfr, err := c.ExecuteWorkflow(
Expand All @@ -53,29 +50,23 @@ func ExampleNewServer_testWorker() {
t.Fatal(err)
}

// Fail if result has unexpected value
if result != "Hello world" {
t.Fatalf("unexpected result: %q", result)
}
// Print result
fmt.Println(result)
// Output: Hello world
}

func TestNewServer(t *testing.T) {
ts := temporaltest.NewServer()
c := ts.Client()
ts := temporaltest.NewServer(temporaltest.WithT(t))
defer ts.Stop()

w := worker.New(c, "hello_world", worker.Options{})
helloworld.RegisterWorkflowsAndActivities(w)

if err := w.Start(); err != nil {
t.Fatal(err)
}
defer w.Stop()
ts.Worker("hello_world", func(registry worker.Registry) {
helloworld.RegisterWorkflowsAndActivities(registry)
})

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

wfr, err := c.ExecuteWorkflow(
wfr, err := ts.Client().ExecuteWorkflow(
ctx,
client.StartWorkflowOptions{TaskQueue: "hello_world"},
helloworld.Greet,
Expand All @@ -97,20 +88,16 @@ func TestNewServer(t *testing.T) {

func BenchmarkRunWorkflow(b *testing.B) {
ts := temporaltest.NewServer()
c := ts.Client()
defer ts.Stop()

w := worker.New(c, "hello_world", worker.Options{})
helloworld.RegisterWorkflowsAndActivities(w)

if err := w.Start(); err != nil {
panic(err)
}
defer w.Stop()
ts.Worker("hello_world", func(registry worker.Registry) {
helloworld.RegisterWorkflowsAndActivities(registry)
})
c := ts.Client()

for i := 0; i < b.N; i++ {
func(b *testing.B) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

wfr, err := c.ExecuteWorkflow(
Expand Down