@@ -208,11 +208,12 @@ private synchronized EditLogFile scanStorageForLatestEdits() throws IOException
208208 while (!files .isEmpty ()) {
209209 EditLogFile latestLog = files .remove (files .size () - 1 );
210210 latestLog .scanLog (Long .MAX_VALUE , false );
211- LOG .info ("Latest log is " + latestLog );
211+ LOG .info ("Latest log is " + latestLog + " ; journal id: " + journalId );
212212 if (latestLog .getLastTxId () == HdfsServerConstants .INVALID_TXID ) {
213213 // the log contains no transactions
214214 LOG .warn ("Latest log " + latestLog + " has no transactions. " +
215- "moving it aside and looking for previous log" );
215+ "moving it aside and looking for previous log"
216+ + " ; journal id: " + journalId );
216217 latestLog .moveAsideEmptyFile ();
217218 } else {
218219 return latestLog ;
@@ -230,7 +231,7 @@ void format(NamespaceInfo nsInfo) throws IOException {
230231 Preconditions .checkState (nsInfo .getNamespaceID () != 0 ,
231232 "can't format with uninitialized namespace info: %s" ,
232233 nsInfo );
233- LOG .info ("Formatting " + this + " with namespace info: " +
234+ LOG .info ("Formatting journal id : " + journalId + " with namespace info: " +
234235 nsInfo );
235236 storage .format (nsInfo );
236237 refreshCachedData ();
@@ -323,7 +324,7 @@ synchronized NewEpochResponseProto newEpoch(
323324 // any other that we've promised.
324325 if (epoch <= getLastPromisedEpoch ()) {
325326 throw new IOException ("Proposed epoch " + epoch + " <= last promise " +
326- getLastPromisedEpoch ());
327+ getLastPromisedEpoch () + " ; journal id: " + journalId );
327328 }
328329
329330 updateLastPromisedEpoch (epoch );
@@ -343,7 +344,8 @@ synchronized NewEpochResponseProto newEpoch(
343344
344345 private void updateLastPromisedEpoch (long newEpoch ) throws IOException {
345346 LOG .info ("Updating lastPromisedEpoch from " + lastPromisedEpoch .get () +
346- " to " + newEpoch + " for client " + Server .getRemoteIp ());
347+ " to " + newEpoch + " for client " + Server .getRemoteIp () +
348+ " ; journal id: " + journalId );
347349 lastPromisedEpoch .set (newEpoch );
348350
349351 // Since we have a new writer, reset the IPC serial - it will start
@@ -378,7 +380,7 @@ synchronized void journal(RequestInfo reqInfo,
378380 }
379381
380382 checkSync (curSegment != null ,
381- "Can't write, no segment open" );
383+ "Can't write, no segment open" + " ; journal id: " + journalId );
382384
383385 if (curSegmentTxId != segmentTxId ) {
384386 // Sanity check: it is possible that the writer will fail IPCs
@@ -389,17 +391,20 @@ synchronized void journal(RequestInfo reqInfo,
389391 // and throw an exception.
390392 JournalOutOfSyncException e = new JournalOutOfSyncException (
391393 "Writer out of sync: it thinks it is writing segment " + segmentTxId
392- + " but current segment is " + curSegmentTxId );
394+ + " but current segment is " + curSegmentTxId
395+ + " ; journal id: " + journalId );
393396 abortCurSegment ();
394397 throw e ;
395398 }
396399
397400 checkSync (nextTxId == firstTxnId ,
398- "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId );
401+ "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId
402+ + " ; journal id: " + journalId );
399403
400404 long lastTxnId = firstTxnId + numTxns - 1 ;
401405 if (LOG .isTraceEnabled ()) {
402- LOG .trace ("Writing txid " + firstTxnId + "-" + lastTxnId );
406+ LOG .trace ("Writing txid " + firstTxnId + "-" + lastTxnId +
407+ " ; journal id: " + journalId );
403408 }
404409
405410 // If the edit has already been marked as committed, we know
@@ -423,7 +428,7 @@ synchronized void journal(RequestInfo reqInfo,
423428
424429 if (milliSeconds > WARN_SYNC_MILLIS_THRESHOLD ) {
425430 LOG .warn ("Sync of transaction range " + firstTxnId + "-" + lastTxnId +
426- " took " + milliSeconds + "ms" );
431+ " took " + milliSeconds + "ms" + " ; journal id: " + journalId );
427432 }
428433
429434 if (isLagging ) {
@@ -455,7 +460,7 @@ private synchronized void checkRequest(RequestInfo reqInfo) throws IOException {
455460 if (reqInfo .getEpoch () < lastPromisedEpoch .get ()) {
456461 throw new IOException ("IPC's epoch " + reqInfo .getEpoch () +
457462 " is less than the last promised epoch " +
458- lastPromisedEpoch .get ());
463+ lastPromisedEpoch .get () + " ; journal id: " + journalId );
459464 } else if (reqInfo .getEpoch () > lastPromisedEpoch .get ()) {
460465 // A newer client has arrived. Fence any previous writers by updating
461466 // the promise.
@@ -465,16 +470,16 @@ private synchronized void checkRequest(RequestInfo reqInfo) throws IOException {
465470 // Ensure that the IPCs are arriving in-order as expected.
466471 checkSync (reqInfo .getIpcSerialNumber () > currentEpochIpcSerial ,
467472 "IPC serial %s from client %s was not higher than prior highest " +
468- "IPC serial %s" , reqInfo .getIpcSerialNumber (),
469- Server .getRemoteIp (),
470- currentEpochIpcSerial );
473+ "IPC serial %s ; journal id: %s" , reqInfo .getIpcSerialNumber (),
474+ Server .getRemoteIp (), currentEpochIpcSerial , journalId );
471475 currentEpochIpcSerial = reqInfo .getIpcSerialNumber ();
472476
473477 if (reqInfo .hasCommittedTxId ()) {
474478 Preconditions .checkArgument (
475479 reqInfo .getCommittedTxId () >= committedTxnId .get (),
476480 "Client trying to move committed txid backward from " +
477- committedTxnId .get () + " to " + reqInfo .getCommittedTxId ());
481+ committedTxnId .get () + " to " + reqInfo .getCommittedTxId () +
482+ " ; journal id: " + journalId );
478483
479484 committedTxnId .set (reqInfo .getCommittedTxId ());
480485 }
@@ -486,7 +491,7 @@ private synchronized void checkWriteRequest(RequestInfo reqInfo) throws IOExcept
486491 if (reqInfo .getEpoch () != lastWriterEpoch .get ()) {
487492 throw new IOException ("IPC's epoch " + reqInfo .getEpoch () +
488493 " is not the current writer epoch " +
489- lastWriterEpoch .get ());
494+ lastWriterEpoch .get () + " ; journal id: " + journalId );
490495 }
491496 }
492497
@@ -497,7 +502,8 @@ public synchronized boolean isFormatted() {
497502 private void checkFormatted () throws JournalNotFormattedException {
498503 if (!isFormatted ()) {
499504 throw new JournalNotFormattedException ("Journal " +
500- storage .getSingularStorageDir () + " not formatted" );
505+ storage .getSingularStorageDir () + " not formatted" +
506+ " ; journal id: " + journalId );
501507 }
502508 }
503509
@@ -542,7 +548,8 @@ public synchronized void startLogSegment(RequestInfo reqInfo, long txid,
542548 if (curSegment != null ) {
543549 LOG .warn ("Client is requesting a new log segment " + txid +
544550 " though we are already writing " + curSegment + ". " +
545- "Aborting the current segment in order to begin the new one." );
551+ "Aborting the current segment in order to begin the new one." +
552+ " ; journal id: " + journalId );
546553 // The writer may have lost a connection to us and is now
547554 // re-connecting after the connection came back.
548555 // We should abort our own old segment.
@@ -556,7 +563,7 @@ public synchronized void startLogSegment(RequestInfo reqInfo, long txid,
556563 if (existing != null ) {
557564 if (!existing .isInProgress ()) {
558565 throw new IllegalStateException ("Already have a finalized segment " +
559- existing + " beginning at " + txid );
566+ existing + " beginning at " + txid + " ; journal id: " + journalId );
560567 }
561568
562569 // If it's in-progress, it should only contain one transaction,
@@ -565,15 +572,16 @@ public synchronized void startLogSegment(RequestInfo reqInfo, long txid,
565572 existing .scanLog (Long .MAX_VALUE , false );
566573 if (existing .getLastTxId () != existing .getFirstTxId ()) {
567574 throw new IllegalStateException ("The log file " +
568- existing + " seems to contain valid transactions" );
575+ existing + " seems to contain valid transactions" +
576+ " ; journal id: " + journalId );
569577 }
570578 }
571579
572580 long curLastWriterEpoch = lastWriterEpoch .get ();
573581 if (curLastWriterEpoch != reqInfo .getEpoch ()) {
574582 LOG .info ("Updating lastWriterEpoch from " + curLastWriterEpoch +
575583 " to " + reqInfo .getEpoch () + " for client " +
576- Server .getRemoteIp ());
584+ Server .getRemoteIp () + " ; journal id: " + journalId );
577585 lastWriterEpoch .set (reqInfo .getEpoch ());
578586 }
579587
@@ -608,8 +616,8 @@ public synchronized void finalizeLogSegment(RequestInfo reqInfo, long startTxId,
608616
609617 checkSync (nextTxId == endTxId + 1 ,
610618 "Trying to finalize in-progress log segment %s to end at " +
611- "txid %s but only written up to txid %s" ,
612- startTxId , endTxId , nextTxId - 1 );
619+ "txid %s but only written up to txid %s ; journal id: %s " ,
620+ startTxId , endTxId , nextTxId - 1 , journalId );
613621 // No need to validate the edit log if the client is finalizing
614622 // the log segment that it was just writing to.
615623 needsValidation = false ;
@@ -618,25 +626,27 @@ public synchronized void finalizeLogSegment(RequestInfo reqInfo, long startTxId,
618626 FileJournalManager .EditLogFile elf = fjm .getLogFile (startTxId );
619627 if (elf == null ) {
620628 throw new JournalOutOfSyncException ("No log file to finalize at " +
621- "transaction ID " + startTxId );
629+ "transaction ID " + startTxId + " ; journal id: " + journalId );
622630 }
623631
624632 if (elf .isInProgress ()) {
625633 if (needsValidation ) {
626634 LOG .info ("Validating log segment " + elf .getFile () + " about to be " +
627- "finalized" );
635+ "finalized ; journal id: " + journalId );
628636 elf .scanLog (Long .MAX_VALUE , false );
629637
630638 checkSync (elf .getLastTxId () == endTxId ,
631639 "Trying to finalize in-progress log segment %s to end at " +
632- "txid %s but log %s on disk only contains up to txid %s" ,
633- startTxId , endTxId , elf .getFile (), elf .getLastTxId ());
640+ "txid %s but log %s on disk only contains up to txid %s " +
641+ "; journal id: %s" ,
642+ startTxId , endTxId , elf .getFile (), elf .getLastTxId (), journalId );
634643 }
635644 fjm .finalizeLogSegment (startTxId , endTxId );
636645 } else {
637646 Preconditions .checkArgument (endTxId == elf .getLastTxId (),
638647 "Trying to re-finalize already finalized log " +
639- elf + " with different endTxId " + endTxId );
648+ elf + " with different endTxId " + endTxId +
649+ " ; journal id: " + journalId );
640650 }
641651
642652 // Once logs are finalized, a different length will never be decided.
@@ -667,7 +677,8 @@ private void purgePaxosDecision(long segmentTxId) throws IOException {
667677 File paxosFile = storage .getPaxosFile (segmentTxId );
668678 if (paxosFile .exists ()) {
669679 if (!paxosFile .delete ()) {
670- throw new IOException ("Unable to delete paxos file " + paxosFile );
680+ throw new IOException ("Unable to delete paxos file " + paxosFile +
681+ " ; journal id: " + journalId );
671682 }
672683 }
673684 }
@@ -717,7 +728,7 @@ SegmentStateProto getSegmentInfo(long segmentTxId)
717728 }
718729 if (elf .getLastTxId () == HdfsServerConstants .INVALID_TXID ) {
719730 LOG .info ("Edit log file " + elf + " appears to be empty. " +
720- "Moving it aside..." );
731+ "Moving it aside..." + " ; journal id: " + journalId );
721732 elf .moveAsideEmptyFile ();
722733 return null ;
723734 }
@@ -727,7 +738,7 @@ SegmentStateProto getSegmentInfo(long segmentTxId)
727738 .setIsInProgress (elf .isInProgress ())
728739 .build ();
729740 LOG .info ("getSegmentInfo(" + segmentTxId + "): " + elf + " -> " +
730- TextFormat .shortDebugString (ret ));
741+ TextFormat .shortDebugString (ret ) + " ; journal id: " + journalId );
731742 return ret ;
732743 }
733744
@@ -771,7 +782,7 @@ public synchronized PrepareRecoveryResponseProto prepareRecovery(
771782
772783 PrepareRecoveryResponseProto resp = builder .build ();
773784 LOG .info ("Prepared recovery for segment " + segmentTxId + ": " +
774- TextFormat .shortDebugString (resp ));
785+ TextFormat .shortDebugString (resp ) + " ; journal id: " + journalId );
775786 return resp ;
776787 }
777788
@@ -792,8 +803,8 @@ public synchronized void acceptRecovery(RequestInfo reqInfo,
792803 // at least one transaction.
793804 Preconditions .checkArgument (segment .getEndTxId () > 0 &&
794805 segment .getEndTxId () >= segmentTxId ,
795- "bad recovery state for segment %s: %s" ,
796- segmentTxId , TextFormat .shortDebugString (segment ));
806+ "bad recovery state for segment %s: %s ; journal id: %s " ,
807+ segmentTxId , TextFormat .shortDebugString (segment ), journalId );
797808
798809 PersistedRecoveryPaxosData oldData = getPersistedPaxosData (segmentTxId );
799810 PersistedRecoveryPaxosData newData = PersistedRecoveryPaxosData .newBuilder ()
@@ -806,8 +817,9 @@ public synchronized void acceptRecovery(RequestInfo reqInfo,
806817 // checkRequest() call above should filter non-increasing epoch numbers.
807818 if (oldData != null ) {
808819 alwaysAssert (oldData .getAcceptedInEpoch () <= reqInfo .getEpoch (),
809- "Bad paxos transition, out-of-order epochs.\n Old: %s\n New: %s\n " ,
810- oldData , newData );
820+ "Bad paxos transition, out-of-order epochs.\n Old: %s\n New: " +
821+ "%s\n JournalId: %s\n " ,
822+ oldData , newData , journalId );
811823 }
812824
813825 File syncedFile = null ;
@@ -817,15 +829,15 @@ public synchronized void acceptRecovery(RequestInfo reqInfo,
817829 currentSegment .getEndTxId () != segment .getEndTxId ()) {
818830 if (currentSegment == null ) {
819831 LOG .info ("Synchronizing log " + TextFormat .shortDebugString (segment ) +
820- ": no current segment in place" );
832+ ": no current segment in place ; journal id: " + journalId );
821833
822834 // Update the highest txid for lag metrics
823835 updateHighestWrittenTxId (Math .max (segment .getEndTxId (),
824836 highestWrittenTxId ));
825837 } else {
826838 LOG .info ("Synchronizing log " + TextFormat .shortDebugString (segment ) +
827839 ": old segment " + TextFormat .shortDebugString (currentSegment ) +
828- " is not the right length" );
840+ " is not the right length ; journal id: " + journalId );
829841
830842 // Paranoid sanity check: if the new log is shorter than the log we
831843 // currently have, we should not end up discarding any transactions
@@ -838,14 +850,15 @@ public synchronized void acceptRecovery(RequestInfo reqInfo,
838850 " with new segment " +
839851 TextFormat .shortDebugString (segment ) +
840852 ": would discard already-committed txn " +
841- committedTxnId .get ());
853+ committedTxnId .get () +
854+ " ; journal id: " + journalId );
842855 }
843856
844857 // Another paranoid check: we should not be asked to synchronize a log
845858 // on top of a finalized segment.
846859 alwaysAssert (currentSegment .getIsInProgress (),
847- "Should never be asked to synchronize a different log on top of an " +
848- "already-finalized segment" );
860+ "Should never be asked to synchronize a different log on top of " +
861+ "an already-finalized segment ; journal id: " + journalId );
849862
850863 // If we're shortening the log, update our highest txid
851864 // used for lag metrics.
@@ -858,7 +871,7 @@ public synchronized void acceptRecovery(RequestInfo reqInfo,
858871 } else {
859872 LOG .info ("Skipping download of log " +
860873 TextFormat .shortDebugString (segment ) +
861- ": already have up-to-date logs" );
874+ ": already have up-to-date logs ; journal id: " + journalId );
862875 }
863876
864877 // This is one of the few places in the protocol where we have a single
@@ -890,12 +903,12 @@ public synchronized void acceptRecovery(RequestInfo reqInfo,
890903 }
891904
892905 LOG .info ("Accepted recovery for segment " + segmentTxId + ": " +
893- TextFormat .shortDebugString (newData ));
906+ TextFormat .shortDebugString (newData ) + " ; journal id: " + journalId );
894907 }
895908
896909 private LongRange txnRange (SegmentStateProto seg ) {
897910 Preconditions .checkArgument (seg .hasEndTxId (),
898- "invalid segment: %s" , seg );
911+ "invalid segment: %s ; journal id: %s " , seg , journalId );
899912 return new LongRange (seg .getStartTxId (), seg .getEndTxId ());
900913 }
901914
@@ -970,7 +983,7 @@ private void completeHalfDoneAcceptRecovery(
970983 if (tmp .exists ()) {
971984 File dst = storage .getInProgressEditLog (segmentId );
972985 LOG .info ("Rolling forward previously half-completed synchronization: " +
973- tmp + " -> " + dst );
986+ tmp + " -> " + dst + " ; journal id: " + journalId );
974987 FileUtil .replaceFile (tmp , dst );
975988 }
976989 }
@@ -991,8 +1004,8 @@ private PersistedRecoveryPaxosData getPersistedPaxosData(long segmentTxId)
9911004 PersistedRecoveryPaxosData ret = PersistedRecoveryPaxosData .parseDelimitedFrom (in );
9921005 Preconditions .checkState (ret != null &&
9931006 ret .getSegmentState ().getStartTxId () == segmentTxId ,
994- "Bad persisted data for segment %s: %s" ,
995- segmentTxId , ret );
1007+ "Bad persisted data for segment %s: %s ; journal id: %s " ,
1008+ segmentTxId , ret , journalId );
9961009 return ret ;
9971010 } finally {
9981011 IOUtils .closeStream (in );
@@ -1041,7 +1054,7 @@ public synchronized void doUpgrade(StorageInfo sInfo) throws IOException {
10411054 storage .cTime = sInfo .cTime ;
10421055 int oldLV = storage .getLayoutVersion ();
10431056 storage .layoutVersion = sInfo .layoutVersion ;
1044- LOG .info ("Starting upgrade of edits directory: "
1057+ LOG .info ("Starting upgrade of edits directory: " + storage . getRoot ()
10451058 + ".\n old LV = " + oldLV
10461059 + "; old CTime = " + oldCTime
10471060 + ".\n new LV = " + storage .getLayoutVersion ()
@@ -1112,7 +1125,7 @@ synchronized boolean moveTmpSegmentToCurrent(File tmpFile, File finalFile,
11121125 if (endTxId <= committedTxnId .get ()) {
11131126 if (!finalFile .getParentFile ().exists ()) {
11141127 LOG .error (finalFile .getParentFile () + " doesn't exist. Aborting tmp " +
1115- "segment move to current directory" );
1128+ "segment move to current directory ; journal id: " + journalId );
11161129 return false ;
11171130 }
11181131 Files .move (tmpFile .toPath (), finalFile .toPath (),
@@ -1122,13 +1135,13 @@ synchronized boolean moveTmpSegmentToCurrent(File tmpFile, File finalFile,
11221135 } else {
11231136 success = false ;
11241137 LOG .warn ("Unable to move edits file from " + tmpFile + " to " +
1125- finalFile );
1138+ finalFile + " ; journal id: " + journalId );
11261139 }
11271140 } else {
11281141 success = false ;
11291142 LOG .error ("The endTxId of the temporary file is not less than the " +
11301143 "last committed transaction id. Aborting move to final file" +
1131- finalFile );
1144+ finalFile + " ; journal id: " + journalId );
11321145 }
11331146
11341147 return success ;
0 commit comments