Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add jet stream feature. #110

Merged
merged 2 commits into from
Nov 30, 2024
Merged

add jet stream feature. #110

merged 2 commits into from
Nov 30, 2024

Conversation

uakihir0
Copy link
Owner

@uakihir0 uakihir0 commented Nov 26, 2024

fix #105

Summary by CodeRabbit

Release Notes

  • New Features

    • Introduced interfaces and classes for managing JetStream subscriptions and events, enhancing streaming capabilities.
    • Added JetStreamClient for WebSocket connection management and event handling.
    • New Account, Commit, Event, Identity, and JetStreamSubscribeRequest classes for structured data representation.
    • Added BlueskyStream interface and BlueskyStreamFactory for flexible stream initialization.
  • Improvements

    • Refactored callback interfaces for better event handling.
    • Simplified type references in existing classes for clarity.
    • Updated package structures for better organization.
  • Bug Fixes

    • Updated return types for subscription methods to ensure consistency and reliability.

Copy link

coderabbitai bot commented Nov 26, 2024

Walkthrough

The pull request introduces several modifications and additions across multiple files in the Kotlin project. Key changes include the conversion of the BlueskyConfig class to an open class, the addition of new interfaces and classes such as BlueskyStream, JetStreamResource, and JetStreamClient, and the introduction of various callback interfaces for managing events. Additionally, several existing classes were refactored or had their package structures modified. Test files were also updated to align with these changes, ensuring that the new functionality is adequately covered.

Changes

File Path Change Summary
core/src/commonMain/kotlin/work/socialhub/kbsky/BlueskyConfig.kt Class signature updated to open class BlueskyConfig : ATProtocolConfig().
core/src/commonMain/kotlin/work/socialhub/kbsky/model/share/RecordUnion.kt Updated type references in getter properties to use simple class names instead of fully qualified names.
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/BlueskyStream.kt New interface BlueskyStream added with method jetStream().
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/BlueskyStreamConfig.kt New class BlueskyStreamConfig added, extending BlueskyConfig, with mutable property jetStreamHost.
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/BlueskyStreamFactory.kt New object BlueskyStreamFactory added with overloaded instance methods for creating BlueskyStream instances.
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/api/app/bsky/JetStreamResource.kt New interface JetStreamResource added with method subscribe().
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/api/com/atproto/SyncResource.kt Method subscribeRepos return type updated from StreamClient to SyncStreamClient.
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/api/entity/app/bsky/JetStreamSubscribeRequest.kt New class JetStreamSubscribeRequest added with various properties for subscription parameters.
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/JetStreamClient.kt New class JetStreamClient added for managing WebSocket connections with event handling.
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/callback/JetStreamEventCallback.kt New interface JetStreamEventCallback added for event handling.
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/model/Account.kt New class Account added with serializable properties.
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/model/Commit.kt New class Commit added with serializable properties.
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/model/Event.kt New class Event added with serializable properties.
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/model/Identity.kt New class Identity added with serializable properties.
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/callback/ClosedCallback.kt New interface ClosedCallback added for handling closure events.
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/callback/OpenedCallback.kt New interface OpenedCallback added for handling opened events.
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/com/atproto/SyncStreamClient.kt Class renamed from StreamClient to SyncStreamClient, updated method signatures for event callbacks.
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/com/atproto/model/StreamOp.kt Package declaration updated, no changes to class structure.
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/com/atproto/model/StreamRoot.kt Package declaration updated, no changes to class structure.
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/internal/app/bsky/_BlueskyStream.kt New class _BlueskyStream added, implementing BlueskyStream interface.
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/internal/app/bsky/_JetStreamResource.kt New class _JetStreamResource added, implementing JetStreamResource interface.
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/internal/com/atproto/_SyncResource.kt Method subscribeRepos return type updated from StreamClient to SyncStreamClient.
stream/src/jvmTest/kotlin/work/socialhub/kbsky/stream/app/bsky/JetStreamTest.kt New test class JetStreamTest added with asynchronous test method testSubscribeRepo.
stream/src/jvmTest/kotlin/work/socialhub/kbsky/stream/atproto/sync/SubscribeRepoTest.kt Updated import statements and callback interfaces to reflect new structure.
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/util/callback/ClosedCallback.kt Interface removed, eliminating closure event handling.
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/util/callback/OpenedCallback.kt Interface removed, eliminating opened event handling.

Assessment against linked issues

