Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16879 SystemTime should use singleton mode #16266

Merged
merged 10 commits into from
Jun 14, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@
* A time implementation that uses the system clock and sleep call. Use `Time.SYSTEM` instead of creating an instance
* of this class.
*/
public class SystemTime implements Time {
class SystemTime implements Time {
private static final SystemTime SYSTEM_TIME = new SystemTime();

public static SystemTime getSystemTime() {
return SYSTEM_TIME;
}

@Override
public long milliseconds() {
Expand Down Expand Up @@ -57,4 +62,7 @@ public void waitObject(Object obj, Supplier<Boolean> condition, long deadlineMs)
}
}

private SystemTime() {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
*/
public interface Time {

Time SYSTEM = new SystemTime();
Time SYSTEM = SystemTime.getSystemTime();

/**
* Returns the current time in milliseconds.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
Expand Down Expand Up @@ -3587,7 +3587,7 @@ public void testPrepareJoinAndRejoinAfterFailedRebalance() {
public void shouldLoseAllOwnedPartitionsBeforeRejoiningAfterDroppingOutOfTheGroup() {
final List<TopicPartition> partitions = singletonList(t1p);
try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.of("group-id"), true)) {
final SystemTime realTime = new SystemTime();
final Time realTime = Time.SYSTEM;
coordinator.ensureActiveGroup();

prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.REBALANCE_IN_PROGRESS);
Expand Down Expand Up @@ -3620,7 +3620,7 @@ public void shouldLoseAllOwnedPartitionsBeforeRejoiningAfterDroppingOutOfTheGrou
public void shouldLoseAllOwnedPartitionsBeforeRejoiningAfterResettingGenerationId() {
final List<TopicPartition> partitions = singletonList(t1p);
try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.of("group-id"), true)) {
final SystemTime realTime = new SystemTime();
final Time realTime = Time.SYSTEM;
coordinator.ensureActiveGroup();

prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.REBALANCE_IN_PROGRESS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.kafka.common.metrics.stats.TokenBucket;
import org.apache.kafka.common.metrics.stats.WindowedSum;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -80,45 +79,45 @@ public void testRecordLevelEnum() {

@Test
public void testShouldRecordForInfoLevelSensor() {
Sensor infoSensor = new Sensor(null, "infoSensor", null, INFO_CONFIG, new SystemTime(),
Sensor infoSensor = new Sensor(null, "infoSensor", null, INFO_CONFIG, Time.SYSTEM,
0, Sensor.RecordingLevel.INFO);
assertTrue(infoSensor.shouldRecord());

infoSensor = new Sensor(null, "infoSensor", null, DEBUG_CONFIG, new SystemTime(),
infoSensor = new Sensor(null, "infoSensor", null, DEBUG_CONFIG, Time.SYSTEM,
0, Sensor.RecordingLevel.INFO);
assertTrue(infoSensor.shouldRecord());

infoSensor = new Sensor(null, "infoSensor", null, TRACE_CONFIG, new SystemTime(),
infoSensor = new Sensor(null, "infoSensor", null, TRACE_CONFIG, Time.SYSTEM,
0, Sensor.RecordingLevel.INFO);
assertTrue(infoSensor.shouldRecord());
}

@Test
public void testShouldRecordForDebugLevelSensor() {
Sensor debugSensor = new Sensor(null, "debugSensor", null, INFO_CONFIG, new SystemTime(),
Sensor debugSensor = new Sensor(null, "debugSensor", null, INFO_CONFIG, Time.SYSTEM,
0, Sensor.RecordingLevel.DEBUG);
assertFalse(debugSensor.shouldRecord());

debugSensor = new Sensor(null, "debugSensor", null, DEBUG_CONFIG, new SystemTime(),
debugSensor = new Sensor(null, "debugSensor", null, DEBUG_CONFIG, Time.SYSTEM,
0, Sensor.RecordingLevel.DEBUG);
assertTrue(debugSensor.shouldRecord());

debugSensor = new Sensor(null, "debugSensor", null, TRACE_CONFIG, new SystemTime(),
debugSensor = new Sensor(null, "debugSensor", null, TRACE_CONFIG, Time.SYSTEM,
0, Sensor.RecordingLevel.DEBUG);
assertTrue(debugSensor.shouldRecord());
}

@Test
public void testShouldRecordForTraceLevelSensor() {
Sensor traceSensor = new Sensor(null, "traceSensor", null, INFO_CONFIG, new SystemTime(),
Sensor traceSensor = new Sensor(null, "traceSensor", null, INFO_CONFIG, Time.SYSTEM,
0, Sensor.RecordingLevel.TRACE);
assertFalse(traceSensor.shouldRecord());

traceSensor = new Sensor(null, "traceSensor", null, DEBUG_CONFIG, new SystemTime(),
traceSensor = new Sensor(null, "traceSensor", null, DEBUG_CONFIG, Time.SYSTEM,
0, Sensor.RecordingLevel.TRACE);
assertFalse(traceSensor.shouldRecord());

traceSensor = new Sensor(null, "traceSensor", null, TRACE_CONFIG, new SystemTime(),
traceSensor = new Sensor(null, "traceSensor", null, TRACE_CONFIG, Time.SYSTEM,
0, Sensor.RecordingLevel.TRACE);
assertTrue(traceSensor.shouldRecord());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectMetricsRegistry;
Expand All @@ -31,7 +30,7 @@
*/
public class ErrorHandlingMetrics implements AutoCloseable {

private final Time time = new SystemTime();
private final Time time = Time.SYSTEM;

private final ConnectMetrics.MetricGroup metricGroup;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.runtime.ConnectMetrics;
Expand Down Expand Up @@ -97,14 +97,14 @@ public static <T> RetryWithToleranceOperator<T> noopOperator() {
return genericOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, NONE, new ErrorHandlingMetrics(
new ConnectorTaskId("noop-connector", -1),
new ConnectMetrics("noop-worker", new TestableWorkerConfig(PROPERTIES),
new SystemTime(), "test-cluster")));
Time.SYSTEM, "test-cluster")));
}

