HDDS-8888. Consider Datanode queue capacity when sending DeleteBlocks command #4939
Merged
Commits (9 commits)
75a6a0f HDDS-8888. Limit SCMBlockDeletingService to sending delete transactio… (xichen01)
4578b3c added licensed for new file (xichen01)
84e601a Add new cnew configuration to ozone-default.xml (xichen01)
75c2021 Using the existing hdds.datanode.block.delete.queue.limit configurati… (xichen01)
a3a41dd Fix unit test (xichen01)
f31cf19 Merge branch 'master' into HDDS-8888 (xichen01)
092c852 Add more limit for the deleteBlocksCommand sending (xichen01)
7580f2f Fix test
48852c8 Merge remote-tracking branch 'origin/master' into HDDS-8888 (adoroszlai)
175 changes: 175 additions & 0 deletions
...erver-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdds.scm.block;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

import java.time.Clock;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
 * Test SCMBlockDeletingService.
 */
public class TestSCMBlockDeletingService {
  private SCMBlockDeletingService service;
  private EventPublisher eventPublisher;
  private List<DatanodeDetails> datanodeDetails;
  private OzoneConfiguration conf;
  private NodeManager nodeManager;
  private ScmBlockDeletingServiceMetrics metrics;

  @BeforeEach
  public void setup() throws Exception {
    nodeManager = mock(NodeManager.class);
    eventPublisher = mock(EventPublisher.class);
    conf = new OzoneConfiguration();
    metrics = ScmBlockDeletingServiceMetrics.create();
    when(nodeManager.getTotalDatanodeCommandCount(any(),
        any())).thenReturn(0);
    SCMServiceManager scmServiceManager = mock(SCMServiceManager.class);
    SCMContext scmContext = mock(SCMContext.class);

    DatanodeDeletedBlockTransactions ddbt =
        new DatanodeDeletedBlockTransactions();
    DatanodeDetails datanode1 = MockDatanodeDetails.randomDatanodeDetails();
    DatanodeDetails datanode2 = MockDatanodeDetails.randomDatanodeDetails();
    DatanodeDetails datanode3 = MockDatanodeDetails.randomDatanodeDetails();
    datanodeDetails = Arrays.asList(datanode1, datanode2, datanode3);
    when(nodeManager.getNodes(NodeStatus.inServiceHealthy())).thenReturn(
        datanodeDetails);
    DeletedBlocksTransaction tx1 = createTestDeleteTxn(1, Arrays.asList(1L), 1);
    ddbt.addTransactionToDN(datanode1.getUuid(), tx1);
    ddbt.addTransactionToDN(datanode2.getUuid(), tx1);
    ddbt.addTransactionToDN(datanode3.getUuid(), tx1);
    DeletedBlockLog mockDeletedBlockLog = mock(DeletedBlockLog.class);
    when(mockDeletedBlockLog.getTransactions(
        anyInt(), anySet())).thenReturn(ddbt);

    service = spy(new SCMBlockDeletingService(
        mockDeletedBlockLog, nodeManager, eventPublisher, scmContext,
        scmServiceManager, conf, metrics, Clock.system(
        ZoneOffset.UTC)));
    when(service.shouldRun()).thenReturn(true);
  }

  @AfterEach
  public void stop() {
    service.stop();
    ScmBlockDeletingServiceMetrics.unRegister();
  }

  @Test
  public void testCall() throws Exception {
    callDeletedBlockTransactionScanner();

    ArgumentCaptor<CommandForDatanode> argumentCaptor =
        ArgumentCaptor.forClass(CommandForDatanode.class);

    // All three Datanodes are healthy and in-service, and their command
    // queues are empty, so the transaction is sent to all three Datanodes.
    verify(eventPublisher, times(3)).fireEvent(
        eq(SCMEvents.DATANODE_COMMAND), argumentCaptor.capture());
    List<CommandForDatanode> actualCommands = argumentCaptor.getAllValues();
    List<UUID> actualDnIds = actualCommands.stream()
        .map(CommandForDatanode::getDatanodeId)
        .collect(Collectors.toList());
    Set<UUID> expectedDnIdsSet = datanodeDetails.stream()
        .map(DatanodeDetails::getUuid).collect(Collectors.toSet());

    assertEquals(expectedDnIdsSet, new HashSet<>(actualDnIds));
    assertEquals(datanodeDetails.size(),
        metrics.getNumBlockDeletionCommandSent());
    // Each command carries exactly one transaction.
    assertEquals(datanodeDetails.size() * 1,
        metrics.getNumBlockDeletionTransactionSent());
  }

  private void callDeletedBlockTransactionScanner() throws Exception {
    service.getTasks().poll().call();
  }

  @Test
  public void testLimitCommandSending() throws Exception {
    DatanodeConfiguration dnConf =
        conf.getObject(DatanodeConfiguration.class);
    int pendingCommandLimit = dnConf.getBlockDeleteQueueLimit();

    // The number of commands pending on all Datanodes has reached the limit.
    when(nodeManager.getTotalDatanodeCommandCount(any(),
        any())).thenReturn(pendingCommandLimit);
    assertEquals(0,
        service.getDatanodesWithinCommandLimit(datanodeDetails).size());

    // The number of commands pending on all Datanodes is 0.
    when(nodeManager.getTotalDatanodeCommandCount(any(),
        any())).thenReturn(0);
    assertEquals(datanodeDetails.size(),
        service.getDatanodesWithinCommandLimit(datanodeDetails).size());

    // The number of commands pending on the first Datanode has reached the
    // limit.
    DatanodeDetails fullDatanode = datanodeDetails.get(0);
    when(nodeManager.getTotalDatanodeCommandCount(fullDatanode,
        Type.deleteBlocksCommand)).thenReturn(pendingCommandLimit);
    Set<DatanodeDetails> includeNodes =
        service.getDatanodesWithinCommandLimit(datanodeDetails);
    assertEquals(datanodeDetails.size() - 1,
        includeNodes.size());
    assertFalse(includeNodes.contains(fullDatanode));
  }

  private DeletedBlocksTransaction createTestDeleteTxn(
      long txnID, List<Long> blocks, long containerID) {
    return DeletedBlocksTransaction.newBuilder().setTxID(txnID)
        .setContainerID(containerID).addAllLocalID(blocks).setCount(0).build();
  }
}
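The filtering behavior that testLimitCommandSending exercises can be sketched in isolation. The snippet below is an illustrative stand-in, not the actual Ozone implementation: PendingCommandSource is a hypothetical interface playing the role of NodeManager.getTotalDatanodeCommandCount, and datanodes are represented by plain strings instead of DatanodeDetails.

```java
import java.util.ArrayList;
import java.util.List;

public class CommandLimitSketch {
  /** Hypothetical stand-in for NodeManager's per-DN pending-command counter. */
  interface PendingCommandSource {
    int pendingDeleteCommands(String datanodeId);
  }

  /**
   * Keep only the datanodes whose reported DeleteBlocksCommand queue length
   * is still below the configured limit, mirroring what the test expects
   * from getDatanodesWithinCommandLimit.
   */
  static List<String> datanodesWithinCommandLimit(
      List<String> datanodes, PendingCommandSource source, int limit) {
    List<String> eligible = new ArrayList<>();
    for (String dn : datanodes) {
      if (source.pendingDeleteCommands(dn) < limit) {
        eligible.add(dn);
      }
    }
    return eligible;
  }

  public static void main(String[] args) {
    // dn1 has a full queue (5 pending commands); dn2 and dn3 are idle.
    PendingCommandSource source = dn -> dn.equals("dn1") ? 5 : 0;
    List<String> eligible = datanodesWithinCommandLimit(
        List.of("dn1", "dn2", "dn3"), source, 5);
    System.out.println(eligible);
  }
}
```

As in the test's third scenario, the datanode at the limit is excluded while the others remain eligible.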
The basic idea looks good, but I have a concern: the queued command size from the last DN heartbeat can be stale. If delete operations are frequent, there might be a case where the service cannot find any idle datanodes to send commands to until the next heartbeat arrives, so the deletion process might not be as smooth as you expect.
Please correct me if I am wrong.
By default the SCM generates a delete command every 60s and sends it to the DN, while the DN reports a heartbeat every 30s, so normally the SCM has a reasonably fresh view of DN status.
getTotalDatanodeCommandCount returns the number of queued DeleteBlocksCommands. Each time the SCM executes DeletedBlockTransactionScanner, only one DeleteBlocksCommand is sent to a specific DN. Since the scanner's execution frequency is fixed and the limit here is 5, the SCM must execute at least 5 times before a DN's queue is full, which takes 5 minutes. As long as the DN sends a heartbeat before all of these commands are executed, the SCM can continue to send delete commands to that DN. If the queues of all DNs are full, the SCM should not continue to send new commands to them.
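The arithmetic behind this reply can be made explicit. The values below are the defaults assumed in the discussion (60s scanner interval, 30s heartbeat, queue limit of 5), not values read from any configuration:

```java
public class ThrottleMath {
  /**
   * Seconds for the SCM alone to fill a DN's delete-command queue from
   * empty, given it sends at most one command per DN per scanner run.
   */
  static int timeToFillQueueSec(int queueLimit, int scannerIntervalSec) {
    return queueLimit * scannerIntervalSec;
  }

  public static void main(String[] args) {
    int scannerIntervalSec = 60;   // assumed DeletedBlockTransactionScanner period
    int heartbeatIntervalSec = 30; // assumed DN heartbeat period
    int queueLimit = 5;            // assumed block-delete queue limit

    // Filling the queue from empty takes queueLimit scanner runs:
    // 5 * 60s = 300s, i.e. the 5 minutes mentioned above.
    System.out.println("time to fill queue: "
        + timeToFillQueueSec(queueLimit, scannerIntervalSec) + "s");

    // The SCM's view of a DN's queue is at most one heartbeat interval
    // stale, so a DN that drains its queue becomes eligible again quickly.
    System.out.println("max staleness of SCM's view: "
        + heartbeatIntervalSec + "s");
  }
}
```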
I see. Most of the time it should be fine.
The worst case:
t = A: the DN reports a heartbeat with a full command queue.
t = A + 29.9s: the SCM's DeletedBlockTransactionScanner executes, cannot send a command to the DN, and must wait for the next round.
t = A + 90s: the SCM has received the latest heartbeat from the DN, DeletedBlockTransactionScanner executes again, and finally sends a command to the DN.
This could lead to at most a ~90-second gap. Right now that seems trivial compared to the interval of DeletedBlockTransactionScanner.
Thanks for the explanation.
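The worst-case timeline above can be expressed as a small calculation. The intervals are the ones assumed in the thread (30s heartbeat, 60s scanner), and the 0.1s epsilon is just a way of modeling "the scanner fires just before the fresh heartbeat lands":

```java
public class WorstCaseGap {
  /**
   * Worst-case delay from a "queue full" heartbeat at t = A to the next
   * successful command send, per the timeline in the discussion: one
   * scanner run narrowly misses the fresh heartbeat and skips the DN,
   * and the following run, one scanner interval later, finally sends.
   */
  static double worstCaseGapSec(double heartbeatSec, double scannerSec) {
    double skippedRun = heartbeatSec - 0.1; // e.g. t = A + 29.9s: stale view
    return skippedRun + scannerSec;         // e.g. t = A + 89.9s: send
  }

  public static void main(String[] args) {
    System.out.println("worst-case gap ~"
        + worstCaseGapSec(30, 60) + "s");
  }
}
```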
Yes, and during A + 29.9s ~ A + 90s the DN can continue to process the DeleteBlocksCommands in its own command queue. When the DN's command queue is full, the DN will not be idle, because this PR decides whether to continue sending DeleteBlocksCommand to a DN based on the length of that DN's command queue.
@xichen01
Considering parallel heartbeat and SCM deleteBlock processing: they are synchronized using a lock, so for the sequence below:
Scenario 1: no issue, as the queue is empty and the next command can be added.
Scenario 2: here, adding the same command would be a duplicate, and there is a retry.
Considering this, we need not have a queue at the SCM, and the above changes are not required.