From 24aaee141c2effa4ff538221d80c4017d4c9e1c5 Mon Sep 17 00:00:00 2001 From: Lexyyaa Date: Tue, 16 Sep 2025 14:35:36 +0900 Subject: [PATCH 01/12] =?UTF-8?q?chore=20:=20commerce-batch=20app=20?= =?UTF-8?q?=EC=B6=94=EA=B0=80=20=EB=B0=8F=20spring=20batch=20=EC=84=A4?= =?UTF-8?q?=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/commerce-batch/build.gradle.kts | 21 +++++ .../com/loopers/CommerceBatchApplication.java | 14 ++++ .../src/main/resources/application.yml | 76 +++++++++++++++++++ .../com/loopers/CommerceBatchContextTest.java | 14 ++++ settings.gradle.kts | 1 + 5 files changed, 126 insertions(+) create mode 100644 apps/commerce-batch/build.gradle.kts create mode 100644 apps/commerce-batch/src/main/java/com/loopers/CommerceBatchApplication.java create mode 100644 apps/commerce-batch/src/main/resources/application.yml create mode 100644 apps/commerce-batch/src/test/java/com/loopers/CommerceBatchContextTest.java diff --git a/apps/commerce-batch/build.gradle.kts b/apps/commerce-batch/build.gradle.kts new file mode 100644 index 0000000..bee0476 --- /dev/null +++ b/apps/commerce-batch/build.gradle.kts @@ -0,0 +1,21 @@ +dependencies { + // add-ons + implementation(project(":modules:jpa")) + implementation(project(":supports:jackson")) + implementation(project(":supports:logging")) + implementation(project(":supports:monitoring")) + + // web + implementation("org.springframework.boot:spring-boot-starter-web") + implementation("org.springframework.boot:spring-boot-starter-batch") + implementation("org.springframework.boot:spring-boot-starter-actuator") + + // querydsl + annotationProcessor("com.querydsl:querydsl-apt::jakarta") + annotationProcessor("jakarta.persistence:jakarta.persistence-api") + annotationProcessor("jakarta.annotation:jakarta.annotation-api") + + // test-fixtures + testImplementation(testFixtures(project(":modules:jpa"))) + testImplementation(testFixtures(project(":modules:redis"))) +} diff --git a/apps/commerce-batch/src/main/java/com/loopers/CommerceBatchApplication.java b/apps/commerce-batch/src/main/java/com/loopers/CommerceBatchApplication.java new file mode 100644 index 0000000..28309a4 --- /dev/null +++ b/apps/commerce-batch/src/main/java/com/loopers/CommerceBatchApplication.java @@ -0,0 +1,14 @@ +package com.loopers; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.ConfigurationPropertiesScan; + +@ConfigurationPropertiesScan +@SpringBootApplication +public class CommerceBatchApplication { + + public static void main(String[] args) { + SpringApplication.run(CommerceBatchApplication.class, args); + } +} diff --git a/apps/commerce-batch/src/main/resources/application.yml b/apps/commerce-batch/src/main/resources/application.yml new file mode 100644 index 0000000..2eda23d --- /dev/null +++ b/apps/commerce-batch/src/main/resources/application.yml @@ -0,0 +1,76 @@ +server: + port: 8084 + + shutdown: graceful + tomcat: + threads: + max: 200 # 최대 워커 스레드 수 (default : 200) + min-spare: 10 # 최소 유지 스레드 수 (default : 10) + connection-timeout: 1m # 연결 타임아웃 (ms) (default : 60000ms = 1m) + max-connections: 8192 # 최대 동시 연결 수 (default : 8192) + accept-count: 100 # 대기 큐 크기 (default : 100) + keep-alive-timeout: 60s # 60s + max-http-request-header-size: 8KB + +spring: + main: + web-application-type: servlet + application: + name: commerce-batch + profiles: + active: local + config: + import: + - jpa.yml + - logging.yml + - monitoring.yml + batch: + jdbc: + initialize-schema: always + job: + enabled: false + +management: + server: + port: 8094 + +--- +spring: + config: + activate: + on-profile: test + + main: + web-application-type: servlet + + cloud: + openfeign: + enabled: false + +--- +spring: + config: + activate: + on-profile: local + +--- +spring: + config: + activate: + on-profile: dev + +--- +spring: + config: + activate: + on-profile: qa + +--- +spring: + config: + activate: + on-profile: prd + +springdoc: + api-docs: + enabled: false \ No newline at end of file diff --git a/apps/commerce-batch/src/test/java/com/loopers/CommerceBatchContextTest.java b/apps/commerce-batch/src/test/java/com/loopers/CommerceBatchContextTest.java new file mode 100644 index 0000000..3743fb6 --- /dev/null +++ b/apps/commerce-batch/src/test/java/com/loopers/CommerceBatchContextTest.java @@ -0,0 +1,14 @@ +package com.loopers; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest +public class CommerceBatchContextTest { + + @Test + void contextLoads() { + // 이 테스트는 Spring Boot 애플리케이션 컨텍스트가 로드되는지 확인합니다. + // 모든 빈이 올바르게 로드되었는지 확인하는 데 사용됩니다. + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index f4c8e4a..fdb2a71 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -3,6 +3,7 @@ rootProject.name = "loop-TDD" include( ":apps:commerce-api", ":apps:commerce-streamer", + ":apps:commerce-batch", ":apps:pg-simulator", ":modules:jpa", ":modules:redis", From 08fc423182a41080d25da1e864209883a7542189 Mon Sep 17 00:00:00 2001 From: Lexyyaa Date: Fri, 19 Sep 2025 08:30:13 +0900 Subject: [PATCH 02/12] =?UTF-8?q?feat=20:=20=EC=A3=BC=EA=B0=84MV,=20?= =?UTF-8?q?=EC=9B=94=EA=B0=84MV=20=EC=97=94=ED=8B=B0=ED=8B=B0=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../domain/ranking/ProductMetricsMonthly.java | 48 +++++++++++++++ .../ranking/ProductMetricsMonthlyId.java | 14 +++++ .../domain/ranking/ProductMetricsWeekly.java | 59 +++++++++++++++++++ .../ranking/ProductMetricsWeeklyId.java | 14 +++++ 4 files changed, 135 insertions(+) create mode 100644 apps/commerce-batch/src/main/java/com/loopers/domain/ranking/ProductMetricsMonthly.java create mode 100644 apps/commerce-batch/src/main/java/com/loopers/domain/ranking/ProductMetricsMonthlyId.java create mode 100644 apps/commerce-batch/src/main/java/com/loopers/domain/ranking/ProductMetricsWeekly.java create mode 100644 apps/commerce-batch/src/main/java/com/loopers/domain/ranking/ProductMetricsWeeklyId.java diff --git a/apps/commerce-batch/src/main/java/com/loopers/domain/ranking/ProductMetricsMonthly.java b/apps/commerce-batch/src/main/java/com/loopers/domain/ranking/ProductMetricsMonthly.java new file mode 100644 index 0000000..5d8a669 --- /dev/null +++ b/apps/commerce-batch/src/main/java/com/loopers/domain/ranking/ProductMetricsMonthly.java @@ -0,0 +1,48 @@ +package com.loopers.domain.ranking; + +import jakarta.persistence.Column; +import jakarta.persistence.EmbeddedId; +import jakarta.persistence.Entity; +import jakarta.persistence.Table; +import lombok.*; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; + +@Entity +@Table(name="product_metrics_monthly") +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class ProductMetricsMonthly { + + @EmbeddedId + private ProductMetricsMonthlyId id; + + @Column(nullable=false) + private double score; + + @Column(name="updated_at", nullable=false) + private LocalDateTime updatedAt; + + public static ProductMetricsMonthly of(String yearMonth, Long productId, double score){ + return ProductMetricsMonthly.builder() + .id(new ProductMetricsMonthlyId(yearMonth, productId)) + .score(score) + .updatedAt(LocalDateTime.now()) + .build(); + } + + public record YearMonth(String value){ + public static YearMonth of(LocalDate d){ + return new YearMonth(DateTimeFormatter.ofPattern("yyyyMM").format(d)); + } + public YearMonth prev(){ + LocalDate first = LocalDate.parse(this.value() + "01", DateTimeFormatter.ofPattern("yyyyMMdd")); + return of(first.minusMonths(1)); + } + } +} diff --git a/apps/commerce-batch/src/main/java/com/loopers/domain/ranking/ProductMetricsMonthlyId.java b/apps/commerce-batch/src/main/java/com/loopers/domain/ranking/ProductMetricsMonthlyId.java new file mode 100644 index 0000000..6445a88 --- /dev/null +++ b/apps/commerce-batch/src/main/java/com/loopers/domain/ranking/ProductMetricsMonthlyId.java @@ -0,0 +1,14 @@ +package com.loopers.domain.ranking; + +import jakarta.persistence.Column; +import jakarta.persistence.Embeddable; + +import java.io.Serializable; + +@Embeddable +public record ProductMetricsMonthlyId( + @Column(name="year_month", length=7, nullable=false) + String yearMonth, + @Column(name="product_id", nullable=false) + Long productId +) implements Serializable {} diff --git a/apps/commerce-batch/src/main/java/com/loopers/domain/ranking/ProductMetricsWeekly.java b/apps/commerce-batch/src/main/java/com/loopers/domain/ranking/ProductMetricsWeekly.java new file mode 100644 index 0000000..2f607e5 --- /dev/null +++ b/apps/commerce-batch/src/main/java/com/loopers/domain/ranking/ProductMetricsWeekly.java @@ -0,0 +1,59 @@ +package com.loopers.domain.ranking; + +import jakarta.persistence.Column; +import jakarta.persistence.EmbeddedId; +import jakarta.persistence.Entity; +import jakarta.persistence.Table; +import lombok.*; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.temporal.WeekFields; + +@Entity +@Table(name="product_metrics_weekly") +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class ProductMetricsWeekly { + + @EmbeddedId + private ProductMetricsWeeklyId id; + + @Column(nullable=false) + private double score; + + @Column(name="updated_at", nullable=false) + private LocalDateTime updatedAt; + + public static ProductMetricsWeekly of(String yearWeek, Long productId, double score){ + return ProductMetricsWeekly.builder() + .id(new ProductMetricsWeeklyId(yearWeek, productId)) + .score(score) + .updatedAt(LocalDateTime.now()) + .build(); + } + + public record YearWeek(String value) { + public static YearWeek of(LocalDate date){ + WeekFields wf = WeekFields.ISO; + int w = date.get(wf.weekOfWeekBasedYear()); + int y = date.get(wf.weekBasedYear()); + return new YearWeek("%04dW%02d".formatted(y, w)); + } + public static LocalDate start(LocalDate d){ return d.with(WeekFields.ISO.dayOfWeek(), 1); } + public static LocalDate end(LocalDate d){ return d.with(WeekFields.ISO.dayOfWeek(), 7); } + + public YearWeek prev(){ + int year = Integer.parseInt(value().substring(0, 4)); + int week = Integer.parseInt(value().substring(5)); + LocalDate firstThursday = LocalDate.of(year, 1, 4); + LocalDate mondayOfWeek = firstThursday + .with(WeekFields.ISO.weekOfWeekBasedYear(), week) + .with(WeekFields.ISO.dayOfWeek(), 1); + return of(mondayOfWeek.minusWeeks(1)); + } + } +} diff --git a/apps/commerce-batch/src/main/java/com/loopers/domain/ranking/ProductMetricsWeeklyId.java b/apps/commerce-batch/src/main/java/com/loopers/domain/ranking/ProductMetricsWeeklyId.java new file mode 100644 index 0000000..44ec049 --- /dev/null +++ b/apps/commerce-batch/src/main/java/com/loopers/domain/ranking/ProductMetricsWeeklyId.java @@ -0,0 +1,14 @@ +package com.loopers.domain.ranking; + +import jakarta.persistence.Column; +import jakarta.persistence.Embeddable; + +import java.io.Serializable; + +@Embeddable +public record ProductMetricsWeeklyId( + @Column(name="year_week", length=7, nullable=false) + String yearWeek, + @Column(name="product_id", nullable=false) + Long productId +) implements Serializable {} From 549d2d47026ac8de178bc274a23e3136b771c000 Mon Sep 17 00:00:00 2001 From: Lexyyaa Date: Fri, 19 Sep 2025 08:34:56 +0900 Subject: [PATCH 03/12] =?UTF-8?q?refacfor=20:=20product=5Fmetrics,=20produ?= =?UTF-8?q?ct=5Fsku=5Fmetrics=20=EB=82=A0=EC=A7=9C=ED=8F=AC=ED=95=A8=20?= =?UTF-8?q?=EB=B3=B5=ED=95=A9pk=20=EC=84=A4=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../catalog/CatalogApplicationService.java | 93 +++++++------ .../java/com/loopers/domain/RecordIO.java | 43 +++++- .../domain/product/MetricsService.java | 131 ++++++++++-------- .../domain/product/ProductMetrics.java | 49 ++++--- .../domain/product/ProductMetricsId.java | 20 +++ .../product/ProductMetricsRepository.java | 11 +- .../domain/product/ProductSkuMetrics.java | 34 +++-- .../domain/product/ProductSkuMetricsId.java | 21 +++ .../product/ProductSkuMetricsRepository.java | 18 ++- .../product/ProductMetricsJpaRepository.java | 46 +++++- .../product/ProductMetricsRepositoryImpl.java | 23 ++- .../ProductSkuMetricsJpaRepository.java | 60 +++++--- .../ProductSkuMetricsRepositoryImpl.java | 25 +++- 13 files changed, 404 insertions(+), 170 deletions(-) create mode 100644 apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductMetricsId.java create mode 100644 apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductSkuMetricsId.java diff --git a/apps/commerce-streamer/src/main/java/com/loopers/application/catalog/CatalogApplicationService.java b/apps/commerce-streamer/src/main/java/com/loopers/application/catalog/CatalogApplicationService.java index 7afa02a..ac64ae3 100644 --- a/apps/commerce-streamer/src/main/java/com/loopers/application/catalog/CatalogApplicationService.java +++ b/apps/commerce-streamer/src/main/java/com/loopers/application/catalog/CatalogApplicationService.java @@ -10,6 +10,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.stereotype.Service; +import java.time.LocalDate; +import java.time.ZoneId; import java.util.*; @Service @@ -24,90 +26,101 @@ public class CatalogApplicationService { private final ObjectMapper objectMapper = new ObjectMapper(); public void onViewBatch(List> records) throws Exception { - Map viewMap = new HashMap<>(); + if (records == null || records.isEmpty()) + return; + + Map> viewByDate = new LinkedHashMap<>(); List handledIds = new ArrayList<>(); - for (ConsumerRecord r : records) { - String eventId = io.header(r, "event_id"); + for (ConsumerRecord record : records) { + String eventId = io.header(record, "event_id"); + if (!handledService.shouldProcess("catalog-view", eventId)) continue; - if (!handledService.shouldProcess("activity-log", eventId)) - continue; + long productId = io.payload(record).path("productId").asLong(); + if (productId <= 0) continue; - JsonNode body = objectMapper.readTree(io.payloadString(r)).path("payload"); - long productId = body.path("productId").asLong(); + LocalDate date = io.occurredDate(record); + + viewByDate.computeIfAbsent(date, d -> new HashMap<>()) + .merge(productId, 1L, Long::sum); - viewMap.merge(productId, 1L, Long::sum); handledIds.add(eventId); } - if (viewMap.isEmpty()) + if (viewByDate.isEmpty()) return; - metricsService.incProductViews(viewMap); + for (Map.Entry> entry : viewByDate.entrySet()) { + LocalDate date = entry.getKey(); + Map viewMap = entry.getValue(); - Set productIds = new LinkedHashSet<>(viewMap.keySet()); - rankingService.computeAndPutScoresToday(productIds); + metricsService.incProductViews(viewMap, date); + rankingService.computeAndPutScores(date, viewMap.keySet()); + } handledService.saveAll("catalog-view", handledIds); } public void onLikeBatch(List> records) throws Exception { - Map latestLike = new LinkedHashMap<>(); + if (records == null || records.isEmpty()) return; + + Map> touchedByDate = new LinkedHashMap<>(); List handledIds = new ArrayList<>(); for (ConsumerRecord r : records) { String eventId = io.header(r, "event_id"); if (!handledService.shouldProcess("catalog-like", eventId)) continue; - JsonNode body = objectMapper.readTree(io.payloadString(r)).path("payload"); - long productId = body.path("productId").asLong(); - long likeCount = body.path("likeCount").asLong(); - - latestLike.put(productId, likeCount); - handledIds.add(eventId); - } + String eventType = io.header(r, "event_type"); + LocalDate date = io.occurredDate(r); - if (latestLike.isEmpty()) return; + long productId = io.payload(r).path("targetId").asLong(); + if (productId <= 0) continue; - for (Map.Entry e : latestLike.entrySet()) { - JsonNode payload = objectMapper.createObjectNode() - .put("productId", e.getKey()) - .put("likeCount", e.getValue()); + JsonNode payload = objectMapper.createObjectNode().put("productId", productId); JsonNode envelope = objectMapper.createObjectNode().set("payload", payload); - metricsService.onLikeChanged(envelope.toString()); + + metricsService.onLikeChanged(envelope.toString(), eventType, date); + + touchedByDate.computeIfAbsent(date, d -> new LinkedHashSet<>()).add(productId); + handledIds.add(eventId); } - rankingService.computeAndPutScoresToday(latestLike.keySet()); + for (Map.Entry> e : touchedByDate.entrySet()) { + rankingService.computeAndPutScores(e.getKey(), e.getValue()); + } handledService.saveAll("catalog-like", handledIds); } public void onProductBatch(List> records) throws Exception { - List toApply = new ArrayList<>(); - Set productIds = new LinkedHashSet<>(); + if (records == null || records.isEmpty()) return; + + Map> touchedByDate = new LinkedHashMap<>(); List handledIds = new ArrayList<>(); for (ConsumerRecord r : records) { String eventId = io.header(r, "event_id"); if (!handledService.shouldProcess("catalog-product", eventId)) continue; - JsonNode body = objectMapper.readTree(io.payloadString(r)).path("payload"); - + JsonNode body = io.payload(r); long productId = body.path("productId").asLong(); - productIds.add(productId); + long productSkuId = body.path("productSkuId").asLong(); + long amount = body.path("amount").asLong(); + if (productId <= 0 || productSkuId <= 0 || amount <= 0) continue; - toApply.add(body); - handledIds.add(eventId); - } + LocalDate date = io.occurredDate(r); - if (toApply.isEmpty()) return; + JsonNode envelope = objectMapper.createObjectNode().set("payload", body); + metricsService.onStockConfirmed(envelope.toString(), date); - for (JsonNode payload : toApply) { - JsonNode envelope = objectMapper.createObjectNode().set("payload", payload); - metricsService.onStockConfirmed(envelope.toString()); + touchedByDate.computeIfAbsent(date, d -> new LinkedHashSet<>()).add(productId); + handledIds.add(eventId); } - rankingService.computeAndPutScoresToday(productIds); + for (Map.Entry> e : touchedByDate.entrySet()) { + rankingService.computeAndPutScores(e.getKey(), e.getValue()); + } handledService.saveAll("catalog-product", handledIds); } diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/RecordIO.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/RecordIO.java index a836685..d2ce960 100644 --- a/apps/commerce-streamer/src/main/java/com/loopers/domain/RecordIO.java +++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/RecordIO.java @@ -5,20 +5,53 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.stereotype.Component; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.LocalDate; +import java.time.ZoneId; +import java.util.Optional; + @Component public class RecordIO { private final ObjectMapper M = new ObjectMapper(); - public String header(ConsumerRecord r, String key) { - return r.headers().lastHeader(key) == null ? "" : new String(r.headers().lastHeader(key).value()); + private static final ZoneId zoneId = ZoneId.of("Asia/Seoul"); + + public String header(ConsumerRecord r, String key) { + if (r.headers() == null || r.headers().lastHeader(key) == null) return ""; + return new String(r.headers().lastHeader(key).value(), StandardCharsets.UTF_8); + } + + public Optional headerOpt(ConsumerRecord r, String key) { + String v = header(r, key); + return (v == null || v.isBlank()) ? Optional.empty() : Optional.of(v); + } + + public LocalDate occurredDate(ConsumerRecord r) { + return headerDate(r, "occurred_at"); + } + + public LocalDate headerDate(ConsumerRecord r, String key) { + String occurredAt = header(r, key); + try { + if (occurredAt == null || occurredAt.isBlank()) return LocalDate.now(zoneId); + Instant inst = Instant.parse(occurredAt); + return inst.atZone(zoneId).toLocalDate(); + } catch (Exception ignore) { + return LocalDate.now(zoneId); + } } - public JsonNode json(ConsumerRecord r) throws Exception { + public JsonNode json(ConsumerRecord r) throws Exception { byte[] bytes = (byte[]) r.value(); return M.readTree(bytes); } - public String payloadString(ConsumerRecord r) { - return new String((byte[]) r.value()); + public JsonNode payload(ConsumerRecord r) throws Exception { + return json(r).path("payload"); + } + + public String payloadString(ConsumerRecord r) { + return new String((byte[]) r.value(), StandardCharsets.UTF_8); } } diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/product/MetricsService.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/product/MetricsService.java index ad22db6..c372ca3 100644 --- a/apps/commerce-streamer/src/main/java/com/loopers/domain/product/MetricsService.java +++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/product/MetricsService.java @@ -2,16 +2,15 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.loopers.infrastructure.product.SalesSum; +import com.loopers.infrastructure.product.ProductSkuMetricsJpaRepository; import lombok.RequiredArgsConstructor; +import org.springframework.dao.DataIntegrityViolationException; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.time.Instant; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.time.LocalDate; +import java.util.*; @Service @RequiredArgsConstructor @@ -22,82 +21,98 @@ public class MetricsService { private final ObjectMapper objectMapper; @Transactional - public void incProductViews(Map viewMap) { + public void incProductViews(Map viewMap, LocalDate date) { + if (viewMap == null || viewMap.isEmpty()) + return; + + Instant now = Instant.now(); for (Map.Entry e : viewMap.entrySet()) { long productId = e.getKey(); - long viewCnt = e.getValue() == null ? 0L : e.getValue(); - if (viewCnt <= 0) + long delta = e.getValue() == null ? 0L : e.getValue(); + if (delta <= 0) continue; - ProductMetrics productMetrics = productMetricsRepository.findById(productId) - .orElseGet(() -> ProductMetrics.builder() - .productId(productId) - .likeCnt(0) - .viewCnt(0) - .updatedAt(Instant.now()) - .build()); - - productMetrics.increaseViewCnt(viewCnt); - - productMetricsRepository.save(productMetrics); + Optional opt = productMetricsRepository.findByPkForUpdateWithLock(productId, date); + if (opt.isPresent()) { + ProductMetrics row = opt.get(); + row.addViewDelta(delta); + row.setUpdatedAt(now); + productMetricsRepository.save(row); + } else { + ProductMetrics fresh = ProductMetrics.newDailyRow(productId, date); + fresh.addViewDelta(delta); + fresh.setUpdatedAt(now); + productMetricsRepository.save(fresh); + } } } @Transactional - public void onLikeChanged(String envelopeJson) throws Exception { + public void onLikeChanged(String envelopeJson, String eventType, LocalDate date) throws Exception { JsonNode body = objectMapper.readTree(envelopeJson).path("payload"); long productId = body.path("productId").asLong(); - long likeCnt = body.path("likeCount").asLong(); - - ProductMetrics productMetrics = productMetricsRepository.findById(productId) - .orElseGet(() -> ProductMetrics.builder() - .productId(productId) - .likeCnt(0) - .viewCnt(0) - .updatedAt(Instant.now()) - .build()); - productMetrics.setLikeCntLatest(likeCnt); + long delta = 0L; + if ("LikeAdded".equalsIgnoreCase(eventType)) { + delta = 1L; + } else if ("LikeRemoved".equalsIgnoreCase(eventType)) { + delta = -1L; + } - productMetricsRepository.save(productMetrics); + Instant now = Instant.now(); + Optional opt = productMetricsRepository.findByPkForUpdateWithLock(productId, date); + if (opt.isPresent()) { + var row = opt.get(); + row.addLikeDelta(delta); + row.setUpdatedAt(now); + productMetricsRepository.save(row); + } else { + var fresh = ProductMetrics.newDailyRow(productId, date); + fresh.addLikeDelta(delta); + fresh.setUpdatedAt(now); + productMetricsRepository.save(fresh); + } } @Transactional - public void onStockConfirmed(String envelopeJson) throws Exception { - JsonNode payload = objectMapper.readTree(envelopeJson).path("payload"); - - Long productId = payload.path("productId").asLong(); - Long productSkuId = payload.path("productSkuId").asLong(); - Long qty = payload.path("amount").asLong(); - - productSkuMetricsRepository.upsertAddSales( - productId, - productSkuId, - qty, - Instant.now() - ); + public void onStockConfirmed(String envelopeJson,LocalDate date) throws Exception { + JsonNode p = objectMapper.readTree(envelopeJson).path("payload"); + Long productId = p.path("productId").asLong(); + Long productSkuId = p.path("productSkuId").asLong(); + long qty = p.path("amount").asLong(); + + Instant now = Instant.now(); + Optional opt = productSkuMetricsRepository.findByPkForUpdateWithLock(productSkuId, date); + if (opt.isPresent()) { + var row = opt.get(); + row.addSalesDelta(qty); + row.setUpdatedAt(now); + productSkuMetricsRepository.save(row); + } else { + var fresh = ProductSkuMetrics.newDailyRow(productId, productSkuId, date); + fresh.addSalesDelta(qty); + fresh.setUpdatedAt(now); + productSkuMetricsRepository.save(fresh); + } } @Transactional(readOnly = true) - public Map getProductMetrics(Set productIds) { - Map pm = new HashMap<>(); - if (productIds == null || productIds.isEmpty()) return pm; - - for (Long id : productIds) { - productMetricsRepository.findById(id).ifPresent(m -> pm.put(id, m)); - } - return pm; + public Map getProductMetrics(Set productIds, LocalDate date) { + Map map = new HashMap<>(); + if (productIds == null || productIds.isEmpty()) return map; + productMetricsRepository.findAllByIdProductIdInAndIdDate(productIds, date) + .forEach(pm -> map.put(pm.getId().getProductId(), pm)); + return map; } @Transactional(readOnly = true) - public Map getSalesSumByProductIds(Set productIds) { + public Map getSalesSumByProductIds(Set productIds, LocalDate date) { Map sales = new HashMap<>(); if (productIds == null || productIds.isEmpty()) return sales; - - List sums = productSkuMetricsRepository.sumSalesByProductIds(productIds); - for (SalesSum s : sums) { - sales.put(s.getProductId(), s.getTotal() == null ? 0L : s.getTotal()); - } + List sums = productSkuMetricsRepository.sumSalesByProductIdsAndDate(productIds, date); + sums.forEach( + s -> sales.put(s.getProductId(), s.getTotal() == null ? 0L : s.getTotal()) + ); return sales; } } diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductMetrics.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductMetrics.java index b8a0588..76897eb 100644 --- a/apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductMetrics.java +++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductMetrics.java @@ -1,37 +1,54 @@ package com.loopers.domain.product; -import jakarta.persistence.Entity; -import jakarta.persistence.Id; -import jakarta.persistence.Index; -import jakarta.persistence.Table; +import jakarta.persistence.*; import lombok.*; import java.time.Instant; - +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneId; @Entity @Table(name="product_metrics", indexes = @Index(name="idx_product_metrics_updated_at", columnList = "updatedAt")) -@Getter @Setter @NoArgsConstructor @AllArgsConstructor @Builder +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +@Builder public class ProductMetrics { - @Id - private Long productId; + @EmbeddedId + private ProductMetricsId id; - private long likeCnt; + @Column(name = "view_cnt_delta", nullable = false) + private long viewCntDelta; - private long viewCnt; + @Column(name = "like_cnt_delta", nullable = false) + private long likeCntDelta; + @Column(name = "updated_at") private Instant updatedAt; - public void increaseViewCnt(long amount){ - if (amount > 0) { - this.viewCnt += amount; this.updatedAt = Instant.now(); + public void addViewDelta(long d){ + if (d > 0){ + this.viewCntDelta += d; } } - public void setLikeCntLatest(long latest){ - this.likeCnt = Math.max(0, latest); - this.updatedAt = Instant.now(); + public void addLikeDelta(long d){ + if (d == 0) + return; + long next = this.likeCntDelta + d; + this.likeCntDelta = Math.max(0, next); + } + + public static ProductMetrics newDailyRow(Long productId, LocalDate date) { + return ProductMetrics.builder() + .id(ProductMetricsId.of(productId, date)) + .viewCntDelta(0L) + .likeCntDelta(0L) + .updatedAt(Instant.now()) + .build(); } } diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductMetricsId.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductMetricsId.java new file mode 100644 index 0000000..d53cf7b --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductMetricsId.java @@ -0,0 +1,20 @@ +package com.loopers.domain.product; + +import jakarta.persistence.Column; +import jakarta.persistence.Embeddable; +import lombok.*; + +import java.time.LocalDate; + +@Embeddable +@Getter +@NoArgsConstructor(access = AccessLevel.PROTECTED) +@AllArgsConstructor(staticName = "of") +@EqualsAndHashCode +public class ProductMetricsId { + @Column(name = "product_id", nullable = false) + private Long productId; + + @Column(name = "date" , nullable = false) + private LocalDate date; +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductMetricsRepository.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductMetricsRepository.java index fa777b3..c0fb43a 100644 --- a/apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductMetricsRepository.java +++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductMetricsRepository.java @@ -1,8 +1,15 @@ package com.loopers.domain.product; +import java.time.LocalDate; +import java.util.List; import java.util.Optional; +import java.util.Set; public interface ProductMetricsRepository { - ProductMetrics save(ProductMetrics productMetrics); - Optional findById(long id); + + Optional findByPkForUpdateWithLock(Long productId, LocalDate date); + + ProductMetrics save(ProductMetrics entity); + + List findAllByIdProductIdInAndIdDate(Set productIds, LocalDate date); } diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductSkuMetrics.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductSkuMetrics.java index 87b2969..bc2eb4b 100644 --- a/apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductSkuMetrics.java +++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductSkuMetrics.java @@ -1,17 +1,16 @@ package com.loopers.domain.product; -import jakarta.persistence.Entity; -import jakarta.persistence.Id; -import jakarta.persistence.Index; -import jakarta.persistence.Table; +import jakarta.persistence.*; import lombok.*; import java.time.Instant; +import java.time.LocalDate; @Entity @Table(name = "product_sku_metrics", indexes = { - @Index(name = "idx_product_sku_metrics_updated_at", columnList = "updatedAt") + @Index(name = "idx_product_sku_metrics_updated_at", columnList = "updated_at"), + @Index(name = "idx_product_sku_metrics_product_id_date", columnList = "product_id,date") }) @Getter @Setter @@ -20,14 +19,31 @@ @Builder public class ProductSkuMetrics { - @Id - private Long productSkuId; + @EmbeddedId + private ProductSkuMetricsId id; // (skuId, date) + @Column(name = "product_id", nullable = false) private Long productId; - private long salesCnt; + @Column(name = "sales_cnt_delta", nullable = false) + private long salesCntDelta; + @Column(name = "updated_at", nullable=false) private Instant updatedAt; + public void addSalesDelta(long delta){ + if (delta > 0) { + this.salesCntDelta += delta; + this.updatedAt = Instant.now(); + } + } + + public static ProductSkuMetrics newDailyRow(Long productId, Long productSkuId, LocalDate date) { + return ProductSkuMetrics.builder() + .id(ProductSkuMetricsId.of(productSkuId, date)) + .productId(productId) + .salesCntDelta(0L) + .updatedAt(Instant.now()) + .build(); + } } - diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductSkuMetricsId.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductSkuMetricsId.java new file mode 100644 index 0000000..7100ab7 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductSkuMetricsId.java @@ -0,0 +1,21 @@ +package com.loopers.domain.product; + +import jakarta.persistence.Column; +import jakarta.persistence.Embeddable; +import lombok.*; + +import java.time.LocalDate; + +@Embeddable +@Getter +@NoArgsConstructor(access = AccessLevel.PROTECTED) +@AllArgsConstructor(staticName = "of") +@EqualsAndHashCode +public class ProductSkuMetricsId { + @Column(name = "product_sku_id", nullable = false) + private Long productSkuId; + + @Column(name = "date", nullable = false) + private LocalDate date; +} + diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductSkuMetricsRepository.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductSkuMetricsRepository.java index 2da2441..c3edb17 100644 --- a/apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductSkuMetricsRepository.java +++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductSkuMetricsRepository.java @@ -1,13 +1,19 @@ package com.loopers.domain.product; -import com.loopers.infrastructure.product.SalesSum; +import com.loopers.infrastructure.product.ProductSkuMetricsJpaRepository; -import java.time.Instant; -import java.util.Collection; +import java.time.LocalDate; import java.util.List; +import java.util.Optional; +import java.util.Set; public interface ProductSkuMetricsRepository { - ProductSkuMetrics save(ProductSkuMetrics productSkuMetrics); - void upsertAddSales(Long productId, Long skuId, Long delta, Instant now); - List sumSalesByProductIds(Collection ids); + + Optional findById(ProductSkuMetricsId id); + + Optional findByPkForUpdateWithLock(Long productSkuId, LocalDate date); + + ProductSkuMetrics save(ProductSkuMetrics entity); + + List sumSalesByProductIdsAndDate(Set productIds,LocalDate date); } diff --git a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductMetricsJpaRepository.java b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductMetricsJpaRepository.java index aaf8df5..f0b92f0 100644 --- a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductMetricsJpaRepository.java +++ b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductMetricsJpaRepository.java @@ -1,6 +1,50 @@ package com.loopers.infrastructure.product; import com.loopers.domain.product.ProductMetrics; +import com.loopers.domain.product.ProductMetricsId; +import jakarta.persistence.LockModeType; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Lock; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; -public interface ProductMetricsJpaRepository extends JpaRepository {} +import java.time.LocalDate; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +public interface ProductMetricsJpaRepository extends JpaRepository { + + @Lock(LockModeType.PESSIMISTIC_WRITE) + @Query(""" + select pm from ProductMetrics pm + where pm.id.productId = :productId and pm.id.date = :date + """) + Optional findByPkForUpdate(@Param("productId") Long productId, + @Param("date") LocalDate date); + + @Query(""" + select pm from ProductMetrics pm + where pm.id.productId in :productIds and pm.id.date = :date + """) + List findAllByIdProductIdInAndIdDate(@Param("productIds") Set productIds, + @Param("date") LocalDate date); + + @Query(""" + select coalesce(sum(pm.viewCntDelta), 0) + from ProductMetrics pm + where pm.id.productId = :productId and pm.id.date between :from and :to + """) + long sumViewDeltaBetween(@Param("productId") Long productId, + @Param("from") LocalDate from, + @Param("to") LocalDate to); + + @Query(""" + select coalesce(sum(pm.likeCntDelta), 0) + from ProductMetrics pm + where pm.id.productId = :productId and pm.id.date between :from and :to + """) + long sumLikeDeltaBetween(@Param("productId") Long productId, + @Param("from") LocalDate from, + @Param("to") LocalDate to); +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductMetricsRepositoryImpl.java b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductMetricsRepositoryImpl.java index f7c43db..9786076 100644 --- a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductMetricsRepositoryImpl.java +++ b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductMetricsRepositoryImpl.java @@ -1,27 +1,38 @@ package com.loopers.infrastructure.product; import com.loopers.domain.product.ProductMetrics; +import com.loopers.domain.product.ProductMetricsId; import com.loopers.domain.product.ProductMetricsRepository; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.dao.DataIntegrityViolationException; import org.springframework.stereotype.Repository; +import java.time.Instant; +import java.time.LocalDate; +import java.util.List; import java.util.Optional; +import java.util.Set; @Repository @RequiredArgsConstructor @Slf4j public class ProductMetricsRepositoryImpl implements ProductMetricsRepository { + private final ProductMetricsJpaRepository productMetricsJpaRepository; + private final ProductMetricsJpaRepository jpa; + + + @Override public Optional findByPkForUpdateWithLock(Long productId, LocalDate date) { + return productMetricsJpaRepository.findByPkForUpdate(productId, date); + } - @Override - public ProductMetrics save(ProductMetrics productMetrics) { - return productMetricsJpaRepository.save(productMetrics); + @Override public ProductMetrics save(ProductMetrics entity) { + return productMetricsJpaRepository.save(entity); } - @Override - public Optional findById(long id) { - return productMetricsJpaRepository.findById(id); + @Override public List findAllByIdProductIdInAndIdDate(Set productIds, LocalDate date) { + return productMetricsJpaRepository.findAllByIdProductIdInAndIdDate(productIds, date); } } diff --git a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductSkuMetricsJpaRepository.java b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductSkuMetricsJpaRepository.java index d617920..eb18c4e 100644 --- a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductSkuMetricsJpaRepository.java +++ b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductSkuMetricsJpaRepository.java @@ -1,40 +1,58 @@ package com.loopers.infrastructure.product; import com.loopers.domain.product.ProductSkuMetrics; +import com.loopers.domain.product.ProductSkuMetricsId; import com.loopers.domain.product.ProductSkuMetricsRepository; +import jakarta.persistence.LockModeType; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Lock; import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; import org.springframework.transaction.annotation.Transactional; import java.time.Instant; +import java.time.LocalDate; import java.util.Collection; import java.util.List; +import java.util.Optional; +import java.util.Set; -public interface ProductSkuMetricsJpaRepository extends JpaRepository { +public interface ProductSkuMetricsJpaRepository extends JpaRepository { - @Modifying(flushAutomatically = true, clearAutomatically = true) - @Transactional - @Query(value = """ - INSERT INTO product_sku_metrics (product_sku_id, product_id, sales_cnt, updated_at) - VALUES (:skuId, :productId, :delta, :now) - ON DUPLICATE KEY UPDATE - sales_cnt = COALESCE(sales_cnt, 0) + :delta, - updated_at = :now - """, nativeQuery = true) - void upsertAddSales(@Param("productId") Long productId, - @Param("skuId") Long skuId, - @Param("delta") Long delta, - @Param("now") Instant now); + @Lock(LockModeType.PESSIMISTIC_WRITE) + @Query(""" + select psm from ProductSkuMetrics psm + where psm.id.productSkuId = :skuId and psm.id.date = :date + """) + Optional findByPkForUpdate(@Param("skuId") Long productSkuId, + @Param("date") LocalDate date); + + @Query(""" + select psm.productId as productId, + coalesce(sum(psm.salesCntDelta), 0) as total + from ProductSkuMetrics psm + where psm.productId in :productIds + and psm.id.date = :date + group by psm.productId + """) + List sumSalesByProductIdsAndDate(@Param("productIds") Set productIds, + @Param("date") LocalDate date); @Query(""" - select p.productId as productId, - coalesce(sum(p.salesCnt), 0) as total - from ProductSkuMetrics p - where p.productId in :ids - group by p.productId - """) - List sumSalesByProductIds(@Param("ids") Collection ids); + select psm.productId as productId, + coalesce(sum(psm.salesCntDelta), 0) as total + from ProductSkuMetrics psm + where psm.productId in :productIds + and psm.id.date between :start and :end + group by psm.productId + """) + List sumSalesByProductIdsBetween(@Param("productIds") Set productIds, + @Param("start") LocalDate start, + @Param("end") LocalDate end); + public interface SalesSum { + Long getProductId(); + Long getTotal(); + } } diff --git a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductSkuMetricsRepositoryImpl.java b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductSkuMetricsRepositoryImpl.java index 048dc4d..b980c47 100644 --- a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductSkuMetricsRepositoryImpl.java +++ b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductSkuMetricsRepositoryImpl.java @@ -1,13 +1,16 @@ package com.loopers.infrastructure.product; import com.loopers.domain.product.ProductSkuMetrics; +import com.loopers.domain.product.ProductSkuMetricsId; import com.loopers.domain.product.ProductSkuMetricsRepository; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Repository; -import java.time.Instant; +import java.time.LocalDate; +import java.util.List; import java.util.Optional; +import java.util.Set; @Repository @RequiredArgsConstructor @@ -16,13 +19,23 @@ public class ProductSkuMetricsRepositoryImpl implements ProductSkuMetricsReposit private final ProductSkuMetricsJpaRepository productSkuMetricsJpaRepository; - @Override - public ProductSkuMetrics save(ProductSkuMetrics productSkuMetrics) { - return productSkuMetricsJpaRepository.save(productSkuMetrics); + + private final ProductSkuMetricsJpaRepository jpa; + + @Override public Optional findById(ProductSkuMetricsId id) { + return productSkuMetricsJpaRepository.findById(id); + } + + @Override public Optional findByPkForUpdateWithLock(Long productSkuId, LocalDate date) { + return productSkuMetricsJpaRepository.findByPkForUpdate(productSkuId, date); + } + + @Override public ProductSkuMetrics save(ProductSkuMetrics entity) { + return productSkuMetricsJpaRepository.save(entity); } @Override - public void upsertAddSales(Long productId, Long skuId, Long delta, Instant now) { - productSkuMetricsJpaRepository.upsertAddSales(productId, skuId, delta, now); + public List sumSalesByProductIdsAndDate(Set productIds, LocalDate date) { + return productSkuMetricsJpaRepository.sumSalesByProductIdsAndDate(productIds,date); } } From 59147ad2ee341af1e0598e7d79d5d5b605710f75 Mon Sep 17 00:00:00 2001 From: Lexyyaa Date: Fri, 19 Sep 2025 08:38:23 +0900 Subject: [PATCH 04/12] =?UTF-8?q?refacfor=20:=20=EC=A2=8B=EC=95=84?= =?UTF-8?q?=EC=9A=94=20=EC=A7=91=EA=B3=84=EB=B0=A9=EC=8B=9D=EB=B3=80?= =?UTF-8?q?=EA=B2=BD(+1/-1)=EC=97=90=20=EB=94=B0=EB=A5=B8=20producer=20?= =?UTF-8?q?=EB=A1=9C=EC=A7=81=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../application/like/LikeEventListener.java | 26 +++++---- .../product/ProductEventListener.java | 6 +-- .../domain/product/CatalogMessage.java | 15 ++++-- ...Producer.java => ProductLikeProducer.java} | 4 +- .../message/ProductSkuProducer.java | 2 +- .../message/ProductViewProducer.java | 54 +++++++++++++++++++ .../product/ProductMetricsConsumer.java | 48 ----------------- .../product/ProductSkuMetricsConsumer.java | 50 ----------------- 8 files changed, 82 insertions(+), 123 deletions(-) rename apps/commerce-api/src/main/java/com/loopers/infrastructure/message/{ProductProducer.java => ProductLikeProducer.java} (96%) create mode 100644 apps/commerce-api/src/main/java/com/loopers/infrastructure/message/ProductViewProducer.java delete mode 100644 apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/product/ProductMetricsConsumer.java delete mode 100644 apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/product/ProductSkuMetricsConsumer.java diff --git a/apps/commerce-api/src/main/java/com/loopers/application/like/LikeEventListener.java b/apps/commerce-api/src/main/java/com/loopers/application/like/LikeEventListener.java index 8d9c8a0..207a204 100644 --- a/apps/commerce-api/src/main/java/com/loopers/application/like/LikeEventListener.java +++ b/apps/commerce-api/src/main/java/com/loopers/application/like/LikeEventListener.java @@ -4,7 +4,7 @@ import com.loopers.domain.like.LikeService; import com.loopers.domain.product.Product; import com.loopers.domain.product.ProductService; -import com.loopers.infrastructure.message.ProductProducer; +import com.loopers.infrastructure.message.ProductLikeProducer; import com.loopers.domain.product.CatalogMessage; import com.loopers.shared.event.Envelope; import lombok.RequiredArgsConstructor; @@ -18,7 +18,7 @@ public class LikeEventListener { private final LikeService likeService; private final ProductService productService; - private final ProductProducer productProducer; + private final ProductLikeProducer productLikeProducer; @Async("applicationEventTaskExecutor") @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) @@ -28,15 +28,14 @@ public void onAdded(Envelope e){ productService.updateLikeCnt(product, likeCnt); - Envelope record = Envelope.of( + Envelope record = Envelope.of( e.actorId(), - new CatalogMessage.LikeChanged( - product.getId(), - e.payload().targetType().name(), - likeCnt + new CatalogMessage.LikeAdded( + e.actorId(), + product.getId() ) ); - productProducer.send(String.valueOf(product.getId()),record); + productLikeProducer.send(String.valueOf(product.getId()),record); } @Async("applicationEventTaskExecutor") @@ -46,14 +45,13 @@ public void onRemoved(Envelope e) { Product product = productService.getProduct(e.payload().targetId()); productService.updateLikeCnt(product, likeCnt); - Envelope record = Envelope.of( + Envelope record = Envelope.of( e.actorId(), - new CatalogMessage.LikeChanged( - product.getId(), - e.payload().targetType().name(), - likeCnt + new CatalogMessage.LikeRemoved( + e.actorId(), + product.getId() ) ); - productProducer.send(String.valueOf(product.getId()),record); + productLikeProducer.send(String.valueOf(product.getId()),record); } } diff --git a/apps/commerce-api/src/main/java/com/loopers/application/product/ProductEventListener.java b/apps/commerce-api/src/main/java/com/loopers/application/product/ProductEventListener.java index 205415a..23283d2 100644 --- a/apps/commerce-api/src/main/java/com/loopers/application/product/ProductEventListener.java +++ b/apps/commerce-api/src/main/java/com/loopers/application/product/ProductEventListener.java @@ -3,7 +3,7 @@ import com.loopers.domain.order.OrderEvent; import com.loopers.domain.payment.PaymentEvent; import com.loopers.domain.product.*; -import com.loopers.infrastructure.message.ProductProducer; +import com.loopers.infrastructure.message.ProductViewProducer; import com.loopers.shared.event.Envelope; import com.loopers.infrastructure.message.ProductSkuProducer; import lombok.RequiredArgsConstructor; @@ -20,7 +20,7 @@ public class ProductEventListener { private final ProductSkuService productSkuService; private final ProductService productService; - private final ProductProducer productProducer; + private final ProductViewProducer productViewProducer; private final ProductSkuProducer productSkuProducer; private final CacheManager cacheManager; @@ -68,6 +68,6 @@ public void onOrderSuccess(OrderEvent.ReCalStock e) { @Async("applicationEventTaskExecutor") @EventListener public void onDetailViewed(Envelope event) { - productProducer.send(String.valueOf(event.payload().productId()),event); + productViewProducer.send(String.valueOf(event.payload().productId()),event); } } diff --git a/apps/commerce-api/src/main/java/com/loopers/domain/product/CatalogMessage.java b/apps/commerce-api/src/main/java/com/loopers/domain/product/CatalogMessage.java index 30750e8..582e179 100644 --- a/apps/commerce-api/src/main/java/com/loopers/domain/product/CatalogMessage.java +++ b/apps/commerce-api/src/main/java/com/loopers/domain/product/CatalogMessage.java @@ -1,9 +1,14 @@ package com.loopers.domain.product; public class CatalogMessage { - public record LikeChanged( - Long productId, - String targetType, - long likeCount - ) {} + + public record LikeAdded( + String loginId, + Long targetId + ) {} + + public record LikeRemoved( + String loginId, + Long targetId + ) {} } diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/message/ProductProducer.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/message/ProductLikeProducer.java similarity index 96% rename from apps/commerce-api/src/main/java/com/loopers/infrastructure/message/ProductProducer.java rename to apps/commerce-api/src/main/java/com/loopers/infrastructure/message/ProductLikeProducer.java index 5bfce16..b36e89f 100644 --- a/apps/commerce-api/src/main/java/com/loopers/infrastructure/message/ProductProducer.java +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/message/ProductLikeProducer.java @@ -13,12 +13,12 @@ @Component @RequiredArgsConstructor -public class ProductProducer { +public class ProductLikeProducer { private final KafkaTemplate kafkaTemplate; private final RetryTemplate retryTemplate; - @Value("${kafka.topic.catalog-events}") + @Value("${kafka.topic.catalog-like-events}") private String topic; @Value("${kafka.topic.producer-dlq}") diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/message/ProductSkuProducer.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/message/ProductSkuProducer.java index 7fb0586..0a0edd0 100644 --- a/apps/commerce-api/src/main/java/com/loopers/infrastructure/message/ProductSkuProducer.java +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/message/ProductSkuProducer.java @@ -20,7 +20,7 @@ public class ProductSkuProducer { private final KafkaTemplate kafkaTemplate; private final RetryTemplate retryTemplate; - @Value("${kafka.topic.sku-events}") + @Value("${kafka.topic.catalog-product-events}") private String topic; @Value("${kafka.topic.producer-dlq}") diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/message/ProductViewProducer.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/message/ProductViewProducer.java new file mode 100644 index 0000000..4a3b6c5 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/message/ProductViewProducer.java @@ -0,0 +1,54 @@ +package com.loopers.infrastructure.message; + +import com.loopers.shared.event.Envelope; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.retry.support.RetryTemplate; +import org.springframework.stereotype.Component; + +import java.nio.charset.StandardCharsets; + +@Component +@RequiredArgsConstructor +public class ProductViewProducer { + + private final KafkaTemplate kafkaTemplate; + private final RetryTemplate retryTemplate; + + @Value("${kafka.topic.catalog-view-events}") + private String topic; + + @Value("${kafka.topic.producer-dlq}") + private String dlq; + + public void send(String key, Envelope envelope) { + String eventType = envelope.payload().getClass().getSimpleName(); + + ProducerRecord record = new ProducerRecord<>(topic, key, envelope); + addHeaders(record, envelope.id(), eventType, envelope.at().toString(), envelope.actorId()); + + try { + retryTemplate.execute(ctx -> { + kafkaTemplate.send(record).get(); + return null; + }); + } catch (Exception e) { + ProducerRecord dead = new ProducerRecord<>(dlq, key, envelope); + dead.headers().add(new RecordHeader("source_topic", topic.getBytes(StandardCharsets.UTF_8))); + dead.headers().add(new RecordHeader("event_type", eventType.getBytes(StandardCharsets.UTF_8))); + kafkaTemplate.send(dead); + } + } + + private void addHeaders(ProducerRecord record, String id, String type, String at, String actor) { + record.headers() + .add(new RecordHeader("event_id", id.getBytes(StandardCharsets.UTF_8))) + .add(new RecordHeader("event_type", type.getBytes(StandardCharsets.UTF_8))) + .add(new RecordHeader("event_version", "1".getBytes(StandardCharsets.UTF_8))) + .add(new RecordHeader("occurred_at", at.getBytes(StandardCharsets.UTF_8))) + .add(new RecordHeader("actor_id", actor.getBytes(StandardCharsets.UTF_8))); + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/product/ProductMetricsConsumer.java b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/product/ProductMetricsConsumer.java deleted file mode 100644 index e189b37..0000000 --- a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/product/ProductMetricsConsumer.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.loopers.interfaces.consumer.product; - -import com.loopers.domain.RecordIO; -import com.loopers.domain.eventhandle.EventHandledService; -import com.loopers.domain.product.MetricsService; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.support.Acknowledgment; -import org.springframework.stereotype.Component; - -@Slf4j -@Component -@RequiredArgsConstructor -public class ProductMetricsConsumer { - - private final EventHandledService handled; - private final MetricsService metrics; - private final RecordIO io; - - @KafkaListener( - topics = "${kafka.topic.catalog-events}", - groupId = "catalog-metrics", - containerFactory = "SINGLE_LISTENER_WITH_DLQ", - properties = { "auto.offset.reset=latest" } - ) - public void onMessage(ConsumerRecord r, Acknowledgment ack) throws Exception { - String eventId = io.header(r, "event_id"); - String type = io.header(r, "event_type"); - String json = io.payloadString(r); - - try { - if (!handled.isExistEvent("catalog-metrics", eventId)) { - ack.acknowledge(); return; - } - - if ("LikeChanged".equals(type)) { - metrics.onLikeChanged(json); - } - - ack.acknowledge(); - } catch (Exception e) { - log.error("ProductMetricsConsumer failed id={}", eventId, e); - throw e; - } - } -} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/product/ProductSkuMetricsConsumer.java b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/product/ProductSkuMetricsConsumer.java deleted file mode 100644 index f793e77..0000000 --- a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/product/ProductSkuMetricsConsumer.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.loopers.interfaces.consumer.product; - -import com.loopers.domain.eventhandle.EventHandledService; -import com.loopers.domain.product.MetricsService; -import com.loopers.domain.RecordIO; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.support.Acknowledgment; -import org.springframework.stereotype.Component; - -@Slf4j -@Component -@RequiredArgsConstructor -public class ProductSkuMetricsConsumer { - - private final EventHandledService eventHandledService; - private final MetricsService metricsService; - private final RecordIO io; - - @KafkaListener( - topics = "${kafka.topic.sku-events}", - groupId = "order-metrics", - containerFactory = "SINGLE_LISTENER_WITH_DLQ", - properties = { "auto.offset.reset=latest" } - ) - public void onMessage(ConsumerRecord record, Acknowledgment ack) throws Exception { - String eventId = io.header(record, "event_id"); - String type = io.header(record, "event_type"); - String json = io.payloadString(record); - - try { - if (!eventHandledService.isExistEvent("product-sku-metrics", eventId)) { - ack.acknowledge(); - return; - } - - if ("StockConfirmed".equals(type)) { - metricsService.onStockConfirmed(json); - } - - eventHandledService.save("product-sku-metrics", eventId); - ack.acknowledge(); - } catch (Exception e) { - log.error("OrderMetricsConsumer failed id={} type={}", eventId, type, e); - throw e; - } - } -} From a3a74a5f2119a55c4129b87bc25f528515ab00c3 Mon Sep 17 00:00:00 2001 From: Lexyyaa Date: Fri, 19 Sep 2025 08:40:31 +0900 Subject: [PATCH 05/12] =?UTF-8?q?feat=20:=20=EB=B0=B0=EC=B9=98=EB=A1=9C?= =?UTF-8?q?=EC=A7=81=20=EC=88=98=ED=96=89=EC=8B=9C=20=ED=95=84=EC=9A=94?= =?UTF-8?q?=ED=95=9C=20property=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/ranking/RankingProperties.java | 16 ++++++++++++++++ .../src/main/resources/application.yml | 10 ++++++++++ 2 files changed, 26 insertions(+) create mode 100644 apps/commerce-batch/src/main/java/com/loopers/support/config/ranking/RankingProperties.java diff --git a/apps/commerce-batch/src/main/java/com/loopers/support/config/ranking/RankingProperties.java b/apps/commerce-batch/src/main/java/com/loopers/support/config/ranking/RankingProperties.java new file mode 100644 index 0000000..dd0b8bc --- /dev/null +++ b/apps/commerce-batch/src/main/java/com/loopers/support/config/ranking/RankingProperties.java @@ -0,0 +1,16 @@ +package com.loopers.support.config.ranking; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties(prefix = "ranking") +public record RankingProperties( + String weeklyPrefix, + String monthlyPrefix, + int topK, + int weeklyTtlDays, + int monthlyTtlDays, + Weights weight, + double carryOverAlpha +) { + public record Weights(double view, double like, double sales) {} +} diff --git a/apps/commerce-batch/src/main/resources/application.yml b/apps/commerce-batch/src/main/resources/application.yml index 2eda23d..4da8d6f 100644 --- a/apps/commerce-batch/src/main/resources/application.yml +++ b/apps/commerce-batch/src/main/resources/application.yml @@ -34,6 +34,16 @@ management: server: port: 8094 +ranking: + weekly-prefix: rank:weekly + monthly-prefix: rank:monthly + weekly-ttl-days: 14 + monthly-ttl-days: 60 + weight: + view: 0.1 + like: 0.2 + sales: 0.7 + carry-over-alpha: 0.1 --- spring: config: From 4e2c7d732dbd16a38245daa7e73e253751a1e1c3 Mon Sep 17 00:00:00 2001 From: Lexyyaa Date: Fri, 19 Sep 2025 08:41:46 +0900 Subject: [PATCH 06/12] =?UTF-8?q?feat=20:=20=EC=A3=BC=EA=B0=84,=EC=9B=94?= =?UTF-8?q?=EA=B0=84=20=EC=A7=91=EA=B3=84=EB=A5=BC=EC=9C=84=ED=95=9C=20Rea?= =?UTF-8?q?der=20=EB=B0=8F=20DTO=20=EC=B6=94=EA=B0=80=20(JDBC=EC=A0=81?= =?UTF-8?q?=EC=9A=A9)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/loopers/domain/ranking/ScoreRow.java | 9 +++ .../ranking/MonthlyFromWeeklyReader.java | 37 ++++++++++ .../ranking/WeeklyAggregateReader.java | 69 +++++++++++++++++++ 3 files changed, 115 insertions(+) create mode 100644 apps/commerce-batch/src/main/java/com/loopers/domain/ranking/ScoreRow.java create mode 100644 apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/MonthlyFromWeeklyReader.java create mode 100644 apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/WeeklyAggregateReader.java diff --git a/apps/commerce-batch/src/main/java/com/loopers/domain/ranking/ScoreRow.java b/apps/commerce-batch/src/main/java/com/loopers/domain/ranking/ScoreRow.java new file mode 100644 index 0000000..97db1dc --- /dev/null +++ b/apps/commerce-batch/src/main/java/com/loopers/domain/ranking/ScoreRow.java @@ -0,0 +1,9 @@ +package com.loopers.domain.ranking; + +public record ScoreRow( + Long productId, + long viewCnt, + long likeCnt, + long salesCnt, + double score +) {} diff --git a/apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/MonthlyFromWeeklyReader.java b/apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/MonthlyFromWeeklyReader.java new file mode 100644 index 0000000..c838258 --- /dev/null +++ b/apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/MonthlyFromWeeklyReader.java @@ -0,0 +1,37 @@ +package com.loopers.infrastructure.ranking; + +import com.loopers.domain.ranking.ScoreRow; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.database.JdbcCursorItemReader; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import javax.sql.DataSource; + +@Configuration +public class MonthlyFromWeeklyReader { + + private static final String SQL_TMPL = """ + SELECT product_id, SUM(score) AS score + FROM product_metrics_weekly + WHERE year_week IN (%s) + GROUP BY product_id + """; + + @Bean + @StepScope + public JdbcCursorItemReader monthlyFromWeeklyReader( + DataSource ds, + @Value("#{jobParameters['weeksCsv']}") String weeksCsv + ){ + String inClause = weeksCsv.replaceAll("[^0-9W,]", "").replace(",", "','"); + var r = new JdbcCursorItemReader(); + r.setDataSource(ds); + r.setSql(SQL_TMPL.formatted("'" + inClause + "'")); + r.setRowMapper((rs, i) -> new ScoreRow( + rs.getLong("product_id"), 0, 0, 0, rs.getDouble("score") + )); + return r; + } +} diff --git a/apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/WeeklyAggregateReader.java b/apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/WeeklyAggregateReader.java new file mode 100644 index 0000000..8eb57ef --- /dev/null +++ b/apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/WeeklyAggregateReader.java @@ -0,0 +1,69 @@ +package com.loopers.infrastructure.ranking; + +import com.loopers.domain.ranking.ScoreRow; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.database.JdbcCursorItemReader; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import javax.sql.DataSource; + +@Configuration +public class WeeklyAggregateReader { + + private static final String SQL = """ + SELECT pid.product_id, + COALESCE(pm.views, 0) AS view_cnt, + COALESCE(pm.likes, 0) AS like_cnt, + COALESCE(ps.sales, 0) AS sales_cnt + FROM ( + SELECT product_id FROM product_metrics + WHERE `date` BETWEEN ? AND ? + UNION + SELECT product_id FROM product_sku_metrics + WHERE `date` BETWEEN ? AND ? + ) pid + LEFT JOIN ( + SELECT product_id, + SUM(view_cnt_delta) AS views, + SUM(like_cnt_delta) AS likes + FROM product_metrics + WHERE `date` BETWEEN ? AND ? + GROUP BY product_id + ) pm ON pm.product_id = pid.product_id + LEFT JOIN ( + SELECT product_id, + SUM(sales_cnt_delta) AS sales + FROM product_sku_metrics + WHERE `date` BETWEEN ? AND ? + GROUP BY product_id + ) ps ON ps.product_id = pid.product_id + """; + + @Bean + @StepScope + public JdbcCursorItemReader weeklyScoreReader( + DataSource ds, + @Value("#{jobParameters['start']}") String start, + @Value("#{jobParameters['end']}") String end + ){ + var r = new JdbcCursorItemReader(); + r.setDataSource(ds); + r.setSql(SQL); + r.setRowMapper((rs, i) -> new ScoreRow( + rs.getLong("product_id"), + rs.getLong("view_cnt"), + rs.getLong("like_cnt"), + rs.getLong("sales_cnt"), + 0.0 + )); + r.setPreparedStatementSetter(ps -> { + ps.setString(1, start); ps.setString(2, end); + ps.setString(3, start); ps.setString(4, end); + ps.setString(5, start); ps.setString(6, end); + ps.setString(7, start); ps.setString(8, end); + }); + return r; + } +} From 0adb8c48141ea93510faadd604c4bbe60f840cf9 Mon Sep 17 00:00:00 2001 From: Lexyyaa Date: Fri, 19 Sep 2025 08:43:32 +0900 Subject: [PATCH 07/12] =?UTF-8?q?feat=20:=20=EC=A3=BC=EA=B0=84,=EC=9B=94?= =?UTF-8?q?=EA=B0=84=20=EC=A7=91=EA=B3=84=EB=A5=BC=EC=9C=84=ED=95=9C=20Pro?= =?UTF-8?q?cosser=20=EB=B0=8F=20Aggragate=20DTO=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../loopers/domain/ranking/AggregatedOut.java | 9 +++ .../ProductMetricsMonthlyRepository.java | 7 ++ .../ProductMetricsWeeklyRepository.java | 7 ++ .../ranking/WeeklyAndMonthlyProcessors.java | 69 +++++++++++++++++++ .../ProductMetricsMonthlyJpaRepository.java | 8 +++ .../ProductMetricsMonthlyRepositoryImpl.java | 21 ++++++ .../ProductMetricsWeeklyJpaRepository.java | 8 +++ .../ProductMetricsWeeklyRepositoryImpl.java | 21 ++++++ 8 files changed, 150 insertions(+) create mode 100644 apps/commerce-batch/src/main/java/com/loopers/domain/ranking/AggregatedOut.java create mode 100644 apps/commerce-batch/src/main/java/com/loopers/domain/ranking/ProductMetricsMonthlyRepository.java create mode 100644 apps/commerce-batch/src/main/java/com/loopers/domain/ranking/ProductMetricsWeeklyRepository.java create mode 100644 apps/commerce-batch/src/main/java/com/loopers/domain/ranking/WeeklyAndMonthlyProcessors.java create mode 100644 apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/ProductMetricsMonthlyJpaRepository.java create mode 100644 apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/ProductMetricsMonthlyRepositoryImpl.java create mode 100644 apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/ProductMetricsWeeklyJpaRepository.java create mode 100644 apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/ProductMetricsWeeklyRepositoryImpl.java diff --git a/apps/commerce-batch/src/main/java/com/loopers/domain/ranking/AggregatedOut.java b/apps/commerce-batch/src/main/java/com/loopers/domain/ranking/AggregatedOut.java new file mode 100644 index 0000000..259bf4a --- /dev/null +++ b/apps/commerce-batch/src/main/java/com/loopers/domain/ranking/AggregatedOut.java @@ -0,0 +1,9 @@ +package com.loopers.domain.ranking; + +public record AggregatedOut( + String periodKey, + Long productId, + double score, + String redisKey, + String redisMember +) {} diff --git a/apps/commerce-batch/src/main/java/com/loopers/domain/ranking/ProductMetricsMonthlyRepository.java b/apps/commerce-batch/src/main/java/com/loopers/domain/ranking/ProductMetricsMonthlyRepository.java new file mode 100644 index 0000000..8c1a360 --- /dev/null +++ b/apps/commerce-batch/src/main/java/com/loopers/domain/ranking/ProductMetricsMonthlyRepository.java @@ -0,0 +1,7 @@ +package com.loopers.domain.ranking; + +import java.util.Optional; + +public interface ProductMetricsMonthlyRepository { + Optional findById(ProductMetricsMonthlyId id); +} diff --git a/apps/commerce-batch/src/main/java/com/loopers/domain/ranking/ProductMetricsWeeklyRepository.java b/apps/commerce-batch/src/main/java/com/loopers/domain/ranking/ProductMetricsWeeklyRepository.java new file mode 100644 index 0000000..fd46d34 --- /dev/null +++ b/apps/commerce-batch/src/main/java/com/loopers/domain/ranking/ProductMetricsWeeklyRepository.java @@ -0,0 +1,7 @@ +package com.loopers.domain.ranking; + +import java.util.Optional; + +public interface ProductMetricsWeeklyRepository { + Optional findById(ProductMetricsWeeklyId id); +} diff --git a/apps/commerce-batch/src/main/java/com/loopers/domain/ranking/WeeklyAndMonthlyProcessors.java b/apps/commerce-batch/src/main/java/com/loopers/domain/ranking/WeeklyAndMonthlyProcessors.java new file mode 100644 index 0000000..34278ac --- /dev/null +++ b/apps/commerce-batch/src/main/java/com/loopers/domain/ranking/WeeklyAndMonthlyProcessors.java @@ -0,0 +1,69 @@ +package com.loopers.domain.ranking; + +import com.loopers.support.config.ranking.RankingProperties; +import lombok.RequiredArgsConstructor; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.Optional; + +@Configuration +@RequiredArgsConstructor +public class WeeklyAndMonthlyProcessors { + + private final RankingProperties props; + private final ProductMetricsMonthlyRepository productMetricsMonthlyRepository; + private final ProductMetricsWeeklyRepository productMetricsWeeklyRepository; + + @Bean + @StepScope + public ItemProcessor weeklyProcessor( + @Value("#{jobParameters['yearWeek']}") String yearWeek + ){ + return row -> { + double base = props.weight().view() * row.viewCnt() + + props.weight().like() * row.likeCnt() + + props.weight().sales() * row.salesCnt(); + + double carry = 0.0; + if (props.carryOverAlpha() > 0.0) { + var thisWeek = new ProductMetricsWeekly.YearWeek(yearWeek); + var prevWeek = thisWeek.prev(); + Optional prev = productMetricsWeeklyRepository.findById( + new ProductMetricsWeeklyId(prevWeek.value(), row.productId())); + if (prev.isPresent()) carry = prev.get().getScore() * props.carryOverAlpha(); + } + + double finalScore = Math.max(0.0, base + carry); + String redisKey = props.weeklyPrefix() + ":" + yearWeek; + + return new AggregatedOut(yearWeek, row.productId(), finalScore, redisKey, String.valueOf(row.productId())); + }; + } + + @Bean @StepScope + public ItemProcessor monthlyProcessor( + @Value("#{jobParameters['yearMonth']}") String yearMonth + ){ + return row -> { + double base = row.score(); + double carry = 0.0; + + if (props.carryOverAlpha() > 0.0) { + var thisMonth = new ProductMetricsMonthly.YearMonth(yearMonth); + var prevMonth = thisMonth.prev(); + Optional prev = productMetricsMonthlyRepository.findById( + new ProductMetricsMonthlyId(prevMonth.value(), row.productId())); + if (prev.isPresent()) carry = prev.get().getScore() * props.carryOverAlpha(); + } + + double finalScore = Math.max(0.0, base + carry); + String redisKey = props.monthlyPrefix() + ":" + yearMonth; + + return new AggregatedOut(yearMonth, row.productId(), finalScore, redisKey, String.valueOf(row.productId())); + }; + } +} diff --git a/apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/ProductMetricsMonthlyJpaRepository.java b/apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/ProductMetricsMonthlyJpaRepository.java new file mode 100644 index 0000000..fa1a08c --- /dev/null +++ b/apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/ProductMetricsMonthlyJpaRepository.java @@ -0,0 +1,8 @@ +package com.loopers.infrastructure.ranking; + +import com.loopers.domain.ranking.ProductMetricsMonthly; +import com.loopers.domain.ranking.ProductMetricsMonthlyId; +import org.springframework.data.jpa.repository.JpaRepository; + +public interface ProductMetricsMonthlyJpaRepository + extends JpaRepository {} diff --git a/apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/ProductMetricsMonthlyRepositoryImpl.java b/apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/ProductMetricsMonthlyRepositoryImpl.java new file mode 100644 index 0000000..ec2dfc6 --- /dev/null +++ b/apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/ProductMetricsMonthlyRepositoryImpl.java @@ -0,0 +1,21 @@ +package com.loopers.infrastructure.ranking; + +import com.loopers.domain.ranking.ProductMetricsMonthly; +import com.loopers.domain.ranking.ProductMetricsMonthlyId; +import com.loopers.domain.ranking.ProductMetricsMonthlyRepository; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Repository; + +import java.util.Optional; + +@Repository +@RequiredArgsConstructor +public class ProductMetricsMonthlyRepositoryImpl implements ProductMetricsMonthlyRepository { + + private final ProductMetricsMonthlyJpaRepository productMetricsMonthlyJpaRepository; + + @Override + public Optional findById(ProductMetricsMonthlyId id) { + return productMetricsMonthlyJpaRepository.findById(id); + } +} diff --git a/apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/ProductMetricsWeeklyJpaRepository.java b/apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/ProductMetricsWeeklyJpaRepository.java new file mode 100644 index 0000000..767c6ec --- /dev/null +++ b/apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/ProductMetricsWeeklyJpaRepository.java @@ -0,0 +1,8 @@ +package com.loopers.infrastructure.ranking; + +import com.loopers.domain.ranking.ProductMetricsWeekly; +import com.loopers.domain.ranking.ProductMetricsWeeklyId; +import org.springframework.data.jpa.repository.JpaRepository; + +public interface ProductMetricsWeeklyJpaRepository + extends JpaRepository {} diff --git a/apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/ProductMetricsWeeklyRepositoryImpl.java b/apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/ProductMetricsWeeklyRepositoryImpl.java new file mode 100644 index 0000000..5825865 --- /dev/null +++ b/apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/ProductMetricsWeeklyRepositoryImpl.java @@ -0,0 +1,21 @@ +package com.loopers.infrastructure.ranking; + +import com.loopers.domain.ranking.ProductMetricsWeekly; +import com.loopers.domain.ranking.ProductMetricsWeeklyId; +import com.loopers.domain.ranking.ProductMetricsWeeklyRepository; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Repository; + +import java.util.Optional; + +@Repository +@RequiredArgsConstructor +public class ProductMetricsWeeklyRepositoryImpl implements ProductMetricsWeeklyRepository { + + private final ProductMetricsWeeklyJpaRepository productMetricsWeeklyJpaRepository; + + @Override + public Optional findById(ProductMetricsWeeklyId id) { + return productMetricsWeeklyJpaRepository.findById(id); + } +} From f281264b1da448cad256f17f510e431cbd843cdb Mon Sep 17 00:00:00 2001 From: Lexyyaa Date: Fri, 19 Sep 2025 08:44:21 +0900 Subject: [PATCH 08/12] =?UTF-8?q?feat=20:=20=EC=A3=BC=EA=B0=84,=EC=9B=94?= =?UTF-8?q?=EA=B0=84=EC=A7=91=EA=B3=84=20batch=20writer=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80=20(JDBC=EC=A0=81=EC=9A=A9)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ranking/MonthlyDbWriter.java | 30 +++++++++++ .../ranking/RedisRankingWriter.java | 52 +++++++++++++++++++ .../ranking/WeeklyDbWriter.java | 30 +++++++++++ 3 files changed, 112 insertions(+) create mode 100644 apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/MonthlyDbWriter.java create mode 100644 apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/RedisRankingWriter.java create mode 100644 apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/WeeklyDbWriter.java diff --git a/apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/MonthlyDbWriter.java b/apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/MonthlyDbWriter.java new file mode 100644 index 0000000..585a802 --- /dev/null +++ b/apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/MonthlyDbWriter.java @@ -0,0 +1,30 @@ +package com.loopers.infrastructure.ranking; + +import com.loopers.domain.ranking.AggregatedOut; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider; +import org.springframework.batch.item.database.JdbcBatchItemWriter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import javax.sql.DataSource; + +@Configuration +public class MonthlyDbWriter { + + private static final String UPSERT = """ + INSERT INTO product_metrics_monthly (year_month, product_id, score, updated_at) + VALUES (:periodKey, :productId, :score, NOW()) + ON DUPLICATE KEY UPDATE score = VALUES(score), updated_at = NOW() + """; + + @Bean + @StepScope + public JdbcBatchItemWriter monthlyUpsertWriter(DataSource ds){ + var w = new JdbcBatchItemWriter(); + w.setDataSource(ds); + w.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()); + w.setSql(UPSERT); + return w; + } +} diff --git a/apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/RedisRankingWriter.java b/apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/RedisRankingWriter.java new file mode 100644 index 0000000..aa96fd9 --- /dev/null +++ b/apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/RedisRankingWriter.java @@ -0,0 +1,52 @@ +package com.loopers.infrastructure.ranking; + +import com.loopers.domain.ranking.AggregatedOut; +import com.loopers.support.config.ranking.RankingProperties; +import lombok.RequiredArgsConstructor; +import org.springframework.batch.item.Chunk; +import org.springframework.batch.item.ItemWriter; +import org.springframework.data.redis.core.BoundZSetOperations; +import org.springframework.data.redis.core.DefaultTypedTuple; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.core.ZSetOperations; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.util.LinkedHashSet; +import java.util.Set; + +@Component +@RequiredArgsConstructor +public class RedisRankingWriter implements ItemWriter { + + private final StringRedisTemplate redis; + private final RankingProperties props; + + + @Override + public void write(Chunk chunk) throws Exception { + if (chunk == null || chunk.isEmpty()) return; + + String key = chunk.getItems().get(0).redisKey(); + BoundZSetOperations zops = redis.boundZSetOps(key); + + Set> tuples = new LinkedHashSet<>(chunk.size()); + for (AggregatedOut it : chunk) { + tuples.add(new DefaultTypedTuple<>(it.redisMember(), it.score())); + } + + if (tuples.isEmpty()) return; + + zops.add(tuples); + + zops.removeRange(0, -(props.topK() + 1)); + + Long ttl = redis.getExpire(key); + if (ttl == null || ttl < 0) { + int days = key.startsWith(props.weeklyPrefix() + ":") + ? props.weeklyTtlDays() + : props.monthlyTtlDays(); + zops.expire(Duration.ofDays(days)); + } + } +} diff --git a/apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/WeeklyDbWriter.java b/apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/WeeklyDbWriter.java new file mode 100644 index 0000000..5362c95 --- /dev/null +++ b/apps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/WeeklyDbWriter.java @@ -0,0 +1,30 @@ +package com.loopers.infrastructure.ranking; + +import com.loopers.domain.ranking.AggregatedOut; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider; +import org.springframework.batch.item.database.JdbcBatchItemWriter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import javax.sql.DataSource; + +@Configuration +public class WeeklyDbWriter { + + private static final String UPSERT = """ + INSERT INTO product_metrics_weekly (year_week, product_id, score, updated_at) + VALUES (:periodKey, :productId, :score, NOW()) + ON DUPLICATE KEY UPDATE score = VALUES(score), updated_at = NOW() + """; + + @Bean + @StepScope + public JdbcBatchItemWriter weeklyUpsertWriter(DataSource ds){ + var w = new JdbcBatchItemWriter(); + w.setDataSource(ds); + w.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()); + w.setSql(UPSERT); + return w; + } +} From 5d189aa112902e3cc8a772f810c23b6639e0c7c3 Mon Sep 17 00:00:00 2001 From: Lexyyaa Date: Fri, 19 Sep 2025 08:45:06 +0900 Subject: [PATCH 09/12] =?UTF-8?q?feat=20:=20Job,=20Step=20Config=20?= =?UTF-8?q?=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../batch/job/MonthlyRankingJobConfig.java | 50 +++++++++++++++++++ .../batch/job/WeeklyRankingJobConfig.java | 50 +++++++++++++++++++ 2 files changed, 100 insertions(+) create mode 100644 apps/commerce-batch/src/main/java/com/loopers/batch/job/MonthlyRankingJobConfig.java create mode 100644 apps/commerce-batch/src/main/java/com/loopers/batch/job/WeeklyRankingJobConfig.java diff --git a/apps/commerce-batch/src/main/java/com/loopers/batch/job/MonthlyRankingJobConfig.java b/apps/commerce-batch/src/main/java/com/loopers/batch/job/MonthlyRankingJobConfig.java new file mode 100644 index 0000000..44181f5 --- /dev/null +++ b/apps/commerce-batch/src/main/java/com/loopers/batch/job/MonthlyRankingJobConfig.java @@ -0,0 +1,50 @@ +package com.loopers.batch.job; + +import com.loopers.domain.ranking.AggregatedOut; +import com.loopers.infrastructure.ranking.RedisRankingWriter; +import com.loopers.domain.ranking.ScoreRow; +import lombok.RequiredArgsConstructor; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.database.JdbcBatchItemWriter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.transaction.PlatformTransactionManager; + +@Configuration +@RequiredArgsConstructor +public class MonthlyRankingJobConfig { + + private final JobRepository repo; + private final PlatformTransactionManager tx; + + @Bean + public Job monthlyRankingJob(Step monthlyStep) { + return new JobBuilder("monthlyRankingJob", repo) + .start(monthlyStep) + .build(); + } + + @Bean + public Step monthlyStep( + ItemReader monthlyFromWeeklyReader, + ItemProcessor monthlyProcessor, + JdbcBatchItemWriter monthlyUpsertWriter, + RedisRankingWriter redisWriter + ){ + return new StepBuilder("monthlyStep", repo) + .chunk(1000, tx) + .reader(monthlyFromWeeklyReader) + .processor(monthlyProcessor) + .writer(items -> { + monthlyUpsertWriter.write(items); + redisWriter.write(items); + }) + .build(); + } +} diff --git a/apps/commerce-batch/src/main/java/com/loopers/batch/job/WeeklyRankingJobConfig.java b/apps/commerce-batch/src/main/java/com/loopers/batch/job/WeeklyRankingJobConfig.java new file mode 100644 index 0000000..f4a6396 --- /dev/null +++ b/apps/commerce-batch/src/main/java/com/loopers/batch/job/WeeklyRankingJobConfig.java @@ -0,0 +1,50 @@ +package com.loopers.batch.job; + +import com.loopers.domain.ranking.AggregatedOut; +import com.loopers.infrastructure.ranking.RedisRankingWriter; +import com.loopers.domain.ranking.ScoreRow; +import lombok.RequiredArgsConstructor; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.database.JdbcBatchItemWriter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.transaction.PlatformTransactionManager; + +@Configuration +@RequiredArgsConstructor +public class WeeklyRankingJobConfig { + + private final JobRepository repo; + private final PlatformTransactionManager tx; + + @Bean + public Job weeklyRankingJob(Step weeklyStep) { + return new JobBuilder("weeklyRankingJob", repo) + .start(weeklyStep) + .build(); + } + + @Bean + public Step weeklyStep( + ItemReader weeklyScoreReader, + ItemProcessor weeklyProcessor, + JdbcBatchItemWriter weeklyUpsertWriter, + RedisRankingWriter redisWriter + ){ + return new StepBuilder("weeklyStep", repo) + .chunk(1000, tx) + .reader(weeklyScoreReader) + .processor(weeklyProcessor) + .writer(items -> { + weeklyUpsertWriter.write(items); + redisWriter.write(items); + }) + .build(); + } +} From 9c213ca7c52cbd95de569ee2dd21219c3156c632 Mon Sep 17 00:00:00 2001 From: Lexyyaa Date: Fri, 19 Sep 2025 08:45:40 +0900 Subject: [PATCH 10/12] =?UTF-8?q?feat=20:=20=EB=B0=B0=EC=B9=98=EC=8B=A4?= =?UTF-8?q?=ED=96=89=EC=9D=84=20=EC=9C=84=ED=95=9C=20=EC=8A=A4=EC=BC=80?= =?UTF-8?q?=EC=A5=B4=EB=9F=AC=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../interfaces/schedular/BatchSchedulers.java | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 apps/commerce-batch/src/main/java/com/loopers/interfaces/schedular/BatchSchedulers.java diff --git a/apps/commerce-batch/src/main/java/com/loopers/interfaces/schedular/BatchSchedulers.java b/apps/commerce-batch/src/main/java/com/loopers/interfaces/schedular/BatchSchedulers.java new file mode 100644 index 0000000..cb7d01f --- /dev/null +++ b/apps/commerce-batch/src/main/java/com/loopers/interfaces/schedular/BatchSchedulers.java @@ -0,0 +1,67 @@ +package com.loopers.interfaces.schedular; + +import com.loopers.domain.ranking.ProductMetricsMonthly; +import com.loopers.domain.ranking.ProductMetricsWeekly; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.JobParametersBuilder; +import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; + +import java.time.LocalDate; +import java.util.StringJoiner; + +@Slf4j +@Configuration +@EnableScheduling +@RequiredArgsConstructor +public class BatchSchedulers { + + private final JobLauncher jobLauncher; + private final Job weeklyRankingJob; + private final Job monthlyRankingJob; + + @Scheduled(cron = "0 5 1 * * *") // 매일 01:05 + public void runWeekly() throws Exception { + LocalDate today = LocalDate.now(); + var yw = ProductMetricsWeekly.YearWeek.of(today); + var start = ProductMetricsWeekly.YearWeek.start(today); + var end = ProductMetricsWeekly.YearWeek.end(today); + + JobParameters params = new JobParametersBuilder() + .addString("yearWeek", yw.value()) + .addString("start", start.toString()) + .addString("end", end.toString()) + .addLong("ts", System.currentTimeMillis()) + .toJobParameters(); + + jobLauncher.run(weeklyRankingJob, params); + } + + @Scheduled(cron = "0 20 1 * * *") // 매일 01:20 + public void runMonthly() throws Exception { + LocalDate today = LocalDate.now(); + var ym = ProductMetricsMonthly.YearMonth.of(today); + + LocalDate first = today.withDayOfMonth(1); + LocalDate last = today.withDayOfMonth(today.lengthOfMonth()); + StringJoiner weeks = new StringJoiner(","); + LocalDate cur = first; + while(!cur.isAfter(last)){ + weeks.add(ProductMetricsWeekly.YearWeek.of(cur).value()); + cur = cur.plusDays(1); + } + + JobParameters params = new JobParametersBuilder() + .addString("yearMonth", ym.value()) + .addString("weeksCsv", weeks.toString()) + .addLong("ts", System.currentTimeMillis()) + .toJobParameters(); + + jobLauncher.run(monthlyRankingJob, params); + } +} From 7c7710638dbc1d2faf4c6168e64f2d7a2c8b1cd4 Mon Sep 17 00:00:00 2001 From: Lexyyaa Date: Fri, 19 Sep 2025 08:46:16 +0900 Subject: [PATCH 11/12] =?UTF-8?q?feat=20:=20=EC=A3=BC=EA=B0=84,=EC=9B=94?= =?UTF-8?q?=EA=B0=84=EB=9E=AD=ED=82=B9=20=EC=A1=B0=ED=9A=8C=EA=B8=B0?= =?UTF-8?q?=EB=8A=A5=20=EC=B6=94=EA=B0=80=20(period=20=ED=8C=8C=EB=9D=BC?= =?UTF-8?q?=EB=AF=B8=ED=84=B0=EB=A5=BC=20=ED=86=B5=ED=95=9C=20=EB=B6=84?= =?UTF-8?q?=EA=B8=B0)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rank/RankApplicationService.java | 6 ++-- .../java/com/loopers/domain/rank/Rank.java | 26 ++++++++++++++- .../com/loopers/domain/rank/RankCommand.java | 7 ++-- .../domain/rank/RankingProperties.java | 4 ++- .../loopers/domain/rank/RankingService.java | 33 ++++++++++--------- .../api/controller/rank/RankController.java | 8 ++--- .../api/controller/rank/RankV1ApiSpec.java | 7 ++-- .../src/main/resources/application.yml | 11 ++++--- .../domain/ranking/RankingService.java | 29 ++++++++-------- 9 files changed, 80 insertions(+), 51 deletions(-) diff --git a/apps/commerce-api/src/main/java/com/loopers/application/rank/RankApplicationService.java b/apps/commerce-api/src/main/java/com/loopers/application/rank/RankApplicationService.java index a0ee791..b14d168 100644 --- a/apps/commerce-api/src/main/java/com/loopers/application/rank/RankApplicationService.java +++ b/apps/commerce-api/src/main/java/com/loopers/application/rank/RankApplicationService.java @@ -11,6 +11,7 @@ import org.springframework.transaction.annotation.Transactional; import java.time.LocalDate; +import java.time.format.DateTimeFormatter; import java.util.List; import java.util.Map; import java.util.function.Function; @@ -26,10 +27,11 @@ public class RankApplicationService implements RankUsecase { @Override @Transactional(readOnly = true) public List getProductRanking(RankCommand.ProductRanking command) { - LocalDate date = LocalDate.parse(command.date()); + LocalDate date = LocalDate.parse(command.date(), DateTimeFormatter.BASIC_ISO_DATE); int size = command.size(); + String period = command.period() == null ? "daily" : command.period().toLowerCase(); - List rankList = rankingService.getProductRank(date, size); + List rankList = rankingService.getProductRank(date, size, period); if (rankList.isEmpty()) return List.of(); diff --git a/apps/commerce-api/src/main/java/com/loopers/domain/rank/Rank.java b/apps/commerce-api/src/main/java/com/loopers/domain/rank/Rank.java index f8d64e3..6895dde 100644 --- a/apps/commerce-api/src/main/java/com/loopers/domain/rank/Rank.java +++ b/apps/commerce-api/src/main/java/com/loopers/domain/rank/Rank.java @@ -2,9 +2,9 @@ import lombok.*; -import java.nio.charset.StandardCharsets; import java.time.LocalDate; import java.time.format.DateTimeFormatter; +import java.time.temporal.WeekFields; @Getter @ToString @@ -16,7 +16,31 @@ public class Rank { private final Long position; private final double score; + private static final DateTimeFormatter D_YYYYMMDD = DateTimeFormatter.BASIC_ISO_DATE; + public static Rank create(Long productId, Long position, double score) { return new Rank(productId, position, score); } + + public static String buildKey(String periodRaw, LocalDate date, + String dailyPrefix, String weeklyPrefix, String monthlyPrefix) { + final String period = (periodRaw == null ? "daily" : periodRaw.toLowerCase()); + + return switch (period) { + case "weekly" -> weeklyPrefix + ":" + toYearWeek(date); + case "monthly" -> monthlyPrefix + ":" + toYearMonth(date); + default -> dailyPrefix + ":" + date.format(D_YYYYMMDD); + }; + } + + private static String toYearWeek(LocalDate date) { + var wf = WeekFields.ISO; + int w = date.get(wf.weekOfWeekBasedYear()); + int y = date.get(wf.weekBasedYear()); + return "%04dW%02d".formatted(y, w); + } + + private static String toYearMonth(LocalDate date) { + return DateTimeFormatter.ofPattern("yyyyMM").format(date); + } } diff --git a/apps/commerce-api/src/main/java/com/loopers/domain/rank/RankCommand.java b/apps/commerce-api/src/main/java/com/loopers/domain/rank/RankCommand.java index 2c73794..7456502 100644 --- a/apps/commerce-api/src/main/java/com/loopers/domain/rank/RankCommand.java +++ b/apps/commerce-api/src/main/java/com/loopers/domain/rank/RankCommand.java @@ -5,10 +5,11 @@ public class RankCommand { public record ProductRanking( String date, - int size + int size, + String period ){ - public static ProductRanking create(LocalDate date, int size) { - return new ProductRanking(date.toString(), size); + public static ProductRanking create(String date, int size,String period) { + return new ProductRanking(date.toString(), size,period); } } } diff --git a/apps/commerce-api/src/main/java/com/loopers/domain/rank/RankingProperties.java b/apps/commerce-api/src/main/java/com/loopers/domain/rank/RankingProperties.java index 6674ed6..3703706 100644 --- a/apps/commerce-api/src/main/java/com/loopers/domain/rank/RankingProperties.java +++ b/apps/commerce-api/src/main/java/com/loopers/domain/rank/RankingProperties.java @@ -4,5 +4,7 @@ @ConfigurationProperties(value = "ranking") public record RankingProperties( - String keyPrefix + String keyPrefix, + String weeklyPrefix, + String monthlyPrefix ){} diff --git a/apps/commerce-api/src/main/java/com/loopers/domain/rank/RankingService.java b/apps/commerce-api/src/main/java/com/loopers/domain/rank/RankingService.java index 7a0c9df..ceadec6 100644 --- a/apps/commerce-api/src/main/java/com/loopers/domain/rank/RankingService.java +++ b/apps/commerce-api/src/main/java/com/loopers/domain/rank/RankingService.java @@ -24,48 +24,49 @@ public class RankingService { private final RankingProperties props; - public List getProductRank(LocalDate date, int size) { + public List getProductRank(LocalDate date, int size, String period) { if (size <= 0) return List.of(); - final String key = props.keyPrefix() + ":" + date.format(dateTimeFormatter); + String key = Rank.buildKey(period, date, + props.keyPrefix(), + props.weeklyPrefix(), + props.monthlyPrefix() + ); Set> tuples = stringRedisTemplate.opsForZSet().reverseRangeWithScores(key, 0, Math.max(0, size - 1)); - if (tuples == null || tuples.isEmpty()) - return List.of(); + if (tuples == null || tuples.isEmpty()) return List.of(); List result = new ArrayList<>(tuples.size()); int position = 1; for (ZSetOperations.TypedTuple t : tuples) { - String productIdStr = t.getValue(); Double score = t.getScore(); - if (productIdStr == null) - continue; + if (productIdStr == null) continue; long productId = Long.parseLong(productIdStr); - - result.add(new Rank(productId, (long) position++, score == null ? 0d : score)); + result.add(Rank.create(productId, (long) position++, score == null ? 0d : score)); } return result; } - public Rank getRankInfo(LocalDate date, Long productId) { + public Rank getRankInfo(LocalDate date, Long productId, String period) { if (productId == null) return null; - final String key = props.keyPrefix() + ":" + date.format(dateTimeFormatter); + String key = Rank.buildKey(period, date, + props.keyPrefix(), + props.weeklyPrefix(), + props.monthlyPrefix() + ); final String member = productId.toString(); Long zeroBased = stringRedisTemplate.opsForZSet().reverseRank(key, member); - - if (zeroBased == null) - return null; + if (zeroBased == null) return null; Double score = stringRedisTemplate.opsForZSet().score(key, member); - int position = Math.toIntExact(zeroBased) + 1; - return new Rank(productId, (long) position, score == null ? 0d : score); + return Rank.create(productId, (long) position, score == null ? 0d : score); } } diff --git a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/controller/rank/RankController.java b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/controller/rank/RankController.java index 1117b6f..a76cb72 100644 --- a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/controller/rank/RankController.java +++ b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/controller/rank/RankController.java @@ -5,7 +5,6 @@ import com.loopers.domain.rank.RankInfo; import com.loopers.interfaces.api.ApiResponse; import lombok.RequiredArgsConstructor; -import org.springframework.format.annotation.DateTimeFormat; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; @@ -24,10 +23,11 @@ public class RankController implements RankV1ApiSpec{ @Override @GetMapping public ApiResponse rank( - @RequestParam @DateTimeFormat(pattern = "yyyyMMdd") LocalDate date, - @RequestParam(defaultValue = "20") int size + @RequestParam String date, + @RequestParam(defaultValue = "20") int size, + @RequestParam(defaultValue = "daily") String period ) { - List infos = rankUsecase.getProductRanking(RankCommand.ProductRanking.create(date,size)); + List infos = rankUsecase.getProductRanking(RankCommand.ProductRanking.create(date,size,period)); return ApiResponse.success(RankV1Response.ProductRankList.from(infos)); } } diff --git a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/controller/rank/RankV1ApiSpec.java b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/controller/rank/RankV1ApiSpec.java index 823fb25..f155538 100644 --- a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/controller/rank/RankV1ApiSpec.java +++ b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/controller/rank/RankV1ApiSpec.java @@ -3,10 +3,8 @@ import com.loopers.interfaces.api.ApiResponse; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; -import org.springframework.format.annotation.DateTimeFormat; import org.springframework.web.bind.annotation.RequestParam; -import java.time.LocalDate; @Tag(name = "랭킹", description = "랭킹 관련 API") public interface RankV1ApiSpec { @@ -15,7 +13,8 @@ public interface RankV1ApiSpec { description = "사용자가 요청한 날짜, 사이즈에 대해 상품랭킹 조회" ) ApiResponse rank( - @RequestParam @DateTimeFormat LocalDate date, - @RequestParam int size + @RequestParam String date, + @RequestParam(defaultValue = "20") int size, + @RequestParam(defaultValue = "daily") String period ); } diff --git a/apps/commerce-api/src/main/resources/application.yml b/apps/commerce-api/src/main/resources/application.yml index 9fdcf3d..a9592ec 100644 --- a/apps/commerce-api/src/main/resources/application.yml +++ b/apps/commerce-api/src/main/resources/application.yml @@ -46,7 +46,8 @@ kafka: ranking: key-prefix: rank:daily - top-k: 10 + weekly-prefix: rank:weekly + monthly-prefix: rank:monthly daily-ttl-days: 1 carry-over-alpha: 0.10 @@ -90,9 +91,11 @@ pg: kafka: topic: activity-events: activity-events - catalog-events: catalog-events - sku-events: sku-events - producer-dlq: producer-dlq + catalog-like-events: catalog-like-events + catalog-view-events: catalog-view-events + catalog-product-events: catalog-product-events + consumer-dlq: consumer-dlq + producer-dlq: producer-dlq --- spring: diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/ranking/RankingService.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/ranking/RankingService.java index 43ecfe4..1636e0d 100644 --- a/apps/commerce-streamer/src/main/java/com/loopers/domain/ranking/RankingService.java +++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/ranking/RankingService.java @@ -19,7 +19,7 @@ @RequiredArgsConstructor public class RankingService { - private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.BASIC_ISO_DATE; + private static final DateTimeFormatter YMD = DateTimeFormatter.BASIC_ISO_DATE; private final @Qualifier(RedisConfig.STRING_TEMPLATE_MASTER) StringRedisTemplate stringRedisTemplate; @@ -27,38 +27,35 @@ public class RankingService { private final RankingProperties props; private final MetricsService metricsService; - public void computeAndPutScoresToday(Set productIds) { - computeAndPutScores(LocalDate.now(ZoneId.of("Asia/Seoul")), productIds); - } - public void computeAndPutScores(LocalDate date, Set productIds) { if (productIds == null || productIds.isEmpty()) return; - String key = props.keyPrefix() + ":" + date.format(dateTimeFormatter); + String key = props.keyPrefix() + ":" + date.format(YMD); BoundZSetOperations zops = stringRedisTemplate.boundZSetOps(key); - Map productMetrics = metricsService.getProductMetrics(productIds); - Map salesByProductIds = metricsService.getSalesSumByProductIds(productIds); + Map pmById = metricsService.getProductMetrics(productIds, date); + + Map salesByProductIds = metricsService.getSalesSumByProductIds(productIds, date); double vw = props.viewWeight(); double lw = props.likeWeight(); double sw = props.salesWeight(); - Set> tuples = new LinkedHashSet<>(productIds.size()); + Set> tuples = new LinkedHashSet<>(productIds.size()); for (Long id : productIds) { - long viewCnt = productMetrics.get(id).getViewCnt(); - long likeCnt = productMetrics.get(id).getLikeCnt(); - long salesSum = salesByProductIds.getOrDefault(id, 0L); - - double score = viewCnt * vw + likeCnt * lw + salesSum * sw; + ProductMetrics pm = pmById.get(id); + long viewDelta = (pm == null ? 0L : pm.getViewCntDelta()); + long likeDelta = (pm == null ? 0L : pm.getLikeCntDelta()); + long salesDelta = salesByProductIds.getOrDefault(id, 0L); + double score = viewDelta * vw + likeDelta * lw + salesDelta * sw; tuples.add(new DefaultTypedTuple<>(String.valueOf(id), score)); } - if (tuples.isEmpty()) - return; + if (tuples.isEmpty()) return; zops.add(tuples); zops.expire(Duration.ofDays(props.dailyTtlDays())); } } + From 6d2cb7ba464bbf2d9ae096ec70d9f2a9785a9d68 Mon Sep 17 00:00:00 2001 From: Lexyyaa Date: Fri, 19 Sep 2025 08:50:42 +0900 Subject: [PATCH 12/12] =?UTF-8?q?feat=20:=20=EC=83=81=ED=92=88=20=EC=83=81?= =?UTF-8?q?=EC=83=88=EC=A1=B0=ED=9A=8C=EC=8B=9C=EC=97=90=EB=8A=94=20?= =?UTF-8?q?=EC=9D=BC=EA=B0=84=EB=9E=AD=ED=82=B9=EC=A0=95=EB=B3=B4=EB=A7=8C?= =?UTF-8?q?=20=EC=B6=9C=EB=A0=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../application/product/ProductApplicationService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/commerce-api/src/main/java/com/loopers/application/product/ProductApplicationService.java b/apps/commerce-api/src/main/java/com/loopers/application/product/ProductApplicationService.java index 248026d..e06c8cb 100644 --- a/apps/commerce-api/src/main/java/com/loopers/application/product/ProductApplicationService.java +++ b/apps/commerce-api/src/main/java/com/loopers/application/product/ProductApplicationService.java @@ -52,7 +52,7 @@ public ProductInfo.Detail getProductDetail(String loginId, Long productId) { String brandName = brandService.get(product.getBrandId()).getName(); // 오늘 기준 랭킹 조회 (없으면 null) - Rank rank = rankingService.getRankInfo(LocalDate.now(ZoneId.of("Asia/Seoul")), productId); + Rank rank = rankingService.getRankInfo(LocalDate.now(ZoneId.of("Asia/Seoul")), productId,"daily"); // 사용자 활동로그(상품 조회) productActivityPublisher.productDetail(new ProductActivityPayload.ProductDetailViewed(loginId, productId)); @@ -78,7 +78,7 @@ public ProductInfo.Detail getProductDetailWithCacheable(String loginId, Long pro String brandName = brandService.get(product.getBrandId()).getName(); // 오늘 기준 랭킹 조회 (없으면 null) - Rank rank = rankingService.getRankInfo(LocalDate.now(ZoneId.of("Asia/Seoul")), productId); + Rank rank = rankingService.getRankInfo(LocalDate.now(ZoneId.of("Asia/Seoul")), productId,"daily"); // 사용자 활동로그(상품 조회) productActivityPublisher.productDetail(new ProductActivityPayload.ProductDetailViewed(loginId, productId));