🐼 🐼 🐼 🐼 🐼 🐼 🐼
A simple interface written in python for reproducible i/o workflows around tabular data via pandas DataFrame
specified via yaml
"playbooks".
Apart from any pandas
function possible that can alter data, also datapatch is included for an additional and easier way to patch data.
NOTICE
As of july 2023, this package only handles pandas transform logic, no data warehousing anymore. See archived version
Specify your operations via yaml
syntax:
read:
uri: ./data.csv
options:
skiprows: 3
operations:
- handler: DataFrame.rename
options:
columns:
value: amount
- handler: Series.map
column: slug
options:
func: "lambda x: normality.slugify(x) if isinstance(x, str) else 'NO DATA'"
store this as a file pandas.yml
, and apply a data source:
cat data.csv | runpandarun pandas.yml > data_transformed.csv
Or, use within your python scripts:
from runpandarun import Playbook
play = Playbook.from_yaml("./pandas.yml")
df = play.run() # get the transformed dataframe
# change playbook parameters on run time:
play.read.uri = "s3://my-bucket/data.csv"
df = play.run()
df.to_excel("./output.xlsx")
# the play can be applied directly to a data frame,
# this allows more granular control
df = get_my_data_from_somewhere_else()
df = play.run(df)
Requires at least python3.10 Virtualenv use recommended.
Additional dependencies (pandas
et. al.) will be installed automatically:
pip install runpandarun
After this, you should be able to execute in your terminal:
runpandarun --help
The playbook can be programmatically obtained in different ways:
from runpandarun import Playbook
# via yaml file
play = Playbook.from_yaml('./path/to/config.yml')
# via yaml string
play = Playbook.from_string("""
operations:
- handler: DataFrame.sort_values
options:
by: my_sort_column
""")
# directly via the Playbook object (which is a pydantic object)
play = Playbook(operations=[{
"handler": "DataFrane.sort_values",
"options": {"by": "my_sort_column"}
}])
All options within the Playbook are optional, if you apply an empty play to a DataFrame, it will just remain untouched (but runpandarun
won't break)
The playbook has three sections:
- read: instructions for reading in a source dataframe
- operations: a list of functions with their options (kwargs) executed in the given order
- write: instructions for saving a transformed dataframe to a target
pandas
can read and write from many local and remote sources and targets.
More information about handlers and their options: Pandas IO tools
For example, you could transform a source from s3
to a sftp
endpoint:
runpandarun pandas.yml -i s3://my_bucket/data.csv -o sftp://user@host/data.csv
you can overwrite the uri
arguments in the command line with -i / --in-uri
and -o / --out-uri
read:
uri: s3://my-bucket/data.xls # input uri, anything that pandas can read
handler: read_excel # default: guess by file extension, fallback: read_csv
options: # options for the handler
skiprows: 2
write:
uri: ./data.xlsx # output uri, anything that pandas can write to
handler: write_excel # default: guess by file extension, fallback: write_csv
options: # options for the handler
index: false
The operations
key of the yaml spec holds the transformations that should be applied to the data in order.
An operation can be any function from pd.DataFrame or pd.Series. Refer to these documentations to see their possible options (as in **kwargs
).
For the handler, specify the module path without a pd
or pandas
prefix, just DataFrame.<func>
or Series.<func>
. When using a function that applies to a Series
, tell 🐼 which one to use via the column
prop.
operations:
- handler: DataFrame.rename
options:
columns:
value: amount
This exactly represents this python call to the processed dataframe:
df.rename(columns={"value": "amount"})
For api keys or other secrets, you can put environment variables anywhere into the config. They will simply resolved via os.path.expandvars
read:
options:
storage_options:
header:
"api-key": ${MY_API_KEY}
A full playbook example that covers a few of the possible cases.
See the yaml files in ./tests/fixtures/ for more.
read:
uri: https://api.example.org/data?format=csv
options:
storage_options:
header:
"api-key": ${API_KEY}
skipfooter: 1
operations:
- handler: DataFrame.rename
options:
columns:
value: amount
- handler: Series.str.lower
column: state
- handler: DataFrame.assign
options:
city_id: "lambda x: x['state'] + '-' + x['city'].map(normality.slugify)"
- handler: DataFrame.set_index
options:
keys:
- city_id
- handler: DataFrame.sort_values
options:
by:
- state
- city
patch:
city:
options:
- match: Zarizri
value: Zar1zr1
write:
uri: ftp://user:${FTP_PASSWORD}@host/data.csv
options:
index: false
operations:
- handler: DataFrame.rename
options:
columns:
value: amount
"First name": first_name
operations:
- handler: Series.map
column: my_column
options:
func: "lambda x: x.lower()"
operations:
- handler: DataFrame.set_index
options:
keys:
- city_id
operations:
- sort_values:
by:
- column1
- column2
ascending: false
when using a subset of columns, use in conjunction with sort_values
to make sure to keep the right records
operations:
- drop_duplicates:
subset:
- column1
- column2
keep: last
operations:
- handler: DataFrame.assign
options:
city_id: "lambda x: x['state'] + '-' + x['city'].map(normality.slugify)"
read:
uri: postgresql://user:password@host/database
options:
sql: "SELECT * FROM my_table WHERE category = 'A'"
Apart from any pandas
function possible that can alter data, also datapatch is included for an additional and easier way to patch data.
Simply add a patch
config to the yaml. Refer to the datapatch readme for details.
The patching is applied after all the operations
are applied.
patch:
countries:
normalize: true
lowercase: true
options:
- match: Frankreich
value: France
- match:
- Northkorea
- Nordkorea
- Northern Korea
- NKorea
- DPRK
value: North Korea
- contains: Britain
value: Great Britain
Ok wait, you are executing arbitrary python code in the yaml specs?
Not really, there is a strict allow list of possible modules that can be used. See runpandarun.util.safe_eval
This includes:
- any pandas or numpy modules
- normality
- fingerprints
So, this would, of course, NOT WORK (as tested here)
operations:
- handler: DataFrame.apply
func: "__import__('os').system('rm -rf /')"
Package is managed via Poetry
git clone https://github.com/investigativedata/runpandarun
Install requirements:
poetry install --with dev
Test:
make test
Since July 2023, this project is part of investigraph and development of this project is funded by
Media Tech Lab Bayern batch #3