Skip to content

Commit

Permalink
[rocksandra] support cassandra partition deletion
Browse files Browse the repository at this point in the history
* add a merge operator for parition meta data (currently partition
  deletion info only)
* read partition deletion in cassandra compaction filter and drop rows
  if it's partition has been deleted
  • Loading branch information
wpc committed May 21, 2018
1 parent 26da367 commit 5ab2561
Show file tree
Hide file tree
Showing 18 changed files with 512 additions and 125 deletions.
3 changes: 3 additions & 0 deletions java/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ set(JNI_NATIVE_SOURCES
rocksjni/rocks_callback_object.cc
rocksjni/cassandra_compactionfilterjni.cc
rocksjni/cassandra_value_operator.cc
rocksjni/cassandra_partition_meta_merge_operator.cc
rocksjni/restorejni.cc
rocksjni/rocksdb_exception_test.cc
rocksjni/rocksjni.cc
Expand Down Expand Up @@ -66,6 +67,7 @@ set(NATIVE_JAVA_CLASSES
org.rocksdb.Cache
org.rocksdb.CassandraCompactionFilter
org.rocksdb.CassandraValueMergeOperator
org.rocksdb.CassandraPartitionMetaMergeOperator
org.rocksdb.Checkpoint
org.rocksdb.ClockCache
org.rocksdb.ColumnFamilyHandle
Expand Down Expand Up @@ -170,6 +172,7 @@ add_jar(
src/main/java/org/rocksdb/Cache.java
src/main/java/org/rocksdb/CassandraCompactionFilter.java
src/main/java/org/rocksdb/CassandraValueMergeOperator.java
src/main/java/org/rocksdb/CassandraPartitionMetaMergeOperator.java
src/main/java/org/rocksdb/Checkpoint.java
src/main/java/org/rocksdb/ChecksumType.java
src/main/java/org/rocksdb/ClockCache.java
Expand Down
1 change: 1 addition & 0 deletions java/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractCompactionFilter\
org.rocksdb.ClockCache\
org.rocksdb.CassandraCompactionFilter\
org.rocksdb.CassandraValueMergeOperator\
org.rocksdb.CassandraPartitionMetaMergeOperator\
org.rocksdb.ColumnFamilyHandle\
org.rocksdb.ColumnFamilyOptions\
org.rocksdb.CompactionOptionsFIFO\
Expand Down
24 changes: 22 additions & 2 deletions java/rocksjni/cassandra_compactionfilterjni.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,29 @@
*/
jlong Java_org_rocksdb_CassandraCompactionFilter_createNewCassandraCompactionFilter0(
JNIEnv* /*env*/, jclass /*jcls*/, jboolean purge_ttl_on_expiration,
jint gc_grace_period_in_seconds) {
jboolean ignore_range_delete_on_read, jint gc_grace_period_in_seconds) {
auto* compaction_filter = new rocksdb::cassandra::CassandraCompactionFilter(
purge_ttl_on_expiration, gc_grace_period_in_seconds);
purge_ttl_on_expiration, ignore_range_delete_on_read,
gc_grace_period_in_seconds);
// set the native handle to our native compaction filter
return reinterpret_cast<jlong>(compaction_filter);
}

/*
* Class: org_rocksdb_CassandraCompactionFilter
* Method: setMetaCfHandle
* Signature: (JJ)V
*/
JNIEXPORT void JNICALL
Java_org_rocksdb_CassandraCompactionFilter_setMetaCfHandle(
JNIEnv* /*env*/, jclass /*jcls*/, jlong compaction_filter_pointer,
jlong rocksdb_pointer, jlong meta_cf_handle_pointer) {
auto* compaction_filter =
reinterpret_cast<rocksdb::cassandra::CassandraCompactionFilter*>(
compaction_filter_pointer);
auto* db = reinterpret_cast<rocksdb::DB*>(rocksdb_pointer);
auto* meta_cf_handle =
reinterpret_cast<rocksdb::ColumnFamilyHandle*>(meta_cf_handle_pointer);

compaction_filter->SetMetaCfHandle(db, meta_cf_handle);
}
47 changes: 47 additions & 0 deletions java/rocksjni/cassandra_partition_meta_merge_operator.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include <jni.h>
#include <stdio.h>
#include <stdlib.h>
#include <memory>
#include <string>

#include "include/org_rocksdb_CassandraPartitionMetaMergeOperator.h"
#include "rocksdb/db.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/options.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/statistics.h"
#include "rocksdb/table.h"
#include "rocksjni/portal.h"
#include "utilities/cassandra/merge_operator.h"

/*
* Class: org_rocksdb_CassandraPartitionMetaMergeOperator
* Method: newCassandraPartitionMetaMergeOperator
* Signature: ()J
*/
jlong JNICALL
Java_org_rocksdb_CassandraPartitionMetaMergeOperator_newCassandraPartitionMetaMergeOperator(
JNIEnv* /*env*/, jclass /*jclazz*/) {
auto* op = new std::shared_ptr<rocksdb::MergeOperator>(
new rocksdb::cassandra::CassandraPartitionMetaMergeOperator());
return reinterpret_cast<jlong>(op);
}

/*
* Class: org_rocksdb_CassandraPartitionMetaMergeOperator
* Method: disposeInternal
* Signature: (J)V
*/
void JNICALL
Java_org_rocksdb_CassandraPartitionMetaMergeOperator_disposeInternal(
JNIEnv* /*env*/, jobject /*jobj*/, jlong jhandle) {
auto* op =
reinterpret_cast<std::shared_ptr<rocksdb::MergeOperator>*>(jhandle);
delete op;
}
15 changes: 12 additions & 3 deletions java/src/main/java/org/rocksdb/CassandraCompactionFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,19 @@
*/
public class CassandraCompactionFilter
extends AbstractCompactionFilter<Slice> {
public CassandraCompactionFilter(boolean purgeTtlOnExpiration, int gcGracePeriodInSeconds) {
super(createNewCassandraCompactionFilter0(purgeTtlOnExpiration, gcGracePeriodInSeconds));
public CassandraCompactionFilter(
boolean purgeTtlOnExpiration, boolean ignoreRangeDeleteOnRead, int gcGracePeriodInSeconds) {
super(createNewCassandraCompactionFilter0(
purgeTtlOnExpiration, ignoreRangeDeleteOnRead, gcGracePeriodInSeconds));
}

public void setMetaCfHandle(RocksDB rocksdb, ColumnFamilyHandle metaCfHandle) {
setMetaCfHandle(getNativeHandle(), rocksdb.getNativeHandle(), metaCfHandle.getNativeHandle());
}

private native static long createNewCassandraCompactionFilter0(
boolean purgeTtlOnExpiration, int gcGracePeriodInSeconds);
boolean purgeTtlOnExpiration, boolean ignoreRangeDeleteOnRead, int gcGracePeriodInSeconds);

private native static void setMetaCfHandle(
long compactionFilter, long rocksdb, long metaCfHandle);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

package org.rocksdb;

/**
* merge operator that merges cassandra partition meta data such as
*/
public class CassandraPartitionMetaMergeOperator extends MergeOperator {
public CassandraPartitionMetaMergeOperator() {
super(newCassandraPartitionMetaMergeOperator());
}

private native static long newCassandraPartitionMetaMergeOperator();
@Override protected final native void disposeInternal(final long handle);
}
2 changes: 1 addition & 1 deletion java/src/main/java/org/rocksdb/RocksMutableObject.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ protected synchronized boolean isOwningHandle() {
*
* @return the pointer value for the native object
*/
protected synchronized long getNativeHandle() {
public synchronized long getNativeHandle() {
assert (this.nativeHandle_ != 0);
return this.nativeHandle_;
}
Expand Down
12 changes: 12 additions & 0 deletions java/src/main/java/org/rocksdb/RocksObject.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ protected RocksObject(final long nativeHandle) {
this.nativeHandle_ = nativeHandle;
}

/**
* Gets the value of the C++ pointer pointing to the underlying
* native C++ object
*
* @return the pointer value for the native object
*/
public long getNativeHandle() {
assert (this.nativeHandle_ != 0);
return this.nativeHandle_;
}


/**
* Deletes underlying C++ object pointer.
*/
Expand Down
1 change: 1 addition & 0 deletions src.mk
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ JNI_NATIVE_SOURCES = \
java/rocksjni/remove_emptyvalue_compactionfilterjni.cc \
java/rocksjni/cassandra_compactionfilterjni.cc \
java/rocksjni/cassandra_value_operator.cc \
java/rocksjni/cassandra_partition_meta_merge_operator.cc \
java/rocksjni/restorejni.cc \
java/rocksjni/rocks_callback_object.cc \
java/rocksjni/rocksjni.cc \
Expand Down
60 changes: 54 additions & 6 deletions utilities/cassandra/cassandra_compaction_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@
// (found in the LICENSE.Apache file in the root directory).

#include "utilities/cassandra/cassandra_compaction_filter.h"
#include <string>
#include "rocksdb/slice.h"
#include "utilities/cassandra/format.h"


namespace rocksdb {
namespace cassandra {
Expand All @@ -16,20 +12,72 @@ const char* CassandraCompactionFilter::Name() const {
return "CassandraCompactionFilter";
}

void CassandraCompactionFilter::SetMetaCfHandle(
DB* meta_db, ColumnFamilyHandle* meta_cf_handle) {
meta_db_ = meta_db;
meta_cf_handle_ = meta_cf_handle;
}

PartitionDeletion CassandraCompactionFilter::GetPartitionDelete(
const Slice& key) const {
if (!meta_db_) {
// skip triming when parition meta db is not ready yet
return PartitionDeletion::kDefault;
}

DB* meta_db = meta_db_.load();
if (!meta_cf_handle_) {
// skip triming when parition meta cf handle is not ready yet
return PartitionDeletion::kDefault;
}
ColumnFamilyHandle* meta_cf_handle = meta_cf_handle_.load();

auto it = unique_ptr<Iterator>(
meta_db->NewIterator(rocksdb::ReadOptions(), meta_cf_handle));
// partition meta key is encoded token+paritionkey
it->SeekForPrev(key);
if (!it->Valid()) {
// skip trimming when
return PartitionDeletion::kDefault;
}

if (!key.starts_with(it->key())) {
// skip trimming when there is no parition meta data
return PartitionDeletion::kDefault;
}

Slice value = it->value();
return PartitionDeletion::Deserialize(value.data(), value.size());
}

bool CassandraCompactionFilter::ShouldDropByParitionDelete(
const Slice& key,
std::chrono::time_point<std::chrono::system_clock> row_timestamp) const {
std::chrono::seconds gc_grace_period =
ignore_range_delete_on_read_ ? std::chrono::seconds(0) : gc_grace_period_;
return GetPartitionDelete(key).MarkForDeleteAt() >
row_timestamp + gc_grace_period;
}

CompactionFilter::Decision CassandraCompactionFilter::FilterV2(
int /*level*/, const Slice& /*key*/, ValueType value_type,
int /*level*/, const Slice& key, ValueType value_type,
const Slice& existing_value, std::string* new_value,
std::string* /*skip_until*/) const {
bool value_changed = false;
RowValue row_value = RowValue::Deserialize(
existing_value.data(), existing_value.size());

if (ShouldDropByParitionDelete(key, row_value.LastModifiedTimePoint())) {
return Decision::kRemove;
}

RowValue compacted =
purge_ttl_on_expiration_
? row_value.RemoveExpiredColumns(&value_changed)
: row_value.ConvertExpiredColumnsToTombstones(&value_changed);

if (value_type == ValueType::kValue) {
compacted = compacted.RemoveTombstones(gc_grace_period_in_seconds_);
compacted = compacted.RemoveTombstones(gc_grace_period_);
}

if(compacted.Empty()) {
Expand Down
30 changes: 25 additions & 5 deletions utilities/cassandra/cassandra_compaction_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,59 @@
// (found in the LICENSE.Apache file in the root directory).

#pragma once
#include <atomic>
#include <string>
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/slice.h"
#include "utilities/cassandra/format.h"

namespace rocksdb {
namespace cassandra {

/**
* Compaction filter for removing expired Cassandra data with ttl.
* Compaction filter for removing expired/deleted Cassandra data.
*
* If option `purge_ttl_on_expiration` is set to true, expired data
* will be directly purged. Otherwise expired data will be converted
* tombstones first, then be eventally removed after gc grace period.
* `purge_ttl_on_expiration` should only be on in the case all the
* writes have same ttl setting, otherwise it could bring old data back.
*
* Compaction filter is also in charge of removing tombstone that has been
* promoted to kValue type after serials of merging in compaction.
* If option `ignore_range_tombstone_on_read` is set to true, when client
* care more about disk space releasing and not what would be read after
* range/partition, we will drop deleted data more aggressively without
* considering gc grace period.
*
*/
class CassandraCompactionFilter : public CompactionFilter {
public:
explicit CassandraCompactionFilter(bool purge_ttl_on_expiration,
bool ignore_range_delete_on_read,
int32_t gc_grace_period_in_seconds)
: purge_ttl_on_expiration_(purge_ttl_on_expiration),
gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {}
ignore_range_delete_on_read_(ignore_range_delete_on_read),
gc_grace_period_(gc_grace_period_in_seconds),
meta_cf_handle_(nullptr),
meta_db_(nullptr) {}

const char* Name() const override;
virtual Decision FilterV2(int level, const Slice& key, ValueType value_type,
const Slice& existing_value, std::string* new_value,
std::string* skip_until) const override;

void SetMetaCfHandle(DB* meta_db, ColumnFamilyHandle* meta_cf_handle);

private:
bool purge_ttl_on_expiration_;
int32_t gc_grace_period_in_seconds_;
bool ignore_range_delete_on_read_;
std::chrono::seconds gc_grace_period_;
std::atomic<ColumnFamilyHandle*> meta_cf_handle_;
std::atomic<DB*> meta_db_;
PartitionDeletion GetPartitionDelete(const Slice& key) const;
bool ShouldDropByParitionDelete(
const Slice& key,
std::chrono::time_point<std::chrono::system_clock> row_timestamp) const;
};
} // namespace cassandra
} // namespace rocksdb
Loading

0 comments on commit 5ab2561

Please sign in to comment.