diff --git a/ambry-frontend/src/integration-test/java/com/github/ambry/frontend/FrontendIntegrationTest.java b/ambry-frontend/src/integration-test/java/com/github/ambry/frontend/FrontendIntegrationTest.java index 3faabee2fc..78989e1e76 100644 --- a/ambry-frontend/src/integration-test/java/com/github/ambry/frontend/FrontendIntegrationTest.java +++ b/ambry-frontend/src/integration-test/java/com/github/ambry/frontend/FrontendIntegrationTest.java @@ -86,6 +86,8 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import com.github.ambry.account.Dataset; + import static com.github.ambry.utils.TestUtils.*; import static org.junit.Assert.*; @@ -258,6 +260,70 @@ public void postGetHeadUpdateDeleteUndeleteTest() throws Exception { } } + @Test + public void datasetTest() throws Exception { + Account refAccount = ACCOUNT_SERVICE.createAndAddRandomAccount(); + Container namedBlobOptionalContainer = + new ContainerBuilder((short) 11, "optional", Container.ContainerStatus.ACTIVE, "", + refAccount.getId()).setNamedBlobMode(Container.NamedBlobMode.OPTIONAL).build(); + ACCOUNT_SERVICE.updateContainers(refAccount.getName(), Arrays.asList(namedBlobOptionalContainer)); + String contentType = "application/octet-stream"; + String ownerId = "datasetTest"; + List datasetList = doDatasetPutUpdateGetTest(refAccount, namedBlobOptionalContainer, null); + List> allDatasetVersions = new ArrayList<>(); + Pair, byte[]> idsAndContent = + uploadDataChunksAndVerify(refAccount, namedBlobOptionalContainer, null, 50, 50, 50, 50, 17); + //Test put and get + List> datasetVersionsFromPut = + doDatasetVersionPutGetTest(refAccount, namedBlobOptionalContainer, datasetList, contentType, ownerId); + //Test Stitch and Get + ownerId = "stitchedUploadTest"; + List> datasetVersionsFromStitch = + doStitchDatasetVersionGetTest(refAccount, namedBlobOptionalContainer, datasetList, contentType, ownerId, + idsAndContent.getFirst(), idsAndContent.getSecond(), 217); + allDatasetVersions.addAll(datasetVersionsFromPut); + allDatasetVersions.addAll(datasetVersionsFromStitch); + //Test List dataset + doListDatasetAndVerify(refAccount.getName(), namedBlobOptionalContainer.getName(), datasetList); + //Test List dataset version + List> allDatasetVersionPairs = doListDatasetVersionAndVerify(datasetList, allDatasetVersions); + //Test delete + doDeleteDatasetVersionAndVerify(refAccount.getName(), namedBlobOptionalContainer.getName(), allDatasetVersionPairs); + //After delete, it should have an empty list. + doListDatasetVersionAndVerify(datasetList, new ArrayList<>()); + //Test delete dataset + doDeleteDatasetAndVerify(refAccount.getName(), namedBlobOptionalContainer.getName(), datasetList); + //After delete, it should have an empty list. + doListDatasetAndVerify(refAccount.getName(), namedBlobOptionalContainer.getName(), new ArrayList<>()); + } + + @Test + public void datasetTtlTest() throws Exception { + Account refAccount = ACCOUNT_SERVICE.createAndAddRandomAccount(); + Container namedBlobOptionalContainer = + new ContainerBuilder((short) 11, "optional", Container.ContainerStatus.ACTIVE, "", + refAccount.getId()).setNamedBlobMode(Container.NamedBlobMode.OPTIONAL).build(); + ACCOUNT_SERVICE.updateContainers(refAccount.getName(), Arrays.asList(namedBlobOptionalContainer)); + String contentType = "application/octet-stream"; + String ownerId = "datasetTest"; + int contentSize = 100; + List datasetList = null; + List allDatasets = new ArrayList<>(); + List> allDatasetVersions = new ArrayList<>(); + for (Long ttl : new Long[]{null, (long) -1, TTL_SECS}) { + datasetList = doDatasetPutUpdateGetTest(refAccount, namedBlobOptionalContainer, ttl); + List> datasetVersionsFromPut = + doDatasetVersionPutGetWithTtlTest(refAccount, namedBlobOptionalContainer, datasetList, contentType, ownerId, + contentSize); + allDatasetVersions.addAll(datasetVersionsFromPut); + allDatasets.addAll(datasetList); + } + List> allDatasetVersionPairs = doListDatasetVersionAndVerify(allDatasets, allDatasetVersions); + doDatasetUpdateTtlAndVerify(refAccount.getName(), namedBlobOptionalContainer.getName(), allDatasetVersionPairs, + contentSize, contentType, ownerId); + } + + /** * Tests multipart POST and verifies it via GET operations. * @throws Exception diff --git a/ambry-frontend/src/integration-test/java/com/github/ambry/frontend/FrontendIntegrationTestBase.java b/ambry-frontend/src/integration-test/java/com/github/ambry/frontend/FrontendIntegrationTestBase.java index f972e4f8bd..f0132f805d 100644 --- a/ambry-frontend/src/integration-test/java/com/github/ambry/frontend/FrontendIntegrationTestBase.java +++ b/ambry-frontend/src/integration-test/java/com/github/ambry/frontend/FrontendIntegrationTestBase.java @@ -18,6 +18,7 @@ import com.github.ambry.account.AccountCollectionSerde; import com.github.ambry.account.AccountService; import com.github.ambry.account.Container; +import com.github.ambry.account.InMemAccountService; import com.github.ambry.config.FrontendConfig; import com.github.ambry.config.QuotaConfig; import com.github.ambry.config.VerifiableProperties; @@ -71,6 +72,18 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; import org.junit.Assert; +import com.github.ambry.account.Dataset; +import com.github.ambry.account.DatasetBuilder; +import java.io.ByteArrayInputStream; +import java.text.SimpleDateFormat; +import java.util.Comparator; +import java.util.Random; +import java.util.stream.Collectors; +import org.json.JSONObject; +import java.util.TimeZone; +import static com.github.ambry.account.Dataset.VersionSchema.*; +import static com.github.ambry.frontend.Operations.*; +import static com.github.ambry.rest.RestUtils.Headers.*; import static com.github.ambry.utils.TestUtils.*; import static org.junit.Assert.*; @@ -419,6 +432,489 @@ void verifyListNamedBlobs(String accountName, String containerName, String owner } } + List doDatasetPutUpdateGetTest(Account account, Container container, Long ttl) throws Exception { + String accountName = account.getName(); + String containerName = container.getName(); + List versionSchemas = new ArrayList<>(); + List datasetList = new ArrayList<>(); + versionSchemas.add(TIMESTAMP); + versionSchemas.add(SEMANTIC); + versionSchemas.add(MONOTONIC); + for (Dataset.VersionSchema versionSchema : versionSchemas) { + String datasetName = "zzzz" + TestUtils.getRandomString(10); + Dataset dataset; + if (ttl == null) { + dataset = new DatasetBuilder(accountName, containerName, datasetName).setVersionSchema(versionSchema).build(); + } else { + dataset = new DatasetBuilder(accountName, containerName, datasetName).setVersionSchema(versionSchema) + .setRetentionTimeInSeconds(ttl) + .build(); + } + HttpHeaders headers = new DefaultHttpHeaders(); + //Test put dataset + putDatasetAndVerify(dataset, headers, false); + HttpHeaders getHeaders = new DefaultHttpHeaders(); + getHeaders.add(TARGET_ACCOUNT_NAME, dataset.getAccountName()); + getHeaders.add(TARGET_CONTAINER_NAME, dataset.getContainerName()); + getHeaders.add(TARGET_DATASET_NAME, dataset.getDatasetName()); + getDatasetAndVerify(dataset, getHeaders); + + //Update dataset + Dataset datasetToUpdate = new DatasetBuilder(dataset).setRetentionCount(10).build(); + putDatasetAndVerify(datasetToUpdate, headers, true); + getDatasetAndVerify(datasetToUpdate, getHeaders); + datasetList.add(datasetToUpdate); + } + return datasetList; + } + + List> doDatasetVersionPutGetWithTtlTest(Account account, Container container, List datasets, + String contentType, String ownerId, int contentSize) throws Exception { + String accountName = account.getName(); + List> datasetVersions = new ArrayList<>(); + for (Dataset dataset : datasets) { + for (long ttl : new long[]{-1, TTL_SECS}) { + //Test put dataset version with default dataset level ttl + HttpHeaders headers = new DefaultHttpHeaders(); + setAmbryHeadersForPut(headers, ttl, false, accountName, contentType, ownerId, null, null); + headers.add(RestUtils.Headers.DATASET_VERSION_QUERY_ENABLED, true); + ByteBuffer content = ByteBuffer.wrap(TestUtils.getRandomBytes(contentSize)); + String version = generateDatasetVersion(dataset); + if (dataset.getRetentionTimeInSeconds() == null) { + putDatasetVersionAndVerify(dataset, version, headers, content, contentSize, ttl); + datasetVersions.add(new Pair<>(dataset.getDatasetName(), version)); + } else { + putDatasetVersionAndVerify(dataset, version, headers, content, contentSize, + dataset.getRetentionTimeInSeconds()); + datasetVersions.add(new Pair<>(dataset.getDatasetName(), version)); + } + + //Test put dataset version with dataset level ttl + headers = new DefaultHttpHeaders(); + setAmbryHeadersForPut(headers, ttl, false, accountName, contentType, ownerId, null, null); + headers.add(RestUtils.Headers.DATASET_VERSION_QUERY_ENABLED, true); + headers.add(RestUtils.Headers.DATASET_VERSION_TTL_ENABLED, true); + version = generateDatasetVersion(dataset); + putDatasetVersionAndVerify(dataset, version, headers, content, contentSize, ttl); + datasetVersions.add(new Pair<>(dataset.getDatasetName(), version)); + } + } + return datasetVersions; + } + + void doDatasetUpdateTtlAndVerify(String accountName, String containerName, + List> allDatasetVersionPairs, int expectedContentSize, String contentType, String ownerId) + throws Exception { + for (Pair pair : allDatasetVersionPairs) { + String datasetName = pair.getFirst(); + String version = pair.getSecond(); + HttpHeaders updateTtlHeaders = new DefaultHttpHeaders(); + updateTtlHeaders.add(RestUtils.Headers.DATASET_VERSION_QUERY_ENABLED, true); + updateTtlHeaders.add(RestUtils.Headers.BLOB_ID, + buildUriForDatasetVersion(accountName, containerName, datasetName, version)); + updateTtlHeaders.add(RestUtils.Headers.SERVICE_ID, accountName); + + //Update ttl + updateDatasetVersionTtl(updateTtlHeaders); + + //Test get dataset version, should be permanent + HttpHeaders getHeaders = new DefaultHttpHeaders(); + getHeaders.add(RestUtils.Headers.DATASET_VERSION_QUERY_ENABLED, true); + + HttpHeaders expectedGetHeaders = new DefaultHttpHeaders().add(updateTtlHeaders); + expectedGetHeaders.add(RestUtils.Headers.BLOB_SIZE, expectedContentSize); + expectedGetHeaders.add(RestUtils.Headers.LIFE_VERSION, "0"); + expectedGetHeaders.add(TARGET_ACCOUNT_NAME, accountName); + expectedGetHeaders.add(RestUtils.Headers.TARGET_CONTAINER_NAME, containerName); + expectedGetHeaders.add(RestUtils.Headers.AMBRY_CONTENT_TYPE, contentType); + expectedGetHeaders.add(OWNER_ID, ownerId); + + getDatasetVersionInfoAndVerify(accountName, containerName, datasetName, version, getHeaders, Utils.Infinite_Time, + expectedGetHeaders); + } + } + + List> doDatasetVersionPutGetTest(Account account, Container container, List datasets, + String contentType, String ownerId) throws Exception { + String accountName = account.getName(); + String containerName = container.getName(); + List> datasetVersions = new ArrayList<>(); + for (Dataset dataset : datasets) { + for (long ttl : new long[]{-1, TTL_SECS}) { + //Test put dataset version + HttpHeaders headers = new DefaultHttpHeaders(); + setAmbryHeadersForPut(headers, ttl, false, accountName, contentType, ownerId, null, null); + headers.add(RestUtils.Headers.DATASET_VERSION_QUERY_ENABLED, true); + int contentSize = 100; + ByteBuffer content = ByteBuffer.wrap(TestUtils.getRandomBytes(contentSize)); + String version = generateDatasetVersion(dataset); + String blobId = putDatasetVersionAndVerify(dataset, version, headers, content, contentSize, ttl); + + // This is the blob id for the given blob name, we should be able to do all get operations on this blob id. + HttpHeaders expectedGetHeaders = new DefaultHttpHeaders().add(headers); + expectedGetHeaders.add(RestUtils.Headers.BLOB_SIZE, content.capacity()); + expectedGetHeaders.add(RestUtils.Headers.LIFE_VERSION, "0"); + expectedGetHeaders.add(TARGET_ACCOUNT_NAME, accountName); + expectedGetHeaders.add(RestUtils.Headers.TARGET_CONTAINER_NAME, containerName); + doVariousGetAndVerify(blobId, expectedGetHeaders, false, content, 100, accountName, containerName, null, + container); + + //Test get dataset version + HttpHeaders getHeaders = new DefaultHttpHeaders(); + getHeaders.add(RestUtils.Headers.DATASET_VERSION_QUERY_ENABLED, true); + getDatasetVersionAndVerify(dataset, version, getHeaders, contentSize, ttl, expectedGetHeaders, content); + getDatasetVersionInfoAndVerify(accountName, containerName, dataset.getDatasetName(), version, getHeaders, ttl, + expectedGetHeaders); + + //add all versions when successfully get it. + datasetVersions.add(new Pair<>(dataset.getDatasetName(), version)); + } + } + return datasetVersions; + } + + List> doStitchDatasetVersionGetTest(Account account, Container container, List datasets, + String contentType, String ownerId, List signedChunkIds, byte[] fullContentArray, long stitchedBlobSize) + throws Exception { + String accountName = account.getName(); + String containerName = container.getName(); + List> datasetVersions = new ArrayList<>(); + for (Dataset dataset : datasets) { + for (long ttl : new long[]{-1, TTL_SECS}) { + //Test stitch + HttpHeaders stitchHeaders = new DefaultHttpHeaders(); + setAmbryHeadersForPut(stitchHeaders, ttl, !container.isCacheable(), "stitcher", contentType, ownerId, + account.getName(), container.getName()); + stitchHeaders.add(RestUtils.Headers.DATASET_VERSION_QUERY_ENABLED, true); + stitchHeaders.add(RestUtils.Headers.UPLOAD_NAMED_BLOB_MODE, "STITCH"); + String version = generateDatasetVersion(dataset); + String stitchedBlobId = + doDatasetStitchAndVerify(account, container, dataset, version, stitchHeaders, signedChunkIds, + stitchedBlobSize, ttl); + HttpHeaders expectedGetHeaders = new DefaultHttpHeaders().add(stitchHeaders); + // Test different request types on stitched blob ID + // (getBlobInfo, getBlob, getBlob w/ range, head, updateBlobTtl, deleteBlob) + expectedGetHeaders.add(RestUtils.Headers.BLOB_SIZE, fullContentArray.length); + expectedGetHeaders.set(RestUtils.Headers.LIFE_VERSION, "0"); + getBlobInfoAndVerify(stitchedBlobId, GetOption.None, expectedGetHeaders, !container.isCacheable(), + account.getName(), container.getName(), null, container); + List ranges = new ArrayList<>(); + ranges.add(null); + ranges.add(ByteRanges.fromLastNBytes(ThreadLocalRandom.current().nextLong(fullContentArray.length + 1))); + ranges.add(ByteRanges.fromStartOffset(ThreadLocalRandom.current().nextLong(fullContentArray.length))); + long random1 = ThreadLocalRandom.current().nextLong(fullContentArray.length); + long random2 = ThreadLocalRandom.current().nextLong(fullContentArray.length); + ranges.add(ByteRanges.fromOffsetRange(Math.min(random1, random2), Math.max(random1, random2))); + for (ByteRange range : ranges) { + getBlobAndVerify(stitchedBlobId, range, GetOption.None, false, expectedGetHeaders, !container.isCacheable(), + ByteBuffer.wrap(fullContentArray), account.getName(), container.getName(), container); + getHeadAndVerify(stitchedBlobId, range, GetOption.None, expectedGetHeaders, !container.isCacheable(), + account.getName(), container.getName()); + } + + //Test get dataset version + HttpHeaders getHeaders = new DefaultHttpHeaders(); + getHeaders.add(RestUtils.Headers.DATASET_VERSION_QUERY_ENABLED, true); + getDatasetVersionAndVerify(dataset, version, getHeaders, stitchedBlobSize, ttl, expectedGetHeaders, + ByteBuffer.wrap(fullContentArray)); + getDatasetVersionInfoAndVerify(accountName, containerName, dataset.getDatasetName(), version, getHeaders, ttl, + expectedGetHeaders); + + //add all versions when successfully get it. + datasetVersions.add(new Pair<>(dataset.getDatasetName(), version)); + } + } + return datasetVersions; + } + + List> doListDatasetVersionAndVerify(List datasets, + List> expectDatasetVersions) throws Exception { + InMemAccountService.PAGE_SIZE = -1; + List> allDatasetVersions = new ArrayList<>(); + for (Dataset dataset : datasets) { + HttpHeaders listHeaders = new DefaultHttpHeaders(); + listHeaders.add(TARGET_ACCOUNT_NAME, dataset.getAccountName()); + listHeaders.add(TARGET_CONTAINER_NAME, dataset.getContainerName()); + listHeaders.add(TARGET_DATASET_NAME, dataset.getDatasetName()); + listHeaders.add(ENABLE_DATASET_VERSION_LISTING, true); + listHeaders.add(RestUtils.Headers.DATASET_VERSION_QUERY_ENABLED, true); + allDatasetVersions.addAll(listDatasetVersions(dataset, listHeaders)); + } + sortPair(expectDatasetVersions); + sortPair(allDatasetVersions); + assertEquals("The size of the dataset version should meet expectation", expectDatasetVersions.size(), + allDatasetVersions.size()); + assertEquals("Should list all dataset versions", expectDatasetVersions, allDatasetVersions); + return allDatasetVersions; + } + + void doListDatasetAndVerify(String accountName, String containerName, List expectedDatasets) + throws Exception { + InMemAccountService.PAGE_SIZE = -1; + HttpHeaders listHeaders = new DefaultHttpHeaders(); + listHeaders.add(TARGET_ACCOUNT_NAME, accountName); + listHeaders.add(TARGET_CONTAINER_NAME, containerName); + List datasetNames = listDataset(listHeaders); + List expectedDatasetNames = + expectedDatasets.stream().map(Dataset::getDatasetName) // Extract the name from each Dataset + .collect(Collectors.toList()); + Collections.sort(datasetNames); + Collections.sort(expectedDatasetNames); + assertEquals("Dataset name should match", expectedDatasetNames, datasetNames); + } + + void doDeleteDatasetVersionAndVerify(String accountName, String containerName, + List> allDatasetVersionPairs) throws Exception { + for (Pair pair : allDatasetVersionPairs) { + String datasetName = pair.getFirst(); + String version = pair.getSecond(); + HttpHeaders deleteHeaders = new DefaultHttpHeaders(); + deleteHeaders.add(RestUtils.Headers.DATASET_VERSION_QUERY_ENABLED, true); + deleteDatasetVersion(accountName, containerName, datasetName, version, deleteHeaders); + } + } + + void doDeleteDatasetAndVerify(String accountName, String containerName, List datasets) throws Exception { + for (Dataset dataset : datasets) { + HttpHeaders deleteHeaders = new DefaultHttpHeaders(); + String datasetName = dataset.getDatasetName(); + deleteHeaders.add(TARGET_ACCOUNT_NAME, accountName); + deleteHeaders.add(TARGET_CONTAINER_NAME, containerName); + deleteHeaders.add(TARGET_DATASET_NAME, datasetName); + deleteDataset(deleteHeaders); + } + } + + List> listDatasetVersions(Dataset dataset, HttpHeaders listHeaders) throws Exception { + List> datasetVersions = new ArrayList<>(); + String accountName = dataset.getAccountName(); + String containerName = dataset.getContainerName(); + String datasetName = dataset.getDatasetName(); + FullHttpRequest httpRequest = + buildRequest(HttpMethod.GET, buildUriForNamedBlob(accountName, containerName, datasetName), listHeaders, null); + NettyClient.ResponseParts responseParts = nettyClient.sendRequest(httpRequest, null, null).get(); + HttpResponse response = getHttpResponse(responseParts); + assertEquals("Unexpected response status", HttpResponseStatus.OK, response.status()); + ByteBuffer content = getContent(responseParts.queue, HttpUtil.getContentLength(response)); + Page page = Page.fromJsonWithoutKey(new JSONObject(new String(content.array())), Object::toString); + for (String version : page.getEntries()) { + datasetVersions.add(new Pair<>(datasetName, version)); + } + return datasetVersions; + } + + List listDataset(HttpHeaders listHeaders) throws Exception { + List datasetNames = new ArrayList<>(); + FullHttpRequest httpRequest = buildRequest(HttpMethod.GET, ACCOUNTS_CONTAINERS_DATASETS, listHeaders, null); + NettyClient.ResponseParts responseParts = nettyClient.sendRequest(httpRequest, null, null).get(); + HttpResponse response = getHttpResponse(responseParts); + assertEquals("Unexpected response status", HttpResponseStatus.OK, response.status()); + ByteBuffer content = getContent(responseParts.queue, HttpUtil.getContentLength(response)); + Page page = Page.fromJsonWithoutKey(new JSONObject(new String(content.array())), Object::toString); + for (String datasetName : page.getEntries()) { + datasetNames.add(datasetName); + } + return datasetNames; + } + + void deleteDatasetVersion(String accountName, String containerName, String datasetName, String version, + HttpHeaders deleteHeaders) throws Exception { + FullHttpRequest httpRequest = + buildRequest(HttpMethod.DELETE, buildUriForDatasetVersion(accountName, containerName, datasetName, version), + deleteHeaders, null); + NettyClient.ResponseParts responseParts = nettyClient.sendRequest(httpRequest, null, null).get(); + HttpResponse response = getHttpResponse(responseParts); + assertEquals("Unexpected response status", HttpResponseStatus.ACCEPTED, response.status()); + } + + void updateDatasetVersionTtl(HttpHeaders ttlUpdateHeaders) throws Exception { + FullHttpRequest httpRequest = buildRequest(HttpMethod.PUT, "/" + Operations.UPDATE_TTL, ttlUpdateHeaders, null); + NettyClient.ResponseParts responseParts = nettyClient.sendRequest(httpRequest, null, null).get(); + HttpResponse response = getHttpResponse(responseParts); + assertEquals("Unexpected response status", HttpResponseStatus.OK, response.status()); + } + + void deleteDataset(HttpHeaders deleteHeaders) throws Exception { + FullHttpRequest httpRequest = buildRequest(HttpMethod.DELETE, ACCOUNTS_CONTAINERS_DATASETS, deleteHeaders, null); + NettyClient.ResponseParts responseParts = nettyClient.sendRequest(httpRequest, null, null).get(); + HttpResponse response = getHttpResponse(responseParts); + assertEquals("Unexpected response status", HttpResponseStatus.ACCEPTED, response.status()); + } + + String doDatasetStitchAndVerify(Account account, Container container, Dataset dataset, String version, + HttpHeaders stitchHeaders, List signedChunkIds, long stitchedBlobSize, long ttl) throws Exception { + HttpRequest httpRequest = buildRequest(HttpMethod.PUT, + buildUriForDatasetVersion(account.getName(), container.getName(), dataset.getDatasetName(), version), + stitchHeaders, + ByteBuffer.wrap(StitchRequestSerDe.toJson(signedChunkIds).toString().getBytes(StandardCharsets.UTF_8))); + NettyClient.ResponseParts responseParts = nettyClient.sendRequest(httpRequest, null, null).get(); + HttpResponse response = getHttpResponse(responseParts); + return verifyDatasetVersionAndReturnBlobId(response, responseParts, stitchedBlobSize, ttl); + } + + void putDatasetAndVerify(Dataset dataset, HttpHeaders headers, boolean enableUpdate) throws Exception { + byte[] datasetsUpdateJson = AccountCollectionSerde.serializeDatasetsInJson(dataset); + ByteBuffer content = ByteBuffer.wrap(datasetsUpdateJson); + if (enableUpdate) { + headers.add(RestUtils.Headers.DATASET_UPDATE, true); + } + FullHttpRequest httpRequest = buildRequest(HttpMethod.POST, ACCOUNTS_CONTAINERS_DATASETS, headers, content); + NettyClient.ResponseParts responseParts = nettyClient.sendRequest(httpRequest, null, null).get(); + HttpResponse response = getHttpResponse(responseParts); + assertEquals("Unexpected response status", HttpResponseStatus.OK, response.status()); + } + + void getDatasetAndVerify(Dataset expectedDataset, HttpHeaders headers) throws Exception { + FullHttpRequest httpRequest = buildRequest(HttpMethod.GET, ACCOUNTS_CONTAINERS_DATASETS, headers, null); + NettyClient.ResponseParts responseParts = nettyClient.sendRequest(httpRequest, null, null).get(); + HttpResponse response = getHttpResponse(responseParts); + assertEquals("Unexpected response status", HttpResponseStatus.OK, response.status()); + ByteBuffer content = getContent(responseParts.queue, HttpUtil.getContentLength(response)); + assertEquals("Dataset does not match", expectedDataset, + AccountCollectionSerde.datasetsFromInputStreamInJson(new ByteArrayInputStream(content.array()))); + } + + String putDatasetVersionAndVerify(Dataset dataset, String version, HttpHeaders headers, ByteBuffer content, + long contentSize, long expectTtl) throws Exception { + String accountName = dataset.getAccountName(); + String containerName = dataset.getContainerName(); + String datasetName = dataset.getDatasetName(); + FullHttpRequest httpRequest = + buildRequest(HttpMethod.PUT, buildUriForDatasetVersion(accountName, containerName, datasetName, version), + headers, content); + + NettyClient.ResponseParts responseParts = nettyClient.sendRequest(httpRequest, null, null).get(); + HttpResponse response = getHttpResponse(responseParts); + return verifyDatasetVersionAndReturnBlobId(response, responseParts, contentSize, expectTtl); + } + + void getDatasetVersionAndVerify(Dataset dataset, String version, HttpHeaders headers, long contentSize, + long expectTtl, HttpHeaders expectedHeaders, ByteBuffer expectedContent) throws Exception { + String accountName = dataset.getAccountName(); + String containerName = dataset.getContainerName(); + String datasetName = dataset.getDatasetName(); + FullHttpRequest httpRequest = + buildRequest(HttpMethod.GET, buildUriForDatasetVersion(accountName, containerName, datasetName, version), + headers, null); + + NettyClient.ResponseParts responseParts = nettyClient.sendRequest(httpRequest, null, null).get(); + HttpResponse response = getHttpResponse(responseParts); + verifyGetDatasetVersion(response, responseParts, expectedHeaders, contentSize, expectTtl, expectedContent); + } + + void getDatasetVersionInfoAndVerify(String accountName, String containerName, String datasetName, String version, + HttpHeaders headers, long expectTtl, HttpHeaders expectedHeaders) throws Exception { + FullHttpRequest httpRequest = buildRequest(HttpMethod.GET, + buildUriForDatasetVersion(accountName, containerName, datasetName, version) + "/" + + RestUtils.SubResource.BlobInfo, headers, null); + NettyClient.ResponseParts responseParts = nettyClient.sendRequest(httpRequest, null, null).get(); + HttpResponse response = getHttpResponse(responseParts); + assertEquals("Unexpected response status", HttpResponseStatus.OK, response.status()); + checkCommonGetHeadHeaders(response.headers()); + verifyTrackingHeaders(response); + verifyBlobProperties(expectedHeaders, false, response); + verifyAccountAndContainerHeaders(accountName, containerName, response); + assertEquals(RestUtils.Headers.BLOB_SIZE + " does not match", expectedHeaders.get(RestUtils.Headers.BLOB_SIZE), + response.headers().get(RestUtils.Headers.BLOB_SIZE)); + if (expectTtl != -1) { + assertEquals("Unexpected ttl value", expectTtl, + (gmtToEpoch(response.headers().get(RestUtils.Headers.DATASET_EXPIRATION_TIME)) - gmtToEpoch( + response.headers().get(RestUtils.Headers.CREATION_TIME))) / 1000); + } + assertTrue("Channel should be active", HttpUtil.isKeepAlive(response)); + assertEquals(RestUtils.Headers.LIFE_VERSION + " does not match", + expectedHeaders.get(RestUtils.Headers.LIFE_VERSION), response.headers().get(RestUtils.Headers.LIFE_VERSION)); + } + + private void verifyGetDatasetVersion(HttpResponse response, NettyClient.ResponseParts responseParts, + HttpHeaders expectedHeaders, long contentSize, long expectTtl, ByteBuffer expectedContent) throws Exception { + assertEquals("Unexpected response status", HttpResponseStatus.OK, response.status()); + checkCommonGetHeadHeaders(response.headers()); + assertEquals("Content-Type does not match", expectedHeaders.get(RestUtils.Headers.AMBRY_CONTENT_TYPE), + response.headers().get(HttpHeaderNames.CONTENT_TYPE)); + assertEquals(RestUtils.Headers.BLOB_SIZE + " does not match", expectedHeaders.get(RestUtils.Headers.BLOB_SIZE), + response.headers().get(RestUtils.Headers.BLOB_SIZE)); + assertEquals("Accept-Ranges not set correctly", "bytes", response.headers().get(RestUtils.Headers.ACCEPT_RANGES)); + assertEquals(RestUtils.Headers.LIFE_VERSION + " does not match", + expectedHeaders.get(RestUtils.Headers.LIFE_VERSION), response.headers().get(RestUtils.Headers.LIFE_VERSION)); + if (expectTtl != -1) { + assertEquals("Unexpected ttl value", expectTtl, + (gmtToEpoch(response.headers().get(RestUtils.Headers.DATASET_EXPIRATION_TIME)) - gmtToEpoch( + response.headers().get(RestUtils.Headers.CREATION_TIME))) / 1000); + } + assertEquals("Correct blob size should be returned in response", Long.toString(contentSize), + response.headers().get(RestUtils.Headers.BLOB_SIZE)); + byte[] expectedContentArray = expectedContent.array(); + byte[] responseContentArray = getContent(responseParts.queue, expectedContentArray.length).array(); + assertArrayEquals("GET content does not match original content", expectedContentArray, responseContentArray); + } + + private String verifyDatasetVersionAndReturnBlobId(HttpResponse response, NettyClient.ResponseParts responseParts, + long contentSize, long expectTtl) throws Exception { + assertEquals("Unexpected response status", HttpResponseStatus.CREATED, response.status()); + assertTrue("No Date header", response.headers().getTimeMillis(HttpHeaderNames.DATE, -1) != -1); + assertNotNull("No " + RestUtils.Headers.CREATION_TIME, + response.headers().get(RestUtils.Headers.CREATION_TIME, null)); + assertEquals("Content-Length is not 0", 0, HttpUtil.getContentLength(response)); + String blobId = response.headers().get(HttpHeaderNames.LOCATION, null); + assertNotNull("Blob ID from PUT should not be null", blobId); + assertNoContent(responseParts.queue, 1); + assertTrue("Channel should be active", HttpUtil.isKeepAlive(response)); + assertEquals("Correct blob size should be returned in response", Long.toString(contentSize), + response.headers().get(RestUtils.Headers.BLOB_SIZE)); + if (expectTtl != -1) { + assertEquals("Unexpected ttl value", expectTtl, + (gmtToEpoch(response.headers().get(RestUtils.Headers.DATASET_EXPIRATION_TIME)) - gmtToEpoch( + response.headers().get(RestUtils.Headers.CREATION_TIME))) / 1000); + } + verifyTrackingHeaders(response); + verifyPostRequestCostHeaders(response, contentSize); + return blobId; + } + + private String generateDatasetVersion(Dataset dataset) { + String version; + Dataset.VersionSchema datasetVersionSchema = dataset.getVersionSchema(); + Random random = new Random(); + if (TIMESTAMP.equals(datasetVersionSchema)) { + sleep(); + version = String.valueOf(System.currentTimeMillis()); + } else if (MONOTONIC.equals(datasetVersionSchema)) { + version = String.valueOf(random.nextInt(10000)); + } else if (SEMANTIC.equals(datasetVersionSchema)) { + int major = random.nextInt(100); + int minor = random.nextInt(100); + int patch = random.nextInt(100); + version = major + "." + minor + "." + patch; + } else { + throw new IllegalArgumentException("This type of version schema is not compatible"); + } + return version; + } + + /** + * Sleeps for a millisecond. + */ + private void sleep() { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + throw new IllegalStateException("Sleep was interrupted", e); + } + } + + /** + * The http request uri for dataset version. + * @param accountName The account name. + * @param containerName The container name. + * @param blobName The dataset name. + * @param version The version of the dataset. + * @return + */ + String buildUriForDatasetVersion(String accountName, String containerName, String blobName, String version) { + return String.format("/named/%s/%s/%s/%s", accountName, containerName, blobName, version); + } + /** * The http request uri for named blob. * @param accountName The account name. @@ -1336,4 +1832,25 @@ public void setNextPageToken(String nextPageToken) { this.nextPageToken = nextPageToken; } } + + private static long gmtToEpoch(String gmtTime) throws Exception { + SimpleDateFormat gmtFormat = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z"); + gmtFormat.setTimeZone(TimeZone.getTimeZone("GMT")); + Date date = gmtFormat.parse(gmtTime); + return date.getTime(); + } + + private void sortPair(List> pairs) { + Collections.sort(pairs, new Comparator>() { + @Override + public int compare(Pair p1, Pair p2) { + int firstComparison = p1.getFirst().compareTo(p2.getFirst()); + if (firstComparison != 0) { + return firstComparison; + } else { + return p1.getSecond().compareTo(p2.getSecond()); + } + } + }); + } } diff --git a/ambry-test-utils/src/main/java/com/github/ambry/account/InMemAccountService.java b/ambry-test-utils/src/main/java/com/github/ambry/account/InMemAccountService.java index 956be5042e..f1533119a1 100644 --- a/ambry-test-utils/src/main/java/com/github/ambry/account/InMemAccountService.java +++ b/ambry-test-utils/src/main/java/com/github/ambry/account/InMemAccountService.java @@ -51,7 +51,7 @@ public class InMemAccountService implements AccountService { Arrays.asList(Container.UNKNOWN_CONTAINER, Container.DEFAULT_PUBLIC_CONTAINER, Container.DEFAULT_PRIVATE_CONTAINER), Account.QUOTA_RESOURCE_TYPE_DEFAULT_VALUE); static final String INMEM_ACCOUNT_UPDATER_PREFIX = "in-memory-account-updater"; - private static final int PAGE_SIZE = 1; + public static int PAGE_SIZE = 1; private final boolean shouldReturnOnlyUnknown; private final boolean notifyConsumers; private final Map idToAccountMap = new HashMap<>(); @@ -262,6 +262,9 @@ public synchronized Page listAllValidDatasets(String accountName, String List datasets = new ArrayList<>(nameToDatasetMap.get(new Pair<>(accountName, containerName)).keySet()); Collections.sort(datasets); int index = 0; + if (pageToken == null && PAGE_SIZE < 1) { + return new Page<>(datasets, null); + } if (pageToken != null) { index = Collections.binarySearch(datasets, pageToken); } @@ -276,10 +279,15 @@ public synchronized Page listAllValidDatasetVersions(String accountName, short containerId = account.getContainerByName(containerName).getId(); List datasetRecords = new ArrayList<>(idToDatasetVersionMap.get(new Pair<>(accountId, containerId)).values()); - List versionList = - datasetRecords.stream().map(DatasetVersionRecord::getVersion).collect(Collectors.toList()); + List versionList = datasetRecords.stream() + .filter(record -> datasetName.equals(record.getDatasetName())) + .map(DatasetVersionRecord::getVersion) + .collect(Collectors.toList()); Collections.sort(versionList); int index = 0; + if (pageToken == null && PAGE_SIZE < 1) { + return new Page<>(versionList, null); + } if (pageToken != null) { index = Collections.binarySearch(versionList, pageToken); }