Skip to content

Commit

Permalink
Merge pull request #134 from instana/fargate_send_sibling_containers
Browse files Browse the repository at this point in the history
Send ECS all containers running under the same task
  • Loading branch information
Andrew Slotin authored Jul 3, 2020
2 parents 50ca247 + 54635de commit fc199dc
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 45 deletions.
33 changes: 23 additions & 10 deletions fargate_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@ func newECSTaskPluginPayload(snapshot fargateSnapshot) acceptor.PluginPayload {
})
}

func newECSContainerPluginPayload(container aws.ECSContainerMetadata) acceptor.PluginPayload {
return acceptor.NewECSContainerPluginPayload(ecsEntityID(container), acceptor.ECSContainerData{
Runtime: "go",
Instrumented: true,
func newECSContainerPluginPayload(container aws.ECSContainerMetadata, instrumented bool) acceptor.PluginPayload {
data := acceptor.ECSContainerData{
Instrumented: instrumented,
DockerID: container.DockerID,
DockerName: container.DockerName,
ContainerName: container.Name,
Expand All @@ -75,7 +74,14 @@ func newECSContainerPluginPayload(container aws.ECSContainerMetadata) acceptor.P
CreatedAt: container.CreatedAt,
StartedAt: container.StartedAt,
Type: container.Type,
})
}

// we only know the runtime for sure for the instrumented container
if instrumented {
data.Runtime = "go"
}

return acceptor.NewECSContainerPluginPayload(ecsEntityID(container), data)
}

func newDockerContainerPluginPayload(container aws.ECSContainerMetadata) acceptor.PluginPayload {
Expand Down Expand Up @@ -188,13 +194,11 @@ func newFargateAgent(
func (a *fargateAgent) Ready() bool { return a.snapshot.EntityID != "" }

func (a *fargateAgent) SendMetrics(data acceptor.Metrics) error {
buf := bytes.NewBuffer(nil)
if err := json.NewEncoder(buf).Encode(struct {
payload := struct {
Plugins []acceptor.PluginPayload `json:"plugins"`
}{
Plugins: []acceptor.PluginPayload{
newECSTaskPluginPayload(a.snapshot),
newECSContainerPluginPayload(a.snapshot.Container),
newDockerContainerPluginPayload(a.snapshot.Container),
newProcessPluginPayload(a.snapshot),
acceptor.NewGoProcessPluginPayload(acceptor.GoProcessData{
Expand All @@ -203,8 +207,17 @@ func (a *fargateAgent) SendMetrics(data acceptor.Metrics) error {
Metrics: data,
}),
},
},
); err != nil {
}

for _, container := range a.snapshot.Task.Containers {
payload.Plugins = append(
payload.Plugins,
newECSContainerPluginPayload(container, ecsEntityID(container) == a.snapshot.EntityID),
)
}

buf := bytes.NewBuffer(nil)
if err := json.NewEncoder(buf).Encode(payload); err != nil {
return fmt.Errorf("failed to marshal metrics payload: %s", err)
}

Expand Down
110 changes: 75 additions & 35 deletions fargate_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,71 +62,111 @@ func TestFargateAgent_SendMetrics(t *testing.T) {
}
require.NoError(t, json.Unmarshal(collected.Body, &payload))

pluginData := make(map[string]serverlessAgentPluginPayload)
pluginData := make(map[string][]serverlessAgentPluginPayload)
for _, plugin := range payload.Plugins {
pluginData[plugin.Name] = serverlessAgentPluginPayload{plugin.EntityID, plugin.Data}
pluginData[plugin.Name] = append(pluginData[plugin.Name], serverlessAgentPluginPayload{plugin.EntityID, plugin.Data})
}

// AWS ECS Task plugin payload
if assert.Contains(t, pluginData, "com.instana.plugin.aws.ecs.task") {
d := pluginData["com.instana.plugin.aws.ecs.task"]
t.Run("AWS ECS Task plugin payload", func(t *testing.T) {
require.Len(t, pluginData["com.instana.plugin.aws.ecs.task"], 1)
d := pluginData["com.instana.plugin.aws.ecs.task"][0]

assert.NotEmpty(t, d.EntityID)
assert.Equal(t, d.Data["taskArn"], d.EntityID)

assert.Equal(t, "default", d.Data["clusterArn"])
assert.Equal(t, "nginx", d.Data["taskDefinition"])
assert.Equal(t, "5", d.Data["taskDefinitionVersion"])
}
})

// AWS ECS Container plugin payload
if assert.Contains(t, pluginData, "com.instana.plugin.aws.ecs.container") {
d := pluginData["com.instana.plugin.aws.ecs.container"]
t.Run("AWS ECS Container plugin payload", func(t *testing.T) {
require.Len(t, pluginData["com.instana.plugin.aws.ecs.container"], 2)

assert.NotEmpty(t, d.EntityID)
containers := make(map[string]serverlessAgentPluginPayload)
for _, container := range pluginData["com.instana.plugin.aws.ecs.container"] {
containers[container.EntityID] = container
}

require.IsType(t, d.Data["taskArn"], "")
require.IsType(t, d.Data["containerName"], "")
assert.Equal(t, d.Data["taskArn"].(string)+"::"+d.Data["containerName"].(string), d.EntityID)
t.Run("instrumented", func(t *testing.T) {
d := containers["arn:aws:ecs:us-east-2:012345678910:task/9781c248-0edd-4cdb-9a93-f63cb662a5d3::nginx-curl"]
require.NotEmpty(t, d)

if assert.NotEmpty(t, d.Data["taskArn"]) {
assert.Equal(t, pluginData["com.instana.plugin.aws.ecs.task"].EntityID, d.Data["taskArn"])
}
assert.NotEmpty(t, d.EntityID)

assert.Equal(t, true, d.Data["instrumented"])
assert.Equal(t, "go", d.Data["runtime"])
assert.Equal(t, "43481a6ce4842eec8fe72fc28500c6b52edcc0917f105b83379f88cac1ff3946", d.Data["dockerId"])
}
require.IsType(t, d.Data["taskArn"], "")
require.IsType(t, d.Data["containerName"], "")
assert.Equal(t, d.Data["taskArn"].(string)+"::"+d.Data["containerName"].(string), d.EntityID)

// Docker plugin payload
if assert.Contains(t, pluginData, "com.instana.plugin.docker") {
d := pluginData["com.instana.plugin.docker"]
if assert.NotEmpty(t, d.Data["taskArn"]) {
require.NotEmpty(t, pluginData["com.instana.plugin.aws.ecs.task"])
assert.Equal(t, pluginData["com.instana.plugin.aws.ecs.task"][0].EntityID, d.Data["taskArn"])
}

assert.NotEmpty(t, d.EntityID)
assert.Equal(t, pluginData["com.instana.plugin.aws.ecs.container"].EntityID, d.EntityID)
assert.Equal(t, true, d.Data["instrumented"])
assert.Equal(t, "go", d.Data["runtime"])
assert.Equal(t, "43481a6ce4842eec8fe72fc28500c6b52edcc0917f105b83379f88cac1ff3946", d.Data["dockerId"])
})

t.Run("non-instrumented", func(t *testing.T) {
d := containers["arn:aws:ecs:us-east-2:012345678910:task/9781c248-0edd-4cdb-9a93-f63cb662a5d3::~internal~ecs~pause"]
require.NotEmpty(t, d)

assert.NotEmpty(t, d.EntityID)

require.IsType(t, d.Data["taskArn"], "")
require.IsType(t, d.Data["containerName"], "")
assert.Equal(t, d.Data["taskArn"].(string)+"::"+d.Data["containerName"].(string), d.EntityID)

if assert.NotEmpty(t, d.Data["taskArn"]) {
require.NotEmpty(t, pluginData["com.instana.plugin.aws.ecs.task"])
assert.Equal(t, pluginData["com.instana.plugin.aws.ecs.task"][0].EntityID, d.Data["taskArn"])
}

assert.Nil(t, d.Data["instrumented"])
assert.Empty(t, d.Data["runtime"])
assert.Equal(t, "731a0d6a3b4210e2448339bc7015aaa79bfe4fa256384f4102db86ef94cbbc4c", d.Data["dockerId"])
})
})

t.Run("Docker plugin payload", func(t *testing.T) {
require.Len(t, pluginData["com.instana.plugin.docker"], 1)
d := pluginData["com.instana.plugin.docker"][0]

assert.NotEmpty(t, d.EntityID)
assert.Equal(t, "43481a6ce4842eec8fe72fc28500c6b52edcc0917f105b83379f88cac1ff3946", d.Data["Id"])
}

// Process plugin payload
if assert.Contains(t, pluginData, "com.instana.plugin.process") {
d := pluginData["com.instana.plugin.process"]
var found bool
for _, container := range pluginData["com.instana.plugin.aws.ecs.container"] {
if container.Data["containerName"] == "nginx-curl" {
found = true
assert.Equal(t, container.EntityID, d.EntityID)
break
}
}
assert.True(t, found)
})

t.Run("Process plugin payload", func(t *testing.T) {
require.Len(t, pluginData["com.instana.plugin.process"], 1)
d := pluginData["com.instana.plugin.process"][0]

assert.NotEmpty(t, d.EntityID)

assert.Equal(t, "docker", d.Data["containerType"])
assert.Equal(t, "43481a6ce4842eec8fe72fc28500c6b52edcc0917f105b83379f88cac1ff3946", d.Data["container"])
}
})

// Go process plugin payload
if assert.Contains(t, pluginData, "com.instana.plugin.golang") {
d := pluginData["com.instana.plugin.golang"]
t.Run("Go process plugin payload", func(t *testing.T) {
require.Len(t, pluginData["com.instana.plugin.golang"], 1)
d := pluginData["com.instana.plugin.golang"][0]

assert.NotEmpty(t, d.EntityID)
assert.Equal(t, pluginData["com.instana.plugin.process"].EntityID, d.EntityID)

require.NotEmpty(t, pluginData["com.instana.plugin.process"])
assert.Equal(t, pluginData["com.instana.plugin.process"][0].EntityID, d.EntityID)

assert.NotEmpty(t, d.Data["metrics"])
}
})
}

func TestFargateAgent_SendSpans(t *testing.T) {
Expand Down

0 comments on commit fc199dc

Please sign in to comment.