Skip to content

Commit

Permalink
HBASE-22890 Verify the file integrity in persistent IOEngine
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhaoBQ committed Sep 20, 2019
1 parent 5c4d8e0 commit 031714b
Show file tree
Hide file tree
Showing 4 changed files with 473 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
Expand Down Expand Up @@ -242,6 +244,16 @@ public int compare(BlockCacheKey a, BlockCacheKey b) {
/** In-memory bucket size */
private float memoryFactor;

private static final String FILE_VERIFY_ALGORITHM =
"hbase.bucketcache.persistent.file.integrity.check.algorithm";
private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";

/**
 * Use a {@link java.security.MessageDigest} digest algorithm to check
 * persistent file integrity; the default algorithm is MD5.
 */
private String algorithm;

public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException,
IOException {
Expand All @@ -252,8 +264,9 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
Configuration conf)
throws FileNotFoundException, IOException {
this.ioEngine = getIOEngineFromName(ioEngineName, capacity);
throws IOException {
this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM);
ioEngine = getIOEngineFromName(ioEngineName, capacity);
this.writerThreads = new WriterThread[writerThreadNum];
long blockNumCapacity = capacity / blockSize;
if (blockNumCapacity >= Integer.MAX_VALUE) {
Expand Down Expand Up @@ -295,7 +308,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
} catch (IOException ioex) {
LOG.error("Can't restore from file because of", ioex);
} catch (ClassNotFoundException cnfe) {
LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe);
LOG.error("Can't restore from file in rebuild because can't deserialise", cnfe);
throw new RuntimeException(cnfe);
}
}
Expand Down Expand Up @@ -1021,41 +1034,69 @@ static List<RAMQueueEntry> getRAMQueueEntries(final BlockingQueue<RAMQueueEntry>

private void persistToFile() throws IOException {
assert !cacheEnabled;
FileOutputStream fos = null;
ObjectOutputStream oos = null;
try {
try (ObjectOutputStream oos = new ObjectOutputStream(
new FileOutputStream(persistencePath, false))){
if (!ioEngine.isPersistent()) {
throw new IOException("Attempt to persist non-persistent cache mappings!");
}
fos = new FileOutputStream(persistencePath, false);
oos = new ObjectOutputStream(fos);
byte[] checksum = ((PersistentIOEngine) ioEngine).calculateChecksum(algorithm);
if (checksum != null) {
oos.write(ProtobufUtil.PB_MAGIC);
oos.writeInt(checksum.length);
oos.write(checksum);
}
oos.writeLong(cacheCapacity);
oos.writeUTF(ioEngine.getClass().getName());
oos.writeUTF(backingMap.getClass().getName());
oos.writeObject(deserialiserMap);
oos.writeObject(backingMap);
} finally {
if (oos != null) oos.close();
if (fos != null) fos.close();
}
}

@SuppressWarnings("unchecked")
private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException,
private void retrieveFromFile(int[] bucketSizes) throws IOException,
ClassNotFoundException {
File persistenceFile = new File(persistencePath);
if (!persistenceFile.exists()) {
return;
}
assert !cacheEnabled;
FileInputStream fis = null;
ObjectInputStream ois = null;
try {
if (!ioEngine.isPersistent())
throw new IOException(
"Attempt to restore non-persistent cache mappings!");
fis = new FileInputStream(persistencePath);
ois = new ObjectInputStream(fis);
ois = new ObjectInputStream(new FileInputStream(persistencePath));
int pblen = ProtobufUtil.lengthOfPBMagic();
byte[] pbuf = new byte[pblen];
int read = ois.read(pbuf);
if (read != pblen) {
LOG.warn("Can't restore from file because of incorrect number of bytes read while " +
"checking for protobuf magic number. Requested=" + pblen + ", but received= " +
read + ".");
return;
}
if (Bytes.equals(ProtobufUtil.PB_MAGIC, pbuf)) {
int length = ois.readInt();
byte[] persistentChecksum = new byte[length];
int readLen = ois.read(persistentChecksum);
if (readLen != length) {
LOG.warn("Can't restore from file because of incorrect number of bytes read while " +
"checking for persistent checksum. Requested=" + length + ", but received=" +
readLen + ". ");
return;
}
if (!((PersistentIOEngine) ioEngine).verifyFileIntegrity(
persistentChecksum, algorithm)) {
LOG.warn("Can't restore from file because of verification failed.");
return;
}
} else {
// persistent file may be an old version of file, it's not support verification,
// so reopen ObjectInputStream and read the persistent file from head
ois.close();
ois = new ObjectInputStream(new FileInputStream(persistencePath));
}
long capacitySize = ois.readLong();
if (capacitySize != cacheCapacity)
throw new IOException("Mismatched cache capacity:"
Expand All @@ -1079,8 +1120,9 @@ private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAlloc
deserialiserMap = deserMap;
backingMap = backingMapFromFile;
} finally {
if (ois != null) ois.close();
if (fis != null) fis.close();
if (ois != null) {
ois.close();
}
if (!persistenceFile.delete()) {
throw new IOException("Failed deleting persistence file "
+ persistenceFile.getAbsolutePath());
Expand Down Expand Up @@ -1597,4 +1639,9 @@ float getMultiFactor() {
float getMemoryFactor() {
return memoryFactor;
}

/** Exposes the deserialiser map so tests can inspect the restored state. */
@VisibleForTesting
public UniqueIndexMap<Integer> getDeserialiserMap() {
  return this.deserialiserMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,28 @@
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;

/**
* IO engine that stores data to a file on the local file system.
*/
@InterfaceAudience.Private
public class FileIOEngine implements IOEngine {
public class FileIOEngine implements PersistentIOEngine {
private static final Log LOG = LogFactory.getLog(FileIOEngine.class);
public static final String FILE_DELIMITER = ",";
private static final DuFileCommand DU = new DuFileCommand(new String[] {"du", ""});

private final String[] filePaths;
private final FileChannel[] fileChannels;
private final RandomAccessFile[] rafs;
Expand Down Expand Up @@ -68,15 +74,20 @@ public FileIOEngine(long capacity, String... filePaths) throws IOException {
// The next setting length will throw exception,logging this message
// is just used for the detail reason of exception,
String msg = "Only " + StringUtils.byteDesc(totalSpace)
+ " total space under " + filePath + ", not enough for requested "
+ StringUtils.byteDesc(sizePerFile);
+ " total space under " + filePath + ", not enough for requested "
+ StringUtils.byteDesc(sizePerFile);
LOG.warn(msg);
}
rafs[i].setLength(sizePerFile);
File file = new File(filePath);
// setLength() method will change file's last modified time. So if don't do
// this check, wrong time will be used when calculating checksum.
if (file.length() != sizePerFile) {
rafs[i].setLength(sizePerFile);
}
fileChannels[i] = rafs[i].getChannel();
channelLocks[i] = new ReentrantLock();
LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile)
+ ", on the path:" + filePath);
+ ", on the path: " + filePath);
} catch (IOException fex) {
LOG.error("Failed allocating cache on " + filePath, fex);
shutdown();
Expand All @@ -85,6 +96,18 @@ public FileIOEngine(long capacity, String... filePaths) throws IOException {
}
}

/**
 * Verifies the cache files' integrity by recomputing the checksum and comparing
 * it with the persisted one.
 * @param persistentChecksum the checksum read from the persistence file
 * @param algorithm the message digest algorithm to use
 * @return true if the recomputed checksum matches the persisted checksum
 */
@Override
public boolean verifyFileIntegrity(byte[] persistentChecksum, String algorithm) {
  byte[] calculatedChecksum = calculateChecksum(algorithm);
  if (!Bytes.equals(persistentChecksum, calculatedChecksum)) {
    // Digests are raw bytes: Bytes.toString would render unreadable garbage,
    // so use toStringBinary to get an escaped, printable representation.
    LOG.error("Mismatch of checksum! The persistent checksum is " +
        Bytes.toStringBinary(persistentChecksum) + ", but the calculated checksum is " +
        Bytes.toStringBinary(calculatedChecksum));
    return false;
  }
  return true;
}

@Override
public String toString() {
return "ioengine=" + this.getClass().getSimpleName() + ", paths="
Expand Down Expand Up @@ -267,6 +290,61 @@ void refreshFileConnection(int accessFileNum, IOException ioe) throws IOExceptio
}
}

/**
 * Calculates a checksum over every cache file's path, on-disk size and last
 * modified time, so replacement or truncation of a cache file is detected
 * on restore.
 * @param algorithm the message digest algorithm to use
 * @return the raw digest bytes, or null if the checksum cannot be computed
 */
@Override
public byte[] calculateChecksum(String algorithm) {
  if (filePaths == null) {
    return null;
  }
  try {
    StringBuilder sb = new StringBuilder();
    for (String filePath : filePaths){
      File file = new File(filePath);
      sb.append(filePath);
      sb.append(getFileSize(filePath));
      sb.append(file.lastModified());
    }
    MessageDigest messageDigest = MessageDigest.getInstance(algorithm);
    messageDigest.update(Bytes.toBytes(sb.toString()));
    return messageDigest.digest();
  } catch (IOException ioex) {
    LOG.error("Calculating checksum failed.", ioex);
    return null;
  } catch (NoSuchAlgorithmException e) {
    // Pass the exception to the logger so the stack trace is not lost.
    LOG.error("No such algorithm : " + algorithm + "!", e);
    return null;
  }
}

/**
 * Uses the Linux {@code du} command to get a file's real (allocated) on-disk
 * size, which can differ from the logical length for sparse files.
 * @param filePath path of the file to measure
 * @return the file's real size, in the unit reported by {@code du}
 * @throws IOException if the shell command fails, e.g. the file does not exist
 */
// NOTE(review): DU is a single shared static executor, so concurrent callers
// would race on its command/output state — assumes checksum calculation only
// happens on single-threaded persist/retrieve paths; confirm.
private static long getFileSize(String filePath) throws IOException {
  DU.setExecCommand(filePath);
  DU.execute();
  // du output is "<size>\t<path>"; take the size column.
  return Long.parseLong(DU.getOutput().split("\t")[0]);
}

/**
 * Reusable wrapper around Hadoop's shell executor that runs {@code du <file>};
 * the target file is swapped in via {@link #setExecCommand(String)} before
 * each execution instead of building a new executor per call.
 */
private static class DuFileCommand extends Shell.ShellCommandExecutor {
  // Holds {"du", <filePath>}; kept as a field so the path can be mutated.
  private String[] execCommand;

  DuFileCommand(String[] execString) {
    super(execString);
    execCommand = execString;
  }

  // Replaces the file argument (index 1, after "du") with the given path.
  void setExecCommand(String filePath) {
    this.execCommand[1] = filePath;
  }

  @Override
  public String[] getExecString() {
    return this.execCommand;
  }
}

private static interface FileAccessor {
int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
throws IOException;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.io.hfile.bucket;

import org.apache.hadoop.hbase.classification.InterfaceAudience;

/**
 * An {@link IOEngine} that supports persisting its cached data across restarts
 * and verifying the integrity of the persistent files for {@link BucketCache}.
 */
@InterfaceAudience.Private
public interface PersistentIOEngine extends IOEngine {

  /**
   * Calculates a checksum over the cache files using a
   * {@link java.security.MessageDigest} algorithm; the default is MD5.
   * @param algorithm the message digest algorithm to use
   * @return the raw checksum bytes, or null if the checksum cannot be computed
   */
  byte[] calculateChecksum(String algorithm);

  /**
   * Verifies the cache files' integrity against a previously persisted checksum.
   * @param persistentChecksum the checksum read from the persistence file
   * @param algorithm the message digest algorithm to use
   * @return true if the freshly calculated checksum matches the persisted one
   */
  boolean verifyFileIntegrity(byte[] persistentChecksum, String algorithm);
}
Loading

0 comments on commit 031714b

Please sign in to comment.