From ee3c4eebc0d02aeef48e62920eb8044441491acf Mon Sep 17 00:00:00 2001 From: Joseph Paul Date: Fri, 1 Dec 2023 09:42:36 +0100 Subject: [PATCH] Initial commit --- .github/workflows/test.yml | 19 +++++ README.md | 56 +++++++++++++++ go.mod | 18 +++++ go.sum | 54 ++++++++++++++ redis/channel.go | 86 ++++++++++++++++++++++ redis/channel_test.go | 143 +++++++++++++++++++++++++++++++++++++ redis/codec.go | 48 +++++++++++++ redis/config.go | 51 +++++++++++++ redis/example_test.go | 42 +++++++++++ typevent.go | 28 ++++++++ 10 files changed, 545 insertions(+) create mode 100644 .github/workflows/test.yml create mode 100644 README.md create mode 100644 go.mod create mode 100644 go.sum create mode 100644 redis/channel.go create mode 100644 redis/channel_test.go create mode 100644 redis/codec.go create mode 100644 redis/config.go create mode 100644 redis/example_test.go create mode 100644 typevent.go diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..b122d28 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,19 @@ +name: Unit Test + +on: [push] + +jobs: + build: + + runs-on: ubuntu-latest + strategy: + matrix: + go-version: [ '1.19', '1.20', '1.21.x' ] + + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v4 + with: + go-version: ${{ matrix.go-version }} + cache-dependency-path: subdir/go.sum + - run: go test ./... diff --git a/README.md b/README.md new file mode 100644 index 0000000..fec8414 --- /dev/null +++ b/README.md @@ -0,0 +1,56 @@ +# typevent + +typevent provides type-safe event messaging channels for Go. They can be used to implement Pub/Sub schemes without the need for type assertions, streamlining the application code that uses the event channels. + +At the core it consists of a generic `Channel` interface with the following methods: + +```go +type Channel[E Event] interface { + // Emit emits an event of type E on the channel. + Emit(E) error + // Subscribe registers a handler for events of type E on the channel. + Subscribe(ctx context.Context, handler Handler[E]) (Subscription, error) +} +``` + +The package currently provides one implementation of the interface, using [Redis Pub/Sub](https://redis.io/docs/interact/pubsub/) as the backing distribution system. Usage can look as follows: + +```go +import ( + "context" + "fmt" + + redisclient "github.com/redis/go-redis/v9" + "github.com/sehrgutesoftware/typevent/redis" +) + +func ExampleNewChannel() { + type event string + + // Create a new channel using redis Pub/Sub as the underlying event bus. + client := redisclient.NewClient(&redisclient.Options{Addr: "localhost:6379"}) + + // conf holds the redis client used by the channel + conf := redis.NewConfig(client) + + // This is where we create the channel that can be used to emit and subscribe to events + channel := redis.NewChannel[event](conf, "CHANNEL_NAME") + + // Register a subscriber for the channel. + sub, _ := channel.Subscribe(context.Background(), func(ctx context.Context, ev event) error { + fmt.Printf("subscriber says: %s\n", ev) + return nil + }) + defer sub.Close() + + // Emit an event on the channel. + channel.Emit("Hello World!") +} + +``` + +## Development +### Run Tests +```sh +go test ./... +``` diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..4f21fdb --- /dev/null +++ b/go.mod @@ -0,0 +1,18 @@ +module github.com/sehrgutesoftware/typevent + +go 1.21.3 + +require ( + github.com/alicebob/miniredis/v2 v2.31.0 + github.com/go-redis/redismock/v9 v9.2.0 + github.com/redis/go-redis/v9 v9.3.0 + github.com/vmihailenco/msgpack/v5 v5.4.1 +) + +require ( + github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect + github.com/yuin/gopher-lua v1.1.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..84f635e --- /dev/null +++ b/go.sum @@ -0,0 +1,54 @@ +github.com/DmitriyVTitov/size v1.5.0/go.mod h1:le6rNI4CoLQV1b9gzp1+3d7hMAD/uu2QcJ+aYbNgiU0= +github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk= +github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= +github.com/alicebob/miniredis/v2 v2.31.0 h1:ObEFUNlJwoIiyjxdrYF0QIDE7qXcLc7D3WpSH4c22PU= +github.com/alicebob/miniredis/v2 v2.31.0/go.mod h1:UB/T2Uztp7MlFSDakaX1sTXUv5CASoprx0wulRT6HBg= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= +github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= +github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/go-redis/redismock/v9 v9.2.0 h1:ZrMYQeKPECZPjOj5u9eyOjg8Nnb0BS9lkVIZ6IpsKLw= +github.com/go-redis/redismock/v9 v9.2.0/go.mod h1:18KHfGDK4Y6c2R0H38EUGWAdc7ZQS9gfYxc94k7rWT0= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= +github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= +github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redis/go-redis/v9 v9.3.0 h1:RiVDjmig62jIWp7Kk4XVLs0hzV6pI3PyTnnL0cnn0u0= +github.com/redis/go-redis/v9 v9.3.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE= +github.com/yuin/gopher-lua v1.1.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= +golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw= +golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= +golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= +golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k= +golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/redis/channel.go b/redis/channel.go new file mode 100644 index 0000000..bd8b9c4 --- /dev/null +++ b/redis/channel.go @@ -0,0 +1,86 @@ +package redis + +import ( + "context" + + "github.com/redis/go-redis/v9" + "github.com/sehrgutesoftware/typevent" +) + +// channel is a redis backed implementation of [typevent.Channel]. +type channel[E typevent.Event] struct { + *config + event string +} + +// NewChannel returns a new [typevent.Channel] backed by redis Pub/Sub as the underlying event bus. +// +// The channel will emit and subscribe to events of type E. The `name` param is a unique identifier +// for the event. All channels created with the same `name` must use the same type `E`. The package +// does intentionally not enforce this constraint, as it would require the use of reflection. +// +// Redis Pub/Sub is used as the underlying event bus. The events emitted on the channel are passed +// to all channels subscribed on the same `name` on the same redis server, regardless of the DB +// they're connected to – see [https://redis.io/docs/interact/pubsub/#database--scoping]. +func NewChannel[E typevent.Event](conf *config, event string) typevent.Channel[E] { + return &channel[E]{ + config: conf, + event: event, + } +} + +// Emit emits an event of type E on the channel. +func (c *channel[E]) Emit(event E) error { + encoded, err := c.codec.Marshal(event) + if err != nil { + return err + } + + return c.client.Publish(context.Background(), c.prefix(c.event), encoded).Err() +} + +// Subscribe registers a handler for events of type E on the channel. +func (c *channel[E]) Subscribe(ctx context.Context, handler typevent.Handler[E]) (typevent.Subscription, error) { + sub := c.client.Subscribe(ctx, c.prefix(c.event)) + + // The following call is necessary to make sure the subscription is established. + _, err := sub.Receive(ctx) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithCancel(ctx) + go c.listen(ctx, sub, handler) + + return &subscription{sub: sub, cancel: cancel}, nil +} + +// listen is the goroutine that listens for events on the redis channel. +func (c *channel[E]) listen(ctx context.Context, sub *redis.PubSub, handler typevent.Handler[E]) { + defer sub.Close() + + for { + select { + case msg := <-sub.Channel(): + var event E + err := c.codec.Unmarshal([]byte(msg.Payload), &event) + if err != nil { + continue + } + go handler(ctx, event) + case <-ctx.Done(): + return + } + } +} + +type subscription struct { + sub *redis.PubSub + cancel context.CancelFunc +} + +// Close unsubscribes from the channel. +func (s *subscription) Close() error { + s.cancel() + return s.sub.Close() +} diff --git a/redis/channel_test.go b/redis/channel_test.go new file mode 100644 index 0000000..5a5a140 --- /dev/null +++ b/redis/channel_test.go @@ -0,0 +1,143 @@ +package redis_test + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/alicebob/miniredis/v2" + "github.com/go-redis/redismock/v9" + redisclient "github.com/redis/go-redis/v9" + "github.com/sehrgutesoftware/typevent/redis" +) + +type evType struct { + ExportedKey string + hiddenKey string +} + +func TestItDistributesEventsFromRedisToAllSubscribers(t *testing.T) { + // Setup a redis server. + s := miniredis.RunT(t) + + // Create a new channel. + codec := &redis.JSONCodec{} + db := redisclient.NewClient(&redisclient.Options{Addr: s.Addr()}) + config := redis.NewConfig(db, redis.WithCodec(codec), redis.WithKeyPrefix("test:")) + channel := redis.NewChannel[evType](config, "CHANNEL_NAME") + + // Set up some syncing for the event handlers to know when they're done. + // This is necessary because the event handlers are called in a goroutine. + wg := sync.WaitGroup{} + wg.Add(2) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + // Add a subscriber to the channel. + timesReceivedA := 0 + receivedByA := evType{} + sub, err := channel.Subscribe(ctx, func(ctx context.Context, e evType) error { + receivedByA = e + timesReceivedA++ + wg.Done() + return nil + }) + if err != nil { + t.Fatalf("unexpected error returned from Subscribe(): %s", err) + } + defer sub.Close() + + // Add another subscriber to the channel. + timesReceivedB := 0 + receivedByB := evType{} + sub, err = channel.Subscribe(ctx, func(ctx context.Context, e evType) error { + receivedByB = e + timesReceivedB++ + wg.Done() + return nil + }) + if err != nil { + t.Fatalf("unexpected error returned from Subscribe(): %s", err) + } + defer sub.Close() + + // Emit an event on the redis Pub/Sub channel. + emitted := evType{ + ExportedKey: "exported", + hiddenKey: "hidden", + } + + encoded, _ := codec.Marshal(emitted) + err = db.Publish(context.Background(), "test:CHANNEL_NAME", encoded).Err() + if err != nil { + t.Fatalf("unexpected error returned from Publish(): %s", err) + } + + // Wait for the waitgroup or the context timeout. + wgDone := make(chan bool) + go func() { + wg.Wait() + wgDone <- true + }() + select { + case <-wgDone: + break + case <-ctx.Done(): + break + } + + // Subscriber A was called correctly? + if timesReceivedA != 1 { + t.Errorf("expected handler A to be called once, got called %d times", timesReceivedA) + } + if receivedByA.ExportedKey != emitted.ExportedKey { + t.Errorf("expected handler A to receive the submitted event, got %v", receivedByA) + } + + // Subscriber B was called correctly? + if timesReceivedB != 1 { + t.Errorf("expected handler B to be called once, got called %d times", timesReceivedB) + } + if receivedByB.ExportedKey != emitted.ExportedKey { + t.Errorf("expected handler B to receive the submitted event, got %v", receivedByB) + } + + // Unexported fields are not serialized. + if receivedByA.hiddenKey != "" { + t.Errorf("expected handler A to receive the submitted event with unexported fields removed, got %v", receivedByB) + } + if receivedByB.hiddenKey != "" { + t.Errorf("expected handler B to receive the submitted event with unexported fields removed, got %v", receivedByB) + } +} + +func TestItPublishesEventsToRedis(t *testing.T) { + client, mock := redismock.NewClientMock() + + // Create a new channel. + codec := &redis.JSONCodec{} + config := redis.NewConfig(client, redis.WithCodec(codec), redis.WithKeyPrefix("test:")) + channel := redis.NewChannel[evType](config, "CHANNEL_NAME") + + // Event to be emitted. + emitted := evType{ + ExportedKey: "exported", + hiddenKey: "hidden", + } + encoded, _ := codec.Marshal(emitted) + + // Set up the mock expectations. + mock.ExpectPublish("test:CHANNEL_NAME", encoded).SetVal(1) + + // Emit the event. + err := channel.Emit(emitted) + if err != nil { + t.Fatalf("unexpected error returned from Emit(): %s", err) + } + + // Check the expectations. + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled expectations: %s", err) + } +} diff --git a/redis/codec.go b/redis/codec.go new file mode 100644 index 0000000..d887749 --- /dev/null +++ b/redis/codec.go @@ -0,0 +1,48 @@ +package redis + +import ( + "encoding/json" + + "github.com/vmihailenco/msgpack/v5" +) + +// defaultCodec is the default codec used to marshal and unmarshal events. +var defaultCodec = &MsgpackCodec{} + +// the following are compile time assertions to ensure the codecs implement the interface. +var ( + _ Codec = &MsgpackCodec{} + _ Codec = &JSONCodec{} +) + +// Codec is the Codec used to marshal and unmarshal events. +type Codec interface { + Marshal(v any) ([]byte, error) + Unmarshal(data []byte, v any) error +} + +// MsgpackCodec is a codec that uses msgpack as the encoding. +type MsgpackCodec struct{} + +// MarshalBinary turns the Event into a binary representation using msgpack. +func (c *MsgpackCodec) Marshal(v any) ([]byte, error) { + return msgpack.Marshal(v) +} + +// UnmarshalBinary unmarshals a binary representation of the Event using msgpack. +func (c *MsgpackCodec) Unmarshal(data []byte, v any) error { + return msgpack.Unmarshal(data, v) +} + +// JSONCodec is a codec that uses JSON as the encoding. +type JSONCodec struct{} + +// Marshal turns the Event into a binary representation using JSON. +func (c *JSONCodec) Marshal(v any) ([]byte, error) { + return json.Marshal(v) +} + +// Unmarshal unmarshals a binary representation of the Event using JSON. +func (c *JSONCodec) Unmarshal(data []byte, v any) error { + return json.Unmarshal(data, v) +} diff --git a/redis/config.go b/redis/config.go new file mode 100644 index 0000000..617416e --- /dev/null +++ b/redis/config.go @@ -0,0 +1,51 @@ +package redis + +import ( + redis "github.com/redis/go-redis/v9" +) + +// config contains the configuration for a redis channel. +type config struct { + // client is the redis client used by the channel. + client *redis.Client + // codec is the encoding used to serialize and deserialize events. + codec Codec + // keyPrefix is the prefix for all redis keys used by the channel. + keyPrefix string +} + +// NewConfig returns a new [config] for a redis channel. +func NewConfig(client *redis.Client, opts ...ConfigOption) *config { + c := &config{ + client: client, + codec: defaultCodec, + } + + for _, opt := range opts { + opt(c) + } + + return c +} + +// prefix prefixes the given prefix with the channel's prefix prefix. +func (c *config) prefix(key string) string { + return c.keyPrefix + key +} + +// ConfigOption is a configuration option for the redis channel. +type ConfigOption func(*config) + +// WithCodec sets the codec used to serialize and deserialize events. +func WithCodec(codec Codec) ConfigOption { + return func(c *config) { + c.codec = codec + } +} + +// WithKeyPrefix sets the prefix for all redis keys used by the channel. +func WithKeyPrefix(prefix string) ConfigOption { + return func(c *config) { + c.keyPrefix = prefix + } +} diff --git a/redis/example_test.go b/redis/example_test.go new file mode 100644 index 0000000..bc5173e --- /dev/null +++ b/redis/example_test.go @@ -0,0 +1,42 @@ +package redis_test + +import ( + "context" + "fmt" + "sync" + + "github.com/alicebob/miniredis/v2" + redisclient "github.com/redis/go-redis/v9" + "github.com/sehrgutesoftware/typevent/redis" +) + +func ExampleNewChannel() { + server, err := miniredis.Run() + if err != nil { + panic(err) + } + defer server.Close() + + type event string + + // Create a new channel using redis Pub/Sub as the underlying event bus. + client := redisclient.NewClient(&redisclient.Options{Addr: server.Addr()}) + conf := redis.NewConfig(client) + channel := redis.NewChannel[event](conf, "CHANNEL_NAME") + + // Register a subscriber for the channel. + wg := sync.WaitGroup{} + wg.Add(1) + sub, _ := channel.Subscribe(context.Background(), func(ctx context.Context, ev event) error { + defer wg.Done() + fmt.Printf("subscriber says: %s\n", ev) + return nil + }) + defer sub.Close() + + // Emit an event on the channel. + channel.Emit("Hello World!") + + wg.Wait() + // Output: subscriber says: Hello World! +} diff --git a/typevent.go b/typevent.go new file mode 100644 index 0000000..240d0c3 --- /dev/null +++ b/typevent.go @@ -0,0 +1,28 @@ +// Package typevent provides type safe event channels. +package typevent + +import ( + "context" +) + +// Channel is an event channel that can emit and subscribe to events of a specific type. +// +// See [typevent.redis.NewChannel] for an example implementation. +type Channel[E Event] interface { + // Emit emits an event of type E on the channel. + Emit(E) error + // Subscribe registers a handler for events of type E on the channel. + Subscribe(ctx context.Context, handler Handler[E]) (Subscription, error) +} + +// Event is an event that can be emitted on an event channel. +type Event any + +// Handler is a function that handles an event of a specific type. +type Handler[E Event] func(context.Context, E) error + +// Subscription is a subscription to an event channel. +type Subscription interface { + // Close unsubscribes from the channel. + Close() error +}