Skip to content

Commit

Permalink
changes: (apache#122)
Browse files Browse the repository at this point in the history
- fix bad case of session window
- fix index of CommonExpression error when using re2j engine
  • Loading branch information
speak2me authored Jan 12, 2022
1 parent 86d5c70 commit 4010e96
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

import com.google.re2j.Matcher;
import com.google.re2j.Pattern;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -34,42 +36,47 @@
*/
public class Re2Engine<T> implements IStreamRegex<T> {

protected Pattern pattern;
protected com.google.re2j.Pattern pattern;

protected Map<String, T> nameMap = new HashMap<>(64);
private Map<String, List<T>> expressionMap = new HashMap<>(128);

protected Map<String, String> regexMap = new HashMap<>(64);
private Map<String, String> nameMap = new HashMap<>(128);

protected Map<String, String> unSupportMap = new HashMap<>(32);

/**
 * Registers a regular expression together with the context object to report when it matches.
 *
 * NOTE(review): this span appears to be a rendered diff with the pre-change and post-change
 * statements interleaved (no +/- markers survive), so it is not compilable as shown. The
 * comments below describe each statement on its own terms.
 *
 * @param regex   the regular expression source to register
 * @param context the value associated with {@code regex}
 */
@Override public void addRegex(String regex, T context) {
// The group name is "P" followed by the current number of entries in nameMap.
String groupName = "P" + nameMap.size();
// Presumably the pre-change variant: both maps are keyed by group name.
nameMap.put(groupName, context);
regexMap.put(groupName, regex);
// Presumably the post-change variant: contexts are collected in a list per expression, so
// registering the same regex again appends to that list instead of adding a new key.
if (!expressionMap.containsKey(regex)) {
expressionMap.put(regex, new ArrayList<>());
}
expressionMap.get(regex).add(context);
// Only the first registration of an expression stores a group name for it.
if (!nameMap.containsKey(regex)) {
nameMap.put(regex, groupName);
}
}

/**
 * Builds one combined pattern from the registered expressions: each expression that compiles
 * on its own is wrapped as {@code (?P<groupName>(expression))} and the groups are joined
 * with {@code |}. Expressions that fail to compile are removed from the iterated map and
 * recorded in {@code unSupportMap}.
 *
 * NOTE(review): this span appears to be a rendered diff with the pre-change and post-change
 * statements interleaved (no +/- markers survive), so it is not compilable as shown.
 */
@Override public void compile() {
StringBuffer buffer = new StringBuffer();
// Presumably pre-change: iterate regexMap (group name -> regex).
Iterator<Map.Entry<String, String>> iterator = regexMap.entrySet().iterator();
Pattern testPattern;
// Presumably post-change: iterate nameMap (expression -> group name).
Iterator<Map.Entry<String, String>> iterator = nameMap.entrySet().iterator();
com.google.re2j.Pattern testPattern;
while (iterator.hasNext()) {
Map.Entry<String, String> entry = iterator.next();
String groupName = entry.getKey();
String regex = entry.getValue();
String expression = entry.getKey();
String groupName = entry.getValue();
try {
// Trial compile of the single expression; the result is not read in this method,
// only whether the call throws matters.
testPattern = Pattern.compile(regex, Pattern.MULTILINE);
testPattern = com.google.re2j.Pattern.compile(expression, com.google.re2j.Pattern.MULTILINE);
} catch (Exception e) {
// Drop the entry from the map being iterated and remember it in unSupportMap,
// keyed by group name.
iterator.remove();
unSupportMap.put(groupName, regex);
unSupportMap.put(groupName, expression);
continue;
}
// Separate alternatives with '|' once the buffer already holds a group.
if (buffer.length() != 0) {
buffer.append("|");
}
buffer.append("(?P<").append(groupName).append(">(").append(regex).append("))");
buffer.append("(?P<").append(groupName).append(">(").append(expression).append("))");
}
// pattern is only assigned when at least one expression compiled.
if (buffer.length() != 0) {
// NOTE(review): the flags are combined with bitwise AND. If MULTILINE and
// CASE_INSENSITIVE are distinct single-bit flags, the result is 0 and neither flag is
// applied; bitwise OR ('|') was probably intended. Confirm against the re2j constants.
pattern = Pattern.compile(buffer.toString(), Pattern.MULTILINE & Pattern.CASE_INSENSITIVE);
pattern = com.google.re2j.Pattern.compile(buffer.toString(), com.google.re2j.Pattern.MULTILINE & Pattern.CASE_INSENSITIVE);
}
}

Expand All @@ -92,10 +99,9 @@ private Set<T> normalMatchSet(String content) {
Iterator<Map.Entry<String, String>> iterator = unSupportMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, String> entry = iterator.next();
String groupName = entry.getKey();
String regex = entry.getValue();
if (StringUtil.matchRegexCaseInsensitive(content, regex)) {
matchedSet.add(nameMap.get(groupName));
matchedSet.addAll(expressionMap.get(regex));
}
}
return matchedSet;
Expand All @@ -119,12 +125,15 @@ private boolean normalMatchBoolean(String content) {
}
Set<T> matchedSet = new HashSet<>();
Matcher matcher = pattern.matcher(content);
int index = 0;
while (matcher.find()) {
Iterator<String> iterator = regexMap.keySet().iterator();
Iterator<Map.Entry<String, String>> iterator = nameMap.entrySet().iterator();
while (iterator.hasNext()) {
String groupName = iterator.next();
Map.Entry<String, String> entry = iterator.next();
String groupName = entry.getValue();
String expression = entry.getKey();
if (matcher.group(groupName) != null) {
matchedSet.add(nameMap.get(groupName));
matchedSet.addAll(expressionMap.get(expression));
break;
}
}
Expand All @@ -137,6 +146,6 @@ private boolean normalMatchBoolean(String content) {
}

/**
 * Reports how many registrations this engine holds.
 *
 * NOTE(review): this span appears to be a rendered diff; the two return statements look like
 * the pre-change and post-change variants of the same line and cannot both be live code.
 *
 * @return the registration count as computed by whichever variant is in effect
 */
@Override public int size() {
// Presumably pre-change: the number of entries in regexMap.
return regexMap.size();
// Presumably post-change: the sum of the list sizes over all values of expressionMap.
return expressionMap.values().stream().mapToInt(list -> list.size()).sum();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,6 @@ public void shuffleCalculate(List<IMessage> messages, WindowInstance instance, S
*/
private WindowValue queryOrCreateWindowValue(WindowInstance windowInstance, String queueId, String groupByValue,
IMessage message, List<WindowValue> valueList, String storeKey) {
//
if (CollectionUtil.isEmpty(valueList)) {
return createWindowValue(queueId, groupByValue, windowInstance, message, storeKey);
}
//put keys to be deleted here and delete them at last
List<String> deletePrefixKeyList = new ArrayList<>();
//
Expand All @@ -214,20 +210,35 @@ private WindowValue queryOrCreateWindowValue(WindowInstance windowInstance, Stri
Pair<Date, Date> startEndPair = getSessionTime(message);
Date messageBegin = startEndPair.getLeft();
Date messageEnd = startEndPair.getRight();
if (messageBegin.compareTo(sessionBegin) >= 0 && messageBegin.compareTo(sessionEnd) < 0) {
if (messageBegin.compareTo(sessionBegin) < 0 && messageEnd.compareTo(sessionEnd) <= 0) {
sessionBegin = messageBegin;
value.setStartTime(DateUtil.format(sessionBegin, SESSION_DATETIME_PATTERN));
return value;
} else if (messageBegin.compareTo(sessionBegin) >= 0 && messageEnd.compareTo(sessionEnd) <= 0) {
return value;
} else if (messageBegin.compareTo(sessionBegin) >= 0 && messageBegin.compareTo(sessionEnd) < 0 && messageEnd.compareTo(sessionEnd) > 0) {
sessionEnd = messageEnd;
Date sessionFire = DateUtil.addDate(TimeUnit.SECONDS, sessionEnd, waterMarkMinute * timeUnitAdjust);
value.setEndTime(DateUtil.format(sessionEnd, SESSION_DATETIME_PATTERN));
//clean order storage as sort field 'fireTime' changed
//clean older storage as sort field 'fireTime' changed
String existPartitionNumKey = createPrefixKey(value, windowInstance, queueId);
deletePrefixKeyList.add(existPartitionNumKey);
deletePrefixValue(deletePrefixKeyList);
//
Date sessionFire = DateUtil.addDate(TimeUnit.SECONDS, sessionEnd, waterMarkMinute * timeUnitAdjust);
value.setEndTime(DateUtil.format(sessionEnd, SESSION_DATETIME_PATTERN));
value.setFireTime(DateUtil.format(sessionFire, SESSION_DATETIME_PATTERN));
return value;
} else if (messageBegin.compareTo(sessionBegin) < 0 && messageEnd.compareTo(sessionBegin) > 0) {
} else if (messageBegin.compareTo(sessionBegin) < 0 && messageEnd.compareTo(sessionEnd) > 0) {
sessionBegin = messageBegin;
sessionEnd = messageEnd;
//clean older storage as sort field 'fireTime' changed
String existPartitionNumKey = createPrefixKey(value, windowInstance, queueId);
deletePrefixKeyList.add(existPartitionNumKey);
deletePrefixValue(deletePrefixKeyList);
//
value.setStartTime(DateUtil.format(sessionBegin, SESSION_DATETIME_PATTERN));
value.setEndTime(DateUtil.format(sessionEnd, SESSION_DATETIME_PATTERN));
Date sessionFire = DateUtil.addDate(TimeUnit.SECONDS, sessionEnd, waterMarkMinute * timeUnitAdjust);
value.setFireTime(DateUtil.format(sessionFire, SESSION_DATETIME_PATTERN));
return value;
}
}
Expand Down Expand Up @@ -256,7 +267,7 @@ private List<WindowValue> mergeWindowValue(List<WindowValue> allValueList, Windo
WindowValue outValue = allValueList.get(outIndex);
for (int inIndex = outIndex + 1; inIndex < allValueList.size(); inIndex++) {
WindowValue inValue = allValueList.get(inIndex);
if (inValue.getFireTime().compareTo(outValue.getEndTime()) <= 0) {
if (inValue.getStartTime().compareTo(outValue.getEndTime()) <= 0) {
deleteValueMap.put(inIndex, outIndex);
outValue.setEndTime(outValue.getEndTime().compareTo(inValue.getEndTime()) <= 0 ? inValue.getEndTime() : outValue.getEndTime());
outValue.setFireTime(outValue.getFireTime().compareTo(inValue.getFireTime()) <= 0 ? inValue.getFireTime() : outValue.getFireTime());
Expand Down Expand Up @@ -480,10 +491,10 @@ protected void clearWindowValues(List<WindowValue> deleteValueList, String queue

/**
 * Delegates to the superclass to obtain the split number and, when that value exceeds
 * 900000000, calls {@code resetSplitNum} on the window max-value manager. The value obtained
 * from the superclass call is what gets returned, including on the call that triggers the
 * reset.
 *
 * NOTE(review): this span appears to be a rendered diff; the paired lines differ only in the
 * spelling of the local variable ({@code numer} vs {@code number}) and cannot both be live.
 *
 * @param instance  the window instance passed through to the superclass and to the reset call
 * @param shuffleId the shuffle id passed through to the superclass and to the reset call
 * @return the value returned by the superclass implementation
 */
@Override
public long incrementAndGetSplitNumber(WindowInstance instance, String shuffleId) {
long numer = super.incrementAndGetSplitNumber(instance, shuffleId);
if (numer > 900000000) {
long number = super.incrementAndGetSplitNumber(instance, shuffleId);
if (number > 900000000) {
this.getWindowMaxValueManager().resetSplitNum(instance, shuffleId);
}
return numer;
return number;
}
}

0 comments on commit 4010e96

Please sign in to comment.