Skip to content

Commit

Permalink
source-mssql: convert to bulk CDK
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Dec 5, 2024
1 parent 8182b8e commit b685ade
Show file tree
Hide file tree
Showing 103 changed files with 6,709 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class FeedReader(

/** Reads records from this [feed]. */
suspend fun read() {
log.info { "SGX reading stream${feed.label}" }
var partitionsCreatorID = 1L
while (true) {
// Create PartitionReader instances.
Expand All @@ -49,6 +50,10 @@ class FeedReader(
// Publish stream completion.
root.streamStatusManager.notifyComplete(feed)
break
} else {
log.info {
"SGX reading partition $partitionsCreatorID for feed ${feed.label}. partitionReaders=${partitionReaders}"
}
}
// Launch coroutines which read from each partition.
val scheduledPartitionReaders =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package io.airbyte.cdk.read

import io.airbyte.cdk.command.OpaqueStateValue
import io.airbyte.cdk.read.PartitionsCreator.TryAcquireResourcesStatus
import io.github.oshai.kotlinlogging.KotlinLogging

private val log = KotlinLogging.logger {}
/**
* [PartitionsCreatorFactory] must be implemented by each source connector and serves as the
* entrypoint to how READ operations are executed for that connector, via the [PartitionsCreator]
Expand Down Expand Up @@ -153,4 +155,8 @@ interface PartitionReader {
data class PartitionReadCheckpoint(
val opaqueStateValue: OpaqueStateValue,
val numRecords: Long,
)
) {
init {
log.info { "SGX PartitionReadCheckpoint: $opaqueStateValue, $numRecords" }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class JdbcSelectQuerier(
get() = metaChanges

init {
log.info { "Querying ${q.sql}" }
log.info { "Querying ${q.sql} with parameters ${q.bindings.map { it.value }}" }
try {
conn = jdbcConnectionFactory.get()
stmt = conn!!.prepareStatement(q.sql)
Expand Down
33 changes: 33 additions & 0 deletions airbyte-integrations/connectors/source-mssql-v1/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# MsSQL (SQL Server) Source

## Performance Test

To run performance tests in commandline:

```shell
./gradlew :airbyte-integrations:connectors:source-mssql:performanceTest [--cpulimit=cpulimit/<limit>] [--memorylimit=memorylimit/<limit>]
```

In pull request:

```shell
/test-performance connector=connectors/source-mssql [--cpulimit=cpulimit/<limit>] [--memorylimit=memorylimit/<limit>]
```

- `cpulimit`: Limit the number of CPUs. The minimum is `2`. E.g. `--cpulimit=cpulimit/2`.
- `memorylimit`: Limit the size of the memory. Must include the unit at the end (e.g. `MB`, `GB`). The minimum size is `6MB`. E.g. `--memorylimit=memorylimit/4GB`.
- When none of the CPU or memory limit is provided, the performance tests will run without memory or CPU limitations. The available resource will be bound that those specified in `ResourceRequirements.java`.

### Use MsSQL script to populate the benchmark database

In order to create a database with a certain number of tables, and a certain number of records in each of them,
you need to follow a few simple steps.

1. Create a new database.
2. Follow the TODOs in [create_mssql_benchmarks.sql](src/test-performance/sql/create_mssql_benchmarks.sql) to change the number of tables, and the number of records of different sizes.
3. Execute the script with your changes for the new database. You can run the script with the MySQL client:
```bash
cd airbyte-integrations/connectors/source-mssql
sqlcmd -S Serverinstance -E -i src/test-performance/sql/create_mssql_benchmarks.sql
```
4. After the script finishes its work, you will receive the number of tables specified in the script, with names starting with **test_0** and ending with **test\_(the number of tables minus 1)**.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference)
# for more information about how to configure these tests
connector_image: airbyte/source-mssql:dev
tests:
spec:
- spec_path: "src/test-integration/resources/expected_spec.json"
config_path: "src/test-integration/resources/dummy_config.json"
backward_compatibility_tests_config:
disable_for_version: "0.4.25"
41 changes: 41 additions & 0 deletions airbyte-integrations/connectors/source-mssql-v1/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
plugins {
id 'airbyte-java-connector'
}

airbyteJavaConnector {
cdkVersionRequired = '0.45.1'
features = ['db-sources']
useLocalCdk = false
}

java {
// TODO: rewrite code to avoid javac warnings in the first place
compileJava {
options.compilerArgs += "-Xlint:-try,-rawtypes"
}
compileTestFixturesJava {
options.compilerArgs += "-Xlint:-this-escape"
}
}

application {
mainClass = 'io.airbyte.integrations.source.mssql.MssqlSource'
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']
}

dependencies {
implementation 'com.microsoft.sqlserver:mssql-jdbc:12.6.1.jre11'
implementation 'io.debezium:debezium-embedded:2.7.1.Final'
implementation 'io.debezium:debezium-connector-sqlserver:2.6.2.Final'
implementation 'org.codehaus.plexus:plexus-utils:3.4.2'

testFixturesImplementation 'org.testcontainers:mssqlserver:1.19.0'

testImplementation 'org.awaitility:awaitility:4.2.0'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
testImplementation 'org.testcontainers:mssqlserver:1.19.0'
}

compileKotlin {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
testExecutionConcurrency=-1
JunitMethodExecutionTimeout=5 m
1 change: 1 addition & 0 deletions airbyte-integrations/connectors/source-mssql-v1/icon.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import pytest

pytest_plugins = ("connector_acceptance_test.plugin",)


@pytest.fixture(scope="session", autouse=True)
def connector_setup():
"""This fixture is a placeholder for external resources that acceptance test might require."""
# TODO: setup test dependencies if needed. otherwise remove the TODO comments
yield
# TODO: clean up test dependencies
Loading

0 comments on commit b685ade

Please sign in to comment.