Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
refactor: rip most of metrics out of db
Browse files Browse the repository at this point in the history
Issue #1238
  • Loading branch information
pjenvey committed May 21, 2018
1 parent 8b6dd7d commit 9d53f20
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 44 deletions.
57 changes: 36 additions & 21 deletions autopush_rs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,14 +335,14 @@ where
let AwaitHello { data, tx, rx, .. } = hello.take();
let connected_at = ms_since_epoch();
let response = Box::new(data.srv.ddb.hello(
&connected_at,
uaid.as_ref(),
&data.srv.opts.router_table_name,
&data.srv.opts.router_url,
&data.srv.opts.message_table_names,
&data.srv.opts.current_message_month,
&data.srv.metrics,
));
&connected_at,
uaid.as_ref(),
&data.srv.opts.router_table_name,
&data.srv.opts.router_url,
&data.srv.opts.message_table_names,
&data.srv.opts.current_message_month,
&data.srv.metrics,
));
transition!(AwaitProcessHello {
response,
data,
Expand Down Expand Up @@ -386,6 +386,8 @@ where
rx,
..
} = process_hello.take();
data.srv.metrics.incr("ua.command.hello").ok();

let UnAuthClientData {
srv,
ws,
Expand Down Expand Up @@ -603,6 +605,7 @@ where
#[state_machine_future(transitions(SendThenWait))]
AwaitUnregister {
channel_id: Uuid,
code: u32,
response: MyFuture<bool>,
data: AuthClientData<T>,
},
Expand Down Expand Up @@ -696,9 +699,11 @@ where
));
transition!(AwaitMigrateUser { response, data });
} else if all_acked && webpush.flags.reset_uaid {
let response = Box::new(data.srv
.ddb
.drop_uaid(&data.srv.opts.router_table_name, &webpush.uaid));
let response = Box::new(
data.srv
.ddb
.drop_uaid(&data.srv.opts.router_table_name, &webpush.uaid)
);
transition!(AwaitDropUser { response, data });
}
transition!(AwaitInput { data })
Expand Down Expand Up @@ -764,15 +769,11 @@ where
// register does
let uaid = webpush.uaid;
let message_month = webpush.message_month.clone();
let response = Box::new(data.srv.ddb.unregister(
&uaid,
&channel_id,
&message_month,
code.unwrap_or(200),
&data.srv.metrics,
));
let response =
Box::new(data.srv.ddb.unregister(&uaid, &channel_id, &message_month));
transition!(AwaitUnregister {
channel_id,
code: code.unwrap_or(200),
response,
data,
});
Expand Down Expand Up @@ -802,10 +803,17 @@ where
// Topic/legacy messages have no sortkey_timestamp
if n.sortkey_timestamp.is_none() {
fut = if let Some(call) = fut {
let my_fut = data.srv.ddb.delete_message(&message_month, &webpush.uaid, &n);
let my_fut =
data.srv
.ddb
.delete_message(&message_month, &webpush.uaid, &n);
Some(Box::new(call.and_then(move |_| my_fut)))
} else {
Some(Box::new(data.srv.ddb.delete_message(&message_month, &webpush.uaid, &n)))
Some(Box::new(data.srv.ddb.delete_message(
&message_month,
&webpush.uaid,
&n,
)))
}
}
continue;
Expand Down Expand Up @@ -1020,10 +1028,17 @@ where
}
};

let AwaitUnregister { code, data, .. } = await_unregister.take();
data.srv
.metrics
.incr_with_tags("ua.command.unregister")
.with_tag("code", &code.to_string())
.send()
.ok();
transition!(SendThenWait {
remaining_data: vec![msg],
poll_complete: false,
data: await_unregister.take().data,
data
})
}

Expand Down
7 changes: 7 additions & 0 deletions autopush_rs/src/db/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub fn fetch_messages(
limit: Some(limit as i64),
..Default::default()
};

let cond = |err: &QueryError| matches!(err, &QueryError::ProvisionedThroughputExceeded(_));
retry_if(move || ddb.query(&input), cond)
.chain_err(|| "Error fetching messages")
Expand Down Expand Up @@ -110,6 +111,7 @@ pub fn fetch_timestamp_messages(
limit: Some(limit as i64),
..Default::default()
};

let cond = |err: &QueryError| matches!(err, &QueryError::ProvisionedThroughputExceeded(_));
retry_if(move || ddb.query(&input), cond)
.chain_err(|| "Error fetching messages")
Expand Down Expand Up @@ -182,6 +184,7 @@ pub fn register_user(
":router_type".to_string() => val!(S => user.router_type),
":connected_at".to_string() => val!(N => user.connected_at),
};

