#168: Return exported records count after export #169

Merged
merged 8 commits on Aug 10, 2021
18 changes: 12 additions & 6 deletions .github/workflows/ci-build.yml
@@ -10,8 +10,13 @@ jobs:
strategy:
fail-fast: false
matrix:
scala: [ 2.12.14 ]
exasol-docker-version: [ 6.2.15-d1, 7.0.11 ]
include:
- scala: 2.12.14
exasol-docker-version: 6.2.15-d1
sonar-run: false
- scala: 2.12.14
exasol-docker-version: 7.0.11
sonar-run: true

steps:
- name: Checkout the Repository
@@ -27,8 +32,8 @@ jobs:
- name: Pull Docker Images
run: |
docker pull exasol/docker-db:${{ matrix.exasol-docker-version }}
docker pull localstack/localstack:0.12.15
docker pull alluxio/alluxio:2.6.0
docker pull localstack/localstack:0.12.16
docker pull alluxio/alluxio:2.6.1

- name: Cache Local SBT Dependencies
uses: actions/cache@v2
@@ -49,6 +54,7 @@ jobs:
run: ./scripts/ci.sh
env:
SCALA_VERSION: ${{ matrix.scala }}
EXASOL_DOCKER_VERSION: ${{ matrix.exasol-docker-version }}

- name: Upload Coverage Results to Coveralls
run: sbt coveralls
@@ -57,12 +63,12 @@

# This is required because of the sonarcloud-github-action docker volume mapping.
- name: Prepare for Sonar Cloud Scan
if: matrix.exasol-docker-version == '7.0.11'
if: matrix.sonar-run
run: |
find . -name scoverage.xml -exec sed -i 's#/home/runner/work/cloud-storage-extension/cloud-storage-extension#/github/workspace#g' {} +

- name: Sonar Cloud Scan
if: matrix.exasol-docker-version == '7.0.11'
if: matrix.sonar-run
uses: sonarsource/sonarcloud-github-action@master
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
2 changes: 1 addition & 1 deletion build.sbt
@@ -23,7 +23,7 @@ lazy val root =
project
.in(file("."))
.settings(moduleName := "exasol-cloud-storage-extension")
.settings(version := "1.3.0")
.settings(version := "1.3.1")
.settings(orgSettings)
.settings(buildSettings)
.settings(Settings.projectSettings(scalaVersion))
1 change: 1 addition & 0 deletions doc/changes/changelog.md
@@ -1,5 +1,6 @@
# Releases

* [1.3.1](changes_1.3.1.md)
* [1.3.0](changes_1.3.0.md)
* [1.2.0](changes_1.2.0.md)
* [1.1.0](changes_1.1.0.md)
28 changes: 28 additions & 0 deletions doc/changes/changes_1.3.1.md
@@ -0,0 +1,28 @@
# Cloud Storage Extension 1.3.1, released 2021-08-10

Code name: Return Exported Records Count

## Summary

This release fixes an issue in the export so that the number of exported records is shown after the export process completes.

## Bug Fixes

* #168: Return the number of rows exported after export

## Dependency Updates

### Runtime Dependency Updates

* Updated `com.exasol:parquet-io-java:1.0.2` to `1.0.3`
* Updated `org.alluxio:alluxio-core-client-hdfs:2.6.0` to `2.6.1`

### Test Dependency Updates

* Updated `com.exasol:exasol-testcontainers:3.5.3` to `4.0.0`
* Updated `com.exasol:hamcrest-resultset-matcher:1.4.0` to `1.4.1`
* Updated `com.exasol:test-db-builder-java:3.2.0` to `3.2.1`

### Plugin Updates

* Updated `com.timushev.sbt:sbt-updates:0.5.3` to `0.6.0`
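
The heart of this fix is a sink that counts every record it writes and an exporter that emits that total once the export finishes (see the `TableDataExporter` and `BatchSizedSink` diffs further down). Below is a minimal, self-contained Scala sketch of that counting pattern; `CountingSink` is a hypothetical stand-in for `BatchSizedSink`, and the final `println` stands in for `iterator.emit(...)`:

```scala
import java.lang.{Long => JLong}

// Hypothetical stand-in for BatchSizedSink: counts every record it writes.
final class CountingSink[A](write: A => Unit) {
  private var totalRecords: Long = 0L

  def writeRecord(record: A): Unit = {
    write(record)
    totalRecords += 1
  }

  // Boxed to java.lang.Long so an Object-based emit API can accept it directly.
  def getTotalRecords(): JLong = totalRecords
}

object CountingSinkExample extends App {
  val sink = new CountingSink[String](row => println(s"writing $row"))
  Seq("row1", "row2", "row3").foreach(sink.writeRecord)
  // Stand-in for iterator.emit(sink.getTotalRecords()) in the actual exporter.
  println(s"Exported ${sink.getTotalRecords()} records")
}
```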
10 changes: 5 additions & 5 deletions project/Dependencies.scala
@@ -8,22 +8,22 @@ object Dependencies {

// Runtime dependencies versions
private val ImportExportUDFVersion = "0.2.0"
private val ParquetIOVersion = "1.0.2"
private val ParquetIOVersion = "1.0.3"
private val HadoopVersion = "3.3.1"
private val DeltaVersion = "0.7.0"
private val OrcVersion = "1.6.9"
private val GoogleStorageVersion = "1.9.4-hadoop3"
private val SparkSQLVersion = "3.0.1"
private val AlluxioCoreHDFSVersion = "2.6.0"
private val AlluxioCoreHDFSVersion = "2.6.1"

// Test dependencies versions
private val ScalaTestVersion = "3.2.9"
private val ScalaTestPlusVersion = "1.0.0-M2"
private val MockitoCoreVersion = "3.11.2"
private val HamcrestVersion = "2.2"
private val ExasolHamcrestMatcherVersion = "1.4.0"
private val ExasolTestDBBuilderVersion = "3.2.0"
private val ExasolTestContainersVersion = "3.5.3"
private val ExasolHamcrestMatcherVersion = "1.4.1"
private val ExasolTestDBBuilderVersion = "3.2.1"
private val ExasolTestContainersVersion = "4.0.0"
private val TestContainersLocalstackVersion = "1.16.0"
private val TestContainersScalaVersion = "0.39.5"

2 changes: 1 addition & 1 deletion project/plugins.sbt
@@ -31,7 +31,7 @@ addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.3.1")
// Adds a `dependencyUpdates` task to check Maven repositories for
// dependency updates
// http://github.com/rtimush/sbt-updates
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.5.3")
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.6.0")

// Adds `scalastyle` a coding style checker and enforcer
// https://github.com/scalastyle/scalastyle-sbt-plugin
2 changes: 1 addition & 1 deletion project/project/plugins.sbt
@@ -1,3 +1,3 @@
// Used to get updates for plugins
// see https://github.com/rtimush/sbt-updates/issues/10
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.5.3")
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.6.0")
9 changes: 7 additions & 2 deletions scripts/ci.sh
@@ -9,11 +9,16 @@ cd "$BASE_DIR"
DEFAULT_SCALA_VERSION=2.12.14

if [[ -z "${SCALA_VERSION:-}" ]]; then
echo "Environment variable SCALA_VERSION is not set"
echo "Using DEFAULT_SCALA_VERSION: $DEFAULT_SCALA_VERSION"
echo "Environment variable SCALA_VERSION is not set."
echo "Using DEFAULT_SCALA_VERSION: $DEFAULT_SCALA_VERSION."
SCALA_VERSION=$DEFAULT_SCALA_VERSION
fi

if [[ -z "${EXASOL_DOCKER_VERSION:-}" ]]; then
echo "Environment variable for EXASOL_DOCKER_VERSION is not set."
echo "Using default version defined in the integration tests."
fi

run_self_check () {
echo "############################################"
echo "# #"
19 changes: 15 additions & 4 deletions src/it/scala/com/exasol/cloudetl/BaseIntegrationTest.scala
@@ -9,18 +9,18 @@ import com.exasol.dbbuilder.dialects.exasol.ExasolObjectFactory
import com.exasol.dbbuilder.dialects.exasol.ExasolSchema
import com.exasol.dbbuilder.dialects.exasol.udf.UdfScript

import com.typesafe.scalalogging.LazyLogging
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite

trait BaseIntegrationTest extends AnyFunSuite with BeforeAndAfterAll {
trait BaseIntegrationTest extends AnyFunSuite with BeforeAndAfterAll with LazyLogging {
private[this] val JAR_DIRECTORY_PATTERN = "scala-"
private[this] val JAR_NAME_PATTERN = "cloud-storage-extension-"
private[this] val DEFAULT_EXASOL_DOCKER_IMAGE = "7.0.11"

val network = DockerNamedNetwork("it-tests", true)
val exasolContainer = {
val c: ExasolContainer[_] = new ExasolContainer(getExasolDockerImageVersion())
c.withExposedPorts(8563, 2580)
c.withNetwork(network)
c.withReuse(true)
c
@@ -150,7 +150,18 @@ trait BaseIntegrationTest extends AnyFunSuite with BeforeAndAfterAll {
}
}

private[this] def getExasolDockerImageVersion(): String =
System.getProperty("EXASOL_DOCKER_VERSION", DEFAULT_EXASOL_DOCKER_IMAGE)
private[this] def getExasolDockerImageVersion(): String = {
val dockerVersion = System.getenv("EXASOL_DOCKER_VERSION")
if (dockerVersion == null) {
logger.info(
s"No 'EXASOL_DOCKER_VERSION' environment variable is not set, " +
s"using default '$DEFAULT_EXASOL_DOCKER_IMAGE' version."
)
DEFAULT_EXASOL_DOCKER_IMAGE
} else {
logger.info(s"Using docker '$dockerVersion' version from environment.")
dockerVersion
}
}

}
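
A side note on the diff above: the docker image version lookup moved from a JVM system property (`System.getProperty`) to an environment variable (`System.getenv`), matching the `EXASOL_DOCKER_VERSION` variable the CI workflow now exports. A small illustrative sketch of the difference between the two lookups (`EnvVsPropertySketch` is hypothetical):

```scala
object EnvVsPropertySketch extends App {
  // The old lookup: populated only by a JVM flag such as -DEXASOL_DOCKER_VERSION=7.0.11.
  val fromProperty = Option(System.getProperty("EXASOL_DOCKER_VERSION"))
  // The new lookup: populated by an exported shell variable, as the CI workflow now sets it.
  val fromEnv = Option(System.getenv("EXASOL_DOCKER_VERSION"))
  println(s"system property: $fromProperty, environment variable: $fromEnv")
}
```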
@@ -19,7 +19,7 @@ trait BaseS3IntegrationTest extends BaseIntegrationTest {
val LONG_MIN = Long.MIN_VALUE
val LONG_MAX = Long.MAX_VALUE

val LOCALSTACK_DOCKER_IMAGE = DockerImageName.parse("localstack/localstack:0.12.15")
val LOCALSTACK_DOCKER_IMAGE = DockerImageName.parse("localstack/localstack:0.12.16")
val s3Container = new LocalStackContainer(LOCALSTACK_DOCKER_IMAGE)
.withServices(S3)
.withReuse(true)
@@ -37,12 +37,11 @@ object TableDataExporter extends LazyLogging {
val vmId = metadata.getVmId()
val sink = new BatchSizedSink(nodeId, vmId, iterator.size(), columns, bucket)
logger.info(s"Starting export from node: $nodeId, vm: $vmId.")

do {
sink.write(getRow(iterator, columns))
} while (iterator.next())
sink.close()

iterator.emit(sink.getTotalRecords())
logger.info(s"Exported '${sink.getTotalRecords()}' records from node '$nodeId' and vm '$vmId'.")
}

3 changes: 2 additions & 1 deletion src/main/scala/com/exasol/cloudetl/sink/BatchSizedSink.scala
@@ -1,5 +1,6 @@
package com.exasol.cloudetl.sink

import java.lang.{Long => JLong}
import java.util.UUID

import com.exasol.cloudetl.bucket.Bucket
@@ -44,7 +45,7 @@ final class BatchSizedSink(
private[this] var totalRecords: Long = 0

/** Returns the total number of records written so far. */
def getTotalRecords(): Long = totalRecords
def getTotalRecords(): JLong = totalRecords

/** @inheritdoc */
override def createWriter(path: String): Writer[Row] = new Writer[Row] {
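
The return-type change to `java.lang.Long` above lets the exporter pass the count straight to the `Object`-varargs `emit` call exercised in the tests below. A minimal sketch of that boxing at the API boundary; the `emit` here is a hypothetical stand-in for `ExaIterator.emit`:

```scala
import java.lang.{Long => JLong}

object BoxingSketch extends App {
  // Hypothetical stand-in for ExaIterator.emit, which accepts Object varargs.
  def emit(values: Object*): Unit = println(values.mkString(", "))

  val total: Long = 2L
  val boxed: JLong = total // the Scala Long is boxed to java.lang.Long at the boundary
  emit(boxed)              // satisfies the Object* signature without manual casting
}
```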
@@ -1,5 +1,6 @@
package com.exasol.cloudetl.scriptclasses

import java.lang.Long
import java.nio.file.Files
import java.nio.file.Path

@@ -33,10 +34,7 @@ class TableDataExporterTest extends StorageTest with BeforeAndAfterEach with Dat
private[this] var iterator: ExaIterator = _
private[this] val defaultProperties = Map("DATA_FORMAT" -> "PARQUET")

final def createMockedIterator(
resourceDir: String,
extraProperties: Map[String, String]
): ExaIterator = {
final def createMockedIterator(resourceDir: String, extraProperties: Map[String, String]): ExaIterator = {
val properties = defaultProperties ++ Map("BUCKET_PATH" -> resourceDir) ++ extraProperties
val mockedIterator = mockExasolIterator(properties)

@@ -95,7 +93,7 @@ class TableDataExporterTest extends StorageTest with BeforeAndAfterEach with Dat
test("run exports table rows") {
TableDataExporter.run(metadata, iterator)

verify(metadata, times(1)).getInputColumnCount
verify(metadata, times(1)).getInputColumnCount()
for { idx <- 3 to 10 } {
verify(metadata, times(1)).getInputColumnType(idx)
verify(metadata, times(1)).getInputColumnPrecision(idx)
@@ -111,6 +109,7 @@ class TableDataExporterTest extends StorageTest with BeforeAndAfterEach with Dat
verify(iterator, times(2)).getBoolean(8)
verify(iterator, times(2)).getDate(9)
verify(iterator, times(2)).getTimestamp(10)
verify(iterator, times(1)).emit(Long.valueOf(2))
}

test("imports exported rows from a path") {
@@ -124,8 +123,8 @@ class TableDataExporterTest extends StorageTest with BeforeAndAfterEach with Dat

FilesDataImporter.run(mock[ExaMetadata], importIter)

val totalRecords = 2
verify(importIter, times(totalRecords)).emit(Seq(any[Object]): _*)
verify(importIter, times(2)).emit(Seq(any[Object]): _*)
verify(iterator, times(1)).emit(Long.valueOf(2))
}

test("export creates file without compression extension if compression codec is not set") {
@@ -144,10 +143,7 @@ class TableDataExporterTest extends StorageTest with BeforeAndAfterEach with Dat
checkExportFileExtensions(outputPath, ".snappy")
}

private[this] def checkExportFileExtensions(
outputPath: Path,
compressionCodec: String
): Unit = {
private[this] def checkExportFileExtensions(outputPath: Path, compressionCodec: String): Unit = {
assert(getOutputPathFiles().forall(_.endsWith(s"$compressionCodec.parquet")))
()
}