From 13f6b5b7052f5dfb7ecc1b23f4750cdf8c4a48bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E8=BE=B0?= Date: Fri, 10 Feb 2023 16:50:17 +0800 Subject: [PATCH] Fix #1055 --- .../AnchorToDataSourceMapper.scala | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala index d99a3535c..602e027d4 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala @@ -107,22 +107,23 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH val dataLoaderHandlers: List[DataLoaderHandler] = dataPathHandlers.map(_.dataLoaderHandler) - // Only file-based source has real "path", others are just single dataset - val (adjustedObsTimeRange, dataSourcePath) = if (factDataSource.location.isFileBasedLocation()) { + val (timeInterval, updatedFactDataSource) = if (factDataSource.location.isFileBasedLocation()) { val pathChecker = PathChecker(ss, dataLoaderHandlers) val pathAnalyzer = new TimeBasedHdfsPathAnalyzer(pathChecker, dataLoaderHandlers) val pathInfo = pathAnalyzer.analyze(factDataSource.path) - if (pathInfo.dateTimeResolution == DateTimeResolution.DAILY) { - (obsTimeRange.adjustWithDateTimeResolution(DateTimeResolution.DAILY), pathInfo.basePath) - } else (obsTimeRange, pathInfo.basePath) + val adjustedObsTimeRange = if (pathInfo.dateTimeResolution == DateTimeResolution.DAILY) { + obsTimeRange.adjustWithDateTimeResolution(DateTimeResolution.DAILY) + } else { + obsTimeRange + } + (OfflineDateTimeUtils.getFactDataTimeRange(adjustedObsTimeRange, window, timeDelays), + DataSource(pathInfo.basePath, factDataSource.sourceType, factDataSource.timeWindowParams, + factDataSource.timePartitionPattern, factDataSource.postfixPath)) } else { - (obsTimeRange, factDataSource.path) + // Path and time range adjustments cannot be applied to non-file-based sources, keep them as-is + (obsTimeRange, factDataSource) } - // Copy the pathInfo's path into the datasource path as it adds the daily/hourly keyword if it is missing from the path - val updatedFactDataSource = DataSource(dataSourcePath, factDataSource.sourceType, factDataSource.timeWindowParams, - factDataSource.timePartitionPattern, factDataSource.postfixPath) - val timeInterval = OfflineDateTimeUtils.getFactDataTimeRange(adjustedObsTimeRange, window, timeDelays) val needCreateTimestampColumn = SlidingWindowFeatureUtils.needCreateTimestampColumnFromPartition(factDataSource) val shouldSkipFeature = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.SKIP_MISSING_FEATURE).toBoolean