#168: Return exported records count after export (#169)
Fixes #168
morazow authored Aug 10, 2021
1 parent 2ac4ea2 commit 406d464
Showing 13 changed files with 82 additions and 35 deletions.
18 changes: 12 additions & 6 deletions .github/workflows/ci-build.yml
@@ -10,8 +10,13 @@ jobs:
strategy:
fail-fast: false
matrix:
scala: [ 2.12.14 ]
exasol-docker-version: [ 6.2.15-d1, 7.0.11 ]
include:
- scala: 2.12.14
exasol-docker-version: 6.2.15-d1
sonar-run: false
- scala: 2.12.14
exasol-docker-version: 7.0.11
sonar-run: true

steps:
- name: Checkout the Repository
@@ -27,8 +32,8 @@ jobs:
- name: Pull Docker Images
run: |
docker pull exasol/docker-db:${{ matrix.exasol-docker-version }}
docker pull localstack/localstack:0.12.15
docker pull alluxio/alluxio:2.6.0
docker pull localstack/localstack:0.12.16
docker pull alluxio/alluxio:2.6.1
- name: Cache Local SBT Dependencies
uses: actions/cache@v2
@@ -49,6 +54,7 @@ jobs:
run: ./scripts/ci.sh
env:
SCALA_VERSION: ${{ matrix.scala }}
EXASOL_DOCKER_VERSION: ${{ matrix.exasol-docker-version }}

- name: Upload Coverage Results to Coveralls
run: sbt coveralls
@@ -57,12 +63,12 @@

# This is required because of the sonarcloud-github-action docker volume mapping.
- name: Prepare for Sonar Cloud Scan
if: matrix.exasol-docker-version == '7.0.11'
if: matrix.sonar-run
run: |
find . -name scoverage.xml -exec sed -i 's#/home/runner/work/cloud-storage-extension/cloud-storage-extension#/github/workspace#g' {} +
- name: Sonar Cloud Scan
if: matrix.exasol-docker-version == '7.0.11'
if: matrix.sonar-run
uses: sonarsource/sonarcloud-github-action@master
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
2 changes: 1 addition & 1 deletion build.sbt
@@ -23,7 +23,7 @@ lazy val root =
project
.in(file("."))
.settings(moduleName := "exasol-cloud-storage-extension")
.settings(version := "1.3.0")
.settings(version := "1.3.1")
.settings(orgSettings)
.settings(buildSettings)
.settings(Settings.projectSettings(scalaVersion))
1 change: 1 addition & 0 deletions doc/changes/changelog.md
@@ -1,5 +1,6 @@
# Releases

* [1.3.1](changes_1.3.1.md)
* [1.3.0](changes_1.3.0.md)
* [1.2.0](changes_1.2.0.md)
* [1.1.0](changes_1.1.0.md)
28 changes: 28 additions & 0 deletions doc/changes/changes_1.3.1.md
@@ -0,0 +1,28 @@
# Cloud Storage Extension 1.3.1, released 2021-08-10

Code name: Return Exported Records Count

## Summary

This release fixes an issue in the export so that the number of exported records is shown after the export process.

## Bug Fixes

* #168: Return the number of rows exported after export

## Dependency Updates

### Runtime Dependency Updates

* Updated `com.exasol:parquet-io-java:1.0.2` to `1.0.3`
* Updated `org.alluxio:alluxio-core-client-hdfs:2.6.0` to `2.6.1`

### Test Dependency Updates

* Updated `com.exasol:exasol-testcontainers:3.5.3` to `4.0.0`
* Updated `com.exasol:hamcrest-resultset-matcher:1.4.0` to `1.4.1`
* Updated `com.exasol:test-db-builder-java:3.2.0` to `3.2.1`

### Plugin Updates

* Updated `com.timushev.sbt:sbt-updates:0.5.3` to `0.6.0`
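
To illustrate the fix summarized in this changelog: the export UDF writes all incoming rows to a sink and, once the write loop finishes, emits the sink's record count back to Exasol through the iterator. Below is a minimal sketch of that pattern, assuming the `com.exasol.ExaIterator` scripting API; `CountingSink` and `readRow` are hypothetical stand-ins for the extension's `BatchSizedSink` and its row-reading helper. The actual change appears in the `TableDataExporter.scala` diff further down.

```scala
import java.lang.{Long => JLong}

import com.exasol.ExaIterator

object ExportSketch {

  // Hypothetical minimal sink interface; the real class is BatchSizedSink.
  trait CountingSink {
    def write(row: Seq[Object]): Unit
    def close(): Unit
    def getTotalRecords(): JLong
  }

  def exportAll(iterator: ExaIterator, sink: CountingSink, readRow: ExaIterator => Seq[Object]): Unit = {
    // The UDF iterator starts positioned on the first row, hence do/while.
    do {
      sink.write(readRow(iterator))
    } while (iterator.next())
    sink.close()
    // A single emitted value reports the exported record count (issue #168).
    iterator.emit(sink.getTotalRecords())
  }
}
```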
10 changes: 5 additions & 5 deletions project/Dependencies.scala
@@ -8,22 +8,22 @@ object Dependencies {

// Runtime dependencies versions
private val ImportExportUDFVersion = "0.2.0"
private val ParquetIOVersion = "1.0.2"
private val ParquetIOVersion = "1.0.3"
private val HadoopVersion = "3.3.1"
private val DeltaVersion = "0.7.0"
private val OrcVersion = "1.6.9"
private val GoogleStorageVersion = "1.9.4-hadoop3"
private val SparkSQLVersion = "3.0.1"
private val AlluxioCoreHDFSVersion = "2.6.0"
private val AlluxioCoreHDFSVersion = "2.6.1"

// Test dependencies versions
private val ScalaTestVersion = "3.2.9"
private val ScalaTestPlusVersion = "1.0.0-M2"
private val MockitoCoreVersion = "3.11.2"
private val HamcrestVersion = "2.2"
private val ExasolHamcrestMatcherVersion = "1.4.0"
private val ExasolTestDBBuilderVersion = "3.2.0"
private val ExasolTestContainersVersion = "3.5.3"
private val ExasolHamcrestMatcherVersion = "1.4.1"
private val ExasolTestDBBuilderVersion = "3.2.1"
private val ExasolTestContainersVersion = "4.0.0"
private val TestContainersLocalstackVersion = "1.16.0"
private val TestContainersScalaVersion = "0.39.5"

2 changes: 1 addition & 1 deletion project/plugins.sbt
@@ -31,7 +31,7 @@ addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.3.1")
// Adds a `dependencyUpdates` task to check Maven repositories for
// dependency updates
// http://github.com/rtimush/sbt-updates
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.5.3")
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.6.0")

// Adds `scalastyle`, a coding style checker and enforcer
// https://github.com/scalastyle/scalastyle-sbt-plugin
2 changes: 1 addition & 1 deletion project/project/plugins.sbt
@@ -1,3 +1,3 @@
// Used to get updates for plugins
// see https://github.com/rtimush/sbt-updates/issues/10
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.5.3")
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.6.0")
9 changes: 7 additions & 2 deletions scripts/ci.sh
@@ -9,11 +9,16 @@ cd "$BASE_DIR"
DEFAULT_SCALA_VERSION=2.12.14

if [[ -z "${SCALA_VERSION:-}" ]]; then
echo "Environment variable SCALA_VERSION is not set"
echo "Using DEFAULT_SCALA_VERSION: $DEFAULT_SCALA_VERSION"
echo "Environment variable SCALA_VERSION is not set."
echo "Using DEFAULT_SCALA_VERSION: $DEFAULT_SCALA_VERSION."
SCALA_VERSION=$DEFAULT_SCALA_VERSION
fi

if [[ -z "${EXASOL_DOCKER_VERSION:-}" ]]; then
echo "Environment variable for EXASOL_DOCKER_VERSION is not set."
echo "Using default version defined in the integration tests."
fi

run_self_check () {
echo "############################################"
echo "# #"
19 changes: 15 additions & 4 deletions src/it/scala/com/exasol/cloudetl/BaseIntegrationTest.scala
@@ -9,18 +9,18 @@ import com.exasol.dbbuilder.dialects.exasol.ExasolObjectFactory
import com.exasol.dbbuilder.dialects.exasol.ExasolSchema
import com.exasol.dbbuilder.dialects.exasol.udf.UdfScript

import com.typesafe.scalalogging.LazyLogging
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite

trait BaseIntegrationTest extends AnyFunSuite with BeforeAndAfterAll {
trait BaseIntegrationTest extends AnyFunSuite with BeforeAndAfterAll with LazyLogging {
private[this] val JAR_DIRECTORY_PATTERN = "scala-"
private[this] val JAR_NAME_PATTERN = "cloud-storage-extension-"
private[this] val DEFAULT_EXASOL_DOCKER_IMAGE = "7.0.11"

val network = DockerNamedNetwork("it-tests", true)
val exasolContainer = {
val c: ExasolContainer[_] = new ExasolContainer(getExasolDockerImageVersion())
c.withExposedPorts(8563, 2580)
c.withNetwork(network)
c.withReuse(true)
c
@@ -150,7 +150,18 @@ trait BaseIntegrationTest extends AnyFunSuite with BeforeAndAfterAll {
}
}

private[this] def getExasolDockerImageVersion(): String =
System.getProperty("EXASOL_DOCKER_VERSION", DEFAULT_EXASOL_DOCKER_IMAGE)
private[this] def getExasolDockerImageVersion(): String = {
val dockerVersion = System.getenv("EXASOL_DOCKER_VERSION")
if (dockerVersion == null) {
logger.info(
s"No 'EXASOL_DOCKER_VERSION' environment variable is not set, " +
s"using default '$DEFAULT_EXASOL_DOCKER_IMAGE' version."
)
DEFAULT_EXASOL_DOCKER_IMAGE
} else {
logger.info(s"Using docker '$dockerVersion' version from environment.")
dockerVersion
}
}

}
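
One detail worth noting in this change: the old code read the version with `System.getProperty`, but the CI workflow exports `EXASOL_DOCKER_VERSION` as a shell environment variable, which JVM system properties do not pick up; switching to `System.getenv` is what makes the matrix value visible to the tests. A short illustration of the difference (both lookups return `null` when the value is unset):

```scala
object VersionLookupSketch {
  // System properties are set per JVM with a -D flag:
  //   java -DEXASOL_DOCKER_VERSION=7.0.11 ...
  val fromProperty: String = System.getProperty("EXASOL_DOCKER_VERSION")

  // Environment variables come from the shell:
  //   EXASOL_DOCKER_VERSION=7.0.11 ./scripts/ci.sh
  val fromEnvironment: String = System.getenv("EXASOL_DOCKER_VERSION")
}
```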
src/it/scala/com/exasol/cloudetl/BaseS3IntegrationTest.scala
@@ -19,7 +19,7 @@ trait BaseS3IntegrationTest extends BaseIntegrationTest {
val LONG_MIN = Long.MIN_VALUE
val LONG_MAX = Long.MAX_VALUE

val LOCALSTACK_DOCKER_IMAGE = DockerImageName.parse("localstack/localstack:0.12.15")
val LOCALSTACK_DOCKER_IMAGE = DockerImageName.parse("localstack/localstack:0.12.16")
val s3Container = new LocalStackContainer(LOCALSTACK_DOCKER_IMAGE)
.withServices(S3)
.withReuse(true)
src/main/scala/com/exasol/cloudetl/scriptclasses/TableDataExporter.scala
@@ -37,12 +37,11 @@ object TableDataExporter extends LazyLogging {
val vmId = metadata.getVmId()
val sink = new BatchSizedSink(nodeId, vmId, iterator.size(), columns, bucket)
logger.info(s"Starting export from node: $nodeId, vm: $vmId.")

do {
sink.write(getRow(iterator, columns))
} while (iterator.next())
sink.close()

iterator.emit(sink.getTotalRecords())
logger.info(s"Exported '${sink.getTotalRecords()}' records from node '$nodeId' and vm '$vmId'.")
}

3 changes: 2 additions & 1 deletion src/main/scala/com/exasol/cloudetl/sink/BatchSizedSink.scala
@@ -1,5 +1,6 @@
package com.exasol.cloudetl.sink

import java.lang.{Long => JLong}
import java.util.UUID

import com.exasol.cloudetl.bucket.Bucket
@@ -44,7 +45,7 @@ final class BatchSizedSink(
private[this] var totalRecords: Long = 0

/** Returns the total number of records written so far. */
def getTotalRecords(): Long = totalRecords
def getTotalRecords(): JLong = totalRecords

/** @inheritdoc */
override def createWriter(path: String): Writer[Row] = new Writer[Row] {
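
The `JLong` return type above is deliberate: `ExaIterator.emit` accepts `java.lang.Object` varargs, so the count crosses the UDF boundary as a boxed `java.lang.Long` rather than Scala's primitive `Long`. A tiny sketch of the boxing, assuming only the standard library:

```scala
import java.lang.{Long => JLong}

object BoxingSketch {
  private var totalRecords: Long = 0L // Scala's Long compiles to a JVM primitive

  // Scala boxes the primitive implicitly (Predef.long2Long) to satisfy JLong.
  def getTotalRecords(): JLong = totalRecords
}
```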
src/test/scala/com/exasol/cloudetl/scriptclasses/TableDataExporterTest.scala
@@ -1,5 +1,6 @@
package com.exasol.cloudetl.scriptclasses

import java.lang.Long
import java.nio.file.Files
import java.nio.file.Path

@@ -33,10 +34,7 @@ class TableDataExporterTest extends StorageTest with BeforeAndAfterEach with Dat
private[this] var iterator: ExaIterator = _
private[this] val defaultProperties = Map("DATA_FORMAT" -> "PARQUET")

final def createMockedIterator(
resourceDir: String,
extraProperties: Map[String, String]
): ExaIterator = {
final def createMockedIterator(resourceDir: String, extraProperties: Map[String, String]): ExaIterator = {
val properties = defaultProperties ++ Map("BUCKET_PATH" -> resourceDir) ++ extraProperties
val mockedIterator = mockExasolIterator(properties)

@@ -95,7 +93,7 @@ class TableDataExporterTest extends StorageTest with BeforeAndAfterEach with Dat
test("run exports table rows") {
TableDataExporter.run(metadata, iterator)

verify(metadata, times(1)).getInputColumnCount
verify(metadata, times(1)).getInputColumnCount()
for { idx <- 3 to 10 } {
verify(metadata, times(1)).getInputColumnType(idx)
verify(metadata, times(1)).getInputColumnPrecision(idx)
@@ -111,6 +109,7 @@ class TableDataExporterTest extends StorageTest with BeforeAndAfterEach with Dat
verify(iterator, times(2)).getBoolean(8)
verify(iterator, times(2)).getDate(9)
verify(iterator, times(2)).getTimestamp(10)
verify(iterator, times(1)).emit(Long.valueOf(2))
}

test("imports exported rows from a path") {
@@ -124,8 +123,8 @@ class TableDataExporterTest extends StorageTest with BeforeAndAfterEach with Dat

FilesDataImporter.run(mock[ExaMetadata], importIter)

val totalRecords = 2
verify(importIter, times(totalRecords)).emit(Seq(any[Object]): _*)
verify(importIter, times(2)).emit(Seq(any[Object]): _*)
verify(iterator, times(1)).emit(Long.valueOf(2))
}

test("export creates file without compression extension if compression codec is not set") {
@@ -144,10 +143,7 @@ class TableDataExporterTest extends StorageTest with BeforeAndAfterEach with Dat
checkExportFileExtensions(outputPath, ".snappy")
}

private[this] def checkExportFileExtensions(
outputPath: Path,
compressionCodec: String
): Unit = {
private[this] def checkExportFileExtensions(outputPath: Path, compressionCodec: String): Unit = {
assert(getOutputPathFiles().forall(_.endsWith(s"$compressionCodec.parquet")))
()
}
