Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transform-sdk: Introduce sr package #13049

Merged
merged 14 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions src/go/transform-sdk/internal/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package cache

import "container/list"

type (
	// entry is the payload stored in each list element. The key is kept next
	// to the value so that evicting a list element can also delete the
	// matching map entry.
	entry[K comparable, V any] struct {
		key   K
		value V
	}

	// Cache is a cache that evicts in LRU order based on the number of
	// entries it holds.
	//
	// A Cache whose limit is zero or negative (which includes the zero value)
	// never stores anything. The code performs no synchronization of its own.
	Cache[K comparable, V any] struct {
		// list contains the entries themselves in LRU order: the front is the
		// least recently used entry, the back is the most recently used one.
		list *list.List
		// underlying maps each key to its element in list.
		underlying map[K]*list.Element
		// maxEntries is the maximum number of entries kept at once.
		maxEntries int
	}
)

// New returns a new cache that holds at most maxSize entries.
//
// If maxSize is zero or negative the returned cache stays empty: Put becomes
// a no-op and Get always reports a miss.
func New[K comparable, V any](maxSize int) Cache[K, V] {
	return Cache[K, V]{
		underlying: make(map[K]*list.Element),
		list:       list.New(),
		maxEntries: maxSize,
	}
}

// Put adds an entry into the cache, or replaces the value of an existing key.
// Either way the key becomes the most recently used entry. Inserting a new key
// into a full cache first evicts the least recently used entry.
func (c *Cache[K, V]) Put(k K, v V) {
	if c.maxEntries <= 0 {
		// A cache without capacity cannot hold anything. Returning here also
		// keeps prune from being asked to evict from an empty list.
		return
	}
	if n, ok := c.underlying[k]; ok {
		// This key is already in the cache: update the value in place, then
		// move it to the end of the list.
		n.Value.(*entry[K, V]).value = v
		c.list.MoveToBack(n)
		return
	}
	// Otherwise make room by pruning the least recently used entry and insert
	// the new value at the most-recently-used end.
	c.prune()
	c.underlying[k] = c.list.PushBack(&entry[K, V]{key: k, value: v})
}

// Get extracts a value from the cache. It returns the stored value and true
// when k is present, or the zero value of V and false otherwise. A hit marks
// the entry as most recently used.
func (c *Cache[K, V]) Get(k K) (v V, ok bool) {
	n, ok := c.underlying[k]
	if !ok {
		return v, false
	}
	v = n.Value.(*entry[K, V]).value
	// Move this node to the end of the list to mark it as most recently used.
	c.list.MoveToBack(n)
	return v, true
}

// Size returns the number of entries currently held by the cache.
func (c *Cache[K, V]) Size() int {
	return len(c.underlying)
}

// prune evicts least recently used entries until there is room for one more
// entry, i.e. until fewer than maxEntries entries remain.
//
// Review discussion kept from the pull request: a reviewer noted that the
// values are (overall) quite large and asked whether a different data
// structure, trading some bookkeeping memory for faster insertion, would be
// worthwhile; the author replied "Done."
func (c *Cache[K, V]) prune() {
	for len(c.underlying) >= c.maxEntries {
		oldest := c.list.Front()
		if oldest == nil {
			// Nothing left to evict. Front returns nil for an empty list, so
			// dereferencing it would panic.
			return
		}
		e := oldest.Value.(*entry[K, V])
		delete(c.underlying, e.key)
		c.list.Remove(oldest)
	}
}
88 changes: 88 additions & 0 deletions src/go/transform-sdk/internal/cache/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package cache_test

import (
"testing"

"github.com/redpanda-data/redpanda/src/go/transform-sdk/internal/cache"
)

// expectValue returns the value stored under key in c and fails the test
// immediately when the key is missing.
//
// c is received by value, but the copy shares the original's list and map, so
// the lookup still marks the entry as most recently used in the caller's cache.
func expectValue(t *testing.T, c cache.Cache[int, string], key int) string {
	// Mark this function as a helper so a failure is reported at the calling
	// assertion rather than at the line inside this function.
	t.Helper()
	v, ok := c.Get(key)
	if !ok {
		t.Fatalf("missing value for key %v", key)
	}
	return v
}

// TestCacheMaxSize checks that a cache limited to three entries never grows
// past that limit: overwriting an existing key keeps the size unchanged, and
// inserting a fourth key evicts the least recently used one.
func TestCacheMaxSize(t *testing.T) {
	type pair struct {
		key  int
		want string
	}
	// assertAll looks up every pair in order and compares the stored value.
	assertAll := func(lru cache.Cache[int, string], pairs []pair) {
		for _, p := range pairs {
			if got := expectValue(t, lru, p.key); got != p.want {
				t.Fatalf("got: %v want: %v", got, p.want)
			}
		}
	}

	lru := cache.New[int, string](3)
	// The final write overwrites key 2 instead of adding a fourth entry.
	for _, p := range []pair{{1, "foo"}, {2, "bar"}, {3, "qux"}, {2, "baz"}} {
		lru.Put(p.key, p.want)
	}
	if size := lru.Size(); size != 3 {
		t.Fatalf("got: %v want: 3", size)
	}
	assertAll(lru, []pair{{1, "foo"}, {2, "baz"}, {3, "qux"}})

	// The lookups above touched 1, 2, 3 in that order, so key 1 is now the
	// least recently used entry and is the one dropped by the next insert.
	lru.Put(4, "thud")
	if size := lru.Size(); size != 3 {
		t.Fatalf("got: %v want: 3", size)
	}
	if _, found := lru.Get(1); found {
		t.Fatalf("expected evicted entry for %v", 1)
	}
	assertAll(lru, []pair{{2, "baz"}, {3, "qux"}, {4, "thud"}})
}

