Skip to content

Make logs debug to reduce noise #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ private void startBackgroundReconfigurationTasks(ConnectorContext connectorConte
public void run() {
try {
if (consumableTables != null) {
LOGGER.info("Looking for changed DynamoDB tables");
LOGGER.debug("Looking for changed DynamoDB tables");
List<String> consumableTablesRefreshed = tablesProvider.getConsumableTables();
if (!consumableTables.equals(consumableTablesRefreshed)) {
LOGGER.info("Detected changes in DynamoDB tables. Requesting tasks reconfiguration.");
LOGGER.debug("Detected changes in DynamoDB tables. Requesting tasks reconfiguration.");
connectorContext.requestTaskReconfiguration();
}
}
Expand Down Expand Up @@ -131,7 +131,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {

List<Map<String, String>> taskConfigs = new ArrayList<>(consumableTables.size());
for (String table : consumableTables) {
LOGGER.info("Configuring task for table {}", table);
LOGGER.debug("Configuring task for table {}", table);
Map<String, String> taskProps = new HashMap<>(configProperties);

taskProps.put(DynamoDBSourceTaskConfig.TABLE_NAME_CONFIG, table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public String version() {
public void start(Map<String, String> configProperties) {

DynamoDBSourceTaskConfig config = new DynamoDBSourceTaskConfig(configProperties);
LOGGER.info("Starting task for table: {}", config.getTableName());
LOGGER.debug("Starting task for table: {}", config.getTableName());

LOGGER.debug("Getting DynamoDB description for table: {}", config.getTableName());
if (client == null) {
Expand All @@ -126,7 +126,7 @@ public void start(Map<String, String> configProperties) {

LOGGER.debug("Getting offset for table: {}", tableDesc.getTableName());
setStateFromOffset();
LOGGER.info("Task status: {}", sourceInfo);
LOGGER.debug("Task status: {}", sourceInfo);

LOGGER.debug("Initiating DynamoDB table scanner and record converter.");
if (tableScanner == null) {
Expand All @@ -136,7 +136,7 @@ public void start(Map<String, String> configProperties) {
}
converter = new RecordConverter(tableDesc, config.getDestinationTopicPrefix(), config.getDestinationTopicMap());

LOGGER.info("Starting background KCL worker thread for table: {}", tableDesc.getTableName());
LOGGER.debug("Starting background KCL worker thread for table: {}", tableDesc.getTableName());

AmazonDynamoDBStreams dynamoDBStreamsClient = AwsClients.buildDynamoDbStreamsClient(
config.getAwsRegion(),
Expand Down Expand Up @@ -225,7 +225,7 @@ private LinkedList<SourceRecord> initSync() throws Exception {
Thread.sleep(initSyncDelay * 1000);
}

LOGGER.info("Continuing INIT_SYNC {}", sourceInfo);
LOGGER.debug("Continuing INIT_SYNC {}", sourceInfo);
ScanResult scanResult = tableScanner.getItems(sourceInfo.exclusiveStartKey);

LinkedList<SourceRecord> result = new LinkedList<>();
Expand Down Expand Up @@ -261,7 +261,7 @@ private LinkedList<SourceRecord> initSync() throws Exception {


if (sourceInfo.initSyncStatus == InitSyncStatus.RUNNING) {
LOGGER.info(
LOGGER.debug(
"INIT_SYNC iteration returned {}. Status: {}", result.size(), sourceInfo);
} else {
LOGGER.info("INIT_SYNC FINISHED: {}", sourceInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public List<String> getConsumableTables() {
final TableDescription tableDesc = client.describeTable(table).getTable();

if (this.hasValidConfig(tableDesc, table)) {
LOGGER.info("Table to sync: {}", table);
LOGGER.debug("Table to sync: {}", table);
consumableTables.add(table);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public DynamoDBTablesProvider(AWSResourceGroupsTaggingAPI groupsTaggingAPI,
}

public List<String> getConsumableTables() {
LOGGER.info("Searching for tables with tag.key: {}", ingestionTagKey);
LOGGER.debug("Searching for tables with tag.key: {}", ingestionTagKey);

final List<String> consumableTables = new LinkedList<>();
GetResourcesRequest resourcesRequest = getGetResourcesRequest();
Expand All @@ -58,7 +58,7 @@ public List<String> getConsumableTables() {
final TableDescription tableDesc = client.describeTable(tableName).getTable();

if (hasValidConfig(tableDesc, tableName)) {
LOGGER.info("Table to sync: {}", tableName);
LOGGER.debug("Table to sync: {}", tableName);
consumableTables.add(tableName);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private void process(ProcessRecordsInput processRecordsInput) {

String firstProcessedSeqNo = records.get(0).getSequenceNumber();
lastProcessedSeqNo = records.get(records.size() - 1).getSequenceNumber();
LOGGER.info("Added {} records to eventsQueue. Table: {} ShardID: {}, FirstSeqNo: {}, LastSeqNo: {}",
LOGGER.debug("Added {} records to eventsQueue. Table: {} ShardID: {}, FirstSeqNo: {}, LastSeqNo: {}",
records.size(),
tableName,
shardId,
Expand All @@ -152,7 +152,7 @@ private void checkpoint(IRecordProcessorCheckpointer checkpointer) {

if (!lastCommittedRecordSequenceNumber.equals("")) { // If at least one record was committed to Kafka
try {
LOGGER.info("KCL checkpoint table: {} shardId: {} at sequenceNumber: {}",
LOGGER.debug("KCL checkpoint table: {} shardId: {} at sequenceNumber: {}",
tableName,
shardId,
lastCommittedRecordSequenceNumber);
Expand Down Expand Up @@ -233,7 +233,7 @@ private void onTerminate(ShutdownInput shutdownInput) throws InvalidStateExcepti
shardId);
shardRegister.remove(shardId);

LOGGER.info(
LOGGER.debug(
"Shard ended. All data committed. Checkpoint and proceed to next one. Table: {} ShardID: {}",
tableName,
shardId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void start(AmazonDynamoDB dynamoDBClient,
cloudWatchClient);


LOGGER.info("Creating KCL worker for Stream: {} ApplicationName: {} WorkerId: {}",
LOGGER.debug("Creating KCL worker for Stream: {} ApplicationName: {} WorkerId: {}",
clientLibConfiguration.getStreamName(),
clientLibConfiguration.getApplicationName(),
clientLibConfiguration.getWorkerIdentifier()
Expand Down