Skip to content

Commit

Permalink
Revert "fix(aws_ec2_metadata transform): try to fix random lockup (#1…
Browse files Browse the repository at this point in the history
…1139)" (#11165)

This reverts commit 4791318.
  • Loading branch information
jszwedko authored Feb 3, 2022
1 parent 86bfec8 commit 284ad4c
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 66 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ vrl-stdlib = { path = "lib/vrl/stdlib" }
lookup = { path = "lib/lookup" }

# External libs
arc-swap = { version = "1.5", default-features = false }
async-compression = { version = "0.3.7", default-features = false, features = ["tokio", "gzip", "zstd"] }
avro-rs = { version = "0.13.0", default-features = false, optional = true }
base64 = { version = "0.13.0", default-features = false, optional = true }
Expand Down
3 changes: 1 addition & 2 deletions scripts/integration/docker-compose.aws.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
version: "3"

services:
mock-ec2-metadata:
image: timberiodev/mock-ec2-metadata:latest
Expand Down Expand Up @@ -74,3 +72,4 @@ volumes:

networks:
backend: {}

123 changes: 61 additions & 62 deletions src/transforms/aws_ec2_metadata.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
error, fmt,
future::ready,
pin::Pin,
sync::Arc,
sync::{Arc, RwLock},
};

