Skip to content

Commit ac45fd8

Browse files
mike-tr-adamsonmaedhrozadelapenapkolaczkjasonrutherglen
committed
Literal on-disk index and index write path (apache#9)
This commit contains the following additions to SAI: - The index write path and index building based around StorageAttachedIndexBuilder and StorageAttachedIndexWriter - The on-disk index versioning using the SSTable Descriptor analog IndexDescriptor with Version and OnDiskFormat - The literal on-disk index using the LiteralIndexWriter patch by Mike Adamson; reviewed by Caleb Rackliffe and Andres de la Peña for CASSANDRA-18062 Co-authored-by: Mike Adamson <mikeatdot@gmail.com> Co-authored-by: Caleb Rackliffe <calebrackliffe@gmail.com> Co-authored-by: Andres de la Peña <a.penya.garcia@gmail.com> Co-authored-by: Piotr Kołaczkowski <pkolaczk@gmail.com> Co-authored-by: Jason Rutherglen <jason.rutherglen@gmail.com>
1 parent 2269c11 commit ac45fd8

File tree

309 files changed

+21766
-1233
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

309 files changed

+21766
-1233
lines changed

conf/cassandra.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1526,6 +1526,12 @@ transparent_data_encryption_options:
15261526
store_type: JCEKS
15271527
key_password: cassandra
15281528

1529+
# Storage Attached Indexing options.
1530+
# sai_options:
1531+
## Total memory permitted for writing SAI index segments. This memory
1532+
## is split between all SAI indexes being built so more indexes will mean smaller
1533+
## segment sizes.
1534+
# segment_write_buffer_size: 1024MiB
15291535

15301536
#####################
15311537
# SAFETY THRESHOLDS #

src/java/org/apache/cassandra/config/CassandraRelevantProperties.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,8 @@ public enum CassandraRelevantProperties
418418
// SAI specific properties
419419
/** Controls the maximum number of index query intersections that will take part in a query */
420420
SAI_INTERSECTION_CLAUSE_LIMIT("cassandra.sai.intersection.clause.limit", "2"),
421+
/** Latest version to be used for SAI index writing */
422+
SAI_LATEST_VERSION("cassandra.sai.latest_version", "aa"),
421423

422424
/** Defines how often schema definitions are pulled from the other nodes */
423425
SCHEMA_PULL_INTERVAL_MS("cassandra.schema_pull_interval_ms", "60000"),

src/java/org/apache/cassandra/config/Config.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -777,6 +777,8 @@ public static class SSTableConfig
777777
*/
778778
public volatile double range_tombstone_list_growth_factor = 1.5;
779779

780+
public StorageAttachedIndexOptions sai_options = new StorageAttachedIndexOptions();
781+
780782
/**
781783
* @deprecated migrate to {@link DatabaseDescriptor#isClientInitialized()}
782784
*/

src/java/org/apache/cassandra/config/DatabaseDescriptor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -967,6 +967,8 @@ else if (conf.max_value_size.toMebibytes() >= 2048)
967967

968968
if (conf.dump_heap_on_uncaught_exception && DatabaseDescriptor.getHeapDumpPath() == null)
969969
throw new ConfigurationException(String.format("Invalid configuration. Heap dump is enabled but cannot create heap dump output path: %s.", conf.heap_dump_path != null ? conf.heap_dump_path : "null"));
970+
971+
conf.sai_options.validate();
970972
}
971973

972974
@VisibleForTesting
@@ -4751,4 +4753,9 @@ public static StorageCompatibilityMode getStorageCompatibilityMode()
47514753
else
47524754
return conf.storage_compatibility_mode;
47534755
}
4756+
4757+
public static DataStorageSpec.IntMebibytesBound getSAISegmentWriteBufferSpace()
4758+
{
4759+
return conf.sai_options.segment_write_buffer_size;
4760+
}
47544761
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.cassandra.config;
19+
20+
import com.google.common.annotations.VisibleForTesting;
21+
22+
import org.apache.cassandra.exceptions.ConfigurationException;
23+
24+
public class StorageAttachedIndexOptions
25+
{
26+
public static final int DEFAULT_SEGMENT_BUFFER_MB = 1024;
27+
28+
@VisibleForTesting
29+
public static final int MAXIMUM_SEGMENT_BUFFER_MB = 32768;
30+
31+
@VisibleForTesting
32+
public static final String INVALID_BUFFER_SIZE_ERROR = "Invalid value for segment_write_buffer_size. " +
33+
"Value must be a positive integer less than " + MAXIMUM_SEGMENT_BUFFER_MB + "MiB";
34+
35+
public DataStorageSpec.IntMebibytesBound segment_write_buffer_size = new DataStorageSpec.IntMebibytesBound(DEFAULT_SEGMENT_BUFFER_MB);
36+
37+
public void validate()
38+
{
39+
if (segment_write_buffer_size.toMebibytes() > MAXIMUM_SEGMENT_BUFFER_MB)
40+
{
41+
throw new ConfigurationException(INVALID_BUFFER_SIZE_ERROR);
42+
}
43+
}
44+
}

