diff --git a/docs/reference/indices/flush.asciidoc b/docs/reference/indices/flush.asciidoc
index ea8667aa1b713..a03d2bb248dc4 100644
--- a/docs/reference/indices/flush.asciidoc
+++ b/docs/reference/indices/flush.asciidoc
@@ -103,8 +103,7 @@ which returns something similar to:
                      "max_seq_no" : "-1",
                      "sync_id" : "AVvFY-071siAOuFGEO9P", <1>
                      "max_unsafe_auto_id_timestamp" : "-1",
-                     "min_retained_seq_no" : "0",
-                     "retention_leases" : "primary_term:1;version:1;id:replica-0;retaining_seq_no:0;timestamp:1547235588;source:replica"
+                     "min_retained_seq_no" : "0"
                    },
                    "num_docs" : 0
                  }
@@ -119,7 +118,6 @@ which returns something similar to:
 // TESTRESPONSE[s/"translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA"/"translog_uuid": $body.indices.twitter.shards.0.0.commit.user_data.translog_uuid/]
 // TESTRESPONSE[s/"history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ"/"history_uuid": $body.indices.twitter.shards.0.0.commit.user_data.history_uuid/]
 // TESTRESPONSE[s/"sync_id" : "AVvFY-071siAOuFGEO9P"/"sync_id": $body.indices.twitter.shards.0.0.commit.user_data.sync_id/]
-// TESTRESPONSE[s/"retention_leases" : "primary_term:1;version:1;id:replica-0;retaining_seq_no:0;timestamp:1547235588;source:replica"/"retention_leases": $body.indices.twitter.shards.0.0.commit.user_data.retention_leases/]
 <1> the `sync id` marker
 
 [float]
diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
index 56d8c6bab6184..dbe779864fe47 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -114,7 +114,6 @@ public abstract class Engine implements Closeable {
     public static final String SYNC_COMMIT_ID = "sync_id";
     public static final String HISTORY_UUID_KEY = "history_uuid";
     public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no";
-    public static final String RETENTION_LEASES = "retention_leases";
     public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
 
     protected final ShardId shardId;
diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 2def84f875b17..32354ab4b16d7 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -51,7 +51,6 @@
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.SuppressForbidden;
-import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lucene.LoggerInfoStream;
 import org.elasticsearch.common.lucene.Lucene;
@@ -75,7 +74,6 @@
 import org.elasticsearch.index.merge.MergeStats;
 import org.elasticsearch.index.merge.OnGoingMerge;
 import org.elasticsearch.index.seqno.LocalCheckpointTracker;
-import org.elasticsearch.index.seqno.RetentionLeases;
 import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
@@ -2344,13 +2342,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
                 commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
                 commitData.put(HISTORY_UUID_KEY, historyUUID);
                 if (softDeleteEnabled) {
-                    /*
-                     * We sample these from the policy (which occurs under a lock) to ensure that we have a consistent view of the minimum
-                     * retained sequence number, and the retention leases.
-                     */
-                    final Tuple<Long, RetentionLeases> retentionPolicy = softDeletesPolicy.getRetentionPolicy();
-                    commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(retentionPolicy.v1()));
-                    commitData.put(Engine.RETENTION_LEASES, RetentionLeases.encodeRetentionLeases(retentionPolicy.v2()));
+                    commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
                 }
                 logger.trace("committing writer with commit data [{}]", commitData);
                 return commitData.entrySet().iterator();
diff --git a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java
index 9a9c7bd0ee869..4c9ee0be92f46 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java
@@ -21,7 +21,6 @@
 
 import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.search.Query;
-import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.seqno.RetentionLease;
@@ -107,10 +106,6 @@ private synchronized void releaseRetentionLock() {
      * Operations whose seq# is at least this value should exist in the Lucene index.
      */
     synchronized long getMinRetainedSeqNo() {
-        return getRetentionPolicy().v1();
-    }
-
-    public synchronized Tuple<Long, RetentionLeases> getRetentionPolicy() {
         /*
          * When an engine is flushed, we need to provide it the latest collection of retention leases even when the soft deletes policy is
          * locked for peer recovery.
@@ -151,7 +146,7 @@ public synchronized Tuple<Long, RetentionLeases> getRetentionPolicy() {
              */
             minRetainedSeqNo = Math.max(minRetainedSeqNo, minSeqNoToRetain);
         }
-        return Tuple.tuple(minRetainedSeqNo, retentionLeases);
+        return minRetainedSeqNo;
     }
 
     /**
diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
index 3e4e83d365ec1..566a81b3af4b0 100644
--- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
+++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
@@ -32,6 +32,8 @@
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.gateway.WriteStateException;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.shard.AbstractIndexShardComponent;
 import org.elasticsearch.index.shard.IndexShard;
@@ -39,6 +41,7 @@
 import org.elasticsearch.index.shard.ShardId;
 
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -318,6 +321,40 @@ public synchronized void updateRetentionLeasesOnReplica(final RetentionLeases re
         }
     }
 
+    /**
+     * Loads the latest retention leases from their dedicated state file.
+     *
+     * @param path the path to the directory containing the state file
+     * @return the retention leases
+     * @throws IOException if an I/O exception occurs reading the retention leases
+     */
+    public RetentionLeases loadRetentionLeases(final Path path) throws IOException {
+        final RetentionLeases retentionLeases = RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path);
+        if (retentionLeases == null) {
+            return RetentionLeases.EMPTY;
+        }
+        return retentionLeases;
+    }
+
+    private final Object retentionLeasePersistenceLock = new Object();
+
+    /**
+     * Persists the current retention leases to their dedicated state file.
+     *
+     * @param path the path to the directory containing the state file
+     * @throws WriteStateException if an exception occurs writing the state file
+     */
+    public void persistRetentionLeases(final Path path) throws WriteStateException {
+        synchronized (retentionLeasePersistenceLock) {
+            final RetentionLeases currentRetentionLeases;
+            synchronized (this) {
+                currentRetentionLeases = retentionLeases;
+            }
+            logger.trace("persisting retention leases [{}]", currentRetentionLeases);
+            RetentionLeases.FORMAT.writeAndCleanup(currentRetentionLeases, path);
+        }
+    }
+
     public static class CheckpointState implements Writeable {
 
         /**
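The two locks in persistRetentionLeases above are deliberate: the snapshot of the current leases is taken under the tracker's monitor, while the actual disk write happens under a dedicated persistence lock so that concurrent writers are serialized and a stale snapshot can never overwrite a newer state file. A minimal, hypothetical sketch of that pattern in isolation (the StatePersister class and the writer callback are illustrative names, not part of this change):

    import java.util.function.Consumer;

    final class StatePersister<T> {

        private final Object persistenceLock = new Object();
        private T state; // guarded by 'this'; plays the role of ReplicationTracker#retentionLeases

        synchronized void update(final T newState) {
            this.state = newState;
        }

        void persist(final Consumer<T> writer) {
            synchronized (persistenceLock) {
                // take a consistent snapshot under the monitor, mirroring the synchronized (this) block above
                final T snapshot;
                synchronized (this) {
                    snapshot = state;
                }
                // write while still holding the persistence lock, so concurrent callers are serialized
                // and an older snapshot can never land on disk after a newer one
                writer.accept(snapshot);
            }
        }
    }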
diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java
index e1d362d98764a..e6d6ed3fe825f 100644
--- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java
+++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java
@@ -19,13 +19,16 @@
 
 package org.elasticsearch.index.seqno;
 
+import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Locale;
 import java.util.Objects;
 
 /**
@@ -34,7 +37,7 @@
  * otherwise merge away operations that have been soft deleted). Each retention lease contains a unique identifier, the retaining sequence
  * number, the timestamp of when the lease was created or renewed, and the source of the retention lease (e.g., "ccr").
  */
-public final class RetentionLease implements Writeable {
+public final class RetentionLease implements ToXContent, Writeable {
 
     private final String id;
 
@@ -94,10 +97,6 @@ public RetentionLease(final String id, final long retainingSequenceNumber, final
         if (id.isEmpty()) {
             throw new IllegalArgumentException("retention lease ID can not be empty");
         }
-        if (id.contains(":") || id.contains(";") || id.contains(",")) {
-            // retention lease IDs can not contain these characters because they are used in encoding retention leases
-            throw new IllegalArgumentException("retention lease ID can not contain any of [:;,] but was [" + id + "]");
-        }
         if (retainingSequenceNumber < 0) {
             throw new IllegalArgumentException("retention lease retaining sequence number [" + retainingSequenceNumber + "] out of range");
         }
@@ -108,10 +107,6 @@ public RetentionLease(final String id, final long retainingSequenceNumber, final
         if (source.isEmpty()) {
             throw new IllegalArgumentException("retention lease source can not be empty");
         }
-        if (source.contains(":") || source.contains(";") || source.contains(",")) {
-            // retention lease sources can not contain these characters because they are used in encoding retention leases
-            throw new IllegalArgumentException("retention lease source can not contain any of [:;,] but was [" + source + "]");
-        }
         this.id = id;
         this.retainingSequenceNumber = retainingSequenceNumber;
         this.timestamp = timestamp;
@@ -145,43 +140,49 @@ public void writeTo(final StreamOutput out) throws IOException {
         out.writeString(source);
     }
 
-    /**
-     * Encodes a retention lease as a string. This encoding can be decoded by {@link #decodeRetentionLease(String)}. The retention lease is
-     * encoded in the format <code>id:{id};retaining_seq_no:{retainingSequenecNumber};timestamp:{timestamp};source:{source}</code>.
-     *
-     * @param retentionLease the retention lease
-     * @return the encoding of the retention lease
-     */
-    static String encodeRetentionLease(final RetentionLease retentionLease) {
-        Objects.requireNonNull(retentionLease);
-        return String.format(
-                Locale.ROOT,
-                "id:%s;retaining_seq_no:%d;timestamp:%d;source:%s",
-                retentionLease.id,
-                retentionLease.retainingSequenceNumber,
-                retentionLease.timestamp,
-                retentionLease.source);
+    private static final ParseField ID_FIELD = new ParseField("id");
+    private static final ParseField RETAINING_SEQUENCE_NUMBER_FIELD = new ParseField("retaining_sequence_number");
+    private static final ParseField TIMESTAMP_FIELD = new ParseField("timestamp");
+    private static final ParseField SOURCE_FIELD = new ParseField("source");
+
+    private static ConstructingObjectParser<RetentionLease, Void> PARSER = new ConstructingObjectParser<>(
+            "retention_leases",
+            (a) -> new RetentionLease((String) a[0], (Long) a[1], (Long) a[2], (String) a[3]));
+
+    static {
+        PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_FIELD);
+        PARSER.declareLong(ConstructingObjectParser.constructorArg(), RETAINING_SEQUENCE_NUMBER_FIELD);
+        PARSER.declareLong(ConstructingObjectParser.constructorArg(), TIMESTAMP_FIELD);
+        PARSER.declareString(ConstructingObjectParser.constructorArg(), SOURCE_FIELD);
+    }
+
+    @Override
+    public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
+        builder.startObject();
+        {
+            builder.field(ID_FIELD.getPreferredName(), id);
+            builder.field(RETAINING_SEQUENCE_NUMBER_FIELD.getPreferredName(), retainingSequenceNumber);
+            builder.field(TIMESTAMP_FIELD.getPreferredName(), timestamp);
+            builder.field(SOURCE_FIELD.getPreferredName(), source);
+        }
+        builder.endObject();
+        return builder;
+    }
+
+    @Override
+    public boolean isFragment() {
+        return false;
     }
 
     /**
-     * Decodes a retention lease encoded by {@link #encodeRetentionLease(RetentionLease)}.
+     * Parses a retention lease from {@link org.elasticsearch.common.xcontent.XContent}. This method assumes that the retention lease was
+     * converted to {@link org.elasticsearch.common.xcontent.XContent} via {@link #toXContent(XContentBuilder, Params)}.
      *
-     * @param encodedRetentionLease an encoded retention lease
-     * @return the decoded retention lease
+     * @param parser the parser
+     * @return a retention lease
      */
-    static RetentionLease decodeRetentionLease(final String encodedRetentionLease) {
-        Objects.requireNonNull(encodedRetentionLease);
-        final String[] fields = encodedRetentionLease.split(";");
-        assert fields.length == 4 : Arrays.toString(fields);
-        assert fields[0].matches("id:[^:;,]+") : fields[0];
-        final String id = fields[0].substring("id:".length());
-        assert fields[1].matches("retaining_seq_no:\\d+") : fields[1];
-        final long retainingSequenceNumber = Long.parseLong(fields[1].substring("retaining_seq_no:".length()));
-        assert fields[2].matches("timestamp:\\d+") : fields[2];
-        final long timestamp = Long.parseLong(fields[2].substring("timestamp:".length()));
-        assert fields[3].matches("source:[^:;,]+") : fields[3];
-        final String source = fields[3].substring("source:".length());
-        return new RetentionLease(id, retainingSequenceNumber, timestamp, source);
+    public static RetentionLease fromXContent(final XContentParser parser) {
+        return PARSER.apply(parser, null);
     }
 
     @Override
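With the delimiter-based encoding removed, a single lease is rendered and parsed through the ParseFields declared above. A hedged sketch of the round trip, not part of this patch, assuming it lives in an ESTestCase subclass in the same package (so createParser(XContentBuilder) is available) and that RetentionLease#equals compares all four fields:

    public void testRetentionLeaseXContentRoundTrip() throws IOException {
        // values borrowed from the flush docs example above
        final RetentionLease lease = new RetentionLease("replica-0", 0L, 1547235588L, "replica");
        final XContentBuilder builder = XContentFactory.jsonBuilder();
        lease.toXContent(builder, ToXContent.EMPTY_PARAMS);
        // expected shape, per the ParseFields declared above:
        // {"id":"replica-0","retaining_sequence_number":0,"timestamp":1547235588,"source":"replica"}
        try (XContentParser parser = createParser(builder)) {
            assertEquals(lease, RetentionLease.fromXContent(parser));
        }
    }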
diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java
index 906b505dad7e3..4033dcf0c4bef 100644
--- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java
+++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java
@@ -37,6 +37,7 @@
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.gateway.WriteStateException;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardClosedException;
 import org.elasticsearch.index.shard.ShardId;
@@ -119,19 +120,21 @@ public void backgroundSync(
     }
 
     @Override
-    protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(final Request request, final IndexShard primary) {
+    protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(
+            final Request request,
+            final IndexShard primary) throws WriteStateException {
         Objects.requireNonNull(request);
         Objects.requireNonNull(primary);
-        primary.afterWriteOperation();
+        primary.persistRetentionLeases();
         return new PrimaryResult<>(request, new ReplicationResponse());
     }
 
     @Override
-    protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard replica){
+    protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard replica) throws WriteStateException {
         Objects.requireNonNull(request);
         Objects.requireNonNull(replica);
         replica.updateRetentionLeasesOnReplica(request.getRetentionLeases());
-        replica.afterWriteOperation();
+        replica.persistRetentionLeases();
         return new ReplicaResult();
     }
 
diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java
index 9be7ab046eb8b..760271e53ee1e 100644
--- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java
+++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java
@@ -25,7 +25,6 @@
 import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.WriteResponse;
 import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
@@ -39,6 +38,7 @@
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.gateway.WriteStateException;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardClosedException;
 import org.elasticsearch.index.shard.ShardId;
@@ -121,31 +121,26 @@ public void sync(
     }
 
     @Override
-    protected WritePrimaryResult<Request, Response> shardOperationOnPrimary(final Request request, final IndexShard primary) {
+    protected WritePrimaryResult<Request, Response> shardOperationOnPrimary(
+            final Request request,
+            final IndexShard primary) throws WriteStateException {
         Objects.requireNonNull(request);
         Objects.requireNonNull(primary);
-        // we flush to ensure that retention leases are committed
-        flush(primary);
+        primary.persistRetentionLeases();
         return new WritePrimaryResult<>(request, new Response(), null, null, primary, logger);
     }
 
     @Override
-    protected WriteReplicaResult<Request> shardOperationOnReplica(final Request request, final IndexShard replica) {
+    protected WriteReplicaResult<Request> shardOperationOnReplica(
+            final Request request,
+            final IndexShard replica) throws WriteStateException {
         Objects.requireNonNull(request);
         Objects.requireNonNull(replica);
         replica.updateRetentionLeasesOnReplica(request.getRetentionLeases());
-        // we flush to ensure that retention leases are committed
-        flush(replica);
+        replica.persistRetentionLeases();
         return new WriteReplicaResult<>(request, null, null, replica, logger);
     }
 
-    private void flush(final IndexShard indexShard) {
-        final FlushRequest flushRequest = new FlushRequest();
-        flushRequest.force(true);
-        flushRequest.waitIfOngoing(true);
-        indexShard.flush(flushRequest);
-    }
-
     public static final class Request extends ReplicatedWriteRequest<Request> {
 
         private RetentionLeases retentionLeases;
diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java
index 5a9d9e333b27b..3bad887282502 100644
--- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java
+++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java
@@ -19,15 +19,20 @@
 
 package org.elasticsearch.index.seqno;
 
+import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.gateway.MetaDataStateFormat;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Locale;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.Function;
@@ -37,7 +42,7 @@
  * Represents a versioned collection of retention leases. We version the collection of retention leases to ensure that sync requests that
  * arrive out of order on the replica are rejected when they carry an older version.
  */
-public class RetentionLeases implements Writeable {
+public class RetentionLeases implements ToXContent, Writeable {
 
     private final long primaryTerm;
 
@@ -157,54 +162,59 @@ public void writeTo(final StreamOutput out) throws IOException {
         out.writeCollection(leases.values());
     }
 
-    /**
-     * Encodes a retention lease collection as a string. This encoding can be decoded by
-     * {@link RetentionLeases#decodeRetentionLeases(String)}. The encoding is a comma-separated encoding of each retention lease as encoded
-     * by {@link RetentionLease#encodeRetentionLease(RetentionLease)}, prefixed by the version of the retention lease collection.
-     *
-     * @param retentionLeases the retention lease collection
-     * @return the encoding of the retention lease collection
-     */
-    public static String encodeRetentionLeases(final RetentionLeases retentionLeases) {
-        Objects.requireNonNull(retentionLeases);
-        return String.format(
-                Locale.ROOT,
-                "primary_term:%d;version:%d;%s",
-                retentionLeases.primaryTerm,
-                retentionLeases.version,
-                retentionLeases.leases.values().stream().map(RetentionLease::encodeRetentionLease).collect(Collectors.joining(",")));
+    private static final ParseField PRIMARY_TERM_FIELD = new ParseField("primary_term");
+    private static final ParseField VERSION_FIELD = new ParseField("version");
+    private static final ParseField LEASES_FIELD = new ParseField("leases");
+
+    @SuppressWarnings("unchecked")
+    private static ConstructingObjectParser<RetentionLeases, Void> PARSER = new ConstructingObjectParser<>(
+            "retention_leases",
+            (a) -> new RetentionLeases((Long) a[0], (Long) a[1], (Collection<RetentionLease>) a[2]));
+
+    static {
+        PARSER.declareLong(ConstructingObjectParser.constructorArg(), PRIMARY_TERM_FIELD);
+        PARSER.declareLong(ConstructingObjectParser.constructorArg(), VERSION_FIELD);
+        PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), (p, c) -> RetentionLease.fromXContent(p), LEASES_FIELD);
+    }
+
+    @Override
+    public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
+        builder.field(PRIMARY_TERM_FIELD.getPreferredName(), primaryTerm);
+        builder.field(VERSION_FIELD.getPreferredName(), version);
+        builder.startArray(LEASES_FIELD.getPreferredName());
+        {
+            for (final RetentionLease retentionLease : leases.values()) {
+                retentionLease.toXContent(builder, params);
+            }
+        }
+        builder.endArray();
+        return builder;
     }
 
     /**
-     * Decodes retention leases encoded by {@link #encodeRetentionLeases(RetentionLeases)}.
+     * Parses a retention leases collection from {@link org.elasticsearch.common.xcontent.XContent}. This method assumes that the retention
+     * leases were converted to {@link org.elasticsearch.common.xcontent.XContent} via {@link #toXContent(XContentBuilder, Params)}.
      *
-     * @param encodedRetentionLeases an encoded retention lease collection
-     * @return the decoded retention lease collection
+     * @param parser the parser
+     * @return a retention leases collection
      */
-    public static RetentionLeases decodeRetentionLeases(final String encodedRetentionLeases) {
-        Objects.requireNonNull(encodedRetentionLeases);
-        if (encodedRetentionLeases.isEmpty()) {
-            return EMPTY;
+    public static RetentionLeases fromXContent(final XContentParser parser) {
+        return PARSER.apply(parser, null);
+    }
+
+    static final MetaDataStateFormat<RetentionLeases> FORMAT = new MetaDataStateFormat<RetentionLeases>("retention-leases-") {
+
+        @Override
+        public void toXContent(final XContentBuilder builder, final RetentionLeases retentionLeases) throws IOException {
+            retentionLeases.toXContent(builder, ToXContent.EMPTY_PARAMS);
         }
-        assert encodedRetentionLeases.matches("primary_term:\\d+;version:\\d+;.*") : encodedRetentionLeases;
-        final int firstSemicolon = encodedRetentionLeases.indexOf(";");
-        final long primaryTerm = Long.parseLong(encodedRetentionLeases.substring("primary_term:".length(), firstSemicolon));
-        final int secondSemicolon = encodedRetentionLeases.indexOf(";", firstSemicolon + 1);
-        final long version = Long.parseLong(encodedRetentionLeases.substring(firstSemicolon + 1 + "version:".length(), secondSemicolon));
-        final Collection<RetentionLease> leases;
-        if (secondSemicolon + 1 == encodedRetentionLeases.length()) {
-            leases = Collections.emptyList();
-        } else {
-            assert Arrays.stream(encodedRetentionLeases.substring(secondSemicolon + 1).split(","))
-                    .allMatch(s -> s.matches("id:[^:;,]+;retaining_seq_no:\\d+;timestamp:\\d+;source:[^:;,]+"))
-                    : encodedRetentionLeases;
-            leases = Arrays.stream(encodedRetentionLeases.substring(secondSemicolon + 1).split(","))
-                    .map(RetentionLease::decodeRetentionLease)
-                    .collect(Collectors.toList());
+
+        @Override
+        public RetentionLeases fromXContent(final XContentParser parser) {
+            return RetentionLeases.fromXContent(parser);
         }
 
-        return new RetentionLeases(primaryTerm, version, leases);
-    }
+    };
 
     @Override
     public boolean equals(Object o) {
@@ -237,7 +247,16 @@ public String toString() {
      * @return the map from retention lease ID to retention lease
      */
     private static Map<String, RetentionLease> toMap(final Collection<RetentionLease> leases) {
-        return leases.stream().collect(Collectors.toMap(RetentionLease::id, Function.identity()));
+        // use a linked hash map to preserve order
+        return leases.stream()
+                .collect(Collectors.toMap(
+                        RetentionLease::id,
+                        Function.identity(),
+                        (left, right) -> {
+                            assert left.id().equals(right.id()) : "expected [" + left.id() + "] to equal [" + right.id() + "]";
+                            throw new IllegalStateException("duplicate retention lease ID [" + left.id() + "]");
+                        },
+                        LinkedHashMap::new));
     }
 
     /**
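The collection is now persisted through its own MetaDataStateFormat (state files prefixed retention-leases-) instead of riding along in the commit user data. A hedged sketch of the write/load round trip using only the calls already shown in ReplicationTracker above; it assumes package-private access to FORMAT (e.g. from a test in org.elasticsearch.index.seqno), a public RetentionLeases constructor, and the logger and createTempDir helpers from ESTestCase:

    public void testRetentionLeasesStateFileRoundTrip() throws IOException {
        final RetentionLeases retentionLeases = new RetentionLeases(
                1, 1, Collections.singleton(new RetentionLease("replica-0", 0L, 1547235588L, "replica")));
        final Path shardStatePath = createTempDir();
        // the same calls that ReplicationTracker#persistRetentionLeases and #loadRetentionLeases use
        RetentionLeases.FORMAT.writeAndCleanup(retentionLeases, shardStatePath);
        assertEquals(
                retentionLeases,
                RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, shardStatePath));
    }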
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 50400c6961741..2526869c6454d 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -66,6 +66,7 @@
 import org.elasticsearch.common.util.concurrent.RunOnce;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.core.internal.io.IOUtils;
+import org.elasticsearch.gateway.WriteStateException;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexModule;
 import org.elasticsearch.index.IndexNotFoundException;
@@ -1431,7 +1432,7 @@ private void innerOpenEngineAndTranslog() throws IOException {
         final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
         final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
         replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
-        replicationTracker.updateRetentionLeasesOnReplica(getRetentionLeases(store.readLastCommittedSegmentsInfo()));
+        updateRetentionLeasesOnReplica(loadRetentionLeases());
         trimUnsafeCommits();
         synchronized (mutex) {
             verifyNotClosed();
@@ -1451,14 +1452,6 @@ private void innerOpenEngineAndTranslog() throws IOException {
         assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
     }
 
-    static RetentionLeases getRetentionLeases(final SegmentInfos segmentInfos) {
-        final String committedRetentionLeases = segmentInfos.getUserData().get(Engine.RETENTION_LEASES);
-        if (committedRetentionLeases == null) {
-            return RetentionLeases.EMPTY;
-        }
-        return RetentionLeases.decodeRetentionLeases(committedRetentionLeases);
-    }
-
     private void trimUnsafeCommits() throws IOException {
         assert currentEngineReference.get() == null || currentEngineReference.get() instanceof ReadOnlyEngine : "a write engine is running";
         final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
@@ -2003,6 +1996,27 @@ public void updateRetentionLeasesOnReplica(final RetentionLeases retentionLeases
         replicationTracker.updateRetentionLeasesOnReplica(retentionLeases);
     }
 
+    /**
+     * Loads the latest retention leases from their dedicated state file.
+     *
+     * @return the retention leases
+     * @throws IOException if an I/O exception occurs reading the retention leases
+     */
+    public RetentionLeases loadRetentionLeases() throws IOException {
+        verifyNotClosed();
+        return replicationTracker.loadRetentionLeases(path.getShardStatePath());
+    }
+
+    /**
+     * Persists the current retention leases to their dedicated state file.
+     *
+     * @throws WriteStateException if an exception occurs writing the state file
+     */
+    public void persistRetentionLeases() throws WriteStateException {
+        verifyNotClosed();
+        replicationTracker.persistRetentionLeases(path.getShardStatePath());
+    }
+
     /**
      * Syncs the current retention leases to all replicas.
      */
diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java
index c693d6b9d80fe..46f75f0db3745 100644
--- a/server/src/main/java/org/elasticsearch/index/store/Store.java
+++ b/server/src/main/java/org/elasticsearch/index/store/Store.java
@@ -1548,13 +1548,6 @@ public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long
                     + translogUUID + "]");
             }
             if (startingIndexCommit.equals(lastIndexCommitCommit) == false) {
-                /*
-                 * Unlike other commit tags, the retention-leases tag is not restored when an engine is
-                 * recovered from translog. We need to manually copy it from the last commit to the safe commit;
-                 * otherwise we might lose the latest committed retention leases when re-opening an engine.
-                 */
-                final Map<String, String> userData = new HashMap<>(startingIndexCommit.getUserData());
-                userData.put(Engine.RETENTION_LEASES, lastIndexCommitCommit.getUserData().getOrDefault(Engine.RETENTION_LEASES, ""));
                 try (IndexWriter writer = newAppendingIndexWriter(directory, startingIndexCommit)) {
                     // this achieves two things:
                     // - by committing a new commit based on the starting commit, it makes sure the starting commit will be opened
@@ -1565,7 +1558,7 @@ public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long
 
                     // The new commit will use segment files from the starting commit but userData from the last commit by default.
                     // Thus, we need to manually set the userData from the starting commit to the new commit.
-                    writer.setLiveCommitData(userData.entrySet());
+                    writer.setLiveCommitData(startingIndexCommit.getUserData().entrySet());
                     writer.commit();
                 }
             }
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
index 76f2200a47d82..00f07ed84e527 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -307,6 +307,7 @@ public void finalizeRecovery(final long globalCheckpoint, ActionListener<Void> l
             indexShard.updateGlobalCheckpointOnReplica(globalCheckpoint, "finalizing recovery");
             // Persist the global checkpoint.
             indexShard.sync();
+            indexShard.persistRetentionLeases();
             indexShard.finalizeRecovery();
             return null;
         });
diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index d9ed5cd2c719e..8d0865a652578 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -5361,16 +5361,6 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException {
                 engine.flush(true, true);
                 assertThat(Long.parseLong(engine.getLastCommittedSegmentInfos().userData.get(Engine.MIN_RETAINED_SEQNO)),
                     equalTo(engine.getMinRetainedSeqNo()));
-                final RetentionLeases leases = retentionLeasesHolder.get();
-                if (leases.leases().isEmpty()) {
-                    assertThat(
-                            engine.getLastCommittedSegmentInfos().getUserData().get(Engine.RETENTION_LEASES),
-                            equalTo("primary_term:" + primaryTerm + ";version:" + retentionLeasesVersion.get() + ";"));
-                } else {
-                    assertThat(
-                            engine.getLastCommittedSegmentInfos().getUserData().get(Engine.RETENTION_LEASES),
-                            equalTo(RetentionLeases.encodeRetentionLeases(leases)));
-                }
             }
             if (rarely()) {
                 engine.forceMerge(randomBoolean());
diff --git a/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java
index e4da636deaf6d..3c71e4fede3d5 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java
@@ -37,8 +37,6 @@
 import java.util.function.Supplier;
 
 import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 
@@ -121,30 +119,6 @@ public void testSoftDeletesRetentionLock() {
         assertThat(policy.getMinRetainedSeqNo(), equalTo(minRetainedSeqNo));
     }
 
-    public void testAlwaysFetchLatestRetentionLeases() {
-        final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED);
-        final Collection<RetentionLease> leases = new ArrayList<>();
-        final int numLeases = randomIntBetween(0, 10);
-        for (int i = 0; i < numLeases; i++) {
-            leases.add(new RetentionLease(Integer.toString(i), randomLongBetween(0, 1000), randomNonNegativeLong(), "test"));
-        }
-        final Supplier<RetentionLeases> leasesSupplier =
-                () -> new RetentionLeases(
-                        randomNonNegativeLong(),
-                        randomNonNegativeLong(),
-                        Collections.unmodifiableCollection(new ArrayList<>(leases)));
-        final SoftDeletesPolicy policy =
-                new SoftDeletesPolicy(globalCheckpoint::get, randomIntBetween(1, 1000), randomIntBetween(0, 1000), leasesSupplier);
-        if (randomBoolean()) {
-            policy.acquireRetentionLock();
-        }
-        if (numLeases == 0) {
-            assertThat(policy.getRetentionPolicy().v2().leases(), empty());
-        } else {
-            assertThat(policy.getRetentionPolicy().v2().leases(), contains(leases.toArray(new RetentionLease[0])));
-        }
-    }
-
     public void testWhenGlobalCheckpointDictatesThePolicy() {
         final int retentionOperations = randomIntBetween(0, 1024);
         final AtomicLong globalCheckpoint = new AtomicLong(randomLongBetween(0, Long.MAX_VALUE - 2));
diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java
index a9aae80db6ca4..967328514a98d 100644
--- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java
+++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java
@@ -24,16 +24,21 @@
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.gateway.WriteStateException;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.IndexSettingsModule;
 
+import java.io.IOException;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -460,6 +465,109 @@ public void testReplicaIgnoresOlderRetentionLeasesVersion() {
         }
     }
 
+    public void testLoadAndPersistRetentionLeases() throws IOException {
+        final AllocationId allocationId = AllocationId.newInitializing();
+        long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
+        final ReplicationTracker replicationTracker = new ReplicationTracker(
+                new ShardId("test", "_na", 0),
+                allocationId.getId(),
+                IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
+                primaryTerm,
+                UNASSIGNED_SEQ_NO,
+                value -> {},
+                () -> 0L,
+                (leases, listener) -> {});
+        replicationTracker.updateFromMaster(
+                randomNonNegativeLong(),
+                Collections.singleton(allocationId.getId()),
+                routingTable(Collections.emptySet(), allocationId),
+                Collections.emptySet());
+        replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
+        final int length = randomIntBetween(0, 8);
+        for (int i = 0; i < length; i++) {
+            if (rarely() && primaryTerm < Long.MAX_VALUE) {
+                primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE);
+                replicationTracker.setOperationPrimaryTerm(primaryTerm);
+            }
+            final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
+            replicationTracker.addRetentionLease(
+                    Integer.toString(i), retainingSequenceNumber, "test-" + i, ActionListener.wrap(() -> {}));
+        }
+
+        final Path path = createTempDir();
+        replicationTracker.persistRetentionLeases(path);
+        assertThat(replicationTracker.loadRetentionLeases(path), equalTo(replicationTracker.getRetentionLeases()));
+    }
+
+    /**
+     * Test that we correctly synchronize writing the retention lease state file in {@link ReplicationTracker#persistRetentionLeases(Path)}.
+     * This test can fail without the synchronization block in that method.
+     *
+     * @throws IOException if an I/O exception occurs loading the retention lease state file
+     */
+    public void testPersistRetentionLeasesUnderConcurrency() throws IOException {
+        final AllocationId allocationId = AllocationId.newInitializing();
+        long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
+        final ReplicationTracker replicationTracker = new ReplicationTracker(
+                new ShardId("test", "_na", 0),
+                allocationId.getId(),
+                IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
+                primaryTerm,
+                UNASSIGNED_SEQ_NO,
+                value -> {},
+                () -> 0L,
+                (leases, listener) -> {});
+        replicationTracker.updateFromMaster(
+                randomNonNegativeLong(),
+                Collections.singleton(allocationId.getId()),
+                routingTable(Collections.emptySet(), allocationId),
+                Collections.emptySet());
+        replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
+        final int length = randomIntBetween(0, 8);
+        for (int i = 0; i < length; i++) {
+            if (rarely() && primaryTerm < Long.MAX_VALUE) {
+                primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE);
+                replicationTracker.setOperationPrimaryTerm(primaryTerm);
+            }
+            final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
+            replicationTracker.addRetentionLease(
+                    Integer.toString(i), retainingSequenceNumber, "test-" + i, ActionListener.wrap(() -> {}));
+        }
+
+        final Path path = createTempDir();
+        final int numberOfThreads = randomIntBetween(1, 2 * Runtime.getRuntime().availableProcessors());
+        final CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
+        final Thread[] threads = new Thread[numberOfThreads];
+        for (int i = 0; i < numberOfThreads; i++) {
+            final String id = Integer.toString(length + i);
+            threads[i] = new Thread(() -> {
+                try {
+                    barrier.await();
+                    final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
+                    replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test-" + id, ActionListener.wrap(() -> {}));
+                    replicationTracker.persistRetentionLeases(path);
+                    barrier.await();
+                } catch (final BrokenBarrierException | InterruptedException | WriteStateException e) {
+                    throw new AssertionError(e);
+                }
+            });
+            threads[i].start();
+        }
+
+        try {
+            // synchronize the threads invoking ReplicationTracker#persistRetentionLeases(Path path)
+            barrier.await();
+            // wait for all the threads to finish
+            barrier.await();
+            for (int i = 0; i < numberOfThreads; i++) {
+                threads[i].join();
+            }
+        } catch (final BrokenBarrierException | InterruptedException e) {
+            throw new AssertionError(e);
+        }
+        assertThat(replicationTracker.loadRetentionLeases(path), equalTo(replicationTracker.getRetentionLeases()));
+    }
+
     private void assertRetentionLeases(
             final ReplicationTracker replicationTracker,
             final int size,
diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java
index e738c04d2a1bb..4567f3e382337 100644
--- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java
+++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java
@@ -30,6 +30,7 @@
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.internal.io.IOUtils;
+import org.elasticsearch.gateway.WriteStateException;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.shard.IndexShard;
@@ -91,7 +92,7 @@ public void tearDown() throws Exception {
         super.tearDown();
     }
 
-    public void testRetentionLeaseBackgroundSyncActionOnPrimary() {
+    public void testRetentionLeaseBackgroundSyncActionOnPrimary() throws WriteStateException {
         final IndicesService indicesService = mock(IndicesService.class);
 
         final Index index = new Index("index", "uuid");
@@ -120,13 +121,13 @@ public void testRetentionLeaseBackgroundSyncActionOnPrimary() {
 
         final ReplicationOperation.PrimaryResult<RetentionLeaseBackgroundSyncAction.Request> result =
                 action.shardOperationOnPrimary(request, indexShard);
-        // the retention leases on the shard should be periodically flushed
-        verify(indexShard).afterWriteOperation();
+        // the retention leases on the shard should be persisted
+        verify(indexShard).persistRetentionLeases();
         // we should forward the request containing the current retention leases to the replica
         assertThat(result.replicaRequest(), sameInstance(request));
     }
 
-    public void testRetentionLeaseBackgroundSyncActionOnReplica() {
+    public void testRetentionLeaseBackgroundSyncActionOnReplica() throws WriteStateException {
         final IndicesService indicesService = mock(IndicesService.class);
 
         final Index index = new Index("index", "uuid");
@@ -156,8 +157,8 @@ public void testRetentionLeaseBackgroundSyncActionOnReplica() {
         final TransportReplicationAction.ReplicaResult result = action.shardOperationOnReplica(request, indexShard);
         // the retention leases on the shard should be updated
         verify(indexShard).updateRetentionLeasesOnReplica(retentionLeases);
-        // the retention leases on the shard should be periodically flushed
-        verify(indexShard).afterWriteOperation();
+        // the retention leases on the shard should be persisted
+        verify(indexShard).persistRetentionLeases();
         // the result should indicate success
         final AtomicBoolean success = new AtomicBoolean();
         result.respond(ActionListener.wrap(r -> success.set(true), e -> fail(e.toString())));
diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java
index d92db46701df8..44a8cd70c42eb 100644
--- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java
+++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java
@@ -28,7 +28,6 @@
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.IndexSettings;
-import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
@@ -103,10 +102,8 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception {
             latch.await();
             retentionLock.close();
 
-            // check retention leases have been committed on the primary
-            final RetentionLeases primaryCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases(
-                    primary.commitStats().getUserData().get(Engine.RETENTION_LEASES));
-            assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(primaryCommittedRetentionLeases)));
+            // check retention leases have been written on the primary
+            assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(primary.loadRetentionLeases())));
 
             // check current retention leases have been synced to all replicas
             for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) {
@@ -118,10 +115,8 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception {
                 final Map<String, RetentionLease> retentionLeasesOnReplica = RetentionLeases.toMap(replica.getRetentionLeases());
                 assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases));
 
-                // check retention leases have been committed on the replica
-                final RetentionLeases replicaCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases(
-                        replica.commitStats().getUserData().get(Engine.RETENTION_LEASES));
-                assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replicaCommittedRetentionLeases)));
+                // check retention leases have been written on the replica
+                assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replica.loadRetentionLeases())));
             }
         }
     }
@@ -165,10 +160,8 @@ public void testRetentionLeaseSyncedOnRemove() throws Exception {
             latch.await();
             retentionLock.close();
 
-            // check retention leases have been committed on the primary
-            final RetentionLeases primaryCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases(
-                    primary.commitStats().getUserData().get(Engine.RETENTION_LEASES));
-            assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(primaryCommittedRetentionLeases)));
+            // check retention leases have been written on the primary
+            assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(primary.loadRetentionLeases())));
 
             // check current retention leases have been synced to all replicas
             for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) {
@@ -180,10 +173,8 @@ public void testRetentionLeaseSyncedOnRemove() throws Exception {
                 final Map<String, RetentionLease> retentionLeasesOnReplica = RetentionLeases.toMap(replica.getRetentionLeases());
                 assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases));
 
-                // check retention leases have been committed on the replica
-                final RetentionLeases replicaCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases(
-                        replica.commitStats().getUserData().get(Engine.RETENTION_LEASES));
-                assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replicaCommittedRetentionLeases)));
+                // check retention leases have been written on the replica
+                assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replica.loadRetentionLeases())));
             }
         }
     }
@@ -322,7 +313,6 @@ public void testBackgroundRetentionLeaseSync() throws Exception {
         }
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38588")
     public void testRetentionLeasesSyncOnRecovery() throws Exception {
         final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2);
         internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas);
@@ -378,6 +368,9 @@ public void testRetentionLeasesSyncOnRecovery() throws Exception {
                     .getShardOrNull(new ShardId(resolveIndex("index"), 0));
             final Map<String, RetentionLease> retentionLeasesOnReplica = RetentionLeases.toMap(replica.getRetentionLeases());
             assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases));
+
+            // check retention leases have been written on the replica; see RecoveryTarget#finalizeRecovery
+            assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replica.loadRetentionLeases())));
         }
     }
 
diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java
index 18817d784b131..80baa23a4d7ac 100644
--- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java
+++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java
@@ -21,7 +21,6 @@
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.replication.TransportWriteAction;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
@@ -29,6 +28,7 @@
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.internal.io.IOUtils;
+import org.elasticsearch.gateway.WriteStateException;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.shard.IndexShard;
@@ -90,7 +90,7 @@ public void tearDown() throws Exception {
         super.tearDown();
     }
 
-    public void testRetentionLeaseSyncActionOnPrimary() {
+    public void testRetentionLeaseSyncActionOnPrimary() throws WriteStateException {
         final IndicesService indicesService = mock(IndicesService.class);
 
         final Index index = new Index("index", "uuid");
@@ -118,18 +118,15 @@ public void testRetentionLeaseSyncActionOnPrimary() {
 
         final TransportWriteAction.WritePrimaryResult<RetentionLeaseSyncAction.Request, RetentionLeaseSyncAction.Response> result =
                 action.shardOperationOnPrimary(request, indexShard);
-        // the retention leases on the shard should be flushed
-        final ArgumentCaptor<FlushRequest> flushRequest = ArgumentCaptor.forClass(FlushRequest.class);
-        verify(indexShard).flush(flushRequest.capture());
-        assertTrue(flushRequest.getValue().force());
-        assertTrue(flushRequest.getValue().waitIfOngoing());
+        // the retention leases on the shard should be persisted
+        verify(indexShard).persistRetentionLeases();
         // we should forward the request containing the current retention leases to the replica
         assertThat(result.replicaRequest(), sameInstance(request));
         // we should start with an empty replication response
         assertNull(result.finalResponseIfSuccessful.getShardInfo());
     }
 
-    public void testRetentionLeaseSyncActionOnReplica() {
+    public void testRetentionLeaseSyncActionOnReplica() throws WriteStateException {
         final IndicesService indicesService = mock(IndicesService.class);
 
         final Index index = new Index("index", "uuid");
@@ -159,11 +156,8 @@ public void testRetentionLeaseSyncActionOnReplica() {
                 action.shardOperationOnReplica(request, indexShard);
         // the retention leases on the shard should be updated
         verify(indexShard).updateRetentionLeasesOnReplica(retentionLeases);
-        // the retention leases on the shard should be flushed
-        final ArgumentCaptor<FlushRequest> flushRequest = ArgumentCaptor.forClass(FlushRequest.class);
-        verify(indexShard).flush(flushRequest.capture());
-        assertTrue(flushRequest.getValue().force());
-        assertTrue(flushRequest.getValue().waitIfOngoing());
+        // the retention leases on the shard should be persisted
+        verify(indexShard).persistRetentionLeases();
         // the result should indicate success
         final AtomicBoolean success = new AtomicBoolean();
         result.respond(ActionListener.wrap(r -> success.set(true), e -> fail(e.toString())));
diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java
index bd2dee78b05ed..f38a806bd7b95 100644
--- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java
+++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java
@@ -31,14 +31,6 @@
 
 public class RetentionLeaseTests extends ESTestCase {
 
-    public void testInvalidId() {
-        final String id = "id" + randomFrom(":", ";", ",");
-        final IllegalArgumentException e = expectThrows(
-                IllegalArgumentException.class,
-                () -> new RetentionLease(id, randomNonNegativeLong(), randomNonNegativeLong(), "source"));
-        assertThat(e, hasToString(containsString("retention lease ID can not contain any of [:;,] but was [" + id + "]")));
-    }
-
     public void testEmptyId() {
         final IllegalArgumentException e = expectThrows(
                 IllegalArgumentException.class,
@@ -64,14 +56,6 @@ public void testTimestampOutOfRange() {
         assertThat(e, hasToString(containsString("retention lease timestamp [" + timestamp + "] out of range")));
     }
 
-    public void testInvalidSource() {
-        final String source = "source" + randomFrom(":", ";", ",");
-        final IllegalArgumentException e = expectThrows(
-                IllegalArgumentException.class,
-                () -> new RetentionLease("id", randomNonNegativeLong(), randomNonNegativeLong(), source));
-        assertThat(e, hasToString(containsString("retention lease source can not contain any of [:;,] but was [" + source + "]")));
-    }
-
     public void testEmptySource() {
         final IllegalArgumentException e = expectThrows(
                 IllegalArgumentException.class,
@@ -93,13 +77,4 @@ public void testRetentionLeaseSerialization() throws IOException {
         }
     }
 
-    public void testRetentionLeaseEncoding() {
-        final String id = randomAlphaOfLength(8);
-        final long retainingSequenceNumber = randomNonNegativeLong();
-        final long timestamp = randomNonNegativeLong();
-        final String source = randomAlphaOfLength(8);
-        final RetentionLease retentionLease = new RetentionLease(id, retainingSequenceNumber, timestamp, source);
-        assertThat(RetentionLease.decodeRetentionLease(RetentionLease.encodeRetentionLease(retentionLease)), equalTo(retentionLease));
-    }
-
 }
diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseXContentTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseXContentTests.java
new file mode 100644
index 0000000000000..159e85b572b98
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseXContentTests.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.seqno;
+
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.AbstractXContentTestCase;
+
+import java.io.IOException;
+
+public class RetentionLeaseXContentTests extends AbstractXContentTestCase<RetentionLease> {
+
+    @Override
+    protected RetentionLease createTestInstance() {
+        final String id = randomAlphaOfLength(8);
+        final long retainingSequenceNumber = randomNonNegativeLong();
+        final long timestamp = randomNonNegativeLong();
+        final String source = randomAlphaOfLength(8);
+        return new RetentionLease(id, retainingSequenceNumber, timestamp, source);
+    }
+
+    @Override
+    protected RetentionLease doParseInstance(final XContentParser parser) throws IOException {
+        return RetentionLease.fromXContent(parser);
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return false;
+    }
+
+}
diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeasesTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeasesTests.java
index 33cc83f602860..28444c7825e4d 100644
--- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeasesTests.java
+++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeasesTests.java
@@ -19,13 +19,18 @@
 
 package org.elasticsearch.index.seqno;
 
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.test.ESTestCase;
 
+import java.io.IOException;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
-import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
@@ -49,30 +54,6 @@ public void testVersionOutOfRange() {
         assertThat(e, hasToString(containsString("version must be non-negative but was [" + version + "]")));
     }
 
-    public void testRetentionLeasesEncoding() {
-        final long primaryTerm = randomNonNegativeLong();
-        final long version = randomNonNegativeLong();
-        final int length = randomIntBetween(0, 8);
-        final List<RetentionLease> retentionLeases = new ArrayList<>(length);
-        for (int i = 0; i < length; i++) {
-            final String id = randomAlphaOfLength(8);
-            final long retainingSequenceNumber = randomNonNegativeLong();
-            final long timestamp = randomNonNegativeLong();
-            final String source = randomAlphaOfLength(8);
-            final RetentionLease retentionLease = new RetentionLease(id, retainingSequenceNumber, timestamp, source);
-            retentionLeases.add(retentionLease);
-        }
-        final RetentionLeases decodedRetentionLeases =
-                RetentionLeases.decodeRetentionLeases(
-                        RetentionLeases.encodeRetentionLeases(new RetentionLeases(primaryTerm, version, retentionLeases)));
-        assertThat(decodedRetentionLeases.version(), equalTo(version));
-        if (length == 0) {
-            assertThat(decodedRetentionLeases.leases(), empty());
-        } else {
-            assertThat(decodedRetentionLeases.leases(), containsInAnyOrder(retentionLeases.toArray(new RetentionLease[0])));
-        }
-    }
-
     public void testSupersedesByPrimaryTerm() {
         final long lowerPrimaryTerm = randomLongBetween(1, Long.MAX_VALUE);
         final RetentionLeases left = new RetentionLeases(lowerPrimaryTerm, randomLongBetween(1, Long.MAX_VALUE), Collections.emptyList());
@@ -92,4 +73,48 @@ public void testSupersedesByVersion() {
         assertFalse(left.supersedes(right));
     }
 
+    public void testRetentionLeasesRejectsDuplicates() {
+        final RetentionLeases retentionLeases = randomRetentionLeases(false);
+        final RetentionLease retentionLease = randomFrom(retentionLeases.leases());
+        final IllegalStateException e = expectThrows(
+                IllegalStateException.class,
+                () -> new RetentionLeases(
+                        retentionLeases.primaryTerm(),
+                        retentionLeases.version(),
+                        Stream.concat(retentionLeases.leases().stream(), Stream.of(retentionLease)).collect(Collectors.toList())));
+        assertThat(e, hasToString(containsString("duplicate retention lease ID [" + retentionLease.id() + "]")));
+    }
+
+    public void testLeasesPreservesIterationOrder() {
+        final RetentionLeases retentionLeases = randomRetentionLeases(true);
+        if (retentionLeases.leases().isEmpty()) {
+            assertThat(retentionLeases.leases(), empty());
+        } else {
+            assertThat(retentionLeases.leases(), contains(retentionLeases.leases().toArray(new RetentionLease[0])));
+        }
+    }
+
+    public void testRetentionLeasesMetaDataStateFormat() throws IOException {
+        final Path path = createTempDir();
+        final RetentionLeases retentionLeases = randomRetentionLeases(true);
+        RetentionLeases.FORMAT.writeAndCleanup(retentionLeases, path);
+        assertThat(RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path), equalTo(retentionLeases));
+    }
+
+    private RetentionLeases randomRetentionLeases(boolean allowEmpty) {
+        final long primaryTerm = randomNonNegativeLong();
+        final long version = randomNonNegativeLong();
+        final int length = randomIntBetween(allowEmpty ? 0 : 1, 8);
+        final List<RetentionLease> leases = new ArrayList<>(length);
+        for (int i = 0; i < length; i++) {
+            final String id = randomAlphaOfLength(8);
+            final long retainingSequenceNumber = randomNonNegativeLong();
+            final long timestamp = randomNonNegativeLong();
+            final String source = randomAlphaOfLength(8);
+            final RetentionLease retentionLease = new RetentionLease(id, retainingSequenceNumber, timestamp, source);
+            leases.add(retentionLease);
+        }
+        return new RetentionLeases(primaryTerm, version, leases);
+    }
+
 }
diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeasesXContentTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeasesXContentTests.java
new file mode 100644
index 0000000000000..5fc2ace16ee94
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeasesXContentTests.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.seqno;
+
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.AbstractXContentTestCase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class RetentionLeasesXContentTests extends AbstractXContentTestCase<RetentionLeases> {
+
+    @Override
+    protected RetentionLeases createTestInstance() {
+        final long primaryTerm = randomNonNegativeLong();
+        final long version = randomNonNegativeLong();
+        final int length = randomIntBetween(0, 8);
+        final List<RetentionLease> leases = new ArrayList<>(length);
+        for (int i = 0; i < length; i++) {
+            final String id = randomAlphaOfLength(8);
+            final long retainingSequenceNumber = randomNonNegativeLong();
+            final long timestamp = randomNonNegativeLong();
+            final String source = randomAlphaOfLength(8);
+            final RetentionLease retentionLease = new RetentionLease(id, retainingSequenceNumber, timestamp, source);
+            leases.add(retentionLease);
+        }
+        return new RetentionLeases(primaryTerm, version, leases);
+    }
+
+    @Override
+    protected RetentionLeases doParseInstance(final XContentParser parser) throws IOException {
+        return RetentionLeases.fromXContent(parser);
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return false;
+    }
+
+}
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java
index 5f103d484f8c1..566d1feaf007d 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java
@@ -19,43 +19,30 @@
 
 package org.elasticsearch.index.shard;
 
-import org.apache.lucene.index.SegmentInfos;
-import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.admin.indices.flush.FlushRequest;
-import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.ShardRoutingHelper;
-import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.IndexSettings;
-import org.elasticsearch.index.engine.Engine;
-import org.elasticsearch.index.engine.InternalEngine;
 import org.elasticsearch.index.engine.InternalEngineFactory;
 import org.elasticsearch.index.seqno.RetentionLease;
 import org.elasticsearch.index.seqno.RetentionLeaseStats;
 import org.elasticsearch.index.seqno.RetentionLeases;
 import org.elasticsearch.index.seqno.SequenceNumbers;
-import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
 import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
@@ -221,7 +208,7 @@ private void runExpirationTest(final boolean primary) throws IOException {
         }
     }
 
-    public void testCommit() throws IOException {
+    public void testPersistence() throws IOException {
         final Settings settings = Settings.builder()
                 .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
                 .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), Long.MAX_VALUE, TimeUnit.NANOSECONDS)
@@ -242,19 +229,17 @@ public void testCommit() throws IOException {
 
             currentTimeMillis.set(TimeUnit.NANOSECONDS.toMillis(Long.MAX_VALUE));
 
-            // force a commit
-            indexShard.flush(new FlushRequest().force(true));
+            // force the retention leases to persist
+            indexShard.persistRetentionLeases();
 
-            // the committed retention leases should equal our current retention leases
-            final SegmentInfos segmentCommitInfos = indexShard.store().readLastCommittedSegmentsInfo();
-            assertTrue(segmentCommitInfos.getUserData().containsKey(Engine.RETENTION_LEASES));
+            // the written retention leases should equal our current retention leases
             final RetentionLeases retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get();
-            final RetentionLeases committedRetentionLeases = IndexShard.getRetentionLeases(segmentCommitInfos);
+            final RetentionLeases writtenRetentionLeases = indexShard.loadRetentionLeases();
             if (retentionLeases.leases().isEmpty()) {
-                assertThat(committedRetentionLeases.version(), equalTo(0L));
-                assertThat(committedRetentionLeases.leases(), empty());
+                assertThat(writtenRetentionLeases.version(), equalTo(0L));
+                assertThat(writtenRetentionLeases.leases(), empty());
             } else {
-                assertThat(committedRetentionLeases.version(), equalTo((long) length));
+                assertThat(writtenRetentionLeases.version(), equalTo((long) length));
                 assertThat(retentionLeases.leases(), contains(retentionLeases.leases().toArray(new RetentionLease[0])));
             }
 
@@ -304,76 +289,6 @@ public void testRetentionLeaseStats() throws IOException {
         }
     }
 
-    public void testRecoverFromStoreReserveRetentionLeases() throws Exception {
-        final AtomicBoolean throwDuringRecoverFromTranslog = new AtomicBoolean();
-        final IndexShard shard = newStartedShard(false, Settings.builder().put("index.soft_deletes.enabled", true).build(),
-            config -> new InternalEngine(config) {
-                @Override
-                public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner,
-                                                          long recoverUpToSeqNo) throws IOException {
-                    if (throwDuringRecoverFromTranslog.get()) {
-                        throw new RuntimeException("crashed before recover from translog is completed");
-                    }
-                    return super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo);
-                }
-            });
-        final List<RetentionLease> leases = new ArrayList<>();
-        long version = randomLongBetween(0, 100);
-        long primaryTerm = randomLongBetween(1, 100);
-        final int iterations = randomIntBetween(1, 10);
-        for (int i = 0; i < iterations; i++) {
-            if (randomBoolean()) {
-                indexDoc(shard, "_doc", Integer.toString(i));
-            } else {
-                leases.add(new RetentionLease(Integer.toString(i), randomNonNegativeLong(),
-                    randomLongBetween(Integer.MAX_VALUE, Long.MAX_VALUE), "test"));
-            }
-            if (randomBoolean()) {
-                if (randomBoolean()) {
-                    version += randomLongBetween(1, 100);
-                    primaryTerm += randomLongBetween(0, 100);
-                    shard.updateRetentionLeasesOnReplica(new RetentionLeases(primaryTerm, version, leases));
-                    shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
-                }
-            }
-            if (randomBoolean()) {
-                shard.updateGlobalCheckpointOnReplica(randomLongBetween(shard.getGlobalCheckpoint(), shard.getLocalCheckpoint()), "test");
-                flushShard(shard);
-            }
-        }
-        version += randomLongBetween(1, 100);
-        primaryTerm += randomLongBetween(0, 100);
-        shard.updateRetentionLeasesOnReplica(new RetentionLeases(primaryTerm, version, leases));
-        shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
-        closeShard(shard, false);
-
-        final IndexShard failedShard = reinitShard(shard, newShardRouting(shard.routingEntry().shardId(),
-            shard.routingEntry().currentNodeId(), true, ShardRoutingState.INITIALIZING,
-            RecoverySource.ExistingStoreRecoverySource.INSTANCE));
-        final DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(),
-            Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
-        failedShard.markAsRecovering("store", new RecoveryState(failedShard.routingEntry(), localNode, null));
-        throwDuringRecoverFromTranslog.set(true);
-        expectThrows(IndexShardRecoveryException.class, failedShard::recoverFromStore);
-        closeShards(failedShard);
-
-        final IndexShard newShard = reinitShard(shard, newShardRouting(shard.routingEntry().shardId(),
-            shard.routingEntry().currentNodeId(), true, ShardRoutingState.INITIALIZING,
-            RecoverySource.ExistingStoreRecoverySource.INSTANCE));
-        newShard.markAsRecovering("store", new RecoveryState(failedShard.routingEntry(), localNode, null));
-        throwDuringRecoverFromTranslog.set(false);
-        assertTrue(newShard.recoverFromStore());
-        final RetentionLeases retentionLeases = newShard.getRetentionLeases();
-        assertThat(retentionLeases.version(), equalTo(version));
-        assertThat(retentionLeases.primaryTerm(), equalTo(primaryTerm));
-        if (leases.isEmpty()) {
-            assertThat(retentionLeases.leases(), empty());
-        } else {
-            assertThat(retentionLeases.leases(), containsInAnyOrder(leases.toArray(new RetentionLease[0])));
-        }
-        closeShards(newShard);
-    }
-
     private void assertRetentionLeases(
             final IndexShard indexShard,
             final int size,
diff --git a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java
index 8b4c21ee086aa..b3c93acb97b99 100644
--- a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java
+++ b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java
@@ -278,7 +278,6 @@ public void testCcrAndIlmWithRollover() throws Exception {
         }
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37165")
     public void testUnfollowInjectedBeforeShrink() throws Exception {
         final String indexName = "shrink-test";
         final String shrunkenIndexName = "shrink-" + indexName;