public static <T> RetryWithToleranceOperator<T> allOperator() {
return genericOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, ALL, new ErrorHandlingMetrics(
new ConnectorTaskId("errors-all-tolerate-connector", -1),
new ConnectMetrics("errors-all-tolerate-worker", new TestableWorkerConfig(PROPERTIES),
new SystemTime(), "test-cluster")));
Time.SYSTEM, "test-cluster")));
}

private static <T> RetryWithToleranceOperator<T> genericOperator(int retryTimeout, ToleranceType toleranceType, ErrorHandlingMetrics metrics) {
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{AlterPartitionResponse, FetchRequest, ListOffsetsRequest, RequestHeader}
import org.apache.kafka.common.utils.SystemTime
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicPartition, Uuid}
import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.metadata.LeaderRecoveryState
Expand Down Expand Up @@ -2914,7 +2914,7 @@ class PartitionTest extends AbstractPartitionTest {
val topicPartition = new TopicPartition("test", 1)
val partition = new Partition(
topicPartition, 1000, MetadataVersion.latestTesting, 0, () => defaultBrokerEpoch(0),
new SystemTime(), mock(classOf[AlterPartitionListener]), mock(classOf[DelayedOperations]),
Time.SYSTEM, mock(classOf[AlterPartitionListener]), mock(classOf[DelayedOperations]),
mock(classOf[MetadataCache]), mock(classOf[LogManager]), mock(classOf[AlterPartitionManager]))

val replicas = Seq(0, 1, 2, 3)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, RecordValidationStats, SimpleRecord}
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, UpdateMetadataRequest}
import org.apache.kafka.common.utils.{LogContext, SystemTime}
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch}
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
Expand Down Expand Up @@ -168,7 +168,7 @@ class ReplicaFetcherThreadTest {
t1p1 -> newOffsetForLeaderPartitionResult(t1p1, leaderEpoch, 1)).asJava

