[query] Avoid Class A Operations. #13885

Merged · 31 commits · Jan 9, 2024
26 changes: 23 additions & 3 deletions hail/python/hail/fs/hadoop_fs.py
@@ -6,7 +6,15 @@
import dateutil.parser

from hailtop.fs.fs import FS
from hailtop.fs.stat_result import FileType, FileListEntry
from hailtop.fs.stat_result import FileType, FileListEntry, FileStatus


def _file_status_scala_to_python(file_status: Dict[str, Any]) -> FileStatus:
dt = dateutil.parser.isoparse(file_status['modification_time'])
mtime = time.mktime(dt.timetuple())
return FileStatus(
path=file_status['path'], owner=file_status['owner'], size=file_status['size'], modification_time=mtime
)


def _file_list_entry_scala_to_python(file_list_entry: Dict[str, Any]) -> FileListEntry:
@@ -69,9 +77,21 @@ def is_file(self, path: str) -> bool:
def is_dir(self, path: str) -> bool:
return self._jfs.isDir(path)

def fast_stat(self, path: str) -> FileStatus:
'''Get information about a path other than its file/directory status.

In the cloud, determining if a given path is a file, a directory, or both is expensive. This
method simply returns file metadata if there is a file at this path. If there is no file at
this path, this operation will fail. The presence or absence of a directory at this path
does not affect the behavior of this method.

'''
file_status_dict = json.loads(self._utils_package_object.fileStatus(self._jfs, path))
return _file_status_scala_to_python(file_status_dict)

def stat(self, path: str) -> FileListEntry:
stat_dict = json.loads(self._utils_package_object.fileListEntry(self._jfs, path))
return _file_list_entry_scala_to_python(stat_dict)
file_list_entry_dict = json.loads(self._utils_package_object.fileListEntry(self._jfs, path))
return _file_list_entry_scala_to_python(file_list_entry_dict)

def ls(self, path: str) -> List[FileListEntry]:
return [
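Note: the split above is the heart of this change. `stat` must resolve whether a path is a file, a directory, or both, which against cloud object stores can fan out into extra list requests, while `fast_stat` performs a single metadata lookup and simply fails when no file is present. A minimal usage sketch, assuming a running Hail session whose backend filesystem exposes the new method (the path is illustrative):

```python
import hail as hl

hl.init()
fs = hl.current_backend().fs

# One metadata lookup: fails if there is no *file* at this path, regardless
# of whether a directory with the same name exists.
status = fs.fast_stat('/tmp/example.txt')  # hypothetical path
print(status.size, status.modification_time)

# stat additionally resolves file-vs-directory status, which can cost extra
# requests against a cloud object store.
entry = fs.stat('/tmp/example.txt')
```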
17 changes: 17 additions & 0 deletions hail/python/hailtop/fs/stat_result.py
@@ -10,6 +10,23 @@ class FileType(Enum):
SYMLINK = auto()


class FileStatus(NamedTuple):
path: str
owner: Union[None, str, int]
size: int
# common point between unix, google, and hadoop filesystems, represented as a unix timestamp
modification_time: Optional[float]

def to_legacy_dict(self) -> Dict[str, Any]:
return {
'path': self.path,
'owner': self.owner,
'size_bytes': self.size,
'size': filesize(self.size),
'modification_time': self.modification_time,
}


class FileListEntry(NamedTuple):
path: str
owner: Union[None, str, int]
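`FileStatus` deliberately omits the file-versus-directory type carried by `FileListEntry`; producing that type is exactly what makes the slower calls expensive. A small sketch of constructing the new tuple and reading back its legacy dict form, with illustrative values:

```python
from hailtop.fs.stat_result import FileStatus

status = FileStatus(
    path='gs://my-bucket/data.txt',  # illustrative path
    owner=None,
    size=2048,
    modification_time=1704758400.0,
)

legacy = status.to_legacy_dict()
# The legacy layout keeps raw bytes under 'size_bytes' and a human-readable
# rendering (via filesize) under 'size'.
print(legacy['size_bytes'], legacy['modification_time'])
```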
8 changes: 6 additions & 2 deletions hail/python/test/hail/methods/test_impex.py
@@ -1,4 +1,5 @@
import json
import re
import os
import pytest
import shutil
@@ -1511,14 +1512,17 @@ def test_old_index_file_throws_error(self):

with hl.TemporaryFilename() as f:
hl.current_backend().fs.copy(bgen_file, f)
with pytest.raises(FatalError, match='have no .idx2 index file'):

expected_missing_idx2_error_message = re.compile(f'have no .idx2 index file.*{f}.*', re.DOTALL)

with pytest.raises(FatalError, match=expected_missing_idx2_error_message):
hl.import_bgen(f, ['GT', 'GP'], sample_file, n_partitions=3)

try:
with hl.current_backend().fs.open(f + '.idx', 'wb') as fobj:
fobj.write(b'')

with pytest.raises(FatalError, match='have no .idx2 index file'):
with pytest.raises(FatalError, match=expected_missing_idx2_error_message):
hl.import_bgen(f, ['GT', 'GP'], sample_file)
finally:
hl.current_backend().fs.remove(f + '.idx')
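The compiled pattern matters because `pytest.raises(match=...)` runs `re.search` over the stringified exception, and a plain `.` never crosses newlines: without `re.DOTALL`, the filename on a later line of the error could not be asserted. A standalone sketch with a made-up message:

```python
import re

# Multi-line error text of the shape the test expects (illustrative).
msg = ("The following BGEN files have no .idx2 index file. Use 'index_bgen' ...\n"
       "  /tmp/example.bgen")

# re.DOTALL lets '.*' span the newline, so one pattern pins both the error
# text and the offending file path.
pattern = re.compile(r'have no .idx2 index file.*example\.bgen.*', re.DOTALL)
assert pattern.search(msg)
```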
2 changes: 1 addition & 1 deletion hail/python/test/hail/utils/test_utils.py
@@ -172,7 +172,7 @@ def test_hadoop_ls_file_that_does_not_exist(self):
except FileNotFoundError:
pass
except FatalError as err:
assert 'FileNotFoundException: a_file_that_does_not_exist' in err.args[0]
assert 'FileNotFoundException: file:/io/a_file_that_does_not_exist' in err.args[0]
else:
assert False

25 changes: 16 additions & 9 deletions hail/src/main/scala/is/hail/expr/ir/AbstractMatrixTableSpec.scala
@@ -10,7 +10,7 @@ import org.json4s._
import org.json4s.jackson.JsonMethods
import org.json4s.jackson.JsonMethods.parse

import java.io.OutputStreamWriter
import java.io.{FileNotFoundException, OutputStreamWriter}
import scala.collection.mutable
import scala.language.{existentials, implicitConversions}

@@ -36,15 +36,22 @@ object RelationalSpec {
new MatrixTypeSerializer

def readMetadata(fs: FS, path: String): JValue = {
if (!fs.isDir(path)) {
if (!fs.exists(path)) {
fatal(s"No file or directory found at $path")
} else {
fatal(s"MatrixTable and Table files are directories; path '$path' is not a directory")
}
}
val metadataFile = path + "/metadata.json.gz"
val jv = using(fs.open(metadataFile))(in => parse(in))
val jv =
try
using(fs.open(metadataFile))(in => parse(in))
catch {
case exc: FileNotFoundException =>
if (fs.isFile(path)) {
fatal(s"MatrixTable and Table files are directories; path '$path' is a file.")
} else {
if (fs.isDir(path)) {
fatal(s"MatrixTable is corrupted: $path/metadata.json.gz is missing.")
} else {
fatal(s"No file or directory found at $path.")
}
}
}

val fileVersion = jv \ "file_version" match {
case JInt(rep) => SemanticVersion(rep.toInt)
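The reshaped `readMetadata` is an optimistic-read pattern: the happy path costs exactly one open, and `isFile`/`isDir` are consulted only to build a precise error message after the read has already failed. A local-filesystem analogue of the same shape, as a hedged Python sketch:

```python
import gzip
import json
import os


def read_metadata(path: str) -> dict:
    """Open metadata optimistically; classify the failure only if it happens."""
    metadata_file = os.path.join(path, 'metadata.json.gz')
    try:
        with gzip.open(metadata_file, 'rt') as f:
            return json.load(f)
    except (FileNotFoundError, NotADirectoryError):
        # Diagnostic probes run only on the error path, so a healthy table
        # pays for a single read and no up-front existence checks.
        if os.path.isfile(path):
            raise ValueError(
                f"MatrixTable and Table files are directories; path '{path}' is a file.")
        if os.path.isdir(path):
            raise ValueError(
                f"MatrixTable is corrupted: {path}/metadata.json.gz is missing.")
        raise FileNotFoundError(f"No file or directory found at {path}.")
```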
12 changes: 7 additions & 5 deletions hail/src/main/scala/is/hail/expr/ir/GenericLines.scala
@@ -2,7 +2,9 @@ package is.hail.expr.ir

import is.hail.backend.spark.SparkBackend
import is.hail.io.compress.BGzipInputStream
import is.hail.io.fs.{BGZipCompressionCodec, FileListEntry, FS, Positioned, PositionedInputStream}
import is.hail.io.fs.{
BGZipCompressionCodec, FileListEntry, FileStatus, FS, Positioned, PositionedInputStream,
}
import is.hail.io.tabix.{TabixLineIterator, TabixReader}
import is.hail.types.virtual.{TBoolean, TInt32, TInt64, TString, TStruct, Type}
import is.hail.utils._
@@ -258,16 +260,16 @@ object GenericLines {

def read(
fs: FS,
fileListEntries0: IndexedSeq[FileListEntry],
fileStatuses0: IndexedSeq[_ <: FileStatus],
nPartitions: Option[Int],
blockSizeInMB: Option[Int],
minPartitions: Option[Int],
gzAsBGZ: Boolean,
allowSerialRead: Boolean,
filePerPartition: Boolean = false,
): GenericLines = {
val fileListEntries = fileListEntries0.zipWithIndex.filter(_._1.getLen > 0)
val totalSize = fileListEntries.map(_._1.getLen).sum
val fileStatuses = fileStatuses0.zipWithIndex.filter(_._1.getLen > 0)
val totalSize = fileStatuses.map(_._1.getLen).sum

var totalPartitions = nPartitions match {
case Some(nPartitions) => nPartitions
@@ -282,7 +284,7 @@
case None =>
}

val contexts = fileListEntries.flatMap { case (fileListEntry, fileNum) =>
val contexts = fileStatuses.flatMap { case (fileListEntry, fileNum) =>
val size = fileListEntry.getLen
val codec = fs.getCodecFromPath(fileListEntry.getPath, gzAsBGZ)

4 changes: 3 additions & 1 deletion hail/src/main/scala/is/hail/expr/ir/StringTableReader.scala
@@ -8,7 +8,9 @@ import is.hail.expr.ir.lowering.{LowererUnsupportedOperation, TableStage, TableS
import is.hail.expr.ir.streams.StreamProducer
import is.hail.io.fs.{FileListEntry, FS}
import is.hail.rvd.RVDPartitioner
import is.hail.types.{BaseTypeWithRequiredness, RStruct, TableType, VirtualTypeWithReq}
import is.hail.types.{
BaseTypeWithRequiredness, RStruct, TableType, TypeWithRequiredness, VirtualTypeWithReq,
}
import is.hail.types.physical._
import is.hail.types.physical.stypes.EmitType
import is.hail.types.physical.stypes.concrete.{SJavaString, SStackStruct, SStackStructValue}
2 changes: 1 addition & 1 deletion hail/src/main/scala/is/hail/expr/ir/TableIR.scala
@@ -42,7 +42,7 @@ object TableIR {
def read(fs: FS, path: String, dropRows: Boolean = false, requestedType: Option[TableType] = None)
: TableRead = {
val successFile = path + "/_SUCCESS"
if (!fs.exists(path + "/_SUCCESS"))
if (!fs.isFile(path + "/_SUCCESS"))
fatal(s"write failed: file not found: $successFile")

val tr = TableNativeReader.read(fs, path, None)
@@ -396,7 +396,7 @@ case object SemanticHash extends Logging {
case Some(etag) =>
etag.getBytes
case None =>
path.getBytes ++ Bytes.fromLong(fs.fileListEntry(path).getModificationTime)
path.getBytes ++ Bytes.fromLong(fs.fileStatus(path).getModificationTime)
}

def levelOrder(root: BaseIR): Iterator[(BaseIR, Int)] = {
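Here the fallback fingerprint needs only a modification time, so the cheaper `fileStatus` (a single metadata fetch) replaces `fileListEntry` (which may also resolve directory status via listing). Roughly, in Python, with a hypothetical `fs` object exposing the faster call:

```python
import struct
from typing import Optional


def file_fingerprint(fs, path: str, etag: Optional[str]) -> bytes:
    # Prefer a store-provided etag when available; otherwise combine the
    # path with its mtime, fetched by one metadata lookup (no listing).
    if etag is not None:
        return etag.encode()
    status = fs.fast_stat(path)  # hypothetical Python-side equivalent of fileStatus
    return path.encode() + struct.pack('>q', int(status.modification_time))
```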
53 changes: 30 additions & 23 deletions hail/src/main/scala/is/hail/io/bgen/LoadBgen.scala
@@ -165,11 +165,11 @@
badFiles += file

matches.flatMap { fileListEntry =>
val file = fileListEntry.getPath.toString
val file = fileListEntry.getPath
if (!file.endsWith(".bgen"))
warn(s"input file does not have .bgen extension: $file")

if (fs.isDir(file))
if (fileListEntry.isDirectory)
fs.listDirectory(file)
.filter(fileListEntry =>
".*part-[0-9]+(-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})?".r.matches(
@@ -193,22 +193,25 @@
def getAllFilePaths(fs: FS, files: Array[String]): Array[String] =
getAllFileListEntries(fs, files).map(_.getPath.toString)

def getBgenFileMetadata(ctx: ExecuteContext, files: Array[String], indexFiles: Array[String])
: Array[BgenFileMetadata] = {
def getBgenFileMetadata(
ctx: ExecuteContext,
files: Array[FileListEntry],
indexFilePaths: Array[String],
): Array[BgenFileMetadata] = {
val fs = ctx.fs
require(files.length == indexFiles.length)
val headers = getFileHeaders(fs, files)
require(files.length == indexFilePaths.length)
val headers = getFileHeaders(fs, files.map(_.getPath))

val cacheByRG: mutable.Map[Option[String], (String, Array[Long]) => Array[AnyRef]] =
mutable.Map.empty

headers.zip(indexFiles).map { case (h, indexFile) =>
val (keyType, annotationType) = IndexReader.readTypes(fs, indexFile)
headers.zip(indexFilePaths).map { case (h, indexFilePath) =>
val (keyType, annotationType) = IndexReader.readTypes(fs, indexFilePath)
val rg = keyType.asInstanceOf[TStruct].field("locus").typ match {
case TLocus(rg) => Some(rg)
case _ => None
}
val metadata = IndexReader.readMetadata(fs, indexFile, keyType, annotationType)
val metadata = IndexReader.readMetadata(fs, indexFilePath, keyType, annotationType)
val indexVersion = SemanticVersion(metadata.fileVersion)
val (leafSpec, internalSpec) = BgenSettings.indexCodecSpecs(indexVersion, rg)

@@ -226,12 +229,12 @@
val nVariants = metadata.nKeys

val rangeBounds = if (nVariants > 0) {
val Array(start, end) = getKeys(indexFile, Array[Long](0L, nVariants - 1))
val Array(start, end) = getKeys(indexFilePath, Array[Long](0L, nVariants - 1))
Interval(start, end, includesStart = true, includesEnd = true)
} else null

BgenFileMetadata(
indexFile,
indexFilePath,
indexVersion,
h,
rg,
@@ -245,9 +248,9 @@
}
}

def getIndexFileNames(fs: FS, files: Array[String], indexFileMap: Map[String, String])
def getIndexFileNames(fs: FS, files: Array[FileListEntry], indexFileMap: Map[String, String])
: Array[String] = {
def absolutePath(rel: String): String = fs.fileListEntry(rel).getPath.toString
def absolutePath(rel: String): String = fs.fileStatus(rel).getPath

val fileMapping = Option(indexFileMap)
.getOrElse(Map.empty[String, String])
@@ -260,19 +263,23 @@
| ${badExtensions.mkString("\n ")})""".stripMargin
)

files.map(absolutePath).map(f => fileMapping.getOrElse(f, f + ".idx2"))
files.map(f => fileMapping.getOrElse(f.getPath, f.getPath + ".idx2"))
}

def getIndexFiles(fs: FS, files: Array[String], indexFileMap: Map[String, String])
def getIndexFiles(fs: FS, files: Array[FileListEntry], indexFileMap: Map[String, String])
: Array[String] = {
val indexFiles = getIndexFileNames(fs, files, indexFileMap)
val missingIdxFiles = files.zip(indexFiles).filterNot { case (f, index) =>
fs.exists(index) && index.endsWith("idx2")
}.map(_._1)
if (missingIdxFiles.nonEmpty)

val bgenFilesWhichAreMissingIdx2Files = files.zip(indexFiles).filterNot {
case (f, index) => index.endsWith("idx2") && fs.isFile(index + "/index") && fs.isFile(
index + "/metadata.json.gz"
)
}.map(_._1.getPath)

if (bgenFilesWhichAreMissingIdx2Files.nonEmpty)
fatal(
s"""The following BGEN files have no .idx2 index file. Use 'index_bgen' to create the index file once before calling 'import_bgen':
| ${missingIdxFiles.mkString("\n ")})""".stripMargin
| ${bgenFilesWhichAreMisssingIdx2Files.mkString("\n ")}""".stripMargin
)
indexFiles
}
@@ -376,9 +383,9 @@ object MatrixBGENReader {
def apply(ctx: ExecuteContext, params: MatrixBGENReaderParameters): MatrixBGENReader = {
val fs = ctx.fs

val allFiles = LoadBgen.getAllFilePaths(fs, params.files.toArray)
val indexFiles = LoadBgen.getIndexFiles(fs, allFiles, params.indexFileMap)
val fileMetadata = LoadBgen.getBgenFileMetadata(ctx, allFiles, indexFiles)
val allFiles = LoadBgen.getAllFileListEntries(fs, params.files.toArray)
val indexFilePaths = LoadBgen.getIndexFiles(fs, allFiles, params.indexFileMap)
val fileMetadata = LoadBgen.getBgenFileMetadata(ctx, allFiles, indexFilePaths)
assert(fileMetadata.nonEmpty)
if (fileMetadata.exists(md => md.indexVersion != fileMetadata.head.indexVersion)) {
fatal(
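The stricter check reflects what an `.idx2` really is: a directory whose required members are `index` and `metadata.json.gz`. Probing those two members with `isFile` replaces the old bare `exists()` on the root, which on cloud backends can itself expand into both file and directory (list) checks. The same predicate as a Python sketch, assuming a hailtop-style `fs` with an `is_file` method:

```python
from typing import List, Tuple


def bgen_files_missing_idx2(fs, pairs: List[Tuple[str, str]]) -> List[str]:
    """Return .bgen paths whose .idx2 index is absent or incomplete.

    `pairs` holds (bgen_path, index_path) tuples; each fs.is_file call is
    assumed to issue a single object-metadata request.
    """
    missing = []
    for bgen_path, index_path in pairs:
        complete = (
            index_path.endswith('idx2')
            and fs.is_file(index_path + '/index')
            and fs.is_file(index_path + '/metadata.json.gz')
        )
        if not complete:
            missing.append(bgen_path)
    return missing
```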