RecursionError when pickling bigquery table object #3052

Closed
mousetree opened this issue Jul 29, 2020 · 8 comments

Comments

@mousetree

Description

When running my flow in a DaskKubernetesEnvironment, I get the following RecursionError:

Unexpected error: RecursionError('maximum recursion depth exceeded')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 491, in get_flow_run_state
    upstream_states = executor.wait(
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/executors/dask.py", line 375, in wait
    return self.client.gather(futures)
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1982, in gather
    return self.sync(
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 832, in sync
    return sync(
  File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 339, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 323, in f
    result[0] = yield future
  File "/usr/local/lib/python3.8/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1876, in _gather
    response = await future
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1927, in _gather_remote
    response = await retry_operation(self.scheduler.gather, keys=keys)
  File "/usr/local/lib/python3.8/site-packages/distributed/utils_comm.py", line 385, in retry_operation
    return await retry(
  File "/usr/local/lib/python3.8/site-packages/distributed/utils_comm.py", line 370, in retry
    return await coro()
  File "/usr/local/lib/python3.8/site-packages/distributed/core.py", line 861, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/distributed/core.py", line 644, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/usr/local/lib/python3.8/site-packages/distributed/comm/tcp.py", line 202, in read
    msg = await from_frames(
  File "/usr/local/lib/python3.8/site-packages/distributed/comm/utils.py", line 87, in from_frames
    res = _from_frames()
  File "/usr/local/lib/python3.8/site-packages/distributed/comm/utils.py", line 65, in _from_frames
    return protocol.loads(
  File "/usr/local/lib/python3.8/site-packages/distributed/protocol/core.py", line 130, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/usr/local/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 302, in deserialize
    return loads(header, frames)
  File "/usr/local/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 64, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/usr/local/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
  File "/usr/local/lib/python3.8/site-packages/google/cloud/bigquery/table.py", line 1264, in __getattr__
    value = self._xxx_field_to_index.get(name)
  File "/usr/local/lib/python3.8/site-packages/google/cloud/bigquery/table.py", line 1264, in __getattr__
    value = self._xxx_field_to_index.get(name)
  File "/usr/local/lib/python3.8/site-packages/google/cloud/bigquery/table.py", line 1264, in __getattr__
    value = self._xxx_field_to_index.get(name)
  [Previous line repeated 974 more times]
RecursionError: maximum recursion depth exceeded

After that error, I also get the following error:

[2020-07-29 17:07:14] DEBUG - prefect.CloudFlowRunner | Flow 'customers': Handling state change from Running to Failed
distributed.scheduler - INFO - Scheduler closing...
distributed.scheduler - INFO - Scheduler closing all comms
distributed.scheduler - INFO - Remove worker <Worker 'tcp://10.60.6.2:43271', name: 0, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to tcp://10.60.6.2:43271
distributed.scheduler - INFO - Lost all workers
[2020-07-29 17:07:19] DEBUG - kubernetes.client.rest | response body: {"kind":"PodList","apiVersion":"v1","metadata":{"selfLink":"/api/v1/namespaces/prefect-production/pods","resourceVersion":"158839033"},"items":[{"metadata":{"name":
[2020-07-29 17:07:20] DEBUG - kubernetes.client.rest | response body: {"kind":"Pod","apiVersion":"v1","metadata":{"name":"dask-root-11ce59a5-btl5w9","generateName":"dask-root-11ce59a5-b","namespace":"prefect-production","selfLink":"/a
[2020-07-29 17:07:20] INFO - dask_kubernetes.core | Deleted pod: dask-root-11ce59a5-btl5w9
[2020-07-29 17:07:20] DEBUG - kubernetes.client.rest | response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"services is forbidden: User \"system:serviceaccount:prefect-production:default\" cann
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/weakref.py", line 642, in _exitfunc
    f()
  File "/usr/local/lib/python3.8/weakref.py", line 566, in __call__
    return info.func(*info.args, **(info.kwargs or {}))
  File "/usr/local/lib/python3.8/site-packages/dask_kubernetes/core.py", line 707, in _cleanup_resources
    services = core_api.list_namespaced_service(
  File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py", line 13463, in list_namespaced_service
    (data) = self.list_namespaced_service_with_http_info(namespace, **kwargs)  # noqa: E501
  File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py", line 13551, in list_namespaced_service_with_http_info
    return self.api_client.call_api(
  File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 330, in call_api
    return self.__call_api(resource_path, method,
  File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 162, in __call_api
    response_data = self.request(
  File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 352, in request
    return self.rest_client.GET(url,
  File "/usr/local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 237, in GET
    return self.request("GET", url,
  File "/usr/local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 231, in request
    raise ApiException(http_resp=r)
kubernetes.client.rest.ApiException: (403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Audit-Id': 'ea92e80a-d7cd-4fb4-b731-6b695e45e945', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'Date': 'Wed, 29 Jul 2020 17:07:20 GMT', 'Content-Length': '316'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"services is forbidden: User \"system:serviceaccount:prefect-production:default\" cannot list resource \"services\" in API group \"\" in
stream closed

Expected Behavior

Running locally with flow.run() produces no errors, but running with DaskKubernetesEnvironment produces the error above.

Reproduction

The code I'm running looks something like the following:

from random import randrange

import pandas as pd
import prefect
from prefect import Flow, task
from prefect.tasks.gcp.bigquery import BigQueryTask

get_customer_ids = BigQueryTask(
    name="Get unique customer names",
    query="""
        select distinct id from cloud_sql.customers
        where redacted_id is not null limit 100
    """,
)


@task
def get_customer_balance(customer_record):
    logger = prefect.context.get("logger")
    customer_id = customer_record.get("id")
    logger.info(f"Requesting balance for customer {customer_id}")
    # For now we just generate a random number; later this will be an API call
    balance = randrange(0, 100)
    return (customer_id, balance)


@task
def prepare_balances(balances):
    logger = prefect.context.get("logger")
    # convert the list of tuples into a dataframe
    df = pd.DataFrame(balances, columns=["customer_id", "balance"])
    logger.info(df)
    return df


@task
def save_balances(balances):
    logger = prefect.context.get("logger")
    logger.info("Storing balances in BigQuery")
    # convert the df to sql or a file and store in bigquery
    return True


with Flow("customers") as flow:
    customer_records = get_customer_ids()
    synapse_balances = get_customer_balance.map(customer_records)
    balances = prepare_balances(synapse_balances)
    save_balances(balances)

if __name__ == "__main__":

    flow.run()

To deploy to my cluster, the CI/CD process also runs another Python file that looks like this:

from os import environ, path

import docker
from customers.flow import flow as customers
from prefect.environments import DaskKubernetesEnvironment
from prefect.environments.storage import Docker

# The following TLS config is required for CircleCI as it uses a "docker in docker" approach
tls_config = docker.tls.TLSConfig(
    client_cert=(
        path.join(environ.get("DOCKER_CERT_PATH"), "cert.pem"),
        path.join(environ.get("DOCKER_CERT_PATH"), "key.pem"),
    )
)

customers.storage = Docker(
    registry_url="gcr.io/redacted/redacted",
    base_url=environ.get("DOCKER_HOST"),  # required for CircleCI
    tls_config=tls_config,  # required for CircleCI
    python_dependencies=["pandas", "prefect[google,kubernetes]"],
)

customers.environment = DaskKubernetesEnvironment(min_workers=1, max_workers=3)
customers.register(project_name="prefect-test-1")

Environment

On my local machine:

{
  "config_overrides": {},
  "env_vars": [],
  "system_information": {
    "platform": "macOS-10.15.5-x86_64-i386-64bit",
    "prefect_version": "0.12.6",
    "python_version": "3.8.4"
  }
}

From inside the prefect agent container on my Kubernetes cluster:

{
  "config_overrides": {},
  "env_vars": [
    "PREFECT__CLOUD__API",
    "PREFECT__CLOUD__AGENT__AUTH_TOKEN",
    "PREFECT__CLOUD__AGENT__LABELS",
    "PREFECT__CLOUD__AGENT__AGENT_ADDRESS",
    "PREFECT__BACKEND"
  ],
  "system_information": {
    "platform": "Linux-4.19.104+-x86_64-with-debian-10.4",
    "prefect_version": "0.12.5",
    "python_version": "3.6.11"
  }
}

I've noticed the Python versions are different.
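Since a pickling failure can come from version mismatches, a minimal sketch for comparing environments in more detail than the Python version alone follows. The idea is to run it in each environment involved (for example, locally and inside the Docker image the flow actually runs in) and diff the output. The package list is an assumption chosen to match this flow; adjust it as needed.

```python
import platform
from importlib import metadata

# Distributions worth comparing between environments. This list is an
# assumption based on the flow above; edit it to match your dependencies.
PACKAGES = ["prefect", "dask", "distributed", "cloudpickle", "google-cloud-bigquery"]


def environment_report(packages=PACKAGES):
    """Return the Python version plus the installed version of each package."""
    report = {"python": platform.python_version()}
    for name in packages:
        try:
            report[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            # The package is not installed in this environment.
            report[name] = None
    return report


if __name__ == "__main__":
    for name, version in environment_report().items():
        print(f"{name}: {version}")
```

`importlib.metadata` is in the standard library from Python 3.8 onward, so on the 3.6 agent image it would need the `importlib_metadata` backport instead.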

@mousetree
Author

I noticed the error includes the message:

 {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"services is forbidden: User \"system:serviceaccount:prefect-production:default\" cannot list resource \"services\" in API group \"\"

It seems DaskKubernetes requires RBAC permissions for Services. The docs listed here don't mention that, but maybe I'm reading them wrong.

@mousetree
Author

The Dask Kubernetes documentation seems to have added a note on the need for Services:

If you intend to use the newer Dask functionality in which the scheduler is created in its own pod and accessed via a service, you will also need:

- apiGroups:
  - "" # indicates the core API group
  resources:
  - "services"
  verbs:
  - "get"
  - "list"
  - "watch"
  - "create"
  - "delete"

@joshmeek

@mousetree we could definitely add list permissions for services to the RBAC, but I think that's a red herring here because it appears to happen after the issue occurs. Tagging @jcrist in case he has seen this recursion depth issue before; otherwise we can try to reproduce 🙂

@jcrist

jcrist commented Jul 29, 2020

That looks like it's happening in your flow execution, not in the dask/environment layer. I suspect you could reproduce it without registering the flow by running with a dask executor instead. It looks like a pickling error, so it might have to do with version differences between the dask workers and your flow runner, or possibly google objects failing to pickle properly (which happens). Can you try running on dask but outside of k8s (which I think is likely unrelated)?

# Stuff to create your flow object...

from prefect.engine.executors import DaskExecutor
executor = DaskExecutor()
flow.run(executor=executor)

@mousetree
Author

Thanks for the follow-up @jcrist. I've tried running it with both LocalDaskExecutor and DaskExecutor. It works fine with LocalDaskExecutor, but with DaskExecutor I get the following error:

[2020-07-30 08:07:48] INFO - prefect.FlowRunner | Beginning Flow run for 'customers'
[2020-07-30 08:07:48] INFO - prefect.FlowRunner | Starting flow run.
[2020-07-30 08:07:50] INFO - prefect.TaskRunner | Task 'BigQueryTask': Starting task run...
[2020-07-30 08:07:53] INFO - prefect.TaskRunner | Task 'BigQueryTask': finished task run for task with final state: 'Success'
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/distributed/protocol/core.py", line 130, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 302, in deserialize
    return loads(header, frames)
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 64, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/google/cloud/bigquery/table.py", line 1264, in __getattr__
    value = self._xxx_field_to_index.get(name)
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/google/cloud/bigquery/table.py", line 1264, in __getattr__
    value = self._xxx_field_to_index.get(name)
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/google/cloud/bigquery/table.py", line 1264, in __getattr__
    value = self._xxx_field_to_index.get(name)
  [Previous line repeated 974 more times]
RecursionError: maximum recursion depth exceeded
[2020-07-30 08:07:55] ERROR - prefect.FlowRunner | Unexpected error: RecursionError('maximum recursion depth exceeded')
Traceback (most recent call last):
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 491, in get_flow_run_state
    upstream_states = executor.wait(
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/prefect/engine/executors/dask.py", line 375, in wait
    return self.client.gather(futures)
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/distributed/client.py", line 1982, in gather
    return self.sync(
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/distributed/client.py", line 832, in sync
    return sync(
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/distributed/utils.py", line 339, in sync
    raise exc.with_traceback(tb)
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/distributed/utils.py", line 323, in f
    result[0] = yield future
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/distributed/client.py", line 1876, in _gather
    response = await future
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/distributed/client.py", line 1927, in _gather_remote
    response = await retry_operation(self.scheduler.gather, keys=keys)
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/distributed/utils_comm.py", line 385, in retry_operation
    return await retry(
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/distributed/utils_comm.py", line 370, in retry
    return await coro()
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/distributed/core.py", line 861, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/distributed/core.py", line 644, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/distributed/comm/tcp.py", line 202, in read
    msg = await from_frames(
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/distributed/comm/utils.py", line 87, in from_frames
    res = _from_frames()
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/distributed/comm/utils.py", line 65, in _from_frames
    return protocol.loads(
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/distributed/protocol/core.py", line 130, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 302, in deserialize
    return loads(header, frames)
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 64, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/google/cloud/bigquery/table.py", line 1264, in __getattr__
    value = self._xxx_field_to_index.get(name)
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/google/cloud/bigquery/table.py", line 1264, in __getattr__
    value = self._xxx_field_to_index.get(name)
  File "/Users/adam/.local/share/virtualenvs/sable-batch-shMD5XGQ/lib/python3.8/site-packages/google/cloud/bigquery/table.py", line 1264, in __getattr__
    value = self._xxx_field_to_index.get(name)
  [Previous line repeated 974 more times]
RecursionError: maximum recursion depth exceeded
[2020-07-30 08:07:55] ERROR - prefect.customers | Unexpected error occured in FlowRunner: RecursionError('maximum recursion depth exceeded')

It indeed seems like a pickling error. Any ideas how I can resolve this?

@jcrist

jcrist commented Jul 30, 2020

That indeed looks to be due to pickling issues. Two things I'd check (in order):

  • If your Dask worker Python environments differ from your flow-runner environment, check whether the bigquery library version is the same in both. If you ran the above with DaskExecutor() (no parameters) and got that error, then this isn't the issue, since in that case the worker and flow-runner environments are the same.
  • Can you try pickling a similar bigquery table object locally and see if it works? Just calling cloudpickle.loads(cloudpickle.dumps(table)) as a check should be good enough.
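The round-trip check suggested above can be sketched without BigQuery credentials. The class `RowLike` below is hypothetical: it is modeled only on the frame visible in the traceback (a `__getattr__` at table.py line 1264 that reads `self._xxx_field_to_index`) and is not the real google class. The sketch uses the standard `pickle` module; the traceback shows Dask calling `pickle.loads`, and `cloudpickle.loads` is the same function, so the failing step is the same.

```python
import pickle


class RowLike:
    """Hypothetical stand-in mimicking the __getattr__ seen in the traceback.

    This is NOT google.cloud.bigquery's class, only an illustration.
    """

    __slots__ = ("_xxx_values", "_xxx_field_to_index")

    def __init__(self, values, field_to_index):
        self._xxx_values = values
        self._xxx_field_to_index = field_to_index

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails. During unpickling,
        # pickle looks up __setstate__ before the slots are restored, so
        # reading the empty slot fails and re-enters __getattr__ forever.
        value = self._xxx_field_to_index.get(name)
        if value is None:
            raise AttributeError(name)
        return self._xxx_values[value]

    def keys(self):
        return self._xxx_field_to_index.keys()

    def __getitem__(self, key):
        return self._xxx_values[self._xxx_field_to_index[key]]


def roundtrips(obj):
    """Return True if obj survives a pickle dumps/loads cycle."""
    try:
        pickle.loads(pickle.dumps(obj))
    except RecursionError:
        return False
    return True


row = RowLike(("abc",), {"id": 0})
print(roundtrips(row))        # False: the same RecursionError as above
print(roundtrips(dict(row)))  # True: a plain dict pickles cleanly
```

Note that pickling succeeds and only unpickling fails, which matches the traces: the error surfaces in the client's `gather`, when results come back from the worker. If the real check fails the same way, one workaround consistent with this sketch is to convert each row to a plain dict before results leave the task that produced them, assuming the real row object supports `dict(row)` as the stand-in does.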

@jcrist jcrist changed the title RecursionError when executing on DaskKubernetes RecursionError when pickling bigquery table object Jul 30, 2020
@mousetree
Author

This might be crazy, but is it possible that pickling fails when the code uses:

name = mydict["name"]

vs.

name = mydict.get("name")

I'm not sure what the requirements are for pickling, but that seems to solve some cases.

@adinamarca

adinamarca commented Aug 23, 2024

I raised this issue in the BigQuery repo. Hopefully they fix it!

Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants