diff --git a/.chloggen/deltatocumulative-evict-only-stale.yaml b/.chloggen/deltatocumulative-evict-only-stale.yaml new file mode 100644 index 000000000000..eb253a3c61d7 --- /dev/null +++ b/.chloggen/deltatocumulative-evict-only-stale.yaml @@ -0,0 +1,30 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: deltatocumulativeprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Evict only stale streams + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33014] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Changes eviction behavior to only evict streams that are actually stale. + Currently, once the stream limit is hit, on each new stream the oldest tracked one is evicted. + Under heavy load this can rapidly delete all streams over and over, rendering the processor useless. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/.chloggen/extension-storage-filestorage-cleanup_on_start.yaml b/.chloggen/extension-storage-filestorage-cleanup_on_start.yaml new file mode 100644 index 000000000000..75e33b239e40 --- /dev/null +++ b/.chloggen/extension-storage-filestorage-cleanup_on_start.yaml @@ -0,0 +1,29 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: extension/storage/filestorage + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: New flag cleanup_on_start for the compaction section (default=false). + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [32863] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + It will remove all temporary files in the compaction directory (those which start with `tempdb`), + temp files will be left if a previous run of the process is killed while compacting. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/.chloggen/feat_opamp-supervisor-persist-instance-id.yaml b/.chloggen/feat_opamp-supervisor-persist-instance-id.yaml new file mode 100644 index 000000000000..4461e759891d --- /dev/null +++ b/.chloggen/feat_opamp-supervisor-persist-instance-id.yaml @@ -0,0 +1,13 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: opampsupervisor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Allows the supervisor to persist its instance ID between restarts. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [21073] diff --git a/.chloggen/feat_supervisor-agent-description.yaml b/.chloggen/feat_supervisor-agent-description.yaml new file mode 100644 index 000000000000..eba148aad42c --- /dev/null +++ b/.chloggen/feat_supervisor-agent-description.yaml @@ -0,0 +1,13 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: opampsupervisor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Adds the ability to configure the agent description + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [32824] diff --git a/.chloggen/fix_hec_single_value.yaml b/.chloggen/fix_hec_single_value.yaml new file mode 100644 index 000000000000..71eefe372c49 --- /dev/null +++ b/.chloggen/fix_hec_single_value.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: splunkhecreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix single metric value parsing + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33084] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/.chloggen/mx-psi_stepping-beta.yaml b/.chloggen/mx-psi_stepping-beta.yaml new file mode 100644 index 000000000000..6c7d4aed8d09 --- /dev/null +++ b/.chloggen/mx-psi_stepping-beta.yaml @@ -0,0 +1,28 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: resourcedetectionprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Change type of `host.cpu.stepping` from int to string. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31136] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + - Disable the `processor.resourcedetection.hostCPUSteppingAsString` feature gate to get the old behavior. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 20670c429e64..03589f83dabd 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -69,7 +69,7 @@ exporter/lokiexporter/ @open-telemetry/collect exporter/mezmoexporter/ @open-telemetry/collector-contrib-approvers @dashpole @billmeyer @gjanco exporter/opencensusexporter/ @open-telemetry/collector-contrib-approvers @open-telemetry/collector-approvers exporter/opensearchexporter/ @open-telemetry/collector-contrib-approvers @Aneurysm9 @MitchellGale @MaxKsyunz @YANG-DB -exporter/otelarrowexporter/ @open-telemetry/collector-contrib-approvers @jmacd @moh-osman3 +exporter/otelarrowexporter/ @open-telemetry/collector-contrib-approvers @jmacd @moh-osman3 @codeboten exporter/prometheusexporter/ @open-telemetry/collector-contrib-approvers @Aneurysm9 exporter/prometheusremotewriteexporter/ @open-telemetry/collector-contrib-approvers @Aneurysm9 @rapphil exporter/pulsarexporter/ @open-telemetry/collector-contrib-approvers @dmitryax @dao-jun @@ -268,7 +268,7 @@ receiver/sqlqueryreceiver/ @open-telemetry/collect receiver/sqlserverreceiver/ @open-telemetry/collector-contrib-approvers @djaglowski @StefanKurek receiver/sshcheckreceiver/ @open-telemetry/collector-contrib-approvers @nslaughter @codeboten receiver/statsdreceiver/ @open-telemetry/collector-contrib-approvers @jmacd @dmitryax -receiver/syslogreceiver/ @open-telemetry/collector-contrib-approvers @djaglowski +receiver/syslogreceiver/ @open-telemetry/collector-contrib-approvers @djaglowski @andrzej-stencel receiver/tcplogreceiver/ @open-telemetry/collector-contrib-approvers @djaglowski receiver/udplogreceiver/ @open-telemetry/collector-contrib-approvers @djaglowski receiver/vcenterreceiver/ @open-telemetry/collector-contrib-approvers @djaglowski @schmikei @StefanKurek diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index d7f4c79bf23a..f4257d883714 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -113,11 +113,10 @@ With above guidelines, you can write code that is more portable and easier to ma issue](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/new?assignees=&labels=Sponsor+Needed%2Cneeds+triage&projects=&template=new_component.yaml&title=New+component%3A+) providing the following information: -* Who's the sponsor for your component. A sponsor is an approver who will be in charge of being the official reviewer of - the code and become a code owner for the component. For vendor-specific components, it's good to have a volunteer - sponsor. If you can't find one, we'll assign one in a round-robin fashion. A vendor-specific component directly interfaces - with a vendor-specific API and is expected to be maintained by a representative of the same vendor. For non-vendor specific - components, having a sponsor means that your use case has been validated. +* Who's the sponsor for your component. A sponsor is an approver or maintainer who will be the official reviewer of the code and a code owner + for the component. For vendor-specific components, it is always preferred to find a sponsor. However, if the vendor has not yet contributed + a component of the same class (i.e. receiver, processor, exporter, connector, or extension), then a sponsor will be assigned in a + round-robin fashion. In all other cases, you will need to find a sponsor for the component in order for it to be accepted. * Some information about your component, such as the reasoning behind it, use-cases, telemetry data types supported, and anything else you think is relevant for us to make a decision about accepting the component. * The configuration options your component will accept. This will give us a better understanding of what it does, and diff --git a/README.md b/README.md index 3b221077ede0..dad96647a1f5 100644 --- a/README.md +++ b/README.md @@ -101,7 +101,7 @@ Emeritus Approvers: Maintainers ([@open-telemetry/collector-contrib-maintainer](https://github.com/orgs/open-telemetry/teams/collector-contrib-maintainer)): - [Alex Boten](https://github.com/codeboten), Honeycomb -- [Andrzej Stencel](https://github.com/andrzej-stencel), Sumo Logic +- [Andrzej Stencel](https://github.com/andrzej-stencel), Elastic - [Bogdan Drutu](https://github.com/bogdandrutu), Snowflake - [Daniel Jaglowski](https://github.com/djaglowski), observIQ - [Dmitrii Anoshin](https://github.com/dmitryax), Splunk @@ -137,8 +137,3 @@ The facilitator is not required to perform a thorough review, but they are encou enforce Collector best practices and consistency across the codebase and component behavior. The facilitators will typically rely on codeowner's detailed review of the code when making the final approval decision. - -We recommend maintainers and approvers to keep an eye on the -[project board](https://github.com/orgs/open-telemetry/projects/3). All newly created -PRs are automatically added to this board. (If you don't see the PR on the board you -may need to add it manually by setting the Project field in the PR view). diff --git a/cmd/opampsupervisor/README.md b/cmd/opampsupervisor/README.md index 10d4761aadaf..a292623fc26a 100644 --- a/cmd/opampsupervisor/README.md +++ b/cmd/opampsupervisor/README.md @@ -33,6 +33,17 @@ The supervisor is currently undergoing heavy development and is not ready for an 4. The supervisor should connect to the OpAMP server and start a Collector instance. +## Persistent data storage +The supervisor persists some data to disk in order to mantain state between restarts. The directory where this data is stored may be specified via the supervisor configuration: +```yaml +storage: + directory: "/path/to/storage/dir" +``` + +By default, the supervisor will use `/var/lib/otelcol/supervisor` on posix systems, and `%ProgramData%/Otelcol/Supervisor` on Windows. + +This directory will be created on supervisor startup if it does not exist. + ## Status The OpenTelemetry OpAMP Supervisor is intended to be the reference diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index 3756bed93152..6b089555166b 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -158,9 +158,10 @@ func getSupervisorConfig(t *testing.T, configType string, extraConfigData map[st extension = ".exe" } configData := map[string]string{ - "goos": runtime.GOOS, - "goarch": runtime.GOARCH, - "extension": extension, + "goos": runtime.GOOS, + "goarch": runtime.GOARCH, + "extension": extension, + "storage_dir": t.TempDir(), } for key, val := range extraConfigData { @@ -416,6 +417,97 @@ func TestSupervisorBootstrapsCollector(t *testing.T) { }, 5*time.Second, 250*time.Millisecond) } +func TestSupervisorAgentDescriptionConfigApplies(t *testing.T) { + // Load the Supervisor config so we can get the location of + // the Collector that will be run. + var cfg config.Supervisor + cfgFile := getSupervisorConfig(t, "agent_description", map[string]string{}) + k := koanf.New("::") + err := k.Load(file.Provider(cfgFile.Name()), yaml.Parser()) + require.NoError(t, err) + err = k.UnmarshalWithConf("", &cfg, koanf.UnmarshalConf{ + Tag: "mapstructure", + }) + require.NoError(t, err) + + host, err := os.Hostname() + require.NoError(t, err) + + // Get the binary name and version from the Collector binary + // using the `components` command that prints a YAML-encoded + // map of information about the Collector build. Some of this + // information will be used as defaults for the telemetry + // attributes. + agentPath := cfg.Agent.Executable + componentsInfo, err := exec.Command(agentPath, "components").Output() + require.NoError(t, err) + k = koanf.New("::") + err = k.Load(rawbytes.Provider(componentsInfo), yaml.Parser()) + require.NoError(t, err) + buildinfo := k.StringMap("buildinfo") + command := buildinfo["command"] + version := buildinfo["version"] + + agentDescMessageChan := make(chan *protobufs.AgentToServer, 1) + + server := newOpAMPServer( + t, + defaultConnectingHandler, + server.ConnectionCallbacksStruct{ + OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + if message.AgentDescription != nil { + select { + case agentDescMessageChan <- message: + default: + } + } + + return &protobufs.ServerToAgent{} + }, + }) + + s := newSupervisor(t, "agent_description", map[string]string{"url": server.addr}) + defer s.Shutdown() + + waitForSupervisorConnection(server.supervisorConnected, true) + var ad *protobufs.AgentToServer + select { + case ad = <-agentDescMessageChan: + case <-time.After(5 * time.Second): + t.Fatal("Failed to get agent description after 5 seconds") + } + + expectedDescription := &protobufs.AgentDescription{ + IdentifyingAttributes: []*protobufs.KeyValue{ + stringKeyValue("client.id", "my-client-id"), + stringKeyValue(semconv.AttributeServiceInstanceID, ad.InstanceUid), + stringKeyValue(semconv.AttributeServiceName, command), + stringKeyValue(semconv.AttributeServiceVersion, version), + }, + NonIdentifyingAttributes: []*protobufs.KeyValue{ + stringKeyValue("env", "prod"), + stringKeyValue(semconv.AttributeHostArch, runtime.GOARCH), + stringKeyValue(semconv.AttributeHostName, host), + stringKeyValue(semconv.AttributeOSType, runtime.GOOS), + }, + } + + require.Equal(t, expectedDescription, ad.AgentDescription) + + time.Sleep(250 * time.Millisecond) +} + +func stringKeyValue(key, val string) *protobufs.KeyValue { + return &protobufs.KeyValue{ + Key: key, + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_StringValue{ + StringValue: val, + }, + }, + } +} + // Creates a Collector config that reads and writes logs to files and provides // file descriptors for I/O operations to those files. The files are placed // in a unique temp directory that is cleaned up after the test's completion. @@ -690,3 +782,154 @@ func TestSupervisorRestartsWithLastReceivedConfig(t *testing.T) { }, 10*time.Second, 500*time.Millisecond, "Collector was not started with the last received remote config") } + +func TestSupervisorPersistsInstanceID(t *testing.T) { + // Tests shutting down and starting up a new supervisor will + // persist and re-use the same instance ID. + storageDir := t.TempDir() + + agentIDChan := make(chan string, 1) + server := newOpAMPServer( + t, + defaultConnectingHandler, + server.ConnectionCallbacksStruct{ + OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + + select { + case agentIDChan <- message.InstanceUid: + default: + } + + return &protobufs.ServerToAgent{} + }, + }) + + s := newSupervisor(t, "basic", map[string]string{ + "url": server.addr, + "storage_dir": storageDir, + }) + + waitForSupervisorConnection(server.supervisorConnected, true) + + t.Logf("Supervisor connected") + + var firstAgentID string + select { + case firstAgentID = <-agentIDChan: + case <-time.After(1 * time.Second): + t.Fatalf("failed to get first agent ID") + } + + t.Logf("Got agent ID %s, shutting down supervisor", firstAgentID) + + s.Shutdown() + + waitForSupervisorConnection(server.supervisorConnected, false) + + t.Logf("Supervisor disconnected") + + // Drain agent ID channel so we get a fresh ID from the new supervisor + select { + case <-agentIDChan: + default: + } + + s = newSupervisor(t, "basic", map[string]string{ + "url": server.addr, + "storage_dir": storageDir, + }) + defer s.Shutdown() + + waitForSupervisorConnection(server.supervisorConnected, true) + + t.Logf("Supervisor connected") + + var secondAgentID string + select { + case secondAgentID = <-agentIDChan: + case <-time.After(1 * time.Second): + t.Fatalf("failed to get second agent ID") + } + + require.Equal(t, firstAgentID, secondAgentID) +} + +func TestSupervisorPersistsNewInstanceID(t *testing.T) { + // Tests that an agent ID that is given from the server to the agent in an AgentIdentification message + // is properly persisted. + storageDir := t.TempDir() + + newID := "01HW3GS9NWD840C5C2BZS3KYPW" + + agentIDChan := make(chan string, 1) + server := newOpAMPServer( + t, + defaultConnectingHandler, + server.ConnectionCallbacksStruct{ + OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + + select { + case agentIDChan <- message.InstanceUid: + default: + } + + if message.InstanceUid != newID { + return &protobufs.ServerToAgent{ + InstanceUid: message.InstanceUid, + AgentIdentification: &protobufs.AgentIdentification{ + NewInstanceUid: newID, + }, + } + } + + return &protobufs.ServerToAgent{} + }, + }) + + s := newSupervisor(t, "basic", map[string]string{ + "url": server.addr, + "storage_dir": storageDir, + }) + + waitForSupervisorConnection(server.supervisorConnected, true) + + t.Logf("Supervisor connected") + + for id := range agentIDChan { + if id == newID { + t.Logf("Agent ID was changed to new ID") + break + } + } + + s.Shutdown() + + waitForSupervisorConnection(server.supervisorConnected, false) + + t.Logf("Supervisor disconnected") + + // Drain agent ID channel so we get a fresh ID from the new supervisor + select { + case <-agentIDChan: + default: + } + + s = newSupervisor(t, "basic", map[string]string{ + "url": server.addr, + "storage_dir": storageDir, + }) + defer s.Shutdown() + + waitForSupervisorConnection(server.supervisorConnected, true) + + t.Logf("Supervisor connected") + + var newRecievedAgentID string + select { + case newRecievedAgentID = <-agentIDChan: + case <-time.After(1 * time.Second): + t.Fatalf("failed to get second agent ID") + } + + require.Equal(t, newID, newRecievedAgentID) +} diff --git a/cmd/opampsupervisor/go.mod b/cmd/opampsupervisor/go.mod index 19ac4b90213c..044d38d8bc2e 100644 --- a/cmd/opampsupervisor/go.mod +++ b/cmd/opampsupervisor/go.mod @@ -17,6 +17,7 @@ require ( go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 google.golang.org/protobuf v1.34.1 + gopkg.in/yaml.v3 v3.0.1 ) require ( @@ -32,5 +33,4 @@ require ( go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.23.0 // indirect golang.org/x/sys v0.18.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/cmd/opampsupervisor/specification/README.md b/cmd/opampsupervisor/specification/README.md index 0fef754e446a..82bd91c42a3f 100644 --- a/cmd/opampsupervisor/specification/README.md +++ b/cmd/opampsupervisor/specification/README.md @@ -119,7 +119,7 @@ storage: # and %ProgramData%/Otelcol/Supervisor on Windows. directory: /path/to/dir -collector: +agent: # Path to Collector executable. Required. executable: /opt/otelcol/bin/otelcol @@ -144,6 +144,17 @@ collector: deny: \[/var/log/secret_logs\] write: allow: \[/var/otelcol\] + + # Optional key-value pairs to add to either the identifying attributes or + # non-identifying attributes of the agent description sent to the OpAMP server. + # Values here override the values in the agent description retrieved from the collector's + # OpAMP extension (self-reported by the Collector). + description: + identifying_attributes: + client.id: "01HWWSK84BMT7J45663MBJMTPJ" + non_identifying_attributes: + custom.attribute: "custom-value" + ``` ### Executing Collector diff --git a/cmd/opampsupervisor/supervisor/config/config.go b/cmd/opampsupervisor/supervisor/config/config.go index fc353dca4a0a..00e8d6a3ec8d 100644 --- a/cmd/opampsupervisor/supervisor/config/config.go +++ b/cmd/opampsupervisor/supervisor/config/config.go @@ -5,6 +5,9 @@ package config import ( "net/http" + "os" + "path/filepath" + "runtime" "go.opentelemetry.io/collector/config/configtls" ) @@ -14,7 +17,7 @@ type Supervisor struct { Server *OpAMPServer Agent *Agent Capabilities *Capabilities `mapstructure:"capabilities"` - Storage *Storage `mapstructure:"storage"` + Storage Storage `mapstructure:"storage"` } type Storage struct { @@ -22,6 +25,29 @@ type Storage struct { Directory string `mapstructure:"directory"` } +// DirectoryOrDefault returns the configured storage directory if it was configured, +// otherwise it returns the system default. +func (s Storage) DirectoryOrDefault() string { + if s.Directory == "" { + switch runtime.GOOS { + case "windows": + // Windows default is "%ProgramData%\Otelcol\Supervisor" + // If the ProgramData environment variable is not set, + // it falls back to C:\ProgramData + programDataDir := os.Getenv("ProgramData") + if programDataDir == "" { + programDataDir = `C:\ProgramData` + } + return filepath.Join(programDataDir, "Otelcol", "Supervisor") + default: + // Default for non-windows systems + return "/var/lib/otelcol/supervisor" + } + } + + return s.Directory +} + // Capabilities is the set of capabilities that the Supervisor supports. type Capabilities struct { AcceptsRemoteConfig *bool `mapstructure:"accepts_remote_config"` @@ -40,5 +66,11 @@ type OpAMPServer struct { } type Agent struct { - Executable string + Executable string + Description AgentDescription `mapstructure:"description"` +} + +type AgentDescription struct { + IdentifyingAttributes map[string]string `mapstructure:"identifying_attributes"` + NonIdentifyingAttributes map[string]string `mapstructure:"non_identifying_attributes"` } diff --git a/cmd/opampsupervisor/supervisor/persistence.go b/cmd/opampsupervisor/supervisor/persistence.go new file mode 100644 index 000000000000..e3f4077adc09 --- /dev/null +++ b/cmd/opampsupervisor/supervisor/persistence.go @@ -0,0 +1,92 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package supervisor + +import ( + "crypto/rand" + "errors" + "os" + "time" + + "github.com/oklog/ulid/v2" + "gopkg.in/yaml.v3" +) + +// persistentState represents persistent state for the supervisor +type persistentState struct { + InstanceID ulid.ULID `yaml:"instance_id"` + + // Path to the config file that the state should be saved to. + // This is not marshaled. + configPath string `yaml:"-"` +} + +func (p *persistentState) SetInstanceID(id ulid.ULID) error { + p.InstanceID = id + return p.writeState() +} + +func (p *persistentState) writeState() error { + by, err := yaml.Marshal(p) + if err != nil { + return err + } + + return os.WriteFile(p.configPath, by, 0600) +} + +// loadOrCreatePersistentState attempts to load the persistent state from disk. If it doesn't +// exist, a new persistent state file is created. +func loadOrCreatePersistentState(file string) (*persistentState, error) { + state, err := loadPersistentState(file) + switch { + case errors.Is(err, os.ErrNotExist): + return createNewPersistentState(file) + case err != nil: + return nil, err + default: + return state, nil + } +} + +func loadPersistentState(file string) (*persistentState, error) { + var state *persistentState + + by, err := os.ReadFile(file) + if err != nil { + return nil, err + } + + if err := yaml.Unmarshal(by, &state); err != nil { + return nil, err + } + + state.configPath = file + + return state, nil +} + +func createNewPersistentState(file string) (*persistentState, error) { + id, err := generateNewULID() + if err != nil { + return nil, err + } + + p := &persistentState{ + InstanceID: id, + configPath: file, + } + + return p, p.writeState() +} + +func generateNewULID() (ulid.ULID, error) { + entropy := ulid.Monotonic(rand.Reader, 0) + id, err := ulid.New(ulid.Timestamp(time.Now()), entropy) + if err != nil { + return ulid.ULID{}, err + } + + return id, nil +} diff --git a/cmd/opampsupervisor/supervisor/persistence_test.go b/cmd/opampsupervisor/supervisor/persistence_test.go new file mode 100644 index 000000000000..77e4ba9eb0f9 --- /dev/null +++ b/cmd/opampsupervisor/supervisor/persistence_test.go @@ -0,0 +1,73 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package supervisor + +import ( + "os" + "path/filepath" + "testing" + + "github.com/oklog/ulid/v2" + "github.com/stretchr/testify/require" +) + +func TestCreateOrLoadPersistentState(t *testing.T) { + t.Run("Creates a new state file if it does not exist", func(t *testing.T) { + f := filepath.Join(t.TempDir(), "state.yaml") + state, err := loadOrCreatePersistentState(f) + require.NoError(t, err) + + // instance ID should be populated + require.NotEqual(t, ulid.ULID{}, state.InstanceID) + require.FileExists(t, f) + }) + + t.Run("loads state from file if it exists", func(t *testing.T) { + f := filepath.Join(t.TempDir(), "state.yaml") + + err := os.WriteFile(f, []byte(`instance_id: "01HW3GS9NWD840C5C2BZS3KYPW"`), 0600) + require.NoError(t, err) + + state, err := loadOrCreatePersistentState(f) + require.NoError(t, err) + + // instance ID should be populated with value from file + require.Equal(t, ulid.MustParse("01HW3GS9NWD840C5C2BZS3KYPW"), state.InstanceID) + require.FileExists(t, f) + }) + +} + +func TestPersistentState_SetInstanceID(t *testing.T) { + f := filepath.Join(t.TempDir(), "state.yaml") + state, err := createNewPersistentState(f) + require.NoError(t, err) + + // instance ID should be populated + require.NotEqual(t, ulid.ULID{}, state.InstanceID) + require.FileExists(t, f) + + newULID := ulid.MustParse("01HW3GS9NWD840C5C2BZS3KYPW") + err = state.SetInstanceID(newULID) + require.NoError(t, err) + + require.Equal(t, newULID, state.InstanceID) + + // Test that loading the state after setting the instance ID has the new instance ID + loadedState, err := loadPersistentState(f) + require.NoError(t, err) + + require.Equal(t, newULID, loadedState.InstanceID) +} + +func TestGenerateNewULID(t *testing.T) { + // Test generating a new ULID twice returns 2 different results + id1, err := generateNewULID() + require.NoError(t, err) + + id2, err := generateNewULID() + require.NoError(t, err) + + require.NotEqual(t, id1, id2) +} diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index 339c05ecf2b2..47f58d66d888 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -9,7 +9,6 @@ import ( _ "embed" "errors" "fmt" - "math/rand" "net" "net/http" "os" @@ -56,6 +55,8 @@ var ( lastRecvOwnMetricsConfigFile = "last_recv_own_metrics_config.dat" ) +const persistentStateFile = "persistent_state.yaml" + // Supervisor implements supervising of OpenTelemetry Collector and uses OpAMPClient // to work with an OpAMP Server. type Supervisor struct { @@ -75,8 +76,8 @@ type Supervisor struct { agentDescription *protobufs.AgentDescription - // Agent's instance id. - instanceID ulid.ULID + // Supervisor's persistent state + persistentState *persistentState bootstrapTemplate *template.Template extraConfigTemplate *template.Template @@ -107,7 +108,7 @@ type Supervisor struct { // The OpAMP client to connect to the OpAMP Server. opampClient client.OpAMPClient - shuttingDown bool + doneChan chan struct{} supervisorWG sync.WaitGroup agentHasStarted bool @@ -125,6 +126,7 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { agentConfigOwnMetricsSection: &atomic.Value{}, effectiveConfig: &atomic.Value{}, connectedToOpAMPServer: make(chan struct{}), + doneChan: make(chan struct{}), } if err := s.createTemplates(); err != nil { @@ -135,13 +137,17 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { return nil, fmt.Errorf("error loading config: %w", err) } - id, err := s.createInstanceID() + storageDir := s.config.Storage.DirectoryOrDefault() + if err := os.MkdirAll(storageDir, 0700); err != nil { + return nil, fmt.Errorf("error creating storage dir: %w", err) + } + + var err error + s.persistentState, err = loadOrCreatePersistentState(s.persistentStateFile()) if err != nil { return nil, err } - s.instanceID = id - if err = s.getBootstrapInfo(); err != nil { return nil, fmt.Errorf("could not get bootstrap info from the Collector: %w", err) } @@ -155,7 +161,7 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { s.agentHealthCheckEndpoint = fmt.Sprintf("localhost:%d", healthCheckPort) logger.Debug("Supervisor starting", - zap.String("id", s.instanceID.String())) + zap.String("id", s.persistentState.InstanceID.String())) s.loadAgentEffectiveConfig() @@ -233,7 +239,7 @@ func (s *Supervisor) getBootstrapInfo() (err error) { var cfg bytes.Buffer err = s.bootstrapTemplate.Execute(&cfg, map[string]any{ - "InstanceUid": s.instanceID.String(), + "InstanceUid": s.persistentState.InstanceID.String(), "SupervisorPort": supervisorPort, }) if err != nil { @@ -256,18 +262,18 @@ func (s *Supervisor) getBootstrapInfo() (err error) { onMessageFunc: func(_ serverTypes.Connection, message *protobufs.AgentToServer) { if message.AgentDescription != nil { instanceIDSeen := false - s.agentDescription = message.AgentDescription + s.setAgentDescription(message.AgentDescription) identAttr := s.agentDescription.IdentifyingAttributes for _, attr := range identAttr { if attr.Key == semconv.AttributeServiceInstanceID { // TODO: Consider whether to attempt restarting the Collector. // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/29864 - if attr.Value.GetStringValue() != s.instanceID.String() { + if attr.Value.GetStringValue() != s.persistentState.InstanceID.String() { done <- fmt.Errorf( "the Collector's instance ID (%s) does not match with the instance ID set by the Supervisor (%s)", attr.Value.GetStringValue(), - s.instanceID.String()) + s.persistentState.InstanceID.String()) return } instanceIDSeen = true @@ -375,7 +381,7 @@ func (s *Supervisor) startOpAMP() error { OpAMPServerURL: s.config.Server.Endpoint, Header: s.config.Server.Headers, TLSConfig: tlsConfig, - InstanceUid: s.instanceID.String(), + InstanceUid: s.persistentState.InstanceID.String(), Callbacks: types.CallbacksStruct{ OnConnectFunc: func(_ context.Context) { s.connectedToOpAMPServer <- struct{}{} @@ -431,6 +437,51 @@ func (s *Supervisor) startOpAMP() error { return nil } +// setAgentDescription sets the agent description, merging in any user-specified attributes from the supervisor configuration. +func (s *Supervisor) setAgentDescription(ad *protobufs.AgentDescription) { + ad.IdentifyingAttributes = applyKeyValueOverrides(s.config.Agent.Description.IdentifyingAttributes, ad.IdentifyingAttributes) + ad.NonIdentifyingAttributes = applyKeyValueOverrides(s.config.Agent.Description.NonIdentifyingAttributes, ad.NonIdentifyingAttributes) + s.agentDescription = ad +} + +// applyKeyValueOverrides merges the overrides map into the array of key value pairs. +// If a key from overrides already exists in the array of key value pairs, it is overwritten by the value from the overrides map. +// An array of KeyValue pair is returned, with each key value pair having a distinct key. +func applyKeyValueOverrides(overrides map[string]string, orig []*protobufs.KeyValue) []*protobufs.KeyValue { + kvMap := make(map[string]*protobufs.KeyValue, len(orig)+len(overrides)) + + for _, kv := range orig { + kvMap[kv.Key] = kv + } + + for k, v := range overrides { + kvMap[k] = &protobufs.KeyValue{ + Key: k, + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_StringValue{ + StringValue: v, + }, + }, + } + } + + // Sort keys for stable output, makes it easier to test. + keys := make([]string, 0, len(kvMap)) + for k := range kvMap { + keys = append(keys, k) + } + + sort.Strings(keys) + + kvOut := make([]*protobufs.KeyValue, 0, len(kvMap)) + for _, k := range keys { + v := kvMap[k] + kvOut = append(kvOut, v) + } + + return kvOut +} + func (s *Supervisor) stopOpAMP() error { s.logger.Debug("Stopping OpAMP client...") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -513,19 +564,6 @@ func (s *Supervisor) waitForOpAMPConnection() error { } } -// TODO: Persist instance ID. https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21073 -func (s *Supervisor) createInstanceID() (ulid.ULID, error) { - entropy := ulid.Monotonic(rand.New(rand.NewSource(0)), 0) - id, err := ulid.New(ulid.Timestamp(time.Now()), entropy) - - if err != nil { - return ulid.ULID{}, err - } - - return id, nil - -} - func (s *Supervisor) composeExtraLocalConfig() []byte { var cfg bytes.Buffer resourceAttrs := map[string]string{} @@ -567,10 +605,9 @@ func (s *Supervisor) loadAgentEffectiveConfig() { s.effectiveConfig.Store(string(effectiveConfigBytes)) if s.config.Capabilities != nil && s.config.Capabilities.AcceptsRemoteConfig != nil && - *s.config.Capabilities.AcceptsRemoteConfig && - s.config.Storage != nil { + *s.config.Capabilities.AcceptsRemoteConfig { // Try to load the last received remote config if it exists. - lastRecvRemoteConfig, err = os.ReadFile(filepath.Join(s.config.Storage.Directory, lastRecvRemoteConfigFile)) + lastRecvRemoteConfig, err = os.ReadFile(filepath.Join(s.config.Storage.DirectoryOrDefault(), lastRecvRemoteConfigFile)) if err == nil { config := &protobufs.AgentRemoteConfig{} err = proto.Unmarshal(lastRecvRemoteConfig, config) @@ -587,10 +624,9 @@ func (s *Supervisor) loadAgentEffectiveConfig() { } if s.config.Capabilities != nil && s.config.Capabilities.ReportsOwnMetrics != nil && - *s.config.Capabilities.ReportsOwnMetrics && - s.config.Storage != nil { + *s.config.Capabilities.ReportsOwnMetrics { // Try to load the last received own metrics config if it exists. - lastRecvOwnMetricsConfig, err = os.ReadFile(filepath.Join(s.config.Storage.Directory, lastRecvOwnMetricsConfigFile)) + lastRecvOwnMetricsConfig, err = os.ReadFile(filepath.Join(s.config.Storage.DirectoryOrDefault(), lastRecvOwnMetricsConfigFile)) if err == nil { set := &protobufs.TelemetryConnectionSettings{} err = proto.Unmarshal(lastRecvOwnMetricsConfig, set) @@ -849,6 +885,7 @@ func (s *Supervisor) healthCheck() { func (s *Supervisor) runAgentProcess() { if _, err := os.Stat(s.effectiveConfigFilePath); err == nil { // We have an effective config file saved previously. Use it to start the agent. + s.logger.Debug("Effective config found, starting agent initial time") s.startAgent() } @@ -858,6 +895,7 @@ func (s *Supervisor) runAgentProcess() { for { select { case <-s.hasNewConfig: + s.logger.Debug("Restarting agent due to new config") restartTimer.Stop() s.stopAgentApplyConfig() s.startAgent() @@ -868,10 +906,6 @@ func (s *Supervisor) runAgentProcess() { continue } - if s.shuttingDown { - return - } - s.logger.Debug("Agent process exited unexpectedly. Will restart in a bit...", zap.Int("pid", s.commander.Pid()), zap.Int("exit_code", s.commander.ExitCode())) errMsg := fmt.Sprintf( "Agent process PID=%d exited unexpectedly, exit code=%d. Will restart in a bit...", @@ -896,10 +930,18 @@ func (s *Supervisor) runAgentProcess() { restartTimer.Reset(5 * time.Second) case <-restartTimer.C: + s.logger.Debug("Agent starting after start backoff") s.startAgent() case <-s.healthCheckTicker.C: s.healthCheck() + + case <-s.doneChan: + err := s.commander.Stop(context.Background()) + if err != nil { + s.logger.Error("Could not stop agent process", zap.Error(err)) + } + return } } } @@ -932,14 +974,7 @@ func (s *Supervisor) writeEffectiveConfigToFile(cfg string, filePath string) { func (s *Supervisor) Shutdown() { s.logger.Debug("Supervisor shutting down...") - s.shuttingDown = true - if s.commander != nil { - err := s.commander.Stop(context.Background()) - - if err != nil { - s.logger.Error("Could not stop agent process", zap.Error(err)) - } - } + close(s.doneChan) if s.opampClient != nil { err := s.opampClient.SetHealth( @@ -959,37 +994,29 @@ func (s *Supervisor) Shutdown() { } } + s.supervisorWG.Wait() + if s.healthCheckTicker != nil { s.healthCheckTicker.Stop() } - - s.supervisorWG.Wait() } func (s *Supervisor) saveLastReceivedConfig(config *protobufs.AgentRemoteConfig) error { - if s.config.Storage == nil { - return nil - } - cfg, err := proto.Marshal(config) if err != nil { return err } - return os.WriteFile(filepath.Join(s.config.Storage.Directory, lastRecvRemoteConfigFile), cfg, 0600) + return os.WriteFile(filepath.Join(s.config.Storage.DirectoryOrDefault(), lastRecvRemoteConfigFile), cfg, 0600) } func (s *Supervisor) saveLastReceivedOwnTelemetrySettings(set *protobufs.TelemetryConnectionSettings, filePath string) error { - if s.config.Storage == nil { - return nil - } - cfg, err := proto.Marshal(set) if err != nil { return err } - return os.WriteFile(filepath.Join(s.config.Storage.Directory, filePath), cfg, 0600) + return os.WriteFile(filepath.Join(s.config.Storage.DirectoryOrDefault(), filePath), cfg, 0600) } func (s *Supervisor) onMessage(ctx context.Context, msg *types.MessageData) { @@ -1037,9 +1064,14 @@ func (s *Supervisor) onMessage(ctx context.Context, msg *types.MessageData) { } s.logger.Debug("Agent identity is changing", - zap.String("old_id", s.instanceID.String()), + zap.String("old_id", s.persistentState.InstanceID.String()), zap.String("new_id", newInstanceID.String())) - s.instanceID = newInstanceID + + err = s.persistentState.SetInstanceID(newInstanceID) + if err != nil { + s.logger.Error("Failed to persist new instance ID, instance ID will revert on restart.", zap.String("new_id", newInstanceID.String()), zap.Error(err)) + } + err = s.opampClient.SetAgentDescription(s.agentDescription) if err != nil { s.logger.Error("Failed to send agent description to OpAMP server") @@ -1063,6 +1095,10 @@ func (s *Supervisor) onMessage(ctx context.Context, msg *types.MessageData) { } } +func (s *Supervisor) persistentStateFile() string { + return filepath.Join(s.config.Storage.DirectoryOrDefault(), persistentStateFile) +} + func (s *Supervisor) findRandomPort() (int, error) { l, err := net.Listen("tcp", "localhost:0") diff --git a/cmd/opampsupervisor/testdata/supervisor/supervisor_accepts_conn.yaml b/cmd/opampsupervisor/testdata/supervisor/supervisor_accepts_conn.yaml index 0282577b252a..e86ab6cb25f0 100644 --- a/cmd/opampsupervisor/testdata/supervisor/supervisor_accepts_conn.yaml +++ b/cmd/opampsupervisor/testdata/supervisor/supervisor_accepts_conn.yaml @@ -11,5 +11,8 @@ capabilities: reports_remote_config: true accepts_opamp_connection_settings: true +storage: + directory: "{{.storage_dir}}" + agent: executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}} diff --git a/cmd/opampsupervisor/testdata/supervisor/supervisor_agent_description.yaml b/cmd/opampsupervisor/testdata/supervisor/supervisor_agent_description.yaml new file mode 100644 index 000000000000..be601485ee9b --- /dev/null +++ b/cmd/opampsupervisor/testdata/supervisor/supervisor_agent_description.yaml @@ -0,0 +1,23 @@ +server: + endpoint: ws://{{.url}}/v1/opamp + tls: + insecure: true + +capabilities: + reports_effective_config: true + reports_own_metrics: true + reports_health: true + accepts_remote_config: true + reports_remote_config: true + accepts_restart_command: true + +storage: + directory: "{{.storage_dir}}" + +agent: + executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}} + description: + identifying_attributes: + client.id: "my-client-id" + non_identifying_attributes: + env: "prod" diff --git a/cmd/opampsupervisor/testdata/supervisor/supervisor_basic.yaml b/cmd/opampsupervisor/testdata/supervisor/supervisor_basic.yaml index 7e4b0a08536c..75490189b904 100644 --- a/cmd/opampsupervisor/testdata/supervisor/supervisor_basic.yaml +++ b/cmd/opampsupervisor/testdata/supervisor/supervisor_basic.yaml @@ -11,5 +11,8 @@ capabilities: reports_remote_config: true accepts_restart_command: true +storage: + directory: "{{.storage_dir}}" + agent: executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}} diff --git a/cmd/opampsupervisor/testdata/supervisor/supervisor_nocap.yaml b/cmd/opampsupervisor/testdata/supervisor/supervisor_nocap.yaml index 5b61ad4b1b68..ca0d9378887d 100644 --- a/cmd/opampsupervisor/testdata/supervisor/supervisor_nocap.yaml +++ b/cmd/opampsupervisor/testdata/supervisor/supervisor_nocap.yaml @@ -10,5 +10,8 @@ capabilities: accepts_remote_config: false reports_remote_config: false +storage: + directory: "{{.storage_dir}}" + agent: executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}} diff --git a/cmd/opampsupervisor/testdata/supervisor/supervisor_test.yaml b/cmd/opampsupervisor/testdata/supervisor/supervisor_test.yaml index bdcdc2e72c93..68881cd4970c 100644 --- a/cmd/opampsupervisor/testdata/supervisor/supervisor_test.yaml +++ b/cmd/opampsupervisor/testdata/supervisor/supervisor_test.yaml @@ -10,5 +10,8 @@ capabilities: accepts_remote_config: true reports_remote_config: true +storage: + directory: "{{.storage_dir}}" + agent: executable: ../../bin/otelcontribcol_darwin_arm64 diff --git a/exporter/elasticsearchexporter/integrationtest/datareceiver.go b/exporter/elasticsearchexporter/integrationtest/datareceiver.go index c35267948f3f..c5dcda0c3cac 100644 --- a/exporter/elasticsearchexporter/integrationtest/datareceiver.go +++ b/exporter/elasticsearchexporter/integrationtest/datareceiver.go @@ -33,7 +33,7 @@ type esDataReceiver struct { endpoint string } -func newElasticsearchDataReceiver(t testing.TB) testbed.DataReceiver { +func newElasticsearchDataReceiver(t testing.TB) *esDataReceiver { return &esDataReceiver{ DataReceiverBase: testbed.DataReceiverBase{}, endpoint: fmt.Sprintf("http://%s:%d", testbed.DefaultHost, testutil.GetAvailablePort(t)), @@ -113,6 +113,11 @@ type mockESReceiver struct { } func newMockESReceiver(params receiver.CreateSettings, cfg *config, next consumer.Logs) (receiver.Logs, error) { + emptyLogs := plog.NewLogs() + emptyLogs.ResourceLogs().AppendEmpty(). + ScopeLogs().AppendEmpty(). + LogRecords().AppendEmpty() + r := mux.NewRouter() r.Use(func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -129,13 +134,10 @@ func newMockESReceiver(params receiver.CreateSettings, cfg *config, next consume for k, item := range itemMap { // Ideally bulk request should be converted to log record // however, since we only assert count for now there is no - // need to do the actual translation. - logs := plog.NewLogs() - logs.ResourceLogs().AppendEmpty(). - ScopeLogs().AppendEmpty(). - LogRecords().AppendEmpty() - - if err := next.ConsumeLogs(context.Background(), logs); err != nil { + // need to do the actual translation. We use a pre-initialized + // empty plog.Logs to reduce allocation impact on tests and + // benchmarks due to this. + if err := next.ConsumeLogs(context.Background(), emptyLogs); err != nil { response.HasErrors = true item.Status = http.StatusTooManyRequests item.Error.Type = "simulated_es_error" diff --git a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go new file mode 100644 index 000000000000..7350c44add69 --- /dev/null +++ b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go @@ -0,0 +1,80 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package integrationtest + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/pdata/plog" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" + "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed" +) + +func BenchmarkLogsExporter(b *testing.B) { + for _, tc := range []struct { + name string + batchSize int + }{ + {name: "small_batch", batchSize: 10}, + {name: "medium_batch", batchSize: 100}, + {name: "large_batch", batchSize: 1000}, + {name: "xlarge_batch", batchSize: 10000}, + } { + b.Run(tc.name, func(b *testing.B) { + benchmarkLogs(b, tc.batchSize) + }) + } +} + +func benchmarkLogs(b *testing.B, batchSize int) { + var generatedCount, observedCount atomic.Uint64 + + receiver := newElasticsearchDataReceiver(b) + factory := elasticsearchexporter.NewFactory() + + cfg := factory.CreateDefaultConfig().(*elasticsearchexporter.Config) + cfg.Endpoints = []string{receiver.endpoint} + cfg.Flush.Interval = 10 * time.Millisecond + cfg.NumWorkers = 1 + + exporter, err := factory.CreateLogsExporter( + context.Background(), + exportertest.NewNopCreateSettings(), + cfg, + ) + require.NoError(b, err) + + provider := testbed.NewPerfTestDataProvider(testbed.LoadOptions{ItemsPerBatch: batchSize}) + provider.SetLoadGeneratorCounters(&generatedCount) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + logsConsumer, err := consumer.NewLogs(func(_ context.Context, ld plog.Logs) error { + observedCount.Add(uint64(ld.LogRecordCount())) + return nil + }) + require.NoError(b, err) + + require.NoError(b, receiver.Start(nil, nil, logsConsumer)) + defer func() { require.NoError(b, receiver.Stop()) }() + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + b.StopTimer() + logs, _ := provider.GenerateLogs() + b.StartTimer() + require.NoError(b, exporter.ConsumeLogs(ctx, logs)) + } + require.NoError(b, exporter.Shutdown(ctx)) + require.Equal(b, generatedCount.Load(), observedCount.Load(), "failed to send all logs to backend") +} diff --git a/exporter/otelarrowexporter/README.md b/exporter/otelarrowexporter/README.md index a6db7dd30cbc..87ece851ffe0 100644 --- a/exporter/otelarrowexporter/README.md +++ b/exporter/otelarrowexporter/README.md @@ -6,7 +6,7 @@ | Stability | [development]: traces, metrics, logs | | Distributions | [] | | Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aexporter%2Fotelarrow%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aexporter%2Fotelarrow) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aexporter%2Fotelarrow%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aexporter%2Fotelarrow) | -| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@jmacd](https://www.github.com/jmacd), [@moh-osman3](https://www.github.com/moh-osman3) | +| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@jmacd](https://www.github.com/jmacd), [@moh-osman3](https://www.github.com/moh-osman3), [@codeboten](https://www.github.com/codeboten) | [development]: https://github.com/open-telemetry/opentelemetry-collector#development diff --git a/exporter/otelarrowexporter/go.mod b/exporter/otelarrowexporter/go.mod index 55c169d2b390..34615b215a94 100644 --- a/exporter/otelarrowexporter/go.mod +++ b/exporter/otelarrowexporter/go.mod @@ -4,7 +4,7 @@ go 1.21.0 require ( github.com/apache/arrow/go/v14 v14.0.2 - github.com/open-telemetry/otel-arrow v0.22.0 + github.com/open-telemetry/otel-arrow v0.23.0 github.com/open-telemetry/otel-arrow/collector v0.23.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector v0.100.1-0.20240509190532-c555005fcc80 diff --git a/exporter/otelarrowexporter/go.sum b/exporter/otelarrowexporter/go.sum index e6a447927805..f8fd4a0ff0b1 100644 --- a/exporter/otelarrowexporter/go.sum +++ b/exporter/otelarrowexporter/go.sum @@ -84,8 +84,8 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY github.com/mostynb/go-grpc-compression v1.2.2 h1:XaDbnRvt2+1vgr0b/l0qh4mJAfIxE0bKXtz2Znl3GGI= github.com/mostynb/go-grpc-compression v1.2.2/go.mod h1:GOCr2KBxXcblCuczg3YdLQlcin1/NfyDA348ckuCH6w= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= -github.com/open-telemetry/otel-arrow v0.22.0 h1:G1jgtqAM2ho5pyKQ4tyrDzk9Y0VcJ+GZQRJgN26vRlI= -github.com/open-telemetry/otel-arrow v0.22.0/go.mod h1:F50XFaiNfkfB0MYftZIUKFULm6pxfGqjbgQzevi+65M= +github.com/open-telemetry/otel-arrow v0.23.0 h1:Vx4q3GR36l9O+S7ZOOITNL1TPp+X1WxkXbeXQA146k0= +github.com/open-telemetry/otel-arrow v0.23.0/go.mod h1:F50XFaiNfkfB0MYftZIUKFULm6pxfGqjbgQzevi+65M= github.com/open-telemetry/otel-arrow/collector v0.23.0 h1:ztmq1ipJBhm4xWjHDbmKOtgP3Nl/ZDoLX+3ThhzFs6k= github.com/open-telemetry/otel-arrow/collector v0.23.0/go.mod h1:SLgLEhhcfR9MjG1taK8RPuwiuIoAPW7IpCjFBobwIUM= github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= diff --git a/exporter/otelarrowexporter/metadata.yaml b/exporter/otelarrowexporter/metadata.yaml index 3a830d26c436..110222201898 100644 --- a/exporter/otelarrowexporter/metadata.yaml +++ b/exporter/otelarrowexporter/metadata.yaml @@ -7,7 +7,7 @@ status: development: [traces, metrics, logs] distributions: [] codeowners: - active: [jmacd, moh-osman3] + active: [jmacd, moh-osman3, codeboten] tests: config: diff --git a/extension/oauth2clientauthextension/extension_test.go b/extension/oauth2clientauthextension/extension_test.go index 517877541670..7a20a24616c0 100644 --- a/extension/oauth2clientauthextension/extension_test.go +++ b/extension/oauth2clientauthextension/extension_test.go @@ -106,7 +106,6 @@ func TestOAuthClientSettings(t *testing.T) { } func TestOAuthClientSettingsCredsConfig(t *testing.T) { - // test files for TLS testing var ( testCredsFile = "testdata/test-cred.txt" testCredsEmptyFile = "testdata/test-cred-empty.txt" @@ -125,8 +124,6 @@ func TestOAuthClientSettingsCredsConfig(t *testing.T) { settings: &Config{ ClientIDFile: testCredsFile, ClientSecret: "testsecret", - TokenURL: "https://example.com/v1/token", - Scopes: []string{"resource.read"}, }, expectedClientConfig: &clientcredentials.Config{ ClientID: "testcreds", @@ -140,8 +137,6 @@ func TestOAuthClientSettingsCredsConfig(t *testing.T) { settings: &Config{ ClientID: "testclientid", ClientSecretFile: testCredsFile, - TokenURL: "https://example.com/v1/token", - Scopes: []string{"resource.read"}, }, expectedClientConfig: &clientcredentials.Config{ ClientID: "testclientid", @@ -155,8 +150,6 @@ func TestOAuthClientSettingsCredsConfig(t *testing.T) { settings: &Config{ ClientIDFile: testCredsEmptyFile, ClientSecret: "testsecret", - TokenURL: "https://example.com/v1/token", - Scopes: []string{"resource.read"}, }, shouldError: true, expectedError: &errNoClientIDProvided, @@ -166,8 +159,6 @@ func TestOAuthClientSettingsCredsConfig(t *testing.T) { settings: &Config{ ClientID: "testclientid", ClientSecretFile: testCredsMissingFile, - TokenURL: "https://example.com/v1/token", - Scopes: []string{"resource.read"}, }, shouldError: true, expectedError: &errNoClientSecretProvided, @@ -186,13 +177,6 @@ func TestOAuthClientSettingsCredsConfig(t *testing.T) { assert.NoError(t, err) assert.Equal(t, test.expectedClientConfig.ClientID, cfg.ClientID) assert.Equal(t, test.expectedClientConfig.ClientSecret, cfg.ClientSecret) - - // test tls settings - transport := rc.client.Transport.(*http.Transport) - tlsClientConfig := transport.TLSClientConfig - tlsTestSettingConfig, err := test.settings.TLSSetting.LoadTLSConfig(context.Background()) - assert.NoError(t, err) - assert.Equal(t, tlsClientConfig.Certificates, tlsTestSettingConfig.Certificates) }) } } diff --git a/extension/storage/filestorage/README.md b/extension/storage/filestorage/README.md index c1a6bb5a9b4d..36fb14346685 100644 --- a/extension/storage/filestorage/README.md +++ b/extension/storage/filestorage/README.md @@ -34,6 +34,10 @@ The default timeout is `1s`. `compaction.max_transaction_size` (default: 65536): defines maximum size of the compaction transaction. A value of zero will ignore transaction sizes. +`compaction.cleanup_on_start` (default: false) - specifies if removal of compaction temporary files is performed on start. +It will remove all temporary files in the compaction directory (those which start with `tempdb`), +temp files will be left if a previous run of the process is killed while compacting. + ### Rebound (online) compaction For rebound compaction, there are two additional parameters available: diff --git a/extension/storage/filestorage/client.go b/extension/storage/filestorage/client.go index c8fca4ba0201..b97cad73c044 100644 --- a/extension/storage/filestorage/client.go +++ b/extension/storage/filestorage/client.go @@ -20,6 +20,8 @@ import ( var defaultBucket = []byte(`default`) const ( + TempDbPrefix = "tempdb" + elapsedKey = "elapsed" directoryKey = "directory" tempDirectoryKey = "tempDirectory" @@ -152,7 +154,7 @@ func (c *fileStorageClient) Compact(compactionDirectory string, timeout time.Dur var compactedDb *bbolt.DB // create temporary file in compactionDirectory - file, err = os.CreateTemp(compactionDirectory, "tempdb") + file, err = os.CreateTemp(compactionDirectory, TempDbPrefix) if err != nil { return err } diff --git a/extension/storage/filestorage/config.go b/extension/storage/filestorage/config.go index d71bbe0234fc..19e288a7655b 100644 --- a/extension/storage/filestorage/config.go +++ b/extension/storage/filestorage/config.go @@ -45,6 +45,10 @@ type CompactionConfig struct { MaxTransactionSize int64 `mapstructure:"max_transaction_size,omitempty"` // CheckInterval specifies frequency of compaction check CheckInterval time.Duration `mapstructure:"check_interval,omitempty"` + // CleanupOnStart specifies removal of temporary files is performed on start. + // It will remove all the files in the compaction directory starting with tempdb, + // temp files will be left if a previous run of the process is killed while compacting. + CleanupOnStart bool `mapstructure:"cleanup_on_start,omitempty"` } func (cfg *Config) Validate() error { diff --git a/extension/storage/filestorage/config_test.go b/extension/storage/filestorage/config_test.go index 11d898a55f17..67decc8dbed5 100644 --- a/extension/storage/filestorage/config_test.go +++ b/extension/storage/filestorage/config_test.go @@ -45,6 +45,7 @@ func TestLoadConfig(t *testing.T) { ReboundTriggerThresholdMiB: 16, ReboundNeededThresholdMiB: 128, CheckInterval: time.Second * 5, + CleanupOnStart: true, }, Timeout: 2 * time.Second, FSync: true, diff --git a/extension/storage/filestorage/extension.go b/extension/storage/filestorage/extension.go index bf22b97ab458..fdbf533158c3 100644 --- a/extension/storage/filestorage/extension.go +++ b/extension/storage/filestorage/extension.go @@ -5,7 +5,9 @@ package filestorage // import "github.com/open-telemetry/opentelemetry-collector import ( "context" + "errors" "fmt" + "os" "path/filepath" "strings" @@ -40,8 +42,11 @@ func newLocalFileStorage(logger *zap.Logger, config *Config) (extension.Extensio }, nil } -// Start does nothing +// Start runs cleanup if configured func (lfs *localFileStorage) Start(context.Context, component.Host) error { + if lfs.cfg.Compaction.CleanupOnStart { + return lfs.cleanup(lfs.cfg.Compaction.Directory) + } return nil } @@ -135,3 +140,30 @@ func isSafe(character rune) bool { } return false } + +// cleanup left compaction temporary files from previous killed process +func (lfs *localFileStorage) cleanup(compactionDirectory string) error { + pattern := filepath.Join(compactionDirectory, fmt.Sprintf("%s*", TempDbPrefix)) + contents, err := filepath.Glob(pattern) + if err != nil { + lfs.logger.Info("cleanup error listing temporary files", + zap.Error(err)) + return err + } + + var errs []error + for _, item := range contents { + err = os.Remove(item) + if err == nil { + lfs.logger.Debug("cleanup", + zap.String("deletedFile", item)) + } else { + errs = append(errs, err) + } + } + if errs != nil { + lfs.logger.Info("cleanup errors", + zap.Error(errors.Join(errs...))) + } + return nil +} diff --git a/extension/storage/filestorage/extension_test.go b/extension/storage/filestorage/extension_test.go index d808647b1293..578a52bd3f44 100644 --- a/extension/storage/filestorage/extension_test.go +++ b/extension/storage/filestorage/extension_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/extension/experimental/storage" "go.opentelemetry.io/collector/extension/extensiontest" ) @@ -448,3 +449,39 @@ func TestCompactionRemoveTemp(t *testing.T) { require.NoError(t, err) require.Equal(t, 0, len(files)) } + +func TestCleanupOnStart(t *testing.T) { + ctx := context.Background() + + tempDir := t.TempDir() + // simulate left temporary compaction file from killed process + temp, _ := os.CreateTemp(tempDir, TempDbPrefix) + temp.Close() + + f := NewFactory() + cfg := f.CreateDefaultConfig().(*Config) + cfg.Directory = tempDir + cfg.Compaction.Directory = tempDir + cfg.Compaction.CleanupOnStart = true + extension, err := f.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + + se, ok := extension.(storage.Extension) + require.True(t, ok) + require.NoError(t, se.Start(ctx, componenttest.NewNopHost())) + + client, err := se.GetClient( + ctx, + component.KindReceiver, + newTestEntity("my_component"), + "", + ) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, client.Close(ctx)) + }) + + files, err := os.ReadDir(tempDir) + require.NoError(t, err) + require.Equal(t, 1, len(files)) +} diff --git a/extension/storage/filestorage/factory.go b/extension/storage/filestorage/factory.go index ef3e04e9d3c7..18178c54a4ea 100644 --- a/extension/storage/filestorage/factory.go +++ b/extension/storage/filestorage/factory.go @@ -45,6 +45,7 @@ func createDefaultConfig() component.Config { ReboundNeededThresholdMiB: defaultReboundNeededThresholdMib, ReboundTriggerThresholdMiB: defaultReboundTriggerThresholdMib, CheckInterval: defaultCompactionInterval, + CleanupOnStart: false, }, Timeout: time.Second, FSync: false, diff --git a/extension/storage/filestorage/testdata/config.yaml b/extension/storage/filestorage/testdata/config.yaml index 4a923aee71fe..bcdbaac9a291 100644 --- a/extension/storage/filestorage/testdata/config.yaml +++ b/extension/storage/filestorage/testdata/config.yaml @@ -12,5 +12,6 @@ file_storage/all_settings: rebound_trigger_threshold_mib: 16 rebound_needed_threshold_mib: 128 max_transaction_size: 2048 + cleanup_on_start: true timeout: 2s fsync: true diff --git a/internal/exp/metrics/staleness/staleness.go b/internal/exp/metrics/staleness/staleness.go index ce69321cadd9..975e7750a446 100644 --- a/internal/exp/metrics/staleness/staleness.go +++ b/internal/exp/metrics/staleness/staleness.go @@ -88,10 +88,15 @@ func (s *Staleness[T]) Next() time.Time { return ts } -func (s *Staleness[T]) Evict() identity.Stream { +func (s *Staleness[T]) Evict() (identity.Stream, bool) { + _, ts := s.pq.Peek() + if ts.Add(s.Max).Before(time.Now()) { + return identity.Stream{}, false + } + id, _ := s.pq.Pop() s.items.Delete(id) - return id + return id, true } func (s *Staleness[T]) Clear() { diff --git a/internal/exp/metrics/streams/streams.go b/internal/exp/metrics/streams/streams.go index 03fba7812a43..8e64ae0e0305 100644 --- a/internal/exp/metrics/streams/streams.go +++ b/internal/exp/metrics/streams/streams.go @@ -58,6 +58,8 @@ func (m HashMap[T]) Clear() { // Evictors remove the "least important" stream based on some strategy such as // the oldest, least active, etc. +// +// Returns whether a stream was evicted and if so the now gone stream id type Evictor interface { - Evict() identity.Stream + Evict() (gone identity.Stream, ok bool) } diff --git a/internal/splunk/common.go b/internal/splunk/common.go index a9ad5088ebb2..a98d064b6513 100644 --- a/internal/splunk/common.go +++ b/internal/splunk/common.go @@ -51,12 +51,16 @@ type Event struct { } // IsMetric returns true if the Splunk event is a metric. -func (e Event) IsMetric() bool { +func (e *Event) IsMetric() bool { return e.Event == HecEventMetricType || (e.Event == nil && len(e.GetMetricValues()) > 0) } // GetMetricValues extracts metric key value pairs from a Splunk HEC metric. -func (e Event) GetMetricValues() map[string]any { +func (e *Event) GetMetricValues() map[string]any { + if v, ok := e.Fields["metric_name"]; ok { + return map[string]any{v.(string): e.Fields["_value"]} + } + values := map[string]any{} for k, v := range e.Fields { if strings.HasPrefix(k, "metric_name:") { diff --git a/internal/splunk/common_test.go b/internal/splunk/common_test.go index 621ae844a1f9..5ec597153351 100644 --- a/internal/splunk/common_test.go +++ b/internal/splunk/common_test.go @@ -22,6 +22,16 @@ func TestGetValues(t *testing.T) { assert.Equal(t, map[string]any{"foo": "bar", "foo2": "foobar"}, metric.GetMetricValues()) } +func TestSingleValue(t *testing.T) { + metric := Event{ + Fields: map[string]any{ + "metric_name": "foo", + "_value": 123, + }, + } + assert.Equal(t, map[string]any{"foo": 123}, metric.GetMetricValues()) +} + func TestIsMetric(t *testing.T) { ev := Event{ Event: map[string]any{}, diff --git a/internal/sqlquery/go.mod b/internal/sqlquery/go.mod index ce72231fd028..5f93bff20c3e 100644 --- a/internal/sqlquery/go.mod +++ b/internal/sqlquery/go.mod @@ -8,7 +8,7 @@ require ( github.com/lib/pq v1.10.9 github.com/microsoft/go-mssqldb v1.7.1 github.com/sijms/go-ora/v2 v2.8.18 - github.com/snowflakedb/gosnowflake v1.9.0 + github.com/snowflakedb/gosnowflake v1.10.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.100.1-0.20240509190532-c555005fcc80 go.opentelemetry.io/collector/pdata v1.7.1-0.20240509190532-c555005fcc80 @@ -44,7 +44,6 @@ require ( github.com/danieljoos/wincred v1.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dvsekhvalnov/jose2go v1.6.0 // indirect - github.com/form3tech-oss/jwt-go v3.2.5+incompatible // indirect github.com/gabriel-vasile/mimetype v1.4.2 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -52,6 +51,7 @@ require ( github.com/goccy/go-json v0.10.2 // indirect github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang-jwt/jwt/v5 v5.2.1 // indirect github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect github.com/google/flatbuffers v23.5.26+incompatible // indirect diff --git a/internal/sqlquery/go.sum b/internal/sqlquery/go.sum index b1653e91eff1..fad1e7097e38 100644 --- a/internal/sqlquery/go.sum +++ b/internal/sqlquery/go.sum @@ -75,8 +75,6 @@ github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI= github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ= github.com/dvsekhvalnov/jose2go v1.6.0 h1:Y9gnSnP4qEI0+/uQkHvFXeD2PLPJeXEL+ySMEA2EjTY= github.com/dvsekhvalnov/jose2go v1.6.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU= -github.com/form3tech-oss/jwt-go v3.2.5+incompatible h1:/l4kBbb4/vGSsdtB5nUe8L7B9mImVMaBPw9L/0TBHU8= -github.com/form3tech-oss/jwt-go v3.2.5+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU= github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -94,9 +92,8 @@ github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+ github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang-jwt/jwt v3.2.1+incompatible h1:73Z+4BJcrTC+KczS6WvTPvRGOp1WmfEP4Q1lOd9Z/+c= -github.com/golang-jwt/jwt/v5 v5.2.0 h1:d/ix8ftRUorsN+5eMIlF4T6J8CAt9rch3My2winC1Jw= -github.com/golang-jwt/jwt/v5 v5.2.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= +github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA= github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang-sql/sqlexp v0.1.0 h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei6A= @@ -192,8 +189,8 @@ github.com/sijms/go-ora/v2 v2.8.18 h1:hrmgl0Iognh7XiYDRvFKmSgJW7J05yq7TMljravaXE github.com/sijms/go-ora/v2 v2.8.18/go.mod h1:EHxlY6x7y9HAsdfumurRfTd+v8NrEOTR3Xl4FWlH6xk= github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= -github.com/snowflakedb/gosnowflake v1.9.0 h1:s2ZdwFxFfpqwa5CqlhnzRESnLmwU3fED6zyNOJHFBQA= -github.com/snowflakedb/gosnowflake v1.9.0/go.mod h1:4ZgHxVf2OKwecx07WjfyAMr0gn8Qj4yvwAo68Og8wsU= +github.com/snowflakedb/gosnowflake v1.10.0 h1:5hBGKa/jJEhciokzgJcz5xmLNlJ8oUm8vhfu5tg82tM= +github.com/snowflakedb/gosnowflake v1.10.0/go.mod h1:WC4eGUOH3K9w3pLsdwZsdawIwtWgse4kZPPqNG0Ky/k= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= diff --git a/pkg/stanza/docs/operators/README.md b/pkg/stanza/docs/operators/README.md index 3ee3744a6f72..a79058c27767 100644 --- a/pkg/stanza/docs/operators/README.md +++ b/pkg/stanza/docs/operators/README.md @@ -27,6 +27,7 @@ Parsers: - [trace_parser](./trace_parser.md) - [uri_parser](./uri_parser.md) - [key_value_parser](./key_value_parser.md) +- [container](./container.md) Outputs: - [file_output](./file_output.md) diff --git a/processor/deltatocumulativeprocessor/internal/streams/limit.go b/processor/deltatocumulativeprocessor/internal/streams/limit.go index 3e021b6d5d74..dd1d927687c9 100644 --- a/processor/deltatocumulativeprocessor/internal/streams/limit.go +++ b/processor/deltatocumulativeprocessor/internal/streams/limit.go @@ -12,7 +12,12 @@ import ( ) func Limit[T any](m Map[T], max int) LimitMap[T] { - return LimitMap[T]{Map: m, Max: max} + return LimitMap[T]{ + Map: m, Max: max, + Evictor: EvictorFunc(func() (identity.Stream, bool) { + return identity.Stream{}, false + }), + } } type LimitMap[T any] struct { @@ -23,21 +28,27 @@ type LimitMap[T any] struct { } func (m LimitMap[T]) Store(id identity.Stream, v T) error { - _, ok := m.Map.Load(id) - avail := m.Map.Len() < m.Max - if ok || avail { - return m.Map.Store(id, v) - } + _, exist := m.Map.Load(id) - errl := ErrLimit(m.Max) - if m.Evictor != nil { - gone := m.Evictor.Evict() - if err := m.Map.Store(id, v); err != nil { - return err + var errEv error + // if not already tracked and no space: try to evict + if !exist && m.Map.Len() >= m.Max { + errl := ErrLimit(m.Max) + gone, ok := m.Evictor.Evict() + if !ok { + // if no eviction possible, fail as there is no space + return errl } - return ErrEvicted{ErrLimit: errl, Ident: gone} + errEv = ErrEvicted{ErrLimit: errl, Ident: gone} + } + + // there was space, or we made space: store it + if err := m.Map.Store(id, v); err != nil { + return err } - return errl + + // we may have evicted something, let the caller know + return errEv } type ErrLimit int @@ -59,3 +70,9 @@ type ErrEvicted struct { func (e ErrEvicted) Error() string { return fmt.Sprintf("%s. evicted stream %s", e.ErrLimit, e.Ident) } + +type EvictorFunc func() (identity.Stream, bool) + +func (ev EvictorFunc) Evict() (identity.Stream, bool) { + return ev() +} diff --git a/processor/deltatocumulativeprocessor/internal/streams/limit_test.go b/processor/deltatocumulativeprocessor/internal/streams/limit_test.go index 04ffffbde5f5..440e466dc2e3 100644 --- a/processor/deltatocumulativeprocessor/internal/streams/limit_test.go +++ b/processor/deltatocumulativeprocessor/internal/streams/limit_test.go @@ -57,3 +57,52 @@ func TestLimit(t *testing.T) { require.NoError(t, err) } } + +func TestLimitEvict(t *testing.T) { + sum := random.Sum() + evictable := make(map[identity.Stream]struct{}) + + items := make(exp.HashMap[data.Number]) + lim := streams.Limit(items, 5) + lim.Evictor = streams.EvictorFunc(func() (identity.Stream, bool) { + for id := range evictable { + delete(evictable, id) + return id, true + } + return identity.Stream{}, false + }) + + ids := make([]identity.Stream, 10) + dps := make([]data.Number, 10) + for i := 0; i < 10; i++ { + id, dp := sum.Stream() + ids[i] = id + dps[i] = dp + } + + // store up to limit must work + for i := 0; i < 5; i++ { + err := lim.Store(ids[i], dps[i]) + require.NoError(t, err) + } + + // store beyond limit must fail + for i := 5; i < 10; i++ { + err := lim.Store(ids[i], dps[i]) + require.Equal(t, streams.ErrLimit(5), err) + } + + // put two streams up for eviction + evictable[ids[2]] = struct{}{} + evictable[ids[3]] = struct{}{} + + // while evictable do so, fail again afterwards + for i := 5; i < 10; i++ { + err := lim.Store(ids[i], dps[i]) + if i < 7 { + require.Equal(t, streams.ErrEvicted{ErrLimit: streams.ErrLimit(5), Ident: ids[i-3]}, err) + } else { + require.Equal(t, streams.ErrLimit(5), err) + } + } +} diff --git a/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go b/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go index 4809d74fda04..51081099995d 100644 --- a/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go +++ b/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go @@ -140,11 +140,11 @@ type ts = pcommon.Timestamp // HeadEvictor drops the first stream on Evict() type HeadEvictor[T any] struct{ streams.Map[T] } -func (e HeadEvictor[T]) Evict() (evicted identity.Stream) { +func (e HeadEvictor[T]) Evict() (evicted identity.Stream, ok bool) { e.Items()(func(id identity.Stream, _ T) bool { e.Delete(id) evicted = id return false }) - return evicted + return evicted, true } diff --git a/processor/resourcedetectionprocessor/internal/system/system.go b/processor/resourcedetectionprocessor/internal/system/system.go index a8614584b330..b2fb49232181 100644 --- a/processor/resourcedetectionprocessor/internal/system/system.go +++ b/processor/resourcedetectionprocessor/internal/system/system.go @@ -35,7 +35,7 @@ var ( hostCPUSteppingAsStringID = "processor.resourcedetection.hostCPUSteppingAsString" hostCPUSteppingAsStringFeatureGate = featuregate.GlobalRegistry().MustRegister( hostCPUSteppingAsStringID, - featuregate.StageAlpha, + featuregate.StageBeta, featuregate.WithRegisterDescription("Change type of host.cpu.stepping to string."), featuregate.WithRegisterFromVersion("v0.95.0"), featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/semantic-conventions/issues/664"), diff --git a/receiver/cloudfoundryreceiver/README.md b/receiver/cloudfoundryreceiver/README.md index d1f445456ddc..2d2336d782b6 100644 --- a/receiver/cloudfoundryreceiver/README.md +++ b/receiver/cloudfoundryreceiver/README.md @@ -6,7 +6,7 @@ | Stability | [beta]: metrics | | Distributions | [contrib] | | Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fcloudfoundry%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fcloudfoundry) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fcloudfoundry%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fcloudfoundry) | -| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@crobert-1](https://www.github.com/crobert-1) | +| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@crobert-1](https://www.github.com/crobert-1) \| Seeking more code owners! | | Emeritus | [@agoallikmaa](https://www.github.com/agoallikmaa), [@pellared](https://www.github.com/pellared) | [beta]: https://github.com/open-telemetry/opentelemetry-collector#beta diff --git a/receiver/cloudfoundryreceiver/metadata.yaml b/receiver/cloudfoundryreceiver/metadata.yaml index bf1a281b8c65..9406c6140ae3 100644 --- a/receiver/cloudfoundryreceiver/metadata.yaml +++ b/receiver/cloudfoundryreceiver/metadata.yaml @@ -9,6 +9,7 @@ status: codeowners: active: [crobert-1] emeritus: [agoallikmaa, pellared] + seeking_new: true tests: config: diff --git a/receiver/elasticsearchreceiver/config.go b/receiver/elasticsearchreceiver/config.go index 06c71ea112cf..7c81fc75350f 100644 --- a/receiver/elasticsearchreceiver/config.go +++ b/receiver/elasticsearchreceiver/config.go @@ -11,14 +11,11 @@ import ( "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/receiver/scraperhelper" - "go.uber.org/multierr" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/elasticsearchreceiver/internal/metadata" ) -var ( - defaultEndpoint = "http://localhost:9200" -) +var defaultEndpoint = "http://localhost:9200" var ( errEndpointBadScheme = errors.New("endpoint scheme must be http or https") @@ -54,16 +51,16 @@ type Config struct { func (cfg *Config) Validate() error { var combinedErr error if err := invalidCredentials(cfg.Username, string(cfg.Password)); err != nil { - combinedErr = multierr.Append(combinedErr, err) + combinedErr = err } if cfg.Endpoint == "" { - return multierr.Append(combinedErr, errEmptyEndpoint) + return errors.Join(combinedErr, errEmptyEndpoint) } u, err := url.Parse(cfg.Endpoint) if err != nil { - return multierr.Append( + return errors.Join( combinedErr, fmt.Errorf("invalid endpoint '%s': %w", cfg.Endpoint, err), ) @@ -72,7 +69,7 @@ func (cfg *Config) Validate() error { switch u.Scheme { case "http", "https": // ok default: - return multierr.Append(combinedErr, errEndpointBadScheme) + return errors.Join(combinedErr, errEndpointBadScheme) } return combinedErr diff --git a/receiver/elasticsearchreceiver/go.mod b/receiver/elasticsearchreceiver/go.mod index f2909c8831d0..d8bd78d6d437 100644 --- a/receiver/elasticsearchreceiver/go.mod +++ b/receiver/elasticsearchreceiver/go.mod @@ -22,7 +22,6 @@ require ( go.opentelemetry.io/otel/metric v1.26.0 go.opentelemetry.io/otel/trace v1.26.0 go.uber.org/goleak v1.3.0 - go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 ) @@ -98,6 +97,7 @@ require ( go.opentelemetry.io/otel/exporters/prometheus v0.48.0 // indirect go.opentelemetry.io/otel/sdk v1.26.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.26.0 // indirect + go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.23.0 // indirect golang.org/x/mod v0.16.0 // indirect golang.org/x/net v0.25.0 // indirect diff --git a/receiver/otelarrowreceiver/go.mod b/receiver/otelarrowreceiver/go.mod index 4540df441e59..032798044b81 100644 --- a/receiver/otelarrowreceiver/go.mod +++ b/receiver/otelarrowreceiver/go.mod @@ -3,7 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelar go 1.21.0 require ( - github.com/open-telemetry/otel-arrow v0.22.0 + github.com/open-telemetry/otel-arrow v0.23.0 github.com/open-telemetry/otel-arrow/collector v0.23.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector v0.100.1-0.20240509190532-c555005fcc80 diff --git a/receiver/otelarrowreceiver/go.sum b/receiver/otelarrowreceiver/go.sum index bb83505ddaf6..b3c7bb50264e 100644 --- a/receiver/otelarrowreceiver/go.sum +++ b/receiver/otelarrowreceiver/go.sum @@ -82,8 +82,8 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY github.com/mostynb/go-grpc-compression v1.2.2 h1:XaDbnRvt2+1vgr0b/l0qh4mJAfIxE0bKXtz2Znl3GGI= github.com/mostynb/go-grpc-compression v1.2.2/go.mod h1:GOCr2KBxXcblCuczg3YdLQlcin1/NfyDA348ckuCH6w= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= -github.com/open-telemetry/otel-arrow v0.22.0 h1:G1jgtqAM2ho5pyKQ4tyrDzk9Y0VcJ+GZQRJgN26vRlI= -github.com/open-telemetry/otel-arrow v0.22.0/go.mod h1:F50XFaiNfkfB0MYftZIUKFULm6pxfGqjbgQzevi+65M= +github.com/open-telemetry/otel-arrow v0.23.0 h1:Vx4q3GR36l9O+S7ZOOITNL1TPp+X1WxkXbeXQA146k0= +github.com/open-telemetry/otel-arrow v0.23.0/go.mod h1:F50XFaiNfkfB0MYftZIUKFULm6pxfGqjbgQzevi+65M= github.com/open-telemetry/otel-arrow/collector v0.23.0 h1:ztmq1ipJBhm4xWjHDbmKOtgP3Nl/ZDoLX+3ThhzFs6k= github.com/open-telemetry/otel-arrow/collector v0.23.0/go.mod h1:SLgLEhhcfR9MjG1taK8RPuwiuIoAPW7IpCjFBobwIUM= github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= diff --git a/receiver/splunkhecreceiver/splunkhec_to_metricdata.go b/receiver/splunkhecreceiver/splunkhec_to_metricdata.go index e8586d088d6e..d148576c0fac 100644 --- a/receiver/splunkhecreceiver/splunkhec_to_metricdata.go +++ b/receiver/splunkhecreceiver/splunkhec_to_metricdata.go @@ -128,7 +128,7 @@ func buildAttributes(dimensions map[string]any) pcommon.Map { attributes.EnsureCapacity(len(dimensions)) for key, val := range dimensions { - if strings.HasPrefix(key, "metric_name") { + if strings.HasPrefix(key, "metric_name") || key == "_value" { continue } if key == "" || val == nil { diff --git a/receiver/splunkhecreceiver/splunkhec_to_metricdata_test.go b/receiver/splunkhecreceiver/splunkhec_to_metricdata_test.go index 5e5196667fe6..a218bbd83a88 100644 --- a/receiver/splunkhecreceiver/splunkhec_to_metricdata_test.go +++ b/receiver/splunkhecreceiver/splunkhec_to_metricdata_test.go @@ -54,6 +54,19 @@ func Test_splunkV2ToMetricsData(t *testing.T) { wantMetricsData: buildDefaultMetricsData(nanos), hecConfig: defaultTestingHecConfig, }, + { + name: "int_gauge_v7", + splunkDataPoint: func() *splunk.Event { + pt := buildDefaultSplunkDataPt() + delete(pt.Fields, "metric_name:single") + pt.Fields["metric_name"] = "single" + pt.Fields["_value"] = int64Ptr(13) + return pt + + }(), + wantMetricsData: buildDefaultMetricsData(nanos), + hecConfig: defaultTestingHecConfig, + }, { name: "multiple", splunkDataPoint: func() *splunk.Event { diff --git a/receiver/sqlqueryreceiver/go.mod b/receiver/sqlqueryreceiver/go.mod index 35a9366efa7a..91ad0a8aa6db 100644 --- a/receiver/sqlqueryreceiver/go.mod +++ b/receiver/sqlqueryreceiver/go.mod @@ -64,7 +64,6 @@ require ( github.com/dvsekhvalnov/jose2go v1.6.0 // indirect github.com/expr-lang/expr v1.16.7 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect - github.com/form3tech-oss/jwt-go v3.2.5+incompatible // indirect github.com/gabriel-vasile/mimetype v1.4.2 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -74,6 +73,7 @@ require ( github.com/goccy/go-json v0.10.2 // indirect github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang-jwt/jwt/v5 v5.2.1 // indirect github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect github.com/google/flatbuffers v23.5.26+incompatible // indirect @@ -127,7 +127,7 @@ require ( github.com/shoenig/go-m1cpu v0.1.6 // indirect github.com/sijms/go-ora/v2 v2.8.18 // indirect github.com/sirupsen/logrus v1.9.3 // indirect - github.com/snowflakedb/gosnowflake v1.9.0 // indirect + github.com/snowflakedb/gosnowflake v1.10.0 // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/valyala/fastjson v1.6.4 // indirect diff --git a/receiver/sqlqueryreceiver/go.sum b/receiver/sqlqueryreceiver/go.sum index 8fe536409708..413e08808d6a 100644 --- a/receiver/sqlqueryreceiver/go.sum +++ b/receiver/sqlqueryreceiver/go.sum @@ -119,8 +119,6 @@ github.com/expr-lang/expr v1.16.7 h1:gCIiHt5ODA0xIaDbD0DPKyZpM9Drph3b3lolYAYq2Kw github.com/expr-lang/expr v1.16.7/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/form3tech-oss/jwt-go v3.2.5+incompatible h1:/l4kBbb4/vGSsdtB5nUe8L7B9mImVMaBPw9L/0TBHU8= -github.com/form3tech-oss/jwt-go v3.2.5+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU= github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -140,8 +138,8 @@ github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+ github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang-jwt/jwt/v5 v5.2.0 h1:d/ix8ftRUorsN+5eMIlF4T6J8CAt9rch3My2winC1Jw= -github.com/golang-jwt/jwt/v5 v5.2.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= +github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA= github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang-sql/sqlexp v0.1.0 h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei6A= @@ -276,8 +274,8 @@ github.com/sijms/go-ora/v2 v2.8.18 h1:hrmgl0Iognh7XiYDRvFKmSgJW7J05yq7TMljravaXE github.com/sijms/go-ora/v2 v2.8.18/go.mod h1:EHxlY6x7y9HAsdfumurRfTd+v8NrEOTR3Xl4FWlH6xk= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= -github.com/snowflakedb/gosnowflake v1.9.0 h1:s2ZdwFxFfpqwa5CqlhnzRESnLmwU3fED6zyNOJHFBQA= -github.com/snowflakedb/gosnowflake v1.9.0/go.mod h1:4ZgHxVf2OKwecx07WjfyAMr0gn8Qj4yvwAo68Og8wsU= +github.com/snowflakedb/gosnowflake v1.10.0 h1:5hBGKa/jJEhciokzgJcz5xmLNlJ8oUm8vhfu5tg82tM= +github.com/snowflakedb/gosnowflake v1.10.0/go.mod h1:WC4eGUOH3K9w3pLsdwZsdawIwtWgse4kZPPqNG0Ky/k= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= diff --git a/receiver/sqlserverreceiver/go.mod b/receiver/sqlserverreceiver/go.mod index 9feb67931a29..94090d3463a7 100644 --- a/receiver/sqlserverreceiver/go.mod +++ b/receiver/sqlserverreceiver/go.mod @@ -51,7 +51,6 @@ require ( github.com/danieljoos/wincred v1.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dvsekhvalnov/jose2go v1.6.0 // indirect - github.com/form3tech-oss/jwt-go v3.2.5+incompatible // indirect github.com/gabriel-vasile/mimetype v1.4.2 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -60,6 +59,7 @@ require ( github.com/goccy/go-json v0.10.2 // indirect github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang-jwt/jwt/v5 v5.2.1 // indirect github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect github.com/google/flatbuffers v23.5.26+incompatible // indirect @@ -96,7 +96,7 @@ require ( github.com/prometheus/procfs v0.13.0 // indirect github.com/sijms/go-ora/v2 v2.8.18 // indirect github.com/sirupsen/logrus v1.9.0 // indirect - github.com/snowflakedb/gosnowflake v1.9.0 // indirect + github.com/snowflakedb/gosnowflake v1.10.0 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.opentelemetry.io/collector v0.100.1-0.20240509190532-c555005fcc80 // indirect diff --git a/receiver/sqlserverreceiver/go.sum b/receiver/sqlserverreceiver/go.sum index 6182ddbee427..43faa0a63386 100644 --- a/receiver/sqlserverreceiver/go.sum +++ b/receiver/sqlserverreceiver/go.sum @@ -75,8 +75,6 @@ github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI= github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ= github.com/dvsekhvalnov/jose2go v1.6.0 h1:Y9gnSnP4qEI0+/uQkHvFXeD2PLPJeXEL+ySMEA2EjTY= github.com/dvsekhvalnov/jose2go v1.6.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU= -github.com/form3tech-oss/jwt-go v3.2.5+incompatible h1:/l4kBbb4/vGSsdtB5nUe8L7B9mImVMaBPw9L/0TBHU8= -github.com/form3tech-oss/jwt-go v3.2.5+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU= github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -94,9 +92,8 @@ github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+ github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang-jwt/jwt v3.2.1+incompatible h1:73Z+4BJcrTC+KczS6WvTPvRGOp1WmfEP4Q1lOd9Z/+c= -github.com/golang-jwt/jwt/v5 v5.2.0 h1:d/ix8ftRUorsN+5eMIlF4T6J8CAt9rch3My2winC1Jw= -github.com/golang-jwt/jwt/v5 v5.2.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= +github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA= github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang-sql/sqlexp v0.1.0 h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei6A= @@ -192,8 +189,8 @@ github.com/sijms/go-ora/v2 v2.8.18 h1:hrmgl0Iognh7XiYDRvFKmSgJW7J05yq7TMljravaXE github.com/sijms/go-ora/v2 v2.8.18/go.mod h1:EHxlY6x7y9HAsdfumurRfTd+v8NrEOTR3Xl4FWlH6xk= github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= -github.com/snowflakedb/gosnowflake v1.9.0 h1:s2ZdwFxFfpqwa5CqlhnzRESnLmwU3fED6zyNOJHFBQA= -github.com/snowflakedb/gosnowflake v1.9.0/go.mod h1:4ZgHxVf2OKwecx07WjfyAMr0gn8Qj4yvwAo68Og8wsU= +github.com/snowflakedb/gosnowflake v1.10.0 h1:5hBGKa/jJEhciokzgJcz5xmLNlJ8oUm8vhfu5tg82tM= +github.com/snowflakedb/gosnowflake v1.10.0/go.mod h1:WC4eGUOH3K9w3pLsdwZsdawIwtWgse4kZPPqNG0Ky/k= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= diff --git a/receiver/syslogreceiver/README.md b/receiver/syslogreceiver/README.md index 0faf67541024..2f9a664db95f 100644 --- a/receiver/syslogreceiver/README.md +++ b/receiver/syslogreceiver/README.md @@ -6,7 +6,7 @@ | Stability | [alpha]: logs | | Distributions | [contrib] | | Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fsyslog%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fsyslog) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fsyslog%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fsyslog) | -| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@djaglowski](https://www.github.com/djaglowski) \| Seeking more code owners! | +| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@djaglowski](https://www.github.com/djaglowski), [@andrzej-stencel](https://www.github.com/andrzej-stencel) \| Seeking more code owners! | [alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha [contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib diff --git a/receiver/syslogreceiver/metadata.yaml b/receiver/syslogreceiver/metadata.yaml index c1d8f70a569e..6319233aef9e 100644 --- a/receiver/syslogreceiver/metadata.yaml +++ b/receiver/syslogreceiver/metadata.yaml @@ -7,7 +7,7 @@ status: alpha: [logs] distributions: [contrib] codeowners: - active: [djaglowski] + active: [djaglowski, andrzej-stencel] seeking_new: true tests: