2323import  static  software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .STREAMING_SIGNED_PAYLOAD_TRAILER ;
2424import  static  software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .STREAMING_UNSIGNED_PAYLOAD_TRAILER ;
2525import  static  software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .X_AMZ_CONTENT_SHA256 ;
26+ import  static  software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .X_AMZ_DECODED_CONTENT_LENGTH ;
2627import  static  software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .X_AMZ_TRAILER ;
2728import  static  software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerUtils .moveContentLength ;
2829
2930import  java .nio .ByteBuffer ;
3031import  java .nio .charset .StandardCharsets ;
3132import  java .util .ArrayList ;
32- import  java .util .Collections ;
3333import  java .util .List ;
34+ import  java .util .Optional ;
35+ import  java .util .concurrent .CompletableFuture ;
3436import  org .reactivestreams .Publisher ;
3537import  software .amazon .awssdk .annotations .SdkInternalApi ;
3638import  software .amazon .awssdk .checksums .SdkChecksum ;
3739import  software .amazon .awssdk .checksums .spi .ChecksumAlgorithm ;
3840import  software .amazon .awssdk .http .ContentStreamProvider ;
3941import  software .amazon .awssdk .http .Header ;
4042import  software .amazon .awssdk .http .SdkHttpRequest ;
43+ import  software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .AsyncChunkEncodedPayload ;
4144import  software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChecksumTrailerProvider ;
4245import  software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChunkedEncodedInputStream ;
46+ import  software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChunkedEncodedPayload ;
47+ import  software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChunkedEncodedPublisher ;
4348import  software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .SigV4ChunkExtensionProvider ;
4449import  software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .SigV4TrailerProvider ;
50+ import  software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .SyncChunkEncodedPayload ;
4551import  software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .TrailerProvider ;
46- import  software .amazon .awssdk .http .auth .aws .internal .signer .io .ChecksumInputStream ;
4752import  software .amazon .awssdk .http .auth .aws .internal .signer .io .ResettableContentStreamProvider ;
4853import  software .amazon .awssdk .utils .BinaryUtils ;
4954import  software .amazon .awssdk .utils .Pair ;
@@ -73,51 +78,67 @@ public static Builder builder() {
7378
7479    @ Override 
7580    public  ContentStreamProvider  sign (ContentStreamProvider  payload , V4RequestSigningResult  requestSigningResult ) {
76-         SdkHttpRequest .Builder  request  = requestSigningResult .getSignedRequest ();
77- 
78-         String  checksum  = request .firstMatchingHeader (X_AMZ_CONTENT_SHA256 ).orElseThrow (
79-             () -> new  IllegalArgumentException (X_AMZ_CONTENT_SHA256  + " must be set!" )
80-         );
81- 
8281        ChunkedEncodedInputStream .Builder  chunkedEncodedInputStreamBuilder  = ChunkedEncodedInputStream 
8382            .builder ()
8483            .inputStream (payload .newStream ())
8584            .chunkSize (chunkSize )
8685            .header (chunk  -> Integer .toHexString (chunk .remaining ()).getBytes (StandardCharsets .UTF_8 ));
8786
88-         preExistingTrailers .forEach (trailer  -> chunkedEncodedInputStreamBuilder .addTrailer (() -> trailer ));
87+         SyncChunkEncodedPayload  chunkedPayload  = new  SyncChunkEncodedPayload (chunkedEncodedInputStreamBuilder );
88+         signCommon (chunkedPayload , requestSigningResult );
89+ 
90+         return  new  ResettableContentStreamProvider (chunkedEncodedInputStreamBuilder ::build );
91+     }
92+ 
93+     @ Override 
94+     public  Publisher <ByteBuffer > signAsync (Publisher <ByteBuffer > payload , V4RequestSigningResult  requestSigningResult ) {
95+         ChunkedEncodedPublisher .Builder  chunkedStreamBuilder  = ChunkedEncodedPublisher .builder ()
96+                                                                                       .publisher (payload )
97+                                                                                       .chunkSize (chunkSize )
98+                                                                                       .addEmptyTrailingChunk (true );
99+ 
100+         AsyncChunkEncodedPayload  checksumPayload  = new  AsyncChunkEncodedPayload (chunkedStreamBuilder );
101+         signCommon (checksumPayload , requestSigningResult );
102+ 
103+         return  chunkedStreamBuilder .build ();
104+     }
105+ 
106+     private  void  signCommon (ChunkedEncodedPayload  payload , V4RequestSigningResult  requestSigningResult ) {
107+         preExistingTrailers .forEach (t  -> payload .addTrailer (() -> t ));
108+ 
109+         SdkHttpRequest .Builder  request  = requestSigningResult .getSignedRequest ();
110+ 
111+         payload .decodedContentLength (request .firstMatchingHeader (X_AMZ_DECODED_CONTENT_LENGTH )
112+                                             .map (Long ::parseLong )
113+                                             .orElse (0L ));
114+ 
115+         String  checksum  = request .firstMatchingHeader (X_AMZ_CONTENT_SHA256 ).orElseThrow (
116+             () -> new  IllegalArgumentException (X_AMZ_CONTENT_SHA256  + " must be set!" )
117+         );
89118
90119        switch  (checksum ) {
91120            case  STREAMING_SIGNED_PAYLOAD : {
92121                RollingSigner  rollingSigner  = new  RollingSigner (requestSigningResult .getSigningKey (),
93122                                                                requestSigningResult .getSignature ());
94-                 chunkedEncodedInputStreamBuilder .addExtension (new  SigV4ChunkExtensionProvider (rollingSigner , credentialScope ));
123+                 payload .addExtension (new  SigV4ChunkExtensionProvider (rollingSigner , credentialScope ));
95124                break ;
96125            }
97126            case  STREAMING_UNSIGNED_PAYLOAD_TRAILER :
98-                 setupChecksumTrailerIfNeeded (chunkedEncodedInputStreamBuilder );
127+                 setupChecksumTrailerIfNeeded (payload );
99128                break ;
100129            case  STREAMING_SIGNED_PAYLOAD_TRAILER : {
130+                 setupChecksumTrailerIfNeeded (payload );
101131                RollingSigner  rollingSigner  = new  RollingSigner (requestSigningResult .getSigningKey (),
102132                                                                requestSigningResult .getSignature ());
103-                 chunkedEncodedInputStreamBuilder .addExtension (new  SigV4ChunkExtensionProvider (rollingSigner , credentialScope ));
104-                 setupChecksumTrailerIfNeeded (chunkedEncodedInputStreamBuilder );
105-                 chunkedEncodedInputStreamBuilder .addTrailer (
106-                     new  SigV4TrailerProvider (chunkedEncodedInputStreamBuilder .trailers (), rollingSigner , credentialScope )
133+                 payload .addExtension (new  SigV4ChunkExtensionProvider (rollingSigner , credentialScope ));
134+                 payload .addTrailer (
135+                     new  SigV4TrailerProvider (payload .trailers (), rollingSigner , credentialScope )
107136                );
108137                break ;
109138            }
110139            default :
111140                throw  new  UnsupportedOperationException ();
112141        }
113- 
114-         return  new  ResettableContentStreamProvider (chunkedEncodedInputStreamBuilder ::build );
115-     }
116- 
117-     @ Override 
118-     public  Publisher <ByteBuffer > signAsync (Publisher <ByteBuffer > payload , V4RequestSigningResult  requestSigningResult ) {
119-         // TODO(sra-identity-and-auth): implement this first and remove addFlexibleChecksumInTrailer logic in HttpChecksumStage 
120-         throw  new  UnsupportedOperationException ();
121142    }
122143
123144    @ Override 
@@ -127,27 +148,66 @@ public void beforeSigning(SdkHttpRequest.Builder request, ContentStreamProvider
127148        setupPreExistingTrailers (request );
128149
129150        // pre-existing trailers 
151+         encodedContentLength  = calculateEncodedContentLength (request , contentLength );
152+ 
153+         if  (checksumAlgorithm  != null ) {
154+             String  checksumHeaderName  = checksumHeaderName (checksumAlgorithm );
155+             request .appendHeader (X_AMZ_TRAILER , checksumHeaderName );
156+         }
157+         request .putHeader (Header .CONTENT_LENGTH , Long .toString (encodedContentLength ));
158+         request .appendHeader (CONTENT_ENCODING , AWS_CHUNKED );
159+     }
160+ 
161+     @ Override 
162+     public  CompletableFuture <Pair <SdkHttpRequest .Builder , Optional <Publisher <ByteBuffer >>>> beforeSigningAsync (
163+         SdkHttpRequest .Builder  request , Publisher <ByteBuffer > payload ) {
164+         return  moveContentLength (request , payload )
165+             .thenApply (p  -> {
166+                 SdkHttpRequest .Builder  requestBuilder  = p .left ();
167+                 setupPreExistingTrailers (requestBuilder );
168+ 
169+                 long  decodedContentLength  = requestBuilder .firstMatchingHeader (X_AMZ_DECODED_CONTENT_LENGTH )
170+                                                           .map (Long ::parseLong )
171+                                                           // should not happen, this header is added by moveContentLength 
172+                                                           .orElseThrow (() -> new  RuntimeException (X_AMZ_DECODED_CONTENT_LENGTH 
173+                                                                                                   + " header not present" ));
174+ 
175+                 long  encodedContentLength  = calculateEncodedContentLength (request , decodedContentLength );
176+ 
177+                 if  (checksumAlgorithm  != null ) {
178+                     String  checksumHeaderName  = checksumHeaderName (checksumAlgorithm );
179+                     request .appendHeader (X_AMZ_TRAILER , checksumHeaderName );
180+                 }
181+                 request .putHeader (Header .CONTENT_LENGTH , Long .toString (encodedContentLength ));
182+                 request .appendHeader (CONTENT_ENCODING , AWS_CHUNKED );
183+                 return  Pair .of (requestBuilder , p .right ());
184+             });
185+     }
186+ 
187+     private  long  calculateEncodedContentLength (SdkHttpRequest .Builder  requestBuilder , long  decodedContentLength ) {
188+         long  encodedContentLength  = 0 ;
189+ 
130190        encodedContentLength  += calculateExistingTrailersLength ();
131191
132-         String  checksum  = request .firstMatchingHeader (X_AMZ_CONTENT_SHA256 ).orElseThrow (
192+         String  checksum  = requestBuilder .firstMatchingHeader (X_AMZ_CONTENT_SHA256 ).orElseThrow (
133193            () -> new  IllegalArgumentException (X_AMZ_CONTENT_SHA256  + " must be set!" )
134194        );
135195
136196        switch  (checksum ) {
137197            case  STREAMING_SIGNED_PAYLOAD : {
138198                long  extensionsLength  = 81 ; // ;chunk-signature:<sigv4 hex signature, 64 bytes> 
139-                 encodedContentLength  += calculateChunksLength (contentLength , extensionsLength );
199+                 encodedContentLength  += calculateChunksLength (decodedContentLength , extensionsLength );
140200                break ;
141201            }
142202            case  STREAMING_UNSIGNED_PAYLOAD_TRAILER :
143203                if  (checksumAlgorithm  != null ) {
144204                    encodedContentLength  += calculateChecksumTrailerLength (checksumHeaderName (checksumAlgorithm ));
145205                }
146-                 encodedContentLength  += calculateChunksLength (contentLength , 0 );
206+                 encodedContentLength  += calculateChunksLength (decodedContentLength , 0 );
147207                break ;
148208            case  STREAMING_SIGNED_PAYLOAD_TRAILER : {
149209                long  extensionsLength  = 81 ; // ;chunk-signature:<sigv4 hex signature, 64 bytes> 
150-                 encodedContentLength  += calculateChunksLength (contentLength , extensionsLength );
210+                 encodedContentLength  += calculateChunksLength (decodedContentLength , extensionsLength );
151211                if  (checksumAlgorithm  != null ) {
152212                    encodedContentLength  += calculateChecksumTrailerLength (checksumHeaderName (checksumAlgorithm ));
153213                }
@@ -161,12 +221,7 @@ public void beforeSigning(SdkHttpRequest.Builder request, ContentStreamProvider
161221        // terminating \r\n 
162222        encodedContentLength  += 2 ;
163223
164-         if  (checksumAlgorithm  != null ) {
165-             String  checksumHeaderName  = checksumHeaderName (checksumAlgorithm );
166-             request .appendHeader (X_AMZ_TRAILER , checksumHeaderName );
167-         }
168-         request .putHeader (Header .CONTENT_LENGTH , Long .toString (encodedContentLength ));
169-         request .appendHeader (CONTENT_ENCODING , AWS_CHUNKED );
224+         return  encodedContentLength ;
170225    }
171226
172227    /** 
@@ -250,25 +305,17 @@ private long calculateChecksumTrailerLength(String checksumHeaderName) {
250305        return  lengthInBytes  + 2 ;
251306    }
252307
253-     /** 
254-      * Add the checksum as a trailer to the chunk-encoded stream. 
255-      * <p> 
256-      * If the checksum-algorithm is not present, then nothing is done. 
257-      */ 
258-     private  void  setupChecksumTrailerIfNeeded (ChunkedEncodedInputStream .Builder  builder ) {
308+     private  void  setupChecksumTrailerIfNeeded (ChunkedEncodedPayload  payload ) {
259309        if  (checksumAlgorithm  == null ) {
260310            return ;
261311        }
262312        String  checksumHeaderName  = checksumHeaderName (checksumAlgorithm );
263313        SdkChecksum  sdkChecksum  = fromChecksumAlgorithm (checksumAlgorithm );
264-         ChecksumInputStream  checksumInputStream  = new  ChecksumInputStream (
265-             builder .inputStream (),
266-             Collections .singleton (sdkChecksum )
267-         );
268314
269315        TrailerProvider  checksumTrailer  = new  ChecksumTrailerProvider (sdkChecksum , checksumHeaderName );
270316
271-         builder .inputStream (checksumInputStream ).addTrailer (checksumTrailer );
317+         payload .checksumPayload (sdkChecksum );
318+         payload .addTrailer (checksumTrailer );
272319    }
273320
274321    static  class  Builder  {
0 commit comments