From 8a0ab24213c279d1780740b3efb65c02d21ac636 Mon Sep 17 00:00:00 2001 From: Michael Wan Date: Sun, 5 Aug 2018 05:41:36 -0400 Subject: [PATCH] feature: support event service Signed-off-by: Michael Wan --- apis/filters/parse.go | 156 +++++++++++++++++++++++++++++++++++ apis/filters/parse_test.go | 151 +++++++++++++++++++++++++++++++++ daemon/events/events.go | 144 ++++++++++++++++++++++++++++++++ daemon/events/events_test.go | 117 ++++++++++++++++++++++++++ daemon/events/filter.go | 23 ++++++ daemon/events/filter_test.go | 67 +++++++++++++++ 6 files changed, 658 insertions(+) create mode 100644 apis/filters/parse.go create mode 100644 apis/filters/parse_test.go create mode 100644 daemon/events/events.go create mode 100644 daemon/events/events_test.go create mode 100644 daemon/events/filter.go create mode 100644 daemon/events/filter_test.go diff --git a/apis/filters/parse.go b/apis/filters/parse.go new file mode 100644 index 0000000000..2e2c6a70c5 --- /dev/null +++ b/apis/filters/parse.go @@ -0,0 +1,156 @@ +package filters + +import ( + "encoding/json" + "errors" + "strings" +) + +// Args stores filter arguments as map key:{map key: bool}. +// It contains an aggregation of the map of arguments (which are in the form +// of -f 'key=value') based on the key, and stores values for the same key +// in a map with string keys and boolean values. +// e.g given -f 'label=label1=1' -f 'label=label2=2' -f 'image.name=ubuntu' +// the args will be {"image.name":{"ubuntu":true},"label":{"label1=1":true,"label2=2":true}} +type Args struct { + fields map[string]map[string]bool +} + +// KeyValuePair is used to initialize a new Args +type KeyValuePair struct { + Key string + Value string +} + +// Arg creates a new KeyValuePair for initializing Args +func Arg(key, value string) KeyValuePair { + return KeyValuePair{Key: key, Value: value} +} + +// NewArgs returns a new Args populated with the initial args +func NewArgs(initialArgs ...KeyValuePair) Args { + args := Args{fields: map[string]map[string]bool{}} + for _, arg := range initialArgs { + args.Add(arg.Key, arg.Value) + } + return args +} + +// Get returns the list of values associated with the key +func (args Args) Get(key string) []string { + values := args.fields[key] + if values == nil { + return make([]string, 0) + } + slice := make([]string, 0, len(values)) + for key := range values { + slice = append(slice, key) + } + return slice +} + +// Add a new value to the set of values +func (args Args) Add(key, value string) { + if _, ok := args.fields[key]; ok { + args.fields[key][value] = true + } else { + args.fields[key] = map[string]bool{value: true} + } +} + +// Del removes a value from the set +func (args Args) Del(key, value string) { + if _, ok := args.fields[key]; ok { + delete(args.fields[key], value) + if len(args.fields[key]) == 0 { + delete(args.fields, key) + } + } +} + +// Len returns the number of fields in the arguments. +func (args Args) Len() int { + return len(args.fields) +} + +// ExactMatch returns true if the source matches exactly one of the filters. +func (args Args) ExactMatch(field, source string) bool { + fieldValues, ok := args.fields[field] + //do not filter if there is no filter set or cannot determine filter + if !ok || len(fieldValues) == 0 { + return true + } + + // try to match full name value to avoid O(N) regular expression matching + return fieldValues[source] +} + +// MarshalJSON returns a JSON byte representation of the Args +func (args Args) MarshalJSON() ([]byte, error) { + if len(args.fields) == 0 { + return []byte{}, nil + } + return json.Marshal(args.fields) +} + +// UnmarshalJSON populates the Args from JSON encode bytes +func (args Args) UnmarshalJSON(raw []byte) error { + if len(raw) == 0 { + return nil + } + return json.Unmarshal(raw, &args.fields) +} + +// ErrBadFormat is an error returned when a filter is not in the form key=value +// +// Deprecated: this error will be removed in a future version +var ErrBadFormat = errors.New("bad format of filter (expected name=value)") + +// ParseFlag parses a key=value string and adds it to an Args. +// +// Deprecated: Use Args.Add() +func ParseFlag(arg string, prev Args) (Args, error) { + filters := prev + if len(arg) == 0 { + return filters, nil + } + + if !strings.Contains(arg, "=") { + return filters, ErrBadFormat + } + + f := strings.SplitN(arg, "=", 2) + + name := strings.ToLower(strings.TrimSpace(f[0])) + value := strings.TrimSpace(f[1]) + + filters.Add(name, value) + + return filters, nil +} + +// ToParam packs the Args into a string for easy transport from client to server. +func ToParam(a Args) (string, error) { + if a.Len() == 0 { + return "", nil + } + + buf, err := json.Marshal(a) + return string(buf), err +} + +// FromParam decodes a JSON encoded string into Args +func FromParam(p string) (Args, error) { + args := NewArgs() + + if p == "" { + return args, nil + } + + raw := []byte(p) + err := json.Unmarshal(raw, &args) + if err != nil { + return args, err + } + return args, nil +} diff --git a/apis/filters/parse_test.go b/apis/filters/parse_test.go new file mode 100644 index 0000000000..6d1a5db8a5 --- /dev/null +++ b/apis/filters/parse_test.go @@ -0,0 +1,151 @@ +package filters + +import ( + "testing" +) + +func TestParseArgs(t *testing.T) { + // equivalent of `docker ps -f 'created=today' -f 'image.name=ubuntu*' -f 'image.name=*untu'` + flagArgs := []string{ + "created=today", + "image.name=ubuntu*", + "image.name=*untu", + } + var ( + args = NewArgs() + err error + ) + + for i := range flagArgs { + args, err = ParseFlag(flagArgs[i], args) + if err != nil { + t.Fatalf("ParseFlag got err: %v", err) + } + } + + if len(args.Get("created")) != 1 { + t.Fatalf("got unexpected created keys: %v", args.Get("created")) + } + if len(args.Get("image.name")) != 2 { + t.Fatalf("got unexpected image.name keys: %v", args.Get("image.name")) + } +} + +func TestAdd(t *testing.T) { + f := NewArgs() + f.Add("status", "running") + v := f.fields["status"] + if len(v) != 1 || !v["running"] { + t.Fatalf("Expected to include a running status, got %v", v) + } + + f.Add("status", "paused") + if len(v) != 2 || !v["paused"] { + t.Fatalf("Expected to include a paused status, got %v", v) + } +} + +func TestDel(t *testing.T) { + f := NewArgs() + f.Add("status", "running") + f.Del("status", "running") + v := f.fields["status"] + if v["running"] { + t.Fatal("Expected to not include a running status filter, got true") + } +} + +func TestLen(t *testing.T) { + f := NewArgs() + if f.Len() != 0 { + t.Fatal("Expected to not include any field") + } + f.Add("status", "running") + if f.Len() != 1 { + t.Fatal("Expected to include one field") + } +} + +func TestExactMatch(t *testing.T) { + f := NewArgs() + + if !f.ExactMatch("status", "running") { + t.Fatal("Expected to match `running` when there are no filters, got false") + } + + f.Add("status", "running") + f.Add("status", "pause*") + + if !f.ExactMatch("status", "running") { + t.Fatal("Expected to match `running` with one of the filters, got false") + } + + if f.ExactMatch("status", "paused") { + t.Fatal("Expected to not match `paused` with one of the filters, got true") + } +} + +func TestToParam(t *testing.T) { + fields := map[string]map[string]bool{ + "created": {"today": true}, + "image.name": {"ubuntu*": true, "*untu": true}, + } + a := Args{fields: fields} + + _, err := ToParam(a) + if err != nil { + t.Errorf("failed to marshal the filters: %s", err) + } +} + +func TestFromParam(t *testing.T) { + invalids := []string{ + "anything", + "['a','list']", + "{'key': 'value'}", + `{"key": "value"}`, + `{"key": ["value"]}`, + } + valid := map[*Args][]string{ + {fields: map[string]map[string]bool{"key": {"value": true}}}: { + `{"key": {"value": true}}`, + }, + {fields: map[string]map[string]bool{"key": {"value1": true, "value2": true}}}: { + `{"key": {"value1": true, "value2": true}}`, + }, + {fields: map[string]map[string]bool{"key1": {"value1": true}, "key2": {"value2": true}}}: { + `{"key1": {"value1": true}, "key2": {"value2": true}}`, + }, + } + + for _, invalid := range invalids { + if _, err := FromParam(invalid); err == nil { + t.Fatalf("Expected an error with %v, got nothing", invalid) + } + } + + for expectedArgs, matchers := range valid { + for _, json := range matchers { + args, err := FromParam(json) + if err != nil { + t.Fatal(err) + } + if args.Len() != expectedArgs.Len() { + t.Fatalf("Expected %v, go %v", expectedArgs, args) + } + for key, expectedValues := range expectedArgs.fields { + values := args.Get(key) + + if len(values) != len(expectedValues) { + t.Fatalf("Expected %v, go %v", expectedArgs, args) + } + + for _, v := range values { + if !expectedValues[v] { + t.Fatalf("Expected %v, go %v", expectedArgs, args) + } + } + } + } + } +} diff --git a/daemon/events/events.go b/daemon/events/events.go new file mode 100644 index 0000000000..b45a8e98c6 --- /dev/null +++ b/daemon/events/events.go @@ -0,0 +1,144 @@ +package events + +import ( + "context" + "time" + + "github.com/alibaba/pouch/apis/types" + + goevents "github.com/docker/go-events" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +// Events is pubsub channel for events generated by the engine. +type Events struct { + broadcaster *goevents.Broadcaster +} + +// NewEvents return a new Events instance +func NewEvents() *Events { + return &Events{ + broadcaster: goevents.NewBroadcaster(), + } +} + +// Publish sends an event. The caller will be considered the initial +// publisher of the event. This means the timestamp will be calculated +// at this point and this method may read from the calling context. +// TODO: do we need lock when use broadcaster??? +func (e *Events) Publish(ctx context.Context, action string, eventType types.EventType, actor *types.EventsActor) error { + var err error + // ensure actor not nil + if actor == nil { + actor = &types.EventsActor{} + } + + now := time.Now().UTC() + msg := types.EventsMessage{ + Action: action, + Type: eventType, + Actor: actor, + Time: now.Unix(), + TimeNano: now.UnixNano(), + } + + // compatibility with moby + switch eventType { + case types.EventTypeContainer: + msg.ID = actor.ID + msg.Status = action + if actor.Attributes != nil { + if image, ok := actor.Attributes["image"]; ok { + msg.From = image + } + } + case types.EventTypeImage: + msg.ID = actor.ID + msg.Status = action + } + + defer func() { + if err != nil { + logrus.Errorf("failed to publishing event {action: %s, type: %s, id: %s}: %v", msg.Action, msg.Type, msg.ID, err) + } else { + logrus.Debugf("event published event {action: %s, type: %s, id: %s}", msg.Action, msg.Type, msg.ID) + } + }() + + err = e.broadcaster.Write(&msg) + return err +} + +// Subscribe to events on the Events. Events are sent through the returned +// channel ch. If an error is encountered, it will be sent on channel errs and +// errs will be closed. To end the subscription, cancel the provided context. +// +// Zero or more filters may be provided as Args. Only events that match +// *any* of the provided filters will be sent on the channel. The filters use +// the standard containerd filters package syntax. +func (e *Events) Subscribe(ctx context.Context, ef *Filter) (ch <-chan *types.EventsMessage, errs <-chan error) { + var ( + evch = make(chan *types.EventsMessage) + errq = make(chan error, 1) + channel = goevents.NewChannel(0) + queue = goevents.NewQueue(channel) + dst goevents.Sink = queue + ) + + closeAll := func() { + defer close(errq) + defer e.broadcaster.Remove(dst) + defer queue.Close() + defer channel.Close() + } + + ch = evch + errs = errq + + // add filters for event messages + if ef != nil && ef.filter.Len() > 0 { + dst = goevents.NewFilter(queue, goevents.MatcherFunc(func(gev goevents.Event) bool { + // TODO(ziren): maybe we need adaptor here + msg := gev.(*types.EventsMessage) + return ef.Match(*msg) + })) + } + + e.broadcaster.Add(dst) + + go func() { + defer closeAll() + + var err error + loop: + for { + select { + case ev := <-channel.C: + env, ok := ev.(*types.EventsMessage) + if !ok { + err = errors.Errorf("invalid message encountered %#v; please file a bug", ev) + break + } + + select { + case evch <- env: + case <-ctx.Done(): + break loop + } + case <-ctx.Done(): + break loop + } + } + + if err == nil { + if cerr := ctx.Err(); cerr != context.Canceled { + err = cerr + } + } + + errq <- err + }() + + return +} diff --git a/daemon/events/events_test.go b/daemon/events/events_test.go new file mode 100644 index 0000000000..1ff10eb59b --- /dev/null +++ b/daemon/events/events_test.go @@ -0,0 +1,117 @@ +package events + +import ( + "context" + "fmt" + "sync" + "testing" + + "github.com/alibaba/pouch/apis/types" + "github.com/alibaba/pouch/pkg/utils" +) + +func TestBasicSubscribe(t *testing.T) { + type eventsOption struct { + action string + eventType types.EventType + actor *types.EventsActor + } + + ctx := context.Background() + testEvents := []eventsOption{ + { + action: "create", + eventType: types.EventTypeContainer, + actor: &types.EventsActor{ID: "asdf"}, + }, + { + action: "create", + eventType: types.EventTypeContainer, + actor: &types.EventsActor{ + ID: "qwer", + Attributes: map[string]string{ + "image": "busybox", + }, + }, + }, + } + + eventsService := NewEvents() + + t.Log("subscribe") + var cancel1, cancel2 func() + + // Create two subscribers for same set of events and make sure they + // traverse the event. + ctx1, cancel1 := context.WithCancel(ctx) + eventq1, errq1 := eventsService.Subscribe(ctx1, nil) + + ctx2, cancel2 := context.WithCancel(ctx) + eventq2, errq2 := eventsService.Subscribe(ctx2, nil) + + t.Log("publish") + var wg sync.WaitGroup + wg.Add(1) + errChan := make(chan error) + go func() { + defer wg.Done() + defer close(errChan) + for _, event := range testEvents { + if err := eventsService.Publish(ctx, event.action, event.eventType, event.actor); err != nil { + errChan <- err + return + } + } + + t.Log("finished publishing") + }() + + t.Log("waiting") + wg.Wait() + if err := <-errChan; err != nil { + t.Fatal(err) + } + + for _, subscriber := range []struct { + eventq <-chan *types.EventsMessage + errq <-chan error + cancel func() + }{ + { + eventq: eventq1, + errq: errq1, + cancel: cancel1, + }, + { + eventq: eventq2, + errq: errq2, + cancel: cancel2, + }, + } { + var received []types.EventsMessage + subscribercheck: + for { + select { + case ev := <-subscriber.eventq: + received = append(received, *ev) + case err := <-subscriber.errq: + if err != nil { + t.Fatal(err) + } + break subscribercheck + } + + if len(received) == 2 { + // when we do this, we expect the errs channel to be closed and + // this will return. + subscriber.cancel() + + for _, ev := range received { + if ev.Action != "create" || ev.Type != types.EventTypeContainer || !utils.StringInSlice([]string{"asdf", "qwer"}, ev.ID) { + t.Fatal(fmt.Errorf("got unexpected event message: %#v", ev)) + } + } + } + } + } +} diff --git a/daemon/events/filter.go b/daemon/events/filter.go new file mode 100644 index 0000000000..a5e5477f8e --- /dev/null +++ b/daemon/events/filter.go @@ -0,0 +1,23 @@ +package events + +import ( + "github.com/alibaba/pouch/apis/filters" + "github.com/alibaba/pouch/apis/types" +) + +// Filter uses to filter out pouch events from a stream +type Filter struct { + filter filters.Args +} + +// NewFilter initializes a new Filter. +func NewFilter(filter filters.Args) *Filter { + return &Filter{filter: filter} +} + +// Match returns true when the event ev is included by the filters +func (ef *Filter) Match(ev types.EventsMessage) bool { + // TODO(ziren): add more filters + return ef.filter.ExactMatch("event", ev.Action) && + ef.filter.ExactMatch("type", string(ev.Type)) +} diff --git a/daemon/events/filter_test.go b/daemon/events/filter_test.go new file mode 100644 index 0000000000..77f9869e6a --- /dev/null +++ b/daemon/events/filter_test.go @@ -0,0 +1,67 @@ +package events + +import ( + "testing" + + "github.com/alibaba/pouch/apis/filters" + "github.com/alibaba/pouch/apis/types" +) + +func TestFilter_Match(t *testing.T) { + type fields struct { + filter filters.Args + } + type args struct { + ev types.EventsMessage + } + tests := []struct { + name string + fields fields + args args + want bool + }{ + // TODO: Add test cases. + { + name: "test1", + fields: fields{ + filter: filter.NewArgs(filters.Arg("type", "container")), + }, + args: args{ + ev: types.EventsMessage{ + Action: "create", + Type: types.EventTypeContainer, + Time: now.Unix(), + TimeNano: now.UnixNano(), + ID: "asdf", + }, + }, + want: true, + }, + { + name: "test2", + fields: fields{ + filter: filter.NewArgs(filters.Arg("type", "image")), + }, + args: args{ + ev: types.EventsMessage{ + Action: "create", + Type: types.EventTypeContainer, + Time: now.Unix(), + TimeNano: now.UnixNano(), + ID: "asdf", + }, + }, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ef := &Filter{ + filter: tt.fields.filter, + } + if got := ef.Match(tt.args.ev); got != tt.want { + t.Errorf("Filter.Match() = %v, want %v", got, tt.want) + } + }) + } +}