Skip to content

Commit

Permalink
feed filter mutations common prefix cpu optimiation
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-jslocum committed Jan 9, 2023
1 parent 8188964 commit aa0c52d
Showing 1 changed file with 16 additions and 12 deletions.
28 changes: 16 additions & 12 deletions fdbserver/storageserver.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2650,7 +2650,8 @@ MutationsAndVersionRef filterMutationsInverted(Arena& arena, MutationsAndVersion
MutationsAndVersionRef filterMutations(Arena& arena,
MutationsAndVersionRef const& m,
KeyRange const& range,
bool inverted) {
bool inverted,
int commonPrefixLength) {
if (m.mutations.size() == 1 && m.mutations.back().param1 == lastEpochEndPrivateKey) {
return m;
}
Expand All @@ -2662,22 +2663,28 @@ MutationsAndVersionRef filterMutations(Arena& arena,
Optional<VectorRef<MutationRef>> modifiedMutations;
for (int i = 0; i < m.mutations.size(); i++) {
if (m.mutations[i].type == MutationRef::SetValue) {
if (modifiedMutations.present() && range.contains(m.mutations[i].param1)) {
bool inRange = range.begin.compareSuffix(m.mutations[i].param1, commonPrefixLength) <= 0 &&
m.mutations[i].param1.compareSuffix(range.end, commonPrefixLength) < 0;
if (modifiedMutations.present() && inRange) {
modifiedMutations.get().push_back(arena, m.mutations[i]);
}
if (!modifiedMutations.present() && !range.contains(m.mutations[i].param1)) {
if (!modifiedMutations.present() && !inRange) {
modifiedMutations = m.mutations.slice(0, i);
arena.dependsOn(range.arena());
}
} else {
ASSERT(m.mutations[i].type == MutationRef::ClearRange);
// param1 < range.begin || param2 > range.end
if (!modifiedMutations.present() &&
(m.mutations[i].param1 < range.begin || m.mutations[i].param2 > range.end)) {
(m.mutations[i].param1.compareSuffix(range.begin, commonPrefixLength) < 0 ||
m.mutations[i].param2.compareSuffix(range.end, commonPrefixLength) > 0)) {
modifiedMutations = m.mutations.slice(0, i);
arena.dependsOn(range.arena());
}
if (modifiedMutations.present()) {
if (m.mutations[i].param1 < range.end && range.begin < m.mutations[i].param2) {
// param1 < range.end && range.begin < param2
if (m.mutations[i].param1.compareSuffix(range.end, commonPrefixLength) < 0 &&
range.begin.compareSuffix(m.mutations[i].param2, commonPrefixLength) < 0) {
modifiedMutations.get().push_back(arena,
MutationRef(MutationRef::ClearRange,
std::max(range.begin, m.mutations[i].param1),
Expand Down Expand Up @@ -2786,6 +2793,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
bool inverted,
bool atLatest,
bool doFilterMutations,
int commonFeedPrefixLength,
FeedDiskReadState* feedDiskReadState) {
state ChangeFeedStreamReply reply;
state ChangeFeedStreamReply memoryReply;
Expand Down Expand Up @@ -2866,7 +2874,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor

remainingLimitBytes -= sizeof(MutationsAndVersionRef) + m.expectedSize();
if (doFilterMutations) {
m = filterMutations(memoryReply.arena, *it, req.range, inverted);
m = filterMutations(memoryReply.arena, *it, req.range, inverted, commonFeedPrefixLength);
}
if (m.mutations.size()) {
memoryReply.arena.dependsOn(it->arena());
Expand Down Expand Up @@ -2966,7 +2974,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor

MutationsAndVersionRef m = MutationsAndVersionRef(mutations, version, knownCommittedVersion);
if (doFilterMutations) {
m = filterMutations(reply.arena, m, req.range, inverted);
m = filterMutations(reply.arena, m, req.range, inverted, commonFeedPrefixLength);
}
if (m.mutations.size()) {
reply.arena.dependsOn(mutations.arena());
Expand Down Expand Up @@ -3075,10 +3083,6 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
// the end
if ((reply.mutations.empty() || reply.mutations.back().version < lastMemoryVersion) &&
remainingLimitBytes <= 0) {
TraceEvent(SevDebug, "TraceChangeFeedMemoryAddEmpty", data->thisServerID)
.detail("FeedID", req.rangeID)
.detail("StreamUID", streamUID)
.detail("Version", lastMemoryVersion);
CODE_PROBE(true, "Memory feed adding empty version after memory filtered");
reply.mutations.push_back(reply.arena, MutationsAndVersionRef(lastMemoryVersion, lastMemoryKnownCommitted));
}
Expand Down Expand Up @@ -3349,7 +3353,7 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques

// keep this as not state variable so it is freed after sending to reduce memory
Future<std::pair<ChangeFeedStreamReply, bool>> feedReplyFuture =
getChangeFeedMutations(data, feedInfo, req, false, atLatest, doFilterMutations, &feedDiskReadState);
getChangeFeedMutations(data, feedInfo, req, false, atLatest, doFilterMutations, commonFeedPrefixLength, &feedDiskReadState);
if (atLatest && !removeUID && !feedReplyFuture.isReady()) {
data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()][req.id] =
blockedVersion.present() ? blockedVersion.get() : data->prevVersion;
Expand Down

0 comments on commit aa0c52d

Please sign in to comment.