You can define a Data Catalog in two ways. Most use cases are covered by a YAML configuration file, as illustrated previously, but you can also access the Data Catalog programmatically through `kedro.io.DataCatalog`, using an API that lets you configure data sources in code and use the IO module within notebooks.
To use the `DataCatalog` API, construct a `DataCatalog` object programmatically in a file like `catalog.py`.
In the following code, we use several pre-built data loaders documented in the API reference documentation.
```python
from kedro.io import DataCatalog
from kedro_datasets.pandas import (
    CSVDataSet,
    SQLTableDataSet,
    SQLQueryDataSet,
    ParquetDataSet,
)

io = DataCatalog(
    {
        "bikes": CSVDataSet(filepath="../data/01_raw/bikes.csv"),
        "cars": CSVDataSet(filepath="../data/01_raw/cars.csv", load_args=dict(sep=",")),
        "cars_table": SQLTableDataSet(
            table_name="cars", credentials=dict(con="sqlite:///kedro.db")
        ),
        "scooters_query": SQLQueryDataSet(
            sql="select * from cars where gear=4",
            credentials=dict(con="sqlite:///kedro.db"),
        ),
        "ranked": ParquetDataSet(filepath="ranked.parquet"),
    }
)
```
When using `SQLTableDataSet` or `SQLQueryDataSet` you must provide a `con` key containing a SQLAlchemy-compatible database connection string. In the example above, we pass it as part of the `credentials` argument. An alternative to `credentials` is to put `con` into `load_args` and `save_args` (`SQLTableDataSet` only), as sketched below.
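For example, a minimal sketch of that alternative, reusing the illustrative SQLite connection string from above:

```python
from kedro_datasets.pandas import SQLTableDataSet

# Pass `con` through load_args/save_args instead of credentials.
# Per the caveat above, this works for SQLTableDataSet only.
cars_table = SQLTableDataSet(
    table_name="cars",
    load_args=dict(con="sqlite:///kedro.db"),
    save_args=dict(con="sqlite:///kedro.db"),
)
```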
To review the `DataCatalog`:

```python
io.list()
```
To access each dataset by its name:

```python
cars = io.load("cars")  # data is now loaded as a DataFrame in 'cars'
gear = cars["gear"].values
```
The following steps happened behind the scenes when `load` was called:

- The value `cars` was located in the Data Catalog
- The corresponding `AbstractDataset` object was retrieved
- The `load` method of this dataset was called
- This `load` method delegated the loading to the underlying pandas `read_csv` function
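In other words, for the `cars` entry defined earlier, the call is roughly equivalent to reading the file with pandas directly. A simplified sketch that ignores the catalog lookup, versioning, and error handling:

```python
import pandas as pd

# Approximately what io.load("cars") resolves to for the CSVDataSet above.
cars_manual = pd.read_csv("../data/01_raw/cars.csv", sep=",")
```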
This pattern is not recommended unless you are using platform notebook environments (SageMaker, Databricks, etc.) or writing unit/integration tests for your Kedro pipeline. Prefer the YAML approach.
To save data using an API similar to that used to load data:

```python
from kedro.io import MemoryDataset

memory = MemoryDataset(data=None)
io.add("cars_cache", memory)

io.save("cars_cache", "Memory can store anything.")
io.load("cars_cache")
```
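As a quick check, the round trip returns the saved object unchanged:

```python
# The string saved above comes back from the in-memory dataset as-is.
assert io.load("cars_cache") == "Memory can store anything."
```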
To put the data in a SQLite database:
import os
# This cleans up the database in case it exists at this point
try:
os.remove("kedro.db")
except FileNotFoundError:
pass
io.save("cars_table", cars)
# rank scooters by their mpg
ranked = io.load("scooters_query")[["brand", "mpg"]]
To save the processed data in Parquet format:

```python
io.save("ranked", ranked)
```
Saving `None` to a dataset is not allowed!
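For instance, attempting it raises an error. A sketch, assuming a recent Kedro version where the exception is exported as `DatasetError` (older releases spell it `DataSetError`):

```python
from kedro.io import DatasetError  # assumption: exported name in recent Kedro versions

try:
    io.save("cars_cache", None)
except DatasetError as err:
    print(err)  # saving None is rejected before it reaches the underlying dataset
```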
Before instantiating the `DataCatalog`, Kedro will first attempt to read the credentials from the project configuration. The resulting dictionary is then passed into `DataCatalog.from_config()` as the `credentials` argument.
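If you want to replicate that wiring by hand, a minimal sketch looks like this, assuming Kedro's `OmegaConfConfigLoader` (the loader class and config patterns vary across Kedro versions):

```python
from kedro.config import OmegaConfConfigLoader  # assumption: available in your Kedro version
from kedro.io import DataCatalog

conf_loader = OmegaConfConfigLoader(conf_source="conf")
catalog = DataCatalog.from_config(
    catalog=conf_loader["catalog"],  # parsed from conf/**/catalog*.yml
    credentials=conf_loader["credentials"],  # parsed from conf/**/credentials*.yml
)
```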
Let's assume that the project contains the file `conf/local/credentials.yml` with the following contents:
```yaml
dev_s3:
  client_kwargs:
    aws_access_key_id: key
    aws_secret_access_key: secret

scooters_credentials:
  con: sqlite:///kedro.db

my_gcp_credentials:
  id_token: key
```
Your code will look as follows:

```python
CSVDataSet(
    filepath="s3://test_bucket/data/02_intermediate/company/motorbikes.csv",
    load_args=dict(sep=",", skiprows=5, skipfooter=1, na_values=["#NA", "NA"]),
    credentials=dict(key="token", secret="key"),
)
```
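Equivalently, you can pass one of the dictionaries parsed from `credentials.yml`. A sketch, assuming `credentials` holds that parsed mapping (for example, obtained via a config loader as above):

```python
dev_s3 = credentials["dev_s3"]  # hypothetical: the dict loaded from conf/local/credentials.yml

motorbikes = CSVDataSet(
    filepath="s3://test_bucket/data/02_intermediate/company/motorbikes.csv",
    load_args=dict(sep=",", skiprows=5, skipfooter=1, na_values=["#NA", "NA"]),
    credentials=dev_s3,  # forwarded to the underlying fsspec/s3fs filesystem
)
```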
In an earlier section of the documentation we described how Kedro enables dataset and ML model versioning. If you require programmatic control over load and save versions of a specific dataset, you can instantiate `Version` and pass it as a parameter to the dataset initialisation:
```python
from kedro.io import DataCatalog, Version
from kedro_datasets.pandas import CSVDataSet
import pandas as pd

data1 = pd.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})
data2 = pd.DataFrame({"col1": [7], "col2": [8], "col3": [9]})
version = Version(
    load=None,  # load the latest available version
    save=None,  # generate save version automatically on each save operation
)
test_data_set = CSVDataSet(
    filepath="data/01_raw/test.csv", save_args={"index": False}, version=version
)
io = DataCatalog({"test_data_set": test_data_set})

# save the dataset to data/01_raw/test.csv/<version>/test.csv
io.save("test_data_set", data1)
# save the dataset into a new file data/01_raw/test.csv/<version>/test.csv
io.save("test_data_set", data2)

# load the latest version from data/test.csv/*/test.csv
reloaded = io.load("test_data_set")
assert data2.equals(reloaded)
```
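Auto-generated save versions are timestamps, so the two saves above produced two sibling version directories. A sketch for listing them, assuming a local filesystem:

```python
from pathlib import Path

# Each save created a directory such as
# data/01_raw/test.csv/2023-08-18T12.00.00.000Z/test.csv (illustrative timestamp).
for version_dir in sorted(Path("data/01_raw/test.csv").iterdir()):
    print(version_dir.name)
```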
In the example above, we do not fix any versions. The behaviour of load and save operations becomes slightly different when we set a version:
```python
version = Version(
    load="my_exact_version",  # load exact version
    save="my_exact_version",  # save to exact version
)
test_data_set = CSVDataSet(
    filepath="data/01_raw/test.csv", save_args={"index": False}, version=version
)
io = DataCatalog({"test_data_set": test_data_set})

# save the dataset to data/01_raw/test.csv/my_exact_version/test.csv
io.save("test_data_set", data1)
# load from data/01_raw/test.csv/my_exact_version/test.csv
reloaded = io.load("test_data_set")
assert data1.equals(reloaded)

# raises DatasetError since the path
# data/01_raw/test.csv/my_exact_version/test.csv already exists
io.save("test_data_set", data2)
```
We do not recommend passing exact load and/or save versions, since it might lead to inconsistencies between operations. For example, if versions for load and save operations do not match, a save operation would result in a `UserWarning`.
Imagine a simple pipeline with two nodes, where B takes the output from A. If you specify the load-version of the data for B to be `my_data_2023_08_16.csv`, the data that A produces (`my_data_20230818.csv`) is not used.

```
Node_A -> my_data_20230818.csv
my_data_2023_08_16.csv -> Node_B
```
In code:

```python
version = Version(
    load="my_data_2023_08_16.csv",  # load exact version
    save="my_data_20230818.csv",  # save to exact version
)
test_data_set = CSVDataSet(
    filepath="data/01_raw/test.csv", save_args={"index": False}, version=version
)
io = DataCatalog({"test_data_set": test_data_set})

io.save("test_data_set", data1)  # emits a UserWarning due to version inconsistency

# raises DatasetError since the path
# data/01_raw/test.csv/my_data_2023_08_16.csv/test.csv does not exist
reloaded = io.load("test_data_set")
```