src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java

Lines changed: 53 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,35 @@
4848

4949
public final class CreateIndexStatement extends AlterSchemaStatement
5050
{
51+
public static final String SASI_INDEX_DISABLED = "SASI indexes are disabled. Enable in cassandra.yaml to use.";
52+
public static final String KEYSPACE_DOES_NOT_EXIST = "Keyspace '%s' doesn't exist";
53+
public static final String TABLE_DOES_NOT_EXIST = "Table '%s' doesn't exist";
54+
public static final String COUNTER_TABLES_NOT_SUPPORTED = "Secondary indexes on counter tables aren't supported";
55+
public static final String MATERIALIZED_VIEWS_NOT_SUPPORTED = "Secondary indexes on materialized views aren't supported";
56+
public static final String TRANSIENTLY_REPLICATED_KEYSPACE_NOT_SUPPORTED = "Secondary indexes are not supported on transiently replicated keyspaces";
57+
public static final String CUSTOM_CREATE_WITHOUT_COLUMN = "Only CUSTOM indexes can be created without specifying a target column";
58+
public static final String CUSTOM_MULTIPLE_COLUMNS = "Only CUSTOM indexes support multiple columns";
59+
public static final String DUPLICATE_TARGET_COLUMN = "Duplicate column '%s' in index target list";
60+
public static final String COLUMN_DOES_NOT_EXIST = "Column '%s' doesn't exist";
5161
public static final String INVALID_CUSTOM_INDEX_TARGET = "Column '%s' is longer than the permissible name length of %d characters or" +
5262
" contains non-alphanumeric-underscore characters";
63+
public static final String COLLECTIONS_WITH_DURATIONS_NOT_SUPPORTED = "Secondary indexes are not supported on collections containing durations";
64+
public static final String TUPLES_WITH_DURATIONS_NOT_SUPPORTED = "Secondary indexes are not supported on tuples containing durations";
65+
public static final String DURATIONS_NOT_SUPPORTED = "Secondary indexes are not supported on duration columns";
66+
public static final String UDTS_WITH_DURATIONS_NOT_SUPPORTED = "Secondary indexes are not supported on UDTs containing durations";
67+
public static final String PRIMARY_KEY_IN_COMPACT_STORAGE = "Secondary indexes are not supported on PRIMARY KEY columns in COMPACT STORAGE tables";
68+
public static final String COMPACT_COLUMN_IN_COMPACT_STORAGE = "Secondary indexes are not supported on compact value column of COMPACT STORAGE tables";
69+
public static final String ONLY_PARTITION_KEY = "Cannot create secondary index on the only partition key column %s";
70+
public static final String CREATE_ON_FROZEN_COLUMN = "Cannot create %s() index on frozen column %s. Frozen collections are immutable and must be fully " +
71+
"indexed by using the 'full(%s)' modifier";
72+
public static final String FULL_ON_FROZEN_COLLECTIONS = "full() indexes can only be created on frozen collections";
73+
public static final String NON_COLLECTION_SIMPLE_INDEX = "Cannot create %s() index on %s. Non-collection columns only support simple indexes";
74+
public static final String CREATE_WITH_NON_MAP_TYPE = "Cannot create index on %s of column %s with non-map type";
75+
public static final String CREATE_ON_NON_FROZEN_UDT = "Cannot create index on non-frozen UDT column %s";
76+
public static final String INDEX_ALREADY_EXISTS = "Index '%s' already exists";
77+
public static final String INDEX_DUPLICATE_OF_EXISTING = "Index %s is a duplicate of existing index %s";
78+
public static final String KEYSPACE_DOES_NOT_MATCH_TABLE = "Keyspace name '%s' doesn't match table name '%s'";
79+
public static final String KEYSPACE_DOES_NOT_MATCH_INDEX = "Keyspace name '%s' doesn't match index name '%s'";
5380

5481
private final String indexName;
5582
private final String tableName;
@@ -90,32 +117,32 @@ public Keyspaces apply(Keyspaces schema)
90117
Guardrails.createSecondaryIndexesEnabled.ensureEnabled("Creating secondary indexes", state);
91118

92119
if (attrs.isCustom && attrs.customClass.equals(SASIIndex.class.getName()) && !DatabaseDescriptor.getSASIIndexesEnabled())
93-
throw new InvalidRequestException("SASI indexes are disabled. Enable in cassandra.yaml to use.");
120+
throw new InvalidRequestException(SASI_INDEX_DISABLED);
94121

95122
KeyspaceMetadata keyspace = schema.getNullable(keyspaceName);
96123
if (null == keyspace)
97-
throw ire("Keyspace '%s' doesn't exist", keyspaceName);
124+
throw ire(KEYSPACE_DOES_NOT_EXIST, keyspaceName);
98125

99126
TableMetadata table = keyspace.getTableOrViewNullable(tableName);
100127
if (null == table)
101-
throw ire("Table '%s' doesn't exist", tableName);
128+
throw ire(TABLE_DOES_NOT_EXIST, tableName);
102129

103130
if (null != indexName && keyspace.hasIndex(indexName))
104131
{
105132
if (ifNotExists)
106133
return schema;
107134

108-
throw ire("Index '%s' already exists", indexName);
135+
throw ire(INDEX_ALREADY_EXISTS, indexName);
109136
}
110137

111138
if (table.isCounter())
112-
throw ire("Secondary indexes on counter tables aren't supported");
139+
throw ire(COUNTER_TABLES_NOT_SUPPORTED);
113140

114141
if (table.isView())
115-
throw ire("Secondary indexes on materialized views aren't supported");
142+
throw ire(MATERIALIZED_VIEWS_NOT_SUPPORTED);
116143

117144
if (Keyspace.open(table.keyspace).getReplicationStrategy().hasTransientReplicas())
118-
throw new InvalidRequestException("Secondary indexes are not supported on transiently replicated keyspaces");
145+
throw new InvalidRequestException(TRANSIENTLY_REPLICATED_KEYSPACE_NOT_SUPPORTED);
119146

120147
// guardrails to limit number of secondary indexes per table.
121148
Guardrails.secondaryIndexesPerTable.guard(table.indexes.size() + 1,
@@ -128,17 +155,17 @@ public Keyspaces apply(Keyspaces schema)
128155
List<IndexTarget> indexTargets = Lists.newArrayList(transform(rawIndexTargets, t -> t.prepare(table)));
129156

130157
if (indexTargets.isEmpty() && !attrs.isCustom)
131-
throw ire("Only CUSTOM indexes can be created without specifying a target column");
158+
throw ire(CUSTOM_CREATE_WITHOUT_COLUMN);
132159

133160
if (indexTargets.size() > 1)
134161
{
135162
if (!attrs.isCustom)
136-
throw ire("Only CUSTOM indexes support multiple columns");
163+
throw ire(CUSTOM_MULTIPLE_COLUMNS);
137164

138165
Set<ColumnIdentifier> columns = new HashSet<>();
139166
for (IndexTarget target : indexTargets)
140167
if (!columns.add(target.column))
141-
throw ire("Duplicate column '%s' in index target list", target.column);
168+
throw ire(DUPLICATE_TARGET_COLUMN, target.column);
142169
}
143170

144171
IndexMetadata.Kind kind = attrs.isCustom ? IndexMetadata.Kind.CUSTOM : IndexMetadata.Kind.COMPOSITES;
@@ -158,7 +185,7 @@ public Keyspaces apply(Keyspaces schema)
158185
if (ifNotExists)
159186
return schema;
160187

161-
throw ire("Index %s is a duplicate of existing index %s", index.name, equalIndex.name);
188+
throw ire(INDEX_DUPLICATE_OF_EXISTING, index.name, equalIndex.name);
162189
}
163190

164191
TableMetadata newTable = table.withSwapped(table.indexes.with(index));
@@ -181,52 +208,51 @@ private void validateIndexTarget(TableMetadata table, IndexMetadata.Kind kind, I
181208
ColumnMetadata column = table.getColumn(target.column);
182209

183210
if (null == column)
184-
throw ire("Column '%s' doesn't exist", target.column);
211+
throw ire(COLUMN_DOES_NOT_EXIST, target.column);
185212

186213
if ((kind == IndexMetadata.Kind.CUSTOM) && !SchemaConstants.isValidName(target.column.toString()))
187214
throw ire(INVALID_CUSTOM_INDEX_TARGET, target.column, SchemaConstants.NAME_LENGTH);
188215

189216
if (column.type.referencesDuration())
190217
{
191218
if (column.type.isCollection())
192-
throw ire("Secondary indexes are not supported on collections containing durations");
219+
throw ire(COLLECTIONS_WITH_DURATIONS_NOT_SUPPORTED);
193220

194221
if (column.type.isTuple())
195-
throw ire("Secondary indexes are not supported on tuples containing durations");
222+
throw ire(TUPLES_WITH_DURATIONS_NOT_SUPPORTED);
196223

197224
if (column.type.isUDT())
198-
throw ire("Secondary indexes are not supported on UDTs containing durations");
225+
throw ire(UDTS_WITH_DURATIONS_NOT_SUPPORTED);
199226

200-
throw ire("Secondary indexes are not supported on duration columns");
227+
throw ire(DURATIONS_NOT_SUPPORTED);
201228
}
202229

203230
if (table.isCompactTable())
204231
{
205232
TableMetadata.CompactTableMetadata compactTable = (TableMetadata.CompactTableMetadata) table;
206233
if (column.isPrimaryKeyColumn())
207-
throw new InvalidRequestException("Secondary indexes are not supported on PRIMARY KEY columns in COMPACT STORAGE tables");
234+
throw new InvalidRequestException(PRIMARY_KEY_IN_COMPACT_STORAGE);
208235
if (compactTable.compactValueColumn.equals(column))
209-
throw new InvalidRequestException("Secondary indexes are not supported on compact value column of COMPACT STORAGE tables");
236+
throw new InvalidRequestException(COMPACT_COLUMN_IN_COMPACT_STORAGE);
210237
}
211238

212239
if (column.isPartitionKey() && table.partitionKeyColumns().size() == 1)
213-
throw ire("Cannot create secondary index on the only partition key column %s", column);
240+
throw ire(ONLY_PARTITION_KEY, column);
214241

215242
if (column.type.isFrozenCollection() && target.type != Type.FULL)
216-
throw ire("Cannot create %s() index on frozen column %s. Frozen collections are immutable and must be fully " +
217-
"indexed by using the 'full(%s)' modifier", target.type, column, column);
243+
throw ire(CREATE_ON_FROZEN_COLUMN, target.type, column, column);
218244

219245
if (!column.type.isFrozenCollection() && target.type == Type.FULL)
220-
throw ire("full() indexes can only be created on frozen collections");
246+
throw ire(FULL_ON_FROZEN_COLLECTIONS);
221247

222248
if (!column.type.isCollection() && target.type != Type.SIMPLE)
223-
throw ire("Cannot create %s() index on %s. Non-collection columns only support simple indexes", target.type, column);
249+
throw ire(NON_COLLECTION_SIMPLE_INDEX, target.type, column);
224250

225251
if (!(column.type instanceof MapType && column.type.isMultiCell()) && (target.type == Type.KEYS || target.type == Type.KEYS_AND_VALUES))
226-
throw ire("Cannot create index on %s of column %s with non-map type", target.type, column);
252+
throw ire(CREATE_WITH_NON_MAP_TYPE, target.type, column);
227253

228254
if (column.type.isUDT() && column.type.isMultiCell())
229-
throw ire("Cannot create index on non-frozen UDT column %s", column);
255+
throw ire(CREATE_ON_NON_FROZEN_UDT, column);
230256
}
231257

232258
private String generateIndexName(KeyspaceMetadata keyspace, List<IndexTarget> targets)
@@ -286,10 +312,10 @@ public CreateIndexStatement prepare(ClientState state)
286312
: indexName.hasKeyspace() ? indexName.getKeyspace() : state.getKeyspace();
287313

288314
if (tableName.hasKeyspace() && !keyspaceName.equals(tableName.getKeyspace()))
289-
throw ire("Keyspace name '%s' doesn't match table name '%s'", keyspaceName, tableName);
315+
throw ire(KEYSPACE_DOES_NOT_MATCH_TABLE, keyspaceName, tableName);
290316

291317
if (indexName.hasKeyspace() && !keyspaceName.equals(indexName.getKeyspace()))
292-
throw ire("Keyspace name '%s' doesn't match index name '%s'", keyspaceName, tableName);
318+
// The failed check compares against the index's keyspace, so report the index name rather than the table name.
throw ire(KEYSPACE_DOES_NOT_MATCH_INDEX, keyspaceName, indexName);
293319

294320
return new CreateIndexStatement(keyspaceName, tableName.getName(), indexName.getName(), rawIndexTargets, attrs, ifNotExists);
295321
}

