-
Notifications
You must be signed in to change notification settings - Fork 27
[FSSDK-11923] Add redis-streams **DO NOT MERGE - IT'S PRE-RELASE FROM FEATURE BRANCH** #444
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
- Add RedisStreams implementation with retry logic and error handling - Support configurable batch processing and connection resilience - Add comprehensive unit tests for functionality and error scenarios - Update configuration to support both "redis" and "redis-streams" options - Maintain backwards compatibility with existing Redis pub/sub - Add configuration parameters for tuning Redis Streams behavior
4aa9496
to
61e5e7e
Compare
Standardize password field handling for all Redis instances (pub/sub, UPS, ODP) to support multiple field names and environment variable fallback: - auth_token (recommended - avoids security scanner alerts) - redis_secret (alternative) - password (legacy support) - Environment variable fallback (REDIS_PASSWORD, REDIS_UPS_PASSWORD, REDIS_ODP_PASSWORD) Changes: - Add pkg/utils/redisauth package with GetPassword() utility - Update Redis Streams tests to use Redis 6 in CI (required for Streams support) - Fix DNS lookup error assertion in invalid host test - Update config.yaml with security-friendly password field examples - Document Redis 5.0+ requirement in redis-streams.md
- Fix indentation in test assertions - Align map literal values in tests - Sort imports alphabetically per Go convention
This commit addresses two critical issues found during integration testing: 1. Race condition in Subscribe() method: - Subscribe() was returning immediately while consumer group creation happened asynchronously in a goroutine - This caused NOGROUP errors when messages were published before the consumer group was fully initialized - Fixed by adding a synchronization channel (ready) that waits for consumer group creation to complete before returning 2. Consumer group creation using wrong client: - createConsumerGroupWithRetry() was ignoring the passed client parameter - It created temporary clients via executeWithRetry() which were immediately closed - Fixed to use the persistent client from Subscribe() goroutine 3. Test timeout adjustments: - Both Subscribe tests now wait 6 seconds (longer than 5s flush interval) - Previously they waited exactly at or below flush interval causing flakiness - Tests now pass reliably 10/10 runs All Redis Streams tests now pass consistently.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This pull request adds Redis Streams functionality to the agent per CapitalOne requirements, addressing limitations with the current Redis pub/sub fire-and-forget approach where notifications could be lost during agent crashes. The implementation provides persistent message delivery with guaranteed delivery, message acknowledgment, and automatic recovery.
- Introduces Redis Streams as an alternative to Redis pub/sub for reliable notification delivery
- Implements batching, retry logic, and connection resilience features
- Updates authentication configuration to support multiple field names and environment variable fallback
Reviewed Changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 1 comment.
Show a summary per file
File | Description |
---|---|
pkg/syncer/pubsub/redis_streams.go | Core Redis Streams implementation with batching, retry logic, and error handling |
pkg/syncer/pubsub/redis_streams_test.go | Unit tests for Redis Streams functionality |
pkg/syncer/pubsub/redis_streams_error_test.go | Error handling and retry logic tests |
pkg/syncer/pubsub.go | Integration with existing pubsub system and configuration parsing |
pkg/syncer/pubsub_test.go | Updated tests for new pubsub configurations |
pkg/utils/redisauth/password.go | Utility for flexible Redis password configuration |
pkg/utils/redisauth/password_test.go | Tests for Redis authentication utility |
plugins/userprofileservice/services/redis_ups.go | Updated to use new authentication utility |
plugins/odpcache/services/redis_cache.go | Updated to use new authentication utility |
docs/redis-streams.md | Comprehensive documentation for Redis Streams feature |
config.yaml | Updated configuration with Redis Streams settings |
.github/workflows/agent.yml | CI updates to use Redis 6 for testing |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
1. Fix consumer name collision risk (Line R268): - Changed from time.Now().UnixNano() to hostname+pid+timestamp - Reduces collision risk in concurrent scenarios - Format: consumer-{hostname}-{pid}-{timestamp} - Each Agent process gets a unique, deterministic consumer name 2. Fix resource leak in executeWithRetry (Line R339): - Wrap operation with defer client.Close() in anonymous function - Ensures Redis client is always closed, even on panic - Prevents connection pool exhaustion in failure scenarios - Critical for production stability 3. Race condition (Line R103): - Already addressed in commit aa90732 with ready channel synchronization
1. Fix YAML/JSON database type assertion (Line R139): - YAML/JSON unmarshals numeric values as float64, not int - Added type switch to handle both int and float64 - Prevents failure on valid config like "database: 0" - Applied to both getPubSubRedis and getPubSubRedisStreams 2. Fix typo: SycnFeatureFlag → SyncFeatureFlag - Corrected spelling throughout codebase - Fixed in pubsub.go, pubsub_test.go, and syncer.go Note on Line R113 comment: - Reviewer suggested PubSubRedisStreams instead of PubSubRedis - Current code is correct - both redis and redis-streams implementations intentionally share the same config section "pubsub.redis" - This allows common connection settings while supporting implementation-specific parameters (batch_size, flush_interval, etc.)
Coverage improvements for recent PR review fixes: 1. pkg/syncer/pubsub_test.go: - Test database type conversion (int vs float64 from YAML/JSON) - Covers the type switch added in getPubSubRedis/getPubSubRedisStreams - Tests valid types (int, float64) and invalid types (string, nil) 2. plugins/odpcache/services/redis_cache_test.go: - Test RedisCache.UnmarshalJSON method - Verifies password field priority: auth_token > redis_secret > password - Tests empty password handling and invalid JSON 3. plugins/userprofileservice/services/redis_ups_test.go: - Test RedisUserProfileService.UnmarshalJSON method - Verifies password field priority: auth_token > redis_secret > password - Tests empty password handling and invalid JSON These tests cover the previously uncovered code paths from the flexible Redis password configuration implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated no new comments.
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
Coverage improvements for redis-streams error paths: - Test config not found - Test config not valid (not a map) - Test host not found - Test host not valid (not a string) - Test database not found - Test database as float64 (valid YAML/JSON case) - Test database invalid type - Test datafile with unsupported pubsub type This addresses uncovered lines 66, 121-122, 126-127, 131-132, 135-136, 143-144, 150-151, 152-153 in pkg/syncer/pubsub.go
Address reviewer feedback on race condition that was only partially fixed: Problem: - If context is cancelled while goroutine is initializing, the main function returns via ctx.Done() case but goroutine continues running - Goroutine could block trying to send to ready channel if no receiver - This creates a goroutine leak Solution: - Wrap both ready channel sends in select statements - Check ctx.Done() before sending to ready channel - If main function already returned, goroutine exits immediately - Prevents goroutine from blocking on channel send This ensures proper cleanup when Subscribe() caller cancels the context during the initialization phase.
Address comprehensive goroutine lifecycle management issue: Problem: The previous fix with buffered channel only prevented blocking, but didn't prevent goroutine leaks. When the main function returns with an error or due to context cancellation BEFORE the goroutine enters its main loop, the goroutine would continue running indefinitely until the original context expires (could be much later). Complete Solution: 1. Added stop channel (chan struct{}) to signal goroutine termination 2. Goroutine checks stop channel during initialization (both error and success paths) 3. Goroutine checks stop channel as first case in main select loop 4. Main function closes stop channel when returning with error or ctx.Done() 5. Main function does NOT close stop on success - goroutine continues normally This ensures: - No goroutine leaks when Subscribe returns early with error - No goroutine leaks when context is cancelled during initialization - Clean resource cleanup via defer statements - Goroutine runs normally when initialization succeeds Verified with race detector running 3 consecutive times.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good now. Thanks for the conversation and updates.
Add beta release notes for Redis Streams feature including: - Redis Streams persistent notification delivery - Flexible Redis password configuration PR: #444
- Upgrade go-sdk from v2.0.0 to v2.1.1 (latest master) - Replace cmab.Config with client.CmabConfig - Remove RetryConfig parsing (now handled internally by go-sdk) - Simplify CMAB configuration to use stable public API
Summary
Issues