[#270] Fixed S3Hook to load/write logs in S3 properly (PR #320)
aliaksandr-d authored Aug 23, 2018
1 parent 5208bb6 commit d186ebb
Showing 12 changed files with 91 additions and 67 deletions.
33 changes: 25 additions & 8 deletions deploy/ansible/roles/legion_core_chart/tasks/main.yml
@@ -1,28 +1,42 @@
---

# Create Jenkins IAM role for airflow s3 access
- name: Generate policy documents
- name: Generate trust policy document
template:
src: "{{ item }}.yaml.j2"
dest: "{{ tmp_dir }}/{{ item }}.{{ cluster_name }}.yaml"
with_items:
- trust_policy
- airflow_s3_access_policy
src: "trust_policy.yaml.j2"
dest: "{{ tmp_dir }}/trust_policy.{{ enclave }}.{{ cluster_name }}.yaml"
with_items: "{{ enclaves }}"
loop_control:
loop_var: enclave

- name: Generate airflow s3 access policy document
template:
src: "airflow_s3_access_policy.yaml.j2"
dest: "{{ tmp_dir }}/airflow_s3_access_policy.{{ enclave }}.{{ cluster_name }}.yaml"
with_items: "{{ enclaves }}"
loop_control:
loop_var: enclave

- name: Create Airflow S3 access role
iam:
iam_type: role
name: "{{ cluster_name }}-jenkins-role"
trust_policy_filepath: "{{ tmp_dir }}/trust_policy.{{ cluster_name }}.yaml"
trust_policy_filepath: "{{ tmp_dir }}/trust_policy.{{ enclave }}.{{ cluster_name }}.yaml"
state: present
with_items: "{{ enclaves }}"
loop_control:
loop_var: enclave

- name: Attach Airflow S3 access policy to the role
iam_policy:
iam_type: role
iam_name: "{{ cluster_name }}-jenkins-role"
policy_name: "{{ cluster_name }}-jenkins-airflow-s3-access-policy"
policy_document: "{{ tmp_dir }}/airflow_s3_access_policy.{{ cluster_name }}.yaml"
policy_document: "{{ tmp_dir }}/airflow_s3_access_policy.{{ enclave }}.{{ cluster_name }}.yaml"
state: present
with_items: "{{ enclaves }}"
loop_control:
loop_var: enclave

# Install Legion core chart
- name: Get legion-core chart status
@@ -60,6 +74,9 @@
mode: 0644
vars:
git_secret_name: legion-git-deploy
with_items: "{{ enclaves }}"
loop_control:
loop_var: enclave

- name: Pre run with dumping
shell: helm --kube-context {{ cluster_name }} install legion-core --name legion-core --debug --dry-run -f {{ tmp_dir }}/legion-core-values.{{ cluster_name }}.yaml
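For illustration only (not part of this commit): the per-enclave loops above generate one trust policy and one S3 access policy document per enclave, so each file name now carries the enclave name. A minimal sketch of the resulting paths, assuming hypothetical enclave and cluster names:

# Hypothetical values; the real ones come from the deployment profile.
tmp_dir = "/tmp"
cluster_name = "legion-ci.epm.kharlamov.biz"
enclaves = ["company-a", "company-b"]

for enclave in enclaves:
    for doc in ("trust_policy", "airflow_s3_access_policy"):
        print("{}/{}.{}.{}.yaml".format(tmp_dir, doc, enclave, cluster_name))
# e.g. /tmp/trust_policy.company-a.legion-ci.epm.kharlamov.biz.yaml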
@@ -109,7 +109,7 @@
- name: Create S3 storage for Airflow
aws_s3:
bucket: "{{ airflow_s3_bucket_name }}"
object: "/{{ enclave }}"
object: "/{{ airflow_s3_logs_path }}"
mode: create

# Create IAM roles
@@ -34,7 +34,7 @@ postgres:
core:
logging_level: debug
remote_logging: true
remote_base_log_folder: {{ airflow_s3_log_url }}
remote_base_log_folder: "{{ airflow_s3_logs_url }}"
remote_log_conn_id: s3_conn

webserver:
@@ -64,7 +64,7 @@ storage:
dags_volume_pvc: "{{ airflow_dags_pvc }}"
airflow_dags_directory: "{{ airflow_dags_dir }}"
pvc_name: "{{ airflow_dags_pvc }}"
s3_bucket_path: "{{ enclave }}/"
s3_root_path: "s3://{{airflow_s3_bucket_name}}/"

ingress:
enabled: true
@@ -5,13 +5,8 @@
"Effect": "Allow",
"Action": ["s3:*"],
"Resource": [
"arn:aws:s3:::{{ airflow_s3_bucket_name }}/{{ enclave }}",
"arn:aws:s3:::{{ airflow_s3_bucket_name }}/{{ enclave }}/*"]
},
{
"Effect": "Allow",
"Action": ["s3:ListBucket"],
"Resource": "arn:aws:s3:::{{ airflow_s3_bucket_name }}"
"arn:aws:s3:::{{ airflow_s3_bucket_name }}",
"arn:aws:s3:::{{ airflow_s3_bucket_name }}/*"]
}
]
}
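The template above now grants access to the whole Airflow bucket instead of a per-enclave prefix. As a reference sketch only, with a hypothetical bucket name substituted for airflow_s3_bucket_name, the rendered statement would look roughly like this:

import json

bucket = "epm-legion-data-dev-company-a"  # hypothetical rendered bucket name

statement = {
    "Effect": "Allow",
    "Action": ["s3:*"],
    "Resource": [
        "arn:aws:s3:::{}".format(bucket),    # the bucket itself
        "arn:aws:s3:::{}/*".format(bucket),  # every object in the bucket
    ],
}
print(json.dumps(statement, indent=2))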
2 changes: 1 addition & 1 deletion deploy/helms/airflow/templates/configuration.yaml
@@ -16,7 +16,7 @@ filename_template = {{ .Values.core.logging_filename_template }}
base_log_folder = /home/airflow/logs

# S3 bucket path for data storage
s3_bucket_path = {{ .Values.storage.s3_bucket_path }}
s3_root_path = {{ .Values.storage.s3_root_path }}

# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply a remote location URL (starting with either 's3://...' or
6 changes: 3 additions & 3 deletions deploy/profiles/legion-ci.epm.kharlamov.biz.yml
@@ -74,9 +74,9 @@ enclaves:
storageclass: efs
airflow_dags_dir: '/airflow-dags'
airflow_dags_pvc: legion-airflow-dags
airflow_s3_bucket_name: 'epm-legion-data-ci'
airflow_s3_log_url: "s3://{{ airflow_s3_bucket_name }}/{{ enclave }}/airflow-logs/"
airflow_s3_url: "s3://{{ airflow_s3_bucket_name }}/{{ enclave }}/"
airflow_s3_bucket_name: 'epm-legion-data-{{ env_type }}-{{ enclave }}'
airflow_s3_logs_path: 'airflow-logs/'
airflow_s3_logs_url: "s3://{{ airflow_s3_bucket_name }}/{{ airflow_s3_logs_path }}"
airflow_expected_output: 'expected-data/'
airflow_pvc: 200m
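As an illustration (not part of the commit), a small sketch of how the templated values above would resolve, assuming env_type is 'ci' and an enclave named 'company-a'; the variable names mirror the profile keys:

from jinja2 import Template

values = {"env_type": "ci", "enclave": "company-a"}  # hypothetical

bucket = Template("epm-legion-data-{{ env_type }}-{{ enclave }}").render(**values)
logs_path = "airflow-logs/"
logs_url = Template(
    "s3://{{ airflow_s3_bucket_name }}/{{ airflow_s3_logs_path }}"
).render(airflow_s3_bucket_name=bucket, airflow_s3_logs_path=logs_path)

print(bucket)    # epm-legion-data-ci-company-a
print(logs_url)  # s3://epm-legion-data-ci-company-a/airflow-logs/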

6 changes: 3 additions & 3 deletions deploy/profiles/legion-demo.epm.kharlamov.biz.yml
@@ -79,9 +79,9 @@ enclaves:
storageclass: efs
airflow_dags_dir: '/airflow-dags'
airflow_dags_pvc: legion-airflow-dags
airflow_s3_bucket_name: 'epm-legion-data-demo'
airflow_s3_log_url: "s3://{{ airflow_s3_bucket_name }}/{{ enclave }}/airflow-logs/"
airflow_s3_url: "s3://{{ airflow_s3_bucket_name }}/{{ enclave }}/"
airflow_s3_bucket_name: 'epm-legion-data-{{ env_type }}-{{ enclave }}'
airflow_s3_logs_path: 'airflow-logs/'
airflow_s3_logs_url: "s3://{{ airflow_s3_bucket_name }}/{{ airflow_s3_logs_path }}"
airflow_expected_output: 'expected-data/'
airflow_pvc: 200m

6 changes: 3 additions & 3 deletions deploy/profiles/legion-dev.epm.kharlamov.biz.yml
@@ -75,9 +75,9 @@ enclaves:
storageclass: efs
airflow_dags_dir: '/airflow-dags'
airflow_dags_pvc: legion-airflow-dags
airflow_s3_bucket_name: 'epm-legion-data-dev'
airflow_s3_log_url: "s3://{{ airflow_s3_bucket_name }}/{{ enclave }}/airflow-logs/"
airflow_s3_url: "s3://{{ airflow_s3_bucket_name }}/{{ enclave }}/"
airflow_s3_bucket_name: 'epm-legion-data-{{ env_type }}-{{ enclave }}'
airflow_s3_logs_path: 'airflow-logs/'
airflow_s3_logs_url: "s3://{{ airflow_s3_bucket_name }}/{{ airflow_s3_logs_path }}"
airflow_expected_output: 'expected-data/'
airflow_pvc: 200m

5 changes: 3 additions & 2 deletions deploy/profiles/profiles_template.yaml
@@ -62,15 +62,16 @@ enclaves: ~ # list of enclaves which will be automatically deployed after Legion
# Airflow specific configuration
airflow_dags_dir: ~ # Name of Airflow DAGs directory
airflow_dags_pvc: ~ # Name of Airflow DAGs PVC which will be created in Cluster
airflow_s3_log_url: ~ # S3 url for storing Airflow logs
airflow_s3_bucket_name: ~ # S3 bucket for airflow data
airflow_s3_logs_path: ~ # Path in S3 for Airflow logs
airflow_s3_logs_url: ~ # S3 URL for storing Airflow logs; it should contain the bucket name and key, e.g.: airflow_s3_logs_url: "s3://{{ airflow_s3_bucket_name }}/{{ airflow_s3_logs_path }}"
airflow_pvc: ~ # Airflow PVC size (for storing DAGs code)

# Airflow RDS configuration
airflow_rds_shape: ~ # shape for Airflow RDS
airflow_rds_size: ~ # size of Airflow RDS in GB

# Airflow DAGs configuration [?]
airflow_s3_url: ~ # Airflow storage location at S3
airflow_expected_output: ~ # Configuration for Airflow DAGs

# Addons configuration
3 changes: 1 addition & 2 deletions legion/docs/source/configuration.md
@@ -94,15 +94,14 @@ enclaves: # list of enclaves which will be automatically deployed after Legion
# Airflow specific configuration
airflow_dags_dir: '/airflow-dags' # Name of Airflow DAGs directory
airflow_dags_pvc: legion-airflow-dags # Name of Airflow DAGs PVC which will be created in Cluster
airflow_s3_log_url: 's3://epm-legion-data-dev/logs/' # S3 url for storing Airflow logs
airflow_pvc: 200m # Airflow PVC size (for storing DAGs code)

# Airflow RDS configuration
airflow_rds_shape: "db.t2.medium" # shape for Airflow RDS
airflow_rds_size: "50" # size of Airflow RDS in GB

# Airflow DAGs configuration [?]
airflow_s3_url: 's3://epm-legion-data-dev/' # Airflow storage location at S3
airflow_s3_bucket_name: "epm-legion-data-{{ env_type }}-{{ enclave }}" # Airflow storage location at S3
airflow_expected_output: 'expected-data/' # Configuration for Airflow DAGs

# Addons configuration
78 changes: 45 additions & 33 deletions legion_airflow/legion_airflow/hooks/s3_hook.py
Expand Up @@ -5,12 +5,12 @@
import smart_open
import json
import boto3
from urllib.parse import urlparse

from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowConfigException

from airflow import configuration as conf
from airflow.hooks.base_hook import BaseHook
from urllib.parse import urlparse


class S3Hook(BaseHook):
@@ -39,23 +39,36 @@ def __init__(self, conn_id: str, *args, **kwargs):
self.extras = self.connection.extra_dejson
self.aws_access_key_id = self.extras.get('aws_access_key_id', None)
self.aws_secret_access_key = self.extras.get('aws_secret_access_key', None)
self.key_prefix = self.extras.get('key_prefix', conf.get('core', 's3_bucket_path'))
self.bucket_prefix = self.extras.get('bucket_prefix', '')
try:
self.s3_root_path = conf.get('core', 's3_root_path')
except AirflowConfigException:
self.s3_root_path = ''
if self.s3_root_path.startswith('s3://'):
self.s3_root_path = self.s3_root_path[5:]

@staticmethod
def _parse_s3_url(s3url):
def _get_uri(self, bucket, key):
"""
Create a URI based on the passed bucket, key and Airflow configuration.
:param bucket: S3 bucket
:param key: Path inside the S3 bucket
(if key is a full path it is simply returned)
:return: URI that contains protocol, bucket and path, e.g. s3://bucket/k1/k2/k3_file
"""
if key.startswith('s3://'):
return key
path = [self.s3_root_path or bucket, key]
return 's3://' + '/'.join(name.strip('/') for name in path)
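# Illustrative behaviour only (editor-added comments, not part of the commit),
# assuming s3_root_path resolves to 'my-bucket/some/prefix' after the 's3://' strip:
#   _get_uri('any-bucket', 'airflow-logs/dag/task/1.log')
#     -> 's3://my-bucket/some/prefix/airflow-logs/dag/task/1.log'
#   _get_uri('any-bucket', 's3://other-bucket/key')  # full URLs pass through untouched
#     -> 's3://other-bucket/key'
# With no s3_root_path configured, the bucket argument is used as the root:
#   _get_uri('fallback-bucket', 'data/file.csv') -> 's3://fallback-bucket/data/file.csv'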

def _parse_s3_url(self, s3url):
"""
Parse S3 URL into bucket and key
:param s3url: S3 URL
:return: (bucket, key)
Parse any passed s3url into bucket and key pair
:param s3url: s3 storage absolute path
:return: tuple (bucket, key)
"""
parsed_url = urlparse(s3url)
if not parsed_url.netloc:
raise AirflowException('Please provide a bucket_name')
else:
bucket_name = parsed_url.netloc
key = parsed_url.path.strip('/')
return (bucket_name, key)
bucket_name = parsed_url.netloc
key = parsed_url.path.strip('/')
return bucket_name, key

def open_file(self, bucket: str, key: str, mode: str = 'rb', encoding: str = 'utf-8'):
"""
@@ -72,7 +85,7 @@ def open_file(self, bucket: str, key: str, mode: str = 'rb', encoding: str = 'utf-8'):
:return: s3 file
"""
self.check_if_maintenance(bucket, key)
uri = 's3://{}{}/{}{}'.format(self.bucket_prefix, bucket, self.key_prefix, key)
uri = self._get_uri(bucket, key)
return smart_open.smart_open(uri=uri, mode=mode,
encoding=encoding,
aws_access_key_id=self.aws_access_key_id,
@@ -154,7 +167,7 @@ def exists(self, bucket: str, key: str):
:return: bool -- True if file exists, False otherwise
"""
try:
smart_open.smart_open('s3://{}{}/{}{}'.format(self.bucket_prefix, bucket, self.key_prefix, key),
smart_open.smart_open(self._get_uri(bucket, key),
mode='rb',
aws_access_key_id=self.aws_access_key_id,
aws_secret_access_key=self.aws_secret_access_key).close()
@@ -194,22 +207,30 @@ def copy_folder(self, src_bucket: str, src_key: str, dest_bucket: str, dest_key: str):
:type dest_key: str
:return: None
"""
src_bucket, src_key = self._parse_s3_url(
self._get_uri(src_bucket, src_key))
dest_bucket, dest_key = self._parse_s3_url(
self._get_uri(dest_bucket, dest_key))

session = boto3.Session(profile_name=None,
aws_access_key_id=self.aws_access_key_id,
aws_secret_access_key=self.aws_secret_access_key)
s3 = session.resource(service_name='s3',
aws_access_key_id=self.aws_access_key_id,
aws_secret_access_key=self.aws_secret_access_key)
bucket_from = s3.Bucket(self.bucket_prefix + src_bucket)
bucket_to = s3.Bucket(self.bucket_prefix + dest_bucket)
bucket_from = s3.Bucket(src_bucket)
bucket_to = s3.Bucket(dest_bucket)
for obj in bucket_from.objects.filter():
key_from = obj.key
if key_from.startswith(self.key_prefix + src_key):
source = {'Bucket': self.bucket_prefix + src_bucket, 'Key': key_from}
key_to = key_from.replace(self.key_prefix + src_key, self.key_prefix + dest_key)
if key_from.startswith(src_key):
source = {'Bucket': src_bucket, 'Key': key_from}
key_to = key_from.replace(src_key, dest_key)
dist_obj = bucket_to.Object(key_to)
self.logger.info('Copying from {}:{} to {}:{}'
.format(self.bucket_prefix + src_bucket, key_from, dest_bucket, key_to))
self.logger.info(
'Copying from {}:{} to {}:{}'.format(
src_bucket, key_from, dest_bucket, key_to
)
)
dist_obj.copy(source)

def load_file(self, filename, key, bucket_name=None, replace=False, encrypt=False):
@@ -230,9 +251,6 @@ def load_file(self, filename, key, bucket_name=None, replace=False, encrypt=False):
by S3 and will be stored in an encrypted form while at rest in S3.
:type encrypt: bool
"""
if not bucket_name and not self.bucket_prefix:
(bucket_name, key) = self._parse_s3_url(key)

with self.open_file(bucket_name, key, 'w') as dist:
with open(filename, 'r') as source:
for line in source:
@@ -261,9 +279,6 @@ def load_string(self, string_data, key, bucket_name=None, replace=False,
:param encoding: String encoding
:type encoding: str
"""
if not bucket_name and not self.bucket_prefix:
(bucket_name, key) = self._parse_s3_url(key)

with self.open_file(bucket_name, key, 'w', encoding) as out:
out.write(string_data)

@@ -287,9 +302,6 @@ def read_key(self, key, bucket_name=None):
:param bucket_name: Name of the bucket in which the file is stored
:type bucket_name: str
"""
if not bucket_name and not self.bucket_prefix:
(bucket_name, key) = self._parse_s3_url(key)

if self.exists(bucket_name, key):
with self.open_file(bucket_name, key, 'r', 'utf-8') as out:
return out.read()
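A minimal usage sketch of the reworked hook (illustrative only; it assumes an Airflow connection named 's3_conn' — the remote_log_conn_id configured above — with AWS credentials in its extras, s3_root_path set in airflow.cfg as the chart now does, and hypothetical bucket/key names):

from legion_airflow.hooks.s3_hook import S3Hook

hook = S3Hook(conn_id='s3_conn')

# The bucket/key pair is resolved through _get_uri, so with
# s3_root_path = 's3://epm-legion-data-ci-company-a/' the object is written
# under that root regardless of the bucket_name argument.
hook.load_string('hello from airflow',
                 key='expected-data/result.txt',
                 bucket_name='epm-legion-data-ci-company-a')

# Read it back, guarding on existence.
if hook.exists('epm-legion-data-ci-company-a', 'expected-data/result.txt'):
    print(hook.read_key('expected-data/result.txt',
                        bucket_name='epm-legion-data-ci-company-a'))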
4 changes: 2 additions & 2 deletions legion_airflow/setup.py
@@ -22,7 +22,7 @@

def extract_requirements(filename):
"""
Extracts requirements from a pip formatted requirements file.
Extract requirements from a pip formatted requirements file.
:param filename: str path to file
:return: list of package names as strings
"""
@@ -49,7 +49,7 @@ def extract_version(filename):

setup(name='legion_airflow',
version=extract_version(
os.path.join(PACKAGE_ROOT_PATH, 'legion_airflow','version.py')),
os.path.join(PACKAGE_ROOT_PATH, 'legion_airflow', 'version.py')),
description='External library for airflow',
url='https://github.com/legion-platform/legion',
author='Legion team',