diff --git a/hail/python/hail/fs/hadoop_fs.py b/hail/python/hail/fs/hadoop_fs.py index 7ff30a2bf6f..e8c49cf5a02 100644 --- a/hail/python/hail/fs/hadoop_fs.py +++ b/hail/python/hail/fs/hadoop_fs.py @@ -6,7 +6,15 @@ import dateutil.parser from hailtop.fs.fs import FS -from hailtop.fs.stat_result import FileType, FileListEntry +from hailtop.fs.stat_result import FileType, FileListEntry, FileStatus + + +def _file_status_scala_to_python(file_status: Dict[str, Any]) -> FileStatus: + dt = dateutil.parser.isoparse(file_status['modification_time']) + mtime = time.mktime(dt.timetuple()) + return FileStatus( + path=file_status['path'], owner=file_status['owner'], size=file_status['size'], modification_time=mtime + ) def _file_list_entry_scala_to_python(file_list_entry: Dict[str, Any]) -> FileListEntry: @@ -69,9 +77,21 @@ def is_file(self, path: str) -> bool: def is_dir(self, path: str) -> bool: return self._jfs.isDir(path) + def fast_stat(self, path: str) -> FileStatus: + '''Get information about a path without determining whether it is a file or a directory. + + In the cloud, determining if a given path is a file, a directory, or both is expensive. This + method simply returns file metadata if there is a file at this path; if there is no file at + this path, this operation fails. The presence or absence of a directory at this path + does not affect the behavior of this method. + + ''' + file_status_dict = json.loads(self._utils_package_object.fileStatus(self._jfs, path)) + return _file_status_scala_to_python(file_status_dict) + def stat(self, path: str) -> FileListEntry: - stat_dict = json.loads(self._utils_package_object.fileListEntry(self._jfs, path)) - return _file_list_entry_scala_to_python(stat_dict) + file_list_entry_dict = json.loads(self._utils_package_object.fileListEntry(self._jfs, path)) + return _file_list_entry_scala_to_python(file_list_entry_dict) def ls(self, path: str) -> List[FileListEntry]: return [ diff --git a/hail/python/hailtop/fs/stat_result.py b/hail/python/hailtop/fs/stat_result.py index 4cbb27c6267..cbb03384431 100644 --- a/hail/python/hailtop/fs/stat_result.py +++ b/hail/python/hailtop/fs/stat_result.py @@ -10,6 +10,23 @@ class FileType(Enum): SYMLINK = auto() +class FileStatus(NamedTuple): + path: str + owner: Union[None, str, int] + size: int + # common point between unix, google, and hadoop filesystems, represented as a unix timestamp + modification_time: Optional[float] + + def to_legacy_dict(self) -> Dict[str, Any]: + return { + 'path': self.path, + 'owner': self.owner, + 'size_bytes': self.size, + 'size': filesize(self.size), + 'modification_time': self.modification_time, + } + + class FileListEntry(NamedTuple): path: str owner: Union[None, str, int] diff --git a/hail/python/test/hail/methods/test_impex.py b/hail/python/test/hail/methods/test_impex.py index 4fb62dcb6f4..77148af82d0 100644 --- a/hail/python/test/hail/methods/test_impex.py +++ b/hail/python/test/hail/methods/test_impex.py @@ -1,4 +1,5 @@ import json +import re import os import pytest import shutil @@ -1511,14 +1512,17 @@ def test_old_index_file_throws_error(self): with hl.TemporaryFilename() as f: hl.current_backend().fs.copy(bgen_file, f) - with pytest.raises(FatalError, match='have no .idx2 index file'): + + expected_missing_idx2_error_message = re.compile(f'have no .idx2 index file.*{f}.*', re.DOTALL) + + with pytest.raises(FatalError, match=expected_missing_idx2_error_message): hl.import_bgen(f, ['GT', 'GP'], sample_file, n_partitions=3) try: with hl.current_backend().fs.open(f + '.idx', 'wb') as fobj:
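# An empty legacy .idx file must not satisfy the index check: import_bgen should still report the missing .idx2 index.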
fobj.write(b'') - with pytest.raises(FatalError, match='have no .idx2 index file'): + with pytest.raises(FatalError, match=expected_missing_idx2_error_message): hl.import_bgen(f, ['GT', 'GP'], sample_file) finally: hl.current_backend().fs.remove(f + '.idx') diff --git a/hail/python/test/hail/utils/test_utils.py b/hail/python/test/hail/utils/test_utils.py index 7ff17a1225c..be2251a789e 100644 --- a/hail/python/test/hail/utils/test_utils.py +++ b/hail/python/test/hail/utils/test_utils.py @@ -172,7 +172,7 @@ def test_hadoop_ls_file_that_does_not_exist(self): except FileNotFoundError: pass except FatalError as err: - assert 'FileNotFoundException: a_file_that_does_not_exist' in err.args[0] + assert 'FileNotFoundException: file:/io/a_file_that_does_not_exist' in err.args[0] else: assert False diff --git a/hail/src/main/scala/is/hail/expr/ir/AbstractMatrixTableSpec.scala b/hail/src/main/scala/is/hail/expr/ir/AbstractMatrixTableSpec.scala index 6f3b21f7928..d189d633e0a 100644 --- a/hail/src/main/scala/is/hail/expr/ir/AbstractMatrixTableSpec.scala +++ b/hail/src/main/scala/is/hail/expr/ir/AbstractMatrixTableSpec.scala @@ -10,7 +10,7 @@ import org.json4s._ import org.json4s.jackson.JsonMethods import org.json4s.jackson.JsonMethods.parse -import java.io.OutputStreamWriter +import java.io.{FileNotFoundException, OutputStreamWriter} import scala.collection.mutable import scala.language.{existentials, implicitConversions} @@ -36,15 +36,22 @@ object RelationalSpec { new MatrixTypeSerializer def readMetadata(fs: FS, path: String): JValue = { - if (!fs.isDir(path)) { - if (!fs.exists(path)) { - fatal(s"No file or directory found at $path") - } else { - fatal(s"MatrixTable and Table files are directories; path '$path' is not a directory") - } - } val metadataFile = path + "/metadata.json.gz" - val jv = using(fs.open(metadataFile))(in => parse(in)) + val jv = + try + using(fs.open(metadataFile))(in => parse(in)) + catch { + case exc: FileNotFoundException => + if (fs.isFile(path)) { + fatal(s"MatrixTable and Table files are directories; path '$path' is a file.") + } else { + if (fs.isDir(path)) { + fatal(s"MatrixTable is corrupted: $path/metadata.json.gz is missing.") + } else { + fatal(s"No file or directory found at $path.") + } + } + } val fileVersion = jv \ "file_version" match { case JInt(rep) => SemanticVersion(rep.toInt) diff --git a/hail/src/main/scala/is/hail/expr/ir/GenericLines.scala b/hail/src/main/scala/is/hail/expr/ir/GenericLines.scala index 2644fae0dbb..41c3911ec94 100644 --- a/hail/src/main/scala/is/hail/expr/ir/GenericLines.scala +++ b/hail/src/main/scala/is/hail/expr/ir/GenericLines.scala @@ -2,7 +2,9 @@ package is.hail.expr.ir import is.hail.backend.spark.SparkBackend import is.hail.io.compress.BGzipInputStream -import is.hail.io.fs.{BGZipCompressionCodec, FileListEntry, FS, Positioned, PositionedInputStream} +import is.hail.io.fs.{ + BGZipCompressionCodec, FileListEntry, FileStatus, FS, Positioned, PositionedInputStream, +} import is.hail.io.tabix.{TabixLineIterator, TabixReader} import is.hail.types.virtual.{TBoolean, TInt32, TInt64, TString, TStruct, Type} import is.hail.utils._ @@ -258,7 +260,7 @@ object GenericLines { def read( fs: FS, - fileListEntries0: IndexedSeq[FileListEntry], + fileStatuses0: IndexedSeq[_ <: FileStatus], nPartitions: Option[Int], blockSizeInMB: Option[Int], minPartitions: Option[Int], @@ -266,8 +268,8 @@ object GenericLines { allowSerialRead: Boolean, filePerPartition: Boolean = false, ): GenericLines = { - val fileListEntries = 
fileListEntries0.zipWithIndex.filter(_._1.getLen > 0) - val totalSize = fileListEntries.map(_._1.getLen).sum + val fileStatuses = fileStatuses0.zipWithIndex.filter(_._1.getLen > 0) + val totalSize = fileStatuses.map(_._1.getLen).sum var totalPartitions = nPartitions match { case Some(nPartitions) => nPartitions @@ -282,7 +284,7 @@ object GenericLines { case None => } - val contexts = fileListEntries.flatMap { case (fileListEntry, fileNum) => + val contexts = fileStatuses.flatMap { case (fileListEntry, fileNum) => val size = fileListEntry.getLen val codec = fs.getCodecFromPath(fileListEntry.getPath, gzAsBGZ) diff --git a/hail/src/main/scala/is/hail/expr/ir/StringTableReader.scala b/hail/src/main/scala/is/hail/expr/ir/StringTableReader.scala index 522f53ac912..a76d10421a4 100644 --- a/hail/src/main/scala/is/hail/expr/ir/StringTableReader.scala +++ b/hail/src/main/scala/is/hail/expr/ir/StringTableReader.scala @@ -8,7 +8,9 @@ import is.hail.expr.ir.lowering.{LowererUnsupportedOperation, TableStage, TableS import is.hail.expr.ir.streams.StreamProducer import is.hail.io.fs.{FileListEntry, FS} import is.hail.rvd.RVDPartitioner -import is.hail.types.{BaseTypeWithRequiredness, RStruct, TableType, VirtualTypeWithReq} +import is.hail.types.{ + BaseTypeWithRequiredness, RStruct, TableType, TypeWithRequiredness, VirtualTypeWithReq, +} import is.hail.types.physical._ import is.hail.types.physical.stypes.EmitType import is.hail.types.physical.stypes.concrete.{SJavaString, SStackStruct, SStackStructValue} diff --git a/hail/src/main/scala/is/hail/expr/ir/TableIR.scala b/hail/src/main/scala/is/hail/expr/ir/TableIR.scala index a99885485a6..426a8cd5b2f 100644 --- a/hail/src/main/scala/is/hail/expr/ir/TableIR.scala +++ b/hail/src/main/scala/is/hail/expr/ir/TableIR.scala @@ -42,7 +42,7 @@ object TableIR { def read(fs: FS, path: String, dropRows: Boolean = false, requestedType: Option[TableType] = None) : TableRead = { val successFile = path + "/_SUCCESS" - if (!fs.exists(path + "/_SUCCESS")) + if (!fs.isFile(path + "/_SUCCESS")) fatal(s"write failed: file not found: $successFile") val tr = TableNativeReader.read(fs, path, None) diff --git a/hail/src/main/scala/is/hail/expr/ir/analyses/SemanticHash.scala b/hail/src/main/scala/is/hail/expr/ir/analyses/SemanticHash.scala index cee67316316..5f7ae0402ce 100644 --- a/hail/src/main/scala/is/hail/expr/ir/analyses/SemanticHash.scala +++ b/hail/src/main/scala/is/hail/expr/ir/analyses/SemanticHash.scala @@ -396,7 +396,7 @@ case object SemanticHash extends Logging { case Some(etag) => etag.getBytes case None => - path.getBytes ++ Bytes.fromLong(fs.fileListEntry(path).getModificationTime) + path.getBytes ++ Bytes.fromLong(fs.fileStatus(path).getModificationTime) } def levelOrder(root: BaseIR): Iterator[(BaseIR, Int)] = { diff --git a/hail/src/main/scala/is/hail/io/bgen/LoadBgen.scala b/hail/src/main/scala/is/hail/io/bgen/LoadBgen.scala index 6640ad4a2c2..4e7339db03d 100644 --- a/hail/src/main/scala/is/hail/io/bgen/LoadBgen.scala +++ b/hail/src/main/scala/is/hail/io/bgen/LoadBgen.scala @@ -165,11 +165,11 @@ object LoadBgen { badFiles += file matches.flatMap { fileListEntry => - val file = fileListEntry.getPath.toString + val file = fileListEntry.getPath if (!file.endsWith(".bgen")) warn(s"input file does not have .bgen extension: $file") - if (fs.isDir(file)) + if (fileListEntry.isDirectory) fs.listDirectory(file) .filter(fileListEntry => ".*part-[0-9]+(-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})?".r.matches( @@ -193,22 +193,25 @@ object LoadBgen { def 
getAllFilePaths(fs: FS, files: Array[String]): Array[String] = getAllFileListEntries(fs, files).map(_.getPath.toString) - def getBgenFileMetadata(ctx: ExecuteContext, files: Array[String], indexFiles: Array[String]) - : Array[BgenFileMetadata] = { + def getBgenFileMetadata( + ctx: ExecuteContext, + files: Array[FileListEntry], + indexFilePaths: Array[String], + ): Array[BgenFileMetadata] = { val fs = ctx.fs - require(files.length == indexFiles.length) - val headers = getFileHeaders(fs, files) + require(files.length == indexFilePaths.length) + val headers = getFileHeaders(fs, files.map(_.getPath)) val cacheByRG: mutable.Map[Option[String], (String, Array[Long]) => Array[AnyRef]] = mutable.Map.empty - headers.zip(indexFiles).map { case (h, indexFile) => - val (keyType, annotationType) = IndexReader.readTypes(fs, indexFile) + headers.zip(indexFilePaths).map { case (h, indexFilePath) => + val (keyType, annotationType) = IndexReader.readTypes(fs, indexFilePath) val rg = keyType.asInstanceOf[TStruct].field("locus").typ match { case TLocus(rg) => Some(rg) case _ => None } - val metadata = IndexReader.readMetadata(fs, indexFile, keyType, annotationType) + val metadata = IndexReader.readMetadata(fs, indexFilePath, keyType, annotationType) val indexVersion = SemanticVersion(metadata.fileVersion) val (leafSpec, internalSpec) = BgenSettings.indexCodecSpecs(indexVersion, rg) @@ -226,12 +229,12 @@ object LoadBgen { val nVariants = metadata.nKeys val rangeBounds = if (nVariants > 0) { - val Array(start, end) = getKeys(indexFile, Array[Long](0L, nVariants - 1)) + val Array(start, end) = getKeys(indexFilePath, Array[Long](0L, nVariants - 1)) Interval(start, end, includesStart = true, includesEnd = true) } else null BgenFileMetadata( - indexFile, + indexFilePath, indexVersion, h, rg, @@ -245,9 +248,9 @@ object LoadBgen { } } - def getIndexFileNames(fs: FS, files: Array[String], indexFileMap: Map[String, String]) : Array[String] = { - def absolutePath(rel: String): String = fs.fileListEntry(rel).getPath.toString + def getIndexFileNames(fs: FS, files: Array[FileListEntry], indexFileMap: Map[String, String]) : Array[String] = { + def absolutePath(rel: String): String = fs.fileStatus(rel).getPath val fileMapping = Option(indexFileMap) .getOrElse(Map.empty[String, String]) @@ -260,19 +263,23 @@ object LoadBgen { | ${badExtensions.mkString("\n ")})""".stripMargin ) - files.map(absolutePath).map(f => fileMapping.getOrElse(f, f + ".idx2")) + files.map(f => fileMapping.getOrElse(f.getPath, f.getPath + ".idx2")) } - def getIndexFiles(fs: FS, files: Array[String], indexFileMap: Map[String, String]) : Array[String] = { val indexFiles = getIndexFileNames(fs, files, indexFileMap) - val missingIdxFiles = files.zip(indexFiles).filterNot { case (f, index) => - fs.exists(index) && index.endsWith("idx2") - }.map(_._1) - if (missingIdxFiles.nonEmpty) + def getIndexFiles(fs: FS, files: Array[FileListEntry], indexFileMap: Map[String, String]) : Array[String] = { val indexFiles = getIndexFileNames(fs, files, indexFileMap) + + val bgenFilesWhichAreMissingIdx2Files = files.zip(indexFiles).filterNot { + case (f, index) => index.endsWith("idx2") && fs.isFile(index + "/index") && fs.isFile( + index + "/metadata.json.gz" + ) + }.map(_._1.getPath) + + if (bgenFilesWhichAreMissingIdx2Files.nonEmpty) fatal( s"""The following BGEN files have no .idx2 index file. Use 'index_bgen' to create the index file once before calling 'import_bgen': - | ${missingIdxFiles.mkString("\n ")})""".stripMargin + | ${bgenFilesWhichAreMissingIdx2Files.mkString("\n ")}""".stripMargin ) indexFiles } @@ -376,9 +383,9 @@ object MatrixBGENReader { def apply(ctx: ExecuteContext, params: MatrixBGENReaderParameters): MatrixBGENReader = { val fs = ctx.fs - val allFiles = LoadBgen.getAllFilePaths(fs, params.files.toArray) - val indexFiles = LoadBgen.getIndexFiles(fs, allFiles, params.indexFileMap) - val fileMetadata = LoadBgen.getBgenFileMetadata(ctx, allFiles, indexFiles) + val allFiles = LoadBgen.getAllFileListEntries(fs, params.files.toArray) + val indexFilePaths = LoadBgen.getIndexFiles(fs, allFiles, params.indexFileMap) + val fileMetadata = LoadBgen.getBgenFileMetadata(ctx, allFiles, indexFilePaths) assert(fileMetadata.nonEmpty) if (fileMetadata.exists(md => md.indexVersion != fileMetadata.head.indexVersion)) { fatal( diff --git a/hail/src/main/scala/is/hail/io/fs/AzureStorageFS.scala b/hail/src/main/scala/is/hail/io/fs/AzureStorageFS.scala index 60235c554ac..68388b112b3 100644 --- a/hail/src/main/scala/is/hail/io/fs/AzureStorageFS.scala +++ b/hail/src/main/scala/is/hail/io/fs/AzureStorageFS.scala @@ -27,6 +27,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, FileNotFoundException} import java.net.URI import java.nio.file.Paths import java.time.Duration +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -98,7 +99,7 @@ object AzureStorageFS { val schemes: Array[String] = Array("hail-az", "https") def parseUrl(filename: String): AzureStorageFSURL = { - val scheme = new URI(filename).getScheme + val scheme = filename.split(":")(0) if (scheme == "hail-az") { parseHailAzUrl(filename) } else if (scheme == "https") { @@ -153,32 +154,23 @@ object AzureStorageFS { } object AzureStorageFileListEntry { - def apply(path: String, isDir: Boolean, blobProperties: BlobProperties) - : BlobStorageFileListEntry = - if (isDir) { - new BlobStorageFileListEntry(path, null, 0, true) - } else { - new BlobStorageFileListEntry( - path, - blobProperties.getLastModified.toEpochSecond, - blobProperties.getBlobSize, - false, - ) - } - - def apply(blobPath: String, blobItem: BlobItem): BlobStorageFileListEntry = { + def apply(rootUrl: AzureStorageFSURL, blobItem: BlobItem): BlobStorageFileListEntry = { + val url = rootUrl.withPath(blobItem.getName) if (blobItem.isPrefix) { - new BlobStorageFileListEntry(blobPath, null, 0, true) + dir(url) } else { val properties = blobItem.getProperties new BlobStorageFileListEntry( - blobPath, + url.toString, properties.getLastModified.toEpochSecond, properties.getContentLength, false, ) } } + + def dir(url: AzureStorageFSURL): BlobStorageFileListEntry = + new BlobStorageFileListEntry(url.toString, null, 0, true) } class AzureBlobServiceClientCache( @@ -221,8 +213,6 @@ class AzureBlobServiceClientCache( class AzureStorageFS(val credentialsJSON: Option[String] = None) extends FS { type URL = AzureStorageFSURL - import AzureStorageFS.log - override def parseUrl(filename: String): URL = AzureStorageFS.parseUrl(filename) override def validUrl(filename: String): Boolean = @@ -422,10 +412,7 @@ class AzureStorageFS(val credentialsJSON: Option[String] = None) extends FS { // collect all children of this directory (blobs and subdirectories) val prefixMatches = blobContainerClient.listBlobsByHierarchy(prefix) - prefixMatches.forEach { blobItem => - val blobPath =
dropTrailingSlash(url.withPath(blobItem.getName).toString()) - statList += AzureStorageFileListEntry(blobPath, blobItem) - } + prefixMatches.forEach(blobItem => statList += AzureStorageFileListEntry(url, blobItem)) statList.toArray } @@ -434,35 +421,39 @@ class AzureStorageFS(val credentialsJSON: Option[String] = None) extends FS { globWithPrefix(prefix = url.withPath(""), path = dropTrailingSlash(url.path)) } - override def fileListEntry(url: URL): FileListEntry = retryTransientErrors { + override def fileStatus(url: AzureStorageFSURL): FileStatus = retryTransientErrors { if (url.path == "") { - return new BlobStorageFileListEntry(url.toString, null, 0, true) + return AzureStorageFileListEntry.dir(url) } - val blobClient: BlobClient = getBlobClient(url) - val blobContainerClient: BlobContainerClient = getContainerClient(url) - - val prefix = dropTrailingSlash(url.path) + "/" - val options: ListBlobsOptions = new ListBlobsOptions().setPrefix(prefix).setMaxResultsPerPage(1) - val prefixMatches = blobContainerClient.listBlobs(options, timeout) - val isDir = prefixMatches.iterator().hasNext - - val filename = dropTrailingSlash(url.toString) - - val blobProperties = if (!isDir) { + val blobClient = getBlobClient(url) + val blobProperties = try blobClient.getProperties catch { - case e: BlobStorageException => - if (e.getStatusCode == 404) - throw new FileNotFoundException(s"File not found: $filename") - else - throw e + case e: BlobStorageException if e.getStatusCode == 404 => + throw new FileNotFoundException(url.toString) } - } else - null - AzureStorageFileListEntry(filename, isDir, blobProperties) + new BlobStorageFileStatus( + url.toString, + blobProperties.getLastModified.toEpochSecond, + blobProperties.getBlobSize, + ) + } + + override def fileListEntry(url: URL): FileListEntry = { + if (url.getPath == "") + return AzureStorageFileListEntry.dir(url) + + val it = { + val containerClient = getContainerClient(url) + val options = new ListBlobsOptions().setPrefix(dropTrailingSlash(url.getPath)) + val prefixMatches = containerClient.listBlobsByHierarchy("/", options, timeout) + prefixMatches.iterator() + }.asScala.map(AzureStorageFileListEntry.apply(url, _)) + + fileListEntryFromIterator(url, it) } override def eTag(url: URL): Some[String] = @@ -471,7 +462,7 @@ class AzureStorageFS(val credentialsJSON: Option[String] = None) extends FS { } def makeQualified(filename: String): String = { - AzureStorageFS.parseUrl(filename) + parseUrl(filename) filename } } diff --git a/hail/src/main/scala/is/hail/io/fs/FS.scala b/hail/src/main/scala/is/hail/io/fs/FS.scala index 1408ebc5450..510bffdbe64 100644 --- a/hail/src/main/scala/is/hail/io/fs/FS.scala +++ b/hail/src/main/scala/is/hail/io/fs/FS.scala @@ -18,6 +18,7 @@ import scala.io.Source import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream import org.apache.commons.io.IOUtils import org.apache.hadoop +import org.apache.log4j.Logger trait Positioned { def getPosition: Long @@ -62,31 +63,56 @@ trait FSURL { def getPath: String } -trait FileListEntry { +trait FileStatus { def getPath: String + def getActualUrl: String def getModificationTime: java.lang.Long def getLen: Long - def isDirectory: Boolean def isSymlink: Boolean - def isFile: Boolean def getOwner: String + def isFileOrFileAndDirectory: Boolean = true } -class BlobStorageFileListEntry( - path: String, +trait FileListEntry extends FileStatus { + def isFile: Boolean + def isDirectory: Boolean + override def isFileOrFileAndDirectory: Boolean = isFile +} + +class 
BlobStorageFileStatus( + actualUrl: String, modificationTime: java.lang.Long, size: Long, - isDir: Boolean, -) extends FileListEntry { - def getPath: String = path +) extends FileStatus { + // NB: it is called getPath but it *must* return the URL *with* the scheme. + def getPath: String = + dropTrailingSlash( + actualUrl + ) // getPath is a backwards compatible method: in the past, Hail dropped trailing slashes + def getActualUrl: String = actualUrl def getModificationTime: java.lang.Long = modificationTime def getLen: Long = size - def isDirectory: Boolean = isDir - def isFile: Boolean = !isDir def isSymlink: Boolean = false def getOwner: String = null } +class BlobStorageFileListEntry( + actualUrl: String, + modificationTime: java.lang.Long, + size: Long, + isDir: Boolean, +) extends BlobStorageFileStatus( + actualUrl, + modificationTime, + size, + ) with FileListEntry { + def isDirectory: Boolean = isDir + def isFile: Boolean = !isDir + override def isFileOrFileAndDirectory = isFile + override def toString: String = s"BSFLE($actualUrl $modificationTime $size $isDir)" + +} + trait CompressionCodec { def makeInputStream(is: InputStream): InputStream @@ -106,6 +132,8 @@ object BGZipCompressionCodec extends CompressionCodec { def makeOutputStream(os: OutputStream): OutputStream = new BGzipOutputStream(os) } +class FileAndDirectoryException(message: String) extends RuntimeException(message) + object FSUtil { def dropTrailingSlash(path: String): String = { if (path.isEmpty) @@ -265,11 +293,15 @@ object FS { new HadoopFS(new SerializableHadoopConfiguration(new hadoop.conf.Configuration())), )) } + + private val log = Logger.getLogger(getClass.getName()) } trait FS extends Serializable { type URL <: FSURL + import FS.log + def parseUrl(filename: String): URL def validUrl(filename: String): Boolean @@ -378,7 +410,7 @@ trait FS extends Serializable { def glob(url: URL): Array[FileListEntry] - def globWithPrefix(prefix: URL, path: String) = { + def globWithPrefix(prefix: URL, path: String): Array[FileListEntry] = { val components = if (path == "") Array.empty[String] @@ -430,6 +462,84 @@ trait FS extends Serializable { /** Return the file's HTTP etag, if the underlying file system supports etags. */ def eTag(url: URL): Option[String] + final def fileStatus(filename: String): FileStatus = fileStatus(parseUrl(filename)) + + def fileStatus(url: URL): FileStatus + + protected def fileListEntryFromIterator( + url: URL, + it: Iterator[FileListEntry], + ): FileListEntry = { + val urlStr = url.toString + val noSlash = dropTrailingSlash(urlStr) + val withSlash = noSlash + "/" + + var continue = it.hasNext + var fileFle: FileListEntry = null + var trailingSlashFle: FileListEntry = null + var dirFle: FileListEntry = null + while (continue) { + val fle = it.next() + + if (fle.isFile) { + if (fle.getActualUrl == noSlash) { + fileFle = fle + } else if (fle.getActualUrl == withSlash) { + // This is a *blob* whose name has a trailing slash e.g. "gs://bucket/object/". Users + // really ought to avoid creating these. + trailingSlashFle = fle + } + } else if (fle.isDirectory && dropTrailingSlash(fle.getActualUrl) == noSlash) { + // In Google, "directory" entries always have a trailing slash. + // + // In Azure, "directory" entries never have a trailing slash. 
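+ // Normalizing with dropTrailingSlash in the condition above therefore recognizes a directory entry from either provider with a single comparison.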
+ dirFle = fle + } + + continue = + it.hasNext && (fle.getActualUrl <= withSlash) // cloud storage APIs return blobs in alphabetical order, so we need not keep searching after withSlash + } + + if (fileFle != null) { + if (dirFle != null) { + if (trailingSlashFle != null) { + throw new FileAndDirectoryException( + s"${url.toString} appears twice as a file (once with and once without a trailing slash) and once as a directory." + ) + } else { + throw new FileAndDirectoryException( + s"${url.toString} appears as both file ${fileFle.getActualUrl} and directory ${dirFle.getActualUrl}." + ) + } + } else { + if (trailingSlashFle != null) { + log.warn( + s"Two blobs exist matching ${url.toString}: one with and one without a trailing slash. We will return the one without a trailing slash." + ) + } + fileFle + } + } else { + if (dirFle != null) { + if (trailingSlashFle != null) { + log.warn( + s"A blob with a literal trailing slash exists as well as blobs with that prefix. We will treat this as a directory. ${url.toString}" + ) + } + dirFle + } else { + if (trailingSlashFle != null) { + throw new FileNotFoundException( + s"A blob with a literal trailing slash exists. These are sometimes used to indicate empty directories. " + + s"Hail does not support this behavior. This folder is treated as if it does not exist. ${url.toString}" + ) + } else { + throw new FileNotFoundException(url.toString) + } + } + } + } + final def fileListEntry(filename: String): FileListEntry = fileListEntry(parseUrl(filename)) def fileListEntry(url: URL): FileListEntry @@ -492,13 +602,13 @@ trait FS extends Serializable { final def getFileSize(filename: String): Long = getFileSize(parseUrl(filename)) - def getFileSize(url: URL): Long = fileListEntry(url).getLen + def getFileSize(url: URL): Long = fileStatus(url).getLen final def isFile(filename: String): Boolean = isFile(parseUrl(filename)) final def isFile(url: URL): Boolean = try - fileListEntry(url).isFile + fileStatus(url).isFileOrFileAndDirectory catch { case _: FileNotFoundException => false } @@ -601,21 +711,22 @@ trait FS extends Serializable { else if (!header && headerFileListEntry.nonEmpty) fatal(s"Found unexpected header file") - val partFileListEntries = partFilesOpt match { + val partFileStatuses: Array[_ <: FileStatus] = partFilesOpt match { case None => glob(sourceFolder + "/part-*") - case Some(files) => files.map(f => fileListEntry(sourceFolder + "/" + f)).toArray + case Some(files) => files.map(f => fileStatus(sourceFolder + "/" + f)).toArray + } + + val sortedPartFileStatuses = partFileStatuses.sortBy { fileStatus => + getPartNumber(fileStatus.getPath) } - val sortedPartFileListEntries = - partFileListEntries.sortBy(fs => getPartNumber(new hadoop.fs.Path(fs.getPath).getName)) - if (sortedPartFileListEntries.length != numPartFilesExpected) - fatal( - s"Expected $numPartFilesExpected part files but found ${sortedPartFileListEntries.length}" - ) - val filesToMerge = headerFileListEntry ++ sortedPartFileListEntries + if (sortedPartFileStatuses.length != numPartFilesExpected) + fatal(s"Expected $numPartFilesExpected part files but found ${sortedPartFileStatuses.length}") + + val filesToMerge: Array[FileStatus] = headerFileListEntry ++ sortedPartFileStatuses info(s"merging ${filesToMerge.length} files totalling " + - s"${readableBytes(sortedPartFileListEntries.map(_.getLen).sum)}...") + s"${readableBytes(filesToMerge.map(_.getLen).sum)}...") val (_, dt) = time { copyMergeList(filesToMerge, destinationFile, deleteSource) @@ -631,22 +742,22 @@ trait FS extends
Serializable { } def copyMergeList( - srcFileListEntries: Array[FileListEntry], + srcFileStatuses: Array[_ <: FileStatus], destFilename: String, deleteSource: Boolean = true, ) { val codec = Option(getCodecFromPath(destFilename)) val isBGzip = codec.exists(_ == BGZipCompressionCodec) - require(srcFileListEntries.forall { - fileListEntry => fileListEntry.getPath != destFilename && fileListEntry.isFile + require(srcFileStatuses.forall { + fileStatus => fileStatus.getPath != destFilename && fileStatus.isFileOrFileAndDirectory }) using(createNoCompression(destFilename)) { os => var i = 0 - while (i < srcFileListEntries.length) { - val fileListEntry = srcFileListEntries(i) - val lenAdjust: Long = if (isBGzip && i < srcFileListEntries.length - 1) + while (i < srcFileStatuses.length) { + val fileListEntry = srcFileStatuses(i) + val lenAdjust: Long = if (isBGzip && i < srcFileStatuses.length - 1) -28 else 0 @@ -658,19 +769,17 @@ trait FS extends Serializable { } if (deleteSource) { - srcFileListEntries.foreach { fileListEntry => - delete(fileListEntry.getPath.toString, recursive = true) - } + srcFileStatuses.foreach(fileStatus => delete(fileStatus.getPath, recursive = true)) } } def concatenateFiles(sourceNames: Array[String], destFilename: String): Unit = { - val fileListEntries = sourceNames.map(fileListEntry(_)) + val fileStatuses = sourceNames.map(fileStatus(_)) - info(s"merging ${fileListEntries.length} files totalling " + - s"${readableBytes(fileListEntries.map(_.getLen).sum)}...") + info(s"merging ${fileStatuses.length} files totalling " + + s"${readableBytes(fileStatuses.map(_.getLen).sum)}...") - val (_, timing) = time(copyMergeList(fileListEntries, destFilename, deleteSource = false)) + val (_, timing) = time(copyMergeList(fileStatuses, destFilename, deleteSource = false)) info(s"while writing:\n $destFilename\n merge time: ${formatTime(timing)}") } diff --git a/hail/src/main/scala/is/hail/io/fs/GoogleStorageFS.scala b/hail/src/main/scala/is/hail/io/fs/GoogleStorageFS.scala index 11290347eae..b86bd1d7e1c 100644 --- a/hail/src/main/scala/is/hail/io/fs/GoogleStorageFS.scala +++ b/hail/src/main/scala/is/hail/io/fs/GoogleStorageFS.scala @@ -8,9 +8,7 @@ import java.io.{ByteArrayInputStream, FileNotFoundException, IOException} import java.net.URI import java.nio.ByteBuffer import java.nio.file.Paths -import scala.jdk.CollectionConverters.{ - asJavaIterableConverter, asScalaIteratorConverter, iterableAsScalaIterableConverter, -} +import scala.jdk.CollectionConverters._ import com.google.api.client.googleapis.json.GoogleJsonResponseException import com.google.auth.oauth2.ServiceAccountCredentials @@ -46,7 +44,7 @@ object GoogleStorageFS { private[this] val GCS_URI_REGEX = "^gs:\\/\\/([a-z0-9_\\-\\.]+)(\\/.*)?".r def parseUrl(filename: String): GoogleStorageFSURL = { - val scheme = new URI(filename).getScheme + val scheme = filename.split(":")(0) if (scheme == null || scheme != "gs") { throw new IllegalArgumentException(s"Invalid scheme, expected gs: $scheme") } @@ -68,10 +66,8 @@ object GoogleStorageFileListEntry { def apply(blob: Blob): BlobStorageFileListEntry = { val isDir = blob.isDirectory - val name = dropTrailingSlash(blob.getName) - new BlobStorageFileListEntry( - s"gs://${blob.getBucket}/$name", + s"gs://${blob.getBucket}/${blob.getName}", if (isDir) null else @@ -80,6 +76,9 @@ object GoogleStorageFileListEntry { isDir, ) } + + def dir(url: GoogleStorageFSURL): BlobStorageFileListEntry = + return new BlobStorageFileListEntry(url.toString, null, 0, true) } object 
RequesterPaysConfiguration { @@ -467,35 +466,51 @@ class GoogleStorageFS( .toArray } - override def fileListEntry(url: URL): FileListEntry = retryTransientErrors { - val path = dropTrailingSlash(url.path) + private[this] def getBlob(url: URL) = retryTransientErrors { + handleRequesterPays( + (options: Seq[BlobGetOption]) => + storage.get(url.bucket, url.path, options: _*), + BlobGetOption.userProject _, + url.bucket, + ) + } + override def fileStatus(url: URL): FileStatus = retryTransientErrors { if (url.path == "") - return new BlobStorageFileListEntry(s"gs://${url.bucket}", null, 0, true) + return GoogleStorageFileListEntry.dir(url) - val blobs = retryTransientErrors { + val blob = getBlob(url) + + if (blob == null) { + throw new FileNotFoundException(url.toString) + } + + new BlobStorageFileStatus( + url.toString, + blob.getUpdateTimeOffsetDateTime.toInstant().toEpochMilli(), + blob.getSize, + ) + } + + override def fileListEntry(url: URL): FileListEntry = { + if (url.getPath == "") { + return GoogleStorageFileListEntry.dir(url) + } + + val prefix = dropTrailingSlash(url.path) + val it = retryTransientErrors { handleRequesterPays( (options: Seq[BlobListOption]) => storage.list( url.bucket, - (BlobListOption.prefix(path) +: BlobListOption.currentDirectory() +: options): _* + (BlobListOption.prefix(prefix) +: BlobListOption.currentDirectory() +: options): _* ), - BlobListOption.userProject, + BlobListOption.userProject _, url.bucket, ) - } - - val it = blobs.getValues.iterator.asScala - while (it.hasNext) { - val b = it.next() - var name = b.getName - while (name.endsWith("/")) - name = name.dropRight(1) - if (name == path) - return GoogleStorageFileListEntry(b) - } + }.iterateAll().asScala.map(GoogleStorageFileListEntry.apply(_)).iterator - throw new FileNotFoundException(url.toString()) + fileListEntryFromIterator(url, it) } override def eTag(url: URL): Some[String] = { diff --git a/hail/src/main/scala/is/hail/io/fs/HadoopFS.scala b/hail/src/main/scala/is/hail/io/fs/HadoopFS.scala index 8793491a739..3d48dd1d4a4 100644 --- a/hail/src/main/scala/is/hail/io/fs/HadoopFS.scala +++ b/hail/src/main/scala/is/hail/io/fs/HadoopFS.scala @@ -16,6 +16,8 @@ class HadoopFileListEntry(fs: hadoop.fs.FileStatus) extends FileListEntry { def getPath: String = fs.getPath.toString + def getActualUrl: String = fs.getPath.toString + def getModificationTime: java.lang.Long = fs.getModificationTime def getLen: Long = fs.getLen @@ -71,14 +73,15 @@ object HadoopFS { } } -case class HadoopFSURL(val path: String, conf: SerializableHadoopConfiguration) extends FSURL { - val hadoopPath = new hadoop.fs.Path(path) - val hadoopFs = hadoopPath.getFileSystem(conf.value) +case class HadoopFSURL(path: String, conf: SerializableHadoopConfiguration) extends FSURL { + private[this] val unqualifiedHadoopPath = new hadoop.fs.Path(path) + val hadoopFs = unqualifiedHadoopPath.getFileSystem(conf.value) + val hadoopPath = hadoopFs.makeQualified(unqualifiedHadoopPath) - def addPathComponent(c: String): HadoopFSURL = HadoopFSURL(s"$path/$c", conf) - def getPath: String = path + def addPathComponent(c: String): HadoopFSURL = HadoopFSURL(s"${hadoopPath.toString}/$c", conf) + def getPath: String = hadoopPath.toString def fromString(s: String): HadoopFSURL = HadoopFSURL(s, conf) - override def toString(): String = path + override def toString(): String = hadoopPath.toString } class HadoopFS(private[this] var conf: SerializableHadoopConfiguration) extends FS { @@ -151,10 +154,10 @@ class HadoopFS(private[this] var conf: 
SerializableHadoopConfiguration) extends override def globAll(filenames: Iterable[String]): Array[FileListEntry] = { filenames.flatMap { filename => - val statuses = glob(filename) - if (statuses.isEmpty) + val fles = glob(filename) + if (fles.isEmpty) warn(s"'$filename' refers to no files") - statuses + fles }.toArray } @@ -168,6 +171,14 @@ class HadoopFS(private[this] var conf: SerializableHadoopConfiguration) extends files.map(fileListEntry => new HadoopFileListEntry(fileListEntry)) } + override def fileStatus(url: URL): FileStatus = { + val fle = fileListEntry(url) + if (fle.isDirectory) { + throw new FileNotFoundException(url.getPath) + } + fle + } + def fileListEntry(url: URL): FileListEntry = new HadoopFileListEntry(url.hadoopFs.getFileStatus(url.hadoopPath)) diff --git a/hail/src/main/scala/is/hail/io/fs/RouterFS.scala b/hail/src/main/scala/is/hail/io/fs/RouterFS.scala index 95a73da959c..7d8e9df3c52 100644 --- a/hail/src/main/scala/is/hail/io/fs/RouterFS.scala +++ b/hail/src/main/scala/is/hail/io/fs/RouterFS.scala @@ -56,6 +56,8 @@ class RouterFS(fss: IndexedSeq[FS]) extends FS { def glob(url: URL): Array[FileListEntry] = url.fs.glob(url.url) + def fileStatus(url: URL): FileStatus = url.fs.fileStatus(url.url) + def fileListEntry(url: URL): FileListEntry = url.fs.fileListEntry(url.url) override def eTag(url: URL): Option[String] = url.fs.eTag(url.url) diff --git a/hail/src/main/scala/is/hail/io/reference/FASTAReader.scala b/hail/src/main/scala/is/hail/io/reference/FASTAReader.scala index 227bdb4ff8c..3d701d7ea32 100644 --- a/hail/src/main/scala/is/hail/io/reference/FASTAReader.scala +++ b/hail/src/main/scala/is/hail/io/reference/FASTAReader.scala @@ -61,9 +61,9 @@ object FASTAReader { fs.copyRecode(indexFile, localIndexFile) } - if (!fs.exists(localFastaFile)) + if (!fs.isFile(localFastaFile)) fatal(s"Error while copying FASTA file to local file system. Did not find '$localFastaFile'.") - if (!fs.exists(localIndexFile)) + if (!fs.isFile(localIndexFile)) fatal( s"Error while copying FASTA index file to local file system. Did not find '$localIndexFile'." ) diff --git a/hail/src/main/scala/is/hail/linalg/BlockMatrix.scala b/hail/src/main/scala/is/hail/linalg/BlockMatrix.scala index 198d666d5f1..cb3d32e0ec4 100644 --- a/hail/src/main/scala/is/hail/linalg/BlockMatrix.scala +++ b/hail/src/main/scala/is/hail/linalg/BlockMatrix.scala @@ -197,7 +197,7 @@ object BlockMatrix { val metadataRelativePath = "/metadata.json" def checkWriteSuccess(fs: FS, uri: String) { - if (!fs.exists(uri + "/_SUCCESS")) + if (!fs.isFile(uri + "/_SUCCESS")) fatal( s"Error reading block matrix. 
Earlier write failed: no success indicator found at uri $uri" ) diff --git a/hail/src/main/scala/is/hail/methods/MatrixExportEntriesByCol.scala b/hail/src/main/scala/is/hail/methods/MatrixExportEntriesByCol.scala index b5df80bec40..cf3cdcea9d5 100644 --- a/hail/src/main/scala/is/hail/methods/MatrixExportEntriesByCol.scala +++ b/hail/src/main/scala/is/hail/methods/MatrixExportEntriesByCol.scala @@ -164,9 +164,9 @@ case class MatrixExportEntriesByCol( val newFiles = mv.sparkContext.parallelize(0 until ns, numSlices = ns) .map { sampleIdx => val partFilePath = path + "/" + partFile(digitsNeeded(nCols), sampleIdx, TaskContext.get) - val fileListEntries = - partFolders.map(pf => fsBc.value.fileListEntry(pf + s"/$sampleIdx" + extension)) - fsBc.value.copyMergeList(fileListEntries, partFilePath, deleteSource = false) + val fileStatuses = + partFolders.map(pf => fsBc.value.fileStatus(pf + s"/$sampleIdx" + extension)) + fsBc.value.copyMergeList(fileStatuses, partFilePath, deleteSource = false) partFilePath }.collect() diff --git a/hail/src/main/scala/is/hail/utils/Py4jUtils.scala b/hail/src/main/scala/is/hail/utils/Py4jUtils.scala index d3314d03b78..baf8fc092bc 100644 --- a/hail/src/main/scala/is/hail/utils/Py4jUtils.scala +++ b/hail/src/main/scala/is/hail/utils/Py4jUtils.scala @@ -2,7 +2,7 @@ package is.hail.utils import is.hail.HailContext import is.hail.expr.JSONAnnotationImpex -import is.hail.io.fs.{FileListEntry, FS, SeekableDataInputStream} +import is.hail.io.fs.{FileListEntry, FileStatus, FS, SeekableDataInputStream} import is.hail.types.virtual.Type import org.json4s.JsonAST._ @@ -56,16 +56,20 @@ trait Py4jUtils { JsonMethods.compact(JArray(statuses.map(fs => fileListEntryToJson(fs)).toList)) } + def fileStatus(fs: FS, path: String): String = { + val stat = fs.fileStatus(path) + JsonMethods.compact(fileStatusToJson(stat)) + } + def fileListEntry(fs: FS, path: String): String = { val stat = fs.fileListEntry(path) JsonMethods.compact(fileListEntryToJson(stat)) } - private def fileListEntryToJson(fs: FileListEntry): JObject = { + private def fileStatusToJson(fs: FileStatus): JObject = { JObject( "path" -> JString(fs.getPath.toString), "size" -> JInt(fs.getLen), - "is_dir" -> JBool(fs.isDirectory), "is_link" -> JBool(fs.isSymlink), "modification_time" -> (if (fs.getModificationTime != null) @@ -85,6 +89,9 @@ trait Py4jUtils { ) } + private def fileListEntryToJson(fs: FileListEntry): JObject = + JObject(fileStatusToJson(fs).obj :+ ("is_dir" -> JBool(fs.isDirectory))) + private val kilo: Long = 1024 private val mega: Long = kilo * 1024 private val giga: Long = mega * 1024 diff --git a/hail/src/main/scala/is/hail/utils/richUtils/RichRDD.scala b/hail/src/main/scala/is/hail/utils/richUtils/RichRDD.scala index 62b44a384cc..f2849a5a442 100644 --- a/hail/src/main/scala/is/hail/utils/richUtils/RichRDD.scala +++ b/hail/src/main/scala/is/hail/utils/richUtils/RichRDD.scala @@ -122,7 +122,7 @@ class RichRDD[T](val r: RDD[T]) extends AnyVal { } } - if (!fs.exists(parallelOutputPath + "/_SUCCESS")) + if (!fs.isFile(parallelOutputPath + "/_SUCCESS")) fatal("write failed: no success indicator found") if (exportType == ExportType.CONCATENATED) { diff --git a/hail/src/main/scala/is/hail/variant/ReferenceGenome.scala b/hail/src/main/scala/is/hail/variant/ReferenceGenome.scala index 5daf95f9e6e..1d04a6137d6 100644 --- a/hail/src/main/scala/is/hail/variant/ReferenceGenome.scala +++ b/hail/src/main/scala/is/hail/variant/ReferenceGenome.scala @@ -19,7 +19,7 @@ import is.hail.utils._ import org.json4s._ import 
org.json4s.jackson.{JsonMethods, Serialization} -import java.io.InputStream +import java.io.{FileNotFoundException, InputStream} import java.lang.ThreadLocal import scala.collection.JavaConverters._ import scala.collection.mutable @@ -376,10 +376,12 @@ case class ReferenceGenome( val tmpdir = ctx.localTmpdir val fs = ctx.fs - if (!fs.exists(fastaFile)) - fatal(s"FASTA file '$fastaFile' does not exist or you do not have access.") - if (!fs.exists(indexFile)) - fatal(s"FASTA index file '$indexFile' does not exist or you do not have access.") + if (!fs.isFile(fastaFile)) + fatal(s"FASTA file '$fastaFile' does not exist, is not a file, or you do not have access.") + if (!fs.isFile(indexFile)) + fatal( + s"FASTA index file '$indexFile' does not exist, is not a file, or you do not have access." + ) fastaFilePath = fastaFile fastaIndexPath = indexFile @@ -458,10 +460,10 @@ case class ReferenceGenome( val tmpdir = ctx.localTmpdir val fs = ctx.fs - if (!fs.exists(chainFile)) - fatal(s"Chain file '$chainFile' does not exist.") + if (!fs.isFile(chainFile)) + fatal(s"Chain file '$chainFile' does not exist, is not a file, or you do not have access.") - val chainFilePath = fs.fileListEntry(chainFile).getPath + val chainFilePath = fs.parseUrl(chainFile).toString val lo = LiftOver(fs, chainFilePath) val destRG = ctx.getReference(destRGName) lo.checkChainFile(this, destRG) @@ -502,7 +504,7 @@ case class ReferenceGenome( // since removeLiftover updates both maps, so we don't check to see if liftoverMap has // keys that are not in chainFiles for ((destRGName, chainFile) <- chainFiles) { - val chainFilePath = fs.fileListEntry(chainFile).getPath + val chainFilePath = fs.parseUrl(chainFile).toString liftoverMap.get(destRGName) match { case Some(lo) if lo.chainFile == chainFilePath => // do nothing case _ => liftoverMap += destRGName -> LiftOver(fs, chainFilePath) @@ -511,8 +513,8 @@ case class ReferenceGenome( // add sequence if (fastaFilePath != null) { - val fastaPath = fs.fileListEntry(fastaFilePath).getPath - val indexPath = fs.fileListEntry(fastaIndexPath).getPath + val fastaPath = fs.parseUrl(fastaFilePath).toString + val indexPath = fs.parseUrl(fastaIndexPath).toString if ( fastaReaderCfg == null || fastaReaderCfg.fastaFile != fastaPath || fastaReaderCfg.indexFile != indexPath ) { @@ -644,10 +646,12 @@ object ReferenceGenome { val tmpdir = ctx.localTmpdir val fs = ctx.fs - if (!fs.exists(fastaFile)) - fatal(s"FASTA file '$fastaFile' does not exist.") - if (!fs.exists(indexFile)) - fatal(s"FASTA index file '$indexFile' does not exist.") + if (!fs.isFile(fastaFile)) + fatal(s"FASTA file '$fastaFile' does not exist, is not a file, or you do not have access.") + if (!fs.isFile(indexFile)) + fatal( + s"FASTA index file '$indexFile' does not exist, is not a file, or you do not have access." 
+ ) val index = using(fs.open(indexFile))(new FastaSequenceIndex(_)) @@ -673,23 +677,28 @@ object ReferenceGenome { } def readReferences(fs: FS, path: String): Array[ReferenceGenome] = { - if (fs.exists(path)) { - val refs = fs.listDirectory(path) - val rgs = mutable.Set[ReferenceGenome]() - refs.foreach { fileSystem => - val rgPath = fileSystem.getPath.toString - val rg = using(fs.open(rgPath))(read) - val name = rg.name - if (!rgs.contains(rg) && !hailReferences.contains(name)) - rgs += rg + val refs = + try + fs.listDirectory(path) + catch { + case exc: FileNotFoundException => + return Array() } - rgs.toArray - } else Array() + + val rgs = mutable.Set[ReferenceGenome]() + refs.foreach { fileSystem => + val rgPath = fileSystem.getPath.toString + val rg = using(fs.open(rgPath))(read) + val name = rg.name + if (!rgs.contains(rg) && !hailReferences.contains(name)) + rgs += rg + } + rgs.toArray } def writeReference(fs: FS, path: String, rg: ReferenceGenome) { val rgPath = path + "/" + rg.name + ".json.gz" - if (!hailReferences.contains(rg.name) && !fs.exists(rgPath)) + if (!hailReferences.contains(rg.name) && !fs.isFile(rgPath)) rg.asInstanceOf[ReferenceGenome].write(fs, rgPath) } diff --git a/hail/src/test/scala/is/hail/expr/ir/analyses/SemanticHashSuite.scala b/hail/src/test/scala/is/hail/expr/ir/analyses/SemanticHashSuite.scala index d61573c418a..281ab77e26a 100644 --- a/hail/src/test/scala/is/hail/expr/ir/analyses/SemanticHashSuite.scala +++ b/hail/src/test/scala/is/hail/expr/ir/analyses/SemanticHashSuite.scala @@ -345,6 +345,7 @@ class SemanticHashSuite extends HailSuite { override def glob(url: FakeURL): Array[FileListEntry] = Array(new FileListEntry { override def getPath: String = url.getPath + override def getActualUrl(): String = url.getPath override def getModificationTime: lang.Long = ??? override def getLen: Long = ??? override def isDirectory: Boolean = ??? 
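The FS changes above split path inspection into two tiers: fileStatus issues a single metadata request and throws FileNotFoundException when no file object exists at the URL, while fileListEntry may additionally list the prefix to classify the path as a file, a directory, or both. A caller-side sketch of the intended usage (illustrative only: FSUsageSketch, sizeIfFile, and describe are invented names, not part of this change):

import java.io.FileNotFoundException
import is.hail.io.fs.{FileAndDirectoryException, FS}

object FSUsageSketch {
  // Cheap check: one metadata request; fails if no file object exists at this
  // URL, even when a directory (prefix) with the same name does.
  def sizeIfFile(fs: FS, url: String): Option[Long] =
    try Some(fs.fileStatus(url).getLen)
    catch { case _: FileNotFoundException => None }

  // Full classification: may require a list operation and can throw
  // FileAndDirectoryException when a blob and a prefix share a name.
  def describe(fs: FS, url: String): String =
    try if (fs.fileListEntry(url).isDirectory) "directory" else "file"
    catch {
      case _: FileNotFoundException => "absent"
      case _: FileAndDirectoryException => "both a file and a directory"
    }
}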
diff --git a/hail/src/test/scala/is/hail/io/IndexSuite.scala b/hail/src/test/scala/is/hail/io/IndexSuite.scala index 3705947d44a..fd5323a150f 100644 --- a/hail/src/test/scala/is/hail/io/IndexSuite.scala +++ b/hail/src/test/scala/is/hail/io/IndexSuite.scala @@ -113,7 +113,8 @@ class IndexSuite extends HailSuite { branchingFactor, attributes, ) - assert(fs.getFileSize(file) != 0) + assert(fs.getFileSize(file + "/index") != 0) + assert(fs.getFileSize(file + "/metadata.json.gz") != 0) val index = indexReader(file, TStruct("a" -> TBoolean)) @@ -133,7 +134,8 @@ class IndexSuite extends HailSuite { @Test def testEmptyKeys() { val file = ctx.createTmpPath("empty", "idx") writeIndex(file, Array.empty[String], Array.empty[Annotation], TStruct("a" -> TBoolean), 2) - assert(fs.getFileSize(file) != 0) + assert(fs.getFileSize(file + "/index") != 0) + assert(fs.getFileSize(file + "/metadata.json.gz") != 0) val index = indexReader(file, TStruct("a" -> TBoolean)) intercept[IllegalArgumentException](index.queryByIndex(0L)) assert(index.queryByKey("moo").isEmpty) diff --git a/hail/src/test/scala/is/hail/io/compress/BGzipCodecSuite.scala b/hail/src/test/scala/is/hail/io/compress/BGzipCodecSuite.scala index de4e556de3f..499f3eb028f 100644 --- a/hail/src/test/scala/is/hail/io/compress/BGzipCodecSuite.scala +++ b/hail/src/test/scala/is/hail/io/compress/BGzipCodecSuite.scala @@ -80,7 +80,7 @@ class BGzipCodecSuite extends HailSuite { @Test def testGenericLinesSimpleUncompressed() { val lines = Source.fromFile(uncompPath).getLines().toFastSeq - val uncompStatus = fs.fileListEntry(uncompPath) + val uncompStatus = fs.fileStatus(uncompPath) var i = 0 while (i < 16) { val lines2 = GenericLines.collect( @@ -95,7 +95,7 @@ class BGzipCodecSuite extends HailSuite { @Test def testGenericLinesSimpleBGZ() { val lines = Source.fromFile(uncompPath).getLines().toFastSeq - val compStatus = fs.fileListEntry(compPath) + val compStatus = fs.fileStatus(compPath) var i = 0 while (i < 16) { val lines2 = GenericLines.collect( @@ -111,7 +111,7 @@ class BGzipCodecSuite extends HailSuite { val lines = Source.fromFile(uncompPath).getLines().toFastSeq // won't split, just run once - val gzStatus = fs.fileListEntry(gzPath) + val gzStatus = fs.fileStatus(gzPath) val lines2 = GenericLines.collect( fs, GenericLines.read(fs, Array(gzStatus), Some(7), None, None, false, true), @@ -121,7 +121,7 @@ class BGzipCodecSuite extends HailSuite { @Test def testGenericLinesRefuseGZ() { interceptFatal("Cowardly refusing") { - val gzStatus = fs.fileListEntry(gzPath) + val gzStatus = fs.fileStatus(gzPath) GenericLines.read(fs, Array(gzStatus), Some(7), None, None, false, false) } } diff --git a/hail/src/test/scala/is/hail/io/fs/AzureStorageFSSuite.scala b/hail/src/test/scala/is/hail/io/fs/AzureStorageFSSuite.scala index 6dfdc020470..033cbbe2851 100644 --- a/hail/src/test/scala/is/hail/io/fs/AzureStorageFSSuite.scala +++ b/hail/src/test/scala/is/hail/io/fs/AzureStorageFSSuite.scala @@ -7,7 +7,7 @@ import org.scalatest.testng.TestNGSuite import org.testng.SkipException import org.testng.annotations.{BeforeClass, Test} -class AzureStorageFSSuite extends TestNGSuite with FSSuite { +class AzureStorageFSSuite extends FSSuite { @BeforeClass def beforeclass(): Unit = { if (System.getenv("HAIL_CLOUD") != "azure") { @@ -42,9 +42,4 @@ class AzureStorageFSSuite extends TestNGSuite with FSSuite { } assert(false) } - - @Test def testETag(): Unit = { - val etag = fs.eTag(s"$fsResourcesRoot/a") - assert(etag.nonEmpty) - } } diff --git 
a/hail/src/test/scala/is/hail/io/fs/FSSuite.scala b/hail/src/test/scala/is/hail/io/fs/FSSuite.scala index b1cc788042f..f165f4e1812 100644 --- a/hail/src/test/scala/is/hail/io/fs/FSSuite.scala +++ b/hail/src/test/scala/is/hail/io/fs/FSSuite.scala @@ -1,5 +1,6 @@ package is.hail.io.fs +import is.hail.{HailSuite, TestUtils} import is.hail.HailSuite import is.hail.backend.ExecuteContext import is.hail.io.fs.FSUtil.dropTrailingSlash @@ -8,6 +9,7 @@ import is.hail.utils._ import java.io.FileNotFoundException import org.apache.commons.io.IOUtils +import org.apache.hadoop.fs.FileAlreadyExistsException import org.scalatest.testng.TestNGSuite import org.testng.annotations.Test @@ -37,12 +39,7 @@ trait FSSuite extends TestNGSuite { def pathsRelResourcesRoot(statuses: Array[FileListEntry]): Set[String] = pathsRelRoot(fsResourcesRoot, statuses) - @Test def testExists(): Unit = { - assert(fs.exists(r("/a"))) - - assert(fs.exists(r("/zzz"))) - assert(!fs.exists(r("/z"))) // prefix - + @Test def testExistsOnDirectory(): Unit = { assert(fs.exists(r("/dir"))) assert(fs.exists(r("/dir/"))) @@ -50,6 +47,21 @@ assert(!fs.exists(r("/does_not_exist_dir/"))) } + @Test def testExistsOnFile(): Unit = { + assert(fs.exists(r("/a"))) + + assert(fs.exists(r("/zzz"))) + assert(!fs.exists(r("/z"))) // prefix + } + + @Test def testFileStatusOnFile(): Unit = { + // file + val f = r("/a") + val s = fs.fileStatus(f) + assert(s.getPath == f) + assert(s.getLen == 12) + } + @Test def testFileListEntryOnFile(): Unit = { // file val f = r("/a") @@ -60,6 +72,13 @@ assert(s.getLen == 12) } + @Test def testFileStatusOnDirIsFailure(): Unit = { + val f = r("/dir") + TestUtils.interceptException[FileNotFoundException](f)( + fs.fileStatus(f) + ) + } + @Test def testFileListEntryOnDir(): Unit = { // directory val f = r("/dir") @@ -195,6 +214,20 @@ assert(pathsRelRoot(root, statuses) == Set("")) } + @Test def testFileEndingWithPeriod(): Unit = { + val f = fs.makeQualified(t()) + fs.touch(f + "/foo.") + val statuses = fs.listDirectory(f) + assert(statuses.length == 1, statuses) + val status = statuses(0) + if (this.isInstanceOf[AzureStorageFSSuite]) { + // https://github.com/Azure/azure-sdk-for-java/issues/36674 + assert(status.getPath == f + "/foo") + } else { + assert(status.getPath == f + "/foo.") + } + } + @Test def testGlobRootWithSlash(): Unit = { if (root.endsWith("/")) return @@ -436,6 +469,169 @@ assert(is.read() == 1.toByte) } } + + @Test def fileAndDirectoryIsError(): Unit = { + val d = t() + fs.mkDir(d) + fs.touch(s"$d/x/file") + try { + fs.touch(s"$d/x") + fs.fileListEntry(s"$d/x") + assert(false) + } catch { + /* Hadoop, in particular, errors when you touch an object whose name is a prefix of another + * object. */ + case exc: FileAndDirectoryException + if exc.getMessage() == s"$d/x appears as both file $d/x and directory $d/x/." => + case exc: FileNotFoundException if exc.getMessage() == s"$d/x (Is a directory)" => + } + } + + @Test def testETag(): Unit = { + val etag = fs.eTag(s"$fsResourcesRoot/a") + if (fs.parseUrl(fsResourcesRoot).toString.startsWith("file:")) { + // only the local file system should lack etags.
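+ // (Both cloud implementations declare eTag as Some[String], so every non-local URL must yield one.)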
+ assert(etag.isEmpty) + } else { + assert(etag.nonEmpty) + } + } + + @Test def fileAndDirectoryIsErrorEvenIfPrefixedFileIsNotLexicographicallyFirst(): Unit = { + val d = t() + fs.mkDir(d) + fs.touch(s"$d/x") + // fs.touch(s"$d/x ") // Hail does not support spaces in path names + fs.touch(s"$d/x!") + fs.touch(s"$d/x${'"'}") + fs.touch(s"$d/x#") + fs.touch(s"$d/x$$") + // fs.touch(s"$d/x%") // Azure dislikes %'s + // java.lang.IllegalArgumentException: URLDecoder: Incomplete trailing escape (%) pattern + // at java.net.URLDecoder.decode(URLDecoder.java:187) + // at is.hail.shadedazure.com.azure.storage.common.Utility.decode(Utility.java:88) + // at is.hail.shadedazure.com.azure.storage.common.Utility.urlDecode(Utility.java:55) + /* at + * is.hail.shadedazure.com.azure.storage.blob.specialized.BlobAsyncClientBase.<init>(BlobAsyncClientBase.java:238) */ + /* at + * is.hail.shadedazure.com.azure.storage.blob.specialized.BlobAsyncClientBase.<init>(BlobAsyncClientBase.java:202) */ + /* at + * is.hail.shadedazure.com.azure.storage.blob.BlobAsyncClient.<init>(BlobAsyncClient.java:154) */ + /* at + * is.hail.shadedazure.com.azure.storage.blob.BlobContainerAsyncClient.getBlobAsyncClient(BlobContainerAsyncClient.java:194) */ + /* at + * is.hail.shadedazure.com.azure.storage.blob.BlobContainerAsyncClient.getBlobAsyncClient(BlobContainerAsyncClient.java:172) */ + /* at + * is.hail.shadedazure.com.azure.storage.blob.BlobContainerClient.getBlobClient(BlobContainerClient.java:98) */ + // at is.hail.io.fs.AzureStorageFS.$anonfun$getBlobClient$1(AzureStorageFS.scala:255) + fs.touch(s"$d/x&") + fs.touch(s"$d/x'") + fs.touch(s"$d/x)") + fs.touch(s"$d/x(") + fs.touch(s"$d/x*") + fs.touch(s"$d/x+") + fs.touch(s"$d/x,") + fs.touch(s"$d/x-") + // fs.touch(s"$d/x.") // https://github.com/Azure/azure-sdk-for-java/issues/36674 + try { + fs.touch(s"$d/x/file") + fs.fileListEntry(s"$d/x") + assert(false) + } catch { + /* Hadoop, in particular, errors when you touch an object whose name is a prefix of another + * object. */ + case exc: FileAndDirectoryException + if exc.getMessage() == s"$d/x appears as both file $d/x and directory $d/x/."
=> + case exc: FileAlreadyExistsException + if exc.getMessage() == s"Destination exists and is not a directory: $d/x" => + } + } + + @Test def fileListEntrySeesDirectoryEvenIfPrefixedFileIsNotLexicographicallyFirst(): Unit = { + val d = t() + fs.mkDir(d) + // fs.touch(s"$d/x ") // Hail does not support spaces in path names + fs.touch(s"$d/x!") + fs.touch(s"$d/x${'"'}") + fs.touch(s"$d/x#") + fs.touch(s"$d/x$$") + // fs.touch(s"$d/x%") // Azure dislikes %'s + // java.lang.IllegalArgumentException: URLDecoder: Incomplete trailing escape (%) pattern + // at java.net.URLDecoder.decode(URLDecoder.java:187) + // at is.hail.shadedazure.com.azure.storage.common.Utility.decode(Utility.java:88) + // at is.hail.shadedazure.com.azure.storage.common.Utility.urlDecode(Utility.java:55) + /* at + * is.hail.shadedazure.com.azure.storage.blob.specialized.BlobAsyncClientBase.<init>(BlobAsyncClientBase.java:238) */ + /* at + * is.hail.shadedazure.com.azure.storage.blob.specialized.BlobAsyncClientBase.<init>(BlobAsyncClientBase.java:202) */ + /* at + * is.hail.shadedazure.com.azure.storage.blob.BlobAsyncClient.<init>(BlobAsyncClient.java:154) */ + /* at + * is.hail.shadedazure.com.azure.storage.blob.BlobContainerAsyncClient.getBlobAsyncClient(BlobContainerAsyncClient.java:194) */ + /* at + * is.hail.shadedazure.com.azure.storage.blob.BlobContainerAsyncClient.getBlobAsyncClient(BlobContainerAsyncClient.java:172) */ + /* at + * is.hail.shadedazure.com.azure.storage.blob.BlobContainerClient.getBlobClient(BlobContainerClient.java:98) */ + // at is.hail.io.fs.AzureStorageFS.$anonfun$getBlobClient$1(AzureStorageFS.scala:255) + fs.touch(s"$d/x&") + fs.touch(s"$d/x'") + fs.touch(s"$d/x)") + fs.touch(s"$d/x(") + fs.touch(s"$d/x*") + fs.touch(s"$d/x+") + fs.touch(s"$d/x,") + fs.touch(s"$d/x-") + // fs.touch(s"$d/x.") // https://github.com/Azure/azure-sdk-for-java/issues/36674 + fs.touch(s"$d/x/file") + + val fle = fs.fileListEntry(s"$d/x") + assert(fle.isDirectory) + assert(!fle.isFile) + } + + @Test def fileListEntrySeesFileEvenWithPeersPrecedingThePositionOfANonPresentDirectoryEntry() + : Unit = { + val d = t() + fs.mkDir(d) + fs.touch(s"$d/x") + // fs.touch(s"$d/x ") // Hail does not support spaces in path names + fs.touch(s"$d/x!") + fs.touch(s"$d/x${'"'}") + fs.touch(s"$d/x#") + fs.touch(s"$d/x$$") + // fs.touch(s"$d/x%") // Azure dislikes %'s + // java.lang.IllegalArgumentException: URLDecoder: Incomplete trailing escape (%) pattern + // at java.net.URLDecoder.decode(URLDecoder.java:187) + // at is.hail.shadedazure.com.azure.storage.common.Utility.decode(Utility.java:88) + // at is.hail.shadedazure.com.azure.storage.common.Utility.urlDecode(Utility.java:55) + /* at + * is.hail.shadedazure.com.azure.storage.blob.specialized.BlobAsyncClientBase.<init>(BlobAsyncClientBase.java:238) */ + /* at + * is.hail.shadedazure.com.azure.storage.blob.specialized.BlobAsyncClientBase.<init>(BlobAsyncClientBase.java:202) */ + /* at + * is.hail.shadedazure.com.azure.storage.blob.BlobAsyncClient.<init>(BlobAsyncClient.java:154) */ + /* at + * is.hail.shadedazure.com.azure.storage.blob.BlobContainerAsyncClient.getBlobAsyncClient(BlobContainerAsyncClient.java:194) */ + /* at + * is.hail.shadedazure.com.azure.storage.blob.BlobContainerAsyncClient.getBlobAsyncClient(BlobContainerAsyncClient.java:172) */ + /* at + * is.hail.shadedazure.com.azure.storage.blob.BlobContainerClient.getBlobClient(BlobContainerClient.java:98) */ + // at is.hail.io.fs.AzureStorageFS.$anonfun$getBlobClient$1(AzureStorageFS.scala:255) + fs.touch(s"$d/x&") + fs.touch(s"$d/x'") + fs.touch(s"$d/x)")
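+ // Every blob name touched here sorts after "x" but before "x/" in ASCII (all of these punctuation
+ // characters precede '/', 0x2F), so the resolver must scan past all of them before it can decide
+ // whether a directory entry exists at this URL.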
fs.touch(s"$d/x(") + fs.touch(s"$d/x*") + fs.touch(s"$d/x+") + fs.touch(s"$d/x,") + fs.touch(s"$d/x-") + // fs.touch(s"$d/x.") // https://github.com/Azure/azure-sdk-for-java/issues/36674 + + val fle = fs.fileListEntry(s"$d/x") + assert(!fle.isDirectory) + assert(fle.isFile) + assert(fle.getPath == fs.parseUrl(s"$d/x").toString) + } } class HadoopFSSuite extends HailSuite with FSSuite { @@ -445,9 +641,4 @@ class HadoopFSSuite extends HailSuite with FSSuite { "file:" + new java.io.File("./src/test/resources/fs").getCanonicalPath override lazy val tmpdir: String = ctx.tmpdir - - @Test def testETag(): Unit = { - val etag = fs.eTag(s"$fsResourcesRoot/a") - assert(etag.isEmpty) - } } diff --git a/hail/src/test/scala/is/hail/io/fs/FakeFS.scala b/hail/src/test/scala/is/hail/io/fs/FakeFS.scala index 1626d47b696..26578742e57 100644 --- a/hail/src/test/scala/is/hail/io/fs/FakeFS.scala +++ b/hail/src/test/scala/is/hail/io/fs/FakeFS.scala @@ -2,6 +2,7 @@ package is.hail.io.fs case class FakeURL(path: String) extends FSURL { def getPath(): String = path + def getActualUrl(): String = path } abstract class FakeFS extends FS { @@ -13,6 +14,7 @@ abstract class FakeFS extends FS { override def createNoCompression(url: FakeURL): PositionedDataOutputStream = ??? override def delete(url: FakeURL, recursive: Boolean): Unit = ??? override def eTag(url: FakeURL): Option[String] = ??? + override def fileStatus(url: FakeURL): FileStatus = ??? override def fileListEntry(url: FakeURL): FileListEntry = ??? override def glob(url: FakeURL): Array[FileListEntry] = ??? override def listDirectory(url: FakeURL): Array[FileListEntry] = ??? diff --git a/hail/src/test/scala/is/hail/io/fs/GoogleStorageFSSuite.scala b/hail/src/test/scala/is/hail/io/fs/GoogleStorageFSSuite.scala index 4395ef368db..ff7c234d56a 100644 --- a/hail/src/test/scala/is/hail/io/fs/GoogleStorageFSSuite.scala +++ b/hail/src/test/scala/is/hail/io/fs/GoogleStorageFSSuite.scala @@ -42,10 +42,4 @@ class GoogleStorageFSSuite extends TestNGSuite with FSSuite { } assert(false) } - - @Test def testETag(): Unit = { - val etag = fs.eTag(s"$fsResourcesRoot/a") - assert(etag.nonEmpty) - } - } diff --git a/hail/src/test/scala/is/hail/utils/RichRDDSuite.scala b/hail/src/test/scala/is/hail/utils/RichRDDSuite.scala index 1248bd1e1d7..3d79016da44 100644 --- a/hail/src/test/scala/is/hail/utils/RichRDDSuite.scala +++ b/hail/src/test/scala/is/hail/utils/RichRDDSuite.scala @@ -41,7 +41,7 @@ class RichRDDSuite extends HailSuite { separateHeader + "/header.gz", separateHeader + "/part-00000.gz", separateHeader + "/part-00001.gz", - ).flatMap((x: String) => fs.glob(x)) + ).map(x => fs.fileStatus(x)) fs.copyMergeList(mergeList, merged, deleteSource = false) assert(read(merged) sameElements read(concatenated)) diff --git a/hail/src/test/scala/is/hail/utils/UtilsSuite.scala b/hail/src/test/scala/is/hail/utils/UtilsSuite.scala index 33bb3e1ea55..6f67a4b32af 100644 --- a/hail/src/test/scala/is/hail/utils/UtilsSuite.scala +++ b/hail/src/test/scala/is/hail/utils/UtilsSuite.scala @@ -92,10 +92,9 @@ class UtilsSuite extends HailSuite { val fs = new HadoopFS(new SerializableHadoopConfiguration(sc.hadoopConfiguration)) val partFileNames = fs.glob("src/test/resources/part-*") - .map(fileListEntry => (fileListEntry, new hadoop.fs.Path(fileListEntry.getPath))).sortBy { - case (fileListEntry, path) => - getPartNumber(path.getName) - }.map(_._2.getName) + .sortBy(fileListEntry => getPartNumber(fileListEntry.getPath)).map(_.getPath.split( + "/" + ).last) assert(partFileNames(0) == "part-40001" && 
partFileNames(1) == "part-100001") }
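For reference, the resolution that FS.fileListEntryFromIterator performs while scanning the (alphabetically ordered) listing reduces to a small decision function over the three candidates it tracks. A self-contained sketch of that logic (Resolution and resolve are invented names for illustration, not part of this change):

object FileListEntryResolutionSketch {
  // "file": a blob exactly matching the URL; "dir": a prefix (directory)
  // entry for it; "slash": a blob literally named "<url>/".
  sealed trait Resolution
  case object IsFile extends Resolution // warns if a trailing-slash twin also exists
  case object IsDir extends Resolution // warns if a trailing-slash blob also exists
  case object FileAndDir extends Resolution // surfaces as FileAndDirectoryException
  case object Absent extends Resolution // surfaces as FileNotFoundException

  def resolve(file: Boolean, dir: Boolean, slash: Boolean): Resolution =
    if (file && dir) FileAndDir
    else if (file) IsFile
    else if (dir) IsDir
    else Absent // holds even when slash is set: a lone "<url>/" blob is reported as absent
}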