Description
The regular ExcelDataset makes direct translator-to-engineer interaction cumbersome because of file downloads and versioning. Using Google Sheets instead provides a live, editable document to which a Kedro pipeline can contribute as a "user", writing only specific sheets or columns.
Context
We're using this in our pipeline at the moment and it has proven to be quite effective.
Possible Implementation
_sheets_dataset: &_sheets_dataset
  type: datasets.gcp.GoogleSheetsDataset
  key: <sheet_id_here>
  service_file: conf/local/service-account.json

# catalog examples
preprocessing.int.resolved_nodes:
  <<: [*_layer_int, *_sheets_dataset]
  save_args:
    sheet_name: Nodes
    write_columns: ["curie"]  # saves only the "curie" column to the sheet
  load_args:
    sheet_name: Nodes

preprocessing.int.normalized_nodes:
  <<: [*_layer_int, *_sheets_dataset]
  save_args:
    sheet_name: Nodes
    write_columns: ["normalized_curie"]
  load_args:
    sheet_name: Nodes
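For reference, these catalog entries are equivalent to constructing the dataset directly in Python. A minimal sketch, assuming the class is importable as datasets.gcp.GoogleSheetsDataset (matching the type key above) and that the _layer_int anchor is defined elsewhere in the catalog; the sheet id and the example values are placeholders:

import pandas as pd

from datasets.gcp import GoogleSheetsDataset  # module path assumed from the YAML above

dataset = GoogleSheetsDataset(
    key="<sheet_id_here>",  # placeholder spreadsheet id
    service_file="conf/local/service-account.json",
    load_args={"sheet_name": "Nodes"},
    save_args={"sheet_name": "Nodes", "write_columns": ["curie"]},
)

# Writes only the "curie" column into the "Nodes" worksheet,
# leaving the other columns untouched for human editors.
dataset.save(pd.DataFrame({"curie": ["CHEBI:1234", "MONDO:0005737"]}))

# Reads the live worksheet back as a DataFrame.
df = dataset.load()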
from copy import deepcopy
from typing import Any, Optional

import pandas as pd
import pygsheets
from kedro.io.core import AbstractVersionedDataset, DatasetError, Version
from pygsheets import Spreadsheet, Worksheet


class GoogleSheetsDataset(AbstractVersionedDataset[pd.DataFrame, pd.DataFrame]):
    """Dataset to load data from Google Sheets."""

    DEFAULT_LOAD_ARGS: dict[str, Any] = {}
    DEFAULT_SAVE_ARGS: dict[str, Any] = {}

    def __init__(  # noqa: PLR0913
        self,
        *,
        key: str,
        service_file: str,
        load_args: dict[str, Any] | None = None,
        save_args: dict[str, Any] | None = None,
        version: Version | None = None,
        credentials: dict[str, Any] | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Creates a new instance of ``GoogleSheetsDataset``.

        Args:
            key: Google Sheets key.
            service_file: Path to the service account file.
            load_args: Arguments to pass to the load method.
            save_args: Arguments to pass to the save method.
            version: Version of the dataset.
            credentials: Credentials to connect to Google Sheets (currently
                unused; authentication happens through ``service_file``).
            metadata: Any arbitrary metadata attached to the dataset.
        """
        self._key = key
        self._service_file = service_file
        self._sheet = None
        super().__init__(
            filepath=None,
            version=version,
            exists_function=self._exists,
            glob_function=None,
        )

        # Handle default load and save arguments
        self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
        if load_args is not None:
            self._load_args.update(load_args)
        self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
        if save_args is not None:
            self._save_args.update(save_args)

    def _init_sheet(self) -> None:
        """Initialize the spreadsheet.

        This is executed lazily to avoid loading credentials on Python
        runtime launch, which creates issues in unit tests.
        """
        if self._sheet is None:
            gc = pygsheets.authorize(service_file=self._service_file)
            self._sheet = gc.open_by_key(self._key)

    def _load(self) -> pd.DataFrame:
        self._init_sheet()
        sheet_name = self._load_args["sheet_name"]
        wks = self._get_wks_by_name(self._sheet, sheet_name)
        if wks is None:
            raise DatasetError(f"Sheet with name {sheet_name} not found!")
        df = wks.get_as_df()
        # Optionally restrict the loaded frame to a subset of columns
        if (cols := self._load_args.get("columns", None)) is not None:
            df = df[cols]
        return df

    def _save(self, data: pd.DataFrame) -> None:
        self._init_sheet()
        sheet_name = self._save_args["sheet_name"]
        wks = self._get_wks_by_name(self._sheet, sheet_name)

        # Create the worksheet if it does not exist
        if wks is None:
            wks = self._sheet.add_worksheet(sheet_name)

        # Write each requested column into its existing position in the sheet
        for column in self._save_args["write_columns"]:
            col_idx = self._get_col_index(wks, column)
            if col_idx is None:
                raise DatasetError(
                    f"Sheet {sheet_name} does not contain column {column}!"
                )
            wks.set_dataframe(data[[column]], (1, col_idx + 1))

    @staticmethod
    def _get_wks_by_name(
        spreadsheet: Spreadsheet, sheet_name: str
    ) -> Optional[Worksheet]:
        for wks in spreadsheet.worksheets():
            if wks.title == sheet_name:
                return wks
        return None

    @staticmethod
    def _get_col_index(sheet: Worksheet, col_name: str) -> Optional[int]:
        for idx, col in enumerate(sheet.get_row(1)):
            if col == col_name:
                return idx
        return None

    def _describe(self) -> dict[str, Any]:
        return {
            "key": self._key,
        }

    def _exists(self) -> bool:
        return False
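Because _init_sheet defers pygsheets.authorize until the first load/save, the dataset can be unit-tested without real credentials. A minimal sketch of such a test, assuming the class above is importable; the key, worksheet name, and values are made up for illustration:

from unittest import mock

import pandas as pd


def test_load_returns_requested_columns():
    dataset = GoogleSheetsDataset(
        key="fake-sheet-id",  # hypothetical id, no real sheet is opened
        service_file="conf/local/service-account.json",
        load_args={"sheet_name": "Nodes", "columns": ["curie"]},
    )

    # Fake worksheet matching the name the dataset looks up
    fake_wks = mock.Mock()
    fake_wks.title = "Nodes"
    fake_wks.get_as_df.return_value = pd.DataFrame(
        {"curie": ["CHEBI:1234"], "label": ["caffeine"]}
    )

    # Patch authorization so no credentials are ever loaded
    with mock.patch("pygsheets.authorize") as authorize:
        spreadsheet = authorize.return_value.open_by_key.return_value
        spreadsheet.worksheets.return_value = [fake_wks]
        df = dataset.load()

    assert list(df.columns) == ["curie"]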
Possible Alternatives
I think there is a documentation issue because I can't find it in the official docs, but we relaxed the rules to enable community datasets to be included in kedro-datasets. You can follow the contributing guidelines to merge your dataset into the "experimental" folder of kedro-datasets.
That sounds really great, thank you @lvijnck! As @Galileo-Galilei mentioned, experimental datasets are a good place for this addition. Since you already have the code ready, would you like to add the dataset to the "experimental datasets" section yourself, or would you prefer to have the Kedro maintainers do it? Let us know how you'd like to proceed!