2626import java .io .IOException ;
2727import java .util .ArrayList ;
2828import java .util .EnumSet ;
29+ import java .util .HashMap ;
2930import java .util .List ;
3031
3132import org .apache .commons .logging .Log ;
4445import org .apache .hadoop .security .token .delegation .DelegationKey ;
4546import org .apache .hadoop .yarn .api .records .ApplicationAttemptId ;
4647import org .apache .hadoop .yarn .api .records .ApplicationId ;
48+ import org .apache .hadoop .yarn .api .records .ReservationId ;
4749import org .apache .hadoop .yarn .conf .YarnConfiguration ;
4850import org .apache .hadoop .yarn .proto .YarnServerCommonProtos .VersionProto ;
4951import org .apache .hadoop .yarn .proto .YarnServerResourceManagerRecoveryProtos .AMRMTokenSecretManagerStateProto ;
5052import org .apache .hadoop .yarn .proto .YarnServerResourceManagerRecoveryProtos .ApplicationAttemptStateDataProto ;
5153import org .apache .hadoop .yarn .proto .YarnServerResourceManagerRecoveryProtos .ApplicationStateDataProto ;
5254import org .apache .hadoop .yarn .proto .YarnServerResourceManagerRecoveryProtos .EpochProto ;
55+ import org .apache .hadoop .yarn .proto .YarnServerResourceManagerRecoveryProtos .ReservationAllocationStateProto ;
5356import org .apache .hadoop .yarn .security .client .RMDelegationTokenIdentifier ;
5457import org .apache .hadoop .yarn .server .records .Version ;
5558import org .apache .hadoop .yarn .server .records .impl .pb .VersionPBImpl ;
7679 * Changes from 1.1 to 1.2, AMRMTokenSecretManager state has been saved
7780 * separately. The currentMasterkey and nextMasterkey have been stored.
7881 * Also, AMRMToken has been removed from ApplicationAttemptState.
82+ *
83+ * Changes from 1.2 to 1.3, Addition of ReservationSystem state.
7984 */
8085public class FileSystemRMStateStore extends RMStateStore {
8186
8287 public static final Log LOG = LogFactory .getLog (FileSystemRMStateStore .class );
8388
8489 protected static final String ROOT_DIR_NAME = "FSRMStateRoot" ;
8590 protected static final Version CURRENT_VERSION_INFO = Version
86- .newInstance (1 , 2 );
91+ .newInstance (1 , 3 );
8792 protected static final String AMRMTOKEN_SECRET_MANAGER_NODE =
8893 "AMRMTokenSecretManagerNode" ;
8994
@@ -108,6 +113,8 @@ public class FileSystemRMStateStore extends RMStateStore {
108113 Path fsWorkingPath ;
109114
110115 Path amrmTokenSecretManagerRoot ;
116+ private Path reservationRoot ;
117+
111118 @ Override
112119 public synchronized void initInternal (Configuration conf )
113120 throws Exception {
@@ -117,6 +124,7 @@ public synchronized void initInternal(Configuration conf)
117124 rmAppRoot = new Path (rootDirPath , RM_APP_ROOT );
118125 amrmTokenSecretManagerRoot =
119126 new Path (rootDirPath , AMRMTOKEN_SECRET_MANAGER_ROOT );
127+ reservationRoot = new Path (rootDirPath , RESERVATION_SYSTEM_ROOT );
120128 fsNumRetries =
121129 conf .getInt (YarnConfiguration .FS_RM_STATE_STORE_NUM_RETRIES ,
122130 YarnConfiguration .DEFAULT_FS_RM_STATE_STORE_NUM_RETRIES );
@@ -153,6 +161,7 @@ protected synchronized void startInternal() throws Exception {
153161 mkdirsWithRetries (rmDTSecretManagerRoot );
154162 mkdirsWithRetries (rmAppRoot );
155163 mkdirsWithRetries (amrmTokenSecretManagerRoot );
164+ mkdirsWithRetries (reservationRoot );
156165 }
157166
158167 @ Override
@@ -222,9 +231,24 @@ public synchronized RMState loadState() throws Exception {
222231 loadRMAppState (rmState );
223232 // recover AMRMTokenSecretManager
224233 loadAMRMTokenSecretManagerState (rmState );
234+ // recover reservation state
235+ loadReservationSystemState (rmState );
225236 return rmState ;
226237 }
227238
239+ private void loadReservationSystemState (RMState rmState ) throws Exception {
240+ try {
241+ final ReservationStateFileProcessor fileProcessor = new
242+ ReservationStateFileProcessor (rmState );
243+ final Path rootDirectory = this .reservationRoot ;
244+
245+ processDirectoriesOfFiles (fileProcessor , rootDirectory );
246+ } catch (Exception e ) {
247+ LOG .error ("Failed to load state." , e );
248+ throw e ;
249+ }
250+ }
251+
228252 private void loadAMRMTokenSecretManagerState (RMState rmState )
229253 throws Exception {
230254 checkAndResumeUpdateOperation (amrmTokenSecretManagerRoot );
@@ -248,50 +272,12 @@ private void loadAMRMTokenSecretManagerState(RMState rmState)
248272
249273 private void loadRMAppState (RMState rmState ) throws Exception {
250274 try {
251- List <ApplicationAttemptStateData > attempts =
252- new ArrayList <ApplicationAttemptStateData >();
253-
254- for (FileStatus appDir : listStatusWithRetries (rmAppRoot )) {
255- checkAndResumeUpdateOperation (appDir .getPath ());
256- for (FileStatus childNodeStatus :
257- listStatusWithRetries (appDir .getPath ())) {
258- assert childNodeStatus .isFile ();
259- String childNodeName = childNodeStatus .getPath ().getName ();
260- if (checkAndRemovePartialRecordWithRetries (
261- childNodeStatus .getPath ())) {
262- continue ;
263- }
264- byte [] childData = readFileWithRetries (childNodeStatus .getPath (),
265- childNodeStatus .getLen ());
266- // Set attribute if not already set
267- setUnreadableBySuperuserXattrib (childNodeStatus .getPath ());
268- if (childNodeName .startsWith (ApplicationId .appIdStrPrefix )) {
269- // application
270- if (LOG .isDebugEnabled ()) {
271- LOG .debug ("Loading application from node: " + childNodeName );
272- }
273- ApplicationStateDataPBImpl appState =
274- new ApplicationStateDataPBImpl (
275- ApplicationStateDataProto .parseFrom (childData ));
276- ApplicationId appId =
277- appState .getApplicationSubmissionContext ().getApplicationId ();
278- rmState .appState .put (appId , appState );
279- } else if (childNodeName
280- .startsWith (ApplicationAttemptId .appAttemptIdStrPrefix )) {
281- // attempt
282- if (LOG .isDebugEnabled ()) {
283- LOG .debug ("Loading application attempt from node: "
284- + childNodeName );
285- }
286- ApplicationAttemptStateDataPBImpl attemptState =
287- new ApplicationAttemptStateDataPBImpl (
288- ApplicationAttemptStateDataProto .parseFrom (childData ));
289- attempts .add (attemptState );
290- } else {
291- LOG .info ("Unknown child node with name: " + childNodeName );
292- }
293- }
294- }
275+ List <ApplicationAttemptStateData > attempts = new ArrayList <>();
276+ final RMAppStateFileProcessor rmAppStateFileProcessor =
277+ new RMAppStateFileProcessor (rmState , attempts );
278+ final Path rootDirectory = this .rmAppRoot ;
279+
280+ processDirectoriesOfFiles (rmAppStateFileProcessor , rootDirectory );
295281
296282 // go through all attempts and add them to their apps, Ideally, each
297283 // attempt node must have a corresponding app node, because remove
@@ -309,6 +295,29 @@ private void loadRMAppState(RMState rmState) throws Exception {
309295 }
310296 }
311297
298+ private void processDirectoriesOfFiles (
299+ RMStateFileProcessor rmAppStateFileProcessor , Path rootDirectory )
300+ throws Exception {
301+ for (FileStatus dir : listStatusWithRetries (rootDirectory )) {
302+ checkAndResumeUpdateOperation (dir .getPath ());
303+ String dirName = dir .getPath ().getName ();
304+ for (FileStatus fileNodeStatus : listStatusWithRetries (dir .getPath ())) {
305+ assert fileNodeStatus .isFile ();
306+ String fileName = fileNodeStatus .getPath ().getName ();
307+ if (checkAndRemovePartialRecordWithRetries (fileNodeStatus .getPath ())) {
308+ continue ;
309+ }
310+ byte [] fileData = readFileWithRetries (fileNodeStatus .getPath (),
311+ fileNodeStatus .getLen ());
312+ // Set attribute if not already set
313+ setUnreadableBySuperuserXattrib (fileNodeStatus .getPath ());
314+
315+ rmAppStateFileProcessor .processChildNode (dirName , fileName ,
316+ fileData );
317+ }
318+ }
319+ }
320+
312321 private boolean checkAndRemovePartialRecord (Path record ) throws IOException {
313322 // If the file ends with .tmp then it shows that it failed
314323 // during saving state into state store. The file will be deleted as a
@@ -843,6 +852,41 @@ public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
843852 }
844853 }
845854
855+ @ Override
856+ protected void storeReservationState (
857+ ReservationAllocationStateProto reservationAllocation , String planName ,
858+ String reservationIdName ) throws Exception {
859+ Path planCreatePath = getNodePath (reservationRoot , planName );
860+ mkdirsWithRetries (planCreatePath );
861+ Path reservationPath = getNodePath (planCreatePath , reservationIdName );
862+ LOG .info ("Storing state for reservation " + reservationIdName + " from " +
863+ "plan " + planName + " at path " + reservationPath );
864+ byte [] reservationData = reservationAllocation .toByteArray ();
865+ writeFileWithRetries (reservationPath , reservationData , true );
866+ }
867+
868+ @ Override
869+ protected void updateReservationState (
870+ ReservationAllocationStateProto reservationAllocation , String planName ,
871+ String reservationIdName ) throws Exception {
872+ Path planCreatePath = getNodePath (reservationRoot , planName );
873+ Path reservationPath = getNodePath (planCreatePath , reservationIdName );
874+ LOG .info ("Updating state for reservation " + reservationIdName + " from " +
875+ "plan " + planName + " at path " + reservationPath );
876+ byte [] reservationData = reservationAllocation .toByteArray ();
877+ updateFile (reservationPath , reservationData , true );
878+ }
879+
880+ @ Override
881+ protected void removeReservationState (
882+ String planName , String reservationIdName ) throws Exception {
883+ Path planCreatePath = getNodePath (reservationRoot , planName );
884+ Path reservationPath = getNodePath (planCreatePath , reservationIdName );
885+ LOG .info ("Removing state for reservation " + reservationIdName + " from " +
886+ "plan " + planName + " at path " + reservationPath );
887+ deleteFileWithRetries (reservationPath );
888+ }
889+
846890 @ VisibleForTesting
847891 public int getNumRetries () {
848892 return fsNumRetries ;
@@ -853,13 +897,84 @@ public long getRetryInterval() {
853897 return fsRetryInterval ;
854898 }
855899
856- private void setUnreadableBySuperuserXattrib (Path p )
857- throws IOException {
900+ private void setUnreadableBySuperuserXattrib (Path p ) throws IOException {
858901 if (fs .getScheme ().toLowerCase ().contains ("hdfs" )
859902 && intermediateEncryptionEnabled
860903 && !fs .getXAttrs (p ).containsKey (UNREADABLE_BY_SUPERUSER_XATTRIB )) {
861904 fs .setXAttr (p , UNREADABLE_BY_SUPERUSER_XATTRIB , null ,
862905 EnumSet .of (XAttrSetFlag .CREATE ));
863906 }
864907 }
908+
909+ private static class ReservationStateFileProcessor implements
910+ RMStateFileProcessor {
911+ private RMState rmState ;
912+ public ReservationStateFileProcessor (RMState state ) {
913+ this .rmState = state ;
914+ }
915+
916+ @ Override
917+ public void processChildNode (String planName , String childNodeName ,
918+ byte [] childData ) throws IOException {
919+ ReservationAllocationStateProto allocationState =
920+ ReservationAllocationStateProto .parseFrom (childData );
921+ if (!rmState .getReservationState ().containsKey (planName )) {
922+ rmState .getReservationState ().put (planName ,
923+ new HashMap <ReservationId , ReservationAllocationStateProto >());
924+ }
925+ ReservationId reservationId =
926+ ReservationId .parseReservationId (childNodeName );
927+ rmState .getReservationState ().get (planName ).put (reservationId ,
928+ allocationState );
929+ }
930+ }
931+
932+ private static class RMAppStateFileProcessor implements RMStateFileProcessor {
933+ private RMState rmState ;
934+ private List <ApplicationAttemptStateData > attempts ;
935+
936+ public RMAppStateFileProcessor (RMState rmState ,
937+ List <ApplicationAttemptStateData > attempts ) {
938+ this .rmState = rmState ;
939+ this .attempts = attempts ;
940+ }
941+
942+ @ Override
943+ public void processChildNode (String appDirName , String childNodeName ,
944+ byte [] childData )
945+ throws com .google .protobuf .InvalidProtocolBufferException {
946+ if (childNodeName .startsWith (ApplicationId .appIdStrPrefix )) {
947+ // application
948+ if (LOG .isDebugEnabled ()) {
949+ LOG .debug ("Loading application from node: " + childNodeName );
950+ }
951+ ApplicationStateDataPBImpl appState =
952+ new ApplicationStateDataPBImpl (
953+ ApplicationStateDataProto .parseFrom (childData ));
954+ ApplicationId appId =
955+ appState .getApplicationSubmissionContext ().getApplicationId ();
956+ rmState .appState .put (appId , appState );
957+ } else if (childNodeName .startsWith (
958+ ApplicationAttemptId .appAttemptIdStrPrefix )) {
959+ // attempt
960+ if (LOG .isDebugEnabled ()) {
961+ LOG .debug ("Loading application attempt from node: "
962+ + childNodeName );
963+ }
964+ ApplicationAttemptStateDataPBImpl attemptState =
965+ new ApplicationAttemptStateDataPBImpl (
966+ ApplicationAttemptStateDataProto .parseFrom (childData ));
967+ attempts .add (attemptState );
968+ } else {
969+ LOG .info ("Unknown child node with name: " + childNodeName );
970+ }
971+ }
972+ }
973+
974+ // Interface for common state processing of directory of file layout
975+ private interface RMStateFileProcessor {
976+ void processChildNode (String appDirName , String childNodeName ,
977+ byte [] childData )
978+ throws IOException ;
979+ }
865980}
0 commit comments