diff --git a/test/framework/src/main/java/org/elasticsearch/common/lucene/store/ESIndexInputTestCase.java b/test/framework/src/main/java/org/elasticsearch/common/lucene/store/ESIndexInputTestCase.java index fb6bc3b11fee8..99873f81894a5 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/lucene/store/ESIndexInputTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/common/lucene/store/ESIndexInputTestCase.java @@ -72,7 +72,7 @@ protected byte[] randomReadAndSlice(IndexInput indexInput, int length) throws IO case 3: // Read using slice len = randomIntBetween(1, length - readPos); - IndexInput slice = indexInput.slice("slice (" + readPos + ", " + len + ") of " + indexInput, readPos, len); + IndexInput slice = indexInput.slice(randomAlphaOfLength(10) + randomFileExtension(), readPos, len); temp = randomReadAndSlice(slice, len); // assert that position in the original input didn't change assertEquals(readPos, indexInput.getFilePointer()); @@ -121,7 +121,7 @@ protected void doRun() throws Exception { clone = indexInput.clone(); } else { final int sliceEnd = between(readEnd, length); - clone = indexInput.slice("concurrent slice (0, " + sliceEnd + ") of " + indexInput, 0L, sliceEnd); + clone = indexInput.slice("slice" + randomAlphaOfLength(10) + randomFileExtension(), 0L, sliceEnd); } startLatch.countDown(); startLatch.await(); @@ -178,4 +178,34 @@ public void onRejection(Exception e) { return output; } + protected static String randomFileExtension() { + return randomFrom( + ".cfe", + ".cfs", + ".dii", + ".dim", + ".doc", + ".dvd", + ".dvm", + ".fdt", + ".fdx", + ".fdm", + ".fnm", + ".kdd", + ".kdi", + ".kdm", + ".liv", + ".nvd", + ".nvm", + ".pay", + ".pos", + ".tim", + ".tip", + ".tmd", + ".tvd", + ".tvx", + ".vec", + ".vem" + ); + } } diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java index ea28874161aa8..f308a3de71e00 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java @@ -9,7 +9,6 @@ import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; -import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; @@ -22,7 +21,6 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -33,10 +31,8 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.shard.IndexingStats; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.test.InternalTestCluster; @@ -49,32 +45,26 @@ import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants; import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsAction; import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsRequest; -import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; import org.elasticsearch.xpack.searchablesnapshots.cache.FrozenCacheService; import org.junit.AfterClass; import org.junit.BeforeClass; -import java.io.IOException; -import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Locale; -import java.util.Map; import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; -import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.INDEX_SHARD_SNAPSHOT_FORMAT; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.DATA_TIERS_CACHE_INDEX_PREFERENCE; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_BLOB_CACHE_INDEX; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; public class SearchableSnapshotsBlobStoreCacheIntegTests extends BaseSearchableSnapshotsIntegTestCase { @@ -117,6 +107,11 @@ protected int numberOfReplicas() { return 0; } + @Override + protected int numberOfShards() { + return 1; + } + @Override protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(cacheSettings).build(); @@ -156,17 +151,13 @@ public void testBlobStoreCache() throws Exception { final SnapshotId snapshot = createSnapshot(repositoryName, "test-snapshot", Collections.singletonList(indexName)).snapshotId(); assertAcked(client().admin().indices().prepareDelete(indexName)); - // extract the list of blobs per shard from the snapshot directory on disk - final Map blobsInSnapshot = blobsInSnapshot(repositoryLocation, snapshot.getUUID()); - assertThat("Failed to load all shard snapshot metadata files", blobsInSnapshot.size(), equalTo(numberOfShards.numPrimaries)); - expectThrows( IndexNotFoundException.class, ".snapshot-blob-cache system index should not be created yet", () -> systemClient().admin().indices().prepareGetIndex().addIndices(SNAPSHOT_BLOB_CACHE_INDEX).get() ); - Storage storage = randomFrom(Storage.values()); + final Storage storage = randomFrom(Storage.values()); logger.info( "--> mount snapshot [{}] as an index for the first time [storage={}, max length={}]", snapshot, @@ -211,7 +202,8 @@ public void testBlobStoreCache() throws Exception { logger.info("--> verifying cached documents in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX); if (numberOfDocs > 0) { - assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot); + ensureYellow(SNAPSHOT_BLOB_CACHE_INDEX); + refreshSystemIndex(); logger.info("--> verifying system index [{}] data tiers preference", SNAPSHOT_BLOB_CACHE_INDEX); assertThat( @@ -244,7 +236,14 @@ public void testBlobStoreCache() throws Exception { assertHitCount(client().prepareSearch(restoredIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); assertAcked(client().admin().indices().prepareDelete(restoredIndex)); - storage = randomFrom(Storage.values()); + assertBusy(() -> { + refreshSystemIndex(); + assertThat( + systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get().getHits().getTotalHits().value, + greaterThan(0L) + ); + }); + logger.info("--> mount snapshot [{}] as an index for the second time [storage={}]", snapshot, storage); final String restoredAgainIndex = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT); mountSnapshot( @@ -261,22 +260,19 @@ public void testBlobStoreCache() throws Exception { ); ensureGreen(restoredAgainIndex); - logger.info("--> verifying cached documents (after second mount) in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX); - if (numberOfDocs > 0) { - assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot); - } - logger.info("--> verifying shards of [{}] were started without using the blob store more than necessary", restoredAgainIndex); for (final SearchableSnapshotShardStats shardStats : client().execute( SearchableSnapshotsStatsAction.INSTANCE, new SearchableSnapshotsStatsRequest() ).actionGet().getStats()) { for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) { - // we read the header of each file contained within the .cfs file, which could be anywhere - final boolean mayReadMoreThanHeader = indexInputStats.getFileExt().equals("cfs"); - if (mayReadMoreThanHeader == false) { - assertThat(Strings.toString(indexInputStats), indexInputStats.getBlobStoreBytesRequested().getCount(), equalTo(0L)); - } + assertThat( + Strings.toString(indexInputStats), + indexInputStats.getBlobStoreBytesRequested().getCount(), + storage == Storage.SHARED_CACHE ? equalTo(0L) + : indexInputStats.getFileExt().equals("cfs") ? greaterThanOrEqualTo(0L) + : equalTo(0L) + ); } } @@ -314,22 +310,19 @@ public Settings onNodeStopped(String nodeName) throws Exception { }); ensureGreen(restoredAgainIndex); - logger.info("--> verifying cached documents (after restart) in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX); - if (numberOfDocs > 0) { - assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot); - } - logger.info("--> shards of [{}] should start without downloading bytes from the blob store", restoredAgainIndex); for (final SearchableSnapshotShardStats shardStats : client().execute( SearchableSnapshotsStatsAction.INSTANCE, new SearchableSnapshotsStatsRequest() ).actionGet().getStats()) { for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) { - // we read the header of each file contained within the .cfs file, which could be anywhere - final boolean mayReadMoreThanHeader = indexInputStats.getFileExt().equals("cfs"); - if (mayReadMoreThanHeader == false) { - assertThat(Strings.toString(indexInputStats), indexInputStats.getBlobStoreBytesRequested().getCount(), equalTo(0L)); - } + assertThat( + Strings.toString(indexInputStats), + indexInputStats.getBlobStoreBytesRequested().getCount(), + storage == Storage.SHARED_CACHE ? equalTo(0L) + : indexInputStats.getFileExt().equals("cfs") ? greaterThanOrEqualTo(0L) + : equalTo(0L) + ); } } @@ -373,61 +366,6 @@ private void refreshSystemIndex() { } } - /** - * Reads a repository location on disk and extracts the list of blobs for each shards - */ - private Map blobsInSnapshot(Path repositoryLocation, String snapshotId) throws IOException { - final Map blobsPerShard = new HashMap<>(); - forEachFileRecursively(repositoryLocation.resolve("indices"), ((file, basicFileAttributes) -> { - final String fileName = file.getFileName().toString(); - if (fileName.equals(BlobStoreRepository.SNAPSHOT_FORMAT.blobName(snapshotId))) { - blobsPerShard.put( - String.join( - "/", - snapshotId, - file.getParent().getParent().getFileName().toString(), - file.getParent().getFileName().toString() - ), - INDEX_SHARD_SNAPSHOT_FORMAT.deserialize(fileName, xContentRegistry(), Streams.readFully(Files.newInputStream(file))) - ); - } - })); - return Collections.unmodifiableMap(blobsPerShard); - } - - private void assertCachedBlobsInSystemIndex(final String repositoryName, final Map blobsInSnapshot) - throws Exception { - final BlobStoreCacheService blobCacheService = internalCluster().getDataNodeInstance(BlobStoreCacheService.class); - assertBusy(() -> { - refreshSystemIndex(); - - long numberOfCachedBlobs = 0L; - for (Map.Entry blob : blobsInSnapshot.entrySet()) { - for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : blob.getValue().indexFiles()) { - if (fileInfo.name().startsWith("__") == false) { - continue; - } - - final String fileName = fileInfo.physicalName(); - final long length = fileInfo.length(); - final ByteRange expectedByteRange = blobCacheService.computeBlobCacheByteRange(fileName, length, blobCacheMaxLength); - final String path = String.join("/", repositoryName, blob.getKey(), fileName, "@" + expectedByteRange.start()); - - final GetResponse getResponse = systemClient().prepareGet(SNAPSHOT_BLOB_CACHE_INDEX, SINGLE_MAPPING_NAME, path).get(); - assertThat("Expected cached blob [" + path + "] for blob [" + fileInfo + "]", getResponse.isExists(), is(true)); - final CachedBlob cachedBlob = CachedBlob.fromSource(getResponse.getSourceAsMap()); - assertThat(cachedBlob.from(), equalTo(expectedByteRange.start())); - assertThat(cachedBlob.to(), equalTo(expectedByteRange.end())); - assertThat((long) cachedBlob.length(), equalTo(expectedByteRange.length())); - numberOfCachedBlobs += 1; - } - } - - refreshSystemIndex(); - assertHitCount(systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get(), numberOfCachedBlobs); - }); - } - /** * This plugin declares an {@link AllocationDecider} that forces searchable snapshot shards to be allocated after * the primary shards of the snapshot blob cache index are started. This way we can ensure that searchable snapshot diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java index 2564aa2a5e4da..46d7ef60adc60 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java @@ -30,6 +30,7 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInput { protected final Logger logger; + protected final String name; protected final SearchableSnapshotDirectory directory; protected final BlobContainer blobContainer; protected final FileInfo fileInfo; @@ -38,8 +39,10 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInpu protected final long offset; protected final long length; - /** Range of bytes that should be cached in the blob cache for the current index input **/ - protected final ByteRange blobCacheByteRange; + /** + * Range of bytes that should be cached in the blob cache for the current index input's header. + */ + protected final ByteRange headerBlobCacheByteRange; // the following are only mutable so they can be adjusted after cloning/slicing protected volatile boolean isClone; @@ -47,7 +50,7 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInpu public BaseSearchableSnapshotIndexInput( Logger logger, - String resourceDesc, + String name, SearchableSnapshotDirectory directory, FileInfo fileInfo, IOContext context, @@ -56,7 +59,8 @@ public BaseSearchableSnapshotIndexInput( long length, ByteRange blobCacheByteRange ) { - super(resourceDesc, context); + super(name, context); + this.name = Objects.requireNonNull(name); this.logger = Objects.requireNonNull(logger); this.directory = Objects.requireNonNull(directory); this.blobContainer = Objects.requireNonNull(directory.blobContainer()); @@ -64,7 +68,7 @@ public BaseSearchableSnapshotIndexInput( this.context = Objects.requireNonNull(context); assert fileInfo.metadata().hashEqualsContents() == false : "this method should only be used with blobs that are NOT stored in metadata's hash field " + "(fileInfo: " + fileInfo + ')'; - this.blobCacheByteRange = Objects.requireNonNull(blobCacheByteRange); + this.headerBlobCacheByteRange = Objects.requireNonNull(blobCacheByteRange); this.stats = Objects.requireNonNull(stats); this.offset = offset; this.length = length; @@ -77,6 +81,13 @@ public final long length() { return length; } + protected long getAbsolutePosition() { + final long position = getFilePointer() + this.offset; + assert position >= 0L : "absolute position is negative: " + position; + assert position <= fileInfo.length() : position + " vs " + fileInfo.length(); + return position; + } + @Override protected final void readInternal(ByteBuffer b) throws IOException { assert assertCurrentThreadIsNotCacheFetchAsync(); @@ -107,7 +118,7 @@ private boolean maybeReadChecksumFromFileInfo(ByteBuffer b) throws IOException { if (remaining > CodecUtil.footerLength()) { return false; } - final long position = getFilePointer() + this.offset; + final long position = getAbsolutePosition(); final long checksumPosition = fileInfo.length() - CodecUtil.footerLength(); if (position < checksumPosition) { return false; @@ -132,6 +143,13 @@ private boolean maybeReadChecksumFromFileInfo(ByteBuffer b) throws IOException { return success; } + protected ByteRange maybeReadFromBlobCache(long position, int length) { + if (headerBlobCacheByteRange.contains(position, position + length)) { + return headerBlobCacheByteRange; + } + return ByteRange.EMPTY; + } + /** * Opens an {@link InputStream} for the given range of bytes which reads the data directly from the blob store. If the requested range * spans multiple blobs then this stream will request them in turn. diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java index d6fac81fe4166..2661e3c91da66 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java @@ -406,6 +406,7 @@ public IndexInput openInput(final String name, final IOContext context) throws I if (useCache && isExcludedFromCache(name) == false) { if (partial) { return new FrozenIndexInput( + name, this, fileInfo, context, @@ -415,6 +416,7 @@ public IndexInput openInput(final String name, final IOContext context) throws I ); } else { return new CachedBlobContainerIndexInput( + name, this, fileInfo, context, @@ -424,7 +426,15 @@ public IndexInput openInput(final String name, final IOContext context) throws I ); } } else { - return new DirectBlobContainerIndexInput(this, fileInfo, context, inputStats, getUncachedChunkSize(), bufferSize(context)); + return new DirectBlobContainerIndexInput( + name, + this, + fileInfo, + context, + inputStats, + getUncachedChunkSize(), + bufferSize(context) + ); } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java index a71405ee53158..2b960bd418e28 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java @@ -66,6 +66,7 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn private long lastSeekPosition; public CachedBlobContainerIndexInput( + String name, SearchableSnapshotDirectory directory, FileInfo fileInfo, IOContext context, @@ -74,7 +75,7 @@ public CachedBlobContainerIndexInput( int recoveryRangeSize ) { this( - "CachedBlobContainerIndexInput(" + fileInfo.physicalName() + ")", + name, directory, fileInfo, context, @@ -84,14 +85,14 @@ public CachedBlobContainerIndexInput( new CacheFileReference(directory, fileInfo.physicalName(), fileInfo.length()), rangeSize, recoveryRangeSize, - directory.getBlobCacheByteRange(fileInfo.physicalName(), fileInfo.length()) + directory.getBlobCacheByteRange(name, fileInfo.length()) ); assert getBufferSize() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE; // must be able to cache at least one buffer's worth stats.incrementOpenCount(); } private CachedBlobContainerIndexInput( - String resourceDesc, + String name, SearchableSnapshotDirectory directory, FileInfo fileInfo, IOContext context, @@ -101,9 +102,9 @@ private CachedBlobContainerIndexInput( CacheFileReference cacheFileReference, int rangeSize, int recoveryRangeSize, - ByteRange blobCacheByteRange + ByteRange headerBlobCacheByteRange ) { - super(logger, resourceDesc, directory, fileInfo, context, stats, offset, length, blobCacheByteRange); + super(logger, name, directory, fileInfo, context, stats, offset, length, headerBlobCacheByteRange); this.cacheFileReference = cacheFileReference; this.lastReadPosition = this.offset; this.lastSeekPosition = this.offset; @@ -134,7 +135,7 @@ private ByteRange computeRange(long position) { @Override protected void doReadInternal(ByteBuffer b) throws IOException { ensureContext(ctx -> ctx != CACHE_WARMING_CONTEXT); - final long position = getFilePointer() + this.offset; + final long position = getAbsolutePosition(); final int length = b.remaining(); logger.trace("readInternal: read [{}-{}] ([{}] bytes) from [{}]", position, position + length, length, this); @@ -159,7 +160,8 @@ protected void doReadInternal(ByteBuffer b) throws IOException { // Requested data is not on disk, so try the cache index next. final ByteRange indexCacheMiss; // null if not a miss - if (blobCacheByteRange.contains(position, position + length)) { + final ByteRange blobCacheByteRange = maybeReadFromBlobCache(position, length); + if (blobCacheByteRange != ByteRange.EMPTY) { final CachedBlob cachedBlob = directory.getCachedBlob(fileInfo.physicalName(), blobCacheByteRange); assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || cachedBlob.from() <= position; assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || length <= cachedBlob.length(); @@ -562,15 +564,15 @@ public CachedBlobContainerIndexInput clone() { } @Override - public IndexInput slice(String sliceDescription, long offset, long length) { - if (offset < 0 || length < 0 || offset + length > length()) { + public IndexInput slice(String sliceName, long sliceOffset, long sliceLength) { + if (sliceOffset < 0 || sliceLength < 0 || sliceOffset + sliceLength > length()) { throw new IllegalArgumentException( "slice() " - + sliceDescription + + sliceName + " out of bounds: offset=" - + offset + + sliceOffset + ",length=" - + length + + sliceLength + ",fileLength=" + length() + ": " @@ -578,17 +580,17 @@ public IndexInput slice(String sliceDescription, long offset, long length) { ); } final CachedBlobContainerIndexInput slice = new CachedBlobContainerIndexInput( - getFullSliceDescription(sliceDescription), + sliceName, directory, fileInfo, context, stats, - this.offset + offset, - length, + this.offset + sliceOffset, + sliceLength, cacheFileReference, defaultRangeSize, recoveryRangeSize, - ByteRange.EMPTY // TODO implement blob cache for slices when it makes sense (like CFs) + headerBlobCacheByteRange ); slice.isClone = true; return slice; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java index a1b6be776cac3..1b38f83a948aa 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java @@ -10,6 +10,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; @@ -35,6 +37,7 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.util.Locale; +import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; @@ -52,12 +55,26 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput { private final int defaultRangeSize; private final int recoveryRangeSize; + /** + * If > 0, represents a logical file within a compound (CFS) file or is a slice thereof represents the offset of the logical + * compound file within the physical CFS file + */ + private final long compoundFileOffset; + + /** + * Range of bytes that should be cached in the blob cache for the current index input's footer. This footer byte range should only be + * required for slices of CFS files; regular files already have their footers extracted from the {@link FileInfo} (see method + * {@link BaseSearchableSnapshotIndexInput#maybeReadChecksumFromFileInfo}). + */ + private final ByteRange footerBlobCacheByteRange; + // last read position is kept around in order to detect (non)contiguous reads for stats private long lastReadPosition; // last seek position is kept around in order to detect forward/backward seeks for stats private long lastSeekPosition; public FrozenIndexInput( + String name, SearchableSnapshotDirectory directory, FileInfo fileInfo, IOContext context, @@ -66,41 +83,48 @@ public FrozenIndexInput( int recoveryRangeSize ) { this( - "FrozenIndexInput(" + fileInfo.physicalName() + ")", + name, directory, fileInfo, context, stats, 0L, + 0L, fileInfo.length(), - directory.getFrozenCacheFile(fileInfo.physicalName(), fileInfo.length()), + directory.getFrozenCacheFile(name, fileInfo.length()), rangeSize, recoveryRangeSize, - directory.getBlobCacheByteRange(fileInfo.physicalName(), fileInfo.length()) + directory.getBlobCacheByteRange(name, fileInfo.length()), + ByteRange.EMPTY ); assert getBufferSize() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE; // must be able to cache at least one buffer's worth stats.incrementOpenCount(); } private FrozenIndexInput( - String resourceDesc, + String name, SearchableSnapshotDirectory directory, FileInfo fileInfo, IOContext context, IndexInputStats stats, long offset, + long compoundFileOffset, long length, FrozenCacheFile frozenCacheFile, int rangeSize, int recoveryRangeSize, - ByteRange blobCacheByteRange + ByteRange headerBlobCacheByteRange, + ByteRange footerBlobCacheByteRange ) { - super(logger, resourceDesc, directory, fileInfo, context, stats, offset, length, blobCacheByteRange); + super(logger, name, directory, fileInfo, context, stats, offset, length, headerBlobCacheByteRange); this.frozenCacheFile = frozenCacheFile; this.lastReadPosition = this.offset; this.lastSeekPosition = this.offset; this.defaultRangeSize = rangeSize; this.recoveryRangeSize = recoveryRangeSize; + this.footerBlobCacheByteRange = Objects.requireNonNull(footerBlobCacheByteRange); + this.compoundFileOffset = compoundFileOffset; + assert offset >= compoundFileOffset; } @Override @@ -109,22 +133,20 @@ public void doClose() { } private long getDefaultRangeSize() { - return (context != CACHE_WARMING_CONTEXT) - ? (directory.isRecoveryFinalized() ? defaultRangeSize : recoveryRangeSize) - : fileInfo.partSize().getBytes(); + return directory.isRecoveryFinalized() ? defaultRangeSize : recoveryRangeSize; } private ByteRange computeRange(long position) { final long rangeSize = getDefaultRangeSize(); long start = (position / rangeSize) * rangeSize; - long end = Math.min(start + rangeSize, fileInfo.length()); + long end = Math.min(start + rangeSize, frozenCacheFile.getLength()); return ByteRange.of(start, end); } @Override protected void doReadInternal(ByteBuffer b) throws IOException { ensureContext(ctx -> ctx != CACHE_WARMING_CONTEXT); - final long position = getFilePointer() + this.offset; + final long position = getAbsolutePosition() - compoundFileOffset; final int length = b.remaining(); final ReentrantReadWriteLock luceneByteBufLock = new ReentrantReadWriteLock(); @@ -141,6 +163,7 @@ protected void doReadInternal(ByteBuffer b) throws IOException { } }; + final String fileName = frozenCacheFile.getCacheKey().getFileName(); logger.trace("readInternal: read [{}-{}] ([{}] bytes) from [{}]", position, position + length, length, this); try { @@ -168,8 +191,9 @@ protected void doReadInternal(ByteBuffer b) throws IOException { // Requested data is not on disk, so try the cache index next. final ByteRange indexCacheMiss; // null if not a miss - if (blobCacheByteRange.contains(position, position + length)) { - final CachedBlob cachedBlob = directory.getCachedBlob(fileInfo.physicalName(), blobCacheByteRange); + final ByteRange blobCacheByteRange = maybeReadFromBlobCache(position, length); + if (blobCacheByteRange != ByteRange.EMPTY) { + final CachedBlob cachedBlob = directory.getCachedBlob(fileName, blobCacheByteRange); assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || cachedBlob.from() <= position; assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || length <= cachedBlob.length(); @@ -182,12 +206,7 @@ protected void doReadInternal(ByteBuffer b) throws IOException { // We must fill in a cache miss even if CACHE_NOT_READY since the cache index is only created on the first put. // TODO TBD use a different trigger for creating the cache index and avoid a put in the CACHE_NOT_READY case. } else { - logger.trace( - "reading [{}] bytes of file [{}] at position [{}] using cache index", - length, - fileInfo.physicalName(), - position - ); + logger.trace("reading [{}] bytes of file [{}] at position [{}] using cache index", length, fileName, position); stats.addIndexCacheBytesRead(cachedBlob.length()); preventAsyncBufferChanges.run(); @@ -233,7 +252,7 @@ protected void doReadInternal(ByteBuffer b) throws IOException { "copied bytes [{}-{}] of file [{}] from cache index to disk", relativePos, relativePos + len, - fileInfo + fileName ); }, directory.cacheFetchAsyncExecutor() @@ -244,7 +263,7 @@ protected void doReadInternal(ByteBuffer b) throws IOException { "failed to store bytes [{}-{}] of file [{}] obtained from index cache", cachedBlob.from(), cachedBlob.to(), - fileInfo + fileName ), e ); @@ -302,65 +321,12 @@ protected void doReadInternal(ByteBuffer b) throws IOException { ); if (indexCacheMiss != null) { - final Releasable onCacheFillComplete = stats.addIndexCacheFill(); - final int indexCacheMissLength = toIntBytes(indexCacheMiss.length()); - - // We assume that we only cache small portions of blobs so that we do not need to: - // - use a BigArrays for allocation - // - use an intermediate copy buffer to read the file in sensibly-sized chunks - // - release the buffer once the indexing operation is complete - final ByteBuffer byteBuffer = ByteBuffer.allocate(indexCacheMissLength); - - final StepListener readListener = frozenCacheFile.readIfAvailableOrPending( - indexCacheMiss, - (channel, channelPos, relativePos, len) -> { - assert len <= indexCacheMissLength; - - if (len == 0) { - return 0; - } - - // create slice that is positioned to read the given values - final ByteBuffer dup = byteBuffer.duplicate(); - final int newPosition = dup.position() + Math.toIntExact(relativePos); - assert newPosition <= dup.limit() : "newpos " + newPosition + " limit " + dup.limit(); - assert newPosition + len <= byteBuffer.limit(); - dup.position(newPosition); - dup.limit(newPosition + Math.toIntExact(len)); - - final int read = channel.read(dup, channelPos); - if (read < 0) { - throw new EOFException("read past EOF. pos [" + relativePos + "] length: [" + len + "]"); - } - // NB use Channels.readFromFileChannelWithEofException not readCacheFile() to avoid counting this in the stats - assert read == len; - return read; - } - ); - if (readListener == null) { - // Normally doesn't happen, we're already obtaining a range covering all cache misses above, but theoretically - // possible in the case that the real populateAndRead call already failed to obtain this range of the file. In that - // case, simply move on. - onCacheFillComplete.close(); - } else { - readListener.whenComplete(read -> { - assert read == indexCacheMissLength; - byteBuffer.position(read); // mark all bytes as accounted for - byteBuffer.flip(); - final BytesReference content = BytesReference.fromByteBuffer(byteBuffer); - directory.putCachedBlob(fileInfo.physicalName(), indexCacheMiss.start(), content, new ActionListener() { - @Override - public void onResponse(Void response) { - onCacheFillComplete.close(); - } - - @Override - public void onFailure(Exception e1) { - onCacheFillComplete.close(); - } - }); - }, e -> onCacheFillComplete.close()); + fillIndexCache(fileName, indexCacheMiss); + if (compoundFileOffset > 0L + && indexCacheMiss.equals(headerBlobCacheByteRange) + && footerBlobCacheByteRange != ByteRange.EMPTY) { + fillIndexCache(fileName, footerBlobCacheByteRange); } } @@ -385,6 +351,69 @@ public void onFailure(Exception e1) { readComplete(position, length); } + private void fillIndexCache(String fileName, ByteRange indexCacheMiss) { + final Releasable onCacheFillComplete = stats.addIndexCacheFill(); + final int indexCacheMissLength = toIntBytes(indexCacheMiss.length()); + + // We assume that we only cache small portions of blobs so that we do not need to: + // - use a BigArrays for allocation + // - use an intermediate copy buffer to read the file in sensibly-sized chunks + // - release the buffer once the indexing operation is complete + final ByteBuffer byteBuffer = ByteBuffer.allocate(indexCacheMissLength); + + final StepListener readListener = frozenCacheFile.readIfAvailableOrPending( + indexCacheMiss, + (channel, channelPos, relativePos, len) -> { + assert len <= indexCacheMissLength; + + if (len == 0) { + return 0; + } + + // create slice that is positioned to read the given values + final ByteBuffer dup = byteBuffer.duplicate(); + final int newPosition = dup.position() + Math.toIntExact(relativePos); + assert newPosition <= dup.limit() : "newpos " + newPosition + " limit " + dup.limit(); + assert newPosition + len <= byteBuffer.limit(); + dup.position(newPosition); + dup.limit(newPosition + Math.toIntExact(len)); + + final int read = channel.read(dup, channelPos); + if (read < 0) { + throw new EOFException("read past EOF. pos [" + relativePos + "] length: [" + len + "]"); + } + // NB use Channels.readFromFileChannelWithEofException not readCacheFile() to avoid counting this in the stats + assert read == len; + return read; + } + ); + + if (readListener == null) { + // Normally doesn't happen, we're already obtaining a range covering all cache misses above, but theoretically + // possible in the case that the real populateAndRead call already failed to obtain this range of the file. In that + // case, simply move on. + onCacheFillComplete.close(); + } else { + readListener.whenComplete(read -> { + assert read == indexCacheMissLength; + byteBuffer.position(read); // mark all bytes as accounted for + byteBuffer.flip(); + final BytesReference content = BytesReference.fromByteBuffer(byteBuffer); + directory.putCachedBlob(fileName, indexCacheMiss.start(), content, new ActionListener() { + @Override + public void onResponse(Void response) { + onCacheFillComplete.close(); + } + + @Override + public void onFailure(Exception e1) { + onCacheFillComplete.close(); + } + }); + }, e -> onCacheFillComplete.close()); + } + } + private void readComplete(long position, int length) { stats.incrementBytesRead(lastReadPosition, position, length); lastReadPosition = position + length; @@ -408,7 +437,7 @@ private int readDirectlyIfAlreadyClosed(long position, ByteBuffer b, Exception e int bytesCopied = 0; final long startTimeNanos = stats.currentTimeNanos(); - try (InputStream input = openInputStreamFromBlobStore(position, length)) { + try (InputStream input = openInputStreamFromBlobStore(position + compoundFileOffset, length)) { long remaining = length; while (remaining > 0) { final int len = (remaining < copyBuffer.length) ? (int) remaining : copyBuffer.length; @@ -441,6 +470,17 @@ private int readDirectlyIfAlreadyClosed(long position, ByteBuffer b, Exception e throw new IOException("failed to read data from cache", e); } + @Override + protected ByteRange maybeReadFromBlobCache(long position, int length) { + final long end = position + length; + if (headerBlobCacheByteRange.contains(position, end)) { + return headerBlobCacheByteRange; + } else if (footerBlobCacheByteRange.contains(position, end)) { + return footerBlobCacheByteRange; + } + return ByteRange.EMPTY; + } + private static int positionalWrite(SharedBytes.IO fc, long start, ByteBuffer byteBuffer) throws IOException { assert assertCurrentThreadMayWriteCacheFile(); return fc.write(byteBuffer, start); @@ -572,7 +612,7 @@ private void writeCacheFile( long bytesCopied = 0L; long remaining = length; final long startTimeNanos = stats.currentTimeNanos(); - try (InputStream input = openInputStreamFromBlobStore(logicalPos + relativePos, length)) { + try (InputStream input = openInputStreamFromBlobStore(logicalPos + relativePos + compoundFileOffset, length)) { while (remaining > 0L) { final int bytesRead = readSafe(input, copyBuffer, relativePos, end, remaining, frozenCacheFile); positionalWrite(fc, fileChannelPos + bytesCopied, ByteBuffer.wrap(copyBuffer, 0, bytesRead)); @@ -593,7 +633,7 @@ protected void seekInternal(long pos) throws IOException { } else if (pos < 0L) { throw new IOException("Seeking to negative position [" + pos + "] for " + toString()); } - final long position = pos + this.offset; + final long position = pos + this.offset - compoundFileOffset; stats.incrementSeeks(lastSeekPosition, position); lastSeekPosition = position; } @@ -604,33 +644,63 @@ public FrozenIndexInput clone() { } @Override - public IndexInput slice(String sliceDescription, long offset, long length) { - if (offset < 0 || length < 0 || offset + length > length()) { + public IndexInput slice(String sliceName, long sliceOffset, long sliceLength) { + if (sliceOffset < 0 || sliceLength < 0 || sliceOffset + sliceLength > length()) { throw new IllegalArgumentException( "slice() " - + sliceDescription + + sliceName + " out of bounds: offset=" - + offset + + sliceOffset + ",length=" - + length + + sliceLength + ",fileLength=" + length() + ": " + this ); } + + // Are we creating a slice from a CFS file? + final boolean sliceCompoundFile = IndexFileNames.matchesExtension(name, "cfs") + && IndexFileNames.getExtension(sliceName) != null + && compoundFileOffset == 0L // not already a compound file + && isClone == false; // tests aggressively clone and slice + + final FrozenCacheFile sliceFrozenCacheFile; + final ByteRange sliceHeaderByteRange; + final ByteRange sliceFooterByteRange; + final long sliceCompoundFileOffset; + + if (sliceCompoundFile) { + sliceCompoundFileOffset = this.offset + sliceOffset; + sliceFrozenCacheFile = directory.getFrozenCacheFile(sliceName, sliceLength); + sliceHeaderByteRange = directory.getBlobCacheByteRange(sliceName, sliceLength); + if (sliceHeaderByteRange.length() < sliceLength) { + sliceFooterByteRange = ByteRange.of(sliceLength - CodecUtil.footerLength(), sliceLength); + } else { + sliceFooterByteRange = ByteRange.EMPTY; + } + } else { + sliceCompoundFileOffset = this.compoundFileOffset; + sliceFrozenCacheFile = this.frozenCacheFile; + sliceHeaderByteRange = ByteRange.EMPTY; + sliceFooterByteRange = ByteRange.EMPTY; + } + final FrozenIndexInput slice = new FrozenIndexInput( - getFullSliceDescription(sliceDescription), + sliceName, directory, fileInfo, context, stats, - this.offset + offset, - length, - frozenCacheFile, + this.offset + sliceOffset, + sliceCompoundFileOffset, + sliceLength, + sliceFrozenCacheFile, defaultRangeSize, recoveryRangeSize, - ByteRange.EMPTY // TODO implement blob cache for slices when it makes sense (like CFs) + sliceHeaderByteRange, + sliceFooterByteRange ); slice.isClone = true; return slice; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java index d732c6f036bf0..48deca1bc29df 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java @@ -69,6 +69,7 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn private static final int COPY_BUFFER_SIZE = 8192; public DirectBlobContainerIndexInput( + String name, SearchableSnapshotDirectory directory, FileInfo fileInfo, IOContext context, @@ -76,23 +77,12 @@ public DirectBlobContainerIndexInput( long sequentialReadSize, int bufferSize ) { - this( - "DirectBlobContainerIndexInput(" + fileInfo.physicalName() + ")", - directory, - fileInfo, - context, - stats, - 0L, - 0L, - fileInfo.length(), - sequentialReadSize, - bufferSize - ); + this(name, directory, fileInfo, context, stats, 0L, 0L, fileInfo.length(), sequentialReadSize, bufferSize); stats.incrementOpenCount(); } private DirectBlobContainerIndexInput( - String resourceDesc, + String name, SearchableSnapshotDirectory directory, FileInfo fileInfo, IOContext context, @@ -103,7 +93,7 @@ private DirectBlobContainerIndexInput( long sequentialReadSize, int bufferSize ) { - super(logger, resourceDesc, directory, fileInfo, context, stats, offset, length, ByteRange.EMPTY); // TODO should use blob cache + super(logger, name, directory, fileInfo, context, stats, offset, length, ByteRange.EMPTY); // TODO should use blob cache this.position = position; assert sequentialReadSize >= 0; this.sequentialReadSize = sequentialReadSize; @@ -290,10 +280,10 @@ public DirectBlobContainerIndexInput clone() { } @Override - public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { + public IndexInput slice(String sliceName, long offset, long length) throws IOException { if ((offset >= 0L) && (length >= 0L) && (offset + length <= length())) { final DirectBlobContainerIndexInput slice = new DirectBlobContainerIndexInput( - getFullSliceDescription(sliceDescription), + sliceName, directory, fileInfo, context, @@ -311,16 +301,7 @@ public IndexInput slice(String sliceDescription, long offset, long length) throw return slice; } else { throw new IllegalArgumentException( - "slice() " - + sliceDescription - + " out of bounds: offset=" - + offset - + ",length=" - + length - + ",fileLength=" - + length() - + ": " - + this + "slice() " + sliceName + " out of bounds: offset=" + offset + ",length=" + length + ",fileLength=" + length() + ": " + this ); } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/FrozenCacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/FrozenCacheService.java index 8bd0ed75f99e0..f6e2003cc1db3 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/FrozenCacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/FrozenCacheService.java @@ -200,7 +200,7 @@ private ByteRange mapSubRangeToRegion(ByteRange range, int region) { private long getRegionSize(long fileLength, int region) { assert fileLength > 0; final int maxRegion = getEndingRegion(fileLength); - assert region >= 0 && region <= maxRegion; + assert region >= 0 && region <= maxRegion : region + " - " + maxRegion; final long effectiveRegionSize; if (region == maxRegion && (region + 1) * regionSize != fileLength) { assert getRegionRelativePosition(fileLength) != 0L; @@ -710,6 +710,10 @@ public long getLength() { return length; } + public CacheKey getCacheKey() { + return cacheKey; + } + public StepListener populateAndRead( final ByteRange rangeToWrite, final ByteRange rangeToRead, diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java index 7820ddf499ee8..f44a345b76bfb 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java @@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsTestCase.randomChecksumBytes; -import static org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsTestCase.randomFileExtension; import static org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsTestCase.randomIOContext; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsUtils.toIntBytes; import static org.hamcrest.Matchers.allOf; @@ -129,6 +128,7 @@ public int read(byte[] b, int off, int len) throws IOException { when(directory.blobContainer()).thenReturn(blobContainer); final DirectBlobContainerIndexInput indexInput = new DirectBlobContainerIndexInput( + fileName, directory, fileInfo, randomIOContext(), diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java index a76a3618022db..fbbade0891071 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java @@ -335,37 +335,6 @@ public static Tuple randomChecksumBytes(byte[] bytes) throws IOE return Tuple.tuple(checksum, out.toArrayCopy()); } - public static String randomFileExtension() { - return randomFrom( - ".cfe", - ".cfs", - ".dii", - ".dim", - ".doc", - ".dvd", - ".dvm", - ".fdt", - ".fdx", - ".fdm", - ".fnm", - ".kdd", - ".kdi", - ".kdm", - ".liv", - ".nvd", - ".nvm", - ".pay", - ".pos", - ".tim", - ".tip", - ".tmd", - ".tvd", - ".tvx", - ".vec", - ".vem" - ); - } - /** * @return a random {@link IOContext} that corresponds to a default, read or read_once usage. *