From 568a0460f5ed5a042704f3dd31496ab0b75eafa5 Mon Sep 17 00:00:00 2001 From: Johan Berggren Date: Mon, 26 Aug 2019 14:28:44 -0700 Subject: [PATCH 1/5] GCS importer initial commit --- contrib/gcs_importer.py | 143 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 143 insertions(+) create mode 100644 contrib/gcs_importer.py diff --git a/contrib/gcs_importer.py b/contrib/gcs_importer.py new file mode 100644 index 0000000000..381eff266c --- /dev/null +++ b/contrib/gcs_importer.py @@ -0,0 +1,143 @@ +# Copyright 2019 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import unicode_literals + +import argparse +import time +import os +import sys +import uuid + +#from timesketch import create_app +#from timesketch.lib import tasks +#from timesketch.models import db_session +#from timesketch.models.user import User +#from timesketch.models.sketch import SearchIndex +#from timesketch.models.sketch import Sketch +#from timesketch.models.sketch import Timeline + +#from google.cloud import pubsub_v1 +#from google.cloud import storage + + +parser = argparse.ArgumentParser(description='GCS importer') +parser.add_argument( + 'project', dest='PROJECT_ID', action='store_const', + help='Google Cloud Project ID') +parser.add_argument( + 'bucket', dest='BUCKET_TO_WATCH', action='store_const', + help='Google Cloud Storage bucket to watch') +parser.add_argument( + 'subscription', dest='SUBSCRIPTION', action='store_const', + help='Google Cloud PubSub subscription name') +parser.add_argument( + 'output_dir', dest='OUTPUT_DIR', default='/tmp', action='store_const', + help='Directory to store downloaded files') +args = parser.parse_args() + + +# TODO: Make these flags +#PROJECT_ID = 'turbinia-dev' +#SUBSCRIPTION = 'timesketch-gcs-subscriber' +#BUCKET_TO_WATCH = 'turbinia-ad9ed98e884b56bc' +#OUTPUT_DIR = '/tmp/' + +print(PROJECT_ID) +print(SUBSCRIPTION) +print(BUCKET_TO_WATCH) +print(OUTPUT_DIR) + +sys.exit() + +# Should come from Turbinia +USERNAME = 'admin' +SKETCH_ID = 5 + +# Setup Google Cloud Pub/Sub +subscriber = pubsub_v1.SubscriberClient() +subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION) + +# Setup Google Cloud Storage +storage_client = storage.Client(PROJECT_ID) +bucket = storage_client.get_bucket(BUCKET_TO_WATCH) + +# Flask app +app = create_app() + + +def download_file_from_gcs(gcs_path, local_path): + blob = bucket.blob(gcs_path) + blob.download_to_filename(local_path) + return local_path + + +def callback(message): + message.ack() + gcs_path = message.attributes.get('objectId') + gcs_filename = os.path.basename(gcs_path) + _, extension = os.path.splitext(gcs_path) + local_path = OUTPUT_DIR + '/' + gcs_filename + + supported_extensions = (['.plaso']) + if extension not in supported_extensions: + return + + local_file = download_file_from_gcs(gcs_path, local_path) + if not local_file: + return + + with app.app_context(): + timeline_name = gcs_filename + index_name = uuid.uuid4().hex + user = User.query.filter_by(USERNAME).first() + sketch = Sketch.query.get(SKETCH_ID) + + searchindex = SearchIndex.get_or_create( + name=timeline_name, + description=timeline_name, + user=user, + index_name=index_name) + searchindex.grant_permission(permission='read', user=user) + searchindex.grant_permission(permission='write', user=user) + searchindex.grant_permission(permission='delete', user=user) + searchindex.set_status('processing') + db_session.add(searchindex) + db_session.commit() + + if sketch and sketch.has_permission(user, 'write'): + timeline = Timeline( + name=searchindex.name, + description=searchindex.description, + sketch=sketch, + user=user, + searchindex=searchindex) + timeline.set_status('processing') + sketch.timelines.append(timeline) + db_session.add(timeline) + db_session.commit() + + # Celery + pipeline = tasks.build_index_pipeline( + file_path=local_file, + timeline_name=local_file, + index_name=index_name, + file_extension=extension.lstrip('.'), + sketch_id=SKETCH_ID) + pipeline.apply_async() + + +subscriber.subscribe(subscription_path, callback=callback) + +while True: + time.sleep(10) From a1afd6a99d61f3b04b2ed162285be6246f97f547 Mon Sep 17 00:00:00 2001 From: Johan Berggren Date: Tue, 17 Sep 2019 13:28:19 +0200 Subject: [PATCH 2/5] fixes --- contrib/gcs_importer.py | 160 ++++++++++++++++++++++------------------ 1 file changed, 87 insertions(+), 73 deletions(-) diff --git a/contrib/gcs_importer.py b/contrib/gcs_importer.py index 381eff266c..cdc5dc0407 100644 --- a/contrib/gcs_importer.py +++ b/contrib/gcs_importer.py @@ -18,90 +18,62 @@ import os import sys import uuid +import json -#from timesketch import create_app -#from timesketch.lib import tasks -#from timesketch.models import db_session -#from timesketch.models.user import User -#from timesketch.models.sketch import SearchIndex -#from timesketch.models.sketch import Sketch -#from timesketch.models.sketch import Timeline +from timesketch import create_app +from timesketch.lib import tasks +from timesketch.models import db_session +from timesketch.models.sketch import SearchIndex +from timesketch.models.sketch import Sketch +from timesketch.models.sketch import Timeline +from timesketch.models.user import User -#from google.cloud import pubsub_v1 -#from google.cloud import storage +try: + from google.cloud import pubsub_v1 + from google.cloud import storage +except ImportError: + sys.exit('ERROR: You missing Google Cloud libraries.') parser = argparse.ArgumentParser(description='GCS importer') -parser.add_argument( - 'project', dest='PROJECT_ID', action='store_const', - help='Google Cloud Project ID') -parser.add_argument( - 'bucket', dest='BUCKET_TO_WATCH', action='store_const', - help='Google Cloud Storage bucket to watch') -parser.add_argument( - 'subscription', dest='SUBSCRIPTION', action='store_const', - help='Google Cloud PubSub subscription name') -parser.add_argument( - 'output_dir', dest='OUTPUT_DIR', default='/tmp', action='store_const', - help='Directory to store downloaded files') +parser.add_argument('--project', help='Google Cloud Project ID') +parser.add_argument('--bucket', help='Google Cloud Storage bucket') +parser.add_argument('--subscription', help='Google Cloud PubSub subscription ') +parser.add_argument('--output', default='/tmp', help='Directory for downloads') args = parser.parse_args() -# TODO: Make these flags -#PROJECT_ID = 'turbinia-dev' -#SUBSCRIPTION = 'timesketch-gcs-subscriber' -#BUCKET_TO_WATCH = 'turbinia-ad9ed98e884b56bc' -#OUTPUT_DIR = '/tmp/' - -print(PROJECT_ID) -print(SUBSCRIPTION) -print(BUCKET_TO_WATCH) -print(OUTPUT_DIR) - -sys.exit() - -# Should come from Turbinia -USERNAME = 'admin' -SKETCH_ID = 5 - -# Setup Google Cloud Pub/Sub -subscriber = pubsub_v1.SubscriberClient() -subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION) - -# Setup Google Cloud Storage -storage_client = storage.Client(PROJECT_ID) -bucket = storage_client.get_bucket(BUCKET_TO_WATCH) - # Flask app app = create_app() -def download_file_from_gcs(gcs_path, local_path): - blob = bucket.blob(gcs_path) - blob.download_to_filename(local_path) - return local_path - +def get_gcs_bucket(): + storage_client = storage.Client(args.project) + return storage_client.get_bucket(args.bucket) -def callback(message): - message.ack() - gcs_path = message.attributes.get('objectId') - gcs_filename = os.path.basename(gcs_path) - _, extension = os.path.splitext(gcs_path) - local_path = OUTPUT_DIR + '/' + gcs_filename - supported_extensions = (['.plaso']) - if extension not in supported_extensions: - return +def download_from_gcs(bucket, gcs_base_path, filename): + gcs_full_path = os.path.join(gcs_base_path, filename) + local_path = os.path.join(args.output, filename) + blob = bucket.blob(gcs_full_path) + blob.download_to_filename(local_path) + print('Downloaded file from GCS: ', local_path) + return local_path - local_file = download_file_from_gcs(gcs_path, local_path) - if not local_file: - return +def setup_sketch(name, description, username, sketch_id=None): with app.app_context(): - timeline_name = gcs_filename + timeline_name = gcs_base_filename index_name = uuid.uuid4().hex - user = User.query.filter_by(USERNAME).first() - sketch = Sketch.query.get(SKETCH_ID) + user = User.get_or_create(username=username) + + if sketch_id: + sketch = Sketch.query.get(sketch_id) + else: + sketch = Sketch.get_or_create( + name='Turbinia: {}'.format(gcs_base_filename), + description='Automatically created by Turbinia.', + user=user) searchindex = SearchIndex.get_or_create( name=timeline_name, @@ -127,17 +99,59 @@ def callback(message): db_session.add(timeline) db_session.commit() + +def callback(message): + message.ack() + gcs_full_path = message.attributes.get('objectId') + + if not gcs_full_path.endswith('.plaso.metadata.json'): + #print('ERROR: Skipping unknown file format: ', gcs_full_path) + return + + gcs_base_path = os.path.dirname(gcs_full_path) + gcs_metadata_filename = os.path.basename(gcs_full_path) + gcs_base_filename = gcs_metadata_filename.replace('.metadata.json', '') + gcs_plaso_filename = gcs_base_filename + + #print('PubSub message parsed: ', gcs_full_path, gcs_plaso_path) + + # Download files from GCS + local_metadata_file = download_from_gcs(gcs_base_path, gcs_metadata_filename) + local_plaso_file = download_from_gcs(gcs_base_path, gcs_plaso_filename) + + print('METADATA FILE: ', local_metadata_file) + with open(local_metadata_file, 'r') as metadata_file: + metadata = json.load(metadata_file) + username = metadata.get('requester') + sketch_id = metadata.get('sketch_iud') + print('Parse metadata file: ', username, sketch_id) + + if not username: + print('ERROR: Missing username') + return + + # Celery pipeline = tasks.build_index_pipeline( - file_path=local_file, - timeline_name=local_file, + file_path=local_plaso_file, + timeline_name=gcs_base_filename, index_name=index_name, - file_extension=extension.lstrip('.'), - sketch_id=SKETCH_ID) + file_extension='plaso', + sketch_id=sketch_id) pipeline.apply_async() -subscriber.subscribe(subscription_path, callback=callback) -while True: - time.sleep(10) + +if __name__ == '__main__': + + # Setup Google Cloud Pub/Sub + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path( + args.project, args.subscription) + subscriber.subscribe(subscription_path, callback=callback) + + print('Listening on PubSub queue: {}'.format(args.subscription)) + while True: + time.sleep(10) + print("foo") \ No newline at end of file From e55993785f99d50e75c98a638b18cf60da78f006 Mon Sep 17 00:00:00 2001 From: Johan Berggren Date: Mon, 10 Feb 2020 22:59:23 +1100 Subject: [PATCH 3/5] GCS importer --- contrib/gcs_importer.py | 156 ++++++++++++++++++++++++++-------------- 1 file changed, 101 insertions(+), 55 deletions(-) diff --git a/contrib/gcs_importer.py b/contrib/gcs_importer.py index cdc5dc0407..96a1640239 100644 --- a/contrib/gcs_importer.py +++ b/contrib/gcs_importer.py @@ -1,4 +1,4 @@ -# Copyright 2019 Google Inc. All rights reserved. +# Copyright 2020 Google Inc. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import unicode_literals +"""Google Cloud Storage importer.""" import argparse import time @@ -19,6 +19,9 @@ import sys import uuid import json +import logging + +from werkzeug.exceptions import Forbidden from timesketch import create_app from timesketch.lib import tasks @@ -32,53 +35,81 @@ from google.cloud import pubsub_v1 from google.cloud import storage except ImportError: - sys.exit('ERROR: You missing Google Cloud libraries.') - + sys.exit('ERROR: You are missing Google Cloud libraries') -parser = argparse.ArgumentParser(description='GCS importer') -parser.add_argument('--project', help='Google Cloud Project ID') -parser.add_argument('--bucket', help='Google Cloud Storage bucket') -parser.add_argument('--subscription', help='Google Cloud PubSub subscription ') -parser.add_argument('--output', default='/tmp', help='Directory for downloads') -args = parser.parse_args() +# Create logger +logger = logging.getLogger('gcs_importer') +logger.setLevel(logging.DEBUG) +handler = logging.StreamHandler() +handler.setLevel(logging.DEBUG) +formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s") +handler.setFormatter(formatter) +logger.addHandler(handler) -# Flask app -app = create_app() +def download_from_gcs(gcs_base_path, filename): + """Download file from Google Cloud Storage (GCS). + Args: + gcs_base_path: (str) GCS bucket path + filename: (str) Filename of the file to download -def get_gcs_bucket(): + Returns: + (str) Path to downloaded file + """ storage_client = storage.Client(args.project) - return storage_client.get_bucket(args.bucket) - - -def download_from_gcs(bucket, gcs_base_path, filename): + bucket = storage_client.get_bucket(args.bucket) gcs_full_path = os.path.join(gcs_base_path, filename) local_path = os.path.join(args.output, filename) blob = bucket.blob(gcs_full_path) blob.download_to_filename(local_path) - print('Downloaded file from GCS: ', local_path) + logger.info('Downloaded file from GCS: {}'.format(local_path)) return local_path -def setup_sketch(name, description, username, sketch_id=None): +def setup_sketch(timeline_name, index_name, username, sketch_id=None): + """Use existing sketch or create a new sketch. + + Args: + timeline_name: (str) Name of the Timeline + index_name: (str) Name of the index + username: (str) Who should own the timeline + sketch_id: (str) Optional sketch_id to add timeline to + + Returns: + (str) Sketch ID + """ with app.app_context(): - timeline_name = gcs_base_filename - index_name = uuid.uuid4().hex user = User.get_or_create(username=username) + sketch = None if sketch_id: - sketch = Sketch.query.get(sketch_id) - else: - sketch = Sketch.get_or_create( - name='Turbinia: {}'.format(gcs_base_filename), - description='Automatically created by Turbinia.', - user=user) + try: + sketch = Sketch.query.get_with_acl(sketch_id, user=user) + logger.info('Using existing sketch: {} ({})'.format( + sketch.name, sketch.id)) + except Forbidden: + pass + + if not sketch: + # Create a new sketch. + sketch_name = 'Turbinia: {}'.format(timeline_name) + sketch = Sketch( + name=sketch_name, description=sketch_name, user=user) + # Need to commit here to be able to set permissions later. + db_session.add(sketch) + db_session.commit() + sketch.grant_permission(permission='read', user=user) + sketch.grant_permission(permission='write', user=user) + sketch.grant_permission(permission='delete', user=user) + sketch.status.append(sketch.Status(user=None, status='new')) + db_session.add(sketch) + db_session.commit() + logger.info('Created new sketch: {} ({})'.format( + sketch.name, sketch.id)) searchindex = SearchIndex.get_or_create( - name=timeline_name, - description=timeline_name, - user=user, + name=timeline_name, description='Created by Turbinia.', user=user, index_name=index_name) searchindex.grant_permission(permission='read', user=user) searchindex.grant_permission(permission='write', user=user) @@ -87,25 +118,31 @@ def setup_sketch(name, description, username, sketch_id=None): db_session.add(searchindex) db_session.commit() - if sketch and sketch.has_permission(user, 'write'): + if sketch.has_permission(user, 'write'): timeline = Timeline( - name=searchindex.name, - description=searchindex.description, - sketch=sketch, - user=user, - searchindex=searchindex) - timeline.set_status('processing') + name=searchindex.name, description=searchindex.description, + sketch=sketch, user=user, searchindex=searchindex) sketch.timelines.append(timeline) db_session.add(timeline) db_session.commit() + timeline.set_status('processing') + + return sketch.id def callback(message): + """Google PubSub callback. + + This function is called on all incoming messages on the configures topic. + + Args: + message: (dict) PubSub message + """ message.ack() gcs_full_path = message.attributes.get('objectId') + # Exit early if the file type is wrong. if not gcs_full_path.endswith('.plaso.metadata.json'): - #print('ERROR: Skipping unknown file format: ', gcs_full_path) return gcs_base_path = os.path.dirname(gcs_full_path) @@ -113,37 +150,47 @@ def callback(message): gcs_base_filename = gcs_metadata_filename.replace('.metadata.json', '') gcs_plaso_filename = gcs_base_filename - #print('PubSub message parsed: ', gcs_full_path, gcs_plaso_path) - # Download files from GCS - local_metadata_file = download_from_gcs(gcs_base_path, gcs_metadata_filename) + local_metadata_file = download_from_gcs( + gcs_base_path, gcs_metadata_filename) local_plaso_file = download_from_gcs(gcs_base_path, gcs_plaso_filename) - print('METADATA FILE: ', local_metadata_file) with open(local_metadata_file, 'r') as metadata_file: metadata = json.load(metadata_file) username = metadata.get('requester') - sketch_id = metadata.get('sketch_iud') - print('Parse metadata file: ', username, sketch_id) + sketch_id_from_metadata = metadata.get('sketch_id') if not username: - print('ERROR: Missing username') + logger.error('Missing username') return + timeline_name = os.path.splitext(gcs_plaso_filename)[0] + index_name = uuid.uuid4().hex + sketch_id = setup_sketch(timeline_name, index_name, 'admin', + sketch_id_from_metadata) - # Celery + # Start indexing + with app.app_context(): pipeline = tasks.build_index_pipeline( - file_path=local_plaso_file, - timeline_name=gcs_base_filename, - index_name=index_name, - file_extension='plaso', - sketch_id=sketch_id) + file_path=local_plaso_file, timeline_name=gcs_base_filename, + index_name=index_name, file_extension='plaso', sketch_id=sketch_id) pipeline.apply_async() - - + logger.info('File sent for indexing: {}'. format(gcs_base_filename)) if __name__ == '__main__': + parser = argparse.ArgumentParser(description='GCS importer') + parser.add_argument('--project', help='Google Cloud Project ID') + parser.add_argument('--bucket', + help='Google Cloud Storage bucket to monitor') + parser.add_argument('--subscription', + help='Google Cloud PubSub subscription') + parser.add_argument('--output', default='/tmp', + help='Directory for downloads') + args = parser.parse_args() + + # Create flask app + app = create_app() # Setup Google Cloud Pub/Sub subscriber = pubsub_v1.SubscriberClient() @@ -151,7 +198,6 @@ def callback(message): args.project, args.subscription) subscriber.subscribe(subscription_path, callback=callback) - print('Listening on PubSub queue: {}'.format(args.subscription)) + logger.info('Listening on PubSub queue: {}'.format(args.subscription)) while True: time.sleep(10) - print("foo") \ No newline at end of file From b45d928d6c790886035ef2b3e55b4c69e142140c Mon Sep 17 00:00:00 2001 From: Johan Berggren Date: Mon, 10 Feb 2020 23:52:23 +1100 Subject: [PATCH 4/5] Dont add timeline if no write acl on sketch --- contrib/gcs_importer.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/contrib/gcs_importer.py b/contrib/gcs_importer.py index 96a1640239..34768c3bf3 100644 --- a/contrib/gcs_importer.py +++ b/contrib/gcs_importer.py @@ -91,7 +91,7 @@ def setup_sketch(timeline_name, index_name, username, sketch_id=None): except Forbidden: pass - if not sketch: + if not (sketch or sketch_id): # Create a new sketch. sketch_name = 'Turbinia: {}'.format(timeline_name) sketch = Sketch( @@ -118,14 +118,20 @@ def setup_sketch(timeline_name, index_name, username, sketch_id=None): db_session.add(searchindex) db_session.commit() - if sketch.has_permission(user, 'write'): - timeline = Timeline( - name=searchindex.name, description=searchindex.description, - sketch=sketch, user=user, searchindex=searchindex) + timeline = Timeline( + name=searchindex.name, description=searchindex.description, + sketch=sketch, user=user, searchindex=searchindex) + + # If the user don't have write access to the sketch then create the + # timeline but don't attach it to the sketch. + if not sketch.has_permission(user, 'write'): + timeline.sketch = None + else: sketch.timelines.append(timeline) - db_session.add(timeline) - db_session.commit() - timeline.set_status('processing') + + db_session.add(timeline) + db_session.commit() + timeline.set_status('processing') return sketch.id @@ -133,7 +139,7 @@ def setup_sketch(timeline_name, index_name, username, sketch_id=None): def callback(message): """Google PubSub callback. - This function is called on all incoming messages on the configures topic. + This function is called on all incoming messages on the configured topic. Args: message: (dict) PubSub message From 29ae947fae957da554052b29fb1c9103fd081c84 Mon Sep 17 00:00:00 2001 From: Johan Berggren Date: Tue, 11 Feb 2020 10:46:19 +1100 Subject: [PATCH 5/5] Fix typo --- contrib/gcs_importer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/gcs_importer.py b/contrib/gcs_importer.py index 34768c3bf3..5f32111cd5 100644 --- a/contrib/gcs_importer.py +++ b/contrib/gcs_importer.py @@ -122,7 +122,7 @@ def setup_sketch(timeline_name, index_name, username, sketch_id=None): name=searchindex.name, description=searchindex.description, sketch=sketch, user=user, searchindex=searchindex) - # If the user don't have write access to the sketch then create the + # If the user doesn't have write access to the sketch then create the # timeline but don't attach it to the sketch. if not sketch.has_permission(user, 'write'): timeline.sketch = None