Skip to content

Commit

Permalink
[Fix](cache) fix query cache returns wrong result after deleting part…
Browse files Browse the repository at this point in the history
…itions.
  • Loading branch information
王翔宇 committed Aug 28, 2023
1 parent 1c70d15 commit d750939
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,15 @@ public class CacheTable implements Comparable<CacheTable> {
public long latestVersion;
public long latestTime;
public long partitionNum;
public long sumOfPartitionNum;

public CacheTable() {
olapTable = null;
latestPartitionId = 0;
latestVersion = 0;
latestTime = 0;
partitionNum = 0;
sumOfPartitionNum = 0;
}

@Override
Expand All @@ -157,8 +159,8 @@ public int compareTo(CacheTable table) {
}

public void debug() {
LOG.debug("table {}, partition id {}, ver {}, time {}, partition num {}", olapTable.getName(),
latestPartitionId, latestVersion, latestTime, partitionNum);
LOG.debug("table {}, partition id {}, ver {}, time {}, partition num {}, sumOfPartitionNum: {}",
olapTable.getName(), latestPartitionId, latestVersion, latestTime, partitionNum, sumOfPartitionNum);
}
}

Expand Down Expand Up @@ -228,6 +230,7 @@ private CacheMode innerCheckCacheMode(long now) {
MetricRepo.COUNTER_QUERY_OLAP_TABLE.increase(1L);
Collections.sort(tblTimeList);
latestTable = tblTimeList.get(0);
latestTable.sumOfPartitionNum = tblTimeList.stream().mapToLong(item -> item.partitionNum).sum();
latestTable.debug();

addAllViewStmt(selectStmt);
Expand Down Expand Up @@ -330,6 +333,7 @@ private CacheMode innerCheckCacheModeSetOperation(long now) {
MetricRepo.COUNTER_QUERY_OLAP_TABLE.increase(1L);
Collections.sort(tblTimeList);
latestTable = tblTimeList.get(0);
latestTable.sumOfPartitionNum = tblTimeList.stream().mapToLong(item -> item.partitionNum).sum();
latestTable.debug();

addAllViewStmt((SetOperationStmt) parsedStmt);
Expand Down Expand Up @@ -384,6 +388,7 @@ private CacheMode innerCheckCacheModeForNereids(long now) {
MetricRepo.COUNTER_QUERY_OLAP_TABLE.increase(1L);
Collections.sort(tblTimeList);
latestTable = tblTimeList.get(0);
latestTable.sumOfPartitionNum = tblTimeList.stream().mapToLong(item -> item.partitionNum).sum();
latestTable.debug();

if (((LogicalPlanAdapter) parsedStmt).getStatementContext().getParsedStatement().isExplain()) {
Expand Down Expand Up @@ -584,9 +589,9 @@ private CacheTable getSelectedPartitionLastUpdateTime(OlapScanNode node) {
cacheTable.latestPartitionId = partition.getId();
cacheTable.latestTime = partition.getVisibleVersionTime();
cacheTable.latestVersion = partition.getVisibleVersion();
cacheTable.partitionNum = node.getSelectedPartitionNum();
}
}
cacheTable.partitionNum = node.getSelectedPartitionNum();
return cacheTable;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void clear() {
}

public InternalService.PUpdateCacheRequest buildSqlUpdateRequest(
String sql, long partitionKey, long lastVersion, long lastestTime) {
String sql, long partitionKey, long lastVersion, long lastestTime, long partitionNum) {
if (updateRequest == null) {
updateRequest = InternalService.PUpdateCacheRequest.newBuilder()
.setSqlKey(CacheProxy.getMd5(sql))
Expand All @@ -124,6 +124,7 @@ public InternalService.PUpdateCacheRequest buildSqlUpdateRequest(
.setPartitionKey(partitionKey)
.setLastVersion(lastVersion)
.setLastVersionTime(lastestTime)
.setPartitionNum(partitionNum)
.build()).setDataSize(dataSize).addAllRows(
rowList.stream().map(row -> ByteString.copyFrom(row))
.collect(Collectors.toList()))).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public InternalService.PFetchCacheResult getCacheData(Status status) {
.setPartitionKey(latestTable.latestPartitionId)
.setLastVersion(latestTable.latestVersion)
.setLastVersionTime(latestTable.latestTime)
.setPartitionNum(latestTable.partitionNum))
.setPartitionNum(latestTable.sumOfPartitionNum))
.build();

InternalService.PFetchCacheResult cacheResult = proxy.fetchCache(request, 10000, status);
Expand Down Expand Up @@ -95,7 +95,7 @@ public void updateCache() {

InternalService.PUpdateCacheRequest updateRequest =
rowBatchBuilder.buildSqlUpdateRequest(getSqlWithViewStmt(), latestTable.latestPartitionId,
latestTable.latestVersion, latestTable.latestTime);
latestTable.latestVersion, latestTable.latestTime, latestTable.sumOfPartitionNum);
if (updateRequest.getValuesCount() > 0) {
CacheBeProxy proxy = new CacheBeProxy();
Status status = new Status();
Expand Down

0 comments on commit d750939

Please sign in to comment.