diff --git a/pulsar-package-management/core/pom.xml b/pulsar-package-management/core/pom.xml
index e2c2eb2fac882..3c4af214eaa48 100644
--- a/pulsar-package-management/core/pom.xml
+++ b/pulsar-package-management/core/pom.xml
@@ -42,5 +42,10 @@
com.google.code.gson
gson
+
+
+ org.apache.commons
+ commons-lang3
+
diff --git a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesManagement.java b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesManagement.java
index 2acb9db9e89d1..ce484525da325 100644
--- a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesManagement.java
+++ b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesManagement.java
@@ -35,9 +35,8 @@ public interface PackagesManagement {
*
* @param storage
* the storage used to saving packages
- * @return
*/
- CompletableFuture initialize(PackagesStorage storage);
+ void initialize(PackagesStorage storage);
/**
* Get the metadata of a package.
@@ -94,7 +93,7 @@ public interface PackagesManagement {
* @return
* all the versions of the specified package
*/
- CompletableFuture> list(PackageName packageName);
+ CompletableFuture> list(PackageName packageName);
/**
* List all the packages with the type of a namespace.
@@ -103,7 +102,7 @@ public interface PackagesManagement {
* @param tenant the tenant name
* @param namespace the namespace name
* @return
- * the packages under the specified namespace
+ * the packages name under the specified namespace
*/
- CompletableFuture> list(PackageType type, String tenant, String namespace);
+ CompletableFuture> list(PackageType type, String tenant, String namespace);
}
diff --git a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/common/PackageMetadata.java b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/common/PackageMetadata.java
index 32f8f8a52dcd8..ac5cf6a788d1a 100644
--- a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/common/PackageMetadata.java
+++ b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/common/PackageMetadata.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.packages.management.core.common;
+import java.io.Serializable;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -26,6 +27,8 @@
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.pulsar.packages.management.core.exceptions.PackagesManagementException.MetadataFormatException;
/**
* Package metadata.
@@ -36,10 +39,26 @@
@NoArgsConstructor
@Setter
@Getter
-public class PackageMetadata {
+public class PackageMetadata implements Serializable {
String description;
String contact;
long createTime;
long modificationTime;
Map properties;
+
+ public static PackageMetadata fromBytes(byte[] bytes) throws MetadataFormatException {
+ try {
+ Object o = SerializationUtils.deserialize(bytes);
+ if (!(o instanceof PackageMetadata)) {
+ throw new MetadataFormatException("Unexpected metadata format");
+ }
+ return (PackageMetadata) o;
+ } catch (Exception e) {
+ throw new MetadataFormatException("Unexpected error", e);
+ }
+ }
+
+ public byte[] toBytes() {
+ return SerializationUtils.serialize(this);
+ }
}
diff --git a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/exceptions/PackagesManagementException.java b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/exceptions/PackagesManagementException.java
new file mode 100644
index 0000000000000..a9d80ddb78193
--- /dev/null
+++ b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/exceptions/PackagesManagementException.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.core.exceptions;
+
+/**
+ * Packages management related exceptions.
+ */
+public class PackagesManagementException extends Exception {
+ /**
+ * Constructs an {@code PackagesManagementException} with the specified cause.
+ *
+ * @param throwable
+ * The cause
+ */
+ public PackagesManagementException(Throwable throwable) {
+ super(throwable);
+ }
+
+ /**
+ * Constructs an {@code PackagesManagementException} with the specified detail message.
+ *
+ * @param message
+ * The detail message
+ */
+ public PackagesManagementException(String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs an {@code PackagesManagementException} with the specified detail message and the cause.
+ *
+ * @param message
+ * The detail message
+ * @param throwable
+ * The cause
+ */
+ public PackagesManagementException(String message, Throwable throwable) {
+ super(message, throwable);
+ }
+
+
+ public static class NotFoundException extends PackagesManagementException {
+ /**
+ * Constructs an {@code NotFoundException} with the specified cause.
+ *
+ * @param throwable
+ * The cause
+ */
+ public NotFoundException(Throwable throwable) {
+ super(throwable);
+ }
+
+ /**
+ * Constructs an {@code NotFoundException} with the specified detail message.
+ *
+ * @param message
+ * The detail message
+ */
+ public NotFoundException(String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs an {@code NotFoundException} with the specified detail message and the cause.
+ *
+ * @param message
+ * The detail message
+ * @param throwable
+ * The cause
+ */
+ public NotFoundException(String message, Throwable throwable) {
+ super(message, throwable);
+ }
+ }
+
+ public static class MetadataFormatException extends PackagesManagementException {
+ /**
+ * Constructs an {@code MetadataFormatException} with the specified detail message.
+ *
+ * @param message
+ * The detail message
+ */
+ public MetadataFormatException(String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs an {@code MetadataFormatException} with the specified detail message and the cause.
+ *
+ * @param message
+ * The detail message
+ * @param throwable
+ * The cause
+ */
+ public MetadataFormatException(String message, Throwable throwable) {
+ super(message, throwable);
+ }
+ }
+}
+
diff --git a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/exceptions/package-info.java b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/exceptions/package-info.java
new file mode 100644
index 0000000000000..c68db979453a1
--- /dev/null
+++ b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/exceptions/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Exceptions of the packages management service thrown.
+ */
+package org.apache.pulsar.packages.management.core.exceptions;
diff --git a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImpl.java b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImpl.java
new file mode 100644
index 0000000000000..c668fb45c8151
--- /dev/null
+++ b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImpl.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.core.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.packages.management.core.PackagesManagement;
+import org.apache.pulsar.packages.management.core.PackagesStorage;
+import org.apache.pulsar.packages.management.core.common.PackageMetadata;
+import org.apache.pulsar.packages.management.core.common.PackageName;
+import org.apache.pulsar.packages.management.core.common.PackageType;
+import org.apache.pulsar.packages.management.core.exceptions.PackagesManagementException;
+import org.apache.pulsar.packages.management.core.exceptions.PackagesManagementException.NotFoundException;
+
+/**
+ * Packages management implementation.
+ */
+public class PackagesManagementImpl implements PackagesManagement {
+
+ private PackagesStorage storage;
+
+ @Override
+ public void initialize(PackagesStorage storage) {
+ this.storage = storage;
+ }
+
+ @Override
+ public CompletableFuture getMeta(PackageName packageName) {
+ CompletableFuture future = new CompletableFuture<>();
+ String metadataPath = metadataPath(packageName);
+ checkMetadataNotExistsAndThrowException(packageName)
+ .whenComplete((ignore, throwable) -> {
+ if (throwable != null) {
+ future.completeExceptionally(throwable);
+ return;
+ }
+ try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+ storage.readAsync(metadataPath, outputStream)
+ .thenCompose(aVoid -> metadataReadFromStream(outputStream))
+ .whenComplete((metadata, t) -> {
+ if (t != null) {
+ future.completeExceptionally(t);
+ } else {
+ future.complete(metadata);
+ }
+ });
+ } catch (IOException e) {
+ future.completeExceptionally(new PackagesManagementException(
+ String.format("Read package '%s' metadata failed", packageName.toString()), e));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture updateMeta(PackageName packageName, PackageMetadata metadata) {
+ CompletableFuture future = new CompletableFuture<>();
+ String metadataPath = metadataPath(packageName);
+ checkMetadataNotExistsAndThrowException(packageName)
+ .whenComplete((ignore, throwable) -> {
+ if (throwable != null) {
+ future.completeExceptionally(throwable);
+ return;
+ }
+ try (ByteArrayInputStream inputStream = new ByteArrayInputStream(metadata.toBytes())) {
+ storage.deleteAsync(metadataPath)
+ .thenCompose(aVoid -> storage.writeAsync(metadataPath, inputStream))
+ .whenComplete((aVoid, t) -> {
+ if (t != null) {
+ future.completeExceptionally(new PackagesManagementException(
+ String.format("Update package '%s' metadata failed", packageName.toString()), t));
+ } else {
+ future.complete(null);
+ }
+ });
+ } catch (IOException e) {
+ future.completeExceptionally(new PackagesManagementException(
+ String.format("Read package '%s' metadata failed", packageName.toString()), e));
+ }
+ });
+ return future;
+ }
+
+ private CompletableFuture writeMeta(PackageName packageName, PackageMetadata metadata) {
+ CompletableFuture future = new CompletableFuture<>();
+ String metadataPath = metadataPath(packageName);
+ try (ByteArrayInputStream inputStream = new ByteArrayInputStream(metadata.toBytes())) {
+ storage.writeAsync(metadataPath, inputStream)
+ .whenComplete((aVoid, t) -> {
+ if (t != null) {
+ future.completeExceptionally(new PackagesManagementException(
+ String.format("Update package '%s' metadata failed", packageName.toString()), t));
+ } else {
+ future.complete(null);
+ }
+ });
+ } catch (IOException e) {
+ future.completeExceptionally(new PackagesManagementException(
+ String.format("Read package '%s' metadata failed", packageName.toString()), e));
+ }
+ return future;
+ }
+
+ @Override
+ public CompletableFuture download(PackageName packageName, OutputStream outputStream) {
+ String packagePath = packagePath(packageName);
+ return checkPackageNotExistsAndThrowException(packageName)
+ .thenCompose(ignore -> storage.readAsync(packagePath, outputStream));
+ }
+
+ @Override
+ public CompletableFuture upload(PackageName packageName, PackageMetadata metadata, InputStream inputStream) {
+ return CompletableFuture.allOf(
+ checkMetadataExistsAndThrowException(packageName),
+ checkPackageExistsAndThrowException(packageName)
+ ).thenCompose(ignore -> CompletableFuture.allOf(
+ writeMeta(packageName, metadata),
+ storage.writeAsync(packagePath(packageName), inputStream))
+ );
+ }
+
+ @Override
+ public CompletableFuture delete(PackageName packageName) {
+ return CompletableFuture.allOf(
+ storage.deleteAsync(metadataPath(packageName)),
+ storage.deleteAsync(packagePath(packageName)));
+ }
+
+ @Override
+ public CompletableFuture> list(PackageName packageName) {
+ return checkPackageNotExistsAndThrowException(packageName)
+ .thenCompose(ignore -> storage.listAsync(packageWithoutVersionPath(packageName)));
+ }
+
+ @Override
+ public CompletableFuture> list(PackageType type, String tenant, String namespace) {
+ return storage.listAsync(String.format("%s/%s/%s", type, tenant, namespace));
+ }
+
+ private CompletableFuture checkMetadataNotExistsAndThrowException(PackageName packageName) {
+ String path = metadataPath(packageName);
+ CompletableFuture future = new CompletableFuture<>();
+ storage.existAsync(path)
+ .whenComplete((exist, throwable) -> {
+ if (throwable != null) {
+ future.completeExceptionally(throwable);
+ return;
+ }
+ if (exist) {
+ future.complete(null);
+ } else {
+ future.completeExceptionally(
+ new NotFoundException(String.format("Package '%s' metadata does not exist", packageName)));
+ }
+ });
+ return future;
+ }
+
+ private CompletableFuture checkMetadataExistsAndThrowException(PackageName packageName) {
+ String path = metadataPath(packageName);
+ CompletableFuture future = new CompletableFuture<>();
+ storage.existAsync(path)
+ .whenComplete((exist, throwable) -> {
+ if (throwable != null) {
+ future.completeExceptionally(throwable);
+ return;
+ }
+ if (!exist) {
+ future.complete(null);
+ } else {
+ future.completeExceptionally(
+ new NotFoundException(String.format("Package '%s' metadata already exists", packageName)));
+ }
+ });
+ return future;
+ }
+
+ private CompletableFuture checkPackageNotExistsAndThrowException(PackageName packageName) {
+ String path = packagePath(packageName);
+ CompletableFuture future = new CompletableFuture<>();
+ storage.existAsync(path)
+ .whenComplete((exist, throwable) -> {
+ if (throwable != null) {
+ future.completeExceptionally(throwable);
+ return;
+ }
+ if (exist) {
+ future.complete(null);
+ } else {
+ future.completeExceptionally(
+ new NotFoundException(String.format("Package '%s' does not exist", packageName.toString())));
+ }
+ });
+ return future;
+ }
+
+ private CompletableFuture checkPackageExistsAndThrowException(PackageName packageName) {
+ String path = packagePath(packageName);
+ CompletableFuture future = new CompletableFuture<>();
+ storage.existAsync(path)
+ .whenComplete((exist, throwable) -> {
+ if (throwable != null) {
+ future.completeExceptionally(throwable);
+ return;
+ }
+ if (!exist) {
+ future.complete(null);
+ } else {
+ future.completeExceptionally(
+ new NotFoundException(String.format("Package '%s' already exists", packageName.toString())));
+ }
+ });
+ return future;
+ }
+
+ private CompletableFuture metadataReadFromStream(ByteArrayOutputStream outputStream) {
+ CompletableFuture future = new CompletableFuture<>();
+ try {
+ PackageMetadata metadata = PackageMetadata.fromBytes(outputStream.toByteArray());
+ future.complete(metadata);
+ } catch (PackagesManagementException.MetadataFormatException e) {
+ future.completeExceptionally(e);
+ }
+ return future;
+ }
+
+ private String metadataPath(PackageName packageName) {
+ return packageName.toRestPath() + "/meta";
+ }
+
+ private String packagePath(PackageName packageName) {
+ return packageName.toRestPath();
+ }
+
+ private String packageWithoutVersionPath(PackageName packageName) {
+ return String.format("%s/%s/%s/%s",
+ packageName.getPkgType().toString(),
+ packageName.getTenant(),
+ packageName.getNamespace(),
+ packageName.getName());
+ }
+}
diff --git a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/package-info.java b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/package-info.java
new file mode 100644
index 0000000000000..e32095143bd26
--- /dev/null
+++ b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Packages management service implementation.
+ */
+package org.apache.pulsar.packages.management.core.impl;
diff --git a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java
new file mode 100644
index 0000000000000..c5a379efebd44
--- /dev/null
+++ b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.core;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+public class MockedPackagesStorage implements PackagesStorage {
+ private PackagesStorageConfiguration configuration;
+
+ private ConcurrentHashMap storage = new ConcurrentHashMap<>();
+
+ MockedPackagesStorage(PackagesStorageConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public CompletableFuture writeAsync(String path, InputStream inputStream) {
+ CompletableFuture future = new CompletableFuture<>();
+ CompletableFuture.runAsync(() -> {
+ try {
+ byte[] bytes = new byte[inputStream.available()];
+ inputStream.read(bytes);
+ storage.put(path, bytes);
+ future.complete(null);
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture readAsync(String path, OutputStream outputStream) {
+ CompletableFuture future = new CompletableFuture<>();
+ CompletableFuture.runAsync(() -> {
+ byte[] bytes = storage.get(path);
+ if (bytes == null) {
+ future.completeExceptionally(
+ new Exception(String.format("Path '%s' does not exist", path)));
+ return;
+ }
+ try {
+ outputStream.write(bytes);
+ outputStream.flush();
+ future.complete(null);
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture deleteAsync(String path) {
+ storage.remove(path);
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture> listAsync(String path) {
+ return CompletableFuture.completedFuture(storage.keySet().stream()
+ .filter(s -> s.startsWith(path))
+ .map(s -> s.substring(path.length()))
+ .map(s -> s.split("/")[1])
+ .distinct()
+ .collect(Collectors.toList()));
+ }
+
+ @Override
+ public CompletableFuture existAsync(String path) {
+ return CompletableFuture.completedFuture(storage.keySet().stream()
+ .filter(s -> s.startsWith(path))
+ .collect(Collectors.toList()))
+ .thenApply(paths -> !paths.isEmpty());
+ }
+
+ @Override
+ public CompletableFuture closeAsync() {
+ storage.clear();
+ return CompletableFuture.completedFuture(null);
+ }
+}
diff --git a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageConfiguration.java b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageConfiguration.java
new file mode 100644
index 0000000000000..ada2d6c8e3f88
--- /dev/null
+++ b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageConfiguration.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.core;
+
+import java.util.HashMap;
+
+public class MockedPackagesStorageConfiguration implements PackagesStorageConfiguration {
+ private HashMap properties = new HashMap<>();
+
+ @Override
+ public Object getProperty(String key) {
+ return properties.get(key);
+ }
+
+ @Override
+ public void setProperty(String key, Object value) {
+ properties.put(key, value);
+ }
+}
diff --git a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageProvider.java b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageProvider.java
new file mode 100644
index 0000000000000..eb8ce1847ae35
--- /dev/null
+++ b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageProvider.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.core;
+
+public class MockedPackagesStorageProvider implements PackagesStorageProvider {
+ @Override
+ public PackagesStorage getStorage(PackagesStorageConfiguration config) {
+ return new MockedPackagesStorage(config);
+ }
+}
diff --git a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/common/PackageMetadataSerdeTest.java b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/common/PackageMetadataSerdeTest.java
new file mode 100644
index 0000000000000..940be34ce07de
--- /dev/null
+++ b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/common/PackageMetadataSerdeTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.core.common;
+
+import java.util.HashMap;
+import org.apache.pulsar.packages.management.core.exceptions.PackagesManagementException;
+import org.junit.Test;
+import org.junit.Assert;
+
+public class PackageMetadataSerdeTest {
+ @Test
+ public void testPackageMetadataSerDe() {
+ HashMap properties = new HashMap<>();
+ properties.put("testKey", "testValue");
+ PackageMetadata metadata = PackageMetadata.builder()
+ .description("test package metadata serialize and deserialize flow")
+ .createTime(System.currentTimeMillis())
+ .contact("test@apache.org")
+ .modificationTime(System.currentTimeMillis() + 1000)
+ .properties(properties).build();
+
+ byte[] metadataSerialized = metadata.toBytes();
+
+ try {
+ PackageMetadata deSerializedMetadata = PackageMetadata.fromBytes(metadataSerialized);
+ Assert.assertEquals(metadata, deSerializedMetadata);
+ } catch (PackagesManagementException.MetadataFormatException e) {
+ Assert.fail("should not throw any exception");
+ }
+
+ try {
+ byte[] failedMetadataSerialized = "wrong package metadata".getBytes();
+ PackageMetadata deSerializedMetadata = PackageMetadata.fromBytes(failedMetadataSerialized);
+ Assert.fail("should throw the metadata format exception");
+ } catch (PackagesManagementException.MetadataFormatException e) {
+ // expected error
+ }
+ }
+}
diff --git a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImplTest.java b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImplTest.java
new file mode 100644
index 0000000000000..bae08f176ced0
--- /dev/null
+++ b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImplTest.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.core.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.apache.pulsar.packages.management.core.MockedPackagesStorageConfiguration;
+import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
+import org.apache.pulsar.packages.management.core.PackagesManagement;
+import org.apache.pulsar.packages.management.core.PackagesStorage;
+import org.apache.pulsar.packages.management.core.PackagesStorageProvider;
+import org.apache.pulsar.packages.management.core.common.PackageMetadata;
+import org.apache.pulsar.packages.management.core.common.PackageName;
+import org.apache.pulsar.packages.management.core.exceptions.PackagesManagementException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class PackagesManagementImplTest {
+ private static PackagesStorage storage;
+ private static PackagesManagement packagesManagement;
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ PackagesStorageProvider storageProvider = PackagesStorageProvider.newProvider(MockedPackagesStorageProvider.class.getName());
+ MockedPackagesStorageConfiguration packagesStorageConfiguration = new MockedPackagesStorageConfiguration();
+ storage = storageProvider.getStorage(packagesStorageConfiguration);
+
+ packagesManagement = new PackagesManagementImpl();
+ packagesManagement.initialize(storage);
+ }
+
+ @AfterClass
+ public static void teardown() throws ExecutionException, InterruptedException {
+ storage.closeAsync().get();
+ }
+
+
+ @Test
+ public void testPackagesManagementFlow() {
+ PackageName packageName = PackageName.get("function://tenant/ns/non-existent-package@v1");
+ // get a non-existent package metadata should fail
+ try {
+ packagesManagement.getMeta(packageName).get();
+ } catch (Exception e) {
+ if (!(e.getCause() instanceof PackagesManagementException.NotFoundException)) {
+ Assert.fail("should not throw any exception");
+ }
+ }
+
+ // update a non-existent package metadata should fail
+ PackageMetadata failedUpdateMetadata = PackageMetadata.builder()
+ .description("Failed update package metadata").build();
+ try {
+ packagesManagement.updateMeta(packageName, failedUpdateMetadata).get();
+ } catch (Exception e) {
+ if (!(e.getCause() instanceof PackagesManagementException.NotFoundException)) {
+ Assert.fail("should not throw any exception");
+ }
+ }
+
+ // download a non-existent package should fail
+ try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+ packagesManagement.download(packageName, outputStream).get();
+ } catch (Exception e) {
+ if (!(e.getCause() instanceof PackagesManagementException.NotFoundException)) {
+ Assert.fail("should not throw any exception");
+ }
+ }
+
+ // delete a non-existent package should fail
+ try {
+ packagesManagement.delete(packageName).get();
+ } catch (Exception e) {
+ if (!(e.getCause() instanceof PackagesManagementException.NotFoundException)) {
+ Assert.fail("should not throw any exception");
+ }
+ }
+
+ // list a non-existent package version should fail
+ try {
+ packagesManagement.list(packageName).get();
+ } catch (Exception e) {
+ if (!(e.getCause() instanceof PackagesManagementException.NotFoundException)) {
+ Assert.fail("should not throw any exception");
+ }
+ }
+
+ // list the packages in a non-existent namespace should fail
+ try {
+ packagesManagement.list(packageName.getPkgType(), packageName.getTenant(), packageName.getNamespace()).get();
+ } catch (Exception e) {
+ if (!(e.getCause() instanceof PackagesManagementException.NotFoundException)) {
+ Assert.fail("should not throw any exception");
+ }
+ }
+
+ // upload a package
+ PackageMetadata metadata = PackageMetadata.builder()
+ .contact("test@apache.org")
+ .description("A mocked test package")
+ .createTime(System.currentTimeMillis()).build();
+ try (ByteArrayInputStream inputStream= new ByteArrayInputStream(metadata.toBytes())) {
+ packagesManagement.upload(packageName, metadata, inputStream).get();
+ } catch (Exception e) {
+ Assert.fail("should not throw any exception");
+ }
+
+ // get an existent package metadata should succeed
+ try {
+ PackageMetadata getPackageMetadata = packagesManagement.getMeta(packageName).get();
+ Assert.assertEquals(metadata, getPackageMetadata);
+ } catch (Exception e) {
+ Assert.fail("should not throw any exception");
+ }
+
+ // download an existent package should succeed
+ try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+ packagesManagement.download(packageName, outputStream).get();
+ PackageMetadata getPackage = PackageMetadata.fromBytes(outputStream.toByteArray());
+ Assert.assertEquals(metadata, getPackage);
+ } catch (Exception e) {
+ Assert.fail("should not throw any exception");
+ }
+
+ // update an existent package metadata should succeed
+ metadata.setModificationTime(System.currentTimeMillis());
+ try {
+ packagesManagement.updateMeta(packageName, metadata).get();
+ } catch (Exception e) {
+ if (!(e.getCause() instanceof PackagesManagementException.NotFoundException)) {
+ Assert.fail("should not throw any exception");
+ }
+ }
+
+ // get the updated metadata
+ try {
+ PackageMetadata updatedMetadata = packagesManagement.getMeta(packageName).get();
+ Assert.assertEquals(metadata, updatedMetadata);
+ } catch (Exception e) {
+ Assert.fail("should not throw any exception");
+ }
+
+ // list an existent package version should success
+ try {
+ List versions = packagesManagement.list(packageName).get();
+ Assert.assertEquals(1, versions.size());
+ Assert.assertEquals(packageName.getVersion(), versions.get(0));
+ } catch (Exception e) {
+ if (!(e.getCause() instanceof PackagesManagementException.NotFoundException)) {
+ Assert.fail("should not throw any exception");
+ }
+ }
+
+ // list the packages in a non-existent namespace should fail
+ try {
+ List packageNames = packagesManagement
+ .list(packageName.getPkgType(), packageName.getTenant(), packageName.getNamespace()).get();
+ Assert.assertEquals(1, packageNames.size());
+ Assert.assertEquals(packageName.getName(), packageNames.get(0));
+ } catch (Exception e) {
+ if (!(e.getCause() instanceof PackagesManagementException.NotFoundException)) {
+ Assert.fail("should not throw any exception");
+ }
+ }
+
+
+ // delete an existent package should succeed
+ try {
+ packagesManagement.delete(packageName).get();
+ } catch (Exception e) {
+ Assert.fail("should not throw any exception");
+ }
+ }
+}