Skip to content

Commit 411f697

Browse files
lhotarimerlimat
andauthored
[improve][misc] Replace dependencies on PositionImpl with Position interface (#22891)
Co-authored-by: Matteo Merli <mmerli@apache.org>
1 parent a91a172 commit 411f697

File tree

151 files changed

+2214
-1910
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

151 files changed

+2214
-1910
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java

+11-12
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
3535
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
3636
import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback;
37-
import org.apache.bookkeeper.mledger.impl.PositionImpl;
3837

3938
/**
4039
* A ManagedCursor is a persisted cursor inside a ManagedLedger.
@@ -152,7 +151,7 @@ enum IndividualDeletedEntries {
152151
* max position can read
153152
*/
154153
void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx,
155-
PositionImpl maxPosition);
154+
Position maxPosition);
156155

157156

158157
/**
@@ -165,7 +164,7 @@ void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback, O
165164
* @param maxPosition max position can read
166165
*/
167166
void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
168-
Object ctx, PositionImpl maxPosition);
167+
Object ctx, Position maxPosition);
169168

170169
/**
171170
* Asynchronously read entries from the ManagedLedger.
@@ -178,7 +177,7 @@ void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesC
178177
* @param skipCondition predicate of read filter out
179178
*/
180179
default void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
181-
Object ctx, PositionImpl maxPosition, Predicate<PositionImpl> skipCondition) {
180+
Object ctx, Position maxPosition, Predicate<Position> skipCondition) {
182181
asyncReadEntries(numberOfEntriesToRead, maxSizeBytes, callback, ctx, maxPosition);
183182
}
184183

@@ -256,7 +255,7 @@ List<Entry> readEntriesOrWait(int maxEntries, long maxSizeBytes)
256255
* max position can read
257256
*/
258257
void asyncReadEntriesOrWait(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx,
259-
PositionImpl maxPosition);
258+
Position maxPosition);
260259

261260
/**
262261
* Asynchronously read entries from the ManagedLedger, up to the specified number and size.
@@ -277,7 +276,7 @@ void asyncReadEntriesOrWait(int numberOfEntriesToRead, ReadEntriesCallback callb
277276
* max position can read
278277
*/
279278
void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback, Object ctx,
280-
PositionImpl maxPosition);
279+
Position maxPosition);
281280

282281
/**
283282
* Asynchronously read entries from the ManagedLedger, up to the specified number and size.
@@ -298,7 +297,7 @@ void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallba
298297
* predicate of read filter out
299298
*/
300299
default void asyncReadEntriesWithSkipOrWait(int maxEntries, ReadEntriesCallback callback, Object ctx,
301-
PositionImpl maxPosition, Predicate<PositionImpl> skipCondition) {
300+
Position maxPosition, Predicate<Position> skipCondition) {
302301
asyncReadEntriesOrWait(maxEntries, callback, ctx, maxPosition);
303302
}
304303

@@ -323,15 +322,15 @@ default void asyncReadEntriesWithSkipOrWait(int maxEntries, ReadEntriesCallback
323322
* predicate of read filter out
324323
*/
325324
default void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback,
326-
Object ctx, PositionImpl maxPosition,
327-
Predicate<PositionImpl> skipCondition) {
325+
Object ctx, Position maxPosition,
326+
Predicate<Position> skipCondition) {
328327
asyncReadEntriesOrWait(maxEntries, maxSizeBytes, callback, ctx, maxPosition);
329328
}
330329

331330
/**
332331
* Cancel a previously scheduled asyncReadEntriesOrWait operation.
333332
*
334-
* @see #asyncReadEntriesOrWait(int, ReadEntriesCallback, Object, PositionImpl)
333+
* @see #asyncReadEntriesOrWait(int, ReadEntriesCallback, Object, Position)
335334
* @return true if the read operation was canceled or false if there was no pending operation
336335
*/
337336
boolean cancelPendingReadRequest();
@@ -837,7 +836,7 @@ default void skipNonRecoverableLedger(long ledgerId){}
837836
* Get last individual deleted range.
838837
* @return range
839838
*/
840-
Range<PositionImpl> getLastIndividualDeletedRange();
839+
Range<Position> getLastIndividualDeletedRange();
841840

842841
/**
843842
* Trim delete entries for the given entries.
@@ -847,7 +846,7 @@ default void skipNonRecoverableLedger(long ledgerId){}
847846
/**
848847
* Get deleted batch indexes list for a batch message.
849848
*/
850-
long[] getDeletedBatchIndexesAsLongArray(PositionImpl position);
849+
long[] getDeletedBatchIndexesAsLongArray(Position position);
851850

