
Commit ec1c3c2

Author: Paul Ashley (committed)
Add conditional write logging to HadoopS3AccessHelper
- Add comprehensive logging to track Hadoop 3.4.2 conditional writes
- Log PutObjectOptions creation and usage in all S3 operations
- Log multipart upload initiation, completion, and single object uploads
- Helps verify conditional write functionality is being invoked
- All logs prefixed with '=== CONDITIONAL WRITES:' for easy filtering
1 parent 7aafbda commit ec1c3c2

File tree

2 files changed (+57, -4 lines)

flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java

Lines changed: 35 additions & 4 deletions
@@ -177,7 +177,13 @@ public software.amazon.awssdk.services.s3.model.UploadPartResponse uploadPart(
 
     /** Creates default PutObjectOptions for Hadoop 3.4.2. */
     private static org.apache.hadoop.fs.s3a.impl.PutObjectOptions createDefaultPutObjectOptions() {
-        return org.apache.hadoop.fs.s3a.impl.PutObjectOptions.keepingDirs();
+        org.apache.hadoop.fs.s3a.impl.PutObjectOptions options = org.apache.hadoop.fs.s3a.impl.PutObjectOptions.keepingDirs();
+
+        // Log conditional write configuration
+        LOG.info("=== CONDITIONAL WRITES: Creating PutObjectOptions with keepingDirs() - Hadoop 3.4.2 ===");
+        LOG.info("=== PutObjectOptions details: {} ===", options.toString());
+
+        return options;
     }
 
     /**
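For readers unfamiliar with the feature being traced here: a "conditional write" is an S3 PUT that succeeds only if no object already exists under the key, expressed with an If-None-Match: * header. The sketch below shows what that looks like directly against AWS SDK v2, independent of the Hadoop and Flink layers in this commit; the ifNoneMatch builder method is assumed to be available in recent SDK v2 releases, and the bucket and key are hypothetical.

import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;

public class ConditionalPutSketch {
    public static void main(String[] args) {
        try (S3Client s3 = S3Client.create()) {
            PutObjectRequest request = PutObjectRequest.builder()
                    .bucket("example-bucket")          // hypothetical bucket
                    .key("flink/exactly-once-marker")  // hypothetical key
                    .ifNoneMatch("*")                  // succeed only if the key does not exist yet
                    .build();
            s3.putObject(request, RequestBody.fromString("payload"));
            System.out.println("Conditional create succeeded.");
        } catch (S3Exception e) {
            if (e.statusCode() == 412) {
                // 412 Precondition Failed: another writer created the object first.
                System.err.println("Object already exists; conditional create rejected.");
            } else {
                throw e;
            }
        }
    }
}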
@@ -413,11 +419,18 @@ private static void validateOutputFile(File file) {
     public String startMultiPartUpload(String key) throws IOException {
         validateKey(key);
         try {
+            LOG.info("=== CONDITIONAL WRITES: Initiating multipart upload for key: {} ===", key);
+
             // Hadoop 3.4.2 uses AWS SDK v2 and requires PutObjectOptions
-            String uploadId =
-                    s3accessHelper.initiateMultiPartUpload(key, createDefaultPutObjectOptions());
+            org.apache.hadoop.fs.s3a.impl.PutObjectOptions putObjectOptions = createDefaultPutObjectOptions();
+            LOG.info("=== CONDITIONAL WRITES: Using PutObjectOptions: {} ===", putObjectOptions);
+
+            String uploadId = s3accessHelper.initiateMultiPartUpload(key, putObjectOptions);
+
+            LOG.info("=== CONDITIONAL WRITES: Multipart upload initiated successfully. UploadId: {} ===", uploadId);
             return uploadId;
         } catch (Exception e) {
+            LOG.error("=== CONDITIONAL WRITES: Failed to initiate multipart upload for key: {} - Error: {} ===", key, e.getMessage());
             throw e;
         }
     }
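One editorial note on the error path above: logging only e.getMessage() discards the stack trace. SLF4J appends the full trace when the exception object is passed as the final argument, even alongside {} placeholders, so a variant like the following sketch (hypothetical class name) preserves the diagnostics:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExceptionLoggingSketch {
    private static final Logger LOG = LoggerFactory.getLogger(ExceptionLoggingSketch.class);

    static void report(String key, Exception e) {
        // Message parameters first, throwable last: SLF4J appends the stack trace.
        LOG.error("=== CONDITIONAL WRITES: Failed to initiate multipart upload for key: {} ===", key, e);
    }

    public static void main(String[] args) {
        report("checkpoints/_metadata", new RuntimeException("simulated S3 failure"));
    }
}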
@@ -481,6 +494,10 @@ public UploadPartResult uploadPart(
     public PutObjectResult putObject(String key, File inputFile) throws IOException {
        validateKey(key);
        validateInputFile(inputFile);
+
+        LOG.info("=== CONDITIONAL WRITES: Putting object for key: {}, file size: {} bytes ===",
+                key, inputFile.length());
+
        // Hadoop 3.4.2 uses AWS SDK v2 with different put object API
        // Create AWS SDK v2 PutObjectRequest with correct bucket name
        software.amazon.awssdk.services.s3.model.PutObjectRequest putRequest =
@@ -493,6 +510,8 @@ public PutObjectResult putObject(String key, File inputFile) throws IOException
         // Create PutObjectOptions
         org.apache.hadoop.fs.s3a.impl.PutObjectOptions putObjectOptions =
                 createDefaultPutObjectOptions();
+
+        LOG.info("=== CONDITIONAL WRITES: Using PutObjectOptions for putObject: {} ===", putObjectOptions);
 
         // Note: For Hadoop 3.4.2, the putObject API with BlockUploadData is designed for
         // block-based uploads from memory. For file-based uploads, it's more appropriate
@@ -532,6 +551,9 @@ public PutObjectResult putObject(String key, File inputFile) throws IOException
                result.setSSECustomerAlgorithm(response.sseCustomerAlgorithm());
            }
 
+            LOG.info("=== CONDITIONAL WRITES: PutObject completed successfully for key: {}, ETag: {} ===",
+                    key, result.getETag());
+
            return result;
 
        } catch (software.amazon.awssdk.core.exception.SdkException e) {
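These per-object INFO messages are useful while verifying that conditional writes are invoked, but they fire on every upload. If the logging is kept long-term, a common pattern is to demote it to DEBUG and guard argument construction that is itself costly; a sketch under those assumptions:

import java.io.File;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GuardedLoggingSketch {
    private static final Logger LOG = LoggerFactory.getLogger(GuardedLoggingSketch.class);

    static void logPutObject(String key, File inputFile) {
        if (LOG.isDebugEnabled()) {
            // inputFile.length() touches the file system, so skip it when DEBUG is off.
            LOG.debug("=== CONDITIONAL WRITES: Putting object for key: {}, file size: {} bytes ===",
                    key, inputFile.length());
        }
    }

    public static void main(String[] args) {
        logPutObject("checkpoints/part-0", new File("/tmp/part-0"));
    }
}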
@@ -572,6 +594,12 @@ public CompleteMultipartUploadResult commitMultiPartUpload(
                                .build())
                .collect(java.util.stream.Collectors.toList());
 
+        LOG.info("=== CONDITIONAL WRITES: Completing multipart upload for key: {}, uploadId: {}, parts: {} ===",
+                destKey, uploadId, partETags.size());
+
+        org.apache.hadoop.fs.s3a.impl.PutObjectOptions putObjectOptions = createDefaultPutObjectOptions();
+        LOG.info("=== CONDITIONAL WRITES: Using PutObjectOptions for completion: {} ===", putObjectOptions);
+
        // Use the new completeMPUwithRetries API
        software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse response =
                s3accessHelper.completeMPUwithRetries(
@@ -580,7 +608,10 @@ public CompleteMultipartUploadResult commitMultiPartUpload(
                        completedParts,
                        length,
                        errorCount,
-                        createDefaultPutObjectOptions());
+                        putObjectOptions);
+
+        LOG.info("=== CONDITIONAL WRITES: Multipart upload completed successfully for key: {}, ETag: {} ===",
+                destKey, response.eTag());
 
        // Convert AWS SDK v2 response to AWS SDK v1 response
        CompleteMultipartUploadResult result = new CompleteMultipartUploadResult();
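The commit message promises "easy filtering" via the shared prefix. A minimal sketch of that filtering step, with a hypothetical log file path:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class ConditionalWriteLogFilter {
    public static void main(String[] args) throws IOException {
        // Hypothetical log file path; point this at the actual TaskManager log.
        try (Stream<String> lines = Files.lines(Paths.get("taskmanager.log"))) {
            lines.filter(line -> line.contains("=== CONDITIONAL WRITES:"))
                 .forEach(System.out::println);
        }
    }
}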

flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java

Lines changed: 22 additions & 0 deletions
@@ -37,6 +37,23 @@
 public class S3FileSystemFactory extends AbstractS3FileSystemFactory {
 
     private static final Logger LOG = LoggerFactory.getLogger(S3FileSystemFactory.class);
+
+    static {
+        // Static initializer - this will log when the class is first loaded
+        String timestamp = java.time.Instant.now().toString();
+        String message = String.format("=== CUSTOM S3FileSystemFactory LOADED [%s] - Hadoop 3.4.2 + Conditional Writes ===", timestamp);
+        System.err.println(message);
+        LOG.error(message);
+
+        // Also log the JAR location to help with debugging
+        try {
+            String jarLocation = S3FileSystemFactory.class.getProtectionDomain().getCodeSource().getLocation().toString();
+            LOG.error("=== JAR Location: {} ===", jarLocation);
+            System.err.println("=== JAR Location: " + jarLocation + " ===");
+        } catch (Exception e) {
+            LOG.error("=== Could not determine JAR location: {} ===", e.getMessage());
+        }
+    }
 
     private static final String[] FLINK_CONFIG_PREFIXES = {"s3.", "s3a.", "fs.s3a."};
 
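A small caveat on the JAR-location lookup above: getCodeSource() and its location may legitimately be null (for example, for classes loaded by the bootstrap class loader), which is presumably why the broad catch is there. A sketch that makes the null handling explicit, using a hypothetical class name:

import java.security.CodeSource;

public class JarLocationSketch {
    public static void main(String[] args) {
        // Either the CodeSource or its location can be null, so check both.
        CodeSource source = JarLocationSketch.class.getProtectionDomain().getCodeSource();
        if (source != null && source.getLocation() != null) {
            System.err.println("=== JAR Location: " + source.getLocation() + " ===");
        } else {
            System.err.println("=== Could not determine JAR location ===");
        }
    }
}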

@@ -45,10 +62,15 @@ public class S3FileSystemFactory extends AbstractS3FileSystemFactory {
         {"fs.s3a.secret-key", "fs.s3a.secret.key"},
         {"fs.s3a.path-style-access", "fs.s3a.path.style.access"},
         {"fs.s3a.requester-pays-enabled", "fs.s3a.requester.pays.enabled"},
+        {"fs.s3a.create-conditional-enabled", "fs.s3a.create.conditional.enabled"},
+        {"s3a.create-conditional-enabled", "fs.s3a.create.conditional.enabled"}
     };
 
     public S3FileSystemFactory() {
         super("Hadoop s3a file system", createHadoopConfigLoader());
+
+        // Distinctive log message to confirm our custom JAR is loaded
+        LOG.error("=== CUSTOM S3 FILESYSTEM JAR LOADED - Hadoop 3.4.2 + Conditional Writes ===");
     }
 
     @Override
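Taken together with FLINK_CONFIG_PREFIXES, the two new mirror entries suggest that the Flink-side spellings below all resolve to the Hadoop key fs.s3a.create.conditional.enabled. A hedged sketch of setting it programmatically; the exact resolution behavior is inferred from this diff rather than verified against the config loader:

import org.apache.flink.configuration.Configuration;

public class ConditionalWriteConfigSketch {
    public static void main(String[] args) {
        Configuration flinkConfig = new Configuration();

        // Any one of these should be sufficient; both map to the same Hadoop key.
        flinkConfig.setString("s3.create-conditional-enabled", "true");
        flinkConfig.setString("fs.s3a.create-conditional-enabled", "true");

        System.out.println(flinkConfig);
    }
}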
