Fix backward compatibility issues with sample tables
- add schema name to BASETABLE if it is not fully qualified
- explicitly add weightage column at catalog level if not present
- delete existing temporary hive directory on restart
Sumedh Wale committed Jan 15, 2019
1 parent 731e6fd commit 5d53b94
Showing 5 changed files with 26 additions and 12 deletions.
@@ -225,15 +225,20 @@ trait SnappyExternalCatalog extends ExternalCatalog {
    * Check for baseTable in both properties and storage.properties (older releases used a mix).
    */
   def getBaseTable(tableDefinition: CatalogTable): Option[String] = {
-    tableDefinition.properties.get(BASETABLE_PROPERTY) match {
+    (tableDefinition.properties.get(BASETABLE_PROPERTY) match {
       case None =>
         val params = new CaseInsensitiveMap(tableDefinition.storage.properties)
         params.get(BASETABLE_PROPERTY) match {
-          // older released didn't have base table entry for indexes
-          case None => params.get(INDEXED_TABLE).map(Utils.toUpperCase)
-          case Some(t) => Some(Utils.toUpperCase(t))
+          // older releases didn't have base table entry for indexes
+          case None => params.get(INDEXED_TABLE)
+          case t => t
         }
-      case Some(t) => Some(Utils.toUpperCase(t))
+      case t => t
+    }) match {
+      case None => None
+      case Some(t) =>
+        if (t.indexOf('.') != -1) Some(Utils.toUpperCase(t))
+        else Some(tableDefinition.database + '.' + Utils.toUpperCase(t))
     }
   }

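The net effect of this hunk is that getBaseTable now always returns a schema-qualified name: a base table that already contains a '.' is only upper-cased, while a bare name gets the schema of the defining table prepended. A minimal standalone sketch of that qualification step, where qualifyBaseTable and the inlined upper-casing are illustrative stand-ins for the real Utils helpers:

def qualifyBaseTable(baseTable: Option[String], schema: String): Option[String] =
  baseTable match {
    case None => None
    case Some(t) =>
      // already schema-qualified: only normalize the case
      if (t.indexOf('.') != -1) Some(t.toUpperCase(java.util.Locale.ENGLISH))
      // bare table name: prepend the owning schema (assumed already upper-case)
      else Some(schema + '.' + t.toUpperCase(java.util.Locale.ENGLISH))
  }

// qualifyBaseTable(Some("t1"), "APP")     == Some("APP.T1")
// qualifyBaseTable(Some("app.t1"), "APP") == Some("APP.T1")
// qualifyBaseTable(None, "APP")           == None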
@@ -82,17 +82,17 @@ object ExternalStoreUtils {
   final val COLUMN_BATCH_SIZE = "COLUMN_BATCH_SIZE"
   final val COLUMN_MAX_DELTA_ROWS = "COLUMN_MAX_DELTA_ROWS"
   final val COMPRESSION_CODEC = "COMPRESSION"
-  final val RELATION_FOR_SAMPLE = "RELATION_FOR_SAMPLE"

   // inbuilt basic table properties
   final val PARTITION_BY = "PARTITION_BY"
   final val REPLICATE = "REPLICATE"
   final val BUCKETS = "BUCKETS"
   final val KEY_COLUMNS = "KEY_COLUMNS"

-  // these two are obsolete column table properties only for backward compatibility
+  // these three are obsolete column table properties only for backward compatibility
   final val COLUMN_BATCH_SIZE_TRANSIENT = "COLUMN_BATCH_SIZE_TRANSIENT"
   final val COLUMN_MAX_DELTA_ROWS_TRANSIENT = "COLUMN_MAX_DELTA_ROWS_TRANSIENT"
+  final val RELATION_FOR_SAMPLE = "RELATION_FOR_SAMPLE"

   val ddlOptions: Seq[String] = Seq(INDEX_NAME, COLUMN_BATCH_SIZE,
     COLUMN_BATCH_SIZE_TRANSIENT, COLUMN_MAX_DELTA_ROWS,
@@ -36,6 +36,7 @@ import com.pivotal.gemfirexd.internal.engine.distributed.utils.GemFireXDUtils
 import com.pivotal.gemfirexd.internal.impl.sql.catalog.GfxdDataDictionary
 import io.snappydata.sql.catalog.SnappyExternalCatalog._
 import io.snappydata.sql.catalog.{CatalogObjectType, ConnectorExternalCatalog, RelationInfo, SnappyExternalCatalog}
+import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.ql.metadata.Hive
 import org.apache.http.annotation.GuardedBy
@@ -57,6 +58,7 @@ import org.apache.spark.sql.internal.StaticSQLConf.{GLOBAL_TEMP_DATABASE, SCHEMA
 import org.apache.spark.sql.policy.PolicyProperties
 import org.apache.spark.sql.sources.JdbcExtendedUtils
 import org.apache.spark.sql.sources.JdbcExtendedUtils.toUpperCase
+import org.apache.spark.sql.types.LongType
 import org.apache.spark.sql.{AnalysisException, _}
 import org.apache.spark.{SparkConf, SparkException}

@@ -454,7 +456,7 @@ class SnappyHiveExternalCatalog private[hive](val conf: SparkConf,
       Some(toUpperCase(table.identifier.database.get)))
     // VIEW text is stored as split text for large view strings,
     // so restore its full text and schema from properties if present
-    if (table.tableType == CatalogTableType.VIEW) {
+    val newTable = if (table.tableType == CatalogTableType.VIEW) {
       val viewText = JdbcExtendedUtils.readSplitProperty(SPLIT_VIEW_TEXT_PROPERTY,
         table.properties).orElse(table.viewText)
       val viewOriginalText = JdbcExtendedUtils.readSplitProperty(SPLIT_VIEW_ORIGINAL_TEXT_PROPERTY,
@@ -475,6 +477,12 @@ class SnappyHiveExternalCatalog private[hive](val conf: SparkConf,
           table.storage.properties + (DBTABLE_PROPERTY -> tableIdent.unquotedString)))
       case _ => table.copy(identifier = tableIdent)
     }
+    // explicitly add weightage column to sample tables for old catalog data
+    if (CatalogObjectType.getTableType(newTable) == CatalogObjectType.Sample &&
+        newTable.schema(table.schema.length - 1).name != Utils.WEIGHTAGE_COLUMN_NAME) {
+      newTable.copy(schema = newTable.schema.add(Utils.WEIGHTAGE_COLUMN_NAME, LongType,
+        nullable = false))
+    } else newTable
   }

   override protected def getCachedCatalogTable(schema: String, table: String): CatalogTable = {
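Sample tables carry a hidden weightage column as their last field; catalogs written by older releases persisted the schema without it, so a table restored from such metadata would no longer match the stored data. The added branch appends the column when it is missing. A minimal sketch of that schema patch using plain Spark types, where the literal column name is only a placeholder for the real Utils.WEIGHTAGE_COLUMN_NAME constant:

import org.apache.spark.sql.types.{LongType, StructType}

// placeholder for Utils.WEIGHTAGE_COLUMN_NAME
val weightageColumn = "WEIGHTAGE_COLUMN"

def addWeightageIfMissing(schema: StructType): StructType =
  if (schema.nonEmpty && schema.last.name == weightageColumn) schema
  // append the hidden weightage column as a non-nullable LongType
  else schema.add(weightageColumn, LongType, nullable = false)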
@@ -554,8 +562,7 @@ class SnappyHiveExternalCatalog private[hive](val conf: SparkConf,

   def refreshPolicies(ldapGroup: String): Unit = {
     val qualifiedLdapGroup = Constants.LDAP_GROUP_PREFIX + ldapGroup
-    getAllTables().filter(_.provider.map(_.equalsIgnoreCase("policy")).
-        getOrElse(false)).foreach { table =>
+    getAllTables().filter(_.provider.exists(_.equalsIgnoreCase("policy"))).foreach { table =>
       val applyToStr = table.properties(PolicyProperties.policyApplyTo)
       if (applyToStr.nonEmpty) {
         val applyTo = applyToStr.split(",")
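The refreshPolicies change is a pure simplification: for an Option, .exists(p) gives the same result as .map(p).getOrElse(false). For example:

val provider: Option[String] = Some("policy")
provider.map(_.equalsIgnoreCase("policy")).getOrElse(false)  // true
provider.exists(_.equalsIgnoreCase("policy"))                // true; both yield false for None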
@@ -784,6 +791,8 @@ object SnappyHiveExternalCatalog {
       log4jLogger.setLevel(Level.ERROR)
     }
     try {
+      // delete the hive scratch directory if it exists
+      FileUtils.deleteDirectory(new java.io.File("./hive"))
       instance = new SnappyHiveExternalCatalog(sparkConf, hadoopConf, createTime)
     } finally {
       logger.setLevel(previousLevel)
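The new FileUtils.deleteDirectory call removes the temporary "./hive" scratch directory (relative to the process working directory) before the catalog instance is re-created, so a directory left behind by a previous run cannot interfere with a restart. A minimal sketch of the same cleanup with commons-io, which is already imported above:

import java.io.File
import org.apache.commons.io.FileUtils

// recursively delete the stale hive scratch directory; commons-io returns
// quietly if the directory does not exist and throws IOException only if
// deletion fails
FileUtils.deleteDirectory(new File("./hive"))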
spark (submodule, 2 changes: 1 addition & 1 deletion)
Submodule spark updated from 0ed22d to 317d74
store (submodule, 2 changes: 1 addition & 1 deletion)
Submodule store updated from 79380a to 7ee801