852851
/**
853852
* @return the managed cursor stats MBean

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Position.java

+97-4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.bookkeeper.mledger;
2020

21+
import java.util.Optional;
2122
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
2223
import org.apache.bookkeeper.common.annotation.InterfaceStability;
2324

@@ -26,16 +27,108 @@
2627
*/
2728
@InterfaceAudience.LimitedPrivate
2829
@InterfaceStability.Stable
29-
public interface Position {
30+
public interface Position extends Comparable<Position> {
31+
/**
32+
* Get the ledger id of the entry pointed by this position.
33+
*
34+
* @return the ledger id
35+
*/
36+
long getLedgerId();
37+
38+
/**
39+
* Get the entry id of the entry pointed by this position.
40+
*
41+
* @return the entry id
42+
*/
43+
long getEntryId();
44+
45+
/**
46+
* Compare this position with another position.
47+
* The comparison is first based on the ledger id, and then on the entry id.
48+
* This is implements the Comparable interface.
49+
* @param that the other position to be compared.
50+
* @return -1 if this position is less than the other, 0 if they are equal, 1 if this position is greater than
51+
* the other.
52+
*/
53+
default int compareTo(Position that) {
54+
if (getLedgerId() != that.getLedgerId()) {
55+
return Long.compare(getLedgerId(), that.getLedgerId());
56+
}
57+
58+
return Long.compare(getEntryId(), that.getEntryId());
59+
}
60+
61+
/**
62+
* Compare this position with another position based on the ledger id and entry id.
63+
* @param ledgerId the ledger id to compare
64+
* @param entryId the entry id to compare
65+
* @return -1 if this position is less than the other, 0 if they are equal, 1 if this position is greater than
66+
* the other.
67+
*/
68+
default int compareTo(long ledgerId, long entryId) {
69+
if (getLedgerId() != ledgerId) {
70+
return Long.compare(getLedgerId(), ledgerId);
71+
}
72+
73+
return Long.compare(getEntryId(), entryId);
74+
}
75+
76+
/**
77+
* Calculate the hash code for the position based on ledgerId and entryId.
78+
* This is used in Position implementations to implement the hashCode method.
79+
* @return hash code
80+
*/
81+
default int hashCodeForPosition() {
82+
int result = Long.hashCode(getLedgerId());
83+
result = 31 * result + Long.hashCode(getEntryId());
84+
return result;
85+
}
86+
3087
/**
3188
* Get the position of the entry next to this one. The returned position might point to a non-existing, or not-yet
3289
* existing entry
3390
*
3491
* @return the position of the next logical entry
3592
*/
36-
Position getNext();
93+
default Position getNext() {
94+
if (getEntryId() < 0) {
95+
return PositionFactory.create(getLedgerId(), 0);
96+
} else {
97+
return PositionFactory.create(getLedgerId(), getEntryId() + 1);
98+
}
99+
}
37100

38-
long getLedgerId();
101+
/**
102+
* Position after moving entryNum messages,
103+
* if entryNum < 1, then return the current position.
104+
* */
105+
default Position getPositionAfterEntries(int entryNum) {
106+
if (entryNum < 1) {
107+
return this;
108+
}
109+
if (getEntryId() < 0) {
110+
return PositionFactory.create(getLedgerId(), entryNum - 1);
111+
} else {
112+
return PositionFactory.create(getLedgerId(), getEntryId() + entryNum);
113+
}
114+
}
39115

40-
long getEntryId();
116+
/**
117+
* Check if the position implementation has an extension of the given class or interface.
118+
*
119+
* @param extensionClass the class of the extension
120+
* @return true if the position has an extension of the given class, false otherwise
121+
*/
122+
default boolean hasExtension(Class<?> extensionClass) {
123+
return getExtension(extensionClass).isPresent();
124+
}
125+
126+
/**
127+
* Get the extension instance of the given class or interface that is attached to this position.
128+
* If the position does not have an extension of the given class, an empty optional is returned.
129+
* @param extensionClass the class of the extension
130+
*/
131+
default <T> Optional<T> getExtension(Class<T> extensionClass) {
132+
return Optional.empty();
133+
}
41134
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.bookkeeper.mledger;
20+
21+
import org.apache.bookkeeper.mledger.impl.ImmutablePositionImpl;
22+
23+
/**
24+
* Factory for creating {@link Position} instances.
25+
*/
26+
public final class PositionFactory {
27+
/**
28+
* Earliest position.
29+
*/
30+
public static final Position EARLIEST = create(-1, -1);
31+
/**
32+
* Latest position.
33+
*/
34+
public static final Position LATEST = create(Long.MAX_VALUE, Long.MAX_VALUE);
35+
36+
private PositionFactory() {
37+
}
38+
39+
/**
40+
* Create a new position.
41+
*
42+
* @param ledgerId ledger id
43+
* @param entryId entry id
44+
* @return new position
45+
*/
46+
public static Position create(long ledgerId, long entryId) {
47+
return new ImmutablePositionImpl(ledgerId, entryId);
48+
}
49+
50+
/**
51+
* Create a new position.
52+
*
53+
* @param other other position
54+
* @return new position
55+
*/
56+
public static Position create(Position other) {
57+
return new ImmutablePositionImpl(other);
58+
}
59+
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
2525
import org.apache.bookkeeper.common.annotation.InterfaceStability;
2626
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
27-
import org.apache.bookkeeper.mledger.impl.PositionImpl;
2827

2928
@InterfaceAudience.LimitedPrivate
3029
@InterfaceStability.Stable
@@ -48,7 +47,7 @@ public interface ReadOnlyCursor {
4847
* @see #readEntries(int)
4948
*/
5049
void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback,
51-
Object ctx, PositionImpl maxPosition);
50+
Object ctx, Position maxPosition);
5251

5352
/**
5453
* Asynchronously read entries from the ManagedLedger.
@@ -60,7 +59,7 @@ void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback,
6059
* @param maxPosition max position can read
6160
*/
6261
void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
63-
Object ctx, PositionImpl maxPosition);
62+
Object ctx, Position maxPosition);
6463

6564
/**
6665
* Get the read position. This points to the next message to be read from the cursor.
@@ -116,7 +115,7 @@ Position findNewestMatching(ManagedCursor.FindPositionConstraint constraint, Pre
116115
* @param range the range between two positions
117116
* @return the number of entries in range
118117
*/
119-
long getNumberOfEntries(Range<PositionImpl> range);
118+
long getNumberOfEntries(Range<Position> range);
120119

121120
/**
122121
* Close the cursor and releases the associated resources.

0 commit comments

Comments
 (0)