Skip to content

Commit

Permalink
BugFix: call listener.onFailure on failure to pin the timestamp (#16248)
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <sachinpkale@gmail.com>
  • Loading branch information
sachinpkale authored Oct 10, 2024
1 parent d7b0116 commit 58adc18
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,40 +11,59 @@
import org.opensearch.action.LatchedActionListener;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.REMOTE_STORE;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_SLOWDOWN_SETTING;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStorePinnedTimestampsIT extends RemoteStoreBaseIntegTestCase {
static final String INDEX_NAME = "remote-store-test-idx-1";

/**
 * Builds the settings for a test node.
 *
 * <p>Starts from the settings supplied by the superclass, then adds the repository-type node
 * attribute for {@code REPOSITORY_NAME} (set to {@link ReloadableFsRepository#TYPE}) and turns on
 * {@code CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED}.
 *
 * @param nodeOrdinal ordinal of the node, forwarded unchanged to the superclass
 * @return the combined node settings
 */
@Override
protected Settings nodeSettings(int nodeOrdinal) {
    // Attribute key is "node.attr." followed by the repository-type key format filled in with the repository name.
    final String repoTypeKeyFormat = "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
    final String repoTypeKey = String.format(Locale.getDefault(), repoTypeKeyFormat, REPOSITORY_NAME);

    final Settings.Builder nodeSettings = Settings.builder();
    nodeSettings.put(super.nodeSettings(nodeOrdinal));
    nodeSettings.put(repoTypeKey, ReloadableFsRepository.TYPE);
    nodeSettings.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true);
    return nodeSettings.build();
}

/**
 * Listener for calls that are expected to succeed: a successful response is ignored, while any
 * failure aborts the test with an {@link AssertionError} that carries the original exception.
 */
ActionListener<Void> noOpActionListener = new ActionListener<>() {
    @Override
    public void onResponse(Void unused) {
        // do nothing
    }

    @Override
    public void onFailure(Exception e) {
        // Keep the cause so the test report shows why the operation failed.
        throw new AssertionError("Unexpected failure reported to noOpActionListener", e);
    }
};

public void testTimestampPinUnpin() throws Exception {
Expand All @@ -57,15 +76,8 @@ public void testTimestampPinUnpin() throws Exception {
);

Tuple<Long, Set<Long>> pinnedTimestampWithFetchTimestamp = RemoteStorePinnedTimestampService.getPinnedTimestamps();
long lastFetchTimestamp = pinnedTimestampWithFetchTimestamp.v1();
assertEquals(-1L, lastFetchTimestamp);
assertEquals(Set.of(), pinnedTimestampWithFetchTimestamp.v2());

assertThrows(
IllegalArgumentException.class,
() -> remoteStorePinnedTimestampService.pinTimestamp(1234L, "ss1", noOpActionListener)
);

long timestamp1 = System.currentTimeMillis() + 30000L;
long timestamp2 = System.currentTimeMillis() + 60000L;
long timestamp3 = System.currentTimeMillis() + 900000L;
Expand Down Expand Up @@ -197,6 +209,104 @@ public void onFailure(Exception e) {
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));
}

/**
 * Verifies that pinning the timestamp {@code 1234L} is rejected through the listener:
 * {@code onFailure} must be invoked with an {@link IllegalArgumentException} and
 * {@code onResponse} must not be invoked.
 *
 * <p>The outcome is captured inside the listener and asserted on the test thread after the latch
 * is released, so a missing notification fails the test with a message instead of blocking it.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for the listener
 */
public void testPinExceptionsOlderTimestamp() throws InterruptedException {
    prepareCluster(1, 1, INDEX_NAME, 0, 2);
    ensureGreen(INDEX_NAME);

    RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
        RemoteStorePinnedTimestampService.class,
        primaryNodeName(INDEX_NAME)
    );

    CountDownLatch latch = new CountDownLatch(1);
    // Stays null unless onFailure is called; a null value after the latch opens means onResponse ran.
    AtomicReference<Exception> failure = new AtomicReference<>();
    remoteStorePinnedTimestampService.pinTimestamp(1234L, "ss1", new LatchedActionListener<>(new ActionListener<>() {
        @Override
        public void onResponse(Void unused) {
            // We expect onFailure to be called; the null check below reports this case.
        }

        @Override
        public void onFailure(Exception e) {
            failure.set(e);
        }
    }, latch));

    // Bounded wait: a listener that is never notified must fail the test rather than hang it.
    assertTrue("pinTimestamp did not notify the listener within the timeout", latch.await(30, TimeUnit.SECONDS));
    Exception reported = failure.get();
    assertNotNull("expected onFailure to be called, but onResponse was called", reported);
    assertTrue("expected IllegalArgumentException but got " + reported, reported instanceof IllegalArgumentException);
}

