Skip to content

Commit

Permalink
Merge pull request #68 from IBM-Cloud/jobhistory-fixes
Browse files Browse the repository at this point in the history
improve jobhistory export
  • Loading branch information
Torsten Steinbach authored May 21, 2020
2 parents 825258b + d814118 commit 70b2779
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 16 deletions.
32 changes: 18 additions & 14 deletions Python/ibmcloudsql/SQLQuery.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,12 +428,13 @@ def get_jobs(self):
)
if response.status_code == 200 or response.status_code == 201:
job_details = response.json()
error = None
error_message = None
# None gets converted to integer type in pandas.to_parquet
error = ""
error_message = ""
rows_read = None
rows_returned = None
bytes_read = None
end_time = None
end_time = ""
if 'error' in job_details:
error = job_details['error']
if 'end_time' in job_details:
Expand All @@ -449,7 +450,6 @@ def get_jobs(self):
resultset_loc = np.NaN
if 'resultset_location' in job_details:
resultset_loc = job_details['resultset_location']
print(resultset_loc)
job_list_df = job_list_df.append([{'job_id': job['job_id'],
'status': job_details['status'],
'user_id': job_details['user_id'],
Expand Down Expand Up @@ -572,7 +572,7 @@ def list_cos_objects(self, url):
result = result.drop(columns=['ETag', 'Owner']).rename(columns={"Key": "Object"})
return result

def export_job_history(self, cos_url=None):
def export_job_history(self, cos_url=None, export_file_prefix = "job_export_", export_file_suffix = ".parquet"):
if cos_url:
# Default export location is target COS URL set at __init__
# But we'll overwrite that with the provided export URL
Expand All @@ -582,7 +582,6 @@ def export_job_history(self, cos_url=None):
if not self.export_cos_url.endswith('/'):
self.export_cos_url += "/"
url_parsed = self.ParsedUrl(self.export_cos_url)
export_file_prefix = "job_export_"

job_history_df = self.get_jobs() # Retrieve current job history (most recent 30 jobs)
terminated_job_history_df = job_history_df[job_history_df['status'].isin(['completed', 'failed'])] # Only export terminated jobs
Expand All @@ -593,24 +592,29 @@ def export_job_history(self, cos_url=None):
paginator = cos_client.get_paginator("list_objects")
page_iterator = paginator.paginate(Bucket=url_parsed.bucket, Prefix=url_parsed.prefix)
newest_exported_job_end_time = ""
expected_object_prefix = url_parsed.prefix + export_file_prefix
for page in page_iterator:
if "Contents" in page:
for key in page['Contents']:
object_name = key["Key"]
suffix_index = object_name.find(".parquet")
prefix_end_index = len(url_parsed.prefix + export_file_prefix)
if prefix_end_index < suffix_index:
job_end_time = object_name[prefix_end_index:suffix_index]
if job_end_time > newest_exported_job_end_time:
newest_exported_job_end_time = job_end_time
if not(object_name.startswith(expected_object_prefix)):
continue
prefix_end_index = len(expected_object_prefix)
suffix_index = object_name.find(export_file_suffix)
if not(prefix_end_index < suffix_index):
continue
job_end_time = object_name[prefix_end_index:suffix_index]
if job_end_time > newest_exported_job_end_time:
newest_exported_job_end_time = job_end_time

# Export all new jobs if there are some:
if newest_exported_job_end_time < newest_job_end_time:
tmpfile = tempfile.NamedTemporaryFile()
tempfilename = tmpfile.name
new_jobs_df = terminated_job_history_df[terminated_job_history_df['end_time'] > newest_exported_job_end_time]
new_jobs_df.to_parquet(engine="pyarrow", fname=tempfilename, compression="snappy")
cos_client.upload_file(Bucket=url_parsed.bucket, Filename=tempfilename, Key=url_parsed.prefix + export_file_prefix + newest_job_end_time + ".parquet")
new_jobs_df.to_parquet(engine="pyarrow", path=tempfilename, compression="snappy", index=False)
export_object = url_parsed.prefix + export_file_prefix + newest_job_end_time + export_file_suffix
cos_client.upload_file(Bucket=url_parsed.bucket, Filename=tempfilename, Key=export_object)
print("Exported {} new jobs".format(new_jobs_df['job_id'].count()))
tmpfile.close()
else:
Expand Down
11 changes: 10 additions & 1 deletion Python/ibmcloudsql/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,16 @@
print(result_df.head(200))

print("Test job history export to COS")
sqlClient.export_job_history(test_credentials.result_location + "/my_job_history/")
jobhist_location = test_credentials.result_location + "/my_job_history/"
sqlClient.export_job_history(jobhist_location,
export_file_prefix = "job_export=", export_file_suffix = "/data.parquet")

print("Running query on exported history")
jobhist_df = sqlClient.run_sql("SELECT * FROM {} STORED AS PARQUET LIMIT 10 INTO {} STORED AS CSV".format(
jobhist_location,
test_credentials.result_location)
)
print(jobhist_df[['job_id','status']])

sqlClient = ibmcloudsql.SQLQuery(test_credentials.apikey, test_credentials.instance_crn, client_info='ibmcloudsql test')
sqlClient.logon()
Expand Down
2 changes: 1 addition & 1 deletion Python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def readme():
return f.read()

setup(name='ibmcloudsql',
version='0.3.16',
version='0.3.17',
python_requires='>=2.7, <4',
install_requires=['pandas','requests','ibm-cos-sdk-core','ibm-cos-sdk','numpy',
'pyarrow==0.15.1', 'backoff==1.10.0'],
Expand Down

0 comments on commit 70b2779

Please sign in to comment.