FacebookAdsReportToGcsOperator method _flush_rows() infers field names from first data point instead of declared fields #34173

@Taishan314

Apache Airflow version

2.5.3+composer

What happened

I created a task to retrieve insight-level ad data using the FacebookAdsReportToGcsOperator. While the pipeline was running, the DAG failed with the following traceback:

[2023-09-07, 11:18:38 UTC] {taskinstance.py:1778} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py", line 151, in execute
    total_row_count = self._decide_and_flush(converted_rows_with_action=converted_rows_with_action)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py", line 183, in _decide_and_flush
    self._flush_rows(
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py", line 213, in _flush_rows
    writer.writerows(converted_rows)
  File "/opt/python3.8/lib/python3.8/csv.py", line 157, in writerows
    return self.writer.writerows(map(self._dict_to_list, rowdicts))
  File "/opt/python3.8/lib/python3.8/csv.py", line 149, in _dict_to_list
    raise ValueError("dict contains fields not in fieldnames: "
ValueError: dict contains fields not in fieldnames: 'action_values'

The field 'action_values' was in my requested fields, but it does not appear in every data point in the data set. Upon inspecting the code, I found that the _flush_rows() method infers the fields (denoted as headers) from the first data point, so any later data point containing a field that the first one lacks raises this error.

Is it possible to get this method amended to infer headers from all requested fields?
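The failure can be reproduced outside Airflow with the standard library alone. The sketch below assumes, as described above, that the headers are taken from the keys of the first row; the function name `flush_with_inferred_headers` and the sample rows are hypothetical and only mimic the shape of the data, not the operator's actual code.

```python
import csv
import io

# Hypothetical rows: the first data point has no 'action_values' key,
# while a later one does.
rows = [
    {"ad_id": "1", "spend": "10.0"},
    {"ad_id": "2", "spend": "5.0", "action_values": "7.5"},
]


def flush_with_inferred_headers(rows):
    """Write rows to CSV, inferring the headers from the first row only."""
    headers = rows[0].keys()
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=headers)
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()


error_message = None
try:
    flush_with_inferred_headers(rows)
except ValueError as exc:
    # csv.DictWriter rejects keys that are missing from fieldnames.
    error_message = str(exc)

print(error_message)
```

With these rows, `csv.DictWriter` raises the same kind of `ValueError` as in the traceback, because its default `extrasaction="raise"` rejects any key that is not in `fieldnames`.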

What you think should happen instead

The _flush_rows() method shouldn't take the headers (fields) from the first data point. It should take them from the requested fields, or at least inspect all data points and use the one with the most fields.
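One possible shape for such a fix, as a minimal sketch rather than the provider's implementation: start from the requested fields, append any extra keys that show up in the rows (the response may carry columns beyond the requested fields, for example breakdown columns; this is an assumption here), and let `csv.DictWriter` fill missing values via `restval`. The helper names `build_headers` and `rows_to_csv` and the sample data are hypothetical.

```python
import csv
import io


def build_headers(declared_fields, rows):
    """Return declared fields first, then any extra keys in order of first appearance."""
    headers = list(declared_fields)
    seen = set(headers)
    for row in rows:
        for key in row:
            if key not in seen:
                seen.add(key)
                headers.append(key)
    return headers


def rows_to_csv(declared_fields, rows):
    """Write rows to CSV text, leaving missing fields empty instead of failing."""
    buf = io.StringIO()
    writer = csv.DictWriter(
        buf,
        fieldnames=build_headers(declared_fields, rows),
        restval="",  # value written for keys a row does not contain
    )
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()


# Hypothetical data: 'action_values' is requested but absent from the first row,
# and 'age' is an extra column that was not in the requested fields.
fields = ["ad_id", "spend", "action_values"]
rows = [
    {"ad_id": "1", "spend": "10.0", "age": "18-24"},
    {"ad_id": "2", "spend": "5.0", "action_values": "7.5", "age": "25-34"},
]
csv_text = rows_to_csv(fields, rows)
print(csv_text)
```

Using the requested fields as the base keeps the column order stable across flushes, while the union with observed keys avoids trading one `ValueError` for another when a row carries a column that was never requested.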

How to reproduce

Create and run a task using the FacebookAdsReportToGcsOperator.

api_version = "v17.0"
fields = ["account_name","estimated_ad_recall_rate","video_avg_time_watched_actions","video_p100_watched_actions","video_p95_watched_actions","video_p25_watched_actions","video_play_actions","account_id","account_currency","campaign_name","campaign_id","objective","adset_name","adset_id","ad_name","ad_id","reach","impressions","clicks","spend","actions","action_values"]
params = {"level": "ad","time_range": {"since": "2023-09-23", "until": "2023-09-29"},"breakdowns": ["age", "gender"],"action_breakdowns": ["action_type"],"action_report_time": "conversion","time_increment": 1}

Operating System

Windows 10

Versions of Apache Airflow Providers

apache-airflow-providers-google==10.7.0
apache-airflow-providers-facebook==3.2.1

Deployment

Google Cloud Composer

Deployment details

image version: composer-2.4.1-airflow-2.5.3
python version: 3 


Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
