Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create DoraWorkerServiceHandlerTest.java #18059

Merged
merged 15 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package alluxio.worker.grpc;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

import alluxio.client.file.cache.CacheManager;
import alluxio.client.file.cache.CacheManagerOptions;
import alluxio.client.file.cache.PageMetaStore;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.grpc.ListStatusPRequest;
import alluxio.grpc.ListStatusPResponse;
import alluxio.membership.MembershipManager;
import alluxio.util.io.PathUtils;
import alluxio.worker.dora.PagedDoraWorker;

import io.grpc.stub.StreamObserver;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class DoraWorkerClientServiceHandlerTest {

private PagedDoraWorker mWorker;
@Rule
public TemporaryFolder mTestFolder = new TemporaryFolder();
private CacheManager mCacheManager;
private MembershipManager mMembershipManager;

private DoraWorkerClientServiceHandler mServiceHandler;

private ListStatusPRequest mRequest;

@Before
public void before() throws Exception {
Configuration.set(PropertyKey.DORA_WORKER_METASTORE_ROCKSDB_DIR,
mTestFolder.newFolder("rocks"));
CacheManagerOptions cacheManagerOptions =
CacheManagerOptions.createForWorker(Configuration.global());

PageMetaStore pageMetaStore =
PageMetaStore.create(CacheManagerOptions.createForWorker(Configuration.global()));
mCacheManager =
CacheManager.Factory.create(Configuration.global(), cacheManagerOptions, pageMetaStore);
mMembershipManager =
MembershipManager.Factory.create(Configuration.global());
mWorker = new PagedDoraWorker(new AtomicReference<>(1L),
Configuration.global(), mCacheManager, mMembershipManager);
mServiceHandler = new DoraWorkerClientServiceHandler(mWorker);
}

@After
public void after() throws Exception {
mWorker.close();
}

@Test
public void testListStatus() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also test InAlluxioPercentage

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you create different tests instead?

File rootFolder = mTestFolder.newFolder("root");
String rootPath = rootFolder.getAbsolutePath();
mTestFolder.newFolder("root/d1");
mTestFolder.newFolder("root/d1/d1");
mTestFolder.newFolder("root/d2");
String fileContent = "test";
File f = mTestFolder.newFile("root/f");
Files.write(f.toPath(), fileContent.getBytes());
mRequest = ListStatusPRequest.newBuilder().setOptions(
alluxio.grpc.ListStatusPOptions.newBuilder().setRecursive(true).build())
.setPath(rootPath).build();
TestStreamObserver responseObserver = new TestStreamObserver();
mServiceHandler.listStatus(mRequest, responseObserver);
List<MyStruct> responses = responseObserver.mResponses;
String[] expectedPaths =
new String[] {PathUtils.concatPath(rootPath, "d1"), PathUtils.concatPath(rootPath, "d1/d1"),
PathUtils.concatPath(rootPath, "d2"), PathUtils.concatPath(rootPath, "f")};
Boolean[] expectedIsDirectories = new Boolean[] {true, true, true, false};
assertEquals(expectedPaths.length, responses.size());
for (int i = 0; i < expectedPaths.length; i++) {
assertEquals(expectedPaths[i], responses.get(i).getPath());
assertEquals(expectedIsDirectories[i], responses.get(i).getIsDirectory());
assertEquals(true, responses.get(i).getIsCompleted());
}

mRequest = ListStatusPRequest.newBuilder().setOptions(
alluxio.grpc.ListStatusPOptions.newBuilder().setRecursive(false).build())
.setPath(rootPath).build();
responseObserver = new TestStreamObserver();
mServiceHandler.listStatus(mRequest, responseObserver);
responses = responseObserver.mResponses;
expectedPaths = new String[] {PathUtils.concatPath(rootPath, "d1"),
PathUtils.concatPath(rootPath, "d2"), PathUtils.concatPath(rootPath, "f")};
expectedIsDirectories = new Boolean[] {true, true, false};
assertEquals(expectedPaths.length, responses.size());
for (int i = 0; i < expectedPaths.length; i++) {
assertEquals(expectedPaths[i], responses.get(i).getPath());
assertEquals(expectedIsDirectories[i], responses.get(i).getIsDirectory());
assertEquals(true, responses.get(i).getIsCompleted());
}

mRequest = ListStatusPRequest.newBuilder().setOptions(
alluxio.grpc.ListStatusPOptions.newBuilder().setRecursive(true).build())
.setPath(PathUtils.concatPath(rootPath, "d3")).build();
responseObserver = new TestStreamObserver();
TestStreamObserver finalResponseObserver = responseObserver;
assertThrows(RuntimeException.class, () -> mServiceHandler.listStatus(mRequest,
finalResponseObserver));

//list status on a file
mRequest = ListStatusPRequest.newBuilder().setOptions(
alluxio.grpc.ListStatusPOptions.newBuilder().setRecursive(true).build())
.setPath(PathUtils.concatPath(rootPath, "f")).build();
responseObserver = new TestStreamObserver();
TestStreamObserver finalResponseObserver1 = responseObserver;
mServiceHandler.listStatus(mRequest, finalResponseObserver1);
responses = responseObserver.mResponses;
expectedPaths = new String[] {PathUtils.concatPath(rootPath, "f")};
expectedIsDirectories = new Boolean[] {false};
assertEquals(expectedPaths.length, responses.size());
for (int i = 0; i < expectedPaths.length; i++) {
assertEquals(expectedPaths[i], responses.get(i).getPath());
assertEquals(expectedIsDirectories[i], responses.get(i).getIsDirectory());
assertEquals(true, responses.get(i).getIsCompleted());
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. list a file
  2. list a directory that does not exist


private static class TestStreamObserver implements StreamObserver<ListStatusPResponse> {
private final List<MyStruct> mResponses = new ArrayList<>();

@Override
public void onNext(ListStatusPResponse value) {
List<alluxio.grpc.FileInfo> fileInfosList = value.getFileInfosList();
for (alluxio.grpc.FileInfo fileInfo : fileInfosList) {
System.out.println(fileInfo);
mResponses.add(new MyStruct(fileInfo.getPath(), fileInfo.getFolder(),
fileInfo.getCompleted()));
}
}

@Override
public void onError(Throwable t) {
throw new RuntimeException(t);
}

@Override
public void onCompleted() {
mResponses.sort(Comparator.comparing(MyStruct::getPath));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sort in the test instead of sorting in this test observer

}
}

protected static class MyStruct {
private final String mPath;
private final Boolean mIsDirectory;
private final Boolean mIsCompleted;

public MyStruct(String path, Boolean isDirectory, Boolean isCompleted) {
mPath = path;
mIsDirectory = isDirectory;
mIsCompleted = isCompleted;
}

public String getPath() {
return mPath;
}

public Boolean getIsDirectory() {
return mIsDirectory;
}

public Boolean getIsCompleted() {
return mIsCompleted;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ public final class DoraFileSystemIntegrationTest extends BaseIntegrationTest {
public void before() throws Exception {
}

private void startCluster(LocalAlluxioClusterResource cluster) throws Exception
{
private void startCluster(LocalAlluxioClusterResource cluster) throws Exception {
cluster.start();
mFileSystem = cluster.get().getClient();

Expand All @@ -119,8 +118,7 @@ private void startCluster(LocalAlluxioClusterResource cluster) throws Exception
}
}

private void stopCluster(LocalAlluxioClusterResource cluster) throws Exception
{
private void stopCluster(LocalAlluxioClusterResource cluster) throws Exception {
mFileSystem = null;
cluster.stop();
}
Expand All @@ -133,7 +131,7 @@ private void stopCluster(LocalAlluxioClusterResource cluster) throws Exception
private void writeThenDeleteFromUfs(boolean clientWriteToUFS)
throws IOException, AlluxioException, Exception {
mLocalAlluxioClusterResourceBuilder.setProperty(PropertyKey.CLIENT_WRITE_TO_UFS_ENABLED,
clientWriteToUFS);
clientWriteToUFS);
LocalAlluxioClusterResource clusterResource = mLocalAlluxioClusterResourceBuilder.build();
startCluster(clusterResource);

Expand Down Expand Up @@ -171,7 +169,7 @@ private void writeThenDeleteFromUfs(boolean clientWriteToUFS)
private void writeThenUpdateFromUfs(boolean clientWriteToUFS)
throws IOException, AlluxioException, Exception {
mLocalAlluxioClusterResourceBuilder.setProperty(PropertyKey.CLIENT_WRITE_TO_UFS_ENABLED,
clientWriteToUFS);
clientWriteToUFS);
LocalAlluxioClusterResource clusterResource = mLocalAlluxioClusterResourceBuilder.build();
startCluster(clusterResource);

Expand Down Expand Up @@ -208,12 +206,12 @@ private void writeThenUpdateFromUfs(boolean clientWriteToUFS)

private FileSystemMasterCommonPOptions optionNoSync() {
return FileSystemMasterCommonPOptions.newBuilder().setSyncIntervalMs(-1)
.build();
.build();
}

private FileSystemMasterCommonPOptions optionSync() {
return FileSystemMasterCommonPOptions.newBuilder().setSyncIntervalMs(0)
.build();
.build();
}

/**
Expand All @@ -237,4 +235,39 @@ public void testWriteThenUpdateFromUfs() throws Exception {
writeThenUpdateFromUfs(true);
writeThenUpdateFromUfs(false);
}

@Test
public void testRename() throws Exception {
mLocalAlluxioClusterResourceBuilder.setProperty(PropertyKey.CLIENT_WRITE_TO_UFS_ENABLED, true);
LocalAlluxioClusterResource clusterResource = mLocalAlluxioClusterResourceBuilder.build();
startCluster(clusterResource);

FileOutStream fos = mFileSystem.createFile(TEST_FILE_URI,
CreateFilePOptions.newBuilder().setOverwrite(true).build());
fos.write(TEST_CONTENT.getBytes());
fos.close();

AlluxioURI oldPath = TEST_FILE_URI;
for (int i = 1; i < 10; i++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we just need to test once

AlluxioURI newPath = new AlluxioURI(oldPath.getPath() + i);
assertNotNull(mFileSystem.getStatus(oldPath, GetStatusPOptions.newBuilder()
.setCommonOptions(optionNoSync())
.build()));
long oldFileId = mFileSystem.getStatus(oldPath, GetStatusPOptions.newBuilder()
.setCommonOptions(optionNoSync())
.build()).getFileId();
mFileSystem.rename(oldPath, newPath);
assertNotNull(mFileSystem.getStatus(newPath, GetStatusPOptions.newBuilder()
.setCommonOptions(optionNoSync())
.build()));
long newFileId = mFileSystem.getStatus(newPath, GetStatusPOptions.newBuilder()
.setCommonOptions(optionNoSync())
.build()).getFileId();
assertEquals(TEST_CONTENT, IOUtils.toString(mFileSystem.openFile(newPath,
OpenFilePOptions.newBuilder().setCommonOptions(optionNoSync()).build())));
oldPath = newPath;
}

stopCluster(clusterResource);
}
}