[CT-202] Workaround for some limitations due to the list_relations_without_caching method #228
Comments
@danfran Thanks for opening the issue, and apologies for the delay getting back to you here (and over in #215).

Background

The reason we use show table extended in <schema> like '*' is to populate dbt's relational cache at the start of each run. We found we need to use show table extended (rather than a plain show tables in) because it returns the extra metadata dbt needs, such as whether each object is a table or a view. (A neat side effect of having to use show table extended is that the same results can also feed catalog generation for docs.) I know that scanning every object in the schema with like '*' can be very slow when the schema contains many objects.

Proposed change

Okay, now that we've gotten that background out of the way—I think you're onto something here:
I think this might just be doable! There's a base adapter method, _get_cache_schemas, that decides which schema relations the cache logic needs to populate; today the relations it returns carry no identifier, so the macro falls back to like '*'. But what if we instead passed in a schema relation that did have an identifier? That identifier could look like a '|'-delimited list of the relation names dbt knows about in that schema. Any methods defined in the base adapter can be reimplemented by an adapter plugin such as dbt-spark. Here's my rough attempt:

def _get_cache_schemas(self, manifest: Manifest) -> Set[BaseRelation]:
    """Get the set of schema relations that the cache logic needs to
    populate. This means only executable nodes are included.
    """
    # the cache only cares about executable nodes
    relations = [
        self.Relation.create_from(self.config, node)  # keep the identifier
        for node in manifest.nodes.values()
        if (node.is_relational and not node.is_ephemeral_model)
    ]
    # group up relations by common schema
    import collections
    relmap = collections.defaultdict(list)
    for r in relations:
        relmap[r.schema].append(r)
    # create a single relation for each schema
    # set the identifier to a '|' delimited string of relation names, or '*'
    schemas = [
        self.Relation.create(
            schema=schema,
            identifier=(
                '|'.join(r.identifier for r in rels)
                # there's probably some limit to how many we can include by name
                if len(rels) < 100 else '*'
            )
        )
        for schema, rels in relmap.items()
    ]
    return schemas

Then, the macro becomes:

{% macro spark__list_relations_without_caching(relation) %}
{% call statement('list_relations_without_caching', fetch_result=True) -%}
show table extended in {{ relation.schema }} like '{{ relation.identifier or "*" }}'
{% endcall %}
{% do return(load_result('list_relations_without_caching').table) %}
{% endmacro %}

That works for me locally:

show table extended in dev_jerco like 'my_first_dbt_model|my_second_dbt_model'

Next steps
|
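To make the shape of the proposal easier to picture outside of dbt, here is a small standalone sketch of the same grouping logic and the SQL it would render. The helper names (build_like_patterns, render_show_table_extended) are made up for illustration; only the '|'-joined LIKE pattern and the ~100-name cutoff come from the proposal above.

from collections import defaultdict

def build_like_patterns(relations, max_names=100):
    """Group (schema, identifier) pairs and build one LIKE pattern per schema."""
    by_schema = defaultdict(list)
    for schema, identifier in relations:
        by_schema[schema].append(identifier)
    patterns = {}
    for schema, names in by_schema.items():
        # Spark's SHOW TABLE EXTENDED ... LIKE treats '|' as alternation,
        # so a pipe-delimited list matches exactly those tables.
        patterns[schema] = '|'.join(names) if len(names) < max_names else '*'
    return patterns

def render_show_table_extended(schema, pattern):
    return f"show table extended in {schema} like '{pattern}'"

if __name__ == '__main__':
    rels = [
        ('dev_jerco', 'my_first_dbt_model'),
        ('dev_jerco', 'my_second_dbt_model'),
    ]
    for schema, pattern in build_like_patterns(rels).items():
        print(render_show_table_extended(schema, pattern))
    # prints: show table extended in dev_jerco like 'my_first_dbt_model|my_second_dbt_model'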
Hi @jtcohen6, thank you for the insights, very useful! I am more than happy to try what you suggested, as it seems to work around the issue in a good way. I will also try to apply your other suggested points. |
@danfran I am curious to know whether this worked for you. |
@ssimeonov sorry for the late answer. Unfortunately my work situation has changed since then and I was not able to test it in my original environment. However, the solution proposed by @jtcohen6 still seems valid. If you are in an AWS environment and you want to test it (including the Athena view / EMR-Hive conflict that makes this fix even more useful), you need just a few tables in order to reproduce the issue and create a valid environment. |
@jtcohen6 is there a strictly macro-override way (without getting into Python) to make the macro do as you suggested? I plonked something like this into my project's macros folder:

{% macro spark__list_relations_without_caching(relation) %}
{% set rels = [] %}
{% for node in graph.nodes.values() | selectattr("resource_type", "equalto", "model") %}
{% do rels.append(node.fqn[1]) %}
{% endfor %}
{% if rels | length > 1 %}
{% set suffix = rels | join('|') %}
{% else %}
{% set suffix = '*' %}
{% endif %}
{% call statement('list_relations_without_caching', fetch_result=True) -%}
show table extended in {{ relation }} like '{{ suffix }}'
{% endcall %}
{% do return(load_result('list_relations_without_caching').table) %}
{% endmacro %}

I'm guessing surely not, because you provided a Python method (_get_cache_schemas) rather than just a macro override. |
@jeremyyeo I think this approach would indeed require implementing the Python method (_get_cache_schemas), not just the macro. An alternative approach, that either I just remembered or which just occurred to me in the threads where we're discussing this internally:
It might be worth experimenting with both approaches and seeing which one yields greater benefits. Does the slowness have more to do with the verbosity of show table extended itself? |
Yes. |
After speaking a bit more about this with @superdupershant:
There are other performance boosts that the |
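For readers trying to picture that cheaper-listing idea, here is a minimal sketch, assuming a DB-API-style cursor against Spark; the function name and wiring are hypothetical and not dbt's actual adapter code. It lists objects with show tables in, then runs describe table extended only for the relations dbt manages:

def cache_schema_two_step(cursor, schema, dbt_identifiers):
    """Hypothetical sketch: cheap listing first, detailed metadata only where needed."""
    # Step 1: a cheap listing of every object in the schema.
    cursor.execute(f"show tables in {schema}")
    # Spark returns rows of (namespace, tableName, isTemporary).
    existing = {row[1] for row in cursor.fetchall()}

    # Step 2: detailed metadata only for the objects dbt actually manages.
    details = {}
    for name in sorted(set(dbt_identifiers) & existing):
        cursor.execute(f"describe table extended {schema}.{name}")
        details[name] = cursor.fetchall()
    return details

Whether this wins in practice comes back to the question above: many small describe calls versus one expensive show table extended over everything in the schema.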
This aligns with our experience @jtcohen6. Forgive my unfamiliarity with DBT architecture: how do capabilities "degrade" based on the quality of metadata available about assets a data production depends on? I can think of (at least) several levels:
The reason for the question is that, while the Spark metastore is quite dumb, Delta tables have queryable history and recognizable versioned identifiers (which could be used as an indication of immutable data state). Further, one could design a simple open metadata standard (say, a single property containing some JSON, implemented via managed table properties in Spark, for example) for getting the metadata DBT needs for advanced functionality. |
@ssimeonov It's a really fair question!
In the future, I could see integrating both types of information (freshness + catalog) into other dbt tasks. Overall, I think dbt only stands to benefit from Delta tables that can provide all types of metadata more readily, quickly, and reliably. |
Then the path forward seems to be the implementation of your original idea: to generate the pipe-delimited table list from dbt's own knowledge of its models. |
Yes, I think that's right. Let's aim to include this change in a v1.1 beta, and ask folks to test it out in their own projects to ensure it yields the desired speedups. cc @VersusFacit: This is another cache-related performance question, similar to dbt-labs/dbt-snowflake#83, where benchmarking (and anticipating edge cases) will be the name of the game. The change proposed in this issue will significantly help in cases where dbt manages some objects, in large databases that also contain thousands of non-dbt objects. If we want to speed up cases where dbt manages thousands of objects, and a given invocation is only selecting a handful to run, we'll need to tackle the larger question of selection-based caching: dbt-labs/dbt-core#4688 |
Adding another customer onto this issue. Seeing the same behavior. |
Hoping this change can get added into 1.1 as part of the base code! It would be a significant benefit for us, as we are using Managed Workflows for Apache Airflow (MWAA) and are unable to modify the core code as referenced here #228 (comment). With AWS MWAA we provide the requirements.txt (dbt-core==1.0.0) but have no access to the shell to modify the core code after the fact, so we cannot apply the workaround manually.

Our implementation also means we don't do a single dbt run for the whole project: each model runs as its own task, so the metadata discovery cost is paid again on every invocation.

Anyways, looking forward to changes that speed up "show table extended" for spark schema objects. |
dbt uses show table extended in <schema> like '*' in two places:
Cache population: We're tracking the investigation into the two Spark-specific possibilities noted above in #296. There are two branches with sample/experimental code, if you're able to try them out and let us know what you find. We're also thinking about cache speedups via dbt-core changes, e.g. dbt-labs/dbt-core#4688.

Catalog generation: As a separate question, I think it would make a lot of sense to pass specific table info (rather than scanning the whole schema) there as well. |
Very interested in this fix -- @jtcohen6 @TalkWIthKeyboard do you think there's anything I could do to help with #433, or do you think it's best to let y'all handle it? |
I'm working with a dbt Cloud customer who have an existing, long-lived Spark cluster and are keen to try dbt Core/Cloud with it. They have 10,000+ objects in Spark and are experiencing this issue. They are following along to see its resolution! |
Is there any progress with a fix? |
I ran into the same issue; it took ~1 hr to run. |
While waiting for this to be implemented I decided to use @ryanClift-sd 's dbt-spark wip-faster-caching-option1 branch and got stuck on a further problem: it seems that the manifest cannot be accessed there,
even though the manifest definitely exists, because it is being used elsewhere in the run. I'm not sure to whom this problem should be addressed. |
I assume it's probably related to changes made after dbt-core/dbt-spark 1.0. When we implemented this solution we were only using dbt-core 1.0 and dbt-spark 1.0 and never tested anything above that version. We stuck with dbt-core/dbt-spark 1.0 until very recently. We have since upgraded to dbt-core 1.5 and, luckily, were able to move to dbt-databricks instead of dbt-spark. dbt-databricks solved our issues with relation retrieval and supports Unity Catalog, which we were starting to leverage and needed to use in our dbt projects. |
I would like to report another issue on We use the OSS HMS with Spark3, and there are some Kudu, HBase tables registered in HMS, but there are no correspondingly serde jars under the Spark application classpath, thus the above query would cause failure like
So, alongside the performance concerns, I think the new approach should consider supporting a table exclusion list. |
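A rough sketch of how such an exclusion list might be applied while building the pipe-delimited LIKE pattern; the exclude argument is purely hypothetical and not an existing dbt-spark config:

def build_pattern_with_exclusions(identifiers, exclude=(), max_names=100):
    """Hypothetical: drop tables dbt should never touch (e.g. Kudu/HBase-backed
    ones with missing SerDe jars) before building the '|'-delimited LIKE pattern."""
    names = [name for name in identifiers if name not in set(exclude)]
    if not names:
        return None  # nothing left to inspect in this schema; skip the query entirely
    # fall back to '*' only when listing every name explicitly becomes impractical
    return '|'.join(names) if len(names) < max_names else '*'

# build_pattern_with_exclusions(['orders', 'events', 'kudu_raw'], exclude=['kudu_raw'])
# -> 'orders|events'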
We have the same issue, but our problem is not focused on performance but rather on the loss of information. |
We are also facing the same issue with Iceberg. The failure is intermittent and gets resolved on rerun. However, there is an explicit loss of data because of the failure. We are facing this issue in dbt-spark 1.6.0. |
What @vinjai and @despewerba are saying is true; the same thing happened on Databricks and they fixed it. |
The same issue leading to data loss because of failed metadata queries. |
Iceberg does not support efficient listing because it is a V2 datasource in Spark. This will be fixed with the new 3.6 release. Until then, there is not much you can do (except making sure that you don't have too many tables in your schema). |
@Fokko could you please provide some additional info (I didn't find any similar issues at https://issues.apache.org/)? I am wondering if the problem with Iceberg is the same as the problem with Delta tables mentioned in the linked issue above. |
Hello guys! I'm very interested in this topic. We have a very similar use case to the one @ryanClift-sd commented on last year in #228 (comment), where we run every dbt model in an Airflow task, so for every run we need to wait 1-2 minutes to discover the catalog. Do you have any news on how this behavior is going to improve? Do you know if this is better with Databricks Unity Catalog? |
@daniel-gomez-mng I know that for us, moving to the dbt-databricks adapter fixed our underlying Spark adapter performance issues, whether for Unity Catalog or hive_metastore Databricks tables. I understand this might not be an option for everyone, though, depending upon their environments. |
Hi @ryanClift-sd , we are already using the Databricks adapter and we have the same performance issues. Just to give you more info, each dbt run is executing the following queries:
In our case, every show views query takes 40-50 seconds. So, if we take into account all models, we are spending around 16 hours per day discovering this metadata. This is very expensive and 'unnecessary'. We are looking at these cache options in order to avoid retrieving all metadata for all models, but please, if anyone knows how to improve this it would be very helpful. |
Is there any update on this issue? For me the problem persists. |
Hi @AlanCortesK , we improved this behavior by adding a config to our Spark jobs. We use SQL warehouses in Databricks and a Hive Metastore in Glue; I don't know if that is also your case.
|
Recently I fixed this issue and made dbt-spark's performance much faster. |
I'd love to see this implemented! |
@rahulgoyal2987 how did you solve it? We have the same issue. Thanks |
Describe the feature
I am currently facing an issue using DBT with Spark in an AWS Glue/EMR environment, as already discussed in #215 (but originally raised in #93).
The current issue is about the adapter's method list_relations_without_caching: https://github.com/dbt-labs/dbt/blob/HEAD/core/dbt/include/global_project/macros/adapters/common.sql#L240
which in the Spark adapter implementation is dbt-spark/dbt/include/spark/macros/adapters.sql, lines 133 to 139 at a8a85c5:
In this case you can see that the command show table extended in {{ relation }} like '*' is executed. It forces Spark to go through all the table info in the schema (as Spark has no Information Schema layer) in a sort of "discovery mode", and this approach produces two main issues:

Bad performance: some environments can have hundreds or even thousands of tables generated not only by DBT but also by other processes in the same schema. In that case this operation can be very costly, especially when you have different DBT processes that run updates on a few tables at different times.
Instability: as I verified in an AWS Glue/EMR environment, you can have views without an "S3 Location" defined (like an Athena/Presto view), which will crash a DBT process running SparkSQL on EMR with errors like:
Describe alternatives you've considered
I do not see why the DBT process should care about the "rest of the world", like the Athena views mentioned above or tables created by other processes in the same schema.
So ideally I would like to replace the query:

show table extended in <schema> like '*'

with something like:

show table extended in <schema> like ('<table1>|<table2>|…')

where <table1>, <table2>, etc. are determined automatically when I run a command like dbt run --models my_folder, where my_folder contains the files table1.sql, table2.sql, etc. But from the current method interface, only the schema param can be passed.
Two questions here: how can I automatically infer the names of the tables involved when a command like dbt run --models my_folder runs, and how can I eventually pass them to list_relations_without_caching?

Additional context
I found it relevant for Spark in an AWS environment, but it can potentially be a similar issue for other implementations.
Who will this benefit?
On DBT's Slack channel I talked to another user "affected" by a similar issue, but probably anyone who is going to use Spark in a distributed environment can be affected by this (AWS or not).
Are you interested in contributing this feature?
Sure, both coding and testing.