//Create the fetcher thread
val mockNetwork = new MockBlockingSender(offsets, brokerEndPoint, new SystemTime())
val mockNetwork = new MockBlockingSender(offsets, brokerEndPoint, Time.SYSTEM)

val thread = createReplicaFetcherThread(
"bob",
Expand Down Expand Up @@ -317,7 +317,7 @@ class ReplicaFetcherThreadTest {
t1p1 -> newOffsetForLeaderPartitionResult(t1p1, leaderEpoch, 1)).asJava

//Create the fetcher thread
val mockNetwork = new MockBlockingSender(offsets, brokerEndPoint, new SystemTime())
val mockNetwork = new MockBlockingSender(offsets, brokerEndPoint, Time.SYSTEM)
val thread = createReplicaFetcherThread(
"bob",
0,
Expand Down Expand Up @@ -383,7 +383,7 @@ class ReplicaFetcherThreadTest {
t2p1 -> newOffsetForLeaderPartitionResult(t2p1, leaderEpoch, 172)).asJava

//Create the thread
val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, new SystemTime())
val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, Time.SYSTEM)
val thread = createReplicaFetcherThread(
"bob",
0,
Expand Down Expand Up @@ -443,7 +443,7 @@ class ReplicaFetcherThreadTest {
t2p1 -> newOffsetForLeaderPartitionResult(t2p1, leaderEpochAtLeader, 202)).asJava

//Create the thread
val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, new SystemTime())
val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, Time.SYSTEM)
val thread = createReplicaFetcherThread(
"bob",
0,
Expand Down Expand Up @@ -506,7 +506,7 @@ class ReplicaFetcherThreadTest {
t1p1 -> newOffsetForLeaderPartitionResult(t1p1, 4, 143)).asJava

// Create the fetcher thread
val mockNetwork = new MockBlockingSender(offsets, brokerEndPoint, new SystemTime())
val mockNetwork = new MockBlockingSender(offsets, brokerEndPoint, Time.SYSTEM)
val thread = createReplicaFetcherThread(
"bob",
0,
Expand Down Expand Up @@ -584,7 +584,7 @@ class ReplicaFetcherThreadTest {
stub(partition, replicaManager, log)

// Create the fetcher thread
val mockNetwork = new MockBlockingSender(Collections.emptyMap(), brokerEndPoint, new SystemTime())
val mockNetwork = new MockBlockingSender(Collections.emptyMap(), brokerEndPoint, Time.SYSTEM)
val logContext = new LogContext(s"[ReplicaFetcher replicaId=${config.brokerId}, leaderId=${brokerEndPoint.id}, fetcherId=0] ")
val fetchSessionHandler = new FetchSessionHandler(logContext, brokerEndPoint.id)
val leader = new RemoteLeaderEndPoint(logContext.logPrefix, mockNetwork, fetchSessionHandler, config,
Expand Down Expand Up @@ -694,7 +694,7 @@ class ReplicaFetcherThreadTest {
val mockNetwork = new MockBlockingSender(
Collections.emptyMap(),
brokerEndPoint,
new SystemTime()
Time.SYSTEM
)

val leader = new RemoteLeaderEndPoint(
Expand Down Expand Up @@ -787,7 +787,7 @@ class ReplicaFetcherThreadTest {
val mockNetwork = new MockBlockingSender(
Collections.emptyMap(),
brokerEndPoint,
new SystemTime()
Time.SYSTEM
)

val leader = new RemoteLeaderEndPoint(
Expand Down Expand Up @@ -883,7 +883,7 @@ class ReplicaFetcherThreadTest {
t1p1 -> newOffsetForLeaderPartitionResult(t1p1, UNDEFINED_EPOCH, 143)).asJava

// Create the fetcher thread
val mockNetwork = new MockBlockingSender(offsets, brokerEndPoint, new SystemTime())
val mockNetwork = new MockBlockingSender(offsets, brokerEndPoint, Time.SYSTEM)
val thread = createReplicaFetcherThread(
"bob",
0,
Expand Down Expand Up @@ -947,7 +947,7 @@ class ReplicaFetcherThreadTest {
t1p0 -> newOffsetForLeaderPartitionResult(t1p0, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)).asJava

//Create the thread
val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, new SystemTime())
val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, Time.SYSTEM)
val thread = createReplicaFetcherThread(
"bob",
0,
Expand Down Expand Up @@ -1008,7 +1008,7 @@ class ReplicaFetcherThreadTest {
).asJava

//Create the thread
val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, new SystemTime())
val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, Time.SYSTEM)
val thread = createReplicaFetcherThread(
"bob",
0,
Expand Down Expand Up @@ -1070,7 +1070,7 @@ class ReplicaFetcherThreadTest {
).asJava

//Create the fetcher thread
val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, new SystemTime())
val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, Time.SYSTEM)
val thread = createReplicaFetcherThread(
"bob",
0,
Expand Down Expand Up @@ -1130,7 +1130,7 @@ class ReplicaFetcherThreadTest {
).asJava

//Create the fetcher thread
val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, new SystemTime())
val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, Time.SYSTEM)
val thread = createReplicaFetcherThread(
"bob",
0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_
import org.apache.kafka.common.requests.{OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.utils.{LogContext, SystemTime}
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions._
Expand Down Expand Up @@ -246,7 +246,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
val node = from.metadataCache.getAliveBrokerNode(to.config.brokerId,
from.config.interBrokerListenerName).get
val endPoint = new BrokerEndPoint(node.id(), node.host(), node.port())
new BrokerBlockingSender(endPoint, from.config, new Metrics(), new SystemTime(), 42, "TestFetcher", new LogContext())
new BrokerBlockingSender(endPoint, from.config, new Metrics(), Time.SYSTEM, 42, "TestFetcher", new LogContext())
}

private def waitForEpochChangeTo(topic: String, partition: Int, epoch: Int): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsParti
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.AbstractRequest.Builder
import org.apache.kafka.common.requests.{AbstractRequest, FetchResponse, ListOffsetsResponse, OffsetsForLeaderEpochResponse, FetchMetadata => JFetchMetadata}
import org.apache.kafka.common.utils.{SystemTime, Time}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}

import java.net.SocketTimeoutException
Expand All @@ -46,7 +46,7 @@ class MockBlockingSender(offsets: java.util.Map[TopicPartition, EpochEndOffset],
time: Time)
extends BlockingSend {

private val client = new MockClient(new SystemTime)
private val client = new MockClient(Time.SYSTEM)
var fetchCount = 0
var epochFetchCount = 0
var listOffsetsCount = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
Expand Down Expand Up @@ -92,7 +92,7 @@ public void beforeEach() {
.collect(Collectors.toMap(Function.identity(), e -> 0L));
consumer = spy(new MockConsumer<>(OffsetResetStrategy.EARLIEST));
consumer.updateBeginningOffsets(offsets);
consumerTask = new ConsumerTask(handler, partitioner, consumer, 10L, 300_000L, new SystemTime());
consumerTask = new ConsumerTask(handler, partitioner, consumer, 10L, 300_000L, Time.SYSTEM);
thread = new Thread(consumerTask);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
Expand All @@ -44,7 +43,7 @@ public class RemoteLogMetadataSerdeTest {

public static final String TOPIC = "foo";
private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(TOPIC, 0));
private final Time time = new SystemTime();
private final Time time = Time.SYSTEM;

@Test
public void testRemoteLogSegmentMetadataSerde() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogSegmentMetadataTransform;
Expand All @@ -41,7 +40,7 @@

public class RemoteLogMetadataTransformTest {
private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
private final Time time = new SystemTime();
private final Time time = Time.SYSTEM;

@Test
public void testRemoteLogSegmentMetadataTransform() {
Expand Down
Loading