Supporting S3 #4

Closed
wants to merge 1 commit
6 changes: 5 additions & 1 deletion .env.sample
@@ -19,4 +19,8 @@ NB_UID=1000
NB_GID=100
NB_HOST_PORT=8888

TZ=UTC
TZ=UTC

AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_DEFAULT_REGION=
3 changes: 3 additions & 0 deletions docker-compose.dev.yml
@@ -65,6 +65,9 @@ services:
- MDS_DB
- MDS_USER
- MDS_PASSWORD
- AWS_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY
- AWS_DEFAULT_REGION
ports:
- "${NB_HOST_PORT}:8888"
user: root
26 changes: 26 additions & 0 deletions ingest/README.md
@@ -53,6 +53,10 @@ Results in the following backfill requests:

Backfills ignore the `no_paging` flag, always requesting all pages.

#### `--aws_region REGION`

The AWS region to use for S3. Only applies when given with the `--s3_bucket` argument. Overrides the `AWS_DEFAULT_REGION` environment variable. If `AWS_DEFAULT_REGION` is not set, this parameter must be given.
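
For example (a hypothetical invocation; `my-bucket` and `us-west-2` are placeholder values):

```bash
--s3_bucket my-bucket --aws_region us-west-2
```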

### `--bbox BBOX`

The bounding-box with which to restrict the results of this request.
@@ -114,6 +118,12 @@ One or more `provider_name` to query. The default is to query all configured providers.

Git branch name, commit hash, or tag at which to reference MDS. The default is `master`.

#### `--s3_bucket BUCKET`

Name of an AWS S3 bucket in which to store data files. When used with `--output`, the output path is used as a common key prefix for the data files.

AWS credentials must be configured; set the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables, or use the standard AWS credential configuration (e.g. `~/.aws/credentials`).
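
As a sketch of the prefixing behavior, assuming the hypothetical helper name `data_file_key` (the real logic lives in `output_data` in `ingest/main.py`):

```python
import os


def data_file_key(output, provider, record_type, start_iso, end_iso):
    """Build a data file name; with --output, that path becomes a common key prefix."""
    fname = f"{provider}_{record_type}_{start_iso}_{end_iso}.json"
    # no --output: the file name itself is the S3 key
    return os.path.join(output, fname) if output else fname
```

With `--output data`, for example, all uploaded files share the `data/` key prefix in the bucket.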

### `--source SOURCE [SOURCE ...]`

One or more paths to (directories containing) MDS Provider JSON file(s). These will be read instead of requesting from Provider APIs.
@@ -149,3 +159,19 @@ Flag indicating Trips should be requested.
### `--vehicle_id VEHICLE_ID`

The `vehicle_id` to obtain results for. Only applies to `--trips`.

## Working with Amazon S3

To read and write data files from S3 buckets, configure your environment with the following variables:

```
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_DEFAULT_REGION=
```

Then pass the bucket name with the `--s3_bucket` option:

```bash
--s3_bucket my-bucket
```
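
Internally the script uses `boto3` for the upload. A minimal sketch of the write path (the helper names here are hypothetical, and `s3` stands in for a `boto3` S3 resource):

```python
import json


def serialize_payload(payload):
    """JSON-encode a payload to the bytes passed as the S3 object Body."""
    return json.dumps(payload).encode()


def upload_payload(s3, payload, bucket, key):
    """Put a serialized payload into an S3 bucket under the given key."""
    s3.Object(bucket_name=bucket, key=key).put(Body=serialize_payload(payload))
```

Here `s3` would be created with `boto3.resource("s3")` (optionally passing `region_name`), which reads `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, and `AWS_DEFAULT_REGION` from the environment.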
68 changes: 60 additions & 8 deletions ingest/main.py
@@ -10,6 +10,7 @@

from acquire import acquire_data, provider_names
import argparse
import boto3
from configparser import ConfigParser
from datetime import datetime, timedelta, timezone
import dateutil.parser
@@ -35,6 +36,13 @@ def setup_cli():
    """
    parser = argparse.ArgumentParser()

    parser.add_argument(
        "--aws_region",
        type=str,
        help="The AWS region to use for S3. Only applies when given with the --s3_bucket argument.\
            Overrides the AWS_DEFAULT_REGION environment variable.\
            If AWS_DEFAULT_REGION is not set, this parameter must be given."
    )
    parser.add_argument(
        "--bbox",
        type=str,
@@ -114,6 +122,13 @@ def setup_cli():
        type=str,
        help="Local file path to a providers.csv registry file to use instead of downloading from GitHub."
    )
    parser.add_argument(
        "--s3_bucket",
        type=str,
        help="AWS S3 bucket to reference. When used with --output, a common key prefix is given to data files.\
            AWS credentials must be configured; use AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY env vars or standard AWS\
            credential configuration in e.g. ~/.aws/credentials."
    )
    parser.add_argument(
        "--source",
        type=str,
@@ -198,21 +213,45 @@ def _to_datetime(data):
    return end_time - timedelta(seconds=args.duration), end_time


def output_data(output, payloads, record_type, start_time, end_time):
def output_data(output, payloads, record_type, **kwargs):
    """
    Write data to json files in the a directory.
    Write data to one or more json files.
    """
    start_time = kwargs["start_time"]
    end_time = kwargs["end_time"]

    def _file_name(output, provider):
        """
        Generate a file name (or S3 key) from the given parameters.
        """
        fname = f"{provider}_{record_type}_{start_time.isoformat()}_{end_time.isoformat()}.json"
        # without --output there is no path prefix; use the bare file name
        return os.path.join(output, fname) if output else fname

    bucket = kwargs.get("s3_bucket")
    if bucket:
        # only create a new S3 resource if one was not passed in
        s3 = kwargs.get("s3_service") or s3_service(region_name=kwargs.get("aws_region"))
        print("In S3 bucket: {}".format(bucket))
    else:
        print("In {}".format(output))

    for provider, payload in payloads.items():
        fname = _file_name(output, provider.provider_name)
        with open(fname, "w") as f:
            json.dump(payload, f)
        if bucket:
            body = json.dumps(payload).encode()
            s3.Object(bucket_name=bucket, key=fname).put(Body=body)
        else:
            with open(fname, "w") as f:
                json.dump(payload, f)


def s3_service(region_name=None):
    """
    Helper to return an s3 service using :region_name: or environment variables.
    """
    if region_name:
        return boto3.resource("s3", region_name=region_name)

    return boto3.resource("s3")


def backfill(record_type, client, start_time, end_time, duration, **kwargs):
@@ -274,11 +313,24 @@ def ingest(record_type, **kwargs):
del datasource[k]

    # output to files if needed
    bucket = kwargs.get("s3_bucket")
    if bucket:
        try:
            s3 = s3_service(region_name=kwargs.get("aws_region"))
            kwargs["s3_bucket"] = bucket
            kwargs["s3_service"] = s3
        except Exception as ex:
            print("You must configure AWS credentials to use S3.")
            print("Set the environment variables: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.")
            print("Or configure credentials in the default location (usually ~/.aws/credentials).")
            print("The AWS_DEFAULT_REGION environment variable or the --aws_region parameter is also required.")
            print(ex)
            exit(1)

    output = kwargs.get("output")
    if output and os.path.exists(output):
        print(f"Writing data files to {output}")
        start_time, end_time = kwargs.get("start_time"), kwargs.get("end_time")
        output_data(output, datasource, record_type, start_time, end_time)
    if output or bucket:
        print("Writing data files.")
        output_data(output, datasource, record_type, **kwargs)

    loading = not kwargs.get("no_load")
    if loading and len(datasource) > 0:
1 change: 1 addition & 0 deletions ingest/requirements.txt
@@ -1,2 +1,3 @@
boto3
-e git+https://github.com/CityofSantaMonica/mds-provider@master#egg=mds_provider
python-dateutil