decrease memory usage when csv&gzip is on #212

Merged · 1 commit · Jul 1, 2024
@@ -20,6 +20,7 @@ package org.apache.doris.spark.load
import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.json.JsonMapper
+import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
@@ -38,7 +39,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType
import org.slf4j.{Logger, LoggerFactory}

-import java.io.{ByteArrayOutputStream, IOException}
+import java.io.{ByteArrayOutputStream, IOException, InputStream}
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets
import java.util
@@ -375,14 +376,13 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader

    if (compressType.nonEmpty) {
      if ("gz".equalsIgnoreCase(compressType.get) && format == DataFormat.CSV) {
-       val recordBatchString = new RecordBatchString(RecordBatch.newBuilder(iterator.asJava)
+       val recodeBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(iterator.asJava)
          .format(format)
          .sep(columnSeparator)
          .delim(lineDelimiter)
          .schema(schema)
          .addDoubleQuotes(addDoubleQuotes).build, streamingPassthrough)
-       val content = recordBatchString.getContent
Contributor (review comment): I guess that formatting the entire batch of data into a string object takes up extra memory, while copying between streams reduces memory usage to the size of a read buffer.
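A minimal sketch of the contrast the reviewer describes, assuming a plain iterator of CSV lines; the names gzipWholeString/gzipStreaming are illustrative stand-ins for the RecordBatchString and RecordBatchInputStream paths, whose internals are not part of this diff:

import java.io.{ByteArrayOutputStream, InputStream}
import java.nio.charset.StandardCharsets
import java.util.zip.GZIPOutputStream
import org.apache.commons.io.IOUtils

// Before: the whole batch becomes one String, then one byte array, before
// compression starts -- peak heap usage grows with the batch size.
def gzipWholeString(rows: Iterator[String]): Array[Byte] = {
  val content = rows.mkString("\n")                    // full batch in memory
  val baos = new ByteArrayOutputStream
  val gzip = new GZIPOutputStream(baos)
  gzip.write(content.getBytes(StandardCharsets.UTF_8)) // second full copy
  gzip.close()
  baos.toByteArray
}

// After: rows arrive as an InputStream and are pushed through IOUtils.copy's
// small fixed-size buffer; only the compressed output is accumulated.
def gzipStreaming(rows: InputStream): Array[Byte] = {
  val baos = new ByteArrayOutputStream
  val gzip = new GZIPOutputStream(baos)
  IOUtils.copy(rows, gzip)
  gzip.close()
  baos.toByteArray
}

Note that the compressed bytes are still buffered in full (they back a ByteArrayEntity below); the saving is in no longer materializing the uncompressed batch.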

-       val compressedData = compressByGZ(content)
+       val compressedData = compressByGZ(recodeBatchInputStream)
        entity = Some(new ByteArrayEntity(compressedData))
      }
      else {
@@ -457,6 +457,31 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader
    compressedData
  }

+  /**
+   * Compress data by gzip.
+   *
+   * @param contentInputStream data content
+   * @throws IOException if reading the input or writing compressed bytes fails
+   * @return compressed byte array data
+   */
+  @throws[IOException]
+  def compressByGZ(contentInputStream: InputStream): Array[Byte] = {
+    val baos = new ByteArrayOutputStream
+    val gzipOutputStream = new GZIPOutputStream(baos)
+    try {
+      // stream-copy through a fixed-size buffer instead of materializing the batch
+      IOUtils.copy(contentInputStream, gzipOutputStream)
+      gzipOutputStream.finish()
+      baos.toByteArray
+    } finally {
+      gzipOutputStream.close()
+      baos.close()
+    }
+  }
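For illustration, a round-trip check of the overload above, feeding it a byte-array stream in place of the connector's RecordBatchInputStream (hypothetical test code, not part of the PR):

import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import java.util.zip.GZIPInputStream

val csv = "1,alice\n2,bob\n".getBytes(StandardCharsets.UTF_8)
val compressed = compressByGZ(new ByteArrayInputStream(csv))
val restored = IOUtils.toByteArray(new GZIPInputStream(new ByteArrayInputStream(compressed)))
assert(java.util.Arrays.equals(csv, restored))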

  /**
   * handle stream load response
   *