// TestCacheLRU checks that eviction follows recency of use rather than
// insertion order: reading a key protects it from the next eviction.
func TestCacheLRU(t *testing.T) {
	c := cache.New[int, string](3)
	c.Put(1, "foo")
	c.Put(2, "bar")
	c.Put(3, "qux")
	// Reading key 1 marks it as most recently used, which leaves key 2 as the
	// least recently used entry.
	v := expectValue(t, c, 1)
	if v != "foo" {
		t.Fatalf("got: %v want: %v", v, "foo")
	}
	// This should evict key 2 because it is the LRU entry.
	c.Put(4, "baz")
	if c.Size() != 3 {
		t.Fatalf("got: %v want: 3", c.Size())
	}
	if _, ok := c.Get(2); ok {
		t.Fatalf("got: %v want: false", ok)
	}
	v = expectValue(t, c, 1)
	if v != "foo" {
		t.Fatalf("got: %v want: %v", v, "foo")
	}
	v = expectValue(t, c, 3)
	if v != "qux" {
		t.Fatalf("got: %v want: %v", v, "qux")
	}
	v = expectValue(t, c, 4)
	if v != "baz" {
		t.Fatalf("got: %v want: %v", v, "baz")
	}
}
5 changes: 3 additions & 2 deletions src/go/transform-sdk/internal/testdata/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
set(GOPATH ${CMAKE_CURRENT_BINARY_DIR})
execute_process(COMMAND ${CMAKE_COMMAND} -E env
${GO_PROGRAM} env GOROOT
OUTPUT_VARIABLE GOROOT
Expand All @@ -9,9 +8,10 @@ function(add_wasm_transform NAME)
find_program(TINYGO_BIN "tinygo")
set(wasm_output "${CMAKE_CURRENT_BINARY_DIR}/${NAME}.wasm")
set(tinygo_cmd ${TINYGO_BIN} build -o ${wasm_output} -quiet -target wasi "${NAME}/transform.go")
set(gopath ${CMAKE_CURRENT_BINARY_DIR}/${NAME})
add_custom_command(OUTPUT ${wasm_output}
COMMAND Python3::Interpreter ${CMAKE_CURRENT_SOURCE_DIR}/retry.py
ARGS -- ${CMAKE_COMMAND} -E env PATH="${GOROOT}/bin:$ENV{PATH}" GOPATH="${GOPATH}" GOROOT="${GOROOT}" ${tinygo_cmd}
ARGS -- ${CMAKE_COMMAND} -E env PATH="${GOROOT}/bin:$ENV{PATH}" GOPATH="${gopath}" GOFLAGS="-modcacherw" GOROOT="${GOROOT}" ${tinygo_cmd}
WORKING_DIRECTORY ${CMAKE_CURRENT_LIST_DIR}
DEPENDS
"${CMAKE_CURRENT_LIST_DIR}/${NAME}/transform.go"
Expand All @@ -27,4 +27,5 @@ add_wasm_transform(transform-error)
add_wasm_transform(transform-panic)
add_wasm_transform(setup-panic)
add_wasm_transform(wasi)
add_wasm_transform(schema-registry)

5 changes: 4 additions & 1 deletion src/go/transform-sdk/internal/testdata/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ module github.com/redpanda-data/wasm-transform-testdata

go 1.20

require github.com/redpanda-data/redpanda/src/go/transform-sdk v0.0.0
require (
github.com/actgardner/gogen-avro/v10 v10.2.1
github.com/redpanda-data/redpanda/src/go/transform-sdk v0.0.0
)

replace github.com/redpanda-data/redpanda/src/go/transform-sdk => ../../
6 changes: 6 additions & 0 deletions src/go/transform-sdk/internal/testdata/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
github.com/actgardner/gogen-avro/v10 v10.2.1 h1:z3pOGblRjAJCYpkIJ8CmbMJdksi4rAhaygw0dyXZ930=
github.com/actgardner/gogen-avro/v10 v10.2.1/go.mod h1:QUhjeHPchheYmMDni/Nx7VB0RsT/ee8YIgGY/xpEQgQ=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
185 changes: 185 additions & 0 deletions src/go/transform-sdk/internal/testdata/schema-registry/avro/example.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package avro

//go:generate $GOPATH/bin/gogen-avro --package=avro . schema.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"type": "record",
"name": "Example",
"namespace": "com.example",
"fields": [
{"name": "a", "type": "long", "default": 0},
{"name": "b", "type": "string", "default": ""}
]
}
Loading