Skip to content

Commit

Permalink
HDDS-12397. Update Used Bytes.
Browse files Browse the repository at this point in the history
  • Loading branch information
aswinshakil committed Feb 25, 2025
1 parent f3f8612 commit d6e68c6
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1099,7 +1099,12 @@ public void writeChunkForClosedContainer(ChunkInfo chunkInfo, BlockID blockID,

/**
* Handle Put Block operation for closed container. Calls BlockManager to process the request.
*
* This is primarily used by container reconciliation process to persist the block data for closed container.
* @param kvContainer - Container for which block data need to be persisted.
* @param blockData - Block Data to be persisted (BlockData should have the chunks).
* @param blockCommitSequenceId - Block Commit Sequence ID for the block.
* @param overwriteBscId - To overwrite bcsId in the block data. In case of chunk failure during reconciliation,
* we do not want to overwrite the bcsId as this block is incomplete in its current state.
*/
public void putBlockForClosedContainer(KeyValueContainer kvContainer, BlockData blockData,
long blockCommitSequenceId, boolean overwriteBscId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ public long putBlockForClosedContainer(Container container, BlockData data, bool
// block length is used, And also on restart the blocks committed to DB
// is only used to compute the bytes used. This is done to keep the
// current behavior and avoid DB write during write chunk operation.
// Write UTs for this
db.getStore().getMetadataTable().putWithBatch(batch, containerData.getBytesUsedKey(),
containerData.getBytesUsed());

Expand Down Expand Up @@ -443,19 +442,6 @@ public List<BlockData> listBlock(Container container, long startLocalID, int
}
}

@Override
public boolean blockExists(Container container, BlockID blockID) throws IOException {
KeyValueContainerData containerData = (KeyValueContainerData) container
.getContainerData();
try (DBHandle db = BlockUtils.getDB(containerData, config)) {
// This is a post condition that acts as a hint to the user.
// Should never fail.
Preconditions.checkNotNull(db, DB_NULL_ERR_MSG);
String blockKey = containerData.getBlockKey(blockID.getLocalID());
return db.getStore().getBlockDataTable().isExist(blockKey);
}
}

/**
* Shutdown KeyValueContainerManager.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hadoop.ozone.container.keyvalue.impl;

import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST;
import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.FILE_PER_BLOCK;
import static org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage.COMMIT_DATA;
Expand Down Expand Up @@ -174,8 +175,25 @@ public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
ChunkUtils.validateChunkSize(channel, info, chunkFile.getName());
}

ChunkUtils
.writeData(channel, chunkFile.getName(), data, offset, len, volume);
long fileLengthBeforeWrite;
try {
fileLengthBeforeWrite = channel.size();
} catch (IOException e) {
throw new StorageContainerException("IO error encountered while " +
"getting the file size for " + chunkFile.getName(), CHUNK_FILE_INCONSISTENCY);
}

ChunkUtils.writeData(channel, chunkFile.getName(), data, offset, len, volume);

// When overwriting, update the bytes used if the new length is greater than the old length
// This is to ensure that the bytes used is updated correctly when overwriting a smaller chunk
// with a larger chunk.
if (overwrite) {
long fileLengthAfterWrite = offset + len;
if (fileLengthAfterWrite > fileLengthBeforeWrite) {
containerData.incrBytesUsed(fileLengthAfterWrite - fileLengthBeforeWrite);
}
}

containerData.updateWriteStats(len, overwrite);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,6 @@ long putBlockForClosedContainer(Container container, BlockData data, boolean ove
List<BlockData> listBlock(Container container, long startLocalID, int count)
throws IOException;

/**
* Check if a block exists in the container.
*
* @param container - Container from which blocks need to be listed.
* @param blockID - BlockID of the Block.
* @return True if block exists, false otherwise.
*/
boolean blockExists(Container container, BlockID blockID) throws IOException;

/**
* Returns last committed length of the block.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hadoop.ozone.container.keyvalue.impl;

import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST;
import static org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil.isSameSchemaVersion;
import static org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.FULL_CHUNK;
Expand All @@ -41,6 +42,7 @@
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
Expand Down Expand Up @@ -83,10 +85,10 @@ private void initTest(ContainerTestVersionInfo versionInfo)
this.schemaVersion = versionInfo.getSchemaVersion();
this.config = new OzoneConfiguration();
ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, config);
initilaze();
initialize();
}

private void initilaze() throws Exception {
private void initialize() throws Exception {
UUID datanodeId = UUID.randomUUID();
HddsVolume hddsVolume = new HddsVolume.Builder(folder.toString())
.conf(config)
Expand Down Expand Up @@ -202,46 +204,59 @@ public void testPutAndGetBlock(ContainerTestVersionInfo versionInfo)
public void testPutBlockForClosed(ContainerTestVersionInfo versionInfo)
throws Exception {
initTest(versionInfo);
assertEquals(0, keyValueContainer.getContainerData().getBlockCount());
KeyValueContainerData containerData = keyValueContainer.getContainerData();
assertEquals(0, containerData.getBlockCount());
keyValueContainer.close();
assertEquals(CLOSED, keyValueContainer.getContainerState());
// 1. Put Block with bcsId = 2, Overwrite = true
blockManager.putBlockForClosedContainer(keyValueContainer, blockData1, true);

BlockData fromGetBlockData;
//Check Container's bcsId
fromGetBlockData = blockManager.getBlock(keyValueContainer, blockData1.getBlockID());
assertEquals(1, keyValueContainer.getContainerData().getBlockCount());
assertEquals(1, keyValueContainer.getContainerData().getBlockCommitSequenceId());
assertEquals(1, fromGetBlockData.getBlockCommitSequenceId());

// 2. Put Block with bcsId = 3, Overwrite = false
BlockData blockData2 = createBlockData(1L, 3L, 1, 0, 2048, 2);
blockManager.putBlockForClosedContainer(keyValueContainer, blockData2, false);

// The block should be written, but we won't be able to read it, As BcsId < container's BcsId
// fails during block read.
Assertions.assertThrows(StorageContainerException.class, () -> blockManager
.getBlock(keyValueContainer, blockData2.getBlockID()));
assertEquals(2, keyValueContainer.getContainerData().getBlockCount());
// BcsId should still be 1, as the BcsId is not overwritten
assertEquals(1, keyValueContainer.getContainerData().getBlockCommitSequenceId());

// 3. Put Block with bcsId = 3, Overwrite = true
// This should succeed as we are overwriting the BcsId, The container BcsId should be updated to 3
// The block count should not change.
blockManager.putBlockForClosedContainer(keyValueContainer, blockData2, true);
fromGetBlockData = blockManager.getBlock(keyValueContainer, blockData2.getBlockID());
assertEquals(2, keyValueContainer.getContainerData().getBlockCount());
assertEquals(2, keyValueContainer.getContainerData().getBlockCommitSequenceId());
assertEquals(2, fromGetBlockData.getBlockCommitSequenceId());

// 4. Put Block with bcsId = 1 < container bcsId, Overwrite = true
// Container bcsId should not change
BlockData blockData3 = createBlockData(1L, 1L, 1, 0, 2048, 1);
blockManager.putBlockForClosedContainer(keyValueContainer, blockData3, true);
fromGetBlockData = blockManager.getBlock(keyValueContainer, blockData3.getBlockID());
assertEquals(3, keyValueContainer.getContainerData().getBlockCount());
assertEquals(2, keyValueContainer.getContainerData().getBlockCommitSequenceId());
assertEquals(1, fromGetBlockData.getBlockCommitSequenceId());
try (DBHandle db = BlockUtils.getDB(containerData, config)) {
BlockData fromGetBlockData;
//Check Container's bcsId
fromGetBlockData = blockManager.getBlock(keyValueContainer, blockData1.getBlockID());
assertEquals(1, containerData.getBlockCount());
assertEquals(1, containerData.getBlockCommitSequenceId());
assertEquals(1, fromGetBlockData.getBlockCommitSequenceId());
assertEquals(1, db.getStore().getMetadataTable().get(containerData.getBcsIdKey()));
assertEquals(1, db.getStore().getMetadataTable().get(containerData.getBlockCountKey()));

// 2. Put Block with bcsId = 2, Overwrite = false
BlockData blockData2 = createBlockData(1L, 3L, 1, 0, 2048, 2);
blockManager.putBlockForClosedContainer(keyValueContainer, blockData2, false);

// The block should be written, but we won't be able to read it, As BcsId < container's BcsId
// fails during block read.
Assertions.assertThrows(StorageContainerException.class, () -> blockManager
.getBlock(keyValueContainer, blockData2.getBlockID()));
assertEquals(2, containerData.getBlockCount());
// BcsId should still be 1, as the BcsId is not overwritten
assertEquals(1, containerData.getBlockCommitSequenceId());
assertEquals(2, db.getStore().getMetadataTable().get(containerData.getBlockCountKey()));
assertEquals(1, db.getStore().getMetadataTable().get(containerData.getBcsIdKey()));

// 3. Put Block with bcsId = 2, Overwrite = true
// This should succeed as we are overwriting the BcsId, The container BcsId should be updated to 3
// The block count should not change.
blockManager.putBlockForClosedContainer(keyValueContainer, blockData2, true);
fromGetBlockData = blockManager.getBlock(keyValueContainer, blockData2.getBlockID());
assertEquals(2, containerData.getBlockCount());
assertEquals(2, containerData.getBlockCommitSequenceId());
assertEquals(2, fromGetBlockData.getBlockCommitSequenceId());
assertEquals(2, db.getStore().getMetadataTable().get(containerData.getBlockCountKey()));
assertEquals(2, db.getStore().getMetadataTable().get(containerData.getBcsIdKey()));

// 4. Put Block with bcsId = 1 < container bcsId, Overwrite = true
// Container bcsId should not change
BlockData blockData3 = createBlockData(1L, 1L, 1, 0, 2048, 1);
blockManager.putBlockForClosedContainer(keyValueContainer, blockData3, true);
fromGetBlockData = blockManager.getBlock(keyValueContainer, blockData3.getBlockID());
assertEquals(3, containerData.getBlockCount());
assertEquals(2, containerData.getBlockCommitSequenceId());
assertEquals(1, fromGetBlockData.getBlockCommitSequenceId());
assertEquals(3, db.getStore().getMetadataTable().get(containerData.getBlockCountKey()));
assertEquals(2, db.getStore().getMetadataTable().get(containerData.getBcsIdKey()));
}
}

@ContainerTestVersionInfo.ContainerTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,38 +225,70 @@ public void testPutBlockForClosedContainer() throws IOException {
containerSet.addContainer(kvContainer);
KeyValueHandler keyValueHandler = createKeyValueHandler(containerSet);
List<ContainerProtos.ChunkInfo> chunkInfoList = new ArrayList<>();
chunkInfoList.add(getChunkInfo().getProtoBufMessage());
ChunkInfo info = new ChunkInfo(String.format("%d.data.%d", getBlockID().getLocalID(), 0), 0L, 20L);

chunkInfoList.add(info.getProtoBufMessage());
BlockData putBlockData = new BlockData(getBlockID());
putBlockData.setChunks(chunkInfoList);

ChunkBuffer chunkData = ContainerTestHelper.getData(20);
keyValueHandler.writeChunkForClosedContainer(info, getBlockID(), chunkData, kvContainer);
keyValueHandler.putBlockForClosedContainer(kvContainer, putBlockData, 1L, true);
Assertions.assertEquals(containerData.getBlockCommitSequenceId(), 1L);
Assertions.assertEquals(containerData.getBlockCount(), 1L);
assertEquals(1L, containerData.getBlockCommitSequenceId());
assertEquals(1L, containerData.getBlockCount());

try (DBHandle dbHandle = BlockUtils.getDB(containerData, new OzoneConfiguration())) {
long localID = putBlockData.getLocalID();
BlockData getBlockData = dbHandle.getStore().getBlockDataTable()
.get(containerData.getBlockKey(localID));
Assertions.assertTrue(blockDataEquals(putBlockData, getBlockData));
// Overwriting the same
assertEquals(20L, containerData.getBytesUsed());
assertEquals(20L, dbHandle.getStore().getMetadataTable().get(containerData.getBytesUsedKey()));
}

// Add another chunk and check the put block data
ChunkInfo newChunkInfo = new ChunkInfo(String.format("%d.data.%d", getBlockID()
.getLocalID(), 1L), 0, 20L);
ChunkInfo newChunkInfo = new ChunkInfo(String.format("%d.data.%d", getBlockID().getLocalID(), 1L), 20L, 20L);
chunkInfoList.add(newChunkInfo.getProtoBufMessage());
putBlockData.setChunks(chunkInfoList);

chunkData = ContainerTestHelper.getData(20);
keyValueHandler.writeChunkForClosedContainer(newChunkInfo, getBlockID(), chunkData, kvContainer);
keyValueHandler.putBlockForClosedContainer(kvContainer, putBlockData, 2L, true);
assertEquals(2L, containerData.getBlockCommitSequenceId());
assertEquals(1L, containerData.getBlockCount());

try (DBHandle dbHandle = BlockUtils.getDB(containerData, new OzoneConfiguration())) {
long localID = putBlockData.getLocalID();
BlockData getBlockData = dbHandle.getStore().getBlockDataTable()
.get(containerData.getBlockKey(localID));
Assertions.assertTrue(blockDataEquals(putBlockData, getBlockData));
assertEquals(40L, containerData.getBytesUsed());
assertEquals(40L, dbHandle.getStore().getMetadataTable().get(containerData.getBytesUsedKey()));
}

newChunkInfo = new ChunkInfo(String.format("%d.data.%d", getBlockID().getLocalID(), 1L), 20L, 30L);
chunkInfoList.remove(chunkInfoList.size() - 1);
chunkInfoList.add(newChunkInfo.getProtoBufMessage());
putBlockData.setChunks(chunkInfoList);

chunkData = ContainerTestHelper.getData(30);
keyValueHandler.writeChunkForClosedContainer(newChunkInfo, getBlockID(), chunkData, kvContainer);
keyValueHandler.putBlockForClosedContainer(kvContainer, putBlockData, 2L, true);
Assertions.assertEquals(containerData.getBlockCommitSequenceId(), 2L);
Assertions.assertEquals(containerData.getBlockCount(), 1L);
assertEquals(2L, containerData.getBlockCommitSequenceId());
assertEquals(1L, containerData.getBlockCount());

try (DBHandle dbHandle = BlockUtils.getDB(containerData, new OzoneConfiguration())) {
long localID = putBlockData.getLocalID();
BlockData getBlockData = dbHandle.getStore().getBlockDataTable()
.get(containerData.getBlockKey(localID));
Assertions.assertTrue(blockDataEquals(putBlockData, getBlockData));
assertEquals(50L, containerData.getBytesUsed());
assertEquals(50L, dbHandle.getStore().getMetadataTable().get(containerData.getBytesUsedKey()));
}

keyValueHandler.putBlockForClosedContainer(kvContainer, putBlockData, 2L, true);
Assertions.assertEquals(containerData.getBlockCommitSequenceId(), 2L);
assertEquals(2L, containerData.getBlockCommitSequenceId());
}

private boolean blockDataEquals(BlockData putBlockData, BlockData getBlockData) {
Expand Down

0 comments on commit d6e68c6

Please sign in to comment.