extract map
tlrx committed Dec 3, 2020
1 parent 9bef60d commit 5dcd53a
Showing 2 changed files with 61 additions and 69 deletions.
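
In short: the loading of existing Lucene documents from the persistent cache index is extracted out of CacheIndexWriter and into a new static method, PersistentCache.loadExistingDocuments. PersistentCache now holds the loaded documents in a single synchronizedMap keyed by cache file uuid, hands that map to each CacheFileVisitor while loadCacheFiles runs, and clears it once loading has committed (or when the cache is closed). Previously each CacheIndexWriter carried its own copy of the map in an AtomicReference until its first commit.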
@@ -31,7 +31,6 @@
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -65,8 +64,8 @@
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
+import static java.util.Collections.synchronizedMap;
 import static java.util.Collections.unmodifiableList;
 import static java.util.Collections.unmodifiableSortedSet;
 import static org.elasticsearch.xpack.searchablesnapshots.cache.CacheService.getShardCachePath;
@@ -78,10 +77,12 @@ public class PersistentCache implements Closeable {
     private static final String NODE_VERSION_COMMIT_KEY = "node_version";
 
     private final NodeEnvironment nodeEnvironment;
+    private final Map<String, Document> documents;
     private final List<CacheIndexWriter> writers;
     private final AtomicBoolean closed;
 
     public PersistentCache(NodeEnvironment nodeEnvironment) {
+        this.documents = synchronizedMap(loadExistingDocuments(nodeEnvironment));
         this.writers = createWriters(nodeEnvironment);
         this.nodeEnvironment = nodeEnvironment;
         this.closed = new AtomicBoolean();
@@ -147,7 +148,7 @@ void loadCacheFiles(CacheService cacheService) {
 
                     if (Files.isDirectory(shardCachePath)) {
                         logger.trace("found snapshot cache dir at [{}], loading cache files from disk and index", shardCachePath);
-                        Files.walkFileTree(shardCachePath, new CacheFileVisitor(cacheService, writer));
+                        Files.walkFileTree(shardCachePath, new CacheFileVisitor(cacheService, writer, documents));
                     }
                 }
             }
@@ -159,6 +160,7 @@ void loadCacheFiles(CacheService cacheService) {
                 writer.commit();
             }
             logger.info("persistent cache index loaded");
+            documents.clear();
         } catch (IOException e) {
             try {
                 close();
@@ -227,6 +229,7 @@ public long getNumDocs() {
     public void close() throws IOException {
         if (closed.compareAndSet(false, true)) {
             IOUtils.close(writers);
+            documents.clear();
         }
     }
 
@@ -270,22 +273,6 @@ static CacheIndexWriter createCacheIndexWriter(NodeEnvironment.NodePath nodePath
             final Directory directory = FSDirectory.open(directoryPath);
             closeables.add(directory);
 
-            final Map<String, Document> documents = new HashMap<>();
-            try (IndexReader indexReader = DirectoryReader.open(directory)) {
-                for (LeafReaderContext leafReaderContext : indexReader.leaves()) {
-                    final LeafReader leafReader = leafReaderContext.reader();
-                    final Bits liveDocs = leafReader.getLiveDocs();
-                    for (int i = 0; i < leafReader.maxDoc(); i++) {
-                        if (liveDocs == null || liveDocs.get(i)) {
-                            final Document document = leafReader.document(i);
-                            documents.put(getValue(document, CACHE_ID_FIELD), document);
-                        }
-                    }
-                }
-            } catch (IndexNotFoundException e) {
-                logger.debug("persistent cache index does not exist yet", e);
-            }
-
             final IndexWriterConfig config = new IndexWriterConfig(new KeywordAnalyzer());
             config.setIndexDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
             config.setOpenMode(IndexWriterConfig.OpenMode.CREATE);
@@ -296,7 +283,7 @@ static CacheIndexWriter createCacheIndexWriter(NodeEnvironment.NodePath nodePath
             final IndexWriter indexWriter = new IndexWriter(directory, config);
             closeables.add(indexWriter);
 
-            final CacheIndexWriter cacheIndexWriter = new CacheIndexWriter(nodePath, directory, indexWriter, documents);
+            final CacheIndexWriter cacheIndexWriter = new CacheIndexWriter(nodePath, directory, indexWriter);
             success = true;
             return cacheIndexWriter;
         } finally {
@@ -306,6 +293,44 @@ static CacheIndexWriter createCacheIndexWriter(NodeEnvironment.NodePath nodePath
         }
     }
 
+    /**
+     * Load existing documents from persistent cache indices located at the root of every node path.
+     *
+     * @param nodeEnvironment the data node environment
+     * @return a map of {cache file uuid, Lucene document}
+     */
+    static Map<String, Document> loadExistingDocuments(NodeEnvironment nodeEnvironment) {
+        final Map<String, Document> documents = new HashMap<>();
+        try {
+            for (NodeEnvironment.NodePath nodePath : nodeEnvironment.nodePaths()) {
+                final Path directoryPath = resolveCacheIndexFolder(nodePath);
+                if (Files.exists(directoryPath)) {
+                    try (Directory directory = FSDirectory.open(directoryPath)) {
+                        try (IndexReader indexReader = DirectoryReader.open(directory)) {
+                            logger.trace("loading documents from persistent cache index [{}]", directoryPath);
+                            for (LeafReaderContext leafReaderContext : indexReader.leaves()) {
+                                final LeafReader leafReader = leafReaderContext.reader();
+                                final Bits liveDocs = leafReader.getLiveDocs();
+                                for (int i = 0; i < leafReader.maxDoc(); i++) {
+                                    if (liveDocs == null || liveDocs.get(i)) {
+                                        final Document document = leafReader.document(i);
+                                        logger.trace("loading document [{}]", document);
+                                        documents.put(getValue(document, CACHE_ID_FIELD), document);
+                                    }
+                                }
+                            }
+                        } catch (IndexNotFoundException e) {
+                            logger.debug("persistent cache index does not exist yet", e);
+                        }
+                    }
+                }
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException("Failed to load existing documents from persistent cache index", e);
+        }
+        return documents;
+    }
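
Aside: the method above reuses a standard Lucene idiom — visit every slot up to maxDoc() in each leaf reader and keep only the slots that getLiveDocs() reports as live (a null Bits means the segment has no deletions). Below is a self-contained sketch of that idiom; the in-memory ByteBuffersDirectory, the cache_id field, and the demo values are assumptions standing in for the production CACHE_ID_FIELD and the on-disk cache index, while the API calls are plain Lucene 8.

import org.apache.lucene.analysis.core.KeywordAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Bits;

import java.util.HashMap;
import java.util.Map;

public class LiveDocsDemo {
    public static void main(String[] args) throws Exception {
        try (Directory dir = new ByteBuffersDirectory()) {
            // Index three documents, then delete one so the segment has a deletion.
            try (IndexWriter w = new IndexWriter(dir, new IndexWriterConfig(new KeywordAnalyzer()))) {
                for (String id : new String[] { "a", "b", "c" }) {
                    Document doc = new Document();
                    doc.add(new StringField("cache_id", id, Field.Store.YES)); // stand-in for CACHE_ID_FIELD
                    w.addDocument(doc);
                }
                w.deleteDocuments(new Term("cache_id", "b")); // "b" becomes a deleted (non-live) doc
                w.commit();
            }
            // Read back only the live documents, keyed by their id field.
            final Map<String, Document> documents = new HashMap<>();
            try (IndexReader indexReader = DirectoryReader.open(dir)) {
                for (LeafReaderContext leafReaderContext : indexReader.leaves()) {
                    final LeafReader leafReader = leafReaderContext.reader();
                    final Bits liveDocs = leafReader.getLiveDocs(); // null when the segment has no deletions
                    for (int i = 0; i < leafReader.maxDoc(); i++) {
                        if (liveDocs == null || liveDocs.get(i)) {
                            final Document document = leafReader.document(i);
                            documents.put(document.get("cache_id"), document);
                        }
                    }
                }
            }
            System.out.println(documents.keySet()); // prints [a, c] (order may vary)
        }
    }
}

This is the same loop the commit relocates: it previously ran inside createCacheIndexWriter once per data path, and now runs once up front in loadExistingDocuments.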

     /**
      * Cleans any leftover searchable snapshot caches (files and Lucene indices) when a non-data node is starting up.
      * This is useful when the node is repurposed and is not a data node anymore.
@@ -344,23 +369,15 @@ public static void cleanUp(Settings settings, NodeEnvironment nodeEnvironment) {
 
     /**
      * A {@link CacheIndexWriter} contains a Lucene {@link Directory} with an {@link IndexWriter} that can be used to index documents in
-     * the persistent cache index. The list of existing cache documents is loaded at startup and kept around until a first commit is done.
-     * There is one {@link CacheIndexWriter} for each data path.
+     * the persistent cache index. There is one {@link CacheIndexWriter} for each data path.
      */
     static class CacheIndexWriter implements Closeable {
 
-        private final AtomicReference<Map<String, Document>> documentsRef;
         private final NodeEnvironment.NodePath nodePath;
         private final IndexWriter indexWriter;
         private final Directory directory;
 
-        private CacheIndexWriter(
-            NodeEnvironment.NodePath nodePath,
-            Directory directory,
-            IndexWriter indexWriter,
-            Map<String, Document> documents
-        ) {
-            this.documentsRef = new AtomicReference<>(Objects.requireNonNull(documents));
+        private CacheIndexWriter(NodeEnvironment.NodePath nodePath, Directory directory, IndexWriter indexWriter) {
             this.nodePath = nodePath;
             this.directory = directory;
             this.indexWriter = indexWriter;
@@ -370,29 +387,13 @@ NodeEnvironment.NodePath nodePath() {
             return nodePath;
         }
 
-        Map<String, Document> getDocuments() {
-            return documentsRef.get();
-        }
-
-        @Nullable
-        Document getDocument(String cacheFileId) {
-            final Map<String, Document> documents = getDocuments();
-            if (documents == null) {
-                assert false : "this method should only be used when loading persistent cache, before any prior commit";
-                throw new IllegalStateException("Persistent cache index was already committed");
-            }
-            return documents.get(cacheFileId);
-        }
-
         void updateCacheFile(CacheFile cacheFile, SortedSet<Tuple<Long, Long>> cacheRanges) throws IOException {
-            assert getDocuments() == null : "this method should only be used after loading persistent cache";
             final Term term = buildTerm(cacheFile);
             logger.debug("updating document with term [{}]", term);
             indexWriter.updateDocument(term, buildDocument(nodePath, cacheFile, cacheRanges));
         }
 
         void updateCacheFile(String cacheFileId, Document cacheFileDocument) throws IOException {
-            assert getDocuments() != null : "this method should only be used when loading persistent cache, before any prior commit";
             final Term term = buildTerm(cacheFileId);
             logger.debug("updating document with term [{}]", term);
             indexWriter.updateDocument(term, cacheFileDocument);
@@ -417,20 +418,8 @@ void prepareCommit() throws IOException {
         }
 
         void commit() throws IOException {
-            boolean success = false;
-            try {
-                logger.debug("committing");
-                indexWriter.commit();
-                success = true;
-            } finally {
-                if (success) {
-                    Map<String, Document> documents = documentsRef.getAndSet(null);
-                    if (documents != null) {
-                        logger.trace("clearing existing cache documents");
-                        documents.clear();
-                    }
-                }
-            }
+            logger.debug("committing");
+            indexWriter.commit();
         }
 
         @Override
@@ -454,18 +443,20 @@ public String toString() {
     private static class CacheFileVisitor extends SimpleFileVisitor<Path> {
 
         private final CacheService cacheService;
+        private final Map<String, Document> documents;
         private final CacheIndexWriter writer;
 
-        private CacheFileVisitor(CacheService cacheService, CacheIndexWriter writer) {
+        private CacheFileVisitor(CacheService cacheService, CacheIndexWriter writer, Map<String, Document> documents) {
             this.cacheService = Objects.requireNonNull(cacheService);
+            this.documents = Objects.requireNonNull(documents);
             this.writer = Objects.requireNonNull(writer);
         }
 
         @Override
         public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
             try {
                 final String id = buildId(file);
-                final Document cacheDocument = writer.getDocument(id);
+                final Document cacheDocument = documents.get(id);
                 if (cacheDocument != null) {
                     logger.trace("indexing cache file with id [{}] in persistent cache index", id);
                     writer.updateCacheFile(id, cacheDocument);
Second changed file (unit tests):
@@ -58,14 +58,16 @@ public void testCacheIndexWriter() throws Exception {
             final Path snapshotCacheIndexDir = resolveCacheIndexFolder(nodePath);
             assertThat(Files.exists(snapshotCacheIndexDir), equalTo(iter > 0));
 
+            // load existing documents from persistent cache index before each iteration
+            final Map<String, Document> documents = PersistentCache.loadExistingDocuments(nodeEnvironment);
+            assertThat(documents.size(), equalTo(liveDocs.size()));
+
             try (PersistentCache.CacheIndexWriter writer = createCacheIndexWriter(nodePath)) {
                 assertThat(writer.nodePath(), sameInstance(nodePath));
-                assertThat(writer.getDocuments(), notNullValue());
-                assertThat(writer.getDocuments().size(), equalTo(liveDocs.size()));
 
                 // verify that existing documents are loaded
                 for (Map.Entry<String, Integer> liveDoc : liveDocs.entrySet()) {
-                    final Document document = writer.getDocument(liveDoc.getKey());
+                    final Document document = documents.get(liveDoc.getKey());
                     assertThat("Document should be loaded", document, notNullValue());
                     final String iteration = document.get("update_iteration");
                     assertThat(iteration, equalTo(String.valueOf(liveDoc.getValue())));
@@ -74,7 +76,7 @@
 
                 // verify that deleted documents are not loaded
                 for (String deletedDoc : deletedDocs) {
-                    final Document document = writer.getDocument(deletedDoc);
+                    final Document document = documents.get(deletedDoc);
                     assertThat("Document should not be loaded", document, nullValue());
                 }
 
@@ -110,16 +112,15 @@ public void testCacheIndexWriter() throws Exception {
                 }
 
                 boolean commit = false;
-                if (randomBoolean()) {
+                if (frequently()) {
                     writer.prepareCommit();
-                    if (randomBoolean()) {
+                    if (frequently()) {
                         writer.commit();
                         commit = true;
                     }
                 }
 
                 if (commit) {
-                    assertThat(writer.getDocuments(), nullValue());
                     liveDocs.putAll(updatedDocs);
                     liveDocs.putAll(newDocs);
                     for (String cacheId : removedDocs.keySet()) {
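
A note on the last test tweak above: randomBoolean() (a 50/50 flip) is replaced with frequently(), which in the Lucene test framework that Elasticsearch tests build on is defined as the negation of rarely() and therefore returns true most of the time; its exact probability scales with test options. The effect is that prepareCommit()/commit() now run in most iterations, so the post-commit assertions are exercised far more often. A hypothetical stand-in showing the intent (the 10% rate is an assumption, not the framework's exact value):

import java.util.Random;

// Illustrative stand-in only: the real rarely()/frequently() live in
// LuceneTestCase and scale with options such as tests.nightly and the
// random multiplier; the rate below is an assumed approximation.
final class RandomHelpers {
    private static final Random RANDOM = new Random();

    static boolean rarely() {
        return RANDOM.nextInt(100) < 10; // true ~10% of the time (assumed)
    }

    static boolean frequently() {
        return !rarely(); // true ~90% of the time
    }

    public static void main(String[] args) {
        int hits = 0;
        for (int i = 0; i < 1_000; i++) {
            if (frequently()) {
                hits++;
            }
        }
        System.out.println("frequently() was true " + hits + "/1000 times");
    }
}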
