Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AWS Fargate] Added support for DesiredStatus and KnownStatus for ECS Tasks #32342

Merged
merged 9 commits into from
Jul 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Upgrade Mongodb library in Beats to v5 {pull}31185[31185]
- Azure Billing: upgrade Usage Details API to version 2019-10-01 {pull}31970[31970]
* Differentiate between actual idle CPU states and an uninterruptible disk sleep. https://github.com/elastic/elastic-agent-system-metrics/pull/32[system-metrics#32]
- AWS Fargate: Added support for DesiredStatus and KnownStatus {issue}32077[32077] {pull}32342[#32342]

*Packetbeat*

Expand Down
20 changes: 20 additions & 0 deletions metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -4944,6 +4944,26 @@ type: keyword
Container identifier across tasks and clusters, which equals to container.name + '/' + container.id.


type: keyword

--

*`awsfargate.task_stats.task_desired_status`*::
+
--
The desired status for the task from Amazon ECS.


type: keyword

--

*`awsfargate.task_stats.task_known_status`*::
+
--
The known status for the task from Amazon ECS.


type: keyword

--
Expand Down
2 changes: 1 addition & 1 deletion x-pack/metricbeat/module/awsfargate/fields.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 3 additions & 10 deletions x-pack/metricbeat/module/awsfargate/task_stats/_meta/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@
}
}
},
"task_name": "query-metadata"
"task_desired_status": "RUNNING",
"task_known_status": "ACTIVATING",
"task_name": "query-metadata-1"
}
},
"cloud": {
Expand All @@ -148,15 +150,6 @@
},
"name": "query-metadata"
},
"event": {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are those two fields no longer present in the data.json file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed the same but could not find the exact cause. That files was generated by executing go test -data -tags=integration inside task_stats

"dataset": "awsfargate.task_stats",
"duration": 115000,
"module": "awsfargate"
},
"metricset": {
"name": "task_stats",
"period": 10000
},
"service": {
"type": "awsfargate"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@
type: keyword
description: >
Container identifier across tasks and clusters, which equals to container.name + '/' + container.id.
- name: task_desired_status
type: keyword
description: >
The desired status for the task from Amazon ECS.
- name: task_known_status
type: keyword
description: >
The known status for the task from Amazon ECS.
- name: cpu
type: group
description: Runtime CPU metrics.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
"TaskARN": "arn:aws:ecs:us-west-2:123:task/default/febee207c04a",
"Family": "query-metadata-1",
"Revision": "7",
"DesiredStatus": "RUNNING",
"KnownStatus": "ACTIVATING",
"Containers": [{
"DockerId": "1234",
"Name": "query-metadata",
Expand Down
11 changes: 1 addition & 10 deletions x-pack/metricbeat/module/awsfargate/task_stats/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,7 @@ type container struct {
Labels map[string]string
}

// ContainerMetadata is an struct represents container metadata
type ContainerMetadata struct {
Cluster string
TaskARN string
Family string
Revision string
Container *container
}

func getContainerStats(c *container) *container {
func getContainerMetadata(c *container) *container {
return &container{
DockerId: c.DockerId,
Image: c.Image,
Expand Down
37 changes: 26 additions & 11 deletions x-pack/metricbeat/module/awsfargate/task_stats/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package task_stats

import (
"fmt"
"strings"
"time"

Expand All @@ -16,12 +17,11 @@ import (

var (
clusterLabel = "com_amazonaws_ecs_cluster"
taskLabel = "com_amazonaws_ecs_task-definition-family"
)

func eventsMapping(r mb.ReporterV2, statsList []Stats) {
for _, stats := range statsList {
r.Event(createEvent(&stats))
for i := range statsList {
r.Event(createEvent(&statsList[i]))
}
}

Expand All @@ -39,15 +39,25 @@ func createEvent(stats *Stats) mb.Event {
regionName, clusterName := getRegionAndClusterName(stats.Container.Labels)
e.RootFields = createRootFields(stats, regionName)
if clusterName != "" {
e.MetricSetFields.Put("cluster_name", clusterName)
_, _ = e.MetricSetFields.Put("cluster_name", clusterName)
}

taskName := stats.Container.Labels[taskLabel]
taskName := stats.taskInfo.Family
if taskName != "" {
e.MetricSetFields.Put("task_name", taskName)
_, _ = e.MetricSetFields.Put("task_name", taskName)
}

e.MetricSetFields.Put("identifier", generateIdentifier(stats.Container.Name, stats.Container.DockerId))
taskDesiredStatus := stats.taskInfo.TaskDesiredStatus
if taskDesiredStatus != "" {
_, _ = e.MetricSetFields.Put("task_desired_status", taskDesiredStatus)
}

taskKnownStatus := stats.taskInfo.TaskKnownStatus
if taskKnownStatus != "" {
_, _ = e.MetricSetFields.Put("task_known_status", taskKnownStatus)
}

_, _ = e.MetricSetFields.Put("identifier", generateIdentifier(stats.Container.Name, stats.Container.DockerId))
return e
}

Expand All @@ -66,9 +76,8 @@ func getRegionAndClusterName(labels map[string]string) (regionName string, clust
if err == nil {
regionName = arnParsed.Region
}
return
}
return
return regionName, clusterName
}

func createRootFields(stats *Stats, regionName string) mapstr.M {
Expand All @@ -88,7 +97,10 @@ func createRootFields(stats *Stats, regionName string) mapstr.M {
cloud := mapstr.M{
"region": regionName,
}
rootFields.Put("cloud", cloud)
_, err := rootFields.Put("cloud", cloud)
if err != nil {
_ = fmt.Errorf("error putting root field 'cloud': %w", err)
}
}
return rootFields
}
Expand Down Expand Up @@ -163,7 +175,7 @@ func createMemoryFields(stats *Stats) mapstr.M {
func createNetworkFields(stats *Stats) mapstr.M {
networkFields := mapstr.M{}
for _, n := range stats.networkStats {
networkFields.Put(n.NameInterface,
_, err := networkFields.Put(n.NameInterface,
mapstr.M{"inbound": mapstr.M{
"bytes": n.Total.RxBytes,
"dropped": n.Total.RxDropped,
Expand All @@ -176,6 +188,9 @@ func createNetworkFields(stats *Stats) mapstr.M {
"errors": n.Total.TxErrors,
"packets": n.Total.TxPackets,
}})
if err != nil {
_ = fmt.Errorf("error while putting network fields: %w", err)
}
}
return networkFields
}
Expand Down
78 changes: 54 additions & 24 deletions x-pack/metricbeat/module/awsfargate/task_stats/task_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
package task_stats

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"time"

"github.com/docker/docker/api/types"

Expand All @@ -21,9 +23,10 @@ import (
)

var (
metricsetName = "task_stats"
taskStatsPath = "task/stats"
taskPath = "task"
metricsetName = "task_stats"
taskStatsPath = "task/stats"
taskPath = "task"
queryTaskMetadataEndpointTimeout = 60 * time.Second
)

// init registers the MetricSet with the central registry as soon as the program
Expand All @@ -47,23 +50,36 @@ type MetricSet struct {
taskEndpoint string
}

// Stats is a struct represents information regarding a container
// TaskInfo is a struct that represents information about a specific ECS Fargate Task
type TaskInfo struct {
Cluster string
TaskARN string
Family string
Revision string
TaskDesiredStatus string
TaskKnownStatus string
}

// Stats is a struct that represents information regarding a container
type Stats struct {
Time common.Time
taskInfo TaskInfo
Container *container
cpuStats cpu.CPUStats
memoryStats memoryStats
networkStats []networkStats
blkioStats blkioStats
}

// TaskMetadata is an struct represents response body from ${ECS_CONTAINER_METADATA_URI_V4}/task
// TaskMetadata is a struct that represents response body from ${ECS_CONTAINER_METADATA_URI_V4}/task
type TaskMetadata struct {
Cluster string `json:"Cluster"`
TaskARN string `json:"TaskARN"`
Family string `json:"Family"`
Revision string `json:"Revision"`
Containers []*container `json:"Containers"`
Cluster string `json:"Cluster"`
TaskARN string `json:"TaskARN"`
Family string `json:"Family"`
Revision string `json:"Revision"`
DesiredStatus string `json:"DesiredStatus"`
KnownStatus string `json:"KnownStatus"`
Containers []*container `json:"Containers"`
}

// New creates a new instance of the MetricSet. New is responsible for unpacking
Expand Down Expand Up @@ -104,8 +120,14 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {
}

func (m *MetricSet) queryTaskMetadataEndpoints() ([]Stats, error) {
// Get response from ${ECS_CONTAINER_METADATA_URI_V4}/task/stats
taskStatsResp, err := http.Get(m.taskStatsEndpoint)
context, cancel := context.WithTimeout(context.Background(), queryTaskMetadataEndpointTimeout)
defer cancel()
// Collect information from ${ECS_CONTAINER_METADATA_URI_V4}/task/stats
girodav marked this conversation as resolved.
Show resolved Hide resolved
req, err := http.NewRequestWithContext(context, http.MethodGet, m.taskStatsEndpoint, nil)
if err != nil {
return nil, fmt.Errorf("http.NewRequestWithContext: %w", err)
}
taskStatsResp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("http.Get failed: %w", err)
}
Expand All @@ -115,7 +137,11 @@ func (m *MetricSet) queryTaskMetadataEndpoints() ([]Stats, error) {
}

// Collect container metadata information from ${ECS_CONTAINER_METADATA_URI_V4}/task
taskResp, err := http.Get(m.taskEndpoint)
req, err = http.NewRequestWithContext(context, http.MethodGet, m.taskEndpoint, nil)
if err != nil {
return nil, fmt.Errorf("http.NewRequestWithContext: %w", err)
}
taskResp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("http.Get failed: %w", err)
}
Expand Down Expand Up @@ -157,29 +183,33 @@ func getTask(taskResp *http.Response) (TaskMetadata, error) {
}

func getStatsList(taskStatsOutput map[string]types.StatsJSON, taskOutput TaskMetadata) []Stats {
containersInfo := map[string]ContainerMetadata{}
containersInfo := map[string]container{}

taskInfo := TaskInfo{
Family: taskOutput.Family,
TaskARN: taskOutput.TaskARN,
Cluster: taskOutput.Cluster,
Revision: taskOutput.Revision,
TaskDesiredStatus: taskOutput.DesiredStatus,
TaskKnownStatus: taskOutput.KnownStatus,
}

for _, c := range taskOutput.Containers {
// Skip ~internal~ecs~pause container
if c.Name == "~internal~ecs~pause" {
continue
}

containerMetadata := ContainerMetadata{
Container: c,
Family: taskOutput.Family,
TaskARN: taskOutput.TaskARN,
Cluster: taskOutput.Cluster,
Revision: taskOutput.Revision,
}
containersInfo[c.DockerId] = containerMetadata
containersInfo[c.DockerId] = *c
}

var formattedStats []Stats
for id, taskStats := range taskStatsOutput {
if cInfo, ok := containersInfo[id]; ok {
if c, ok := containersInfo[id]; ok {
statsPerContainer := Stats{
Time: common.Time(taskStats.Stats.Read),
Container: getContainerStats(cInfo.Container),
taskInfo: taskInfo,
Container: getContainerMetadata(&c),
cpuStats: getCPUStats(taskStats),
memoryStats: getMemoryStats(taskStats),
networkStats: getNetworkStats(taskStats),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ var (
"TaskARN": "arn:aws:ecs:us-west-2:123:task/default/febee207c04a",
"Family": "query-metadata-1",
"Revision": "7",
"DesiredStatus": "RUNNING",
"KnownStatus": "ACTIVATING",
"Containers": [{
"DockerId": "query-metadata-1",
"Name": "query-metadata",
Expand Down Expand Up @@ -82,6 +84,8 @@ func TestGetTask(t *testing.T) {
assert.Equal(t, "arn:aws:ecs:us-west-2:123:task/default/febee207c04a", taskOutput.TaskARN)
assert.Equal(t, "query-metadata-1", taskOutput.Family)
assert.Equal(t, "7", taskOutput.Revision)
assert.Equal(t, "RUNNING", taskOutput.DesiredStatus)
assert.Equal(t, "ACTIVATING", taskOutput.KnownStatus)

assert.Equal(t, 1, len(taskOutput.Containers))
assert.Equal(t, "query-metadata-1", taskOutput.Containers[0].DockerId)
Expand Down