@@ -483,6 +483,7 @@ boolean doWaitForRestart() {
483483 private volatile BlockConstructionStage stage ; // block construction stage
484484 protected long bytesSent = 0 ; // number of bytes that've been sent
485485 private final boolean isLazyPersistFile ;
486+ private long lastPacket ;
486487
487488 /** Nodes have been used in the pipeline before and have failed. */
488489 private final List <DatanodeInfo > failed = new ArrayList <>();
@@ -632,6 +633,7 @@ private void initDataStreaming() {
632633 response = new ResponseProcessor (nodes );
633634 response .start ();
634635 stage = BlockConstructionStage .DATA_STREAMING ;
636+ lastPacket = Time .monotonicNow ();
635637 }
636638
637639 protected void endBlock () {
@@ -653,7 +655,6 @@ private boolean shouldStop() {
653655 */
654656 @ Override
655657 public void run () {
656- long lastPacket = Time .monotonicNow ();
657658 TraceScope scope = null ;
658659 while (!streamerClosed && dfsClient .clientRunning ) {
659660 // if the Responder encountered an error, shutdown Responder
@@ -666,47 +667,38 @@ public void run() {
666667 // process datanode IO errors if any
667668 boolean doSleep = processDatanodeOrExternalError ();
668669
669- final int halfSocketTimeout = dfsClient .getConf ().getSocketTimeout ()/2 ;
670670 synchronized (dataQueue ) {
671671 // wait for a packet to be sent.
672- long now = Time .monotonicNow ();
673- while ((!shouldStop () && dataQueue .size () == 0 &&
674- (stage != BlockConstructionStage .DATA_STREAMING ||
675- now - lastPacket < halfSocketTimeout )) || doSleep ) {
676- long timeout = halfSocketTimeout - (now -lastPacket );
677- timeout = timeout <= 0 ? 1000 : timeout ;
678- timeout = (stage == BlockConstructionStage .DATA_STREAMING )?
679- timeout : 1000 ;
672+ while ((!shouldStop () && dataQueue .isEmpty ()) || doSleep ) {
673+ long timeout = 1000 ;
674+ if (stage == BlockConstructionStage .DATA_STREAMING ) {
675+ timeout = sendHeartbeat ();
676+ }
680677 try {
681678 dataQueue .wait (timeout );
682679 } catch (InterruptedException e ) {
683680 LOG .debug ("Thread interrupted" , e );
684681 }
685682 doSleep = false ;
686- now = Time .monotonicNow ();
687683 }
688684 if (shouldStop ()) {
689685 continue ;
690686 }
691687 // get packet to be sent.
692- if (dataQueue .isEmpty ()) {
693- one = createHeartbeatPacket ();
694- } else {
695- try {
696- backOffIfNecessary ();
697- } catch (InterruptedException e ) {
698- LOG .debug ("Thread interrupted" , e );
699- }
700- one = dataQueue .getFirst (); // regular data packet
701- SpanContext [] parents = one .getTraceParents ();
702- if (parents != null && parents .length > 0 ) {
703- // The original code stored multiple parents in the DFSPacket, and
704- // use them ALL here when creating a new Span. We only use the
705- // last one FOR NOW. Moreover, we don't activate the Span for now.
706- scope = dfsClient .getTracer ().
707- newScope ("dataStreamer" , parents [0 ], false );
708- //scope.getSpan().setParents(parents);
709- }
688+ try {
689+ backOffIfNecessary ();
690+ } catch (InterruptedException e ) {
691+ LOG .debug ("Thread interrupted" , e );
692+ }
693+ one = dataQueue .getFirst (); // regular data packet
694+ SpanContext [] parents = one .getTraceParents ();
695+ if (parents != null && parents .length > 0 ) {
696+ // The original code stored multiple parents in the DFSPacket, and
697+ // use them ALL here when creating a new Span. We only use the
698+ // last one FOR NOW. Moreover, we don't activate the Span for now.
699+ scope = dfsClient .getTracer ().
700+ newScope ("dataStreamer" , parents [0 ], false );
701+ //scope.getSpan().setParents(parents);
710702 }
711703 }
712704
@@ -734,17 +726,8 @@ public void run() {
734726
735727 if (one .isLastPacketInBlock ()) {
736728 // wait for all data packets have been successfully acked
737- synchronized (dataQueue ) {
738- while (!shouldStop () && ackQueue .size () != 0 ) {
739- try {
740- // wait for acks to arrive from datanodes
741- dataQueue .wait (1000 );
742- } catch (InterruptedException e ) {
743- LOG .debug ("Thread interrupted" , e );
744- }
745- }
746- }
747- if (shouldStop ()) {
729+ waitForAllAcks ();
730+ if (shouldStop ()) {
748731 continue ;
749732 }
750733 stage = BlockConstructionStage .PIPELINE_CLOSE ;
@@ -773,8 +756,7 @@ public void run() {
773756 // write out data to remote datanode
774757 try (TraceScope ignored = dfsClient .getTracer ().
775758 newScope ("DataStreamer#writeTo" , spanContext )) {
776- one .writeTo (blockStream );
777- blockStream .flush ();
759+ sendPacket (one );
778760 } catch (IOException e ) {
779761 // HDFS-3398 treat primary DN is down since client is unable to
780762 // write to primary DN. If a failed or restarting node has already
@@ -785,7 +767,6 @@ public void run() {
785767 errorState .markFirstNodeIfNotMarked ();
786768 throw e ;
787769 }
788- lastPacket = Time .monotonicNow ();
789770
790771 // update bytesSent
791772 long tmpBytesSent = one .getLastByteOffsetBlock ();
@@ -800,11 +781,7 @@ public void run() {
800781 // Is this block full?
801782 if (one .isLastPacketInBlock ()) {
802783 // wait for the close packet has been acked
803- synchronized (dataQueue ) {
804- while (!shouldStop () && ackQueue .size () != 0 ) {
805- dataQueue .wait (1000 );// wait for acks to arrive from datanodes
806- }
807- }
784+ waitForAllAcks ();
808785 if (shouldStop ()) {
809786 continue ;
810787 }
@@ -845,6 +822,48 @@ public void run() {
845822 closeInternal ();
846823 }
847824
825+ private void waitForAllAcks () throws IOException {
826+ // wait until all data packets have been successfully acked
827+ synchronized (dataQueue ) {
828+ while (!shouldStop () && !ackQueue .isEmpty ()) {
829+ try {
830+ // wait for acks to arrive from datanodes
831+ dataQueue .wait (sendHeartbeat ());
832+ } catch (InterruptedException e ) {
833+ LOG .debug ("Thread interrupted " , e );
834+ }
835+ }
836+ }
837+ }
838+
839+ private void sendPacket (DFSPacket packet ) throws IOException {
840+ // write out data to remote datanode
841+ try {
842+ packet .writeTo (blockStream );
843+ blockStream .flush ();
844+ } catch (IOException e ) {
845+ // HDFS-3398 treat primary DN is down since client is unable to
846+ // write to primary DN. If a failed or restarting node has already
847+ // been recorded by the responder, the following call will have no
848+ // effect. Pipeline recovery can handle only one node error at a
849+ // time. If the primary node fails again during the recovery, it
850+ // will be taken out then.
851+ errorState .markFirstNodeIfNotMarked ();
852+ throw e ;
853+ }
854+ lastPacket = Time .monotonicNow ();
855+ }
856+
857+ private long sendHeartbeat () throws IOException {
858+ final long heartbeatInterval = dfsClient .getConf ().getSocketTimeout ()/2 ;
859+ long timeout = heartbeatInterval - (Time .monotonicNow () - lastPacket );
860+ if (timeout <= 0 ) {
861+ sendPacket (createHeartbeatPacket ());
862+ timeout = heartbeatInterval ;
863+ }
864+ return timeout ;
865+ }
866+
848867 private void closeInternal () {
849868 closeResponder (); // close and join
850869 closeStream ();
0 commit comments