Conversation
Summary of ChangesHello @ding113, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! 此拉取请求旨在通过引入一个基于 Redis Pub/Sub 的缓存失效通知机制来增强系统的缓存管理能力。它提供了一种高效的方式来通知服务中的其他组件缓存已更新或失效,从而确保数据一致性。同时,为了保证系统的健壮性和可靠性,该模块还包含了对各种异常情况的详细处理和全面的单元测试,并确保在应用关闭时能够优雅地释放资源。 Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
| } | ||
|
|
||
| // 订阅成功后才更新本地状态 | ||
| const callbacks = existing ?? new Set<CacheInvalidationCallback>(); |
There was a problem hiding this comment.
这里存在一个严重的竞态条件,可能导致回调丢失。
当两个 subscribeCacheInvalidation 调用并发地为一个新 channel 执行时:
- 两个调用都会发现
existing为undefined。 - 两个调用都会执行
await sub.subscribe(channel)。 - 第一个完成的调用会使用其作用域内的
existing(此时为undefined) 创建一个新的Set,添加它的回调,然后设置到subscriptionsMap 中。 - 第二个完成的调用也会使用它自己作用域内的
existing(同样为undefined) 创建一个全新的Set,这会覆盖第一个调用设置的Set,导致第一个回调丢失。
问题在于 existing 变量在 await 之后是过时的。为了修复这个问题,你应该在 await 之后重新从 subscriptions Map 中获取回调集合,而不是使用陈旧的 existing 变量。
这将确保你总是在最新的回调集合上进行操作,从而避免数据丢失。
| const callbacks = existing ?? new Set<CacheInvalidationCallback>(); | |
| const callbacks = subscriptions.get(channel) ?? new Set<CacheInvalidationCallback>(); |
| try { | ||
| void currentSubscriber.unsubscribe(channel).catch((error) => { | ||
| logger.warn("[RedisPubSub] Failed to unsubscribe cache invalidation", { | ||
| channel, | ||
| error, | ||
| }); | ||
| }); | ||
| } catch (error) { | ||
| logger.warn("[RedisPubSub] Failed to unsubscribe cache invalidation", { | ||
| channel, | ||
| error, | ||
| }); | ||
| } |
There was a problem hiding this comment.
unsubscribe 的错误处理逻辑有些重复和复杂。try...catch 块和 .catch() 处理程序都在做同样的事情(记录警告),这使得代码难以阅读和维护。这可以被简化。
你可以使用 Promise.resolve() 来包装 unsubscribe 调用。这会捕获同步抛出的错误并将其转换为一个 rejected promise,然后你就可以用一个 .catch() 来处理所有类型的错误。
Promise.resolve(currentSubscriber.unsubscribe(channel)).catch((error) => {
logger.warn("[RedisPubSub] Failed to unsubscribe cache invalidation", {
channel,
error,
});
});| expect(() => cleanup()).not.toThrow(); | ||
|
|
||
| // 等待 microtask queue 清空,确保 Promise.resolve().then().catch() 链完成 | ||
| await new Promise((resolve) => setTimeout(resolve, 10)); |
There was a problem hiding this comment.
在测试中使用 setTimeout 加一个固定的延迟(如 10ms)来等待异步操作完成,这种做法比较脆弱,可能会导致测试不稳定(flaky)。测试的执行时间可能因环境而异。
为了使测试更健壮,建议使用 setImmediate 来等待事件循环的下一个 tick。这可以确保所有已排队的 microtask 都已执行完毕,而无需依赖任意的等待时间。
| await new Promise((resolve) => setTimeout(resolve, 10)); | |
| await new Promise((resolve) => setImmediate(resolve)); |
🧪 测试结果
总体结果: ✅ 所有测试通过 |
| try { | ||
| const sub = ensureSubscriber(baseClient); | ||
|
|
||
| const existing = subscriptions.get(channel); |
There was a problem hiding this comment.
[High] [LOGIC-BUG] Concurrent subscriptions can drop callbacks
src/lib/redis/pubsub.ts:70
Why this is a problem: existing is captured before await sub.subscribe(channel). If two subscribeCacheInvalidation() calls for the same channel run concurrently, the later call can overwrite subscriptions with a new Set created from stale existing, dropping previously registered callbacks.
Suggested fix:
const existing = subscriptions.get(channel);
const isFirstSubscriberForChannel = !existing;
if (isFirstSubscriberForChannel) {
await sub.subscribe(channel);
}
// Re-read after await to avoid clobbering concurrent subscribers
const callbacks = subscriptions.get(channel) ?? new Set<CacheInvalidationCallback>();
callbacks.add(callback);
subscriptions.set(channel, callbacks);|
|
||
| for (const cb of callbacks) { | ||
| try { | ||
| cb(); |
There was a problem hiding this comment.
[High] [ERROR-SILENT] Async callback rejections can become unhandled
src/lib/redis/pubsub.ts:27
Why this is a problem: In the "message" handler, callbacks are invoked via cb() inside try/catch, which only catches synchronous throws. In TypeScript, an async () => {} is assignable to () => void, so a rejecting async callback will surface as an unhandled promise rejection (no catch), which can destabilize the process.
Suggested fix:
type CacheInvalidationCallback = () => void | Promise<void>;
for (const cb of callbacks) {
try {
const maybePromise = cb();
void Promise.resolve(maybePromise).catch((error) => {
logger.error("[RedisPubSub] Callback error", { channel, error });
});
} catch (error) {
logger.error("[RedisPubSub] Callback error", { channel, error });
}
}There was a problem hiding this comment.
Code Review Summary
This PR introduces a new Redis Pub/Sub cache invalidation utility plus tests, and wires subscriber shutdown into instrumentation.
Two high-risk issues exist in the new subscribeCacheInvalidation() implementation: a concurrency bug that can drop callbacks, and missing handling for async callback rejections that can surface as unhandled promise rejections.
PR Size: L
- Lines changed: 507
- Files changed: 3
- Split suggestion: (1)
src/lib/redis/pubsub.ts+src/lib/redis/__tests__/pubsub.test.ts, (2)src/instrumentation.tsshutdown-hook wiring.
Issues Found
| Category | Critical | High | Medium | Low |
|---|---|---|---|---|
| Logic/Bugs | 0 | 1 | 0 | 0 |
| Security | 0 | 0 | 0 | 0 |
| Error Handling | 0 | 1 | 0 | 0 |
| Types | 0 | 0 | 0 | 0 |
| Comments/Docs | 0 | 0 | 0 | 0 |
| Tests | 0 | 0 | 0 | 0 |
| Simplification | 0 | 0 | 0 | 0 |
Critical Issues (Must Fix)
None.
High Priority Issues (Should Fix)
src/lib/redis/pubsub.ts:70[High] [LOGIC-BUG] ConcurrentsubscribeCacheInvalidation()calls can overwritesubscriptionswith stale state and drop callbacks.src/lib/redis/pubsub.ts:27[High] [ERROR-SILENT] Async callback rejections are not caught and can become unhandled promise rejections.
Review Coverage
- Logic and correctness
- Security (OWASP Top 10)
- Error handling
- Type safety
- Documentation accuracy
- Test coverage
- Code clarity
Automated review by Codex AI
There was a problem hiding this comment.
- Reviewed PR
#494and applied labelsize/L(507 lines, 3 files). - Left 2 inline comments on new code:
src/lib/redis/pubsub.ts:70— [High] [LOGIC-BUG] concurrentsubscribeCacheInvalidation()calls can drop callbacks due to staleexistingacrossawait.src/lib/redis/pubsub.ts:27— [High] [ERROR-SILENT] async callback rejections aren’t handled and can become unhandled promise rejections.
- Submitted the required summary review on the PR (includes L-size split suggestion).
There was a problem hiding this comment.
Code Review Summary
No significant issues identified in this PR.
PR Size: L
- Lines changed: 507
- Files changed: 3
This is a large PR focused on adding comprehensive test coverage. The implementation demonstrates strong engineering practices with proper error handling, graceful degradation, and thorough test coverage.
Review Coverage
- Logic and correctness - Clean
- Security (OWASP Top 10) - Clean
- Error handling - Clean
- Type safety - Clean
- Documentation accuracy - Clean
- Test coverage - Comprehensive (11 test cases covering all scenarios)
- Code clarity - Good
Key Strengths
- Comprehensive test coverage: 11 test cases covering happy paths, error scenarios, edge cases, and graceful degradation
- Proper error isolation: All error scenarios log warnings and continue execution (fail-open pattern)
- Resource management: Graceful shutdown integration prevents connection leaks
- Reference counting: Multi-callback subscription with proper cleanup
- Mock architecture: Well-designed EventEmitter-based mocks simulate real Redis behavior
Automated review by Claude AI
Summary
Adds comprehensive unit test coverage for the Redis Pub/Sub cache invalidation module and integrates graceful shutdown for subscriber connections.
Problem
Related Issues:
While PR #493 implemented the core Redis Pub/Sub functionality for cross-process cache invalidation, it lacked:
pubsub.tsmoduleSolution
This PR adds:
Comprehensive Unit Tests (
src/lib/redis/__tests__/pubsub.test.ts+369 lines):Graceful Shutdown Integration (
src/instrumentation.ts):closeSubscriber()call in Next.js shutdown hookChanges
Core Changes
src/lib/redis/__tests__/pubsub.test.ts(+369) - Full test suite for Redis Pub/Sub modulesrc/instrumentation.ts(+10) - Subscriber cleanup in shutdown handlerSupporting Changes
src/lib/redis/pubsub.ts(+128) - Core Pub/Sub module (included for reference)Testing
Automated Tests
publishCacheInvalidation()- Success, Redis unavailable, publish errorssubscribeCacheInvalidation()- Multi-callback, cleanup, error isolationcloseSubscriber()- Graceful shutdown, quit errorsTest Coverage Scenarios
Verification Commands
Technical Details
Test Architecture:
Shutdown Flow:
Why Graceful Shutdown Matters:
duplicate())closeSubscriber(), connection remains open during shutdownBreaking Changes
None - This is a test coverage enhancement.
Checklist
变更说明(中文)
验证(中文)
Description enhanced by Claude AI