From 78a9cedfe20d92591de26d0b7f21be1163477983 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Sun, 18 Aug 2019 10:22:57 -0700 Subject: [PATCH] starting to add context for 0.4 --- alias.go | 1 + go.mod | 1 + pkg/cloudevents/event_interface.go | 10 +- pkg/cloudevents/event_reader.go | 4 +- pkg/cloudevents/event_reader_writer_test.go | 10 +- pkg/cloudevents/event_test.go | 2 +- pkg/cloudevents/event_writer.go | 4 +- pkg/cloudevents/eventcontext.go | 9 +- pkg/cloudevents/eventcontext_v01.go | 8 +- pkg/cloudevents/eventcontext_v01_writer.go | 2 +- pkg/cloudevents/eventcontext_v02_writer.go | 2 +- pkg/cloudevents/eventcontext_v03.go | 29 +- pkg/cloudevents/eventcontext_v03_writer.go | 2 +- pkg/cloudevents/eventcontext_v04.go | 298 ++++++++++++++++++ pkg/cloudevents/transport/amqp/codec.go | 4 +- pkg/cloudevents/transport/http/codec_v01.go | 6 +- .../transport/http/codec_v01_test.go | 6 +- pkg/cloudevents/transport/pubsub/codec.go | 4 +- 18 files changed, 368 insertions(+), 34 deletions(-) create mode 100644 pkg/cloudevents/eventcontext_v04.go diff --git a/alias.go b/alias.go index f97b6473a..118e461a0 100644 --- a/alias.go +++ b/alias.go @@ -57,6 +57,7 @@ const ( VersionV01 = cloudevents.CloudEventsVersionV01 VersionV02 = cloudevents.CloudEventsVersionV02 VersionV03 = cloudevents.CloudEventsVersionV03 + VersionV04 = cloudevents.CloudEventsVersionV04 // HTTP Transport Encodings diff --git a/go.mod b/go.mod index 0824342d2..26ba560e3 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/Azure/go-autorest/autorest/to v0.2.0 // indirect github.com/Azure/go-autorest/autorest/validation v0.1.0 // indirect github.com/fortytw2/leaktest v1.3.0 // indirect + github.com/gogo/protobuf v1.2.0 // indirect github.com/google/go-cmp v0.3.0 github.com/google/uuid v1.1.1 github.com/kelseyhightower/envconfig v1.4.0 diff --git a/pkg/cloudevents/event_interface.go b/pkg/cloudevents/event_interface.go index 8ca52b04d..d25b60ea2 100644 --- a/pkg/cloudevents/event_interface.go +++ b/pkg/cloudevents/event_interface.go @@ -18,8 +18,8 @@ type EventReader interface { ID() string // Time returns event.Context.GetTime(). Time() time.Time - // SchemaURL returns event.Context.GetSchemaURL(). - SchemaURL() string + // DataSchema returns event.Context.GetDataSchema(). + DataSchema() string // DataContentType returns event.Context.GetDataContentType(). DataContentType() string // DataMediaType returns event.Context.GetDataMediaType(). @@ -58,8 +58,8 @@ type EventWriter interface { SetID(string) // SetTime performs event.Context.SetTime. SetTime(time.Time) - // SetSchemaURL performs event.Context.SetSchemaURL. - SetSchemaURL(string) + // SetDataSchema performs event.Context.SetDataSchema. + SetDataSchema(string) // SetDataContentType performs event.Context.SetDataContentType. SetDataContentType(string) // SetDataContentEncoding performs event.Context.SetDataContentEncoding. @@ -68,7 +68,7 @@ type EventWriter interface { // Extension Attributes // SetExtension performs event.Context.SetExtension. - SetExtension(string, interface{}) + SetExtension(string, interface{}) // TODO: this needs to move to just string. // SetData encodes the given payload with the current encoding settings. SetData(interface{}) error diff --git a/pkg/cloudevents/event_reader.go b/pkg/cloudevents/event_reader.go index a5be4ecf8..3cdf35828 100644 --- a/pkg/cloudevents/event_reader.go +++ b/pkg/cloudevents/event_reader.go @@ -54,8 +54,8 @@ func (e Event) Time() time.Time { return time.Time{} } -// SchemaURL implements EventReader.SchemaURL -func (e Event) SchemaURL() string { +// DataSchema implements EventReader.DataSchema +func (e Event) DataSchema() string { if e.Context != nil { return e.Context.GetSchemaURL() } diff --git a/pkg/cloudevents/event_reader_writer_test.go b/pkg/cloudevents/event_reader_writer_test.go index 5659d957e..b87e4664f 100644 --- a/pkg/cloudevents/event_reader_writer_test.go +++ b/pkg/cloudevents/event_reader_writer_test.go @@ -412,7 +412,7 @@ func TestEventRW_SchemaURL(t *testing.T) { "nilled v01": { event: func() ce.Event { e := ce.New("0.1") - e.SetSchemaURL("should nil") + e.SetDataSchema("should nil") return e }(), want: "", @@ -420,7 +420,7 @@ func TestEventRW_SchemaURL(t *testing.T) { "nilled v02": { event: func() ce.Event { e := ce.New("0.2") - e.SetSchemaURL("should nil") + e.SetDataSchema("should nil") return e }(), want: "", @@ -428,7 +428,7 @@ func TestEventRW_SchemaURL(t *testing.T) { "nilled v03": { event: func() ce.Event { e := ce.New("0.3") - e.SetSchemaURL("should nil") + e.SetDataSchema("should nil") return e }(), want: "", @@ -446,8 +446,8 @@ func TestEventRW_SchemaURL(t *testing.T) { validateReaderWriter(t, tc, got, err) }() - tc.event.SetSchemaURL(tc.set) - got = tc.event.SchemaURL() + tc.event.SetDataSchema(tc.set) + got = tc.event.DataSchema() }) } } diff --git a/pkg/cloudevents/event_test.go b/pkg/cloudevents/event_test.go index 1ddc46617..73fb0d192 100644 --- a/pkg/cloudevents/event_test.go +++ b/pkg/cloudevents/event_test.go @@ -177,7 +177,7 @@ func TestSchemaURL(t *testing.T) { for n, tc := range testCases { t.Run(n, func(t *testing.T) { - got := tc.event.SchemaURL() + got := tc.event.DataSchema() if diff := cmp.Diff(tc.want, got); diff != "" { t.Errorf("unexpected (-want, +got) = %v", diff) diff --git a/pkg/cloudevents/event_writer.go b/pkg/cloudevents/event_writer.go index ce5b3e876..9a31ab621 100644 --- a/pkg/cloudevents/event_writer.go +++ b/pkg/cloudevents/event_writer.go @@ -63,8 +63,8 @@ func (e *Event) SetTime(t time.Time) { } } -// SetSchemaURL implements EventWriter.SetSchemaURL -func (e *Event) SetSchemaURL(s string) { +// SetDataSchema implements EventWriter.SetDataSchema +func (e *Event) SetDataSchema(s string) { if err := e.Context.SetSchemaURL(s); err != nil { panic(err) } diff --git a/pkg/cloudevents/eventcontext.go b/pkg/cloudevents/eventcontext.go index 92ad1f729..f07d8f26f 100644 --- a/pkg/cloudevents/eventcontext.go +++ b/pkg/cloudevents/eventcontext.go @@ -56,7 +56,7 @@ type EventContextWriter interface { SetID(string) error // SetTime sets the time of the context. SetTime(time time.Time) error - // SetSchemaURL sets the schema url of the context. + // SetDataSchema sets the schema url of the context. SetSchemaURL(string) error // SetDataContentType sets the data content type of the context. SetDataContentType(string) error @@ -68,6 +68,8 @@ type EventContextWriter interface { SetExtension(string, interface{}) error } +// EventContextConverter are the methods that allow for event version +// conversion. type EventContextConverter interface { // AsV01 provides a translation from whatever the "native" encoding of the // CloudEvent was to the equivalent in v0.1 field names, moving fields to or @@ -83,6 +85,11 @@ type EventContextConverter interface { // CloudEvent was to the equivalent in v0.3 field names, moving fields to or // from extensions as necessary. AsV03() *EventContextV03 + + // AsV04 provides a translation from whatever the "native" encoding of the + // CloudEvent was to the equivalent in v0.4 field names, moving fields to or + // from extensions as necessary. + AsV04() *EventContextV04 } // EventContext is conical interface for a CloudEvents Context. diff --git a/pkg/cloudevents/eventcontext_v01.go b/pkg/cloudevents/eventcontext_v01.go index d4f416dd1..9cf92d848 100644 --- a/pkg/cloudevents/eventcontext_v01.go +++ b/pkg/cloudevents/eventcontext_v01.go @@ -115,8 +115,12 @@ func (ec EventContextV01) AsV02() *EventContextV02 { // AsV03 implements EventContextConverter.AsV03 func (ec EventContextV01) AsV03() *EventContextV03 { - ecv2 := ec.AsV02() - return ecv2.AsV03() + return ec.AsV02().AsV03() +} + +// AsV04 implements EventContextConverter.AsV04 +func (ec EventContextV01) AsV04() *EventContextV04 { + return ec.AsV02().AsV03().AsV04() } // Validate returns errors based on requirements from the CloudEvents spec. diff --git a/pkg/cloudevents/eventcontext_v01_writer.go b/pkg/cloudevents/eventcontext_v01_writer.go index 7c196d939..74603f4f9 100644 --- a/pkg/cloudevents/eventcontext_v01_writer.go +++ b/pkg/cloudevents/eventcontext_v01_writer.go @@ -79,7 +79,7 @@ func (ec *EventContextV01) SetTime(t time.Time) error { return nil } -// SetSchemaURL implements EventContextWriter.SetSchemaURL +// SetDataSchema implements EventContextWriter.SetDataSchema func (ec *EventContextV01) SetSchemaURL(u string) error { u = strings.TrimSpace(u) if u == "" { diff --git a/pkg/cloudevents/eventcontext_v02_writer.go b/pkg/cloudevents/eventcontext_v02_writer.go index 8935e93d7..7f67465bb 100644 --- a/pkg/cloudevents/eventcontext_v02_writer.go +++ b/pkg/cloudevents/eventcontext_v02_writer.go @@ -79,7 +79,7 @@ func (ec *EventContextV02) SetTime(t time.Time) error { return nil } -// SetSchemaURL implements EventContextWriter.SetSchemaURL +// SetDataSchema implements EventContextWriter.SetDataSchema func (ec *EventContextV02) SetSchemaURL(u string) error { u = strings.TrimSpace(u) if u == "" { diff --git a/pkg/cloudevents/eventcontext_v03.go b/pkg/cloudevents/eventcontext_v03.go index 5f97c043e..44f236156 100644 --- a/pkg/cloudevents/eventcontext_v03.go +++ b/pkg/cloudevents/eventcontext_v03.go @@ -9,8 +9,6 @@ import ( "github.com/cloudevents/sdk-go/pkg/cloudevents/types" ) -// WIP: AS OF FEB 19, 2019 - const ( // CloudEventsVersionV03 represents the version 0.3 of the CloudEvents spec. CloudEventsVersionV03 = "0.3" @@ -32,7 +30,7 @@ type EventContextV03 struct { ID string `json:"id"` // Time - A Timestamp when the event happened. Time *types.Timestamp `json:"time,omitempty"` - // SchemaURL - A link to the schema that the `data` attribute adheres to. + // DataSchema - A link to the schema that the `data` attribute adheres to. SchemaURL *types.URLRef `json:"schemaurl,omitempty"` // GetDataMediaType - A MIME (RFC2046) string describing the media type of `data`. // TODO: Should an empty string assume `application/json`, `application/octet-stream`, or auto-detect the content? @@ -138,6 +136,31 @@ func (ec EventContextV03) AsV03() *EventContextV03 { return &ec } +// AsV04 implements EventContextConverter.AsV04 +func (ec EventContextV03) AsV04() *EventContextV04 { + ret := EventContextV04{ + SpecVersion: CloudEventsVersionV02, + ID: ec.ID, + Time: ec.Time, + Type: ec.Type, + DataSchema: ec.SchemaURL, + DataContentType: ec.DataContentType, + DataContentEncoding: ec.DataContentEncoding, + Source: ec.Source, + Subject: ec.Subject, + Extensions: make(map[string]string), + } + if ec.Extensions != nil { + for k, v := range ec.Extensions { + ret.Extensions[k] = fmt.Sprintf("%v", v) // TODO: This is wrong. Follow up with what should be done. + } + } + if len(ret.Extensions) == 0 { + ret.Extensions = nil + } + return &ret +} + // Validate returns errors based on requirements from the CloudEvents spec. // For more details, see https://github.com/cloudevents/spec/blob/master/spec.md // As of Feb 26, 2019, commit 17c32ea26baf7714ad027d9917d03d2fff79fc7e diff --git a/pkg/cloudevents/eventcontext_v03_writer.go b/pkg/cloudevents/eventcontext_v03_writer.go index 9370d2a3d..2856f6948 100644 --- a/pkg/cloudevents/eventcontext_v03_writer.go +++ b/pkg/cloudevents/eventcontext_v03_writer.go @@ -81,7 +81,7 @@ func (ec *EventContextV03) SetTime(t time.Time) error { return nil } -// SetSchemaURL implements EventContextWriter.SetSchemaURL +// SetDataSchema implements EventContextWriter.SetDataSchema func (ec *EventContextV03) SetSchemaURL(u string) error { u = strings.TrimSpace(u) if u == "" { diff --git a/pkg/cloudevents/eventcontext_v04.go b/pkg/cloudevents/eventcontext_v04.go new file mode 100644 index 000000000..9e49732f7 --- /dev/null +++ b/pkg/cloudevents/eventcontext_v04.go @@ -0,0 +1,298 @@ +package cloudevents + +import ( + "encoding/json" + "fmt" + "sort" + "strings" + + "github.com/cloudevents/sdk-go/pkg/cloudevents/types" +) + +// WIP: AS OF AUG 18, 2019 + +const ( + // CloudEventsVersionV04 represents the version 0.4 of the CloudEvents spec. + CloudEventsVersionV04 = "0.4" +) + +// EventContextV04 represents the non-data attributes of a CloudEvents v0.3 +// event. +type EventContextV04 struct { + // SpecVersion - The version of the CloudEvents specification used by the event. + SpecVersion string `json:"specversion"` + // Type - The type of the occurrence which has happened. + Type string `json:"type"` + // Source - A URI describing the event producer. + Source types.URLRef `json:"source"` + // Subject - The subject of the event in the context of the event producer + // (identified by `source`). + Subject *string `json:"subject,omitempty"` + // ID of the event; must be non-empty and unique within the scope of the producer. + ID string `json:"id"` + // Time - A Timestamp when the event happened. + Time *types.Timestamp `json:"time,omitempty"` + // DataSchema - A link to the schema that the `data` attribute adheres to. + DataSchema *types.URLRef `json:"dataschema,omitempty"` // TODO: spec changed to URL. + // GetDataMediaType - A MIME (RFC2046) string describing the media type of `data`. + // TODO: Should an empty string assume `application/json`, `application/octet-stream`, or auto-detect the content? + DataContentType *string `json:"datacontenttype,omitempty"` + // DataContentEncoding describes the content encoding for the `data` attribute. Valid: nil, `Base64`. + DataContentEncoding *string `json:"datacontentencoding,omitempty"` + // Extensions - Additional extension metadata beyond the base spec. + Extensions map[string]string `json:"-"` +} + +// Adhere to EventContext +var _ EventContext = (*EventContextV04)(nil) + +// ExtensionAs implements EventContext.ExtensionAs +func (ec EventContextV04) ExtensionAs(name string, obj interface{}) error { + value, ok := ec.Extensions[name] + if !ok { + return fmt.Errorf("extension %q does not exist", name) + } + + // Try to unmarshal extension if we find it as a RawMessage. + switch v := value.(type) { + case json.RawMessage: + if err := json.Unmarshal(v, obj); err == nil { + // if that worked, return with obj set. + return nil + } + } + // else try as a string ptr. + + // Only support *string for now. + switch v := obj.(type) { + case *string: + if valueAsString, ok := value.(string); ok { + *v = valueAsString + return nil + } else { + return fmt.Errorf("invalid type for extension %q", name) + } + default: + return fmt.Errorf("unknown extension type %T", obj) + } +} + +// SetExtension adds the extension 'name' with value 'value' to the CloudEvents context. +func (ec *EventContextV04) SetExtension(name string, value interface{}) error { + if ec.Extensions == nil { + ec.Extensions = make(map[string]interface{}) + } + if value == nil { + delete(ec.Extensions, name) + } else { + ec.Extensions[name] = value + } + return nil +} + +// Clone implements EventContextConverter.Clone +func (ec EventContextV04) Clone() EventContext { + return ec.AsV04() +} + +// AsV01 implements EventContextConverter.AsV01 +func (ec EventContextV04) AsV01() *EventContextV01 { + ecv2 := ec.AsV02() + return ecv2.AsV01() +} + +// AsV02 implements EventContextConverter.AsV02 +func (ec EventContextV04) AsV02() *EventContextV02 { + ecv3 := ec.AsV03() + return ecv3.AsV02() +} + +// AsV03 implements EventContextConverter.AsV03 +func (ec EventContextV04) AsV03() *EventContextV03 { + ret := EventContextV03{ + SpecVersion: CloudEventsVersionV02, + ID: ec.ID, + Time: ec.Time, + Type: ec.Type, + SchemaURL: ec.DataSchema, + DataContentType: ec.DataContentType, + DataContentEncoding: ec.DataContentEncoding, + Source: ec.Source, + Subject: ec.Subject, + Extensions: make(map[string]interface{}), + } + if ec.Extensions != nil { + for k, v := range ec.Extensions { + ret.Extensions[k] = v + } + } + if len(ret.Extensions) == 0 { + ret.Extensions = nil + } + return &ret +} + +// AsV04 implements EventContextConverter.AsV04 +func (ec EventContextV04) AsV04() *EventContextV04 { + ec.SpecVersion = CloudEventsVersionV04 + return &ec +} + +// Validate returns errors based on requirements from the CloudEvents spec. +// For more details, see https://github.com/cloudevents/spec/blob/master/spec.md +// As of Feb 26, 2019, commit TODO +// + https://github.com/cloudevents/spec/pull/TODO -> extensions change +// + https://github.com/cloudevents/spec/pull/TODO -> dataschema +func (ec EventContextV04) Validate() error { + errors := []string(nil) + + // TODO: a lot of these have changed. Double check them all. + + // type + // Type: String + // Constraints: + // REQUIRED + // MUST be a non-empty string + // SHOULD be prefixed with a reverse-DNS name. The prefixed domain dictates the organization which defines the semantics of this event type. + eventType := strings.TrimSpace(ec.Type) + if eventType == "" { + errors = append(errors, "type: MUST be a non-empty string") + } + + // specversion + // Type: String + // Constraints: + // REQUIRED + // MUST be a non-empty string + specVersion := strings.TrimSpace(ec.SpecVersion) + if specVersion == "" { + errors = append(errors, "specversion: MUST be a non-empty string") + } + + // source + // Type: URI-reference + // Constraints: + // REQUIRED + source := strings.TrimSpace(ec.Source.String()) + if source == "" { + errors = append(errors, "source: REQUIRED") + } + + // subject + // Type: String + // Constraints: + // OPTIONAL + // MUST be a non-empty string + if ec.Subject != nil { + subject := strings.TrimSpace(*ec.Subject) + if subject == "" { + errors = append(errors, "subject: if present, MUST be a non-empty string") + } + } + + // id + // Type: String + // Constraints: + // REQUIRED + // MUST be a non-empty string + // MUST be unique within the scope of the producer + id := strings.TrimSpace(ec.ID) + if id == "" { + errors = append(errors, "id: MUST be a non-empty string") + + // no way to test "MUST be unique within the scope of the producer" + } + + // time + // Type: Timestamp + // Constraints: + // OPTIONAL + // If present, MUST adhere to the format specified in RFC 3339 + // --> no need to test this, no way to set the time without it being valid. + + // dataschema + // Type: URI + // Constraints: + // OPTIONAL + // If present, MUST adhere to the format specified in RFC 3986 + if ec.DataSchema != nil { + dataSchema := strings.TrimSpace(ec.DataSchema.String()) + // empty string is not RFC 3986 compatible. + if dataSchema == "" { + errors = append(errors, "dataschema: if present, MUST adhere to the format specified in RFC 3986") + } + } + + // datacontenttype + // Type: String per RFC 2046 + // Constraints: + // OPTIONAL + // If present, MUST adhere to the format specified in RFC 2046 + if ec.DataContentType != nil { + dataContentType := strings.TrimSpace(*ec.DataContentType) + if dataContentType == "" { + // TODO: need to test for RFC 2046 + errors = append(errors, "datacontenttype: if present, MUST adhere to the format specified in RFC 2046") + } + } + + // datacontentencoding + // Type: String per RFC 2045 Section 6.1 + // Constraints: + // The attribute MUST be set if the data attribute contains string-encoded binary data. + // Otherwise the attribute MUST NOT be set. + // If present, MUST adhere to RFC 2045 Section 6.1 + if ec.DataContentEncoding != nil { + dataContentEncoding := strings.ToLower(strings.TrimSpace(*ec.DataContentEncoding)) + if dataContentEncoding != Base64 { + // TODO: need to test for RFC 2046 + errors = append(errors, "datacontentencoding: if present, MUST adhere to RFC 2045 Section 6.1") + } + } + + if len(errors) > 0 { + return fmt.Errorf(strings.Join(errors, "\n")) + } + return nil +} + +// String returns a pretty-printed representation of the EventContext. +func (ec EventContextV04) String() string { + b := strings.Builder{} + + b.WriteString("Context Attributes,\n") + + b.WriteString(" specversion: " + ec.SpecVersion + "\n") + b.WriteString(" type: " + ec.Type + "\n") + b.WriteString(" source: " + ec.Source.String() + "\n") + if ec.Subject != nil { + b.WriteString(" subject: " + *ec.Subject + "\n") + } + b.WriteString(" id: " + ec.ID + "\n") + if ec.Time != nil { + b.WriteString(" time: " + ec.Time.String() + "\n") + } + if ec.SchemaURL != nil { + b.WriteString(" schemaurl: " + ec.SchemaURL.String() + "\n") + } + if ec.DataContentType != nil { + b.WriteString(" datacontenttype: " + *ec.DataContentType + "\n") + } + if ec.DataContentEncoding != nil { + b.WriteString(" datacontentencoding: " + *ec.DataContentEncoding + "\n") + } + + if ec.Extensions != nil && len(ec.Extensions) > 0 { + b.WriteString("Extensions,\n") + keys := make([]string, 0, len(ec.Extensions)) + for k := range ec.Extensions { + keys = append(keys, k) + } + sort.Strings(keys) + for _, key := range keys { + b.WriteString(fmt.Sprintf(" %s: %v\n", key, ec.Extensions[key])) + } + } + + return b.String() +} diff --git a/pkg/cloudevents/transport/amqp/codec.go b/pkg/cloudevents/transport/amqp/codec.go index 4498318bd..2f1f72e32 100644 --- a/pkg/cloudevents/transport/amqp/codec.go +++ b/pkg/cloudevents/transport/amqp/codec.go @@ -109,8 +109,8 @@ func (c Codec) toHeaders(e cloudevents.Event) (map[string]interface{}, error) { if e.DataContentEncoding() != "" { h[prefix+"datacontentencoding"] = e.DataContentEncoding() } - if e.SchemaURL() != "" { - h[prefix+"schemaurl"] = e.SchemaURL() + if e.DataSchema() != "" { + h[prefix+"schemaurl"] = e.DataSchema() } for k, v := range e.Extensions() { diff --git a/pkg/cloudevents/transport/http/codec_v01.go b/pkg/cloudevents/transport/http/codec_v01.go index e414c090a..43ada68f0 100644 --- a/pkg/cloudevents/transport/http/codec_v01.go +++ b/pkg/cloudevents/transport/http/codec_v01.go @@ -107,7 +107,7 @@ func (v CodecV01) toHeaders(ec *cloudevents.EventContextV01) (http.Header, error h["CE-EventTypeVersion"] = []string{*ec.EventTypeVersion} } if ec.SchemaURL != nil { - h["CE-SchemaURL"] = []string{ec.SchemaURL.String()} + h["CE-DataSchema"] = []string{ec.SchemaURL.String()} } if ec.ContentType != nil { h.Set("Content-Type", *ec.ContentType) @@ -179,8 +179,8 @@ func (v CodecV01) fromHeaders(h http.Header) (cloudevents.EventContextV01, error if etv != "" { ec.EventTypeVersion = &etv } - ec.SchemaURL = types.ParseURLRef(h.Get("CE-SchemaURL")) - h.Del("CE-SchemaURL") + ec.SchemaURL = types.ParseURLRef(h.Get("CE-DataSchema")) + h.Del("CE-DataSchema") et := h.Get("Content-Type") ec.ContentType = &et diff --git a/pkg/cloudevents/transport/http/codec_v01_test.go b/pkg/cloudevents/transport/http/codec_v01_test.go index 4808c858d..1511b49b8 100644 --- a/pkg/cloudevents/transport/http/codec_v01_test.go +++ b/pkg/cloudevents/transport/http/codec_v01_test.go @@ -73,7 +73,7 @@ func TestCodecV01_Encode(t *testing.T) { "CE-EventType": {"com.example.full"}, "CE-EventTypeVersion": {"v1alpha1"}, "CE-Source": {"http://example.com/source"}, - "CE-SchemaURL": {"http://example.com/schema"}, + "CE-DataSchema": {"http://example.com/schema"}, "Content-Type": {"application/json"}, "CE-X-Test": {`"extended"`}, }, @@ -126,7 +126,7 @@ func TestCodecV01_Encode(t *testing.T) { "CE-EventType": {"com.example.full"}, "CE-EventTypeVersion": {"v1alpha1"}, "CE-Source": {"http://example.com/source"}, - "CE-SchemaURL": {"http://example.com/schema"}, + "CE-DataSchema": {"http://example.com/schema"}, "Content-Type": {"application/json"}, "CE-X-Test": {`"extended"`}, }, @@ -277,7 +277,7 @@ func TestCodecV01_Decode(t *testing.T) { "CE-EventType": {"com.example.full"}, "CE-EventTypeVersion": {"v1alpha1"}, "CE-Source": {"http://example.com/source"}, - "CE-SchemaURL": {"http://example.com/schema"}, + "CE-DataSchema": {"http://example.com/schema"}, "Content-Type": {"application/json"}, "CE-X-Test": {`"extended"`}, }, diff --git a/pkg/cloudevents/transport/pubsub/codec.go b/pkg/cloudevents/transport/pubsub/codec.go index 98b3d7755..bc762d020 100644 --- a/pkg/cloudevents/transport/pubsub/codec.go +++ b/pkg/cloudevents/transport/pubsub/codec.go @@ -85,8 +85,8 @@ func (c Codec) toAttributes(e cloudevents.Event) (map[string]string, error) { t := types.Timestamp{Time: e.Time()} // TODO: change e.Time() to return string so I don't have to do this. a[prefix+"time"] = t.String() } - if e.SchemaURL() != "" { - a[prefix+"schemaurl"] = e.SchemaURL() + if e.DataSchema() != "" { + a[prefix+"schemaurl"] = e.DataSchema() } if e.DataContentType() != "" {