From 06fec555c998eefdc417fd1dd2643ab2bd11cd1a Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Thu, 26 Oct 2023 10:28:50 +0800 Subject: [PATCH] fix(s3stream): prevent sort on unmodifiable list; delete unused code Signed-off-by: Shichao Nie --- .../s3/compact/AsyncTokenBucketThrottle.java | 51 ------------ .../stream/s3/compact/CompactionManager.java | 2 +- .../s3/compact/TokenBucketThrottle.java | 70 ---------------- .../compact/AsyncTokenBucketThrottleTest.java | 79 ------------------- 4 files changed, 1 insertion(+), 201 deletions(-) delete mode 100644 s3stream/src/main/java/com/automq/stream/s3/compact/AsyncTokenBucketThrottle.java delete mode 100644 s3stream/src/main/java/com/automq/stream/s3/compact/TokenBucketThrottle.java delete mode 100644 s3stream/src/test/java/com/automq/stream/s3/compact/AsyncTokenBucketThrottleTest.java diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/AsyncTokenBucketThrottle.java b/s3stream/src/main/java/com/automq/stream/s3/compact/AsyncTokenBucketThrottle.java deleted file mode 100644 index 738cf5d49..000000000 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/AsyncTokenBucketThrottle.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.automq.stream.s3.compact; - -import io.github.bucket4j.Bucket; -import io.netty.util.concurrent.DefaultThreadFactory; -import java.time.Duration; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; - -public class AsyncTokenBucketThrottle { - private final long tokenSize; - private final ScheduledExecutorService executorService; - private final Bucket bucket; - - public AsyncTokenBucketThrottle(long tokenSize, int poolSize, String threadSuffix) { - this.tokenSize = tokenSize; - this.executorService = Executors.newScheduledThreadPool(poolSize, new DefaultThreadFactory("token-bucket-throttle-" + threadSuffix)); - this.bucket = Bucket.builder() - .addLimit(limit -> limit.capacity(tokenSize).refillGreedy(tokenSize, Duration.ofSeconds(1))) - .build(); - } - - public void stop() { - this.executorService.shutdown(); - } - - public long getTokenSize() { - return tokenSize; - } - - public CompletableFuture throttle(long size) { - return bucket.asScheduler().consume(size, executorService); - } -} diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java index aca2fa1a1..5fb3fc484 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java @@ -132,7 +132,7 @@ private CompletableFuture compact() { List streamIds = objectMetadataList.stream().flatMap(e -> e.getOffsetRanges().stream()) .map(StreamOffsetRange::getStreamId).distinct().toList(); return this.streamManager.getStreams(streamIds).thenApplyAsync(streamMetadataList -> { - List s3ObjectMetadataList = objectMetadataList; + List s3ObjectMetadataList = new ArrayList<>(objectMetadataList); if (s3ObjectMetadataList.isEmpty()) { logger.info("No WAL objects to compact"); return CompactResult.SKIPPED; diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/TokenBucketThrottle.java b/s3stream/src/main/java/com/automq/stream/s3/compact/TokenBucketThrottle.java deleted file mode 100644 index 44d24348a..000000000 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/TokenBucketThrottle.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.automq.stream.s3.compact; - -import io.netty.util.concurrent.DefaultThreadFactory; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -public class TokenBucketThrottle { - private final Lock lock = new ReentrantLock(); - private final Condition condition = lock.newCondition(); - private final long tokenSize; - private long availableTokens; - - private final ScheduledExecutorService executorService; - - public TokenBucketThrottle(long tokenSize) { - this.tokenSize = tokenSize; - this.executorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("token-bucket-throttle")); - this.executorService.scheduleAtFixedRate(() -> { - lock.lock(); - try { - availableTokens = tokenSize; - condition.signalAll(); - } finally { - lock.unlock(); - } - }, 0, 1, java.util.concurrent.TimeUnit.SECONDS); - } - - public void stop() { - this.executorService.shutdown(); - } - - public long getTokenSize() { - return tokenSize; - } - - public void throttle(long size) { - lock.lock(); - try { - while (availableTokens < size) { - condition.await(); - } - availableTokens -= size; - } catch (InterruptedException ignored) { - } finally { - lock.unlock(); - } - } -} diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/AsyncTokenBucketThrottleTest.java b/s3stream/src/test/java/com/automq/stream/s3/compact/AsyncTokenBucketThrottleTest.java deleted file mode 100644 index 70a51e841..000000000 --- a/s3stream/src/test/java/com/automq/stream/s3/compact/AsyncTokenBucketThrottleTest.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.automq.stream.s3.compact; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - - -class AsyncTokenBucketThrottleTest { - private AsyncTokenBucketThrottle asyncTokenBucketThrottle; - - @AfterEach - void tearDown() { - if (asyncTokenBucketThrottle != null) { - asyncTokenBucketThrottle.stop(); - } - } - - @Test - void testThrottleQuickly() throws ExecutionException, InterruptedException { - asyncTokenBucketThrottle = new AsyncTokenBucketThrottle(100, 1, "testThrottleQuickly"); - final long startTimeStamp = System.currentTimeMillis(); - - asyncTokenBucketThrottle.throttle(100) - .thenRun(() -> { - // should be satisfied immediately - long timeCost = System.currentTimeMillis() - startTimeStamp; - Assertions.assertTrue(timeCost < 10, "should be satisfied immediately but took" + timeCost + "ms"); - }).get(); - } - - @Test - void testThrottleSeq() throws ExecutionException, InterruptedException { - asyncTokenBucketThrottle = new AsyncTokenBucketThrottle(100, 1, "testThrottleSeq"); - final long startTimeStamp = System.currentTimeMillis(); - - List> futures = new ArrayList<>(); - for (int i = 0; i < 3; i++) { - futures.add(asyncTokenBucketThrottle.throttle(100)); - } - CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) - .thenRun(() -> { - // should be satisfied within 3 seconds - Assertions.assertTrue(System.currentTimeMillis() - startTimeStamp < 3000); - }).get(); - } - - @Test - void testThrottleBigSize() throws ExecutionException, InterruptedException { - asyncTokenBucketThrottle = new AsyncTokenBucketThrottle(100, 1, "testThrottleBigSize"); - final long startTimeStamp = System.currentTimeMillis(); - - asyncTokenBucketThrottle.throttle(200) - .thenRun(() -> { - // should be satisfied within 2 seconds - Assertions.assertTrue(System.currentTimeMillis() - startTimeStamp < 2000); - }).get(); - } -} \ No newline at end of file