-
Notifications
You must be signed in to change notification settings - Fork 3.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HBASE-22890 Verify the files when RegionServer is starting and BucketCache is in file mode #528
Changes from all commits
bc321df
687ad91
890f766
77f8e96
c14b213
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,6 +29,7 @@ | |
import java.io.ObjectOutputStream; | ||
import java.io.Serializable; | ||
import java.nio.ByteBuffer; | ||
import java.security.NoSuchAlgorithmException; | ||
import java.util.ArrayList; | ||
import java.util.Comparator; | ||
import java.util.HashSet; | ||
|
@@ -69,6 +70,7 @@ | |
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; | ||
import org.apache.hadoop.hbase.io.hfile.CachedBlock; | ||
import org.apache.hadoop.hbase.io.hfile.HFileBlock; | ||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; | ||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; | ||
import org.apache.hadoop.hbase.util.HasThread; | ||
import org.apache.hadoop.hbase.util.IdReadWriteLock; | ||
|
@@ -242,6 +244,17 @@ public int compare(BlockCacheKey a, BlockCacheKey b) { | |
/** In-memory bucket size */ | ||
private float memoryFactor; | ||
|
||
private String ioEngineName; | ||
private static final String FILE_VERIFY_ALGORITHM = | ||
"hbase.bucketcache.persistent.file.integrity.check.algorithm"; | ||
private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5"; | ||
|
||
/** | ||
* Use {@link java.security.MessageDigest} class's encryption algorithms to check | ||
* persistent file integrity, default algorithm is MD5 | ||
* */ | ||
private String algorithm; | ||
|
||
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, | ||
int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException, | ||
IOException { | ||
|
@@ -252,8 +265,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck | |
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, | ||
int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration, | ||
Configuration conf) | ||
throws FileNotFoundException, IOException { | ||
this.ioEngine = getIOEngineFromName(ioEngineName, capacity); | ||
throws IOException { | ||
this.writerThreads = new WriterThread[writerThreadNum]; | ||
long blockNumCapacity = capacity / blockSize; | ||
if (blockNumCapacity >= Integer.MAX_VALUE) { | ||
|
@@ -275,6 +287,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck | |
", memoryFactor: " + memoryFactor); | ||
|
||
this.cacheCapacity = capacity; | ||
this.ioEngineName = ioEngineName; | ||
this.persistencePath = persistencePath; | ||
this.blockSize = blockSize; | ||
this.ioErrorsTolerationDuration = ioErrorsTolerationDuration; | ||
|
@@ -288,14 +301,15 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck | |
this.ramCache = new ConcurrentHashMap<BlockCacheKey, RAMQueueEntry>(); | ||
|
||
this.backingMap = new ConcurrentHashMap<BlockCacheKey, BucketEntry>((int) blockNumCapacity); | ||
|
||
this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM); | ||
ioEngine = getIOEngineFromName(); | ||
if (ioEngine.isPersistent() && persistencePath != null) { | ||
try { | ||
retrieveFromFile(bucketSizes); | ||
} catch (IOException ioex) { | ||
LOG.error("Can't restore from file because of", ioex); | ||
} catch (ClassNotFoundException cnfe) { | ||
LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe); | ||
LOG.error("Can't restore from file in rebuild because can't deserialise", cnfe); | ||
throw new RuntimeException(cnfe); | ||
} | ||
} | ||
|
@@ -359,24 +373,22 @@ public String getIoEngine() { | |
|
||
/** | ||
* Get the IOEngine from the IO engine name | ||
* @param ioEngineName | ||
* @param capacity | ||
* @return the IOEngine | ||
* @throws IOException | ||
*/ | ||
private IOEngine getIOEngineFromName(String ioEngineName, long capacity) | ||
private IOEngine getIOEngineFromName() | ||
throws IOException { | ||
if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) { | ||
// In order to make the usage simple, we only need the prefix 'files:' in | ||
// document whether one or multiple file(s), but also support 'file:' for | ||
// the compatibility | ||
String[] filePaths = | ||
ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER); | ||
return new FileIOEngine(capacity, filePaths); | ||
return new FileIOEngine(algorithm, persistencePath, cacheCapacity, filePaths); | ||
} else if (ioEngineName.startsWith("offheap")) | ||
return new ByteBufferIOEngine(capacity, true); | ||
return new ByteBufferIOEngine(cacheCapacity, true); | ||
else if (ioEngineName.startsWith("heap")) | ||
return new ByteBufferIOEngine(capacity, false); | ||
return new ByteBufferIOEngine(cacheCapacity, false); | ||
else | ||
throw new IllegalArgumentException( | ||
"Don't understand io engine name for cache - prefix with file:, heap or offheap"); | ||
|
@@ -1021,41 +1033,48 @@ static List<RAMQueueEntry> getRAMQueueEntries(final BlockingQueue<RAMQueueEntry> | |
|
||
private void persistToFile() throws IOException { | ||
assert !cacheEnabled; | ||
FileOutputStream fos = null; | ||
ObjectOutputStream oos = null; | ||
try { | ||
try (ObjectOutputStream oos = new ObjectOutputStream( | ||
new FileOutputStream(persistencePath, false))){ | ||
if (!ioEngine.isPersistent()) { | ||
throw new IOException("Attempt to persist non-persistent cache mappings!"); | ||
} | ||
fos = new FileOutputStream(persistencePath, false); | ||
oos = new ObjectOutputStream(fos); | ||
if (ioEngine instanceof PersistentIOEngine) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If we are in this persistToFile(), it means the engine is a PersistentIOEngine. Maybe an assert and a direct cast would be a better way than an if check. |
||
oos.write(ProtobufUtil.PB_MAGIC); | ||
byte[] checksum = ((PersistentIOEngine) ioEngine).calculateChecksum(); | ||
oos.writeInt(checksum.length); | ||
oos.write(checksum); | ||
} | ||
oos.writeLong(cacheCapacity); | ||
oos.writeUTF(ioEngine.getClass().getName()); | ||
oos.writeUTF(backingMap.getClass().getName()); | ||
oos.writeObject(deserialiserMap); | ||
oos.writeObject(backingMap); | ||
} finally { | ||
if (oos != null) oos.close(); | ||
if (fos != null) fos.close(); | ||
} catch (NoSuchAlgorithmException e) { | ||
LOG.error("No such algorithm : " + algorithm + "! Failed to persist data on exit",e); | ||
} | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException, | ||
private void retrieveFromFile(int[] bucketSizes) throws IOException, | ||
ClassNotFoundException { | ||
File persistenceFile = new File(persistencePath); | ||
if (!persistenceFile.exists()) { | ||
return; | ||
} | ||
assert !cacheEnabled; | ||
FileInputStream fis = null; | ||
ObjectInputStream ois = null; | ||
try { | ||
try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(persistencePath))){ | ||
if (!ioEngine.isPersistent()) | ||
throw new IOException( | ||
"Attempt to restore non-persistent cache mappings!"); | ||
fis = new FileInputStream(persistencePath); | ||
ois = new ObjectInputStream(fis); | ||
// for backward compatibility | ||
if (ioEngine instanceof PersistentIOEngine && | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Same as the above comment. See above. |
||
!((PersistentIOEngine) ioEngine).isOldVersion()) { | ||
byte[] PBMagic = new byte[ProtobufUtil.PB_MAGIC.length]; | ||
ois.read(PBMagic); | ||
int length = ois.readInt(); | ||
byte[] persistenceChecksum = new byte[length]; | ||
ois.read(persistenceChecksum); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Actually we are reading persistentChecksum twice in this flow: at FileIOE create time as part of the verify call, and here too. Here we are doing it as a skip. So why can't we do it here only? We have verifyFileIntegrity() in the PersistentIOEngine interface and we can call that from here? It looks a bit odd. The oldVersion check can be done here also, based on the PBMagic matching. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I thought about your suggestion. If we want to read it once, we should pass the ObjectInputStream object "ois" to the verifyFileIntegrity() method. If it's an old-version persistent file, the ois object should be reset, but the reset() method is not supported. We can recreate an ObjectInputStream without using a try-with-resources statement, but this may be a bit unsightly... There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Maybe we need to close and reopen for the read. My biggest worry is where we do this verify. See my comment below. Now we are doing it during creation of the FileIOE. If the verify fails, we are not allowing the FileIOE to be created and do its future work. My point is this: we should create the FileIOE. Then the BucketCache tries to retrieve the already-cached data from the persistent store, and for that it recreates the cache meta. At that step we should be doing the verification, right? First see whether the checksum for verify is present already and, if so, verify. If the verify is ok, then try to recreate the cache metadata. Or else just forget about that existing persisted cache data and maybe do the necessary cleanup. All of this is work of the BucketCache.
It can ask the FileIOE to do the actual verify, but it should be initiated by the BucketCache. Do you get my point clearly now? Sorry for not saying it in detail at the first step itself. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I got your point already. In fact, if the verification fails, the FileIOE can still be created. If verification fails, we would throw an IOException, then catch the IOException and do some cleanup, but the creation of the FileIOE will continue. Below is the code for the catch: |
||
} | ||
long capacitySize = ois.readLong(); | ||
if (capacitySize != cacheCapacity) | ||
throw new IOException("Mismatched cache capacity:" | ||
|
@@ -1078,9 +1097,8 @@ private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAlloc | |
bucketAllocator = allocator; | ||
deserialiserMap = deserMap; | ||
backingMap = backingMapFromFile; | ||
blockNumber.set(backingMap.size()); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why this change? Is it related to this jira directly? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. When we retrieve successfully from file, the "Block Count" in the WebUI would be 0 if blockNumber is not changed, but it actually has blocks. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I see. So this is an existing bug. It's a one-liner change. Still, it could be done as another bug jira maybe. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. OK |
||
} finally { | ||
if (ois != null) ois.close(); | ||
if (fis != null) fis.close(); | ||
if (!persistenceFile.delete()) { | ||
throw new IOException("Failed deleting persistence file " | ||
+ persistenceFile.getAbsolutePath()); | ||
|
@@ -1597,4 +1615,9 @@ float getMultiFactor() { | |
float getMemoryFactor() { | ||
return memoryFactor; | ||
} | ||
|
||
@VisibleForTesting | ||
public UniqueIndexMap<Integer> getDeserialiserMap() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ditto |
||
return deserialiserMap; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,46 +19,96 @@ | |
package org.apache.hadoop.hbase.io.hfile.bucket; | ||
|
||
import java.io.File; | ||
import java.io.FileInputStream; | ||
import java.io.IOException; | ||
import java.io.ObjectInputStream; | ||
import java.io.RandomAccessFile; | ||
import java.nio.ByteBuffer; | ||
import java.nio.channels.ClosedByInterruptException; | ||
import java.nio.channels.ClosedChannelException; | ||
import java.nio.channels.FileChannel; | ||
import java.security.MessageDigest; | ||
import java.security.NoSuchAlgorithmException; | ||
import java.util.Arrays; | ||
import java.util.concurrent.locks.ReentrantLock; | ||
|
||
import com.google.common.annotations.VisibleForTesting; | ||
import org.apache.commons.logging.Log; | ||
import org.apache.commons.logging.LogFactory; | ||
import org.apache.hadoop.hbase.classification.InterfaceAudience; | ||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; | ||
import org.apache.hadoop.hbase.util.Bytes; | ||
import org.apache.hadoop.util.Shell; | ||
import org.apache.hadoop.util.StringUtils; | ||
|
||
/** | ||
* IO engine that stores data to a file on the local file system. | ||
*/ | ||
@InterfaceAudience.Private | ||
public class FileIOEngine implements IOEngine { | ||
public class FileIOEngine implements PersistentIOEngine { | ||
private static final Log LOG = LogFactory.getLog(FileIOEngine.class); | ||
public static final String FILE_DELIMITER = ","; | ||
private static final DuFileCommand DU = new DuFileCommand(new String[] {"du", ""}); | ||
|
||
private final String[] filePaths; | ||
private final FileChannel[] fileChannels; | ||
private final RandomAccessFile[] rafs; | ||
private final ReentrantLock[] channelLocks; | ||
|
||
private final long sizePerFile; | ||
private final long capacity; | ||
private final String algorithmName; | ||
private boolean oldVersion; | ||
|
||
private FileReadAccessor readAccessor = new FileReadAccessor(); | ||
private FileWriteAccessor writeAccessor = new FileWriteAccessor(); | ||
|
||
public FileIOEngine(long capacity, String... filePaths) throws IOException { | ||
public FileIOEngine(String algorithmName, String persistentPath, | ||
long capacity, String... filePaths) throws IOException { | ||
this.sizePerFile = capacity / filePaths.length; | ||
this.capacity = this.sizePerFile * filePaths.length; | ||
this.filePaths = filePaths; | ||
this.fileChannels = new FileChannel[filePaths.length]; | ||
this.rafs = new RandomAccessFile[filePaths.length]; | ||
this.channelLocks = new ReentrantLock[filePaths.length]; | ||
this.algorithmName = algorithmName; | ||
verifyFileIntegrity(persistentPath); | ||
init(); | ||
} | ||
|
||
/** | ||
* Verify cache files's integrity | ||
* @param persistentPath the backingMap persistent path | ||
*/ | ||
@Override | ||
public void verifyFileIntegrity(String persistentPath) { | ||
if (persistentPath != null) { | ||
byte[] persistentChecksum = readPersistentChecksum(persistentPath); | ||
if (!oldVersion) { | ||
try { | ||
byte[] calculateChecksum = calculateChecksum(); | ||
if (!Bytes.equals(persistentChecksum, calculateChecksum)) { | ||
LOG.warn("The persistent checksum is " + Bytes.toString(persistentChecksum) + | ||
", but the calculate checksum is " + Bytes.toString(calculateChecksum)); | ||
throw new IOException(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Actually, if the checksums do not match, we can still continue with RS operation. We cannot regain the old cached data. But now, as this throw of IOE happens during construction of the FileIOEngine, we can no longer use the IOEngine itself. That is wrong. One more reason not to do this verify as part of the constructor, but at a later time as part of the retrieve from persisted metadata. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The IOException will be caught, and then the delete operation is done. So the IOEngine can be used after that. |
||
} | ||
} catch (IOException ioex) { | ||
LOG.error("File verification failed because of ", ioex); | ||
// delete cache files and backingMap persistent file. | ||
deleteCacheDataFile(); | ||
new File(persistentPath).delete(); | ||
} catch (NoSuchAlgorithmException nsae) { | ||
LOG.error("No such algorithm " + algorithmName, nsae); | ||
throw new RuntimeException(nsae); | ||
} | ||
} | ||
} else { | ||
// not configure persistent path | ||
deleteCacheDataFile(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Where will we create the cache files again, then? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The cache files would be created in the init() method. |
||
} | ||
} | ||
|
||
private void init() throws IOException { | ||
for (int i = 0; i < filePaths.length; i++) { | ||
String filePath = filePaths[i]; | ||
try { | ||
|
@@ -68,15 +118,15 @@ public FileIOEngine(long capacity, String... filePaths) throws IOException { | |
// The next setting length will throw exception,logging this message | ||
// is just used for the detail reason of exception, | ||
String msg = "Only " + StringUtils.byteDesc(totalSpace) | ||
+ " total space under " + filePath + ", not enough for requested " | ||
+ StringUtils.byteDesc(sizePerFile); | ||
+ " total space under " + filePath + ", not enough for requested " | ||
+ StringUtils.byteDesc(sizePerFile); | ||
LOG.warn(msg); | ||
} | ||
rafs[i].setLength(sizePerFile); | ||
fileChannels[i] = rafs[i].getChannel(); | ||
channelLocks[i] = new ReentrantLock(); | ||
LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile) | ||
+ ", on the path:" + filePath); | ||
+ ", on the path: " + filePath); | ||
} catch (IOException fex) { | ||
LOG.error("Failed allocating cache on " + filePath, fex); | ||
shutdown(); | ||
|
@@ -267,6 +317,98 @@ void refreshFileConnection(int accessFileNum, IOException ioe) throws IOExceptio | |
} | ||
} | ||
|
||
/** | ||
* Read the persistent checksum from persistent path | ||
* @param persistentPath the backingMap persistent path | ||
* @return the persistent checksum | ||
*/ | ||
private byte[] readPersistentChecksum(String persistentPath) { | ||
try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(persistentPath))) { | ||
byte[] PBMagic = new byte[ProtobufUtil.PB_MAGIC.length]; | ||
ois.read(PBMagic); | ||
if (Bytes.equals(ProtobufUtil.PB_MAGIC, PBMagic)) { | ||
int length = ois.readInt(); | ||
byte[] persistentChecksum = new byte[length]; | ||
ois.read(persistentChecksum); | ||
return persistentChecksum; | ||
} else { | ||
// if the persistent file is not start with PB_MAGIC, it's an old version file | ||
oldVersion = true; | ||
} | ||
} catch (IOException ioex) { | ||
LOG.warn("Failed read persistent checksum, because of " + ioex); | ||
return null; | ||
} | ||
return null; | ||
} | ||
|
||
@Override | ||
public void deleteCacheDataFile() { | ||
if (filePaths == null) { | ||
return; | ||
} | ||
for (String file : filePaths) { | ||
new File(file).delete(); | ||
} | ||
} | ||
|
||
@Override | ||
public byte[] calculateChecksum() | ||
throws IOException, NoSuchAlgorithmException { | ||
if (filePaths == null) { | ||
return null; | ||
} | ||
StringBuilder sb = new StringBuilder(); | ||
for (String filePath : filePaths){ | ||
File file = new File(filePath); | ||
if (file.exists()){ | ||
sb.append(filePath); | ||
sb.append(getFileSize(filePath)); | ||
sb.append(file.lastModified()); | ||
} else { | ||
throw new IOException("Cache file: " + filePath + " is not exists."); | ||
} | ||
} | ||
MessageDigest messageDigest = MessageDigest.getInstance(algorithmName); | ||
messageDigest.update(Bytes.toBytes(sb.toString())); | ||
return messageDigest.digest(); | ||
} | ||
|
||
@Override | ||
public boolean isOldVersion() { | ||
return oldVersion; | ||
} | ||
|
||
/** | ||
* Using Linux command du to get file's real size | ||
* @param filePath the file | ||
* @return file's real size | ||
* @throws IOException something happened like file not exists | ||
*/ | ||
private static long getFileSize(String filePath) throws IOException { | ||
DU.setExecCommand(filePath); | ||
DU.execute(); | ||
return Long.parseLong(DU.getOutput().split("\t")[0]); | ||
} | ||
|
||
private static class DuFileCommand extends Shell.ShellCommandExecutor { | ||
private String[] execCommand; | ||
|
||
DuFileCommand(String[] execString) { | ||
super(execString); | ||
execCommand = execString; | ||
} | ||
|
||
void setExecCommand(String filePath) { | ||
this.execCommand[1] = filePath; | ||
} | ||
|
||
@Override | ||
public String[] getExecString() { | ||
return this.execCommand; | ||
} | ||
} | ||
|
||
private static interface FileAccessor { | ||
int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset) | ||
throws IOException; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Javadoc for this new member: what is this algorithm for?