diff --git a/fluss-common/src/main/java/com/alibaba/fluss/exception/UnknownLeaderEpochException.java b/fluss-common/src/main/java/com/alibaba/fluss/exception/UnknownLeaderEpochException.java new file mode 100644 index 0000000000..92433f4da2 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/exception/UnknownLeaderEpochException.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.exception; + +/** + * The request contained a leaderEpoch which is larger than that on the tabletServer that received + * the request. This can happen if the client/follower observes a metadata update before it has been + * propagated to all tabletServers. Clients/followers need not refresh metadata before retrying. + */ +public class UnknownLeaderEpochException extends RetriableException { + private static final long serialVersionUID = 1L; + + public UnknownLeaderEpochException(String message) { + super(message); + } +} diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/entity/EpochAndLogEndOffsetForBucket.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/entity/EpochAndLogEndOffsetForBucket.java new file mode 100644 index 0000000000..c6f3999f1c --- /dev/null +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/entity/EpochAndLogEndOffsetForBucket.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.rpc.entity; + +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.rpc.messages.OffsetForLeaderEpochRequest; +import com.alibaba.fluss.rpc.protocol.ApiError; + +/** Result of {@link OffsetForLeaderEpochRequest} for each table bucket. */ +public class EpochAndLogEndOffsetForBucket extends ResultForBucket { + + /** The leaderEpoch of this tableBucket. */ + private final int leaderEpoch; + + /** The logEndOffset of this leaderEpoch. 
*/ + private final long logEndOffset; + + public EpochAndLogEndOffsetForBucket( + TableBucket tableBucket, int leaderEpoch, long logEndOffset) { + this(tableBucket, leaderEpoch, logEndOffset, ApiError.NONE); + } + + public EpochAndLogEndOffsetForBucket(TableBucket tableBucket, ApiError error) { + this(tableBucket, -1, -1L, error); + } + + public EpochAndLogEndOffsetForBucket( + TableBucket tableBucket, int leaderEpoch, long logEndOffset, ApiError error) { + super(tableBucket, error); + this.leaderEpoch = leaderEpoch; + this.logEndOffset = logEndOffset; + } + + public int getLeaderEpoch() { + return leaderEpoch; + } + + public long getLogEndOffset() { + return logEndOffset; + } +} diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/TabletServerGateway.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/TabletServerGateway.java index ba99eb8936..e13f8be301 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/TabletServerGateway.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/TabletServerGateway.java @@ -35,6 +35,8 @@ import com.alibaba.fluss.rpc.messages.NotifyLeaderAndIsrResponse; import com.alibaba.fluss.rpc.messages.NotifyRemoteLogOffsetsRequest; import com.alibaba.fluss.rpc.messages.NotifyRemoteLogOffsetsResponse; +import com.alibaba.fluss.rpc.messages.OffsetForLeaderEpochRequest; +import com.alibaba.fluss.rpc.messages.OffsetForLeaderEpochResponse; import com.alibaba.fluss.rpc.messages.PrefixLookupRequest; import com.alibaba.fluss.rpc.messages.PrefixLookupResponse; import com.alibaba.fluss.rpc.messages.ProduceLogRequest; @@ -160,4 +162,13 @@ CompletableFuture notifyKvSnapshotOffset( @RPC(api = ApiKeys.NOTIFY_LAKE_TABLE_OFFSET) CompletableFuture notifyLakeTableOffset( NotifyLakeTableOffsetRequest request); + + /** + * Get logEndOffset for leaderEpoch. 
+ * + * @return logEndOffset for leaderEpoch response + */ + @RPC(api = ApiKeys.OFFSET_FOR_LEADER_EPOCH) + CompletableFuture offsetForLeaderEpoch( + OffsetForLeaderEpochRequest request); } diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java index 25c6218653..98a4e761e4 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java @@ -68,7 +68,8 @@ public enum ApiKeys { AUTHENTICATE(1038, 0, 0, PUBLIC), CREATE_ACLS(1039, 0, 0, PUBLIC), LIST_ACLS(1040, 0, 0, PUBLIC), - DROP_ACLS(1041, 0, 0, PUBLIC); + DROP_ACLS(1041, 0, 0, PUBLIC), + OFFSET_FOR_LEADER_EPOCH(1042, 0, 0, PUBLIC); private static final Map ID_TO_TYPE = Arrays.stream(ApiKeys.values()) diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java index 36caf1e98d..a14cad265f 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java @@ -63,6 +63,7 @@ import com.alibaba.fluss.exception.TimeoutException; import com.alibaba.fluss.exception.TooManyBucketsException; import com.alibaba.fluss.exception.TooManyPartitionsException; +import com.alibaba.fluss.exception.UnknownLeaderEpochException; import com.alibaba.fluss.exception.UnknownServerException; import com.alibaba.fluss.exception.UnknownTableOrBucketException; import com.alibaba.fluss.exception.UnknownWriterIdException; @@ -199,7 +200,11 @@ public enum Errors { SECURITY_DISABLED_EXCEPTION(47, "Security is disabled.", SecurityDisabledException::new), AUTHORIZATION_EXCEPTION(48, "Authorization failed", AuthorizationException::new), BUCKET_MAX_NUM_EXCEPTION( - 49, "Exceed the maximum number of buckets", TooManyBucketsException::new); + 49, "Exceed the maximum number of buckets", 
TooManyBucketsException::new), + UNKNOWN_LEADER_EPOCH_EXCEPTION( + 50, + "The leaderEpoch in the request is newer than the leaderEpoch on the tabletServer.", + UnknownLeaderEpochException::new); private static final Logger LOG = LoggerFactory.getLogger(Errors.class); diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index 2c8372ad12..3c9b9b6a3c 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -506,6 +506,16 @@ message DropAclsResponse{ repeated PbDropAclsFilterResult filter_results = 1; } +// OffsetForLeaderEpochRequest and response +message OffsetForLeaderEpochRequest { + required int32 follower_server_id = 1; // value -1 indicates the request is from a client. + repeated PbOffsetForLeaderEpochReqForTable tables_req = 2; +} + +message OffsetForLeaderEpochResponse { + repeated PbOffsetForLeaderEpochRespForTable tables_resp = 1; +} + // --------------- Inner classes ---------------- message PbApiVersion { required int32 api_key = 1; @@ -815,4 +825,31 @@ message PbDropAclsMatchingAcl { required PbAclInfo acl = 1; optional int32 error_code = 2; optional string error_message = 3; +} + +message PbOffsetForLeaderEpochReqForTable { + required int64 table_id = 1; + repeated PbOffsetForLeaderEpochReqForBucket buckets_req = 2; +} + + +message PbOffsetForLeaderEpochReqForBucket { + optional int64 partition_id = 1; + required int32 bucket_id = 2; + required int32 current_leader_epoch = 3; + required int32 leader_epoch = 4; +} + +message PbOffsetForLeaderEpochRespForTable { + required int64 table_id = 1; + repeated PbOffsetForLeaderEpochRespForBucket buckets_resp = 2; +} + +message PbOffsetForLeaderEpochRespForBucket { + optional int64 partition_id = 1; + required int32 bucket_id = 2; + optional int32 error_code = 3; + optional string error_message = 4; + optional int32 leader_epoch = 5; + optional int64 log_end_offset = 6; } \ No newline at end of file diff --git 
a/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/TestingTabletGatewayService.java b/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/TestingTabletGatewayService.java index 6044530548..7595487a06 100644 --- a/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/TestingTabletGatewayService.java +++ b/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/TestingTabletGatewayService.java @@ -62,6 +62,8 @@ import com.alibaba.fluss.rpc.messages.NotifyLeaderAndIsrResponse; import com.alibaba.fluss.rpc.messages.NotifyRemoteLogOffsetsRequest; import com.alibaba.fluss.rpc.messages.NotifyRemoteLogOffsetsResponse; +import com.alibaba.fluss.rpc.messages.OffsetForLeaderEpochRequest; +import com.alibaba.fluss.rpc.messages.OffsetForLeaderEpochResponse; import com.alibaba.fluss.rpc.messages.PrefixLookupRequest; import com.alibaba.fluss.rpc.messages.PrefixLookupResponse; import com.alibaba.fluss.rpc.messages.ProduceLogRequest; @@ -160,6 +162,12 @@ public CompletableFuture notifyLakeTableOffset( return null; } + @Override + public CompletableFuture offsetForLeaderEpoch( + OffsetForLeaderEpochRequest request) { + return null; + } + @Override public CompletableFuture listDatabases(ListDatabasesRequest request) { return null; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/entity/OffsetForLeaderEpochData.java b/fluss-server/src/main/java/com/alibaba/fluss/server/entity/OffsetForLeaderEpochData.java new file mode 100644 index 0000000000..77701099ec --- /dev/null +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/entity/OffsetForLeaderEpochData.java @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.server.entity; + +import com.alibaba.fluss.exception.FencedLeaderEpochException; +import com.alibaba.fluss.exception.UnknownLeaderEpochException; +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.rpc.messages.OffsetForLeaderEpochRequest; + +import java.util.Objects; + +/** The data for request {@link OffsetForLeaderEpochRequest}. */ +public class OffsetForLeaderEpochData { + + private final TableBucket tableBucket; + + /** + * An epoch used to fence clients/followers with old metadata. If the leaderEpoch provided by + * the client or follower is larger than the current leaderEpoch known to the tabletServer, then + * the {@link UnknownLeaderEpochException} will be thrown. If the leaderEpoch provided by the + * client or follower is smaller than the current epoch known to the requested tabletServer, + * then the {@link FencedLeaderEpochException} will be thrown. + */ + private final int currentLeaderEpoch; + + /** The epoch to look up an offset for. 
*/ + private final int leaderEpoch; + + public OffsetForLeaderEpochData( + TableBucket tableBucket, int currentLeaderEpoch, int leaderEpoch) { + this.tableBucket = tableBucket; + this.currentLeaderEpoch = currentLeaderEpoch; + this.leaderEpoch = leaderEpoch; + } + + public TableBucket getTableBucket() { + return tableBucket; + } + + public int getCurrentLeaderEpoch() { + return currentLeaderEpoch; + } + + public int getLeaderEpoch() { + return leaderEpoch; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + OffsetForLeaderEpochData that = (OffsetForLeaderEpochData) o; + return currentLeaderEpoch == that.currentLeaderEpoch + && leaderEpoch == that.leaderEpoch + && tableBucket.equals(that.tableBucket); + } + + @Override + public int hashCode() { + return Objects.hash(tableBucket, currentLeaderEpoch, leaderEpoch); + } + + @Override + public String toString() { + return "OffsetForLeaderEpochData{" + + "tableBucket=" + + tableBucket + + ", currentLeaderEpoch=" + + currentLeaderEpoch + + ", leaderEpoch=" + + leaderEpoch + + '}'; + } +} diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java index 801dec62df..bea4ea0cae 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java @@ -44,6 +44,7 @@ import com.alibaba.fluss.record.KvRecordBatch; import com.alibaba.fluss.record.LogRecords; import com.alibaba.fluss.record.MemoryLogRecords; +import com.alibaba.fluss.rpc.entity.EpochAndLogEndOffsetForBucket; import com.alibaba.fluss.rpc.protocol.Errors; import com.alibaba.fluss.server.SequenceIDCounter; import com.alibaba.fluss.server.coordinator.CoordinatorContext; @@ -91,6 +92,7 @@ import com.alibaba.fluss.utils.IOUtils; import com.alibaba.fluss.utils.MapUtils; 
import com.alibaba.fluss.utils.clock.Clock; +import com.alibaba.fluss.utils.types.Either; import com.alibaba.fluss.utils.types.Tuple2; import org.slf4j.Logger; @@ -1310,6 +1312,39 @@ public void truncateFullyAndStartAt(long newOffset) { () -> logManager.truncateFullyAndStartAt(tableBucket, newOffset)); } + /** + * Find the (exclusive) last log offset of the largest leaderEpoch less than or equal to the + * requested epoch. + * + * @param currentLeaderEpoch The expected leaderEpoch of the current leader (if known) + * @param leaderEpoch Requested leaderEpoch + * @param requireLeader Whether to require servicing only from the leader + * @return Currently the requested leaderEpoch paired with the local log end offset (see the + * TODO in the body), or the error when the local log cannot be served. The intended contract + * is the largest leaderEpoch less than or equal to the requested one and its logEndOffset, + * defined as the startLogOffset of the first larger leaderEpoch, or else the logEndOffset if + * it is the latest leaderEpoch. + */ + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + public EpochAndLogEndOffsetForBucket lastLogOffsetForLeaderEpoch( + Optional currentLeaderEpoch, int leaderEpoch, boolean requireLeader) { + return inReadLock( + leaderIsrUpdateLock, + () -> { + Either localLogOrError = + localLogOrThrow(currentLeaderEpoch, requireLeader); + if (localLogOrError.isLeft()) { + // TODO add localLog.endOffsetForEpoch() in next pr. + LogTablet localLog = localLogOrError.left(); + return new EpochAndLogEndOffsetForBucket( + tableBucket, leaderEpoch, localLog.localLogEndOffset()); + } else { + return new EpochAndLogEndOffsetForBucket( + tableBucket, localLogOrError.right().toApiError()); + } + }); + } + private LogReadInfo readRecords(FetchParams fetchParams, LogTablet logTablet) throws IOException { // Note we use the log end offset prior to the read. 
This ensures that any appends following @@ -1742,15 +1777,51 @@ private boolean isUnderMinIsr() { } private LogTablet localLogOrThrow(boolean requireLeader) { - // TODO check leader epoch. - if (requireLeader && !isLeader()) { - throw new NotLeaderOrFollowerException( - String.format( - "Leader not local for bucket %s on tabletServer %d", - tableBucket, localTabletServerId)); + Either either = localLogOrThrow(Optional.empty(), requireLeader); + if (either.isLeft()) { + return either.left(); + } else { + throw either.right() + .exception( + String.format( + "Failed to find %s logTablet for tableBucket %s which leaderEpoch %s. The " + + "current leader is %s and the current leaderEpoch is %s", + requireLeader ? "leader" : "", + tableBucket, + -1, + leaderReplicaIdOpt, + leaderEpoch)); } + } - return logTablet; + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + private Either localLogOrThrow( + Optional currentLeaderEpoch, boolean requireLeader) { + if (checkCurrentLeaderEpoch(currentLeaderEpoch) == Errors.NONE) { + if (requireLeader && !isLeader()) { + return Either.right(Errors.NOT_LEADER_OR_FOLLOWER); + } else { + return Either.left(logTablet); + } + } + return Either.right(Errors.FENCED_LEADER_EPOCH_EXCEPTION); + } + + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + private Errors checkCurrentLeaderEpoch(Optional currentLeaderEpochOpt) { + if (!currentLeaderEpochOpt.isPresent()) { + return Errors.NONE; + } else { + int currentLeaderEpoch = currentLeaderEpochOpt.get(); + int localLeaderEpoch = leaderEpoch; + if (currentLeaderEpoch < localLeaderEpoch) { + return Errors.FENCED_LEADER_EPOCH_EXCEPTION; + } else if (currentLeaderEpoch > localLeaderEpoch) { + return Errors.UNKNOWN_LEADER_EPOCH_EXCEPTION; + } else { + return Errors.NONE; + } + } } @VisibleForTesting diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java index 
c437ac77a7..4d134cb5a6 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java @@ -38,6 +38,7 @@ import com.alibaba.fluss.remote.RemoteLogFetchInfo; import com.alibaba.fluss.remote.RemoteLogSegment; import com.alibaba.fluss.rpc.RpcClient; +import com.alibaba.fluss.rpc.entity.EpochAndLogEndOffsetForBucket; import com.alibaba.fluss.rpc.entity.FetchLogResultForBucket; import com.alibaba.fluss.rpc.entity.LimitScanResultForBucket; import com.alibaba.fluss.rpc.entity.ListOffsetsResultForBucket; @@ -60,6 +61,7 @@ import com.alibaba.fluss.server.entity.NotifyLeaderAndIsrData; import com.alibaba.fluss.server.entity.NotifyLeaderAndIsrResultForBucket; import com.alibaba.fluss.server.entity.NotifyRemoteLogOffsetsData; +import com.alibaba.fluss.server.entity.OffsetForLeaderEpochData; import com.alibaba.fluss.server.entity.StopReplicaData; import com.alibaba.fluss.server.entity.StopReplicaResultForBucket; import com.alibaba.fluss.server.kv.KvManager; @@ -701,6 +703,35 @@ public void notifyLakeTableOffset( }); } + public void lastLogOffsetForLeaderEpoch( + List offsetForLeaderEpochDataList, + Consumer> responseCallback) { + List result = new ArrayList<>(); + for (OffsetForLeaderEpochData epochData : offsetForLeaderEpochDataList) { + TableBucket tb = epochData.getTableBucket(); + try { + Replica replica = getReplicaOrException(tb); + Optional currentLeaderEpochOpt; + // TODO -1 change to NO_LEADER_EPOCH + if (epochData.getCurrentLeaderEpoch() == -1) { + currentLeaderEpochOpt = Optional.empty(); + } else { + currentLeaderEpochOpt = Optional.of(epochData.getCurrentLeaderEpoch()); + } + result.add( + replica.lastLogOffsetForLeaderEpoch( + currentLeaderEpochOpt, epochData.getLeaderEpoch(), true)); + } catch (Exception e) { + LOG.error( + "Error processing fetch last offset for leader epoch operation on replica {}", + tb, + e); + result.add(new 
EpochAndLogEndOffsetForBucket(tb, ApiError.fromThrowable(e))); + } + } + responseCallback.accept(result); + } + /** * Make the current server to become leader for a given set of replicas by: * diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/fetcher/LeaderEndpoint.java b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/fetcher/LeaderEndpoint.java index 05da6830b1..e50ab238b4 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/fetcher/LeaderEndpoint.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/fetcher/LeaderEndpoint.java @@ -17,7 +17,9 @@ package com.alibaba.fluss.server.replica.fetcher; import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.rpc.entity.EpochAndLogEndOffsetForBucket; import com.alibaba.fluss.rpc.entity.FetchLogResultForBucket; +import com.alibaba.fluss.server.entity.OffsetForLeaderEpochData; import java.util.Map; import java.util.Optional; @@ -37,6 +39,10 @@ interface LeaderEndpoint { CompletableFuture fetchLeaderEndOffsetSnapshot(TableBucket tableBucket); + /** Fetches the logEndOffset for leader epoch of the given table bucket. */ + CompletableFuture> fetchOffsetForLeaderEpoch( + Map leaderEpochDataMap); + /** * Given a fetchLogRequest, carries out the expected request and returns the results from * fetching from the leader. 
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/fetcher/RemoteLeaderEndpoint.java b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/fetcher/RemoteLeaderEndpoint.java index 3f407f88d9..e3499c16e8 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/fetcher/RemoteLeaderEndpoint.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/fetcher/RemoteLeaderEndpoint.java @@ -20,6 +20,7 @@ import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.rpc.entity.EpochAndLogEndOffsetForBucket; import com.alibaba.fluss.rpc.entity.FetchLogResultForBucket; import com.alibaba.fluss.rpc.gateway.TabletServerGateway; import com.alibaba.fluss.rpc.messages.FetchLogRequest; @@ -28,7 +29,12 @@ import com.alibaba.fluss.rpc.messages.PbFetchLogRespForTable; import com.alibaba.fluss.rpc.messages.PbListOffsetsRespForBucket; import com.alibaba.fluss.rpc.protocol.Errors; +import com.alibaba.fluss.server.entity.OffsetForLeaderEpochData; import com.alibaba.fluss.server.log.ListOffsetsParam; +import com.alibaba.fluss.server.utils.ServerRpcMessageUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashMap; @@ -39,9 +45,12 @@ import static com.alibaba.fluss.rpc.CommonRpcMessageUtils.getFetchLogResultForBucket; import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeListOffsetsRequest; +import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeOffsetForLeaderEpochRequest; /** Facilitates fetches from a remote replica leader in one tablet server. 
*/ final class RemoteLeaderEndpoint implements LeaderEndpoint { + private static final Logger LOG = LoggerFactory.getLogger(RemoteLeaderEndpoint.class); + private final int followerServerId; private final int remoteServerId; private final TabletServerGateway tabletServerGateway; @@ -89,6 +98,22 @@ public CompletableFuture fetchLeaderEndOffsetSnapshot(TableBucket tableBuc return fetchLogOffset(tableBucket, ListOffsetsParam.LEADER_END_OFFSET_SNAPSHOT_TYPE); } + @Override + public CompletableFuture> + fetchOffsetForLeaderEpoch( + Map leaderEpochDataMap) { + if (leaderEpochDataMap.isEmpty()) { + LOG.info( + "Skipping send OffsetForLeaderEpochRequest since all tableBuckets don't have an epoch"); + return CompletableFuture.completedFuture(new HashMap<>()); + } + + return tabletServerGateway + .offsetForLeaderEpoch( + makeOffsetForLeaderEpochRequest(followerServerId, leaderEpochDataMap)) + .thenApply(ServerRpcMessageUtils::getOffsetForLeaderEpochData); + } + @Override public CompletableFuture> fetchLog( FetchLogContext fetchLogContext) { diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletService.java b/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletService.java index 686675d4fd..6343f60cfc 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletService.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletService.java @@ -47,6 +47,8 @@ import com.alibaba.fluss.rpc.messages.NotifyLeaderAndIsrResponse; import com.alibaba.fluss.rpc.messages.NotifyRemoteLogOffsetsRequest; import com.alibaba.fluss.rpc.messages.NotifyRemoteLogOffsetsResponse; +import com.alibaba.fluss.rpc.messages.OffsetForLeaderEpochRequest; +import com.alibaba.fluss.rpc.messages.OffsetForLeaderEpochResponse; import com.alibaba.fluss.rpc.messages.PrefixLookupRequest; import com.alibaba.fluss.rpc.messages.PrefixLookupResponse; import com.alibaba.fluss.rpc.messages.ProduceLogRequest; @@ -90,6 +92,7 @@ import static 
com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getNotifyLeaderAndIsrRequestData; import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getNotifyRemoteLogOffsetsData; import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getNotifySnapshotOffsetData; +import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getOffsetForLeaderEpochData; import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getProduceLogData; import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getPutKvData; import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getStopReplicaData; @@ -100,6 +103,7 @@ import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeListOffsetsResponse; import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeLookupResponse; import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeNotifyLeaderAndIsrResponse; +import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeOffsetForLeaderEpochResponse; import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makePrefixLookupResponse; import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeProduceLogResponse; import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makePutKvResponse; @@ -344,6 +348,17 @@ public CompletableFuture notifyLakeTableOffset( return response; } + @Override + public CompletableFuture offsetForLeaderEpoch( + OffsetForLeaderEpochRequest request) { + CompletableFuture response = new CompletableFuture<>(); + replicaManager.lastLogOffsetForLeaderEpoch( + getOffsetForLeaderEpochData(request), + (responseList) -> + response.complete(makeOffsetForLeaderEpochResponse(responseList))); + return response; + } + private void authorizeTable(OperationType operationType, long tableId) { PhysicalTablePath tablePath = metadataCache.getTablePath(tableId); if (tablePath == null) { diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java index d2ac7bd6fd..4412e9374e 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java @@ -36,6 +36,7 @@ import com.alibaba.fluss.record.MemoryLogRecords; import com.alibaba.fluss.remote.RemoteLogFetchInfo; import com.alibaba.fluss.remote.RemoteLogSegment; +import com.alibaba.fluss.rpc.entity.EpochAndLogEndOffsetForBucket; import com.alibaba.fluss.rpc.entity.FetchLogResultForBucket; import com.alibaba.fluss.rpc.entity.LimitScanResultForBucket; import com.alibaba.fluss.rpc.entity.ListOffsetsResultForBucket; @@ -69,6 +70,8 @@ import com.alibaba.fluss.rpc.messages.NotifyLeaderAndIsrRequest; import com.alibaba.fluss.rpc.messages.NotifyLeaderAndIsrResponse; import com.alibaba.fluss.rpc.messages.NotifyRemoteLogOffsetsRequest; +import com.alibaba.fluss.rpc.messages.OffsetForLeaderEpochRequest; +import com.alibaba.fluss.rpc.messages.OffsetForLeaderEpochResponse; import com.alibaba.fluss.rpc.messages.PbAclInfo; import com.alibaba.fluss.rpc.messages.PbAdjustIsrReqForBucket; import com.alibaba.fluss.rpc.messages.PbAdjustIsrReqForTable; @@ -92,6 +95,10 @@ import com.alibaba.fluss.rpc.messages.PbNotifyLakeTableOffsetReqForBucket; import com.alibaba.fluss.rpc.messages.PbNotifyLeaderAndIsrReqForBucket; import com.alibaba.fluss.rpc.messages.PbNotifyLeaderAndIsrRespForBucket; +import com.alibaba.fluss.rpc.messages.PbOffsetForLeaderEpochReqForBucket; +import com.alibaba.fluss.rpc.messages.PbOffsetForLeaderEpochReqForTable; +import com.alibaba.fluss.rpc.messages.PbOffsetForLeaderEpochRespForBucket; +import com.alibaba.fluss.rpc.messages.PbOffsetForLeaderEpochRespForTable; import com.alibaba.fluss.rpc.messages.PbPartitionSpec; import com.alibaba.fluss.rpc.messages.PbPhysicalTablePath; 
import com.alibaba.fluss.rpc.messages.PbPrefixLookupReqForBucket; @@ -133,6 +140,7 @@ import com.alibaba.fluss.server.entity.NotifyLeaderAndIsrData; import com.alibaba.fluss.server.entity.NotifyLeaderAndIsrResultForBucket; import com.alibaba.fluss.server.entity.NotifyRemoteLogOffsetsData; +import com.alibaba.fluss.server.entity.OffsetForLeaderEpochData; import com.alibaba.fluss.server.entity.StopReplicaData; import com.alibaba.fluss.server.entity.StopReplicaResultForBucket; import com.alibaba.fluss.server.kv.snapshot.CompletedSnapshot; @@ -1364,6 +1372,135 @@ public static DropAclsResponse makeDropAclsResponse(List aclDel return new DropAclsResponse().addAllFilterResults(dropAclsFilterResults); } + /** Builds an {@link OffsetForLeaderEpochRequest} on behalf of follower server {@code followerServerId}. Every entry of {@code leaderEpochDataMap} becomes one per-bucket request carrying the bucket id, the current leader epoch, the leader epoch being asked about and, when the bucket has one, the partition id; the per-bucket requests are grouped by table id. */ public static OffsetForLeaderEpochRequest makeOffsetForLeaderEpochRequest( + int followerServerId, Map<TableBucket, OffsetForLeaderEpochData> leaderEpochDataMap) { + OffsetForLeaderEpochRequest request = + new OffsetForLeaderEpochRequest().setFollowerServerId(followerServerId); + /* table id -> per-bucket requests of that table */ Map<Long, List<PbOffsetForLeaderEpochReqForBucket>> reqForBuckets = new HashMap<>(); + for (Map.Entry<TableBucket, OffsetForLeaderEpochData> entry : + leaderEpochDataMap.entrySet()) { + TableBucket tb = entry.getKey(); + OffsetForLeaderEpochData epochData = entry.getValue(); + PbOffsetForLeaderEpochReqForBucket reqForBucket = + new PbOffsetForLeaderEpochReqForBucket() + .setBucketId(tb.getBucket()) + .setCurrentLeaderEpoch(epochData.getCurrentLeaderEpoch()) + .setLeaderEpoch(epochData.getLeaderEpoch()); + /* the partition id is only set when the bucket has one */ if (tb.getPartitionId() != null) { + reqForBucket.setPartitionId(tb.getPartitionId()); + } + reqForBuckets + .computeIfAbsent(tb.getTableId(), key -> new ArrayList<>()) + .add(reqForBucket); + } + + reqForBuckets.forEach( + (tableId, buckets) -> + request.addTablesReq().setTableId(tableId).addAllBucketsReqs(buckets)); + return request; + } + + /** Converts an {@link OffsetForLeaderEpochResponse} into per-bucket results keyed by {@link TableBucket}. */ public static Map<TableBucket, EpochAndLogEndOffsetForBucket> getOffsetForLeaderEpochData( + OffsetForLeaderEpochResponse response) { + Map<TableBucket, EpochAndLogEndOffsetForBucket> epochDataMap = new HashMap<>(); + for (PbOffsetForLeaderEpochRespForTable pbEpochDataForTable : + response.getTablesRespsList()) { + long tableId = pbEpochDataForTable.getTableId();
+ for (PbOffsetForLeaderEpochRespForBucket pbEpochDataForBucket : + pbEpochDataForTable.getBucketsRespsList()) { + TableBucket tb = + new TableBucket( + tableId, + pbEpochDataForBucket.hasPartitionId() + ? pbEpochDataForBucket.getPartitionId() + : null, + pbEpochDataForBucket.getBucketId()); + if (pbEpochDataForBucket.hasErrorCode()) { + epochDataMap.put( + tb, + new EpochAndLogEndOffsetForBucket( + tb, ApiError.fromErrorMessage(pbEpochDataForBucket))); + } else { + epochDataMap.put( + tb, + new EpochAndLogEndOffsetForBucket( + tb, + pbEpochDataForBucket.getLeaderEpoch(), + pbEpochDataForBucket.getLogEndOffset())); + } + } + } + return epochDataMap; + } + + public static List getOffsetForLeaderEpochData( + OffsetForLeaderEpochRequest request) { + List epochDataList = new ArrayList<>(); + for (PbOffsetForLeaderEpochReqForTable pbEpochDataForTable : request.getTablesReqsList()) { + long tableId = pbEpochDataForTable.getTableId(); + for (PbOffsetForLeaderEpochReqForBucket pbEpochDataForBucket : + pbEpochDataForTable.getBucketsReqsList()) { + int bucketId = pbEpochDataForBucket.getBucketId(); + TableBucket tb = + new TableBucket( + tableId, + pbEpochDataForBucket.hasPartitionId() + ? 
pbEpochDataForBucket.getPartitionId() + : null, + bucketId); + epochDataList.add( + new OffsetForLeaderEpochData( + tb, + pbEpochDataForBucket.getCurrentLeaderEpoch(), + pbEpochDataForBucket.getLeaderEpoch())); + } + } + return epochDataList; + } + + public static OffsetForLeaderEpochResponse makeOffsetForLeaderEpochResponse( + List epochAndLogEndOffsetForBuckets) { + OffsetForLeaderEpochResponse response = new OffsetForLeaderEpochResponse(); + Map> respForTableMap = new HashMap<>(); + for (EpochAndLogEndOffsetForBucket epochDataForBucket : epochAndLogEndOffsetForBuckets) { + TableBucket tb = epochDataForBucket.getTableBucket(); + PbOffsetForLeaderEpochRespForBucket respForBucket = + new PbOffsetForLeaderEpochRespForBucket().setBucketId(tb.getBucket()); + if (tb.getPartitionId() != null) { + respForBucket.setPartitionId(tb.getPartitionId()); + } + + if (epochDataForBucket.failed()) { + respForBucket.setError( + epochDataForBucket.getErrorCode(), epochDataForBucket.getErrorMessage()); + } else { + respForBucket + .setLeaderEpoch(epochDataForBucket.getLeaderEpoch()) + .setLogEndOffset(epochDataForBucket.getLogEndOffset()); + } + + if (respForTableMap.containsKey(tb.getTableId())) { + respForTableMap.get(tb.getTableId()).add(respForBucket); + } else { + List respForBuckets = new ArrayList<>(); + respForBuckets.add(respForBucket); + respForTableMap.put(tb.getTableId(), respForBuckets); + } + } + + List respForTables = new ArrayList<>(); + for (Map.Entry> entry : + respForTableMap.entrySet()) { + PbOffsetForLeaderEpochRespForTable respForTable = + new PbOffsetForLeaderEpochRespForTable().setTableId(entry.getKey()); + respForTable.addAllBucketsResps(entry.getValue()); + respForTables.add(respForTable); + } + + response.addAllTablesResps(respForTables); + return response; + } + private static Map mergeResponse( Map response, Map errors) { if (errors.isEmpty()) { diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/RemoteLeaderEndpointTest.java 
b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/RemoteLeaderEndpointTest.java new file mode 100644 index 0000000000..21f9c5d53f --- /dev/null +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/RemoteLeaderEndpointTest.java @@ -0,0 +1,210 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.server.replica.fetcher; + +import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.rpc.TestingTabletGatewayService; +import com.alibaba.fluss.rpc.entity.EpochAndLogEndOffsetForBucket; +import com.alibaba.fluss.rpc.entity.ListOffsetsResultForBucket; +import com.alibaba.fluss.rpc.messages.ListOffsetsRequest; +import com.alibaba.fluss.rpc.messages.ListOffsetsResponse; +import com.alibaba.fluss.rpc.messages.OffsetForLeaderEpochRequest; +import com.alibaba.fluss.rpc.messages.OffsetForLeaderEpochResponse; +import com.alibaba.fluss.rpc.protocol.ApiError; +import com.alibaba.fluss.server.entity.OffsetForLeaderEpochData; +import com.alibaba.fluss.server.log.ListOffsetsParam; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import 
java.util.concurrent.CompletableFuture; + +import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getOffsetForLeaderEpochData; +import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeListOffsetsResponse; +import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeOffsetForLeaderEpochResponse; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link RemoteLeaderEndpoint}. */ +public class RemoteLeaderEndpointTest { + + private TestingTabletServerGateway remoteServerGateway; + private RemoteLeaderEndpoint remoteLeaderEndpoint; + + /** Creates a fresh in-memory testing gateway and a {@link RemoteLeaderEndpoint} for follower server 1 talking to remote server 2 through that gateway. */ @BeforeEach + public void setUp() { + Configuration conf = new Configuration(); + int remoteServerId = 2; + int followerServerId = 1; + remoteServerGateway = new TestingTabletServerGateway(); + remoteLeaderEndpoint = + new RemoteLeaderEndpoint( + conf, followerServerId, remoteServerId, remoteServerGateway); + } + + /** Asserts that {@code fetchLocalLogStartOffset} resolves to 0, the log start offset registered for the bucket in the mock gateway; runs with and without a partition id. */ @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testFetchLogStartOffset(boolean isPartitionedTable) throws Exception { + TableBucket tb = new TableBucket(1, isPartitionedTable ? 1000L : null, 0); + // first add bucket leader status to mock gateway. + remoteServerGateway.setBucketLeaderStatus( + tb, new BucketLeaderStatus(tb, 0L, 100L, 0, Collections.singletonMap(0, 100L))); + assertThat(remoteLeaderEndpoint.fetchLocalLogStartOffset(tb).get()).isEqualTo(0L); + } + + /** Asserts that {@code fetchLocalLogEndOffset} resolves to 100, the log end offset registered for the bucket in the mock gateway; runs with and without a partition id. */ @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testFetchLogEndOffset(boolean isPartitionedTable) throws Exception { + TableBucket tb = new TableBucket(1, isPartitionedTable ? 1000L : null, 0); + + // first add bucket leader status to mock gateway.
+ remoteServerGateway.setBucketLeaderStatus( + tb, new BucketLeaderStatus(tb, 0L, 100L, 0, Collections.singletonMap(0, 100L))); + + assertThat(remoteLeaderEndpoint.fetchLocalLogEndOffset(tb).get()).isEqualTo(100L); + } + + /** Registers two buckets with distinct epoch-to-end-offset histories in the mock gateway and asserts that {@code fetchOffsetForLeaderEpoch} returns, for each bucket, the end offset recorded for the requested leader epoch; runs with and without a partition id. */ @ParameterizedTest + @ValueSource(booleans = {true, false}) + void fetchOffsetForLeaderEpoch(boolean isPartitionedTable) throws Exception { + TableBucket tb0 = new TableBucket(1, isPartitionedTable ? 1000L : null, 0); + TableBucket tb1 = new TableBucket(2, isPartitionedTable ? 1100L : null, 1); + + Map<Integer, Long> tb0LeaderEpochToEndOffset = new HashMap<>(); + tb0LeaderEpochToEndOffset.put(0, 20L); + tb0LeaderEpochToEndOffset.put(1, 50L); + tb0LeaderEpochToEndOffset.put(2, 100L); + BucketLeaderStatus tb0Status = + new BucketLeaderStatus(tb0, 0L, 100L, 2, tb0LeaderEpochToEndOffset); + + Map<Integer, Long> tb1LeaderEpochToEndOffset = new HashMap<>(); + tb1LeaderEpochToEndOffset.put(1, 10L); + tb1LeaderEpochToEndOffset.put(2, 50L); + tb1LeaderEpochToEndOffset.put(3, 100L); + BucketLeaderStatus tb1Status = + new BucketLeaderStatus(tb1, 10L, 100L, 3, tb1LeaderEpochToEndOffset); + + remoteServerGateway.setBucketLeaderStatus(tb0, tb0Status); + remoteServerGateway.setBucketLeaderStatus(tb1, tb1Status); + + /* each payload names the same bucket as the key it is stored under */ Map<TableBucket, OffsetForLeaderEpochData> leaderEpochDataMap = new HashMap<>(); + leaderEpochDataMap.put(tb0, new OffsetForLeaderEpochData(tb0, 1, 2)); + leaderEpochDataMap.put(tb1, new OffsetForLeaderEpochData(tb1, 1, 3)); + Map<TableBucket, EpochAndLogEndOffsetForBucket> result = + remoteLeaderEndpoint.fetchOffsetForLeaderEpoch(leaderEpochDataMap).get(); + assertThat(result.get(tb0).getLogEndOffset()).isEqualTo(100L); + assertThat(result.get(tb1).getLogEndOffset()).isEqualTo(100L); + } + + /** In-memory gateway that answers {@code listOffsets} and {@code offsetForLeaderEpoch} from the {@link BucketLeaderStatus} registered per bucket. */ private static class TestingTabletServerGateway extends TestingTabletGatewayService { + Map<TableBucket, BucketLeaderStatus> bucketLeaderStatusMap; + + TestingTabletServerGateway() { + this.bucketLeaderStatusMap = new HashMap<>(); + } + + @Override + public CompletableFuture<ListOffsetsResponse> listOffsets(ListOffsetsRequest request) { + CompletableFuture<ListOffsetsResponse> response = new CompletableFuture<>(); + TableBucket tb = + new
TableBucket( + request.getTableId(), + request.hasPartitionId() ? request.getPartitionId() : null, + request.getBucketIdAt(0)); + BucketLeaderStatus bucketLeaderStatus = bucketLeaderStatusMap.get(tb); + ListOffsetsResultForBucket result; + switch (request.getOffsetType()) { + case ListOffsetsParam.LATEST_OFFSET_TYPE: + result = new ListOffsetsResultForBucket(tb, bucketLeaderStatus.logEndOffset); + break; + case ListOffsetsParam.EARLIEST_OFFSET_TYPE: + result = new ListOffsetsResultForBucket(tb, bucketLeaderStatus.logStartOffset); + break; + default: + result = + new ListOffsetsResultForBucket( + tb, + ApiError.fromThrowable(new UnsupportedOperationException())); + } + + response.complete(makeListOffsetsResponse(Collections.singletonList(result))); + return response; + } + + @Override + public CompletableFuture offsetForLeaderEpoch( + OffsetForLeaderEpochRequest request) { + CompletableFuture response = new CompletableFuture<>(); + + List requestData = getOffsetForLeaderEpochData(request); + List resultList = new ArrayList<>(); + requestData.forEach( + data -> { + TableBucket tb = data.getTableBucket(); + BucketLeaderStatus bucketLeaderStatus = bucketLeaderStatusMap.get(tb); + Map cacheEpoch = + bucketLeaderStatus.leaderEpochToLogEndOffset; + int requestEpoch = data.getLeaderEpoch(); + if (cacheEpoch.containsKey(requestEpoch)) { + resultList.add( + new EpochAndLogEndOffsetForBucket( + tb, requestEpoch, cacheEpoch.get(requestEpoch))); + } else { + resultList.add( + new EpochAndLogEndOffsetForBucket( + tb, + ApiError.fromThrowable( + new UnsupportedOperationException()))); + } + }); + + response.complete(makeOffsetForLeaderEpochResponse(resultList)); + return response; + } + + void setBucketLeaderStatus(TableBucket tableBucket, BucketLeaderStatus bucketLeaderStatus) { + bucketLeaderStatusMap.put(tableBucket, bucketLeaderStatus); + } + } + + private static class BucketLeaderStatus { + final TableBucket tableBucket; + final long logStartOffset; + final long 
logEndOffset; + final int leaderEpoch; + /* leader epoch -> log end offset served for that epoch */ final Map<Integer, Long> leaderEpochToLogEndOffset; + + /** Stores the given values as-is; the epoch map is kept by reference, not copied. */ BucketLeaderStatus( + TableBucket tableBucket, + long logStartOffset, + long logEndOffset, + int leaderEpoch, + Map<Integer, Long> leaderEpochToLogEndOffset) { + this.tableBucket = tableBucket; + this.logStartOffset = logStartOffset; + this.logEndOffset = logEndOffset; + this.leaderEpoch = leaderEpoch; + this.leaderEpochToLogEndOffset = leaderEpochToLogEndOffset; + } + } +} diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/TestingLeaderEndpoint.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/TestingLeaderEndpoint.java index 540839e8f1..b2eb79ce7f 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/TestingLeaderEndpoint.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/TestingLeaderEndpoint.java @@ -23,9 +23,11 @@ import com.alibaba.fluss.record.FileLogRecords; import com.alibaba.fluss.record.LogRecords; import com.alibaba.fluss.record.MemoryLogRecords; +import com.alibaba.fluss.rpc.entity.EpochAndLogEndOffsetForBucket; import com.alibaba.fluss.rpc.entity.FetchLogResultForBucket; import com.alibaba.fluss.rpc.messages.FetchLogRequest; import com.alibaba.fluss.server.entity.FetchData; +import com.alibaba.fluss.server.entity.OffsetForLeaderEpochData; import com.alibaba.fluss.server.log.FetchParams; import com.alibaba.fluss.server.replica.Replica; import com.alibaba.fluss.server.replica.ReplicaManager; @@ -81,6 +83,24 @@ public CompletableFuture fetchLeaderEndOffsetSnapshot(TableBucket tableBuc return CompletableFuture.completedFuture(replica.getLeaderEndOffsetSnapshot()); } + /** Resolves every requested bucket against the local {@code replicaManager}: looks up the replica (via {@code getReplicaOrException}) and asks it for the last log offset of the requested leader epoch. Returns an already-completed future. */ @Override + public CompletableFuture<Map<TableBucket, EpochAndLogEndOffsetForBucket>> + fetchOffsetForLeaderEpoch( + Map<TableBucket, OffsetForLeaderEpochData> leaderEpochDataMap) { + Map<TableBucket, EpochAndLogEndOffsetForBucket> result = new HashMap<>(); + leaderEpochDataMap.forEach( + (tb, epochData) -> { + Replica replica = replicaManager.getReplicaOrException(tb); + result.put( + tb, + replica.lastLogOffsetForLeaderEpoch( +
Optional.of(epochData.getCurrentLeaderEpoch()), + epochData.getLeaderEpoch(), + true)); + }); + return CompletableFuture.completedFuture(result); + } + @Override public CompletableFuture> fetchLog( FetchLogContext fetchLogContext) { diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TestTabletServerGateway.java b/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TestTabletServerGateway.java index 9f10b2bcb9..ff16b1288d 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TestTabletServerGateway.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TestTabletServerGateway.java @@ -67,6 +67,8 @@ import com.alibaba.fluss.rpc.messages.NotifyLeaderAndIsrResponse; import com.alibaba.fluss.rpc.messages.NotifyRemoteLogOffsetsRequest; import com.alibaba.fluss.rpc.messages.NotifyRemoteLogOffsetsResponse; +import com.alibaba.fluss.rpc.messages.OffsetForLeaderEpochRequest; +import com.alibaba.fluss.rpc.messages.OffsetForLeaderEpochResponse; import com.alibaba.fluss.rpc.messages.PbNotifyLeaderAndIsrReqForBucket; import com.alibaba.fluss.rpc.messages.PbNotifyLeaderAndIsrRespForBucket; import com.alibaba.fluss.rpc.messages.PbStopReplicaReqForBucket; @@ -317,6 +319,12 @@ public CompletableFuture listAcls(ListAclsRequest request) { throw new UnsupportedOperationException(); } + /** Not implemented by this test gateway: always throws {@link UnsupportedOperationException} instead of returning a future. */ @Override + public CompletableFuture<OffsetForLeaderEpochResponse> offsetForLeaderEpoch( + OffsetForLeaderEpochRequest request) { + throw new UnsupportedOperationException(); + } + public int pendingRequestSize() { return requests.size(); }