MINOR: fixing JavaDocs and other cleanup #17207

Open · wants to merge 2 commits into trunk
@@ -32,19 +32,22 @@ public interface ClientInstanceIds {
*
* @throws IllegalStateException If telemetry is disabled on the admin client.
*/
@SuppressWarnings("unused")
Uuid adminInstanceId();

/**
* Returns the {@code client instance id} of the consumers.
*
* @return a map from thread key to {@code client instance id}
*/
@SuppressWarnings("unused")
Map<String, Uuid> consumerInstanceIds();

/**
* Returns the {@code client instance id} of the producers.
*
* @return a map from thread key to {@code client instance id}
*/
@SuppressWarnings("unused")
Map<String, Uuid> producerInstanceIds();
}
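
A quick usage sketch (not part of this diff), assuming a running KafkaStreams instance named streams with telemetry enabled and that KafkaStreams#clientInstanceIds(Duration) is available; all names are illustrative:

    ClientInstanceIds ids = streams.clientInstanceIds(Duration.ofSeconds(10));
    // admin client instance id
    System.out.println("admin -> " + ids.adminInstanceId());
    // one entry per consumer and producer, keyed by thread
    ids.consumerInstanceIds().forEach((threadKey, uuid) ->
        System.out.println("consumer " + threadKey + " -> " + uuid));
    ids.producerInstanceIds().forEach((threadKey, uuid) ->
        System.out.println("producer " + threadKey + " -> " + uuid));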
78 changes: 44 additions & 34 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -323,11 +323,11 @@ private boolean setState(final State newState) {

if (state == State.PENDING_SHUTDOWN && newState != State.NOT_RUNNING) {
// when the state is already in PENDING_SHUTDOWN, all other transitions than NOT_RUNNING (due to thread dying) will be
// refused but we do not throw exception here, to allow appropriate error handling
// refused, but we do not throw exception here, to allow appropriate error handling
return false;
} else if (state == State.NOT_RUNNING && (newState == State.PENDING_SHUTDOWN || newState == State.NOT_RUNNING)) {
// when the state is already in NOT_RUNNING, its transition to PENDING_SHUTDOWN or NOT_RUNNING (due to consecutive close calls)
// will be refused but we do not throw exception here, to allow idempotent close calls
// will be refused, but we do not throw exception here, to allow idempotent close calls
return false;
} else if (state == State.REBALANCING && newState == State.REBALANCING) {
// when the state is already in REBALANCING, it should not transit to REBALANCING again
@@ -337,7 +337,7 @@ private boolean setState(final State newState) {
return false;
} else if (state == State.PENDING_ERROR && newState != State.ERROR) {
// when the state is already in PENDING_ERROR, all other transitions than ERROR (due to thread dying) will be
// refused but we do not throw exception here, to allow appropriate error handling
// refused, but we do not throw exception here, to allow appropriate error handling
return false;
} else if (!state.isValidTransition(newState)) {
throw new IllegalStateException("Stream-client " + clientId + ": Unexpected state transition from " + oldState + " to " + newState);
@@ -453,7 +453,7 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler un
* The handler will execute on the thread that produced the exception.
* In order to get the thread that threw the exception, use {@code Thread.currentThread()}.
* <p>
* Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any
* Note, this handler must be thread safe, since it will be shared among all threads, and invoked from any
* thread that encounters such an exception.
*
* @param userStreamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads
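
For illustration only (not part of this PR): a sketch of registering such a handler before start(), assuming a KafkaStreams instance named streams; since the handler can be invoked from any stream thread, it must not rely on thread-local state.

    streams.setUncaughtExceptionHandler(exception -> {
        // may run on any stream thread, so keep this handler thread safe
        log.error("Uncaught exception in {}", Thread.currentThread().getName(), exception);
        return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
    });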
@@ -539,9 +539,12 @@ private void handleStreamsUncaughtException(final Throwable throwable,
}
break;
case SHUTDOWN_CLIENT:
log.error("Encountered the following exception during processing " +
"and the registered exception handler opted to " + action + "." +
" The streams client is going to shut down now. ", throwable);
log.error(
"Encountered the following exception during processing and the registered exception handler " +
"opted to {}. The streams client is going to shut down now.",
action,
throwable
);

Contributor: Pardon me, why not using {} placeholder directly?
closeToError();
break;
case SHUTDOWN_APPLICATION:
@@ -979,7 +982,7 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
// The application ID is a required config and hence should always have value
final String userClientId = applicationConfigs.getString(StreamsConfig.CLIENT_ID_CONFIG);
final String applicationId = applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG);
if (userClientId.length() <= 0) {
if (userClientId.isEmpty()) {
clientId = applicationId + "-" + processId;
} else {
clientId = userClientId;
@@ -1194,7 +1197,7 @@ private Optional<String> removeStreamThread(final long timeoutMs) throws Timeout
for (final StreamThread streamThread : new ArrayList<>(threads)) {
final boolean callingThreadIsNotCurrentStreamThread = !streamThread.getName().equals(Thread.currentThread().getName());
if (streamThread.isThreadAlive() && (callingThreadIsNotCurrentStreamThread || numLiveStreamThreads() == 1)) {
log.info("Removing StreamThread " + streamThread.getName());
log.info("Removing StreamThread {}", streamThread.getName());
final Optional<String> groupInstanceID = streamThread.groupInstanceID();
streamThread.requestLeaveGroupDuringShutdown();
streamThread.shutdown();
@@ -1268,17 +1271,18 @@ private Optional<String> removeStreamThread(final long timeoutMs) throws Timeout
}
log.warn("There are no threads eligible for removal");
} else {
log.warn("Cannot remove a stream thread when Kafka Streams client is in state " + state());
log.warn("Cannot remove a stream thread when Kafka Streams client is in state {}", state());
}
return Optional.empty();
}

/*
* Takes a snapshot and counts the number of stream threads which are not in PENDING_SHUTDOWN or DEAD
*
* note: iteration over SynchronizedList is not thread safe so it must be manually synchronized. However, we may
* require other locks when looping threads and it could cause deadlock. Hence, we create a copy to avoid holding
* Note: iteration over SynchronizedList is not thread safe, so it must be manually synchronized. However, we may
* require other locks when looping threads, and it could cause deadlock. Hence, we create a copy to avoid holding
* threads lock when looping threads.
*
* @return number of alive stream threads
*/
private int numLiveStreamThreads() {
@@ -1305,7 +1309,7 @@ private int nextThreadIndex() {
final AtomicInteger maxThreadId = new AtomicInteger(1);
synchronized (threads) {
processStreamThread(thread -> {
// trim any DEAD threads from the list so we can reuse the thread.id
// trim any DEAD threads from the list, so we can reuse the thread.id
// this is only safe to do once the thread has fully completed shutdown
if (thread.state() == StreamThread.State.DEAD) {
threads.remove(thread);
@@ -1452,7 +1456,7 @@ public CloseOptions leaveGroup(final boolean leaveGroup) {
* This will block until all threads have stopped.
*/
public void close() {
close(Long.MAX_VALUE, false);
close(Optional.empty(), false);
}

Member Author: Changing this to Optional to unify the log message (cf other changes).

private Thread shutdownHelper(final boolean error, final long timeoutMs, final boolean leaveGroup) {
Expand Down Expand Up @@ -1531,7 +1535,15 @@ private Thread shutdownHelper(final boolean error, final long timeoutMs, final b
}, clientId + "-CloseThread");
}

private boolean close(final long timeoutMs, final boolean leaveGroup) {
private boolean close(final Optional<Long> timeout, final boolean leaveGroup) {
final long timeoutMs;
if (timeout.isPresent()) {
timeoutMs = timeout.get();
log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs);
} else {
timeoutMs = Long.MAX_VALUE;
}

Contributor: nit: there is "ms" already, so timeoutMillis could be simplified to timeout

if (state.hasCompletedShutdown()) {
log.info("Streams client is already in the terminal {} state, all resources are closed and the client has stopped.", state);
return true;
@@ -1574,7 +1586,7 @@ private boolean close(final long timeoutMs, final boolean leaveGroup) {

private void closeToError() {
if (!setState(State.PENDING_ERROR)) {
log.info("Skipping shutdown since we are already in " + state());
log.info("Skipping shutdown since we are already in {}", state());
} else {
final Thread shutdownThread = shutdownHelper(true, -1, false);

@@ -1586,10 +1598,10 @@ private void closeToError() {
/**
* Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the
* threads to join.
* A {@code timeout} of Duration.ZERO (or any other zero duration) makes the close operation asynchronous.
* A {@code timeout} of {@link Duration#ZERO} (or any other zero duration) makes the close operation asynchronous.
* Negative-duration timeouts are rejected.
*
* @param timeout how long to wait for the threads to shutdown
* @param timeout how long to wait for the threads to shut down
* @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
* before all threads stopped
* Note that this method must not be called in the {@link StateListener#onChange(KafkaStreams.State, KafkaStreams.State)} callback of {@link StateListener}.
@@ -1602,15 +1614,13 @@ public synchronized boolean close(final Duration timeout) throws IllegalArgument
throw new IllegalArgumentException("Timeout can't be negative.");
}

log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs);

return close(timeoutMs, false);
return close(Optional.of(timeoutMs), false);
}
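
Illustrative sketch (not from this PR): the bounded close in use, assuming a running instance streams; a Duration.ZERO timeout returns immediately while shutdown continues in the background.

    boolean stopped = streams.close(Duration.ofSeconds(30));
    if (!stopped) {
        // the timeout elapsed before all stream threads joined
        log.warn("Kafka Streams client did not stop within 30 seconds");
    }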

/**
* Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the
* threads to join.
* @param options contains timeout to specify how long to wait for the threads to shutdown, and a flag leaveGroup to
* @param options contains timeout to specify how long to wait for the threads to shut down, and a flag leaveGroup to
* trigger consumer leave call
* @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
* before all threads stopped
@@ -1624,8 +1634,8 @@ public synchronized boolean close(final CloseOptions options) throws IllegalArgu
if (timeoutMs < 0) {
throw new IllegalArgumentException("Timeout can't be negative.");
}
log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs);
return close(timeoutMs, options.leaveGroup);

return close(Optional.of(timeoutMs), options.leaveGroup);
}
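
Illustrative sketch (not from this PR): the CloseOptions variant, assuming CloseOptions exposes fluent timeout(Duration) and leaveGroup(boolean) setters (leaveGroup is visible in the hunk above) and that the caller wants the consumers to send a leave-group request on shutdown.

    KafkaStreams.CloseOptions options = new KafkaStreams.CloseOptions()
        .timeout(Duration.ofSeconds(30))
        .leaveGroup(true);
    boolean stopped = streams.close(options);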

private Consumer<StreamThread> streamThreadLeaveConsumerGroup(final long remainingTimeMs) {
@@ -1656,7 +1666,7 @@ private Consumer<StreamThread> streamThreadLeaveConsumerGroup(final long remaini
}

/**
* Do a clean up of the local {@link StateStore} directory ({@link StreamsConfig#STATE_DIR_CONFIG}) by deleting all
* Do a cleanup of the local {@link StateStore} directory ({@link StreamsConfig#STATE_DIR_CONFIG}) by deleting all
* data with regard to the {@link StreamsConfig#APPLICATION_ID_CONFIG application ID}.
* <p>
* May only be called either before this {@code KafkaStreams} instance is {@link #start() started} or after the
@@ -1679,7 +1689,7 @@ public void cleanUp() {
* {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all instances that belong to
* the same Kafka Streams application) and return {@link StreamsMetadata} for each discovered instance.
* <p>
* Note: this is a point in time view and it may change due to partition reassignment.
* Note: this is a point in time view, and it may change due to partition reassignment.
*
* @return {@link StreamsMetadata} for each {@code KafkaStreams} instances of this application
*/
@@ -1697,10 +1707,10 @@ public Collection<StreamsMetadata> metadataForAllStreamsClients() {
* </ul>
* and return {@link StreamsMetadata} for each discovered instance.
* <p>
* Note: this is a point in time view and it may change due to partition reassignment.
* Note: this is a point in time view, and it may change due to partition reassignment.
*
* @param storeName the {@code storeName} to find metadata for
* @return {@link StreamsMetadata} for each {@code KafkaStreams} instances with the provide {@code storeName} of
* @return {@link StreamsMetadata} for each {@code KafkaStreams} instances with the provided {@code storeName} of
* this application
*/
public Collection<StreamsMetadata> streamsMetadataForStore(final String storeName) {
@@ -1730,9 +1740,9 @@ public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
*
* @param storeName the {@code storeName} to find metadata for
* @param key the key to find metadata for
* @param partitioner the partitioner to be use to locate the host for the key
* @param partitioner the partitioner to be used to locate the host for the key
* @param <K> key type
* Returns {@link KeyQueryMetadata} containing all metadata about hosting the given key for the given store, using the
* Returns {@link KeyQueryMetadata} containing all metadata about hosting the given key for the given store, using
* the supplied partitioner, or {@code null} if no matching metadata could be found.
*/
public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
@@ -1772,7 +1782,7 @@ public <T> T store(final StoreQueryParameters<T> storeQueryParameters) {
/**
* This method pauses processing for the KafkaStreams instance.
*
* <p>Paused topologies will only skip over a) processing, b) punctuation, and c) standby tasks.
* <p>Paused topologies will only skip over (a) processing, (b) punctuation, and (c) standby tasks.
* Notably, paused topologies will still poll Kafka consumers, and commit offsets.
* This method sets transient state that is not maintained or managed among instances.
* Note that pause() can be called before start() in order to start a KafkaStreams instance
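
A short sketch (not part of this diff): pause and resume on a running instance named streams; while paused the client keeps polling and committing but skips processing, punctuation, and standby tasks.

    streams.pause();
    // ... processing, punctuation, and standby tasks are skipped while paused ...
    if (streams.isPaused()) {
        streams.resume();
    }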
@@ -1817,8 +1827,8 @@ public void resume() {

/**
* handle each stream thread in a snapshot of threads.
* noted: iteration over SynchronizedList is not thread safe so it must be manually synchronized. However, we may
* require other locks when looping threads and it could cause deadlock. Hence, we create a copy to avoid holding
* noted: iteration over SynchronizedList is not thread safe, so it must be manually synchronized. However, we may
* require other locks when looping threads, and it could cause deadlock. Hence, we create a copy to avoid holding
* threads lock when looping threads.
* @param consumer handler
*/
@@ -2060,7 +2070,7 @@ protected Map<String, Map<Integer, LagInfo>> allLocalStorePartitionLags(final Li
/**
* Run an interactive query against a state store.
* <p>
* This method allows callers outside of the Streams runtime to access the internal state of
* This method allows callers outside the Streams runtime to access the internal state of
* stateful processors. See <a href="https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html">IQ docs</a>
* for more information.
* <p>
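
A rough interactive-query sketch (not part of this diff), built on the store(StoreQueryParameters) signature above and assuming a key-value store named "counts" with String keys and Long values has been registered in the topology.

    ReadOnlyKeyValueStore<String, Long> counts = streams.store(
        StoreQueryParameters.fromNameAndType("counts", QueryableStoreTypes.keyValueStore()));
    Long count = counts.get("alice");   // null if the key is not held by this instance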
@@ -61,7 +61,7 @@ public HostInfo activeHost() {
/**
* Get the Kafka Streams instances that host the key as standbys.
*
* @return set of standby {@link HostInfo} or a empty set, if no standbys are configured
* @return set of standby {@link HostInfo} or an empty set, if no standbys are configured
*/
public Set<HostInfo> standbyHosts() {
return standbyHosts;
@@ -40,7 +40,7 @@ public class LagInfo {
* Get the current maximum offset on the store partition's changelog topic, that has been successfully written into
* the store partition's state store.
*
* @return current consume offset for standby/restoring store partitions &amp; simply endoffset for active store partition replicas
* @return current consume offset for standby/restoring store partitions &amp; simply end offset for active store partition replicas
*/
public long currentOffsetPosition() {
return this.currentOffsetPosition;