-
Notifications
You must be signed in to change notification settings - Fork 2
/
aws_schedule_tasks.py
205 lines (175 loc) · 6.24 KB
/
aws_schedule_tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
"""\
Manage scheduled tasks to populate NYC-DB tables.
Usage:
aws_schedule_tasks.py create <cluster-name>
[--task-prefix=<prefix>]
[--task-definition=<name>]
[--ecs-events-role=<role>]
[--use-test-data]
aws_schedule_tasks.py delete [--task-prefix=<prefix>]
Options:
-h --help Show this screen.
--task-prefix=<prefix> The prefix to prepend all scheduled task
names/rules with. This will be followed
by the dataset name [default: nycdb-load-].
--task-definition=<name> The name of the pre-existing ECS task
definition to load NYC-DB datasets. It
should have a single container that has
all environment variables pre-filled
except for DATASET and USE_TEST_DATA
[default: nycdb-k8s-loader].
--ecs-events-role=<role> The pre-existing IAM role that allows
CloudWatch events to start ECS tasks
[default: ecsEventsRole].
--use-test-data Make the dataset loading tasks
load small test datasets instead of the
real thing.
<cluster-name> The name of the pre-existing ECS
cluster that the scheduled tasks will
be in.
Environment variables:
AWS_ACCESS_KEY_ID Your AWS access key ID.
AWS_SECRET_ACCESS_KEY Your AWS secret access key.
AWS_DEFAULT_REGION The AWS region to use.
"""
import json
import boto3
import docopt
import dotenv
from scheduling import DATASET_NAMES, get_schedule_for_dataset
dotenv.load_dotenv()
def create_input_str(container_name: str, dataset: str, use_test_data: bool):
    '''
    Build the JSON-encoded "Input" payload for a CloudWatch Events
    ECS target: it overrides the named container's DATASET and
    USE_TEST_DATA environment variables for a single task run.
    '''
    env_overrides = [
        {'name': 'DATASET', 'value': dataset},
        # The loader container treats any non-empty string as truthy.
        {'name': 'USE_TEST_DATA', 'value': 'yup' if use_test_data else ''},
    ]
    container_override = {
        'name': container_name,
        'environment': env_overrides,
    }
    return json.dumps({'containerOverrides': [container_override]})
def delete_tasks(prefix: str):
    '''
    Remove every CloudWatch Events rule whose name begins with the
    given prefix. Each rule's targets are detached first, because a
    rule cannot be deleted while targets are still attached.
    '''
    events = boto3.client('events')
    for rule in events.list_rules(NamePrefix=prefix)['Rules']:
        rule_name = rule['Name']
        target_ids = [
            t['Id']
            for t in events.list_targets_by_rule(Rule=rule_name)['Targets']
        ]
        print(f"Deleting rule '{rule_name}'.")
        events.remove_targets(Rule=rule_name, Ids=target_ids)
        events.delete_rule(Name=rule_name)
def create_task(
    prefix: str,
    dataset: str,
    use_test_data: bool,
    role_arn: str,
    cluster_arn: str,
    task_arn: str,
    container_name: str,
    subnet: str
):
    '''
    Register (or overwrite) a CloudWatch Events rule plus an ECS
    Fargate target that loads the given dataset on its schedule.
    '''
    rule_name = f"{prefix}{dataset}"
    schedule = get_schedule_for_dataset(dataset).aws
    print(f"Creating rule '{rule_name}' with schedule {schedule}.")
    events = boto3.client('events')
    # put_rule is idempotent: re-running updates the existing rule.
    events.put_rule(
        Name=rule_name,
        ScheduleExpression=schedule,
        State="ENABLED",
        Description=f"Load the dataset '{dataset}' into NYC-DB.",
        RoleArn=role_arn
    )
    input_payload = create_input_str(
        container_name=container_name,
        dataset=dataset,
        use_test_data=use_test_data
    )
    print(f"Creating target '{dataset}'.")
    network_config = {
        "awsvpcConfiguration": {
            "Subnets": [subnet],
            "SecurityGroups": [],
            # Fargate tasks need a public IP to pull images unless a
            # NAT gateway is configured.
            "AssignPublicIp": "ENABLED"
        }
    }
    target = {
        "Id": dataset,
        "Arn": cluster_arn,
        "RoleArn": role_arn,
        "Input": input_payload,
        "EcsParameters": {
            "TaskDefinitionArn": task_arn,
            "TaskCount": 1,
            "LaunchType": "FARGATE",
            "NetworkConfiguration": network_config,
            "PlatformVersion": "LATEST"
        }
    }
    events.put_targets(Rule=rule_name, Targets=[target])
def create_tasks(prefix: str, args):
    '''
    Create or update one scheduled task per NYC-DB dataset.

    Resolves the IAM role ARN, ECS cluster ARN, VPC subnet, and task
    definition named by the docopt ``args`` mapping, then registers a
    rule + target for every dataset in DATASET_NAMES.
    '''
    role_name: str = args['--ecs-events-role']
    print(f"Obtaining ARN for role '{role_name}'")
    role_arn = boto3.client('iam').get_role(RoleName=role_name)['Role']['Arn']

    cluster_name: str = args['<cluster-name>']
    print(f"Obtaining cluster information for {cluster_name}.")
    ecs = boto3.client('ecs')
    cluster_info = ecs.describe_clusters(clusters=[cluster_name])
    cluster_arn: str = cluster_info['clusters'][0]['clusterArn']
    print(f"Found cluster {cluster_arn}.")

    print(f"Obtaining VPC and subnet info for cluster {cluster_name}.")
    ec2 = boto3.resource('ec2')
    # The VPC is located by its Name tag, which the ECS cluster
    # creation flow sets to "ECS <cluster> - VPC".
    name_filter = {'Name': 'tag:Name', 'Values': [f'ECS {cluster_name} - VPC']}
    matching_vpcs = list(ec2.vpcs.filter(Filters=[name_filter]).all())
    vpc = matching_vpcs[0]
    subnet: str = list(vpc.subnets.all())[0].id
    print(f"Found VPC {vpc.id} and subnet {subnet}.")

    task_definition: str = args['--task-definition']
    print(f"Obtaining task definition for {task_definition}.")
    task = ecs.describe_task_definition(
        taskDefinition=task_definition)['taskDefinition']
    task_arn = task['taskDefinitionArn']
    # The task definition is expected to have exactly one container.
    container_name = task['containerDefinitions'][0]['name']
    print(f"Found {task_arn} with container {container_name}.")

    use_test_data: bool = args['--use-test-data']
    for dataset in DATASET_NAMES:
        create_task(
            prefix=prefix,
            dataset=dataset,
            use_test_data=use_test_data,
            role_arn=role_arn,
            cluster_arn=cluster_arn,
            task_arn=task_arn,
            container_name=container_name,
            subnet=subnet
        )
def main():
    '''Parse CLI arguments and dispatch to the create/delete command.'''
    parsed = docopt.docopt(__doc__)
    task_prefix: str = parsed['--task-prefix']
    if parsed['create']:
        create_tasks(task_prefix, parsed)
    elif parsed['delete']:
        delete_tasks(task_prefix)


if __name__ == '__main__':
    main()