Objective Addressed Explanation
Support for JetStream (#105)

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 18

🧹 Outside diff range and nitpick comments (38)
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/BlueskyStream.kt (1)

5-8: Add KDoc documentation for the interface and method.

The interface structure looks good, but it would benefit from documentation explaining its purpose and usage. Consider adding KDoc comments describing:

  • The purpose of the BlueskyStream interface
  • The behavior and return value of the jetStream() method

Example documentation:

/**
 * Interface for accessing Bluesky's streaming capabilities.
 */
interface BlueskyStream {

    /**
     * Returns a resource for interacting with the Bluesky jet stream.
     * @return JetStreamResource instance for managing stream subscriptions
     */
    fun jetStream(): JetStreamResource
}
core/src/commonMain/kotlin/work/socialhub/kbsky/BlueskyConfig.kt (1)

3-3: Consider adding documentation about inheritance purpose.

Since this class is now designed for inheritance, it would be helpful to add KDoc comments explaining its extensibility purpose and relationship with the jet stream feature.

+/**
+ * Base configuration for Bluesky API client.
+ * This class is designed to be extended by specialized configurations
+ * such as stream-related settings.
+ */
 open class BlueskyConfig : ATProtocolConfig() {
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/callback/JetStreamEventCallback.kt (2)

5-8: Add KDoc documentation for the interface and method.

The interface lacks documentation explaining its purpose, usage, and contract. Consider adding KDoc comments to improve maintainability and developer experience.

+/**
+ * Callback interface for handling JetStream events.
+ * Implementations of this interface will receive events from the Bluesky stream.
+ */
 interface JetStreamEventCallback {
 
+    /**
+     * Called when a new event is received from the stream.
+     * @param event The event received from the Bluesky stream
+     * @throws IllegalStateException if the event processing fails
+     */
     fun onEvent(event: Event)
 }

7-7: Consider adding error handling guidance.

The onEvent method should provide guidance on error handling behavior. Consider:

  1. Documenting expected exceptions
  2. Adding a result type to handle failures gracefully
-    fun onEvent(event: Event)
+    /**
+     * @return Result indicating success or failure of event processing
+     */
+    fun onEvent(event: Event): Result<Unit>
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/model/Account.kt (3)

6-6: Consider declaring this as a data class

Since this class appears to be a model/entity class primarily holding data, consider making it a data class to automatically get equals(), hashCode(), toString(), and copy() implementations.

-class Account {
+data class Account(
+    var active: Boolean = true,
+    var did: String,
+    var sec: Long = 0,
+    var time: String,
+)

7-11: Add KDoc comments for properties

Please add documentation for each property to explain their purpose and any constraints:

  • What does active represent in the context of an account?
  • Is did a decentralized identifier? What's its format?
  • What does sec represent? Is it a Unix timestamp?
  • What format is expected for the time string?
+    /** Whether the account is active */
     var active: Boolean = true
 
+    /** Decentralized identifier for the account */
     lateinit var did: String
 
+    /** Unix timestamp in seconds */
     var sec: Long = 0
+    /** ISO 8601 formatted timestamp */
     lateinit var time: String

10-10: Consider using kotlinx.datetime.Instant for timestamp

Instead of using a raw Long for the timestamp, consider using kotlinx.datetime.Instant for better type safety and built-in formatting utilities.

+import kotlinx.datetime.Instant
+
 @Serializable
 class Account {
     var active: Boolean = true
     lateinit var did: String
 
-    var sec: Long = 0
+    var sec: Instant = Instant.DISTANT_PAST
     lateinit var time: String
 }
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/model/Identity.kt (2)

8-12: Improve type safety and add property documentation

Several improvements could enhance the robustness of this class:

  1. Use proper type for time:
-    lateinit var time: String
+    var time: kotlinx.datetime.Instant
  1. Add property documentation:
    /** Decentralized identifier for the identity */
    var did: String

    /** User handle for display purposes */
    var handle: String

    /** Security-related timestamp or sequence number */
    var sec: Long = 0

    /** Event timestamp */
    var time: kotlinx.datetime.Instant

5-13: Add class-level documentation and serialization format specification

The class lacks documentation explaining its role in the jet stream feature. Additionally, consider specifying the serialization format to ensure consistency:

+/**
+ * Represents an identity in the BlueSky network.
+ * Used in the jet stream feature for identity tracking and event association.
+ */
+@Serializable(with = Json)
 @Serializable
 class Identity {
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/BlueskyStreamConfig.kt (1)

5-5: Add KDoc documentation and consider making the class open.

The class would benefit from:

  1. KDoc documentation explaining its purpose and usage
  2. open modifier to allow further customization through inheritance
+/**
+ * Configuration class for Bluesky JetStream functionality.
+ * Provides stream-specific settings including JetStream host endpoints.
+ */
-class BlueskyStreamConfig: BlueskyConfig() {
+open class BlueskyStreamConfig: BlueskyConfig() {
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/api/app/bsky/JetStreamResource.kt (1)

6-14: Consider enhancing the documentation.

While the link to the official documentation is helpful, consider adding:

  • A brief description of the interface's purpose
  • Documentation for the subscribe method explaining:
    • Expected behavior
    • Possible error conditions
    • Lifecycle management responsibilities
    • Connection handling expectations
 /**
  * https://docs.bsky.app/blog/jetstream
+ *
+ * JetStreamResource provides an interface for subscribing to real-time events
+ * from the Bluesky network using the JetStream protocol.
+ */
 interface JetStreamResource {
 
+    /**
+     * Subscribes to JetStream events based on the provided configuration.
+     *
+     * @param request Configuration for the subscription including host,
+     *                wanted collections, and message size limits
+     * @return A JetStreamClient instance that manages the WebSocket connection
+     * @throws IllegalArgumentException if the request parameters are invalid
+     * @throws ConnectionException if the connection cannot be established
+     */
     fun subscribe(
         request: JetStreamSubscribeRequest
     ): JetStreamClient
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/model/Commit.kt (3)

6-7: Add KDoc documentation to describe the class purpose and usage.

The Commit class appears to represent commit operations in the streaming context, but its purpose and usage are not documented. Consider adding comprehensive KDoc documentation.

 @Serializable
+/**
+ * Represents a commit operation in the ATProto/Bluesky streaming context.
+ * This class captures changes made to collections, including the type of operation,
+ * collection details, and associated record data.
+ */
 class Commit {

9-12: Document and validate required properties.

The required properties using lateinit should be documented and potentially validated to ensure they contain meaningful values when initialized.

-    lateinit var rev: String
-    lateinit var operation: String
-    lateinit var collection: String
-    lateinit var rkey: String
+    /** Revision identifier for this commit. */
+    lateinit var rev: String
+
+    /** Operation type (e.g., create, update, delete). */
+    lateinit var operation: String
+
+    /** Target collection identifier. */
+    lateinit var collection: String
+
+    /** Record key within the collection. */
+    lateinit var rkey: String
+
+    private fun validateRequiredProperties() {
+        check(::rev.isInitialized) { "rev must be initialized" }
+        check(::operation.isInitialized) { "operation must be initialized" }
+        check(::collection.isInitialized) { "collection must be initialized" }
+        check(::rkey.isInitialized) { "rkey must be initialized" }
+    }

14-15: Document optional properties and consider using a primary constructor.

The nullable properties should be documented, and consider using a primary constructor for better initialization control.

-class Commit {
-    lateinit var rev: String
-    lateinit var operation: String
-    lateinit var collection: String
-    lateinit var rkey: String
-
-    var record: RecordUnion? = null
-    var cid: String? = null
+class Commit(
+    var rev: String,
+    var operation: String,
+    var collection: String,
+    var rkey: String,
+    /** Associated record data, if any. */
+    var record: RecordUnion? = null,
+    /** Content identifier, if applicable. */
+    var cid: String? = null,
+) {
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/model/Event.kt (1)

6-7: Consider enhancing the class definition

  1. Consider making this a data class since it's a model class primarily holding data
  2. Add KDoc documentation to explain the class's purpose and property meanings
+/** 
+ * Represents an event in the JetStream system.
+ * This class is used for serializing and deserializing event data
+ * received through the stream.
+ */
 @Serializable
-class Event {
+data class Event(
+    var did: String = "",
+    @SerialName("time_us")
+    var timeUs: Long = 0,
+    var kind: String = "",
+    var commit: Commit? = null,
+    var identity: Identity? = null,
+    var account: Account? = null,
+)
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/api/com/atproto/SyncResource.kt (1)

Line range hint 13-14: Consider removing or updating [WIP] tag

The method is marked as [WIP] (Work in Progress). As part of the jet stream feature implementation, consider whether this tag is still relevant or if the implementation is now complete.

stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/BlueskyStreamFactory.kt (1)

6-22: Add KDoc documentation for the public factory API.

As this is a public API for creating stream instances, consider adding comprehensive KDoc documentation including:

  1. Usage examples
  2. Parameter descriptions
  3. Exception scenarios
  4. Configuration options

Example documentation:

/**
 * Factory for creating BlueskyStream instances.
 */
object BlueskyStreamFactory {
    /**
     * Creates a BlueskyStream instance from a jet stream URI.
     *
     * @param jetStreamUri The URI(s) of the jet stream service. Multiple URIs can be comma-separated.
     * @throws IllegalArgumentException if the URI is invalid or empty
     * @return A configured BlueskyStream instance
     */
    fun instance(jetStreamUri: String): BlueskyStream

    /**
     * Creates a BlueskyStream instance with custom configuration.
     *
     * @param config The configuration for the stream. Defaults to a new instance.
     * @throws IllegalArgumentException if the configuration is invalid
     * @return A configured BlueskyStream instance
     */
    fun instance(config: BlueskyStreamConfig = BlueskyStreamConfig()): BlueskyStream
}
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/internal/app/bsky/_BlueskyStream.kt (2)

13-15: Clarify inheritance intention.

The properties are marked as protected, but the class is not marked as open. Either:

  1. Mark the class as open if inheritance is intended, or
  2. Make the properties private if inheritance is not intended.
-class _BlueskyStream(
+open class _BlueskyStream(
     config: BlueskyStreamConfig
 ) : BlueskyStream {

-    protected val bluesky: Bluesky = _Bluesky(config)
+    protected val bluesky: Bluesky = _Bluesky(config)

-    protected val jetStream: JetStreamResource = _JetStreamResource(this.bluesky, config)
+    protected val jetStream: JetStreamResource = _JetStreamResource(this.bluesky, config)

Or if inheritance is not intended:

     config: BlueskyStreamConfig
 ) : BlueskyStream {

-    protected val bluesky: Bluesky = _Bluesky(config)
+    private val bluesky: Bluesky = _Bluesky(config)

-    protected val jetStream: JetStreamResource = _JetStreamResource(this.bluesky, config)
+    private val jetStream: JetStreamResource = _JetStreamResource(this.bluesky, config)

17-20: Consider using single-expression function syntax.

The method can be more concise using Kotlin's single-expression function syntax.

-    /**
-     * {@inheritDoc}
-     */
-    override fun jetStream() = jetStream
+    /** {@inheritDoc} */
+    override fun jetStream(): JetStreamResource = jetStream
stream/src/jvmTest/kotlin/work/socialhub/kbsky/stream/atproto/sync/SubscribeRepoTest.kt (2)

Line range hint 31-37: Verify test coverage for edge cases in SyncEventCallback.

While the basic event handling is implemented, consider adding test cases for:

  • Null CID/URI handling
  • Different types of RecordUnion objects
  • Error conditions

Also, using print in tests makes it harder to verify results. Consider using assertions instead.

 object : SyncEventCallback {
     override fun onEvent(
         cid: String?,
         uri: String?,
         record: RecordUnion
     ) {
-        print(record)
+        assertNotNull(record, "Record should not be null")
+        // Add specific assertions based on expected record content
     }
 }

Line range hint 31-51: Consider reducing the delay duration in the test.

The test currently waits for 10 seconds, which might be unnecessarily long for a unit test. Consider:

  1. Reducing the delay if possible
  2. Making the delay duration configurable
  3. Using a more deterministic approach to test completion
 launch { stream.open() }.let {
-    delay(10000)
+    delay(2000) // or make it configurable via constant
     it.cancel()
     stream.close()
 }
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/api/entity/app/bsky/JetStreamSubscribeRequest.kt (7)

3-4: Consider making this a data class with immutable properties

For request objects, it's recommended to:

  1. Use a data class to get equals(), hashCode(), toString(), and copy() implementations.
  2. Use immutable properties (val instead of var) to prevent state mutations after creation.
-class JetStreamSubscribeRequest {
+data class JetStreamSubscribeRequest(
+    val host: String? = null,
+    val wantedCollections: List<String> = listOf(),
+    val wantedDids: List<String> = listOf(),
+    val maxMessageSizeBytes: Long? = null,
+    val cursor: Long? = null,
+    val requireHello: Boolean? = null,
+)

5-5: Add documentation and validation for the host property

The host property lacks documentation explaining:

  • What kind of host it represents (e.g., WebSocket endpoint, API server)
  • When it can be null
  • Expected format (e.g., domain name, URL)

7-14: Consider adding validation and constants for NSID paths

To improve robustness and maintainability:

  1. Add validation for NSID path format
  2. Define constants for commonly used paths like app.bsky.feed.like
companion object {
    const val FEED_LIKE = "app.bsky.feed.like"
    const val GRAPH_ALL = "app.bsky.graph.*"
    
    private val NSID_PATH_REGEX = Regex("""^[a-zA-Z]+\.[a-zA-Z]+(\.[a-zA-Z]+)?(\.\*)?$""")
    
    fun validateNsidPath(path: String): Boolean = 
        NSID_PATH_REGEX.matches(path)
}

22-27: Add validation for maxMessageSizeBytes

The documentation states that negative values are treated as zero, but this behavior isn't enforced in the code. Consider adding validation or normalization.

private fun normalizeMaxMessageSize(size: Long?): Long? =
    when {
        size == null -> null
        size <= 0 -> 0
        else -> size
    }

37-42: Remove commented code and track compression support as an issue

Instead of keeping the FIXME comments and commented code in the source:

  1. Remove the commented property
  2. Track this limitation in the issue tracker

Would you like me to create a GitHub issue to track the zstd compression support requirement?


44-48: Fix typo in documentation

There's a typo in the documentation: "recevies" should be "receives".

-     * Set to true to pause replay/live-tail until the server recevies
+     * Set to true to pause replay/live-tail until the server receives

1-49: Consider architectural integration points

As this class is part of the streaming API:

  1. Consider implementing Serializable if this request needs to be persisted
  2. Add factory methods for common subscription scenarios
  3. Consider adding a builder pattern for complex configurations
stream/src/jvmTest/kotlin/work/socialhub/kbsky/stream/app/bsky/JetStreamTest.kt (2)

16-27: Add documentation and improve test robustness

Consider the following improvements:

  1. Add KDoc to document the test's purpose and expected behavior
  2. Move the collection name to a companion object constant
  3. Add error handling for stream initialization

Here's a suggested implementation:

+    companion object {
+        private const val FEED_POST_COLLECTION = "app.bsky.feed.post"
+    }
+
+    /**
+     * Tests the subscription to a repository stream.
+     * Verifies that:
+     * - Stream can be initialized and opened
+     * - Feed post events are received and processed
+     * - Stream can be properly closed
+     */
     @Test
     fun testSubscribeRepo() {
         runBlocking {
-            val stream = BlueskyStreamFactory
-                .instance()
-                .jetStream()
-                .subscribe(
-                    JetStreamSubscribeRequest().also {
-                        it.wantedCollections = listOf("app.bsky.feed.post")
-                    }
-                )
+            try {
+                val stream = BlueskyStreamFactory
+                    .instance()
+                    .jetStream()
+                    .subscribe(
+                        JetStreamSubscribeRequest().also {
+                            it.wantedCollections = listOf(FEED_POST_COLLECTION)
+                        }
+                    )

14-54: Consider adding more test cases

The current test file only covers the happy path. Consider adding test cases for:

  1. Invalid collection names
  2. Network disconnection scenarios
  3. Malformed event handling
  4. Rate limiting scenarios
  5. Multiple simultaneous subscriptions

This will ensure the JetStream feature is thoroughly tested.

Would you like me to help create these additional test cases?

stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/internal/app/bsky/_JetStreamResource.kt (2)

50-50: Consider adding URL length validation.

When dealing with many DIDs or collections, the URL might exceed server limits. Consider adding a check for the final URL length.

-return JetStreamClient(builder.buildString())
+val url = builder.buildString()
+require(url.length <= MAX_URL_LENGTH) { "URL exceeds maximum length" }
+return JetStreamClient(url)

Consider adding this constant at the class level:

companion object {
    private const val MAX_URL_LENGTH = 8192 // Adjust based on server limits
}

19-51: Consider adding connection retry and error handling strategies.

As this is a WebSocket connection setup, consider implementing:

  1. Connection retry logic with exponential backoff
  2. Error handling for network issues
  3. Rate limiting protection
  4. Connection pooling if multiple subscriptions are expected

This could be added either in this class or in the JetStreamClient implementation.

stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/JetStreamClient.kt (3)

22-25: Add documentation for callback methods

The builder pattern implementation looks good, but these public methods should be documented to improve API clarity.

Add KDoc for each callback method:

+    /** Sets the callback for handling stream events */
     fun eventCallback(callback: JetStreamEventCallback) = also { this.eventCallback = callback }
+    /** Sets the callback for connection opened events */
     fun openedCallback(callback: OpenedCallback) = also { this.openedCallback = callback }
+    /** Sets the callback for connection closed events */
     fun closedCallback(callback: ClosedCallback) = also { this.closedCallback = callback }
+    /** Sets the callback for error events */
     fun errorCallback(callback: ErrorCallback) = also { this.errorCallback = callback }

64-66: Implement binary message handling for zstd compression

The TODO comment indicates that this method should handle zstd compressed data.

Would you like me to help implement the zstd decompression logic for binary messages? This would involve:

  1. Adding zstd dependency
  2. Implementing decompression
  3. Converting the decompressed data to text
  4. Handling the resulting text through the existing text message handler

11-67: Consider implementing lifecycle management

As this class manages network resources, consider implementing a proper lifecycle management pattern.

Suggestions:

  1. Implement Closeable interface for resource cleanup
  2. Add connection timeout configuration
  3. Consider adding reconnection strategy for handling disconnections
  4. Add connection state monitoring and health checks

Example implementation:

class JetStreamClient(
    private val uri: String,
    private val config: JetStreamConfig = JetStreamConfig(),
) : Closeable {
    data class JetStreamConfig(
        val connectionTimeout: Duration = Duration.seconds(30),
        val reconnectAttempts: Int = 3,
        val reconnectDelay: Duration = Duration.seconds(5),
    )
    
    // ... existing code ...

    override fun close() {
        client.close()
        // Cleanup resources
    }
}
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/com/atproto/SyncStreamClient.kt (3)

Line range hint 15-27: Consider improving property encapsulation

The class properties could benefit from better encapsulation:

  • isOpen should be private as it represents internal state
  • client should be private as it's an implementation detail
-    var client = WebsocketRequest()
-    var isOpen: Boolean = false
+    private var client = WebsocketRequest()
+    private var isOpen: Boolean = false

Line range hint 34-47: Add error handling for WebSocket errors

The error callback is defined but not wired up to the WebSocket client's error events.

     init {
         this.client.url(this.uri)
         this.client.bytesListener = {
             this.onMessage(it)
         }
         this.client.onOpenListener = {
             this.isOpen = true
             this.openedCallback?.onOpened()
         }
         this.client.onCloseListener = {
             this.isOpen = false
             this.closedCallback?.onClosed()
         }
+        this.client.onErrorListener = { error ->
+            this.errorCallback?.onError(error)
+        }
     }

Line range hint 49-55: Add connection state validation and error handling

The connection management methods should validate state and handle errors appropriately.

     suspend fun open() {
+        if (isOpen) {
+            throw IllegalStateException("WebSocket is already open")
+        }
         client.open()
     }
 
     fun close() {
+        if (!isOpen) {
+            return
+        }
         client.close()
     }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 63339dd and 3ba0f65.

📒 Files selected for processing (29)
  • core/src/commonMain/kotlin/work/socialhub/kbsky/BlueskyConfig.kt (1 hunks)
  • core/src/commonMain/kotlin/work/socialhub/kbsky/model/share/RecordUnion.kt (1 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/BlueskyStream.kt (1 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/BlueskyStreamConfig.kt (1 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/BlueskyStreamFactory.kt (1 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/api/app/bsky/JetStreamResource.kt (1 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/api/com/atproto/SyncResource.kt (2 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/api/entity/app/bsky/JetStreamSubscribeRequest.kt (1 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/api/entity/com/atproto/SyncSubscribeReposRequest.kt (1 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/JetStreamClient.kt (1 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/callback/JetStreamEventCallback.kt (1 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/model/Account.kt (1 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/model/Commit.kt (1 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/model/Event.kt (1 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/model/Identity.kt (1 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/callback/ClosedCallback.kt (1 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/callback/ErrorCallback.kt (1 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/callback/OpenedCallback.kt (1 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/com/atproto/SyncStreamClient.kt (2 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/com/atproto/callback/SyncEventCallback.kt (1 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/com/atproto/model/StreamOp.kt (1 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/com/atproto/model/StreamRoot.kt (1 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/internal/app/bsky/_BlueskyStream.kt (1 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/internal/app/bsky/_JetStreamResource.kt (1 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/internal/com/atproto/_SyncResource.kt (2 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/util/callback/ClosedCallback.kt (0 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/util/callback/OpenedCallback.kt (0 hunks)
  • stream/src/jvmTest/kotlin/work/socialhub/kbsky/stream/app/bsky/JetStreamTest.kt (1 hunks)
  • stream/src/jvmTest/kotlin/work/socialhub/kbsky/stream/atproto/sync/SubscribeRepoTest.kt (2 hunks)
💤 Files with no reviewable changes (2)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/util/callback/ClosedCallback.kt
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/util/callback/OpenedCallback.kt
✅ Files skipped from review due to trivial changes (7)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/api/entity/com/atproto/SyncSubscribeReposRequest.kt
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/callback/ClosedCallback.kt
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/callback/ErrorCallback.kt
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/callback/OpenedCallback.kt
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/com/atproto/callback/SyncEventCallback.kt
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/com/atproto/model/StreamOp.kt
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/com/atproto/model/StreamRoot.kt
🔇 Additional comments (26)
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/BlueskyStream.kt (1)

7-7: Verify the implementation of this interface.

The interface design is clean and follows good practices. Let's verify the existence of its implementation.

✅ Verification successful

Implementation verified and follows good practices

The interface BlueskyStream is properly implemented by _BlueskyStream class in the internal package. The implementation:

  • Correctly implements the jetStream() method
  • Uses constructor injection for configuration
  • Follows the internal implementation naming convention with underscore prefix
  • Maintains clean separation between interface and implementation
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the implementation of BlueskyStream interface exists
# Expected: Find the _BlueskyStream class implementing this interface

# Search for the implementation class
ast-grep --pattern 'class _BlueskyStream : BlueskyStream {
  $$$
}'

# Verify the jetStream method implementation
ast-grep --pattern 'override fun jetStream(): JetStreamResource {
  $$$
}'

Length of output: 146


Script:

#!/bin/bash
# Let's try a broader search to find any implementations or usages

# Search for any class implementing BlueskyStream
rg "class.*: ?BlueskyStream" -l

# Search for any mentions of jetStream
rg "jetStream.*JetStreamResource" -l

# Look for files containing BlueskyStream
fd "BlueskyStream" --type f

# Search for any class with BlueskyStream in its name
rg "class.*BlueskyStream"

Length of output: 863


Script:

#!/bin/bash
# Let's examine the implementation in _BlueskyStream.kt
cat stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/internal/app/bsky/_BlueskyStream.kt

# Also check the interface definition
cat stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/BlueskyStream.kt

Length of output: 958

core/src/commonMain/kotlin/work/socialhub/kbsky/BlueskyConfig.kt (1)

3-3: LGTM, but note this is a breaking change.

The addition of the open modifier is necessary for the jet stream feature implementation, allowing BlueskyStreamConfig to inherit from this class. However, be aware that this is technically a breaking change as it modifies the inheritance behavior of the class.

Let's verify the inheritance relationship:

✅ Verification successful

LGTM - Verified inheritance relationship for jet stream feature

The open modifier on BlueskyConfig is indeed necessary as BlueskyStreamConfig directly extends it (class BlueskyStreamConfig: BlueskyConfig()). This inheritance relationship is used to support the jet stream functionality, with BlueskyStreamConfig adding jet stream-specific configurations like jetStreamHosts. The breaking change is intentional and properly implemented.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the inheritance relationship with BlueskyStreamConfig
# Expected: Find the class that extends BlueskyConfig

ast-grep --pattern 'class $_ : BlueskyConfig'

Length of output: 47


Script:

#!/bin/bash
# Let's try a different approach to find files containing BlueskyStreamConfig
# and examine their content

# First, find files containing "BlueskyStreamConfig"
rg "BlueskyStreamConfig" -l

# Then, let's see the actual content around these occurrences
rg "BlueskyStreamConfig" -A 5 -B 5

Length of output: 7531

stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/callback/JetStreamEventCallback.kt (1)

3-3: Verify Event model compatibility.

Ensure that the imported Event model class contains all necessary fields for the JetStream feature.

✅ Verification successful

Event model is properly structured for JetStream functionality

The Event class in work/socialhub/kbsky/stream/entity/app/bsky/model/Event.kt contains all the necessary fields for JetStream functionality:

  • did: For decentralized identifier
  • timeUs: For event timestamp
  • kind: For event type
  • commit, identity, and account: For associated data objects

The model is properly annotated with Kotlin serialization markers and is correctly used in the JetStream implementation as evidenced by its usage in JetStreamClient and test files.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check Event model definition and usage
ast-grep --pattern 'class Event {
  $$$
}'

Length of output: 44


Script:

#!/bin/bash
# Let's try to find the Event class definition using ripgrep
rg -l "class Event" --type kotlin

# Also search for data class definition which is common in Kotlin
rg -l "data class Event" --type kotlin

# Let's also check the content of the file containing JetStreamEventCallback
fd "JetStreamEventCallback.kt" --exec cat {}

Length of output: 405


Script:

#!/bin/bash
# Let's check the content of the Event class
cat stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/model/Event.kt

# Let's also search for any usages of Event class to understand its context
rg "Event" --type kotlin -C 2

Length of output: 12064

stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/model/Account.kt (1)

8-8: 🛠️ Refactor suggestion

Add initialization validation for lateinit properties

Consider adding initialization checks for did and time to prevent UninitializedPropertyAccessException:

 @Serializable
 class Account {
+    init {
+        require(did.isNotBlank()) { "DID must not be blank" }
+        require(time.matches(ISO8601_REGEX)) { "Time must be in ISO 8601 format" }
+    }
+
+    companion object {
+        private val ISO8601_REGEX = 
+            Regex("^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(\\.\\d+)?(Z|[+-]\\d{2}:?\\d{2})\$")
+    }
 }

Also applies to: 11-11

stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/model/Identity.kt (1)

11-11: Clarify the purpose and usage of the sec property

The sec property's purpose is unclear. If it's security-critical:

  1. Document its specific purpose
  2. Consider if a default value of 0 is appropriate
  3. Add validation if there are valid value ranges
✅ Verification successful

Let me gather more context about these classes to understand the purpose of sec.


Let me check if there are any usages of these classes to understand how sec is being used.


Let me check one more thing about the Event class since it contains both Identity and Account.


Now I have enough context to provide the final response:

The sec field appears to be a timestamp component for event streaming

Looking at the context, both Identity and Account are used within the Event class for the Bluesky streaming API. The sec field, along with the time field in both classes, appears to be related to event timing, similar to how the Event class uses timeUs for microsecond timestamps. The default value of 0 is appropriate as an initial state for timestamp fields.

Recommendations:

  • Add documentation to clarify that sec represents seconds in the timestamp
  • Consider renaming to timestampSec or similar for better clarity
  • Consider grouping sec and time fields together in the class definition for better readability
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for other usages of 'sec' to understand its purpose
rg -p "sec.*=.*0" --type kotlin
rg "var sec.*:.*Long" --type kotlin

Length of output: 655


Script:

#!/bin/bash
# Get the full content of the files to understand the context
echo "=== Identity.kt ==="
cat stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/model/Identity.kt
echo -e "\n=== Account.kt ==="
cat stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/model/Account.kt

Length of output: 790


Script:

#!/bin/bash
# Search for usages of Identity and Account classes
rg -p "Identity|Account" --type kotlin -B 2 -A 2

# Search for any documentation or comments about 'sec' field
rg -p "sec" --type kotlin -B 2 -A 2

Length of output: 12426


Script:

#!/bin/bash
# Get the full content of Event.kt to understand the context
cat stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/model/Event.kt

# Also check for any documentation in parent directories
cat stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/model/README.md 2>/dev/null || true

Length of output: 581

stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/BlueskyStreamConfig.kt (2)

1-4: LGTM!

Package structure and imports are clean and follow conventions.


1-13: Verify configuration usage patterns.

Let's verify how this configuration is used to ensure our suggestions align with the codebase requirements.

✅ Verification successful

Let me analyze the usage of BlueskyStreamConfig further by checking the implementation details in BlueskyStreamFactory.kt and _JetStreamResource.kt.


Configuration usage is well-defined and follows a clear pattern

The verification shows that the jetStreamHosts configuration is used appropriately:

  • In BlueskyStreamFactory, it's either set to a single host from a provided URI or uses the default list
  • In _JetStreamResource, it's used as a fallback when no specific host is provided, randomly selecting from the configured hosts
  • The default configuration provides multiple host options for redundancy

The configuration implementation and its usage are consistent with typical stream connection patterns, providing both flexibility for custom endpoints and fallback options for reliability.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for usage of BlueskyStreamConfig and jetStreamHosts
echo "Searching for BlueskyStreamConfig usage..."
rg -l "BlueskyStreamConfig" 

echo "\nSearching for jetStreamHosts property access..."
rg "jetStreamHosts" -A 3

Length of output: 2093


Script:

#!/bin/bash
# Check the implementation details of files using BlueskyStreamConfig
echo "Checking BlueskyStreamFactory.kt implementation..."
cat stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/BlueskyStreamFactory.kt

echo "\nChecking _JetStreamResource.kt implementation..."
cat stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/internal/app/bsky/_JetStreamResource.kt

Length of output: 2784

stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/api/app/bsky/JetStreamResource.kt (2)

1-14: LGTM! Clean and focused interface design.

The interface follows good design principles with a clear single responsibility and appropriate package structure.


11-13: Verify implementation patterns and error handling.

Let's ensure consistent implementation patterns across the codebase.

✅ Verification successful

Let me gather more specific information about the implementation and error handling patterns.


Let me check one more aspect to understand the implementation details better.


Based on the gathered information, I can now provide a final response:

Interface follows standard patterns, implementation details are consistent

The subscribe method in JetStreamResource interface follows a clean pattern where:

  • It accepts a JetStreamSubscribeRequest with configurable host and collections
  • Returns a JetStreamClient that handles WebSocket connections with proper callback support
  • Error handling is managed through the client's error callback mechanism

The implementation aligns with the BlueSkys's JetStream documentation and maintains consistency with other resource interfaces in the codebase.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check implementation patterns and error handling

# Check for similar resource interfaces
echo "Checking for similar resource interfaces..."
rg -l "interface.*Resource"

# Check for WebSocket error handling patterns
echo "Checking for WebSocket error handling patterns..."
rg -A 5 "throw.*Exception.*WebSocket"

# Check for similar subscribe method implementations
echo "Checking for similar subscribe patterns..."
ast-grep --pattern 'fun subscribe($$$) {
  $$$
}'

Length of output: 1637


Script:

#!/bin/bash
# Let's check the implementation details of JetStreamResource and related classes

# Check JetStreamResource implementation
echo "Checking JetStreamResource implementation..."
rg -A 10 "class.*implements.*JetStreamResource"

# Check JetStreamClient implementation
echo "Checking JetStreamClient implementation..."
rg -A 10 "class.*JetStreamClient"

# Check for WebSocket-related error handling in stream package
echo "Checking WebSocket error handling in stream package..."
rg -A 5 "try.*catch.*" stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/

# Look for subscribe method implementations
echo "Looking for subscribe method implementations..."
rg -A 10 "fun subscribe.*JetStreamSubscribeRequest"

Length of output: 1987


Script:

#!/bin/bash
# Let's check the actual implementation of the subscribe method and related files

# Check JetStreamResource interface and implementation
echo "Checking JetStreamResource content..."
cat stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/api/app/bsky/JetStreamResource.kt

# Check for WebSocket connection handling
echo "Checking WebSocket connection handling..."
rg -A 10 "fun connect" stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/

# Check JetStreamSubscribeRequest structure
echo "Checking JetStreamSubscribeRequest structure..."
rg -A 5 "class JetStreamSubscribeRequest"

Length of output: 1632

stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/api/com/atproto/SyncResource.kt (2)

3-4: LGTM: Import changes are appropriate

The import changes correctly reflect the new type usage and package structure.


17-17: Verify the impact of return type change

The change from StreamClient to SyncStreamClient is a breaking change that might affect existing code.

Let's verify the impact:

stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/BlueskyStreamFactory.kt (3)

1-5: LGTM! Appropriate package structure and imports.

The package structure follows the project conventions, and the imports are relevant for the functionality.


6-7: LGTM! Appropriate use of Kotlin object for factory pattern.

The singleton factory pattern implementation using Kotlin object is clean and follows best practices.


17-21: Consider adding config validation and immutability.

The current implementation directly passes the config object without validation or defensive copying. Consider:

  1. Validating required config properties
  2. Creating a defensive copy to prevent external modifications

Here's a suggested improvement:

 fun instance(
     config: BlueskyStreamConfig = BlueskyStreamConfig(),
 ): BlueskyStream {
+    with(config) {
+        require(jetStreamHosts.isNotEmpty()) { "At least one jetStreamHost is required" }
+    }
-    return _BlueskyStream(config)
+    return _BlueskyStream(config.copy())
 }

Let's verify if the config class has proper validation:

stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/internal/app/bsky/_BlueskyStream.kt (1)

1-8: LGTM! Package structure and imports are well-organized.

The package structure follows Kotlin conventions with proper separation between public and internal implementations.

stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/internal/com/atproto/_SyncResource.kt (2)

8-9: LGTM: Import statements are correctly updated

The imports are properly updated to use the new SyncStreamClient and SyncSubscribeReposRequest types from their respective packages.


17-22: Verify impact of StreamClient to SyncStreamClient change

The implementation looks correct, but this change in return type from StreamClient to SyncStreamClient could be a breaking change for existing clients.

Let's verify the impact:

core/src/commonMain/kotlin/work/socialhub/kbsky/model/share/RecordUnion.kt (1)

30-37: LGTM! Clean type reference simplification.

The changes simplify the type references in the getter properties while maintaining the same functionality. This improves code readability without affecting the behavior, as Kotlin's type system correctly resolves these references through the imports.

stream/src/jvmTest/kotlin/work/socialhub/kbsky/stream/atproto/sync/SubscribeRepoTest.kt (2)

9-11: LGTM! Import paths reflect the new package structure.

The updated import paths align with the restructured package organization, improving code organization by grouping related components more logically.


Line range hint 1-51: Verify consistent usage of new callback interfaces across the codebase.

Let's ensure that all stream-related tests have been updated to use the new callback interfaces and package structure.

✅ Verification successful

Let me check the implementation details of the other test file to ensure consistency.


Callback interfaces are consistently implemented across the codebase

The verification shows that both test files (JetStreamTest.kt and SubscribeRepoTest.kt) and their corresponding client implementations (JetStreamClient.kt and SyncStreamClient.kt) are using the callback interfaces consistently:

  • Both test files properly use the OpenedCallback interface from the shared callback package
  • Each test file correctly uses its specific event callback interface (JetStreamEventCallback and SyncEventCallback)
  • The client implementations follow the same pattern for registering and handling callbacks
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check for any remaining references to old callback interfaces
rg -l "EventCallback" --type kotlin

# Check for consistent usage of new callback interfaces
rg -l "SyncEventCallback|OpenedCallback" --type kotlin

# Look for potential test files that might need similar updates
fd -e kt "Test.kt$" | grep -i "stream"

Length of output: 1492


Script:

#!/bin/bash
# Check the implementation of JetStreamTest.kt
cat stream/src/jvmTest/kotlin/work/socialhub/kbsky/stream/app/bsky/JetStreamTest.kt

# Also check the implementation of stream clients to verify callback usage
cat stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/JetStreamClient.kt
cat stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/com/atproto/SyncStreamClient.kt

Length of output: 8861

stream/src/jvmTest/kotlin/work/socialhub/kbsky/stream/app/bsky/JetStreamTest.kt (1)

1-14: LGTM!

The imports are well-organized and the class structure follows Kotlin conventions.

stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/internal/app/bsky/_JetStreamResource.kt (2)

1-17: LGTM! Class structure follows internal implementation pattern.

The class is properly structured with appropriate dependencies and constructor parameters. The underscore prefix in the class name correctly indicates this is an internal implementation.


28-37: Validate subscription parameters.

The code allows for both wantedCollections and wantedDids to be empty, which might not be a valid subscription state. Consider adding validation to ensure at least one type of subscription is requested.

stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/com/atproto/SyncStreamClient.kt (3)

1-12: LGTM! Package restructuring improves organization

The package restructuring better aligns with ATProto's namespace conventions, making the codebase more organized and maintainable.


29-32: LGTM! Clean callback implementation

The builder pattern implementation for callbacks is clean and type-safe.


Line range hint 57-143: Refactor message handling for better maintainability and error handling

Several issues need attention in the message handling implementation:

  1. The CBOR decoding workaround needs a proper solution
  2. Error handling uses println instead of proper logging
  3. The nested try-catch structure makes the code hard to maintain

Consider these improvements:

  1. Extract CBOR decoding logic into a separate class for better maintainability
  2. Implement proper logging instead of using println
  3. Restructure the nested try-catch blocks
-                    } catch (e: Exception) {
-                        println(
-                            "[Record Deleted?]"
-                                    + " repo: " + repo
-                                    + " path: " + path
-                        )
+                    } catch (e: Exception) {
+                        errorCallback?.onError(Exception(
+                            "Failed to process record. Possibly deleted. " +
+                            "repo: $repo, path: $path", e
+                        ))
                     }

Let's check if there's a logging framework in use:

✅ Verification successful

Let's see what logging or error handling patterns are currently used in the codebase before making recommendations.


Consider using the existing error callback mechanism instead of println

The codebase already has a proper error handling mechanism through callbacks. The ErrorCallback interface is available in the stream module and is already being used in the SyncStreamClient.

Instead of using println for error logging, utilize the existing errorCallback that's already set up in this class. This maintains consistency with the rest of the codebase's error handling pattern.

-                    } catch (e: Exception) {
-                        println(
-                            "[Record Deleted?]"
-                                    + " repo: " + repo
-                                    + " path: " + path
-                        )
+                    } catch (e: Exception) {
+                        errorCallback?.onError(Exception(
+                            "Failed to process record. Possibly deleted. " +
+                            "repo: $repo, path: $path", e
+                        ))
                     }
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check for logging framework usage
rg -l "import.*log" --type kotlin

Length of output: 35


Script:

#!/bin/bash
# Let's check for any logging-related code or classes in the codebase
rg -i "log|logger" --type kotlin

# Also check for any callback or error handling patterns
rg "callback|onError" --type kotlin

Length of output: 14887

Comment on lines +5 to +6
@Serializable
class Identity {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider using a data class with proper constructor parameters

The class appears to be a data holder but isn't utilizing Kotlin's data class features. Consider refactoring to:

 @Serializable
-class Identity {
+data class Identity(
+    var did: String,
+    var handle: String,
+    var sec: Long = 0,
+    var time: String,
+)

This would:

  • Provide equals(), hashCode(), and toString() implementations
  • Make the class immutable by default
  • Eliminate the need for lateinit
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@Serializable
class Identity {
@Serializable
data class Identity(
var did: String,
var handle: String,
var sec: Long = 0,
var time: String,
)

Comment on lines 7 to 12
var jetStreamHosts = listOf(
"jetstream1.us-east.bsky.network",
"jetstream2.us-east.bsky.network",
"jetstream1.us-west.bsky.network",
"jetstream2.us-west.bsky.network",
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve configuration flexibility and validation.

The current implementation has several limitations:

  1. Hardcoded hosts limit configuration flexibility
  2. No validation for host format
  3. No support for custom ports
  4. Limited to US regions
  5. Mutable property could lead to thread safety issues in concurrent scenarios

Consider implementing a more robust configuration:

-    var jetStreamHosts = listOf(
-        "jetstream1.us-east.bsky.network",
-        "jetstream2.us-east.bsky.network",
-        "jetstream1.us-west.bsky.network",
-        "jetstream2.us-west.bsky.network",
-    )
+    private val defaultHosts = listOf(
+        "jetstream1.us-east.bsky.network",
+        "jetstream2.us-east.bsky.network",
+        "jetstream1.us-west.bsky.network",
+        "jetstream2.us-west.bsky.network",
+    )
+
+    private var _jetStreamHosts: List<String> = defaultHosts
+
+    /**
+     * List of JetStream host endpoints.
+     * @throws IllegalArgumentException if any host format is invalid
+     */
+    var jetStreamHosts: List<String>
+        get() = _jetStreamHosts
+        set(value) {
+            require(value.isNotEmpty()) { "At least one host must be provided" }
+            value.forEach { host ->
+                require(isValidHostname(host)) { "Invalid host format: $host" }
+            }
+            _jetStreamHosts = value
+        }
+
+    private fun isValidHostname(host: String): Boolean {
+        // Add hostname validation logic
+        return host.matches(Regex("^[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"))
+    }

Also consider:

  1. Adding builder pattern for fluent configuration
  2. Supporting custom ports (e.g., "host:port" format)
  3. Adding region-aware host selection
  4. Making the property thread-safe if needed

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines 1 to 16
package work.socialhub.kbsky.stream.entity.app.bsky.model

import kotlinx.serialization.Serializable
import work.socialhub.kbsky.model.share.RecordUnion

@Serializable
class Commit {

lateinit var rev: String
lateinit var operation: String
lateinit var collection: String
lateinit var rkey: String

var record: RecordUnion? = null
var cid: String? = null
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Based on the codebase analysis, I can now generate the final response:

The operation field should be constrained to 'create' or 'delete' values

The Commit class is used in the streaming API to handle repository record operations, and based on the ATProtocol types and repository operations found in the codebase, there are only two valid operation types:

  • create - corresponding to RepoCreateRecord
  • delete - corresponding to RepoDeleteRecord
  • Consider adding an enum or string constant for these operation types in Commit.kt
  • Add validation to ensure only valid operation values are accepted
🔗 Analysis chain

Verify the streaming operation types.

The class handles streaming operations but doesn't enforce valid operation types. Let's verify the supported operations in the codebase.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for any enum or constant definitions related to commit operations
rg -t kotlin "enum.*Operation|const.*OPERATION" 

# Search for actual usage of the operation property to understand valid values
ast-grep --pattern 'operation = "$_"'

Length of output: 89


Script:

#!/bin/bash
# Let's try a broader search for operation-related code
rg -i "operation" -A 3 -B 3 --type kotlin

# Also search for any commit-related operations
rg -i "commit.*operation|operation.*commit" --type kotlin

Length of output: 9515


Script:

#!/bin/bash
# Let's search for any files that might be using or handling the Commit class
rg -l "Commit" --type kotlin

# Also search for any repository operations or CRUD patterns that might indicate valid operations
rg -i "create.*record|delete.*record|update.*record" --type kotlin -A 2 -B 2

Length of output: 48299


Script:

#!/bin/bash
# Let's check Event.kt since it's related to Commit
cat stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/model/Event.kt

Length of output: 479

Comment on lines +9 to +14
lateinit var did: String

@SerialName("time_us")
var timeUs: Long = 0

lateinit var kind: String
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve null safety and validation

  1. Instead of using lateinit, consider providing default values or making properties nullable if they can be absent
  2. The kind property appears to represent a finite set of values. Consider using an enum class for type safety
@Serializable
enum class EventKind {
    @SerialName("commit")
    COMMIT,
    @SerialName("identity")
    IDENTITY,
    @SerialName("account")
    ACCOUNT,
    // Add other valid kinds
    @SerialName("unknown")
    UNKNOWN
}

Then update the property:

-lateinit var kind: String
+var kind: EventKind = EventKind.UNKNOWN

Comment on lines +16 to +18
var commit: Commit? = null
var identity: Identity? = null
var account: Account? = null
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add validation for event type consistency

The relationship between kind and the optional properties (commit, identity, account) should be validated to ensure data consistency. Consider adding a validation method:

fun validate() {
    when (kind) {
        EventKind.COMMIT -> requireNotNull(commit) { "Commit event must have commit data" }
        EventKind.IDENTITY -> requireNotNull(identity) { "Identity event must have identity data" }
        EventKind.ACCOUNT -> requireNotNull(account) { "Account event must have account data" }
        EventKind.UNKNOWN -> { /* Allow any combination for unknown types */ }
    }
}

Also consider making these properties internal to prevent external modification:

-    var commit: Commit? = null
-    var identity: Identity? = null
-    var account: Account? = null
+    internal var commit: Commit? = null
+    internal var identity: Identity? = null
+    internal var account: Account? = null

val builder = URLBuilder().also { b ->

b.protocol = URLProtocol.WSS
b.host = request.host ?: config.jetStreamHosts.random()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add validation for host configuration.

The random host selection could throw an exception if config.jetStreamHosts is empty. Consider adding validation.

-b.host = request.host ?: config.jetStreamHosts.random()
+b.host = request.host ?: config.jetStreamHosts.takeIf { it.isNotEmpty() }
+    ?.random()
+    ?: throw IllegalStateException("No JetStream hosts configured")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
b.host = request.host ?: config.jetStreamHosts.random()
b.host = request.host ?: config.jetStreamHosts.takeIf { it.isNotEmpty() }
?.random()
?: throw IllegalStateException("No JetStream hosts configured")

Comment on lines +11 to +20
class JetStreamClient(
val uri: String,
) {
var client = WebsocketRequest()
var isOpen: Boolean = false

var eventCallback: JetStreamEventCallback? = null
private var openedCallback: OpenedCallback? = null
private var closedCallback: ClosedCallback? = null
private var errorCallback: ErrorCallback? = null
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve class structure and add documentation

Consider the following improvements:

  1. Make client property private to prevent external modification
  2. Add input validation for the URI parameter
  3. Add KDoc documentation for the public API

Apply this diff:

+/**
+ * Client for handling WebSocket connections to the Jet Stream service.
+ * @property uri The WebSocket endpoint URI
+ */
 class JetStreamClient(
-    val uri: String,
+    private val uri: String,
 ) {
-    var client = WebsocketRequest()
+    private var client = WebsocketRequest()
     var isOpen: Boolean = false
+        private set
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
class JetStreamClient(
val uri: String,
) {
var client = WebsocketRequest()
var isOpen: Boolean = false
var eventCallback: JetStreamEventCallback? = null
private var openedCallback: OpenedCallback? = null
private var closedCallback: ClosedCallback? = null
private var errorCallback: ErrorCallback? = null
/**
* Client for handling WebSocket connections to the Jet Stream service.
* @property uri The WebSocket endpoint URI
*/
class JetStreamClient(
private val uri: String,
) {
private var client = WebsocketRequest()
var isOpen: Boolean = false
private set
var eventCallback: JetStreamEventCallback? = null
private var openedCallback: OpenedCallback? = null
private var closedCallback: ClosedCallback? = null
private var errorCallback: ErrorCallback? = null

Comment on lines +45 to +51
suspend fun open() {
client.open()
}

fun close() {
client.close()
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve connection management with proper error handling

The connection management methods should handle errors and validate connection state.

Consider this implementation:

     suspend fun open() {
+        if (isOpen) {
+            throw IllegalStateException("WebSocket is already open")
+        }
         client.open()
     }

     fun close() {
+        if (!isOpen) {
+            return
+        }
         client.close()
     }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +27 to +43
init {
this.client.url(this.uri)
this.client.bytesListener = {
this.onMessage(it)
}
this.client.textListener = {
this.onMessage(it)
}
this.client.onOpenListener = {
this.isOpen = true
this.openedCallback?.onOpened()
}
this.client.onCloseListener = {
this.isOpen = false
this.closedCallback?.onClosed()
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add error handler to WebSocket initialization

The errorCallback is defined but never used. Add an error listener to handle WebSocket errors properly.

Add the error handler:

     init {
         this.client.url(this.uri)
         this.client.bytesListener = {
             this.onMessage(it)
         }
         this.client.textListener = {
             this.onMessage(it)
         }
         this.client.onOpenListener = {
             this.isOpen = true
             this.openedCallback?.onOpened()
         }
         this.client.onCloseListener = {
             this.isOpen = false
             this.closedCallback?.onClosed()
         }
+        this.client.onErrorListener = { error ->
+            this.errorCallback?.onError(error)
+        }
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
init {
this.client.url(this.uri)
this.client.bytesListener = {
this.onMessage(it)
}
this.client.textListener = {
this.onMessage(it)
}
this.client.onOpenListener = {
this.isOpen = true
this.openedCallback?.onOpened()
}
this.client.onCloseListener = {
this.isOpen = false
this.closedCallback?.onClosed()
}
}
init {
this.client.url(this.uri)
this.client.bytesListener = {
this.onMessage(it)
}
this.client.textListener = {
this.onMessage(it)
}
this.client.onOpenListener = {
this.isOpen = true
this.openedCallback?.onOpened()
}
this.client.onCloseListener = {
this.isOpen = false
this.closedCallback?.onClosed()
}
this.client.onErrorListener = { error ->
this.errorCallback?.onError(error)
}
}

Comment on lines 53 to 62
private fun onMessage(text: String) {
try {
eventCallback?.let {
val event = _InternalUtility.fromJson<Event>(text)
it.onEvent(event)
}
} catch (e: Exception) {
e.printStackTrace()
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve error handling in message processing

The current implementation prints exceptions to console instead of properly handling them through the error callback.

Consider this implementation:

     private fun onMessage(text: String) {
         try {
             eventCallback?.let {
                 val event = _InternalUtility.fromJson<Event>(text)
                 it.onEvent(event)
             }
         } catch (e: Exception) {
-            e.printStackTrace()
+            errorCallback?.onError(e)
         }
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
private fun onMessage(text: String) {
try {
eventCallback?.let {
val event = _InternalUtility.fromJson<Event>(text)
it.onEvent(event)
}
} catch (e: Exception) {
e.printStackTrace()
}
}
private fun onMessage(text: String) {
try {
eventCallback?.let {
val event = _InternalUtility.fromJson<Event>(text)
it.onEvent(event)
}
} catch (e: Exception) {
errorCallback?.onError(e)
}
}

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (4)
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/BlueskyStreamConfig.kt (1)

5-5: Add KDoc documentation for the configuration class.

Add documentation explaining the purpose of this configuration class, its relationship with JetStream, and usage examples.

+/**
+ * Configuration for Bluesky JetStream functionality.
+ * Extends the base BlueskyConfig to provide JetStream-specific settings.
+ *
+ * Example usage:
+ * ```
+ * val config = BlueskyStreamConfig().apply {
+ *     jetStreamHost = "jetstream1.us-east.bsky.network"
+ * }
+ * ```
+ */
 class BlueskyStreamConfig : BlueskyConfig() {
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/JetStreamClient.kt (3)

22-25: Add KDoc documentation for callback methods

The callback methods would benefit from documentation explaining their purpose and usage.

Add documentation like this:

+    /**
+     * Sets the callback for handling JetStream events.
+     * @param callback The callback to handle incoming events
+     * @return This client instance for method chaining
+     */
     fun eventCallback(callback: JetStreamEventCallback) = also { this.eventCallback = callback }

53-62: Add logging for message processing

Consider adding debug logging to help with troubleshooting:

  • Log received message size
  • Log parsing errors with message details

Example implementation:

     private fun onMessage(text: String) {
+        logger.debug { "Received message: ${text.length} bytes" }
         try {
             eventCallback?.let {
                 val event = _InternalUtility.fromJson<Event>(text)
                 it.onEvent(event)
             }
         } catch (e: Exception) {
+            logger.error(e) { "Failed to parse message: ${text.take(100)}..." }
             errorCallback?.onError(e)
         }
     }

11-67: Add proper resource management

The class should implement proper resource cleanup mechanisms:

  1. Make the class implement Closeable interface
  2. Add cleanup logic for WebSocket resources
  3. Consider implementing connection timeout handling

Example implementation:

-class JetStreamClient(
+class JetStreamClient(
+    private val uri: String,
+    private val connectionTimeoutMs: Long = 30000
+) : Closeable {
     // ... existing code ...

+    override fun close() {
+        try {
+            client.close()
+        } finally {
+            eventCallback = null
+            openedCallback = null
+            closedCallback = null
+            errorCallback = null
+        }
+    }

+    suspend fun waitForOpen(timeoutMs: Long = connectionTimeoutMs): Boolean {
+        val start = System.currentTimeMillis()
+        while (!isOpen && System.currentTimeMillis() - start < timeoutMs) {
+            delay(100)
+        }
+        return isOpen
+    }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 3ba0f65 and 133263b.

📒 Files selected for processing (7)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/BlueskyStreamConfig.kt (1 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/BlueskyStreamFactory.kt (1 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/JetStreamClient.kt (1 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/model/Commit.kt (1 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/model/Event.kt (1 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/model/Identity.kt (1 hunks)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/internal/app/bsky/_JetStreamResource.kt (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/model/Event.kt
🚧 Files skipped from review as they are similar to previous changes (4)
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/BlueskyStreamFactory.kt
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/model/Commit.kt
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/model/Identity.kt
  • stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/internal/app/bsky/_JetStreamResource.kt
🔇 Additional comments (4)
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/BlueskyStreamConfig.kt (1)

7-7: 🛠️ Refactor suggestion

Enhance the jetStreamHost property implementation.

The current implementation of jetStreamHost has several concerns that were previously identified. Additionally:

  1. The nullable property could lead to runtime issues
  2. There's no documentation explaining when null is acceptable
  3. Consider using a default value instead of null
-    var jetStreamHost: String? = null
+    /**
+     * The JetStream host endpoint to connect to.
+     * If not set, a random host from [defaultJetStreamHosts] will be used.
+     * @throws IllegalArgumentException if the host format is invalid
+     */
+    var jetStreamHost: String? = null
+        set(value) {
+            field = value?.also { host ->
+                require(isValidHostname(host)) { "Invalid host format: $host" }
+            }
+        }
stream/src/commonMain/kotlin/work/socialhub/kbsky/stream/entity/app/bsky/JetStreamClient.kt (3)

11-15: Improve encapsulation of class properties

The class exposes mutable properties that should be private to prevent external modification.

Apply this diff:

 class JetStreamClient(
-    val uri: String,
+    private val uri: String,
 ) {
-    var client = WebsocketRequest()
-    var isOpen: Boolean = false
+    private var client = WebsocketRequest()
+    private var isOpen: Boolean = false
+        get() = field

27-43: Add error handler to WebSocket initialization

The error callback is defined but not used in the WebSocket initialization.

Add the error handler:

     init {
         this.client.url(this.uri)
         this.client.bytesListener = {
             this.onMessage(it)
         }
         this.client.textListener = {
             this.onMessage(it)
         }
         this.client.onOpenListener = {
             this.isOpen = true
             this.openedCallback?.onOpened()
         }
         this.client.onCloseListener = {
             this.isOpen = false
             this.closedCallback?.onClosed()
         }
+        this.client.onErrorListener = { error ->
+            this.errorCallback?.onError(error)
+        }
     }

45-51: Improve connection state management

The connection methods should validate the connection state to prevent invalid operations.

Apply this improvement:

     suspend fun open() {
+        if (isOpen) {
+            throw IllegalStateException("WebSocket is already open")
+        }
         client.open()
     }

     fun close() {
+        if (!isOpen) {
+            return
+        }
         client.close()
     }

Comment on lines +9 to +16
companion object {
val defaultJetStreamHosts = listOf(
"jetstream1.us-east.bsky.network",
"jetstream2.us-east.bsky.network",
"jetstream1.us-west.bsky.network",
"jetstream2.us-west.bsky.network",
)
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Consider adding host selection and fallback mechanisms

The verification reveals that while basic reconnection logic exists (JetStreamSubscribeRequest.kt), the codebase lacks comprehensive host management features. The current implementation:

  1. Only references these hosts in BlueskyStreamConfig.kt without additional host management logic
  2. Has basic reconnection support but no host-level fallback mechanism
  3. Lacks health checks for host availability

Consider implementing:

companion object {
    /**
     * Default JetStream hosts provided by Bluesky.
     * These hosts are located in US regions and may not be optimal for all users.
     * Consider configuring region-specific hosts for better performance.
     * 
     * Note: These are the official Bluesky JetStream endpoints. For production use,
     * implement host selection and health check mechanisms to ensure reliability.
     */
    val defaultJetStreamHosts = listOf(
        "jetstream1.us-east.bsky.network",
        "jetstream2.us-east.bsky.network",
        "jetstream1.us-west.bsky.network",
        "jetstream2.us-west.bsky.network",
    )
}
🔗 Analysis chain

Enhance default host management and documentation.

The current implementation of default hosts has several limitations:

  1. Limited to US regions which might impact latency for international users
  2. No mechanism to verify host availability
  3. Missing documentation about host selection strategy

Consider implementing a host selection strategy that:

  1. Supports regional endpoints
  2. Includes health checks
  3. Implements fallback mechanisms

Also, add documentation:

     companion object {
+        /**
+         * Default JetStream hosts provided by Bluesky.
+         * These hosts are located in US regions and may not be optimal for all users.
+         * Consider configuring region-specific hosts for better performance.
+         */
         val defaultJetStreamHosts = listOf(

Let's verify if these hosts are documented in the Bluesky documentation:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Search for documentation about JetStream hosts
# Expected: Find official documentation about available JetStream endpoints

# Search for any documentation files
fd -e md -e txt . | xargs rg -i "jetstream.*\.bsky\.network" -A 5

Length of output: 67


Script:

#!/bin/bash
# Let's search for any documentation or code references about these JetStream hosts
rg -i "jetstream.*\.bsky\.network" -A 3

# Also search for any configuration or documentation about host selection
rg -i "host.*selection|endpoint.*selection|region.*host" -A 3

# Look for any health check implementations
ast-grep --pattern 'fun $_(health|check|alive|status)$_($$$) { $$$ }'

# Search for any fallback or retry mechanisms
rg -i "fallback|retry|reconnect" -A 3

Length of output: 3192

Comment on lines +64 to +66
private fun onMessage(data: ByteArray) {
// TODO: zstd の場合はこちらで処理することになる
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Implement zstd decompression for binary messages

The TODO comment indicates that zstd compression support is needed. This is a critical feature for JetStream as it's commonly used for efficient data transfer.

Would you like me to help implement the zstd decompression logic? This would involve:

  1. Adding a zstd decompression library dependency
  2. Implementing the binary message handler
  3. Adding unit tests for compressed message handling

@uakihir0 uakihir0 merged commit 1fc67db into main Nov 30, 2024
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support for JetStream
1 participant