From 4d08fde9a6da97c837372202627d0c5fddc32124 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Thu, 2 Oct 2014 15:25:20 -0700 Subject: [PATCH 1/6] Fix the way we decide if hasNext is true or not in WALReader. Unit tests for all classes. --- .../storage/WriteAheadLogReader.scala | 22 +++--- .../spark/streaming/storage/TestUtils.scala | 74 +++++++++++++++++++ .../WriteAheadLogRandomReaderSuite.scala | 59 +++++++++++++++ .../storage/WriteAheadLogReaderSuite.scala | 57 ++++++++++++++ .../WriteAheadLogWriterTestSuite.scala | 43 +++++++++++ 5 files changed, 243 insertions(+), 12 deletions(-) create mode 100644 streaming/src/test/scala/org/apache/spark/streaming/storage/TestUtils.scala create mode 100644 streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReaderSuite.scala create mode 100644 streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogReaderSuite.scala create mode 100644 streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogWriterTestSuite.scala diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala index 75791c2470181..30d5f44c03155 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.streaming.storage -import java.io.Closeable +import java.io.{EOFException, Closeable} private[streaming] class WriteAheadLogReader(path: String) extends Iterator[Array[Byte]] with Closeable { @@ -30,18 +30,16 @@ private[streaming] class WriteAheadLogReader(path: String) if (nextItem.isDefined) { // handle the case where hasNext is called without calling next true } else { - val available = instream.available() - if (available < 4) { // Length of next block (which is an Int = 4 bytes) of data is unavailable! - false + try { + val length = instream.readInt() + val buffer = new Array[Byte](length) + instream.readFully(buffer) + nextItem = Some(buffer) + true + } catch { + case e: EOFException => false + case e: Exception => throw e } - val length = instream.readInt() - if (instream.available() < length) { - false - } - val buffer = new Array[Byte](length) - instream.readFully(buffer) - nextItem = Some(buffer) - true } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/TestUtils.scala b/streaming/src/test/scala/org/apache/spark/streaming/storage/TestUtils.scala new file mode 100644 index 0000000000000..c547c2ce26f6f --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/storage/TestUtils.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.storage + +import java.io.{RandomAccessFile, File} + +import scala.collection.mutable.ArrayBuffer +import scala.util.Random + +object TestUtils { + + val random = new Random() + + /** + * Writes data to the file and returns the an array of the bytes written. + * @param count + * @return + */ + // We don't want to be using the WAL writer to test the reader - it will be painful to figure + // out where the bug is. Instead generate the file by hand and see if the WAL reader can + // handle it. + def writeData(count: Int, file: File): ArrayBuffer[(Array[Byte], Long)] = { + val writtenData = new ArrayBuffer[(Array[Byte], Long)]() + val writer = new RandomAccessFile(file, "rw") + var i = 0 + while (i < count) { + val data = generateRandomData() + writtenData += ((data, writer.getFilePointer)) + writer.writeInt(data.length) + writer.write(data) + i += 1 + } + writer.close() + writtenData + } + + def readData(segments: Seq[FileSegment], file: File): Seq[Array[Byte]] = { + val reader = new RandomAccessFile(file, "r") + segments.map { x => + reader.seek(x.offset) + val data = new Array[Byte](x.length) + reader.readInt() + reader.readFully(data) + data + } + } + + def generateRandomData(): Array[Byte] = { + val data = new Array[Byte](random.nextInt(50)) + random.nextBytes(data) + data + } + + def writeUsingWriter(file: File, input: Seq[Array[Byte]]): Seq[FileSegment] = { + val writer = new WriteAheadLogWriter(file.toString) + val segments = input.map(writer.write) + writer.close() + segments + } +} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReaderSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReaderSuite.scala new file mode 100644 index 0000000000000..d658ce86f5e96 --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReaderSuite.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.storage + +import java.io.File + +import com.google.common.io.Files + +import org.apache.spark.streaming.TestSuiteBase + +class WriteAheadLogRandomReaderSuite extends TestSuiteBase { + + test("Test successful reads") { + val file = File.createTempFile("testSuccessFulReads", "") + file.deleteOnExit() + val writtenData = TestUtils.writeData(50, file) + val reader = new WriteAheadLogRandomReader("file:///" + file.toString) + var nextOffset = 0l + writtenData.foreach{ + x => + assert(x._1 === reader.read(new FileSegment(file.toString, x._2, x._1.length))) + nextOffset += (x._2 + 4) + } + reader.close() + } + + test("Test reading data written with writer") { + val dir = Files.createTempDir() + val file = new File(dir, "TestWriter") + try { + val dataToWrite = for (i <- 1 to 50) yield TestUtils.generateRandomData() + val segments = TestUtils.writeUsingWriter(file, dataToWrite) + val iter = dataToWrite.iterator + val reader = new WriteAheadLogRandomReader("file:///" + file.toString) + val writtenData = segments.map { x => + reader.read(x) + } + assert(dataToWrite.toArray === writtenData.toArray) + } finally { + file.delete() + dir.delete() + } + } + +} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogReaderSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogReaderSuite.scala new file mode 100644 index 0000000000000..ce37035524d2c --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogReaderSuite.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.storage + +import java.io.File + +import com.google.common.io.Files + +import org.apache.spark.streaming.TestSuiteBase + +class WriteAheadLogReaderSuite extends TestSuiteBase { + + test("Test success") { + val file = File.createTempFile("testSuccessFulReads", "") + file.deleteOnExit() + val writtenData = TestUtils.writeData(50, file) + val reader = new WriteAheadLogReader("file:///" + file.toString) + val iter = writtenData.iterator + iter.foreach { x => + assert(reader.hasNext === true) + assert(reader.next() === x._1) + } + reader.close() + } + + + test("Test reading data written with writer") { + val dir = Files.createTempDir() + val file = new File(dir, "TestWriter") + try { + val dataToWrite = for (i <- 1 to 50) yield TestUtils.generateRandomData() + val segments = TestUtils.writeUsingWriter(file, dataToWrite) + val iter = dataToWrite.iterator + val reader = new WriteAheadLogReader("file:///" + file.toString) + reader.foreach { x => + assert(x === iter.next()) + } + } finally { + file.delete() + dir.delete() + } + } +} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogWriterTestSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogWriterTestSuite.scala new file mode 100644 index 0000000000000..446ffbe6f7d51 --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogWriterTestSuite.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.storage + +import java.io.File + +import com.google.common.io.Files + +import org.apache.spark.streaming.TestSuiteBase + +class WriteAheadLogWriterTestSuite extends TestSuiteBase { + + test("Test successful writes") { + + val dir = Files.createTempDir() + val file = new File(dir, "TestWriter") + try { + val dataToWrite = for (i <- 1 to 50) yield TestUtils.generateRandomData() + val writer = new WriteAheadLogWriter("file:///" + file.toString) + val segments = dataToWrite.map(writer.write) + writer.close() + val writtenData = TestUtils.readData(segments, file) + assert(writtenData.toArray === dataToWrite.toArray) + } finally { + file.delete() + dir.delete() + } + } +} From 073d1f88005621837f34b7fa786c5311fc5872e8 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Fri, 3 Oct 2014 00:35:57 -0700 Subject: [PATCH 2/6] Use ByteBuffer in the WAL --- .../storage/WriteAheadLogRandomReader.scala | 5 ++-- .../storage/WriteAheadLogReader.scala | 9 +++--- .../storage/WriteAheadLogWriter.scala | 29 +++++++++++++++---- .../spark/streaming/storage/TestUtils.scala | 20 +++++++------ .../WriteAheadLogRandomReaderSuite.scala | 7 ++--- 5 files changed, 46 insertions(+), 24 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala index aee5d192102e8..70c1d90974de0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala @@ -17,13 +17,14 @@ package org.apache.spark.streaming.storage import java.io.Closeable +import java.nio.ByteBuffer private[streaming] class WriteAheadLogRandomReader(path: String) extends Closeable { private val instream = HdfsUtils.getInputStream(path) private var closed = false - def read(segment: FileSegment): Array[Byte] = synchronized { + def read(segment: FileSegment): ByteBuffer = synchronized { assertOpen() instream.seek(segment.offset) val nextLength = instream.readInt() @@ -31,7 +32,7 @@ private[streaming] class WriteAheadLogRandomReader(path: String) extends Closeab "Expected message length to be " + segment.length + ", " + "but was " + nextLength) val buffer = new Array[Byte](nextLength) instream.readFully(buffer) - buffer + ByteBuffer.wrap(buffer) } override def close(): Unit = synchronized { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala index 30d5f44c03155..e3b7c6a898800 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala @@ -17,13 +17,14 @@ package org.apache.spark.streaming.storage import java.io.{EOFException, Closeable} +import java.nio.ByteBuffer private[streaming] class WriteAheadLogReader(path: String) - extends Iterator[Array[Byte]] with Closeable { + extends Iterator[ByteBuffer] with Closeable { private val instream = HdfsUtils.getInputStream(path) private var closed = false - private var nextItem: Option[Array[Byte]] = None + private var nextItem: Option[ByteBuffer] = None override def hasNext: Boolean = synchronized { assertOpen() @@ -34,7 +35,7 @@ private[streaming] class WriteAheadLogReader(path: String) val length = instream.readInt() val buffer = new 
Array[Byte](length) instream.readFully(buffer) - nextItem = Some(buffer) + nextItem = Some(ByteBuffer.wrap(buffer)) true } catch { case e: EOFException => false @@ -43,7 +44,7 @@ private[streaming] class WriteAheadLogReader(path: String) } } - override def next(): Array[Byte] = synchronized { + override def next(): ByteBuffer = synchronized { // TODO: Possible error case where there are not enough bytes in the stream // TODO: How to handle that? val data = nextItem.getOrElse { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala index f151c17ff66d8..c755481f77f34 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala @@ -17,21 +17,40 @@ package org.apache.spark.streaming.storage import java.io.Closeable +import java.nio.ByteBuffer + +import org.apache.hadoop.fs.FSDataOutputStream private[streaming] class WriteAheadLogWriter(path: String) extends Closeable { private val stream = HdfsUtils.getOutputStream(path) private var nextOffset = stream.getPos private var closed = false + private val hflushMethod = { + try { + Some(classOf[FSDataOutputStream].getMethod("hflush", new Array[Class[Object]](0): _*)) + } catch { + case e: Exception => None + } + } // Data is always written as: // - Length - Long // - Data - of length = Length - def write(data: Array[Byte]): FileSegment = synchronized { + def write(data: ByteBuffer): FileSegment = synchronized { assertOpen() - val segment = new FileSegment(path, nextOffset, data.length) - stream.writeInt(data.length) - stream.write(data) - stream.hflush() + val lengthToWrite = data.remaining() + val segment = new FileSegment(path, nextOffset, lengthToWrite) + stream.writeInt(lengthToWrite) + if (data.hasArray) { + stream.write(data.array()) + } else { + // If the buffer is not backed by an array we need to copy the data to an array + data.rewind() // Rewind to ensure all data in the buffer is retrieved + val dataArray = new Array[Byte](lengthToWrite) + data.get(dataArray) + stream.write(dataArray) + } + hflushMethod.foreach(_.invoke(stream)) nextOffset = stream.getPos segment } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/TestUtils.scala b/streaming/src/test/scala/org/apache/spark/streaming/storage/TestUtils.scala index c547c2ce26f6f..516c1155056f2 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/storage/TestUtils.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/storage/TestUtils.scala @@ -17,6 +17,7 @@ package org.apache.spark.streaming.storage import java.io.{RandomAccessFile, File} +import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer import scala.util.Random @@ -33,39 +34,40 @@ object TestUtils { // We don't want to be using the WAL writer to test the reader - it will be painful to figure // out where the bug is. Instead generate the file by hand and see if the WAL reader can // handle it. 
- def writeData(count: Int, file: File): ArrayBuffer[(Array[Byte], Long)] = { - val writtenData = new ArrayBuffer[(Array[Byte], Long)]() + def writeData(count: Int, file: File): ArrayBuffer[(ByteBuffer, Long)] = { + val writtenData = new ArrayBuffer[(ByteBuffer, Long)]() val writer = new RandomAccessFile(file, "rw") var i = 0 while (i < count) { val data = generateRandomData() writtenData += ((data, writer.getFilePointer)) - writer.writeInt(data.length) - writer.write(data) + data.rewind() + writer.writeInt(data.remaining()) + writer.write(data.array()) i += 1 } writer.close() writtenData } - def readData(segments: Seq[FileSegment], file: File): Seq[Array[Byte]] = { + def readData(segments: Seq[FileSegment], file: File): Seq[ByteBuffer] = { val reader = new RandomAccessFile(file, "r") segments.map { x => reader.seek(x.offset) val data = new Array[Byte](x.length) reader.readInt() reader.readFully(data) - data + ByteBuffer.wrap(data) } } - def generateRandomData(): Array[Byte] = { + def generateRandomData(): ByteBuffer = { val data = new Array[Byte](random.nextInt(50)) random.nextBytes(data) - data + ByteBuffer.wrap(data) } - def writeUsingWriter(file: File, input: Seq[Array[Byte]]): Seq[FileSegment] = { + def writeUsingWriter(file: File, input: Seq[ByteBuffer]): Seq[FileSegment] = { val writer = new WriteAheadLogWriter(file.toString) val segments = input.map(writer.write) writer.close() diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReaderSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReaderSuite.scala index d658ce86f5e96..1917545477d65 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReaderSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReaderSuite.scala @@ -29,11 +29,10 @@ class WriteAheadLogRandomReaderSuite extends TestSuiteBase { file.deleteOnExit() val writtenData = TestUtils.writeData(50, file) val reader = new WriteAheadLogRandomReader("file:///" + file.toString) - var nextOffset = 0l - writtenData.foreach{ + writtenData.foreach { x => - assert(x._1 === reader.read(new FileSegment(file.toString, x._2, x._1.length))) - nextOffset += (x._2 + 4) + val length = x._1.remaining() + assert(x._1 === reader.read(new FileSegment(file.toString, x._2, length))) } reader.close() } From 164bd14022d3fbd998d85810c5b8f845a9041fac Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Fri, 3 Oct 2014 10:48:20 -0700 Subject: [PATCH 3/6] Rewind buffer in call cases --- .../apache/spark/streaming/storage/WriteAheadLogWriter.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala index c755481f77f34..7843254014995 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala @@ -27,7 +27,7 @@ private[streaming] class WriteAheadLogWriter(path: String) extends Closeable { private var closed = false private val hflushMethod = { try { - Some(classOf[FSDataOutputStream].getMethod("hflush", new Array[Class[Object]](0): _*)) + Some(classOf[FSDataOutputStream].getMethod("hflush")) } catch { case e: Exception => None } @@ -38,6 +38,7 @@ private[streaming] class WriteAheadLogWriter(path: String) extends Closeable { // 
- Data - of length = Length def write(data: ByteBuffer): FileSegment = synchronized { assertOpen() + data.rewind() // Rewind to ensure all data in the buffer is retrieved val lengthToWrite = data.remaining() val segment = new FileSegment(path, nextOffset, lengthToWrite) stream.writeInt(lengthToWrite) @@ -45,7 +46,6 @@ private[streaming] class WriteAheadLogWriter(path: String) extends Closeable { stream.write(data.array()) } else { // If the buffer is not backed by an array we need to copy the data to an array - data.rewind() // Rewind to ensure all data in the buffer is retrieved val dataArray = new Array[Byte](lengthToWrite) data.get(dataArray) stream.write(dataArray) From d8f14cd1a022c9a197e6508b5e0ebe18df5111da Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Fri, 3 Oct 2014 13:35:08 -0700 Subject: [PATCH 4/6] Consolidate all tests into one class. Take Hadoop config as a param to all classes. --- .../spark/streaming/storage/HdfsUtils.scala | 7 +- .../storage/WriteAheadLogRandomReader.scala | 7 +- .../storage/WriteAheadLogReader.scala | 6 +- .../storage/WriteAheadLogWriter.scala | 19 ++- .../spark/streaming/storage/TestUtils.scala | 76 --------- .../WriteAheadLogRandomReaderSuite.scala | 58 ------- .../storage/WriteAheadLogReaderSuite.scala | 57 ------- .../storage/WriteAheadLogSuite.scala | 160 ++++++++++++++++++ .../WriteAheadLogWriterTestSuite.scala | 43 ----- 9 files changed, 184 insertions(+), 249 deletions(-) delete mode 100644 streaming/src/test/scala/org/apache/spark/streaming/storage/TestUtils.scala delete mode 100644 streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReaderSuite.scala delete mode 100644 streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogReaderSuite.scala create mode 100644 streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala delete mode 100644 streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogWriterTestSuite.scala diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala index de53c59c68269..079b2fef904a0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala @@ -21,11 +21,10 @@ import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, Path} private[streaming] object HdfsUtils { - def getOutputStream(path: String): FSDataOutputStream = { + def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = { // HDFS is not thread-safe when getFileSystem is called, so synchronize on that val dfsPath = new Path(path) - val conf = new Configuration() val dfs = this.synchronized { dfsPath.getFileSystem(conf) @@ -45,10 +44,10 @@ private[streaming] object HdfsUtils { stream } - def getInputStream(path: String): FSDataInputStream = { + def getInputStream(path: String, conf: Configuration): FSDataInputStream = { val dfsPath = new Path(path) val dfs = this.synchronized { - dfsPath.getFileSystem(new Configuration()) + dfsPath.getFileSystem(conf) } val instream = dfs.open(dfsPath) instream diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala index 70c1d90974de0..3df024834f7a4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala 
+++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala @@ -19,9 +19,12 @@ package org.apache.spark.streaming.storage import java.io.Closeable import java.nio.ByteBuffer -private[streaming] class WriteAheadLogRandomReader(path: String) extends Closeable { +import org.apache.hadoop.conf.Configuration - private val instream = HdfsUtils.getInputStream(path) +private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configuration) + extends Closeable { + + private val instream = HdfsUtils.getInputStream(path, conf) private var closed = false def read(segment: FileSegment): ByteBuffer = synchronized { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala index e3b7c6a898800..724549e216e93 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala @@ -19,10 +19,12 @@ package org.apache.spark.streaming.storage import java.io.{EOFException, Closeable} import java.nio.ByteBuffer -private[streaming] class WriteAheadLogReader(path: String) +import org.apache.hadoop.conf.Configuration + +private[streaming] class WriteAheadLogReader(path: String, conf: Configuration) extends Iterator[ByteBuffer] with Closeable { - private val instream = HdfsUtils.getInputStream(path) + private val instream = HdfsUtils.getInputStream(path, conf) private var closed = false private var nextItem: Option[ByteBuffer] = None diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala index 7843254014995..4bc0094e5a7cd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala @@ -19,10 +19,11 @@ package org.apache.spark.streaming.storage import java.io.Closeable import java.nio.ByteBuffer +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FSDataOutputStream -private[streaming] class WriteAheadLogWriter(path: String) extends Closeable { - private val stream = HdfsUtils.getOutputStream(path) +private[streaming] class WriteAheadLogWriter(path: String, conf: Configuration) extends Closeable { + private val stream = HdfsUtils.getOutputStream(path, conf) private var nextOffset = stream.getPos private var closed = false private val hflushMethod = { @@ -45,12 +46,12 @@ private[streaming] class WriteAheadLogWriter(path: String) extends Closeable { if (data.hasArray) { stream.write(data.array()) } else { - // If the buffer is not backed by an array we need to copy the data to an array - val dataArray = new Array[Byte](lengthToWrite) - data.get(dataArray) - stream.write(dataArray) + // If the buffer is not backed by an array we need to write the data byte by byte + while (data.hasRemaining) { + stream.write(data.get()) + } } - hflushMethod.foreach(_.invoke(stream)) + hflush() nextOffset = stream.getPos segment } @@ -60,6 +61,10 @@ private[streaming] class WriteAheadLogWriter(path: String) extends Closeable { stream.close() } + private def hflush() { + hflushMethod.foreach(_.invoke(stream)) + } + private def assertOpen() { HdfsUtils.checkState(!closed, "Stream is closed. 
Create a new Writer to write to file.") } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/TestUtils.scala b/streaming/src/test/scala/org/apache/spark/streaming/storage/TestUtils.scala deleted file mode 100644 index 516c1155056f2..0000000000000 --- a/streaming/src/test/scala/org/apache/spark/streaming/storage/TestUtils.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.streaming.storage - -import java.io.{RandomAccessFile, File} -import java.nio.ByteBuffer - -import scala.collection.mutable.ArrayBuffer -import scala.util.Random - -object TestUtils { - - val random = new Random() - - /** - * Writes data to the file and returns the an array of the bytes written. - * @param count - * @return - */ - // We don't want to be using the WAL writer to test the reader - it will be painful to figure - // out where the bug is. Instead generate the file by hand and see if the WAL reader can - // handle it. - def writeData(count: Int, file: File): ArrayBuffer[(ByteBuffer, Long)] = { - val writtenData = new ArrayBuffer[(ByteBuffer, Long)]() - val writer = new RandomAccessFile(file, "rw") - var i = 0 - while (i < count) { - val data = generateRandomData() - writtenData += ((data, writer.getFilePointer)) - data.rewind() - writer.writeInt(data.remaining()) - writer.write(data.array()) - i += 1 - } - writer.close() - writtenData - } - - def readData(segments: Seq[FileSegment], file: File): Seq[ByteBuffer] = { - val reader = new RandomAccessFile(file, "r") - segments.map { x => - reader.seek(x.offset) - val data = new Array[Byte](x.length) - reader.readInt() - reader.readFully(data) - ByteBuffer.wrap(data) - } - } - - def generateRandomData(): ByteBuffer = { - val data = new Array[Byte](random.nextInt(50)) - random.nextBytes(data) - ByteBuffer.wrap(data) - } - - def writeUsingWriter(file: File, input: Seq[ByteBuffer]): Seq[FileSegment] = { - val writer = new WriteAheadLogWriter(file.toString) - val segments = input.map(writer.write) - writer.close() - segments - } -} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReaderSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReaderSuite.scala deleted file mode 100644 index 1917545477d65..0000000000000 --- a/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReaderSuite.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.streaming.storage - -import java.io.File - -import com.google.common.io.Files - -import org.apache.spark.streaming.TestSuiteBase - -class WriteAheadLogRandomReaderSuite extends TestSuiteBase { - - test("Test successful reads") { - val file = File.createTempFile("testSuccessFulReads", "") - file.deleteOnExit() - val writtenData = TestUtils.writeData(50, file) - val reader = new WriteAheadLogRandomReader("file:///" + file.toString) - writtenData.foreach { - x => - val length = x._1.remaining() - assert(x._1 === reader.read(new FileSegment(file.toString, x._2, length))) - } - reader.close() - } - - test("Test reading data written with writer") { - val dir = Files.createTempDir() - val file = new File(dir, "TestWriter") - try { - val dataToWrite = for (i <- 1 to 50) yield TestUtils.generateRandomData() - val segments = TestUtils.writeUsingWriter(file, dataToWrite) - val iter = dataToWrite.iterator - val reader = new WriteAheadLogRandomReader("file:///" + file.toString) - val writtenData = segments.map { x => - reader.read(x) - } - assert(dataToWrite.toArray === writtenData.toArray) - } finally { - file.delete() - dir.delete() - } - } - -} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogReaderSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogReaderSuite.scala deleted file mode 100644 index ce37035524d2c..0000000000000 --- a/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogReaderSuite.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.streaming.storage - -import java.io.File - -import com.google.common.io.Files - -import org.apache.spark.streaming.TestSuiteBase - -class WriteAheadLogReaderSuite extends TestSuiteBase { - - test("Test success") { - val file = File.createTempFile("testSuccessFulReads", "") - file.deleteOnExit() - val writtenData = TestUtils.writeData(50, file) - val reader = new WriteAheadLogReader("file:///" + file.toString) - val iter = writtenData.iterator - iter.foreach { x => - assert(reader.hasNext === true) - assert(reader.next() === x._1) - } - reader.close() - } - - - test("Test reading data written with writer") { - val dir = Files.createTempDir() - val file = new File(dir, "TestWriter") - try { - val dataToWrite = for (i <- 1 to 50) yield TestUtils.generateRandomData() - val segments = TestUtils.writeUsingWriter(file, dataToWrite) - val iter = dataToWrite.iterator - val reader = new WriteAheadLogReader("file:///" + file.toString) - reader.foreach { x => - assert(x === iter.next()) - } - } finally { - file.delete() - dir.delete() - } - } -} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala new file mode 100644 index 0000000000000..ed21bdbb399fd --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.storage + +import java.io.{RandomAccessFile, File} +import java.nio.ByteBuffer +import java.util.Random + +import scala.collection.mutable.ArrayBuffer + +import com.google.common.io.Files +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.streaming.TestSuiteBase + +class WriteAheadLogSuite extends TestSuiteBase { + + val hadoopConf = new Configuration() + val random = new Random() + + test("Test successful writes") { + val dir = Files.createTempDir() + val file = new File(dir, "TestWriter") + try { + val dataToWrite = for (i <- 1 to 50) yield generateRandomData() + val writer = new WriteAheadLogWriter("file:///" + file.toString, hadoopConf) + val segments = dataToWrite.map(writer.write) + writer.close() + val writtenData = readData(segments, file) + assert(writtenData.toArray === dataToWrite.toArray) + } finally { + file.delete() + dir.delete() + } + } + + test("Test successful reads using random reader") { + val file = File.createTempFile("TestRandomReads", "") + file.deleteOnExit() + val writtenData = writeData(50, file) + val reader = new WriteAheadLogRandomReader("file:///" + file.toString, hadoopConf) + writtenData.foreach { + x => + val length = x._1.remaining() + assert(x._1 === reader.read(new FileSegment(file.toString, x._2, length))) + } + reader.close() + } + + test("Test reading data using random reader written with writer") { + val dir = Files.createTempDir() + val file = new File(dir, "TestRandomReads") + try { + val dataToWrite = for (i <- 1 to 50) yield generateRandomData() + val segments = writeUsingWriter(file, dataToWrite) + val iter = dataToWrite.iterator + val reader = new WriteAheadLogRandomReader("file:///" + file.toString, hadoopConf) + val writtenData = segments.map { x => + reader.read(x) + } + assert(dataToWrite.toArray === writtenData.toArray) + } finally { + file.delete() + dir.delete() + } + } + + test("Test successful reads using sequential reader") { + val file = File.createTempFile("TestSequentialReads", "") + file.deleteOnExit() + val writtenData = writeData(50, file) + val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf) + val iter = writtenData.iterator + iter.foreach { x => + assert(reader.hasNext === true) + assert(reader.next() === x._1) + } + reader.close() + } + + + test("Test reading data using sequential reader written with writer") { + val dir = Files.createTempDir() + val file = new File(dir, "TestWriter") + try { + val dataToWrite = for (i <- 1 to 50) yield generateRandomData() + val segments = writeUsingWriter(file, dataToWrite) + val iter = dataToWrite.iterator + val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf) + reader.foreach { x => + assert(x === iter.next()) + } + } finally { + file.delete() + dir.delete() + } + } + + /** + * Writes data to the file and returns the an array of the bytes written. + * @param count + * @return + */ + // We don't want to be using the WAL writer to test the reader - it will be painful to figure + // out where the bug is. Instead generate the file by hand and see if the WAL reader can + // handle it. 
+ def writeData(count: Int, file: File): ArrayBuffer[(ByteBuffer, Long)] = { + val writtenData = new ArrayBuffer[(ByteBuffer, Long)]() + val writer = new RandomAccessFile(file, "rw") + var i = 0 + while (i < count) { + val data = generateRandomData() + writtenData += ((data, writer.getFilePointer)) + data.rewind() + writer.writeInt(data.remaining()) + writer.write(data.array()) + i += 1 + } + writer.close() + writtenData + } + + def readData(segments: Seq[FileSegment], file: File): Seq[ByteBuffer] = { + val reader = new RandomAccessFile(file, "r") + segments.map { x => + reader.seek(x.offset) + val data = new Array[Byte](x.length) + reader.readInt() + reader.readFully(data) + ByteBuffer.wrap(data) + } + } + + def generateRandomData(): ByteBuffer = { + val data = new Array[Byte](random.nextInt(50)) + random.nextBytes(data) + ByteBuffer.wrap(data) + } + + def writeUsingWriter(file: File, input: Seq[ByteBuffer]): Seq[FileSegment] = { + val writer = new WriteAheadLogWriter(file.toString, hadoopConf) + val segments = input.map(writer.write) + writer.close() + segments + } +} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogWriterTestSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogWriterTestSuite.scala deleted file mode 100644 index 446ffbe6f7d51..0000000000000 --- a/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogWriterTestSuite.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.streaming.storage - -import java.io.File - -import com.google.common.io.Files - -import org.apache.spark.streaming.TestSuiteBase - -class WriteAheadLogWriterTestSuite extends TestSuiteBase { - - test("Test successful writes") { - - val dir = Files.createTempDir() - val file = new File(dir, "TestWriter") - try { - val dataToWrite = for (i <- 1 to 50) yield TestUtils.generateRandomData() - val writer = new WriteAheadLogWriter("file:///" + file.toString) - val segments = dataToWrite.map(writer.write) - writer.close() - val writtenData = TestUtils.readData(segments, file) - assert(writtenData.toArray === dataToWrite.toArray) - } finally { - file.delete() - dir.delete() - } - } -} From a2457e4ede56a8025c8a1c4c08d3513bd4eea2c6 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Fri, 3 Oct 2014 13:44:55 -0700 Subject: [PATCH 5/6] Wrap getting the hflush in a method. 
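For context: hflush() only exists on newer Hadoop releases (the pre-2.x line exposes sync() instead), so the writer has to discover it reflectively rather than call it directly; this patch just moves that lookup into a private helper. A minimal, self-contained sketch of the same capability check — the HflushSupport object and flush() helper are illustrative names, not code from this series:

    import java.lang.reflect.Method

    import org.apache.hadoop.fs.FSDataOutputStream

    object HflushSupport {
      // Discover hflush() at runtime; on Hadoop versions that lack it,
      // getMethod throws NoSuchMethodException and we record the absence.
      private val hflushMethod: Option[Method] =
        try {
          Some(classOf[FSDataOutputStream].getMethod("hflush"))
        } catch {
          case _: NoSuchMethodException => None
        }

      // Flush through to the file system if supported, otherwise do nothing.
      def flush(stream: FSDataOutputStream): Unit =
        hflushMethod.foreach(_.invoke(stream))
    }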
--- .../streaming/storage/WriteAheadLogWriter.scala | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala index 4bc0094e5a7cd..6f5441210b983 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala @@ -17,6 +17,7 @@ package org.apache.spark.streaming.storage import java.io.Closeable +import java.lang.reflect.Method import java.nio.ByteBuffer import org.apache.hadoop.conf.Configuration @@ -26,13 +27,7 @@ private[streaming] class WriteAheadLogWriter(path: String, conf: Configuration) private val stream = HdfsUtils.getOutputStream(path, conf) private var nextOffset = stream.getPos private var closed = false - private val hflushMethod = { - try { - Some(classOf[FSDataOutputStream].getMethod("hflush")) - } catch { - case e: Exception => None - } - } + private val hflushMethod = getHflushMethod() // Data is always written as: // - Length - Long @@ -65,6 +60,14 @@ private[streaming] class WriteAheadLogWriter(path: String, conf: Configuration) hflushMethod.foreach(_.invoke(stream)) } + private def getHflushMethod(): Option[Method] = { + try { + Some(classOf[FSDataOutputStream].getMethod("hflush")) + } catch { + case e: Exception => None + } + } + private def assertOpen() { HdfsUtils.checkState(!closed, "Stream is closed. Create a new Writer to write to file.") } From bbfeae1167c328174a6fbb563028ccce9b9df8b2 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Fri, 3 Oct 2014 16:26:28 -0700 Subject: [PATCH 6/6] Use sync method if hflush is not available. 
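For context: on pre-2.x Hadoop the equivalent of hflush() is sync(), so falling back to it keeps the WAL durable on older clusters instead of silently skipping the flush. The diff below expresses the fallback with Try/recover; an equivalent, slightly more compact formulation — shown only as a sketch, not as the code in this patch — is:

    import java.lang.reflect.Method

    import scala.util.Try

    import org.apache.hadoop.fs.FSDataOutputStream

    // Prefer hflush() (Hadoop 2.x), fall back to sync() (Hadoop 1.x),
    // and return None if neither method exists.
    def getFlushMethod(): Option[Method] =
      Try(classOf[FSDataOutputStream].getMethod("hflush"))
        .orElse(Try(classOf[FSDataOutputStream].getMethod("sync")))
        .toOption

Whichever method is found is then invoked reflectively after every record, as hflushOrSync() does in the diff.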
--- .../storage/WriteAheadLogWriter.scala | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala index 6f5441210b983..8a2db8305a7e2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala @@ -20,6 +20,8 @@ import java.io.Closeable import java.lang.reflect.Method import java.nio.ByteBuffer +import scala.util.Try + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FSDataOutputStream @@ -27,7 +29,7 @@ private[streaming] class WriteAheadLogWriter(path: String, conf: Configuration) private val stream = HdfsUtils.getOutputStream(path, conf) private var nextOffset = stream.getPos private var closed = false - private val hflushMethod = getHflushMethod() + private val hflushMethod = getHflushOrSync() // Data is always written as: // - Length - Long @@ -46,7 +48,7 @@ private[streaming] class WriteAheadLogWriter(path: String, conf: Configuration) stream.write(data.get()) } } - hflush() + hflushOrSync() nextOffset = stream.getPos segment } @@ -56,16 +58,17 @@ private[streaming] class WriteAheadLogWriter(path: String, conf: Configuration) stream.close() } - private def hflush() { + private def hflushOrSync() { hflushMethod.foreach(_.invoke(stream)) } - private def getHflushMethod(): Option[Method] = { - try { + private def getHflushOrSync(): Option[Method] = { + Try { Some(classOf[FSDataOutputStream].getMethod("hflush")) - } catch { - case e: Exception => None - } + }.recover { + case e: NoSuchMethodException => + Some(classOf[FSDataOutputStream].getMethod("sync")) + }.getOrElse(None) } private def assertOpen() {
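Taken together, the series leaves a small, self-contained WAL API: the writer takes a log path plus a Hadoop Configuration and returns a FileSegment per record, the sequential reader is an Iterator[ByteBuffer] over the whole log, and the random reader resolves an individual FileSegment. A minimal round-trip sketch against those final signatures — the /tmp path and payloads are placeholders, and the object sits in the storage package only because the classes are private[streaming]:

    package org.apache.spark.streaming.storage

    import java.nio.ByteBuffer

    import org.apache.hadoop.conf.Configuration

    object WriteAheadLogExample {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        val path = "file:///tmp/wal-example"  // placeholder log file

        // Each write returns the FileSegment (path, offset, length) needed for random reads.
        val writer = new WriteAheadLogWriter(path, conf)
        val segments = (1 to 3).map { i =>
          writer.write(ByteBuffer.wrap(Array.fill[Byte](10 * i)(i.toByte)))
        }
        writer.close()

        // Sequential access: the reader iterates every record in write order.
        val reader = new WriteAheadLogReader(path, conf)
        reader.foreach(record => println(s"read ${record.remaining()} bytes"))
        reader.close()

        // Random access: fetch a single record directly by its segment.
        val randomReader = new WriteAheadLogRandomReader(path, conf)
        val firstRecord = randomReader.read(segments.head)
        println(s"first record has ${firstRecord.remaining()} bytes")
        randomReader.close()
      }
    }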