From 8ffd14fcbff7c07f472d3cf760ed51eeca357f6d Mon Sep 17 00:00:00 2001 From: Marc Wodahl Date: Mon, 29 Apr 2024 13:34:23 -0600 Subject: [PATCH 01/18] Update hardcoded kafka properties to be configurable --- docker-compose-confluent-cloud.yml | 3 +++ docker-compose.yml | 3 +++ run.sh | 3 +++ sample.env | 3 +++ .../its/jpo/ode/aws/depositor/AwsDepositor.java | 17 +++++++++++++---- 5 files changed, 25 insertions(+), 4 deletions(-) diff --git a/docker-compose-confluent-cloud.yml b/docker-compose-confluent-cloud.yml index 6b7086f..aae2331 100644 --- a/docker-compose-confluent-cloud.yml +++ b/docker-compose-confluent-cloud.yml @@ -22,4 +22,7 @@ services: KAFKA_TYPE: ${KAFKA_TYPE} CONFLUENT_KEY: ${CONFLUENT_KEY} CONFLUENT_SECRET: ${CONFLUENT_SECRET} + KAFKA_ENABLE_AUTO_COMMIT: ${KAFKA_ENABLE_AUTO_COMMIT} + KAFKA_AUTO_COMMIT_INTERVAL_MS: ${KAFKA_AUTO_COMMIT_INTERVAL_MS} + KAFKA_SESSION_TIMEOUT_MS: ${KAFKA_SESSION_TIMEOUT_MS} \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 7fa1e7e..e9ca4ca 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -35,6 +35,9 @@ services: AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY} HEADER_ACCEPT: ${HEADER_ACCEPT} HEADER_X_API_KEY: ${HEADER_X_API_KEY} + KAFKA_ENABLE_AUTO_COMMIT: ${KAFKA_ENABLE_AUTO_COMMIT} + KAFKA_AUTO_COMMIT_INTERVAL_MS: ${KAFKA_AUTO_COMMIT_INTERVAL_MS} + KAFKA_SESSION_TIMEOUT_MS: ${KAFKA_SESSION_TIMEOUT_MS} depends_on: - kafka diff --git a/run.sh b/run.sh index a3baff4..4054614 100644 --- a/run.sh +++ b/run.sh @@ -22,6 +22,9 @@ export API_ENDPOINT=$API_ENDPOINT export KAFKA_TYPE=$KAFKA_TYPE export CONFLUENT_KEY=$CONFLUENT_KEY export CONFLUENT_SECRET=$CONFLUENT_SECRET +export KAFKA_ENABLE_AUTO_COMMIT=$KAFKA_ENABLE_AUTO_COMMIT +export KAFKA_AUTO_COMMIT_INTERVAL_MS=$KAFKA_AUTO_COMMIT_INTERVAL_MS +export KAFKA_SESSION_TIMEOUT_MS=$KAFKA_SESSION_TIMEOUT_MS # build echo "Compiling." 
diff --git a/sample.env b/sample.env index b559847..bbf6ca1 100644 --- a/sample.env +++ b/sample.env @@ -14,6 +14,9 @@ HEADER_X_API_KEY= KAFKA_TYPE= CONFLUENT_KEY= CONFLUENT_SECRET= +KAFKA_ENABLE_AUTO_COMMIT= +KAFKA_AUTO_COMMIT_INTERVAL_MS= +KAFKA_SESSION_TIMEOUT_MS= # MONGODB Variables MONGO_IP=${DOCKER_HOST_IP} diff --git a/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java b/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java index 3dd4ba2..279264c 100644 --- a/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java +++ b/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java @@ -22,6 +22,7 @@ import java.io.Writer; import java.nio.ByteBuffer; import java.nio.charset.Charset; +import java.time.Duration; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Arrays; @@ -87,6 +88,10 @@ public class AwsDepositor { private String AWS_SESSION_TOKEN; private String AWS_EXPIRATION; + private String KAFKA_ENABLE_AUTO_COMMIT; + private String KAFKA_AUTO_COMMIT_INTERVAL_MS; + private String KAFKA_SESSION_TIMEOUT_MS; + public static void main(String[] args) throws Exception { AwsDepositor awsDepositor = new AwsDepositor(); awsDepositor.run(args); @@ -149,10 +154,14 @@ public void run(String[] args) throws Exception { addConfluentProperties(props); } + KAFKA_AUTO_COMMIT_INTERVAL_MS = getEnvironmentVariable("KAFKA_AUTO_COMMIT_INTERVAL_MS", "1000"); + KAFKA_ENABLE_AUTO_COMMIT = getEnvironmentVariable("KAFKA_ENABLE_AUTO_COMMIT", "false"); + KAFKA_SESSION_TIMEOUT_MS = getEnvironmentVariable("KAFKA_SESSION_TIMEOUT_MS", "30000"); + props.put("group.id", group); - props.put("enable.auto.commit", "false"); - props.put("auto.commit.interval.ms", "1000"); - props.put("session.timeout.ms", "30000"); + props.put("enable.auto.commit", KAFKA_ENABLE_AUTO_COMMIT); + props.put("auto.commit.interval.ms", KAFKA_AUTO_COMMIT_INTERVAL_MS); + props.put("session.timeout.ms", KAFKA_SESSION_TIMEOUT_MS); boolean 
depositToS3 = false; @@ -176,7 +185,7 @@ public void run(String[] args) throws Exception { boolean gotMessages = false; while (true) { - ConsumerRecords records = stringConsumer.poll(CONSUMER_POLL_TIMEOUT_MS); + ConsumerRecords records = stringConsumer.poll(Duration.ofMillis(CONSUMER_POLL_TIMEOUT_MS)); if (records != null && !records.isEmpty()) { for (ConsumerRecord record : records) { try { From a4bebce5068a8d7e9943056eb09cec81343253f8 Mon Sep 17 00:00:00 2001 From: Marc Wodahl Date: Tue, 30 Apr 2024 12:59:28 -0600 Subject: [PATCH 02/18] Update sample.env comments --- sample.env | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sample.env b/sample.env index bbf6ca1..f2dd709 100644 --- a/sample.env +++ b/sample.env @@ -14,9 +14,9 @@ HEADER_X_API_KEY= KAFKA_TYPE= CONFLUENT_KEY= CONFLUENT_SECRET= -KAFKA_ENABLE_AUTO_COMMIT= -KAFKA_AUTO_COMMIT_INTERVAL_MS= -KAFKA_SESSION_TIMEOUT_MS= +KAFKA_ENABLE_AUTO_COMMIT= # Defaults to false +KAFKA_AUTO_COMMIT_INTERVAL_MS= # Defaults to 1000 +KAFKA_SESSION_TIMEOUT_MS= # Defaults to 30000 # MONGODB Variables MONGO_IP=${DOCKER_HOST_IP} From c733e463d354e5e95642d15afbbac86dd1eeba6e Mon Sep 17 00:00:00 2001 From: Marc Wodahl Date: Thu, 9 May 2024 07:56:38 -0600 Subject: [PATCH 03/18] Moved sample.env comments to separate lines --- sample.env | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sample.env b/sample.env index f2dd709..46d4fe0 100644 --- a/sample.env +++ b/sample.env @@ -14,9 +14,12 @@ HEADER_X_API_KEY= KAFKA_TYPE= CONFLUENT_KEY= CONFLUENT_SECRET= -KAFKA_ENABLE_AUTO_COMMIT= # Defaults to false -KAFKA_AUTO_COMMIT_INTERVAL_MS= # Defaults to 1000 -KAFKA_SESSION_TIMEOUT_MS= # Defaults to 30000 +# Defaults to false +KAFKA_ENABLE_AUTO_COMMIT= +# Defaults to 1000 +KAFKA_AUTO_COMMIT_INTERVAL_MS= +# Defaults to 30000 +KAFKA_SESSION_TIMEOUT_MS= # MONGODB Variables MONGO_IP=${DOCKER_HOST_IP} From 371f2a3ffa85a9dfdcb02232b120a00509326b5b Mon Sep 17 00:00:00 2001 From: Marc Wodahl Date: Wed, 
15 May 2024 13:08:19 -0600 Subject: [PATCH 04/18] Update AwsDepositor.java to include GCS BLOB support --- .../jpo/ode/aws/depositor/AwsDepositor.java | 40 +++++++++++++++++-- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java b/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java index 3dd4ba2..d32ae93 100644 --- a/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java +++ b/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java @@ -22,6 +22,7 @@ import java.io.Writer; import java.nio.ByteBuffer; import java.nio.charset.Charset; +import java.time.Duration; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Arrays; @@ -47,6 +48,10 @@ import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.PutObjectResult; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.Bucket; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; @@ -158,14 +163,22 @@ public void run(String[] args) throws Exception { boolean depositToS3 = false; AmazonS3 s3 = null; AmazonKinesisFirehoseAsync firehose = null; + Storage gcsStorage = null; + if (destination != null && destination.equals("s3")) { depositToS3 = true; s3 = createS3Client(awsRegion); - } else { + } else if (destination != null && destination.equals("firehose")) { firehose = buildFirehoseClient(awsRegion); + } else if (destination != null && destination.equals("gcs")) { + gcsStorage = StorageOptions.getDefaultInstance().getService(); + } else { + logger.error("Invalid destination: " + destination); + System.exit(1); } + while (true) { KafkaConsumer stringConsumer = new KafkaConsumer(props); @@ -176,15 +189,20 @@ public void 
run(String[] args) throws Exception { boolean gotMessages = false; while (true) { - ConsumerRecords records = stringConsumer.poll(CONSUMER_POLL_TIMEOUT_MS); + ConsumerRecords records = stringConsumer.poll(Duration.ofMillis(CONSUMER_POLL_TIMEOUT_MS)); if (records != null && !records.isEmpty()) { for (ConsumerRecord record : records) { try { gotMessages = true; if (depositToS3) { depositToS3(s3, record); - } else { + } else if (destination.equals("firehose")){ depositToFirehose(firehose, record); + } else if (destination.equals("gcs")) { + depositToGCS(gcsStorage, bucketName, record); + } else { + logger.error("Invalid destination: " + destination); + System.exit(1); } } catch (Exception e) { int retryTimeout = 5000; @@ -320,6 +338,22 @@ private void depositToS3(AmazonS3 s3, ConsumerRecord record) thr } } + private void depositToGCS(Storage gcsStorage, String depositBucket, ConsumerRecord record) { + String recordValue = record.value(); + Bucket bucket = gcsStorage.get(depositBucket); + byte[] bytes = recordValue.getBytes(Charset.defaultCharset()); + + long time = System.currentTimeMillis(); + String timeStamp = Long.toString(time); + + Blob blob = bucket.create(timeStamp, bytes); + if (blob != null) { + logger.debug("Record successfully uploaded to GCS"); + } else { + logger.error("Failed to upload record to GCS bucket: " + recordValue); + } + } + private AmazonKinesisFirehoseAsync buildFirehoseClient(String awsRegion) { // Default is to deposit to Kinesis/Firehose, override via .env // variables if S3 deposit desired From f8708f15011535593bc6c0a7290ef21caefb0de0 Mon Sep 17 00:00:00 2001 From: Marc Wodahl Date: Wed, 15 May 2024 13:08:51 -0600 Subject: [PATCH 05/18] Add directory for google credential files --- resources/google/.gitignore | 2 ++ resources/google/README.md | 3 +++ resources/google/sample_gcp_service_account.json | 12 ++++++++++++ 3 files changed, 17 insertions(+) create mode 100644 resources/google/.gitignore create mode 100644 
resources/google/README.md create mode 100644 resources/google/sample_gcp_service_account.json diff --git a/resources/google/.gitignore b/resources/google/.gitignore new file mode 100644 index 0000000..a00abba --- /dev/null +++ b/resources/google/.gitignore @@ -0,0 +1,2 @@ +*.json +!sample_gcp_service_account.json \ No newline at end of file diff --git a/resources/google/README.md b/resources/google/README.md new file mode 100644 index 0000000..ceb37e8 --- /dev/null +++ b/resources/google/README.md @@ -0,0 +1,3 @@ +# Google Resources + +This folder is used as a location to put the GCP service account JSON file that will be used for the S3 Depositor. Please name the file gcp_service_account.json; a sample file has been provided in this folder. diff --git a/resources/google/sample_gcp_service_account.json b/resources/google/sample_gcp_service_account.json new file mode 100644 index 0000000..4ce5e90 --- /dev/null +++ b/resources/google/sample_gcp_service_account.json @@ -0,0 +1,12 @@ +{ + "type": "service_account", + "project_id": "", + "private_key_id": "", + "private_key": "", + "client_email": "", + "client_id": "", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://oauth2.googleapis.com/token", + "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", + "client_x509_cert_url": "" +} From ab2a4e2abb16ce91238580156c47b01bea47fe08 Mon Sep 17 00:00:00 2001 From: Marc Wodahl Date: Wed, 15 May 2024 13:12:40 -0600 Subject: [PATCH 06/18] Update sample.env, README --- README.md | 2 +- pom.xml | 6 ++++++ sample.env | 11 +++++++++++ 3 files changed, 18 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 27a2542..0fdc51d 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # AWS Deposit Service -This project is intended to serve as a consumer application to subscribe to a Kafka topic of streaming JSON, package the results as a JSON file, and deposits the resulting file into a predetermined 
Firehose/Kinesis or S3 bucket. This runs alongside the ODE and when deployed using Docker Compose, runs in a Docker container. +This project is intended to serve as a consumer application to subscribe to a Kafka topic of streaming JSON, package the results as a JSON file, and deposits the resulting file into a predetermined Firehose/Kinesis, S3 Bucket, or Google Cloud Storage Bucket. This runs alongside the ODE and when deployed using Docker Compose, runs in a Docker container. ## Quick Run The use of AWS credentials is being read from the machine's environmental variables. You may also set them in your bash profile. Note that when using Docker Compose from the main `jpo-ode` repository, these variables are set in the `.env` present in that repo. diff --git a/pom.xml b/pom.xml index 7b23662..2e878a6 100644 --- a/pom.xml +++ b/pom.xml @@ -73,6 +73,12 @@ 1.7.36 compile + + + com.google.cloud + google-cloud-storage + 2.37.0 + ${project.artifactId} diff --git a/sample.env b/sample.env index b559847..8ef23b7 100644 --- a/sample.env +++ b/sample.env @@ -3,6 +3,7 @@ BOOTSTRAP_SERVER=${DOCKER_HOST_IP}:9092 API_ENDPOINT= DEPOSIT_TOPIC= DEPOSIT_GROUP= +# Bucket name to deposit to (required for all destination types) DEPOSIT_BUCKET_NAME= DEPOSIT_KEY_NAME= AWS_SESSION_TOKEN= @@ -23,3 +24,13 @@ MONGO_DB_PASS= MONGO_PORT=27017 MONGO_URI=mongodb://${MONGO_DB_USER}:${MONGO_DB_PASS}@${MONGO_IP}:${MONGO_PORT}/ MONGO_COLLECTION_TTL=7 # days + +# Destination to deposit messages to +## Currently supported types are "s3" (Amazon S3 Bucket), "firehose" (Amazon Firehose), or "gcs" (Google Cloud Storage) +DESTINATION= + +# Google Cloud Storage Variables +## path to service account key file (json format) +### If using gcs as a destination please note that the service account used must have +### storage.buckets.get and storage.objects.create permissions to deposit messages correctly. 
+GOOGLE_APPLICATION_CREDENTIALS='' \ No newline at end of file From 3a6622278dec5a50287593e6bbbc933beebbe07a Mon Sep 17 00:00:00 2001 From: Marc Wodahl Date: Wed, 15 May 2024 13:13:27 -0600 Subject: [PATCH 07/18] docker-compose, run.sh update --- docker-compose-confluent-cloud.yml | 3 +++ docker-compose.yml | 3 +++ run.sh | 1 + 3 files changed, 7 insertions(+) diff --git a/docker-compose-confluent-cloud.yml b/docker-compose-confluent-cloud.yml index 6b7086f..90d35f4 100644 --- a/docker-compose-confluent-cloud.yml +++ b/docker-compose-confluent-cloud.yml @@ -22,4 +22,7 @@ services: KAFKA_TYPE: ${KAFKA_TYPE} CONFLUENT_KEY: ${CONFLUENT_KEY} CONFLUENT_SECRET: ${CONFLUENT_SECRET} + GOOGLE_APPLICATION_CREDENTIALS: '/google/gcp_service_account.json' + volumes: + - ${GOOGLE_APPLICATION_CREDENTIALS}:/google/gcp_service_account.json d \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 7fa1e7e..42781c9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -35,6 +35,9 @@ services: AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY} HEADER_ACCEPT: ${HEADER_ACCEPT} HEADER_X_API_KEY: ${HEADER_X_API_KEY} + GOOGLE_APPLICATION_CREDENTIALS: '/google/gcp_service_account.json' + volumes: + - ${GOOGLE_APPLICATION_CREDENTIALS}:/google/gcp_service_account.json depends_on: - kafka diff --git a/run.sh b/run.sh index a3baff4..ab44092 100644 --- a/run.sh +++ b/run.sh @@ -22,6 +22,7 @@ export API_ENDPOINT=$API_ENDPOINT export KAFKA_TYPE=$KAFKA_TYPE export CONFLUENT_KEY=$CONFLUENT_KEY export CONFLUENT_SECRET=$CONFLUENT_SECRET +export GOOGLE_APPLICATION_CREDENTIALS=$GOOGLE_APPLICATION_CREDENTIALS # build echo "Compiling." 
From 1b9a660e77cf7d0c5738d7b690b4a0a321761ff9 Mon Sep 17 00:00:00 2001 From: Marc Wodahl Date: Wed, 22 May 2024 08:27:23 -0600 Subject: [PATCH 08/18] Update readme, sample.env, destination null check --- README.md | 7 ++++-- docker-compose-confluent-cloud.yml | 2 +- sample.env | 5 ++-- .../jpo/ode/aws/depositor/AwsDepositor.java | 24 ++++++++++--------- 4 files changed, 21 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 0fdc51d..d0b9c07 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,13 @@ -# AWS Deposit Service +# Message Deposit Service -This project is intended to serve as a consumer application to subscribe to a Kafka topic of streaming JSON, package the results as a JSON file, and deposits the resulting file into a predetermined Firehose/Kinesis, S3 Bucket, or Google Cloud Storage Bucket. This runs alongside the ODE and when deployed using Docker Compose, runs in a Docker container. +This project is intended to serve as a consumer application to subscribe to a Kafka topic of streaming JSON, package the results as a JSON file, and deposit the resulting file into a predetermined Firehose/Kinesis, S3 Bucket, or Google Cloud Storage Bucket (GCS). This runs alongside the ODE and when deployed using Docker Compose, runs in a Docker container. ## Quick Run The use of AWS credentials is being read from the machine's environmental variables. You may also set them in your bash profile. Note that when using Docker Compose from the main `jpo-ode` repository, these variables are set in the `.env` present in that repo. +If depositing to GCS, credentials are read from a JSON service account key file. A sample service account file can be found at ./resources/google/sample_gcp_service_account.json. +Please note that if depositing to GCS the service account will need the storage.buckets.get and storage.objects.create permissions. 
+ ``` export K_AWS_ACCESS_KEY_ID = AccessKeyId export K_AWS_SECRET_ACCESS_SECRET = SecretAccessKey diff --git a/docker-compose-confluent-cloud.yml b/docker-compose-confluent-cloud.yml index 90d35f4..da0fb04 100644 --- a/docker-compose-confluent-cloud.yml +++ b/docker-compose-confluent-cloud.yml @@ -24,5 +24,5 @@ services: CONFLUENT_SECRET: ${CONFLUENT_SECRET} GOOGLE_APPLICATION_CREDENTIALS: '/google/gcp_service_account.json' volumes: - - ${GOOGLE_APPLICATION_CREDENTIALS}:/google/gcp_service_account.json d + - ${GOOGLE_APPLICATION_CREDENTIALS}:/google/gcp_service_account.json \ No newline at end of file diff --git a/sample.env b/sample.env index 8ef23b7..7e20421 100644 --- a/sample.env +++ b/sample.env @@ -31,6 +31,5 @@ DESTINATION= # Google Cloud Storage Variables ## path to service account key file (json format) -### If using gcs as a destination please note that the service account used must have -### storage.buckets.get and storage.objects.create permissions to deposit messages correctly. -GOOGLE_APPLICATION_CREDENTIALS='' \ No newline at end of file +### If GOOGLE_APPLICATION_CREDENTIALS is blank, regardless of destination, it may cause issues when running the S3 depositor with Docker. 
+GOOGLE_APPLICATION_CREDENTIALS='./resources/google/sample_gcp_service_account.json' \ No newline at end of file diff --git a/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java b/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java index d32ae93..f16ae19 100644 --- a/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java +++ b/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java @@ -165,17 +165,19 @@ public void run(String[] args) throws Exception { AmazonKinesisFirehoseAsync firehose = null; Storage gcsStorage = null; - if (destination != null && destination.equals("s3")) { - depositToS3 = true; - s3 = createS3Client(awsRegion); - - } else if (destination != null && destination.equals("firehose")) { - firehose = buildFirehoseClient(awsRegion); - } else if (destination != null && destination.equals("gcs")) { - gcsStorage = StorageOptions.getDefaultInstance().getService(); - } else { - logger.error("Invalid destination: " + destination); - System.exit(1); + if (destination != null) { + if (destination.equals("s3")) { + depositToS3 = true; + s3 = createS3Client(awsRegion); + + } else if (destination.equals("firehose")) { + firehose = buildFirehoseClient(awsRegion); + } else if (destination.equals("gcs")) { + gcsStorage = StorageOptions.getDefaultInstance().getService(); + } else { + logger.error("Invalid destination: " + destination); + System.exit(1); + } } From 81f997c8514e5139d1c34f1c1bef66b19630638d Mon Sep 17 00:00:00 2001 From: Marc Wodahl Date: Wed, 22 May 2024 09:55:12 -0600 Subject: [PATCH 09/18] Remove destination null check --- .../jpo/ode/aws/depositor/AwsDepositor.java | 23 ++++++++----------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java b/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java index f16ae19..a73a31a 100644 --- a/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java +++ 
b/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java @@ -165,19 +165,16 @@ public void run(String[] args) throws Exception { AmazonKinesisFirehoseAsync firehose = null; Storage gcsStorage = null; - if (destination != null) { - if (destination.equals("s3")) { - depositToS3 = true; - s3 = createS3Client(awsRegion); - - } else if (destination.equals("firehose")) { - firehose = buildFirehoseClient(awsRegion); - } else if (destination.equals("gcs")) { - gcsStorage = StorageOptions.getDefaultInstance().getService(); - } else { - logger.error("Invalid destination: " + destination); - System.exit(1); - } + if (destination.equals("s3")) { + depositToS3 = true; + s3 = createS3Client(awsRegion); + } else if (destination.equals("firehose")) { + firehose = buildFirehoseClient(awsRegion); + } else if (destination.equals("gcs")) { + gcsStorage = StorageOptions.getDefaultInstance().getService(); + } else { + logger.error("Invalid destination: " + destination); + System.exit(1); } From 02adfa7643d6a25ce56d55f6d6587298d609d29f Mon Sep 17 00:00:00 2001 From: dmccoystephenson Date: Fri, 24 May 2024 13:51:09 -0600 Subject: [PATCH 10/18] Updated `Release_notes.md` for 1.5.0 release --- docs/Release_notes.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/docs/Release_notes.md b/docs/Release_notes.md index 40d1869..12043c7 100644 --- a/docs/Release_notes.md +++ b/docs/Release_notes.md @@ -1,6 +1,16 @@ Jpo-s3-deposit Release Notes ---------------------------- +Version 1.5.0, released May 2024 +---------------------------------------- +### **Summary** +The changes for the jpo-s3-deposit 1.5.0 release include updated hard-coded kafka properties to be configurable and added support for GCS BLOB storage. 
+ +Enhancements in this release +- CDOT PR 18: Updated hard-coded kafka properties to be configurable +- CDOT PR 19: Added support for GCS BLOB storage + + Version 1.4.0, released February 2024 ---------------------------------------- From 52590fa0df86c6f85b392e05754c408a2f421604 Mon Sep 17 00:00:00 2001 From: dmccoystephenson Date: Fri, 24 May 2024 14:03:58 -0600 Subject: [PATCH 11/18] Updated month for 1.5.0 release in `Release_notes.md` --- docs/Release_notes.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/Release_notes.md b/docs/Release_notes.md index 12043c7..499d6bc 100644 --- a/docs/Release_notes.md +++ b/docs/Release_notes.md @@ -1,7 +1,7 @@ Jpo-s3-deposit Release Notes ---------------------------- -Version 1.5.0, released May 2024 +Version 1.5.0, released June 2024 ---------------------------------------- ### **Summary** The changes for the jpo-s3-deposit 1.5.0 release include updated hard-coded kafka properties to be configurable and added support for GCS BLOB storage. 
From 45c3c68e8bef5d61a3d031dd11c0f2d92ac8ab0d Mon Sep 17 00:00:00 2001 From: dmccoystephenson Date: Tue, 28 May 2024 13:47:54 -0600 Subject: [PATCH 12/18] Changed version to 1.5.0-SNAPSHOT --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 7b23662..8cc40f8 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ usdot.jpo.ode jpo-aws-depositor - 1.4.0-SNAPSHOT + 1.5.0-SNAPSHOT jar JPO AWS Depositor From 1cc54d02ab3c87df0a7d1009581841fee1633a04 Mon Sep 17 00:00:00 2001 From: Marc Wodahl Date: Thu, 30 May 2024 13:26:21 -0600 Subject: [PATCH 13/18] Update dev container to use Java 21 --- .devcontainer/Dockerfile | 44 +++++++++++++++++---------------- .devcontainer/devcontainer.json | 21 ++++------------ .vscode/launch.json | 33 ++----------------------- 3 files changed, 30 insertions(+), 68 deletions(-) diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index 89430ad..8482b7c 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -1,28 +1,30 @@ -# See here for image contents: https://github.com/microsoft/vscode-dev-containers/tree/v0.217.4/containers/java/.devcontainer/base.Dockerfile +# Install openJDK version 21 (includes maven, gradle, and node) +FROM cimg/openjdk:21.0.2-node -# [Choice] Java version (use -bullseye variants on local arm64/Apple Silicon): 11, 17, 11-bullseye, 17-bullseye, 11-buster, 17-buster -ARG VARIANT="17" -FROM mcr.microsoft.com/vscode/devcontainers/java:0-${VARIANT} +# set user to root to allow apt-get to run +USER root -# [Option] Install Maven -ARG INSTALL_MAVEN="false" -ARG MAVEN_VERSION="" -# [Option] Install Gradle -ARG INSTALL_GRADLE="false" -ARG GRADLE_VERSION="" -RUN if [ "${INSTALL_MAVEN}" = "true" ]; then su vscode -c "umask 0002 && . /usr/local/sdkman/bin/sdkman-init.sh && sdk install maven \"${MAVEN_VERSION}\""; fi \ - && if [ "${INSTALL_GRADLE}" = "true" ]; then su vscode -c "umask 0002 && . 
/usr/local/sdkman/bin/sdkman-init.sh && sdk install gradle \"${GRADLE_VERSION}\""; fi +ARG USERNAME=vscode +ARG USER_UID=1000 +ARG USER_GID=$USER_UID -# [Choice] Node.js version: none, lts/*, 16, 14, 12, 10 -ARG NODE_VERSION="none" -RUN if [ "${NODE_VERSION}" != "none" ]; then su vscode -c "umask 0002 && . /usr/local/share/nvm/nvm.sh && nvm install ${NODE_VERSION} 2>&1"; fi - -# [Optional] Uncomment this section to install additional OS packages. -# RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \ -# && apt-get -y install --no-install-recommends +# Create non-root user vscode with sudo support +ENV DEBIAN_FRONTEND=noninteractive +RUN apt-get update \ + # + # Create a non-root user to use if preferred - see https://aka.ms/vscode-remote/containers/non-root-user. + && groupadd --gid $USER_GID $USERNAME \ + && useradd -s /bin/bash --uid $USER_UID --gid $USER_GID -m $USERNAME \ + && apt-get install -y sudo \ + && echo $USERNAME ALL=\(root\) NOPASSWD:ALL > /etc/sudoers.d/$USERNAME\ + && chmod 0440 /etc/sudoers.d/$USERNAME # [Optional] Uncomment this line to install global node packages. -# RUN su vscode -c "source /usr/local/share/nvm/nvm.sh && npm install -g " 2>&1 +# RUN npm install -g # install kafkacat for testing purposes -RUN apt-get update && apt-get install -y kafkacat \ No newline at end of file +RUN apt-get update && apt-get install -y kafkacat + +# [Optional] Uncomment this section to install additional OS packages. +# RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \ +# && apt-get -y install --no-install-recommends \ No newline at end of file diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 7356ea1..05f2647 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -3,22 +3,7 @@ { "name": "Java", "build": { - "dockerfile": "Dockerfile", - "args": { - // Update the VARIANT arg to pick a Java version: 11, 17 - // Append -bullseye or -buster to pin to an OS version. 
- // Use the -bullseye variants on local arm64/Apple Silicon. - "VARIANT": "11", - // Options - "INSTALL_MAVEN": "true", - "INSTALL_GRADLE": "false", - "NODE_VERSION": "none" - } - }, - - // Set *default* container specific settings.json values on container create. - "settings": { - "java.home": "/docker-java-home" + "dockerfile": "Dockerfile" }, // Add the IDs of extensions you want installed when the container is created. @@ -26,6 +11,10 @@ "vscjava.vscode-java-pack" ], + "containerEnv": { + "SHELL": "/bin/bash" + }, + // Use 'forwardPorts' to make a list of ports inside the container available locally. // "forwardPorts": [], diff --git a/.vscode/launch.json b/.vscode/launch.json index 8a1a4c4..e69fa28 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -6,40 +6,11 @@ "configurations": [ { "type": "java", - "name": "S3D Local", + "name": "S3D", "request": "launch", "mainClass": "us.dot.its.jpo.ode.aws.depositor.AwsDepositor", "projectName": "jpo-aws-depositor", - "args": ["--bootstrap-server", ":9092", - "-d", "s3", - "-g", "testgroup", - "-k", "testkey", - "-b", "s3dtestbucket", - "-t", "test"], - "env": { - "AWS_ACCESS_KEY_ID": "", - "AWS_SECRET_KEY": "" - } - }, - { - "type": "java", - "name": "S3D CC", - "request": "launch", - "mainClass": "us.dot.its.jpo.ode.aws.depositor.AwsDepositor", - "projectName": "jpo-aws-depositor", - "args": ["--bootstrap-server", ":9092", - "-g", "testgroup", - "-t", "topic.OdeBsmJson", - "-b", "s3dtestbucket", - "-k", "testkey", - "-d", "s3"], - "env": { - "AWS_ACCESS_KEY_ID": "", - "AWS_SECRET_KEY": "", - "KAFKA_TYPE": "CONFLUENT", - "CONFLUENT_KEY": "", - "CONFLUENT_SECRET": "" - } + "envFile": "${workspaceFolder}/.env", } ] } \ No newline at end of file From c8a04ab3d098ee54d918774b147600992e2ae011 Mon Sep 17 00:00:00 2001 From: dmccoystephenson Date: Fri, 31 May 2024 11:26:04 -0600 Subject: [PATCH 14/18] Added CDOT PR 22 to release notes --- docs/Release_notes.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) 
diff --git a/docs/Release_notes.md b/docs/Release_notes.md index 499d6bc..b068ad4 100644 --- a/docs/Release_notes.md +++ b/docs/Release_notes.md @@ -4,11 +4,12 @@ Jpo-s3-deposit Release Notes Version 1.5.0, released June 2024 ---------------------------------------- ### **Summary** -The changes for the jpo-s3-deposit 1.5.0 release include updated hard-coded kafka properties to be configurable and added support for GCS BLOB storage. +The changes for the jpo-s3-deposit 1.5.0 release include updated hard-coded kafka properties to be configurable, added support for GCS BLOB storage & an updated java version for the dev container. Enhancements in this release - CDOT PR 18: Updated hard-coded kafka properties to be configurable - CDOT PR 19: Added support for GCS BLOB storage +- CDOT PR 22: Updated dev container java version to 21 Version 1.4.0, released February 2024 From b664cf8e6b9106954224ee1b37a14a67307794a3 Mon Sep 17 00:00:00 2001 From: dmccoystephenson Date: Mon, 3 Jun 2024 15:04:00 -0600 Subject: [PATCH 15/18] Updated capitalization in release notes for version 1.5.0 --- docs/Release_notes.md | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/docs/Release_notes.md b/docs/Release_notes.md index b068ad4..b65d2bc 100644 --- a/docs/Release_notes.md +++ b/docs/Release_notes.md @@ -4,12 +4,15 @@ Jpo-s3-deposit Release Notes Version 1.5.0, released June 2024 ---------------------------------------- ### **Summary** -The changes for the jpo-s3-deposit 1.5.0 release include updated hard-coded kafka properties to be configurable, added support for GCS BLOB storage & an updated java version for the dev container. +The changes for the jpo-s3-deposit 1.5.0 release include updated hard-coded Kafka properties to be configurable, added support for GCS BLOB storage & an updated Java version for the dev container. 
Enhancements in this release -- CDOT PR 18: Updated hard-coded kafka properties to be configurable +- CDOT PR 18: Updated hard-coded Kafka properties to be configurable - CDOT PR 19: Added support for GCS BLOB storage -- CDOT PR 22: Updated dev container java version to 21 +- CDOT PR 22: Updated dev container Java version to 21 + +Known Issues: +- No known issues at this time. Version 1.4.0, released February 2024 From 3ca277a115658481883b24b05026057c49cb9cf1 Mon Sep 17 00:00:00 2001 From: dmccoystephenson Date: Tue, 11 Jun 2024 07:17:00 -0600 Subject: [PATCH 16/18] Generalized some log statements in AwsDepositor --- .../java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java b/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java index 98fe5fc..3a39fff 100644 --- a/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java +++ b/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java @@ -213,8 +213,8 @@ public void run(String[] args) throws Exception { } } catch (Exception e) { int retryTimeout = 5000; - logger.error("Error depositing to AWS. Retrying in " + retryTimeout / 1000 + " seconds", - e); + String destinationName = depositToS3 ? "S3" : destination; + logger.error("Error depositing to destination '" + destinationName + "'. Retrying in " + retryTimeout / 1000 + " seconds", e); Thread.sleep(retryTimeout); } } @@ -226,7 +226,7 @@ public void run(String[] args) throws Exception { } } } catch (Exception e) { - logger.error("Server Error. reconnecting to AWS ", e); + logger.error("Server Error. 
reconnecting to destination ", e); } finally { stringConsumer.close(); } From b2083fbc3d7ee94cc9443fb3d71cab47b701aef3 Mon Sep 17 00:00:00 2001 From: dmccoystephenson Date: Tue, 11 Jun 2024 07:19:06 -0600 Subject: [PATCH 17/18] Added comment regarding GOOGLE_APPLICATION_CREDENTIALS env var --- src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java b/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java index 3a39fff..d85a2d4 100644 --- a/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java +++ b/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java @@ -179,6 +179,7 @@ public void run(String[] args) throws Exception { } else if (destination.equals("firehose")) { firehose = buildFirehoseClient(awsRegion); } else if (destination.equals("gcs")) { + // The file path specified by GOOGLE_APPLICATION_CREDENTIALS will be used here. gcsStorage = StorageOptions.getDefaultInstance().getService(); } else { logger.error("Invalid destination: " + destination); From 981a39f7dfb323e1d102b186980aa5c3b9351bcf Mon Sep 17 00:00:00 2001 From: Saikrishna Bairamoni <84093461+SaikrishnaBairamoni@users.noreply.github.com> Date: Wed, 12 Jun 2024 12:00:41 -0400 Subject: [PATCH 18/18] Update dockerhub.yml --- .github/workflows/dockerhub.yml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/.github/workflows/dockerhub.yml b/.github/workflows/dockerhub.yml index e332482..2ec9f06 100644 --- a/.github/workflows/dockerhub.yml +++ b/.github/workflows/dockerhub.yml @@ -20,8 +20,12 @@ jobs: with: username: ${{ secrets.DOCKERHUB_USERNAME }} password: ${{ secrets.DOCKERHUB_TOKEN }} + - name: Replace Docker tag + id: set_tag + run: echo "TAG=$(echo ${GITHUB_REF##*/} | sed 's/\//-/g')" >> $GITHUB_ENV + - name: Build uses: docker/build-push-action@v3 with: push: true - tags: usdotjpoode/jpo-s3-deposit:${{ github.ref_name }} + 
tags: usdotjpoode/jpo-s3-deposit:${{ env.TAG }}