/**
 * Verifies that a pin operation which takes too long is reported as a failure and leaves no
 * pinned timestamp behind.
 *
 * <p>The repository is slowed down via {@link #slowDownRepo(String, int)} and the lookback interval
 * is set to one second before a timestamp ten minutes in the future is pinned. The listener's
 * {@code onFailure} must receive a {@link RuntimeException} whose message contains
 * {@code "Timestamp pinning took"}, and after a forced sync the timestamp must not be among the
 * pinned timestamps.
 *
 * <p>The outcome is captured inside the listener and asserted on the test thread after the latch
 * is released, so a missing notification fails the test with a message instead of blocking it.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting
 * @throws ExecutionException   if reading the repository metadata in {@code slowDownRepo} fails
 */
public void testPinExceptionsRemoteStoreCallTakeTime() throws InterruptedException, ExecutionException {
    prepareCluster(1, 1, INDEX_NAME, 0, 2);
    ensureGreen(INDEX_NAME);

    RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
        RemoteStorePinnedTimestampService.class,
        primaryNodeName(INDEX_NAME)
    );

    CountDownLatch latch = new CountDownLatch(1);
    slowDownRepo(REPOSITORY_NAME, 10);
    RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueSeconds(1));
    long timestampToBePinned = System.currentTimeMillis() + 600000;
    // Stays null unless onFailure is called; a null value after the latch opens means onResponse ran.
    AtomicReference<Exception> failure = new AtomicReference<>();
    remoteStorePinnedTimestampService.pinTimestamp(timestampToBePinned, "ss1", new LatchedActionListener<>(new ActionListener<>() {
        @Override
        public void onResponse(Void unused) {
            // We expect onFailure to be called; the null check below reports this case.
        }

        @Override
        public void onFailure(Exception e) {
            logger.error(e.getMessage());
            failure.set(e);
        }
    }, latch));

    // Generous bound because the repository is deliberately slowed down; still prevents an endless wait.
    assertTrue("pinTimestamp did not notify the listener within the timeout", latch.await(2, TimeUnit.MINUTES));
    Exception reported = failure.get();
    assertNotNull("expected onFailure to be called, but onResponse was called", reported);
    assertTrue("expected a RuntimeException but got " + reported, reported instanceof RuntimeException);
    assertNotNull("failure carries no message: " + reported, reported.getMessage());
    assertTrue("unexpected failure message: " + reported.getMessage(), reported.getMessage().contains("Timestamp pinning took"));

    // Check if the timestamp was unpinned
    remoteStorePinnedTimestampService.forceSyncPinnedTimestamps();
    assertFalse(
        "timestamp is still pinned after the failed pin operation",
        RemoteStorePinnedTimestampService.getPinnedTimestamps().v2().contains(timestampToBePinned)
    );
}

/**
 * Looks up the repository {@code repoName} and calls {@code createRepository} again with the same
 * name and type, passing settings that contain the existing {@code "location"} value plus
 * {@code REPOSITORIES_SLOWDOWN_SETTING} set to {@code value}.
 *
 * @param repoName name of the repository to look up and re-create
 * @param value    value stored under the slowdown setting key
 * @throws ExecutionException   if the get-repositories request fails
 * @throws InterruptedException if the thread is interrupted while waiting for the response
 */
protected void slowDownRepo(String repoName, int value) throws ExecutionException, InterruptedException {
    final GetRepositoriesRequest lookupRequest = new GetRepositoriesRequest(new String[] { repoName });
    final GetRepositoriesResponse lookupResponse = client().admin().cluster().getRepositories(lookupRequest).get();
    // Only the first repository in the response is used.
    final RepositoryMetadata existingRepo = lookupResponse.repositories().get(0);

    final String existingLocation = existingRepo.settings().get("location");
    final Settings.Builder slowedSettings = Settings.builder();
    slowedSettings.put("location", existingLocation);
    slowedSettings.put(REPOSITORIES_SLOWDOWN_SETTING.getKey(), value);

    createRepository(repoName, existingRepo.type(), slowedSettings);
}

