diff --git a/go.mod b/go.mod index d0996e46c..13d9ac913 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/smartcontractkit/chainlink-common -go 1.23.3 +go 1.23.4 require ( github.com/XSAM/otelsql v0.29.0 diff --git a/pkg/workflows/wasm/host/module.go b/pkg/workflows/wasm/host/module.go index 56fd13117..d9d584e4b 100644 --- a/pkg/workflows/wasm/host/module.go +++ b/pkg/workflows/wasm/host/module.go @@ -3,8 +3,10 @@ package host import ( "bytes" "context" + "crypto/sha256" "encoding/base64" "encoding/binary" + "encoding/hex" "encoding/json" "errors" "fmt" @@ -119,6 +121,14 @@ type Module struct { stopCh chan struct{} } +type WasmBinaryStore interface { + // GetSerialisedModulePath returns the path to the serialised module for the given workflowID. If the module does not exist, exists + // will be false. + GetSerialisedModulePath(workflowID string) (path string, exists bool, err error) + StoreSerialisedModule(workflowID string, binaryID string, serialisedModule []byte) error + GetWasmBinary(ctx context.Context, workflowID string) ([]byte, error) +} + // WithDeterminism sets the Determinism field to a deterministic seed from a known time. // // "The Times 03/Jan/2009 Chancellor on brink of second bailout for banks" @@ -133,7 +143,10 @@ func WithDeterminism() func(*ModuleConfig) { } } -func NewModule(modCfg *ModuleConfig, binary []byte, opts ...func(*ModuleConfig)) (*Module, error) { +// NewModule uses the WasmBinaryStore to load the module for a given workflowID. If the module is available as a serialised +// representation from the WasmBinaryStore that will be used, else it will be loaded from the wasm binary. +func NewModule(ctx context.Context, lggr logger.Logger, modCfg *ModuleConfig, workflowID string, wasmStore WasmBinaryStore, + opts ...func(*ModuleConfig)) (*Module, error) { // Apply options to the module config. 
for _, opt := range opts { opt(modCfg) @@ -193,37 +206,33 @@ func NewModule(modCfg *ModuleConfig, binary []byte, opts ...func(*ModuleConfig)) cfg.CacheConfigLoadDefault() cfg.SetCraneliftOptLevel(wasmtime.OptLevelSpeedAndSize) - + // Load testing shows that leaving native unwind info enabled causes a very large slowdown when loading multiple modules. cfg.SetNativeUnwindInfo(false) engine := wasmtime.NewEngineWithConfig(cfg) - if !modCfg.IsUncompressed { - // validate the binary size before decompressing - // this is to prevent decompression bombs - if uint64(len(binary)) > modCfg.MaxCompressedBinarySize { - return nil, fmt.Errorf("compressed binary size exceeds the maximum allowed size of %d bytes", modCfg.MaxCompressedBinarySize) - } - rdr := io.LimitReader(brotli.NewReader(bytes.NewBuffer(binary)), int64(modCfg.MaxDecompressedBinarySize+1)) - decompedBinary, err := io.ReadAll(rdr) - if err != nil { - return nil, fmt.Errorf("failed to decompress binary: %w", err) - } - - binary = decompedBinary + var mod *wasmtime.Module + serialisedModulePath, exists, err := wasmStore.GetSerialisedModulePath(workflowID) + if err != nil { + return nil, fmt.Errorf("error getting serialised module: %w", err) } - // Validate the decompressed binary size. - // io.LimitReader prevents decompression bombs by reading up to a set limit, but it will not return an error if the limit is reached. - // The Read() method will return io.EOF, and ReadAll will gracefully handle it and return nil. 
- if uint64(len(binary)) > modCfg.MaxDecompressedBinarySize { - return nil, fmt.Errorf("decompressed binary size reached the maximum allowed size of %d bytes", modCfg.MaxDecompressedBinarySize) + if exists { + mod, err = wasmtime.NewModuleDeserializeFile(engine, serialisedModulePath) + if err != nil { + // It's possible that an error occurred because the module was serialised with a different engine configuration or + // wasmtime version so the error is ignored and the code falls back to loading it from the wasm binary. + lggr.Debugw("error deserializing module, attempting to load from binary", "workflowID", workflowID, "error", err) + } } - mod, err := wasmtime.NewModule(engine, binary) - if err != nil { - return nil, fmt.Errorf("error creating wasmtime module: %w", err) + // If the serialized module was not found or deserialization failed, load the module from the wasm binary. + if mod == nil { + mod, err = loadModuleFromWasmBinary(ctx, lggr, modCfg, workflowID, wasmStore, engine) + if err != nil { + return nil, fmt.Errorf("error loading module from wasm binary: %w", err) + } } linker, err := newWasiLinker(modCfg, engine) @@ -287,6 +296,70 @@ func NewModule(modCfg *ModuleConfig, binary []byte, opts ...func(*ModuleConfig)) return m, nil } +func loadModuleFromWasmBinary(ctx context.Context, lggr logger.Logger, modCfg *ModuleConfig, workflowID string, wasmStore WasmBinaryStore, engine *wasmtime.Engine) (*wasmtime.Module, error) { + // Loading from the module binary is relatively very slow (~100 times slower than deserialization) so log the + // time here to make it obvious when this is happening as it will impact workflow startup time. 
+ wasmBinary, err := wasmStore.GetWasmBinary(ctx, workflowID) + if err != nil { + return nil, fmt.Errorf("error getting workflow binary: %w", err) + } + + hash := sha256.Sum256(wasmBinary) + binaryID := hex.EncodeToString(hash[:]) + + lggr.Infow("loading module from binary", "workflowID", workflowID) + mod, err := newModuleFromBinary(wasmBinary, modCfg, engine) + if err != nil { + return nil, fmt.Errorf("error creating new module from wasm binary: %w", err) + } + lggr.Infow("finished loading module from binary", "workflowID", workflowID) + + // Store the serialised module for future use. + serialisedMod, err := mod.Serialize() + if err != nil { + return nil, fmt.Errorf("error serialising module: %w", err) + } + + err = wasmStore.StoreSerialisedModule(workflowID, binaryID, serialisedMod) + if err != nil { + return nil, fmt.Errorf("error storing serialised module: %w", err) + } + return mod, nil +} + +func newModuleFromBinary(wasmBinary []byte, modCfg *ModuleConfig, engine *wasmtime.Engine) (*wasmtime.Module, error) { + + if !modCfg.IsUncompressed { + // validate the binary size before decompressing + // this is to prevent decompression bombs + if uint64(len(wasmBinary)) > modCfg.MaxCompressedBinarySize { + return nil, fmt.Errorf("compressed binary size exceeds the maximum allowed size of %d bytes", modCfg.MaxCompressedBinarySize) + } + + rdr := io.LimitReader(brotli.NewReader(bytes.NewBuffer(wasmBinary)), int64(modCfg.MaxDecompressedBinarySize+1)) + decompedBinary, err := io.ReadAll(rdr) + if err != nil { + return nil, fmt.Errorf("failed to decompress binary: %w", err) + } + + wasmBinary = decompedBinary + } + + // Validate the decompressed binary size. + // io.LimitReader prevents decompression bombs by reading up to a set limit, but it will not return an error if the limit is reached. + // The Read() method will return io.EOF, and ReadAll will gracefully handle it and return nil. 
+ if uint64(len(wasmBinary)) > modCfg.MaxDecompressedBinarySize { + return nil, fmt.Errorf("decompressed binary size reached the maximum allowed size of %d bytes", modCfg.MaxDecompressedBinarySize) + } + + mod, err := wasmtime.NewModule(engine, wasmBinary) + if err != nil { + return nil, fmt.Errorf("error creating wasmtime module: %w", err) + } + + return mod, nil +} + func (m *Module) Start() { m.wg.Add(1) go func() { diff --git a/pkg/workflows/wasm/host/wasm.go b/pkg/workflows/wasm/host/wasm.go index 2e0dea4d3..441f6bbb5 100644 --- a/pkg/workflows/wasm/host/wasm.go +++ b/pkg/workflows/wasm/host/wasm.go @@ -9,12 +9,13 @@ import ( "google.golang.org/protobuf/types/known/emptypb" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk" wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb" ) -func GetWorkflowSpec(ctx context.Context, modCfg *ModuleConfig, binary []byte, config []byte) (*sdk.WorkflowSpec, error) { - m, err := NewModule(modCfg, binary, WithDeterminism()) +func GetWorkflowSpec(ctx context.Context, lggr logger.Logger, modCfg *ModuleConfig, workflowID string, wasmStore WasmBinaryStore, config []byte) (*sdk.WorkflowSpec, error) { + m, err := NewModule(ctx, lggr, modCfg, workflowID, wasmStore, WithDeterminism()) if err != nil { return nil, fmt.Errorf("could not instantiate module: %w", err) } @@ -43,3 +44,29 @@ func GetWorkflowSpec(ctx context.Context, modCfg *ModuleConfig, binary []byte, c return wasmpb.ProtoToWorkflowSpec(sr) } + +func NewSingleBinaryWasmBinaryStore(binary []byte) WasmBinaryStore { + // Create a mock implementation of the wasmBinaryStore interface + binaryStore := &SingleBinaryWasmBinaryStore{ + binary: binary, + } + return binaryStore +} + +// SingleBinaryWasmBinaryStore is a mock implementation of the wasmBinaryStore interface +type SingleBinaryWasmBinaryStore struct { + binary []byte +} + +func (m *SingleBinaryWasmBinaryStore) 
GetSerialisedModulePath(workflowID string) (string, bool, error) { + return "", false, nil +} + +func (m *SingleBinaryWasmBinaryStore) StoreSerialisedModule(workflowID string, binaryID string, module []byte) error { + //noop + return nil +} + +func (m *SingleBinaryWasmBinaryStore) GetWasmBinary(ctx context.Context, workflowID string) ([]byte, error) { + return m.binary, nil +} diff --git a/pkg/workflows/wasm/host/wasm_test.go b/pkg/workflows/wasm/host/wasm_test.go index a40cac8e4..6d16a9c02 100644 --- a/pkg/workflows/wasm/host/wasm_test.go +++ b/pkg/workflows/wasm/host/wasm_test.go @@ -93,11 +93,12 @@ func Test_GetWorkflowSpec(t *testing.T) { _, err := GetWorkflowSpec( ctx, + logger.Test(t), &ModuleConfig{ Logger: logger.Test(t), IsUncompressed: true, }, - binary, + "", NewSingleBinaryWasmBinaryStore(binary), []byte(""), ) require.NoError(t, err) @@ -110,11 +111,12 @@ func Test_GetWorkflowSpec_UncompressedBinary(t *testing.T) { _, err := GetWorkflowSpec( ctx, + logger.Test(t), &ModuleConfig{ Logger: logger.Test(t), IsUncompressed: false, }, - binary, + "", NewSingleBinaryWasmBinaryStore(binary), []byte(""), ) require.NoError(t, err) @@ -126,11 +128,12 @@ func Test_GetWorkflowSpec_BinaryErrors(t *testing.T) { _, err := GetWorkflowSpec( ctx, + logger.Test(t), &ModuleConfig{ Logger: logger.Test(t), IsUncompressed: true, }, - failBinary, + "", NewSingleBinaryWasmBinaryStore(failBinary), []byte(""), ) // panic @@ -145,12 +148,13 @@ func Test_GetWorkflowSpec_Timeout(t *testing.T) { d := time.Duration(0) _, err := GetWorkflowSpec( ctx, + logger.Test(t), &ModuleConfig{ Timeout: &d, Logger: logger.Test(t), IsUncompressed: true, }, - binary, // use the success binary with a zero timeout + "", NewSingleBinaryWasmBinaryStore(binary), // use the success binary with a zero timeout []byte(""), ) // panic @@ -164,11 +168,12 @@ func Test_GetWorkflowSpec_BuildError(t *testing.T) { _, err := GetWorkflowSpec( ctx, + logger.Test(t), &ModuleConfig{ Logger: logger.Test(t), 
IsUncompressed: true, }, - binary, + "", NewSingleBinaryWasmBinaryStore(binary), []byte(""), ) assert.ErrorContains(t, err, "oops") @@ -180,13 +185,13 @@ func Test_Compute_Logs(t *testing.T) { binary := createTestBinary(logBinaryCmd, logBinaryLocation, true, t) logger, logs := logger.TestObserved(t, zapcore.InfoLevel) - m, err := NewModule(&ModuleConfig{ + m, err := NewModule(ctx, logger, &ModuleConfig{ Logger: logger, IsUncompressed: true, Fetch: func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) { return nil, nil }, - }, binary) + }, "wf1", NewSingleBinaryWasmBinaryStore(binary)) require.NoError(t, err) m.Start() @@ -208,8 +213,21 @@ func Test_Compute_Logs(t *testing.T) { _, err = m.Run(ctx, req) assert.Nil(t, err) - require.Len(t, logs.AllUntimed(), 2) + require.Len(t, logs.AllUntimed(), 4) expectedEntries := []Entry{ + { + Log: zapcore.Entry{Level: zapcore.InfoLevel, Message: "loading module from binary"}, + Fields: []zapcore.Field{ + zap.String("workflowID", "wf1"), + }, + }, + { + Log: zapcore.Entry{Level: zapcore.InfoLevel, Message: "finished loading module from binary"}, + Fields: []zapcore.Field{ + zap.String("workflowID", "wf1"), + }, + }, + { Log: zapcore.Entry{Level: zapcore.InfoLevel, Message: "building workflow..."}, Fields: []zapcore.Field{ @@ -266,7 +284,7 @@ func Test_Compute_Emit(t *testing.T) { ctx := tests.Context(t) ctxValue := "test-value" ctx = context.WithValue(ctx, ctxKey, ctxValue) - m, err := NewModule(&ModuleConfig{ + m, err := NewModule(ctx, logger.Test(t), &ModuleConfig{ Logger: lggr, Fetch: fetchFunc, IsUncompressed: true, @@ -284,7 +302,7 @@ func Test_Compute_Emit(t *testing.T) { assert.Equal(t, "workflow-execution-id", kvs["workflow_execution_id"]) return nil }), - }, binary) + }, "", NewSingleBinaryWasmBinaryStore(binary)) require.NoError(t, err) m.Start() @@ -295,8 +313,9 @@ func Test_Compute_Emit(t *testing.T) { t.Run("failure on emit writes to error chain and logs", func(t *testing.T) { lggr, logs 
:= logger.TestObserved(t, zapcore.InfoLevel) + ctx := tests.Context(t) - m, err := NewModule(&ModuleConfig{ + m, err := NewModule(ctx, logger.Test(t), &ModuleConfig{ Logger: lggr, Fetch: fetchFunc, IsUncompressed: true, @@ -312,12 +331,11 @@ func Test_Compute_Emit(t *testing.T) { return assert.AnError }), - }, binary) + }, "", NewSingleBinaryWasmBinaryStore(binary)) require.NoError(t, err) m.Start() - ctx := tests.Context(t) _, err = m.Run(ctx, req) assert.Error(t, err) assert.ErrorContains(t, err, assert.AnError.Error()) @@ -337,15 +355,16 @@ func Test_Compute_Emit(t *testing.T) { t.Run("failure on emit due to missing workflow identifying metadata", func(t *testing.T) { lggr := logger.Test(t) + ctx := tests.Context(t) - m, err := NewModule(&ModuleConfig{ + m, err := NewModule(ctx, logger.Test(t), &ModuleConfig{ Logger: lggr, Fetch: fetchFunc, IsUncompressed: true, Labeler: newMockMessageEmitter(func(_ context.Context, msg string, labels map[string]string) error { return nil }), // never called - }, binary) + }, "", NewSingleBinaryWasmBinaryStore(binary)) require.NoError(t, err) m.Start() @@ -365,7 +384,6 @@ func Test_Compute_Emit(t *testing.T) { }, } - ctx := tests.Context(t) _, err = m.Run(ctx, req) assert.Error(t, err) assert.ErrorContains(t, err, "failed to create emission") @@ -377,10 +395,10 @@ func Test_Compute_PanicIsRecovered(t *testing.T) { binary := createTestBinary(computePanicBinaryCmd, computePanicBinaryLocation, true, t) ctx := tests.Context(t) - m, err := NewModule(&ModuleConfig{ + m, err := NewModule(ctx, logger.Test(t), &ModuleConfig{ Logger: logger.Test(t), IsUncompressed: true, - }, binary) + }, "", NewSingleBinaryWasmBinaryStore(binary)) require.NoError(t, err) m.Start() @@ -417,7 +435,7 @@ func Test_Compute_Fetch(t *testing.T) { Headers: map[string]any{}, } - m, err := NewModule(&ModuleConfig{ + m, err := NewModule(ctx, logger.Test(t), &ModuleConfig{ Logger: logger.Test(t), IsUncompressed: true, Fetch: func(ctx context.Context, req 
*wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) { @@ -427,7 +445,7 @@ func Test_Compute_Fetch(t *testing.T) { StatusCode: uint32(expected.StatusCode), }, nil }, - }, binary) + }, "", NewSingleBinaryWasmBinaryStore(binary)) require.NoError(t, err) m.Start() @@ -468,7 +486,7 @@ func Test_Compute_Fetch(t *testing.T) { Headers: map[string]any{}, } - m, err := NewModule(&ModuleConfig{ + m, err := NewModule(ctx, logger.Test(t), &ModuleConfig{ Logger: logger.Test(t), IsUncompressed: true, Fetch: func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) { @@ -478,7 +496,7 @@ func Test_Compute_Fetch(t *testing.T) { StatusCode: uint32(expected.StatusCode), }, nil }, - }, binary) + }, "", NewSingleBinaryWasmBinaryStore(binary)) require.NoError(t, err) m.Start() @@ -517,13 +535,13 @@ func Test_Compute_Fetch(t *testing.T) { ctx := tests.Context(t) logger, logs := logger.TestObserved(t, zapcore.InfoLevel) - m, err := NewModule(&ModuleConfig{ + m, err := NewModule(ctx, logger, &ModuleConfig{ Logger: logger, IsUncompressed: true, Fetch: func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) { return nil, assert.AnError }, - }, binary) + }, "wf1", NewSingleBinaryWasmBinaryStore(binary)) require.NoError(t, err) m.Start() @@ -545,9 +563,21 @@ func Test_Compute_Fetch(t *testing.T) { _, err = m.Run(ctx, req) assert.NotNil(t, err) assert.ErrorContains(t, err, assert.AnError.Error()) - require.Len(t, logs.AllUntimed(), 1) + require.Len(t, logs.AllUntimed(), 3) expectedEntries := []Entry{ + { + Log: zapcore.Entry{Level: zapcore.InfoLevel, Message: "loading module from binary"}, + Fields: []zapcore.Field{ + zap.String("workflowID", "wf1"), + }, + }, + { + Log: zapcore.Entry{Level: zapcore.InfoLevel, Message: "finished loading module from binary"}, + Fields: []zapcore.Field{ + zap.String("workflowID", "wf1"), + }, + }, { Log: zapcore.Entry{Level: zapcore.ErrorLevel, Message: fmt.Sprintf("error calling fetch: %s", assert.AnError)}, }, @@ 
-560,6 +590,7 @@ func Test_Compute_Fetch(t *testing.T) { t.Run("OK_context_propagation", func(t *testing.T) { t.Parallel() + ctx := tests.Context(t) type testkey string var key testkey = "test-key" var expectedValue string = "test-value" @@ -571,7 +602,7 @@ func Test_Compute_Fetch(t *testing.T) { Headers: map[string]any{}, } - m, err := NewModule(&ModuleConfig{ + m, err := NewModule(ctx, logger.Test(t), &ModuleConfig{ Logger: logger.Test(t), IsUncompressed: true, Fetch: func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) { @@ -581,7 +612,7 @@ func Test_Compute_Fetch(t *testing.T) { StatusCode: uint32(expected.StatusCode), }, nil }, - }, binary) + }, "", NewSingleBinaryWasmBinaryStore(binary)) require.NoError(t, err) m.Start() @@ -604,7 +635,7 @@ func Test_Compute_Fetch(t *testing.T) { }, } - ctx := context.WithValue(tests.Context(t), key, expectedValue) + ctx = context.WithValue(tests.Context(t), key, expectedValue) response, err := m.Run(ctx, req) assert.Nil(t, err) @@ -619,7 +650,8 @@ func Test_Compute_Fetch(t *testing.T) { t.Run("OK_context_cancelation", func(t *testing.T) { t.Parallel() - m, err := NewModule(&ModuleConfig{ + ctx := tests.Context(t) + m, err := NewModule(ctx, logger.Test(t), &ModuleConfig{ Logger: logger.Test(t), IsUncompressed: true, Fetch: func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) { @@ -630,7 +662,7 @@ func Test_Compute_Fetch(t *testing.T) { return &wasmpb.FetchResponse{}, nil } }, - }, binary) + }, "", NewSingleBinaryWasmBinaryStore(binary)) require.NoError(t, err) m.Start() @@ -671,7 +703,7 @@ func Test_Compute_Fetch(t *testing.T) { Headers: map[string]any{}, } - m, err := NewModule(&ModuleConfig{ + m, err := NewModule(ctx, logger.Test(t), &ModuleConfig{ Logger: logger.Test(t), IsUncompressed: true, Fetch: func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) { @@ -682,7 +714,7 @@ func Test_Compute_Fetch(t *testing.T) { }, nil }, 
MaxFetchRequests: 1, - }, binary) + }, "", NewSingleBinaryWasmBinaryStore(binary)) require.NoError(t, err) m.Start() @@ -716,7 +748,7 @@ func Test_Compute_Fetch(t *testing.T) { Headers: map[string]any{}, } - m, err := NewModule(&ModuleConfig{ + m, err := NewModule(ctx, logger.Test(t), &ModuleConfig{ Logger: logger.Test(t), IsUncompressed: true, Fetch: func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) { @@ -726,7 +758,7 @@ func Test_Compute_Fetch(t *testing.T) { StatusCode: uint32(expected.StatusCode), }, nil }, - }, binary) + }, "", NewSingleBinaryWasmBinaryStore(binary)) require.NoError(t, err) m.Start() @@ -760,7 +792,7 @@ func Test_Compute_Fetch(t *testing.T) { Headers: map[string]any{}, } - m, err := NewModule(&ModuleConfig{ + m, err := NewModule(ctx, logger.Test(t), &ModuleConfig{ Logger: logger.Test(t), IsUncompressed: true, Fetch: func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) { @@ -771,7 +803,7 @@ func Test_Compute_Fetch(t *testing.T) { }, nil }, MaxFetchRequests: 6, - }, binary) + }, "", NewSingleBinaryWasmBinaryStore(binary)) require.NoError(t, err) m.Start() @@ -805,7 +837,7 @@ func Test_Compute_Fetch(t *testing.T) { Headers: map[string]any{}, } - m, err := NewModule(&ModuleConfig{ + m, err := NewModule(ctx, logger.Test(t), &ModuleConfig{ Logger: logger.Test(t), IsUncompressed: true, Fetch: func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) { @@ -816,7 +848,7 @@ func Test_Compute_Fetch(t *testing.T) { }, nil }, MaxFetchRequests: 6, - }, binary) + }, "", NewSingleBinaryWasmBinaryStore(binary)) require.NoError(t, err) m.Start() @@ -850,7 +882,7 @@ func TestModule_Errors(t *testing.T) { ctx := tests.Context(t) binary := createTestBinary(successBinaryCmd, successBinaryLocation, true, t) - m, err := NewModule(&ModuleConfig{IsUncompressed: true, Logger: logger.Test(t)}, binary) + m, err := NewModule(ctx, logger.Test(t), &ModuleConfig{IsUncompressed: true, 
Logger: logger.Test(t)}, "", NewSingleBinaryWasmBinaryStore(binary)) require.NoError(t, err) _, err = m.Run(ctx, nil) @@ -897,7 +929,7 @@ func TestModule_Sandbox_Memory(t *testing.T) { ctx := tests.Context(t) binary := createTestBinary(oomBinaryCmd, oomBinaryLocation, true, t) - m, err := NewModule(&ModuleConfig{IsUncompressed: true, Logger: logger.Test(t)}, binary) + m, err := NewModule(ctx, logger.Test(t), &ModuleConfig{IsUncompressed: true, Logger: logger.Test(t)}, "", NewSingleBinaryWasmBinaryStore(binary)) require.NoError(t, err) m.Start() @@ -912,11 +944,12 @@ func TestModule_Sandbox_Memory(t *testing.T) { func TestModule_CompressedBinarySize(t *testing.T) { t.Parallel() + ctx := tests.Context(t) t.Run("compressed binary size is smaller than the default 10mb limit", func(t *testing.T) { binary := createTestBinary(successBinaryCmd, successBinaryLocation, false, t) - _, err := NewModule(&ModuleConfig{IsUncompressed: false, Logger: logger.Test(t)}, binary) + _, err := NewModule(ctx, logger.Test(t), &ModuleConfig{IsUncompressed: false, Logger: logger.Test(t)}, "", NewSingleBinaryWasmBinaryStore(binary)) require.NoError(t, err) }) @@ -929,7 +962,7 @@ func TestModule_CompressedBinarySize(t *testing.T) { require.NoError(t, err) require.NoError(t, bwr.Close()) - _, err = NewModule(&ModuleConfig{IsUncompressed: false, Logger: logger.Test(t)}, binary) + _, err = NewModule(ctx, logger.Test(t), &ModuleConfig{IsUncompressed: false, Logger: logger.Test(t)}, "", NewSingleBinaryWasmBinaryStore(binary)) default10mbLimit := fmt.Sprintf("binary size exceeds the maximum allowed size of %d bytes", defaultMaxCompressedBinarySize) require.ErrorContains(t, err, default10mbLimit) }) @@ -944,7 +977,7 @@ func TestModule_CompressedBinarySize(t *testing.T) { require.NoError(t, err) require.NoError(t, bwr.Close()) - _, err = NewModule(&ModuleConfig{IsUncompressed: false, MaxCompressedBinarySize: customMaxCompressedBinarySize, Logger: logger.Test(t)}, binary) + _, err = NewModule(ctx, 
logger.Test(t), &ModuleConfig{IsUncompressed: false, MaxCompressedBinarySize: customMaxCompressedBinarySize, Logger: logger.Test(t)}, "", NewSingleBinaryWasmBinaryStore(binary)) default10mbLimit := fmt.Sprintf("binary size exceeds the maximum allowed size of %d bytes", customMaxCompressedBinarySize) require.ErrorContains(t, err, default10mbLimit) }) @@ -952,19 +985,20 @@ func TestModule_CompressedBinarySize(t *testing.T) { func TestModule_DecompressedBinarySize(t *testing.T) { t.Parallel() + ctx := tests.Context(t) // compressed binary size is 4.121 MB // decompressed binary size is 23.7 MB binary := createTestBinary(successBinaryCmd, successBinaryLocation, false, t) t.Run("decompressed binary size is within the limit", func(t *testing.T) { customDecompressedBinarySize := uint64(24 * 1024 * 1024) - _, err := NewModule(&ModuleConfig{IsUncompressed: false, MaxDecompressedBinarySize: customDecompressedBinarySize, Logger: logger.Test(t)}, binary) + _, err := NewModule(ctx, logger.Test(t), &ModuleConfig{IsUncompressed: false, MaxDecompressedBinarySize: customDecompressedBinarySize, Logger: logger.Test(t)}, "", NewSingleBinaryWasmBinaryStore(binary)) require.NoError(t, err) }) t.Run("decompressed binary size is bigger than the limit", func(t *testing.T) { customDecompressedBinarySize := uint64(3 * 1024 * 1024) - _, err := NewModule(&ModuleConfig{IsUncompressed: false, MaxDecompressedBinarySize: customDecompressedBinarySize, Logger: logger.Test(t)}, binary) + _, err := NewModule(ctx, logger.Test(t), &ModuleConfig{IsUncompressed: false, MaxDecompressedBinarySize: customDecompressedBinarySize, Logger: logger.Test(t)}, "", NewSingleBinaryWasmBinaryStore(binary)) decompressedSizeExceeded := fmt.Sprintf("decompressed binary size reached the maximum allowed size of %d bytes", customDecompressedBinarySize) require.ErrorContains(t, err, decompressedSizeExceeded) }) @@ -976,7 +1010,7 @@ func TestModule_Sandbox_SleepIsStubbedOut(t *testing.T) { binary := 
createTestBinary(sleepBinaryCmd, sleepBinaryLocation, true, t) d := 1 * time.Millisecond - m, err := NewModule(&ModuleConfig{Timeout: &d, IsUncompressed: true, Logger: logger.Test(t)}, binary) + m, err := NewModule(ctx, logger.Test(t), &ModuleConfig{Timeout: &d, IsUncompressed: true, Logger: logger.Test(t)}, "", NewSingleBinaryWasmBinaryStore(binary)) require.NoError(t, err) m.Start() @@ -1002,7 +1036,7 @@ func TestModule_Sandbox_Timeout(t *testing.T) { binary := createTestBinary(sleepBinaryCmd, sleepBinaryLocation, true, t) tmt := 10 * time.Millisecond - m, err := NewModule(&ModuleConfig{IsUncompressed: true, Logger: logger.Test(t), Timeout: &tmt}, binary) + m, err := NewModule(ctx, logger.Test(t), &ModuleConfig{IsUncompressed: true, Logger: logger.Test(t), Timeout: &tmt}, "", NewSingleBinaryWasmBinaryStore(binary)) require.NoError(t, err) m.Start() @@ -1022,7 +1056,7 @@ func TestModule_Sandbox_CantReadFiles(t *testing.T) { ctx := tests.Context(t) binary := createTestBinary(filesBinaryCmd, filesBinaryLocation, true, t) - m, err := NewModule(&ModuleConfig{IsUncompressed: true, Logger: logger.Test(t)}, binary) + m, err := NewModule(ctx, logger.Test(t), &ModuleConfig{IsUncompressed: true, Logger: logger.Test(t)}, "", NewSingleBinaryWasmBinaryStore(binary)) require.NoError(t, err) m.Start() @@ -1050,7 +1084,7 @@ func TestModule_Sandbox_CantCreateDir(t *testing.T) { ctx := tests.Context(t) binary := createTestBinary(dirsBinaryCmd, dirsBinaryLocation, true, t) - m, err := NewModule(&ModuleConfig{IsUncompressed: true, Logger: logger.Test(t)}, binary) + m, err := NewModule(ctx, logger.Test(t), &ModuleConfig{IsUncompressed: true, Logger: logger.Test(t)}, "", NewSingleBinaryWasmBinaryStore(binary)) require.NoError(t, err) m.Start() @@ -1078,7 +1112,7 @@ func TestModule_Sandbox_HTTPRequest(t *testing.T) { ctx := tests.Context(t) binary := createTestBinary(httpBinaryCmd, httpBinaryLocation, true, t) - m, err := NewModule(&ModuleConfig{IsUncompressed: true, Logger: 
logger.Test(t)}, binary) + m, err := NewModule(ctx, logger.Test(t), &ModuleConfig{IsUncompressed: true, Logger: logger.Test(t)}, "", NewSingleBinaryWasmBinaryStore(binary)) require.NoError(t, err) m.Start() @@ -1104,9 +1138,10 @@ func TestModule_Sandbox_HTTPRequest(t *testing.T) { func TestModule_Sandbox_ReadEnv(t *testing.T) { t.Parallel() ctx := tests.Context(t) + binary := createTestBinary(envBinaryCmd, envBinaryLocation, true, t) - m, err := NewModule(&ModuleConfig{IsUncompressed: true, Logger: logger.Test(t)}, binary) + m, err := NewModule(ctx, logger.Test(t), &ModuleConfig{IsUncompressed: true, Logger: logger.Test(t)}, "", NewSingleBinaryWasmBinaryStore(binary)) require.NoError(t, err) m.Start() @@ -1153,13 +1188,13 @@ func TestModule_Sandbox_RandomGet(t *testing.T) { ctx := tests.Context(t) binary := createTestBinary(randBinaryCmd, randBinaryLocation, true, t) - m, err := NewModule(&ModuleConfig{ + m, err := NewModule(ctx, logger.Test(t), &ModuleConfig{ Logger: logger.Test(t), IsUncompressed: true, Determinism: &DeterminismConfig{ Seed: 42, }, - }, binary) + }, "", NewSingleBinaryWasmBinaryStore(binary)) require.NoError(t, err) m.Start() @@ -1172,10 +1207,10 @@ func TestModule_Sandbox_RandomGet(t *testing.T) { ctx := tests.Context(t) binary := createTestBinary(randBinaryCmd, randBinaryLocation, true, t) - m, err := NewModule(&ModuleConfig{ + m, err := NewModule(ctx, logger.Test(t), &ModuleConfig{ Logger: logger.Test(t), IsUncompressed: true, - }, binary) + }, "", NewSingleBinaryWasmBinaryStore(binary)) require.NoError(t, err) m.Start()