update with master #12

Merged
Changes from all commits (24 commits)
- 4243bcc: Allow earliest Delta table time travel to smallest Delta file version… (lizhangdatabricks, Jun 10, 2021)
- 4bee7ae: [ES-113602] Minor change in sbt install script (Yaohua628, Jun 10, 2021)
- 1470e33: [DELTA-OSS-EXTERNAL] [SC-77949] Make DeltaTable.forName support "delt… (FX196, Jun 11, 2021)
- d1f8b83: [DELTA-OSS-EXTERNAL] [Storage System] IBM Cloud Object Storage Suppor… (guykhazma, Jun 11, 2021)
- 31bf4bc: [SC-78244] Delta should lock commits on Azure (zsxwing, Jun 16, 2021)
- ac57b78: [SC-79127][DELTA] Refactor some test names, remove redundant comment (zachschuermann, Jun 16, 2021)
- 0b8e6cb: [SC-76551] Refactor DeltaCatalog and CreateDeltaTableCommand (yuyuant, Jun 17, 2021)
- 181941f: [SC-78753][Delta] Add more logging for measuring timing in conflict de… (tdas, Jun 18, 2021)
- 62ad794: [SC-77778] Minor refactor style (larsk-db, Jun 18, 2021)
- 7bd8e22: [SC-79320][DELTA] Support getBinIndex func in FileSizeHistogram (prakharjain09, Jun 18, 2021)
- 736b9f2: [SC-77769][DELTA] Refactor conf code (prakharjain09, Jun 18, 2021)
- 65eff80: [SC-78522] Remove redundant import (larsk-db, Jun 21, 2021)
- 7e9c6e5: [SC-74475][DELTA] Minor refactor of EvolvabilitySuiteBase (zachschuermann, Jun 21, 2021)
- c424efa: [SC-78050][DELTA] Call `deltaLog.update()` in DataFrame read path (zachschuermann, Jun 23, 2021)
- ac5f9e1: [SC-59185] Refactor DeltaAnalysis code and update comments (linhongliu-db, Jun 25, 2021)
- 83277eb: [SC-75521][DELTA] Strip the full temp view plan for Delta DML commands (jose-torres, Jun 25, 2021)
- 9f04cf7: [SC-74210] Drop NullType columns for SQL read path (junlee-db, Jun 28, 2021)
- 1cd3740: [SC-80108][DELTA] Minor code style change in OptimisticTransactionLeg… (prakharjain09, Jun 28, 2021)
- f5da823: [SC-70216] Support generated columns in merge command (mengtong-db, Jul 1, 2021)
- 7c6a0ce: [SC-80108][DELTA] Add new testsuite OptimisticTransactionSuite (prakharjain09, Jul 1, 2021)
- b29742d: [SC-78889][Delta] Move MergeSchema in SchemaUtils to a new file and R… (yijiacui-db, Jul 2, 2021)
- bf0d95b: [SC-78855] Call default table path only once in delta table creation … (yuchenhuo, Jul 4, 2021)
- 3fa6bca: [SC-79815][DELTA] Refactor Conflict detection code flow (prakharjain09, Jul 7, 2021)
- 86bbe99: [DELTA-OSS-EXTERNAL] Fix the sbt launch download url (zsxwing, Jul 8, 2021)
9 changes: 7 additions & 2 deletions build/sbt-launch-lib.bash

@@ -38,7 +38,12 @@ dlog () {
 
 acquire_sbt_jar () {
   SBT_VERSION=`awk -F "=" '/sbt\.version/ {print $2}' ./project/build.properties`
-  URL1=https://dl.bintray.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
+  # Download sbt from mirror URL if the environment variable is provided
+  if [[ "${SBT_VERSION}" == "0.13.18" ]] && [[ -n "${SBT_MIRROR_JAR_URL}" ]]; then
+    URL1="${SBT_MIRROR_JAR_URL}"
+  else
+    URL1="https://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar"
+  fi
   JAR=build/sbt-launch-${SBT_VERSION}.jar
 
   sbt_jar=$JAR
@@ -47,7 +52,7 @@ acquire_sbt_jar () {
   # Download sbt launch jar if it hasn't been downloaded yet
   if [ ! -f "${JAR}" ]; then
     # Download
-    printf "Attempting to fetch sbt\n"
+    printf 'Attempting to fetch sbt from %s\n' "${URL1}"
    JAR_DL="${JAR}.part"
    if [ $(command -v curl) ]; then
      curl --fail --location --silent ${URL1} > "${JAR_DL}" &&\
8 changes: 2 additions & 6 deletions contribs/src/main/scala/io/delta/storage/IBMCOSLogStore.scala

@@ -61,14 +61,10 @@ class IBMCOSLogStore(sparkConf: SparkConf, hadoopConf: Configuration)
     if (exists && overwrite == false) {
       throw new FileAlreadyExistsException(path.toString)
     } else {
-      // create is atomic
+      // write is atomic when overwrite == false
       val stream = fs.create(path, overwrite)
       try {
-        var writeSize = 0L
-        actions.map(_ + "\n").map(_.getBytes(UTF_8)).foreach(action => {
-          stream.write(action)
-          writeSize += action.length
-        })
+        actions.map(_ + "\n").map(_.getBytes(UTF_8)).foreach(stream.write)
        stream.close()
      } catch {
        case e: IOException if isPreconditionFailure(e) =>
@@ -76,7 +76,7 @@ class DeltaSparkSessionExtension extends (SparkSessionExtensions => Unit) {
       new DeltaSqlParser(parser)
     }
     extensions.injectResolutionRule { session =>
-      new DeltaAnalysis(session, session.sessionState.conf)
+      new DeltaAnalysis(session)
    }
    extensions.injectCheckRule { session =>
      new DeltaUnsupportedOperationsCheck(session)
10 changes: 9 additions & 1 deletion core/src/main/scala/io/delta/tables/DeltaMergeBuilder.scala

@@ -20,6 +20,8 @@ import scala.collection.JavaConverters._
 import scala.collection.Map
 
 import org.apache.spark.sql.delta.{DeltaErrors, PreprocessTableMerge}
+import org.apache.spark.sql.delta.DeltaViewHelper
+import org.apache.spark.sql.delta.commands.MergeIntoCommand
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.util.AnalysisHelper
 
@@ -30,6 +32,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, NamedExpression}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.functions.expr
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Builder to specify how to merge data from source DataFrame into the target Delta table.
@@ -217,8 +220,13 @@ class DeltaMergeBuilder private(
     if (!resolvedMergeInto.resolved) {
       throw DeltaErrors.analysisException("Failed to resolve\n", plan = Some(resolvedMergeInto))
     }
+    val strippedMergeInto = resolvedMergeInto.copy(
+      target = DeltaViewHelper.stripTempViewForMerge(resolvedMergeInto.target, SQLConf.get)
+    )
     // Preprocess the actions and verify
-    val mergeIntoCommand = PreprocessTableMerge(sparkSession.sessionState.conf)(resolvedMergeInto)
+    val mergeIntoCommand =
+      PreprocessTableMerge(sparkSession.sessionState.conf)(strippedMergeInto)
+        .asInstanceOf[MergeIntoCommand]
    sparkSession.sessionState.analyzer.checkAnalysis(mergeIntoCommand)
    mergeIntoCommand.run(sparkSession)
  }
2 changes: 2 additions & 0 deletions core/src/main/scala/io/delta/tables/DeltaTable.scala

@@ -618,6 +618,8 @@ object DeltaTable {
       new DeltaTable(
         sparkSession.table(tableName),
         DeltaTableV2(sparkSession, new Path(tbl.location), Some(tbl), Some(tableName)))
+    } else if (DeltaTableUtils.isValidPath(tableId)) {
+      forPath(sparkSession, tableId.table)
     } else {
       throw DeltaErrors.notADeltaTableException(DeltaTableIdentifier(table = Some(tableId)))
     }
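The new branch lets `DeltaTable.forName` fall back to `forPath` when the identifier turns out to be a path rather than a catalog table name. A minimal, Spark-free sketch of that dispatch order, where `isValidPath` is a hypothetical stand-in for the real `DeltaTableUtils.isValidPath` check:

```scala
// Sketch only: mimics the dispatch order of the patched forName.
// isValidPath is a simplified stand-in, not DeltaTableUtils's real logic.
object ForNameDispatchSketch {
  // Treat absolute paths and URIs as path-style identifiers (assumption).
  def isValidPath(id: String): Boolean =
    id.startsWith("/") || id.contains("://")

  // Name-based lookup first; the new else-if routes path-like ids to forPath.
  def resolve(tableId: String): String =
    if (isValidPath(tableId)) s"forPath($tableId)"
    else s"catalogTable($tableId)"
}
```

The point of the change is that one entry point now handles both kinds of identifier instead of throwing `notADeltaTableException` for paths.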
@@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.plans.logical
 
 import java.util.Locale
 
+import org.apache.spark.sql.delta.schema.SchemaMergingUtils
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 
 import org.apache.spark.sql.AnalysisException
@@ -227,7 +228,9 @@ case class DeltaMergeInto(
     condition: Expression,
     matchedClauses: Seq[DeltaMergeIntoMatchedClause],
     notMatchedClauses: Seq[DeltaMergeIntoInsertClause],
-    migrateSchema: Boolean) extends Command with SupportsSubquery {
+    migrateSchema: Boolean,
+    finalSchema: Option[StructType])
+  extends Command with SupportsSubquery {
 
   (matchedClauses ++ notMatchedClauses).foreach(_.verifyActions())
 
@@ -271,12 +274,13 @@ object DeltaMergeInto {
       condition,
       whenClauses.collect { case x: DeltaMergeIntoMatchedClause => x },
       whenClauses.collect { case x: DeltaMergeIntoInsertClause => x },
-      migrateSchema = false)
+      migrateSchema = false,
+      finalSchema = Some(target.schema))
   }
 
   def resolveReferences(merge: DeltaMergeInto, conf: SQLConf)(
       resolveExpr: (Expression, LogicalPlan) => Expression): DeltaMergeInto = {
-    val DeltaMergeInto(target, source, condition, matchedClauses, notMatchedClause, _) = merge
+    val DeltaMergeInto(target, source, condition, matchedClauses, notMatchedClause, _, _) = merge
 
     // We must do manual resolution as the expressions in different clauses of the MERGE have
     // visibility of the source, the target or both. Additionally, the resolution logic operates
@@ -403,10 +407,24 @@ object DeltaMergeInto {
     }
     val containsStarAction =
       (matchedClauses ++ notMatchedClause).flatMap(_.actions).exists(_.isInstanceOf[UnresolvedStar])
+
+    val migrateSchema = canAutoMigrate && containsStarAction
+
+    val finalSchema = if (migrateSchema) {
+      // The implicit conversions flag allows any type to be merged from source to target if Spark
+      // SQL considers the source type implicitly castable to the target. Normally, mergeSchemas
+      // enforces Parquet-level write compatibility, which would mean an INT source can't be merged
+      // into a LONG target.
+      SchemaMergingUtils.mergeSchemas(target.schema, source.schema, allowImplicitConversions = true)
+    } else {
+      target.schema
+    }
+
     val resolvedMerge = DeltaMergeInto(
       target, source, resolvedCond,
       resolvedMatchedClauses, resolvedNotMatchedClause,
-      migrateSchema = canAutoMigrate && containsStarAction)
+      migrateSchema = migrateSchema,
+      finalSchema = Some(finalSchema))
 
    // Its possible that pre-resolved expressions (e.g. `sourceDF("key") = targetDF("key")`) have
    // attribute references that are not present in the output attributes of the children (i.e.,
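The comment in the last hunk explains `allowImplicitConversions`: during schema merge, a source column whose type is implicitly castable to the target type (such as INT into LONG) is accepted instead of requiring an exact match. A toy, Spark-free sketch of that rule; the real logic lives in `SchemaMergingUtils.mergeSchemas`, and `canMerge` here is purely illustrative:

```scala
// Toy model of the widening rule described in the comment above: with
// allowImplicitConversions on, an INT source may merge into a LONG target;
// with it off, only identical types are compatible. Not the real implementation.
object MergeTypeSketch {
  def canMerge(target: String, source: String, allowImplicitConversions: Boolean): Boolean =
    target == source ||
      (allowImplicitConversions && target == "long" && source == "int")
}
```

This is why the patch passes `allowImplicitConversions = true` only on the auto-migration path, where the merged `finalSchema` may legitimately widen target columns.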