diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java index 9141ad3d29cf7..27cd4a2d2f60b 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java @@ -42,6 +42,7 @@ import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataEventSynchronizer; import org.apache.pulsar.metadata.api.MetadataStoreConfig; @@ -252,14 +253,16 @@ protected CompletableFuture storePut( } private CompletionStage convertException(Throwable ex) { - if (ex.getCause() instanceof UnexpectedVersionIdException - || ex.getCause() instanceof KeyAlreadyExistsException) { + Throwable actEx = FutureUtil.unwrapCompletionException(ex); + if (actEx instanceof UnexpectedVersionIdException || actEx instanceof KeyAlreadyExistsException) { return CompletableFuture.failedFuture( - new MetadataStoreException.BadVersionException(ex.getCause())); - } else if (ex.getCause() instanceof IllegalStateException) { - return CompletableFuture.failedFuture(new MetadataStoreException.AlreadyClosedException(ex.getCause())); + new MetadataStoreException.BadVersionException(actEx)); + } else if (actEx instanceof IllegalStateException) { + return CompletableFuture.failedFuture(new MetadataStoreException.AlreadyClosedException(actEx)); + } else if (actEx instanceof MetadataStoreException) { + return CompletableFuture.failedFuture(actEx); } else { - return CompletableFuture.failedFuture(ex.getCause()); + return CompletableFuture.failedFuture(new MetadataStoreException(actEx)); } } diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/OxiaMetadataStoreErrorTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/OxiaMetadataStoreErrorTest.java new file mode 100644 index 0000000000000..b45afe5413fc1 --- /dev/null +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/OxiaMetadataStoreErrorTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.metadata; + +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.metadata.api.MetadataStore; +import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.MetadataStoreFactory; +import org.testng.annotations.Test; + +@Slf4j +public class OxiaMetadataStoreErrorTest extends BaseMetadataStoreTest { + + @Test + public void emptyStoreTest() throws Exception { + String metadataStoreUrl = "oxia://" + getOxiaServerConnectString(); + String prefix = newKey(); + @Cleanup + MetadataStore store = MetadataStoreFactory.create(metadataStoreUrl, + MetadataStoreConfig.builder().fsyncEnable(false).build()); + oxiaServer.close(); + try { + store.exists(prefix + "/non-existing-key").join(); + fail("Expected an exception because the metadata store server has been closed."); + } catch (Exception ex) { + Throwable actEx = FutureUtil.unwrapCompletionException(ex); + assertTrue(actEx instanceof MetadataStoreException); + } + } +}