BigQueryInsertJobOperator is broken on any type of job except query
#23826
Comments
Thanks for opening your first issue here! Be sure to follow the issue template!
Do you think #23165 fixes this issue? Can we close it @raphaelauv?
No. It's the cause of the issue.
Ah OK. That was not clear. @wojsamjan -> can you please take a look and fix it?
I have exactly the same issue.
try this:

```python
from typing import Any

from google.api_core.exceptions import Conflict

from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.links.bigquery import BigQueryTableLink
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator


class FixBigQueryInsertJobOperator(BigQueryInsertJobOperator):
    def execute(self, context: Any):
        hook = BigQueryHook(
            gcp_conn_id=self.gcp_conn_id,
            delegate_to=self.delegate_to,
            impersonation_chain=self.impersonation_chain,
        )
        self.hook = hook
        job_id = self._job_id(context)

        try:
            job = self._submit_job(hook, job_id)
            self._handle_job_error(job)
        except Conflict:
            # If the job already exists, retrieve it
            job = hook.get_job(
                project_id=self.project_id,
                location=self.location,
                job_id=job_id,
            )
            if job.state in self.reattach_states:
                # We are reattaching to an existing job
                job.result(timeout=self.result_timeout, retry=self.result_retry)
                self._handle_job_error(job)
            else:
                # Same job configuration, so force_rerun is needed
                raise AirflowException(
                    f"Job with id: {job_id} already exists and is in {job.state} state. If you "
                    f"want to force rerun it consider setting `force_rerun=True`. "
                    f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`"
                )

        # Only query jobs carry a destinationTable under "query"; guard the
        # access so load/copy/extract jobs do not raise a KeyError
        configuration = job.to_api_repr()["configuration"]
        if "query" in configuration and "destinationTable" in configuration["query"]:
            table = configuration["query"]["destinationTable"]
            BigQueryTableLink.persist(
                context=context,
                task_instance=self,
                dataset_id=table["datasetId"],
                project_id=table["projectId"],
                table_id=table["tableId"],
            )

        self.job_id = job.job_id
        return job.job_id
```
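The essential change over the stock operator is the guarded lookup of the destination table. A standalone sketch of just that guard, using hand-written configuration dicts whose shapes approximate `job.to_api_repr()` output (the dict contents here are illustrative assumptions, not actual operator output):

```python
def query_destination_table(api_repr: dict):
    """Return the query job's destination table dict, or None for non-query jobs."""
    configuration = api_repr.get("configuration", {})
    # Only look under "query" when the key actually exists
    if "query" in configuration and "destinationTable" in configuration["query"]:
        return configuration["query"]["destinationTable"]
    return None


# A load job (assumed shape) simply yields None instead of raising KeyError
load_repr = {"configuration": {"load": {"sourceUris": ["gs://bucket/file.parquet"]}}}
print(query_destination_table(load_repr))  # None

query_repr = {
    "configuration": {
        "query": {"destinationTable": {"projectId": "p", "datasetId": "d", "tableId": "t"}}
    }
}
print(query_destination_table(query_repr)["tableId"])  # t
```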
Although I was avoiding this solution because I would have to refactor hundreds of queries, I guess it's the best solution.
This is an open-source project - the fix will arrive when someone decides to spend the time on fixing it.
My apologies if I was misunderstood. If I have time in the next couple of weeks, I will let you know.
hi. |
Have you read the comments above, @takuma11248250?
This will be solved and released when someone solves it. You will not get answers on when it will be fixed, but by contributing, providing fixes, and testing you can help speed it up. Let me reverse the question: do you expect to help by providing all the details needed to solve it? Can we count on you to observe the issue and, when we release an RC, help with testing, @takuma11248250?
And one more thing @takuma11248250 - if you need it fast, you can actually help - I just merged the fix by @raphaelauv that should address it. This is an open-source project and you can easily apply the fix by cherry-picking this change. When can we expect you to apply it to your test system and report whether the fix worked?
@potiuk |
You do not have to wait for Composer. When we release an RC (soon) you can install it manually and test it. I would really appreciate it - since you are interested in knowing when it will be released, you are the perfect source of information on whether the released fix actually works. Failing to test it will be problematic for you in case it is not fixed, because the next wave of providers is going to be released in about a month, but by testing the RC you have a chance to confirm that the fix worked for you. By not doing it, you are risking that the fix will not fix your problem, so you will have to wait at least another month or maybe longer. But you can prevent that from happening by investing a little time and testing the change while it is in the RC stage.
Hi @potiuk I'm having the same problem with my Composer, I can help test the RC version once it's released.
RC will be out likely tomorrow :)
Or maybe even today
The RC is out: #24289. @takuma11248250 @gilangardya I'd love your "test status" in #24289 - that's where everyone else will be posting theirs.
I installed the new 8.0.0rc1 on Google Composer and it seems to have fixed the problem. Thx for your help @raphaelauv
@potiuk I've tested it and it's working as expected, the test details are in test status #24289 (comment). Thank you @raphaelauv, and everyone 👍
I know this is already closed and the fix is on its way, but would it be possible for someone who tested this on GCP to explain how you tested? I cannot find a way to override operator versions and would very much like to be able to help out testing stuff like this in the future.
@MazrimT yeah sure. On Google Composer you can define PyPI packages: `apache-airflow-providers-google ==8.0.0rc2`. This should install this specific version and override Composer's internal version of that package.
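For anyone else wanting to try this: in the Composer environment's "PyPI packages" settings, the entry would look like the fragment below (the version pin is the RC from this thread; substitute whichever release you want to test):

```
apache-airflow-providers-google ==8.0.0rc2
```

Composer should then install that version in place of its bundled one when it updates the environment.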
@DrStriky I have no idea what I've been doing before; I thought I tried exactly that but it didn't work, and now it worked :) Thank you for your patience with a GCP beginner.
Apache Airflow Provider(s)
google
Versions of Apache Airflow Providers
apache-airflow-providers-google==7.0.0
Apache Airflow version
2.2.5
Operating System
MacOS 12.2.1
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
What happened
We are using `BigQueryInsertJobOperator` to load data from parquet files in Google Cloud Storage with this kind of configuration:

After upgrading to `apache-airflow-providers-google==7.0.0`, all load jobs are now broken. I believe the problem lies in this line:

airflow/airflow/providers/google/cloud/operators/bigquery.py, line 2170 (at 5bfacf8)

So it's trying to get the destination table from the `query` job config, which makes it impossible to use any other type of job.

What you think should happen instead
No response
How to reproduce
Use `BigQueryInsertJobOperator` to submit any type of job other than `query`.
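To illustrate the failure mode described above, here is a minimal sketch using hand-written dicts that approximate what `job.to_api_repr()` returns for a query job versus a load job (the dict shapes are illustrative assumptions, not captured operator output). A load job's configuration has a `load` key rather than `query`, so an unguarded `configuration["query"]["destinationTable"]` lookup raises a `KeyError`:

```python
# Approximate API representations of two job types (assumed shapes)
query_job = {
    "configuration": {
        "query": {
            "query": "SELECT 1",
            "destinationTable": {
                "projectId": "my-project",
                "datasetId": "my_dataset",
                "tableId": "my_table",
            },
        }
    }
}

load_job = {
    "configuration": {
        "load": {
            "sourceUris": ["gs://my-bucket/data.parquet"],
            "destinationTable": {
                "projectId": "my-project",
                "datasetId": "my_dataset",
                "tableId": "my_table",
            },
        }
    }
}


def destination_table_unguarded(job: dict) -> dict:
    # Mirrors the problematic line: it assumes every job is a query job
    return job["configuration"]["query"]["destinationTable"]


print(destination_table_unguarded(query_job)["tableId"])  # my_table

try:
    destination_table_unguarded(load_job)
except KeyError as exc:
    # A load job's configuration has a "load" key, not "query"
    print(f"KeyError: {exc}")
```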
Anything else
Are you willing to submit PR?
Code of Conduct