Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BugFix: call listener.onFailure on failure to pin the timestamp #16248

Merged
merged 3 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,40 +11,59 @@
import org.opensearch.action.LatchedActionListener;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.REMOTE_STORE;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_SLOWDOWN_SETTING;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStorePinnedTimestampsIT extends RemoteStoreBaseIntegTestCase {
static final String INDEX_NAME = "remote-store-test-idx-1";

@Override
protected Settings nodeSettings(int nodeOrdinal) {
String segmentRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
REPOSITORY_NAME
);

return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(segmentRepoTypeAttributeKey, ReloadableFsRepository.TYPE)
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true)
.build();
}

ActionListener<Void> noOpActionListener = new ActionListener<>() {
@Override
public void onResponse(Void unused) {}
public void onResponse(Void unused) {
// do nothing
}

@Override
public void onFailure(Exception e) {}
public void onFailure(Exception e) {
fail();
}
};

public void testTimestampPinUnpin() throws Exception {
Expand All @@ -57,15 +76,8 @@ public void testTimestampPinUnpin() throws Exception {
);

Tuple<Long, Set<Long>> pinnedTimestampWithFetchTimestamp = RemoteStorePinnedTimestampService.getPinnedTimestamps();
long lastFetchTimestamp = pinnedTimestampWithFetchTimestamp.v1();
assertEquals(-1L, lastFetchTimestamp);
assertEquals(Set.of(), pinnedTimestampWithFetchTimestamp.v2());

assertThrows(
IllegalArgumentException.class,
() -> remoteStorePinnedTimestampService.pinTimestamp(1234L, "ss1", noOpActionListener)
);

long timestamp1 = System.currentTimeMillis() + 30000L;
long timestamp2 = System.currentTimeMillis() + 60000L;
long timestamp3 = System.currentTimeMillis() + 900000L;
Expand Down Expand Up @@ -197,6 +209,104 @@ public void onFailure(Exception e) {
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));
}

public void testPinExceptionsOlderTimestamp() throws InterruptedException {
prepareCluster(1, 1, INDEX_NAME, 0, 2);
ensureGreen(INDEX_NAME);

RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
primaryNodeName(INDEX_NAME)
);

CountDownLatch latch = new CountDownLatch(1);
remoteStorePinnedTimestampService.pinTimestamp(1234L, "ss1", new LatchedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(Void unused) {
// We expect onFailure to be called
fail();
}

@Override
public void onFailure(Exception e) {
assertTrue(e instanceof IllegalArgumentException);
}
}, latch));

latch.await();
}

public void testPinExceptionsRemoteStoreCallTakeTime() throws InterruptedException, ExecutionException {
prepareCluster(1, 1, INDEX_NAME, 0, 2);
ensureGreen(INDEX_NAME);

RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
primaryNodeName(INDEX_NAME)
);

CountDownLatch latch = new CountDownLatch(1);
slowDownRepo(REPOSITORY_NAME, 10);
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueSeconds(1));
long timestampToBePinned = System.currentTimeMillis() + 600000;
remoteStorePinnedTimestampService.pinTimestamp(timestampToBePinned, "ss1", new LatchedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(Void unused) {
// We expect onFailure to be called
fail();
}

@Override
public void onFailure(Exception e) {
logger.error(e.getMessage());
assertTrue(e instanceof RuntimeException);
assertTrue(e.getMessage().contains("Timestamp pinning took"));

// Check if the timestamp was unpinned
remoteStorePinnedTimestampService.forceSyncPinnedTimestamps();
assertFalse(RemoteStorePinnedTimestampService.getPinnedTimestamps().v2().contains(timestampToBePinned));
}
}, latch));

latch.await();
}

protected void slowDownRepo(String repoName, int value) throws ExecutionException, InterruptedException {
GetRepositoriesRequest gr = new GetRepositoriesRequest(new String[] { repoName });
GetRepositoriesResponse res = client().admin().cluster().getRepositories(gr).get();
RepositoryMetadata rmd = res.repositories().get(0);
Settings.Builder settings = Settings.builder()
.put("location", rmd.settings().get("location"))
.put(REPOSITORIES_SLOWDOWN_SETTING.getKey(), value);
createRepository(repoName, rmd.type(), settings);
}

public void testUnpinException() throws InterruptedException {
prepareCluster(1, 1, INDEX_NAME, 0, 2);
ensureGreen(INDEX_NAME);

RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
primaryNodeName(INDEX_NAME)
);

CountDownLatch latch = new CountDownLatch(1);
remoteStorePinnedTimestampService.unpinTimestamp(1234L, "dummy-entity", new LatchedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(Void unused) {
// We expect onFailure to be called
fail();
}

@Override
public void onFailure(Exception e) {
logger.error(e.getMessage());
assertTrue(e instanceof IllegalArgumentException);
}
}, latch));

latch.await();
}

public void testLastSuccessfulFetchOfPinnedTimestampsPresentInNodeStats() throws Exception {
logger.info("Starting up cluster manager");
logger.info("cluster.remote_store.pinned_timestamps.enabled set to true");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,16 @@
* @throws IllegalArgumentException If the timestamp is less than the current time minus one second
*/
public void pinTimestamp(long timestamp, String pinningEntity, ActionListener<Void> listener) {
// If a caller uses current system time to pin the timestamp, following check will almost always fail.
// So, we allow pinning timestamp in the past upto some buffer
long lookbackIntervalInMills = RemoteStoreSettings.getPinnedTimestampsLookbackInterval().millis();
if (timestamp < (System.currentTimeMillis() - lookbackIntervalInMills)) {
throw new IllegalArgumentException(
"Timestamp to be pinned is less than current timestamp - value of cluster.remote_store.pinned_timestamps.lookback_interval"
);
}
long startTime = System.nanoTime();
try {
// If a caller uses current system time to pin the timestamp, following check will almost always fail.
// So, we allow pinning timestamp in the past upto some buffer
long lookbackIntervalInMills = RemoteStoreSettings.getPinnedTimestampsLookbackInterval().millis();
if (timestamp < (System.currentTimeMillis() - lookbackIntervalInMills)) {
throw new IllegalArgumentException(

Check warning on line 138 in server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java#L138

Added line #L138 was not covered by tests
"Timestamp to be pinned is less than current timestamp - value of cluster.remote_store.pinned_timestamps.lookback_interval"
);
}
long startTime = System.nanoTime();
logger.debug("Pinning timestamp = {} against entity = {}", timestamp, pinningEntity);
blobContainer.writeBlob(getBlobName(timestamp, pinningEntity), new ByteArrayInputStream(new byte[0]), 0, true);
long elapsedTime = System.nanoTime() - startTime;
Expand All @@ -155,7 +155,7 @@
} else {
listener.onResponse(null);
}
} catch (IOException e) {
} catch (Exception e) {

Check warning on line 158 in server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java#L158

Added line #L158 was not covered by tests
listener.onFailure(e);
}
}
Expand Down Expand Up @@ -198,7 +198,7 @@
logger.error(errorMessage);
listener.onFailure(new IllegalArgumentException(errorMessage));
}
} catch (IOException e) {
} catch (Exception e) {

Check warning on line 201 in server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java#L201

Added line #L201 was not covered by tests
listener.onFailure(e);
}
}
Expand Down Expand Up @@ -249,7 +249,7 @@
logger.error(errorMessage);
listener.onFailure(new IllegalArgumentException(errorMessage));
}
} catch (IOException e) {
} catch (Exception e) {

Check warning on line 252 in server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java#L252

Added line #L252 was not covered by tests
listener.onFailure(e);
}
}
Expand Down
Loading