Skip to content

Commit

Permalink
fix: update configuration migration properties (#650)
Browse files Browse the repository at this point in the history
* fix: remove max retry attempts configuration migration property

* fix: update config migration connector class

* test: update tests with updated examples

* chore: update logging with connector specificity

* chore: update logging

* fix: pretty print config in log
  • Loading branch information
dhrudevalia authored Sep 4, 2024
1 parent 0e57933 commit a7e28e1
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class ConfigurationMigrator(private val settings: Map<String, String>) {
data class PropertyConverter(val updatedConfigKey: String, val migrationHandler: () -> String)

private val propertyConverterMap: Map<String, PropertyConverter> = mutableMapOf(
// Kafka
"connector.class" to PropertyConverter("connector.class") {convertConnectorClass(settings["connector.class"] as String)},
// Common
DATABASE to PropertyConverter("neo4j.database") { settings[DATABASE] as String },
SERVER_URI to PropertyConverter("neo4j.uri") { settings[SERVER_URI] as String },
Expand All @@ -74,7 +76,7 @@ class ConfigurationMigrator(private val settings: Map<String, String>) {
CONNECTION_LIVENESS_CHECK_TIMEOUT_MSECS to PropertyConverter("neo4j.pool.idle-time-before-connection-test") { convertMsecs(settings[CONNECTION_LIVENESS_CHECK_TIMEOUT_MSECS] as String) },
CONNECTION_POOL_MAX_SIZE to PropertyConverter("neo4j.pool.max-connection-pool-size") {settings[CONNECTION_POOL_MAX_SIZE] as String},
RETRY_BACKOFF_MSECS to PropertyConverter("neo4j.max-retry-time") { convertMsecs(settings[RETRY_BACKOFF_MSECS] as String) },
RETRY_MAX_ATTEMPTS to PropertyConverter("neo4j.max-retry-attempts") {settings[RETRY_MAX_ATTEMPTS] as String},
RETRY_MAX_ATTEMPTS to PropertyConverter("") {settings[RETRY_MAX_ATTEMPTS] as String},
// Sink
TOPIC_CDC_SOURCE_ID to PropertyConverter("neo4j.cdc.source-id.topics") {settings[TOPIC_CDC_SOURCE_ID] as String},
TOPIC_CDC_SOURCE_ID_LABEL_NAME to PropertyConverter("neo4j.cdc.source-id.label-name") {settings[TOPIC_CDC_SOURCE_ID_LABEL_NAME] as String},
Expand All @@ -95,6 +97,14 @@ class ConfigurationMigrator(private val settings: Map<String, String>) {
ENFORCE_SCHEMA to PropertyConverter("") {settings[ENFORCE_SCHEMA] as String}
)

private fun convertConnectorClass(className: String): String {
return when (className) {
"streams.kafka.connect.source.Neo4jSourceConnector" -> "org.neo4j.connectors.kafka.source.Neo4jConnector"
"streams.kafka.connect.sink.Neo4jSinkConnector" -> "org.neo4j.connectors.kafka.sink.Neo4jConnector"
else -> ""
}
}

// Configuration properties that have user-defined keys
private val prefixConverterMap: Map<String, String> = mutableMapOf(
Neo4jSinkConnectorConfig.TOPIC_PATTERN_NODE_PREFIX to "neo4j.pattern.node.topic.",
Expand Down Expand Up @@ -136,7 +146,6 @@ class ConfigurationMigrator(private val settings: Map<String, String>) {

return updatedConfig
}

companion object {
/**
* Converts milliseconds format into new format of time units
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,12 @@ class Neo4jSinkConnector: SinkConnector() {
override fun stop() {
val migratedConfig = ConfigurationMigrator(settings).migrateToV51()
val mapper = ObjectMapper()
val jsonConfig = mapper.writeValueAsString(migratedConfig)
log.info("Migrated Sink configuration to v5.1 connector format: {}", jsonConfig)
val jsonConfig = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(migratedConfig)
log.info(
"The migrated settings for 5.1 version of Neo4j Sink Connector '{}' is: `{}`",
settings["name"],
jsonConfig
)
}

override fun version(): String {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,20 @@ class Neo4jSourceService(private val config: Neo4jSourceConnectorConfig, offsetS
log.info("Error while closing Driver instance:", it)
}

val migratedConfig = ConfigurationMigrator(config.originals() as Map<String, String>).migrateToV51().toMutableMap()
val originalConfig = config.originals() as Map<String, String>
val migratedConfig = ConfigurationMigrator(originalConfig).migrateToV51().toMutableMap()

log.debug("Defaulting v5.1 migrated configuration offset to last checked timestamp: {}", lastCheck)
migratedConfig["neo4j.start-from"] = "USER_PROVIDED"
migratedConfig["neo4j.start-from.value"] = lastCheck

val mapper = ObjectMapper()
val jsonConfig = mapper.writeValueAsString(migratedConfig)
log.info("Migrated Source configuration to v5.1 connector format: {}", jsonConfig)
val jsonConfig = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(migratedConfig)
log.info(
"The migrated settings for 5.1 version of Neo4j Source Connector '{}' is: `{}`",
originalConfig["name"],
jsonConfig
)

log.info("Neo4j Source Service closed successfully")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class ConfigurationMigratorTest {
mapOf(
"neo4j.topic.pattern.merge.node.properties.enabled" to "true",
"neo4j.server.uri" to "neo4j+s://x.x.x.x",
"neo4j.retry.max.attemps" to "1"
"neo4j.encryption.enabled" to "false"
)

// When the configuration is migrated
Expand All @@ -27,15 +27,16 @@ class ConfigurationMigratorTest {
assertEquals(originals.size, migratedConfig.size)
assertEquals(migratedConfig["neo4j.pattern.node.merge-properties"], "true")
assertEquals(migratedConfig["neo4j.uri"], "neo4j+s://x.x.x.x")
assertEquals(migratedConfig["neo4j.max-retry-attempts"], "1")
assertEquals(migratedConfig["neo4j.security.encrypted"], "false")
}

@Test fun `should not migrate keys with no matching configuration key`() {
// Given a configuration which has no equivalent in the updated connector
val originals = mapOf(
"neo4j.encryption.ca.certificate.path" to "./cert.pem",
"neo4j.source.type" to SourceType.QUERY.toString(),
"neo4j.enforce.schema" to "true"
"neo4j.enforce.schema" to "true",
"neo4j.retry.max.attemps" to "1"
)

// When the configuration is migrated
Expand Down Expand Up @@ -104,7 +105,7 @@ class ConfigurationMigratorTest {

// Then those options should still be included
assertEquals(originals.size, migratedConfig.size)
assertEquals(migratedConfig["connector.class"], "streams.kafka.connect.source.Neo4jSourceConnector")
assertEquals(migratedConfig["connector.class"], "org.neo4j.connectors.kafka.source.Neo4jConnector")
assertEquals(migratedConfig["key.converter"], "io.confluent.connect.avro.AvroConverter")
assertEquals(migratedConfig["arbitrary.config.key"], "arbitrary.value")
}
Expand All @@ -121,7 +122,7 @@ class ConfigurationMigratorTest {
assertEquals(12, migratedConfig.size)

assertEquals(migratedConfig["neo4j.query.topic"], "my-topic")
assertEquals(migratedConfig["connector.class"], "streams.kafka.connect.source.Neo4jSourceConnector")
assertEquals(migratedConfig["connector.class"], "org.neo4j.connectors.kafka.source.Neo4jConnector")
assertEquals(migratedConfig["key.converter"], "io.confluent.connect.avro.AvroConverter")
assertEquals(migratedConfig["key.converter.schema.registry.url"], "http://schema-registry:8081")
assertEquals(migratedConfig["value.converter"], "io.confluent.connect.avro.AvroConverter")
Expand Down Expand Up @@ -152,7 +153,7 @@ class ConfigurationMigratorTest {
assertEquals(15, migratedConfig.size)

assertEquals(migratedConfig["topics"], "my-topic")
assertEquals(migratedConfig["connector.class"], "streams.kafka.connect.sink.Neo4jSinkConnector")
assertEquals(migratedConfig["connector.class"], "org.neo4j.connectors.kafka.sink.Neo4jConnector")
assertEquals(migratedConfig["key.converter"], "org.apache.kafka.connect.json.JsonConverter")
assertEquals(migratedConfig["key.converter.schemas.enable"], "false")
assertEquals(migratedConfig["value.converter"], "org.apache.kafka.connect.json.JsonConverter")
Expand Down

0 comments on commit a7e28e1

Please sign in to comment.