/**
 * Verifies that unpinning timestamp {@code 1234L} for entity {@code "dummy-entity"}, which this
 * test never pinned, is rejected through the listener: {@code onFailure} must be invoked with an
 * {@link IllegalArgumentException} and {@code onResponse} must not be invoked.
 *
 * <p>The outcome is captured inside the listener and asserted on the test thread after the latch
 * is released, so a missing notification fails the test with a message instead of blocking it.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for the listener
 */
public void testUnpinException() throws InterruptedException {
    prepareCluster(1, 1, INDEX_NAME, 0, 2);
    ensureGreen(INDEX_NAME);

    RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
        RemoteStorePinnedTimestampService.class,
        primaryNodeName(INDEX_NAME)
    );

    CountDownLatch latch = new CountDownLatch(1);
    // Stays null unless onFailure is called; a null value after the latch opens means onResponse ran.
    AtomicReference<Exception> failure = new AtomicReference<>();
    remoteStorePinnedTimestampService.unpinTimestamp(1234L, "dummy-entity", new LatchedActionListener<>(new ActionListener<>() {
        @Override
        public void onResponse(Void unused) {
            // We expect onFailure to be called; the null check below reports this case.
        }

        @Override
        public void onFailure(Exception e) {
            logger.error(e.getMessage());
            failure.set(e);
        }
    }, latch));

    // Bounded wait: a listener that is never notified must fail the test rather than hang it.
    assertTrue("unpinTimestamp did not notify the listener within the timeout", latch.await(30, TimeUnit.SECONDS));
    Exception reported = failure.get();
    assertNotNull("expected onFailure to be called, but onResponse was called", reported);
    assertTrue("expected IllegalArgumentException but got " + reported, reported instanceof IllegalArgumentException);
}

public void testLastSuccessfulFetchOfPinnedTimestampsPresentInNodeStats() throws Exception {
logger.info("Starting up cluster manager");
logger.info("cluster.remote_store.pinned_timestamps.enabled set to true");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,16 @@ public static Map<String, Set<Long>> fetchPinnedTimestamps(Settings settings, Re
* @throws IllegalArgumentException If the timestamp is less than the current time minus one second
*/
public void pinTimestamp(long timestamp, String pinningEntity, ActionListener<Void> listener) {
// If a caller uses current system time to pin the timestamp, following check will almost always fail.
// So, we allow pinning timestamp in the past upto some buffer
long lookbackIntervalInMills = RemoteStoreSettings.getPinnedTimestampsLookbackInterval().millis();
if (timestamp < (System.currentTimeMillis() - lookbackIntervalInMills)) {
throw new IllegalArgumentException(
"Timestamp to be pinned is less than current timestamp - value of cluster.remote_store.pinned_timestamps.lookback_interval"
);
}
long startTime = System.nanoTime();
try {
// If a caller uses current system time to pin the timestamp, following check will almost always fail.
// So, we allow pinning timestamp in the past upto some buffer
long lookbackIntervalInMills = RemoteStoreSettings.getPinnedTimestampsLookbackInterval().millis();
if (timestamp < (System.currentTimeMillis() - lookbackIntervalInMills)) {
throw new IllegalArgumentException(
"Timestamp to be pinned is less than current timestamp - value of cluster.remote_store.pinned_timestamps.lookback_interval"
);
}
long startTime = System.nanoTime();
logger.debug("Pinning timestamp = {} against entity = {}", timestamp, pinningEntity);
blobContainer.writeBlob(getBlobName(timestamp, pinningEntity), new ByteArrayInputStream(new byte[0]), 0, true);
long elapsedTime = System.nanoTime() - startTime;
Expand All @@ -155,7 +155,7 @@ public void pinTimestamp(long timestamp, String pinningEntity, ActionListener<Vo
} else {
listener.onResponse(null);
}
} catch (IOException e) {
} catch (Exception e) {
listener.onFailure(e);
}
}
Expand Down Expand Up @@ -198,7 +198,7 @@ public void cloneTimestamp(long timestamp, String existingPinningEntity, String
logger.error(errorMessage);
listener.onFailure(new IllegalArgumentException(errorMessage));
}
} catch (IOException e) {
} catch (Exception e) {
listener.onFailure(e);
}
}
Expand Down Expand Up @@ -249,7 +249,7 @@ public void unpinTimestamp(long timestamp, String pinningEntity, ActionListener<
logger.error(errorMessage);
listener.onFailure(new IllegalArgumentException(errorMessage));
}
} catch (IOException e) {
} catch (Exception e) {
listener.onFailure(e);
}
}
Expand Down

0 comments on commit 58adc18

Please sign in to comment.