Duplicated attribute IDs #272

Closed
wajda opened this issue Jul 29, 2021 · 21 comments
wajda (Contributor) commented Jul 29, 2021

UPDATE: A temporary workaround - #272 (comment)

The issue was discovered in, and is the cause of, AbsaOSS/spline#925.

See the JSON sample in AbsaOSS/spline#925 (comment).

Minimal code to replicate the issue:

package za.co.absa.spline

import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object SplineDuplicatedIds {

  val extraConf: Iterable[(String, String)] = List(
    ("spark.sql.queryExecutionListeners", "za.co.absa.spline.harvester.listener.SplineQueryExecutionListener"),
    ("spark.spline.lineageDispatcher", "console"),
    ("spark.spline.mode", "REQUIRED")
  )

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf()
      .setAll(extraConf)
      .setMaster("local[*]")

    val ss = SparkSession.builder().config(sparkConf).getOrCreate()
    import ss.implicits._

    val df = Seq(("adcde1_12938597", 162))
      .toDF("unique_id", "total_commission")

    val result: DataFrame = df
      .withColumn("commission", col("total_commission"))
      .drop("total_commission")

    val firstValidTransactions = result.withColumnRenamed("commission", "foo")

    val joined = result.join(firstValidTransactions, usingColumns = Seq("unique_id"))

    joined
      .write
      .mode(SaveMode.Overwrite)
      .option("path", "tmp/spline_test_bi_duplicates")
      .saveAsTable("spline_test_bi_duplicated")
  }
}
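
Note that the join is effectively a self-join: firstValidTransactions is derived from result itself, so the commission attribute created by the withColumn call feeds both sides of the join.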

Happens on Spark 3.1, 2.4, and probably all other versions as well.

wajda added the bug label Jul 29, 2021
wajda added this to the 0.6.2 milestone Jul 29, 2021
wajda (Contributor, Author) commented Jul 29, 2021

@kuhnen, can you share the Spark job source code that reproduces the issue, please?
We could reconstruct it from the execution plan JSON, but with the source code it would be much easier. Thank you!

wajda (Contributor, Author) commented Jul 29, 2021

Observation:

The sample JSON above contains 3 Read operations that read from the same data source. Each of them emits its own set of attributes (same name and type, but different IDs). Then there are 3 Project operations, each following one of those reads, and each outputs an attribute cast(total_commission) AS commission. For some reason, all those commission attributes are given the same ID!! (in this example it is 123)

kuhnen commented Jul 29, 2021

@wajda I can, but the problem is that it might be hard to follow. We created a framework for our business case, so it might not be that easy to understand the flow of the job :(.

class CustomerTransactionWebgainsCn(val sparkSessionFilesHelper: SparkSessionFilesHelper,
                                    override val dataLakeUpdater: Option[DataLakeUpdaterAthena] = None)
  extends WorkFlow with CustomerTransactionsTransformation {

  val uniqueTransactionsName: CompleteNameConventionBuilder = NameConventions.DataLake.dumps.affiliate_networks.webgains
  val historyName: CompleteNameConventionBuilder = NameConventions.DataLake.dumps.affiliate_networks.webgains_history


  override val colsAddedAfter: Set[String] = Set(
    commissionGroupListColName,
    totalOriginalColName,
    commissionOriginalColName,
    transactionTypeColName,
    orderIdColName,
    vpFlagColName,
  )

  override val dataLakeTransactions: DataFrame = sparkSessionFilesHelper.readDataFrame(buckets.dataLakeRead, uniqueTransactionsName)
    .withColumn(saleValueColName, emtpyOrNullToZeroLogic(saleValueColName))
    .drop(programIdColName)
  override val historicTransactions: DataFrame = sparkSessionFilesHelper.readDataFrame(bucket = buckets.dataLakeRead, historyName)
    .withColumn(saleValueColName, emtpyOrNullToZeroLogic(saleValueColName))
    .select(uniqueIdColName, createdColName, statusColName, paymentStatusColName, saleValueColName, commissionColName)


  override def transformDataLakeTransactions(initialDataLakeTransactions: DataFrame): DataFrame = initialDataLakeTransactions

  override def transformHistoricTransactions(initialHistoricTransactions: DataFrame): DataFrame = initialHistoricTransactions

  override def selectCommissionAndTotalOriginal(historicTransactions: DataFrame): DataFrame = historicTransactions
    .drop(statusColName).drop(paymentStatusColName)
    .withColumnRenamed(saleValueColName, totalOriginalColName)
    .withColumnRenamed(commissionColName, commissionOriginalColName)
    .transform(updateCommissionAndTotalOriginalValues)

  override def selectBiValidated(historicTransactions: DataFrame): DataFrame = historicTransactions
    .drop(saleValueColName, commissionColName)
    .withColumn(transactionStatusColName, transactionStatusLogic)
    .transform(updateBiValidateDate)
    .drop(statusColName, paymentStatusColName, transactionStatusColName)

  override def selectData(dataLakeTransactions: DataFrame, colsAddedAfter: Set[String]): DataFrame = dataLakeTransactions
    .withColumn(clickOutIdColName, clickoutIdLogic(clickRefColName))
    .withColumnRenamed(programNameColName, programIdColName)
    .withColumn(totalColName, emtpyOrNullToZeroLogic(saleValueColName))
    .withColumn(totalColName, col(totalColName).cast(decimalType))
    .withColumn(commissionColName, col(commissionColName).cast(decimalType))
    .withColumnRenamed(transactionDateTimeColName, timeOfEventColName)
    .withColumn(affiliateNetworkColName, typedLit(AffiliateNetworkNames.webgains.toString))
    .withColumn(transactionStatusColName, transactionStatusLogic)
    .selectExpr(colsToBeSelected.filterNot(colsAddedAfter.contains(_)): _*)

  override def prepareDataFrameForUpdatingCommissionAndTotal(historicTransactions: DataFrame): DataFrame = historicTransactions
    .withColumn(totalColName, emtpyOrNullToZeroLogic(saleValueColName))
    .withColumn(totalColName, col(totalColName).cast(decimalType))
    .withColumn(commissionColName, col(commissionColName).cast(decimalType))
    .withColumn(transactionStatusColName, transactionStatusLogic)
    .drop(statusColName, saleValueColName, paymentStatusColName, transactionStatusColName)

  override def joinWithHistoric(selectedData: DataFrame, commissionAndTotalOriginal: DataFrame, biValidated: DataFrame, preparedDataFrameForUpdatingCommissionAndTotal: DataFrame): DataFrame = selectedData
    .join(commissionAndTotalOriginal, usingColumn = uniqueIdColName)
    .withColumn(transactionTypeColName, transactionTypeTotalLogic)
    .withColumn(vpFlagColName, vpFlagLogic)
    .join(biValidated, usingColumn = uniqueIdColName)
    .transform(mainDataFrame => updateTransactionCommissionAndTotal(preparedDataFrameForUpdatingCommissionAndTotal, mainDataFrame))

  override def run(args: Array[String]): Unit = {
    this.transformAndSave(buckets.dataLakeWrite, sparkSessionFilesHelper, dataLakeUpdater)
  }
}

cerveada self-assigned this Aug 2, 2021
kuhnen commented Aug 3, 2021

@wajda @cerveada thanks for looking into it. Is there any way I can help with this? Maybe if you point me to the class I should start looking at, I can try to help?
Thanks a lot.

cerveada (Contributor) commented Aug 3, 2021

The key to fixing this is Spark code that I can run to reproduce the issue. So far I have not been successful in reproducing it myself.

From the JSON I was able to find some information:

The duplicated attribute is commission. It is read from the data source as total_commission, cast to data type Decimal(19, 3), and then renamed to commission. This happens three times in the lineage, and the problem is that each time it happens the resulting attribute after the rename is the same, so three duplicates are created.

So if you could recreate the code that produces commission, that would help us; even better would be a runnable example that reproduces the issue. From the code you provided it seems there are some operations involved, but it's not clear how it all connects together...

kuhnen commented Aug 4, 2021

@cerveada let me try. I hope to give you the code this week. Thanks a lot.

cerveada (Contributor) commented Aug 4, 2021

@kuhnen thank you, it may be the only way. I made a few more attempts to replicate the issue, but even though I am able to produce a very similar lineage, the duplicated attribute just doesn't occur for me.

I found another duplicated attribute, and there is a pattern:

commission [duplicated] <- (rename) <- (cast) <- total_commission [input]
total [duplicated] <- (rename) <- (cast) <- total_shopping_cart [input]

The input attributes are read several times from what seems to be the same parquet file. Each read creates its own set of input attributes with unique IDs. That is correct, but then the cast and rename happen and the result is the same attribute multiple times (a sketch for inspecting these IDs follows the list below).

It almost seems like some optimization, but Spline captures the logical plan, where no optimizations should be present.

Some ideas of what could cause the issue:

  • UDF
  • some direct RDD manipulation
  • are you using standard Spark or something else like Databricks?
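
For reference, a minimal sketch for inspecting the expression IDs Spark assigns to these attributes (an illustration only, not Spline code; it assumes a local SparkSession ss as in the snippets above):

import org.apache.spark.sql.functions.col

// Two branches that share the same underlying DataFrame, as in the repro:
val a = ss.range(1).select(col("id").cast("string").as("commission"))
val b = a.withColumnRenamed("commission", "foo")

// The analyzed plans print each attribute together with its exprId; because
// b is built on top of a, the Alias that produces "commission" (and its ID)
// appears in both plans, and hence in both branches of a subsequent self-join.
println(a.queryExecution.analyzed.treeString)
println(b.queryExecution.analyzed.treeString)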

kuhnen commented Aug 5, 2021

@cerveada thanks a lot :). Since I was not able to get the code ready, let me answer the three points above.

  • We are not using UDFs (not in this code, 100% sure)
  • No direct RDD manipulation either (only Spark DataFrames)
  • I run it locally (the plan is to run on AWS EMR in the future). I mean, we run on AWS EMR, but for development purposes and to evaluate Spline I was running locally.

I can add that we are reading S3 files instead of using the tables approach.
Could it be the renaming? We use a lot of renaming to build the data marts for the analytics people.
I hope this already gives you some extra help.

Which version should I use? 0.6.1? Thanks a lot.

cerveada (Contributor) commented Aug 5, 2021

> I can add that we are reading S3 files instead of using the tables approach.
> Could it be the renaming? We use a lot of renaming to build the data marts for the analytics people.

Maybe. If I am able to debug a code example that causes the issue, I am sure I will find the cause, but without it, it's just guessing.

> Which version should I use? 0.6.1? Thanks a lot.

Yes, 0.6.1

wajda (Contributor, Author) commented Aug 6, 2021

Another code snippet that potentially reproduces it (see #265 (comment)):

SELECT 
    concat_ws(',', col1,col2) AS row_info,
    b.file_row_count AS file_row_count,
    a.filename AS filename,
    run_timestamp,
    load_date,
    batch_id
FROM (
    SELECT *, row_number() OVER (PARTITION BY filename ORDER BY batch_id) AS batch_id_rank 
    FROM schema_validated_failed
    WHERE schema_validation_result = False
) a
JOIN (
    SELECT count(1) AS file_row_count, filename
    FROM schema_validated_failed
    GROUP BY filename
) b 
ON a.filename = b.filename
WHERE batch_id_rank <= 100
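
Note that this query also reads the same table (schema_validated_failed) in both subqueries and joins them, matching the self-join pattern observed above.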

kuhnen commented Aug 6, 2021

@wajda thanks a lot :). @cerveada please let me know if you still need my code. Sorry, so many meetings this week.

kuhnen commented Aug 9, 2021

@wajda @cerveada, I created a new Spark project to help with this task. But now I am getting this error:

java.lang.NoClassDefFoundError: scala/tools/reflect/package$
test_aws_catalog[ERROR] 	at za.co.absa.commons.reflect.ReflectionUtils$.compile(ReflectionUtils.scala:46)

Any idea how to solve it, or why it started happening?

cerveada (Contributor) commented Aug 9, 2021

It could be a Scala version mismatch. Do you use the same Scala version for everything?
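
A quick way to check which Scala version is actually on the classpath at runtime (a minimal sketch; it prints something like "version 2.12.x"):

println(scala.util.Properties.versionString)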

kuhnen commented Aug 9, 2021

@cerveada thanks. I found the issue. I mean, I could solve it, but I do not understand why 😂. At least I can work now and hopefully give you guys code that recreates the issue 🙌.

kuhnen commented Aug 10, 2021

@cerveada @wajda I have a piece of code, without any external dependencies, that shows the problem.
Please let me know if you can use it. I will try to find the exact line in the meantime:

import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{col, lower, typedLit, when}
import org.apache.spark.sql.types.DecimalType
import org.apache.spark.sql.{Column, DataFrame, SaveMode, SparkSession}

object SplineDuplicatedIds {

  val extraConf: Iterable[(String, String)] = List(
    ("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
    ("hive.imetastoreclient.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"),
    ("spark.sql.queryExecutionListeners", "za.co.absa.spline.harvester.listener.SplineQueryExecutionListener"),
    ("spark.spline.producer.url", "http://localhost:8080/producer"),
    ("spark.spline.mode", "REQUIRED")
  )


  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf()
      .setAll(extraConf)
      .setMaster("local[*]")

    val ss = SparkSession.builder().config(sparkConf).getOrCreate()
    import ss.implicits._
    val uniqueIdColName = "unique_id"
    val createdColName = "created"
    val totalCommissionColName = "total_commission"
    val totalShoppingCartColName = "total_shopping_cart"
    val statusColName = "status"
    val commissionColName = "commission"

    val adcellTransactionStatusLogic: Column = {

      when(lower(col(statusColName)) === "accepted", "Validated")
        .otherwise(
          when(lower(col(statusColName)) === "open", "Pending")
            .otherwise(
              when(lower(col(statusColName)) === "cancelled", "Refused")
                .otherwise(typedLit[String](null))))

    }

    val decimalType = new DecimalType(19, 3)
    val transactionStatusColName = "transaction_status"
    val df = Seq(("adcde1_12938597", 1.162, 7.260, "2021-06-28 21:43:43", "accepted"))
      .toDF("unique_id", "total_commission", "total_shopping_cart", "created", "status")

    val historicTransactions: DataFrame = df
      .withColumn("commission", col(totalCommissionColName).cast(decimalType))

    def updateBiValidateDate(df: DataFrame): DataFrame = {

      val firstValidStatusDateColName = "first_valid_status_date"
      val firstValidTransactions = df
        .select(uniqueIdColName, transactionStatusColName, createdColName)
        .where(col(transactionStatusColName).isInCollection(Set("Paid", "Validated")))
        .withColumnRenamed(createdColName, firstValidStatusDateColName)
        .drop(transactionStatusColName)

      val withFirstValidDate = df.join(firstValidTransactions, usingColumns = Seq(uniqueIdColName), joinType = "left")
      withFirstValidDate

    }

    val result = historicTransactions
      .drop(totalCommissionColName, totalShoppingCartColName, commissionColName)
      .withColumn(transactionStatusColName, adcellTransactionStatusLogic)
      .transform(updateBiValidateDate)
      .drop(transactionStatusColName, statusColName)

    result
      .write
      .mode(SaveMode.Overwrite)
      .option("path", "tmp/spline_test_bi_duplicates")
      .saveAsTable("spline_test_bi_duplicated")

  }
}
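
Here too the failing plan contains a self-join: inside updateBiValidateDate, df is joined with firstValidTransactions, which is derived from df itself.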

cerveada (Contributor) commented:

I will try it out, thanks a lot!

kuhnen commented Aug 10, 2021

Sorry about this line: ("hive.imetastoreclient.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"). You surely do not need that.

cerveada (Contributor) commented:

The duplicates are there; now I can finally try to fix this.

kuhnen commented Aug 10, 2021

@cerveada great :). I was trying to simplify the code even more. Is there a way I can log the names of the attributes? I can only see this: Cannot send lineage data to http://localhost:8080/producer/execution-plans. HTTP Response: 400 Duplicated attribute IDs: 33, 26

cerveada (Contributor) commented:

You can use this config:

val extraConf: Iterable[(String, String)] = List(
  ("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
  ("spark.sql.queryExecutionListeners", "za.co.absa.spline.harvester.listener.SplineQueryExecutionListener"),
  ("spark.spline.lineageDispatcher", "console"),
  ("spark.spline.mode", "REQUIRED")
)

The console dispatcher will log the JSON to the console instead of trying to send it.

Alternatively, I just use a debug breakpoint in LineageHarvester.scala, line 150, where I can see the plan object directly in the IDE.

kuhnen commented Aug 10, 2021

@cerveada quick update from my side :)
If I add localCheckpoint() here:

val firstValidTransactions = df
  .select(uniqueIdColName, transactionStatusColName, createdColName)
  .where(col(transactionStatusColName).isInCollection(Set("Paid", "Validated")))
  .withColumnRenamed(createdColName, firstValidStatusDateColName)
  .drop(transactionStatusColName)
  .localCheckpoint()

The duplicated IDs are gone. Maybe this helps with your debugging 👍
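
(This plausibly works because localCheckpoint() materializes the DataFrame and truncates its logical plan, so the two join branches no longer share a sub-plan or its expression IDs; that reasoning is an assumption based on Spark's checkpointing behaviour, not confirmed in this thread.)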

cerveada added commits that referenced this issue on Aug 12, 2021:

  • spark agent #272 fix duplicated attribute ids
  • remove deprecated symbol /:
wajda closed this as completed Aug 16, 2021