diff --git a/tools/asset-inventory/asset_inventory/api_schema.py b/tools/asset-inventory/asset_inventory/api_schema.py index 7342892480..a3e52e0883 100644 --- a/tools/asset-inventory/asset_inventory/api_schema.py +++ b/tools/asset-inventory/asset_inventory/api_schema.py @@ -30,12 +30,12 @@ class APISchema(object): Will union all API versions into a single schema. """ - _discovery_document_cache = dict() + _discovery_document_cache = {} _schema_cache = {} @classmethod def _get_discovery_document(cls, dd_url): - """Retreive and cache a discovery document.""" + """Retrieve and cache a discovery document.""" if dd_url in cls._discovery_document_cache: return cls._discovery_document_cache[dd_url] discovery_document = None @@ -57,9 +57,9 @@ def _get_api_name_for_discovery_document_url(cls, dd_url): Args: dd_url: Discovery document url. Returns: - API name if can be found, None otherwise. + API name if one can be found, None otherwise. """ - apiary_match = re.match(r'https://(?:[^/]+)/discovery/v1/apis/([^/]+)', + apiary_match = re.match(r'https://[^/]+/discovery/v1/apis/([^/]+)', dd_url) if apiary_match: return apiary_match.group(1) @@ -70,28 +70,28 @@ def _get_api_name_for_discovery_document_url(cls, dd_url): return None @classmethod - def _get_discovery_document_versions(cls, dd_url): - """Return all verisons of the APIs discovery documents. + def _get_discovery_document_versions(cls, doc_url): + """Return all versions of the APIs discovery documents. Args: - dd_url: the url of the asset's discovery document. + doc_url: the url of the asset's discovery document. Returns: list of discovery document json objects. """ # calculate all the discovery documents from the url. discovery_documents = [] - api_name = cls._get_api_name_for_discovery_document_url(dd_url) - dd = cls._get_discovery_document(dd_url) + api_name = cls._get_api_name_for_discovery_document_url(doc_url) + doc = cls._get_discovery_document(doc_url) # add the discovery document to return value - discovery_documents += [dd] if dd else [] + discovery_documents += [doc] if doc else [] # and discovery documents from other versions of the same API. all_discovery_docs = cls._get_discovery_document( 'https://content.googleapis.com/discovery/v1/apis') for discovery_doc in all_discovery_docs['items']: dru = discovery_doc['discoveryRestUrl'] - if (api_name == discovery_doc['name'] and dru != dd_url): - dd = cls._get_discovery_document(dru) - discovery_documents += [dd] if dd else [] + if api_name == discovery_doc['name'] and dru != doc_url: + doc = cls._get_discovery_document(dru) + discovery_documents += [doc] if doc else [] return discovery_documents @classmethod @@ -111,7 +111,9 @@ def _get_bigquery_type_for_property(cls, property_value, resources): bigquery_type = 'STRING' property_type = property_value.get('type', None) # nested record. - if property_type == 'object' or 'properties' in property_value: + if (property_type == 'any' or + property_type == 'object' or + 'properties' in property_value): bigquery_type = 'RECORD' # repeated, recurse into element type. elif property_type == 'array': @@ -155,43 +157,108 @@ def _get_properties_map_field_list(cls, property_name, property_value, Returns: BigQuery fields dict list or None if the field should be skipped. """ - # found a record type, this is a recursive exit condition. + + return_value = [] + + # explicit properties are added to the list of fields to return. 
if 'properties' in property_value: - return cls._properties_map_to_field_list( + return_value += cls._properties_map_to_field_list( property_value['properties'], resources, seen_resources) + + # handle $ref property_resource_name = cls._ref_resource_name(property_value) # get fields of the reference type. if property_resource_name: - # not handling recursive fields. + # if a field is recursive, ignore the field. if property_resource_name in seen_resources: - return None - # rack prior types to not recurse forever. + return [] + # track prior types to not recurse forever. seen_resources[property_resource_name] = True - return_value = cls._get_properties_map_field_list( + return_value += cls._get_properties_map_field_list( property_resource_name, resources[property_resource_name], resources, seen_resources) del seen_resources[property_resource_name] - return return_value - # get fields of item type. + + # handle 'items' if 'items' in property_value: - return cls._get_properties_map_field_list( + return_value += cls._get_properties_map_field_list( property_name, property_value['items'], resources, seen_resources) - # convert additionalProperties fields to a dict - # of name value pairs for a more regular schema. - if 'additionalProperties' in property_value: - fields = [{'name': 'name', - 'field_type': 'STRING', - 'description': 'additionalProperties name', - 'mode': 'NULLABLE'}] - fields.append( - cls._property_to_field( - 'value', - property_value['additionalProperties'], - resources, seen_resources)) - return fields - # unknown property type. - return None + # additionalProperties is a repeated field, + # to support a nested repeated field we need another nested field. + if cls.is_additional_property_fields(return_value): + return_value = [{'name': 'additionalProperties', + 'field_type': 'RECORD', + 'description': 'additionalProperties', + 'fields': return_value, + 'mode': 'REPEATED'}] + + # does the property allow arbitrary properties with no schema? + if cls.allows_additional_properties(property_value): + # assign the additional_properties to the return value. + additional_properties_fields = return_value + + # if there are 'properties' in addition to 'additionalProperties', + # we'll create a new 'additionalProperties' nested field to hold the + # repeated name value property list. + if additional_properties_fields: + ap_field = {'name': 'additionalProperties', + 'field_type': 'RECORD', + 'description': 'additionalProperties', + 'fields': [], + 'mode': 'REPEATED'} + return_value.append(ap_field) + additional_properties_fields = ap_field['fields'] + + # add name field. + additional_properties_fields.append( + {'name': 'name', + 'field_type': 'STRING', + 'description': 'additionalProperties name', + 'mode': 'NULLABLE'}) + + # is there an explicit type for the additional property value? + ap_prop = property_value.get('additionalProperties', None) + if isinstance(ap_prop, dict) and ap_prop: + ap_field = cls._property_to_field( + 'value', ap_prop, + resources, seen_resources) + # only add the value property if it's valid, + # also, don't double next additional properties. + # which can happen if the property type of the additional + # properties value is arbitrary. 
+ if (ap_field is not None + and not cls.is_additional_property_fields( + ap_field.get('fields', None))): + additional_properties_fields.append(ap_field) + + # if we didn't find a value property, + # add a generic 'STRING' value property + if len(additional_properties_fields) < 2: + additional_properties_fields.append( + {'name': 'value', + 'field_type': 'STRING', + 'description': 'additionalProperties value', + 'mode': 'NULLABLE'}) + + return return_value + + @classmethod + def allows_additional_properties(cls, schema_object): + """True if the schema allows arbitrary properties.""" + return (('items' not in schema_object and + '$ref' not in schema_object and + 'properties' not in schema_object) or + ('additionalProperties' in schema_object) and + schema_object['additionalProperties'] is not False) + + @classmethod + def is_additional_property_fields(cls, fields): + """True if 'fields' is an additionalProperties schema field list.""" + return fields and len(fields) == 2 and all( + (f.get('name', None) == 'name' + and f.get('description', None) == 'additionalProperties name') + or (f.get('name', None) == 'value') for f in fields) @classmethod def _property_to_field(cls, property_name, property_value, @@ -215,20 +282,23 @@ def _property_to_field(cls, property_name, property_value, if 'description' in property_value: field['description'] = property_value['description'][:1024] + fields_list = [] + if bigquery_type == 'RECORD': + fields_list = cls._get_properties_map_field_list( + property_name, property_value, resources, seen_resources) + # did we find any fields? + if not fields_list: + return None + field['fields'] = fields_list + # array fields are BigQuery repeated fields, and convert # additionalProperties to repeated lists of key value pairs. if (property_type == 'array' or - 'additionalProperties' in property_value): + cls.is_additional_property_fields(fields_list)): field['mode'] = 'REPEATED' else: field['mode'] = 'NULLABLE' - if bigquery_type == 'RECORD': - fields_list = cls._get_properties_map_field_list( - property_name, property_value, resources, seen_resources) - if not fields_list: - return None - field['fields'] = fields_list return field @classmethod @@ -238,7 +308,7 @@ def _properties_map_to_field_list(cls, properties_map, resources, Args: properties_map: dict of properties from the API schema document we - are convering into a BigQuery field list. + are converting into a BigQuery field list. resources: dict of all other resources that might be referenced by the API schema through reference types ($ref values). 
seen_resources: dict of types we have processed to prevent endless @@ -257,12 +327,10 @@ def _properties_map_to_field_list(cls, properties_map, resources, @classmethod def _get_cache_key(cls, resource_name, document): if 'id' in document: - return '{}.{}'.format(document['id'], resource_name) + return f'{document["id"]}.{resource_name}' if 'info' in document: info = document['info'] - return '{}.{}.{}'.format(info['title'], - info['version'], - resource_name) + return f'{info["title"]}.{info["version"]}.{resource_name}' return resource_name @classmethod @@ -273,7 +341,7 @@ def _get_document_resources(cls, document): @classmethod def _translate_resource_to_schema(cls, resource_name, document): - """Expands the $ref properties of a reosurce definition.""" + """Expands the $ref properties of a resource definition.""" cache_key = cls._get_cache_key(resource_name, document) if cache_key in cls._schema_cache: return cls._schema_cache[cache_key] @@ -318,13 +386,26 @@ def _add_asset_export_fields(cls, 'field_type': 'TIMESTAMP', 'description': 'Load time.', 'mode': 'NULLABLE' - }] + }, { + 'name': 'ancestors', + 'field_type': 'STRING', + 'mode': 'REPEATED', + 'description': 'The ancestry path of an asset in Google Cloud.' + }, + { + 'name': 'update_time', + 'field_type': 'STRING', + 'mode': 'NULLABLE', + 'description': 'The last update timestamp of an asset.' + }] if include_resource: resource_schema = list(schema) _, last_modified = bigquery_schema.get_field_by_name( resource_schema, 'lastModifiedTime') if not last_modified: + # if we lack a lastModified time in the schema, add it, some + # resources include it without being in the schema. resource_schema.append({ 'name': 'lastModifiedTime', 'field_type': 'STRING', @@ -362,7 +443,20 @@ def _add_asset_export_fields(cls, 'description': 'Resource properties.', 'mode': 'NULLABLE', 'fields': resource_schema - }] + }, + { + 'name': 'location', + 'field_type': 'STRING', + 'description': 'The location of the resource in Google Cloud, such as its zone and region. ' + 'For more information, see https://cloud.google.com/about/locations/.', + 'mode': 'NULLABLE' + }, + { + 'name': 'json_data', + 'field_type': 'JSON', + 'description': 'Original JSON of the resource.', + 'mode': 'NULLABLE' + }] }) if include_iam_policy: asset_schema.append({ @@ -396,9 +490,9 @@ def _add_asset_export_fields(cls, 'field_type': 'NUMERIC', 'mode': 'NULLABLE', 'description': - ('1: Admin reads. Example: CloudIAM getIamPolicy' - '2: Data writes. Example: CloudSQL Users create' - '3: Data reads. Example: CloudSQL Users list') + ('1: Admin reads. Example: CloudIAM getIamPolicy' + '2: Data writes. Example: CloudSQL Users create' + '3: Data reads. Example: CloudSQL Users list') }] }] }, { @@ -439,8 +533,12 @@ def bigquery_schema_for_resource(cls, asset_type, Returns: BigQuery schema. """ - cache_key = '{}.{}.{}'.format(asset_type, include_resource, - include_iam_policy) + + # some resources use asset_type as their discovery_name incorrectly. + # this tries to correct that. 
+ if resource_name is not None and '/' in resource_name: + resource_name = resource_name[resource_name.find('/') + 1:] + cache_key = f'{asset_type}.{include_resource}.{include_iam_policy}' if cache_key in cls._schema_cache: return cls._schema_cache[cache_key] # get the resource schema if we are including the resource diff --git a/tools/asset-inventory/asset_inventory/bigquery_schema.py b/tools/asset-inventory/asset_inventory/bigquery_schema.py index 6cdd029379..17a0d983e8 100644 --- a/tools/asset-inventory/asset_inventory/bigquery_schema.py +++ b/tools/asset-inventory/asset_inventory/bigquery_schema.py @@ -25,7 +25,7 @@ `sanitize_property_value`- Modifies the supplied json object to conform to BigQuery standards such as nesting depth, column name format. - `merge_schemas` - Combines multiple BigQuery schmas and returns a new schema + `merge_schemas` - Combines multiple BigQuery schemas and returns a new schema that is a union of both. `get_field_by_name` - Returns a field with the supplied name from a list of @@ -35,34 +35,31 @@ """ -import copy from collections import defaultdict +import copy from numbers import Number import re from six import string_types -CLEAN_UP_REGEX = re.compile(r'[\W]+') +CLEAN_UP_REGEX = re.compile(r'\W+') TIMESTAMP_REGEX = re.compile( r'^\d\d\d\d-\d\d-\d\d[T ]\d\d:\d\d:\d\d' - r'(?:\.\d{1,6})?(?: ?Z| ?[\+-]\d\d:\d\d| [A-Z]{3})?$') + r'(?:\.\d{1,6})?(?: ?Z| ?[+-]\d\d:\d\d| [A-Z]{3})?$') DATE_REGEX = re.compile(r'^\d\d\d\d-\d\d-\d\d$') -BQ_MAX_NUMERIC = 99999999999999999999999999999.999999999 -BQ_MIN_NUMERIC = -99999999999999999999999999999.999999999 +BQ_MAX_NUMERIC = 9999999999999999999999999999.999999999 +BQ_MIN_NUMERIC = -9999999999999999999999999999.999999999 BQ_MAX_COL_NAME_LENGTH = 128 BQ_NUMERIC_SCALE_DIGITS = 9 BQ_MAX_DEPTH = 15 BQ_MAX_COLUMNS = 10000 - - -def is_number(s): - return isinstance(s, Number) +BQ_FORBIDDEN_PREFIXES = ['_partition', '_table_', '_file_', '_row_timestamp', '__root__', '_colidentifier'] def _get_bigquery_type_for_property_value(property_value): """Convert json value into a BigQuery data type. - Recgonizes BOOL, RECORD and returns NUMERIC for all numbers. + Recognizes BOOL, RECORD and returns NUMERIC for all numbers. Doesn't try to determine if a string is formatted as a timestamp. Args: property_value: Value of the json property. @@ -71,11 +68,11 @@ def _get_bigquery_type_for_property_value(property_value): """ if isinstance(property_value, bool): return 'BOOL' - elif isinstance(property_value, Number): + if isinstance(property_value, Number): return 'NUMERIC' - elif isinstance(property_value, dict): + if isinstance(property_value, dict): return 'RECORD' - elif isinstance(property_value, list): + if isinstance(property_value, list): for element in property_value: return _get_bigquery_type_for_property_value(element) # no elements, but string is a good guess? @@ -116,27 +113,70 @@ def translate_json_to_schema(document): return schema +def contains_resource_data_schema(schema): + """True if the resource data schema contains fields. + Args: + schema: list of fields (a schema). + Returns: + True if the schema contains resource. data with non-empty fields. + """ + _, resource_field = get_field_by_name(schema, 'resource') + if resource_field is not None: + _, data_field = get_field_by_name(resource_field['fields'], 'data') + # if we have any fields from the api schema call. + # ignore the artificial 'lastModifiedTime' field. 
+ return ( + (data_field is not None) and (data_field['fields'] is not None) and + (len(data_field['fields']) > 0) and + (not (len(data_field['fields']) == 1 and + data_field['fields'][0]['name'] == 'lastModifiedTime'))) + return False + + def get_field_by_name(fields, field_name): + """Find index and field with input name. + + Args: + fields: list of fields (a schema). + field_name: name of field to search for. + Returns: + tuple of index and field[ + """ for i, field in enumerate(fields): - # BigQuery column names are case insensitive. + # BigQuery column names are case-insensitive. if field['name'].lower() == field_name.lower(): return i, field return None, None -def is_additonal_properties(fields): +def is_additional_property_fields(fields): + """True if the input fields are part of an 'additionalProperties' field.""" return fields and len(fields) == 2 and all( (f.get('name', None) == 'name' and f.get('description', None) == 'additionalProperties name') or (f.get('name', None) == 'value') for f in fields) -def _merge_fields(destination_field, source_field): +def merge_additional_properties_fields(apf, fields): + """Adds "fields" into the "value" property of apf. + This is to match the specific type of additional property in the new schema. + Args: + apf: the additional property field + fields: list of schema json documents to merge into the value field. + """ + i, value_field = get_field_by_name(apf, 'value') + for f in fields: + if f.get('name', None) not in ('name', 'value'): + value_field = _merge_fields(value_field, f) + apf[i] = value_field + + +def _merge_fields(destination_field, source_field, num_properties=0): """Combines two SchemaField like dicts. The same field can exist in both the destination and source schemas when trying to combine schemas. To handle this we try to choose a more specific - type if there is a conflict and merge any encosed fields. + type if there is a conflict and merge any enclosed fields. Args: destination_field: `google.cloud.bigquery.SchemaField` dict. @@ -144,60 +184,64 @@ def _merge_fields(destination_field, source_field): Returns: A `google.cloud.bigquery.SchemaField` dict. """ - - dd = destination_field.get('description', None) - sd = source_field.get('description', None) - dft = destination_field.get('field_type', None) - sft = source_field.get('field_type', None) + dst_desc = destination_field.get('description', None) + src_desc = source_field.get('description', None) + dst_field_type = destination_field.get('field_type', None) + src_field_type = source_field.get('field_type', None) # use the field with more information. - if ((not dd and sd) or (sd and dd and len(dd) < len(sd))): - destination_field['description'] = sd - destination_field['field_type'] = sft + if ((not dst_desc and src_desc) or + (src_desc and dst_desc and len(dst_desc) < len(src_desc))): + destination_field['description'] = src_desc + destination_field['field_type'] = src_field_type # use the less specific type. and join fields # but don't overwrite the timestamp field as per # https://github.com/GoogleCloudPlatform/professional-services/issues/900 elif (source_field.get('name', None) != 'timestamp' and - (dft != 'RECORD' and dft != 'STRING') and sft == 'STRING'): - destination_field['field_type'] = sft + (dst_field_type not in ('RECORD', 'STRING')) and + src_field_type == 'STRING'): + destination_field['field_type'] = src_field_type # https://github.com/GoogleCloudPlatform/professional-services/issues/614 - # Use the schema with the additonalProperties overrides. 
See + # Use the schema with the additionalProperties overrides. See # api_schema._get_properties_map_field_list which creates the # additionalProperties RECORD type and enforce_schema_data_types for where - # documents of type RECORD are converted to the REPEATED additonalProperties - # name value pairs. - def merge_additional_properties_fields(apf, fields): - i, value_field = get_field_by_name(apf, 'value') - for f in fields: - if f.get('name', None) not in ('name', 'value'): - value_field = _merge_fields(value_field, f) - apf[i] = value_field - - sf = source_field.get('fields', []) - df = destination_field.get('fields', []) - if is_additonal_properties(sf) and not is_additonal_properties(df): + # documents of type RECORD are converted to the REPEATED + # additionalProperties name value pairs. + + src_fields = source_field.get('fields', []) + dst_fields = destination_field.get('fields', []) + + if (is_additional_property_fields(src_fields) and not + is_additional_property_fields(dst_fields)): destination_field['mode'] = 'REPEATED' - sf = copy.deepcopy(sf) - merge_additional_properties_fields(sf, df) - destination_field['fields'] = sf - elif is_additonal_properties(df) and not is_additonal_properties(sf): + merge_additional_properties_fields(src_fields, dst_fields) + destination_field['fields'] = copy.deepcopy(src_fields) + + elif (is_additional_property_fields(dst_fields) and not + is_additional_property_fields(src_fields)): destination_field['mode'] = 'REPEATED' - merge_additional_properties_fields(df, sf) - destination_field['fields'] = df - elif is_additonal_properties(df) and is_additonal_properties(sf): + merge_additional_properties_fields(dst_fields, src_fields) + destination_field['fields'] = dst_fields + elif (is_additional_property_fields(dst_fields) and + is_additional_property_fields(src_fields)): destination_field['mode'] = 'REPEATED' - merge_additional_properties_fields(df, sf) - destination_field['fields'] = df + merge_additional_properties_fields(dst_fields, src_fields) + destination_field['fields'] = dst_fields else: - mf = _merge_schema(df, sf) - if mf: - destination_field['fields'] = mf + merged_fields = _merge_schema(dst_fields, src_fields, num_properties) + if merged_fields: + destination_field['fields'] = merged_fields return destination_field -def _merge_schema(destination_schema, source_schema): - """Add source_schema fields to the the destination_schema. +def contains_additional_properties(schema): + """True if the schema contains an 'additionalProperties' field.""" + return get_field_by_name(schema, 'additionalProperties')[0] is not None + + +def _merge_schema(destination_schema, source_schema, num_properties=0): + """Add source_schema fields to the destination_schema. Modifies the destination_schema list argument with fields from source_schema. Calls _merge_fields when a field exists in both with the same @@ -209,27 +253,44 @@ def _merge_schema(destination_schema, source_schema): The modified destination_schema list. """ + # short circuit. + if num_properties > BQ_MAX_COLUMNS: + return destination_schema + # short circuit if schemas are the same. if destination_schema == source_schema: return destination_schema - destination_schema_list = list(destination_schema) + # if we have a schema with 'additionalProperties', don't merge field by + # field, just take the schema that contains it as it's the API schema and + # authoritative. 
+    if contains_additional_properties(destination_schema):
+        return destination_schema
+    if contains_additional_properties(source_schema):
+        return copy.deepcopy(source_schema)
+
+    # modify the destination_schema in place for efficiency.
     for source_field in source_schema:
-        i, destination_field = get_field_by_name(destination_schema_list,
+        # short circuit.
+        if num_properties > BQ_MAX_COLUMNS:
+            return destination_schema
+        i, destination_field = get_field_by_name(destination_schema,
                                                  source_field['name'])
         # field with same name exists, merge them.
         if destination_field:
-            destination_schema_list[i] = _merge_fields(destination_field,
-                                                       source_field)
-        # otherwise append at the end.
+            destination_schema[i] = _merge_fields(destination_field,
+                                                  source_field,
+                                                  num_properties)
+            num_properties = num_properties + len(destination_schema[i].get('fields', [])) + 1
         else:
-            destination_schema_list.append(source_field)
-    return destination_schema_list
+            destination_schema.append(copy.deepcopy(source_field))
+            num_properties = num_properties + len(source_field.get('fields', [])) + 1
+    return destination_schema
 
 
 def merge_schemas(schemas):
     """Combines BigQuery schemas.
 
-    Unions all input scheams into one. This is not be a safe operation if two
+    Unions all input schemas into one. This is not a safe operation if two
     schemas defines a different type for the same field.
 
     Args:
@@ -278,16 +339,16 @@ def _sanitize_property(property_name, parent, depth, num_properties):
     maximum column name length is 128 characters.
 
     3. Removes empty dictionary or list of empty dictionaries as any RECORD type
-    field must have defined fields and we can't determine those fields from an
+    field must have defined fields, and we can't determine those fields from an
    empty dictionary.
 
-    4. Remove duplicate properties. BigQuery is case insensitive in property,
-    names yet we want to keep the input case of the column for human readers. To
+    4. Remove duplicate properties. BigQuery is case-insensitive in property
+    names, yet we want to keep the input case of the column for human readers. Two
     columns with the same name but different case can result in a failure to
     load.
 
     Args:
-        property_name: Name of the property in the json oject.
+        property_name: Name of the property in the json object.
         parent: The json object containing the property.
         depth: How nested within the original document we are.
         num_properties: How many properties into the document we are.
@@ -307,6 +368,10 @@ def _sanitize_property(property_name, parent, depth, num_properties):
         new_property_name = '_' + new_property_name
     new_property_name = new_property_name[:BQ_MAX_COL_NAME_LENGTH]
 
+    # Some prefixes aren't allowed as field names; prefix with '_' again.
+    if any([new_property_name.lower().startswith(x) for x in BQ_FORBIDDEN_PREFIXES]):
+        new_property_name = '_' + new_property_name
+
     # check if property was changed.
     if property_name != new_property_name:
         property_value = parent.pop(property_name)
@@ -314,12 +379,12 @@ def _sanitize_property(property_name, parent, depth, num_properties):
     # handle labels (condition #1).
     if (new_property_name == 'labels' and
-        isinstance(parent[new_property_name], dict)):
+            isinstance(parent[new_property_name], dict)):
         _convert_labels_dict_to_list(parent)
 
     property_value = parent[new_property_name]
 
-    # recursivly descend.
+    # recursively descend.
sanitized = sanitize_property_value(property_value, depth=depth + 1, num_properties=num_properties) @@ -327,9 +392,7 @@ def _sanitize_property(property_name, parent, depth, num_properties): parent[new_property_name] = sanitized # remove empty dicts or list of empty dicts (condition #3) - if ((isinstance(sanitized, list) or - isinstance(sanitized, dict)) and - not any(sanitized)): + if isinstance(sanitized, (list, dict)) and not any(sanitized): # BigQuery doesn't deal well with empty records. # prune the value. parent.pop(new_property_name) @@ -341,7 +404,7 @@ def remove_duplicates(properties): Args: properties: dictionary to modify. - BigQuery is case insensitive, remove any lexically greater property + BigQuery is case-insensitive, remove any lexically greater property in the dictionary that differ only by case. """ duplicates = defaultdict(list) @@ -354,9 +417,9 @@ def remove_duplicates(properties): # remove any properties that are duplicate if len(duplicate_properties) > 1: selected_property = min(duplicate_properties) - for p in duplicate_properties: - if p != selected_property: - properties.pop(p) + for prop in duplicate_properties: + if prop != selected_property: + properties.pop(prop) def sanitize_property_value(property_value, depth=0, num_properties=0): @@ -365,7 +428,7 @@ def sanitize_property_value(property_value, depth=0, num_properties=0): Traverses the json object and modifies it to conform to BigQuery requirements. - 1. Will prune any value with more then 15 layers of nesting as that's the + 1. Will prune any value with more than 15 layers of nesting as that's the most BigQuery will handle. See `_sanitize_property` for description of other rules enforced. @@ -400,18 +463,26 @@ def sanitize_property_value(property_value, depth=0, num_properties=0): property_value = max(property_value, BQ_MIN_NUMERIC) property_value = min(property_value, BQ_MAX_NUMERIC) + # Some records report numbers with values of "Infinity" and "-Infinity" + # convert them to respective numeric values. + if property_value == 'Infinity': + property_value = BQ_MAX_NUMERIC + if property_value == '-Infinity': + property_value = BQ_MIN_NUMERIC + if property_value == 'NaN': + property_value = BQ_MIN_NUMERIC + # sanitize each nested list element. if isinstance(property_value, list): - for i in range(len(property_value)): - if isinstance(property_value[i], (dict, list)): - sanitize_property_value(property_value[i], - depth, num_properties) + for i, item in enumerate(property_value): + num_properties += 1 + if isinstance(item, (dict, list)): + sanitize_property_value(item, depth, num_properties) else: # if the list element has a primitive type, we need to # re-affect the sanitized value property_value[i] = sanitize_property_value( - property_value[i], - depth, num_properties) + item, depth, num_properties) # and each nested json object. if isinstance(property_value, dict): @@ -425,6 +496,34 @@ def sanitize_property_value(property_value, depth=0, num_properties=0): return property_value +def sanitize_bigquery_schema(schema, depth=0, num_properties=0): + """Enforces BigQuery property and table restrictions on the json schema documents + Args: + schema: BigQuery schema field dict. + depth: The nested depth of the schema. + num_properties: Number of properties in the table. + Returns: + BigQuery schema field dict with BigQuery validation rules applied. + """ + result_fields = [] + for field in schema: + num_properties += 1 + # if we are over the max number of columns, truncate the schema. 
+ if num_properties > BQ_MAX_COLUMNS or depth > BQ_MAX_DEPTH: + return result_fields + + property_name = field['name'] + parent = {property_name: "value"} + _sanitize_property(property_name, parent, depth, num_properties) + # if the property was invalid, just continue. + if len(parent) == 0: + continue + # it's possible the field name changed, store it. + field['name'] = list(parent.keys())[0] + # sanitize any nested fields? + sanitize_bigquery_schema(field.get('fields', []), depth + 1, num_properties) + + def enforce_schema_data_type_on_property(field, property_value): """Ensure property values are the correct type. @@ -434,23 +533,19 @@ def enforce_schema_data_type_on_property(field, property_value): field: BigQuery schema field dict. property_value: object to try to coerce. Returns: - The properly typed property_value, or None if it can't be convered. + The properly typed property_value, or None if it can't be converted. """ field_type = field['field_type'] if field_type == 'RECORD': if isinstance(property_value, dict): return enforce_schema_data_types(property_value, field['fields']) - else: - return None + return None if field_type == 'STRING': if not isinstance(property_value, string_types): return str(property_value) if field_type == 'BOOL': if not isinstance(property_value, bool): - if property_value: - return True - else: - return False + return bool(property_value) if field_type == 'TIMESTAMP': if not re.match(TIMESTAMP_REGEX, property_value): return None @@ -469,6 +564,29 @@ def enforce_schema_data_type_on_property(field, property_value): return property_value +def push_down_additional_properties(resource, schema): + """Move additional properties to name value array. + + If the schema contains an 'additionalProperties' field, then move any + properties not in the defined schema's fields into the child + additionalProperties name value pair. + Args: + resource: json dict to be modified. + schema: BigQuery schema. + + """ + _, ap_field = get_field_by_name(schema, 'additionalProperties') + if ap_field: + known_fields = [f['name'] for f in schema] + for property_name in list(resource): + if property_name not in known_fields: + ap_property = resource.get('additionalProperties', []) + resource['additionalProperties'] = ap_property + ap_property.append({'name': property_name, + 'value': resource[property_name]}) + del resource[property_name] + + def enforce_schema_data_types(resource, schema): """Enforce schema's data types. @@ -485,36 +603,49 @@ def enforce_schema_data_types(resource, schema): Modified resource. """ + # if the schema contains an 'additionalProperties' field, then move + # any properties not in the defined schema's fields into the child + # additionalProperties name value pair. + push_down_additional_properties(resource, schema) + # apply datatype of each field. for field in schema: field_name = field['name'] - if field_name in resource: - resource_value = resource[field_name] - if field.get('mode', 'NULLABLE') == 'REPEATED': - # satisfy array condition by converting dict into - # repeated name value records. - # this handles any 'additonalProperties' types. 
-                if (field['field_type'] == 'RECORD' and
-                        isinstance(resource_value, dict)):
-                    resource_value = [{'name': key, 'value': val}
-                                      for (key, val) in resource_value.items()]
-                elif not isinstance(resource_value, list):
-                    resource_value = [resource_value]
-                new_array = []
-                for value in resource_value:
-                    value = enforce_schema_data_type_on_property(
-                        field, value)
-                    if value is not None:
-                        new_array.append(value)
-                if any(new_array):
-                    resource[field_name] = new_array
-                else:
-                    del resource[field_name]
-            else:
+        # nothing to sanitize if there is no property value.
+        if field_name not in resource:
+            continue
+
+        resource_value = resource[field_name]
+
+        # if this is an additionalProperties field, convert to name/value pairs.
+        if (is_additional_property_fields(field.get('fields', None)) and
+                isinstance(resource_value, dict)):
+            resource_value = [{'name': key, 'value': val}
+                              for (key, val) in resource_value.items()]
+
+        # if it's a repeated field, convert to a list and sanitize each element of the array.
+        if field.get('mode', 'NULLABLE') == 'REPEATED':
+            if not isinstance(resource_value, list):
+                resource_value = [resource_value]
+            new_array = []
+            for value in resource_value:
                 value = enforce_schema_data_type_on_property(
-                    field, resource_value)
+                    field, value)
                 if value is not None:
-                    resource[field_name] = value
-                else:
-                    del resource[field_name]
+                    new_array.append(value)
+            # if all values are invalid, delete the array.
+            if any(new_array):
+                resource[field_name] = new_array
+            else:
+                del resource[field_name]
+        else:
+            # We have a RECORD or other scalar value; sanitize it,
+            # and if it's not valid, delete the property.
+            value = enforce_schema_data_type_on_property(
+                field, resource_value)
+            if value is not None:
+                resource[field_name] = value
+            else:
+                del resource[field_name]
+
+    return resource
diff --git a/tools/asset-inventory/asset_inventory/export.py b/tools/asset-inventory/asset_inventory/export.py
index 0b014fc589..2cca929546 100755
--- a/tools/asset-inventory/asset_inventory/export.py
+++ b/tools/asset-inventory/asset_inventory/export.py
@@ -51,7 +51,7 @@ def export_to_gcs(parent, gcs_destination, content_type, asset_types):
 
     Invoke either the cloudasset.organizations.exportAssets or
     cloudasset.projects.exportAssets method depending on if parent is a project
-    or orgniaztion.
+    or organization.
     Args:
         parent: Either `project/` or `organization/`.
         gcs_destination: GCS uri to export to.
@@ -65,10 +65,10 @@ def export_to_gcs(parent, gcs_destination, content_type, asset_types):
     output_config = asset_v1.types.OutputConfig()
     output_config.gcs_destination.uri = gcs_destination
     operation = Clients.cloudasset().export_assets(
-        parent,
-        output_config,
-        content_type=content_type,
-        asset_types=asset_types)
+        {'parent': parent,
+         'output_config': output_config,
+         'content_type': content_type,
+         'asset_types': asset_types})
     return operation.result()
 
 
@@ -128,17 +128,19 @@ def add_argparse_args(ap, required=False):
        'This MUST be run with a service account owned by a project with the '
        'Cloud Asset API enabled. The gcloud generated user credentials'
        ' do not work. This requires:\n\n'
-        ' 1. Enable the Cloud Asset Inventory API on a project (https://console.cloud.google.com/apis/api/cloudasset.googleapis.com/overview)\n'
-        ' 2. Create a service acocunt owned by this project\n'
+        '1. Enable the Cloud Asset Inventory API on a project ('
+        'https://console.cloud.google.com/apis/api/cloudasset.googleapis.com/overview)\n'
+        ' 2. Create a service account owned by this project\n'
        ' 3. 
Give the service account roles/cloudasset.viewer at the organization layer\n' ' 4. Run on a GCE instance started with this service account,\n' - ' or downloadthe private key and set GOOGLE_APPLICATION_CREDENTIALS to the file name\n' + ' or download the private key and set GOOGLE_APPLICATION_CREDENTIALS to the file name\n' ' 5. Run this command.\n\n' 'If the GCS bucket being written to is owned by a different project then' ' the project that you enabled the API on, then you must also grant the' ' "service-@gcp-sa-cloudasset.iam.gserviceaccount.com" account' - ' objectAdmin privleges to the bucket:\n' - ' gsutil iam ch serviceAccount:service-@gcp-sa-cloudasset.iam.gserviceaccount.com:objectAdmin gs://\n' + ' objectAdmin privileges to the bucket:\n' + 'gsutil iam ch serviceAccount:service-@gcp-sa-cloudasset.iam.gserviceaccount.com:objectAdmin ' + 'gs://\n' '\n\n') ap.add_argument( '--parent', @@ -172,7 +174,7 @@ def content_types_argument(string): ap.add_argument( '--asset-types', - help=('Comma seprated list of asset types to export such as ' + help=('Comma separated list of asset types to export such as ' '"google.compute.Firewall,google.compute.HealthCheck"' ' default is `*` for everything'), type=lambda x: [y.strip() for y in x.split(',')], diff --git a/tools/asset-inventory/asset_inventory/import_pipeline.py b/tools/asset-inventory/asset_inventory/import_pipeline.py index 6e8a8f99b8..8fb9c109fc 100644 --- a/tools/asset-inventory/asset_inventory/import_pipeline.py +++ b/tools/asset-inventory/asset_inventory/import_pipeline.py @@ -17,13 +17,13 @@ """Import Cloud Asset Inventory exports into BigQuery. Apache Beam pipeline to load Cloud Asset Inventory exports in GCS json objects -into a BigQuery daset. There are options for appending to tables or truncating +into a BigQuery dataset. There are options for appending to tables or truncating them. The dataset must exist prior to import. Most all export are small and can likely be processed very quickly by a single machine with the direct runner. In some situations there might be a very large number of assets like GCS buckets or BigQuery tables which will benefit from the -scalability of the Dataflow runner or perhaps you wish to process the file from +scalability of the Dataflow runner, or perhaps you wish to process the file from environments that are not easily suited to large memory single machines like Cloud Functions or App Engine. """ @@ -45,6 +45,7 @@ from asset_inventory import bigquery_schema from asset_inventory.api_schema import APISchema from six import string_types +from typing import Any from google.api_core.exceptions import BadRequest from google.api_core.exceptions import NotFound @@ -54,11 +55,14 @@ class JsonCoder(Coder): """A coder interpreting each line as a JSON string.""" - def encode(self, x): - return json.dumps(x) + def encode(self, value): + return json.dumps(value) - def decode(self, x): - return json.loads(x) + def decode(self, encoded): + return json.loads(encoded) + + def to_type_hint(self): + return Any class AssignGroupByKey(beam.DoFn): @@ -66,7 +70,7 @@ class AssignGroupByKey(beam.DoFn): The group_by value can be either: - - ASSET_TYPE so we have a table for that asset type like + - ASSET_TYPE, so we have a table for that asset type like `google.compute.Instance`. - ASSET_TYPE_VERSION to have a table for each asset type and version like @@ -78,7 +82,7 @@ class AssignGroupByKey(beam.DoFn): intermediary step prior to load. 
""" - def __init__(self, group_by, num_shards): + def __init__(self, group_by, num_shards, *unused_args, **unused_kwargs): if isinstance(group_by, string_types): group_by = StaticValueProvider(str, group_by) @@ -89,7 +93,8 @@ def __init__(self, group_by, num_shards): self.shard_map = None def apply_shard(self, key): - # initialize shard_map from num_shard + """Append shard suffix.""" + # Initialize shard_map from num_shard if first time. if self.shard_map is None: self.shard_map = { k: int(v) for (k, v) in @@ -104,9 +109,10 @@ def apply_shard(self, key): @classmethod def remove_shard(cls, key): + """Strip shard suffix.""" return key[:key.rfind('.')] - def process(self, element): + def process(self, element, **kwargs): key = 'ASSET_TYPE' group_by = self.group_by.get() if group_by == 'NAME': @@ -118,13 +124,12 @@ def process(self, element): key = self.apply_shard(element.pop('_group_by', element['asset_type'])) elif group_by == 'ASSET_TYPE_VERSION': - version = '' if 'resource' in element: version = element['resource']['version'] key = element['asset_type'] + '.' + version key = element.pop('_group_by', key) key = self.apply_shard(key) - yield (key, element) + yield key, element class BigQuerySchemaCombineFn(core.CombineFn): @@ -133,13 +138,15 @@ class BigQuerySchemaCombineFn(core.CombineFn): def create_accumulator(self): return [] - def merge_accumulators(self, accumulators): + def merge_accumulators(self, accumulators, **kwargs): return bigquery_schema.merge_schemas(accumulators) - def extract_output(self, schema): - return schema + def extract_output(self, accumulator, **kwargs): + return accumulator - def element_to_schema(self, element): + @staticmethod + def get_api_schema_for_resource(element): + """This will contain the discovery document drive resource schema is present.""" element_resource = element.get('resource', {}) return APISchema.bigquery_schema_for_resource( element['asset_type'], @@ -148,30 +155,36 @@ def element_to_schema(self, element): 'data' in element_resource, 'iam_policy' in element) - def add_input(self, schema, element): - resource_schema = self.element_to_schema(element) - json_schema = bigquery_schema.translate_json_to_schema(element) - return bigquery_schema.merge_schemas([schema, resource_schema, - json_schema]) + def add_input(self, mutable_accumulator, element, **kwargs): + resource_schema = self.get_api_schema_for_resource(element) + # use the API resource's schema if we have one. + if bigquery_schema.contains_resource_data_schema(resource_schema): + return resource_schema + # we don't have a valid API schema, use the element schema. + return bigquery_schema.merge_schemas([ + mutable_accumulator, + resource_schema, + bigquery_schema.translate_json_to_schema(element)]) class BigQuerySanitize(beam.DoFn): """Make the json acceptable to BigQuery.""" - def process(self, element): + def process(self, element, **kwargs): yield bigquery_schema.sanitize_property_value(element) class ProduceResourceJson(beam.DoFn): """Create a json only element for every element.""" - def __init__(self, group_by): + def __init__(self, group_by, *unused_args, **unused_kwargs): if isinstance(group_by, string_types): group_by = StaticValueProvider(str, group_by) self.group_by = group_by - def create_resource_copy(self, element): - # Write out a json consistenly structured row called 'resource' to the + @staticmethod + def create_resource_copy(element): + # Write out a json consistently structured row called 'resource' to the # 'resource' table. 
returns a copy of element without data field which # is grouped by the 'resource' key. resource_data = element.get('resource', {}).pop('data', None) @@ -179,17 +192,17 @@ def create_resource_copy(self, element): # add it back after the copy operation. if resource_data is not None: element['resource']['data'] = resource_data - # override the group by field so it's written to the `resource` table + # override the group by field, so it's written to the `resource` table resource_copy['_group_by'] = 'resource' return resource_copy - def process(self, element): + def process(self, element, **kwargs): # add the resource.json_data property if we have resource.data if ('resource' in element and - 'data' in element['resource']): + 'data' in element['resource']): element['resource']['json_data'] = json.dumps( element['resource']['data']) - # yeild the resource copy. + # yield the resource copy. yield self.create_resource_copy(element) # and the element if we are grouping by asset_type or # asset_type_version. @@ -200,23 +213,34 @@ def process(self, element): class AddLoadTime(beam.DoFn): """Add timestamp field to track load time.""" - def __init__(self, load_time): + def __init__(self, load_time, *unused_args, **unused_kwargs): if isinstance(load_time, string_types): load_time = StaticValueProvider(str, load_time) self.load_time = load_time - def process(self, element): + def process(self, element, **kwargs): element[1]['timestamp'] = self.load_time.get() yield element +class SanitizeBigQuerySchema(beam.DoFn): + """Ensure final schema is valid.""" + + def process(self, element, **kwargs): + bigquery_schema.sanitize_bigquery_schema(element[1]) + yield element + + class EnforceSchemaDataTypes(beam.DoFn): """Convert values to match schema types. Change json values to match the expected types of the input schema. """ - def process(self, element, schemas): - """Enforce the datatypes of the input schema on the element data.""" + def process(self, element, schemas, **kwargs): + """Enforce the datatypes of the input schema on the element data. + :param element: the element to enforce schema on. + :param schemas: schemas to enforce + """ key_name = element[0] elements = element[1] schema = schemas[key_name] @@ -225,7 +249,7 @@ def process(self, element, schemas): yield (element[0], bigquery_schema.enforce_schema_data_types( elem, schema)) else: - yield (element[0], elem) + yield element[0], elem class CombinePolicyResource(beam.DoFn): @@ -236,7 +260,7 @@ class CombinePolicyResource(beam.DoFn): """ - def process(self, element): + def process(self, element, **kwargs): combined = {} for content in element[1]: # don't merge a `resource` element. @@ -255,12 +279,12 @@ class WriteToGCS(beam.DoFn): is an object for each group-key, either an object per asset type, or for each asset type version. - There is nothing cleaning up these objects so it might be prudent to have a + There is nothing cleaning up these objects, so it might be prudent to have a lifecycle policy on the GCS destination bucket to purge old files. 
""" - def __init__(self, stage_dir, load_time): + def __init__(self, stage_dir, load_time, *unused_args, **unused_kwargs): if isinstance(stage_dir, string_types): stage_dir = StaticValueProvider(str, stage_dir) if isinstance(load_time, string_types): @@ -287,7 +311,7 @@ def _get_file_for_element(self, element): self.open_files[key_name] = file_handle return file_handle, file_path - def process(self, element): + def process(self, element, **kwargs): file_handle, created_file_path = self._get_file_for_element(element) for asset_line in element[1]: file_handle.write(json.dumps(asset_line).encode()) @@ -297,7 +321,7 @@ def process(self, element): # independently # value is sharded key and gcs filepath. yield (AssignGroupByKey.remove_shard(element[0]), - (element[0], created_file_path)) + (element[0], created_file_path)) def finish_bundle(self): for _, file_handle in self.open_files.items(): @@ -307,18 +331,19 @@ def finish_bundle(self): class AssignShardedKeyForLoad(beam.DoFn): """Element is a tuple keyed by table, value is iterable of sharded key and - gcs file path. The transform unbundle the iterable and return a tuples of + gcs file path. The transform unbundled the iterable and return a tuples of sharded key and gcs file path to be loaded in parallel to bigquery. """ - def process(self, element): + + def process(self, element, **kwargs): for (sharded_key, created_file_path) in element[1]: - yield (sharded_key, created_file_path) + yield sharded_key, created_file_path class BigQueryDoFn(beam.DoFn): """Superclass for a DoFn that requires BigQuery dataset information.""" - def __init__(self, dataset, add_load_date_suffix, load_time): + def __init__(self, dataset, add_load_date_suffix, load_time, *unused_args, **unused_kwargs): if isinstance(dataset, string_types): dataset = StaticValueProvider(str, dataset) self.dataset = dataset @@ -350,7 +375,7 @@ def asset_type_to_table_name(self, asset_type): suffix = '' add_load_date_suffix = self.add_load_date_suffix.get() if (add_load_date_suffix and - add_load_date_suffix.lower() in ('yes', 'true', 't', '1')): + add_load_date_suffix.lower() in ('yes', 'true', 't', '1')): suffix = '_' + self.load_time.get()[0:10].replace('-', '') return asset_type.replace('.', '_').replace('/', '_') + suffix @@ -378,7 +403,7 @@ def __init__(self, dataset, add_load_date_suffix, load_time, write_disposition = StaticValueProvider(str, write_disposition) self.write_disposition = write_disposition - def process(self, element): + def process(self, element, **kwargs): # If we are appending to the table, no need to Delete first. if self.write_disposition.get() == 'WRITE_APPEND': yield element @@ -415,8 +440,11 @@ def to_bigquery_schema(self, fields): field['fields'] = self.to_bigquery_schema(field['fields']) return [bigquery.SchemaField(**field) for field in fields] - def process(self, element, schemas): - """Element is a tuple of key_ name and iterable of filesystem paths.""" + def process(self, element, schemas, **kwargs): + """Element is a tuple of key_ name and iterable of filesystem paths. + :param schemas: schema of the table. + :param element: name of object to load. 
+ """ dataset_ref = self.get_dataset_ref() sharded_key_name = element[0] @@ -424,6 +452,7 @@ def process(self, element, schemas): object_paths = [object_path for object_path in element[1]] job_config = bigquery.LoadJobConfig() job_config.write_disposition = 'WRITE_APPEND' + job_config.ignore_unknown_values = True job_config.schema_update_options = [ bigquery.job.SchemaUpdateOption.ALLOW_FIELD_ADDITION] @@ -487,7 +516,7 @@ def _add_argparse_args(cls, parser): '--num_shards', help=( 'Number of shards to use per key.' 'List of asset types and the number' - 'of shardes to use for that type with "*" used as a default.' + 'of shards to use for that type with "*" used as a default.' ' For example "google.compute.VpnTunnel=1,*=10"'), default='*=1') @@ -518,47 +547,49 @@ def run(argv=None): # Cleanup json documents. sanitized = ( - p | 'read' >> ReadFromText(options.input, coder=JsonCoder()) - | 'produce_resource_json' >> beam.ParDo(ProduceResourceJson( - options.group_by)) - | 'bigquery_sanitize' >> beam.ParDo(BigQuerySanitize())) + p | 'read' >> ReadFromText(options.input, coder=JsonCoder()) + | 'produce_resource_json' >> beam.ParDo(ProduceResourceJson( + options.group_by)) + | 'bigquery_sanitize' >> beam.ParDo(BigQuerySanitize())) # Joining all iam_policy objects with resources of the same name. merged_iam = ( - sanitized | 'assign_name_key' >> beam.ParDo( - AssignGroupByKey('NAME', '')) - | 'group_by_name' >> beam.GroupByKey() - | 'combine_policy' >> beam.ParDo(CombinePolicyResource())) + sanitized | 'assign_name_key' >> beam.ParDo( + AssignGroupByKey('NAME', '')) + | 'group_by_name' >> beam.GroupByKey() + | 'combine_policy' >> beam.ParDo(CombinePolicyResource())) # split into BigQuery tables. keyed_assets = merged_iam | 'assign_group_by_key' >> beam.ParDo( AssignGroupByKey(options.group_by, options.num_shards)) # Generate BigQuery schema for each table. - schemas = keyed_assets | 'to_schema' >> core.CombinePerKey( - BigQuerySchemaCombineFn()) + schemas = (keyed_assets + | 'to_schema' >> core.CombinePerKey( + BigQuerySchemaCombineFn()) + | 'sanitize_schema' >> beam.ParDo(SanitizeBigQuerySchema())) pvalue_schemas = beam.pvalue.AsDict(schemas) # Write to GCS and load to BigQuery. 
# pylint: disable=expression-not-assigned (keyed_assets | 'add_load_time' >> beam.ParDo(AddLoadTime(options.load_time)) - | 'group_by_sharded_key_for_enfoce' >> beam.GroupByKey() + | 'group_by_sharded_key_for_enforce_schema' >> beam.GroupByKey() | 'enforce_schema' >> beam.ParDo(EnforceSchemaDataTypes(), pvalue_schemas) | 'group_by_sharded_key_for_write' >> beam.GroupByKey() | 'write_to_gcs' >> beam.ParDo( - WriteToGCS(options.stage, options.load_time)) + WriteToGCS(options.stage, options.load_time)) | 'group_written_objects_by_key' >> beam.GroupByKey() | 'delete_tables' >> beam.ParDo( - DeleteDataSetTables(options.dataset, options.add_load_date_suffix, - options.load_time, - options.write_disposition)) + DeleteDataSetTables(options.dataset, options.add_load_date_suffix, + options.load_time, + options.write_disposition)) | 'assign_sharded_key_for_load' >> beam.ParDo(AssignShardedKeyForLoad()) | 'group_by_sharded_key_for_load' >> beam.GroupByKey() | 'load_to_bigquery' >> beam.ParDo( - LoadToBigQuery(options.dataset, options.add_load_date_suffix, - options.load_time), - beam.pvalue.AsDict(schemas))) + LoadToBigQuery(options.dataset, options.add_load_date_suffix, + options.load_time), + beam.pvalue.AsDict(schemas))) return p.run() diff --git a/tools/asset-inventory/asset_inventory/main.py b/tools/asset-inventory/asset_inventory/main.py index 504dec41df..571a31dd0b 100644 --- a/tools/asset-inventory/asset_inventory/main.py +++ b/tools/asset-inventory/asset_inventory/main.py @@ -30,11 +30,11 @@ def parse_args(): - """Parse command line argments. + """Parse command line arguments. Present a slightly simpler interface by constructing the pipeline input and stage parameters from the export gcs-destination if not supplied. Also - accepts arbitraty beam pipeline arguments if using a beam runner. but # + accepts arbitrary beam pipeline arguments if using a beam runner. but # getting them into the help text is near impossible. @@ -54,16 +54,16 @@ def parse_args(): choices=['ASSET_TYPE', 'ASSET_TYPE_VERSION', 'NONE'], # pylint: disable=line-too-long help=( - 'How to group exported resources into Bigquery tables.\n', - ' ASSET_TYPE: A table for each asset type (like google.compute.Instance\n', - ' ASSET_TYPE_VERSION: A table for each asset type and api version (like google.compute.Instance.v1\n', + 'How to group exported resources into Bigquery tables.\n' + ' ASSET_TYPE: A table for each asset type (like google.compute.Instance\n' + ' ASSET_TYPE_VERSION: A table for each asset type and api version (like google.compute.Instance.v1\n' ' NONE: One one table holding assets in a single json column\n')) parser.add_argument( '--write_disposition', default='WRITE_APPEND', choices=['WRITE_APPEND', 'WRITE_EMPTY'], - help='Location to write data and load from from.') + help='When WRITE_EMPTY, will delete the tables first prior to loading.') parser.add_argument( '--stage', @@ -81,7 +81,7 @@ def parse_args(): default='*=1', help=('Number of shards to use per asset type.' 'List of asset types and the number ' - 'of shardes to use for that type with "*" used as a default.' + 'of shards to use for that type with "*" used as a default.' 
' For example "google.compute.VpnTunnel=1,*=10"')) parser.add_argument( @@ -97,6 +97,12 @@ def parse_args(): action='store_true', default=False) + parser.add_argument( + '--add-load-date-suffix', + help='If load date is appended to table name.', + action='store_true', + default=False) + parser.add_argument( '--template-job-launch-location', help=( @@ -133,7 +139,7 @@ def json_value(string_value): # If input isn't supplied, we can infer it from the export destination. if 'input' not in args or not args.input: args.input = '{}/*.json'.format(args.gcs_destination) - # If stage isn't supplied, we can infer it from export desitnation. + # If stage isn't supplied, we can infer it from export destination. if 'stage' not in args or not args.stage: args.stage = args.gcs_destination + '/stage' @@ -159,12 +165,13 @@ def main(): args.template_job_project, args.template_job_region, launch_location, args.input, args.group_by, args.write_disposition, args.dataset, args.stage, args.load_time, args.num_shards, + args.add_load_date_suffix, args.template_job_runtime_environment_json) else: final_state = pipeline_runner.run_pipeline_beam_runner( None, None, args.input, args.group_by, args.write_disposition, args.dataset, args.stage, args.load_time, args.num_shards, - beam_args) + args.add_load_date_suffix, beam_args) if not pipeline_runner.is_successful_state(final_state): sys.exit(1) diff --git a/tools/asset-inventory/asset_inventory/pipeline_runner.py b/tools/asset-inventory/asset_inventory/pipeline_runner.py index 2de8498f84..93f60b8a7d 100644 --- a/tools/asset-inventory/asset_inventory/pipeline_runner.py +++ b/tools/asset-inventory/asset_inventory/pipeline_runner.py @@ -22,7 +22,7 @@ def get_job_name(load_time): - """User friendly job name from load_time.""" + """User-friendly job name from load_time.""" return ('cloud-asset-import-' + load_time.lower().replace( ':', '-').replace(' ', '').replace('.', '-')) @@ -60,7 +60,7 @@ def wait_on_pipeline_job(df_service, pipeline_job): ]: logging.info('final pipeline state : %s', current_state) return current_state, pipeline_job - logging.info('sleeping 60 seconds before repolling.') + logging.info('sleeping 60 seconds before polling.') time.sleep(60) return wait_on_pipeline_job(df_service, pipeline_job) @@ -69,7 +69,7 @@ def run_pipeline_template(dataflow_project, template_region, template_location, input_location, group_by, write_disposition, dataset, stage, load_time, num_shards, add_load_date_suffix, runtime_environment): - """Invoke the suplied pipeline template. + """Invoke the supplied pipeline template. Args: dataflow_project: Project to run the dataflow job in. @@ -77,13 +77,13 @@ def run_pipeline_template(dataflow_project, template_region, template_location, template_location: GCS path to the template file. input_location: GCS path load json documents from, group_by: How to split assets into tables. - write_disposition: To append to or ovewrite BigQuery tables. + write_disposition: To append to or overwrite BigQuery tables. dataset: BigQuery dataset to write to. stage: GCS path to write BigQuery load files. load_time: Timestamp or date to load data with. - num_shards: Shards for for each asset type. + num_shards: Shards for each asset type. add_load_date_suffix: If the load date is added as a table suffix. - runtime_environment: Dict suppling other runtime overrides. + runtime_environment: Dict supplying other runtime overrides. Returns: End state of the pipline and job object. 
""" @@ -132,11 +132,11 @@ def run_pipeline_beam_runner(pipeline_runner, dataflow_project, input_location, dataflow_project: Project to run the dataflow job in. input_location: GCS path load json documents from, group_by: How to split assets into tables. - write_disposition: To append to or ovewrite BigQuery tables. + write_disposition: To append to or overwrite BigQuery tables. dataset: BigQuery dataset to write to. stage: GCS path to write BigQuery load files. - load_time: Timestamp to add to data during during BigQuery load. - num_shards: Shards for for each asset type. + load_time: Timestamp to add to data during BigQuery load. + num_shards: Shards for each asset type. add_load_date_suffix: If the load date is added as a table suffix. pipeline_arguments: List of additional runner arguments. Returns: diff --git a/tools/asset-inventory/requirements.txt b/tools/asset-inventory/requirements.txt index c4ef302601..855f06edad 100644 --- a/tools/asset-inventory/requirements.txt +++ b/tools/asset-inventory/requirements.txt @@ -1,3 +1,14 @@ -apache-beam[gcp]==2.25.0 +apache-beam[gcp]==2.60.0 +google-api-python-client==2.151.0 +google-cloud-asset==3.27.1 +google-cloud-bigquery==3.26.0 mock==2.0.0 pytest +requests-futures==1.0.0 + +setuptools~=63.2.0 +tomli~=2.0.2 +packaging~=24.1 +six~=1.16.0 +requests~=2.32.3 +oauth2client~=4.1.3 diff --git a/tools/asset-inventory/setup.py b/tools/asset-inventory/setup.py index e4646c7292..1d24af6ddc 100644 --- a/tools/asset-inventory/setup.py +++ b/tools/asset-inventory/setup.py @@ -39,7 +39,7 @@ packages=['asset_inventory'], setup_requires=['pytest-runner', 'setuptools_scm'], extras_require = { - 'testing': ['mock==4.0.3', 'pytest==7.1.3', 'apache-beam[gcp]==2.41.0'], + 'testing': ['mock==4.0.3', 'pytest==7.1.3', 'apache-beam[gcp]==2.60.0'], }, include_package_data=True, # https://pypi.org/project/google-cloud-asset/#history @@ -47,8 +47,9 @@ # https://pypi.org/project/google-cloud-bigquery/#history # https://pypi.org/project/requests-futures/#history install_requires=[ - 'google-cloud-asset==3.13.0', - 'google-cloud-bigquery==2.34.4', + 'google-api-python-client==2.151.0', + 'google-cloud-asset==3.27.1', + 'google-cloud-bigquery==3.26.0', 'requests-futures==1.0.0' ], use_scm_version = { diff --git a/tools/asset-inventory/tests/data/resource_with_error_details.json b/tools/asset-inventory/tests/data/resource_with_error_details.json new file mode 100644 index 0000000000..d9f01e4025 --- /dev/null +++ b/tools/asset-inventory/tests/data/resource_with_error_details.json @@ -0,0 +1 @@ +{"name":"//datamigration.googleapis.com/projects/project-id/locations/europe-west1/migrationJobs/job-id","asset_type":"datamigration.googleapis.com.MigrationJob","resource":{"version":"v1","discovery_document_uri":"https://datamigration.googleapis.com/$discovery/rest","discovery_name":"MigrationJob","parent":"//cloudresourcemanager.googleapis.com/projects/123","data":{"createTime":"2022-03-15T13:05:58.615319161Z","destination":"projects/project-id/locations/europe-west1/connectionProfiles/connection-id","destinationDatabase":{"engine":"POSTGRESQL","provider":"CLOUDSQL"},"displayName":"display-name","duration":"1740104.555020s","error":{"code":13,"details":[{"@type":"type.googleapis.com/google.rpc.DebugInfo","detail":"generic::DEADLINE_EXCEEDED: deadline=9.2","stackEntries":["com.google.spanner.SpannerException: generic::DEADLINE_EXCEEDED: deadline=9.2"]},{"@type":"type.googleapis.com/google.rpc.Help","links":[{"description":"Learn 
more:","url":"https://cloud.google.com/database-migration/docs/diagnose-issues"}]}],"message":"An internal error occurred. Contact support. Dapper trace id: -40, operation id: projects/123/locations/europe-west1/operations/operation-123."},"name":"projects/123/locations/europe-west1/migrationJobs/jobs","phase":"CDC","reverseSshConnectivity":{"vm":"my-vn","vmIp":"1.2.3.4","vmPort":20,"vpc":"https://www.googleapis.com/compute/v1/projects/123/global/networks/my-vpc"},"source":"projects/project-id/locations/europe-west1/connectionProfiles/profile","sourceDatabase":{"engine":"POSTGRESQL"},"state":"FAILED","type":"CONTINUOUS","updateTime":"2022-10-17T18:57:33.913824Z"},"location":"europe-west1"},"ancestors":["projects/456","folders/789","folders/733","folders/322","folders/2233","folders/22","organizations/2222"],"update_time":"2022-10-17T18:57:34.027405Z"} diff --git a/tools/asset-inventory/tests/test_api_schema.py b/tools/asset-inventory/tests/test_api_schema.py index df2e2c75c5..6d17215fa0 100644 --- a/tools/asset-inventory/tests/test_api_schema.py +++ b/tools/asset-inventory/tests/test_api_schema.py @@ -16,19 +16,21 @@ """Test construction of a BigQuery schema from an API discovery document..""" import unittest -from asset_inventory.api_schema import APISchema +from asset_inventory.api_schema import APISchema # pylint:disable=protected-access # pylint: disable=line-too-long class TestApiSchema(unittest.TestCase): + maxDiff = None def get_schema_data_field(self, fields): return self.get_field_by_name( self.get_field_by_name(fields, 'resource')['fields'], 'data')['fields'] - def get_field_by_name(self, fields, field_name): + @staticmethod + def get_field_by_name(fields, field_name): for field in fields: if field['name'].lower() == field_name.lower(): return field @@ -91,7 +93,7 @@ def test_record_properties(self): 'description': 'description-2.', 'mode': 'NULLABLE' }] - }]) + }]) def test_repeated_properties(self): api_properties = { @@ -165,7 +167,7 @@ def test_for_swagger_type(self): 'mode': 'NULLABLE'}], data_fields) - def test_for_for_asset_type(self): + def test_for_asset_type(self): APISchema._discovery_document_cache = { 'https://www.googleapis.com/discovery/v1/apis/compute/v1/rest': { 'id': 'compute.v1', @@ -198,8 +200,8 @@ def test_for_for_asset_type(self): 'name': 'lastModifiedTime', 'mode': 'NULLABLE'}], data_fields) - # name, asset_type, timestamp, resource, iam_policy - self.assertEqual(len(schema), 5) + # name, asset_type, timestamp, resource, iam_policy, update_time, ancestors + self.assertEqual(len(schema), 7) def test_resource_last_modified(self): # Test that resource lastModifiedTime takes precedence. @@ -301,6 +303,48 @@ def test_string_additional_properties(self): 'description': 'description-1.', 'mode': 'NULLABLE'}]}]) + def test_additional_properties_with_properties(self): + api_properties = { + 'property-1': { + 'type': 'object', + 'properties': { + 'property-nested': { + 'type': 'string' + } + }, + 'additionalProperties': { + 'type': 'string', + 'description': 'description-1.' 
+ }, + 'description': 'description-1' + }, + } + resources = {} + schema = APISchema._properties_map_to_field_list(api_properties, + resources, {}) + schema.sort() + self.assertEqual( + schema, + [{'name': 'property-1', + 'field_type': 'RECORD', + 'fields': [{'name': 'property-nested', + 'field_type': 'STRING', + 'mode': 'NULLABLE'}, + {'name': 'additionalProperties', + 'description': 'additionalProperties', + 'field_type': 'RECORD', + 'fields': [{'description': 'additionalProperties name', + 'field_type': 'STRING', + 'mode': 'NULLABLE', + 'name': 'name'}, + {'description': 'description-1.', + 'field_type': 'STRING', + 'mode': 'NULLABLE', + 'name': 'value'}], + 'mode': 'REPEATED'}], + 'description': 'description-1', + 'mode': 'NULLABLE'}]) + def test_nested_additional_properties(self): api_properties = { 'property-1': { @@ -343,3 +387,76 @@ def test_nested_additional_properties(self): 'field_type': 'STRING', 'description': 'description-2.', 'mode': 'NULLABLE'}]}]}]) + + def test_array_additional_properties(self): + resources = { + 'Status': { + 'type': 'object', + 'properties': { + 'code': { + 'format': 'int32', + 'description': 'The status code, which should be an enum value of google.rpc.Code.', + 'type': 'integer' + }, + 'details': { + 'type': 'array', + 'description': 'A list of messages that carry the error details. There is a common set of ' + 'message types for APIs to use.', + 'items': { + 'additionalProperties': { + 'description': 'Properties of the object. Contains field @type with type URL.', + 'type': 'any' + }, + 'type': 'object' + } + } + } + } + } + + api_properties = { + 'error': { + '$ref': 'Status', + 'readOnly': True, + 'description': 'Output only. The error details in case of state FAILED.' + }, + } + + schema = APISchema._properties_map_to_field_list(api_properties, + resources, {}) + schema.sort() + self.assertEqual( + schema, + [{'description': 'Output only. The error details in case of state FAILED.', + 'field_type': 'RECORD', + 'fields': [{'description': 'The status code, which should be an enum value ' + 'of google.rpc.Code.', + 'field_type': 'NUMERIC', + 'mode': 'NULLABLE', + 'name': 'code'}, + {'description': 'A list of messages that carry the error details. 
' + 'There is a common set of message types for APIs ' + 'to use.', + 'field_type': 'RECORD', + 'fields': [{'description': 'additionalProperties', + 'field_type': 'RECORD', + 'fields': [{'description': 'additionalProperties ' + 'name', + 'field_type': 'STRING', + 'mode': 'NULLABLE', + 'name': 'name'}, + {'description': 'additionalProperties ' + 'value', + 'field_type': 'STRING', + 'mode': 'NULLABLE', + 'name': 'value'}], + 'mode': 'REPEATED', + 'name': 'additionalProperties'}], + 'mode': 'REPEATED', + 'name': 'details'}], + 'mode': 'NULLABLE', + 'name': 'error'}]) + +# "error":{"code":13,"details":[{"@type":"type.googleapis.com/google.rpc.DebugInfo", +# "detail":"generic::DEADLINE_EXCEEDED: deadline=9.223372036854776E15s","stackEntries":[ +# "com.google.spanner.SpannerException: generic::DEADLINE_EXCEEDED: diff --git a/tools/asset-inventory/tests/test_bigquery_schema.py b/tools/asset-inventory/tests/test_bigquery_schema.py index 789cbc2a64..e8b8e3ef57 100644 --- a/tools/asset-inventory/tests/test_bigquery_schema.py +++ b/tools/asset-inventory/tests/test_bigquery_schema.py @@ -22,7 +22,6 @@ class TestBigQuerySchema(unittest.TestCase): - def test_record(self): document = {'record_field': {'string_field': 'string_value'}} schema = bigquery_schema.translate_json_to_schema( @@ -34,7 +33,7 @@ def test_record(self): {'name': 'string_field', 'field_type': 'STRING', 'mode': 'NULLABLE' - }]}]) + }]}]) def test_array(self): document = {'array_field': [{'string_field': 'string_value'}]} @@ -47,7 +46,7 @@ def test_array(self): {'name': 'string_field', 'field_type': 'STRING', 'mode': 'NULLABLE' - }]}]) + }]}]) def test_numeric(self): document = {'integer_field': 111, 'float_field': 22.0} @@ -60,7 +59,7 @@ def test_numeric(self): {'name': 'integer_field', 'field_type': 'NUMERIC', 'mode': 'NULLABLE'} - ]) + ]) def test_bool(self): document = {'bool_array_field': [True, False], 'bool_field': False} @@ -73,7 +72,7 @@ def test_bool(self): {'name': 'bool_field', 'field_type': 'BOOL', 'mode': 'NULLABLE'} - ]) + ]) def test_timestamp(self): document = {'timestamp': '2019-01-01T00:01:00'} @@ -82,13 +81,13 @@ def test_timestamp(self): self.assertEqual(schema, [{'name': 'timestamp', 'field_type': 'STRING', 'mode': 'NULLABLE'}, - ]) + ]) def test_merge_schemas_basic(self): schemas = [ bigquery_schema.translate_json_to_schema({ 'field1': - 'string' + 'string' }), bigquery_schema.translate_json_to_schema({ 'field2': 3 @@ -103,7 +102,7 @@ def test_merge_schemas_basic(self): {'name': 'field2', 'field_type': 'NUMERIC', 'mode': 'NULLABLE'}, - ]) + ]) def test_merge_array_schemas_records(self): schemas = [ @@ -154,10 +153,10 @@ def test_merge_schemas_records(self): def test_sanitize_property_value(self): doc = { - 'empyty_dict': {}, - 'empyty_dict_list': [{}, {}], + 'empty_dict': {}, + 'empty_dict_list': [{}, {}], 'a' * 200: 'value0', - '@!@': 'deleteme', + '@!@': 'delete_me', '@2_3': 'value1', 'invalid_numeric': 9.300000191734863, 'labels': { @@ -169,7 +168,7 @@ def test_sanitize_property_value(self): self.assertEqual(len(sanitized), 4) self.assertNotIn('empty_dict', sanitized) self.assertNotIn('empty_dict_list', sanitized) - self.assertEqual(sanitized['a'* 128], 'value0') + self.assertEqual(sanitized['a' * 128], 'value0') self.assertEqual(sanitized['invalid_numeric'], 9.300000192) self.assertEqual(sanitized['_2_3'], 'value1') labels = sanitized['labels'] @@ -257,7 +256,7 @@ def test_enforce_schema_data_types(self): self.assertEqual(bigquery_schema.enforce_schema_data_types( {'property_7': [{'property_1': 'invalid'}, 
33]}, schema), {}) - def test_addtional_properties_repeated_string(self): + def test_additional_properties_repeated_string(self): schema = [ {'name': 'property_1', 'field_type': 'RECORD', @@ -277,7 +276,7 @@ def test_addtional_properties_repeated_string(self): {'property_1': [{'name': 'key1', 'value': 'a'}, {'name': 'key2', 'value': 'b'}]}) - def test_addtional_properties_repeated_record(self): + def test_additional_properties_repeated_record(self): schema = [ {'name': 'property_1', 'field_type': 'RECORD', @@ -302,6 +301,84 @@ def test_addtional_properties_repeated_record(self): {'property_1': [{'name': 'key1', 'value': {'property_2': 'a'}}, {'name': 'key2', 'value': {'property_2': 'b'}}]}) + def test_additional_properties_push_down(self): + schema = [ + {'name': 'property_1', + 'field_type': 'STRING', + 'description': 'a property', + 'mode': 'NULLABLE'}, + {'name': 'additionalProperties', + 'field_type': 'RECORD', + 'description': 'description-1', + 'mode': 'REPEATED', + 'fields': [{'name': 'name', + 'field_type': 'STRING', + 'description': 'additionalProperties name', + 'mode': 'NULLABLE'}, + {'name': 'value', + 'field_type': 'STRING', + 'description': 'description-1.', + 'mode': 'NULLABLE'}]}] + self.assertEqual( + bigquery_schema.enforce_schema_data_types( + {'property_1': 'value_1', + 'property_2': {'key1': {'property_2': 'a'}, + 'key2': {'property_2': 'b'}}}, schema), + {'property_1': 'value_1', + 'additionalProperties': [ + {'name': 'property_2', + 'value': "{'key1': {'property_2': 'a'}, 'key2': {'property_2': 'b'}}"}]}) + + def test_enforce_additional_properties_list(self): + schema = [ + {'description': 'Output only. The error details in case of state FAILED.', + 'field_type': 'RECORD', + 'fields': [{'description': 'The status code, which should be an enum value ' + 'of google.rpc.Code.', + 'field_type': 'NUMERIC', + 'mode': 'NULLABLE', + 'name': 'code'}, + {'description': 'A list of messages that carry the error details. 
' + 'There is a common set of message types for APIs ' + 'to use.', + 'field_type': 'RECORD', + 'fields': [{'description': 'additionalProperties', + 'field_type': 'RECORD', + 'fields': [{'description': 'additionalProperties ' + 'name', + 'field_type': 'STRING', + 'mode': 'NULLABLE', + 'name': 'name'}, + {'description': 'additionalProperties ' + 'value', + 'field_type': 'STRING', + 'mode': 'NULLABLE', + 'name': 'value'}], + 'mode': 'REPEATED', + 'name': 'additionalProperties'}], + 'mode': 'REPEATED', + 'name': 'details'}], + 'mode': 'NULLABLE', + 'name': 'error'}] + self.assertEqual( + bigquery_schema.enforce_schema_data_types( + {'error': {'code': 13, 'details': [{'@type': 'type.googleapis.com/google.rpc.DebugInfo', + 'detail': 'generic::DEADLINE_EXCEEDED: ' + 'deadline=9.223372036854776E15s', + 'stackEntries': [ + 'com.google.spanner.SpannerException: ' + 'generic::DEADLINE_EXCEEDED:']}]}}, + schema), + {'error': {'code': 13, + 'details': [{'additionalProperties': [{'name': '@type', + 'value': 'type.googleapis.com/google.rpc.DebugInfo'}, + {'name': 'detail', + 'value': 'generic::DEADLINE_EXCEEDED: ' + 'deadline=9.223372036854776E15s'}, + {'name': 'stackEntries', + 'value': "['com.google.spanner.SpannerException: " + "generic::DEADLINE_EXCEEDED:']"}]}]}}) + def test_remove_duplicate_property(self): doc = { 'ipAddress': 'value', @@ -332,13 +409,13 @@ def test_prune_max_properties(self): self.assertEqual(len(sanitized), 10000) self.assertNotIn('z', sanitized) - def test_addtional_properties_merge_schema_simple(self): + def test_additional_properties_merge_schema_simple(self): rest_schema = [ {'name': 'property_1', 'field_type': 'STRING', 'description': 'description-1', 'mode': 'NULLABLE' - }, + }, {'name': 'property_2', 'field_type': 'RECORD', 'description': 'description-2', @@ -357,7 +434,7 @@ def test_addtional_properties_merge_schema_simple(self): 'property_2': { 'add_prop_1': 'add_value_1', 'add_prop_2': 'add_value_2' - }, + }, 'property_3': 'value_3' } @@ -371,16 +448,16 @@ def test_addtional_properties_merge_schema_simple(self): rest_schema + [{'name': 'property_3', 'field_type': 'STRING', 'mode': 'NULLABLE' - }]) + }]) - def test_addtional_properties_merge_schema_object(self): + def test_additional_properties_merge_schema_object(self): self.maxDiff = None rest_schema = [ {'name': 'property_1', 'field_type': 'STRING', 'description': 'description-1', 'mode': 'NULLABLE' - }, + }, {'name': 'property_2', 'field_type': 'RECORD', 'description': 'description-2', @@ -398,7 +475,7 @@ def test_addtional_properties_merge_schema_object(self): 'property_2': { 'add_prop_1': {'key_1': 1}, 'add_prop_2': {'key_1': 2} - }, + }, 'property_3': 'value_3' } @@ -413,7 +490,7 @@ def test_addtional_properties_merge_schema_object(self): 'field_type': 'STRING', 'description': 'description-1', 'mode': 'NULLABLE' - }, + }, {'name': 'property_2', 'field_type': 'RECORD', 'description': 'description-2', @@ -428,7 +505,7 @@ def test_addtional_properties_merge_schema_object(self): 'fields': [{'name': 'key_1', 'field_type': 'NUMERIC', 'mode': 'NULLABLE'}]}] - }, + }, {'name': 'property_3', 'field_type': 'STRING', 'mode': 'NULLABLE'}]) diff --git a/tools/asset-inventory/tests/test_export.py b/tools/asset-inventory/tests/test_export.py index d10984e263..a6eb6af0c1 100644 --- a/tools/asset-inventory/tests/test_export.py +++ b/tools/asset-inventory/tests/test_export.py @@ -89,5 +89,6 @@ def test_main(self, mock_export_to_gcs, mock_export_to_gcs_content_types, self.assertEqual(mock_export_to_gcs_content_types.call_count, 1) 
mock_export_to_gcs.assert_not_called() + if __name__ == '__main__': unittest.main() diff --git a/tools/asset-inventory/tests/test_import_pipeline.py b/tools/asset-inventory/tests/test_import_pipeline.py index 72be7e3b78..5fcc3a28e2 100644 --- a/tools/asset-inventory/tests/test_import_pipeline.py +++ b/tools/asset-inventory/tests/test_import_pipeline.py @@ -23,7 +23,6 @@ from asset_inventory import import_pipeline import mock -from six import string_types STAGE_PATH = 'tests/data/stage' @@ -31,11 +30,11 @@ class TestImportPipeline(unittest.TestCase): def setUp(self): - if not os.path.exists(STAGE_PATH): os.mkdir(STAGE_PATH) for old_test_file in os.listdir(STAGE_PATH): - os.remove(os.path.join(STAGE_PATH, old_test_file)) + if os.path.isfile(os.path.join(STAGE_PATH, old_test_file)): + os.remove(os.path.join(STAGE_PATH, old_test_file)) @mock.patch('google.cloud.bigquery.Client') def test_assets(self, _): @@ -93,13 +92,14 @@ def test_resources(self, _): self.assertEqual(len(instance_labels), 1) @mock.patch('google.cloud.bigquery.Client') - def test_load_group_by_none(self, _): + def test_resource_with_error_details(self, _): with warnings.catch_warnings(): warnings.filterwarnings('ignore', 'The compiler package is deprecated') import_pipeline.run([ '--load_time=', - '--input=tests/data/resource.json', '--group_by=NONE', + '--input=tests/data/resource_with_error_details.json', + '--group_by=ASSET_TYPE', '--stage={}'.format(STAGE_PATH), '--dataset=test_resource' ]) rows = [] @@ -109,18 +109,38 @@ def test_load_group_by_none(self, _): with open(fn) as f: for line in f: rows.append(json.loads(line)) - self.assertEqual(export_files, 1) + # both resource and asset_type file should exist. + self.assertEqual(export_files, 2) found_assets = {} found_names = {} for row in rows: found_assets[row['asset_type']] = row found_names[row['name']] = row - self.assertEqual(len(found_names), 2) - self.assertEqual(len(found_assets), 2) - instance_row = found_assets['google.compute.Instance'] - resource_properties = instance_row['resource']['json_data'] - self.assertIsInstance(resource_properties, string_types) - self.assertNotIn('data', instance_row['resource']) + self.assertEqual(len(found_names), 1) + self.assertEqual(len(found_assets), 1) + instance_row = found_assets[ + 'datamigration.googleapis.com.MigrationJob'] + error_property = instance_row['resource']['data']['error'] + self.assertEqual( + error_property, + {"code": 13, "details": [ + {"additionalProperties": [ + {"name": "detail", + "value": "generic::DEADLINE_EXCEEDED: deadline=9.2"}, + {"name": "stackEntries", + "value": "['com.google.spanner.SpannerException: generic::DEADLINE_EXCEEDED: deadline=9.2']"}, + {"name": "type", + "value": "type.googleapis.com/google.rpc.DebugInfo"}]}, + {"additionalProperties": [ + {"name": "links", + "value": "[{'description': 'Learn more:', 'url': " + "'https://cloud.google.com/database-migration/docs/diagnose-issues'}]"}, + {"name": "type", + "value": "type.googleapis.com/google.rpc.Help"}]}], + "message": + "An internal error occurred. Contact support. Dapper trace id: -40, operation id: " + "projects/123/locations/europe-west1/operations/operation-123." + }) if __name__ == '__main__':
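For reviewers who want to exercise the new additionalProperties push-down outside the Beam pipeline, here is a minimal sketch that mirrors test_additional_properties_push_down above. It assumes the asset_inventory package from tools/asset-inventory is importable (for example via pip install -e tools/asset-inventory); the schema and document literals are copied from that test rather than from any new API.

# Sketch only: mirrors test_additional_properties_push_down in
# tests/test_bigquery_schema.py. Assumes asset_inventory is importable.
from asset_inventory import bigquery_schema

schema = [
    {'name': 'property_1',
     'field_type': 'STRING',
     'description': 'a property',
     'mode': 'NULLABLE'},
    {'name': 'additionalProperties',
     'field_type': 'RECORD',
     'description': 'description-1',
     'mode': 'REPEATED',
     'fields': [{'name': 'name',
                 'field_type': 'STRING',
                 'description': 'additionalProperties name',
                 'mode': 'NULLABLE'},
                {'name': 'value',
                 'field_type': 'STRING',
                 'description': 'description-1.',
                 'mode': 'NULLABLE'}]}]

# 'property_2' has no field of its own in the schema, so it is pushed down
# into the repeated additionalProperties records with its value stringified.
document = {'property_1': 'value_1',
            'property_2': {'key1': {'property_2': 'a'},
                           'key2': {'property_2': 'b'}}}

print(bigquery_schema.enforce_schema_data_types(document, schema))
# Expected, per the test:
# {'property_1': 'value_1',
#  'additionalProperties': [{'name': 'property_2',
#     'value': "{'key1': {'property_2': 'a'}, 'key2': {'property_2': 'b'}}"}]}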