Skip to content

Commit

Permalink
Merge branch 'master' into delta-stats-columns
Browse files Browse the repository at this point in the history
  • Loading branch information
kamcheungting-db authored May 22, 2023
2 parents 468fede + f052bbb commit c39a044
Show file tree
Hide file tree
Showing 61 changed files with 1,648 additions and 730 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ lazy val core = (project in file("core"))
Compile / packageBin / mappings := (Compile / packageBin / mappings).value ++
listPythonFiles(baseDirectory.value.getParentFile / "python"),

Antlr4 / antlr4Version:= "4.8",
Antlr4 / antlr4Version:= "4.9.3",
Antlr4 / antlr4PackageName := Some("io.delta.sql.parser"),
Antlr4 / antlr4GenListener := true,
Antlr4 / antlr4GenVisitor := true,
Expand Down
16 changes: 16 additions & 0 deletions core/src/main/resources/error/delta-error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,22 @@
],
"sqlState" : "0AKDC"
},
"DELTA_CONVERT_TO_DELTA_ROW_TRACKING_WITHOUT_STATS" : {
"message" : [
"Cannot enable row tracking without collecting statistics.",
"If you want to enable row tracking, do the following:",
" 1. Enable statistics collection by running the command",
" SET <statisticsCollectionPropertyKey> = true",
" 2. Run CONVERT TO DELTA without the NO STATISTICS option.",
"",
"If you do not want to collect statistics, disable row tracking:",
" 1. Deactivate enabling the table feature by default by running the command:",
" RESET <rowTrackingTableFeatureDefaultKey>",
" 2. Deactivate the table property by default by running:",
" SET <rowTrackingDefaultPropertyKey> = false"
],
"sqlState" : "22000"
},
"DELTA_CREATE_EXTERNAL_TABLE_WITHOUT_SCHEMA" : {
"message" : [
"",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.FileNames.checkpointVersion
import org.apache.hadoop.fs.FileStatus

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

/**
 * A trait which provides information about a checkpoint to the Snapshot.
 *
 * Implementations know where a checkpoint's backing files live and how to expose
 * them for state reconstruction, without forcing the caller to re-list the log.
 */
trait CheckpointProvider {

  /** Version of the delta log that this checkpoint corresponds to. */
  def version: Long

  /**
   * Minimum set of files that represents this checkpoint.
   * These files could be reused again to initialize the [[CheckpointProvider]].
   */
  def files: Seq[FileStatus]

  /**
   * Effective size of checkpoint across all files.
   * NOTE(review): presumably the sum of the on-disk sizes of [[files]] — confirm
   * against concrete implementations.
   */
  def effectiveCheckpointSizeInBytes(): Long

  /**
   * List of different file indexes which could help derive full state-reconstruction
   * for the checkpoint.
   */
  def allActionsFileIndexes(): Seq[DeltaLogFileIndex]
}

/**
 * An implementation of [[CheckpointProvider]] where the information about checkpoint files
 * (i.e. Seq[FileStatus]) is already known in advance.
 *
 * @param files file statuses that describe the checkpoint; must be non-empty
 * @param lastCheckpointInfoOpt optional [[LastCheckpointInfo]] corresponding to this
 *                              checkpoint. This comes from the _last_checkpoint file.
 */
case class PreloadedCheckpointProvider(
    override val files: Seq[FileStatus],
    lastCheckpointInfoOpt: Option[LastCheckpointInfo]
) extends CheckpointProvider with DeltaLogging {

  // Fail fast at construction: an empty file list cannot describe a checkpoint,
  // and the invariant also makes files.head / the .get below safe.
  // (Fixed typo in the failure message: "atleast" -> "at least".)
  require(files.nonEmpty, "There should be at least 1 checkpoint file")

  // Single parquet-backed index over all checkpoint files. Built lazily since
  // callers may never trigger full state reconstruction. The .get is safe
  // because `files` is guaranteed non-empty by the require above.
  private lazy val fileIndex =
    DeltaLogFileIndex(DeltaLogFileIndex.CHECKPOINT_FILE_FORMAT_PARQUET, files).get

  // Checkpoint version is derived from the name of the first checkpoint file.
  override lazy val version: Long = checkpointVersion(files.head)

  override def effectiveCheckpointSizeInBytes(): Long = fileIndex.sizeInBytes

  override def allActionsFileIndexes(): Seq[DeltaLogFileIndex] = Seq(fileIndex)
}
Loading

0 comments on commit c39a044

Please sign in to comment.