use arc_swap::ArcSwap;
use bytes::Bytes;
use futures::{Stream, StreamExt};
use http::{uri::PathAndQuery, Request, StatusCode, Uri};
Expand Down Expand Up @@ -91,23 +89,23 @@ pub struct Ec2Metadata {

#[derive(Clone, Debug)]
pub struct Ec2MetadataTransform {
state: Arc<ArcSwap<HashMap<Cow<'static, str>, Bytes>>>,
state: Arc<RwLock<HashMap<String, Bytes>>>,
}

#[derive(Debug, Clone)]
struct Keys {
ami_id_key: Cow<'static, str>,
availability_zone_key: Cow<'static, str>,
instance_id_key: Cow<'static, str>,
instance_type_key: Cow<'static, str>,
local_hostname_key: Cow<'static, str>,
local_ipv4_key: Cow<'static, str>,
public_hostname_key: Cow<'static, str>,
public_ipv4_key: Cow<'static, str>,
region_key: Cow<'static, str>,
subnet_id_key: Cow<'static, str>,
vpc_id_key: Cow<'static, str>,
role_name_key: Cow<'static, str>,
ami_id_key: String,
availability_zone_key: String,
instance_id_key: String,
instance_type_key: String,
local_hostname_key: String,
local_ipv4_key: String,
public_hostname_key: String,
public_ipv4_key: String,
region_key: String,
subnet_id_key: String,
vpc_id_key: String,
role_name_key: String,
}

inventory::submit! {
Expand All @@ -120,7 +118,7 @@ impl_generate_config_from_default!(Ec2Metadata);
#[typetag::serde(name = "aws_ec2_metadata")]
impl TransformConfig for Ec2Metadata {
async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
let state = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));
let state: Arc<RwLock<HashMap<String, Bytes>>> = Arc::new(RwLock::new(HashMap::new()));

// Check if the namespace is set to `""` which should mean that we do
// not want a prefixed namespace.
Expand Down Expand Up @@ -202,19 +200,21 @@ impl TaskTransform<Event> for Ec2MetadataTransform {

impl Ec2MetadataTransform {
fn transform_one(&mut self, mut event: Event) -> Event {
let state = self.state.load();
match event {
Event::Log(ref mut log) => {
state.iter().for_each(|(k, v)| {
log.insert(k, v.clone());
});
}
Event::Metric(ref mut metric) => {
state.iter().for_each(|(k, v)| {
metric.insert_tag(k.to_string(), String::from_utf8_lossy(v).to_string());
});
if let Ok(state) = self.state.read() {
match event {
Event::Log(ref mut log) => {
state.iter().for_each(|(k, v)| {
log.insert(k.clone(), v.clone());
});
}
Event::Metric(ref mut metric) => {
state.iter().for_each(|(k, v)| {
metric.insert_tag(k.clone(), String::from_utf8_lossy(v).to_string());
});
}
}
}

event
}
}
Expand All @@ -224,7 +224,7 @@ struct MetadataClient {
host: Uri,
token: Option<(Bytes, Instant)>,
keys: Keys,
state: Arc<ArcSwap<HashMap<Cow<'static, str>, Bytes>>>,
state: Arc<RwLock<HashMap<String, Bytes>>>,
refresh_interval: Duration,
fields: HashSet<String>,
}
Expand All @@ -248,7 +248,7 @@ impl MetadataClient {
client: HttpClient<Body>,
host: Uri,
keys: Keys,
state: Arc<ArcSwap<HashMap<Cow<'static, str>, Bytes>>>,
state: Arc<RwLock<HashMap<String, Bytes>>>,
refresh_interval: Duration,
fields: Vec<String>,
) -> Self {
Expand Down Expand Up @@ -329,59 +329,59 @@ impl MetadataClient {
}

pub async fn refresh_metadata(&mut self) -> Result<(), crate::Error> {
let mut new_state = HashMap::new();
let mut state: HashMap<String, Bytes> = HashMap::new();

// Fetch all resources, _then_ add them to the state map.
if let Some(document) = self.get_document().await? {
if self.fields.contains(AMI_ID_KEY) {
new_state.insert(self.keys.ami_id_key.clone(), document.image_id.into());
state.insert(self.keys.ami_id_key.clone(), document.image_id.into());
}

if self.fields.contains(INSTANCE_ID_KEY) {
new_state.insert(
state.insert(
self.keys.instance_id_key.clone(),
document.instance_id.into(),
);
}

if self.fields.contains(INSTANCE_TYPE_KEY) {
new_state.insert(
state.insert(
self.keys.instance_type_key.clone(),
document.instance_type.into(),
);
}

if self.fields.contains(REGION_KEY) {
new_state.insert(self.keys.region_key.clone(), document.region.into());
state.insert(self.keys.region_key.clone(), document.region.into());
}

if self.fields.contains(AVAILABILITY_ZONE_KEY) {
if let Some(availability_zone) = self.get_metadata(&AVAILABILITY_ZONE).await? {
new_state.insert(self.keys.availability_zone_key.clone(), availability_zone);
state.insert(self.keys.availability_zone_key.clone(), availability_zone);
}
}

if self.fields.contains(LOCAL_HOSTNAME_KEY) {
if let Some(local_hostname) = self.get_metadata(&LOCAL_HOSTNAME).await? {
new_state.insert(self.keys.local_hostname_key.clone(), local_hostname);
state.insert(self.keys.local_hostname_key.clone(), local_hostname);
}
}

if self.fields.contains(LOCAL_IPV4_KEY) {
if let Some(local_ipv4) = self.get_metadata(&LOCAL_IPV4).await? {
new_state.insert(self.keys.local_ipv4_key.clone(), local_ipv4);
state.insert(self.keys.local_ipv4_key.clone(), local_ipv4);
}
}

if self.fields.contains(PUBLIC_HOSTNAME_KEY) {
if let Some(public_hostname) = self.get_metadata(&PUBLIC_HOSTNAME).await? {
new_state.insert(self.keys.public_hostname_key.clone(), public_hostname);
state.insert(self.keys.public_hostname_key.clone(), public_hostname);
}
}

if self.fields.contains(PUBLIC_IPV4_KEY) {
if let Some(public_ipv4) = self.get_metadata(&PUBLIC_IPV4).await? {
new_state.insert(self.keys.public_ipv4_key.clone(), public_ipv4);
state.insert(self.keys.public_ipv4_key.clone(), public_ipv4);
}
}

Expand All @@ -400,7 +400,7 @@ impl MetadataClient {
})?;

if let Some(subnet_id) = self.get_metadata(&subnet_path).await? {
new_state.insert(self.keys.subnet_id_key.clone(), subnet_id);
state.insert(self.keys.subnet_id_key.clone(), subnet_id);
}
}

Expand All @@ -413,7 +413,7 @@ impl MetadataClient {
})?;

if let Some(vpc_id) = self.get_metadata(&vpc_path).await? {
new_state.insert(self.keys.vpc_id_key.clone(), vpc_id);
state.insert(self.keys.vpc_id_key.clone(), vpc_id);
}
}
}
Expand All @@ -424,17 +424,19 @@ impl MetadataClient {
let role_names = String::from_utf8_lossy(&role_names[..]);

for (i, role_name) in role_names.lines().enumerate() {
new_state.insert(
Cow::from(format!("{}[{}]", self.keys.role_name_key, i)),
state.insert(
format!("{}[{}]", self.keys.role_name_key, i),
role_name.to_string().into(),
);
}
}
}

let mut existing_state = self.state.load().as_ref().clone();
existing_state.extend(new_state);
self.state.store(Arc::new(existing_state));
{
if let Ok(mut old_state) = self.state.write() {
old_state.extend(state);
}
}
}

Ok(())
Expand Down Expand Up @@ -484,21 +486,18 @@ impl Keys {
pub fn new(namespace: &Option<String>) -> Self {
if let Some(namespace) = &namespace {
Keys {
ami_id_key: Cow::from(format!("{}.{}", namespace, AMI_ID_KEY)),
availability_zone_key: Cow::from(format!(
"{}.{}",
namespace, AVAILABILITY_ZONE_KEY
)),
instance_id_key: Cow::from(format!("{}.{}", namespace, INSTANCE_ID_KEY)),
instance_type_key: Cow::from(format!("{}.{}", namespace, INSTANCE_TYPE_KEY)),
local_hostname_key: Cow::from(format!("{}.{}", namespace, LOCAL_HOSTNAME_KEY)),
local_ipv4_key: Cow::from(format!("{}.{}", namespace, LOCAL_IPV4_KEY)),
public_hostname_key: Cow::from(format!("{}.{}", namespace, PUBLIC_HOSTNAME_KEY)),
public_ipv4_key: Cow::from(format!("{}.{}", namespace, PUBLIC_IPV4_KEY)),
region_key: Cow::from(format!("{}.{}", namespace, REGION_KEY)),
subnet_id_key: Cow::from(format!("{}.{}", namespace, SUBNET_ID_KEY)),
vpc_id_key: Cow::from(format!("{}.{}", namespace, VPC_ID_KEY)),
role_name_key: Cow::from(format!("{}.{}", namespace, VPC_ID_KEY)),
ami_id_key: format!("{}.{}", namespace, AMI_ID_KEY),
availability_zone_key: format!("{}.{}", namespace, AVAILABILITY_ZONE_KEY),
instance_id_key: format!("{}.{}", namespace, INSTANCE_ID_KEY),
instance_type_key: format!("{}.{}", namespace, INSTANCE_TYPE_KEY),
local_hostname_key: format!("{}.{}", namespace, LOCAL_HOSTNAME_KEY),
local_ipv4_key: format!("{}.{}", namespace, LOCAL_IPV4_KEY),
public_hostname_key: format!("{}.{}", namespace, PUBLIC_HOSTNAME_KEY),
public_ipv4_key: format!("{}.{}", namespace, PUBLIC_IPV4_KEY),
region_key: format!("{}.{}", namespace, REGION_KEY),
subnet_id_key: format!("{}.{}", namespace, SUBNET_ID_KEY),
vpc_id_key: format!("{}.{}", namespace, VPC_ID_KEY),
role_name_key: format!("{}.{}", namespace, VPC_ID_KEY),
}
} else {
Keys {
Expand Down

0 comments on commit 284ad4c

Please sign in to comment.