Skip to content

Commit

Permalink
Add logs for remote store metadata intermittent read failures (opense…
Browse files Browse the repository at this point in the history
…arch-project#8618)

---------

Signed-off-by: bansvaru <bansvaru@amazon.com>
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
  • Loading branch information
linuxpi authored and shiv0408 committed Apr 25, 2024
1 parent e8be7d5 commit ad2867f
Showing 1 changed file with 21 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@

import java.io.IOException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
Expand All @@ -22,6 +26,8 @@
* @opensearch.internal
*/
public class VersionedCodecStreamWrapper<T> {
private static final Logger logger = LogManager.getLogger(VersionedCodecStreamWrapper.class);

// TODO This can be updated to hold a streamReadWriteHandlerFactory and get relevant handler based on the stream versions
private final IndexIOStreamHandler<T> indexIOStreamHandler;
private final int currentVersion;
Expand All @@ -46,10 +52,21 @@ public VersionedCodecStreamWrapper(IndexIOStreamHandler<T> indexIOStreamHandler,
* @return stream content parsed into {@link T}
*/
public T readStream(IndexInput indexInput) throws IOException {
CodecUtil.checksumEntireFile(indexInput);
int readStreamVersion = checkHeader(indexInput);
T content = getHandlerForVersion(readStreamVersion).readContent(indexInput);
return content;
logger.debug("Reading input stream [{}] of length - [{}]", indexInput.toString(), indexInput.length());
try {
CodecUtil.checksumEntireFile(indexInput);
int readStreamVersion = checkHeader(indexInput);
return getHandlerForVersion(readStreamVersion).readContent(indexInput);
} catch (CorruptIndexException cie) {
logger.error(
() -> new ParameterizedMessage(
"Error while validating header/footer for [{}]. Total data length [{}]",
indexInput.toString(),
indexInput.length()
)
);
throw cie;
}
}

/**
Expand Down

0 comments on commit ad2867f

Please sign in to comment.