From 2089f3ab581dda102f603ffc8b01ab58ca79d08e Mon Sep 17 00:00:00 2001 From: holger krekel Date: Thu, 28 Mar 2024 12:13:53 +0100 Subject: [PATCH] persist pending notifications to directory so that they survive a restart --- chatmaild/src/chatmaild/filedict.py | 2 +- chatmaild/src/chatmaild/metadata.py | 41 ++++++++++++------- .../src/chatmaild/tests/test_metadata.py | 8 ++-- 3 files changed, 31 insertions(+), 20 deletions(-) diff --git a/chatmaild/src/chatmaild/filedict.py b/chatmaild/src/chatmaild/filedict.py index 3d9caf85..02b38735 100644 --- a/chatmaild/src/chatmaild/filedict.py +++ b/chatmaild/src/chatmaild/filedict.py @@ -19,7 +19,7 @@ def modify(self): with filelock.FileLock(self.lock_path): data = self.read() yield data - write_path = self.path.with_suffix(".tmp") + write_path = self.path.with_name(self.path.name + ".tmp") with write_path.open("w") as f: json.dump(data, f) os.rename(write_path, self.path) diff --git a/chatmaild/src/chatmaild/metadata.py b/chatmaild/src/chatmaild/metadata.py index fb5628f2..b6b35654 100644 --- a/chatmaild/src/chatmaild/metadata.py +++ b/chatmaild/src/chatmaild/metadata.py @@ -1,8 +1,7 @@ import pwd from pathlib import Path -from queue import Queue -from threading import Thread +from threading import Thread, Event from socketserver import ( UnixStreamServer, StreamRequestHandler, @@ -32,7 +31,10 @@ class Notifier: def __init__(self, vmail_dir): self.vmail_dir = vmail_dir - self.to_notify_queue = Queue() + self.notification_dir = vmail_dir / "pending_notifications" + if not self.notification_dir.exists(): + self.notification_dir.mkdir() + self.message_arrived_event = Event() def get_metadata_dict(self, addr): return FileDict(self.vmail_dir / addr / "metadata.json") @@ -57,25 +59,32 @@ def get_tokens(self, addr): return self.get_metadata_dict(addr).read().get(METADATA_TOKEN_KEY, []) def new_message_for_addr(self, addr): - self.to_notify_queue.put(addr) + self.notification_dir.joinpath(addr).touch() + self.message_arrived_event.set() def thread_run_loop(self): requests_session = requests.Session() while 1: + self.message_arrived_event.wait() + self.message_arrived_event.clear() self.thread_run_one(requests_session) def thread_run_one(self, requests_session): - addr = self.to_notify_queue.get() - for token in self.get_tokens(addr): - response = requests_session.post( - "https://notifications.delta.chat/notify", - data=token, - timeout=60, - ) - if response.status_code == 410: - # 410 Gone status code - # means the token is no longer valid. - self.remove_token(addr, token) + for addr_path in self.notification_dir.iterdir(): + addr = addr_path.name + if "@" not in addr: + continue + for token in self.get_tokens(addr): + response = requests_session.post( + "https://notifications.delta.chat/notify", + data=token, + timeout=60, + ) + if response.status_code == 410: + # 410 Gone status code + # means the token is no longer valid. + self.remove_token(addr, token) + addr_path.unlink() def handle_dovecot_protocol(rfile, wfile, notifier): @@ -179,6 +188,8 @@ def handle(self): t = Thread(target=notifier.thread_run_loop) t.setDaemon(True) t.start() + # let notifier thread run once for any pending notifications from last run + notifier.message_arrived_event.set() with ThreadedUnixStreamServer(socket, Handler) as server: os.chown(socket, uid=passwd_entry.pw_uid, gid=passwd_entry.pw_gid) diff --git a/chatmaild/src/chatmaild/tests/test_metadata.py b/chatmaild/src/chatmaild/tests/test_metadata.py index a0f358a4..5954ef1d 100644 --- a/chatmaild/src/chatmaild/tests/test_metadata.py +++ b/chatmaild/src/chatmaild/tests/test_metadata.py @@ -84,10 +84,10 @@ def test_handle_dovecot_request_happy_path(notifier, testaddr): assert handle_dovecot_request(f"B{tx2}\t{testaddr}", transactions, notifier) is None msg = f"S{tx2}\tpriv/guid00/messagenew" assert handle_dovecot_request(msg, transactions, notifier) is None - assert notifier.to_notify_queue.get() == testaddr - assert notifier.to_notify_queue.qsize() == 0 + assert notifier.message_arrived_event.is_set() assert handle_dovecot_request(f"C{tx2}", transactions, notifier) == "O\n" assert not transactions + assert notifier.notification_dir.joinpath(testaddr).exists() def test_handle_dovecot_protocol_set_devicetoken(notifier): @@ -159,8 +159,8 @@ def test_handle_dovecot_protocol_messagenew(notifier): wfile = io.BytesIO() handle_dovecot_protocol(rfile, wfile, notifier) assert wfile.getvalue() == b"O\n" - assert notifier.to_notify_queue.get() == "user@example.org" - assert notifier.to_notify_queue.qsize() == 0 + assert notifier.message_arrived_event.is_set() + assert notifier.notification_dir.joinpath("user@example.org").exists() def test_notifier_thread_run(notifier, testaddr):