Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
jsphpl committed Dec 1, 2023
0 parents commit ee3c4ee
Show file tree
Hide file tree
Showing 10 changed files with 545 additions and 0 deletions.
19 changes: 19 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
name: Unit Test

on: [push]

jobs:
build:

runs-on: ubuntu-latest
strategy:
matrix:
go-version: [ '1.19', '1.20', '1.21.x' ]

steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v4
with:
go-version: ${{ matrix.go-version }}
cache-dependency-path: subdir/go.sum
- run: go test ./...
56 changes: 56 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# typevent

typevent provides type-safe event messaging channels for Go. They can be used to implement Pub/Sub schemes without the need for type assertions, streamlining the application code that uses the event channels.

At the core it consists of a generic `Channel` interface with the following methods:

```go
type Channel[E Event] interface {
// Emit emits an event of type E on the channel.
Emit(E) error
// Subscribe registers a handler for events of type E on the channel.
Subscribe(ctx context.Context, handler Handler[E]) (Subscription, error)
}
```

The package currently provides one implementation of the interface, using [Redis Pub/Sub](https://redis.io/docs/interact/pubsub/) as the backing distribution system. Usage can look as follows:

```go
import (
"context"
"fmt"

redisclient "github.com/redis/go-redis/v9"
"github.com/sehrgutesoftware/typevent/redis"
)

func ExampleNewChannel() {
type event string

// Create a new channel using redis Pub/Sub as the underlying event bus.
client := redisclient.NewClient(&redisclient.Options{Addr: "localhost:6379"})

// conf holds the redis client used by the channel
conf := redis.NewConfig(client)

// This is where we create the channel that can be used to emit and subscribe to events
channel := redis.NewChannel[event](conf, "CHANNEL_NAME")

// Register a subscriber for the channel.
sub, _ := channel.Subscribe(context.Background(), func(ctx context.Context, ev event) error {
fmt.Printf("subscriber says: %s\n", ev)
return nil
})
defer sub.Close()

// Emit an event on the channel.
channel.Emit("Hello World!")
}

```

## Development
### Run Tests
```sh
go test ./...
```
18 changes: 18 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
module github.com/sehrgutesoftware/typevent

go 1.21.3

require (
github.com/alicebob/miniredis/v2 v2.31.0
github.com/go-redis/redismock/v9 v9.2.0
github.com/redis/go-redis/v9 v9.3.0
github.com/vmihailenco/msgpack/v5 v5.4.1
)

require (
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
github.com/yuin/gopher-lua v1.1.0 // indirect
)
54 changes: 54 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
github.com/DmitriyVTitov/size v1.5.0/go.mod h1:le6rNI4CoLQV1b9gzp1+3d7hMAD/uu2QcJ+aYbNgiU0=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis/v2 v2.31.0 h1:ObEFUNlJwoIiyjxdrYF0QIDE7qXcLc7D3WpSH4c22PU=
github.com/alicebob/miniredis/v2 v2.31.0/go.mod h1:UB/T2Uztp7MlFSDakaX1sTXUv5CASoprx0wulRT6HBg=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/go-redis/redismock/v9 v9.2.0 h1:ZrMYQeKPECZPjOj5u9eyOjg8Nnb0BS9lkVIZ6IpsKLw=
github.com/go-redis/redismock/v9 v9.2.0/go.mod h1:18KHfGDK4Y6c2R0H38EUGWAdc7ZQS9gfYxc94k7rWT0=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.3.0 h1:RiVDjmig62jIWp7Kk4XVLs0hzV6pI3PyTnnL0cnn0u0=
github.com/redis/go-redis/v9 v9.3.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE=
github.com/yuin/gopher-lua v1.1.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw=
golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws=
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18=
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k=
golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
86 changes: 86 additions & 0 deletions redis/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package redis

import (
"context"

"github.com/redis/go-redis/v9"
"github.com/sehrgutesoftware/typevent"
)

// channel is a redis backed implementation of [typevent.Channel].
type channel[E typevent.Event] struct {
*config
event string
}

// NewChannel returns a new [typevent.Channel] backed by redis Pub/Sub as the underlying event bus.
//
// The channel will emit and subscribe to events of type E. The `name` param is a unique identifier
// for the event. All channels created with the same `name` must use the same type `E`. The package
// does intentionally not enforce this constraint, as it would require the use of reflection.
//
// Redis Pub/Sub is used as the underlying event bus. The events emitted on the channel are passed
// to all channels subscribed on the same `name` on the same redis server, regardless of the DB
// they're connected to – see [https://redis.io/docs/interact/pubsub/#database--scoping].
func NewChannel[E typevent.Event](conf *config, event string) typevent.Channel[E] {
return &channel[E]{
config: conf,
event: event,
}
}

// Emit emits an event of type E on the channel.
func (c *channel[E]) Emit(event E) error {
encoded, err := c.codec.Marshal(event)
if err != nil {
return err
}

return c.client.Publish(context.Background(), c.prefix(c.event), encoded).Err()
}

// Subscribe registers a handler for events of type E on the channel.
func (c *channel[E]) Subscribe(ctx context.Context, handler typevent.Handler[E]) (typevent.Subscription, error) {
sub := c.client.Subscribe(ctx, c.prefix(c.event))

// The following call is necessary to make sure the subscription is established.
_, err := sub.Receive(ctx)
if err != nil {
return nil, err
}

ctx, cancel := context.WithCancel(ctx)
go c.listen(ctx, sub, handler)

return &subscription{sub: sub, cancel: cancel}, nil
}

// listen is the goroutine that listens for events on the redis channel.
func (c *channel[E]) listen(ctx context.Context, sub *redis.PubSub, handler typevent.Handler[E]) {
defer sub.Close()

for {
select {
case msg := <-sub.Channel():
var event E
err := c.codec.Unmarshal([]byte(msg.Payload), &event)
if err != nil {
continue
}
go handler(ctx, event)
case <-ctx.Done():
return
}
}
}

type subscription struct {
sub *redis.PubSub
cancel context.CancelFunc
}

// Close unsubscribes from the channel.
func (s *subscription) Close() error {
s.cancel()
return s.sub.Close()
}
143 changes: 143 additions & 0 deletions redis/channel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package redis_test

import (
"context"
"sync"
"testing"
"time"

"github.com/alicebob/miniredis/v2"
"github.com/go-redis/redismock/v9"
redisclient "github.com/redis/go-redis/v9"
"github.com/sehrgutesoftware/typevent/redis"
)

type evType struct {
ExportedKey string
hiddenKey string
}

func TestItDistributesEventsFromRedisToAllSubscribers(t *testing.T) {
// Setup a redis server.
s := miniredis.RunT(t)

// Create a new channel.
codec := &redis.JSONCodec{}
db := redisclient.NewClient(&redisclient.Options{Addr: s.Addr()})
config := redis.NewConfig(db, redis.WithCodec(codec), redis.WithKeyPrefix("test:"))
channel := redis.NewChannel[evType](config, "CHANNEL_NAME")

// Set up some syncing for the event handlers to know when they're done.
// This is necessary because the event handlers are called in a goroutine.
wg := sync.WaitGroup{}
wg.Add(2)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

// Add a subscriber to the channel.
timesReceivedA := 0
receivedByA := evType{}
sub, err := channel.Subscribe(ctx, func(ctx context.Context, e evType) error {
receivedByA = e
timesReceivedA++
wg.Done()
return nil
})
if err != nil {
t.Fatalf("unexpected error returned from Subscribe(): %s", err)
}
defer sub.Close()

// Add another subscriber to the channel.
timesReceivedB := 0
receivedByB := evType{}
sub, err = channel.Subscribe(ctx, func(ctx context.Context, e evType) error {
receivedByB = e
timesReceivedB++
wg.Done()
return nil
})
if err != nil {
t.Fatalf("unexpected error returned from Subscribe(): %s", err)
}
defer sub.Close()

// Emit an event on the redis Pub/Sub channel.
emitted := evType{
ExportedKey: "exported",
hiddenKey: "hidden",
}

encoded, _ := codec.Marshal(emitted)
err = db.Publish(context.Background(), "test:CHANNEL_NAME", encoded).Err()
if err != nil {
t.Fatalf("unexpected error returned from Publish(): %s", err)
}

// Wait for the waitgroup or the context timeout.
wgDone := make(chan bool)
go func() {
wg.Wait()
wgDone <- true
}()
select {
case <-wgDone:
break
case <-ctx.Done():
break
}

// Subscriber A was called correctly?
if timesReceivedA != 1 {
t.Errorf("expected handler A to be called once, got called %d times", timesReceivedA)
}
if receivedByA.ExportedKey != emitted.ExportedKey {
t.Errorf("expected handler A to receive the submitted event, got %v", receivedByA)
}

// Subscriber B was called correctly?
if timesReceivedB != 1 {
t.Errorf("expected handler B to be called once, got called %d times", timesReceivedB)
}
if receivedByB.ExportedKey != emitted.ExportedKey {
t.Errorf("expected handler B to receive the submitted event, got %v", receivedByB)
}

// Unexported fields are not serialized.
if receivedByA.hiddenKey != "" {
t.Errorf("expected handler A to receive the submitted event with unexported fields removed, got %v", receivedByB)
}
if receivedByB.hiddenKey != "" {
t.Errorf("expected handler B to receive the submitted event with unexported fields removed, got %v", receivedByB)
}
}

func TestItPublishesEventsToRedis(t *testing.T) {
client, mock := redismock.NewClientMock()

// Create a new channel.
codec := &redis.JSONCodec{}
config := redis.NewConfig(client, redis.WithCodec(codec), redis.WithKeyPrefix("test:"))
channel := redis.NewChannel[evType](config, "CHANNEL_NAME")

// Event to be emitted.
emitted := evType{
ExportedKey: "exported",
hiddenKey: "hidden",
}
encoded, _ := codec.Marshal(emitted)

// Set up the mock expectations.
mock.ExpectPublish("test:CHANNEL_NAME", encoded).SetVal(1)

// Emit the event.
err := channel.Emit(emitted)
if err != nil {
t.Fatalf("unexpected error returned from Emit(): %s", err)
}

// Check the expectations.
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled expectations: %s", err)
}
}
Loading

0 comments on commit ee3c4ee

Please sign in to comment.