Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Directory sources #274

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions docs/src/main/paradox/file.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,27 @@ Java
: @@snip (../../../../file/src/test/java/akka/stream/alpakka/file/javadsl/FileTailSourceTest.java) { #simple-lines }


### Directory

`Directory.ls(path)` lists all files and directories
directly in a given directory:

Scala
: @@snip (../../../../file/src/test/scala/akka/stream/alpakka/file/scaladsl/DirectorySpec.scala) { #ls }

Java
: @@snip (../../../../file/src/test/java/akka/stream/alpakka/file/javadsl/DirectoryTest.java) { #ls }

`Directory.walk(path)` traverses all subdirectories and lists
files and directories depth first:

Scala
: @@snip (../../../../file/src/test/scala/akka/stream/alpakka/file/scaladsl/DirectorySpec.scala) { #walk }

Java
: @@snip (../../../../file/src/test/java/akka/stream/alpakka/file/javadsl/DirectoryTest.java) { #walk }


### DirectoryChangesSource

The `DirectoryChangesSource` will emit elements every time there is a change to a watched directory
Expand Down
26 changes: 26 additions & 0 deletions file/src/main/java/akka/stream/alpakka/file/javadsl/Directory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.file.javadsl;

import akka.NotUsed;
import akka.stream.javadsl.Source;

import java.nio.file.Path;

public final class Directory {

/**
* List all files in the given directory
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

files and directories

*/
public static Source<Path, NotUsed> ls(Path directory) {
return akka.stream.alpakka.file.scaladsl.Directory.ls(directory).asJava();
}

/**
* Recursively list files and directories in the given directory, depth first.
*/
public static Source<Path, NotUsed> walk(Path directory) {
return akka.stream.alpakka.file.scaladsl.Directory.walk(directory).asJava();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ public String toString() {
* the JDK implementation is slow, it will not help lowering this
* @param maxBufferSize Maximum number of buffered directory changes before the stage fails
*/
@SuppressWarnings("unchecked")
public static Source<Pair<Path, DirectoryChange>, NotUsed> create(Path directoryPath, FiniteDuration pollInterval, int maxBufferSize) {
return Source.fromGraph(new DirectoryChangesSource(directoryPath, pollInterval, maxBufferSize, Pair::apply));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.file.scaladsl

import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.{Files, Path}
import java.util.function.BiPredicate

import akka.NotUsed
import akka.stream.scaladsl.{Source, StreamConverters}

object Directory {

/**
* List all files in the given directory
*/
def ls(directory: Path): Source[Path, NotUsed] = {
require(Files.isDirectory(directory), s"Path must be a directory, $directory isn't")
StreamConverters.fromJavaStream(() => Files.list(directory))
}

/**
* Recursively list files in the given directory and its subdirectories. Listing is done
* depth first.
*/
def walk(directory: Path): Source[Path, NotUsed] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be good to expose the overloads too:
https://docs.oracle.com/javase/8/docs/api/java/nio/file/Files.html#walk-java.nio.file.Path-int-java.nio.file.FileVisitOption...-
the visit options and depth seem very useful when someone would be using this API

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I started out with that, but then thought that it is so simple to create it yourself, so maybe more specific needs can be met by just looking at the sources of this and copying that single line of code...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think with alpakka we're targeting an "works out of the box" and "I would not look at the sources" use cases/people, so I'd really add those parameters IMHO

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was rather from the not covering every conceivable use case under the sun, just the "normal" ones point of view, especially the filter which could be achieved by composition (with a bit of alloc overhead, but hey this is disk io we are talking about here).

Could you come up with a common use case for depths other than 0, which is covered by ls and all the depth (find ./) which is covered by this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Depth is pretty important to protect from entering infinite loops with symlinks to an upper directory if I remember correctly how these APIs work?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, I'll add it. Skipping filter though, as that can be achieved with composition.

require(Files.isDirectory(directory), s"Path must be a directory, $directory isn't")
StreamConverters.fromJavaStream(() => Files.walk(directory))
}

private def allFilesFilter = new BiPredicate[Path, BasicFileAttributes] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not used

override def test(t: Path, u: BasicFileAttributes): Boolean = true
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import java.util.function.BiFunction

import akka.NotUsed
import akka.stream.alpakka.file.DirectoryChange
import akka.stream.alpakka.file.javadsl.DirectoryChangesSource
import akka.stream.scaladsl.Source

import scala.concurrent.duration.FiniteDuration
Expand All @@ -30,6 +29,8 @@ object DirectoryChangesSource {
def apply(directoryPath: Path,
pollInterval: FiniteDuration,
maxBufferSize: Int): Source[(Path, DirectoryChange), NotUsed] =
Source.fromGraph(new DirectoryChangesSource(directoryPath, pollInterval, maxBufferSize, tupler))
Source.fromGraph(
new akka.stream.alpakka.file.javadsl.DirectoryChangesSource(directoryPath, pollInterval, maxBufferSize, tupler)
)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.file.javadsl;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.testkit.TestKit;
import com.google.common.jimfs.Configuration;
import com.google.common.jimfs.Jimfs;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import scala.concurrent.duration.FiniteDuration;

import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;

public class DirectoryTest {

private FileSystem fs;
private ActorSystem system;
private Materializer materializer;

@Before
public void setup() {
fs = Jimfs.newFileSystem(Configuration.unix());
system = ActorSystem.create();
materializer = ActorMaterializer.create(system);
}


@Test
public void listFiles() throws Exception {
final Path dir = fs.getPath("listfiles");
Files.createDirectories(dir);
final Path file1 = Files.createFile(dir.resolve("file1"));
final Path file2 = Files.createFile(dir.resolve("file2"));

// #ls
final Source<Path, NotUsed> source = Directory.ls(dir);
// #ls

final List<Path> result = source.runWith(Sink.seq(), materializer)
.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(result.size(), 2);
assertEquals(result.get(0), file1);
assertEquals(result.get(1), file2);
}

@Test
public void walkAFileTree() throws Exception {
final Path root = fs.getPath("walk");
Files.createDirectories(root);
final Path subdir1 = root.resolve("subdir1");
Files.createDirectories(subdir1);
final Path file1 = subdir1.resolve("file1");
Files.createFile(file1);
final Path subdir2 = root.resolve("subdir2");
Files.createDirectories(subdir2);
final Path file2 = subdir2.resolve("file2");
Files.createFile(file2);

// #walk
final Source<Path, NotUsed> source = Directory.walk(root);
// #walk

final List<Path> result = source.runWith(Sink.seq(), materializer)
.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(result, Arrays.asList(root, subdir1, file1, subdir2, file2));
}

@After
public void tearDown() throws Exception {
fs.close();
fs = null;
TestKit.shutdownActorSystem(system, FiniteDuration.create(10, TimeUnit.SECONDS), true);
system = null;
materializer = null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.file.scaladsl

import java.nio.file.{Files, Path}

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.testkit.TestKit
import com.google.common.jimfs.{Configuration, Jimfs}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}

class DirectorySpec
extends TestKit(ActorSystem("directoryspec"))
with WordSpecLike
with Matchers
with BeforeAndAfterAll
with ScalaFutures {

private val fs = Jimfs.newFileSystem(Configuration.forCurrentPlatform.toBuilder.build)
private implicit val mat = ActorMaterializer()

"The directory source factory" should {
"list files" in {
val dir = fs.getPath("listfiles")
Files.createDirectories(dir)
val paths = (0 to 100).map { n =>
val name = s"file$n"
Files.createFile(dir.resolve(name))
}

// #ls
val source: Source[Path, NotUsed] = Directory.ls(dir)
// #ls

val result = source.runWith(Sink.seq).futureValue
result.toSet shouldEqual paths.toSet
}

"walk a file tree" in {
val root = fs.getPath("walk")
Files.createDirectories(root)
val subdir1 = root.resolve("subdir1")
Files.createDirectories(subdir1)
val file1 = subdir1.resolve("file1")
Files.createFile(file1)
val subdir2 = root.resolve("subdir2")
Files.createDirectories(subdir2)
val file2 = subdir2.resolve("file2")
Files.createFile(file2)

// #walk
val files: Source[Path, NotUsed] = Directory.walk(root)
// #walk

val result = files.runWith(Sink.seq).futureValue
result shouldEqual List(root, subdir1, file1, subdir2, file2)
}
}

override protected def afterAll(): Unit =
fs.close()
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import akka.stream.{ActorMaterializer, Materializer}

import scala.concurrent.duration._

object FileTailSourceSpec extends {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hah funny

object FileTailSourceSpec {

// small sample of usage, tails the first argument file path
def main(args: Array[String]): Unit = {
Expand Down