diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnector.java b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnector.java index 0e95906..1290d6f 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnector.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnector.java @@ -90,10 +90,10 @@ private void startBackgroundReconfigurationTasks(ConnectorContext connectorConte public void run() { try { if (consumableTables != null) { - LOGGER.info("Looking for changed DynamoDB tables"); + LOGGER.debug("Looking for changed DynamoDB tables"); List consumableTablesRefreshed = tablesProvider.getConsumableTables(); if (!consumableTables.equals(consumableTablesRefreshed)) { - LOGGER.info("Detected changes in DynamoDB tables. Requesting tasks reconfiguration."); + LOGGER.debug("Detected changes in DynamoDB tables. Requesting tasks reconfiguration."); connectorContext.requestTaskReconfiguration(); } } @@ -131,7 +131,7 @@ public List> taskConfigs(int maxTasks) { List> taskConfigs = new ArrayList<>(consumableTables.size()); for (String table : consumableTables) { - LOGGER.info("Configuring task for table {}", table); + LOGGER.debug("Configuring task for table {}", table); Map taskProps = new HashMap<>(configProperties); taskProps.put(DynamoDBSourceTaskConfig.TABLE_NAME_CONFIG, table); diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java index b1c42e7..8e9ae27 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java @@ -110,7 +110,7 @@ public String version() { public void start(Map configProperties) { DynamoDBSourceTaskConfig config = new DynamoDBSourceTaskConfig(configProperties); - LOGGER.info("Starting task for table: {}", config.getTableName()); + LOGGER.debug("Starting task for table: {}", config.getTableName()); LOGGER.debug("Getting DynamoDB description for table: {}", config.getTableName()); if (client == null) { @@ -126,7 +126,7 @@ public void start(Map configProperties) { LOGGER.debug("Getting offset for table: {}", tableDesc.getTableName()); setStateFromOffset(); - LOGGER.info("Task status: {}", sourceInfo); + LOGGER.debug("Task status: {}", sourceInfo); LOGGER.debug("Initiating DynamoDB table scanner and record converter."); if (tableScanner == null) { @@ -136,7 +136,7 @@ public void start(Map configProperties) { } converter = new RecordConverter(tableDesc, config.getDestinationTopicPrefix(), config.getDestinationTopicMap()); - LOGGER.info("Starting background KCL worker thread for table: {}", tableDesc.getTableName()); + LOGGER.debug("Starting background KCL worker thread for table: {}", tableDesc.getTableName()); AmazonDynamoDBStreams dynamoDBStreamsClient = AwsClients.buildDynamoDbStreamsClient( config.getAwsRegion(), @@ -225,7 +225,7 @@ private LinkedList initSync() throws Exception { Thread.sleep(initSyncDelay * 1000); } - LOGGER.info("Continuing INIT_SYNC {}", sourceInfo); + LOGGER.debug("Continuing INIT_SYNC {}", sourceInfo); ScanResult scanResult = tableScanner.getItems(sourceInfo.exclusiveStartKey); LinkedList result = new LinkedList<>(); @@ -261,7 +261,7 @@ private LinkedList initSync() throws Exception { if (sourceInfo.initSyncStatus == InitSyncStatus.RUNNING) { - LOGGER.info( + LOGGER.debug( "INIT_SYNC iteration returned {}. Status: {}", result.size(), sourceInfo); } else { LOGGER.info("INIT_SYNC FINISHED: {}", sourceInfo); diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/aws/ConfigTablesProvider.java b/source/src/main/java/com/trustpilot/connector/dynamodb/aws/ConfigTablesProvider.java index 914b0e1..b3f8a03 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/aws/ConfigTablesProvider.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/aws/ConfigTablesProvider.java @@ -28,7 +28,7 @@ public List getConsumableTables() { final TableDescription tableDesc = client.describeTable(table).getTable(); if (this.hasValidConfig(tableDesc, table)) { - LOGGER.info("Table to sync: {}", table); + LOGGER.debug("Table to sync: {}", table); consumableTables.add(table); } } diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/aws/DynamoDBTablesProvider.java b/source/src/main/java/com/trustpilot/connector/dynamodb/aws/DynamoDBTablesProvider.java index 49c4e72..28613db 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/aws/DynamoDBTablesProvider.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/aws/DynamoDBTablesProvider.java @@ -43,7 +43,7 @@ public DynamoDBTablesProvider(AWSResourceGroupsTaggingAPI groupsTaggingAPI, } public List getConsumableTables() { - LOGGER.info("Searching for tables with tag.key: {}", ingestionTagKey); + LOGGER.debug("Searching for tables with tag.key: {}", ingestionTagKey); final List consumableTables = new LinkedList<>(); GetResourcesRequest resourcesRequest = getGetResourcesRequest(); @@ -58,7 +58,7 @@ public List getConsumableTables() { final TableDescription tableDesc = client.describeTable(tableName).getTable(); if (hasValidConfig(tableDesc, tableName)) { - LOGGER.info("Table to sync: {}", tableName); + LOGGER.debug("Table to sync: {}", tableName); consumableTables.add(tableName); } } diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclRecordProcessor.java b/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclRecordProcessor.java index 4269a92..f62009a 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclRecordProcessor.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclRecordProcessor.java @@ -131,7 +131,7 @@ private void process(ProcessRecordsInput processRecordsInput) { String firstProcessedSeqNo = records.get(0).getSequenceNumber(); lastProcessedSeqNo = records.get(records.size() - 1).getSequenceNumber(); - LOGGER.info("Added {} records to eventsQueue. Table: {} ShardID: {}, FirstSeqNo: {}, LastSeqNo: {}", + LOGGER.debug("Added {} records to eventsQueue. Table: {} ShardID: {}, FirstSeqNo: {}, LastSeqNo: {}", records.size(), tableName, shardId, @@ -152,7 +152,7 @@ private void checkpoint(IRecordProcessorCheckpointer checkpointer) { if (!lastCommittedRecordSequenceNumber.equals("")) { // If at least one record was committed to Kafka try { - LOGGER.info("KCL checkpoint table: {} shardId: {} at sequenceNumber: {}", + LOGGER.debug("KCL checkpoint table: {} shardId: {} at sequenceNumber: {}", tableName, shardId, lastCommittedRecordSequenceNumber); @@ -233,7 +233,7 @@ private void onTerminate(ShutdownInput shutdownInput) throws InvalidStateExcepti shardId); shardRegister.remove(shardId); - LOGGER.info( + LOGGER.debug( "Shard ended. All data committed. Checkpoint and proceed to next one. Table: {} ShardID: {}", tableName, shardId); diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImpl.java b/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImpl.java index f6bd839..a62166a 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImpl.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImpl.java @@ -77,7 +77,7 @@ public void start(AmazonDynamoDB dynamoDBClient, cloudWatchClient); - LOGGER.info("Creating KCL worker for Stream: {} ApplicationName: {} WorkerId: {}", + LOGGER.debug("Creating KCL worker for Stream: {} ApplicationName: {} WorkerId: {}", clientLibConfiguration.getStreamName(), clientLibConfiguration.getApplicationName(), clientLibConfiguration.getWorkerIdentifier()