KAFKA-14607: Move Scheduler/KafkaScheduler to server-common #13092

Merged
ijuma merged 8 commits into apache:trunk from ijuma:kafka-14607-move-scheduler-server-common
Jan 11, 2023

@ijuma ijuma commented Jan 9, 2023

spotBugs flagged some concurrency inconsistencies in KafkaScheduler that had to be
fixed. Summary of changes below:

  • Executor is volatile
  • We always synchronize and check isStarted as the first thing within the critical
    section when a mutating operation is performed.
  • We don't synchronize (but ensure the executor is not null in a safe way) in read-only
    operations that operate on the executor.
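
The three rules above can be sketched as follows. This is a minimal illustration, not the actual KafkaScheduler source: the class name `SketchScheduler` and the `threadCount` method are hypothetical, and the real class does considerably more.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;

// Hypothetical, simplified scheduler used only to illustrate the locking rules.
class SketchScheduler {
    // volatile: unsynchronized readers always see the latest reference (or null)
    private volatile ScheduledThreadPoolExecutor executor;

    public void startup(int threads) {
        synchronized (this) {
            // isStarted is the first check inside the critical section
            if (isStarted())
                throw new IllegalStateException("This scheduler has already been started.");
            executor = new ScheduledThreadPoolExecutor(threads);
        }
    }

    public void shutdown() {
        ScheduledThreadPoolExecutor toStop;
        synchronized (this) {
            if (!isStarted())
                return; // shutting down a scheduler that is not running is a no-op
            toStop = executor;
            executor = null;
        }
        toStop.shutdown();
    }

    // Mutating operation: synchronize, then check isStarted before touching the executor
    public void resizeThreadPool(int newSize) {
        synchronized (this) {
            if (isStarted())
                executor.setCorePoolSize(newSize);
        }
    }

    // Read-only operation: no lock. The volatile field is copied to a local once,
    // so the null check and the use refer to the same object.
    public int threadCount() {
        ScheduledThreadPoolExecutor current = executor;
        return current == null ? 0 : current.getCorePoolSize();
    }

    public boolean isStarted() {
        return executor != null;
    }

    public static void main(String[] args) {
        SketchScheduler scheduler = new SketchScheduler();
        scheduler.shutdown();                         // safe before startup
        scheduler.startup(2);
        scheduler.resizeThreadPool(4);
        System.out.println(scheduler.threadCount()); // 4
        scheduler.shutdown();
        System.out.println(scheduler.threadCount()); // 0
    }
}
```

Copying the field into a local in the read-only path is what makes the null check safe without a lock: a concurrent shutdown can null the field, but not the local.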

With regard to MockScheduler/MockTask:

  • Changed the type of nextExecution to AtomicLong, replacing the inconsistent synchronization
  • Extracted logic into MockTask.rescheduleIfPeriodic

Tweaked the Scheduler interface a bit:

  • Removed the unit parameter since we always used ms except in one invocation
  • Introduced a couple of scheduleOnce overloads to replace the usage of default
    arguments in Scala
  • Pulled up resizeThreadPool to the interface and removed isStarted from the
    interface.

Other cleanups:

  • Removed spotBugs exclusion affecting kafka.log.LogConfig, which no longer exists.

For broader context, see:

  • KAFKA-14470: Move log layer to storage module

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

- delay = delay, unit = unit)
+ kafkaScheduler.scheduleOnce("auto-leader-rebalance-task",
+   () => eventManager.put(AutoPreferredReplicaLeaderElection),
+   unit.toMillis(delay))

ijuma (Member, Author):

This is the only case where we were calling schedule with a unit that is not `ms`.
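
With the unit parameter gone from the interface, a caller that holds a delay in another unit converts it at the call site, as the snippet above does with `unit.toMillis(delay)`. A small sketch of that conversion; the `DelayConversion` class and `toDelayMs` helper are hypothetical names used only for illustration:

```java
import java.util.concurrent.TimeUnit;

class DelayConversion {
    // Hypothetical helper: normalize a delay expressed in any unit to milliseconds,
    // which is what a millisecond-only scheduling method expects.
    static long toDelayMs(long delay, TimeUnit unit) {
        return unit.toMillis(delay);
    }

    public static void main(String[] args) {
        System.out.println(toDelayMs(5, TimeUnit.SECONDS)); // 5000
        System.out.println(toDelayMs(2, TimeUnit.MINUTES)); // 120000
    }
}
```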

// stop token expiry check scheduler
- if (tokenCleanScheduler.isStarted)
-   tokenCleanScheduler.shutdown()
+ tokenCleanScheduler.shutdown()

ijuma (Member, Author):

It's safe to call shutdown even if the scheduler was never started.

shuttingDown.set(true)
- if (scheduler.isStarted)
-   scheduler.shutdown()
+ scheduler.shutdown()

ijuma (Member, Author):

It's redundant to check if the scheduler has started before calling shutdown.

nextDelayMs)
} catch {
case e: Throwable =>
if (scheduler.isStarted) {

ijuma (Member, Author):

Need to double check if this change makes sense.

ijuma (Member, Author):

@junrao What's your thought regarding this? This check seems a bit odd. If the scheduler is not started, we return NoOpScheduledFutureTask.

junrao (Contributor):

Thanks for catching this. #11351 changed the scheduler to return NoOpScheduledFutureTask if the scheduler is not started. But we forgot to change the code here accordingly.
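
To illustrate why the `isStarted` check in the catch block became unnecessary: if scheduling on a stopped scheduler returns a no-op future instead of throwing, the caller has nothing to guard against. The sketch below is an assumption-laden stand-in, not the real NoOpScheduledFutureTask; `NoOpFuture`, `NoOpDemo`, and `scheduleOrNoOp` are hypothetical names, and the return values of the real class may differ.

```java
import java.util.concurrent.Delayed;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a future returned when nothing was actually scheduled.
class NoOpFuture implements ScheduledFuture<Void> {
    @Override public long getDelay(TimeUnit unit) { return 0L; }
    @Override public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
    @Override public boolean cancel(boolean mayInterruptIfRunning) { return true; }
    @Override public boolean isCancelled() { return true; }
    @Override public boolean isDone() { return true; }
    @Override public Void get() { return null; }
    @Override public Void get(long timeout, TimeUnit unit) { return null; }
}

class NoOpDemo {
    // A null executor models "scheduler not started" (assumption for this sketch).
    static ScheduledFuture<?> scheduleOrNoOp(ScheduledExecutorService executor,
                                             Runnable task, long delayMs) {
        if (executor == null)
            return new NoOpFuture();
        return executor.schedule(task, delayMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        ScheduledFuture<?> future = scheduleOrNoOp(null, () -> { }, 10L);
        System.out.println(future.isDone()); // true: nothing was scheduled, nothing thrown
    }
}
```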

initialOfflineDirs: Seq[String],
configRepository: ConfigRepository,
- kafkaScheduler: KafkaScheduler,
+ kafkaScheduler: Scheduler,

ijuma (Member, Author), Jan 9, 2023:

Using the interface instead of the concrete class.

def dataPlaneRequestHandlerPool: KafkaRequestHandlerPool
def dataPlaneRequestProcessor: KafkaApis
- def kafkaScheduler: KafkaScheduler
+ def kafkaScheduler: Scheduler

ijuma (Member, Author):

We should use the interface instead of the concrete class.


<Match>
<!-- Uncallable anonymous methods are left behind after inlining by scalac 2.12, fixed in 2.13 -->
<Source name="LogConfig.scala"/>

ijuma (Member, Author):

This is no longer needed since LogConfig.scala no longer exists.

return scheduleOnce(name, task, 0L);
}

default ScheduledFuture<?> scheduleOnce(String name, Runnable task, long delayMs) {

ijuma (Member, Author):

In Java we have to use multiple methods (potentially with overloads) instead of default arguments. For clarity, we use scheduleOnce for the case where the task is executed only once (periodMs < 0).
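
A sketch of how default-method overloads can stand in for Scala default arguments. The two `scheduleOnce` signatures follow the snippet above; the four-argument `schedule` method, the delegation with `periodMs = -1`, and the `RecordingScheduler` test double are assumptions made for illustration, not the actual interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;

// Hypothetical cut-down interface; the real Scheduler has more methods.
interface SketchSchedulerApi {
    // Assumed core method: periodMs < 0 means the task runs only once.
    ScheduledFuture<?> schedule(String name, Runnable task, long delayMs, long periodMs);

    // Overload 1: no delay given, so default it to 0 ms.
    default ScheduledFuture<?> scheduleOnce(String name, Runnable task) {
        return scheduleOnce(name, task, 0L);
    }

    // Overload 2: explicit delay; a negative period marks the task as one-shot.
    default ScheduledFuture<?> scheduleOnce(String name, Runnable task, long delayMs) {
        return schedule(name, task, delayMs, -1L);
    }
}

// Test double that only records what was requested.
class RecordingScheduler implements SketchSchedulerApi {
    final List<String> calls = new ArrayList<>();

    @Override
    public ScheduledFuture<?> schedule(String name, Runnable task, long delayMs, long periodMs) {
        calls.add(name + ":" + delayMs + ":" + periodMs);
        return null; // nothing is really scheduled in this sketch
    }

    public static void main(String[] args) {
        RecordingScheduler scheduler = new RecordingScheduler();
        scheduler.scheduleOnce("cleanup", () -> { });
        scheduler.scheduleOnce("rebalance", () -> { }, 300L);
        System.out.println(scheduler.calls); // [cleanup:0:-1, rebalance:300:-1]
    }
}
```

Implementations only need to provide the core method; every caller gets the convenience overloads for free.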

@ijuma force-pushed the kafka-14607-move-scheduler-server-common branch from 7cbf398 to f0a18cd on January 10, 2023 00:17
@ijuma marked this pull request as ready for review on January 10, 2023 14:54

testImplementation project(':clients').sourceSets.test.output
testImplementation project(':core')
testImplementation project(':core').sourceSets.test.output
testImplementation project(':server-common').sourceSets.test.output

ijuma (Member, Author):

The streams integration harness has a transitive runtime dependency on Scheduler/KafkaScheduler, and dependencies on test jars are not transitive.

expiredSnapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],
logging: Logging
- ): () => Unit = () => {
+ ): Unit = {

ijuma (Member, Author):

This made the code less clear; it's better to make it explicit that a lambda is being passed.

final long period;
final Time time;

private final AtomicLong nextExecution;

ijuma (Member, Author):

Made this an AtomicLong to replace the inconsistent synchronization that existed before.

/**
* If this task is periodic, reschedule it and return true. Otherwise, do nothing and return false.
*/
public boolean rescheduleIfPeriodic() {

ijuma (Member, Author):

Extracted this logic into its own method.
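
A rough sketch of the two MockTask changes together: an `AtomicLong` field in place of mixed synchronization, and the reschedule logic in its own method. `SketchMockTask` is a hypothetical, cut-down class; treating `period >= 0` as periodic and advancing by exactly one period are assumptions, and the real method takes no arguments but may compute the next time differently.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical simplified mock task; the real MockTask also carries a name, a Runnable and a Time.
class SketchMockTask {
    final long period; // assumed: a negative period means "run once"
    // AtomicLong gives safe reads and updates without mixing synchronized and unsynchronized access
    private final AtomicLong nextExecution;

    SketchMockTask(long firstExecutionMs, long period) {
        this.nextExecution = new AtomicLong(firstExecutionMs);
        this.period = period;
    }

    long nextExecution() {
        return nextExecution.get();
    }

    boolean periodic() {
        return period >= 0;
    }

    /**
     * If this task is periodic, reschedule it and return true. Otherwise, do nothing and return false.
     */
    boolean rescheduleIfPeriodic() {
        if (periodic()) {
            nextExecution.addAndGet(period); // move the next run one period ahead
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        SketchMockTask periodicTask = new SketchMockTask(100L, 50L);
        System.out.println(periodicTask.rescheduleIfPeriodic()); // true
        System.out.println(periodicTask.nextExecution());        // 150

        SketchMockTask oneShot = new SketchMockTask(100L, -1L);
        System.out.println(oneShot.rescheduleIfPeriodic());      // false
    }
}
```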

ijuma commented Jan 10, 2023

All builds passed. @junrao this is ready for review when you have cycles.

@ijuma ijuma requested a review from junrao January 10, 2023 15:24

@junrao junrao left a comment

@ijuma: Thanks for the PR. LGTM, just a minor comment. Also, could you rebase?

ijuma commented Jan 11, 2023

JDK 17 build passed and the other two had unrelated failures:

Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.[1] true
Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.[2] false
Build / JDK 11 and Scala 2.13 / org.apache.kafka.clients.consumer.StickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.clients.consumer.internals.DefaultBackgroundThreadTest.testStartupAndTearDown()
Build / JDK 11 and Scala 2.13 / kafka.api.TransactionsTest.testBumpTransactionalEpoch(String).quorum=kraft
Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldRemovePausedAndUpdatingTasksOnShutdown()

@ijuma ijuma merged commit 8ac644d into apache:trunk Jan 11, 2023
ijuma added a commit to fvaleri/kafka that referenced this pull request Jan 13, 2023
* apache-github/trunk:
  KAFKA-14601: Improve exception handling in KafkaEventQueue apache#13089
  KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface (apache#12886)
  KAFKA-14530: Check state updater more often (apache#13017)
  KAFKA-14304 Use boolean for ZK migrating brokers in RPC/record (apache#13103)
  KAFKA-14003 Kafka Streams JUnit4 to JUnit5 migration part 2 (apache#12301)
  KAFKA-14607: Move Scheduler/KafkaScheduler to server-common (apache#13092)
  KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface (apache#12870)
  KAFKA-14557; Lock metadata log dir (apache#13058)
  MINOR: Implement toString method for TopicAssignment and PartitionAssignment (apache#13101)
  KAFKA-12558: Do not prematurely mutate internal partition state in Mirror Maker 2 (apache#11818)
  KAFKA-14540: Fix DataOutputStreamWritable#writeByteBuffer (apache#13032)
  KAFKA-14600: Reduce flakiness in ProducerIdExpirationTest (apache#13087)
  KAFKA-14279: Add 3.3.x streams system tests (apache#13077)
  MINOR: bump streams quickstart pom versions and add to list in gradle.properties (apache#13064)
  MINOR: Update KRaft cluster upgrade documentation for 3.4 (apache#13063)
  KAFKA-14493: Introduce Zk to KRaft migration state machine STUBs in KRaft controller. (apache#12998)
  KAFKA-14570: Fix parenthesis in verifyFullFetchResponsePartitions output (apache#13072)
  MINOR: Remove public mutable fields from ProducerAppendInfo (apache#13091)
ijuma added a commit to confluentinc/kafka that referenced this pull request Jan 17, 2023
…master

* apache-github/trunk: (23 commits)
  MINOR: Include the inner exception stack trace when re-throwing an exception (apache#12229)
  MINOR: Fix docs to state that sendfile implemented in `TransferableRecords` instead of `MessageSet` (apache#13109)
  Update ProducerConfig.java (apache#13115)
  KAFKA-14618; Fix off by one error in snapshot id (apache#13108)
  KAFKA-13709 (follow-up): Avoid mention of 'exactly-once delivery' or 'delivery guarantees' in Connect (apache#13106)
  KAFKA-14367; Add `TxnOffsetCommit` to the new `GroupCoordinator` interface (apache#12901)
  KAFKA-14568: Move FetchDataInfo and related to storage module (apache#13085)
  KAFKA-14612: Make sure to write a new topics ConfigRecords to metadata log iff the topic is created (apache#13104)
  KAFKA-14601: Improve exception handling in KafkaEventQueue apache#13089
  KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface (apache#12886)
  KAFKA-14530: Check state updater more often (apache#13017)
  KAFKA-14304 Use boolean for ZK migrating brokers in RPC/record (apache#13103)
  KAFKA-14003 Kafka Streams JUnit4 to JUnit5 migration part 2 (apache#12301)
  KAFKA-14607: Move Scheduler/KafkaScheduler to server-common (apache#13092)
  KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface (apache#12870)
  KAFKA-14557; Lock metadata log dir (apache#13058)
  MINOR: Implement toString method for TopicAssignment and PartitionAssignment (apache#13101)
  KAFKA-12558: Do not prematurely mutate internal partition state in Mirror Maker 2 (apache#11818)
  KAFKA-14540: Fix DataOutputStreamWritable#writeByteBuffer (apache#13032)
  KAFKA-14600: Reduce flakiness in ProducerIdExpirationTest (apache#13087)
  ...
guozhangwang pushed a commit to guozhangwang/kafka that referenced this pull request Jan 25, 2023
…3092)

There were some concurrency inconsistencies in `KafkaScheduler` flagged by spotBugs
that had to be fixed, summary of changes below:
* Executor is `volatile`
* We always synchronize and check `isStarted` as the first thing within the critical
   section when a mutating operation is performed.
* We don't synchronize (but ensure the executor is not null in a safe way) in read-only
   operations that operate on the executor.

With regards to `MockScheduler/MockTask`:
* Set the type of `nextExecution` to `AtomicLong` and replaced inconsistent synchronization
* Extracted logic into `MockTask.rescheduleIfPeriodic`

Tweaked the `Scheduler` interface a bit:
* Removed `unit` parameter since we always used `ms` except one invocation
* Introduced a couple of `scheduleOnce` overloads to replace the usage of default
   arguments in Scala
* Pulled up `resizeThreadPool` to the interface and removed `isStarted` from the
  interface.

Other cleanups:
* Removed spotBugs exclusion affecting `kafka.log.LogConfig`, which no longer exists.

For broader context, see:
* KAFKA-14470: Move log layer to storage module

Reviewers: Jun Rao <junrao@gmail.com>