Commit

fix lines too long
zganger committed Jul 2, 2024
1 parent 21ed26c commit 16cbc56
Showing 5 changed files with 27 additions and 18 deletions.
6 changes: 3 additions & 3 deletions backend/database/core.py
@@ -1,7 +1,7 @@
"""This file defines the database connection, plus some terminal commands for
setting up and tearing down the database.
Do not importer anything directly from `backend.database._core`. Instead, importer
Do not import anything directly from `backend.database._core`. Instead, import
from `backend.database`.
"""
import os
@@ -61,9 +61,9 @@ def execute_query(filename: str) -> Optional[pd.DataFrame]:
"""Run SQL from a file. It will return a Pandas DataFrame if it selected
anything; otherwise it will return None.
I do not recommend you use this function too often. In general we should be
I do not recommend you use this function too often. In general, we should be
using the SQLAlchemy ORM. That said, it's a nice convenience, and there are
times where this function is genuinely something you want to run.
times when this function is genuinely something you want to run.
"""
with open(os.path.join(QUERIES_DIR, secure_filename(filename))) as f:
query = f.read()
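For context, the docstring steers callers toward the package root rather than the private module. A minimal usage sketch of `execute_query` follows; the `recent_incidents.sql` filename is hypothetical, and it assumes the function is re-exported from `backend.database` as the docstring directs:

from backend.database import execute_query

# Hypothetical query file expected to live in QUERIES_DIR;
# execute_query returns a DataFrame for queries that select rows
# and None for statements that produce no result set.
df = execute_query("recent_incidents.sql")
if df is not None:
    print(df.head())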
33 changes: 21 additions & 12 deletions backend/importer/loop.py
@@ -12,29 +12,34 @@ def __init__(self, content: bytes):
         self.content = content

     def load(self):
-        raise Exception("unimplemented; extend this class to write a load migration.")
+        raise Exception("unimplemented; extend class to write a load migration")


 class Importer(Thread):
     def __init__(self, queue_name: str, region: str = "us-east-1"):
-        super().__init__(daemon=True)  # TODO: ideally we would have a function on the app to catch shutdown events and close gracefully, but until then daemon it is.
+        # TODO: ideally we would have a function on the app to catch shutdown
+        # events and close gracefully, but until then daemon it is.
+        super().__init__(daemon=True)
         self.queue_name = queue_name
         self.session = boto3.Session(region_name=region)
         self.sqs_client = self.session.client("sqs")
         self.s3_client = self.session.client("s3")
-        self.sqs_queue_url = self.sqs_client.get_queue_url(QueueName=self.queue_name)
+        self.sqs_queue_url = self.sqs_client.get_queue_url(
+            QueueName=self.queue_name)
         self.logger = getLogger(self.__class__.__name__)

-        self.loader_map: dict[str, Loader] = {
-            # this should be a mapping of s3 key prefix : loader class for that file type
-        }
+        self.loader_map: dict[str, Loader] = {}

     def run(self):
         while True:
             resp = self.sqs_client.receive_message(
                 QueueUrl=self.sqs_queue_url,
-                MaxNumberOfMessages=1,  # retrieve one message at a time - we could up this and parallelize but no point until way more files.
-                VisibilityTimeout=600,  # 10 minutes to process message before it becomes visible for another consumer.
+                # retrieve one message at a time - we could up this
+                # and parallelize but no point until way more files.
+                MaxNumberOfMessages=1,
+                # 10 minutes to process message before it becomes
+                # visible for another consumer.
+                VisibilityTimeout=600,
             )
             # if no messages found, wait 5m for next poll
             if len(resp["Messages"]) == 0:
@@ -43,16 +43,20 @@ def run(self):

             for message in resp["Messages"]:
                 sqs_body = ujson.loads(message["Body"])
-                for record in sqs_body["Records"]:  # this comes through as a list, but we expect one object
+                # this comes through as a list, but we expect one object
+                for record in sqs_body["Records"]:
                     bucket_name = record["s3"]["bucket"]["name"]
                     key = record["s3"]["object"]["key"]
                     with BytesIO() as fileobj:
-                        self.s3_client.download_fileobj(bucket_name, key, fileobj)
+                        self.s3_client.download_fileobj(
+                            bucket_name, key, fileobj)
                         fileobj.seek(0)
                         content = fileobj.read()
                         _ = content  # for linting.

-                        # TODO: we now have an in-memory copy of the s3 file content. This is where we would run the importer.
-                        # we want a standardized importer class; we would call something like below:
+                        # TODO: we now have an in-memory copy of s3 file content
+                        # This is where we would run the importer.
+                        # we want a standardized importer class; use like:
                         # loader = self.get_loader_for_content_type(key)
                         # loader(content).load()
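The TODO above sketches the intended flow: pick a Loader by S3 key prefix and call its load() on the downloaded bytes. A minimal sketch of how a concrete loader and the loader_map might fit together; CsvLoader, the "csv/" prefix, and get_loader_for_content_type are all hypothetical names, not part of this commit:

import csv
from io import StringIO

from backend.importer.loop import Loader


class CsvLoader(Loader):
    """Hypothetical loader for files uploaded under a csv/ key prefix."""

    def load(self):
        # self.content holds the raw bytes downloaded from S3.
        for row in csv.DictReader(StringIO(self.content.decode("utf-8"))):
            ...  # map each row onto ORM models and persist it


# Importer.__init__ would then register the class by key prefix:
#     self.loader_map = {"csv/": CsvLoader}
# and the lookup hinted at in the TODO could be something like:
#     def get_loader_for_content_type(self, key):
#         for prefix, loader_cls in self.loader_map.items():
#             if key.startswith(prefix):
#                 return loader_cls
#         raise KeyError(f"no loader registered for key {key!r}")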
2 changes: 1 addition & 1 deletion frontend/babel.config.js
@@ -3,7 +3,7 @@ module.exports = {
   plugins: ["inline-react-svg"],
   env: {
     test: {
-      plugins: ["transform-dynamic-importer"]
+      plugins: ["transform-dynamic-import"]
     }
   }
 }
2 changes: 1 addition & 1 deletion frontend/helpers/api/mocks/browser.ts
@@ -3,7 +3,7 @@ import { handlers, rejectUnhandledApiRequests } from "./handlers"

 export const worker = setupWorker(...handlers)

-/** Starts worker, convenience for conditional importer */
+/** Starts worker, convenience for conditional import */
 export const startWorker = () => {
   worker.start({
     onUnhandledRequest: rejectUnhandledApiRequests
2 changes: 1 addition & 1 deletion frontend/tests/helpers/api.e2e.test.ts
@@ -3,5 +3,5 @@ import { server } from "../test-utils"
 /** Turn off API mocking for the test so we use the real API */
 beforeAll(() => server.close())

-/** Re-importer the main test file to pick up the tests */
+/** Re-import the main test file to pick up the tests */
 require("./api.test")
