diff --git a/airbyte-queue/LICENSE b/airbyte-queue/LICENSE deleted file mode 100644 index ec45d182fcb9..000000000000 --- a/airbyte-queue/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -MIT License - -Copyright (c) 2020 Airbyte, Inc. - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. diff --git a/airbyte-queue/build.gradle b/airbyte-queue/build.gradle deleted file mode 100644 index 390d217e5e6e..000000000000 --- a/airbyte-queue/build.gradle +++ /dev/null @@ -1,7 +0,0 @@ -plugins { - id 'java-library' -} - -dependencies { - implementation 'com.baidu:leansoft-bigqueue:0.7.3' -} diff --git a/airbyte-queue/readme.md b/airbyte-queue/readme.md deleted file mode 100644 index 16678592af58..000000000000 --- a/airbyte-queue/readme.md +++ /dev/null @@ -1,6 +0,0 @@ -# airbyte-queue - -Wraps an external tool that provides an on-disk queue. - -## Entrypoint -* `OnDiskQueue.java` diff --git a/airbyte-queue/src/main/java/io/airbyte/queue/OnDiskQueue.java b/airbyte-queue/src/main/java/io/airbyte/queue/OnDiskQueue.java deleted file mode 100644 index 1fd65c968829..000000000000 --- a/airbyte-queue/src/main/java/io/airbyte/queue/OnDiskQueue.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.queue; - -import com.google.common.base.Preconditions; -import com.leansoft.bigqueue.BigQueueImpl; -import com.leansoft.bigqueue.IBigQueue; -import io.airbyte.commons.lang.CloseableQueue; -import java.io.IOException; -import java.nio.file.Path; -import java.util.AbstractQueue; -import java.util.Iterator; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.commons.io.FileUtils; - -/** - * This Queue should be used when it is possible for the contents of the queue to be greater than - * the size of memory. It is meant for use by a single process. Closing this queue deletes the data - * on disk. It is NOT meant to be a long-lived, persistent queue. - * - * Wraps BigQueueImpl behind Airbyte persistent queue interface. BigQueueImpl is threadsafe. - * - */ -public class OnDiskQueue extends AbstractQueue implements CloseableQueue { - - private final IBigQueue queue; - private final AtomicBoolean closed = new AtomicBoolean(false); - private final Path persistencePath; - - public OnDiskQueue(final Path persistencePath, final String queueName) throws IOException { - this.persistencePath = persistencePath; - queue = new BigQueueImpl(persistencePath.toString(), queueName); - } - - @Override - public boolean offer(final byte[] bytes) { - Preconditions.checkState(!closed.get()); - try { - queue.enqueue(bytes); - return true; - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public byte[] poll() { - Preconditions.checkState(!closed.get()); - try { - return queue.dequeue(); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public byte[] peek() { - Preconditions.checkState(!closed.get()); - try { - return queue.peek(); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public int size() { - Preconditions.checkState(!closed.get()); - return Math.toIntExact(queue.size()); - } - - /** - * Logging frameworks call this method when printing out this class. Throw an disable this for now - * since iterating the contents of a queue is tricky and we want to avoid this for now. - */ - @Override - public Iterator iterator() { - // TODO(davin): Implement this properly. - throw new UnsupportedOperationException("This queue does not support iteration"); - } - - @Override - public void close() throws Exception { - closed.set(true); - try { - // todo (cgardens) - this barfs out a huge warning. known issue with the lib: - // https://github.com/bulldog2011/bigqueue/issues/35. - // deallocates memory used by bigqueue - queue.close(); - } finally { - // deletes all data files. - FileUtils.deleteQuietly(persistencePath.toFile()); - } - } - - /** - * Print size instead of queue contents to avoid any sort of logging complication. Note this does - * not hold any read locks for simplicity, and queue size cannot be used as a source of truth. - */ - @Override - public String toString() { - return "OnDiskQueue{" + - "queue=" + queue.hashCode() + - ", size=" + queue.size() + - ", closed=" + closed + - '}'; - } - -} diff --git a/airbyte-queue/src/test/java/io/airbyte/queue/OnDiskQueueTest.java b/airbyte-queue/src/test/java/io/airbyte/queue/OnDiskQueueTest.java deleted file mode 100644 index be9d1e5c7798..000000000000 --- a/airbyte-queue/src/test/java/io/airbyte/queue/OnDiskQueueTest.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.queue; - -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import com.google.common.base.Charsets; -import io.airbyte.commons.lang.CloseableQueue; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Objects; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -class OnDiskQueueTest { - - private static final Path TEST_ROOT = Path.of("/tmp/airbyte_tests"); - private static final String HELLO = "hello"; - private CloseableQueue queue; - private Path queueRoot; - - @BeforeEach - void setup() throws IOException { - queueRoot = Files.createTempDirectory(Files.createDirectories(TEST_ROOT), "test"); - queue = new OnDiskQueue(queueRoot, "test"); - } - - @AfterEach - void teardown() throws Exception { - queue.close(); - } - - @Test - void testPoll() { - queue.offer(toBytes(HELLO)); - assertEquals(HELLO, new String(Objects.requireNonNull(queue.poll()), Charsets.UTF_8)); - } - - @Test - void testPeek() { - queue.offer(toBytes(HELLO)); - assertEquals(HELLO, new String(Objects.requireNonNull(queue.peek()), Charsets.UTF_8)); - assertEquals(HELLO, new String(Objects.requireNonNull(queue.peek()), Charsets.UTF_8)); - assertEquals(HELLO, new String(Objects.requireNonNull(queue.poll()), Charsets.UTF_8)); - } - - @Test - void testSize() { - assertEquals(0, queue.size()); - queue.offer(toBytes(HELLO)); - assertEquals(1, queue.size()); - queue.offer(toBytes(HELLO)); - assertEquals(2, queue.size()); - } - - @Test - void testClosed() throws Exception { - queue.close(); - assertDoesNotThrow(() -> queue.close()); - assertThrows(IllegalStateException.class, () -> queue.offer(toBytes(HELLO))); - assertThrows(IllegalStateException.class, () -> queue.poll()); - } - - @Test - void testCleanupOnEmpty() throws Exception { - assertTrue(Files.exists(queueRoot)); - - queue.offer(toBytes(HELLO)); - queue.poll(); - queue.close(); - - assertFalse(Files.exists(queueRoot)); - } - - @Test - void testCleanupOnNotEmpty() throws Exception { - assertTrue(Files.exists(queueRoot)); - - queue.offer(toBytes(HELLO)); - queue.close(); - - assertFalse(Files.exists(queueRoot)); - } - - @SuppressWarnings("SameParameterValue") - private static byte[] toBytes(final String string) { - return string.getBytes(Charsets.UTF_8); - } - -} diff --git a/settings.gradle b/settings.gradle index 073a53c4b0e1..2dc801ee0554 100644 --- a/settings.gradle +++ b/settings.gradle @@ -72,7 +72,6 @@ include ':airbyte-json-validation' include ':airbyte-metrics:metrics-lib' include ':airbyte-oauth' include ':airbyte-protocol:protocol-models' -include ':airbyte-queue' include ':airbyte-test-utils' // airbyte-workers has a lot of dependencies.