diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala index 75b224afca39b..069a9a215675c 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala @@ -33,7 +33,7 @@ object EventTransformer extends Logging { Array[Byte]) = { val bodyLength = in.readInt() val bodyBuff = new Array[Byte](bodyLength) - in.read(bodyBuff) + in.readFully(bodyBuff) val numHeaders = in.readInt() val headers = new java.util.HashMap[CharSequence, CharSequence] @@ -41,12 +41,12 @@ object EventTransformer extends Logging { for (i <- 0 until numHeaders) { val keyLength = in.readInt() val keyBuff = new Array[Byte](keyLength) - in.read(keyBuff) + in.readFully(keyBuff) val key: String = Utils.deserialize(keyBuff) val valLength = in.readInt() val valBuff = new Array[Byte](valLength) - in.read(valBuff) + in.readFully(valBuff) val value: String = Utils.deserialize(valBuff) headers.put(key, value)