Skip to content

Commit

Permalink
Pipe: Allow pipe times configured by raw timestamp (apache#12004)
Browse files Browse the repository at this point in the history
  • Loading branch information
Caideyipi committed Feb 5, 2024
1 parent 831ea92 commit eab1769
Showing 1 changed file with 8 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.ZoneId;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -111,13 +110,13 @@ public void validate(PipeParameterValidator validator) {
try {
historicalDataExtractionStartTime =
parameters.hasAnyAttributes(SOURCE_START_TIME_KEY)
? DateTimeUtils.convertDatetimeStrToLong(
parameters.getStringByKeys(SOURCE_START_TIME_KEY), ZoneId.systemDefault())
? DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone(
parameters.getStringByKeys(SOURCE_START_TIME_KEY))
: Long.MIN_VALUE;
historicalDataExtractionEndTime =
parameters.hasAnyAttributes(SOURCE_END_TIME_KEY)
? DateTimeUtils.convertDatetimeStrToLong(
parameters.getStringByKeys(SOURCE_END_TIME_KEY), ZoneId.systemDefault())
? DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone(
parameters.getStringByKeys(SOURCE_END_TIME_KEY))
: Long.MAX_VALUE;
if (historicalDataExtractionStartTime > historicalDataExtractionEndTime) {
throw new PipeParameterNotValidException(
Expand Down Expand Up @@ -149,19 +148,17 @@ public void validate(PipeParameterValidator validator) {
isHistoricalExtractorEnabled
&& parameters.hasAnyAttributes(
EXTRACTOR_HISTORY_START_TIME_KEY, SOURCE_HISTORY_START_TIME_KEY)
? DateTimeUtils.convertDatetimeStrToLong(
? DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone(
parameters.getStringByKeys(
EXTRACTOR_HISTORY_START_TIME_KEY, SOURCE_HISTORY_START_TIME_KEY),
ZoneId.systemDefault())
EXTRACTOR_HISTORY_START_TIME_KEY, SOURCE_HISTORY_START_TIME_KEY))
: Long.MIN_VALUE;
historicalDataExtractionEndTime =
isHistoricalExtractorEnabled
&& parameters.hasAnyAttributes(
EXTRACTOR_HISTORY_END_TIME_KEY, SOURCE_HISTORY_END_TIME_KEY)
? DateTimeUtils.convertDatetimeStrToLong(
? DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone(
parameters.getStringByKeys(
EXTRACTOR_HISTORY_END_TIME_KEY, SOURCE_HISTORY_END_TIME_KEY),
ZoneId.systemDefault())
EXTRACTOR_HISTORY_END_TIME_KEY, SOURCE_HISTORY_END_TIME_KEY))
: Long.MAX_VALUE;
if (historicalDataExtractionStartTime > historicalDataExtractionEndTime) {
throw new PipeParameterNotValidException(
Expand Down

0 comments on commit eab1769

Please sign in to comment.