Skip to content

Commit

Permalink
Add support for sending entity to Fluentbit via cloudwatch agent serv…
Browse files Browse the repository at this point in the history
…er (#262)

* added agent server port as default port on CW agent (#234)

* Attach service name source as resource attribute to operator (#235)

* Add service name source for instrumentation and workload

* Change service name source attribute naming

* Fix unit tests for pod mutators

* Simplify service name source determination

* Fix incorrect refactor of source constants

* Use replicaset name as part of service name fallback (#249)

* Use replicaset name as part of service name fallback

* Add chooseServiceName unit test

* Change import statement ordering

* Fix service name source being added multiple times for multiple injections (#254)

* added fix to not override the service name source for multiple auto-inject

---------

Co-authored-by: POOJA REDDY NATHALA <poojardy@amazon.com>
Co-authored-by: zhihonl <61301537+zhihonl@users.noreply.github.com>
  • Loading branch information
3 people authored Oct 28, 2024
1 parent 71fab73 commit ca98f33
Show file tree
Hide file tree
Showing 6 changed files with 598 additions and 87 deletions.
45 changes: 20 additions & 25 deletions internal/manifests/collector/ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,25 @@ import (
)

const (
StatsD = "statsd"
CollectD = "collectd"
XrayProxy = "aws-proxy"
XrayTraces = "aws-traces"
OtlpGrpc = "otlp-grpc"
OtlpHttp = "otlp-http"
AppSignalsGrpc = "appsig-grpc"
AppSignalsHttp = "appsig-http"
AppSignalsProxy = "appsig-xray"
AppSignalsGrpcSA = ":4315"
AppSignalsHttpSA = ":4316"
AppSignalsProxySA = ":2000"
EMF = "emf"
EMFTcp = "emf-tcp"
EMFUdp = "emf-udp"
CWA = "cwa-"
JmxHttp = "jmx-http"
StatsD = "statsd"
CollectD = "collectd"
XrayProxy = "aws-proxy"
XrayTraces = "aws-traces"
OtlpGrpc = "otlp-grpc"
OtlpHttp = "otlp-http"
AppSignalsGrpc = "appsig-grpc"
AppSignalsHttp = "appsig-http"
AppSignalsProxy = "appsig-xray"
AppSignalsGrpcSA = ":4315"
AppSignalsHttpSA = ":4316"
AppSignalsProxySA = ":2000"
AppSignalsServerSA = ":4311"
EMF = "emf"
EMFTcp = "emf-tcp"
EMFUdp = "emf-udp"
CWA = "cwa-"
JmxHttp = "jmx-http"
Server = "server"
)

var receiverDefaultPortsMap = map[string]int32{
Expand Down Expand Up @@ -286,14 +288,6 @@ func getTracesReceiversServicePorts(logger logr.Logger, config *adapters.CwaConf
return tracesPorts
}

func getAppSignalsServicePortsMap() map[int32][]corev1.ServicePort {
servicePortMap := make(map[int32][]corev1.ServicePort)
for k, v := range AppSignalsPortToServicePortMap {
servicePortMap[k] = v
}
return servicePortMap
}

func getApplicationSignalsReceiversServicePorts(logger logr.Logger, config *adapters.CwaConfig, servicePortsMap map[int32][]corev1.ServicePort) {
if !isAppSignalEnabled(config) {
return
Expand All @@ -302,6 +296,7 @@ func getApplicationSignalsReceiversServicePorts(logger logr.Logger, config *adap
getReceiverServicePort(logger, AppSignalsGrpcSA, AppSignalsGrpc, corev1.ProtocolTCP, servicePortsMap)
getReceiverServicePort(logger, AppSignalsHttpSA, AppSignalsHttp, corev1.ProtocolTCP, servicePortsMap)
getReceiverServicePort(logger, AppSignalsProxySA, AppSignalsProxy, corev1.ProtocolTCP, servicePortsMap)
getReceiverServicePort(logger, AppSignalsServerSA, Server, corev1.ProtocolTCP, servicePortsMap)
}

func portFromEndpoint(endpoint string) (int32, error) {
Expand Down
26 changes: 21 additions & 5 deletions internal/manifests/collector/ports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ func TestDefaultCollectDGetContainerPorts(t *testing.T) {
func TestApplicationSignals(t *testing.T) {
cfg := getStringFromFile("./test-resources/application_signals.json")
containerPorts := getContainerPorts(logger, cfg, "", []corev1.ServicePort{})
assert.Equal(t, 3, len(containerPorts))
assert.Equal(t, 4, len(containerPorts))
assert.Equal(t, int32(4311), containerPorts[CWA+Server].ContainerPort)
assert.Equal(t, CWA+Server, containerPorts[CWA+Server].Name)
assert.Equal(t, corev1.ProtocolTCP, containerPorts[CWA+Server].Protocol)
assert.Equal(t, int32(4315), containerPorts[CWA+AppSignalsGrpc].ContainerPort)
assert.Equal(t, CWA+AppSignalsGrpc, containerPorts[CWA+AppSignalsGrpc].Name)
assert.Equal(t, int32(4316), containerPorts[CWA+AppSignalsHttp].ContainerPort)
Expand Down Expand Up @@ -152,6 +155,11 @@ func TestMultipleReceiversGetContainerPorts(t *testing.T) {
cfg := getStringFromFile("./test-resources/multipleReceiversAgentConfig.json")
strings.Replace(cfg, "2900", "2000", 1)
wantPorts := []corev1.ContainerPort{
{
Name: CWA + Server,
Protocol: corev1.ProtocolTCP,
ContainerPort: int32(4311),
},
{
Name: CWA + AppSignalsGrpc,
Protocol: corev1.ProtocolTCP,
Expand Down Expand Up @@ -310,7 +318,9 @@ func TestValidJSONAndValidOtelConfig(t *testing.T) {
cfg := getStringFromFile("./test-resources/application_signals.json")
otelCfg := getStringFromFile("./test-resources/otelConfigs/otlpOtelConfig.yaml")
containerPorts := getContainerPorts(logger, cfg, otelCfg, []corev1.ServicePort{})
assert.Equal(t, 4, len(containerPorts))
assert.Equal(t, 5, len(containerPorts))
assert.Equal(t, int32(4311), containerPorts[CWA+Server].ContainerPort)
assert.Equal(t, CWA+Server, containerPorts[CWA+Server].Name)
assert.Equal(t, int32(4315), containerPorts[CWA+AppSignalsGrpc].ContainerPort)
assert.Equal(t, CWA+AppSignalsGrpc, containerPorts[CWA+AppSignalsGrpc].Name)
assert.Equal(t, int32(4316), containerPorts[CWA+AppSignalsHttp].ContainerPort)
Expand All @@ -325,7 +335,9 @@ func TestValidJSONAndInvalidOtelConfig(t *testing.T) {
cfg := getStringFromFile("./test-resources/application_signals.json")
otelCfg := getStringFromFile("./test-resources/otelConfigs/invalidOtlpConfig.yaml")
containerPorts := getContainerPorts(logger, cfg, otelCfg, []corev1.ServicePort{})
assert.Equal(t, 3, len(containerPorts))
assert.Equal(t, 4, len(containerPorts))
assert.Equal(t, int32(4311), containerPorts[CWA+Server].ContainerPort)
assert.Equal(t, CWA+Server, containerPorts[CWA+Server].Name)
assert.Equal(t, int32(4315), containerPorts[CWA+AppSignalsGrpc].ContainerPort)
assert.Equal(t, CWA+AppSignalsGrpc, containerPorts[CWA+AppSignalsGrpc].Name)
assert.Equal(t, int32(4316), containerPorts[CWA+AppSignalsHttp].ContainerPort)
Expand All @@ -338,7 +350,9 @@ func TestValidJSONAndConflictingOtelConfig(t *testing.T) {
cfg := getStringFromFile("./test-resources/application_signals.json")
otelCfg := getStringFromFile("./test-resources/otelConfigs/conflictingPortOtlpConfig.yaml")
containerPorts := getContainerPorts(logger, cfg, otelCfg, []corev1.ServicePort{})
assert.Equal(t, 3, len(containerPorts))
assert.Equal(t, 4, len(containerPorts))
assert.Equal(t, int32(4311), containerPorts[CWA+Server].ContainerPort)
assert.Equal(t, CWA+Server, containerPorts[CWA+Server].Name)
assert.Equal(t, int32(4315), containerPorts[CWA+AppSignalsGrpc].ContainerPort)
assert.Equal(t, CWA+AppSignalsGrpc, containerPorts[CWA+AppSignalsGrpc].Name)
assert.Equal(t, int32(4316), containerPorts[CWA+AppSignalsHttp].ContainerPort)
Expand All @@ -351,7 +365,9 @@ func TestValidJSONAndConflictingOtelConfigForXray(t *testing.T) {
cfg := getStringFromFile("./test-resources/application_signals_with_traces.json")
otelCfg := getStringFromFile("./test-resources/otelConfigs/xrayOtelConfig.yaml")
containerPorts := getContainerPorts(logger, cfg, otelCfg, []corev1.ServicePort{})
assert.Equal(t, 6, len(containerPorts))
assert.Equal(t, 7, len(containerPorts))
assert.Equal(t, int32(4311), containerPorts[CWA+Server].ContainerPort)
assert.Equal(t, CWA+Server, containerPorts[CWA+Server].Name)
assert.Equal(t, int32(4315), containerPorts[CWA+AppSignalsGrpc].ContainerPort)
assert.Equal(t, CWA+AppSignalsGrpc, containerPorts[CWA+AppSignalsGrpc].Name)
assert.Equal(t, int32(4316), containerPorts[CWA+AppSignalsHttp].ContainerPort)
Expand Down
5 changes: 5 additions & 0 deletions pkg/constants/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,9 @@ const (
EnvPodName = "OTEL_RESOURCE_ATTRIBUTES_POD_NAME"
EnvPodUID = "OTEL_RESOURCE_ATTRIBUTES_POD_UID"
EnvNodeName = "OTEL_RESOURCE_ATTRIBUTES_NODE_NAME"

AWSEntityPrefix = "com.amazonaws.cloudwatch.entity.internal."
ServiceNameSource = AWSEntityPrefix + "service.name.source"
SourceInstrumentation = "Instrumentation"
SourceK8sWorkload = "K8sWorkload"
)
Loading

0 comments on commit ca98f33

Please sign in to comment.