
Conversation

@steveloughran
Contributor

What changes were proposed in this pull request?

This PR optimises the filesystem metadata reads in FileInputDStream by moving the filtering out of the FileSystem.globStatus and FileSystem.listStatus calls and into post-filtering of the FileStatus instances those calls return, so avoiding the need to look up a FileStatus for each path inside the filter. A sketch of the pattern follows the list below.

  • This doesn't add overhead to the filtering process; that's done as post-processing in the FileSystem glob/list operations anyway.
  • At worst it may result in larger lists being built up and returned.
  • For every glob match of a file, the code saves 1 RPC call to the HDFS NN and 1 GET against S3.
  • For every glob match of a directory, the code saves 1 RPC call and 2-3 HTTP calls to S3 for the directory check (including a slow LIST call whenever the directory has children, as it doesn't exist as a blob any more).
  • For the modtime check of every file, it saves a Hadoop RPC call and, against object stores which don't implement any client-side caching, an HTTP GET.
  • By entirely eliminating the getFileStatus() calls on the listed files, it should reduce the risk of AWS S3 throttling the HTTP requests, as it does when too many requests are made against parts of a single S3 bucket.
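
To illustrate the pattern, here is a minimal sketch rather than the actual patch; the DirectoryScanSketch object and the scanOld/scanNew names are invented:

  import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}

  object DirectoryScanSketch {

    // Before: the PathFilter sees only a Path, so it must call getFileStatus()
    // to read the modification time -- one extra NN RPC / S3 HEAD per entry.
    def scanOld(fs: FileSystem, pattern: Path, threshold: Long): Array[FileStatus] = {
      val newerThan = new PathFilter {
        override def accept(path: Path): Boolean =
          fs.getFileStatus(path).getModificationTime >= threshold
      }
      Option(fs.globStatus(pattern, newerThan)).getOrElse(Array.empty[FileStatus])
    }

    // After: filter the FileStatus instances the glob already returned; the
    // modification time comes for free, with no further filesystem calls.
    def scanNew(fs: FileSystem, pattern: Path, threshold: Long): Array[FileStatus] =
      Option(fs.globStatus(pattern)).getOrElse(Array.empty[FileStatus])
        .filter(_.getModificationTime >= threshold)
  }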

How was this patch tested?

Running the spark streaming tests as a regression suite. In the SPARK-7481 cloud code, I could add a test against S3 which prints to stdout the exact number of HTTP requests made to S3 before and after the patch, so as to validate the speedup. (The S3A metrics in Hadoop 2.8+ are accessible at the API level, but only through a new API added in 2.8, so using it would stop that proposed module building against Hadoop 2.7. Logging and manual assessment is the only cross-version strategy.)

@SparkQA

SparkQA commented Aug 20, 2016

Test build #64140 has finished for PR 14731 at commit 738c51b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen
Member

srowen commented Aug 20, 2016

LGTM. Does this sort of change make sense elsewhere where PathFilter is used? I glanced at the others and it looked like a wash in other cases.

@steveloughran
Contributor Author

steveloughran commented Aug 20, 2016

I'm going to scan through and tune them elsewhere; really I'm going by uses of the listFiles calls.

There's actually no significant use elsewhere that I can see; just a couple of uses which filter on filename, so there is no cost penalty.

  • SparkHadoopUtil.listLeafStatuses() does implement its own directory recursion to find files; FileSystem.listFiles(path, true) does that too, and on S3A will do a flat scan that is O(files/5000), with no directory overhead at all.
  • Otherwise, globStatus() can be pretty slow against object stores, but the fix there isn't in the client code; it means someone needs to implement HADOOP-13371, an S3A globber using a bulk listObjects call rather than a recursive directory scan, and more specifically an implementation scalable to production datasets.

Returning to this patch, should I cut out the caching? I think it is superfluous.

  // Read-through cache of file mod times, used to speed up mod time lookups
  @transient private var fileToModTime = new TimeStampedHashMap[String, Long](true)

@srowen
Member

srowen commented Aug 20, 2016

Why is the caching superfluous -- because no file is evaluated more than once here?

@steveloughran
Contributor Author

To be precise: the caching of file modification times is superfluous. It's there to avoid the cost of executing getFileStatus() on previously scanned files. Once you use the FileStatus returned in a listing, you aren't calling getFileStatus(), hence there's no need to cache.

@srowen
Member

srowen commented Aug 20, 2016

Ah right, you already have the modification time for free. Sounds good, remove the caching.

@SparkQA

SparkQA commented Aug 20, 2016

Test build #64142 has finished for PR 14731 at commit 6e8ace0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

indent is wrong here

Contributor

Also, fs is pretty confusing, because in this context it is often used to refer to a FileSystem. We should pick a different word.

Contributor Author

I'll fix this

@SparkQA

SparkQA commented Aug 20, 2016

Test build #64156 has finished for PR 14731 at commit b08e3c9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen
Member

srowen commented Aug 23, 2016

This is ready to go, right @steveloughran? LGTM

@steveloughran
Contributor Author

LGTM. I was trying to see if there was a way to create a good test here by triggering the takes-too-long codepath and having a counter, but there's no obvious way to do that deterministically. I am doing a test for this against S3 in the spark-cloud module I'm writing; I can look at the printed counts of getFileStatus before/after the patch to see the difference, but the actual (testable) metrics are only accessible with the forthcoming Hadoop 2.8 release.

TL;DR: no easy test, so there's nothing left to do

@steveloughran
Contributor Author

steveloughran commented Aug 23, 2016

Actually, I've just noticed that DStream behaviour isn't in sync with the streaming programming guide, which says "(files written in nested directories not supported)". That is: SPARK-14796 didn't patch the docs.

It may as well be fixed in this patch. How about, in the bullet points underneath:

  • Wildcards may be used to specify a set of directories to scan for new files, for example hdfs://nn1:8050/users/alice/logs/2016-*/*.gz
  • New directories and their contents will be discovered as they arrive

Special points for object stores

  • Wildcard lookup may be very slow with some object stores.
  • Directory rename is not atomic; if a directory is renamed into the streaming source, then the files within may only be discovered and processed across multiple streaming windows.

There's another optimisation: use the SparkHadoopUtil.isGlobPath() predicate to recognise when the dir path isn't a wildcard, in which case just do a simple listFiles(). Until that shortcutting is done automatically in the Hadoop FS implementation, Spark can do it on its side. As the listFiles() call was what was used before SPARK-14796, it has to be compatible, or else SPARK-14796 has broken things.
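
A minimal sketch of that shortcut, assuming the SparkHadoopUtil singleton is reachable; matchingStatuses is a hypothetical name, and the non-wildcard branch uses a plain listStatus here:

  import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
  import org.apache.spark.deploy.SparkHadoopUtil

  // Sketch only: hand wildcard paths to the globber; everything else gets a
  // plain listing, which is far cheaper against object stores.
  def matchingStatuses(fs: FileSystem, directoryPath: Path): Array[FileStatus] =
    if (SparkHadoopUtil.get.isGlobPath(directoryPath)) {
      Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus])
    } else {
      fs.listStatus(directoryPath)
    }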

Finally, any exception in the scan is caught and triggers a log at warning level and a reset... It looks to me that this would include the FNFE raised by the directory not existing. I think a better message can be displayed there and the reset() operation skipped; that's not going to solve the problem in the filesystem.
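
For example, the handler could take a shape like this; it's a sketch only, guardedScan is invented, and println stands in for the real logWarning:

  import java.io.FileNotFoundException

  // Hypothetical sketch: a missing source directory gets a warning without a
  // reset, since resetting internal state cannot make the directory appear.
  def guardedScan(scan: () => Unit, reset: () => Unit): Unit =
    try {
      scan()
    } catch {
      case fnfe: FileNotFoundException =>
        println(s"WARN: input directory does not exist: ${fnfe.getMessage}")
      case e: Exception =>
        println(s"WARN: error finding new files: $e")
        reset()
    }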

@steveloughran
Contributor Author

steveloughran commented Aug 23, 2016

I've now done the s3a streaming test/example.

This uses a pattern of s3a/path/sub* as the directory path; it then creates a file in a directory and renames the dir to match the path, and verifies that the file was found in the time period allocated.

https://gist.github.com/steveloughran/c8b39a7b87a9bd63d7a383bda8687e7e

Notably, the scan of the empty dir took 150ms; once there's data in the tree, with two entries under it (one dir and one file), the time jumps up to 500ms.

Summary stats show 72 getFileStatus calls at the FS API, mapping to 140 HEAD calls and 88 LIST operations, on Hadoop branch-2:

 S3AFileSystem{uri=s3a://stevel-ireland-new, workingDir=s3a://steve-ireland-new/user/stevel, inputPolicy=sequential, partSize=104857600, enableMultiObjectsDelete=true, maxKeys=5000, readAhead=65536, blockSize=1048576, multiPartThreshold=2147483647, statistics {292 bytes read, 292 bytes written, 101 read ops, 0 large read ops, 11 write ops}, 
 metrics {{Context=S3AFileSystem}
{FileSystemId=343b706a-c238-4d71-9ed8-8083601ac28a-hwdev-steve-ireland-new}
{fsURI=s3a://hwdev-steve-ireland-new}
{files_created=1}
{files_copied=1}
{files_copied_bytes=292}
{files_deleted=1}
{directories_created=3}
{directories_deleted=0}
{ignored_errors=2}
{op_copy_from_local_file=0}
{op_exists=1}
{op_get_file_status=72}
{op_glob_status=16}
{op_is_directory=0}
{op_is_file=0}
{op_list_files=0}
{op_list_located_status=0}
{op_list_status=27}
{op_mkdirs=2}
{op_rename=1}
{object_copy_requests=0}
{object_delete_requests=3}
{object_list_requests=88}
{object_continue_list_requests=0}
{object_metadata_requests=140}
{object_multipart_aborted=0}
{object_put_bytes=292}
{object_put_requests=4}
{stream_read_fully_operations=0}
{stream_bytes_skipped_on_seek=0}
{stream_bytes_backwards_on_seek=0}
{stream_bytes_read=292}
{streamOpened=1}
{stream_backward_seek_pperations=0}
{stream_read_operations_incomplete=0}
{stream_bytes_discarded_in_abort=0}
{stream_close_operations=1}
{stream_read_operations=1}
{stream_aborted=0}
{stream_forward_seek_operations=0}
{streamClosed=1}
{stream_seek_operations=0}
{stream_bytes_read_in_close=0}
{stream_read_exceptions=0} }}

I'm going to do a test run with the modification here and see what it does to listing and status

steveloughran force-pushed the cloud/SPARK-17159-listfiles branch from b08e3c9 to 79b57a2 on August 23, 2016 17:53
@steveloughran
Contributor Author

  1. Updated the code to bypass the glob routine when there is no wildcard; the routine is fairly inefficient.
  2. Reporting FNFE on that base dir differently: skip the stack trace (maybe log at a lower level?).
  3. Updated the docs with a special list of blobstore best practices.

It's a bit hard to get the phrasing of what the wildcard does right; it needs careful review.

Tested using my s3 streaming test, which did use a * in the wildcard. All works, but there were no improvements in speed on what is a fairly unrealistic structure. The time to recursively list object stores remotely is tangibly slow. Maybe that should go in the text too: "it can take seconds to scan object stores for new data, with the time proportional to directory depth and the number of files in a directory. Shallow and wide directory trees are faster".

@SparkQA

SparkQA commented Aug 23, 2016

Test build #64296 has finished for PR 14731 at commit 79b57a2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

This isn't a regular expression, though; what was the intent here?

Contributor Author

Wasn't it? My mistake. I wanted to show something fairly complex.

Member

(2015|2016) is what would mean "the string '2015' or '2016'", and ".." would mean "any two characters".

Contributor Author

Going with s3a://bucket/logs/(2015,2016)-*-friday

Member

You'll need (2015|2016) rather than (2015,2016). Also this is going to match zero or more hyphens followed by "-friday". I think you mean ".." or ".{2}" or at least ".+" instead of "*" if this is a regex.

Contributor Author

Round () or curly {} brackets? Because the wildcard character set used in isGlobPath() is "{}[]*?\\". Curly only.

Member

Oh, is this not a regular expression? I'd change the doc then to not describe it as one. It sounds like it's following some other kind of glob syntax.

@steveloughran
Contributor Author

The logic has got complex enough that it merits unit tests. Pulling it into SparkHadoopUtil itself and writing some for the possible cases: simple path, glob matches one, glob matches 1+, glob doesn't match, file not found.

@steveloughran
Contributor Author

Having looked at the source code, FileSystem.globStatus() uses glob patterns, which are not the same as the POSIX regexp ones; org.apache.hadoop.fs.GlobPattern does the conversion.
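
To illustrate the syntax difference, a quick sketch assuming GlobPattern behaves as in hadoop-common:

  import org.apache.hadoop.fs.GlobPattern

  // Curly braces are alternation in Hadoop globs and '*' matches any run of
  // characters; GlobPattern translates the glob into a java.util.regex pattern.
  val fridays = new GlobPattern("{2015,2016}-*-friday")
  assert(fridays.matches("2015-01-friday"))
  assert(!fridays.matches("2017-01-friday"))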

For the docs, I'll just use a wildcard * in the example, rather than try anything more sophisticated.

@SparkQA

SparkQA commented Aug 24, 2016

Test build #64368 has finished for PR 14731 at commit b63abfe.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

steveloughran force-pushed the cloud/SPARK-17159-listfiles branch from b63abfe to 9bc0ea9 on August 26, 2016 15:13
@SparkQA

SparkQA commented Aug 26, 2016

Test build #64486 has finished for PR 14731 at commit 9bc0ea9.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 26, 2016

Test build #64488 has finished for PR 14731 at commit fe40bd2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

I'm concerned that this method is necessarily ambiguous, because you can't actually distinguish globs from other paths. Is this really needed? That's why the FS API exposes two methods.

Contributor Author

It goes with the globPathIfNecessary call; you'd want to rename it to be consistent.

Regarding the FS APIs: there are way too many list operations there, each with different flaws.

  1. The simple list(path, filter): Array[FileStatus] operations don't scale to a directory with hundreds of thousands of files, hence the remote-iterator versions.
  2. None of them provide any consistency guarantees. Worth knowing. This is more common with remote iterators, as the iteration window is bigger, but even in those that return arrays, in a large enough directory things may change during the enumeration.
  3. Anything that treewalks is very suboptimal on blobstores and somewhat inefficient for deep trees.
  4. listFiles(path, recursive=true) is the sole one which object stores can currently optimise by avoiding the treewalk and just doing a bulk list; HADOOP-13208 has added that for S3A (see the sketch after this list).
  5. ...but that method filters out all directories, which means that apps which do want directories too are out of luck.
  6. globStatus() is even less efficient than the others ... have a look at the source to see why.
  7. In HADOOP-13371 I'm exploring an optimised globber, but I don't want to write one which collapses at scale (i.e. in production).
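
As a sketch of point 4, with allFiles being a hypothetical helper which drains the iterator into memory (exactly the scaling hazard point 1 warns about for huge trees):

  import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
  import scala.collection.mutable.ArrayBuffer

  // listFiles(path, recursive = true) yields a RemoteIterator, which S3A can
  // serve from bulk object listings instead of a client-side treewalk.
  def allFiles(fs: FileSystem, root: Path): Seq[LocatedFileStatus] = {
    val results = new ArrayBuffer[LocatedFileStatus]()
    val files = fs.listFiles(root, true)
    while (files.hasNext) {
      results += files.next()
    }
    results.toSeq
  }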

I've added some comments in HADOOP-13371 about what to do there; I will probably adopt the "no regexp -> simple return" strategy implemented in this patch. But it will only benefit S3A on Hadoop 2.8+; patching Spark benefits everything.

Member

Here I'm narrowly concerned with the ambiguity of the behavior of a single method, because you can't distinguish between a path with a "?" in it and a glob wildcard for example. The rest seems orthogonal?

The change as it stood to resolve the issue in the OP seemed OK. This is bigger now and I'm not as sure about the rest of the change.

Contributor Author

Essentially, if anything which might be a wildcard is hit, it gets handed off to the globber for the full interpretation. Same for ^ and ], which are only part of a pattern within the context of an opening [.

It's only those strings which can be verified to be wildcard-free in a simple context-free string scan that say "absolutely no patterns here".
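
In other words, something like this sketch, where definitelyNotAGlob is an invented name and the character set is the one quoted above:

  // Any character that can open a glob construct sends the path off to the
  // globber for full interpretation; only provably plain paths shortcut.
  def definitelyNotAGlob(path: String): Boolean =
    !path.exists(c => "{}[]*?\\".contains(c))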

Regarding the bigger change: most of it is isolation of the sensitive code, plus the tests to verify behaviour.

Member

Yea, but then that's wrong if for example my path actually has a ? or ^ or ] in it. It doesn't seem essential and seems even problematic to add this behavior change to an otherwise clear fix.

Contributor Author

@steveloughran Aug 30, 2016

I recognise your concerns.

  1. The actual check is the one already used in globPathIfNecessary; that's used in DataSource and FileStreamSource and is pretty much the main way wildcards are used to locate files. If it were broken, things would have surfaced.
  2. It is essentially an optimisation to bypass a bit of code which, if you looked at its internals, isn't very efficient. The check for expansion characters is pessimistic: anything which might be a wildcard pattern is handed to the globber.
  3. ...but there are no standalone tests for globPathIfNecessary, so it may be that there are some failure modes that haven't yet surfaced; regressions waiting to happen.

Given it's a less significant optimisation than dodging all the getFileStatus calls, how about I split that into its own patch, one which should also add those tests for globPathIfNecessary? I'll trim this patch down to the changes in the streaming package: production code and associated tests.

@SparkQA

SparkQA commented Aug 27, 2016

Test build #64534 has finished for PR 14731 at commit 4134620.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

steveloughran force-pushed the cloud/SPARK-17159-listfiles branch from 4134620 to b60f175 on August 30, 2016 17:10
@SparkQA

SparkQA commented Aug 30, 2016

Test build #64662 has finished for PR 14731 at commit b60f175.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@steveloughran
Contributor Author

The Hadoop FS spec has now been updated to declare exactly what HDFS does w.r.t. timestamps, and to warn that what other filesystems and object stores do is an implementation- and installation-specific matter: filesystem.md

That is the documentation update associated with this one; some of the content there was originally here, but has moved over to the Hadoop docs for the HDFS team to take the blame when it changes.

@steveloughran
Contributor Author

Any more comments?

Member

@srowen left a comment

I have to start from scratch every time I review this ... so this looks like it does more than just optimize a check for new files. It adds docs too. I don't know if the examples are essential. The extra info about how streaming works could be useful but isn't that separate? It's easier to get in small directed changes. This one has been going on for months and I know that's not in anyone's interest.

Member

We don't already have this defined and available elsewhere?

Contributor Author

Yes, but in a module that isn't the one where these tests are, so it'd need more dependency logic, or pulling it up into a common module, which, if done properly, makes for a big diff.

Member

I see, it's only otherwise defined in the SQL test utils class. Well something we could unify one day, maybe not such a big deal now here.

Member

Just use Files.write?

Contributor Author

I'll look at that; I think I went with IOUtils as it was on the classpath and I'd never had bad experiences with it. Guava, well...

Contributor Author

Actually, there's a straightforward reason: the test is using the Hadoop FS APIs, opening an output stream from a Path and writing to it; Files.write works with a local file. It doesn't work with the Hadoop FileSystem and Path classes, so it could only be used by abusing knowledge of path URLs. Going through FileSystem/Path uses the same API as you'd use in production, so it is the more rigorous test.
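
A sketch of the point, with writeText being a hypothetical test helper:

  import java.nio.charset.StandardCharsets
  import org.apache.hadoop.fs.{FileSystem, Path}

  // Writing through the Hadoop API exercises the same code paths as
  // production, whichever filesystem (HDFS, S3A, file://) backs the Path.
  def writeText(fs: FileSystem, path: Path, text: String): Unit = {
    val out = fs.create(path, true) // overwrite if the file exists
    try {
      out.write(text.getBytes(StandardCharsets.UTF_8))
    } finally {
      out.close()
    }
  }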

… filters and into filtering of the FileStatus instances returned in the results, so avoiding the need to create FileStatus intances for

-This doesn't add overhead to the filtering process; that's done as post-processing in FileSystem anyway. At worst it may result in larger lists being built up and returned.
-For every glob match, the code saves 2 RPC calls to the HDFS NN
-The code saves 1-3 HTTP calls to S3 for the directory check (including a slow List call whenever the directory has children as it doesn't exist as a blob any more)
-for the modtime check of every file, it saves an HTTP GET

The whole modtime cache can be eliminated; it's a performance optimisation to avoid the overhead of the file checks, one that is no longer needed.
… time costs 0 to evaluate, caching it actually consumes memory and the time for a lookup.
…sues. Also note that 1s granularity is the resolution from HDFS; other filesystems may have a different resolution. The only one I know that is worse is FAT16/FAT32, which is accurate to 2s, but nobody should be using that except on SSD cards and USB sticks
…carded listing; handle FNFE specially, add the docs
… existing/similar method. Add tests for the behaviour. Update docs with suggested fixes, and review/edit.
…taken for a public method and so needing to declare a return type.
…which doesn't shortcut on a non-wildcard operation
…s of how HDFS doesn't update file length or modtime until close or a block boundary is reached.
…ate the mtime field; this is covered in the streaming section to emphasise why write + rename is the strategy for streaming in files in HDFS. That strategy does also work in object stores, though the rename operation is O(data)
…a temp dur. Docs now refer reader to the Hadoop FS spec for any details about what object stores do
…store text and update slightly to make things a bit clearer. The more I learn about object stores, the less they resemble file systems.
steveloughran force-pushed the cloud/SPARK-17159-listfiles branch from 724495b to a3aaf26 on March 21, 2017 17:09
@SparkQA

SparkQA commented Mar 21, 2017

Test build #74990 has finished for PR 14731 at commit a3aaf26.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@steveloughran
Contributor Author

Is there anything else I need to do here?

@steveloughran
Contributor Author

@srowen anything else I need to do here?

Member

@srowen left a comment

I've timed out on this change. It changes every time and still doesn't match the title. I don't think this is a great way to pursue changes like this.

@steveloughran
Contributor Author

steveloughran commented Apr 24, 2017

OK, I shall start again with a whole new PR of the current state.

@rxin
Contributor

rxin commented Apr 24, 2017

Steve, I think the main point is that you should also respect the time of reviewers. The way most of your pull requests manifest has been suboptimal: they often start with a very early WIP (which is not necessarily a problem), and once in a while (e.g. a month or two) you update it to almost completely change it. The time itself is a problem: it requires a lot of context switching to review your pull requests. In addition, every time you update it, it looks like a completely new giant pull request.

@steveloughran
Contributor Author

Reynold, I know very much about the time of reviewers; I put 1+ hours a day into reviewing work on the Hadoop codebase, generally trying to review the work of non-colleagues, so as to pull in the broad set of contributions which are needed.

I have been trying to get some object-store-related patches into Spark alongside the foundational work of fundamentally transforming how we work with object storage, especially S3, in Hadoop. Without the Spark-side changes, a lot gets lost: here the scan cost is approx 100-300ms/file against an object store.

Here I've split things in two, docs and diff. Both are independent, both are reasonably tractable. If they can be reviewed fast and added, there are no problems of patches ageing and everyone having to resync.

We can get this out of the way, and you'll have fewer reasons to be unhappy with me.
