diff --git a/java/CMakeLists.txt b/java/CMakeLists.txt index b1f706c161e..c8573538211 100644 --- a/java/CMakeLists.txt +++ b/java/CMakeLists.txt @@ -8,6 +8,7 @@ set(JNI_NATIVE_SOURCES rocksjni/backupablejni.cc rocksjni/backupenginejni.cc rocksjni/cassandra_compactionfilterjni.cc + rocksjni/cassandra_partition_meta_merge_operator.cc rocksjni/cassandra_value_operator.cc rocksjni/checkpoint.cc rocksjni/clock_cache.cc @@ -103,6 +104,7 @@ set(JAVA_MAIN_CLASSES src/main/java/org/rocksdb/BuiltinComparator.java src/main/java/org/rocksdb/Cache.java src/main/java/org/rocksdb/CassandraCompactionFilter.java + src/main/java/org/rocksdb/CassandraPartitionMetaMergeOperator.java src/main/java/org/rocksdb/CassandraValueMergeOperator.java src/main/java/org/rocksdb/Checkpoint.java src/main/java/org/rocksdb/ChecksumType.java @@ -386,6 +388,7 @@ if(${CMAKE_VERSION} VERSION_LESS "3.11.4" OR (${Java_VERSION_MINOR} STREQUAL "7" org.rocksdb.BlockBasedTableConfig org.rocksdb.BloomFilter org.rocksdb.CassandraCompactionFilter + org.rocksdb.CassandraPartitionMetaMergeOperator org.rocksdb.CassandraValueMergeOperator org.rocksdb.Checkpoint org.rocksdb.ClockCache diff --git a/java/Makefile b/java/Makefile index f8642c2d612..6f5fc720cd4 100644 --- a/java/Makefile +++ b/java/Makefile @@ -13,6 +13,7 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractCompactionFilter\ org.rocksdb.ClockCache\ org.rocksdb.CassandraCompactionFilter\ org.rocksdb.CassandraValueMergeOperator\ + org.rocksdb.CassandraPartitionMetaMergeOperator\ org.rocksdb.ColumnFamilyHandle\ org.rocksdb.ColumnFamilyOptions\ org.rocksdb.CompactionJobInfo\ diff --git a/java/rocksjni/cassandra_compactionfilterjni.cc b/java/rocksjni/cassandra_compactionfilterjni.cc index 799e25e3f44..9c652df189d 100644 --- a/java/rocksjni/cassandra_compactionfilterjni.cc +++ b/java/rocksjni/cassandra_compactionfilterjni.cc @@ -15,9 +15,29 @@ */ jlong Java_org_rocksdb_CassandraCompactionFilter_createNewCassandraCompactionFilter0( JNIEnv* /*env*/, jclass /*jcls*/, jboolean purge_ttl_on_expiration, - jint gc_grace_period_in_seconds) { + jboolean ignore_range_delete_on_read, jint gc_grace_period_in_seconds) { auto* compaction_filter = new rocksdb::cassandra::CassandraCompactionFilter( - purge_ttl_on_expiration, gc_grace_period_in_seconds); + purge_ttl_on_expiration, ignore_range_delete_on_read, + gc_grace_period_in_seconds); // set the native handle to our native compaction filter return reinterpret_cast(compaction_filter); } + +/* + * Class: org_rocksdb_CassandraCompactionFilter + * Method: setMetaCfHandle + * Signature: (JJ)V + */ +JNIEXPORT void JNICALL +Java_org_rocksdb_CassandraCompactionFilter_setMetaCfHandle( + JNIEnv* /*env*/, jclass /*jcls*/, jlong compaction_filter_pointer, + jlong rocksdb_pointer, jlong meta_cf_handle_pointer) { + auto* compaction_filter = + reinterpret_cast( + compaction_filter_pointer); + auto* db = reinterpret_cast(rocksdb_pointer); + auto* meta_cf_handle = + reinterpret_cast(meta_cf_handle_pointer); + + compaction_filter->SetMetaCfHandle(db, meta_cf_handle); +} diff --git a/java/rocksjni/cassandra_partition_meta_merge_operator.cc b/java/rocksjni/cassandra_partition_meta_merge_operator.cc new file mode 100644 index 00000000000..e7651734c0d --- /dev/null +++ b/java/rocksjni/cassandra_partition_meta_merge_operator.cc @@ -0,0 +1,47 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include +#include +#include +#include +#include + +#include "include/org_rocksdb_CassandraPartitionMetaMergeOperator.h" +#include "rocksdb/db.h" +#include "rocksdb/memtablerep.h" +#include "rocksdb/merge_operator.h" +#include "rocksdb/options.h" +#include "rocksdb/slice_transform.h" +#include "rocksdb/statistics.h" +#include "rocksdb/table.h" +#include "rocksjni/portal.h" +#include "utilities/cassandra/merge_operator.h" + +/* + * Class: org_rocksdb_CassandraPartitionMetaMergeOperator + * Method: newCassandraPartitionMetaMergeOperator + * Signature: ()J + */ +jlong JNICALL +Java_org_rocksdb_CassandraPartitionMetaMergeOperator_newCassandraPartitionMetaMergeOperator( + JNIEnv* /*env*/, jclass /*jclazz*/) { + auto* op = new std::shared_ptr( + new rocksdb::cassandra::CassandraPartitionMetaMergeOperator()); + return reinterpret_cast(op); +} + +/* + * Class: org_rocksdb_CassandraPartitionMetaMergeOperator + * Method: disposeInternal + * Signature: (J)V + */ +void JNICALL +Java_org_rocksdb_CassandraPartitionMetaMergeOperator_disposeInternal( + JNIEnv* /*env*/, jobject /*jobj*/, jlong jhandle) { + auto* op = + reinterpret_cast*>(jhandle); + delete op; +} diff --git a/java/src/main/java/org/rocksdb/CassandraCompactionFilter.java b/java/src/main/java/org/rocksdb/CassandraCompactionFilter.java index 6c87cc1884f..830ea4b9cc5 100644 --- a/java/src/main/java/org/rocksdb/CassandraCompactionFilter.java +++ b/java/src/main/java/org/rocksdb/CassandraCompactionFilter.java @@ -10,10 +10,19 @@ */ public class CassandraCompactionFilter extends AbstractCompactionFilter { - public CassandraCompactionFilter(boolean purgeTtlOnExpiration, int gcGracePeriodInSeconds) { - super(createNewCassandraCompactionFilter0(purgeTtlOnExpiration, gcGracePeriodInSeconds)); + public CassandraCompactionFilter( + boolean purgeTtlOnExpiration, boolean ignoreRangeDeleteOnRead, int gcGracePeriodInSeconds) { + super(createNewCassandraCompactionFilter0( + purgeTtlOnExpiration, ignoreRangeDeleteOnRead, gcGracePeriodInSeconds)); + } + + public void setMetaCfHandle(RocksDB rocksdb, ColumnFamilyHandle metaCfHandle) { + setMetaCfHandle(getNativeHandle(), rocksdb.getNativeHandle(), metaCfHandle.getNativeHandle()); } private native static long createNewCassandraCompactionFilter0( - boolean purgeTtlOnExpiration, int gcGracePeriodInSeconds); + boolean purgeTtlOnExpiration, boolean ignoreRangeDeleteOnRead, int gcGracePeriodInSeconds); + + private native static void setMetaCfHandle( + long compactionFilter, long rocksdb, long metaCfHandle); } diff --git a/java/src/main/java/org/rocksdb/CassandraPartitionMetaMergeOperator.java b/java/src/main/java/org/rocksdb/CassandraPartitionMetaMergeOperator.java new file mode 100644 index 00000000000..cada5e2cf82 --- /dev/null +++ b/java/src/main/java/org/rocksdb/CassandraPartitionMetaMergeOperator.java @@ -0,0 +1,18 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +package org.rocksdb; + +/** + * merge operator that merges cassandra partition meta data such as + */ +public class CassandraPartitionMetaMergeOperator extends MergeOperator { + public CassandraPartitionMetaMergeOperator() { + super(newCassandraPartitionMetaMergeOperator()); + } + + private native static long newCassandraPartitionMetaMergeOperator(); + @Override protected final native void disposeInternal(final long handle); +} diff --git a/java/src/main/java/org/rocksdb/RocksMutableObject.java b/java/src/main/java/org/rocksdb/RocksMutableObject.java index e92289dc0c5..ea315a31ac3 100644 --- a/java/src/main/java/org/rocksdb/RocksMutableObject.java +++ b/java/src/main/java/org/rocksdb/RocksMutableObject.java @@ -65,7 +65,7 @@ protected synchronized boolean isOwningHandle() { * * @return the pointer value for the native object */ - protected synchronized long getNativeHandle() { + public synchronized long getNativeHandle() { assert (this.nativeHandle_ != 0); return this.nativeHandle_; } diff --git a/java/src/main/java/org/rocksdb/RocksObject.java b/java/src/main/java/org/rocksdb/RocksObject.java index 545dd896a06..be206387b04 100644 --- a/java/src/main/java/org/rocksdb/RocksObject.java +++ b/java/src/main/java/org/rocksdb/RocksObject.java @@ -29,6 +29,18 @@ protected RocksObject(final long nativeHandle) { this.nativeHandle_ = nativeHandle; } + /** + * Gets the value of the C++ pointer pointing to the underlying + * native C++ object + * + * @return the pointer value for the native object + */ + public long getNativeHandle() { + assert (this.nativeHandle_ != 0); + return this.nativeHandle_; + } + + /** * Deletes underlying C++ object pointer. */ diff --git a/src.mk b/src.mk index 42d6ae70b03..f5983c33dd0 100644 --- a/src.mk +++ b/src.mk @@ -474,6 +474,7 @@ JNI_NATIVE_SOURCES = \ java/rocksjni/remove_emptyvalue_compactionfilterjni.cc \ java/rocksjni/cassandra_compactionfilterjni.cc \ java/rocksjni/cassandra_value_operator.cc \ + java/rocksjni/cassandra_partition_meta_merge_operator.cc \ java/rocksjni/restorejni.cc \ java/rocksjni/rocks_callback_object.cc \ java/rocksjni/rocksjni.cc \ diff --git a/utilities/cassandra/cassandra_compaction_filter.cc b/utilities/cassandra/cassandra_compaction_filter.cc index 1b99d3a8b7d..22f3e6f8ee8 100644 --- a/utilities/cassandra/cassandra_compaction_filter.cc +++ b/utilities/cassandra/cassandra_compaction_filter.cc @@ -4,10 +4,6 @@ // (found in the LICENSE.Apache file in the root directory). #include "utilities/cassandra/cassandra_compaction_filter.h" -#include -#include "rocksdb/slice.h" -#include "utilities/cassandra/format.h" - namespace rocksdb { namespace cassandra { @@ -16,20 +12,72 @@ const char* CassandraCompactionFilter::Name() const { return "CassandraCompactionFilter"; } +void CassandraCompactionFilter::SetMetaCfHandle( + DB* meta_db, ColumnFamilyHandle* meta_cf_handle) { + meta_db_ = meta_db; + meta_cf_handle_ = meta_cf_handle; +} + +PartitionDeletion CassandraCompactionFilter::GetPartitionDelete( + const Slice& key) const { + if (!meta_db_) { + // skip triming when parition meta db is not ready yet + return PartitionDeletion::kDefault; + } + + DB* meta_db = meta_db_.load(); + if (!meta_cf_handle_) { + // skip triming when parition meta cf handle is not ready yet + return PartitionDeletion::kDefault; + } + ColumnFamilyHandle* meta_cf_handle = meta_cf_handle_.load(); + + auto it = std::unique_ptr( + meta_db->NewIterator(rocksdb::ReadOptions(), meta_cf_handle)); + // partition meta key is encoded token+paritionkey + it->SeekForPrev(key); + if (!it->Valid()) { + // skip trimming when + return PartitionDeletion::kDefault; + } + + if (!key.starts_with(it->key())) { + // skip trimming when there is no parition meta data + return PartitionDeletion::kDefault; + } + + Slice value = it->value(); + return PartitionDeletion::Deserialize(value.data(), value.size()); +} + +bool CassandraCompactionFilter::ShouldDropByParitionDelete( + const Slice& key, + std::chrono::time_point row_timestamp) const { + std::chrono::seconds gc_grace_period = + ignore_range_delete_on_read_ ? std::chrono::seconds(0) : gc_grace_period_; + return GetPartitionDelete(key).MarkForDeleteAt() > + row_timestamp + gc_grace_period; +} + CompactionFilter::Decision CassandraCompactionFilter::FilterV2( - int /*level*/, const Slice& /*key*/, ValueType value_type, + int /*level*/, const Slice& key, ValueType value_type, const Slice& existing_value, std::string* new_value, std::string* /*skip_until*/) const { bool value_changed = false; RowValue row_value = RowValue::Deserialize( existing_value.data(), existing_value.size()); + + if (ShouldDropByParitionDelete(key, row_value.LastModifiedTimePoint())) { + return Decision::kRemove; + } + RowValue compacted = purge_ttl_on_expiration_ ? row_value.RemoveExpiredColumns(&value_changed) : row_value.ConvertExpiredColumnsToTombstones(&value_changed); if (value_type == ValueType::kValue) { - compacted = compacted.RemoveTombstones(gc_grace_period_in_seconds_); + compacted = compacted.RemoveTombstones(gc_grace_period_); } if(compacted.Empty()) { diff --git a/utilities/cassandra/cassandra_compaction_filter.h b/utilities/cassandra/cassandra_compaction_filter.h index 4ee2445a1a3..d497cab4f52 100644 --- a/utilities/cassandra/cassandra_compaction_filter.h +++ b/utilities/cassandra/cassandra_compaction_filter.h @@ -4,39 +4,59 @@ // (found in the LICENSE.Apache file in the root directory). #pragma once +#include #include #include "rocksdb/compaction_filter.h" +#include "rocksdb/db.h" #include "rocksdb/slice.h" +#include "utilities/cassandra/format.h" namespace rocksdb { namespace cassandra { /** - * Compaction filter for removing expired Cassandra data with ttl. + * Compaction filter for removing expired/deleted Cassandra data. + * * If option `purge_ttl_on_expiration` is set to true, expired data * will be directly purged. Otherwise expired data will be converted * tombstones first, then be eventally removed after gc grace period. * `purge_ttl_on_expiration` should only be on in the case all the * writes have same ttl setting, otherwise it could bring old data back. * - * Compaction filter is also in charge of removing tombstone that has been - * promoted to kValue type after serials of merging in compaction. + * If option `ignore_range_tombstone_on_read` is set to true, when client + * care more about disk space releasing and not what would be read after + * range/partition, we will drop deleted data more aggressively without + * considering gc grace period. + * */ class CassandraCompactionFilter : public CompactionFilter { public: explicit CassandraCompactionFilter(bool purge_ttl_on_expiration, + bool ignore_range_delete_on_read, int32_t gc_grace_period_in_seconds) : purge_ttl_on_expiration_(purge_ttl_on_expiration), - gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {} + ignore_range_delete_on_read_(ignore_range_delete_on_read), + gc_grace_period_(gc_grace_period_in_seconds), + meta_cf_handle_(nullptr), + meta_db_(nullptr) {} const char* Name() const override; virtual Decision FilterV2(int level, const Slice& key, ValueType value_type, const Slice& existing_value, std::string* new_value, std::string* skip_until) const override; + void SetMetaCfHandle(DB* meta_db, ColumnFamilyHandle* meta_cf_handle); + private: bool purge_ttl_on_expiration_; - int32_t gc_grace_period_in_seconds_; + bool ignore_range_delete_on_read_; + std::chrono::seconds gc_grace_period_; + std::atomic meta_cf_handle_; + std::atomic meta_db_; + PartitionDeletion GetPartitionDelete(const Slice& key) const; + bool ShouldDropByParitionDelete( + const Slice& key, + std::chrono::time_point row_timestamp) const; }; } // namespace cassandra } // namespace rocksdb diff --git a/utilities/cassandra/cassandra_format_test.cc b/utilities/cassandra/cassandra_format_test.cc index 7af21247eb1..ffd96ed2c17 100644 --- a/utilities/cassandra/cassandra_format_test.cc +++ b/utilities/cassandra/cassandra_format_test.cc @@ -129,11 +129,11 @@ TEST(TombstoneTest, TombstoneCollectable) { EXPECT_TRUE(Tombstone(ColumnTypeMask::DELETION_MASK, 0, now - gc_grace_seconds - time_delta_seconds, ToMicroSeconds(now - gc_grace_seconds - time_delta_seconds)) - .Collectable(gc_grace_seconds)); + .Collectable(std::chrono::seconds(gc_grace_seconds))); EXPECT_FALSE(Tombstone(ColumnTypeMask::DELETION_MASK, 0, now - gc_grace_seconds + time_delta_seconds, ToMicroSeconds(now - gc_grace_seconds + time_delta_seconds)) - .Collectable(gc_grace_seconds)); + .Collectable(std::chrono::seconds(gc_grace_seconds))); } TEST(TombstoneTest, Tombstone) { @@ -317,12 +317,13 @@ TEST(RowValueTest, RowWithColumns) { TEST(RowValueTest, PurgeTtlShouldRemvoeAllColumnsExpired) { int64_t now = time(nullptr); - auto row_value = CreateTestRowValue({ - CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now)), - CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl - 10)), //expired - CreateTestColumnSpec(kExpiringColumn, 2, ToMicroSeconds(now)), // not expired - CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now)) - }); + auto row_value = CreateTestRowValue( + {CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now)), + CreateTestColumnSpec(kExpiringColumn, 1, + ToMicroSeconds(now - kTtl - 10)), // expired + CreateTestColumnSpec(kExpiringColumn, 2, + ToMicroSeconds(now)), // not expired + CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))}); bool changed = false; auto purged = row_value.RemoveExpiredColumns(&changed); @@ -339,12 +340,13 @@ TEST(RowValueTest, PurgeTtlShouldRemvoeAllColumnsExpired) { TEST(RowValueTest, ExpireTtlShouldConvertExpiredColumnsToTombstones) { int64_t now = time(nullptr); - auto row_value = CreateTestRowValue({ - CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now)), - CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl - 10)), //expired - CreateTestColumnSpec(kExpiringColumn, 2, ToMicroSeconds(now)), // not expired - CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now)) - }); + auto row_value = CreateTestRowValue( + {CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now)), + CreateTestColumnSpec(kExpiringColumn, 1, + ToMicroSeconds(now - kTtl - 10)), // expired + CreateTestColumnSpec(kExpiringColumn, 2, + ToMicroSeconds(now)), // not expired + CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))}); bool changed = false; auto compacted = row_value.ConvertExpiredColumnsToTombstones(&changed); @@ -358,6 +360,48 @@ TEST(RowValueTest, ExpireTtlShouldConvertExpiredColumnsToTombstones) { compacted.ConvertExpiredColumnsToTombstones(&changed); EXPECT_FALSE(changed); } + +TEST(ParitionDeletionTest, Supersedes) { + PartitionDeletion pd1(100, 100); + PartitionDeletion pd2(100, 101); + PartitionDeletion pd3(101, 101); + + EXPECT_TRUE(pd2.Supersedes(pd1)); + EXPECT_TRUE(pd3.Supersedes(pd2)); + EXPECT_TRUE(pd3.Supersedes(pd1)); +} + +void AssertRoundTrip(PartitionDeletion pd0) { + std::string value; + value.reserve(PartitionDeletion::kSize); + pd0.Serialize(&value); + PartitionDeletion pd = + PartitionDeletion::Deserialize(value.data(), value.size()); + EXPECT_EQ(pd.MarkForDeleteAt(), pd0.MarkForDeleteAt()); + EXPECT_EQ(pd.LocalDeletionTime(), pd0.LocalDeletionTime()); +} + +TEST(ParitionDeletionTest, Serialization) { + AssertRoundTrip(PartitionDeletion(100, 101)); + AssertRoundTrip(PartitionDeletion(0, 0)); + AssertRoundTrip(PartitionDeletion(0, 0)); + AssertRoundTrip(PartitionDeletion::kDefault); +} + +TEST(ParitionDeletionTest, MergeKeepLastestDeletion) { + auto pd0 = PartitionDeletion::kDefault; + auto pd1 = PartitionDeletion(100, 200); + auto pd2 = PartitionDeletion(101, 300); + auto pd3 = PartitionDeletion(100, 300); + std::vector pds; + pds.push_back(pd0); + pds.push_back(pd1); + pds.push_back(pd2); + pds.push_back(pd3); + PartitionDeletion merged = PartitionDeletion::Merge(std::move(pds)); + EXPECT_EQ(merged.LocalDeletionTime(), pd2.LocalDeletionTime()); + EXPECT_EQ(merged.MarkForDeleteAt(), pd2.MarkForDeleteAt()); +} } // namespace cassandra } // namespace rocksdb diff --git a/utilities/cassandra/cassandra_functional_test.cc b/utilities/cassandra/cassandra_functional_test.cc index cec9ce7d88f..a3ddf75929b 100644 --- a/utilities/cassandra/cassandra_functional_test.cc +++ b/utilities/cassandra/cassandra_functional_test.cc @@ -95,20 +95,23 @@ class CassandraStore { class TestCompactionFilterFactory : public CompactionFilterFactory { public: explicit TestCompactionFilterFactory(bool purge_ttl_on_expiration, + bool ignore_range_delete_on_read, int32_t gc_grace_period_in_seconds) : purge_ttl_on_expiration_(purge_ttl_on_expiration), + ignore_range_delete_on_read_(ignore_range_delete_on_read), gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {} std::unique_ptr CreateCompactionFilter( const CompactionFilter::Context& /*context*/) override { return std::unique_ptr(new CassandraCompactionFilter( - purge_ttl_on_expiration_, gc_grace_period_in_seconds_)); - } + purge_ttl_on_expiration_, ignore_range_delete_on_read_, gc_grace_period_in_seconds_)); + } const char* Name() const override { return "TestCompactionFilterFactory"; } private: bool purge_ttl_on_expiration_; + bool ignore_range_delete_on_read_ = false; int32_t gc_grace_period_in_seconds_; }; @@ -126,13 +129,14 @@ class CassandraFunctionalTest : public testing::Test { options.create_if_missing = true; options.merge_operator.reset(new CassandraValueMergeOperator(gc_grace_period_in_seconds_)); auto* cf_factory = new TestCompactionFilterFactory( - purge_ttl_on_expiration_, gc_grace_period_in_seconds_); + purge_ttl_on_expiration_, ignore_range_delete_on_read_, gc_grace_period_in_seconds_); options.compaction_filter_factory.reset(cf_factory); EXPECT_OK(DB::Open(options, kDbName, &db)); return std::shared_ptr(db); } bool purge_ttl_on_expiration_ = false; + bool ignore_range_delete_on_read_ = false; int32_t gc_grace_period_in_seconds_ = 100; }; @@ -142,23 +146,29 @@ TEST_F(CassandraFunctionalTest, SimpleMergeTest) { CassandraStore store(OpenDb()); int64_t now = time(nullptr); - store.Append("k1", CreateTestRowValue({ - CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now + 5)), - CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now + 8)), - CreateTestColumnSpec(kExpiringColumn, 2, ToMicroSeconds(now + 5)), - })); - store.Append("k1",CreateTestRowValue({ - CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now + 2)), - CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now + 5)), - CreateTestColumnSpec(kTombstone, 2, ToMicroSeconds(now + 7)), - CreateTestColumnSpec(kExpiringColumn, 7, ToMicroSeconds(now + 17)), - })); - store.Append("k1", CreateTestRowValue({ - CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now + 6)), - CreateTestColumnSpec(kTombstone, 1, ToMicroSeconds(now + 5)), - CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now + 4)), - CreateTestColumnSpec(kTombstone, 11, ToMicroSeconds(now + 11)), - })); + store.Append( + "k1", + CreateTestRowValue({ + CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now + 5)), + CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now + 8)), + CreateTestColumnSpec(kExpiringColumn, 2, ToMicroSeconds(now + 5)), + })); + store.Append( + "k1", + CreateTestRowValue({ + CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now + 2)), + CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now + 5)), + CreateTestColumnSpec(kTombstone, 2, ToMicroSeconds(now + 7)), + CreateTestColumnSpec(kExpiringColumn, 7, ToMicroSeconds(now + 17)), + })); + store.Append( + "k1", + CreateTestRowValue({ + CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now + 6)), + CreateTestColumnSpec(kTombstone, 1, ToMicroSeconds(now + 5)), + CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now + 4)), + CreateTestColumnSpec(kTombstone, 11, ToMicroSeconds(now + 11)), + })); auto ret = store.Get("k1"); @@ -177,18 +187,24 @@ TEST_F(CassandraFunctionalTest, CassandraStore store(OpenDb()); int64_t now= time(nullptr); - store.Append("k1", CreateTestRowValue({ - CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), //expired - CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl + 10)), // not expired - CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now)) - })); + store.Append( + "k1", + CreateTestRowValue( + {CreateTestColumnSpec(kExpiringColumn, 0, + ToMicroSeconds(now - kTtl - 20)), // expired + CreateTestColumnSpec( + kExpiringColumn, 1, + ToMicroSeconds(now - kTtl + 10)), // not expired + CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))})); store.Flush(); - store.Append("k1",CreateTestRowValue({ - CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), //expired - CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now)) - })); + store.Append( + "k1", + CreateTestRowValue( + {CreateTestColumnSpec(kExpiringColumn, 0, + ToMicroSeconds(now - kTtl - 10)), // expired + CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now))})); store.Flush(); store.Compact(); @@ -210,18 +226,23 @@ TEST_F(CassandraFunctionalTest, CassandraStore store(OpenDb()); int64_t now = time(nullptr); - store.Append("k1", CreateTestRowValue({ - CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), //expired - CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now)), // not expired - CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now)) - })); + store.Append( + "k1", + CreateTestRowValue( + {CreateTestColumnSpec(kExpiringColumn, 0, + ToMicroSeconds(now - kTtl - 20)), // expired + CreateTestColumnSpec(kExpiringColumn, 1, + ToMicroSeconds(now)), // not expired + CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))})); store.Flush(); - store.Append("k1",CreateTestRowValue({ - CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), //expired - CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now)) - })); + store.Append( + "k1", + CreateTestRowValue( + {CreateTestColumnSpec(kExpiringColumn, 0, + ToMicroSeconds(now - kTtl - 10)), // expired + CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now))})); store.Flush(); store.Compact(); @@ -242,15 +263,18 @@ TEST_F(CassandraFunctionalTest, int64_t now = time(nullptr); store.Append("k1", CreateTestRowValue({ - CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), - CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl - 20)), - })); + CreateTestColumnSpec(kExpiringColumn, 0, + ToMicroSeconds(now - kTtl - 20)), + CreateTestColumnSpec(kExpiringColumn, 1, + ToMicroSeconds(now - kTtl - 20)), + })); store.Flush(); - store.Append("k1",CreateTestRowValue({ - CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), - })); + store.Append("k1", CreateTestRowValue({ + CreateTestColumnSpec(kExpiringColumn, 0, + ToMicroSeconds(now - kTtl - 10)), + })); store.Flush(); store.Compact(); @@ -263,20 +287,21 @@ TEST_F(CassandraFunctionalTest, CassandraStore store(OpenDb()); int64_t now = time(nullptr); - store.Append("k1", CreateTestRowValue({ - CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)), - CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now)) - })); + store.Append("k1", + CreateTestRowValue( + {CreateTestColumnSpec( + kTombstone, 0, + ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)), + CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now))})); - store.Append("k2", CreateTestRowValue({ - CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now)) - })); + store.Append("k2", CreateTestRowValue({CreateTestColumnSpec( + kColumn, 0, ToMicroSeconds(now))})); store.Flush(); - store.Append("k1",CreateTestRowValue({ - CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now)), - })); + store.Append("k1", CreateTestRowValue({ + CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now)), + })); store.Flush(); store.Compact(); @@ -293,9 +318,12 @@ TEST_F(CassandraFunctionalTest, CompactionShouldRemoveTombstoneFromPut) { CassandraStore store(OpenDb()); int64_t now = time(nullptr); - store.Put("k1", CreateTestRowValue({ - CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)), - })); + store.Put("k1", + CreateTestRowValue({ + CreateTestColumnSpec( + kTombstone, 0, + ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)), + })); store.Flush(); store.Compact(); diff --git a/utilities/cassandra/cassandra_row_merge_test.cc b/utilities/cassandra/cassandra_row_merge_test.cc index 88dee118b5b..1be891d3dfc 100644 --- a/utilities/cassandra/cassandra_row_merge_test.cc +++ b/utilities/cassandra/cassandra_row_merge_test.cc @@ -13,31 +13,25 @@ namespace cassandra { TEST(RowValueMergeTest, Merge) { std::vector row_values; - row_values.push_back( - CreateTestRowValue({ + row_values.push_back(CreateTestRowValue({ CreateTestColumnSpec(kTombstone, 0, 5), CreateTestColumnSpec(kColumn, 1, 8), CreateTestColumnSpec(kExpiringColumn, 2, 5), - }) - ); + })); - row_values.push_back( - CreateTestRowValue({ + row_values.push_back(CreateTestRowValue({ CreateTestColumnSpec(kColumn, 0, 2), CreateTestColumnSpec(kExpiringColumn, 1, 5), CreateTestColumnSpec(kTombstone, 2, 7), CreateTestColumnSpec(kExpiringColumn, 7, 17), - }) - ); + })); - row_values.push_back( - CreateTestRowValue({ + row_values.push_back(CreateTestRowValue({ CreateTestColumnSpec(kExpiringColumn, 0, 6), CreateTestColumnSpec(kTombstone, 1, 5), CreateTestColumnSpec(kColumn, 2, 4), CreateTestColumnSpec(kTombstone, 11, 11), - }) - ); + })); RowValue merged = RowValue::Merge(std::move(row_values)); EXPECT_FALSE(merged.IsTombstone()); @@ -58,28 +52,21 @@ TEST(RowValueMergeTest, MergeWithRowTombstone) { ); // This row's timestamp is smaller than tombstone. - row_values.push_back( - CreateTestRowValue({ - CreateTestColumnSpec(kColumn, 0, 5), - CreateTestColumnSpec(kColumn, 1, 6), - }) - ); + row_values.push_back(CreateTestRowValue({ + CreateTestColumnSpec(kColumn, 0, 5), CreateTestColumnSpec(kColumn, 1, 6), + })); // Some of the column's row is smaller, some is larger. - row_values.push_back( - CreateTestRowValue({ + row_values.push_back(CreateTestRowValue({ CreateTestColumnSpec(kColumn, 2, 10), CreateTestColumnSpec(kColumn, 3, 12), - }) - ); + })); // All of the column's rows are larger than tombstone. - row_values.push_back( - CreateTestRowValue({ + row_values.push_back(CreateTestRowValue({ CreateTestColumnSpec(kColumn, 4, 13), CreateTestColumnSpec(kColumn, 5, 14), - }) - ); + })); RowValue merged = RowValue::Merge(std::move(row_values)); EXPECT_FALSE(merged.IsTombstone()); diff --git a/utilities/cassandra/format.cc b/utilities/cassandra/format.cc index 42cd7206b61..b800e2860c3 100644 --- a/utilities/cassandra/format.cc +++ b/utilities/cassandra/format.cc @@ -129,10 +129,8 @@ std::shared_ptr ExpiringColumn::ToTombstone() const { int64_t marked_for_delete_at = std::chrono::duration_cast(expired_at).count(); return std::make_shared( - static_cast(ColumnTypeMask::DELETION_MASK), - Index(), - local_deletion_time, - marked_for_delete_at); + static_cast(ColumnTypeMask::DELETION_MASK), Index(), + local_deletion_time, marked_for_delete_at); } std::shared_ptr ExpiringColumn::Deserialize( @@ -176,10 +174,9 @@ void Tombstone::Serialize(std::string* dest) const { rocksdb::cassandra::Serialize(marked_for_delete_at_, dest); } -bool Tombstone::Collectable(int32_t gc_grace_period_in_seconds) const { +bool Tombstone::Collectable(std::chrono::seconds gc_grace_period) const { auto local_deleted_at = std::chrono::time_point( std::chrono::seconds(local_deletion_time_)); - auto gc_grace_period = std::chrono::seconds(gc_grace_period_in_seconds); return local_deleted_at + gc_grace_period < std::chrono::system_clock::now(); } @@ -226,6 +223,12 @@ int64_t RowValue::LastModifiedTime() const { } } +std::chrono::time_point +RowValue::LastModifiedTimePoint() const { + return std::chrono::time_point( + std::chrono::microseconds(LastModifiedTime())); +} + bool RowValue::IsTombstone() const { return marked_for_delete_at_ > kDefaultMarkedForDeleteAt; } @@ -277,7 +280,8 @@ RowValue RowValue::ConvertExpiredColumnsToTombstones(bool* changed) const { return RowValue(std::move(new_columns), last_modified_time_); } -RowValue RowValue::RemoveTombstones(int32_t gc_grace_period) const { +RowValue RowValue::RemoveTombstones( + std::chrono::seconds gc_grace_period) const { Columns new_columns; for (auto& column : columns_) { if (column->Mask() == ColumnTypeMask::DELETION_MASK) { @@ -382,5 +386,63 @@ RowValue RowValue::Merge(std::vector&& values) { return RowValue(std::move(columns), last_modified_time); } +const PartitionDeletion PartitionDeletion::kDefault(kDefaultLocalDeletionTime, + kDefaultMarkedForDeleteAt); +const std::size_t PartitionDeletion::kSize = sizeof(int32_t) + sizeof(int64_t); + +PartitionDeletion::PartitionDeletion(int32_t local_deletion_time, + int64_t marked_for_delete_at) + : local_deletion_time_(local_deletion_time), + marked_for_delete_at_(marked_for_delete_at) {} + +std::chrono::time_point +PartitionDeletion::MarkForDeleteAt() const { + return std::chrono::time_point( + std::chrono::microseconds(marked_for_delete_at_)); +} + +std::chrono::time_point +PartitionDeletion::LocalDeletionTime() const { + return std::chrono::time_point( + std::chrono::seconds(local_deletion_time_)); +} + +PartitionDeletion PartitionDeletion::Deserialize(const char* src, + std::size_t size) { + if (size < kSize) { + return PartitionDeletion::kDefault; + } + + int32_t local_deletion_time = + rocksdb::cassandra::Deserialize(src, 0); + int64_t marked_for_delete_at = + rocksdb::cassandra::Deserialize(src, sizeof(int32_t)); + return PartitionDeletion(local_deletion_time, marked_for_delete_at); +} + +void PartitionDeletion::Serialize(std::string* dest) const { + rocksdb::cassandra::Serialize(local_deletion_time_, dest); + rocksdb::cassandra::Serialize(marked_for_delete_at_, dest); +} + +// Merge multiple PartitionDeletion only keep latest one +PartitionDeletion PartitionDeletion::Merge( + std::vector&& pds) { + assert(pds.size() > 0); + PartitionDeletion candidate = kDefault; + for (auto& deletion : pds) { + if (deletion.Supersedes(candidate)) { + candidate = deletion; + } + } + return candidate; +} + +bool PartitionDeletion::Supersedes(PartitionDeletion& pd) const { + return MarkForDeleteAt() > pd.MarkForDeleteAt() || + (MarkForDeleteAt() == pd.MarkForDeleteAt() && + LocalDeletionTime() > pd.LocalDeletionTime()); +} + } // namepsace cassandrda } // namespace rocksdb diff --git a/utilities/cassandra/format.h b/utilities/cassandra/format.h index b7f6e32f6ba..719aef10a8d 100644 --- a/utilities/cassandra/format.h +++ b/utilities/cassandra/format.h @@ -115,7 +115,7 @@ class Tombstone : public ColumnBase { virtual int64_t Timestamp() const override; virtual std::size_t Size() const override; virtual void Serialize(std::string* dest) const override; - bool Collectable(int32_t gc_grace_period) const; + bool Collectable(std::chrono::seconds gc_grace_period) const; static std::shared_ptr Deserialize(const char* src, std::size_t offset); @@ -162,10 +162,12 @@ class RowValue { // For Tombstone this returns the marked_for_delete_at_, // otherwise it returns the max timestamp of containing columns. int64_t LastModifiedTime() const; + std::chrono::time_point LastModifiedTimePoint() + const; void Serialize(std::string* dest) const; RowValue RemoveExpiredColumns(bool* changed) const; RowValue ConvertExpiredColumnsToTombstones(bool* changed) const; - RowValue RemoveTombstones(int32_t gc_grace_period) const; + RowValue RemoveTombstones(std::chrono::seconds gc_grace_period) const; bool Empty() const; static RowValue Deserialize(const char* src, std::size_t size); @@ -193,5 +195,22 @@ class RowValue { CompactionShouldRemoveTombstoneExceedingGCGracePeriod); }; +class PartitionDeletion { + public: + PartitionDeletion(int32_t local_deletion_time, int64_t marked_for_delete_at); + std::chrono::time_point MarkForDeleteAt() const; + std::chrono::time_point LocalDeletionTime() const; + void Serialize(std::string* dest) const; + bool Supersedes(PartitionDeletion& pd) const; + static PartitionDeletion Merge(std::vector&& pds); + static PartitionDeletion Deserialize(const char* src, std::size_t size); + const static PartitionDeletion kDefault; + const static std::size_t kSize; + + private: + int32_t local_deletion_time_; + int64_t marked_for_delete_at_; +}; + } // namepsace cassandrda } // namespace rocksdb diff --git a/utilities/cassandra/merge_operator.cc b/utilities/cassandra/merge_operator.cc index 4e529a6f2a8..e759a66ce7b 100644 --- a/utilities/cassandra/merge_operator.cc +++ b/utilities/cassandra/merge_operator.cc @@ -34,7 +34,7 @@ bool CassandraValueMergeOperator::FullMergeV2( } RowValue merged = RowValue::Merge(std::move(row_values)); - merged = merged.RemoveTombstones(gc_grace_period_in_seconds_); + merged = merged.RemoveTombstones(gc_grace_period_); merge_out->new_value.reserve(merged.Size()); merged.Serialize(&(merge_out->new_value)); @@ -62,6 +62,54 @@ const char* CassandraValueMergeOperator::Name() const { return "CassandraValueMergeOperator"; } +bool CassandraPartitionMetaMergeOperator::FullMergeV2( + const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const { + // Clear the *new_value for writing. + merge_out->new_value.clear(); + std::vector pds; + + if (merge_in.existing_value) { + pds.push_back(PartitionDeletion::Deserialize( + merge_in.existing_value->data(), merge_in.existing_value->size())); + } + + for (auto& operand : merge_in.operand_list) { + pds.push_back( + PartitionDeletion::Deserialize(operand.data(), operand.size())); + } + + PartitionDeletion merged = PartitionDeletion::Merge(std::move(pds)); + merge_out->new_value.reserve(PartitionDeletion::kSize); + merged.Serialize(&(merge_out->new_value)); + + return true; +} + +bool CassandraPartitionMetaMergeOperator::PartialMergeMulti( + const Slice& /*key*/, const std::deque& operand_list, + std::string* new_value, Logger* /*logger*/) const { + // Clear the *new_value for writing. + assert(new_value); + new_value->clear(); + + std::vector pds; + for (auto& operand : operand_list) { + pds.push_back( + PartitionDeletion::Deserialize(operand.data(), operand.size())); + } + + PartitionDeletion merged = PartitionDeletion::Merge(std::move(pds)); + new_value->reserve(PartitionDeletion::kSize); + merged.Serialize(new_value); + + return true; +} + +const char* CassandraPartitionMetaMergeOperator::Name() const { + return "CassandraPartitionMetaMergeOperator"; +} + } // namespace cassandra } // namespace rocksdb diff --git a/utilities/cassandra/merge_operator.h b/utilities/cassandra/merge_operator.h index 4d02c26de43..e126733fb9b 100644 --- a/utilities/cassandra/merge_operator.h +++ b/utilities/cassandra/merge_operator.h @@ -4,6 +4,7 @@ // (found in the LICENSE.Apache file in the root directory). #pragma once +#include #include "rocksdb/merge_operator.h" #include "rocksdb/slice.h" @@ -17,7 +18,7 @@ class CassandraValueMergeOperator : public MergeOperator { public: explicit CassandraValueMergeOperator(int32_t gc_grace_period_in_seconds, size_t operands_limit = 0) - : gc_grace_period_in_seconds_(gc_grace_period_in_seconds), + : gc_grace_period_(gc_grace_period_in_seconds), operands_limit_(operands_limit) {} virtual bool FullMergeV2(const MergeOperationInput& merge_in, @@ -37,8 +38,27 @@ class CassandraValueMergeOperator : public MergeOperator { } private: - int32_t gc_grace_period_in_seconds_; + std::chrono::seconds gc_grace_period_; size_t operands_limit_; }; + +/** + * A MergeOperator implements Cassandra partition meta data merge. + */ +class CassandraPartitionMetaMergeOperator : public MergeOperator { + public: + explicit CassandraPartitionMetaMergeOperator(){}; + + virtual bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override; + + virtual bool PartialMergeMulti(const Slice& key, + const std::deque& operand_list, + std::string* new_value, + Logger* logger) const override; + + virtual const char* Name() const override; +}; + } // namespace cassandra } // namespace rocksdb