diff --git a/pkg/binding/transport_test.go b/pkg/binding/transport_test.go index 5f0e6584c..b41227e38 100644 --- a/pkg/binding/transport_test.go +++ b/pkg/binding/transport_test.go @@ -16,7 +16,7 @@ func TestTransportSend(t *testing.T) { transport := binding.NewTransportAdapter(binding.ChanSender(messageChannel), binding.ChanReceiver(messageChannel), nil) ev := test.MinEvent() - client, err := cloudevents.NewClient(transport) + client, err := cloudevents.NewClient(transport, cloudevents.WithoutTracePropagation()) require.NoError(t, err) _, _, err = client.Send(context.Background(), ev) diff --git a/pkg/cloudevents/client/client.go b/pkg/cloudevents/client/client.go index a2c4f180c..209eb8f77 100644 --- a/pkg/cloudevents/client/client.go +++ b/pkg/cloudevents/client/client.go @@ -121,6 +121,7 @@ func (c *ceClient) obsSend(ctx context.Context, event cloudevents.Event) (contex // Set distributed tracing extension. if !c.disableTracePropagation { if span := trace.FromContext(ctx); span != nil { + event.Context = event.Context.Clone() if err := extensions.FromSpanContext(span.SpanContext()).AddTracingAttributes(event.Context); err != nil { return ctx, nil, fmt.Errorf("error setting distributed tracing extension: %w", err) } diff --git a/pkg/cloudevents/eventcontext_test.go b/pkg/cloudevents/eventcontext_test.go index 8ba5da0a9..b99b5f8b9 100644 --- a/pkg/cloudevents/eventcontext_test.go +++ b/pkg/cloudevents/eventcontext_test.go @@ -7,6 +7,7 @@ import ( ce "github.com/cloudevents/sdk-go/pkg/cloudevents" "github.com/cloudevents/sdk-go/pkg/cloudevents/types" "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" ) func TestContextAsV01(t *testing.T) { @@ -313,3 +314,40 @@ func TestContextAsV1(t *testing.T) { }) } } + +func TestEventContextClone(t *testing.T) { + tests := []struct { + name string + context ce.EventContext + }{ + { + name: "v0.1", + context: FullEventContextV01(types.Timestamp{Time: time.Now()}), + }, + { + name: "v0.2", + context: FullEventContextV02(types.Timestamp{Time: time.Now()}), + }, + { + name: "v0.3", + context: FullEventContextV03(types.Timestamp{Time: time.Now()}), + }, + { + name: "v1.0", + context: FullEventContextV1(types.Timestamp{Time: time.Now()}), + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + initial := test.context + require.NoError(t, initial.SetExtension("aaa", "bbb")) + + clone := initial.Clone() + require.NoError(t, clone.SetExtension("aaa", "ccc")) + + val, err := initial.GetExtension("aaa") + require.NoError(t, err) + require.Equal(t, "bbb", val) + }) + } +} diff --git a/pkg/cloudevents/eventcontext_v01.go b/pkg/cloudevents/eventcontext_v01.go index 0b01823be..f48459cdf 100644 --- a/pkg/cloudevents/eventcontext_v01.go +++ b/pkg/cloudevents/eventcontext_v01.go @@ -76,7 +76,21 @@ func (ec *EventContextV01) SetExtension(name string, value interface{}) error { // Clone implements EventContextConverter.Clone func (ec EventContextV01) Clone() EventContext { - return ec.AsV01() + ev01 := ec.AsV01() + ev01.Extensions = ev01.cloneExtensions() + return ev01 +} + +func (ec *EventContextV01) cloneExtensions() map[string]interface{} { + old := ec.Extensions + if old == nil { + return nil + } + new := make(map[string]interface{}, len(ec.Extensions)) + for k, v := range old { + new[k] = v + } + return new } // AsV01 implements EventContextConverter.AsV01 diff --git a/pkg/cloudevents/eventcontext_v02.go b/pkg/cloudevents/eventcontext_v02.go index 3dde8a19b..7b936bd81 100644 --- a/pkg/cloudevents/eventcontext_v02.go +++ b/pkg/cloudevents/eventcontext_v02.go @@ -85,7 +85,21 @@ func (ec *EventContextV02) SetExtension(name string, value interface{}) error { // Clone implements EventContextConverter.Clone func (ec EventContextV02) Clone() EventContext { - return ec.AsV02() + ec02 := ec.AsV02() + ec02.Extensions = ec02.cloneExtensions() + return ec02 +} + +func (ec *EventContextV02) cloneExtensions() map[string]interface{} { + old := ec.Extensions + if old == nil { + return nil + } + new := make(map[string]interface{}, len(ec.Extensions)) + for k, v := range old { + new[k] = v + } + return new } // AsV01 implements EventContextConverter.AsV01 diff --git a/pkg/cloudevents/eventcontext_v03.go b/pkg/cloudevents/eventcontext_v03.go index 2e714d187..6fc776539 100644 --- a/pkg/cloudevents/eventcontext_v03.go +++ b/pkg/cloudevents/eventcontext_v03.go @@ -94,7 +94,21 @@ func (ec *EventContextV03) SetExtension(name string, value interface{}) error { // Clone implements EventContextConverter.Clone func (ec EventContextV03) Clone() EventContext { - return ec.AsV03() + ec03 := ec.AsV03() + ec03.Extensions = ec03.cloneExtensions() + return ec03 +} + +func (ec *EventContextV03) cloneExtensions() map[string]interface{} { + old := ec.Extensions + if old == nil { + return nil + } + new := make(map[string]interface{}, len(ec.Extensions)) + for k, v := range old { + new[k] = v + } + return new } // AsV01 implements EventContextConverter.AsV01 diff --git a/pkg/cloudevents/eventcontext_v1.go b/pkg/cloudevents/eventcontext_v1.go index 8bb8cb41b..320f47e0d 100644 --- a/pkg/cloudevents/eventcontext_v1.go +++ b/pkg/cloudevents/eventcontext_v1.go @@ -97,7 +97,21 @@ func (ec *EventContextV1) SetExtension(name string, value interface{}) error { // Clone implements EventContextConverter.Clone func (ec EventContextV1) Clone() EventContext { - return ec.AsV1() + ec1 := ec.AsV1() + ec1.Extensions = ec1.cloneExtensions() + return ec1 +} + +func (ec *EventContextV1) cloneExtensions() map[string]interface{} { + old := ec.Extensions + if old == nil { + return nil + } + new := make(map[string]interface{}, len(ec.Extensions)) + for k, v := range old { + new[k] = v + } + return new } // AsV01 implements EventContextConverter.AsV01 diff --git a/test/http/direct_v1_test.go b/test/http/direct_v1_test.go index 27a895329..9403a0404 100644 --- a/test/http/direct_v1_test.go +++ b/test/http/direct_v1_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/cloudevents/sdk-go" + "github.com/cloudevents/sdk-go/pkg/cloudevents/types" ) func TestSenderReceiver_binary_v01(t *testing.T) { @@ -88,7 +89,7 @@ func TestSenderReceiver_structured_v01(t *testing.T) { Header: map[string][]string{ "content-type": {"application/cloudevents+json"}, }, - Body: fmt.Sprintf(`{"data":{"hello":"unittest"},"id":"ABC-123","source":"/unit/test/client","specversion":"1.0","subject":"resource","time":%q,"type":"unit.test.client.sent"}`, now.UTC().Format(time.RFC3339Nano)), + Body: fmt.Sprintf(`{"data":{"hello":"unittest"},"id":"ABC-123","source":"/unit/test/client","specversion":"1.0","subject":"resource","time":%q,"type":"unit.test.client.sent"}`, types.FormatTime(now.UTC())), ContentLength: 182, }, },