Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2025 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.exception;

/**
 * Signals that a request carried a leaderEpoch which is larger than the one known to the
 * tabletServer that received the request.
 *
 * <p>This can happen when the client/follower observes a metadata update before that update has
 * been propagated to all tabletServers. The client/follower need not refresh metadata before
 * retrying.
 */
public class UnknownLeaderEpochException extends RetriableException {

    private static final long serialVersionUID = 1L;

    /**
     * Creates the exception with the given detail message.
     *
     * @param message the detail message, handed to the superclass constructor
     */
    public UnknownLeaderEpochException(String message) {
        super(message);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2025 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.rpc.entity;

import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.rpc.messages.OffsetForLeaderEpochRequest;
import com.alibaba.fluss.rpc.protocol.ApiError;

/**
 * Per-bucket result of an {@link OffsetForLeaderEpochRequest}: a leaderEpoch together with the
 * logEndOffset of that leaderEpoch, plus the {@link ApiError} of the lookup.
 */
public class EpochAndLogEndOffsetForBucket extends ResultForBucket {

    /** Placeholder leaderEpoch stored when the result is built from an error only. */
    private static final int NO_LEADER_EPOCH = -1;

    /** Placeholder logEndOffset stored when the result is built from an error only. */
    private static final long NO_LOG_END_OFFSET = -1L;

    /** The leaderEpoch of this tableBucket. */
    private final int leaderEpoch;

    /** The logEndOffset of this leaderEpoch. */
    private final long logEndOffset;

    /**
     * Creates a result carrying the given epoch and offset, with {@link ApiError#NONE} as error.
     *
     * @param tableBucket the table bucket this result belongs to
     * @param leaderEpoch the leaderEpoch of the table bucket
     * @param logEndOffset the logEndOffset of {@code leaderEpoch}
     */
    public EpochAndLogEndOffsetForBucket(
            TableBucket tableBucket, int leaderEpoch, long logEndOffset) {
        this(tableBucket, leaderEpoch, logEndOffset, ApiError.NONE);
    }

    /**
     * Creates a result that only carries an error; epoch and offset are set to {@code -1}.
     *
     * @param tableBucket the table bucket this result belongs to
     * @param error the error to report for the table bucket
     */
    public EpochAndLogEndOffsetForBucket(TableBucket tableBucket, ApiError error) {
        this(tableBucket, NO_LEADER_EPOCH, NO_LOG_END_OFFSET, error);
    }

    /**
     * Creates a result from all of its parts.
     *
     * @param tableBucket the table bucket this result belongs to
     * @param leaderEpoch the leaderEpoch of the table bucket
     * @param logEndOffset the logEndOffset of {@code leaderEpoch}
     * @param error the error to report for the table bucket
     */
    public EpochAndLogEndOffsetForBucket(
            TableBucket tableBucket, int leaderEpoch, long logEndOffset, ApiError error) {
        super(tableBucket, error);
        this.leaderEpoch = leaderEpoch;
        this.logEndOffset = logEndOffset;
    }

    /** Returns the leaderEpoch stored in this result. */
    public int getLeaderEpoch() {
        return this.leaderEpoch;
    }

    /** Returns the logEndOffset stored in this result. */
    public long getLogEndOffset() {
        return this.logEndOffset;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import com.alibaba.fluss.rpc.messages.NotifyLeaderAndIsrResponse;
import com.alibaba.fluss.rpc.messages.NotifyRemoteLogOffsetsRequest;
import com.alibaba.fluss.rpc.messages.NotifyRemoteLogOffsetsResponse;
import com.alibaba.fluss.rpc.messages.OffsetForLeaderEpochRequest;
import com.alibaba.fluss.rpc.messages.OffsetForLeaderEpochResponse;
import com.alibaba.fluss.rpc.messages.PrefixLookupRequest;
import com.alibaba.fluss.rpc.messages.PrefixLookupResponse;
import com.alibaba.fluss.rpc.messages.ProduceLogRequest;
Expand Down Expand Up @@ -160,4 +162,13 @@ CompletableFuture<NotifyKvSnapshotOffsetResponse> notifyKvSnapshotOffset(
@RPC(api = ApiKeys.NOTIFY_LAKE_TABLE_OFFSET)
CompletableFuture<NotifyLakeTableOffsetResponse> notifyLakeTableOffset(
NotifyLakeTableOffsetRequest request);

/**
* Get the logEndOffset for the requested leaderEpoch of each table bucket named in the request.
*
* @param request the request listing, per table and per bucket, the leaderEpoch to look up
* @return a future of the response carrying, per table bucket, the leaderEpoch and its
*     logEndOffset, or an error code and error message
*/
@RPC(api = ApiKeys.OFFSET_FOR_LEADER_EPOCH)
CompletableFuture<OffsetForLeaderEpochResponse> offsetForLeaderEpoch(
OffsetForLeaderEpochRequest request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public enum ApiKeys {
AUTHENTICATE(1038, 0, 0, PUBLIC),
CREATE_ACLS(1039, 0, 0, PUBLIC),
LIST_ACLS(1040, 0, 0, PUBLIC),
DROP_ACLS(1041, 0, 0, PUBLIC);
DROP_ACLS(1041, 0, 0, PUBLIC),
OFFSET_FOR_LEADER_EPOCH(1042, 0, 0, PUBLIC);

private static final Map<Integer, ApiKeys> ID_TO_TYPE =
Arrays.stream(ApiKeys.values())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import com.alibaba.fluss.exception.TimeoutException;
import com.alibaba.fluss.exception.TooManyBucketsException;
import com.alibaba.fluss.exception.TooManyPartitionsException;
import com.alibaba.fluss.exception.UnknownLeaderEpochException;
import com.alibaba.fluss.exception.UnknownServerException;
import com.alibaba.fluss.exception.UnknownTableOrBucketException;
import com.alibaba.fluss.exception.UnknownWriterIdException;
Expand Down Expand Up @@ -199,7 +200,11 @@ public enum Errors {
SECURITY_DISABLED_EXCEPTION(47, "Security is disabled.", SecurityDisabledException::new),
AUTHORIZATION_EXCEPTION(48, "Authorization failed", AuthorizationException::new),
BUCKET_MAX_NUM_EXCEPTION(
49, "Exceed the maximum number of buckets", TooManyBucketsException::new);
49, "Exceed the maximum number of buckets", TooManyBucketsException::new),
UNKNOWN_LEADER_EPOCH_EXCEPTION(
50,
"The leaderEpoch in the request is newer than the leaderEpoch on the tabletServer.",
UnknownLeaderEpochException::new);

private static final Logger LOG = LoggerFactory.getLogger(Errors.class);

Expand Down
37 changes: 37 additions & 0 deletions fluss-rpc/src/main/proto/FlussApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,16 @@ message DropAclsResponse{
repeated PbDropAclsFilterResult filter_results = 1;
}

// OffsetForLeaderEpochRequest and response.
// The request asks for the log end offset of a leader epoch, per table and per bucket.
message OffsetForLeaderEpochRequest {
required int32 follower_server_id = 1; // A value of -1 indicates the request comes from a client.
// One entry per table whose buckets are queried.
repeated PbOffsetForLeaderEpochReqForTable tables_req = 2;
}

// Response to OffsetForLeaderEpochRequest, grouped per table.
message OffsetForLeaderEpochResponse {
// One entry per table, each holding the per-bucket results.
repeated PbOffsetForLeaderEpochRespForTable tables_resp = 1;
}

// --------------- Inner classes ----------------
message PbApiVersion {
required int32 api_key = 1;
Expand Down Expand Up @@ -815,4 +825,31 @@ message PbDropAclsMatchingAcl {
required PbAclInfo acl = 1;
optional int32 error_code = 2;
optional string error_message = 3;
}

// Per-table part of an OffsetForLeaderEpochRequest.
message PbOffsetForLeaderEpochReqForTable {
// Id of the table the bucket requests below belong to.
required int64 table_id = 1;
// One entry per bucket of this table that is queried.
repeated PbOffsetForLeaderEpochReqForBucket buckets_req = 2;
}


// Per-bucket part of an OffsetForLeaderEpochRequest.
message PbOffsetForLeaderEpochReqForBucket {
// NOTE(review): presumably left unset for buckets of non-partitioned tables - confirm.
optional int64 partition_id = 1;
required int32 bucket_id = 2;
// Leader epoch currently known to the sender; used to fence clients/followers with old metadata.
required int32 current_leader_epoch = 3;
// The leader epoch to look up an offset for.
required int32 leader_epoch = 4;
}

// Per-table part of an OffsetForLeaderEpochResponse.
message PbOffsetForLeaderEpochRespForTable {
// Id of the table the bucket results below belong to.
required int64 table_id = 1;
// One entry per bucket of this table.
repeated PbOffsetForLeaderEpochRespForBucket buckets_resp = 2;
}

// Per-bucket part of an OffsetForLeaderEpochResponse.
message PbOffsetForLeaderEpochRespForBucket {
// NOTE(review): presumably left unset for buckets of non-partitioned tables - confirm.
optional int64 partition_id = 1;
required int32 bucket_id = 2;
// NOTE(review): error fields are presumably set only when the lookup failed - confirm.
optional int32 error_code = 3;
optional string error_message = 4;
// The leader epoch of this bucket.
optional int32 leader_epoch = 5;
// The log end offset of that leader epoch.
optional int64 log_end_offset = 6;
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
import com.alibaba.fluss.rpc.messages.NotifyLeaderAndIsrResponse;
import com.alibaba.fluss.rpc.messages.NotifyRemoteLogOffsetsRequest;
import com.alibaba.fluss.rpc.messages.NotifyRemoteLogOffsetsResponse;
import com.alibaba.fluss.rpc.messages.OffsetForLeaderEpochRequest;
import com.alibaba.fluss.rpc.messages.OffsetForLeaderEpochResponse;
import com.alibaba.fluss.rpc.messages.PrefixLookupRequest;
import com.alibaba.fluss.rpc.messages.PrefixLookupResponse;
import com.alibaba.fluss.rpc.messages.ProduceLogRequest;
Expand Down Expand Up @@ -160,6 +162,12 @@ public CompletableFuture<NotifyLakeTableOffsetResponse> notifyLakeTableOffset(
return null;
}

/** Stub implementation: always returns {@code null} instead of a future. */
@Override
public CompletableFuture<OffsetForLeaderEpochResponse> offsetForLeaderEpoch(
OffsetForLeaderEpochRequest request) {
// Left unimplemented, like the neighbouring methods that also return null.
return null;
}

@Override
public CompletableFuture<ListDatabasesResponse> listDatabases(ListDatabasesRequest request) {
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (c) 2025 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.server.entity;

import com.alibaba.fluss.exception.FencedLeaderEpochException;
import com.alibaba.fluss.exception.UnknownLeaderEpochException;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.rpc.messages.OffsetForLeaderEpochRequest;

import java.util.Objects;

/**
 * The data for request {@link OffsetForLeaderEpochRequest}.
 *
 * <p>Holds, for one table bucket, the leaderEpoch known to the requester and the leaderEpoch whose
 * offset should be looked up. Instances have no setters and all fields are final.
 */
public class OffsetForLeaderEpochData {

    /** The table bucket this lookup refers to. */
    private final TableBucket tableBucket;

    /**
     * An epoch used to fence clients/followers with old metadata. If the leaderEpoch provided by
     * the client or follower is larger than the current leaderEpoch known to the tabletServer, then
     * the {@link UnknownLeaderEpochException} will be thrown. If the leaderEpoch provided by the
     * client or follower is smaller than the current epoch known to the requested tabletServer,
     * then the {@link FencedLeaderEpochException} will be thrown.
     */
    private final int currentLeaderEpoch;

    /** The epoch to look up an offset for. */
    private final int leaderEpoch;

    /**
     * Creates the lookup data for one table bucket.
     *
     * @param tableBucket the table bucket to query
     * @param currentLeaderEpoch the leaderEpoch currently known to the requester, used for fencing
     * @param leaderEpoch the epoch to look up an offset for
     */
    public OffsetForLeaderEpochData(
            TableBucket tableBucket, int currentLeaderEpoch, int leaderEpoch) {
        this.tableBucket = tableBucket;
        this.currentLeaderEpoch = currentLeaderEpoch;
        this.leaderEpoch = leaderEpoch;
    }

    /** Returns the table bucket this lookup refers to. */
    public TableBucket getTableBucket() {
        return tableBucket;
    }

    /** Returns the leaderEpoch currently known to the requester. */
    public int getCurrentLeaderEpoch() {
        return currentLeaderEpoch;
    }

    /** Returns the epoch to look up an offset for. */
    public int getLeaderEpoch() {
        return leaderEpoch;
    }

    /**
     * Compares all three fields for equality.
     *
     * @param o the object to compare with
     * @return {@code true} if {@code o} is of the same class and has equal field values
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        OffsetForLeaderEpochData that = (OffsetForLeaderEpochData) o;
        // Objects.equals keeps equals() null-tolerant, in line with hashCode() and toString(),
        // instead of throwing a NullPointerException when tableBucket is null.
        return currentLeaderEpoch == that.currentLeaderEpoch
                && leaderEpoch == that.leaderEpoch
                && Objects.equals(tableBucket, that.tableBucket);
    }

    @Override
    public int hashCode() {
        return Objects.hash(tableBucket, currentLeaderEpoch, leaderEpoch);
    }

    @Override
    public String toString() {
        return "OffsetForLeaderEpochData{"
                + "tableBucket="
                + tableBucket
                + ", currentLeaderEpoch="
                + currentLeaderEpoch
                + ", leaderEpoch="
                + leaderEpoch
                + '}';
    }
}
Loading