Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Timeout queue for posting updates #2

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions classes.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from typing import Dict, List, Tuple, TypedDict, NamedTuple
from typing import Dict, List, Tuple, TypedDict, NamedTuple, override
from dataclasses import dataclass
from datetime import time, datetime

Property = NamedTuple("Property", [("name", str), ("value", str)])

class Page(TypedDict):
@dataclass
class Page():
"""Represents a Notion page
"""
page_id: str
Expand All @@ -12,6 +14,10 @@ class Page(TypedDict):
properties: Dict[str, Property]
url: str

@override
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is @override necessary here? Does Page have a __hash__ method in the first place?

def __hash__(self) -> int:
return hash(page_id)
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return hash(page_id)
return hash(self.page_id)



class PropertyChange(TypedDict):
"""Represents a change in a property
Expand All @@ -28,9 +34,29 @@ class ChangeSet():
"""Pages that were added in the diff"""
removed: List[Page]
"""Pages that were removed in the diff"""
changed: List[Tuple[Page, List[PropertyChange]]]
changed: Dict[Page, Tuple[List[PropertyChange], datetime]]
"""Pages that had their properties changed in the diff. The key is the page id and the value is the list of
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update this docstring accordingly.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need anymore

properties that were changed"""

def is_empty(self) -> bool:
return len(self.added) == 0 and len(self.removed) == 0 and len(self.changed) == 0

def merge_set(self, other_changed_set):
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add type annotations

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How? It wont let me do typehint to ChangeSet

"""merges the changes from the other set into the current set
"""

# For the added and removed pages, we simply add them to the new set if there is any
self.added += other_changed_set.added
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like this is not necessary since you don't have timeout to deal with for page additions/removals and therefore don't pass them forward to the next changeset

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do, because what we intend to do is always forward those into the master changeset, since our current logic is
get_new_changes -> merge_to_master -> check_timeout_at_master -> send_message_from_master

self.removed += other_changed_set.removed

# For each change we check if its already in our dict
for page, (propertyList, timestamp) in other_changed_set.changed:
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you iterating through the dict here? If so,

Suggested change
for page, (propertyList, timestamp) in other_changed_set.changed:
for page, (propertyList, timestamp) in other_changed_set.changed.items():

if page in self.changed:
new_timestamp = self.changed[page][1]
Copy link
Owner Author

@QuantumManiac QuantumManiac Jul 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
new_timestamp = self.changed[page][1]
property_changes, new_timestamp = self.changed[page]

and then use property_changes on line 59

if timestamp > self.changed[page][1]:
new_timestamp = timestamp

self.changed[page] = (propertyList + self.changed[page][0], new_timestamp)
else:
self.changed[page] = (propertyList, timestamp)

53 changes: 41 additions & 12 deletions display.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,30 @@
from enum import Enum
from typing import List, Dict, Tuple
from webhook import WebhookEmbed
from datetime import datetime, time

class MessageColor(Enum):
ADD = 0x44CC00
REMOVE = 0xFF2A00
CHANGE = 0xFFD000

def generate_changeset_messages(changeset: ChangeSet) -> List[WebhookEmbed]:
# changed: List[Tuple[Page, List[PropertyChange]]]
def changeset_timeout(changeset: ChangeSet, timeout: time) -> List[WebhookEmbed]:
"""Processes the changeSet on timeout by parsing the set into embeds and purging expired elements in the set
"""
res = []
res += generate_added_messages(changeset.added)
res += generate_removed_messages(changeset.removed)
res += generate_changed_messages(changeset.changed)

# List[Tuple[Page, List[PropertyChange]]]
list_of_pages_to_purge = check_timeout_on_changed(changeset.changed, timeout)
res += generate_changed_messages(list_of_pages_to_purge)

# Cleanup the existing changeSet
changeset.added = []
changeset.removed = []
for page, _ in list_of_pages_to_purge:
del changeset.changed[page]

return res

def generate_properties_text(page: Page) -> str:
Expand Down Expand Up @@ -71,17 +83,34 @@ def generate_removed_messages(removals: List[Page]) -> List[WebhookEmbed]:

return res

def generate_changed_messages(pages: List[Tuple[Page, List[PropertyChange]]]) -> List[WebhookEmbed]:
def check_timeout_on_changed(changes: Dict[Page, Tuple[List[PropertyChange], datetime]], timeout: time) -> List[Tuple[Page, List[PropertyChange]]]:
res = []

for page, changes in pages:
message = WebhookEmbed(
title = f"Task Changed: {page['properties']['title'].value}",
url = page["url"],
description = generate_changes_text(changes),
color = MessageColor.CHANGE.value
)
current_time = datetime.now()
for page, changeTuple in changes.items():
# need to redo the time math here
time_delta = time(current_time - changeTuple[1])

res.append(message)
if time_delta > timeout:
res.append((page, changeTuple[0]))

return res


def generate_changed_messages(pages: List[Tuple[Page, List[PropertyChange]]]) -> List[WebhookEmbed]:
"""Generate the messages for changed by checking if any pages are expired
If expired then add to the output message and remove the page from the queue
"""

res = []

message = WebhookEmbed(
title = f"Task Changed: {page['properties']['title'].value}",
url = page["url"],
description = generate_changes_text(changes),
color = MessageColor.CHANGE.value
)

res.append(message)

return res
39 changes: 30 additions & 9 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from dotenv import load_dotenv
from notion_client import Client
from typing import Dict
from classes import Page
from classes import ChangeSet, Page
from webhook import generate_message, send_message
from display import generate_changeset_messages
from display import changeset_timeout
import os
from utils import parse_db_query_response, identify_changes
from time import sleep
from datetime import time
import logging
import sys

Expand All @@ -23,26 +24,46 @@ def get_state() -> Dict[str, Page]:
db_data = get_database_data()
return parse_db_query_response(db_data)

"""
We identify the new changes in every polling interval.
If there are new changes, we apply to the changeset.
Then, if the changeset is non-empty, we try to generate a message:
Specifically, the message is only non-empty if there is any expired fields
Then send message if message is non-empty
Set new state and poll again.
"""

def main():
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger()
polling_interval = int(os.environ.get("NOTION_POLLING_INTERVAL_SECONDS", 60))
timeout = time.fromisoformat('00:02:00')
prev_state = []
prev_state = get_state()
changeset = ChangeSet()

while True:
new_state = get_state()
changeset = identify_changes(prev_state, new_state)
new_changeset = identify_changes(prev_state, new_state)

if changeset.is_empty():
if new_changeset.is_empty():
logger.info("No changes detected")

else:
messages = generate_changeset_messages(changeset)
logger.info(f"Changes detected. {len(messages)} to be sent.")
changeset.merge_set(new_changeset)
logger.info("Changes detected.")

if not(changeset.is_empty()):
messages = changeset_timeout(changeset, timeout)
message = generate_message(messages)
send_message(message)
prev_state = new_state

sleep(int(os.environ.get("NOTION_POLLING_INTERVAL_SECONDS", 60)))
# Expect message == "" if there is nothing returned from generate_message
if message != "":
send_message(message)

prev_state = new_state
sleep(polling_interval)


if __name__ == "__main__":
main()
14 changes: 10 additions & 4 deletions utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
from typing import Dict, List, Tuple, Optional, Any, Literal
from classes import Page, ChangeSet, PropertyChange, Property
from property_parsers import parse_properties
Expand Down Expand Up @@ -100,9 +101,11 @@ def parse_diff(data: Dict[str, Page], diff: List[Tuple]) -> ChangeSet:
res = ChangeSet(
added=[],
removed=[],
changed=[]
changed={}
)

# List[Tuple[Page, List[PropertyChange]]]
# Dict[Page, Tuple[List[PropertyChange], datetime]]
changes: Dict[str, List[PropertyChange]] = defaultdict(list) # A dictionary that maps the page_id to the list of changes

change_type: Literal["add", "remove", "change"]
Expand All @@ -114,17 +117,20 @@ def parse_diff(data: Dict[str, Page], diff: List[Tuple]) -> ChangeSet:
res.removed = parse_add_or_remove(change_value)
elif change_type == 'change':
change_key_tokens = change_key.split('.')


timestamp = datetime.now()
# I should be parsing the time here, but I will leave it till tonight to figure that out
if len(change_key_tokens) <= 2: # We are only interested in changes to properties, not the page itself
#timestamp = datetime.fromisoformat(change_key_tokens[1])
continue

page_id, _, _ = change_key_tokens
change = parse_change(change_value)

changes[page_id].append(change)

for page_id, change_list in changes.items():
res.changed.append((data[page_id], change_list))
for page_id, change_list in changes.items():
res.changed[data[page_id]] = (change_list, timestamp)

return res

Expand Down