diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile
index 89430ad..8482b7c 100644
--- a/.devcontainer/Dockerfile
+++ b/.devcontainer/Dockerfile
@@ -1,28 +1,30 @@
-# See here for image contents: https://github.com/microsoft/vscode-dev-containers/tree/v0.217.4/containers/java/.devcontainer/base.Dockerfile
+# Install OpenJDK version 21 (image includes Maven, Gradle, and Node)
+FROM cimg/openjdk:21.0.2-node
 
-# [Choice] Java version (use -bullseye variants on local arm64/Apple Silicon): 11, 17, 11-bullseye, 17-bullseye, 11-buster, 17-buster
-ARG VARIANT="17"
-FROM mcr.microsoft.com/vscode/devcontainers/java:0-${VARIANT}
+# Set user to root to allow apt-get to run
+USER root
 
-# [Option] Install Maven
-ARG INSTALL_MAVEN="false"
-ARG MAVEN_VERSION=""
-# [Option] Install Gradle
-ARG INSTALL_GRADLE="false"
-ARG GRADLE_VERSION=""
-RUN if [ "${INSTALL_MAVEN}" = "true" ]; then su vscode -c "umask 0002 && . /usr/local/sdkman/bin/sdkman-init.sh && sdk install maven \"${MAVEN_VERSION}\""; fi \
-    && if [ "${INSTALL_GRADLE}" = "true" ]; then su vscode -c "umask 0002 && . /usr/local/sdkman/bin/sdkman-init.sh && sdk install gradle \"${GRADLE_VERSION}\""; fi
+ARG USERNAME=vscode
+ARG USER_UID=1000
+ARG USER_GID=$USER_UID
 
-# [Choice] Node.js version: none, lts/*, 16, 14, 12, 10
-ARG NODE_VERSION="none"
-RUN if [ "${NODE_VERSION}" != "none" ]; then su vscode -c "umask 0002 && . /usr/local/share/nvm/nvm.sh && nvm install ${NODE_VERSION} 2>&1"; fi
-
-# [Optional] Uncomment this section to install additional OS packages.
-# RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \
-#     && apt-get -y install --no-install-recommends
+# Create non-root user vscode with sudo support
+ENV DEBIAN_FRONTEND=noninteractive
+RUN apt-get update \
+    #
+    # Create a non-root user to use if preferred - see https://aka.ms/vscode-remote/containers/non-root-user.
+    && groupadd --gid $USER_GID $USERNAME \
+    && useradd -s /bin/bash --uid $USER_UID --gid $USER_GID -m $USERNAME \
+    && apt-get install -y sudo \
+    && echo $USERNAME ALL=\(root\) NOPASSWD:ALL > /etc/sudoers.d/$USERNAME\
+    && chmod 0440 /etc/sudoers.d/$USERNAME
 
 # [Optional] Uncomment this line to install global node packages.
-# RUN su vscode -c "source /usr/local/share/nvm/nvm.sh && npm install -g " 2>&1
+# RUN npm install -g
 
 # install kafkacat for testing purposes
-RUN apt-get update && apt-get install -y kafkacat
\ No newline at end of file
+RUN apt-get update && apt-get install -y kafkacat
+
+# [Optional] Uncomment this section to install additional OS packages.
+# RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \
+#     && apt-get -y install --no-install-recommends
\ No newline at end of file
diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json
index 7356ea1..05f2647 100644
--- a/.devcontainer/devcontainer.json
+++ b/.devcontainer/devcontainer.json
@@ -3,22 +3,7 @@
 {
     "name": "Java",
     "build": {
-        "dockerfile": "Dockerfile",
-        "args": {
-            // Update the VARIANT arg to pick a Java version: 11, 17
-            // Append -bullseye or -buster to pin to an OS version.
-            // Use the -bullseye variants on local arm64/Apple Silicon.
-            "VARIANT": "11",
-            // Options
-            "INSTALL_MAVEN": "true",
-            "INSTALL_GRADLE": "false",
-            "NODE_VERSION": "none"
-        }
-    },
-
-    // Set *default* container specific settings.json values on container create.
-    "settings": {
-        "java.home": "/docker-java-home"
+        "dockerfile": "Dockerfile"
     },
 
     // Add the IDs of extensions you want installed when the container is created.
@@ -26,6 +11,10 @@
         "vscjava.vscode-java-pack"
     ],
 
+    "containerEnv": {
+        "SHELL": "/bin/bash"
+    },
+
     // Use 'forwardPorts' to make a list of ports inside the container available locally.
     // "forwardPorts": [],
diff --git a/.github/workflows/dockerhub.yml b/.github/workflows/dockerhub.yml
index e332482..2ec9f06 100644
--- a/.github/workflows/dockerhub.yml
+++ b/.github/workflows/dockerhub.yml
@@ -20,8 +20,12 @@ jobs:
         with:
           username: ${{ secrets.DOCKERHUB_USERNAME }}
           password: ${{ secrets.DOCKERHUB_TOKEN }}
+      - name: Replace Docker tag
+        id: set_tag
+        run: echo "TAG=$(echo ${GITHUB_REF##*/} | sed 's/\//-/g')" >> $GITHUB_ENV
+
       - name: Build
         uses: docker/build-push-action@v3
         with:
          push: true
-          tags: usdotjpoode/jpo-s3-deposit:${{ github.ref_name }}
+          tags: usdotjpoode/jpo-s3-deposit:${{ env.TAG }}
diff --git a/.vscode/launch.json b/.vscode/launch.json
index 8a1a4c4..e69fa28 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -6,40 +6,11 @@
     "configurations": [
         {
             "type": "java",
-            "name": "S3D Local",
+            "name": "S3D",
             "request": "launch",
             "mainClass": "us.dot.its.jpo.ode.aws.depositor.AwsDepositor",
             "projectName": "jpo-aws-depositor",
-            "args": ["--bootstrap-server", ":9092",
-                "-d", "s3",
-                "-g", "testgroup",
-                "-k", "testkey",
-                "-b", "s3dtestbucket",
-                "-t", "test"],
-            "env": {
-                "AWS_ACCESS_KEY_ID": "",
-                "AWS_SECRET_KEY": ""
-            }
-        },
-        {
-            "type": "java",
-            "name": "S3D CC",
-            "request": "launch",
-            "mainClass": "us.dot.its.jpo.ode.aws.depositor.AwsDepositor",
-            "projectName": "jpo-aws-depositor",
-            "args": ["--bootstrap-server", ":9092",
-                "-g", "testgroup",
-                "-t", "topic.OdeBsmJson",
-                "-b", "s3dtestbucket",
-                "-k", "testkey",
-                "-d", "s3"],
-            "env": {
-                "AWS_ACCESS_KEY_ID": "",
-                "AWS_SECRET_KEY": "",
-                "KAFKA_TYPE": "CONFLUENT",
-                "CONFLUENT_KEY": "",
-                "CONFLUENT_SECRET": ""
-            }
+            "envFile": "${workspaceFolder}/.env",
         }
     ]
 }
\ No newline at end of file
diff --git a/README.md b/README.md
index 27a2542..d0b9c07 100644
--- a/README.md
+++ b/README.md
@@ -1,10 +1,13 @@
-# AWS Deposit Service
+# Message Deposit Service
 
-This project is intended to serve as a consumer application to subscribe to a Kafka topic of streaming JSON, package the results as a JSON file, and deposits the resulting file into a predetermined Firehose/Kinesis or S3 bucket. This runs alongside the ODE and when deployed using Docker Compose, runs in a Docker container.
+This project is intended to serve as a consumer application that subscribes to a Kafka topic of streaming JSON, packages the results as a JSON file, and deposits the resulting file into a predetermined Firehose/Kinesis, S3 bucket, or Google Cloud Storage (GCS) bucket. This runs alongside the ODE and, when deployed using Docker Compose, runs in a Docker container.
 
 ## Quick Run
 The use of AWS credentials is being read from the machine's environmental variables. You may also set them in your bash profile. Note that when using Docker Compose from the main `jpo-ode` repository, these variables are set in the `.env` present in that repo.
+If depositing to GCS, credentials are read from a JSON service account key file. A sample service account file can be found at ./resources/google/sample_gcp_service_account.json.
+Please note that if depositing to GCS, the service account will need the storage.buckets.get and storage.objects.create permissions.
+
 ```
 export K_AWS_ACCESS_KEY_ID = AccessKeyId
 export K_AWS_SECRET_ACCESS_SECRET = SecretAccessKey
diff --git a/docker-compose-confluent-cloud.yml b/docker-compose-confluent-cloud.yml
index 6b7086f..373dcc7 100644
--- a/docker-compose-confluent-cloud.yml
+++ b/docker-compose-confluent-cloud.yml
@@ -22,4 +22,10 @@ services:
       KAFKA_TYPE: ${KAFKA_TYPE}
       CONFLUENT_KEY: ${CONFLUENT_KEY}
       CONFLUENT_SECRET: ${CONFLUENT_SECRET}
+      KAFKA_ENABLE_AUTO_COMMIT: ${KAFKA_ENABLE_AUTO_COMMIT}
+      KAFKA_AUTO_COMMIT_INTERVAL_MS: ${KAFKA_AUTO_COMMIT_INTERVAL_MS}
+      KAFKA_SESSION_TIMEOUT_MS: ${KAFKA_SESSION_TIMEOUT_MS}
+      GOOGLE_APPLICATION_CREDENTIALS: '/google/gcp_service_account.json'
+    volumes:
+      - ${GOOGLE_APPLICATION_CREDENTIALS}:/google/gcp_service_account.json
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
index 7fa1e7e..8a53bf8 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -35,6 +35,12 @@ services:
       AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
       HEADER_ACCEPT: ${HEADER_ACCEPT}
       HEADER_X_API_KEY: ${HEADER_X_API_KEY}
+      KAFKA_ENABLE_AUTO_COMMIT: ${KAFKA_ENABLE_AUTO_COMMIT}
+      KAFKA_AUTO_COMMIT_INTERVAL_MS: ${KAFKA_AUTO_COMMIT_INTERVAL_MS}
+      KAFKA_SESSION_TIMEOUT_MS: ${KAFKA_SESSION_TIMEOUT_MS}
+      GOOGLE_APPLICATION_CREDENTIALS: '/google/gcp_service_account.json'
+    volumes:
+      - ${GOOGLE_APPLICATION_CREDENTIALS}:/google/gcp_service_account.json
     depends_on:
       - kafka
diff --git a/docs/Release_notes.md b/docs/Release_notes.md
index 40d1869..b65d2bc 100644
--- a/docs/Release_notes.md
+++ b/docs/Release_notes.md
@@ -1,6 +1,20 @@
 Jpo-s3-deposit Release Notes
 ----------------------------
 
+Version 1.5.0, released June 2024
+----------------------------------------
+### **Summary**
+The jpo-s3-deposit 1.5.0 release makes previously hard-coded Kafka properties configurable, adds support for GCS BLOB storage, and updates the dev container Java version to 21.
+
+Enhancements in this release
+- CDOT PR 18: Updated hard-coded Kafka properties to be configurable
+- CDOT PR 19: Added support for GCS BLOB storage
+- CDOT PR 22: Updated dev container Java version to 21
+
+Known Issues:
+- No known issues at this time.
+
+
 Version 1.4.0, released February 2024
 ----------------------------------------
diff --git a/pom.xml b/pom.xml
index 7b23662..d6e0ad1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
     <groupId>usdot.jpo.ode</groupId>
     <artifactId>jpo-aws-depositor</artifactId>
-    <version>1.4.0-SNAPSHOT</version>
+    <version>1.5.0-SNAPSHOT</version>
     <packaging>jar</packaging>
     <name>JPO AWS Depositor</name>
@@ -73,6 +73,12 @@
             <version>1.7.36</version>
             <scope>compile</scope>
         </dependency>
+
+        <dependency>
+            <groupId>com.google.cloud</groupId>
+            <artifactId>google-cloud-storage</artifactId>
+            <version>2.37.0</version>
+        </dependency>
     </dependencies>
 
     <build>
        <finalName>${project.artifactId}</finalName>
diff --git a/resources/google/.gitignore b/resources/google/.gitignore
new file mode 100644
index 0000000..a00abba
--- /dev/null
+++ b/resources/google/.gitignore
@@ -0,0 +1,2 @@
+*.json
+!sample_gcp_service_account.json
\ No newline at end of file
diff --git a/resources/google/README.md b/resources/google/README.md
new file mode 100644
index 0000000..ceb37e8
--- /dev/null
+++ b/resources/google/README.md
@@ -0,0 +1,3 @@
+# Google Resources
+
+This folder is the location for the GCP service account JSON key file used by the S3 Depositor. Please name the file gcp_service_account.json; a sample file is provided in this folder.
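For reference, the following is a minimal sketch of how a service account key can be verified before running the depositor. It assumes GOOGLE_APPLICATION_CREDENTIALS points at the key file (as the docker-compose volume mount above arranges) and uses a hypothetical bucket name; it simply exercises the same google-cloud-storage calls that the depositToGCS change in AwsDepositor.java relies on, and is not part of this changeset.

```java
import java.nio.charset.Charset;

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class GcsDepositCheck {
    public static void main(String[] args) {
        // Default credentials are resolved from the key file referenced by the
        // GOOGLE_APPLICATION_CREDENTIALS environment variable.
        Storage storage = StorageOptions.getDefaultInstance().getService();

        // Requires the storage.buckets.get permission on the bucket.
        Bucket bucket = storage.get("example-deposit-bucket"); // hypothetical bucket name

        // Requires storage.objects.create; mirrors how the depositor names blobs by timestamp.
        byte[] bytes = "{\"test\":\"record\"}".getBytes(Charset.defaultCharset());
        Blob blob = bucket.create(Long.toString(System.currentTimeMillis()), bytes);
        System.out.println("Uploaded test blob: " + blob.getName());
    }
}
```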
diff --git a/resources/google/sample_gcp_service_account.json b/resources/google/sample_gcp_service_account.json
new file mode 100644
index 0000000..4ce5e90
--- /dev/null
+++ b/resources/google/sample_gcp_service_account.json
@@ -0,0 +1,12 @@
+{
+    "type": "service_account",
+    "project_id": "",
+    "private_key_id": "",
+    "private_key": "",
+    "client_email": "",
+    "client_id": "",
+    "auth_uri": "https://accounts.google.com/o/oauth2/auth",
+    "token_uri": "https://oauth2.googleapis.com/token",
+    "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
+    "client_x509_cert_url": ""
+}
diff --git a/run.sh b/run.sh
index a3baff4..3063665 100644
--- a/run.sh
+++ b/run.sh
@@ -22,6 +22,10 @@ export API_ENDPOINT=$API_ENDPOINT
 export KAFKA_TYPE=$KAFKA_TYPE
 export CONFLUENT_KEY=$CONFLUENT_KEY
 export CONFLUENT_SECRET=$CONFLUENT_SECRET
+export GOOGLE_APPLICATION_CREDENTIALS=$GOOGLE_APPLICATION_CREDENTIALS
+export KAFKA_ENABLE_AUTO_COMMIT=$KAFKA_ENABLE_AUTO_COMMIT
+export KAFKA_AUTO_COMMIT_INTERVAL_MS=$KAFKA_AUTO_COMMIT_INTERVAL_MS
+export KAFKA_SESSION_TIMEOUT_MS=$KAFKA_SESSION_TIMEOUT_MS
 
 # build
 echo "Compiling."
diff --git a/sample.env b/sample.env
index b559847..a7f202e 100644
--- a/sample.env
+++ b/sample.env
@@ -3,6 +3,7 @@ BOOTSTRAP_SERVER=${DOCKER_HOST_IP}:9092
 API_ENDPOINT=
 DEPOSIT_TOPIC=
 DEPOSIT_GROUP=
+# Bucket name to deposit to (required for all destination types)
 DEPOSIT_BUCKET_NAME=
 DEPOSIT_KEY_NAME=
 AWS_SESSION_TOKEN=
@@ -14,6 +15,12 @@ HEADER_X_API_KEY=
 KAFKA_TYPE=
 CONFLUENT_KEY=
 CONFLUENT_SECRET=
+# Defaults to false
+KAFKA_ENABLE_AUTO_COMMIT=
+# Defaults to 1000
+KAFKA_AUTO_COMMIT_INTERVAL_MS=
+# Defaults to 30000
+KAFKA_SESSION_TIMEOUT_MS=
 
 # MONGODB Variables
 MONGO_IP=${DOCKER_HOST_IP}
@@ -23,3 +30,12 @@ MONGO_DB_PASS=
 MONGO_PORT=27017
 MONGO_URI=mongodb://${MONGO_DB_USER}:${MONGO_DB_PASS}@${MONGO_IP}:${MONGO_PORT}/
 MONGO_COLLECTION_TTL=7 # days
+
+# Destination to deposit messages to
+## Currently supported types are "s3" (Amazon S3 Bucket), "firehose" (Amazon Firehose), or "gcs" (Google Cloud Storage)
+DESTINATION=
+
+# Google Cloud Storage Variables
+## Path to the service account key file (JSON format)
+### Leaving GOOGLE_APPLICATION_CREDENTIALS blank may cause issues when running the S3 depositor with Docker, regardless of the destination.
+GOOGLE_APPLICATION_CREDENTIALS='./resources/google/sample_gcp_service_account.json'
\ No newline at end of file
diff --git a/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java b/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java
index 3dd4ba2..d85a2d4 100644
--- a/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java
+++ b/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java
@@ -22,6 +22,7 @@
 import java.io.Writer;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
+import java.time.Duration;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.Arrays;
@@ -47,6 +48,10 @@
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.model.PutObjectResult;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Bucket;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
 
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPost;
@@ -87,6 +92,10 @@ public class AwsDepositor {
     private String AWS_SESSION_TOKEN;
     private String AWS_EXPIRATION;
 
+    private String KAFKA_ENABLE_AUTO_COMMIT;
+    private String KAFKA_AUTO_COMMIT_INTERVAL_MS;
+    private String KAFKA_SESSION_TIMEOUT_MS;
+
     public static void main(String[] args) throws Exception {
         AwsDepositor awsDepositor = new AwsDepositor();
         awsDepositor.run(args);
@@ -149,23 +158,35 @@ public void run(String[] args) throws Exception {
             addConfluentProperties(props);
         }
 
+        KAFKA_AUTO_COMMIT_INTERVAL_MS = getEnvironmentVariable("KAFKA_AUTO_COMMIT_INTERVAL_MS", "1000");
+        KAFKA_ENABLE_AUTO_COMMIT = getEnvironmentVariable("KAFKA_ENABLE_AUTO_COMMIT", "false");
+        KAFKA_SESSION_TIMEOUT_MS = getEnvironmentVariable("KAFKA_SESSION_TIMEOUT_MS", "30000");
+
         props.put("group.id", group);
-        props.put("enable.auto.commit", "false");
-        props.put("auto.commit.interval.ms", "1000");
-        props.put("session.timeout.ms", "30000");
+        props.put("enable.auto.commit", KAFKA_ENABLE_AUTO_COMMIT);
+        props.put("auto.commit.interval.ms", KAFKA_AUTO_COMMIT_INTERVAL_MS);
+        props.put("session.timeout.ms", KAFKA_SESSION_TIMEOUT_MS);
 
         boolean depositToS3 = false;
         AmazonS3 s3 = null;
         AmazonKinesisFirehoseAsync firehose = null;
-        if (destination != null && destination.equals("s3")) {
+        Storage gcsStorage = null;
+
+        if (destination.equals("s3")) {
             depositToS3 = true;
             s3 = createS3Client(awsRegion);
-
-        } else {
+        } else if (destination.equals("firehose")) {
             firehose = buildFirehoseClient(awsRegion);
+        } else if (destination.equals("gcs")) {
+            // The file path specified by GOOGLE_APPLICATION_CREDENTIALS will be used here.
+            gcsStorage = StorageOptions.getDefaultInstance().getService();
+        } else {
+            logger.error("Invalid destination: " + destination);
+            System.exit(1);
+        }
+
         while (true) {
             KafkaConsumer stringConsumer = new KafkaConsumer(props);
@@ -176,20 +197,25 @@ public void run(String[] args) throws Exception {
                 boolean gotMessages = false;
 
                 while (true) {
-                    ConsumerRecords records = stringConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+                    ConsumerRecords records = stringConsumer.poll(Duration.ofMillis(CONSUMER_POLL_TIMEOUT_MS));
                     if (records != null && !records.isEmpty()) {
                         for (ConsumerRecord record : records) {
                             try {
                                 gotMessages = true;
                                 if (depositToS3) {
                                     depositToS3(s3, record);
-                                } else {
+                                } else if (destination.equals("firehose")){
                                     depositToFirehose(firehose, record);
+                                } else if (destination.equals("gcs")) {
+                                    depositToGCS(gcsStorage, bucketName, record);
+                                } else {
+                                    logger.error("Invalid destination: " + destination);
+                                    System.exit(1);
+                                }
                             } catch (Exception e) {
                                 int retryTimeout = 5000;
-                                logger.error("Error depositing to AWS. Retrying in " + retryTimeout / 1000 + " seconds",
-                                        e);
+                                String destinationName = depositToS3 ? "S3" : destination;
+                                logger.error("Error depositing to destination '" + destinationName + "'. Retrying in " + retryTimeout / 1000 + " seconds", e);
                                 Thread.sleep(retryTimeout);
                             }
                         }
@@ -201,7 +227,7 @@ public void run(String[] args) throws Exception {
                     }
                 }
             } catch (Exception e) {
-                logger.error("Server Error. reconnecting to AWS ", e);
+                logger.error("Server Error. reconnecting to destination ", e);
             } finally {
                 stringConsumer.close();
             }
@@ -320,6 +346,22 @@ private void depositToS3(AmazonS3 s3, ConsumerRecord record) thr
         }
     }
 
+    private void depositToGCS(Storage gcsStorage, String depositBucket, ConsumerRecord record) {
+        String recordValue = record.value();
+        Bucket bucket = gcsStorage.get(depositBucket);
+        byte[] bytes = recordValue.getBytes(Charset.defaultCharset());
+
+        long time = System.currentTimeMillis();
+        String timeStamp = Long.toString(time);
+
+        Blob blob = bucket.create(timeStamp, bytes);
+        if (blob != null) {
+            logger.debug("Record successfully uploaded to GCS");
+        } else {
+            logger.error("Failed to upload record to GCS bucket: " + recordValue);
+        }
+    }
+
     private AmazonKinesisFirehoseAsync buildFirehoseClient(String awsRegion) {
         // Default is to deposit to Kinesis/Firehose, override via .env
         // variables if S3 deposit desired