Skip to content

Commit

Permalink
Merge pull request #2554 from fishtown-analytics/fix/parallel-rpc-met…
Browse files Browse the repository at this point in the history
…hod-stomping

Fix parallel rpc methods (2484)
  • Loading branch information
beckjake authored Jun 17, 2020
2 parents 765c2de + dbf7e69 commit 64e9d70
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
- dbt compile and ls no longer create schemas if they don't already exist ([#2525](https://github.com/fishtown-analytics/dbt/issues/2525), [#2528](https://github.com/fishtown-analytics/dbt/pull/2528))
- `dbt deps` now respects the `--project-dir` flag, so using `dbt deps --project-dir=/some/path` and then `dbt run --project-dir=/some/path` will properly find dependencies ([#2519](https://github.com/fishtown-analytics/dbt/issues/2519), [#2534](https://github.com/fishtown-analytics/dbt/pull/2534))
- `packages.yml` revision/version fields can be float-like again (`revision: '1.0'` is valid). ([#2518](https://github.com/fishtown-analytics/dbt/issues/2518), [#2535](https://github.com/fishtown-analytics/dbt/pull/2535))
- Parallel RPC requests no longer step on each others' arguments ([[#2484](https://github.com/fishtown-analytics/dbt/issues/2484), [#2554](https://github.com/fishtown-analytics/dbt/pull/2554)])

## dbt 0.17.0 (June 08, 2020)

Expand Down
9 changes: 2 additions & 7 deletions core/dbt/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,8 @@ def env_set_truthy(key: str) -> Optional[str]:


def _get_context():
if os.name == 'posix' and os.uname().sysname.lower() != 'darwin':
# on linux fork is available and it's fast
return multiprocessing.get_context('fork')
else:
# on windows, spawn is the only choice.
# On osx, fork is buggy: https://bugs.python.org/issue33725
return multiprocessing.get_context('spawn')
# TODO: change this back to use fork() on linux when we have made that safe
return multiprocessing.get_context('spawn')


MP_CONTEXT = _get_context()
Expand Down
3 changes: 2 additions & 1 deletion core/dbt/rpc/method.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import inspect
from abc import abstractmethod
from copy import deepcopy
from typing import List, Optional, Type, TypeVar, Generic, Dict, Any

from hologram import JsonSchemaMixin, ValidationError
Expand All @@ -20,7 +21,7 @@ class RemoteMethod(Generic[Parameters, Result]):
METHOD_NAME: Optional[str] = None

def __init__(self, args, config):
self.args = args
self.args = deepcopy(args)
self.config = config

@classmethod
Expand Down
5 changes: 3 additions & 2 deletions test/integration/048_rpc_test/test_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -726,8 +726,9 @@ def test_invalid_requests_postgres(self):
'hi this is not sql',
name='foo'
).json()
# neat mystery: Why is this "1" on macos and "2" on linux?
lineno = '1' if sys.platform == 'darwin' else '2'
# this is "1" if the multiprocessing context is "spawn" and "2" if
# it's fork.
lineno = '1'
error_data = self.assertIsErrorWith(data, 10003, 'Database Error', {
'type': 'DatabaseException',
'message': f'Database Error in rpc foo (from remote system)\n syntax error at or near "hi"\n LINE {lineno}: hi this is not sql\n ^',
Expand Down
39 changes: 39 additions & 0 deletions test/rpc/test_concurrency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from concurrent.futures import ThreadPoolExecutor, as_completed

from .util import (
get_querier,
ProjectDefinition,
)


def _compile_poll_for_result(querier, id: int):
sql = f'select {id} as id'
resp = querier.compile_sql(
request_id=id, sql=sql, name=f'query_{id}'
)
compile_sql_result = querier.async_wait_for_result(resp)
assert compile_sql_result['results'][0]['compiled_sql'] == sql


def test_rpc_compile_sql_concurrency(
project_root, profiles_root, postgres_profile, unique_schema
):
project = ProjectDefinition(
models={'my_model.sql': 'select 1 as id'}
)
querier_ctx = get_querier(
project_def=project,
project_dir=project_root,
profiles_dir=profiles_root,
schema=unique_schema,
test_kwargs={},
)

with querier_ctx as querier:
values = {}
with ThreadPoolExecutor(max_workers=10) as tpe:
for id in range(20):
fut = tpe.submit(_compile_poll_for_result, querier, id)
values[fut] = id
for fut in as_completed(values):
fut.result()

0 comments on commit 64e9d70

Please sign in to comment.