Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1227 from mozilla-services/feat/issue-1204
Browse files Browse the repository at this point in the history
feat: port delete command to Rust
  • Loading branch information
bbangert authored May 11, 2018
2 parents 68a6129 + 8154552 commit 9b37dfa
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 202 deletions.
40 changes: 0 additions & 40 deletions autopush/tests/test_webpush_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
from autopush.utils import WebPushNotification, ns_time
from autopush.websocket import USER_RECORD_VERSION
from autopush.webpush_server import (
CheckStorage,
DeleteMessage,
MigrateUser,
StoreMessages,
WebPushMessage,
Expand Down Expand Up @@ -111,14 +109,6 @@ class Meta:
version = factory.LazyAttribute(generate_version)


class CheckStorageFactory(factory.Factory):
class Meta:
model = CheckStorage

uaid = factory.LazyFunction(lambda: uuid4().hex)
include_topic = True


def webpush_messages(obj):
return [attr.asdict(WebPushMessageFactory(uaid=obj.uaid))
for _ in range(obj.message_count)]
Expand Down Expand Up @@ -198,36 +188,6 @@ def test_start_stop(self):
ws.stop()


class TestDeleteMessageProcessor(BaseSetup):
def _makeFUT(self):
from autopush.webpush_server import DeleteMessageCommand
return DeleteMessageCommand(self.conf, self.db)

def test_delete_message(self):
from autopush.webpush_server import CheckStorageCommand
check_command = CheckStorageCommand(self.conf, self.db)
check = CheckStorageFactory(message_month=self.db.current_msg_month)
delete_command = self._makeFUT()

# Store some topic messages
self._store_messages(check.uaid, topic=True, num=7)

# Fetch them
results = check_command.process(check)
assert len(results.messages) == 7

# Delete 2 of them
for notif in results.messages[:2]:
delete_command.process(DeleteMessage(
message_month=self.db.current_msg_month,
message=notif,
))

# Fetch messages again
results = check_command.process(check)
assert len(results.messages) == 5


class TestMigrateUserProcessor(BaseSetup):
def _makeFUT(self):
from autopush.webpush_server import MigrateUserCommand
Expand Down
87 changes: 0 additions & 87 deletions autopush/webpush_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,20 +122,6 @@ class InputCommand(object):
pass


@attrs(slots=True)
class CheckStorage(InputCommand):
uaid = attrib(convert=uaid_from_str) # type: UUID
message_month = attrib() # type: str
include_topic = attrib() # type: bool
timestamp = attrib(default=None) # type: Optional[int]


@attrs(slots=True)
class DeleteMessage(InputCommand):
message_month = attrib() # type: str
message = attrib(convert=dict_to_webpush_message) # type: WebPushMessage


@attrs(slots=True)
class MigrateUser(InputCommand):
uaid = attrib(convert=uaid_from_str) # type: UUID
Expand All @@ -158,20 +144,6 @@ class OutputCommand(object):
pass


@attrs(slots=True)
class CheckStorageResponse(OutputCommand):
include_topic = attrib() # type: bool
messages = attrib(
default=attr.Factory(list)
) # type: List[WebPushMessage]
timestamp = attrib(default=None) # type: Optional[int]


@attrs(slots=True)
class DeleteMessageResponse(OutputCommand):
success = attrib(default=True) # type: bool


@attrs(slots=True)
class MigrateUserResponse(OutputCommand):
message_month = attrib() # type: str
Expand Down Expand Up @@ -249,17 +221,13 @@ def __init__(self, conf, db):
# type: (AutopushConfig, DatabaseManager) -> None
self.conf = conf
self.db = db
self.check_storage_processor = CheckStorageCommand(conf, db)
self.delete_message_processor = DeleteMessageCommand(conf, db)
self.migrate_user_proocessor = MigrateUserCommand(conf, db)
self.store_messages_process = StoreMessagesUserCommand(conf, db)
self.deserialize = dict(
delete_message=DeleteMessage,
migrate_user=MigrateUser,
store_messages=StoreMessages,
)
self.command_dict = dict(
delete_message=self.delete_message_processor,
migrate_user=self.migrate_user_proocessor,
store_messages=self.store_messages_process,
) # type: Dict[str, ProcessorCommand]
Expand Down Expand Up @@ -302,61 +270,6 @@ def process(self, command):
raise NotImplementedError()


class CheckStorageCommand(ProcessorCommand):
def process(self, command):
# type: (CheckStorage) -> CheckStorageResponse
timestamp, messages, include_topic = self._check_storage(command)
return CheckStorageResponse(
timestamp=timestamp,
messages=messages,
include_topic=include_topic,
)

def _check_storage(self, command):
timestamp = None
messages = []
message = Message(command.message_month,
boto_resource=self.db.resource)
if command.include_topic:
timestamp, messages = message.fetch_messages(
uaid=command.uaid, limit=11
)

# If we have topic messages, return them immediately
messages = [WebPushMessage.from_WebPushNotification(m)
for m in messages]
if messages:
return timestamp, messages, True

# No messages, update the command to include the last timestamp
# that was ack'd
command.timestamp = timestamp

if not messages or command.timestamp:
timestamp, messages = message.fetch_timestamp_messages(
uaid=command.uaid,
timestamp=command.timestamp,
)
messages = [WebPushMessage.from_WebPushNotification(m)
for m in messages]

# If we're out of messages, timestamp is set to None, so we return
# the last timestamp supplied
if not timestamp:
timestamp = command.timestamp
return timestamp, messages, False


class DeleteMessageCommand(ProcessorCommand):
def process(self, command):
# type: (DeleteMessage) -> DeleteMessageResponse
notif = command.message.to_WebPushNotification()
message = Message(command.message_month,
boto_resource=self.db.resource)
message.delete_message(notif)
return DeleteMessageResponse()


class MigrateUserCommand(ProcessorCommand):
def process(self, command):
# type: (MigrateUser) -> MigrateUserResponse
Expand Down
23 changes: 0 additions & 23 deletions autopush_rs/src/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,6 @@ impl<F: FnOnce(&str) + Send> FnBox for F {
#[derive(Serialize)]
#[serde(tag = "command", rename_all = "snake_case")]
enum Call {
DeleteMessage {
message: protocol::Notification,
message_month: String,
},

MigrateUser {
uaid: String,
message_month: String,
Expand All @@ -137,11 +132,6 @@ struct PythonError {
pub error_msg: String,
}

#[derive(Deserialize)]
pub struct DeleteMessageResponse {
pub success: bool,
}

#[derive(Deserialize)]
pub struct MigrateUserResponse {
pub message_month: String,
Expand All @@ -153,19 +143,6 @@ pub struct StoreMessagesResponse {
}

impl Server {
pub fn delete_message(
&self,
message_month: String,
notif: protocol::Notification,
) -> MyFuture<DeleteMessageResponse> {
let (call, fut) = PythonCall::new(&Call::DeleteMessage {
message: notif,
message_month: message_month,
});
self.send_to_python(call);
return fut;
}

pub fn migrate_user(
&self,
uaid: String,
Expand Down
8 changes: 4 additions & 4 deletions autopush_rs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ where

#[state_machine_future(transitions(DetermineAck))]
AwaitDelete {
response: MyFuture<call::DeleteMessageResponse>,
response: MyFuture<()>,
data: AuthClientData<T>,
},

Expand Down Expand Up @@ -782,7 +782,7 @@ where
}
Either::A(ClientMessage::Ack { updates }) => {
data.srv.metrics.incr("ua.command.ack").ok();
let mut fut: Option<MyFuture<call::DeleteMessageResponse>> = None;
let mut fut: Option<MyFuture<()>> = None;
for notif in updates.iter() {
if let Some(pos) = webpush.unacked_direct_notifs.iter().position(|v| {
v.channel_id == notif.channel_id && v.version == notif.version
Expand All @@ -800,10 +800,10 @@ where
// Topic/legacy messages have no sortkey_timestamp
if n.sortkey_timestamp.is_none() {
fut = if let Some(call) = fut {
let my_fut = data.srv.delete_message(message_month, n);
let my_fut = data.srv.ddb.delete_message(&message_month, n);
Some(Box::new(call.and_then(move |_| my_fut)))
} else {
Some(data.srv.delete_message(message_month, n))
Some(data.srv.ddb.delete_message(&message_month, n))
}
}
continue;
Expand Down
32 changes: 31 additions & 1 deletion autopush_rs/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
use std::collections::HashMap;
use uuid::Uuid;

use util::ms_since_epoch;

// Used for the server to flag a webpush client to deliver a Notification or Check storage
pub enum ServerNotification {
CheckStorage,
Expand Down Expand Up @@ -103,11 +105,12 @@ pub enum ServerMessage {

#[derive(Serialize, Default, Deserialize, Clone, Debug)]
pub struct Notification {
#[serde(skip_serializing)]
pub uaid: Option<String>,
#[serde(rename = "channelID")]
pub channel_id: Uuid,
pub version: String,
#[serde(default = "default_ttl")]
#[serde(default = "default_ttl", skip_serializing)]
pub ttl: u32,
#[serde(skip_serializing_if = "Option::is_none")]
pub topic: Option<String>,
Expand All @@ -120,6 +123,33 @@ pub struct Notification {
pub headers: Option<HashMap<String, String>>,
}

impl Notification {
/// Return an appropriate sort_key to use for the chidmessageid
///
/// For new messages:
/// 02:{sortkey_timestamp}:{chid}
///
/// For topic messages:
/// 01:{chid}:{topic}
///
/// Old format for non-topic messages that is no longer returned:
/// {chid}:{message_id}
pub fn sort_key(&self) -> String {
let chid = self.channel_id.hyphenated();
if let Some(ref topic) = self.topic {
format!("01:{}:{}", chid, topic)
} else if let Some(sortkey_timestamp) = self.sortkey_timestamp {
format!("02:{}:{}",
if sortkey_timestamp == 0 { ms_since_epoch() } else { sortkey_timestamp },
chid
)
} else {
// Legacy messages which we should never get anymore
format!("{}:{}", chid, self.version)
}
}
}

fn default_ttl() -> u32 {
0
}
Loading

0 comments on commit 9b37dfa

Please sign in to comment.