src/java/org/apache/cassandra/db/ReadCommand.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,6 @@ public UnfilteredPartitionIterator executeLocally(ReadExecutionController execut
411411
{
412412
cfs.indexManager.checkQueryability(indexQueryPlan);
413413

414-
Index index = indexQueryPlan.getFirst();
415414
searcher = indexQueryPlan.searcherFor(this);
416415
Tracing.trace("Executing read on {}.{} using index{} {}",
417416
cfs.metadata.keyspace,

src/java/org/apache/cassandra/db/lifecycle/Tracker.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,12 @@ Throwable updateSizeTracking(Iterable<SSTableReader> oldSSTables, Iterable<SSTab
217217
return accumulate;
218218
}
219219

220+
public void updateLiveDiskSpaceUsed(long adjustment)
221+
{
222+
cfstore.metric.liveDiskSpaceUsed.inc(adjustment);
223+
cfstore.metric.totalDiskSpaceUsed.inc(adjustment);
224+
}
225+
220226
// SETUP / CLEANUP
221227

222228
public void addInitialSSTables(Iterable<SSTableReader> sstables)

src/java/org/apache/cassandra/db/memtable/AbstractMemtable.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import org.apache.cassandra.db.RegularAndStaticColumns;
3232
import org.apache.cassandra.db.commitlog.CommitLogPosition;
33+
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
3334
import org.apache.cassandra.db.partitions.Partition;
3435
import org.apache.cassandra.db.rows.EncodingStats;
3536
import org.apache.cassandra.schema.ColumnMetadata;
@@ -39,6 +40,7 @@
3940

4041
public abstract class AbstractMemtable implements Memtable
4142
{
43+
private final AtomicReference<LifecycleTransaction> flushTransaction = new AtomicReference<>(null);
4244
protected final AtomicLong currentOperations = new AtomicLong(0);
4345
protected final ColumnsCollector columnsCollector;
4446
protected final StatsCollector statsCollector = new StatsCollector();
@@ -66,21 +68,25 @@ public AbstractMemtable(TableMetadataRef metadataRef, long minTimestamp)
6668
this.minTimestamp = new AtomicLong(minTimestamp);
6769
}
6870

71+
@Override
6972
public TableMetadata metadata()
7073
{
7174
return metadata.get();
7275
}
7376

77+
@Override
7478
public long operationCount()
7579
{
7680
return currentOperations.get();
7781
}
7882

83+
@Override
7984
public long getMinTimestamp()
8085
{
8186
return minTimestamp.get() != EncodingStats.NO_STATS.minTimestamp ? minTimestamp.get() : NO_MIN_TIMESTAMP;
8287
}
8388

89+
@Override
8490
public long getMinLocalDeletionTime()
8591
{
8692
return minLocalDeletionTime.get();
@@ -120,6 +126,18 @@ EncodingStats encodingStats()
120126
return statsCollector.get();
121127
}
122128

129+
@Override
130+
public LifecycleTransaction getFlushTransaction()
131+
{
132+
return flushTransaction.get();
133+
}
134+
135+
@Override
136+
public LifecycleTransaction setFlushTransaction(LifecycleTransaction flushTransaction)
137+
{
138+
return this.flushTransaction.getAndSet(flushTransaction);
139+
}
140+
123141
protected static class ColumnsCollector
124142
{
125143
private final HashMap<ColumnMetadata, AtomicBoolean> predefined = new HashMap<>();
@@ -205,26 +223,31 @@ public EncodingStats get()
205223

206224
protected abstract class AbstractFlushablePartitionSet<P extends Partition> implements FlushablePartitionSet<P>
207225
{
226+
@Override
208227
public long dataSize()
209228
{
210229
return getLiveDataSize();
211230
}
212231

232+
@Override
213233
public CommitLogPosition commitLogLowerBound()
214234
{
215235
return AbstractMemtable.this.getCommitLogLowerBound();
216236
}
217237

238+
@Override
218239
public LastCommitLogPosition commitLogUpperBound()
219240
{
220241
return AbstractMemtable.this.getFinalCommitLogUpperBound();
221242
}
222243

244+
@Override
223245
public EncodingStats encodingStats()
224246
{
225247
return AbstractMemtable.this.encodingStats();
226248
}
227249

250+
@Override
228251
public RegularAndStaticColumns columns()
229252
{
230253
return AbstractMemtable.this.columns();

0 commit comments

Comments
 (0)