diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java b/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java index 7408e2888cc3a5..4500c76f638792 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java @@ -21,7 +21,10 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStream; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; @@ -39,10 +42,28 @@ public static byte[] compress(byte[] data) throws IOException { return bytesStream.toByteArray(); } + public static byte[] compress(File file) throws IOException { + ByteArrayOutputStream bytesStream = new ByteArrayOutputStream(); + try (FileInputStream fileInputStream = new FileInputStream(file); + GZIPOutputStream gzipStream = new GZIPOutputStream(bytesStream)) { + + byte[] buffer = new byte[8192]; // 8KB buffer + int bytesRead; + while ((bytesRead = fileInputStream.read(buffer)) != -1) { + gzipStream.write(buffer, 0, bytesRead); + } + } + return bytesStream.toByteArray(); + } + public static byte[] decompress(byte[] data) throws IOException { ByteArrayInputStream bytesStream = new ByteArrayInputStream(data); try (GZIPInputStream gzipStream = new GZIPInputStream(bytesStream)) { return IOUtils.toByteArray(gzipStream); } } + + public static InputStream lazyDecompress(byte[] data) throws IOException { + return new GZIPInputStream(new ByteArrayInputStream(data)); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java index 83cd86fee898fd..8518b6606cadc9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java @@ -608,8 +608,7 @@ public void 
restore(Repository repository, Database db, RestoreCommand command) ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "not supported now."); } if (command.isLocal()) { - String jobInfoString = new String(command.getJobInfo()); - jobInfo = BackupJobInfo.genFromJson(jobInfoString); + jobInfo = command.getJobInfo(); if (jobInfo.extraInfo == null) { ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Invalid job extra info empty"); @@ -646,13 +645,7 @@ public void restore(Repository repository, Database db, RestoreCommand command) metaVersion = jobInfo.metaVersion; } - BackupMeta backupMeta; - try { - backupMeta = BackupMeta.fromBytes(command.getMeta(), metaVersion); - } catch (IOException e) { - LOG.warn("read backup meta failed, current meta version {}", Env.getCurrentEnvJournalVersion(), e); - throw new DdlException("read backup meta failed", e); - } + BackupMeta backupMeta = command.getMeta(); String backupTimestamp = TimeUtils.longToTimeString( jobInfo.getBackupTime(), TimeUtils.getDatetimeFormatWithHyphenWithTimeZone()); restoreJob = new RestoreJob(command.getLabel(), backupTimestamp, diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java index a7bf13d0a3cf2d..1ea7d6ee6a6c87 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java @@ -1163,20 +1163,12 @@ public synchronized Snapshot getSnapshot() { // Avoid loading expired meta. 
long expiredAt = createTime + timeoutMs; if (System.currentTimeMillis() >= expiredAt) { - return new Snapshot(label, new byte[0], new byte[0], expiredAt, commitSeq); + return new Snapshot(label, null, null, expiredAt, commitSeq); } - try { - File metaInfoFile = new File(localMetaInfoFilePath); - File jobInfoFile = new File(localJobInfoFilePath); - byte[] metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath()); - byte[] jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath()); - return new Snapshot(label, metaInfoBytes, jobInfoBytes, expiredAt, commitSeq); - } catch (IOException e) { - LOG.warn("failed to load meta info and job info file, meta info file {}, job info file {}: ", - localMetaInfoFilePath, localJobInfoFilePath, e); - return null; - } + File metaInfoFile = new File(localMetaInfoFilePath); + File jobInfoFile = new File(localJobInfoFilePath); + return new Snapshot(label, metaInfoFile, jobInfoFile, expiredAt, commitSeq); } public synchronized List getInfo() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java index ccf4941538ab35..643aead0446480 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java @@ -53,6 +53,8 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.io.PrintWriter; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -827,6 +829,12 @@ public static BackupJobInfo genFromJson(String json) { return jobInfo; } + public static BackupJobInfo fromInputStream(InputStream inputStream) throws IOException { + try (InputStreamReader reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8)) { + return GsonUtils.GSON.fromJson(reader, BackupJobInfo.class); + } + } + public void writeToFile(File jobInfoFile) throws FileNotFoundException {
PrintWriter printWriter = new PrintWriter(jobInfoFile); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java index 41b48af6bcb011..7d34e0ce5d5933 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java @@ -101,7 +101,7 @@ public static BackupMeta fromBytes(byte[] bytes, int metaVersion) throws IOExcep return fromInputStream(new ByteArrayInputStream(bytes), metaVersion); } - protected static BackupMeta fromInputStream(InputStream stream, int metaVersion) throws IOException { + public static BackupMeta fromInputStream(InputStream stream, int metaVersion) throws IOException { MetaContext metaContext = new MetaContext(); metaContext.setMetaVersion(metaVersion); metaContext.setThreadLocalInfo(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java index a9f734dbc99220..2fc3ca6d146ee6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java @@ -17,19 +17,22 @@ package org.apache.doris.backup; -import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.common.GZIPUtils; +import org.apache.doris.common.Pair; import com.google.gson.annotations.SerializedName; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; + public class Snapshot { @SerializedName(value = "label") private String label = null; - @SerializedName(value = "meta") - private byte[] meta = null; + private File meta = null; - @SerializedName(value = "jobInfo") - private byte[] jobInfo = null; + private File jobInfo = null; @SerializedName(value = "expired_at") private long expiredAt = 0; @@ -40,7 +43,7 @@ public class Snapshot { public Snapshot() { } - public Snapshot(String label, byte[] meta, byte[] jobInfo, 
long expiredAt, long commitSeq) { + public Snapshot(String label, File meta, File jobInfo, long expiredAt, long commitSeq) { this.label = label; this.meta = meta; this.jobInfo = jobInfo; @@ -48,12 +51,45 @@ public Snapshot(String label, byte[] meta, byte[] jobInfo, long expiredAt, long this.commitSeq = commitSeq; } - public byte[] getMeta() { - return meta; + public static Pair readFromBytes(byte[] meta, byte[] jobInfo) throws IOException { + BackupJobInfo backupJobInfo = BackupJobInfo.genFromJson(new String(jobInfo, java.nio.charset.StandardCharsets.UTF_8)); + BackupMeta backupMeta = BackupMeta.fromBytes(meta, backupJobInfo.metaVersion); + return Pair.of(backupMeta, backupJobInfo); + } + + public static Pair readFromCompressedBytes(byte[] meta, byte[] jobInfo) + throws IOException { + BackupJobInfo backupJobInfo = BackupJobInfo.fromInputStream(GZIPUtils.lazyDecompress(jobInfo)); + BackupMeta backupMeta = BackupMeta.fromInputStream(GZIPUtils.lazyDecompress(meta), backupJobInfo.metaVersion); + return Pair.of(backupMeta, backupJobInfo); + } + + public static boolean isCompressed(byte[] meta, byte[] jobInfo) { + return GZIPUtils.isGZIPCompressed(jobInfo) || GZIPUtils.isGZIPCompressed(meta); + } + + public long getMetaSize() { + return meta != null ? meta.length() : 0; + } + + public long getJobInfoSize() { + return jobInfo != null ?
jobInfo.length() : 0; } - public byte[] getJobInfo() { - return jobInfo; + public byte[] getCompressedMeta() throws IOException { + return GZIPUtils.compress(meta); + } + + public byte[] getCompressedJobInfo() throws IOException { + return GZIPUtils.compress(jobInfo); + } + + public byte[] getMeta() throws IOException { + return Files.readAllBytes(meta.toPath()); + } + + public byte[] getJobInfo() throws IOException { + return Files.readAllBytes(jobInfo.toPath()); } public long getExpiredAt() { @@ -68,16 +104,10 @@ public long getCommitSeq() { return commitSeq; } - public String toJson() { - return GsonUtils.GSON.toJson(this); - } - @Override public String toString() { return "Snapshot{" + "label='" + label + '\'' - + ", meta=" + meta - + ", jobInfo=" + jobInfo + ", expiredAt=" + expiredAt + ", commitSeq=" + commitSeq + '}'; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/RestoreCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/RestoreCommand.java index c76368f4e64401..673e8f1700b0b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/RestoreCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/RestoreCommand.java @@ -18,6 +18,8 @@ package org.apache.doris.nereids.trees.plans.commands; import org.apache.doris.analysis.StmtType; +import org.apache.doris.backup.BackupJobInfo; +import org.apache.doris.backup.BackupMeta; import org.apache.doris.backup.Repository; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ReplicaAllocation; @@ -94,8 +96,8 @@ public class RestoreCommand extends Command implements ForwardWithSync { private final Map properties; private final boolean isExclude; private long timeoutMs; - private byte[] meta = null; - private byte[] jobInfo = null; + private BackupMeta meta = null; + private BackupJobInfo jobInfo = null; private String storageVaultName = null; /** @@ -410,19 +412,19 @@ public 
boolean isLocal() { return isLocal; } - public byte[] getMeta() { + public BackupMeta getMeta() { return meta; } - public void setMeta(byte[] meta) { + public void setMeta(BackupMeta meta) { this.meta = meta; } - public byte[] getJobInfo() { + public BackupJobInfo getJobInfo() { return jobInfo; } - public void setJobInfo(byte[] jobInfo) { + public void setJobInfo(BackupJobInfo jobInfo) { this.jobInfo = jobInfo; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 70234ae52ba6f3..83c53c532dccef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -29,6 +29,8 @@ import org.apache.doris.analysis.TableScanParams; import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.backup.BackupJobInfo; +import org.apache.doris.backup.BackupMeta; import org.apache.doris.backup.Snapshot; import org.apache.doris.binlog.BinlogLagInfo; import org.apache.doris.catalog.AutoIncrementGenerator; @@ -62,7 +64,6 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.DuplicatedRequestException; -import org.apache.doris.common.GZIPUtils; import org.apache.doris.common.InternalErrorCode; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.LoadException; @@ -3125,24 +3126,38 @@ private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String c result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_EXPIRED); result.getStatus().addToErrorMsgs(String.format("snapshot %s is expired", label)); } else { - byte[] meta = snapshot.getMeta(); - byte[] jobInfo = snapshot.getJobInfo(); + long metaSize = snapshot.getMetaSize(); + long jobInfoSize = snapshot.getJobInfoSize(); + long snapshotSize = 
snapshot.getMetaSize() + snapshot.getJobInfoSize(); + if (metaSize + jobInfoSize >= Integer.MAX_VALUE && !request.isEnableCompress()) { + String msg = String.format( + "Snapshot %s is too large (%d bytes > 2GB). Please enable compression to continue.", + label, snapshotSize); + LOG.warn("get snapshot failed: {}", msg); + result.getStatus().setStatusCode(TStatusCode.INTERNAL_ERROR); + result.getStatus().addToErrorMsgs(msg); + return result; + } + long expiredAt = snapshot.getExpiredAt(); long commitSeq = snapshot.getCommitSeq(); LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info size: {}, " - + "expired at: {}, commit seq: {}", label, meta.length, jobInfo.length, expiredAt, commitSeq); + + "expired at: {}, commit seq: {}", label, metaSize, jobInfoSize, expiredAt, commitSeq); if (request.isEnableCompress()) { - meta = GZIPUtils.compress(meta); - jobInfo = GZIPUtils.compress(jobInfo); + byte[] meta = snapshot.getCompressedMeta(); + byte[] jobInfo = snapshot.getCompressedJobInfo(); + result.setMeta(meta); + result.setJobInfo(jobInfo); result.setCompressed(true); if (LOG.isDebugEnabled()) { LOG.debug("get snapshot info with compress, snapshot: {}, compressed meta " + "size {}, compressed job info size {}", label, meta.length, jobInfo.length); } + } else { + result.setMeta(snapshot.getMeta()); + result.setJobInfo(snapshot.getJobInfo()); } - result.setMeta(meta); - result.setJobInfo(jobInfo); result.setExpiredAt(expiredAt); result.setCommitSeq(commitSeq); } @@ -3255,6 +3270,8 @@ private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest reque } } + BackupMeta backupMeta; + BackupJobInfo backupJobInfo; byte[] meta = request.getMeta(); byte[] jobInfo = request.getJobInfo(); if (Config.enable_restore_snapshot_rpc_compression && request.isCompressed()) { @@ -3263,15 +3280,25 @@ private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest reque meta.length, jobInfo.length); } try { - meta = GZIPUtils.decompress(meta); - jobInfo = 
GZIPUtils.decompress(jobInfo); + Pair pair = Snapshot.readFromCompressedBytes(meta, jobInfo); + backupMeta = pair.first; + backupJobInfo = pair.second; } catch (Exception e) { LOG.warn("decompress meta and job info failed", e); throw new UserException("decompress meta and job info failed", e); } - } else if (GZIPUtils.isGZIPCompressed(jobInfo) || GZIPUtils.isGZIPCompressed(meta)) { + } else if (Snapshot.isCompressed(meta, jobInfo)) { throw new UserException("The request is compressed, but the config " + "`enable_restore_snapshot_rpc_compressed` is not enabled."); + } else { + try { + Pair pair = Snapshot.readFromBytes(meta, jobInfo); + backupMeta = pair.first; + backupJobInfo = pair.second; + } catch (Exception e) { + LOG.warn("deserialize meta and job info failed", e); + throw new UserException("deserialize meta and job info failed", e); + } } //instantiate RestoreCommand @@ -3331,8 +3358,8 @@ private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest reque } RestoreCommand restoreCommand = new RestoreCommand(labelNameInfo, repoName, tableRefInfos, properties, false); - restoreCommand.setMeta(meta); - restoreCommand.setJobInfo(jobInfo); + restoreCommand.setMeta(backupMeta); + restoreCommand.setJobInfo(backupJobInfo); restoreCommand.setIsBeingSynced(); LOG.debug("restore snapshot info, restoreCommand: {}", restoreCommand); try {