Skip to content

Commit 6d1377d

Browse files
authored
✨ Feat : Implements ConcurrentBatchCounter (#34)
## 개요 - 새롭게 정의한 `ConcurrentBatchCounter` 인터페이스에 대한 구현체 ## 변경 사항 - [x] ✨ Feat : `ConcurrentBatchingCounter` - [x] ✨ Feat : `ConcurrentParameterizedBatchingCounter` ## 추가 정보 - 스레드 별 고유 ID로 분리하여 안정성 확보 #### `ConcurrentParameterizedBatchingCounter` - 임의의 Integer를 받는 식으로 구현 - 배열의 크기가 커지면 메모리 이슈 발생 - 내부 기준에 따라 `flush`하여 해결 #### `ConcurrentBatchingCounter` - YAGNI 원칙에 의거하여 최적의 구현체 고안 - LongAdder로 연산 속도 개선 #### 테스트 - Virtual Thread와 OS Thread, 그리고 Sync/Async 각각의 방식으로 시도 - 두 경우 모두 싱글 스레드 동작이 약 4배 빠름 (평균 10초) - OS Thread는 Thread 수에 비례하여 생성 오버헤드 증가 - Virtual Thread는 수와 관계없이 일정 (평균 40초) ### 관련 이슈 Closes #33
2 parents 2d1cdd0 + 1bb0cc0 commit 6d1377d

File tree

10 files changed

+309
-87
lines changed

10 files changed

+309
-87
lines changed

.github/ISSUE_TEMPLATE/bug_report.md

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ A clear and concise description of what the bug is.
1212

1313
**To Reproduce**
1414
Steps to reproduce the behavior:
15+
1516
1. Go to '...'
1617
2. Click on '....'
1718
3. Scroll down to '....'
@@ -24,15 +25,17 @@ A clear and concise description of what you expected to happen.
2425
If applicable, add screenshots to help explain your problem.
2526

2627
**Desktop (please complete the following information):**
27-
- OS: [e.g. iOS]
28-
- Browser [e.g. chrome, safari]
29-
- Version [e.g. 22]
28+
29+
- OS: [e.g. iOS]
30+
- Browser [e.g. chrome, safari]
31+
- Version [e.g. 22]
3032

3133
**Smartphone (please complete the following information):**
32-
- Device: [e.g. iPhone6]
33-
- OS: [e.g. iOS8.1]
34-
- Browser [e.g. stock browser, safari]
35-
- Version [e.g. 22]
34+
35+
- Device: [e.g. iPhone6]
36+
- OS: [e.g. iOS8.1]
37+
- Browser [e.g. stock browser, safari]
38+
- Version [e.g. 22]
3639

3740
**Additional context**
3841
Add any other context about the problem here.
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,81 @@
11
package com.thread.concurrency;
22

3+
import com.thread.concurrency.counter.batch.BatchCounter;
4+
import com.thread.concurrency.counter.batch.ConcurrentBatchingCounter;
35
import org.springframework.boot.SpringApplication;
46
import org.springframework.boot.autoconfigure.SpringBootApplication;
57

8+
import java.lang.management.ManagementFactory;
9+
import java.lang.management.MemoryMXBean;
10+
import java.lang.management.MemoryUsage;
11+
import java.util.ArrayList;
12+
import java.util.List;
13+
import java.util.concurrent.CompletableFuture;
14+
import java.util.concurrent.ExecutorService;
15+
import java.util.concurrent.Executors;
16+
import java.util.function.Consumer;
17+
618
@SpringBootApplication
719
public class SpringThreadConcurrencyApplication {
820

921
public static void main(String[] args) {
1022
SpringApplication.run(SpringThreadConcurrencyApplication.class, args);
23+
24+
MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
25+
MemoryUsage initialMemoryUsage = memoryMXBean.getHeapMemoryUsage();
26+
long initialTime = System.currentTimeMillis();
27+
28+
// Run the test
29+
int totalRequest = Integer.MAX_VALUE;
30+
conditionalMultiThreading(totalRequest);
31+
32+
MemoryUsage finalMemoryUsage = memoryMXBean.getHeapMemoryUsage();
33+
long finalTime = System.currentTimeMillis();
34+
long elapsedTime = finalTime - initialTime;
35+
long usedMemory = finalMemoryUsage.getUsed() - initialMemoryUsage.getUsed();
36+
37+
// request with comma
38+
System.out.println("Total request: " + String.format("%,d", totalRequest));
39+
// seconds
40+
System.out.println("Elapsed time: " + elapsedTime / 1000 + " s");
41+
// megabytes
42+
System.out.println("Used memory: " + usedMemory / 1024 / 1024 + " MB");
43+
}
44+
45+
private static void conditionalMultiThreading(int expected) {
46+
BatchCounter counter = new ConcurrentBatchingCounter();
47+
48+
// given
49+
int numberOfThreads = 128;
50+
List<Integer> iterPerThread = range(numberOfThreads, expected);
51+
Consumer<Integer> task = (Integer number) -> {
52+
for (int i = 0; i < number; i++) {
53+
counter.add(1);
54+
}
55+
counter.flush();
56+
};
57+
// when
58+
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
59+
List<CompletableFuture<Void>> futures = iterPerThread.stream().map(number -> CompletableFuture.runAsync(() -> task.accept(number), executor)).toList();
60+
futures.forEach(CompletableFuture::join);
61+
}
62+
// then
63+
assert expected == counter.show();
64+
}
65+
66+
private static List<Integer> range(int numberOfThreads, int expected) {
67+
int baseValue = expected / numberOfThreads;
68+
int remainder = expected % numberOfThreads;
69+
70+
List<Integer> result = new ArrayList<>();
71+
for (int i = 0; i < numberOfThreads; i++) {
72+
if (i < remainder) {
73+
result.add(baseValue + 1);
74+
} else {
75+
result.add(baseValue);
76+
}
77+
}
78+
return result;
1179
}
1280

1381
}

