Improve persisted aggregation and logs #1740

Merged Aug 31, 2021 · 3 commits
@@ -393,8 +393,17 @@ private Map<String, Boolean> isSafeToPurgeTheDuration(long purgeTime, Table pare
purgingCheckState.put(IS_PARENT_TABLE_HAS_AGGREGATED_DATA, dataInParentTable != null
&& dataInParentTable.length > 0);
} catch (Exception e) {
LOG.error("Error occurred while checking whether the data is safe to purge from aggregation " +
"tables for the aggregation " + aggregationDefinition.getId(), e);
if (e.getMessage() != null && e.getMessage().contains("deadlocked")) {
errorMessage = "Deadlock observed while checking whether the data is safe to purge from aggregation " +
"tables for the aggregation " + aggregationDefinition.getId() +
". If this occurred in an Active Active deployment, this error can be ignored if other node " +
"doesn't have this error";
} else {
errorMessage = "Error occurred while checking whether the data is safe to purge from aggregation " +
"tables for the aggregation " + aggregationDefinition.getId();

}
LOG.error(errorMessage, e);
purgingCheckState.put(IS_DATA_AVAILABLE_TO_PURGE, false);
purgingCheckState.put(IS_PARENT_TABLE_HAS_AGGREGATED_DATA, false);
errorMessage = "Error occurred while checking whether the data is safe to purge from aggregation tables" +
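For reference, a standalone sketch of the classification pattern used above (the helper and class names are hypothetical, not part of the PR); Exception#getMessage() can return null, which is why the guard matters:

```java
// Hypothetical standalone sketch of the deadlock/error classification above.
// Exception#getMessage() can return null, hence the explicit guard.
public class PurgeErrorClassifierSketch {

    static String classify(Exception e, String aggregationId) {
        String message = e.getMessage();
        if (message != null && message.contains("deadlocked")) {
            return "Deadlock observed while checking whether the data is safe to purge " +
                    "from aggregation tables for the aggregation " + aggregationId +
                    ". In an Active-Active deployment this can be ignored if the other " +
                    "node does not report the same error.";
        }
        return "Error occurred while checking whether the data is safe to purge from " +
                "aggregation tables for the aggregation " + aggregationId;
    }

    public static void main(String[] args) {
        System.out.println(classify(new RuntimeException("transaction deadlocked"), "TradeAgg"));
        System.out.println(classify(new RuntimeException((String) null), "TradeAgg"));
    }
}
```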
@@ -212,7 +212,7 @@ private void dispatchEvent(long startTimeOfNewAggregates, BaseIncrementalValueSt
tableEventChunk.add(event);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Event dispatched by " + this.duration + " incremental executor: " + eventChunk.toString());
LOG.debug("Event dispatched by aggregation " + aggregatorName + " for duration " + this.duration);
}
if (isProcessingExecutor) {
executorService.execute(() -> {
@@ -223,6 +223,9 @@ private void dispatchEvent(long startTimeOfNewAggregates, BaseIncrementalValueSt
"' when performing table writes of aggregation '" + this.aggregatorName +
"' for duration '" + this.duration + "'. This should be investigated as this " +
"can cause accuracy loss.", t);
if (LOG.isDebugEnabled()) {
LOG.debug("Dropping Event chunk - \"" + eventChunk.toString() + "\"");
}
} finally {
isProcessFinished.set(true);
}
@@ -236,7 +239,7 @@ private void dispatchEvent(long startTimeOfNewAggregates, BaseIncrementalValueSt
}
} catch (InterruptedException e) {
LOG.error("Error occurred while waiting until table update task finishes for duration " +
duration, e);
duration + "in aggregation " + aggregatorName, e);
}
}
if (getNextExecutor() != null) {
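The error path above now names the aggregation while the dispatcher waits for an asynchronous table write to finish. A self-contained sketch of that wait pattern (all names and the polling interval are assumptions, not the Siddhi implementation):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Standalone sketch of the wait-for-table-write pattern in dispatchEvent:
// the write runs on an executor thread and the dispatcher polls a flag until done.
public class DispatchWaitSketch {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        AtomicBoolean isProcessFinished = new AtomicBoolean(false);
        executorService.execute(() -> {
            try {
                // table writes for the aggregation duration would happen here
            } finally {
                isProcessFinished.set(true); // always signal, even on failure
            }
        });
        while (!isProcessFinished.get()) {
            Thread.sleep(50); // polling wait; InterruptedException is logged upstream
        }
        executorService.shutdown();
    }
}
```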
@@ -64,14 +64,17 @@ public class IncrementalExecutorsInitialiser {
private String timeZone;

private boolean isInitialised;
private boolean isReadOnly;
private boolean isPersistedAggregation;

public IncrementalExecutorsInitialiser(List<TimePeriod.Duration> incrementalDurations,
Map<TimePeriod.Duration, Table> aggregationTables,
Map<TimePeriod.Duration, Executor> incrementalExecutorMap,
boolean isDistributed, String shardId, SiddhiAppContext siddhiAppContext,
MetaStreamEvent metaStreamEvent, Map<String, Table> tableMap,
Map<String, Window> windowMap,
Map<String, AggregationRuntime> aggregationMap, String timeZone) {
Map<String, AggregationRuntime> aggregationMap, String timeZone,
boolean isReadOnly, boolean isPersistedAggregation) {
this.timeZone = timeZone;
this.incrementalDurations = incrementalDurations;
this.aggregationTables = aggregationTables;
@@ -88,15 +91,18 @@ public IncrementalExecutorsInitialiser(List<TimePeriod.Duration> incrementalDura
this.aggregationMap = aggregationMap;

this.isInitialised = false;
this.isReadOnly = isReadOnly;
this.isPersistedAggregation = isPersistedAggregation;
}

public synchronized void initialiseExecutors() {
if (this.isInitialised) {
if (this.isInitialised || isReadOnly) {
// Only cleared when executors change from reading to processing state in one node deployment
return;
}
Event[] events;
Long endOFLatestEventTimestamp = null;
Long lastData = null;

// Get max(AGG_TIMESTAMP) from table corresponding to max duration
Table tableForMaxDuration = aggregationTables.get(incrementalDurations.get(incrementalDurations.size() - 1));
@@ -108,54 +114,115 @@ public synchronized void initialiseExecutors() {
// Get latest event timestamp in tableForMaxDuration and get the end time of the aggregation record
events = onDemandQueryRuntime.execute();
if (events != null) {
Long lastData = (Long) events[events.length - 1].getData(0);
lastData = (Long) events[events.length - 1].getData(0);
endOFLatestEventTimestamp = IncrementalTimeConverterUtil
.getNextEmitTime(lastData, incrementalDurations.get(incrementalDurations.size() - 1), timeZone);
}
for (int i = incrementalDurations.size() - 1; i > 0; i--) {
TimePeriod.Duration recreateForDuration = incrementalDurations.get(i);
Executor incrementalExecutor = incrementalExecutorMap.get(recreateForDuration);


// Get the table previous to the duration for which we need to recreate (e.g. if we want to recreate
// for minute duration, take the second table [provided that aggregation is done for seconds])
// This lookup is filtered by endOFLatestEventTimestamp
Table recreateFromTable = aggregationTables.get(incrementalDurations.get(i - 1));

onDemandQuery = getOnDemandQuery(recreateFromTable, false, endOFLatestEventTimestamp);
onDemandQuery.setType(OnDemandQuery.OnDemandQueryType.FIND);
onDemandQueryRuntime = OnDemandQueryParser.parse(onDemandQuery, null, siddhiAppContext,
tableMap, windowMap,
aggregationMap);
events = onDemandQueryRuntime.execute();

if (events != null) {
long referenceToNextLatestEvent = (Long) events[events.length - 1].getData(0);
endOFLatestEventTimestamp = IncrementalTimeConverterUtil
.getNextEmitTime(referenceToNextLatestEvent, incrementalDurations.get(i - 1), timeZone);

ComplexEventChunk<StreamEvent> complexEventChunk = new ComplexEventChunk<>();
for (Event event : events) {
StreamEvent streamEvent = streamEventFactory.newInstance();
streamEvent.setOutputData(event.getData());
complexEventChunk.add(streamEvent);

if (isPersistedAggregation) {
for (int i = incrementalDurations.size() - 1; i > 0; i--) {
if (lastData != null && !IncrementalTimeConverterUtil.
isAggregationDataComplete(lastData, incrementalDurations.get(i), timeZone)) {
recreateState(lastData, incrementalDurations.get(i),
aggregationTables.get(incrementalDurations.get(i - 1)), i == 1);
} else if (lastData == null) {
recreateState(null, incrementalDurations.get(i),
aggregationTables.get(incrementalDurations.get(i - 1)), i == 1);
}
if (i > 1) {
onDemandQuery = getOnDemandQuery(aggregationTables.get(incrementalDurations.get(i - 1)), true,
endOFLatestEventTimestamp);
onDemandQuery.setType(OnDemandQuery.OnDemandQueryType.FIND);
onDemandQueryRuntime = OnDemandQueryParser.parse(onDemandQuery, null,
siddhiAppContext, tableMap, windowMap, aggregationMap);
events = onDemandQueryRuntime.execute();
if (events != null) {
lastData = (Long) events[events.length - 1].getData(0);
}
}
incrementalExecutor.execute(complexEventChunk);
}
} else {
for (int i = incrementalDurations.size() - 1; i > 0; i--) {

TimePeriod.Duration recreateForDuration = incrementalDurations.get(i);
Executor incrementalExecutor = incrementalExecutorMap.get(recreateForDuration);


if (i == 1) {
TimePeriod.Duration rootDuration = incrementalDurations.get(0);
Executor rootIncrementalExecutor = incrementalExecutorMap.get(rootDuration);
long emitTimeOfLatestEventInTable = IncrementalTimeConverterUtil.getNextEmitTime(
referenceToNextLatestEvent, rootDuration, timeZone);
// Get the table previous to the duration for which we need to recreate (e.g. if we want to recreate
// for minute duration, take the second table [provided that aggregation is done for seconds])
// This lookup is filtered by endOFLatestEventTimestamp
Table recreateFromTable = aggregationTables.get(incrementalDurations.get(i - 1));

rootIncrementalExecutor.setEmitTime(emitTimeOfLatestEventInTable);
onDemandQuery = getOnDemandQuery(recreateFromTable, false, endOFLatestEventTimestamp);
onDemandQuery.setType(OnDemandQuery.OnDemandQueryType.FIND);
onDemandQueryRuntime = OnDemandQueryParser.parse(onDemandQuery, null, siddhiAppContext,
tableMap, windowMap,
aggregationMap);
events = onDemandQueryRuntime.execute();

if (events != null) {
long referenceToNextLatestEvent = (Long) events[events.length - 1].getData(0);
endOFLatestEventTimestamp = IncrementalTimeConverterUtil
.getNextEmitTime(referenceToNextLatestEvent, incrementalDurations.get(i - 1), timeZone);

ComplexEventChunk<StreamEvent> complexEventChunk = new ComplexEventChunk<>();
for (Event event : events) {
StreamEvent streamEvent = streamEventFactory.newInstance();
streamEvent.setOutputData(event.getData());
complexEventChunk.add(streamEvent);
}
incrementalExecutor.execute(complexEventChunk);

if (i == 1) {
TimePeriod.Duration rootDuration = incrementalDurations.get(0);
Executor rootIncrementalExecutor = incrementalExecutorMap.get(rootDuration);
long emitTimeOfLatestEventInTable = IncrementalTimeConverterUtil.getNextEmitTime(
referenceToNextLatestEvent, rootDuration, timeZone);

rootIncrementalExecutor.setEmitTime(emitTimeOfLatestEventInTable);

}
}
}
}
this.isInitialised = true;
}

private void recreateState(Long lastData, TimePeriod.Duration recreateForDuration,
Table recreateFromTable, boolean isBeforeRoot) {
Long endOFLatestEventTimestamp = null;
Executor incrementalExecutor = incrementalExecutorMap.get(recreateForDuration);
if (lastData != null) {
endOFLatestEventTimestamp = IncrementalTimeConverterUtil
.getNextEmitTime(lastData, recreateForDuration, timeZone);
}
OnDemandQuery onDemandQuery = getOnDemandQuery(recreateFromTable, false, endOFLatestEventTimestamp);
onDemandQuery.setType(OnDemandQuery.OnDemandQueryType.FIND);
OnDemandQueryRuntime onDemandQueryRuntime = OnDemandQueryParser.parse(onDemandQuery, null, siddhiAppContext,
tableMap, windowMap,
aggregationMap);
Event[] events = onDemandQueryRuntime.execute();
if (events != null) {
long referenceToNextLatestEvent = (Long) events[events.length - 1].getData(0);
ComplexEventChunk<StreamEvent> complexEventChunk = new ComplexEventChunk<>();
for (Event event : events) {
StreamEvent streamEvent = streamEventFactory.newInstance();
streamEvent.setOutputData(event.getData());
complexEventChunk.add(streamEvent);
}
incrementalExecutor.execute(complexEventChunk);
if (isBeforeRoot) {
TimePeriod.Duration rootDuration = incrementalDurations.get(0);
Executor rootIncrementalExecutor = incrementalExecutorMap.get(rootDuration);
long emitTimeOfLatestEventInTable = IncrementalTimeConverterUtil.getNextEmitTime(
referenceToNextLatestEvent, rootDuration, timeZone);

rootIncrementalExecutor.setEmitTime(emitTimeOfLatestEventInTable);

}
}
}

private OnDemandQuery getOnDemandQuery(Table table, boolean isLargestGranularity, Long endOFLatestEventTimestamp) {
Selector selector = Selector.selector();
if (isLargestGranularity) {
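In outline, the new persisted-aggregation branch above walks durations from coarsest to finest and rebuilds only the levels whose last persisted record is incomplete, with the level just above the root also seeding the root executor's emit time. A self-contained simulation of that walk (all types and the completeness rule here are stand-ins, not the Siddhi implementation):

```java
import java.util.Arrays;
import java.util.List;

// Simulation (hypothetical names) of the recovery walk: iterate durations from
// coarsest to finest; recreate state only where the last record is incomplete.
public class RecoveryWalkSketch {
    enum Duration { SECONDS, MINUTES, HOURS, DAYS }

    static boolean isComplete(Long lastTimestamp, Duration d) {
        // stand-in for IncrementalTimeConverterUtil.isAggregationDataComplete(...)
        return lastTimestamp != null && d.ordinal() >= Duration.HOURS.ordinal();
    }

    static void recreateState(Long lastTimestamp, Duration d, boolean isBeforeRoot) {
        System.out.println("Recreating " + d + " from the next-finer table"
                + (isBeforeRoot ? " (also seeding the root executor emit time)" : ""));
    }

    public static void main(String[] args) {
        List<Duration> durations = Arrays.asList(Duration.values());
        Long lastData = 1_630_000_000_000L; // last persisted AGG_TIMESTAMP, if any
        for (int i = durations.size() - 1; i > 0; i--) {
            if (lastData == null || !isComplete(lastData, durations.get(i))) {
                recreateState(lastData, durations.get(i), i == 1);
            }
        }
    }
}
```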
@@ -57,6 +57,7 @@ public class PersistedIncrementalExecutor implements Executor {
private Processor cudStreamProcessor;
private boolean isProcessingExecutor;
private LinkedBlockingQueue<QueuedCudStreamProcessor> cudStreamProcessorQueue;
private String aggregatorName;

public PersistedIncrementalExecutor(String aggregatorName, TimePeriod.Duration duration,
List<ExpressionExecutor> processExpressionExecutors,
@@ -69,6 +70,7 @@ public PersistedIncrementalExecutor(String aggregatorName, TimePeriod.Duration d
this.next = child;
this.cudStreamProcessor = cudStreamProcessor;

this.aggregatorName = aggregatorName;
this.timestampExpressionExecutor = processExpressionExecutors.remove(0);
this.streamEventFactory = new StreamEventFactory(metaStreamEvent);
setNextExecutor(child);
@@ -83,8 +85,8 @@ public PersistedIncrementalExecutor(String aggregatorName, TimePeriod.Duration d
@Override
public void execute(ComplexEventChunk streamEventChunk) {
if (log.isDebugEnabled()) {
log.debug("Event Chunk received by " + this.duration + " incremental executor: " +
streamEventChunk.toString() + " will be dropped since persisted aggregation has been scheduled ");
log.debug("Event Chunk received by the Aggregation " + aggregatorName + " for duration " + this.duration +
" will be dropped since persisted aggregation has been scheduled ");
}
streamEventChunk.reset();
while (streamEventChunk.hasNext()) {
@@ -94,6 +96,7 @@ public void execute(ComplexEventChunk streamEventChunk) {
try {
long timestamp = getTimestamp(streamEvent);
if (timestamp >= executorState.nextEmitTime) {
log.debug("Next EmitTime: " + executorState.nextEmitTime + ", Current Time: " + timestamp);
long emittedTime = executorState.nextEmitTime;
long startedTime = executorState.startTimeOfAggregates;
executorState.startTimeOfAggregates = IncrementalTimeConverterUtil.getStartTimeOfAggregates(
@@ -120,8 +123,8 @@ private void dispatchEvent(long startTimeOfNewAggregates, long emittedTime, Stri
ZoneId.of(timeZone));
ZonedDateTime endTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(emittedTime),
ZoneId.of(timeZone));
log.info("Aggregation event dispatched for the duration " + duration + " to aggregate data from "
+ startTime.toString() + " to " + endTime.toString() + " ");
log.info("Aggregation event dispatched for the duration " + duration + " for aggregation " + aggregatorName +
" to aggregate data from " + startTime + " to " + endTime + " ");
ComplexEventChunk complexEventChunk = new ComplexEventChunk();
StreamEvent streamEvent = streamEventFactory.newInstance();
streamEvent.setType(ComplexEvent.Type.CURRENT);
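The new logs above trace the emit-time gate in execute(): an event only triggers a dispatch once its timestamp reaches nextEmitTime, after which the window rolls forward. A standalone sketch of that gate (all values hypothetical):

```java
// Sketch of the emit-time gate: a dispatch fires only when an event's timestamp
// crosses nextEmitTime, then the aggregate window start/emit times roll forward.
public class EmitGateSketch {
    public static void main(String[] args) {
        long nextEmitTime = 60_000;       // end of the current minute window
        long startTimeOfAggregates = 0;   // start of the current minute window
        long[] eventTimestamps = {10_000, 59_999, 61_500};
        for (long timestamp : eventTimestamps) {
            if (timestamp >= nextEmitTime) {
                System.out.println("Dispatch window [" + startTimeOfAggregates
                        + ", " + nextEmitTime + ") on event at " + timestamp);
                startTimeOfAggregates = nextEmitTime;
                nextEmitTime += 60_000;   // next minute boundary
            }
        }
    }
}
```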
@@ -109,7 +109,7 @@ private static long getNextEmitTimeForHour(long currentTime, String timeZone) {
.of(zonedDateTime.getYear(), zonedDateTime.getMonthValue(), zonedDateTime.getDayOfMonth() + 1,
0, 0, 0, 0, ZoneId.of(timeZone)).toEpochSecond() * 1000;
}
} else {
} else {
return ZonedDateTime
.of(zonedDateTime.getYear(), zonedDateTime.getMonthValue(), zonedDateTime.getDayOfMonth(),
zonedDateTime.getHour() + 1, 0, 0, 0, ZoneId.of(timeZone)).toEpochSecond() * 1000;
@@ -229,4 +229,35 @@ public static int getMillisecondsPerDuration(TimePeriod.Duration duration) {
+ ".Number of milliseconds are only define for SECONDS, MINUTES, HOURS and DAYS");
}
}

public static boolean isAggregationDataComplete(long timestamp, TimePeriod.Duration duration, String timeZone) {
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(timestamp),
ZoneId.of(timeZone));
ZonedDateTime zonedCurrentDateTime = ZonedDateTime.ofInstant(Instant.now(), ZoneId.of(timeZone));
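// Note: these checks treat data as complete only when the record falls in the
// window immediately preceding the current one, using simple -1 arithmetic; at
// rollover boundaries (e.g. minute 0 or hour 0) the previous window cannot
// match, so the data is conservatively treated as incomplete.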
switch (duration) {
case SECONDS:
return false;
case MINUTES:
return zonedDateTime.getYear() == zonedCurrentDateTime.getYear() &&
zonedDateTime.getMonthValue() == zonedCurrentDateTime.getMonthValue() &&
zonedDateTime.getDayOfMonth() == zonedCurrentDateTime.getDayOfMonth() &&
zonedDateTime.getHour() == zonedCurrentDateTime.getHour() &&
zonedDateTime.getMinute() == (zonedCurrentDateTime.getMinute() - 1);
case HOURS:
return zonedDateTime.getYear() == zonedCurrentDateTime.getYear() &&
zonedDateTime.getMonthValue() == zonedCurrentDateTime.getMonthValue() &&
zonedDateTime.getDayOfMonth() == zonedCurrentDateTime.getDayOfMonth() &&
zonedDateTime.getHour() == (zonedCurrentDateTime.getHour() - 1);
case DAYS:
return zonedDateTime.getYear() == zonedCurrentDateTime.getYear() &&
zonedDateTime.getMonthValue() == zonedCurrentDateTime.getMonthValue() &&
zonedDateTime.getDayOfMonth() == (zonedCurrentDateTime.getDayOfMonth() - 1);
case MONTHS:
return zonedDateTime.getYear() == zonedCurrentDateTime.getYear() &&
zonedDateTime.getMonthValue() == (zonedCurrentDateTime.getMonthValue() - 1);
case YEARS:
return zonedDateTime.getYear() == (zonedCurrentDateTime.getYear() - 1);
}
return false;
}
}
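A quick usage sketch of the new helper (import paths are assumed; adjust to the actual Siddhi packages):

```java
import io.siddhi.core.util.IncrementalTimeConverterUtil; // package path assumed
import io.siddhi.query.api.aggregation.TimePeriod;       // package path assumed

public class CompletenessCheckSketch {
    public static void main(String[] args) {
        // A record stamped one minute ago is complete at MINUTES granularity
        // only if it falls in the immediately preceding minute window.
        long lastAggTimestamp = System.currentTimeMillis() - 60_000L;
        boolean complete = IncrementalTimeConverterUtil.isAggregationDataComplete(
                lastAggTimestamp, TimePeriod.Duration.MINUTES, "UTC");
        System.out.println("MINUTES data complete: " + complete);
    }
}
```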