Skip to content

Commit

Permalink
wasm module serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
ettec committed Feb 12, 2025
1 parent f22f85e commit d7ad453
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 78 deletions.
119 changes: 96 additions & 23 deletions pkg/workflows/wasm/host/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package host
import (
"bytes"
"context"
"crypto/sha256"
"encoding/base64"
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -119,6 +121,14 @@ type Module struct {
stopCh chan struct{}
}

type WasmBinaryStore interface {
// GetSerialisedModulePath returns the path to the serialised module for the given workflowID. If the module does not exist, exists
// will be false.
GetSerialisedModulePath(workflowID string) (path string, exists bool, err error)
StoreSerialisedModule(workflowID string, binaryID string, serialisedModule []byte) error
GetWasmBinary(ctx context.Context, workflowID string) ([]byte, error)
}

// WithDeterminism sets the Determinism field to a deterministic seed from a known time.
//
// "The Times 03/Jan/2009 Chancellor on brink of second bailout for banks"
Expand All @@ -133,7 +143,10 @@ func WithDeterminism() func(*ModuleConfig) {
}
}

func NewModule(modCfg *ModuleConfig, binary []byte, opts ...func(*ModuleConfig)) (*Module, error) {
// NewModule uses the WasmBinaryStore to load the module for a given workflowID. If the module is available as a serialised
// representation from the WasmBinaryStore that will be used, else it will be loaded from the wasm binary.
func NewModule(ctx context.Context, lggr logger.Logger, modCfg *ModuleConfig, workflowID string, wasmStore WasmBinaryStore,
opts ...func(*ModuleConfig)) (*Module, error) {
// Apply options to the module config.
for _, opt := range opts {
opt(modCfg)
Expand Down Expand Up @@ -193,37 +206,33 @@ func NewModule(modCfg *ModuleConfig, binary []byte, opts ...func(*ModuleConfig))

cfg.CacheConfigLoadDefault()
cfg.SetCraneliftOptLevel(wasmtime.OptLevelSpeedAndSize)

// Load testing shows that leaving native unwind info enabled causes a very large slowdown when loading multiple modules.
cfg.SetNativeUnwindInfo(false)

engine := wasmtime.NewEngineWithConfig(cfg)
if !modCfg.IsUncompressed {
// validate the binary size before decompressing
// this is to prevent decompression bombs
if uint64(len(binary)) > modCfg.MaxCompressedBinarySize {
return nil, fmt.Errorf("compressed binary size exceeds the maximum allowed size of %d bytes", modCfg.MaxCompressedBinarySize)
}

rdr := io.LimitReader(brotli.NewReader(bytes.NewBuffer(binary)), int64(modCfg.MaxDecompressedBinarySize+1))
decompedBinary, err := io.ReadAll(rdr)
if err != nil {
return nil, fmt.Errorf("failed to decompress binary: %w", err)
}

binary = decompedBinary
var mod *wasmtime.Module
serialisedModulePath, exists, err := wasmStore.GetSerialisedModulePath(workflowID)
if err != nil {
return nil, fmt.Errorf("error getting serialised module: %w", err)
}

// Validate the decompressed binary size.
// io.LimitReader prevents decompression bombs by reading up to a set limit, but it will not return an error if the limit is reached.
// The Read() method will return io.EOF, and ReadAll will gracefully handle it and return nil.
if uint64(len(binary)) > modCfg.MaxDecompressedBinarySize {
return nil, fmt.Errorf("decompressed binary size reached the maximum allowed size of %d bytes", modCfg.MaxDecompressedBinarySize)
if exists {
mod, err = wasmtime.NewModuleDeserializeFile(engine, serialisedModulePath)
if err != nil {
// It's possible that an error occurred because the module was serialised with a different engine configuration or
// wasmtime version so the error is ignored and the code falls back to loading it from the wasm binary.
lggr.Debugw("error deserializing module, attempting to load from binary", "workflowID", workflowID, "error", err)
}
}

mod, err := wasmtime.NewModule(engine, binary)
if err != nil {
return nil, fmt.Errorf("error creating wasmtime module: %w", err)
// If the serialized module was not found or deserialization failed, load the module from the wasm binary.
if mod == nil {
mod, err = loadModuleFromWasmBinary(ctx, lggr, modCfg, workflowID, wasmStore, engine)
if err != nil {
return nil, fmt.Errorf("error loading module from wasm binary: %w", err)
}
}

linker, err := newWasiLinker(modCfg, engine)
Expand Down Expand Up @@ -287,6 +296,70 @@ func NewModule(modCfg *ModuleConfig, binary []byte, opts ...func(*ModuleConfig))
return m, nil
}

func loadModuleFromWasmBinary(ctx context.Context, lggr logger.Logger, modCfg *ModuleConfig, workflowID string, wasmStore WasmBinaryStore, engine *wasmtime.Engine) (*wasmtime.Module, error) {
// Loading from the module binary is relatively very slow (~100 times slower than deserialization) so log the
// time here to make it obvious when this is happening as it will impact workflow startup time.
wasmBinary, err := wasmStore.GetWasmBinary(ctx, workflowID)
if err != nil {
return nil, fmt.Errorf("error getting workflow binary: %w", err)
}

hash := sha256.Sum256(wasmBinary)
binaryID := hex.EncodeToString(hash[:])

lggr.Infow("loading module from binary", "workflowID", workflowID)
mod, err := newModuleFromBinary(wasmBinary, modCfg, engine)
if err != nil {
return nil, fmt.Errorf("error creating new module from wasm binary: %w", err)
}
lggr.Infow("finished loading module from binary", "workflowID", workflowID)

// Store the serialised module for future use.
serialisedMod, err := mod.Serialize()
if err != nil {
return nil, fmt.Errorf("error serialising module: %w", err)
}

err = wasmStore.StoreSerialisedModule(workflowID, binaryID, serialisedMod)
if err != nil {
return nil, fmt.Errorf("error storing serialised module: %w", err)
}
return mod, nil
}

func newModuleFromBinary(wasmBinary []byte, modCfg *ModuleConfig, engine *wasmtime.Engine) (*wasmtime.Module, error) {

if !modCfg.IsUncompressed {
// validate the binary size before decompressing
// this is to prevent decompression bombs
if uint64(len(wasmBinary)) > modCfg.MaxCompressedBinarySize {
return nil, fmt.Errorf("compressed binary size exceeds the maximum allowed size of %d bytes", modCfg.MaxCompressedBinarySize)
}

rdr := io.LimitReader(brotli.NewReader(bytes.NewBuffer(wasmBinary)), int64(modCfg.MaxDecompressedBinarySize+1))
decompedBinary, err := io.ReadAll(rdr)
if err != nil {
return nil, fmt.Errorf("failed to decompress binary: %w", err)
}

wasmBinary = decompedBinary
}

// Validate the decompressed binary size.
// io.LimitReader prevents decompression bombs by reading up to a set limit, but it will not return an error if the limit is reached.
// The Read() method will return io.EOF, and ReadAll will gracefully handle it and return nil.
if uint64(len(wasmBinary)) > modCfg.MaxDecompressedBinarySize {
return nil, fmt.Errorf("decompressed binary size reached the maximum allowed size of %d bytes", modCfg.MaxDecompressedBinarySize)
}

mod, err := wasmtime.NewModule(engine, wasmBinary)
if err != nil {
return nil, fmt.Errorf("error creating wasmtime module: %w", err)
}

return mod, nil
}

func (m *Module) Start() {
m.wg.Add(1)
go func() {
Expand Down
31 changes: 29 additions & 2 deletions pkg/workflows/wasm/host/wasm.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ import (

"google.golang.org/protobuf/types/known/emptypb"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"
wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb"
)

func GetWorkflowSpec(ctx context.Context, modCfg *ModuleConfig, binary []byte, config []byte) (*sdk.WorkflowSpec, error) {
m, err := NewModule(modCfg, binary, WithDeterminism())
func GetWorkflowSpec(ctx context.Context, lggr logger.Logger, modCfg *ModuleConfig, workflowID string, wasmStore WasmBinaryStore, config []byte) (*sdk.WorkflowSpec, error) {
m, err := NewModule(ctx, lggr, modCfg, workflowID, wasmStore, WithDeterminism())
if err != nil {
return nil, fmt.Errorf("could not instantiate module: %w", err)
}
Expand Down Expand Up @@ -43,3 +44,29 @@ func GetWorkflowSpec(ctx context.Context, modCfg *ModuleConfig, binary []byte, c

return wasmpb.ProtoToWorkflowSpec(sr)
}

func NewSingleBinaryWasmBinaryStore(binary []byte) WasmBinaryStore {
// Create a mock implementation of the wasmBinaryStore interface
binaryStore := &SingleBinaryWasmBinaryStore{
binary: binary,
}
return binaryStore
}

// SingleBinaryWasmBinaryStore is a mock implementation of the wasmBinaryStore interface
type SingleBinaryWasmBinaryStore struct {
binary []byte
}

func (m *SingleBinaryWasmBinaryStore) GetSerialisedModulePath(workflowID string) (string, bool, error) {
return "", false, nil
}

func (m *SingleBinaryWasmBinaryStore) StoreSerialisedModule(workflowID string, binaryID string, module []byte) error {
//noop
return nil
}

func (m *SingleBinaryWasmBinaryStore) GetWasmBinary(ctx context.Context, workflowID string) ([]byte, error) {
return m.binary, nil
}
Loading

0 comments on commit d7ad453

Please sign in to comment.