Skip to content

Commit

Permalink
debug simpledb
Browse files Browse the repository at this point in the history
  • Loading branch information
akariv committed Jun 26, 2024
1 parent 0f6f819 commit b849403
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions datapackage_pipelines_budgetkey/pipelines/simpledb/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ def debug_source(source, debug):
''',
possible_values=['approval', 'payment'],
type='string',
default=lambda row: 'payment' if row.get('year_paid') else 'approval',
default=lambda row: ('payment' if row.get('year_paid') else 'approval'),
),
dict(
name='year',
Expand All @@ -451,7 +451,7 @@ def debug_source(source, debug):
''',
sample_values=[2017, 2023, 2024],
type='integer',
default=lambda row: row.get('year_requested') if row['value_kind'] == 'approval' else row.get('year_paid'),
default=lambda row: (row.get('year_requested') if row['value_kind'] == 'approval' else row.get('year_paid')),
),
dict(
name='amount',
Expand All @@ -460,7 +460,7 @@ def debug_source(source, debug):
''',
sample_values=[1000000, 5000000, 10000000],
type='number',
default=lambda row: row.get('amount_approved') if row['value_kind'] == 'approval' else row.get('amount_total'),
default=lambda row: (row.get('amount_approved') if row['value_kind'] == 'approval' else row.get('amount_total')),
filter=lambda x: x is not None and x > 0
),
dict(
Expand Down Expand Up @@ -498,7 +498,7 @@ def debug_source(source, debug):
'private_person',
],
type='string',
default=lambda row: row.get('entity_kind') or 'private_person'
default=lambda row: (row.get('entity_kind') or 'private_person')
),
],
search=dict(
Expand All @@ -520,6 +520,11 @@ def debug_source(source, debug):
),
)

def print_descriptor(package: DF.PackageWrapper):
print(json.dumps(package.pkg.descriptor, indent=2))
yield package.pkg
yield from package

def get_flow(table, params, debug=False):
steps = []
source = debug_source(params['source'], debug)
Expand Down Expand Up @@ -548,6 +553,7 @@ def get_flow(table, params, debug=False):

steps.append(DF.select_fields(field_names))
if not debug:
steps.append(print_descriptor)
steps.append(DF.dump_to_path(f'/var/datapackages/simpledb/{table}'))
steps.append(DF.dump_to_sql({table: {'resource-name': table}}))
else:
Expand Down

0 comments on commit b849403

Please sign in to comment.