Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[rocksandra] support cassandra partition deletion #5898

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions java/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ set(JNI_NATIVE_SOURCES
rocksjni/backupablejni.cc
rocksjni/backupenginejni.cc
rocksjni/cassandra_compactionfilterjni.cc
rocksjni/cassandra_partition_meta_merge_operator.cc
rocksjni/cassandra_value_operator.cc
rocksjni/checkpoint.cc
rocksjni/clock_cache.cc
Expand Down Expand Up @@ -103,6 +104,7 @@ set(JAVA_MAIN_CLASSES
src/main/java/org/rocksdb/BuiltinComparator.java
src/main/java/org/rocksdb/Cache.java
src/main/java/org/rocksdb/CassandraCompactionFilter.java
src/main/java/org/rocksdb/CassandraPartitionMetaMergeOperator.java
src/main/java/org/rocksdb/CassandraValueMergeOperator.java
src/main/java/org/rocksdb/Checkpoint.java
src/main/java/org/rocksdb/ChecksumType.java
Expand Down Expand Up @@ -386,6 +388,7 @@ if(${CMAKE_VERSION} VERSION_LESS "3.11.4" OR (${Java_VERSION_MINOR} STREQUAL "7"
org.rocksdb.BlockBasedTableConfig
org.rocksdb.BloomFilter
org.rocksdb.CassandraCompactionFilter
org.rocksdb.CassandraPartitionMetaMergeOperator
org.rocksdb.CassandraValueMergeOperator
org.rocksdb.Checkpoint
org.rocksdb.ClockCache
Expand Down
1 change: 1 addition & 0 deletions java/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractCompactionFilter\
org.rocksdb.ClockCache\
org.rocksdb.CassandraCompactionFilter\
org.rocksdb.CassandraValueMergeOperator\
org.rocksdb.CassandraPartitionMetaMergeOperator\
org.rocksdb.ColumnFamilyHandle\
org.rocksdb.ColumnFamilyOptions\
org.rocksdb.CompactionJobInfo\
Expand Down
24 changes: 22 additions & 2 deletions java/rocksjni/cassandra_compactionfilterjni.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,29 @@
*/
jlong Java_org_rocksdb_CassandraCompactionFilter_createNewCassandraCompactionFilter0(
JNIEnv* /*env*/, jclass /*jcls*/, jboolean purge_ttl_on_expiration,
jint gc_grace_period_in_seconds) {
jboolean ignore_range_delete_on_read, jint gc_grace_period_in_seconds) {
auto* compaction_filter = new rocksdb::cassandra::CassandraCompactionFilter(
purge_ttl_on_expiration, gc_grace_period_in_seconds);
purge_ttl_on_expiration, ignore_range_delete_on_read,
gc_grace_period_in_seconds);
// set the native handle to our native compaction filter
return reinterpret_cast<jlong>(compaction_filter);
}

/*
 * Class:     org_rocksdb_CassandraCompactionFilter
 * Method:    setMetaCfHandle
 * Signature: (JJJ)V
 *
 * Hands the native compaction filter the DB and column family handle that
 * are passed in as raw native pointers from the Java side.
 */
JNIEXPORT void JNICALL
Java_org_rocksdb_CassandraCompactionFilter_setMetaCfHandle(
    JNIEnv* /*env*/, jclass /*jcls*/, jlong compaction_filter_pointer,
    jlong rocksdb_pointer, jlong meta_cf_handle_pointer) {
  // Each jlong carries the address of a native object; recover the typed
  // pointers before forwarding them.
  auto* filter =
      reinterpret_cast<rocksdb::cassandra::CassandraCompactionFilter*>(
          compaction_filter_pointer);

  filter->SetMetaCfHandle(
      reinterpret_cast<rocksdb::DB*>(rocksdb_pointer),
      reinterpret_cast<rocksdb::ColumnFamilyHandle*>(meta_cf_handle_pointer));
}
47 changes: 47 additions & 0 deletions java/rocksjni/cassandra_partition_meta_merge_operator.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include <jni.h>
#include <stdio.h>
#include <stdlib.h>
#include <memory>
#include <string>

#include "include/org_rocksdb_CassandraPartitionMetaMergeOperator.h"
#include "rocksdb/db.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/options.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/statistics.h"
#include "rocksdb/table.h"
#include "rocksjni/portal.h"
#include "utilities/cassandra/merge_operator.h"

/*
 * Class:     org_rocksdb_CassandraPartitionMetaMergeOperator
 * Method:    newCassandraPartitionMetaMergeOperator
 * Signature: ()J
 *
 * Creates a CassandraPartitionMetaMergeOperator and returns, as a jlong, the
 * address of a heap-allocated shared_ptr that owns it.
 */
jlong JNICALL
Java_org_rocksdb_CassandraPartitionMetaMergeOperator_newCassandraPartitionMetaMergeOperator(
    JNIEnv* /*env*/, jclass /*jclazz*/) {
  using MergeOperatorPtr = std::shared_ptr<rocksdb::MergeOperator>;

  // The shared_ptr itself lives on the heap; its address is the handle.
  auto* handle = new MergeOperatorPtr(
      new rocksdb::cassandra::CassandraPartitionMetaMergeOperator());
  return reinterpret_cast<jlong>(handle);
}

/*
 * Class:     org_rocksdb_CassandraPartitionMetaMergeOperator
 * Method:    disposeInternal
 * Signature: (J)V
 *
 * Deletes the heap-allocated shared_ptr<MergeOperator> that `jhandle`
 * points to.
 */
void JNICALL
Java_org_rocksdb_CassandraPartitionMetaMergeOperator_disposeInternal(
    JNIEnv* /*env*/, jobject /*jobj*/, jlong jhandle) {
  using MergeOperatorPtr = std::shared_ptr<rocksdb::MergeOperator>;
  delete reinterpret_cast<MergeOperatorPtr*>(jhandle);
}
15 changes: 12 additions & 3 deletions java/src/main/java/org/rocksdb/CassandraCompactionFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,19 @@
*/
public class CassandraCompactionFilter
extends AbstractCompactionFilter<Slice> {
public CassandraCompactionFilter(boolean purgeTtlOnExpiration, int gcGracePeriodInSeconds) {
super(createNewCassandraCompactionFilter0(purgeTtlOnExpiration, gcGracePeriodInSeconds));
public CassandraCompactionFilter(
boolean purgeTtlOnExpiration, boolean ignoreRangeDeleteOnRead, int gcGracePeriodInSeconds) {
super(createNewCassandraCompactionFilter0(
purgeTtlOnExpiration, ignoreRangeDeleteOnRead, gcGracePeriodInSeconds));
}

public void setMetaCfHandle(RocksDB rocksdb, ColumnFamilyHandle metaCfHandle) {
setMetaCfHandle(getNativeHandle(), rocksdb.getNativeHandle(), metaCfHandle.getNativeHandle());
}

private native static long createNewCassandraCompactionFilter0(
boolean purgeTtlOnExpiration, int gcGracePeriodInSeconds);
boolean purgeTtlOnExpiration, boolean ignoreRangeDeleteOnRead, int gcGracePeriodInSeconds);

private native static void setMetaCfHandle(
long compactionFilter, long rocksdb, long metaCfHandle);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

package org.rocksdb;

/**
 * Merge operator for Cassandra partition meta data, backed by the native
 * CassandraPartitionMetaMergeOperator.
 *
 * NOTE(review): the original description ended mid-sentence ("such as");
 * presumably the merged meta data includes partition deletion markers -
 * confirm against the native implementation.
 */
public class CassandraPartitionMetaMergeOperator extends MergeOperator {
/**
 * Creates the native merge operator and passes its handle to the superclass.
 */
public CassandraPartitionMetaMergeOperator() {
super(newCassandraPartitionMetaMergeOperator());
}

// Allocates the native operator and returns its handle as a long.
private native static long newCassandraPartitionMetaMergeOperator();
// Releases the native object identified by the given handle.
@Override protected final native void disposeInternal(final long handle);
}
2 changes: 1 addition & 1 deletion java/src/main/java/org/rocksdb/RocksMutableObject.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ protected synchronized boolean isOwningHandle() {
*
* @return the pointer value for the native object
*/
protected synchronized long getNativeHandle() {
public synchronized long getNativeHandle() {
assert (this.nativeHandle_ != 0);
return this.nativeHandle_;
}
Expand Down
12 changes: 12 additions & 0 deletions java/src/main/java/org/rocksdb/RocksObject.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ protected RocksObject(final long nativeHandle) {
this.nativeHandle_ = nativeHandle;
}

/**
 * Returns the value of the C++ pointer that refers to the underlying
 * native C++ object.
 *
 * @return the pointer value for the native object
 */
public long getNativeHandle() {
  final long handle = this.nativeHandle_;
  assert (handle != 0);
  return handle;
}


/**
* Deletes underlying C++ object pointer.
*/
Expand Down
1 change: 1 addition & 0 deletions src.mk
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ JNI_NATIVE_SOURCES = \
java/rocksjni/remove_emptyvalue_compactionfilterjni.cc \
java/rocksjni/cassandra_compactionfilterjni.cc \
java/rocksjni/cassandra_value_operator.cc \
java/rocksjni/cassandra_partition_meta_merge_operator.cc \
java/rocksjni/restorejni.cc \
java/rocksjni/rocks_callback_object.cc \
java/rocksjni/rocksjni.cc \
Expand Down
60 changes: 54 additions & 6 deletions utilities/cassandra/cassandra_compaction_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@
// (found in the LICENSE.Apache file in the root directory).

#include "utilities/cassandra/cassandra_compaction_filter.h"
#include <string>
#include "rocksdb/slice.h"
#include "utilities/cassandra/format.h"


namespace rocksdb {
namespace cassandra {
Expand All @@ -16,20 +12,72 @@ const char* CassandraCompactionFilter::Name() const {
return "CassandraCompactionFilter";
}

// Registers the database and the column family handle that hold the
// partition meta data read by GetPartitionDelete().
void CassandraCompactionFilter::SetMetaCfHandle(
    DB* meta_db, ColumnFamilyHandle* meta_cf_handle) {
  // Store order is unchanged: database pointer first, then the column family.
  meta_db_.store(meta_db);
  meta_cf_handle_.store(meta_cf_handle);
}

// Looks up the partition deletion marker that applies to `key`.
//
// Reads the partition meta column family registered through
// SetMetaCfHandle() and returns the PartitionDeletion deserialized from the
// meta entry whose key is a prefix of `key`. Returns
// PartitionDeletion::kDefault when the meta db or the meta column family
// handle has not been set yet, or when no matching meta entry exists.
PartitionDeletion CassandraCompactionFilter::GetPartitionDelete(
    const Slice& key) const {
  // Load each atomic exactly once and test the local copy, so the pointer
  // that was null-checked is the very pointer that is dereferenced below.
  DB* const meta_db = meta_db_.load();
  if (meta_db == nullptr) {
    // Skip trimming when the partition meta db is not ready yet.
    return PartitionDeletion::kDefault;
  }

  ColumnFamilyHandle* const meta_cf_handle = meta_cf_handle_.load();
  if (meta_cf_handle == nullptr) {
    // Skip trimming when the partition meta cf handle is not ready yet.
    return PartitionDeletion::kDefault;
  }

  std::unique_ptr<Iterator> it(
      meta_db->NewIterator(rocksdb::ReadOptions(), meta_cf_handle));
  // The partition meta key is encoded as token + partition key, so position
  // the iterator on the last meta key that is <= `key`.
  it->SeekForPrev(key);
  if (!it->Valid() || !key.starts_with(it->key())) {
    // Skip trimming when there is no partition meta data for this key:
    // either nothing sorts at or before `key`, or the entry found is not a
    // prefix of `key`.
    return PartitionDeletion::kDefault;
  }

  const Slice value = it->value();
  return PartitionDeletion::Deserialize(value.data(), value.size());
}

bool CassandraCompactionFilter::ShouldDropByParitionDelete(
const Slice& key,
std::chrono::time_point<std::chrono::system_clock> row_timestamp) const {
std::chrono::seconds gc_grace_period =
ignore_range_delete_on_read_ ? std::chrono::seconds(0) : gc_grace_period_;
return GetPartitionDelete(key).MarkForDeleteAt() >
row_timestamp + gc_grace_period;
}

CompactionFilter::Decision CassandraCompactionFilter::FilterV2(
int /*level*/, const Slice& /*key*/, ValueType value_type,
int /*level*/, const Slice& key, ValueType value_type,
const Slice& existing_value, std::string* new_value,
std::string* /*skip_until*/) const {
bool value_changed = false;
RowValue row_value = RowValue::Deserialize(
existing_value.data(), existing_value.size());

if (ShouldDropByParitionDelete(key, row_value.LastModifiedTimePoint())) {
return Decision::kRemove;
}

RowValue compacted =
purge_ttl_on_expiration_
? row_value.RemoveExpiredColumns(&value_changed)
: row_value.ConvertExpiredColumnsToTombstones(&value_changed);

if (value_type == ValueType::kValue) {
compacted = compacted.RemoveTombstones(gc_grace_period_in_seconds_);
compacted = compacted.RemoveTombstones(gc_grace_period_);
}

if(compacted.Empty()) {
Expand Down
30 changes: 25 additions & 5 deletions utilities/cassandra/cassandra_compaction_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,59 @@
// (found in the LICENSE.Apache file in the root directory).

#pragma once
#include <atomic>
#include <string>
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/slice.h"
#include "utilities/cassandra/format.h"

namespace rocksdb {
namespace cassandra {

/**
* Compaction filter for removing expired Cassandra data with ttl.
* Compaction filter for removing expired/deleted Cassandra data.
*
* If option `purge_ttl_on_expiration` is set to true, expired data
* will be purged directly. Otherwise expired data will first be converted
* to tombstones, then eventually removed after the gc grace period.
* `purge_ttl_on_expiration` should only be enabled when all writes
* use the same ttl setting; otherwise it could bring old data back.
*
* The compaction filter is also in charge of removing tombstones that have
* been promoted to kValue type after a series of merges in compaction.
* If option `ignore_range_delete_on_read` is set to true, i.e. when the
* client cares more about releasing disk space than about what would be
* read after a range/partition deletion, deleted data is dropped more
* aggressively, without considering the gc grace period.
*
*/
class CassandraCompactionFilter : public CompactionFilter {
public:
explicit CassandraCompactionFilter(bool purge_ttl_on_expiration,
bool ignore_range_delete_on_read,
int32_t gc_grace_period_in_seconds)
: purge_ttl_on_expiration_(purge_ttl_on_expiration),
gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {}
ignore_range_delete_on_read_(ignore_range_delete_on_read),
gc_grace_period_(gc_grace_period_in_seconds),
meta_cf_handle_(nullptr),
meta_db_(nullptr) {}

const char* Name() const override;
virtual Decision FilterV2(int level, const Slice& key, ValueType value_type,
const Slice& existing_value, std::string* new_value,
std::string* skip_until) const override;

void SetMetaCfHandle(DB* meta_db, ColumnFamilyHandle* meta_cf_handle);

private:
bool purge_ttl_on_expiration_;
int32_t gc_grace_period_in_seconds_;
bool ignore_range_delete_on_read_;
std::chrono::seconds gc_grace_period_;
std::atomic<ColumnFamilyHandle*> meta_cf_handle_;
std::atomic<DB*> meta_db_;
PartitionDeletion GetPartitionDelete(const Slice& key) const;
bool ShouldDropByParitionDelete(
const Slice& key,
std::chrono::time_point<std::chrono::system_clock> row_timestamp) const;
};
} // namespace cassandra
} // namespace rocksdb
Loading