Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#122] feat(core): Add KV backend storage interface for EntityStore #123

Merged
merged 3 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import com.datastrato.graviton.config.ConfigBuilder;
import com.datastrato.graviton.config.ConfigEntry;

public interface configs {
public interface Configs {

ConfigEntry<String> ENTITY_STORE =
new ConfigBuilder("graviton.entity.store")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class EntitySerDeFactory {
private EntitySerDeFactory() {}

public static EntitySerDe createEntitySerDe(Config config) {
String name = config.get(configs.ENTITY_SERDE);
String name = config.get(Configs.ENTITY_SERDE);
return createEntitySerDe(name);
}

Expand Down
9 changes: 5 additions & 4 deletions core/src/main/java/com/datastrato/graviton/EntityStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ public interface EntityStore extends Closeable {
*
* @param namespace the namespace of the entities
* @return the list of entities
* @param <E> the type of the entity
* @param type the type of the entity
* @throws IOException if the list operation fails
*/
<E extends Entity & HasIdentifier> List<E> list(Namespace namespace) throws IOException;
<E extends Entity & HasIdentifier> List<E> list(Namespace namespace, Class<E> type)
throws IOException;

/**
* Check if the entity with the specified {@link NameIdentifier} exists.
Expand Down Expand Up @@ -93,11 +94,11 @@ <E extends Entity & HasIdentifier> void put(NameIdentifier ident, E e, boolean o
*
* @param ident the unique identifier of the entity
* @return the entity retrieved from the underlying storage
* @param <E> the type of the entity
* @param type the type of the entity
* @throws NoSuchEntityException if the entity does not exist
* @throws IOException if the retrieve operation fails
*/
<E extends Entity & HasIdentifier> E get(NameIdentifier ident)
<E extends Entity & HasIdentifier> E get(NameIdentifier ident, Class<E> type)
throws NoSuchEntityException, IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class EntityStoreFactory {
private EntityStoreFactory() {}

public static EntityStore createEntityStore(Config config) {
String name = config.get(configs.ENTITY_STORE);
String name = config.get(Configs.ENTITY_STORE);
String className = ENTITY_STORES.getOrDefault(name, name);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ private TableCatalog asTables() {

public CatalogManager(Config config) {
this.config = config;
long cacheEvictionIntervalInMs = config.get(configs.CATALOG_CACHE_EVICTION_INTERVAL_MS);
long cacheEvictionIntervalInMs = config.get(Configs.CATALOG_CACHE_EVICTION_INTERVAL_MS);

this.catalogCache =
Caffeine.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.graviton.storage.kv;

import com.datastrato.graviton.NameIdentifier;
import com.datastrato.graviton.Namespace;

public class CustomEntityKeyEncoder implements EntityKeyEncoder {

@Override
public byte[] encode(NameIdentifier entity) {
// Simple implementation, just use the entity's identifier as the key
// We will change this in next PR
// TODO (yuqi) Information of NameIdentifier may not enough for key encoding, we need to infer
// object
// class from key and then deserialize it when try to get it from kv store.
return entity.toString().getBytes();
}

@Override
public byte[] encode(Namespace namespace) {
return namespace.toString().getBytes();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.graviton.storage.kv;

import com.datastrato.graviton.NameIdentifier;
import com.datastrato.graviton.Namespace;

/**
* Interface for encoding entity key for KV backend, e.g. RocksDB. The key is used to store the
* entity in the backend.
*/
public interface EntityKeyEncoder {
byte[] encode(NameIdentifier identifier);

byte[] encode(Namespace namespace);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.graviton.storage.kv;

import com.datastrato.graviton.Config;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;

public interface KvBackend extends Closeable {
/**
* Init KvBackend environment
*
* @param config configuration for the backend
*/
void initialize(Config config) throws IOException;

/**
* Store key value pair ignoring any existing value
*
* @param key key of the pair
* @param value value of the pair
*/
void put(byte[] key, byte[] value) throws IOException;

/** Get value pair for key, Null if the key does not exist */
byte[] get(byte[] key) throws IOException;

/** Delete key value pair */
default boolean delete(byte[] key) throws IOException {
return false;
}

/**
* Scan the range represented by {@link KvRangeScan} and return the list of key value pairs
*
* @param scanRange range to scan
* @return List of key value pairs
* @throws IOException if exectiopn occurs
*/
List<Pair<byte[], byte[]>> scan(KvRangeScan scanRange) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.graviton.storage.kv;

import com.datastrato.graviton.Config;
import com.datastrato.graviton.Entity;
import com.datastrato.graviton.EntityAlreadyExistsException;
import com.datastrato.graviton.EntitySerDe;
import com.datastrato.graviton.EntityStore;
import com.datastrato.graviton.HasIdentifier;
import com.datastrato.graviton.NameIdentifier;
import com.datastrato.graviton.Namespace;
import com.datastrato.graviton.NoSuchEntityException;
import com.datastrato.graviton.util.Executable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.locks.Lock;

/**
* KV store to store entities. This means we can store entities in a key value store. i.e. RocksDB,
* Cassandra, etc. If you want to use a different backend, you can implement the {@link
* com.datastrato.graviton.storage.kv.KvBackend} interface
*/
public class KvEntityStore implements EntityStore {
private KvBackend backend;
private EntityKeyEncoder entityKeyEncoder;
private EntitySerDe serDe;

// TODO replaced with rocksdb transaction
private Lock lock;

@Override
public void initialize(Config config) throws RuntimeException {
// TODO
}

@Override
public void setSerDe(EntitySerDe entitySerDe) {
this.serDe = entitySerDe;
}

@Override
public <E extends Entity & HasIdentifier> List<E> list(Namespace namespace, Class<E> e)
throws IOException {
// TODO
return null;
}

@Override
public boolean exists(NameIdentifier ident) throws IOException {
return false;
}

@Override
public <E extends Entity & HasIdentifier> void put(NameIdentifier ident, E e, boolean overwritten)
throws IOException, EntityAlreadyExistsException {
// TODO
}

@Override
public <E extends Entity & HasIdentifier> E get(NameIdentifier ident, Class<E> type)
throws NoSuchEntityException, IOException {
// TODO
return null;
}

@Override
public boolean delete(NameIdentifier ident) throws IOException {
return false;
}

@Override
public <R> R executeInTransaction(Executable<R> executable) throws IOException {
lock.lock();
try {
return executable.execute();
} finally {
lock.unlock();
}
}

@Override
public void close() throws IOException {
backend.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.graviton.storage.kv;

import lombok.Builder;
import lombok.Data;

@Builder
@Data
public class KvRangeScan {
private byte[] start;
private byte[] end;
private boolean startInclusive;
private boolean endInclusive;

private int limit;
}
14 changes: 8 additions & 6 deletions core/src/test/java/com/datastrato/graviton/TestEntityStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public void setSerDe(EntitySerDe entitySerDe) {
}

@Override
public <E extends Entity & HasIdentifier> List<E> list(Namespace namespace) throws IOException {
public <E extends Entity & HasIdentifier> List<E> list(Namespace namespace, Class<E> type)
throws IOException {
return entityMap.entrySet().stream()
.filter(e -> e.getKey().namespace().equals(namespace))
.map(entry -> (E) entry.getValue())
Expand Down Expand Up @@ -75,7 +76,7 @@ public <E extends Entity & HasIdentifier> void put(
}

@Override
public <E extends Entity & HasIdentifier> E get(NameIdentifier ident)
public <E extends Entity & HasIdentifier> E get(NameIdentifier ident, Class<E> type)
throws NoSuchEntityException, IOException {
E e = (E) entityMap.get(ident);
if (e == null) {
Expand Down Expand Up @@ -149,18 +150,19 @@ public void testEntityStoreAndRetrieve() throws Exception {
store.put(catalog.nameIdentifier(), catalog);
store.put(table.nameIdentifier(), table);

Metalake retrievedMetalake = store.get(metalake.nameIdentifier());
Metalake retrievedMetalake = store.get(metalake.nameIdentifier(), BaseMetalake.class);
Assertions.assertEquals(metalake, retrievedMetalake);

CatalogEntity retrievedCatalog = store.get(catalog.nameIdentifier());
CatalogEntity retrievedCatalog = store.get(catalog.nameIdentifier(), CatalogEntity.class);
Assertions.assertEquals(catalog, retrievedCatalog);

Table retrievedTable = store.get(table.nameIdentifier());
Table retrievedTable = store.get(table.nameIdentifier(), TestTable.class);
Assertions.assertEquals(table, retrievedTable);

store.delete(metalake.nameIdentifier());
Assertions.assertThrows(
NoSuchEntityException.class, () -> store.get(metalake.nameIdentifier()));
NoSuchEntityException.class,
() -> store.get(metalake.nameIdentifier(), BaseMetalake.class));

Assertions.assertThrows(
EntityAlreadyExistsException.class,
Expand Down
1 change: 1 addition & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
rootProject.name = "Graviton"

include("api", "client-java", "common", "core", "meta", "server", "catalog-hive")
include("storage")