Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pool manager: retry request on pool up #7677

Merged
merged 1 commit into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* dCache - http://www.dcache.org/
*
* Copyright (C) 2021-2022 Deutsches Elektronen-Synchrotron
* Copyright (C) 2021-2024 Deutsches Elektronen-Synchrotron
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
Expand All @@ -20,6 +20,8 @@

import diskCacheV111.util.PnfsId;
import diskCacheV111.vehicles.StorageInfo;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
Expand Down Expand Up @@ -85,6 +87,11 @@ public FileAttributesBuilder withChecksum(Checksum checksum) {
return this;
}

public FileAttributesBuilder withLocations(String...locations) {
_attributes.setLocations(Arrays.asList(locations));
return this;
}

public FileAttributes build() {
if (!_checksums.isEmpty()) {
_attributes.setChecksums(_checksums);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.dcache.cells.CellStub;
import org.dcache.namespace.FileAttribute;
import org.dcache.poolmanager.CostException;
import org.dcache.poolmanager.Partition;
import org.dcache.poolmanager.PartitionManager;
Expand Down Expand Up @@ -363,10 +364,12 @@ public void poolStatusChanged(String poolName, int poolStatus) {
*
* in this construction we will fall down to next case
*/
if (rph.getPoolCandidate().equals(POOL_UNKNOWN_STRING)) {
if (rph.getPoolCandidate().equals(POOL_UNKNOWN_STRING) || rph.expectedOnPool(poolName)) {
LOGGER.info("Restore Manager : retrying : {}", rph);
rph.retry();
}

// fall through to retry requests scheduled on that pool
case PoolStatusChangedMessage.DOWN:
/*
* if pool is down, re-try all request scheduled to this
Expand Down Expand Up @@ -1039,6 +1042,16 @@ public String getPoolCandidate() {
}
}

/**
* Returns true if file is expected to be on specified pool.
* @param poolName pool name to check.
* @return true if file is expected to be on specified pool.
*/
public boolean expectedOnPool(String poolName) {
return _fileAttributes.isDefined(FileAttribute.LOCATIONS)
&& _fileAttributes.getLocations().contains(poolName);
}

private String getPoolCandidateState() {
if (_stageCandidate.isPresent()) {
return _stageCandidate.get().name();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3337,6 +3337,73 @@ public void shouldCancelStageRequestOnFail() throws Exception {
is("rh kill 80D1B8B90CED30430608C58002811B3285FC"));
}

@Test
public void shouldRetryOnPoolUpEvenForStage() throws Exception {
var stagePool = aPool("stage-pool@dCacheDomain");
given(aPartitionManager().withDefault(aPartition().withStageAllowed(true)));
given(aPoolSelectionUnit().withNetUnit("all-net", "192.168.1.1")
.withProtocolUnit("HTTP", "http/1"));
given(aPoolMonitor().thatReturns(aPoolSelectorThat()
.onReadThrows(aFileNotInCacheException())
.onStageSelects(stagePool)));

given(aContainer("PoolManager@dCacheDomain").thatDoesNotSendHitMessages());

whenReceiving(aReadRequest()
.by(ROOT)
.forFile("80D1B8B90CED30430608C58002811B3285FC")
.withBillingPath("/public/test")
.withTransferPath("/uploads/50/test")
.withFileAttributes(fileAttributes().withSize(10, KiB)
.withLocations("some-pool")
.withStorageInfo(aStorageInfo().withLocation("osm://RZ1/bfid1")))
.withProtocolInfo(aProtocolInfo().withProtocol("http")
.withMajorVersion(1).withIPAddress("192.168.1.1")));

container.setPoolMonitor(poolMonitor);
whenReceiving(aPoolStatusChange().thatPool("some-pool").isUp());

var reply = replySentWith(endpoint);

then(reply).should().setFailed(eq(10021), any());
then(reply).should().setContext(eq(0), any());

then(endpoint).shouldHaveNoMoreInteractions();
}

@Test
public void shouldIgnoreOnRandomPoolUp() throws Exception {
var stagePool = aPool("stage-pool@dCacheDomain");
given(aPartitionManager().withDefault(aPartition().withStageAllowed(true)));
given(aPoolSelectionUnit().withNetUnit("all-net", "192.168.1.1")
.withProtocolUnit("HTTP", "http/1"));
given(aPoolMonitor().thatReturns(aPoolSelectorThat()
.onReadThrows(aFileNotInCacheException())
.onStageSelects(stagePool)));

given(aContainer("PoolManager@dCacheDomain").thatDoesNotSendHitMessages());

whenReceiving(aReadRequest()
.by(ROOT)
.forFile("80D1B8B90CED30430608C58002811B3285FC")
.withBillingPath("/public/test")
.withTransferPath("/uploads/50/test")
.withFileAttributes(fileAttributes().withSize(10, KiB)
.withLocations("some-pool")
.withStorageInfo(aStorageInfo().withLocation("osm://RZ1/bfid1")))
.withProtocolInfo(aProtocolInfo().withProtocol("http")
.withMajorVersion(1).withIPAddress("192.168.1.1")));

container.setPoolMonitor(poolMonitor);
whenReceiving(aPoolStatusChange().thatPool("random-pool").isUp());

var message = stageSentWith(endpoint);

// the only message we have is starting stage
assertThat(message.isReply(), is(false));
then(endpoint).shouldHaveNoMoreInteractions();
}

private void given(ContainerBuilder builder) {
container = builder.build();
}
Expand Down Expand Up @@ -3406,6 +3473,11 @@ private static PoolMgrSelectReadPoolMsg replySentWith(CellEndpoint endpointUsed)
return (PoolMgrSelectReadPoolMsg) envelope.getMessageObject();
}

private static PoolFetchFileMessage stageSentWith(CellEndpoint endpointUsed) {
var envelope = envelopeSentWith(endpointUsed);
return (PoolFetchFileMessage) envelope.getMessageObject();
}

private static List<PoolMgrSelectReadPoolMsg> allRepliesSentWith(CellEndpoint endpointUsed) {
var envelopeArg = ArgumentCaptor.forClass(CellMessage.class);
verify(endpointUsed, Mockito.atLeastOnce()).sendMessage(envelopeArg.capture());
Expand Down