4949import org .apache .hadoop .thirdparty .com .google .common .util .concurrent .MoreExecutors ;
5050import org .apache .hadoop .thirdparty .com .google .common .util .concurrent .ThreadFactoryBuilder ;
5151
52+ import com .sun .tools .javac .util .Convert ;
5253import org .slf4j .Logger ;
5354import org .slf4j .LoggerFactory ;
5455
7576import static org .apache .hadoop .fs .azurebfs .AzureBlobFileSystemStore .extractEtagHeader ;
7677import static org .apache .hadoop .fs .azurebfs .constants .AbfsHttpConstants .*;
7778import static org .apache .hadoop .fs .azurebfs .constants .FileSystemConfigurations .DEFAULT_DELETE_CONSIDERED_IDEMPOTENT ;
79+ import static org .apache .hadoop .fs .azurebfs .constants .FileSystemConfigurations .ONE_MB ;
7880import static org .apache .hadoop .fs .azurebfs .constants .FileSystemConfigurations .SERVER_SIDE_ENCRYPTION_ALGORITHM ;
7981import static org .apache .hadoop .fs .azurebfs .constants .FileSystemUriSchemes .HTTPS_SCHEME ;
8082import static org .apache .hadoop .fs .azurebfs .constants .HttpHeaderConfigurations .*;
@@ -761,6 +763,8 @@ public AbfsRestOperation append(final String path, final byte[] buffer,
761763 requestHeaders .add (new AbfsHttpHeader (USER_AGENT , userAgentRetry ));
762764 }
763765
766+ addCheckSumHeaderForWrite (requestHeaders , buffer );
767+
764768 // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
765769 String sasTokenForReuse = appendSASTokenToQuery (path , SASTokenProvider .WRITE_OPERATION ,
766770 abfsUriQueryBuilder , cachedSasToken );
@@ -978,9 +982,12 @@ public AbfsRestOperation read(final String path, final long position, final byte
978982 TracingContext tracingContext ) throws AzureBlobFileSystemException {
979983 final List <AbfsHttpHeader > requestHeaders = createDefaultHeaders ();
980984 addCustomerProvidedKeyHeaders (requestHeaders );
981- requestHeaders .add (new AbfsHttpHeader (RANGE ,
982- String .format ("bytes=%d-%d" , position , position + bufferLength - 1 )));
985+
986+ AbfsHttpHeader rangeHeader = new AbfsHttpHeader (RANGE ,
987+ String .format ("bytes=%d-%d" , position , position + bufferLength - 1 ));
988+ requestHeaders .add (rangeHeader );
983989 requestHeaders .add (new AbfsHttpHeader (IF_MATCH , eTag ));
990+ addCheckSumHeaderForRead (requestHeaders , bufferLength , rangeHeader );
984991
985992 final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder ();
986993 // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
@@ -999,6 +1006,8 @@ public AbfsRestOperation read(final String path, final long position, final byte
9991006 bufferLength , sasTokenForReuse );
10001007 op .execute (tracingContext );
10011008
1009+ verifyCheckSumForRead (buffer , op .getResult ());
1010+
10021011 return op ;
10031012 }
10041013
@@ -1412,6 +1421,54 @@ private void appendIfNotEmpty(StringBuilder sb, String regEx,
14121421 }
14131422 }
14141423
1424+ private void addCheckSumHeaderForRead (List <AbfsHttpHeader > requestHeaders ,
1425+ final int bufferLength , final AbfsHttpHeader rangeHeader ) {
1426+ if (getAbfsConfiguration ().getIsChecksumEnabled () &&
1427+ requestHeaders .contains (rangeHeader ) && bufferLength <= 4 * ONE_MB ) {
1428+ requestHeaders .add (new AbfsHttpHeader (X_MS_RANGE_GET_CONTENT_MD5 , TRUE ));
1429+ }
1430+ }
1431+
1432+ private void addCheckSumHeaderForWrite (List <AbfsHttpHeader > requestHeaders ,
1433+ final byte [] buffer ) {
1434+ if (getAbfsConfiguration ().getIsChecksumEnabled ()) {
1435+ try {
1436+ MessageDigest md5Digest = MessageDigest .getInstance ("MD5" );
1437+ byte [] md5Bytes = md5Digest .digest (buffer );
1438+ String md5Hash = Base64 .getEncoder ().encodeToString (md5Bytes );
1439+ requestHeaders .add (new AbfsHttpHeader (CONTENT_MD5 , md5Hash ));
1440+ } catch (NoSuchAlgorithmException e ) {
1441+ e .printStackTrace ();
1442+ }
1443+ }
1444+ }
1445+
1446+ private void verifyCheckSumForRead (final byte [] buffer , final AbfsHttpOperation result )
1447+ throws AbfsRestOperationException {
1448+ if (getAbfsConfiguration ().getIsChecksumEnabled ()) {
1449+ // Number of bytes returned by server could be less than or equal to what
1450+ // caller requests. In case it is less, extra bytes will be initialized to 0
1451+ // Server returned MD5 Hash will be computed on what server returned.
1452+ // We need to get exact data that server returned and compute its md5 hash
1453+ // Computed hash should be equal to what server returned
1454+ int numberOfBytesRead = (int )result .getBytesReceived ();
1455+ byte [] dataRead = new byte [numberOfBytesRead ];
1456+ System .arraycopy (buffer , 0 , dataRead , 0 , numberOfBytesRead );
1457+
1458+ try {
1459+ MessageDigest md5Digest = MessageDigest .getInstance ("MD5" );
1460+ byte [] md5Bytes = md5Digest .digest (dataRead );
1461+ String md5HashComputed = Base64 .getEncoder ().encodeToString (md5Bytes );
1462+ String md5HashActual = result .getResponseHeader (CONTENT_MD5 );
1463+ if (!md5HashComputed .equals (md5HashActual )) {
1464+ throw new AbfsRestOperationException (-1 , "-1" , "Checksum Check Failed" , new IOException ());
1465+ }
1466+ } catch (NoSuchAlgorithmException e ) {
1467+ e .printStackTrace ();
1468+ }
1469+ }
1470+ }
1471+
14151472 @ VisibleForTesting
14161473 URL getBaseUrl () {
14171474 return baseUrl ;
0 commit comments