src/main/java/com/thread/concurrency/counter/BatchingCounter.java

Lines changed: 0 additions & 38 deletions
This file was deleted.
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.thread.concurrency.counter.batch;
2+
3+
import com.thread.concurrency.counter.Counter;
4+
5+
public interface BatchCounter extends Counter {
6+
void flush();
7+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package com.thread.concurrency.counter.batch;
2+
3+
import org.springframework.stereotype.Component;
4+
5+
import java.util.concurrent.ConcurrentHashMap;
6+
import java.util.concurrent.ConcurrentMap;
7+
import java.util.concurrent.atomic.AtomicLong;
8+
import java.util.concurrent.atomic.LongAdder;
9+
10+
@Component
11+
public class ConcurrentBatchingCounter implements BatchCounter {
12+
13+
private final AtomicLong counter = new AtomicLong();
14+
private final ConcurrentMap<Long, LongAdder> batch = new ConcurrentHashMap<>();
15+
16+
@Override
17+
public void add(int value) {
18+
var threadId = Thread.currentThread().threadId();
19+
batch.computeIfAbsent(threadId, k -> new LongAdder()).add(value);
20+
}
21+
22+
@Override
23+
public int show() {
24+
return counter.intValue();
25+
}
26+
27+
private void flush(long threadId) {
28+
var value = batch.remove(threadId);
29+
if (value != null) {
30+
counter.addAndGet(value.longValue());
31+
}
32+
}
33+
34+
@Override
35+
public void flush() {
36+
var threadId = Thread.currentThread().threadId();
37+
flush(threadId);
38+
}
39+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package com.thread.concurrency.counter.batch;
2+
3+
import org.springframework.context.annotation.Profile;
4+
import org.springframework.stereotype.Component;
5+
6+
import java.util.ArrayList;
7+
import java.util.List;
8+
import java.util.concurrent.ConcurrentHashMap;
9+
import java.util.concurrent.ConcurrentMap;
10+
import java.util.concurrent.atomic.AtomicLong;
11+
12+
@Component
13+
@Profile("dev")
14+
public class ConcurrentParameterizedBatchingCounter implements BatchCounter {
15+
16+
private static final int BATCH_SIZE = 100;
17+
18+
private final AtomicLong counter = new AtomicLong();
19+
private final ConcurrentMap<Long, List<Integer>> batch = new ConcurrentHashMap<>();
20+
21+
@Override
22+
public void add(int value) {
23+
var threadId = Thread.currentThread().threadId();
24+
batch.computeIfAbsent(threadId, k -> new ArrayList<>()).add(value);
25+
if (batch.get(threadId).size() >= BATCH_SIZE) {
26+
flush(threadId);
27+
}
28+
}
29+
30+
@Override
31+
public int show() {
32+
return counter.intValue();
33+
}
34+
35+
private void flush(long threadId) {
36+
var list = batch.getOrDefault(threadId, null);
37+
if (list != null && !list.isEmpty()) {
38+
counter.addAndGet(list.stream().mapToLong(Integer::longValue).sum());
39+
batch.remove(threadId);
40+
}
41+
}
42+
43+
@Override
44+
public void flush() {
45+
var threadId = Thread.currentThread().threadId();
46+
flush(threadId);
47+
}
48+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
spring.application.name=spring-thread-concurrency
2+
spring.profiles.active=default

src/test/java/com/thread/concurrency/counter/CounterTest.java

Lines changed: 23 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -5,60 +5,44 @@
55
import org.junit.jupiter.params.provider.MethodSource;
66
import org.springframework.boot.test.context.SpringBootTest;
77

8-
import java.util.concurrent.CountDownLatch;
8+
import java.util.concurrent.ExecutorService;
9+
import java.util.concurrent.Executors;
910
import java.util.stream.Stream;
1011

11-
import static java.lang.Thread.sleep;
12-
1312
@SpringBootTest
1413
public class CounterTest {
1514

1615
public static Stream<Counter> counterProvider() {
17-
return Stream.of(new BatchingCounter(), new LockCounter(), new PollingCounter(), new BasicCounter());
16+
return Stream.of(new LockCounter(), new PollingCounter());
1817
}
1918

20-
private static void assertThen(Counter counter, int expectedValue, int actualValue) {
21-
System.out.println("Expected value: " + expectedValue);
22-
System.out.println("Actual value: " + actualValue);
23-
if (counter instanceof BasicCounter) {
24-
System.out.println("BasicCounter is not thread-safe");
25-
Assertions.assertNotEquals(expectedValue, actualValue);
26-
} else {
27-
System.out.println("Counter is thread-safe");
28-
Assertions.assertEquals(expectedValue, actualValue);
19+
private static void whenAdd(Counter counter, int nThreads, int addPerThread) {
20+
try (ExecutorService executor = Executors.newFixedThreadPool(nThreads)) {
21+
for (int i = 0; i < nThreads; i++) {
22+
executor.submit(() -> {
23+
for (int j = 0; j < addPerThread; j++) {
24+
counter.add(1);
25+
}
26+
});
27+
}
2928
}
3029
}
3130

3231
@ParameterizedTest
3332
@MethodSource("counterProvider")
34-
public void stressTest(Counter counter) throws InterruptedException {
35-
int initialValue = counter.show();
33+
public void stressTest(Counter counter) {
34+
// given
3635
int nThreads = 100;
37-
int nAddsPerThread = 1000;
38-
int valueToAdd = 1;
39-
int expectedValue = initialValue + nThreads * nAddsPerThread * valueToAdd;
40-
41-
42-
// define runnable job
43-
CountDownLatch latch = new CountDownLatch(nThreads);
44-
Runnable job = () -> {
45-
try {
46-
latch.countDown(); // decrease the count
47-
latch.await(); // wait until the count reaches 0
48-
for (int i = 0; i < nAddsPerThread; i++) {
49-
counter.add(valueToAdd);
50-
}
51-
} catch (InterruptedException ignored) {
52-
}
53-
};
54-
55-
// start nThreads threads
56-
for (int i = 0; i < nThreads; i++) {
57-
Thread.ofVirtual().start(job);
58-
}
36+
int addPerThread = 1000;
37+
int expectedValue = counter.show() + nThreads * addPerThread;
5938

60-
sleep(300); // wait for all threads to finish
39+
// when
40+
long start = System.currentTimeMillis();
41+
whenAdd(counter, nThreads, addPerThread);
42+
long end = System.currentTimeMillis();
6143

62-
assertThen(counter, expectedValue, counter.show());
44+
// then
45+
Assertions.assertEquals(expectedValue, counter.show());
46+
System.out.println("Time elapsed: " + (end - start) + "ms");
6347
}
6448
}
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
1-
package com.thread.concurrency;
1+
package com.thread.concurrency.counter;
22

3+
import com.thread.concurrency.SpringThreadConcurrencyApplication;
34
import org.junit.jupiter.api.Test;
45
import org.springframework.boot.test.context.SpringBootTest;
6+
57
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
68

79
@SpringBootTest
810
class SpringThreadConcurrencyApplicationTests {
9-
1011
@Test
1112
void contextLoads() {
1213
assertDoesNotThrow(() -> SpringThreadConcurrencyApplication.main(new String[]{}));
1314
}
14-
1515
}

0 commit comments

Comments
 (0)