diff --git a/cli/src/alluxio.org/cli/cmd/fs/check_cached.go b/cli/src/alluxio.org/cli/cmd/fs/check_cached.go new file mode 100644 index 000000000000..300aca2e6f23 --- /dev/null +++ b/cli/src/alluxio.org/cli/cmd/fs/check_cached.go @@ -0,0 +1,67 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package fs + +import ( + "strconv" + + "github.com/spf13/cobra" + + "alluxio.org/cli/env" +) + +func CheckCached(className string) env.Command { + return &CheckCachedCommand{ + BaseJavaCommand: &env.BaseJavaCommand{ + CommandName: "check-cached", + JavaClassName: className, + Parameters: []string{"check-cached"}, + }, + } +} + +type CheckCachedCommand struct { + *env.BaseJavaCommand + + sample int + limit int +} + +func (c *CheckCachedCommand) Base() *env.BaseJavaCommand { + return c.BaseJavaCommand +} + +func (c *CheckCachedCommand) ToCommand() *cobra.Command { + cmd := c.Base().InitRunJavaClassCmd(&cobra.Command{ + Use: "check-cached [path]", + Short: "Checks if files under a path have been cached in alluxio.", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return c.Run(args) + }, + }) + cmd.Flags().IntVar(&c.sample, "sample", 1, "Sample ratio, 10 means sample 1 in every 10 files.") + cmd.Flags().IntVar(&c.limit, "limit", 1000, "Limit number of files to check") + return cmd +} + +func (c *CheckCachedCommand) Run(args []string) error { + javaArgs := []string{"check-cached"} + if c.sample != 0 { + javaArgs = append(javaArgs, "--sample", strconv.Itoa(c.sample)) + } + if c.limit != 0 { + javaArgs = append(javaArgs, "--limit", strconv.Itoa(c.limit)) + } + javaArgs = append(javaArgs, args...) + return c.Base().Run(javaArgs) +} diff --git a/cli/src/alluxio.org/cli/cmd/fs/consistant_hash.go b/cli/src/alluxio.org/cli/cmd/fs/consistant_hash.go new file mode 100644 index 000000000000..adbe88da08e6 --- /dev/null +++ b/cli/src/alluxio.org/cli/cmd/fs/consistant_hash.go @@ -0,0 +1,74 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package fs + +import ( + "alluxio.org/cli/env" + "github.com/palantir/stacktrace" + "github.com/spf13/cobra" +) + +func ConsistentHash(className string) env.Command { + return &ConsistentHashCommand{ + BaseJavaCommand: &env.BaseJavaCommand{ + CommandName: "consistent-hash", + JavaClassName: className, + Parameters: []string{"consistent-hash"}, + }, + } +} + +type ConsistentHashCommand struct { + *env.BaseJavaCommand + + createCheckFile bool + compareCheckFiles bool + cleanCheckData bool +} + +func (c *ConsistentHashCommand) Base() *env.BaseJavaCommand { + return c.BaseJavaCommand +} + +func (c *ConsistentHashCommand) ToCommand() *cobra.Command { + cmd := c.Base().InitRunJavaClassCmd(&cobra.Command{ + Use: "consistent-hash [--create]|[--compare <1stCheckFilePath> <2ndCheckFilePath>]|[--clean] ", + Short: "This command is for checking whether the consistent hash ring is changed or not", + RunE: func(cmd *cobra.Command, args []string) error { + return c.Run(args) + }, + }) + cmd.Flags().BoolVar(&c.createCheckFile, "create", false, "Generate check file.") + cmd.Flags().BoolVar(&c.compareCheckFiles, "compare", false, "Compare check files to see if the hash ring has changed and if data lost.") + cmd.Flags().BoolVar(&c.cleanCheckData, "clean", false, "Clean all check data.") + cmd.MarkFlagsMutuallyExclusive("create", "compare", "clean") + return cmd +} + +func (c *ConsistentHashCommand) Run(args []string) error { + javaArgs := []string{} + if c.createCheckFile { + javaArgs = append(javaArgs, "--create") + } + if c.compareCheckFiles { + if len(args) != 2 { + return stacktrace.NewError("expect 2 arguments with --compare-check-files but got %v", len(args)) + } + javaArgs = append(javaArgs, "--compare") + } + if c.cleanCheckData { + javaArgs = append(javaArgs, "--clean") + } + + javaArgs = append(javaArgs, args...) + return c.Base().Run(javaArgs) +} diff --git a/cli/src/alluxio.org/cli/cmd/fs/fs.go b/cli/src/alluxio.org/cli/cmd/fs/fs.go index 61e3a887dd70..acf5f75b5e6c 100644 --- a/cli/src/alluxio.org/cli/cmd/fs/fs.go +++ b/cli/src/alluxio.org/cli/cmd/fs/fs.go @@ -12,37 +12,40 @@ package fs import ( - "alluxio.org/cli/cmd/names" - "alluxio.org/cli/env" + "alluxio.org/cli/cmd/names" + "alluxio.org/cli/env" ) var Service = &env.Service{ - Name: "fs", - Description: "Operations to interface with the Alluxio filesystem", - Commands: Cmds(names.FileSystemShellJavaClass), + Name: "fs", + Description: "Operations to interface with the Alluxio filesystem", + Commands: Cmds(names.FileSystemShellJavaClass), } func Cmds(className string) []env.Command { - var ret []env.Command - for _, c := range []func(string) env.Command{ - Cat, - Checksum, - Chgrp, - Chmod, - Chown, - Cp, - Head, - Ls, - Mkdir, - Mv, - Rm, - Stat, - Tail, - Test, - Touch, - } { - ret = append(ret, c(className)) - } + var ret []env.Command + for _, c := range []func(string) env.Command{ + Cat, + Checksum, + Chgrp, + Chmod, + Chown, + Cp, + Head, + Ls, + Mkdir, + Mv, + Rm, + Stat, + Tail, + Test, + Touch, + Location, + CheckCached, + ConsistentHash, + } { + ret = append(ret, c(className)) + } - return ret + return ret } diff --git a/cli/src/alluxio.org/cli/cmd/fs/location.go b/cli/src/alluxio.org/cli/cmd/fs/location.go new file mode 100644 index 000000000000..c0b7e55dc675 --- /dev/null +++ b/cli/src/alluxio.org/cli/cmd/fs/location.go @@ -0,0 +1,52 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package fs + +import ( + "github.com/spf13/cobra" + + "alluxio.org/cli/env" +) + +func Location(className string) env.Command { + return &LocationCommand{ + BaseJavaCommand: &env.BaseJavaCommand{ + CommandName: "location", + JavaClassName: className, + Parameters: []string{"location"}, + }, + } +} + +type LocationCommand struct { + *env.BaseJavaCommand +} + +func (c *LocationCommand) Base() *env.BaseJavaCommand { + return c.BaseJavaCommand +} + +func (c *LocationCommand) ToCommand() *cobra.Command { + cmd := c.Base().InitRunJavaClassCmd(&cobra.Command{ + Use: "location [path]", + Short: "Displays the list of hosts storing the specified file.", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return c.Run(args) + }, + }) + return cmd +} + +func (c *LocationCommand) Run(args []string) error { + return c.Base().Run(args) +} diff --git a/dora/core/client/fs/src/main/java/alluxio/client/file/DelegatingFileSystem.java b/dora/core/client/fs/src/main/java/alluxio/client/file/DelegatingFileSystem.java index b4f36f8460d1..3e9ba47389eb 100644 --- a/dora/core/client/fs/src/main/java/alluxio/client/file/DelegatingFileSystem.java +++ b/dora/core/client/fs/src/main/java/alluxio/client/file/DelegatingFileSystem.java @@ -13,6 +13,7 @@ import alluxio.AlluxioURI; import alluxio.PositionReader; +import alluxio.client.file.ufs.UfsBaseFileSystem; import alluxio.conf.AlluxioConfiguration; import alluxio.exception.AlluxioException; import alluxio.exception.DirectoryNotEmptyException; @@ -50,6 +51,7 @@ import java.util.Map; import java.util.Optional; import java.util.function.Consumer; +import javax.annotation.Nullable; /** * A wrapper of a FileSystem instance. @@ -260,6 +262,18 @@ public void close() throws IOException { mDelegatedFileSystem.close(); } + @Nullable + @Override + public DoraCacheFileSystem getDoraCacheFileSystem() { + return mDelegatedFileSystem.getDoraCacheFileSystem(); + } + + @Nullable + @Override + public UfsBaseFileSystem getUfsBaseFileSystem() { + return mDelegatedFileSystem.getUfsBaseFileSystem(); + } + /** * @return the underlying fileSystem */ diff --git a/dora/core/client/fs/src/main/java/alluxio/client/file/DoraCacheFileSystem.java b/dora/core/client/fs/src/main/java/alluxio/client/file/DoraCacheFileSystem.java index b3c620b1dda5..30fb00e817e8 100644 --- a/dora/core/client/fs/src/main/java/alluxio/client/file/DoraCacheFileSystem.java +++ b/dora/core/client/fs/src/main/java/alluxio/client/file/DoraCacheFileSystem.java @@ -64,7 +64,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.function.Consumer; +import javax.annotation.Nullable; /** * Dora Cache file system implementation. @@ -81,7 +83,7 @@ public class DoraCacheFileSystem extends DelegatingFileSystem { private final DoraCacheClient mDoraClient; protected final FileSystemContext mFsContext; private final boolean mMetadataCacheEnabled; - private final boolean mUfsFallbackEnabled; + private boolean mUfsFallbackEnabled; private final long mDefaultVirtualBlockSize; private final boolean mClientWriteToUFSEnabled; @@ -160,7 +162,7 @@ public URIStatus getStatus(AlluxioURI path, GetStatusPOptions options) throw ex; } UFS_FALLBACK_COUNTER.inc(); - LOG.debug("Dora client get status error ({} times). Fall back to UFS.", + LOG.error("Dora client get status error ({} times). Fall back to UFS.", UFS_FALLBACK_COUNTER.getCount(), ex); return mDelegatedFileSystem.getStatus(ufsFullPath, options).setFromUFSFallBack(); } @@ -202,7 +204,7 @@ public FileInStream openFile(URIStatus status, OpenFilePOptions options) throw ex; } UFS_FALLBACK_COUNTER.inc(); - LOG.debug("Dora client open file error ({} times). Fall back to UFS.", + LOG.error("Dora client open file error ({} times). Fall back to UFS.", UFS_FALLBACK_COUNTER.getCount(), ex); return mDelegatedFileSystem.openFile(status, mergedOptions); } @@ -271,7 +273,7 @@ public List listStatus(AlluxioURI path, ListStatusPOptions options) } UFS_FALLBACK_COUNTER.inc(); - LOG.debug("Dora client list status error ({} times). Fall back to UFS.", + LOG.error("Dora client list status error ({} times). Fall back to UFS.", UFS_FALLBACK_COUNTER.getCount(), ex); return mDelegatedFileSystem.listStatus(ufsFullPath, options); } @@ -318,7 +320,7 @@ public FileOutStream createFile(AlluxioURI alluxioPath, CreateFilePOptions optio // TODO(JiamingMai): delete the file // delete(alluxioPath); UFS_FALLBACK_COUNTER.inc(); - LOG.debug("Dora client CreateFile error ({} times). Fall back to UFS.", + LOG.error("Dora client CreateFile error ({} times). Fall back to UFS.", UFS_FALLBACK_COUNTER.getCount(), e); return mDelegatedFileSystem.createFile(ufsFullPath, options); } @@ -338,7 +340,7 @@ public void createDirectory(AlluxioURI path, CreateDirectoryPOptions options) throw ex; } UFS_FALLBACK_COUNTER.inc(); - LOG.debug("Dora client createDirectory error ({} times). Fall back to UFS.", + LOG.error("Dora client createDirectory error ({} times). Fall back to UFS.", UFS_FALLBACK_COUNTER.getCount(), ex); mDelegatedFileSystem.createDirectory(ufsFullPath, options); } @@ -380,7 +382,7 @@ public void rename(AlluxioURI src, AlluxioURI dst, RenamePOptions options) throw ex; } UFS_FALLBACK_COUNTER.inc(); - LOG.debug("Dora client rename error ({} times). Fall back to UFS.", + LOG.error("Dora client rename error ({} times). Fall back to UFS.", UFS_FALLBACK_COUNTER.getCount(), ex); mDelegatedFileSystem.rename(srcUfsFullPath, dstUfsFullPath, options); } @@ -408,7 +410,7 @@ public boolean exists(AlluxioURI path, ExistsPOptions options) throw ex; } UFS_FALLBACK_COUNTER.inc(); - LOG.debug("Dora client exists error ({} times). Fall back to UFS.", + LOG.error("Dora client exists error ({} times). Fall back to UFS.", UFS_FALLBACK_COUNTER.getCount(), ex); return mDelegatedFileSystem.exists(ufsFullPath, options); } @@ -429,7 +431,7 @@ public void setAttribute(AlluxioURI path, SetAttributePOptions options) throw ex; } UFS_FALLBACK_COUNTER.inc(); - LOG.debug("Dora client setAttribute error ({} times). Fall back to UFS.", + LOG.error("Dora client setAttribute error ({} times). Fall back to UFS.", UFS_FALLBACK_COUNTER.getCount(), ex); mDelegatedFileSystem.setAttribute(ufsFullPath, options); } @@ -465,6 +467,46 @@ public AlluxioURI convertToAlluxioPath(AlluxioURI ufsPath) throws InvalidPathExc return PathUtils.convertUfsPathToAlluxioPath(ufsPath, rootUfs); } + /** + * Get the worker address which the specified file locates at. + * @param path the file path + * @return the worker address which the file locates at + */ + public WorkerNetAddress getWorkerNetAddress(AlluxioURI path) { + AlluxioURI ufsFullPath = convertToUfsPath(path); + return mDoraClient.getWorkerNetAddress(ufsFullPath.toString()); + } + + /** + * Check the location of the specified path. + * @param path the file path + * @return a map that maps the file path to a list of workers + * @throws IOException + */ + public Map> checkFileLocation(AlluxioURI path) throws IOException { + return checkFileLocation(path, GetStatusPOptions.getDefaultInstance()); + } + + /** + * Check the location of the specified path. + * @param path the file path + * @param options the get status options + * @return a map that maps the file path to a list of workers + * @throws IOException + */ + public Map> checkFileLocation(AlluxioURI path, + GetStatusPOptions options) throws IOException { + AlluxioURI ufsFullPath = convertToUfsPath(path); + return mDoraClient.checkFileLocation(ufsFullPath.toString(), options); + } + + /** + * Get the location information of the specified file. + * @param path the path to get the location information + * @return the location information of the specified file + * @throws IOException + * @throws AlluxioException + */ @Override public List getBlockLocations(AlluxioURI path) throws IOException, AlluxioException { @@ -474,6 +516,13 @@ public List getBlockLocations(AlluxioURI path) return getBlockLocations(status); } + /** + * Get the location information of the specified file. + * @param status the uri of the file + * @return the location information of the specified file + * @throws IOException + * @throws AlluxioException + */ @Override public List getBlockLocations(URIStatus status) throws IOException, AlluxioException { @@ -506,4 +555,18 @@ public List getBlockLocations(URIStatus status) } return listBuilder.build(); } + + /** + * Dora Cache file system implementation. + * @param enabled is ufs fall back enabled + */ + public void setUfsFallbackEnabled(boolean enabled) { + mUfsFallbackEnabled = enabled; + } + + @Nullable + @Override + public DoraCacheFileSystem getDoraCacheFileSystem() { + return this; + } } diff --git a/dora/core/client/fs/src/main/java/alluxio/client/file/FileSystem.java b/dora/core/client/fs/src/main/java/alluxio/client/file/FileSystem.java index 221fbb106283..155bb45f8a33 100644 --- a/dora/core/client/fs/src/main/java/alluxio/client/file/FileSystem.java +++ b/dora/core/client/fs/src/main/java/alluxio/client/file/FileSystem.java @@ -74,6 +74,7 @@ import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; +import javax.annotation.Nullable; import javax.security.auth.Subject; /** @@ -796,4 +797,18 @@ default void unmount(AlluxioURI path) throws IOException, AlluxioException { */ String getJobProgress(JobDescription jobDescription, JobProgressReportFormat format, boolean verbose); + + /** + * @return the instance of {@link DoraCacheFileSystem} + */ + default @Nullable DoraCacheFileSystem getDoraCacheFileSystem() { + return null; + } + + /** + * @return the instance of {@link UfsBaseFileSystem} + */ + default @Nullable UfsBaseFileSystem getUfsBaseFileSystem() { + return null; + } } diff --git a/dora/core/client/fs/src/main/java/alluxio/client/file/dora/DoraCacheClient.java b/dora/core/client/fs/src/main/java/alluxio/client/file/dora/DoraCacheClient.java index 5795e0028884..eb6dbcabc320 100644 --- a/dora/core/client/fs/src/main/java/alluxio/client/file/dora/DoraCacheClient.java +++ b/dora/core/client/fs/src/main/java/alluxio/client/file/dora/DoraCacheClient.java @@ -63,7 +63,9 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -250,6 +252,45 @@ public Pair createFile(String path, CreateFilePOptions option } } + /** + * Get a map that maps file path to the workers list. + * @param path the file path to check + * @param options the get status options + * @return a map that maps file path to the workers list + * @throws IOException + */ + public Map> checkFileLocation(String path, + GetStatusPOptions options) throws IOException { + Map> pathDistributionMap = new HashMap<>(); + List workers = mContext.getCachedWorkers(); + for (BlockWorkerInfo worker : workers) { + try (CloseableResource client = + mContext.acquireBlockWorkerClient(worker.getNetAddress())) { + GetStatusPRequest request = GetStatusPRequest.newBuilder() + .setPath(path) + .setOptions(options) + .build(); + try { + FileInfo fileInfo = client.get().getStatus(request).getFileInfo(); + URIStatus uriStatus = new URIStatus(GrpcUtils.fromProto(fileInfo)); + if (uriStatus.getInAlluxioPercentage() > 0) { + List assignedWorkers = pathDistributionMap.get(path); + if (assignedWorkers == null) { + assignedWorkers = new ArrayList<>(); + } + assignedWorkers.add(worker.getNetAddress()); + pathDistributionMap.put(path, assignedWorkers); + } + } catch (Exception e) { + // ignore this exception + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return pathDistributionMap; + } + /** * Mark the newly created and written file as complete. * diff --git a/dora/core/client/fs/src/main/java/alluxio/client/file/ufs/UfsBaseFileSystem.java b/dora/core/client/fs/src/main/java/alluxio/client/file/ufs/UfsBaseFileSystem.java index 0499434b259c..e70d297f1d24 100644 --- a/dora/core/client/fs/src/main/java/alluxio/client/file/ufs/UfsBaseFileSystem.java +++ b/dora/core/client/fs/src/main/java/alluxio/client/file/ufs/UfsBaseFileSystem.java @@ -13,6 +13,7 @@ import alluxio.AlluxioURI; import alluxio.PositionReader; +import alluxio.client.file.DoraCacheFileSystem; import alluxio.client.file.FileInStream; import alluxio.client.file.FileOutStream; import alluxio.client.file.FileSystem; @@ -80,6 +81,7 @@ import java.util.Map; import java.util.Optional; import java.util.function.Consumer; +import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; /** @@ -517,4 +519,16 @@ interface UfsCallable { interface UfsCallableWithReturn { V call() throws IOException; } + + @Nullable + @Override + public DoraCacheFileSystem getDoraCacheFileSystem() { + return null; + } + + @Nullable + @Override + public UfsBaseFileSystem getUfsBaseFileSystem() { + return this; + } } diff --git a/dora/core/common/src/main/java/alluxio/exception/ExceptionMessage.java b/dora/core/common/src/main/java/alluxio/exception/ExceptionMessage.java index 89a57b6a05fa..2644f377ccb0 100644 --- a/dora/core/common/src/main/java/alluxio/exception/ExceptionMessage.java +++ b/dora/core/common/src/main/java/alluxio/exception/ExceptionMessage.java @@ -34,6 +34,8 @@ public enum ExceptionMessage { INODE_NOT_IN_PARTIAL_LISTING("Inode not found in root path \"{0}\" during partial listing. " + "It was likely moved across listing calls."), PATH_MUST_BE_FILE("Path \"{0}\" must be a file."), + PATH_MUST_BE_DIRECTORY("Path \"{0}\" must be a directory."), + PATH_INVALID("Path \"{0}\" is invalid."), STATE_LOCK_TIMED_OUT("Failed to acquire the lock after {0}ms"), diff --git a/dora/core/common/src/main/java/alluxio/worker/dora/DoraWorker.java b/dora/core/common/src/main/java/alluxio/worker/dora/DoraWorker.java index 6b4ddfcf3ee3..6857c096be9d 100644 --- a/dora/core/common/src/main/java/alluxio/worker/dora/DoraWorker.java +++ b/dora/core/common/src/main/java/alluxio/worker/dora/DoraWorker.java @@ -29,6 +29,7 @@ import alluxio.proto.dataserver.Protocol; import alluxio.underfs.UfsStatus; import alluxio.wire.FileInfo; +import alluxio.wire.WorkerNetAddress; import alluxio.worker.DataWorker; import alluxio.worker.SessionCleanable; import alluxio.worker.block.io.BlockReader; @@ -188,4 +189,10 @@ boolean exists(String path, ExistsPOptions options) * @param options the options of this operation */ void setAttribute(String path, SetAttributePOptions options) throws IOException; + + /** + * Get the address of the Dora Worker. + * @return worker address + */ + WorkerNetAddress getAddress(); } diff --git a/dora/core/server/master/src/main/java/alluxio/master/job/DoraLoadJob.java b/dora/core/server/master/src/main/java/alluxio/master/job/DoraLoadJob.java index 3abfd72a0118..c3b4077fe2f8 100644 --- a/dora/core/server/master/src/main/java/alluxio/master/job/DoraLoadJob.java +++ b/dora/core/server/master/src/main/java/alluxio/master/job/DoraLoadJob.java @@ -533,7 +533,7 @@ public boolean processResponse(DoraLoadTask doraLoadTask) { - response.getFailuresList().size(); int totalLoadedFile = (int) (doraLoadTask.getFilesToLoad().stream().filter(UfsStatus::isFile).count() - - response.getFailuresList().stream() + - response.getFailuresList().stream() .filter(it -> !it.getUfsStatus().getIsDirectory()).count()); int totalLoadedDirectory = totalLoadedInodes - totalLoadedFile; Set failedFullUfsPaths = diff --git a/dora/core/server/worker/src/main/java/alluxio/worker/dora/DoraMetaManager.java b/dora/core/server/worker/src/main/java/alluxio/worker/dora/DoraMetaManager.java index 1fefa1ad91ef..46d3be2e9304 100644 --- a/dora/core/server/worker/src/main/java/alluxio/worker/dora/DoraMetaManager.java +++ b/dora/core/server/worker/src/main/java/alluxio/worker/dora/DoraMetaManager.java @@ -12,6 +12,7 @@ package alluxio.worker.dora; import alluxio.AlluxioURI; +import alluxio.Constants; import alluxio.client.file.cache.CacheManager; import alluxio.conf.AlluxioConfiguration; import alluxio.conf.Configuration; @@ -26,6 +27,7 @@ import alluxio.underfs.UnderFileSystemConfiguration; import alluxio.underfs.options.GetStatusOptions; import alluxio.underfs.options.ListOptions; +import alluxio.util.logging.SamplingLogger; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; @@ -54,6 +56,8 @@ public class DoraMetaManager implements Closeable { private final PagedDoraWorker mDoraWorker; private final DoraUfsManager mUfsManager; + private static final Logger SAMPLING_LOG = new SamplingLogger( + LoggerFactory.getLogger(DoraMetaManager.class), 1L * Constants.MINUTE_MS); private final long mListingCacheCapacity = Configuration.getInt(PropertyKey.DORA_UFS_LIST_STATUS_CACHE_NR_FILES); private final boolean mGetRealContentHash @@ -333,6 +337,7 @@ private boolean shouldInvalidatePageCache(FileInfo origin, FileInfo updated) { } private void invalidateCachedFile(String path) { + SAMPLING_LOG.info("Invalidating cached file {}", path); FileId fileId = FileId.of(AlluxioURI.hash(path)); mCacheManager.deleteFile(fileId.toString()); } diff --git a/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java b/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java index efe9e79c6293..f7d34db17312 100644 --- a/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java +++ b/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java @@ -1020,4 +1020,9 @@ protected void checkMovePermission(String srcPath, String dstPath) protected DoraOpenFileHandleContainer getOpenFileHandleContainer() { return mOpenFileHandleContainer; } + + @Override + public WorkerNetAddress getAddress() { + return mAddress; + } } diff --git a/dora/shell/src/main/java/alluxio/cli/fs/command/CheckCachedCommand.java b/dora/shell/src/main/java/alluxio/cli/fs/command/CheckCachedCommand.java new file mode 100644 index 000000000000..9a211fe8ed9b --- /dev/null +++ b/dora/shell/src/main/java/alluxio/cli/fs/command/CheckCachedCommand.java @@ -0,0 +1,245 @@ + +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.cli.fs.command; + +import alluxio.AlluxioURI; +import alluxio.annotation.PublicApi; +import alluxio.client.file.DoraCacheFileSystem; +import alluxio.client.file.FileSystem; +import alluxio.client.file.FileSystemContext; +import alluxio.client.file.URIStatus; +import alluxio.exception.AlluxioException; +import alluxio.exception.ExceptionMessage; +import alluxio.exception.FileDoesNotExistException; +import alluxio.exception.status.InvalidArgumentException; +import alluxio.grpc.FileSystemMasterCommonPOptions; +import alluxio.grpc.GetStatusPOptions; +import alluxio.grpc.LoadMetadataPType; +import alluxio.util.FormatUtils; + +import com.google.common.base.Preconditions; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import javax.annotation.concurrent.ThreadSafe; + +/** + * Moves a file or a directory in the Alluxio filesystem using job service. + */ +@ThreadSafe +@PublicApi +public final class CheckCachedCommand extends AbstractFileSystemCommand { + private static final Logger LOG = LoggerFactory.getLogger(CheckCachedCommand.class); + + private static final int DEFAULT_LIMIT = 1000; + private static final int LIMIT_WARNING_THRESHOLD = 10000; + private static final int SAMPLE_FILE_SIZE = 100; + private final FileSystem mUfsFileSystem; + private final DoraCacheFileSystem mDoraCacheFileSystem; + + private static final Option SAMPLE_OPTION = + Option.builder() + .longOpt("sample") + .required(false) + .hasArg(true) + .desc("Sample ratio, 10 means sample 1 in every 10 files.") + .build(); + + private static final Option LIMIT_OPTION = + Option.builder() + .longOpt("limit") + .required(false) + .hasArg(true) + .desc("limit, default 1000") + .build(); + + /** + * @param fsContext the filesystem of Alluxio + */ + public CheckCachedCommand(FileSystemContext fsContext) { + super(fsContext); + Preconditions.checkNotNull(mFileSystem.getDoraCacheFileSystem()); + mDoraCacheFileSystem = mFileSystem.getDoraCacheFileSystem(); + // Disable ufs fallback + mDoraCacheFileSystem.setUfsFallbackEnabled(false); + mUfsFileSystem = mFileSystem.getUfsBaseFileSystem(); + } + + @Override + public void validateArgs(CommandLine cl) throws InvalidArgumentException { + super.validateArgs(cl); + String sampleOptionValue = cl.getOptionValue(SAMPLE_OPTION.getLongOpt()); + String limitOptionValue = cl.getOptionValue(LIMIT_OPTION.getLongOpt()); + if (sampleOptionValue != null) { + Preconditions.checkState(StringUtils.isNumeric(sampleOptionValue)); + } + if (limitOptionValue != null) { + Preconditions.checkState(StringUtils.isNumeric(limitOptionValue)); + int limit = Integer.parseInt(limitOptionValue); + if (limit > LIMIT_WARNING_THRESHOLD) { + LOG.warn("Limit {} is too large. This may cause client freeze. " + + "Considering reduce the value if the processing takes too long.", limit); + } + } + } + + @Override + public String getCommandName() { + return "check-cached"; + } + + @Override + public Options getOptions() { + return new Options() + .addOption(SAMPLE_OPTION) + .addOption(LIMIT_OPTION); + } + + // TODO(elega) validate limit value + @Override + public int run(CommandLine cl) throws AlluxioException, IOException { + String[] args = cl.getArgs(); + AlluxioURI path = new AlluxioURI(args[0]); + + int sampleRatio = 1; + int limit = DEFAULT_LIMIT; + + int numFullyCachedFile = 0; + int numPartiallyCachedFile = 0; + int numNotCachedFile = 0; + int numFailed = 0; + + long approximateCachedBytes = 0; + long totalBytes = 0; + + List fullyCachedFileSampleList = new ArrayList<>(); + List partiallyCachedFileSampleList = new ArrayList<>(); + List notCachedFileSampleList = new ArrayList<>(); + List failedSampleList = new ArrayList<>(); + + String sampleOptionValue = cl.getOptionValue(SAMPLE_OPTION.getLongOpt()); + String limitOptionValue = cl.getOptionValue(LIMIT_OPTION.getLongOpt()); + if (sampleOptionValue != null) { + sampleRatio = Integer.parseInt(sampleOptionValue); + } + if (limitOptionValue != null) { + limit = Integer.parseInt(limitOptionValue); + } + + if (!mDoraCacheFileSystem.getStatus(path).isFolder()) { + throw new FileDoesNotExistException(ExceptionMessage.PATH_MUST_BE_DIRECTORY.getMessage(path)); + } + + // TODO(elega) need an iterative API to avoid loading too many files + List statuses = mUfsFileSystem.listStatus( + mDoraCacheFileSystem.convertToUfsPath(path)).stream() + .filter(it -> !it.isFolder() && it.isCompleted() + ).collect(Collectors.toList()); + Collections.shuffle(statuses); + List statusesToCheck = + statuses.subList(0, Math.min(statuses.size() / sampleRatio, limit)); + for (URIStatus status: statusesToCheck) { + String filePath = status.getUfsPath(); + try { + URIStatus fileStatus = mDoraCacheFileSystem.getStatus( + new AlluxioURI(filePath), GetStatusPOptions.newBuilder() + .setLoadMetadataType(LoadMetadataPType.NEVER) + .setCommonOptions(FileSystemMasterCommonPOptions.newBuilder() + .setSyncIntervalMs(-1).build()) + .build() + ); + totalBytes += fileStatus.getLength(); + approximateCachedBytes += + fileStatus.getLength() * fileStatus.getInAlluxioPercentage() / 100; + if (fileStatus.getLength() == 0 || fileStatus.getInAlluxioPercentage() == 100) { + numFullyCachedFile++; + if (fullyCachedFileSampleList.size() < SAMPLE_FILE_SIZE) { + fullyCachedFileSampleList.add(filePath); + } + } else if (fileStatus.getInAlluxioPercentage() > 0) { + numPartiallyCachedFile++; + if (partiallyCachedFileSampleList.size() < SAMPLE_FILE_SIZE) { + partiallyCachedFileSampleList.add(filePath); + } + } else { + numNotCachedFile++; + if (notCachedFileSampleList.size() < SAMPLE_FILE_SIZE) { + notCachedFileSampleList.add(filePath); + } + } + } catch (Exception e) { + if (e instanceof FileDoesNotExistException) { + numNotCachedFile++; + if (notCachedFileSampleList.size() < SAMPLE_FILE_SIZE) { + notCachedFileSampleList.add(filePath); + } + } else { + numFailed++; + if (failedSampleList.size() < SAMPLE_FILE_SIZE) { + failedSampleList.add(filePath); + } + } + } + } + + System.out.println("Total files checked: " + statusesToCheck.size()); + System.out.println("Fully cached files count: " + numFullyCachedFile); + System.out.println("Partially cached files count: " + numPartiallyCachedFile); + System.out.println("Not cached files count: " + numNotCachedFile); + System.out.println("Failed files count: " + numFailed); + System.out.println( + "Total bytes checked: " + FormatUtils.getSizeFromBytes(totalBytes)); + System.out.println( + "Approximate bytes cached: " + FormatUtils.getSizeFromBytes(approximateCachedBytes)); + System.out.println("--------------Fully cached files samples (up to 100)------------------"); + for (String file: fullyCachedFileSampleList) { + System.out.println(file); + } + System.out.println(); + System.out.println("--------------Partially cached files samples (up to 100)-----------------"); + for (String file: partiallyCachedFileSampleList) { + System.out.println(file); + } + System.out.println(); + System.out.println("--------------Not cached files samples (up to 100)------------------"); + for (String file: notCachedFileSampleList) { + System.out.println(file); + } + System.out.println(); + System.out.println("--------------Failed files samples (up to 100)------------------"); + for (String file: failedSampleList) { + System.out.println(file); + } + System.out.println(); + return 0; + } + + @Override + public String getUsage() { + return "check-cached [--sample ] [--limit ]"; + } + + @Override + public String getDescription() { + return "Checks if files under a path have been cached in alluxio."; + } +} diff --git a/dora/shell/src/main/java/alluxio/cli/fs/command/ConsistentHashCommand.java b/dora/shell/src/main/java/alluxio/cli/fs/command/ConsistentHashCommand.java new file mode 100644 index 000000000000..bc3d7a0554b0 --- /dev/null +++ b/dora/shell/src/main/java/alluxio/cli/fs/command/ConsistentHashCommand.java @@ -0,0 +1,384 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.cli.fs.command; + +import alluxio.AlluxioURI; +import alluxio.Constants; +import alluxio.annotation.PublicApi; +import alluxio.client.file.DoraCacheFileSystem; +import alluxio.client.file.FileInStream; +import alluxio.client.file.FileOutStream; +import alluxio.client.file.FileSystem; +import alluxio.client.file.FileSystemContext; +import alluxio.conf.InstancedConfiguration; +import alluxio.exception.AlluxioException; +import alluxio.exception.status.InvalidArgumentException; +import alluxio.wire.WorkerNetAddress; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.lang.reflect.Type; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import javax.annotation.concurrent.ThreadSafe; + +/** + * Copies the specified file specified by "source path" to the path specified by "remote path". + * This command will fail if "remote path" already exists. + */ +@ThreadSafe +@PublicApi +public final class ConsistentHashCommand extends AbstractFileSystemCommand { + + private static final Logger LOG = LoggerFactory.getLogger(ConsistentHashCommand.class); + + private final String mFolderName = "/consistent-hash-check-data_ALLUXIO"; + + private final int mFileNum = 1000; + + private SimpleDateFormat mSimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss"); + + public static final Option CREATE_CHECK_FILE = + Option.builder() + .longOpt("create") + .required(false) + .hasArg(false) + .desc("Generate check file.") + .build(); + + public static final Option COMPARE_CHECK_FILES = + Option.builder() + .longOpt("compare") + .required(false) + .hasArg(false) + .desc("Compare check files to see if the hash ring has changed " + + "and if data lost.") + .build(); + + public static final Option CLEAN_CHECK_DATA = + Option.builder() + .longOpt("clean") + .required(false) + .hasArg(false) + .desc("Clean all check data.") + .build(); + + /** + * @param fsContext the filesystem of Alluxio + */ + public ConsistentHashCommand(FileSystemContext fsContext) { + super(fsContext); + // The copyFromLocal command needs its own filesystem context because we overwrite the + // block location policy configuration. + // The original one can't be closed because it may still be in-use within the same shell. + InstancedConfiguration conf = new InstancedConfiguration( + fsContext.getClusterConf().copyProperties()); + + FileSystemContext updatedCtx = FileSystemContext.sFileSystemContextFactory.create(conf); + mFsContext = updatedCtx; + mFileSystem = FileSystem.Factory.create(updatedCtx); + } + + /** + * Clean all the check data. + * @throws IOException + * @throws AlluxioException + */ + public void cleanCheckData() throws IOException, AlluxioException { + AlluxioURI folder = new AlluxioURI(mFolderName); + for (int i = 0; i < mFileNum; i++) { + System.out.println("Progress: " + (i + 1) + "/" + mFileNum); + String fileName = "file" + i; + AlluxioURI file = new AlluxioURI(folder, new AlluxioURI(fileName)); + if (mFileSystem.exists(file)) { + mFileSystem.delete(file); + } + } + if (mFileSystem.exists(folder)) { + mFileSystem.delete(folder); + } + System.out.println("Check data has been cleaned successfully."); + } + + /** + * Create the check file. + * @throws IOException + * @throws AlluxioException + */ + public void createCheckFile() throws IOException, AlluxioException { + // Step 1. create folder + String folderName = "/consistent-hash-check-data_ALLUXIO"; + AlluxioURI folder = new AlluxioURI(folderName); + if (!mFileSystem.exists(folder)) { + mFileSystem.createDirectory(folder); + } + + // Step 2. generate 1000 files and put them into the folder + Set fileLocationSet = new HashSet<>(); + for (int i = 0; i < mFileNum; i++) { + System.out.println("Progress: " + (i + 1) + "/" + mFileNum); + String fileName = "file" + i; + AlluxioURI file = new AlluxioURI(folder, new AlluxioURI(fileName)); + + boolean hasCachedFile = false; + if (!mFileSystem.exists(file)) { + writeFile(file); + cacheFile(file); + hasCachedFile = true; + } + + if (mFileSystem.getDoraCacheFileSystem() != null) { + DoraCacheFileSystem doraCacheFileSystem = mFileSystem.getDoraCacheFileSystem(); + AlluxioURI ufsFullPath = doraCacheFileSystem.convertToUfsPath(file); + String fileUfsFullName = ufsFullPath.toString(); + boolean dataOnPreferredWorker = false; + Set workersThatHaveDataSet = new HashSet<>(); + + WorkerNetAddress preferredWorker = doraCacheFileSystem.getWorkerNetAddress(file); + Map> fileOnWorkersMap = checkFileLocation(file); + + if (fileOnWorkersMap != null && fileOnWorkersMap.size() > 0) { + Optional fileUfsFullNameOpt = fileOnWorkersMap.keySet().stream().findFirst(); + if (fileUfsFullNameOpt.isPresent()) { + List workersThatHaveDataList = fileOnWorkersMap.get(fileUfsFullName); + if (workersThatHaveDataList != null && !workersThatHaveDataList.isEmpty()) { + dataOnPreferredWorker = workersThatHaveDataList.contains(preferredWorker); + workersThatHaveDataSet = workersThatHaveDataList.stream() + .map(workerNetAddress -> workerNetAddress.getHost()).collect(Collectors.toSet()); + } + } + } + + FileLocation fileLocation = new FileLocation( + fileUfsFullName, + preferredWorker.getHost(), + dataOnPreferredWorker, + workersThatHaveDataSet); + fileLocationSet.add(fileLocation); + } + + if (hasCachedFile == false) { + cacheFile(file); + } + } + + // Step 3. convert to JSON and persist to UFS + Gson gson = new Gson(); + String json = gson.toJson(fileLocationSet); + String persistFileName = "/consistent-hash-check-" + + mSimpleDateFormat.format(new Date()) + ".json"; + writeFile(new AlluxioURI(persistFileName), json.getBytes()); + + System.out.println("Check file " + persistFileName + " is generated successfully."); + } + + private Map> checkFileLocation(AlluxioURI file) + throws IOException { + if (mFileSystem.getDoraCacheFileSystem() != null) { + DoraCacheFileSystem doraCacheFileSystem = mFileSystem.getDoraCacheFileSystem(); + Map> pathLocations = + doraCacheFileSystem.checkFileLocation(file); + return pathLocations; + } else { + throw new RuntimeException("Only DORA architecture can use this command. "); + } + } + + private void cacheFile(AlluxioURI file) throws IOException, AlluxioException { + byte[] buf = new byte[Constants.MB]; + try (FileInStream is = mFileSystem.openFile(file)) { + int read = is.read(buf); + while (read != -1) { + read = is.read(buf); + } + } + } + + private void writeFile(AlluxioURI file) throws IOException, AlluxioException { + try (FileOutStream outStream = mFileSystem.createFile(file)) { + byte[] bytes = new byte[1]; + bytes[0] = 1; + outStream.write(bytes); + } + } + + private void writeFile(AlluxioURI file, byte[] data) throws IOException, AlluxioException { + BufferedInputStream inputStream = new BufferedInputStream(new ByteArrayInputStream(data)); + try (FileOutStream outStream = mFileSystem.createFile(file)) { + byte[] buffer = new byte[Constants.MB]; + int bytesRead; + do { + bytesRead = inputStream.read(buffer); + if (bytesRead != -1) { + outStream.write(buffer, 0, bytesRead); + } + } while (bytesRead != -1); + } + } + + private CheckResult compareCheckFile(String checkFile, String anotherCheckFile) + throws IOException, AlluxioException { + AlluxioURI checkFileUri = new AlluxioURI(checkFile); + AlluxioURI anotherCheckFileUri = new AlluxioURI(anotherCheckFile); + Set fileLocationSet = loadCheckFile(checkFileUri); + Set anotherFileLocationSet = loadCheckFile(anotherCheckFileUri); + + Map fileLocationMap = new HashMap<>(); + for (FileLocation fileLocation : fileLocationSet) { + fileLocationMap.put(fileLocation.getFileName(), fileLocation); + } + Map anotherFileLocationMap = new HashMap<>(); + for (FileLocation fileLocation : anotherFileLocationSet) { + anotherFileLocationMap.put(fileLocation.getFileName(), fileLocation); + } + + boolean isHashRingChanged = false; + boolean isDataLost = false; + for (Map.Entry entry : fileLocationMap.entrySet()) { + String fileName = entry.getKey(); + FileLocation fileLocation = entry.getValue(); + FileLocation anotherFileLocation = anotherFileLocationMap.get(fileName); + + if (anotherFileLocation == null + || (!fileLocation.getPreferredWorker() + .equals(anotherFileLocation.getPreferredWorker()))) { + isHashRingChanged = true; + } + + if (anotherFileLocation == null + || (fileLocation.isDataOnPreferredWorker() + && !anotherFileLocation.isDataOnPreferredWorker())) { + isDataLost = true; + } + } + CheckResult checkResult = new CheckResult(isHashRingChanged, isDataLost); + Gson gson = new Gson(); + System.out.println(gson.toJson(checkResult)); + return checkResult; + } + + private Set loadCheckFile(AlluxioURI checkFileUri) + throws IOException, AlluxioException { + StringBuffer stringBuffer = new StringBuffer(); + byte[] buf = new byte[Constants.MB]; + try (FileInStream is = mFileSystem.openFile(checkFileUri)) { + int read = is.read(buf); + while (read != -1) { + stringBuffer.append(new String(buf, 0, read)); + read = is.read(buf); + } + } + String json = stringBuffer.toString(); + Gson gson = new Gson(); + Type type = new TypeToken>(){}.getType(); + Set fileLocationSet = gson.fromJson(json, type); + return fileLocationSet; + } + + @Override + public void close() throws IOException { + // Close updated {@link FileSystem} instance that is created for internal cp command. + // This will close the {@link FileSystemContext} associated with it. + mFileSystem.close(); + } + + @Override + public String getCommandName() { + return "consistent-hash"; + } + + @Override + public Options getOptions() { + return new Options().addOption(ConsistentHashCommand.CREATE_CHECK_FILE) + .addOption(ConsistentHashCommand.COMPARE_CHECK_FILES) + .addOption(ConsistentHashCommand.CLEAN_CHECK_DATA); + } + + @Override + public void validateArgs(CommandLine cl) throws InvalidArgumentException { + } + + @Override + public int run(CommandLine cl) throws AlluxioException, IOException { + String[] args = cl.getArgs(); + Option[] options = cl.getOptions(); + switch (options[0].getLongOpt()) { + case "create": + createCheckFile(); + break; + case "compare": + String checkFilePath = args[0]; + String anotherCheckFilePath = args[1]; + compareCheckFile(checkFilePath, anotherCheckFilePath); + break; + case "clean": + cleanCheckData(); + break; + default: + System.out.println(getUsage()); + } + return 0; + } + + @Override + public String getUsage() { + return "consistent-hash " + + "[--create] " + + "[--compare <1stCheckFilePath> <2ndCheckFilePath>] " + + "[--clean] "; + } + + @Override + public String getDescription() { + return "This command is for checking whether the consistent hash ring is changed or not. " + + "The command will generates 1000 files and caches them in Alluxio Workers. And then " + + "create a check file which records the location of each file. Next time we can execute " + + "this command again, and check if the check files are the same. If they are different, " + + "it means that the consistent hash ring has changed."; + } + + class CheckResult { + + private boolean mHashRingChanged; + + private boolean mDataLost; + + public CheckResult(boolean hashRingChanged, boolean dataLost) { + mHashRingChanged = hashRingChanged; + mDataLost = dataLost; + } + + public boolean isHashRingChanged() { + return mHashRingChanged; + } + + public boolean isDataLost() { + return mDataLost; + } + } +} diff --git a/dora/shell/src/main/java/alluxio/cli/fs/command/FileLocation.java b/dora/shell/src/main/java/alluxio/cli/fs/command/FileLocation.java new file mode 100644 index 000000000000..4aef14d7b900 --- /dev/null +++ b/dora/shell/src/main/java/alluxio/cli/fs/command/FileLocation.java @@ -0,0 +1,95 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.cli.fs.command; + +import java.util.Objects; +import java.util.Set; + +/** + * Description of the file location. + */ +public class FileLocation { + private final String mFileName; + + private final String mPreferredWorker; + + private final boolean mDataOnPreferredWorker; + + private final Set mWorkersThatHaveData; + + /** + * Description of the file location. + * @param fileName the file name + * @param preferredWorker the preferred worker + * @param dataOnPreferredWorker the + * @param workers the workers where the file locate + */ + public FileLocation(String fileName, String preferredWorker, boolean dataOnPreferredWorker, + Set workers) { + mFileName = fileName; + mPreferredWorker = preferredWorker; + mDataOnPreferredWorker = dataOnPreferredWorker; + mWorkersThatHaveData = workers; + } + + /** + * Get the file name. + * @return the file name + */ + public String getFileName() { + return mFileName; + } + + /** + * Get the preferred worker. + * @return the preferred worker + */ + public String getPreferredWorker() { + return mPreferredWorker; + } + + /** + * If data locates at the preferred worker. + * @return if data locates at the preferred worker + */ + public boolean isDataOnPreferredWorker() { + return mDataOnPreferredWorker; + } + + /** + * Get the workers where data locate. + * @return the workers set where data locate + */ + public Set getWorkersThatHaveData() { + return mWorkersThatHaveData; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FileLocation that = (FileLocation) o; + return mDataOnPreferredWorker == that.mDataOnPreferredWorker + && Objects.equals(mFileName, that.mFileName) + && Objects.equals(mPreferredWorker, that.mPreferredWorker) + && Objects.equals(mWorkersThatHaveData, that.mWorkersThatHaveData); + } + + @Override + public int hashCode() { + return Objects.hash(mFileName, mPreferredWorker, mDataOnPreferredWorker, mWorkersThatHaveData); + } +} diff --git a/dora/shell/src/main/java/alluxio/cli/fs/command/LocationCommand.java b/dora/shell/src/main/java/alluxio/cli/fs/command/LocationCommand.java new file mode 100644 index 000000000000..bed120d19e25 --- /dev/null +++ b/dora/shell/src/main/java/alluxio/cli/fs/command/LocationCommand.java @@ -0,0 +1,116 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.cli.fs.command; + +import alluxio.AlluxioURI; +import alluxio.annotation.PublicApi; +import alluxio.cli.CommandUtils; +import alluxio.client.file.DoraCacheFileSystem; +import alluxio.client.file.FileSystemContext; +import alluxio.exception.AlluxioException; +import alluxio.exception.status.InvalidArgumentException; +import alluxio.wire.WorkerNetAddress; + +import com.google.gson.Gson; +import org.apache.commons.cli.CommandLine; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import javax.annotation.concurrent.ThreadSafe; + +/** + * Displays a list of hosts that have the file specified in args stored. + */ +@ThreadSafe +@PublicApi +public final class LocationCommand extends AbstractFileSystemCommand { + /** + * Constructs a new instance to display a list of hosts that have the file specified in args + * stored. + * + * @param fsContext the filesystem of Alluxio + */ + public LocationCommand(FileSystemContext fsContext) { + super(fsContext); + } + + @Override + public String getCommandName() { + return "location"; + } + + @Override + protected void runPlainPath(AlluxioURI plainPath, CommandLine cl) + throws AlluxioException, IOException { + if (mFileSystem.getDoraCacheFileSystem() != null) { + DoraCacheFileSystem doraCacheFileSystem = mFileSystem.getDoraCacheFileSystem(); + Map> pathLocations = + doraCacheFileSystem.checkFileLocation(plainPath); + WorkerNetAddress preferredWorker = doraCacheFileSystem.getWorkerNetAddress(plainPath); + + AlluxioURI ufsFullPath = doraCacheFileSystem.convertToUfsPath(plainPath); + String fileUfsFullName = ufsFullPath.toString(); + boolean dataOnPreferredWorker = false; + Set workersThatHaveDataSet = new HashSet<>(); + + if (pathLocations != null && pathLocations.size() > 0) { + Optional fileUfsFullNameOpt = pathLocations.keySet().stream().findFirst(); + if (fileUfsFullNameOpt.isPresent()) { + List workersThatHaveDataList = pathLocations.get(fileUfsFullName); + if (workersThatHaveDataList != null && !workersThatHaveDataList.isEmpty()) { + dataOnPreferredWorker = workersThatHaveDataList.contains(preferredWorker); + workersThatHaveDataSet = workersThatHaveDataList.stream() + .map(workerNetAddress -> workerNetAddress.getHost()).collect(Collectors.toSet()); + } + } + } + + FileLocation fileLocation = new FileLocation( + fileUfsFullName, + preferredWorker.getHost(), + dataOnPreferredWorker, + workersThatHaveDataSet); + + Gson gson = new Gson(); + String pathLocationsJson = gson.toJson(fileLocation); + System.out.println(pathLocationsJson); + } + } + + @Override + public int run(CommandLine cl) throws AlluxioException, IOException { + String[] args = cl.getArgs(); + AlluxioURI path = new AlluxioURI(args[0]); + runWildCardCmd(path, cl); + return 0; + } + + @Override + public String getUsage() { + return "location "; + } + + @Override + public String getDescription() { + return "Displays the list of hosts storing the specified file."; + } + + @Override + public void validateArgs(CommandLine cl) throws InvalidArgumentException { + CommandUtils.checkNumOfArgsEquals(this, cl, 1); + } +}