Skip to content

Commit

Permalink
fix freshness RPC response behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
Jacob Beck committed Sep 22, 2020
1 parent 8379edc commit da80bf2
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 9 deletions.
54 changes: 54 additions & 0 deletions core/dbt/contracts/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
CatalogArtifact,
CatalogResults,
ExecutionResult,
FreshnessExecutionResultArtifact,
FreshnessResult,
RunOperationResult,
RunOperationResultsArtifact,
RunResult,
Expand Down Expand Up @@ -282,6 +284,28 @@ def write(self, path: str):
writable.write(path)


@dataclass
@schema_version('remote-freshness-result', 1)
class RemoteFreshnessResult(FreshnessResult, RemoteResult):

@classmethod
def from_local_result(
cls,
base: FreshnessResult,
logs: List[LogMessage],
) -> 'RemoteFreshnessResult':
return cls(
metadata=base.metadata,
results=base.results,
elapsed_time=base.elapsed_time,
logs=logs,
)

def write(self, path: str):
writable = FreshnessExecutionResultArtifact.from_result(base=self)
writable.write(path)


@dataclass
@schema_version('remote-run-result', 1)
class RemoteRunResult(RemoteCompileResultMixin):
Expand All @@ -298,6 +322,7 @@ class RemoteRunResult(RemoteCompileResultMixin):
]



# GC types


Expand Down Expand Up @@ -682,6 +707,35 @@ def from_result(
elapsed=timing.elapsed,
)


@dataclass
@schema_version('poll-remote-freshness-result', 1)
class PollFreshnessResult(RemoteFreshnessResult, PollResult):
state: TaskHandlerState = field(
metadata=restrict_to(TaskHandlerState.Success,
TaskHandlerState.Failed),
)

@classmethod
def from_result(
cls: Type['PollFreshnessResult'],
base: RemoteFreshnessResult,
tags: TaskTags,
timing: TaskTiming,
logs: List[LogMessage],
) -> 'PollFreshnessResult':
return cls(
logs=logs,
tags=tags,
state=timing.state,
start=timing.start,
end=timing.end,
elapsed=timing.elapsed,
metadata=base.metadata,
results=base.results,
elapsed_time=base.elapsed_time,
)

# Manifest parsing types


Expand Down
7 changes: 6 additions & 1 deletion core/dbt/rpc/builtins.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
TaskRow,
PSResult,
RemoteExecutionResult,
RemoteFreshnessResult,
RemoteRunResult,
RemoteCompileResult,
RemoteCatalogResults,
Expand All @@ -32,6 +33,7 @@
PollRunCompleteResult,
PollCompileCompleteResult,
PollCatalogCompleteResult,
PollFreshnessResult,
PollRemoteEmptyCompleteResult,
PollRunOperationCompleteResult,
TaskHandlerState,
Expand Down Expand Up @@ -146,7 +148,8 @@ def poll_complete(
PollCatalogCompleteResult,
PollRemoteEmptyCompleteResult,
PollRunOperationCompleteResult,
PollGetManifestResult
PollGetManifestResult,
PollFreshnessResult,
]]

if isinstance(result, RemoteExecutionResult):
Expand All @@ -164,6 +167,8 @@ def poll_complete(
cls = PollRunOperationCompleteResult
elif isinstance(result, GetManifestResult):
cls = PollGetManifestResult
elif isinstance(result, RemoteFreshnessResult):
cls = PollFreshnessResult
else:
raise dbt.exceptions.InternalException(
'got invalid result in poll_complete: {}'.format(result)
Expand Down
28 changes: 20 additions & 8 deletions core/dbt/task/rpc/base.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,24 @@
from dbt.contracts.results import RunResultsArtifact
from dbt.contracts.rpc import RemoteExecutionResult
from dbt.contracts.results import (
RunResult,
RunOperationResult,
FreshnessResult,
)
from dbt.contracts.rpc import (
RemoteExecutionResult,
RemoteFreshnessResult,
RemoteRunOperationResult,
)
from dbt.task.runnable import GraphRunnableTask
from dbt.rpc.method import RemoteManifestMethod, Parameters


RESULT_TYPE_MAP = {
RunResult: RemoteExecutionResult,
RunOperationResult: RemoteRunOperationResult,
FreshnessResult: RemoteFreshnessResult,
}


class RPCTask(
GraphRunnableTask,
RemoteManifestMethod[Parameters, RemoteExecutionResult]
Expand All @@ -21,10 +36,7 @@ def load_manifest(self):
def get_result(
self, results, elapsed_time, generated_at
) -> RemoteExecutionResult:
base = RunResultsArtifact.from_node_results(
results=results,
elapsed_time=elapsed_time,
generated_at=generated_at,
)
rpc_result = RemoteExecutionResult.from_local_result(base, logs=[])
base = super().get_result(results, elapsed_time, generated_at)
cls = RESULT_TYPE_MAP.get(type(base), RemoteExecutionResult)
rpc_result = cls.from_local_result(base, logs=[])
return rpc_result

0 comments on commit da80bf2

Please sign in to comment.