Skip to content

Commit

Permalink
Fixing bug where stateless SessionWindow can withhold punctuations/ba…
Browse files Browse the repository at this point in the history
…tches indefinitely when there are no data events (#162)
  • Loading branch information
peterfreiling authored Feb 25, 2022
1 parent 2039f16 commit d2097ef
Show file tree
Hide file tree
Showing 3 changed files with 823 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private unsafe void ReachTime(long timestamp, bool isData)
{
if (isData && this.maximumDuration < StreamEvent.InfinitySyncTime)
{

// Evaluate new windowEndTime from maximumDuration and new data event timestamp
if (this.windowEndTime == StreamEvent.InfinitySyncTime)
{
long mod = timestamp % this.maximumDuration;
Expand All @@ -105,13 +105,21 @@ private unsafe void ReachTime(long timestamp, bool isData)
{
this.windowEndTime = timestamp - (timestamp % this.maximumDuration) + this.maximumDuration;
}
}

long threshold;
if (this.lastDataTime == long.MinValue)
{
threshold = isData
? this.windowEndTime // first ever data event, start retaining batches for this session
: 0; // still have never seen a data event, flush any punctuation-only batches
}
else
{
threshold = Math.Min(this.lastDataTime + this.sessionTimeout, this.windowEndTime);
}

var threshhold = this.lastDataTime == long.MinValue
? this.windowEndTime
: Math.Min(this.lastDataTime + this.sessionTimeout, this.windowEndTime);
if (timestamp >= threshhold)
if (timestamp >= threshold)
{
StreamMessage<TKey, TPayload> batch;
while (this.batches.Any())
Expand All @@ -126,14 +134,14 @@ private unsafe void ReachTime(long timestamp, bool isData)
{
for (; this.windowStartIdx < count; this.windowStartIdx++)
{
if (vsync[this.windowStartIdx] >= threshhold)
if (vsync[this.windowStartIdx] >= threshold)
{
this.windowEndTime = (timestamp < this.lastDataTime + this.sessionTimeout) ? StreamEvent.MaxSyncTime : StreamEvent.InfinitySyncTime;
if (vother[this.windowStartIdx] != StreamEvent.PunctuationOtherTime)
return;
}
if ((bv[this.windowStartIdx >> 6] & (1L << (this.windowStartIdx & 0x3f))) == 0)
vother[this.windowStartIdx] = threshhold;
vother[this.windowStartIdx] = threshold;
}
if (this.windowStartIdx == count)
{
Expand Down
Loading

0 comments on commit d2097ef

Please sign in to comment.