Skip to content

Commit

Permalink
Mock more of the Glue Data Catalog APIs
Browse files Browse the repository at this point in the history
This adds some of the missing Get/Update/Create APIs relating to the
Glue data catalog -- but not yet all of them, and none of the Batch* API
calls.
  • Loading branch information
ashb committed Oct 3, 2018
1 parent dfa7935 commit 5783d66
Show file tree
Hide file tree
Showing 6 changed files with 673 additions and 61 deletions.
59 changes: 48 additions & 11 deletions moto/glue/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,56 @@ class GlueClientError(JsonRESTError):
code = 400


class DatabaseAlreadyExistsException(GlueClientError):
def __init__(self):
self.code = 400
super(DatabaseAlreadyExistsException, self).__init__(
'DatabaseAlreadyExistsException',
'Database already exists.'
class AlreadyExistsException(GlueClientError):
def __init__(self, typ):
super(GlueClientError, self).__init__(
'AlreadyExistsException',
'%s already exists.' % (typ),
)


class TableAlreadyExistsException(GlueClientError):
class DatabaseAlreadyExistsException(AlreadyExistsException):
def __init__(self):
super(DatabaseAlreadyExistsException, self).__init__('Database')


class TableAlreadyExistsException(AlreadyExistsException):
def __init__(self):
self.code = 400
super(TableAlreadyExistsException, self).__init__(
'TableAlreadyExistsException',
'Table already exists.'
super(TableAlreadyExistsException, self).__init__('Table')


class PartitionAlreadyExistsException(AlreadyExistsException):
def __init__(self):
super(PartitionAlreadyExistsException, self).__init__('Partition')


class EntityNotFoundException(GlueClientError):
def __init__(self, msg):
super(GlueClientError, self).__init__(
'EntityNotFoundException',
msg,
)


class DatabaseNotFoundException(EntityNotFoundException):
def __init__(self, db):
super(DatabaseNotFoundException, self).__init__(
'Database %s not found.' % db,
)


class TableNotFoundException(EntityNotFoundException):
def __init__(self, tbl):
super(TableNotFoundException, self).__init__(
'Table %s not found.' % tbl,
)


class PartitionNotFoundException(EntityNotFoundException):
def __init__(self):
super(PartitionNotFoundException, self).__init__("Cannot find partition.")


class VersionNotFoundException(EntityNotFoundException):
def __init__(self):
super(VersionNotFoundException, self).__init__("Version not found.")
104 changes: 98 additions & 6 deletions moto/glue/models.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
from __future__ import unicode_literals

import time

from moto.core import BaseBackend, BaseModel
from moto.compat import OrderedDict
from.exceptions import DatabaseAlreadyExistsException, TableAlreadyExistsException
from.exceptions import (
JsonRESTError,
DatabaseAlreadyExistsException,
DatabaseNotFoundException,
TableAlreadyExistsException,
TableNotFoundException,
PartitionAlreadyExistsException,
PartitionNotFoundException,
VersionNotFoundException,
)


class GlueBackend(BaseBackend):
Expand All @@ -19,7 +30,10 @@ def create_database(self, database_name):
return database

def get_database(self, database_name):
return self.databases[database_name]
try:
return self.databases[database_name]
except KeyError:
raise DatabaseNotFoundException(database_name)

def create_table(self, database_name, table_name, table_input):
database = self.get_database(database_name)
Expand All @@ -33,7 +47,10 @@ def create_table(self, database_name, table_name, table_input):

def get_table(self, database_name, table_name):
database = self.get_database(database_name)
return database.tables[table_name]
try:
return database.tables[table_name]
except KeyError:
raise TableNotFoundException(table_name)

def get_tables(self, database_name):
database = self.get_database(database_name)
Expand All @@ -52,9 +69,84 @@ class FakeTable(BaseModel):
def __init__(self, database_name, table_name, table_input):
self.database_name = database_name
self.name = table_name
self.table_input = table_input
self.storage_descriptor = self.table_input.get('StorageDescriptor', {})
self.partition_keys = self.table_input.get('PartitionKeys', [])
self.partitions = OrderedDict()
self.versions = []
self.update(table_input)

def update(self, table_input):
self.versions.append(table_input)

def get_version(self, ver):
try:
if not isinstance(ver, int):
# "1" goes to [0]
ver = int(ver) - 1
except ValueError as e:
raise JsonRESTError("InvalidInputException", str(e))

try:
return self.versions[ver]
except IndexError:
raise VersionNotFoundException()

def as_dict(self, version=-1):
obj = {
'DatabaseName': self.database_name,
'Name': self.name,
}
obj.update(self.get_version(version))
return obj

def create_partition(self, partiton_input):
partition = FakePartition(self.database_name, self.name, partiton_input)
key = str(partition.values)
if key in self.partitions:
raise PartitionAlreadyExistsException()
self.partitions[str(partition.values)] = partition

def get_partitions(self):
return [p for str_part_values, p in self.partitions.items()]

def get_partition(self, values):
try:
return self.partitions[str(values)]
except KeyError:
raise PartitionNotFoundException()

def update_partition(self, old_values, partiton_input):
partition = FakePartition(self.database_name, self.name, partiton_input)
key = str(partition.values)
if old_values == partiton_input['Values']:
# Altering a partition in place. Don't remove it so the order of
# returned partitions doesn't change
if key not in self.partitions:
raise PartitionNotFoundException()
else:
removed = self.partitions.pop(str(old_values), None)
if removed is None:
raise PartitionNotFoundException()
if key in self.partitions:
# Trying to update to overwrite a partition that exists
raise PartitionAlreadyExistsException()
self.partitions[key] = partition


class FakePartition(BaseModel):
def __init__(self, database_name, table_name, partiton_input):
self.creation_time = time.time()
self.database_name = database_name
self.table_name = table_name
self.partition_input = partiton_input
self.values = self.partition_input.get('Values', [])

def as_dict(self):
obj = {
'DatabaseName': self.database_name,
'TableName': self.table_name,
'CreationTime': self.creation_time,
}
obj.update(self.partition_input)
return obj


glue_backend = GlueBackend()
103 changes: 85 additions & 18 deletions moto/glue/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,94 @@ def get_table(self):
database_name = self.parameters.get('DatabaseName')
table_name = self.parameters.get('Name')
table = self.glue_backend.get_table(database_name, table_name)

return json.dumps({'Table': table.as_dict()})

def update_table(self):
database_name = self.parameters.get('DatabaseName')
table_input = self.parameters.get('TableInput')
table_name = table_input.get('Name')
table = self.glue_backend.get_table(database_name, table_name)
table.update(table_input)
return ""

def get_table_versions(self):
database_name = self.parameters.get('DatabaseName')
table_name = self.parameters.get('TableName')
table = self.glue_backend.get_table(database_name, table_name)

return json.dumps({
'Table': {
'DatabaseName': table.database_name,
'Name': table.name,
'PartitionKeys': table.partition_keys,
'StorageDescriptor': table.storage_descriptor
}
"TableVersions": [
{
"Table": table.as_dict(version=n),
"VersionId": str(n + 1),
} for n in range(len(table.versions))
],
})

def get_table_version(self):
database_name = self.parameters.get('DatabaseName')
table_name = self.parameters.get('TableName')
table = self.glue_backend.get_table(database_name, table_name)
ver_id = self.parameters.get('VersionId')

return json.dumps({
"TableVersion": {
"Table": table.as_dict(version=ver_id),
"VersionId": ver_id,
},
})

def get_tables(self):
database_name = self.parameters.get('DatabaseName')
tables = self.glue_backend.get_tables(database_name)
return json.dumps(
{
'TableList': [
{
'DatabaseName': table.database_name,
'Name': table.name,
'PartitionKeys': table.partition_keys,
'StorageDescriptor': table.storage_descriptor
} for table in tables
]
}
)
return json.dumps({
'TableList': [
table.as_dict() for table in tables
]
})

def get_partitions(self):
database_name = self.parameters.get('DatabaseName')
table_name = self.parameters.get('TableName')
if 'Expression' in self.parameters:
raise NotImplementedError("Expression filtering in get_partitions is not implemented in moto")
table = self.glue_backend.get_table(database_name, table_name)

return json.dumps({
'Partitions': [
p.as_dict() for p in table.get_partitions()
]
})

def get_partition(self):
database_name = self.parameters.get('DatabaseName')
table_name = self.parameters.get('TableName')
values = self.parameters.get('PartitionValues')

table = self.glue_backend.get_table(database_name, table_name)

p = table.get_partition(values)

return json.dumps({'Partition': p.as_dict()})

def create_partition(self):
database_name = self.parameters.get('DatabaseName')
table_name = self.parameters.get('TableName')
part_input = self.parameters.get('PartitionInput')

table = self.glue_backend.get_table(database_name, table_name)
table.create_partition(part_input)

return ""

def update_partition(self):
database_name = self.parameters.get('DatabaseName')
table_name = self.parameters.get('TableName')
part_input = self.parameters.get('PartitionInput')
part_to_update = self.parameters.get('PartitionValueList')

table = self.glue_backend.get_table(database_name, table_name)
table.update_partition(part_to_update, part_input)

return ""
25 changes: 25 additions & 0 deletions tests/test_glue/fixtures/datacatalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,28 @@
},
'TableType': 'EXTERNAL_TABLE',
}


PARTITION_INPUT = {
# 'DatabaseName': 'dbname',
'StorageDescriptor': {
'BucketColumns': [],
'Columns': [],
'Compressed': False,
'InputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat',
'Location': 's3://.../partition=value',
'NumberOfBuckets': -1,
'OutputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat',
'Parameters': {},
'SerdeInfo': {
'Parameters': {'path': 's3://...', 'serialization.format': '1'},
'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'},
'SkewedInfo': {'SkewedColumnNames': [],
'SkewedColumnValueLocationMaps': {},
'SkewedColumnValues': []},
'SortColumns': [],
'StoredAsSubDirectories': False,
},
# 'TableName': 'source_table',
# 'Values': ['2018-06-26'],
}
Loading

0 comments on commit 5783d66

Please sign in to comment.