Skip to content

Commit 8be8d18

Browse files
committed
adds in audit support
1 parent bcf8c35 commit 8be8d18

File tree

7 files changed

+95
-13
lines changed

7 files changed

+95
-13
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1920,7 +1920,9 @@ private FSDataInputStream executeOpen(
19201920
.withContext(readContext.build())
19211921
.withObjectAttributes(createObjectAttributes(path, fileStatus))
19221922
.withStreamStatistics(inputStreamStats)
1923-
.withEncryptionSecrets(getEncryptionSecrets());
1923+
.withEncryptionSecrets(getEncryptionSecrets())
1924+
.withAuditSpan(auditSpan);
1925+
19241926
return new FSDataInputStream(getStore().readObject(parameters));
19251927
}
19261928

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.List;
2222

2323
import software.amazon.awssdk.core.SdkRequest;
24+
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
2425
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
2526
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
2627
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
@@ -50,6 +51,8 @@
5051
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_LIST_REQUEST;
5152
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_PUT_REQUEST;
5253
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_EXISTS_PROBE;
54+
import static software.amazon.awssdk.core.interceptor.SdkExecutionAttribute.OPERATION_NAME;
55+
import static software.amazon.s3.analyticsaccelerator.request.Constants.SPAN_ID;
5356

5457
/**
5558
* Extract information from a request.
@@ -193,6 +196,18 @@ private RequestInfo writing(final String verb,
193196
|| request instanceof CreateSessionRequest;
194197
}
195198

199+
/**
200+
* If spanId and operation name are set by dependencies such as AAL, then this returns true. Allows for auditing
201+
* of requests which are made outside S3A's requestFactory.
202+
*
203+
* @param executionAttributes request execution attributes
204+
* @return true if request is audited outside of current span
205+
*/
206+
public static boolean isRequestAuditedOutsideOfCurrentSpan(ExecutionAttributes executionAttributes) {
207+
return executionAttributes.getAttribute(SPAN_ID) != null
208+
&& executionAttributes.getAttribute(OPERATION_NAME) != null;
209+
}
210+
196211
/**
197212
* Predicate which returns true if the request is part of the
198213
* multipart upload API -and which therefore must be rejected

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
6262
import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestMultipartIO;
6363
import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestNotAlwaysInSpan;
64+
import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestAuditedOutsideOfCurrentSpan;
6465
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.OUTSIDE_SPAN;
6566
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED;
6667
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED_DEFAULT;
@@ -69,6 +70,8 @@
6970
import static org.apache.hadoop.fs.s3a.commit.CommitUtils.extractJobID;
7071
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.HEADER_REFERRER;
7172
import static org.apache.hadoop.fs.s3a.statistics.impl.StatisticsFromAwsSdkImpl.mapErrorStatusCodeToStatisticName;
73+
import static software.amazon.s3.analyticsaccelerator.request.Constants.OPERATION_NAME;
74+
import static software.amazon.s3.analyticsaccelerator.request.Constants.SPAN_ID;
7275

7376
/**
7477
* The LoggingAuditor logs operations at DEBUG (in SDK Request) and
@@ -85,7 +88,6 @@ public class LoggingAuditor
8588
private static final Logger LOG =
8689
LoggerFactory.getLogger(LoggingAuditor.class);
8790

88-
8991
/**
9092
* Some basic analysis for the logs.
9193
*/
@@ -267,8 +269,9 @@ HttpReferrerAuditHeader getReferrer(AuditSpanS3A span) {
267269
*/
268270
private class LoggingAuditSpan extends AbstractAuditSpanImpl {
269271

270-
private final HttpReferrerAuditHeader referrer;
272+
private HttpReferrerAuditHeader referrer;
271273

274+
private final HttpReferrerAuditHeader.Builder headerBuilder;
272275
/**
273276
* Attach Range of data for GetObject Request.
274277
* @param request the sdk request to be modified
@@ -300,7 +303,7 @@ private LoggingAuditSpan(
300303
final String path2) {
301304
super(spanId, operationName);
302305

303-
this.referrer = HttpReferrerAuditHeader.builder()
306+
this.headerBuilder = HttpReferrerAuditHeader.builder()
304307
.withContextId(getAuditorId())
305308
.withSpanId(spanId)
306309
.withOperationName(operationName)
@@ -312,8 +315,9 @@ private LoggingAuditSpan(
312315
currentThreadID())
313316
.withAttribute(PARAM_TIMESTAMP, Long.toString(getTimestamp()))
314317
.withEvaluated(context.getEvaluatedEntries())
315-
.withFilter(filters)
316-
.build();
318+
.withFilter(filters);
319+
320+
this.referrer = this.headerBuilder.build();
317321

318322
this.description = referrer.buildHttpReferrer();
319323
}
@@ -384,12 +388,33 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context,
384388
SdkHttpRequest httpRequest = context.httpRequest();
385389
SdkRequest sdkRequest = context.request();
386390

391+
// If spanId and operationName are set in execution attributes, then use these values,
392+
// instead of the ones in the current span. This is useful when requests are happening in dependencies such as
393+
// the analytics accelerator library (AAL), where they cannot be attached to the correct span. In which case, AAL
394+
// will attach the current spanId and operationName via execution attributes during its request creation. These
395+
// can then be used to update the values in the logger and referrer header. Without this overwriting, the operation
396+
// name and corresponding span will be whichever is active on the thread the request is getting executed on.
397+
boolean isRequestAuditedOutsideCurrentSpan = isRequestAuditedOutsideOfCurrentSpan(executionAttributes);
398+
399+
String spanId = isRequestAuditedOutsideCurrentSpan ?
400+
executionAttributes.getAttribute(SPAN_ID) : getSpanId();
401+
402+
String operationName = isRequestAuditedOutsideCurrentSpan ?
403+
executionAttributes.getAttribute(OPERATION_NAME) : getOperationName();
404+
405+
if (isRequestAuditedOutsideCurrentSpan) {
406+
this.headerBuilder.withSpanId(spanId);
407+
this.headerBuilder.withOperationName(operationName);
408+
this.referrer = this.headerBuilder.build();
409+
}
410+
387411
// attach range for GetObject requests
388412
attachRangeFromRequest(httpRequest, executionAttributes);
389413

390414
// for delete op, attach the number of files to delete
391415
attachDeleteKeySizeAttribute(sdkRequest);
392416

417+
393418
// build the referrer header
394419
final String header = referrer.buildHttpReferrer();
395420
// update the outer class's field.
@@ -400,11 +425,12 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context,
400425
.appendHeader(HEADER_REFERRER, header)
401426
.build();
402427
}
428+
403429
if (LOG.isDebugEnabled()) {
404430
LOG.debug("[{}] {} Executing {} with {}; {}",
405431
currentThreadID(),
406-
getSpanId(),
407-
getOperationName(),
432+
spanId,
433+
operationName,
408434
analyzer.analyze(context.request()),
409435
header);
410436
}
@@ -533,10 +559,12 @@ public void beforeExecution(Context.BeforeExecution context,
533559
+ analyzer.analyze(context.request());
534560
final String unaudited = getSpanId() + " "
535561
+ UNAUDITED_OPERATION + " " + error;
562+
// If the request is attached to a span in modifyHttpRequest(), as is the case for requests made by AAL, treat it
563+
// as an audited request.
536564
if (isRequestNotAlwaysInSpan(context.request())) {
537-
// can get by auditing during a copy, so don't overreact
565+
// can get by auditing during a copy, so don't overreact.
538566
LOG.debug(unaudited);
539-
} else {
567+
} else if (!isRequestAuditedOutsideOfCurrentSpan(executionAttributes)) {
540568
final RuntimeException ex = new AuditFailureException(unaudited);
541569
LOG.debug(unaudited, ex);
542570
if (isRejectOutOfSpan()) {
@@ -547,5 +575,4 @@ public void beforeExecution(Context.BeforeExecution context,
547575
super.beforeExecution(context, executionAttributes);
548576
}
549577
}
550-
551578
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import software.amazon.s3.analyticsaccelerator.common.ObjectRange;
3737
import software.amazon.s3.analyticsaccelerator.request.EncryptionSecrets;
3838
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
39+
import software.amazon.s3.analyticsaccelerator.request.StreamAuditContext;
3940
import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
4041
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
4142
import software.amazon.s3.analyticsaccelerator.util.S3URI;
@@ -257,12 +258,18 @@ private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters pa
257258
.etag(parameters.getObjectAttributes().getETag()).build());
258259
}
259260

261+
260262
if (parameters.getEncryptionSecrets().getEncryptionMethod() == S3AEncryptionMethods.SSE_C) {
261263
EncryptionSecretOperations.getSSECustomerKey(parameters.getEncryptionSecrets())
262264
.ifPresent(base64customerKey -> openStreamInformationBuilder.encryptionSecrets(
263265
EncryptionSecrets.builder().sseCustomerKey(Optional.of(base64customerKey)).build()));
264266
}
265267

268+
openStreamInformationBuilder.streamAuditContext(StreamAuditContext.builder()
269+
.operationName(parameters.getAuditSpan().getOperationName())
270+
.spanId(parameters.getAuditSpan().getSpanId())
271+
.build());
272+
266273
return openStreamInformationBuilder.build();
267274
}
268275

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,7 @@ public StreamFactoryRequirements factoryRequirements() {
9797
vectorContext.setMinSeekForVectoredReads(0);
9898

9999
return new StreamFactoryRequirements(0,
100-
0, vectorContext,
101-
StreamFactoryRequirements.Requirements.ExpectUnauditedGetRequests);
100+
0, vectorContext);
102101
}
103102

104103
@Override

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectReadParameters.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
2626
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
2727
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
28+
import org.apache.hadoop.fs.store.audit.AuditSpan;
2829

2930
import static java.util.Objects.requireNonNull;
3031

@@ -75,6 +76,11 @@ public final class ObjectReadParameters {
7576
*/
7677
private EncryptionSecrets encryptionSecrets;
7778

79+
/**
80+
* Span for which this stream is being created.
81+
*/
82+
private AuditSpan auditSpan;
83+
7884
/**
7985
* Getter.
8086
* @return Encryption secrets.
@@ -196,6 +202,24 @@ public ObjectReadParameters withDirectoryAllocator(final LocalDirAllocator value
196202
return this;
197203
}
198204

205+
/**
206+
* Getter.
207+
* @return Audit span.
208+
*/
209+
public AuditSpan getAuditSpan() {
210+
return auditSpan;
211+
}
212+
213+
/**
214+
* Set audit span.
215+
* @param value new value
216+
* @return the builder
217+
*/
218+
public ObjectReadParameters withAuditSpan(final AuditSpan value) {
219+
auditSpan = value;
220+
return this;
221+
}
222+
199223
/**
200224
* Validate that all attributes are as expected.
201225
* Mock tests can skip this if required.
@@ -210,6 +234,7 @@ public ObjectReadParameters validate() {
210234
requireNonNull(objectAttributes, "objectAttributes");
211235
requireNonNull(streamStatistics, "streamStatistics");
212236
requireNonNull(encryptionSecrets, "encryptionSecrets");
237+
requireNonNull(auditSpan, "auditSpan");
213238
return this;
214239
}
215240
}

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
4343
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
4444
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
45+
import static org.apache.hadoop.fs.audit.AuditStatisticNames.AUDIT_REQUEST_EXECUTION;
4546
import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
4647
import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
4748
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
@@ -109,6 +110,12 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
109110
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
110111
fs.close();
111112
verifyStatisticCounterValue(fs.getIOStatistics(), ANALYTICS_STREAM_FACTORY_CLOSED, 1);
113+
114+
// Expect 4 audited requests. One HEAD, and 3 GETs. The 3 GETs are because the read policy is WHOLE_FILE,
115+
// in which case, AAL will start prefetching till EoF on file open in 8MB chunks. The file read here
116+
// s3://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz has a size of ~21MB, resulting in 3 GETs:
117+
// [5-8388612, 8388613-16777220, 16777221-21511173].
118+
verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4);
112119
}
113120

114121
@Test

0 commit comments

Comments
 (0)