retry_if(
move || {
debug!("Registering user: {:?}", item);
Expand Down Expand Up @@ -225,6 +228,7 @@ pub fn update_user_message_month(
table_name: router_table_name.to_string(),
..Default::default()
};

retry_if(
move || ddb.update_item(&update_item).and_then(|_| future::ok(())),
|err: &UpdateItemError| matches!(err, &UpdateItemError::ProvisionedThroughputExceeded(_)),
Expand All @@ -245,6 +249,7 @@ pub fn all_channels(
},
..Default::default()
};

let cond = |err: &GetItemError| matches!(err, &GetItemError::ProvisionedThroughputExceeded(_));
retry_if(move || ddb.get_item(&input), cond)
.and_then(|output| {
Expand Down Expand Up @@ -283,6 +288,7 @@ pub fn save_channels(
table_name: message_table_name.to_string(),
..Default::default()
};

retry_if(
move || ddb.update_item(&update_item).and_then(|_| future::ok(())),
|err: &UpdateItemError| matches!(err, &UpdateItemError::ProvisionedThroughputExceeded(_)),
Expand All @@ -309,6 +315,7 @@ pub fn unregister_channel_id(
table_name: message_table_name.to_string(),
..Default::default()
};

retry_if(
move || ddb.update_item(&update_item),
|err: &UpdateItemError| matches!(err, &UpdateItemError::ProvisionedThroughputExceeded(_)),
Expand Down
44 changes: 21 additions & 23 deletions autopush_rs/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::env;
use std::rc::Rc;
use uuid::Uuid;

use cadence::{Counted, StatsdClient};
use cadence::StatsdClient;
use futures::{future, Future};
use futures_backoff::retry_if;
use rusoto_core::reactor::RequestDispatcher;
Expand Down Expand Up @@ -103,6 +103,7 @@ impl DynamoStorage {
table_name: table_name.to_string(),
..Default::default()
};

retry_if(
move || ddb.update_item(&update_input),
|err: &UpdateItemError| {
Expand Down Expand Up @@ -145,7 +146,8 @@ impl DynamoStorage {
let ddb = self.ddb.clone();
let router_url = router_url.to_string();
let connected_at = *connected_at;
let response = response.and_then(move |(mut hello_response, user_opt)| {

response.and_then(move |(mut hello_response, user_opt)| {
let hello_message_month = hello_response.message_month.clone();
let user = user_opt.unwrap_or_else(|| DynamoDbUser {
current_month: Some(hello_message_month),
Expand All @@ -166,9 +168,7 @@ impl DynamoStorage {
debug!("Error registering user: {:?}", e);
future::ok(err_response)
})
});
metrics.incr("ua.command.hello").ok();
response
})
}

pub fn register(
Expand Down Expand Up @@ -202,9 +202,13 @@ impl DynamoStorage {
Box::new(response)
}

pub fn drop_uaid(&self, table_name: &str, uaid: &Uuid) -> impl Future<Item = (), Error = Error> {
pub fn drop_uaid(
&self,
table_name: &str,
uaid: &Uuid,
) -> impl Future<Item = (), Error = Error> {
commands::drop_user(self.ddb.clone(), uaid, table_name)
.and_then(move |_| future::ok(()))
.and_then(|_| future::ok(()))
.chain_err(|| "Unable to drop user record")
}

Expand All @@ -213,19 +217,10 @@ impl DynamoStorage {
uaid: &Uuid,
channel_id: &Uuid,
message_month: &str,
code: u32,
metrics: &StatsdClient,
) -> impl Future<Item = bool, Error = Error> {
let response =
commands::unregister_channel_id(self.ddb.clone(), uaid, channel_id, message_month)
.and_then(|_| future::ok(true))
.or_else(|_| future::ok(false));
metrics
.incr_with_tags("ua.command.unregister")
.with_tag("code", &code.to_string())
.send()
.ok();
response
commands::unregister_channel_id(self.ddb.clone(), uaid, channel_id, message_month)
.and_then(|_| future::ok(true))
.or_else(|_| future::ok(false))
}

/// Migrate a user to a new month table
Expand All @@ -242,6 +237,7 @@ impl DynamoStorage {
let cur_month = current_message_month.to_string();
let cur_month2 = cur_month.clone();
let router_table_name = router_table_name.to_string();

commands::all_channels(self.ddb.clone(), &uaid, message_month)
.and_then(move |channels| -> MyFuture<_> {
if channels.is_empty() {
Expand All @@ -253,7 +249,7 @@ impl DynamoStorage {
.and_then(move |_| {
commands::update_user_message_month(ddb2, &uaid, &router_table_name, &cur_month2)
})
.and_then(move |_| future::ok(()))
.and_then(|_| future::ok(()))
.chain_err(|| "Unable to migrate user")
}

Expand All @@ -280,6 +276,7 @@ impl DynamoStorage {
request_items: hashmap! { message_month.to_string() => put_items },
..Default::default()
};

let cond = |err: &BatchWriteItemError| {
matches!(err, &BatchWriteItemError::ProvisionedThroughputExceeded(_))
};
Expand Down Expand Up @@ -313,6 +310,7 @@ impl DynamoStorage {
},
..Default::default()
};

let cond = |err: &DeleteItemError| {
matches!(err, &DeleteItemError::ProvisionedThroughputExceeded(_))
};
Expand Down Expand Up @@ -341,7 +339,8 @@ impl DynamoStorage {
let uaid = *uaid;
let table_name = table_name.to_string();
let ddb = self.ddb.clone();
let response = response.and_then(move |resp| -> MyFuture<_> {

response.and_then(move |resp| -> MyFuture<_> {
// Return now from this future if we have messages
if !resp.messages.is_empty() {
debug!("Topic message returns: {:?}", resp.messages);
Expand Down Expand Up @@ -381,7 +380,6 @@ impl DynamoStorage {
})
});
Box::new(next_query)
});
response
})
}
}

0 comments on commit 9d53f20

Please sign in to comment.