From 0cf61a8bfb44c8d3fbfb80712a7475e4b8cd5c49 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Thu, 29 Jul 2021 20:04:56 +0200 Subject: [PATCH] Resolved Trigger deadLetterSink is available in the status (#1092) * Resolved Trigger deadLetterSink is available in the status Signed-off-by: Pierangelo Di Pilato * Add another test case Signed-off-by: Pierangelo Di Pilato --- control-plane/pkg/core/config/utils.go | 38 +++ control-plane/pkg/core/config/utils_test.go | 157 +++++++++++ .../pkg/reconciler/trigger/trigger.go | 19 +- .../reconciler/trigger/trigger_lifecycle.go | 17 +- .../pkg/reconciler/trigger/trigger_test.go | 10 + go.mod | 6 +- go.sum | 13 +- .../github.com/hashicorp/go-uuid/.travis.yml | 12 + .../github.com/hashicorp/go-uuid/LICENSE | 0 .../github.com/hashicorp/go-uuid/README.md | 8 + .../github.com/hashicorp/go-uuid/go.mod | 1 + .../github.com/hashicorp/go-uuid/uuid.go | 83 ++++++ .../hashicorp/golang-lru/.gitignore | 23 ++ .../github.com/hashicorp/golang-lru/2q.go | 223 +++++++++++++++ .../github.com/hashicorp/golang-lru/LICENSE | 0 .../github.com/hashicorp/golang-lru/README.md | 25 ++ .../github.com/hashicorp/golang-lru/arc.go | 257 ++++++++++++++++++ .../github.com/hashicorp/golang-lru/doc.go | 21 ++ .../github.com/hashicorp/golang-lru/go.mod | 3 + .../github.com/hashicorp/golang-lru/lru.go | 150 ++++++++++ .../hashicorp/golang-lru/simplelru/lru.go | 177 ++++++++++++ .../golang-lru/simplelru/lru_interface.go | 39 +++ .../pkg/apis/eventing/v1/trigger_lifecycle.go | 16 +- .../pkg/apis/eventing/v1/trigger_types.go | 4 + .../apis/eventing/v1/zz_generated.deepcopy.go | 5 + .../pkg/reconciler/testing/v1/broker.go | 17 ++ .../pkg/reconciler/testing/v1/trigger.go | 25 ++ .../pkg/test/spoof/error_checks.go | 4 + vendor/knative.dev/pkg/test/spoof/spoof.go | 5 + .../reconciler-test/cmd/eventshub/main.go | 82 ++---- .../pkg/eventshub/103-pod.yaml | 2 +- .../pkg/eventshub/event_info.go | 3 + .../pkg/eventshub/event_log.go | 14 +- .../pkg/eventshub/eventshub.go | 97 +++++++ .../pkg/eventshub/eventshub_image.go | 45 +++ .../reconciler-test/pkg/eventshub/options.go | 7 + .../reconciler-test/pkg/eventshub/prober.go | 5 + .../pkg/eventshub/receiver/receiver.go | 7 +- .../pkg/eventshub/resources.go | 7 +- vendor/modules.txt | 6 +- 40 files changed, 1530 insertions(+), 103 deletions(-) create mode 100644 third_party/VENDOR-LICENSE/github.com/hashicorp/go-uuid/.travis.yml mode change 100755 => 100644 third_party/VENDOR-LICENSE/github.com/hashicorp/go-uuid/LICENSE create mode 100644 third_party/VENDOR-LICENSE/github.com/hashicorp/go-uuid/README.md create mode 100644 third_party/VENDOR-LICENSE/github.com/hashicorp/go-uuid/go.mod create mode 100644 third_party/VENDOR-LICENSE/github.com/hashicorp/go-uuid/uuid.go create mode 100644 third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/.gitignore create mode 100644 third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/2q.go mode change 100755 => 100644 third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/LICENSE create mode 100644 third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/README.md create mode 100644 third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/arc.go create mode 100644 third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/doc.go create mode 100644 third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/go.mod create mode 100644 third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/lru.go create mode 100644 third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/simplelru/lru.go create mode 100644 third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/simplelru/lru_interface.go create mode 100644 vendor/knative.dev/reconciler-test/pkg/eventshub/eventshub.go create mode 100644 vendor/knative.dev/reconciler-test/pkg/eventshub/eventshub_image.go diff --git a/control-plane/pkg/core/config/utils.go b/control-plane/pkg/core/config/utils.go index efea8777c9..736264c5ad 100644 --- a/control-plane/pkg/core/config/utils.go +++ b/control-plane/pkg/core/config/utils.go @@ -136,3 +136,41 @@ func DurationMillisFromISO8601String(durationStr *string, defaultDurationMillis func IncrementContractGeneration(ct *contract.Contract) { ct.Generation = (ct.Generation + 1) % (math.MaxUint64 - 1) } + +// MergeEgressConfig merges the 2 given egress configs into one egress config prioritizing e0 values. +func MergeEgressConfig(e0, e1 *contract.EgressConfig) *contract.EgressConfig { + if e0 == nil { + return e1 + } + if e1 == nil { + return e0 + } + return &contract.EgressConfig{ + DeadLetter: mergeString(e0.GetDeadLetter(), e1.GetDeadLetter()), + Retry: mergeUint32(e0.GetRetry(), e1.GetRetry()), + BackoffPolicy: e0.GetBackoffPolicy(), + BackoffDelay: mergeUint64(e0.GetBackoffDelay(), e1.GetBackoffDelay()), + Timeout: mergeUint64(e0.GetTimeout(), e1.GetTimeout()), + } +} + +func mergeUint64(a, b uint64) uint64 { + if a == 0 { + return b + } + return a +} + +func mergeUint32(a, b uint32) uint32 { + if a == 0 { + return b + } + return a +} + +func mergeString(a, b string) string { + if a == "" { + return b + } + return a +} diff --git a/control-plane/pkg/core/config/utils_test.go b/control-plane/pkg/core/config/utils_test.go index 1317ed6ba8..05a79f237c 100644 --- a/control-plane/pkg/core/config/utils_test.go +++ b/control-plane/pkg/core/config/utils_test.go @@ -23,6 +23,9 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "google.golang.org/protobuf/runtime/protoimpl" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -346,3 +349,157 @@ func TestEgressConfigFromDelivery(t *testing.T) { }) } } + +func TestMergeEgressConfig(t *testing.T) { + + tt := []struct { + name string + e0 *contract.EgressConfig + e1 *contract.EgressConfig + expected *contract.EgressConfig + }{ + { + name: "e0 nil", + e1: &contract.EgressConfig{ + Retry: 42, + }, + expected: &contract.EgressConfig{ + Retry: 42, + }, + }, + { + name: "e1 nil", + e0: &contract.EgressConfig{ + Retry: 42, + }, + expected: &contract.EgressConfig{ + Retry: 42, + }, + }, + { + name: "e0 retry priority", + e0: &contract.EgressConfig{ + Retry: 42, + }, + e1: &contract.EgressConfig{ + Retry: 43, + }, + expected: &contract.EgressConfig{ + Retry: 42, + }, + }, + { + name: "e0 dead letter priority", + e0: &contract.EgressConfig{ + DeadLetter: "e0", + }, + e1: &contract.EgressConfig{ + Retry: 43, + DeadLetter: "e1", + }, + expected: &contract.EgressConfig{ + DeadLetter: "e0", + Retry: 43, + }, + }, + { + name: "e0 timeout priority", + e0: &contract.EgressConfig{ + DeadLetter: "e0", + Timeout: 100, + }, + e1: &contract.EgressConfig{ + Retry: 43, + DeadLetter: "e1", + Timeout: 101, + }, + expected: &contract.EgressConfig{ + DeadLetter: "e0", + Retry: 43, + Timeout: 100, + }, + }, + { + name: "e0 backoff delay priority", + e0: &contract.EgressConfig{ + DeadLetter: "e0", + Timeout: 100, + BackoffDelay: 4001, + }, + e1: &contract.EgressConfig{ + Retry: 43, + DeadLetter: "e1", + Timeout: 101, + BackoffDelay: 4000, + }, + expected: &contract.EgressConfig{ + DeadLetter: "e0", + Retry: 43, + Timeout: 100, + BackoffDelay: 4001, + }, + }, + { + name: "e0 backoff policy priority", + e0: &contract.EgressConfig{ + DeadLetter: "e0", + Timeout: 100, + BackoffDelay: 4001, + BackoffPolicy: 0, + }, + e1: &contract.EgressConfig{ + Retry: 43, + DeadLetter: "e1", + Timeout: 101, + BackoffDelay: 4000, + BackoffPolicy: 1, + }, + expected: &contract.EgressConfig{ + DeadLetter: "e0", + Retry: 43, + Timeout: 100, + BackoffDelay: 4001, + BackoffPolicy: 0, + }, + }, + { + name: "e0 retry priority (all config)", + e0: &contract.EgressConfig{ + Retry: 42, + DeadLetter: "e0", + Timeout: 100, + BackoffDelay: 4001, + BackoffPolicy: 0, + }, + e1: &contract.EgressConfig{ + Retry: 43, + DeadLetter: "e1", + Timeout: 101, + BackoffDelay: 4000, + BackoffPolicy: 1, + }, + expected: &contract.EgressConfig{ + DeadLetter: "e0", + Retry: 42, + Timeout: 100, + BackoffDelay: 4001, + BackoffPolicy: 0, + }, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + opts := []cmp.Option{ + cmpopts.IgnoreTypes( + protoimpl.MessageState{}, + protoimpl.SizeCache(0), + protoimpl.UnknownFields{}, + ), + } + if diff := cmp.Diff(tc.expected, MergeEgressConfig(tc.e0, tc.e1), opts...); diff != "" { + t.Errorf("(-want, +got) %s", diff) + } + }) + } +} diff --git a/control-plane/pkg/reconciler/trigger/trigger.go b/control-plane/pkg/reconciler/trigger/trigger.go index 6aae2183d2..72de4f97c7 100644 --- a/control-plane/pkg/reconciler/trigger/trigger.go +++ b/control-plane/pkg/reconciler/trigger/trigger.go @@ -137,13 +137,11 @@ func (r *Reconciler) reconcileKind(ctx context.Context, trigger *eventing.Trigge } triggerIndex := coreconfig.FindEgress(ct.Resources[brokerIndex].Egresses, trigger.UID) - triggerConfig, err := r.getTriggerConfig(ctx, trigger) + triggerConfig, err := r.getTriggerConfig(ctx, broker, trigger) if err != nil { return statusConditionManager.failedToResolveTriggerConfig(err) } - statusConditionManager.subscriberResolved( - fmt.Sprintf("Subscriber will receive events with the delivery order: %s", triggerConfig.DeliveryOrder.String()), - ) + statusConditionManager.subscriberResolved(triggerConfig) changed := coreconfig.AddOrUpdateEgressConfig(ct, brokerIndex, triggerConfig, triggerIndex) @@ -266,7 +264,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, trigger *eventing.Trigger return nil } -func (r *Reconciler) getTriggerConfig(ctx context.Context, trigger *eventing.Trigger) (*contract.Egress, error) { +func (r *Reconciler) getTriggerConfig(ctx context.Context, broker *eventing.Broker, trigger *eventing.Trigger) (*contract.Egress, error) { destination, err := r.Resolver.URIFromDestinationV1(ctx, trigger.Spec.Subscriber, trigger) if err != nil { return nil, fmt.Errorf("failed to resolve Trigger.Spec.Subscriber: %w", err) @@ -285,11 +283,16 @@ func (r *Reconciler) getTriggerConfig(ctx context.Context, trigger *eventing.Tri } } - egressConfig, err := coreconfig.EgressConfigFromDelivery(ctx, r.Resolver, trigger, trigger.Spec.Delivery, r.Configs.DefaultBackoffDelayMs) + triggerEgressConfig, err := coreconfig.EgressConfigFromDelivery(ctx, r.Resolver, trigger, trigger.Spec.Delivery, r.Configs.DefaultBackoffDelayMs) + if err != nil { + return nil, fmt.Errorf("[trigger] %w", err) + } + brokerEgressConfig, err := coreconfig.EgressConfigFromDelivery(ctx, r.Resolver, broker, broker.Spec.Delivery, r.Configs.DefaultBackoffDelayMs) if err != nil { - return nil, err + return nil, fmt.Errorf("[broker] %w", err) } - egress.EgressConfig = egressConfig + // Merge Broker and Trigger egress configuration prioritizing the Trigger configuration. + egress.EgressConfig = coreconfig.MergeEgressConfig(triggerEgressConfig, brokerEgressConfig) deliveryOrderAnnotationValue, ok := trigger.Annotations[deliveryOrderAnnotation] if ok { diff --git a/control-plane/pkg/reconciler/trigger/trigger_lifecycle.go b/control-plane/pkg/reconciler/trigger/trigger_lifecycle.go index 88431c61ea..cd9219708b 100644 --- a/control-plane/pkg/reconciler/trigger/trigger_lifecycle.go +++ b/control-plane/pkg/reconciler/trigger/trigger_lifecycle.go @@ -26,6 +26,7 @@ import ( "knative.dev/pkg/reconciler" "knative.dev/eventing-kafka-broker/control-plane/pkg/config" + "knative.dev/eventing-kafka-broker/control-plane/pkg/contract" ) type statusConditionManager struct { @@ -122,10 +123,22 @@ func (m *statusConditionManager) failedToUpdateDispatcherPodsAnnotation(err erro ) } -func (m *statusConditionManager) subscriberResolved(details string) { +func (m *statusConditionManager) subscriberResolved(egress *contract.Egress) { m.Trigger.GetConditionSet().Manage(&m.Trigger.Status).MarkTrueWithReason( eventing.TriggerConditionSubscriberResolved, string(eventing.TriggerConditionSubscriberResolved), - details, + fmt.Sprintf("Subscriber will receive events with the delivery order: %s", egress.DeliveryOrder.String()), ) + + if isDeadLetterSinkConfigured(egress.EgressConfig) { + m.Trigger.Status.MarkDeadLetterSinkResolvedSucceeded() + uri, _ /* safe to ignore */ := apis.ParseURL(egress.EgressConfig.DeadLetter) + m.Trigger.Status.DeadLetterURI = uri + } else { + m.Trigger.Status.MarkDeadLetterSinkNotConfigured() + } +} + +func isDeadLetterSinkConfigured(egressConfig *contract.EgressConfig) bool { + return egressConfig != nil && egressConfig.DeadLetter != "" } diff --git a/control-plane/pkg/reconciler/trigger/trigger_test.go b/control-plane/pkg/reconciler/trigger/trigger_test.go index 49cef136ea..3c6226ab98 100644 --- a/control-plane/pkg/reconciler/trigger/trigger_test.go +++ b/control-plane/pkg/reconciler/trigger/trigger_test.go @@ -149,6 +149,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) reconcilertesting.WithTriggerDependencyReady(), reconcilertesting.WithTriggerBrokerReady(), withTriggerSubscriberResolvedSucceeded(contract.DeliveryOrder_UNORDERED), + reconcilertesting.WithTriggerDeadLetterSinkNotConfigured(), ), }, }, @@ -212,6 +213,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) reconcilertesting.WithTriggerBrokerReady(), withTriggerSubscriberResolvedSucceeded(contract.DeliveryOrder_ORDERED), reconcilertesting.WithAnnotation(deliveryOrderAnnotation, deliveryOrderOrdered), + reconcilertesting.WithTriggerDeadLetterSinkNotConfigured(), ), }, }, @@ -275,6 +277,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) reconcilertesting.WithTriggerBrokerReady(), withTriggerSubscriberResolvedSucceeded(contract.DeliveryOrder_UNORDERED), reconcilertesting.WithAnnotation(deliveryOrderAnnotation, deliveryOrderUnordered), + reconcilertesting.WithTriggerDeadLetterSinkNotConfigured(), ), }, }, @@ -344,6 +347,8 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) reconcilertesting.WithTriggerDependencyReady(), reconcilertesting.WithTriggerBrokerReady(), withTriggerSubscriberResolvedSucceeded(contract.DeliveryOrder_UNORDERED), + reconcilertesting.WithTriggerDeadLetterSinkResolvedSucceeded(), + reconcilertesting.WithTriggerStatusDeadLetterSinkURI("http://localhost/path"), ), }, }, @@ -456,6 +461,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) reconcilertesting.WithTriggerDependencyReady(), reconcilertesting.WithTriggerBrokerReady(), withTriggerSubscriberResolvedSucceeded(contract.DeliveryOrder_UNORDERED), + reconcilertesting.WithTriggerDeadLetterSinkNotConfigured(), ), }, }, @@ -753,6 +759,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) reconcilertesting.WithTriggerDependencyReady(), reconcilertesting.WithTriggerBrokerReady(), withTriggerSubscriberResolvedSucceeded(contract.DeliveryOrder_UNORDERED), + reconcilertesting.WithTriggerDeadLetterSinkNotConfigured(), ), }, }, @@ -939,6 +946,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) reconcilertesting.WithTriggerDependencyReady(), reconcilertesting.WithTriggerBrokerReady(), withTriggerSubscriberResolvedSucceeded(contract.DeliveryOrder_UNORDERED), + reconcilertesting.WithTriggerDeadLetterSinkNotConfigured(), ), }, }, @@ -1120,6 +1128,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) reconcilertesting.WithTriggerDependencyReady(), reconcilertesting.WithTriggerBrokerReady(), withTriggerSubscriberResolvedSucceeded(contract.DeliveryOrder_UNORDERED), + reconcilertesting.WithTriggerDeadLetterSinkNotConfigured(), ), }, }, @@ -1365,6 +1374,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) reconcilertesting.WithTriggerDependencyReady(), reconcilertesting.WithTriggerBrokerReady(), withTriggerSubscriberResolvedSucceeded(contract.DeliveryOrder_UNORDERED), + reconcilertesting.WithTriggerDeadLetterSinkNotConfigured(), ), }, }, diff --git a/go.mod b/go.mod index f2af803add..de5a4bf25d 100644 --- a/go.mod +++ b/go.mod @@ -21,8 +21,8 @@ require ( k8s.io/apiserver v0.20.7 k8s.io/client-go v0.20.7 k8s.io/utils v0.0.0-20210111153108-fddb29f9d009 - knative.dev/eventing v0.24.0 + knative.dev/eventing v0.24.1-0.20210708130023-221dfdfced62 knative.dev/hack v0.0.0-20210622141627-e28525d8d260 - knative.dev/pkg v0.0.0-20210628225612-51cfaabbcdf6 - knative.dev/reconciler-test v0.0.0-20210630182710-2a6d91dfee1e + knative.dev/pkg v0.0.0-20210706174620-fe90576475ca + knative.dev/reconciler-test v0.0.0-20210707164418-32f0df0c7399 ) diff --git a/go.sum b/go.sum index 708eb57dc9..2015c2f715 100644 --- a/go.sum +++ b/go.sum @@ -1222,17 +1222,16 @@ k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd/go.mod h1:WOJ3KddDSol4tAG k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20210111153108-fddb29f9d009 h1:0T5IaWHO3sJTEmCP6mUlBvMukxPKUQWqiI/YuiBNMiQ= k8s.io/utils v0.0.0-20210111153108-fddb29f9d009/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= -knative.dev/eventing v0.24.0 h1:CoaQwZBizxZyOFJUvFcyb7vYSvpYBmfb4IYRNWUdTPE= -knative.dev/eventing v0.24.0/go.mod h1:9xo0SWkIfpXrx0lvGQO7MUlPF8cu+QCMd2gGxj6wxrU= +knative.dev/eventing v0.24.1-0.20210708130023-221dfdfced62 h1:UiWAQRr1xifUbK7znIJSLQmnx6hnUIhpJQ1coMO6S9U= +knative.dev/eventing v0.24.1-0.20210708130023-221dfdfced62/go.mod h1:vrmE0f+CSunMBBCvb1lyO5OTsBN72bipJOsC0WoJvj4= knative.dev/hack v0.0.0-20210622141627-e28525d8d260 h1:f2eMtOubAOc/Q7JlvFPDKXiPlJVK+VpX2Cot8hRzCgQ= knative.dev/hack v0.0.0-20210622141627-e28525d8d260/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI= knative.dev/hack/schema v0.0.0-20210622141627-e28525d8d260/go.mod h1:ffjwmdcrH5vN3mPhO8RrF2KfNnbHeCE2C60A+2cv3U0= knative.dev/pkg v0.0.0-20210622173328-dd0db4b05c80/go.mod h1:kGegTnbZ+ljFjAE3E1+8wgaH2LMv8qYi+72o3F3cbdc= -knative.dev/pkg v0.0.0-20210628225612-51cfaabbcdf6 h1:DEZJpuiDMEzdJQZI0+ZAqsLe5uwsHHYOgrjcg0Sv8wA= -knative.dev/pkg v0.0.0-20210628225612-51cfaabbcdf6/go.mod h1:kGegTnbZ+ljFjAE3E1+8wgaH2LMv8qYi+72o3F3cbdc= -knative.dev/reconciler-test v0.0.0-20210623134345-88c84739abd9/go.mod h1:4wqv2WyWUC5yhTesRUVwgjv/fHTHny1RYBfdB6tVDok= -knative.dev/reconciler-test v0.0.0-20210630182710-2a6d91dfee1e h1:OlvD5NiM8rl3j56NIQfmwSazBffwofQE+XDAt88+BV4= -knative.dev/reconciler-test v0.0.0-20210630182710-2a6d91dfee1e/go.mod h1:4wqv2WyWUC5yhTesRUVwgjv/fHTHny1RYBfdB6tVDok= +knative.dev/pkg v0.0.0-20210706174620-fe90576475ca h1:WF0VUpn7S8RvhQ1Q419NAlI+iFJUBTDmVR7lbMzhGMk= +knative.dev/pkg v0.0.0-20210706174620-fe90576475ca/go.mod h1:kGegTnbZ+ljFjAE3E1+8wgaH2LMv8qYi+72o3F3cbdc= +knative.dev/reconciler-test v0.0.0-20210707164418-32f0df0c7399 h1:qPbQKtiWTL2Pw5r44CgxN9wxy7SN+drHlJMxvL7D6DA= +knative.dev/reconciler-test v0.0.0-20210707164418-32f0df0c7399/go.mod h1:4wqv2WyWUC5yhTesRUVwgjv/fHTHny1RYBfdB6tVDok= modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw= modernc.org/golex v1.0.0/go.mod h1:b/QX9oBD/LhixY6NDh+IdGv17hgB+51fET1i2kPSmvk= modernc.org/mathutil v1.0.0/go.mod h1:wU0vUrJsVWBZ4P6e7xtFJEhFSNsfRLJ8H458uRjg03k= diff --git a/third_party/VENDOR-LICENSE/github.com/hashicorp/go-uuid/.travis.yml b/third_party/VENDOR-LICENSE/github.com/hashicorp/go-uuid/.travis.yml new file mode 100644 index 0000000000..769849071e --- /dev/null +++ b/third_party/VENDOR-LICENSE/github.com/hashicorp/go-uuid/.travis.yml @@ -0,0 +1,12 @@ +language: go + +sudo: false + +go: + - 1.4 + - 1.5 + - 1.6 + - tip + +script: + - go test -bench . -benchmem -v ./... diff --git a/third_party/VENDOR-LICENSE/github.com/hashicorp/go-uuid/LICENSE b/third_party/VENDOR-LICENSE/github.com/hashicorp/go-uuid/LICENSE old mode 100755 new mode 100644 diff --git a/third_party/VENDOR-LICENSE/github.com/hashicorp/go-uuid/README.md b/third_party/VENDOR-LICENSE/github.com/hashicorp/go-uuid/README.md new file mode 100644 index 0000000000..fbde8b9aef --- /dev/null +++ b/third_party/VENDOR-LICENSE/github.com/hashicorp/go-uuid/README.md @@ -0,0 +1,8 @@ +# uuid [![Build Status](https://travis-ci.org/hashicorp/go-uuid.svg?branch=master)](https://travis-ci.org/hashicorp/go-uuid) + +Generates UUID-format strings using high quality, _purely random_ bytes. It is **not** intended to be RFC compliant, merely to use a well-understood string representation of a 128-bit value. It can also parse UUID-format strings into their component bytes. + +Documentation +============= + +The full documentation is available on [Godoc](http://godoc.org/github.com/hashicorp/go-uuid). diff --git a/third_party/VENDOR-LICENSE/github.com/hashicorp/go-uuid/go.mod b/third_party/VENDOR-LICENSE/github.com/hashicorp/go-uuid/go.mod new file mode 100644 index 0000000000..dd57f9d21a --- /dev/null +++ b/third_party/VENDOR-LICENSE/github.com/hashicorp/go-uuid/go.mod @@ -0,0 +1 @@ +module github.com/hashicorp/go-uuid diff --git a/third_party/VENDOR-LICENSE/github.com/hashicorp/go-uuid/uuid.go b/third_party/VENDOR-LICENSE/github.com/hashicorp/go-uuid/uuid.go new file mode 100644 index 0000000000..0c10c4e9f5 --- /dev/null +++ b/third_party/VENDOR-LICENSE/github.com/hashicorp/go-uuid/uuid.go @@ -0,0 +1,83 @@ +package uuid + +import ( + "crypto/rand" + "encoding/hex" + "fmt" + "io" +) + +// GenerateRandomBytes is used to generate random bytes of given size. +func GenerateRandomBytes(size int) ([]byte, error) { + return GenerateRandomBytesWithReader(size, rand.Reader) +} + +// GenerateRandomBytesWithReader is used to generate random bytes of given size read from a given reader. +func GenerateRandomBytesWithReader(size int, reader io.Reader) ([]byte, error) { + if reader == nil { + return nil, fmt.Errorf("provided reader is nil") + } + buf := make([]byte, size) + if _, err := io.ReadFull(reader, buf); err != nil { + return nil, fmt.Errorf("failed to read random bytes: %v", err) + } + return buf, nil +} + + +const uuidLen = 16 + +// GenerateUUID is used to generate a random UUID +func GenerateUUID() (string, error) { + return GenerateUUIDWithReader(rand.Reader) +} + +// GenerateUUIDWithReader is used to generate a random UUID with a given Reader +func GenerateUUIDWithReader(reader io.Reader) (string, error) { + if reader == nil { + return "", fmt.Errorf("provided reader is nil") + } + buf, err := GenerateRandomBytesWithReader(uuidLen, reader) + if err != nil { + return "", err + } + return FormatUUID(buf) +} + +func FormatUUID(buf []byte) (string, error) { + if buflen := len(buf); buflen != uuidLen { + return "", fmt.Errorf("wrong length byte slice (%d)", buflen) + } + + return fmt.Sprintf("%x-%x-%x-%x-%x", + buf[0:4], + buf[4:6], + buf[6:8], + buf[8:10], + buf[10:16]), nil +} + +func ParseUUID(uuid string) ([]byte, error) { + if len(uuid) != 2 * uuidLen + 4 { + return nil, fmt.Errorf("uuid string is wrong length") + } + + if uuid[8] != '-' || + uuid[13] != '-' || + uuid[18] != '-' || + uuid[23] != '-' { + return nil, fmt.Errorf("uuid is improperly formatted") + } + + hexStr := uuid[0:8] + uuid[9:13] + uuid[14:18] + uuid[19:23] + uuid[24:36] + + ret, err := hex.DecodeString(hexStr) + if err != nil { + return nil, err + } + if len(ret) != uuidLen { + return nil, fmt.Errorf("decoded hex is the wrong length") + } + + return ret, nil +} diff --git a/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/.gitignore b/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/.gitignore new file mode 100644 index 0000000000..836562412f --- /dev/null +++ b/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/.gitignore @@ -0,0 +1,23 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe +*.test diff --git a/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/2q.go b/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/2q.go new file mode 100644 index 0000000000..e474cd0758 --- /dev/null +++ b/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/2q.go @@ -0,0 +1,223 @@ +package lru + +import ( + "fmt" + "sync" + + "github.com/hashicorp/golang-lru/simplelru" +) + +const ( + // Default2QRecentRatio is the ratio of the 2Q cache dedicated + // to recently added entries that have only been accessed once. + Default2QRecentRatio = 0.25 + + // Default2QGhostEntries is the default ratio of ghost + // entries kept to track entries recently evicted + Default2QGhostEntries = 0.50 +) + +// TwoQueueCache is a thread-safe fixed size 2Q cache. +// 2Q is an enhancement over the standard LRU cache +// in that it tracks both frequently and recently used +// entries separately. This avoids a burst in access to new +// entries from evicting frequently used entries. It adds some +// additional tracking overhead to the standard LRU cache, and is +// computationally about 2x the cost, and adds some metadata over +// head. The ARCCache is similar, but does not require setting any +// parameters. +type TwoQueueCache struct { + size int + recentSize int + + recent simplelru.LRUCache + frequent simplelru.LRUCache + recentEvict simplelru.LRUCache + lock sync.RWMutex +} + +// New2Q creates a new TwoQueueCache using the default +// values for the parameters. +func New2Q(size int) (*TwoQueueCache, error) { + return New2QParams(size, Default2QRecentRatio, Default2QGhostEntries) +} + +// New2QParams creates a new TwoQueueCache using the provided +// parameter values. +func New2QParams(size int, recentRatio float64, ghostRatio float64) (*TwoQueueCache, error) { + if size <= 0 { + return nil, fmt.Errorf("invalid size") + } + if recentRatio < 0.0 || recentRatio > 1.0 { + return nil, fmt.Errorf("invalid recent ratio") + } + if ghostRatio < 0.0 || ghostRatio > 1.0 { + return nil, fmt.Errorf("invalid ghost ratio") + } + + // Determine the sub-sizes + recentSize := int(float64(size) * recentRatio) + evictSize := int(float64(size) * ghostRatio) + + // Allocate the LRUs + recent, err := simplelru.NewLRU(size, nil) + if err != nil { + return nil, err + } + frequent, err := simplelru.NewLRU(size, nil) + if err != nil { + return nil, err + } + recentEvict, err := simplelru.NewLRU(evictSize, nil) + if err != nil { + return nil, err + } + + // Initialize the cache + c := &TwoQueueCache{ + size: size, + recentSize: recentSize, + recent: recent, + frequent: frequent, + recentEvict: recentEvict, + } + return c, nil +} + +// Get looks up a key's value from the cache. +func (c *TwoQueueCache) Get(key interface{}) (value interface{}, ok bool) { + c.lock.Lock() + defer c.lock.Unlock() + + // Check if this is a frequent value + if val, ok := c.frequent.Get(key); ok { + return val, ok + } + + // If the value is contained in recent, then we + // promote it to frequent + if val, ok := c.recent.Peek(key); ok { + c.recent.Remove(key) + c.frequent.Add(key, val) + return val, ok + } + + // No hit + return nil, false +} + +// Add adds a value to the cache. +func (c *TwoQueueCache) Add(key, value interface{}) { + c.lock.Lock() + defer c.lock.Unlock() + + // Check if the value is frequently used already, + // and just update the value + if c.frequent.Contains(key) { + c.frequent.Add(key, value) + return + } + + // Check if the value is recently used, and promote + // the value into the frequent list + if c.recent.Contains(key) { + c.recent.Remove(key) + c.frequent.Add(key, value) + return + } + + // If the value was recently evicted, add it to the + // frequently used list + if c.recentEvict.Contains(key) { + c.ensureSpace(true) + c.recentEvict.Remove(key) + c.frequent.Add(key, value) + return + } + + // Add to the recently seen list + c.ensureSpace(false) + c.recent.Add(key, value) + return +} + +// ensureSpace is used to ensure we have space in the cache +func (c *TwoQueueCache) ensureSpace(recentEvict bool) { + // If we have space, nothing to do + recentLen := c.recent.Len() + freqLen := c.frequent.Len() + if recentLen+freqLen < c.size { + return + } + + // If the recent buffer is larger than + // the target, evict from there + if recentLen > 0 && (recentLen > c.recentSize || (recentLen == c.recentSize && !recentEvict)) { + k, _, _ := c.recent.RemoveOldest() + c.recentEvict.Add(k, nil) + return + } + + // Remove from the frequent list otherwise + c.frequent.RemoveOldest() +} + +// Len returns the number of items in the cache. +func (c *TwoQueueCache) Len() int { + c.lock.RLock() + defer c.lock.RUnlock() + return c.recent.Len() + c.frequent.Len() +} + +// Keys returns a slice of the keys in the cache. +// The frequently used keys are first in the returned slice. +func (c *TwoQueueCache) Keys() []interface{} { + c.lock.RLock() + defer c.lock.RUnlock() + k1 := c.frequent.Keys() + k2 := c.recent.Keys() + return append(k1, k2...) +} + +// Remove removes the provided key from the cache. +func (c *TwoQueueCache) Remove(key interface{}) { + c.lock.Lock() + defer c.lock.Unlock() + if c.frequent.Remove(key) { + return + } + if c.recent.Remove(key) { + return + } + if c.recentEvict.Remove(key) { + return + } +} + +// Purge is used to completely clear the cache. +func (c *TwoQueueCache) Purge() { + c.lock.Lock() + defer c.lock.Unlock() + c.recent.Purge() + c.frequent.Purge() + c.recentEvict.Purge() +} + +// Contains is used to check if the cache contains a key +// without updating recency or frequency. +func (c *TwoQueueCache) Contains(key interface{}) bool { + c.lock.RLock() + defer c.lock.RUnlock() + return c.frequent.Contains(key) || c.recent.Contains(key) +} + +// Peek is used to inspect the cache value of a key +// without updating recency or frequency. +func (c *TwoQueueCache) Peek(key interface{}) (value interface{}, ok bool) { + c.lock.RLock() + defer c.lock.RUnlock() + if val, ok := c.frequent.Peek(key); ok { + return val, ok + } + return c.recent.Peek(key) +} diff --git a/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/LICENSE b/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/LICENSE old mode 100755 new mode 100644 diff --git a/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/README.md b/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/README.md new file mode 100644 index 0000000000..33e58cfaf9 --- /dev/null +++ b/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/README.md @@ -0,0 +1,25 @@ +golang-lru +========== + +This provides the `lru` package which implements a fixed-size +thread safe LRU cache. It is based on the cache in Groupcache. + +Documentation +============= + +Full docs are available on [Godoc](http://godoc.org/github.com/hashicorp/golang-lru) + +Example +======= + +Using the LRU is very simple: + +```go +l, _ := New(128) +for i := 0; i < 256; i++ { + l.Add(i, nil) +} +if l.Len() != 128 { + panic(fmt.Sprintf("bad len: %v", l.Len())) +} +``` diff --git a/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/arc.go b/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/arc.go new file mode 100644 index 0000000000..555225a218 --- /dev/null +++ b/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/arc.go @@ -0,0 +1,257 @@ +package lru + +import ( + "sync" + + "github.com/hashicorp/golang-lru/simplelru" +) + +// ARCCache is a thread-safe fixed size Adaptive Replacement Cache (ARC). +// ARC is an enhancement over the standard LRU cache in that tracks both +// frequency and recency of use. This avoids a burst in access to new +// entries from evicting the frequently used older entries. It adds some +// additional tracking overhead to a standard LRU cache, computationally +// it is roughly 2x the cost, and the extra memory overhead is linear +// with the size of the cache. ARC has been patented by IBM, but is +// similar to the TwoQueueCache (2Q) which requires setting parameters. +type ARCCache struct { + size int // Size is the total capacity of the cache + p int // P is the dynamic preference towards T1 or T2 + + t1 simplelru.LRUCache // T1 is the LRU for recently accessed items + b1 simplelru.LRUCache // B1 is the LRU for evictions from t1 + + t2 simplelru.LRUCache // T2 is the LRU for frequently accessed items + b2 simplelru.LRUCache // B2 is the LRU for evictions from t2 + + lock sync.RWMutex +} + +// NewARC creates an ARC of the given size +func NewARC(size int) (*ARCCache, error) { + // Create the sub LRUs + b1, err := simplelru.NewLRU(size, nil) + if err != nil { + return nil, err + } + b2, err := simplelru.NewLRU(size, nil) + if err != nil { + return nil, err + } + t1, err := simplelru.NewLRU(size, nil) + if err != nil { + return nil, err + } + t2, err := simplelru.NewLRU(size, nil) + if err != nil { + return nil, err + } + + // Initialize the ARC + c := &ARCCache{ + size: size, + p: 0, + t1: t1, + b1: b1, + t2: t2, + b2: b2, + } + return c, nil +} + +// Get looks up a key's value from the cache. +func (c *ARCCache) Get(key interface{}) (value interface{}, ok bool) { + c.lock.Lock() + defer c.lock.Unlock() + + // If the value is contained in T1 (recent), then + // promote it to T2 (frequent) + if val, ok := c.t1.Peek(key); ok { + c.t1.Remove(key) + c.t2.Add(key, val) + return val, ok + } + + // Check if the value is contained in T2 (frequent) + if val, ok := c.t2.Get(key); ok { + return val, ok + } + + // No hit + return nil, false +} + +// Add adds a value to the cache. +func (c *ARCCache) Add(key, value interface{}) { + c.lock.Lock() + defer c.lock.Unlock() + + // Check if the value is contained in T1 (recent), and potentially + // promote it to frequent T2 + if c.t1.Contains(key) { + c.t1.Remove(key) + c.t2.Add(key, value) + return + } + + // Check if the value is already in T2 (frequent) and update it + if c.t2.Contains(key) { + c.t2.Add(key, value) + return + } + + // Check if this value was recently evicted as part of the + // recently used list + if c.b1.Contains(key) { + // T1 set is too small, increase P appropriately + delta := 1 + b1Len := c.b1.Len() + b2Len := c.b2.Len() + if b2Len > b1Len { + delta = b2Len / b1Len + } + if c.p+delta >= c.size { + c.p = c.size + } else { + c.p += delta + } + + // Potentially need to make room in the cache + if c.t1.Len()+c.t2.Len() >= c.size { + c.replace(false) + } + + // Remove from B1 + c.b1.Remove(key) + + // Add the key to the frequently used list + c.t2.Add(key, value) + return + } + + // Check if this value was recently evicted as part of the + // frequently used list + if c.b2.Contains(key) { + // T2 set is too small, decrease P appropriately + delta := 1 + b1Len := c.b1.Len() + b2Len := c.b2.Len() + if b1Len > b2Len { + delta = b1Len / b2Len + } + if delta >= c.p { + c.p = 0 + } else { + c.p -= delta + } + + // Potentially need to make room in the cache + if c.t1.Len()+c.t2.Len() >= c.size { + c.replace(true) + } + + // Remove from B2 + c.b2.Remove(key) + + // Add the key to the frequently used list + c.t2.Add(key, value) + return + } + + // Potentially need to make room in the cache + if c.t1.Len()+c.t2.Len() >= c.size { + c.replace(false) + } + + // Keep the size of the ghost buffers trim + if c.b1.Len() > c.size-c.p { + c.b1.RemoveOldest() + } + if c.b2.Len() > c.p { + c.b2.RemoveOldest() + } + + // Add to the recently seen list + c.t1.Add(key, value) + return +} + +// replace is used to adaptively evict from either T1 or T2 +// based on the current learned value of P +func (c *ARCCache) replace(b2ContainsKey bool) { + t1Len := c.t1.Len() + if t1Len > 0 && (t1Len > c.p || (t1Len == c.p && b2ContainsKey)) { + k, _, ok := c.t1.RemoveOldest() + if ok { + c.b1.Add(k, nil) + } + } else { + k, _, ok := c.t2.RemoveOldest() + if ok { + c.b2.Add(k, nil) + } + } +} + +// Len returns the number of cached entries +func (c *ARCCache) Len() int { + c.lock.RLock() + defer c.lock.RUnlock() + return c.t1.Len() + c.t2.Len() +} + +// Keys returns all the cached keys +func (c *ARCCache) Keys() []interface{} { + c.lock.RLock() + defer c.lock.RUnlock() + k1 := c.t1.Keys() + k2 := c.t2.Keys() + return append(k1, k2...) +} + +// Remove is used to purge a key from the cache +func (c *ARCCache) Remove(key interface{}) { + c.lock.Lock() + defer c.lock.Unlock() + if c.t1.Remove(key) { + return + } + if c.t2.Remove(key) { + return + } + if c.b1.Remove(key) { + return + } + if c.b2.Remove(key) { + return + } +} + +// Purge is used to clear the cache +func (c *ARCCache) Purge() { + c.lock.Lock() + defer c.lock.Unlock() + c.t1.Purge() + c.t2.Purge() + c.b1.Purge() + c.b2.Purge() +} + +// Contains is used to check if the cache contains a key +// without updating recency or frequency. +func (c *ARCCache) Contains(key interface{}) bool { + c.lock.RLock() + defer c.lock.RUnlock() + return c.t1.Contains(key) || c.t2.Contains(key) +} + +// Peek is used to inspect the cache value of a key +// without updating recency or frequency. +func (c *ARCCache) Peek(key interface{}) (value interface{}, ok bool) { + c.lock.RLock() + defer c.lock.RUnlock() + if val, ok := c.t1.Peek(key); ok { + return val, ok + } + return c.t2.Peek(key) +} diff --git a/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/doc.go b/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/doc.go new file mode 100644 index 0000000000..2547df979d --- /dev/null +++ b/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/doc.go @@ -0,0 +1,21 @@ +// Package lru provides three different LRU caches of varying sophistication. +// +// Cache is a simple LRU cache. It is based on the +// LRU implementation in groupcache: +// https://github.com/golang/groupcache/tree/master/lru +// +// TwoQueueCache tracks frequently used and recently used entries separately. +// This avoids a burst of accesses from taking out frequently used entries, +// at the cost of about 2x computational overhead and some extra bookkeeping. +// +// ARCCache is an adaptive replacement cache. It tracks recent evictions as +// well as recent usage in both the frequent and recent caches. Its +// computational overhead is comparable to TwoQueueCache, but the memory +// overhead is linear with the size of the cache. +// +// ARC has been patented by IBM, so do not use it if that is problematic for +// your program. +// +// All caches in this package take locks while operating, and are therefore +// thread-safe for consumers. +package lru diff --git a/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/go.mod b/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/go.mod new file mode 100644 index 0000000000..8ad8826b36 --- /dev/null +++ b/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/go.mod @@ -0,0 +1,3 @@ +module github.com/hashicorp/golang-lru + +go 1.12 diff --git a/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/lru.go b/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/lru.go new file mode 100644 index 0000000000..4e5e9d8fd0 --- /dev/null +++ b/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/lru.go @@ -0,0 +1,150 @@ +package lru + +import ( + "sync" + + "github.com/hashicorp/golang-lru/simplelru" +) + +// Cache is a thread-safe fixed size LRU cache. +type Cache struct { + lru simplelru.LRUCache + lock sync.RWMutex +} + +// New creates an LRU of the given size. +func New(size int) (*Cache, error) { + return NewWithEvict(size, nil) +} + +// NewWithEvict constructs a fixed size cache with the given eviction +// callback. +func NewWithEvict(size int, onEvicted func(key interface{}, value interface{})) (*Cache, error) { + lru, err := simplelru.NewLRU(size, simplelru.EvictCallback(onEvicted)) + if err != nil { + return nil, err + } + c := &Cache{ + lru: lru, + } + return c, nil +} + +// Purge is used to completely clear the cache. +func (c *Cache) Purge() { + c.lock.Lock() + c.lru.Purge() + c.lock.Unlock() +} + +// Add adds a value to the cache. Returns true if an eviction occurred. +func (c *Cache) Add(key, value interface{}) (evicted bool) { + c.lock.Lock() + evicted = c.lru.Add(key, value) + c.lock.Unlock() + return evicted +} + +// Get looks up a key's value from the cache. +func (c *Cache) Get(key interface{}) (value interface{}, ok bool) { + c.lock.Lock() + value, ok = c.lru.Get(key) + c.lock.Unlock() + return value, ok +} + +// Contains checks if a key is in the cache, without updating the +// recent-ness or deleting it for being stale. +func (c *Cache) Contains(key interface{}) bool { + c.lock.RLock() + containKey := c.lru.Contains(key) + c.lock.RUnlock() + return containKey +} + +// Peek returns the key value (or undefined if not found) without updating +// the "recently used"-ness of the key. +func (c *Cache) Peek(key interface{}) (value interface{}, ok bool) { + c.lock.RLock() + value, ok = c.lru.Peek(key) + c.lock.RUnlock() + return value, ok +} + +// ContainsOrAdd checks if a key is in the cache without updating the +// recent-ness or deleting it for being stale, and if not, adds the value. +// Returns whether found and whether an eviction occurred. +func (c *Cache) ContainsOrAdd(key, value interface{}) (ok, evicted bool) { + c.lock.Lock() + defer c.lock.Unlock() + + if c.lru.Contains(key) { + return true, false + } + evicted = c.lru.Add(key, value) + return false, evicted +} + +// PeekOrAdd checks if a key is in the cache without updating the +// recent-ness or deleting it for being stale, and if not, adds the value. +// Returns whether found and whether an eviction occurred. +func (c *Cache) PeekOrAdd(key, value interface{}) (previous interface{}, ok, evicted bool) { + c.lock.Lock() + defer c.lock.Unlock() + + previous, ok = c.lru.Peek(key) + if ok { + return previous, true, false + } + + evicted = c.lru.Add(key, value) + return nil, false, evicted +} + +// Remove removes the provided key from the cache. +func (c *Cache) Remove(key interface{}) (present bool) { + c.lock.Lock() + present = c.lru.Remove(key) + c.lock.Unlock() + return +} + +// Resize changes the cache size. +func (c *Cache) Resize(size int) (evicted int) { + c.lock.Lock() + evicted = c.lru.Resize(size) + c.lock.Unlock() + return evicted +} + +// RemoveOldest removes the oldest item from the cache. +func (c *Cache) RemoveOldest() (key interface{}, value interface{}, ok bool) { + c.lock.Lock() + key, value, ok = c.lru.RemoveOldest() + c.lock.Unlock() + return +} + +// GetOldest returns the oldest entry +func (c *Cache) GetOldest() (key interface{}, value interface{}, ok bool) { + c.lock.Lock() + key, value, ok = c.lru.GetOldest() + c.lock.Unlock() + return +} + +// Keys returns a slice of the keys in the cache, from oldest to newest. +func (c *Cache) Keys() []interface{} { + c.lock.RLock() + keys := c.lru.Keys() + c.lock.RUnlock() + return keys +} + +// Len returns the number of items in the cache. +func (c *Cache) Len() int { + c.lock.RLock() + length := c.lru.Len() + c.lock.RUnlock() + return length +} diff --git a/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/simplelru/lru.go b/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/simplelru/lru.go new file mode 100644 index 0000000000..a86c8539e0 --- /dev/null +++ b/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/simplelru/lru.go @@ -0,0 +1,177 @@ +package simplelru + +import ( + "container/list" + "errors" +) + +// EvictCallback is used to get a callback when a cache entry is evicted +type EvictCallback func(key interface{}, value interface{}) + +// LRU implements a non-thread safe fixed size LRU cache +type LRU struct { + size int + evictList *list.List + items map[interface{}]*list.Element + onEvict EvictCallback +} + +// entry is used to hold a value in the evictList +type entry struct { + key interface{} + value interface{} +} + +// NewLRU constructs an LRU of the given size +func NewLRU(size int, onEvict EvictCallback) (*LRU, error) { + if size <= 0 { + return nil, errors.New("Must provide a positive size") + } + c := &LRU{ + size: size, + evictList: list.New(), + items: make(map[interface{}]*list.Element), + onEvict: onEvict, + } + return c, nil +} + +// Purge is used to completely clear the cache. +func (c *LRU) Purge() { + for k, v := range c.items { + if c.onEvict != nil { + c.onEvict(k, v.Value.(*entry).value) + } + delete(c.items, k) + } + c.evictList.Init() +} + +// Add adds a value to the cache. Returns true if an eviction occurred. +func (c *LRU) Add(key, value interface{}) (evicted bool) { + // Check for existing item + if ent, ok := c.items[key]; ok { + c.evictList.MoveToFront(ent) + ent.Value.(*entry).value = value + return false + } + + // Add new item + ent := &entry{key, value} + entry := c.evictList.PushFront(ent) + c.items[key] = entry + + evict := c.evictList.Len() > c.size + // Verify size not exceeded + if evict { + c.removeOldest() + } + return evict +} + +// Get looks up a key's value from the cache. +func (c *LRU) Get(key interface{}) (value interface{}, ok bool) { + if ent, ok := c.items[key]; ok { + c.evictList.MoveToFront(ent) + if ent.Value.(*entry) == nil { + return nil, false + } + return ent.Value.(*entry).value, true + } + return +} + +// Contains checks if a key is in the cache, without updating the recent-ness +// or deleting it for being stale. +func (c *LRU) Contains(key interface{}) (ok bool) { + _, ok = c.items[key] + return ok +} + +// Peek returns the key value (or undefined if not found) without updating +// the "recently used"-ness of the key. +func (c *LRU) Peek(key interface{}) (value interface{}, ok bool) { + var ent *list.Element + if ent, ok = c.items[key]; ok { + return ent.Value.(*entry).value, true + } + return nil, ok +} + +// Remove removes the provided key from the cache, returning if the +// key was contained. +func (c *LRU) Remove(key interface{}) (present bool) { + if ent, ok := c.items[key]; ok { + c.removeElement(ent) + return true + } + return false +} + +// RemoveOldest removes the oldest item from the cache. +func (c *LRU) RemoveOldest() (key interface{}, value interface{}, ok bool) { + ent := c.evictList.Back() + if ent != nil { + c.removeElement(ent) + kv := ent.Value.(*entry) + return kv.key, kv.value, true + } + return nil, nil, false +} + +// GetOldest returns the oldest entry +func (c *LRU) GetOldest() (key interface{}, value interface{}, ok bool) { + ent := c.evictList.Back() + if ent != nil { + kv := ent.Value.(*entry) + return kv.key, kv.value, true + } + return nil, nil, false +} + +// Keys returns a slice of the keys in the cache, from oldest to newest. +func (c *LRU) Keys() []interface{} { + keys := make([]interface{}, len(c.items)) + i := 0 + for ent := c.evictList.Back(); ent != nil; ent = ent.Prev() { + keys[i] = ent.Value.(*entry).key + i++ + } + return keys +} + +// Len returns the number of items in the cache. +func (c *LRU) Len() int { + return c.evictList.Len() +} + +// Resize changes the cache size. +func (c *LRU) Resize(size int) (evicted int) { + diff := c.Len() - size + if diff < 0 { + diff = 0 + } + for i := 0; i < diff; i++ { + c.removeOldest() + } + c.size = size + return diff +} + +// removeOldest removes the oldest item from the cache. +func (c *LRU) removeOldest() { + ent := c.evictList.Back() + if ent != nil { + c.removeElement(ent) + } +} + +// removeElement is used to remove a given list element from the cache +func (c *LRU) removeElement(e *list.Element) { + c.evictList.Remove(e) + kv := e.Value.(*entry) + delete(c.items, kv.key) + if c.onEvict != nil { + c.onEvict(kv.key, kv.value) + } +} diff --git a/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/simplelru/lru_interface.go b/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/simplelru/lru_interface.go new file mode 100644 index 0000000000..92d70934d6 --- /dev/null +++ b/third_party/VENDOR-LICENSE/github.com/hashicorp/golang-lru/simplelru/lru_interface.go @@ -0,0 +1,39 @@ +package simplelru + +// LRUCache is the interface for simple LRU cache. +type LRUCache interface { + // Adds a value to the cache, returns true if an eviction occurred and + // updates the "recently used"-ness of the key. + Add(key, value interface{}) bool + + // Returns key's value from the cache and + // updates the "recently used"-ness of the key. #value, isFound + Get(key interface{}) (value interface{}, ok bool) + + // Checks if a key exists in cache without updating the recent-ness. + Contains(key interface{}) (ok bool) + + // Returns key's value without updating the "recently used"-ness of the key. + Peek(key interface{}) (value interface{}, ok bool) + + // Removes a key from the cache. + Remove(key interface{}) bool + + // Removes the oldest entry from cache. + RemoveOldest() (interface{}, interface{}, bool) + + // Returns the oldest entry from the cache. #key, value, isFound + GetOldest() (interface{}, interface{}, bool) + + // Returns a slice of the keys in the cache, from oldest to newest. + Keys() []interface{} + + // Returns the number of items in the cache. + Len() int + + // Clears all cache entries. + Purge() + + // Resizes cache, returning number evicted + Resize(int) int +} diff --git a/vendor/knative.dev/eventing/pkg/apis/eventing/v1/trigger_lifecycle.go b/vendor/knative.dev/eventing/pkg/apis/eventing/v1/trigger_lifecycle.go index 5dceb767ab..c2ebce95cf 100644 --- a/vendor/knative.dev/eventing/pkg/apis/eventing/v1/trigger_lifecycle.go +++ b/vendor/knative.dev/eventing/pkg/apis/eventing/v1/trigger_lifecycle.go @@ -23,7 +23,7 @@ import ( duckv1 "knative.dev/pkg/apis/duck/v1" ) -var triggerCondSet = apis.NewLivingConditionSet(TriggerConditionBroker, TriggerConditionSubscribed, TriggerConditionDependency, TriggerConditionSubscriberResolved) +var triggerCondSet = apis.NewLivingConditionSet(TriggerConditionBroker, TriggerConditionSubscribed, TriggerConditionDependency, TriggerConditionSubscriberResolved, TriggerConditionDeadLetterSinkResolved) const ( // TriggerConditionReady has status True when all subconditions below have been set to True. @@ -37,6 +37,8 @@ const ( TriggerConditionSubscriberResolved apis.ConditionType = "SubscriberResolved" + TriggerConditionDeadLetterSinkResolved apis.ConditionType = "DeadLetterSinkResolved" + // TriggerAnyFilter Constant to represent that we should allow anything. TriggerAnyFilter = "" ) @@ -150,6 +152,18 @@ func (ts *TriggerStatus) MarkSubscriberResolvedUnknown(reason, messageFormat str triggerCondSet.Manage(ts).MarkUnknown(TriggerConditionSubscriberResolved, reason, messageFormat, messageA...) } +func (ts *TriggerStatus) MarkDeadLetterSinkResolvedSucceeded() { + triggerCondSet.Manage(ts).MarkTrue(TriggerConditionDeadLetterSinkResolved) +} + +func (ts *TriggerStatus) MarkDeadLetterSinkNotConfigured() { + triggerCondSet.Manage(ts).MarkTrueWithReason(TriggerConditionDeadLetterSinkResolved, "DeadLetterSinkNotConfigured", "No dead letter sink is configured.") +} + +func (ts *TriggerStatus) MarkDeadLetterSinkResolvedFailed(reason, messageFormat string, messageA ...interface{}) { + triggerCondSet.Manage(ts).MarkFalse(TriggerConditionDeadLetterSinkResolved, reason, messageFormat, messageA...) +} + func (ts *TriggerStatus) MarkDependencySucceeded() { triggerCondSet.Manage(ts).MarkTrue(TriggerConditionDependency) } diff --git a/vendor/knative.dev/eventing/pkg/apis/eventing/v1/trigger_types.go b/vendor/knative.dev/eventing/pkg/apis/eventing/v1/trigger_types.go index f40e802929..97ffc8759f 100644 --- a/vendor/knative.dev/eventing/pkg/apis/eventing/v1/trigger_types.go +++ b/vendor/knative.dev/eventing/pkg/apis/eventing/v1/trigger_types.go @@ -119,6 +119,10 @@ type TriggerStatus struct { // SubscriberURI is the resolved URI of the receiver for this Trigger. // +optional SubscriberURI *apis.URL `json:"subscriberUri,omitempty"` + + // DeadLetterURI is the resolved URI of the dead letter sink for this Trigger. + // +optional + DeadLetterURI *apis.URL `json:"deadLetterUri,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/vendor/knative.dev/eventing/pkg/apis/eventing/v1/zz_generated.deepcopy.go b/vendor/knative.dev/eventing/pkg/apis/eventing/v1/zz_generated.deepcopy.go index c1f4be2487..7be79a4ee5 100644 --- a/vendor/knative.dev/eventing/pkg/apis/eventing/v1/zz_generated.deepcopy.go +++ b/vendor/knative.dev/eventing/pkg/apis/eventing/v1/zz_generated.deepcopy.go @@ -274,6 +274,11 @@ func (in *TriggerStatus) DeepCopyInto(out *TriggerStatus) { *out = new(apis.URL) (*in).DeepCopyInto(*out) } + if in.DeadLetterURI != nil { + in, out := &in.DeadLetterURI, &out.DeadLetterURI + *out = new(apis.URL) + (*in).DeepCopyInto(*out) + } return } diff --git a/vendor/knative.dev/eventing/pkg/reconciler/testing/v1/broker.go b/vendor/knative.dev/eventing/pkg/reconciler/testing/v1/broker.go index 5412b3b0bd..e417270ba8 100644 --- a/vendor/knative.dev/eventing/pkg/reconciler/testing/v1/broker.go +++ b/vendor/knative.dev/eventing/pkg/reconciler/testing/v1/broker.go @@ -18,6 +18,7 @@ import ( "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + eventingv1 "knative.dev/eventing/pkg/apis/duck/v1" "knative.dev/eventing/pkg/apis/eventing" v1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/broker" @@ -192,3 +193,19 @@ func WithChannelNameAnnotation(name string) BrokerOption { b.Status.Annotations[eventing.BrokerChannelNameStatusAnnotationKey] = name } } + +func WithDeadLeaderSink(ref *duckv1.KReference, uri string) BrokerOption { + return func(b *v1.Broker) { + if b.Spec.Delivery == nil { + b.Spec.Delivery = new(eventingv1.DeliverySpec) + } + var u *apis.URL + if uri != "" { + u, _ = apis.ParseURL(uri) + } + b.Spec.Delivery.DeadLetterSink = &duckv1.Destination{ + Ref: ref, + URI: u, + } + } +} diff --git a/vendor/knative.dev/eventing/pkg/reconciler/testing/v1/trigger.go b/vendor/knative.dev/eventing/pkg/reconciler/testing/v1/trigger.go index 73e5b8539b..bd9ba6ad0b 100644 --- a/vendor/knative.dev/eventing/pkg/reconciler/testing/v1/trigger.go +++ b/vendor/knative.dev/eventing/pkg/reconciler/testing/v1/trigger.go @@ -189,6 +189,13 @@ func WithTriggerStatusSubscriberURI(uri string) TriggerOption { } } +func WithTriggerStatusDeadLetterSinkURI(uri string) TriggerOption { + return func(t *v1.Trigger) { + u, _ := apis.ParseURL(uri) + t.Status.DeadLetterURI = u + } +} + func WithAnnotation(key, value string) TriggerOption { return func(t *v1.Trigger) { if t.Annotations == nil { @@ -237,6 +244,24 @@ func WithTriggerSubscriberResolvedFailed(reason, message string) TriggerOption { } } +func WithTriggerDeadLetterSinkResolvedFailed(reason, message string) TriggerOption { + return func(t *v1.Trigger) { + t.Status.MarkDeadLetterSinkResolvedFailed(reason, message) + } +} + +func WithTriggerDeadLetterSinkResolvedSucceeded() TriggerOption { + return func(t *v1.Trigger) { + t.Status.MarkDeadLetterSinkResolvedSucceeded() + } +} + +func WithTriggerDeadLetterSinkNotConfigured() TriggerOption { + return func(t *v1.Trigger) { + t.Status.MarkDeadLetterSinkNotConfigured() + } +} + func WithTriggerSubscriberResolvedUnknown(reason, message string) TriggerOption { return func(t *v1.Trigger) { t.Status.MarkSubscriberResolvedUnknown(reason, message) diff --git a/vendor/knative.dev/pkg/test/spoof/error_checks.go b/vendor/knative.dev/pkg/test/spoof/error_checks.go index e152e11d79..378bf58240 100644 --- a/vendor/knative.dev/pkg/test/spoof/error_checks.go +++ b/vendor/knative.dev/pkg/test/spoof/error_checks.go @@ -58,3 +58,7 @@ func isConnectionRefused(err error) bool { func isConnectionReset(err error) bool { return err != nil && strings.Contains(err.Error(), "connection reset by peer") } + +func isNoRouteToHostError(err error) bool { + return err != nil && strings.Contains(err.Error(), "connect: no route to host") +} diff --git a/vendor/knative.dev/pkg/test/spoof/spoof.go b/vendor/knative.dev/pkg/test/spoof/spoof.go index 0513f7e0f1..74e735623b 100644 --- a/vendor/knative.dev/pkg/test/spoof/spoof.go +++ b/vendor/knative.dev/pkg/test/spoof/spoof.go @@ -223,6 +223,11 @@ func DefaultErrorRetryChecker(err error) (bool, error) { if errors.Is(err, io.EOF) { return true, fmt.Errorf("retrying for: %w", err) } + // No route to host errors are in the same category as connection refused errors and + // are usually transient. + if isNoRouteToHostError(err) { + return true, fmt.Errorf("retrying for 'no route to host' error: %w", err) + } return false, err } diff --git a/vendor/knative.dev/reconciler-test/cmd/eventshub/main.go b/vendor/knative.dev/reconciler-test/cmd/eventshub/main.go index af0606d677..566b2cd92d 100644 --- a/vendor/knative.dev/reconciler-test/cmd/eventshub/main.go +++ b/vendor/knative.dev/reconciler-test/cmd/eventshub/main.go @@ -19,9 +19,6 @@ package main import ( "context" - "github.com/kelseyhightower/envconfig" - "golang.org/x/sync/errgroup" - "knative.dev/pkg/injection" "knative.dev/pkg/logging" "knative.dev/reconciler-test/pkg/eventshub" @@ -31,66 +28,23 @@ import ( "knative.dev/reconciler-test/pkg/eventshub/sender" ) -type envConfig struct { - EventGenerators []string `envconfig:"EVENT_GENERATORS" required:"true"` - EventLogs []string `envconfig:"EVENT_LOGS" required:"true"` -} - func main() { - //nolint // nil ctx is fine here, look at the code of EnableInjectionOrDie - ctx, _ := injection.EnableInjectionOrDie(nil, nil) - ctx = eventshub.ConfigureLogging(ctx, "eventshub") - - if err := eventshub.ConfigureTracing(logging.FromContext(ctx), ""); err != nil { - logging.FromContext(ctx).Fatal("Unable to setup trace publishing", err) - } - - var env envConfig - if err := envconfig.Process("", &env); err != nil { - logging.FromContext(ctx).Fatal("Failed to process env var", err) - } - logging.FromContext(ctx).Infof("Events Hub environment configuration: %+v", env) - - eventLogs := createEventLogs(ctx, env.EventLogs) - err := startEventGenerators(ctx, env.EventGenerators, eventLogs) - - if err != nil { - logging.FromContext(ctx).Fatal("Error during start: ", err) - } - - logging.FromContext(ctx).Info("Closing the eventshub process") -} - -func createEventLogs(ctx context.Context, logTypes []string) *eventshub.EventLogs { - var l []eventshub.EventLog - for _, logType := range logTypes { - switch eventshub.EventLogType(logType) { - case eventshub.RecorderEventLog: - l = append(l, recorder_vent.NewFromEnv(ctx)) - case eventshub.LoggerEventLog: - l = append(l, logger_vent.Logger(logging.FromContext(ctx).Named("event logger").Infof)) - default: - logging.FromContext(ctx).Fatal("Cannot recognize event log type: ", logType) - } - } - return eventshub.NewEventLogs(l...) -} - -func startEventGenerators(ctx context.Context, genTypes []string, eventLogs *eventshub.EventLogs) error { - errs, _ := errgroup.WithContext(ctx) - for _, genType := range genTypes { - switch eventshub.EventGeneratorType(genType) { - case eventshub.ReceiverEventGenerator: - errs.Go(func() error { - return receiver.NewFromEnv(ctx, eventLogs).Start(ctx, eventshub.WithTracing) - }) - case eventshub.SenderEventGenerator: - errs.Go(func() error { - return sender.Start(ctx, eventLogs) - }) - default: - logging.FromContext(ctx).Fatal("Cannot recognize event generator type: ", genType) - } - } - return errs.Wait() + eventshub.Start( + map[string]eventshub.EventLogFactory{ + eventshub.RecorderEventLog: func(ctx context.Context) (eventshub.EventLog, error) { + return recorder_vent.NewFromEnv(ctx), nil + }, + eventshub.LoggerEventLog: func(ctx context.Context) (eventshub.EventLog, error) { + return logger_vent.Logger(logging.FromContext(ctx).Named("event logger").Infof), nil + }, + }, + map[string]eventshub.EventGeneratorStarter{ + eventshub.ReceiverEventGenerator: func(ctx context.Context, logs *eventshub.EventLogs) error { + return receiver.NewFromEnv(ctx, logs).Start(ctx, eventshub.WithTracing) + }, + eventshub.SenderEventGenerator: func(ctx context.Context, logs *eventshub.EventLogs) error { + return sender.Start(ctx, logs) + }, + }, + ) } diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/103-pod.yaml b/vendor/knative.dev/reconciler-test/pkg/eventshub/103-pod.yaml index fdf1bfe80b..faf7d6e8b3 100644 --- a/vendor/knative.dev/reconciler-test/pkg/eventshub/103-pod.yaml +++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/103-pod.yaml @@ -24,7 +24,7 @@ spec: restartPolicy: "Never" containers: - name: eventshub - image: ko://knative.dev/reconciler-test/cmd/eventshub + image: {{ .image }} imagePullPolicy: "IfNotPresent" env: - name: "SYSTEM_NAMESPACE" diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/event_info.go b/vendor/knative.dev/reconciler-test/pkg/eventshub/event_info.go index 8035d892a8..e8f2ca0540 100644 --- a/vendor/knative.dev/reconciler-test/pkg/eventshub/event_info.go +++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/event_info.go @@ -64,6 +64,9 @@ type EventInfo struct { // This is filled with the ID of the sent event (if any) and in the Response also // jot it down so you can correlate which event (ID) as well as sequence to match sent/response 1:1. SentId string `json:"id"` + + // AdditionalInfo can be used by event generator implementations to add more event details + AdditionalInfo map[string]interface{} `json:"additionalInfo"` } // Pretty print the event. Meant for debugging. diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/event_log.go b/vendor/knative.dev/reconciler-test/pkg/eventshub/event_log.go index b68139a329..dc17e88f5c 100644 --- a/vendor/knative.dev/reconciler-test/pkg/eventshub/event_log.go +++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/event_log.go @@ -44,16 +44,10 @@ func (e *EventLogs) Vent(observed EventInfo) error { return nil } -type EventGeneratorType string - const ( - ReceiverEventGenerator EventGeneratorType = "receiver" - SenderEventGenerator EventGeneratorType = "sender" -) + ReceiverEventGenerator string = "receiver" + SenderEventGenerator string = "sender" -type EventLogType string - -const ( - RecorderEventLog EventLogType = "recorder" - LoggerEventLog EventLogType = "logger" + RecorderEventLog string = "recorder" + LoggerEventLog string = "logger" ) diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/eventshub.go b/vendor/knative.dev/reconciler-test/pkg/eventshub/eventshub.go new file mode 100644 index 0000000000..acf4911189 --- /dev/null +++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/eventshub.go @@ -0,0 +1,97 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventshub + +import ( + "context" + + "github.com/kelseyhightower/envconfig" + "golang.org/x/sync/errgroup" + "knative.dev/pkg/injection" + "knative.dev/pkg/logging" +) + +type envConfig struct { + EventGenerators []string `envconfig:"EVENT_GENERATORS" required:"true"` + EventLogs []string `envconfig:"EVENT_LOGS" required:"true"` +} + +// EventLogFactory creates a new EventLog instance. +type EventLogFactory func(context.Context) (EventLog, error) + +// EventGeneratorStarter starts a new event generator. This function is executed in a separate goroutine, so it can block. +type EventGeneratorStarter func(context.Context, *EventLogs) error + +// Start starts a new eventshub process, with the provided factories. +// You can create your own eventshub providing event log factories and event generator factories. +func Start(eventLogFactories map[string]EventLogFactory, eventGeneratorFactories map[string]EventGeneratorStarter) { + //nolint // nil ctx is fine here, look at the code of EnableInjectionOrDie + ctx, _ := injection.EnableInjectionOrDie(nil, nil) + ctx = ConfigureLogging(ctx, "eventshub") + + if err := ConfigureTracing(logging.FromContext(ctx), ""); err != nil { + logging.FromContext(ctx).Fatal("Unable to setup trace publishing", err) + } + + var env envConfig + if err := envconfig.Process("", &env); err != nil { + logging.FromContext(ctx).Fatal("Failed to process env var", err) + } + logging.FromContext(ctx).Infof("Events Hub environment configuration: %+v", env) + + eventLogs := createEventLogs(ctx, eventLogFactories, env.EventLogs) + err := startEventGenerators(ctx, eventGeneratorFactories, env.EventGenerators, eventLogs) + + if err != nil { + logging.FromContext(ctx).Fatal("Error during start: ", err) + } + + logging.FromContext(ctx).Info("Closing the eventshub process") +} + +func createEventLogs(ctx context.Context, factories map[string]EventLogFactory, logTypes []string) *EventLogs { + var eventLogs []EventLog + for _, logType := range logTypes { + factory, ok := factories[logType] + if !ok { + logging.FromContext(ctx).Fatal("Cannot recognize event log type: ", logType) + } + + eventLog, err := factory(ctx) + if err != nil { + logging.FromContext(ctx).Fatalf("Error while instantiating the event log %s: %s", logType, err) + } + + eventLogs = append(eventLogs, eventLog) + } + return NewEventLogs(eventLogs...) +} + +func startEventGenerators(ctx context.Context, factories map[string]EventGeneratorStarter, genTypes []string, eventLogs *EventLogs) error { + errs, _ := errgroup.WithContext(ctx) + for _, genType := range genTypes { + factory, ok := factories[genType] + if !ok { + logging.FromContext(ctx).Fatal("Cannot recognize event generator type: ", genType) + } + + errs.Go(func() error { + return factory(ctx, eventLogs) + }) + } + return errs.Wait() +} diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/eventshub_image.go b/vendor/knative.dev/reconciler-test/pkg/eventshub/eventshub_image.go new file mode 100644 index 0000000000..5c79e80ed9 --- /dev/null +++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/eventshub_image.go @@ -0,0 +1,45 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventshub + +import ( + "context" + + "knative.dev/reconciler-test/pkg/environment" +) + +const ( + thisPackage = "knative.dev/reconciler-test/cmd/eventshub" + defaultEventshubImage = "ko://" + thisPackage +) + +type eventshubImageKey struct{} + +// ImageFromContext gets the eventshub image from context +func ImageFromContext(ctx context.Context) string { + if e, ok := ctx.Value(eventshubImageKey{}).(string); ok { + return e + } + return defaultEventshubImage +} + +// WithCustomImage allows you to specify a custom eventshub image to be used when invoking eventshub.Install +func WithCustomImage(image string) environment.EnvOpts { + return func(ctx context.Context, env environment.Environment) (context.Context, error) { + return context.WithValue(ctx, eventshubImageKey{}, image), nil + } +} diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/options.go b/vendor/knative.dev/reconciler-test/pkg/eventshub/options.go index 5b2c4a94d5..f3f8f01abb 100644 --- a/vendor/knative.dev/reconciler-test/pkg/eventshub/options.go +++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/options.go @@ -113,6 +113,13 @@ func DropFirstN(n uint) EventsHubOption { ) } +// DropEventsResponseCode will cause the receiver to reply with the specific status code to the dropped events +func DropEventsResponseCode(code int) EventsHubOption { + return compose( + envOption("SKIP_RESPONSE_CODE", strconv.Itoa(code)), + ) +} + // --- Sender options // InitialSenderDelay defines how much the sender has to wait, when started, before start sending events. diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/prober.go b/vendor/knative.dev/reconciler-test/pkg/eventshub/prober.go index 4050e0d378..8f18f7662d 100644 --- a/vendor/knative.dev/reconciler-test/pkg/eventshub/prober.go +++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/prober.go @@ -103,6 +103,11 @@ func (p *EventProber) ReceiversRejectFirstN(n uint) { p.receiverOptions = append(p.receiverOptions, DropFirstN(n)) } +// ReceiversRejectResponseCode adds DropEventsResponseCode to the default config for new receivers. +func (p *EventProber) ReceiversRejectResponseCode(code int) { + p.receiverOptions = append(p.receiverOptions, DropEventsResponseCode(code)) +} + // ReceiversHaveResponseDelay adds ResponseWaitTime to the default config for // new receivers. func (p *EventProber) ReceiversHaveResponseDelay(delay time.Duration) { diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/receiver/receiver.go b/vendor/knative.dev/reconciler-test/pkg/eventshub/receiver/receiver.go index daf4da0db4..314244b706 100644 --- a/vendor/knative.dev/reconciler-test/pkg/eventshub/receiver/receiver.go +++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/receiver/receiver.go @@ -47,6 +47,7 @@ type Receiver struct { replyFunc func(context.Context, http.ResponseWriter, eventshub.EventInfo) counter *dropevents.CounterHandler responseWaitTime time.Duration + skipResponseCode int } type envConfig struct { @@ -78,6 +79,9 @@ type envConfig struct { // If events should be dropped according to Linear policy, this controls // how many events are dropped. SkipCounter uint64 `envconfig:"SKIP_COUNTER" default:"0" required:"false"` + + // If events should be dropped, specify the HTTP response code here. + SkipResponseCode int `envconfig:"SKIP_RESPONSE_CODE" default:"409" required:"false"` } func NewFromEnv(ctx context.Context, eventLogs *eventshub.EventLogs) *Receiver { @@ -121,6 +125,7 @@ func NewFromEnv(ctx context.Context, eventLogs *eventshub.EventLogs) *Receiver { replyFunc: replyFunc, counter: counter, responseWaitTime: responseWaitTime, + skipResponseCode: env.SkipResponseCode, } } @@ -211,7 +216,7 @@ func (o *Receiver) ServeHTTP(writer http.ResponseWriter, request *http.Request) if shouldSkip { // Trigger a redelivery - writer.WriteHeader(http.StatusConflict) + writer.WriteHeader(o.skipResponseCode) } else { o.replyFunc(o.ctx, writer, eventInfo) } diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/resources.go b/vendor/knative.dev/reconciler-test/pkg/eventshub/resources.go index 0dfcbcda14..d31ba36342 100644 --- a/vendor/knative.dev/reconciler-test/pkg/eventshub/resources.go +++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/resources.go @@ -32,7 +32,7 @@ import ( var templates embed.FS func init() { - environment.RegisterPackage(manifest.ImagesFromFS(templates)...) + environment.RegisterPackage(thisPackage) } // Install starts a new eventshub with the provided name @@ -67,8 +67,9 @@ func Install(name string, options ...EventsHubOption) feature.StepFn { // Deploy if _, err := manifest.InstallYamlFS(ctx, templates, map[string]interface{}{ - "name": name, - "envs": envs, + "name": name, + "envs": envs, + "image": ImageFromContext(ctx), }); err != nil { t.Fatal(err) } diff --git a/vendor/modules.txt b/vendor/modules.txt index 89aa1b13bd..07cf3908e2 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1026,7 +1026,7 @@ k8s.io/utils/buffer k8s.io/utils/integer k8s.io/utils/pointer k8s.io/utils/trace -# knative.dev/eventing v0.24.0 +# knative.dev/eventing v0.24.1-0.20210708130023-221dfdfced62 ## explicit knative.dev/eventing/pkg/apis/config knative.dev/eventing/pkg/apis/duck @@ -1137,7 +1137,7 @@ knative.dev/eventing/test/test_images/request-sender # knative.dev/hack v0.0.0-20210622141627-e28525d8d260 ## explicit knative.dev/hack -# knative.dev/pkg v0.0.0-20210628225612-51cfaabbcdf6 +# knative.dev/pkg v0.0.0-20210706174620-fe90576475ca ## explicit knative.dev/pkg/apis knative.dev/pkg/apis/duck @@ -1228,7 +1228,7 @@ knative.dev/pkg/webhook/certificates/resources knative.dev/pkg/webhook/resourcesemantics knative.dev/pkg/webhook/resourcesemantics/defaulting knative.dev/pkg/webhook/resourcesemantics/validation -# knative.dev/reconciler-test v0.0.0-20210630182710-2a6d91dfee1e +# knative.dev/reconciler-test v0.0.0-20210707164418-32f0df0c7399 ## explicit knative.dev/reconciler-test/cmd/eventshub knative.dev/reconciler-test/pkg/environment