From d20693cef78576d45b40bcf689745f0d3997678d Mon Sep 17 00:00:00 2001 From: dyl Date: Thu, 21 Nov 2024 06:23:17 +0900 Subject: [PATCH 1/3] Handle ErrClientNotActivated and ErrClientNotFound --- .../kotlin/dev/yorkie/core/ClientTest.kt | 31 ++++++- .../kotlin/dev/yorkie/api/ElementConverter.kt | 4 +- .../dev/yorkie/api/OperationConverter.kt | 2 +- .../dev/yorkie/api/PresenceConverter.kt | 2 +- .../src/main/kotlin/dev/yorkie/core/Client.kt | 90 ++++++++++++++----- .../dev/yorkie/util/ConnectRpcErrorUtil.kt | 37 +++++++- .../kotlin/dev/yorkie/util/YorkieException.kt | 22 ++--- .../test/kotlin/dev/yorkie/core/ClientTest.kt | 26 ++++-- 8 files changed, 166 insertions(+), 48 deletions(-) diff --git a/yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt b/yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt index 4eb5a82f..da338e08 100644 --- a/yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt +++ b/yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt @@ -21,7 +21,9 @@ import dev.yorkie.document.json.TreeBuilder.element import dev.yorkie.document.json.TreeBuilder.text import dev.yorkie.document.operation.OperationInfo import dev.yorkie.util.YorkieException -import dev.yorkie.util.YorkieException.Code.* +import dev.yorkie.util.YorkieException.Code.ErrClientNotActivated +import dev.yorkie.util.YorkieException.Code.ErrDocumentNotAttached +import dev.yorkie.util.YorkieException.Code.ErrDocumentNotDetached import java.util.UUID import kotlin.test.assertEquals import kotlin.test.assertFailsWith @@ -45,6 +47,33 @@ import org.junit.runner.RunWith @RunWith(AndroidJUnit4::class) class ClientTest { + @Test + fun can_attach_and_detach_document() { + runBlocking { + val c1 = createClient() + val key = UUID.randomUUID().toString().toDocKey() + val d1 = Document(key) + c1.activateAsync().await() + + c1.attachAsync(d1).await() + // attach again after attached + val exception = assertFailsWith(YorkieException::class) { + c1.attachAsync(d1).await() + } + assertEquals(ErrDocumentNotDetached, exception.code) + + c1.detachAsync(d1).await() + // detach again after detached + val exception2 = assertFailsWith(YorkieException::class) { + c1.detachAsync(d1).await() + } + assertEquals(ErrDocumentNotAttached, exception2.code) + + c1.deactivateAsync().await() + c1.close() + } + } + @Test fun test_multiple_clients_working_on_same_document() { runBlocking { diff --git a/yorkie/src/main/kotlin/dev/yorkie/api/ElementConverter.kt b/yorkie/src/main/kotlin/dev/yorkie/api/ElementConverter.kt index 60c7ac51..501696b7 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/api/ElementConverter.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/api/ElementConverter.kt @@ -175,7 +175,7 @@ internal fun PBValueType.toPrimitiveType(): CrdtPrimitive.Type { PBValueType.VALUE_TYPE_STRING -> CrdtPrimitive.Type.String PBValueType.VALUE_TYPE_BYTES -> CrdtPrimitive.Type.Bytes PBValueType.VALUE_TYPE_DATE -> CrdtPrimitive.Type.Date - else -> throw YorkieException(ErrUnimplemented, "unimplemented type : $this") + else -> throw YorkieException(ErrUnimplemented, "unimplemented value type : $this") } } @@ -547,7 +547,7 @@ internal fun PBJsonElementSimple.toCrdtElement(): CrdtElement { PBValueType.VALUE_TYPE_TREE -> value.toCrdtTree() - else -> throw YorkieException(ErrUnimplemented, "unimplemented type : $this") + else -> throw YorkieException(ErrUnimplemented, "unimplemented element : $this") } } diff --git a/yorkie/src/main/kotlin/dev/yorkie/api/OperationConverter.kt b/yorkie/src/main/kotlin/dev/yorkie/api/OperationConverter.kt index 5a9af916..27ffb243 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/api/OperationConverter.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/api/OperationConverter.kt @@ -115,7 +115,7 @@ internal fun List.toOperations(): List { .associate { (key, value) -> ActorID(key) to value.toTimeTicket() }, ) - else -> throw YorkieException(ErrUnimplemented, "unimplemented operation") + else -> throw YorkieException(ErrUnimplemented, "unimplemented operations") } } } diff --git a/yorkie/src/main/kotlin/dev/yorkie/api/PresenceConverter.kt b/yorkie/src/main/kotlin/dev/yorkie/api/PresenceConverter.kt index 539840fa..db19de09 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/api/PresenceConverter.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/api/PresenceConverter.kt @@ -41,7 +41,7 @@ internal fun PBPresenceChange.toPresenceChange(): PresenceChange { return when (type) { PBPresenceChangeType.CHANGE_TYPE_PUT -> PresenceChange.Put(presence.toPresence()) PBPresenceChangeType.CHANGE_TYPE_CLEAR -> PresenceChange.Clear - else -> throw YorkieException(Unsupported, "unsupported type : $type") + else -> throw YorkieException(ErrUnimplemented, "Unimplemented type: $type") } } diff --git a/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt b/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt index 063d3b87..758ea58c 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt @@ -44,7 +44,7 @@ import dev.yorkie.util.YorkieException.Code.ErrDocumentNotAttached import dev.yorkie.util.YorkieException.Code.ErrDocumentNotDetached import dev.yorkie.util.checkYorkieError import dev.yorkie.util.createSingleThreadDispatcher -import dev.yorkie.util.isRetryable +import dev.yorkie.util.handleConnectException import java.io.Closeable import java.io.InterruptedIOException import java.util.UUID @@ -185,6 +185,9 @@ public class Client @VisibleForTesting internal constructor( projectBasedRequestHeader, ).getOrElse { ensureActive() + handleConnectException(it) { + deactivateInternal() + } return@async Result.failure(it) } _status.emit(Status.Activated(ActorID(activateResponse.clientId))) @@ -210,7 +213,7 @@ public class Client @VisibleForTesting internal constructor( if (result.isSuccess) { conditions[ClientCondition.SYNC_LOOP] = true SyncStatusChanged.Synced - } else if (isRetryable(result.exceptionOrNull() as? ConnectException)) { + } else if (handleConnectException(result.exceptionOrNull() as? ConnectException)) { // check if the exception is retryable conditions[ClientCondition.SYNC_LOOP] = true SyncStatusChanged.SyncFailed(result.exceptionOrNull()) } else { @@ -231,12 +234,18 @@ public class Client @VisibleForTesting internal constructor( */ public fun syncAsync(document: Document? = null): Deferred { return scope.async { - checkYorkieError(isActive, YorkieException(ErrClientNotActivated, "client is not active")) + checkYorkieError( + isActive, + YorkieException(ErrClientNotActivated, "client is not active"), + ) var failure: Throwable? = null val attachments = document?.let { val attachment = attachments.value[it.key]?.copy(syncMode = SyncMode.Realtime) - ?: throw YorkieException(ErrDocumentNotAttached, "document(${document.key}) is not attached") + ?: throw YorkieException( + ErrDocumentNotAttached, + "document(${document.key}) is not attached", + ) listOf(AttachmentEntry(it.key, attachment)) } ?: attachments.value.entries @@ -312,6 +321,11 @@ public class Client @VisibleForTesting internal constructor( } }.onFailure { coroutineContext.ensureActive() + (it as? ConnectException)?.let { exception -> + handleConnectException(exception) { + deactivateInternal() + } + } }, ) } @@ -376,7 +390,8 @@ public class Client @VisibleForTesting internal constructor( stream.safeClose() return@onFailure } - shouldContinue = handleWatchStreamFailure(attachment.document, stream, it) + shouldContinue = + handleWatchStreamFailure(attachment.document, stream, it) }.onClosed { handleWatchStreamFailure( attachment.document, @@ -426,7 +441,7 @@ public class Client @VisibleForTesting internal constructor( cause?.let(::sendWatchStreamException) - if (isRetryable(cause as? ConnectException)) { + if (handleConnectException(cause as? ConnectException)) { coroutineContext.ensureActive() delay(options.reconnectStreamDelay.inWholeMilliseconds) return true @@ -548,7 +563,10 @@ public class Client @VisibleForTesting internal constructor( syncMode: SyncMode = SyncMode.Realtime, ): Deferred { return scope.async { - checkYorkieError(isActive, YorkieException(ErrClientNotActivated, "client is not active")) + checkYorkieError( + isActive, + YorkieException(ErrClientNotActivated, "client is not active"), + ) checkYorkieError( document.status == DocumentStatus.Detached, @@ -571,6 +589,9 @@ public class Client @VisibleForTesting internal constructor( document.key.documentBasedRequestHeader, ).getOrElse { ensureActive() + handleConnectException(it) { + deactivateInternal() + } return@async Result.failure(it) } val pack = response.changePack.toChangePack() @@ -602,11 +623,17 @@ public class Client @VisibleForTesting internal constructor( */ public fun detachAsync(document: Document): Deferred { return scope.async { - checkYorkieError(isActive, YorkieException(ErrClientNotActivated, "client is not active")) + checkYorkieError( + isActive, + YorkieException(ErrClientNotActivated, "client is not active"), + ) document.mutex.withLock { val attachment = attachments.value[document.key] - ?: throw YorkieException(ErrDocumentNotAttached, "document(${document.key}) is not attached") + ?: throw YorkieException( + ErrDocumentNotAttached, + "document(${document.key}) is not attached", + ) document.updateAsync { _, presence -> presence.clear() @@ -622,6 +649,9 @@ public class Client @VisibleForTesting internal constructor( document.key.documentBasedRequestHeader, ).getOrElse { ensureActive() + handleConnectException(it) { + deactivateInternal() + } return@async Result.failure(it) } val pack = response.changePack.toChangePack() @@ -653,30 +683,42 @@ public class Client @VisibleForTesting internal constructor( projectBasedRequestHeader, ).getOrElse { ensureActive() + handleConnectException(it) { + deactivateInternal() + } return@async Result.failure(it) } - attachments.value.values.forEach { - detachAsync(it.document).await() - it.document.applyDocumentStatus(DocumentStatus.Detached) - } - - _status.emit(Status.Deactivated) - + deactivateInternal() SUCCESS } } + private suspend fun deactivateInternal() { + attachments.value.values.forEach { + detachAsync(it.document).await() + it.document.applyDocumentStatus(DocumentStatus.Detached) + } + + _status.emit(Status.Deactivated) + } + /** * Removes the given [document]. */ public fun removeAsync(document: Document): Deferred { return scope.async { - checkYorkieError(isActive, YorkieException(ErrClientNotActivated, "client is not active")) + checkYorkieError( + isActive, + YorkieException(ErrClientNotActivated, "client is not active"), + ) document.mutex.withLock { val attachment = attachments.value[document.key] - ?: throw YorkieException(ErrDocumentNotAttached, "document(${document.key}) is not attached") + ?: throw YorkieException( + ErrDocumentNotAttached, + "document(${document.key}) is not attached", + ) val request = removeDocumentRequest { clientId = requireClientId().value @@ -707,7 +749,12 @@ public class Client @VisibleForTesting internal constructor( } } - public fun requireClientId() = (status.value as Status.Activated).clientId + public fun requireClientId(): ActorID { + if (status.value is Status.Deactivated) { + throw YorkieException(ErrClientNotActivated, "client is not active") + } + return (status.value as Status.Activated).clientId + } /** * Changes the sync mode of the [document]. @@ -716,7 +763,10 @@ public class Client @VisibleForTesting internal constructor( checkYorkieError(isActive, YorkieException(ErrClientNotActivated, "client is not active")) val attachment = attachments.value[document.key] - ?: throw YorkieException(ErrDocumentNotAttached, "document(${document.key}) is not attached") + ?: throw YorkieException( + ErrDocumentNotAttached, + "document(${document.key}) is not attached", + ) attachments.value += document.key to if (syncMode == SyncMode.Realtime) { attachment.copy(syncMode = syncMode, remoteChangeEventReceived = true) } else { diff --git a/yorkie/src/main/kotlin/dev/yorkie/util/ConnectRpcErrorUtil.kt b/yorkie/src/main/kotlin/dev/yorkie/util/ConnectRpcErrorUtil.kt index d5aad517..ce56da75 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/util/ConnectRpcErrorUtil.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/util/ConnectRpcErrorUtil.kt @@ -1,16 +1,45 @@ package dev.yorkie.util import com.connectrpc.Code -import com.connectrpc.ConnectErrorDetail import com.connectrpc.ConnectException +import com.google.rpc.ErrorInfo +import dev.yorkie.util.YorkieException.Code.ErrClientNotActivated +import dev.yorkie.util.YorkieException.Code.ErrClientNotFound /** - * [isRetryable] will return true if the given error is retryable. + * [handleConnectException] will return true if the given error is retryable. + * + * If caller want to handle the error about [ErrClientNotActivated] or [ErrClientNotFound], + * then pass the lambda function to [handleError]. */ -public fun isRetryable(exception: ConnectException?): Boolean { +public suspend fun handleConnectException( + exception: ConnectException?, + handleError: (suspend () -> Unit)? = null, +): Boolean { val errorCode = exception?.code ?: return false - return errorCode == Code.CANCELED || + + if (errorCode == Code.CANCELED || errorCode == Code.UNKNOWN || errorCode == Code.RESOURCE_EXHAUSTED || errorCode == Code.UNAVAILABLE + ) { + return true + } + + val yorkieErrorCode = errorCodeOf(exception) + if (yorkieErrorCode == ErrClientNotActivated.codeString || + yorkieErrorCode == ErrClientNotFound.codeString + ) { + print("Yorkie error: $yorkieErrorCode") + handleError?.invoke() + } + return false +} + +private fun errorCodeOf(exception: ConnectException): String { + val infos = exception.unpackedDetails(ErrorInfo::class) + for (info in infos) { + info.metadataMap["code"]?.let { return it } + } + return "" } diff --git a/yorkie/src/main/kotlin/dev/yorkie/util/YorkieException.kt b/yorkie/src/main/kotlin/dev/yorkie/util/YorkieException.kt index 6aa901c6..2e8ab75c 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/util/YorkieException.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/util/YorkieException.kt @@ -7,36 +7,36 @@ import kotlin.contracts.contract * `YorkieError` is an error returned by a Yorkie operation. */ public data class YorkieException(public val code: Code, public val errorMessage: String) : RuntimeException(errorMessage) { - public enum class Code { + public enum class Code(val codeString: String) { // Ok is returned when the operation completed successfully. - Ok, + Ok("ok"), // ErrClientNotActivated is returned when the client is not active. - ErrClientNotActivated, + ErrClientNotActivated("ErrClientNotActivated"), // ErrClientNotFound is returned when the client is not found. - ErrClientNotFound, + ErrClientNotFound("ErrClientNotFound"), // ErrUnimplemented is returned when the operation is not implemented. - ErrUnimplemented, + ErrUnimplemented("ErrUnimplemented"), // Unsupported is returned when the operation is not supported. - Unsupported, + Unsupported("Unsupported"), // ErrDocumentNotAttached is returned when the document is not attached. - ErrDocumentNotAttached, + ErrDocumentNotAttached("ErrDocumentNotAttached"), // ErrDocumentNotDetached is returned when the document is not detached. - ErrDocumentNotDetached, + ErrDocumentNotDetached("ErrDocumentNotDetached"), // ErrDocumentRemoved is returned when the document is removed. - ErrDocumentRemoved, + ErrDocumentRemoved("ErrDocumentRemoved"), // InvalidObjectKey is returned when the object key is invalid. - ErrInvalidObjectKey, + ErrInvalidObjectKey("ErrInvalidObjectKey"), // ErrInvalidArgument is returned when the argument is invalid. - ErrInvalidArgument, + ErrInvalidArgument("ErrInvalidArgument"); } } diff --git a/yorkie/src/test/kotlin/dev/yorkie/core/ClientTest.kt b/yorkie/src/test/kotlin/dev/yorkie/core/ClientTest.kt index e9ccbc12..483302e3 100644 --- a/yorkie/src/test/kotlin/dev/yorkie/core/ClientTest.kt +++ b/yorkie/src/test/kotlin/dev/yorkie/core/ClientTest.kt @@ -31,9 +31,12 @@ import dev.yorkie.document.change.CheckPoint import dev.yorkie.document.json.JsonText import dev.yorkie.document.presence.PresenceChange import dev.yorkie.document.time.ActorID +import dev.yorkie.util.YorkieException +import dev.yorkie.util.YorkieException.Code.ErrDocumentNotAttached import dev.yorkie.util.createSingleThreadDispatcher -import dev.yorkie.util.isRetryable +import dev.yorkie.util.handleConnectException import kotlin.test.assertEquals +import kotlin.test.assertFailsWith import kotlin.test.assertFalse import kotlin.test.assertIs import kotlin.test.assertTrue @@ -154,7 +157,6 @@ class ClientTest { val document = Document(Key(WATCH_SYNC_ERROR_DOCUMENT_KEY)) target.activateAsync().await() target.attachAsync(document).await() - val syncEventDeferred = async(start = CoroutineStart.UNDISPATCHED) { document.events.filterIsInstance().first() } @@ -182,11 +184,15 @@ class ClientTest { assertTrue(target.syncAsync().await().isSuccess) target.detachAsync(success).await() - val failing = Document(Key(WATCH_SYNC_ERROR_DOCUMENT_KEY)) - target.attachAsync(failing).await() - assertFalse(target.syncAsync().await().isSuccess) + val failing = Document(Key(ATTACH_ERROR_DOCUMENT_KEY)) + assertTrue(target.attachAsync(failing).await().isFailure) + + val exception = assertFailsWith(YorkieException::class) { + target.detachAsync(failing).await() + } + assertEquals(ErrDocumentNotAttached, exception.code) + assertTrue(target.syncAsync().await().isSuccess) - target.detachAsync(failing).await() target.deactivateAsync().await() } @@ -260,7 +266,11 @@ class ClientTest { val client = Client( yorkieService, - Client.Options(key = TEST_KEY, apiKey = TEST_KEY, syncLoopDuration = 500.milliseconds), + Client.Options( + key = TEST_KEY, + apiKey = TEST_KEY, + syncLoopDuration = 500.milliseconds, + ), createSingleThreadDispatcher("Client Test"), OkHttpClient(), OkHttpClient(), @@ -314,7 +324,7 @@ class ClientTest { // 02. Simulate FailedPrecondition error which is not retryable. Code.entries.filterNot { errorCode -> - isRetryable(ConnectException(errorCode)) + handleConnectException(ConnectException(errorCode)) }.forEach { nonRetryableErrorCode -> mockYorkieService.customError[WATCH_SYNC_ERROR_DOCUMENT_KEY] = nonRetryableErrorCode document.updateAsync { root, _ -> From bc2f17e7be859827daff6ed148215fa70a8f6761 Mon Sep 17 00:00:00 2001 From: dyl Date: Thu, 21 Nov 2024 06:33:08 +0900 Subject: [PATCH 2/3] Added proto file. --- .../proto/com/google/rpc/error_details.proto | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 yorkie/proto/com/google/rpc/error_details.proto diff --git a/yorkie/proto/com/google/rpc/error_details.proto b/yorkie/proto/com/google/rpc/error_details.proto new file mode 100644 index 00000000..1d4bf2d5 --- /dev/null +++ b/yorkie/proto/com/google/rpc/error_details.proto @@ -0,0 +1,50 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package google.rpc; + +import "google/protobuf/duration.proto"; + +option go_package = "google.golang.org/genproto/googleapis/rpc/errdetails;errdetails"; +option java_multiple_files = true; +option java_outer_classname = "ErrorDetailsProto"; +option java_package = "com.google.rpc"; +option objc_class_prefix = "RPC"; + +message ErrorInfo { + // The reason of the error. This is a constant value that identifies the + // proximate cause of the error. Error reasons are unique within a particular + // domain of errors. This should be at most 63 characters and match + // /[A-Z0-9_]+/. + string reason = 1; + + // The logical grouping to which the "reason" belongs. Often "domain" will + // contain the registered service name of the tool or product that is the + // source of the error. Example: "pubsub.googleapis.com". If the error is + // common across many APIs, the first segment of the example above will be + // omitted. The value will be, "googleapis.com". + string domain = 2; + + // Additional structured details about this error. + // + // Keys should match /[a-zA-Z0-9-_]/ and be limited to 64 characters in + // length. When identifying the current value of an exceeded limit, the units + // should be contained in the key, not the value. For example, rather than + // {"instanceLimit": "100/request"}, should be returned as, + // {"instanceLimitPerRequest": "100"}, if the client exceeds the number of + // instances that can be created in a single (batch) request. + map metadata = 3; +} From 34a41459c80be9ad89860a98b1ebb60b77e3f2ee Mon Sep 17 00:00:00 2001 From: dyl Date: Thu, 5 Dec 2024 01:32:48 +0900 Subject: [PATCH 3/3] Applied suggestion --- yorkie/src/main/kotlin/dev/yorkie/util/ConnectRpcErrorUtil.kt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/yorkie/src/main/kotlin/dev/yorkie/util/ConnectRpcErrorUtil.kt b/yorkie/src/main/kotlin/dev/yorkie/util/ConnectRpcErrorUtil.kt index ce56da75..94c008c6 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/util/ConnectRpcErrorUtil.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/util/ConnectRpcErrorUtil.kt @@ -12,7 +12,7 @@ import dev.yorkie.util.YorkieException.Code.ErrClientNotFound * If caller want to handle the error about [ErrClientNotActivated] or [ErrClientNotFound], * then pass the lambda function to [handleError]. */ -public suspend fun handleConnectException( +internal suspend fun handleConnectException( exception: ConnectException?, handleError: (suspend () -> Unit)? = null, ): Boolean { @@ -30,7 +30,6 @@ public suspend fun handleConnectException( if (yorkieErrorCode == ErrClientNotActivated.codeString || yorkieErrorCode == ErrClientNotFound.codeString ) { - print("Yorkie error: $yorkieErrorCode") handleError?.invoke() } return false