Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adds an unauthenticated Bluesky archiver #160

Merged
merged 5 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions example.orchestration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ steps:
# only 1 feeder allowed
feeder: gsheet_feeder # defaults to cli_feeder
archivers: # order matters, uncomment to activate
- bluesky_archiver
# - vk_archiver
# - telethon_archiver
# - telegram_archiver
Expand Down
3 changes: 2 additions & 1 deletion src/auto_archiver/archivers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@
from .telegram_archiver import TelegramArchiver
from .vk_archiver import VkArchiver
from .youtubedl_archiver import YoutubeDLArchiver
from .instagram_api_archiver import InstagramAPIArchiver
from .instagram_api_archiver import InstagramAPIArchiver
from .bluesky_archiver import BlueskyArchiver
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a general comment here (not relevant to this PR) but I don't like how we import all archivers in the __init__.py file. Ideally, we should only load dependencies (archivers, feeders etc.) that are enabled in the orchestration.

Reason: I came across a bug whereby an archiver was broken (snscrape was broken), and even though I hadn't enabled it, it was still being loaded and throwing an error. Imagine if an archiver breaks for whatever reason (or more likely, an archiver's dependencies break), it would be much easier to just disable the archiver than telling the user to find an __init__.py file and 'change the python code'.

This is essentially how django works. You create a list of INSTALLED_APPS and then the archiver tries to install them one by one. We already have this list of 'installed apps' (it's called steps in our orchestration). So it's just a matter of on startup, trying to load each of them. Then when they can't be loaded throwing an error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, please open an issue for those changes.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, under 'backlog'

2 changes: 2 additions & 0 deletions src/auto_archiver/archivers/archiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ def download_from_url(self, url: str, to_filename: str = None, verbose=True) ->
"""
downloads a URL to provided filename, or inferred from URL, returns local filename
"""
# TODO: should we refactor to use requests.get(url, stream=True) and write to file in chunks? compare approaches
# TODO: should we guess the extension?
if not to_filename:
to_filename = url.split('/')[-1].split('?')[0]
if len(to_filename) > 64:
Expand Down
117 changes: 117 additions & 0 deletions src/auto_archiver/archivers/bluesky_archiver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import os
import re, requests, mimetypes
from loguru import logger


from . import Archiver
from ..core import Metadata, Media, ArchivingContext


class BlueskyArchiver(Archiver):
    """
    Uses an unauthenticated Bluesky API to archive posts including metadata, images and videos. Relies on `public.api.bsky.app/xrpc` and `bsky.social/xrpc`. Avoids ATProto to avoid auth.

    Some inspiration from https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/extractor/bluesky.py
    """
    name = "bluesky_archiver"
    # matches the "/profile/<handle-or-did>/post/<post-id>" part of a bsky.app URL;
    # group 1 = author handle/did, group 2 = post record key
    BSKY_POST = re.compile(r"/profile/([^/]+)/post/([a-zA-Z0-9]+)")
    # seconds before a stalled API/blob request is aborted, so one hung
    # connection cannot block the whole archiving run
    REQUEST_TIMEOUT = 30

    def __init__(self, config: dict) -> None:
        super().__init__(config)

    @staticmethod
    def configs() -> dict:
        # no options: only public, unauthenticated endpoints are used
        return {}

    def download(self, item: Metadata) -> Metadata:
        """
        Archives a Bluesky post URL: title, timestamp, author/mentions/tags/links
        metadata, plus any embedded images or video. Returns False for URLs that
        are not Bluesky posts (so other archivers can try them).
        """
        url = item.get_url()
        if not re.search(self.BSKY_POST, url):
            return False

        logger.debug(f"Identified a Bluesky post: {url}, archiving...")
        result = Metadata()

        # fetch post info and update result
        post = self._get_post_from_uri(url)
        logger.debug(f"Extracted post info: {post['record']['text']}")
        result.set_title(post["record"]["text"])
        result.set_timestamp(post["record"]["createdAt"])
        for k, v in self._get_post_data(post).items():
            if v: result.set(k, v)

        # download if embeds present (1 video XOR >=1 images)
        for media in self._download_bsky_embeds(post):
            result.add_media(media)
        logger.debug(f"Downloaded {len(result.media)} media files")

        return result.success("bluesky")

    def _get_post_from_uri(self, post_uri: str) -> dict:
        """
        Calls a public (no auth needed) Bluesky API to get a post from its uri, uses .getPostThread as it brings author info as well (unlike .getPost).

        Raises requests.HTTPError on API failure; asserts the response is a
        resolvable thread view (blocked/not-found posts return other $types).
        """
        post_match = re.search(self.BSKY_POST, post_uri)
        username = post_match.group(1)
        post_id = post_match.group(2)
        at_uri = f'at://{username}/app.bsky.feed.post/{post_id}'
        r = requests.get(
            f"https://public.api.bsky.app/xrpc/app.bsky.feed.getPostThread?uri={at_uri}&depth=0&parent_height=0",
            timeout=self.REQUEST_TIMEOUT)
        r.raise_for_status()
        thread = r.json()
        assert thread["thread"]["$type"] == "app.bsky.feed.defs#threadViewPost"
        return thread["thread"]["post"]

    def _download_bsky_embeds(self, post: dict) -> list[Media]:
        """
        Iterates over image(s) or video in a Bluesky post and downloads them
        """
        media = []
        embed = post.get("record", {}).get("embed", {})
        # a post embeds either a list of images or a single video, never both
        if "images" in embed:
            for image in embed["images"]:
                image_media = self._download_bsky_file_as_media(image["image"]["ref"]["$link"], post["author"]["did"])
                media.append(image_media)
        if "video" in embed:
            video_media = self._download_bsky_file_as_media(embed["video"]["ref"]["$link"], post["author"]["did"])
            media.append(video_media)
        return media

    def _download_bsky_file_as_media(self, cid: str, did: str) -> Media:
        """
        Uses the Bluesky API to download a file by its `cid` (content id) and
        `did` (author id), saving it into the archiving tmp dir.
        """
        # TODO: replace with self.download_from_url once that function has been cleaned-up
        file_url = f"https://bsky.social/xrpc/com.atproto.sync.getBlob?cid={cid}&did={did}"
        response = requests.get(file_url, stream=True, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        # Content-Type may carry parameters (e.g. "image/jpeg; charset=utf-8"):
        # strip them before guessing, and fall back to no extension rather than
        # appending the literal string "None" to the filename
        content_type = response.headers.get("Content-Type", "").split(";")[0].strip()
        ext = mimetypes.guess_extension(content_type) or ""
        filename = os.path.join(ArchivingContext.get_tmp_dir(), f"{cid}{ext}")
        with open(filename, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        media = Media(filename=filename)
        media.set("src", file_url)
        return media

    def _get_post_data(self, post: dict) -> dict:
        """
        Extracts relevant information returned by the .getPostThread api call (excluding text/created_at): author, mentions, tags, links.
        """
        author = post["author"]
        # drop noisy/empty author fields before storing
        if "labels" in author and not author["labels"]: del author["labels"]
        if "associated" in author: del author["associated"]

        mentions, tags, links = [], [], []
        # facets annotate spans of the post text with rich-text features
        facets = post.get("record", {}).get("facets", [])
        for f in facets:
            for feature in f["features"]:
                if feature["$type"] == "app.bsky.richtext.facet#mention":
                    mentions.append(feature["did"])
                elif feature["$type"] == "app.bsky.richtext.facet#tag":
                    tags.append(feature["tag"])
                elif feature["$type"] == "app.bsky.richtext.facet#link":
                    links.append(feature["uri"])
        res = {"author": author}
        if mentions: res["mentions"] = mentions
        if tags: res["tags"] = tags
        if links: res["links"] = links
        return res