diff --git a/src/main/config/secor.common.properties b/src/main/config/secor.common.properties index 5034021a6..ba637f03f 100644 --- a/src/main/config/secor.common.properties +++ b/src/main/config/secor.common.properties @@ -104,6 +104,13 @@ secor.gs.bucket=secor_gs # Google cloud storage path where files are stored within the bucket. secor.gs.path=data +# Use direct uploads +# WARNING: disables resumable uploads, files are uploaded in a single request +# This may help prevent IOException: insufficient data written, +# see https://github.com/pinterest/secor/issues/177 +# https://cloud.google.com/storage/docs/json_api/v1/how-tos/upload +secor.gs.upload.direct=false + # Zookeeper config. zookeeper.session.timeout.ms=3000 zookeeper.sync.time.ms=200 diff --git a/src/main/config/secor.dev.gs.properties b/src/main/config/secor.dev.gs.properties index d914462eb..5eb765bb1 100644 --- a/src/main/config/secor.dev.gs.properties +++ b/src/main/config/secor.dev.gs.properties @@ -19,4 +19,11 @@ secor.gs.path=data # Application credentials configuration file # https://developers.google.com/identity/protocols/application-default-credentials -secor.gs.credentials.path=google_app_credentials.json \ No newline at end of file +secor.gs.credentials.path=google_app_credentials.json + +# Use direct uploads +# WARNING: disables resumable uploads, files are uploaded in a single request +# This may help prevent IOException: insufficient data written, +# see https://github.com/pinterest/secor/issues/177 +# https://cloud.google.com/storage/docs/json_api/v1/how-tos/upload +secor.gs.upload.direct=false diff --git a/src/main/java/com/pinterest/secor/common/SecorConfig.java b/src/main/java/com/pinterest/secor/common/SecorConfig.java index 64af04ceb..e8f20c708 100644 --- a/src/main/java/com/pinterest/secor/common/SecorConfig.java +++ b/src/main/java/com/pinterest/secor/common/SecorConfig.java @@ -395,6 +395,10 @@ public int getGsReadTimeoutInMs() { return getInt("secor.gs.read.timeout.ms", 3 * 60000); } + public boolean getGsDirectUpload() { + return getBoolean("secor.gs.upload.direct"); + } + public int getFinalizerDelaySeconds() { return getInt("partitioner.finalizer.delay.seconds"); } diff --git a/src/main/java/com/pinterest/secor/uploader/GsUploadManager.java b/src/main/java/com/pinterest/secor/uploader/GsUploadManager.java index 4fe4894c2..e5a762d58 100644 --- a/src/main/java/com/pinterest/secor/uploader/GsUploadManager.java +++ b/src/main/java/com/pinterest/secor/uploader/GsUploadManager.java @@ -69,6 +69,7 @@ public Handle upload(LogFilePath localPath) throws Exception { final String gsBucket = mConfig.getGsBucket(); final String gsKey = localPath.withPrefix(mConfig.getGsPath()).getLogFilePath(); final File localFile = new File(localPath.getLogFilePath()); + final boolean directUpload = mConfig.getGsDirectUpload(); LOG.info("uploading file {} to gs://{}/{}", localFile, gsBucket, gsKey); @@ -81,6 +82,10 @@ public void run() { try { Storage.Objects.Insert request = mClient.objects().insert(gsBucket, storageObject, storageContent); + if (directUpload) { + request.getMediaHttpUploader().setDirectUploadEnabled(true); + } + request.getMediaHttpUploader().setProgressListener(new MediaHttpUploaderProgressListener() { @Override public void progressChanged(MediaHttpUploader uploader) throws IOException {