Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix corrupted task state #1855

Merged
merged 1 commit into from
Oct 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 67 additions & 25 deletions core/dbt/contracts/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@


# Tags carried alongside a task: either absent (None) or a mapping from
# string keys to arbitrary values.
TaskTags = Optional[Dict[str, Any]]
# Task identifiers are UUIDs; this alias names that role in annotations.
TaskID = uuid.UUID

# Inputs

Expand Down Expand Up @@ -70,6 +71,56 @@ class RPCNoParameters(RPCParameters):
pass


@dataclass
class KillParameters(RPCParameters):
    # Parameters for a "kill" request.
    # Identifier of the task the caller wants to stop.
    task_id: TaskID


@dataclass
class PollParameters(RPCParameters):
    # Parameters for a "poll" request.
    # Identifier of the task being polled.
    request_token: TaskID
    # Whether log messages should be included in the response.
    logs: bool = False
    # Index of the first log message to include when logs are requested.
    logs_start: int = 0


@dataclass
class PSParameters(RPCParameters):
    # Parameters for a "ps" request: select which tasks get listed.
    # Include tasks that have not finished yet.
    active: bool = True
    # Include tasks that have already finished.
    completed: bool = False


@dataclass
class StatusParameters(RPCParameters):
    # Parameters for a "status" request; it declares no fields of its own
    # beyond whatever RPCParameters provides.
    pass


@dataclass
class GCSettings(JsonSchemaMixin):
    # Settings controlling garbage collection of the task table.

    # table size at which the tasks that ended longest ago start being
    # evicted
    maxsize: int
    # table size at which every task that ended before
    # (now - auto_reap_age) starts being evicted
    reapsize: int
    # how far back the reap cutoff reaches; must be a positive timedelta
    auto_reap_age: timedelta


@dataclass
class GCParameters(RPCParameters):
    """The gc endpoint takes three arguments, any of which may be present:

    - task_ids: An optional list of task ID UUIDs to try to GC
    - before: If provided, should be a datetime string. All tasks that finished
        before that datetime will be GCed
    - settings: If provided, should be a GCSettings object in JSON form. It
        will be applied to the task manager before GC starts. By default the
        existing gc settings remain.
    """
    # Every argument is optional per the contract above, so each one defaults
    # to None ("not provided"). Without the defaults, code constructing
    # GCParameters directly would have to pass all three explicitly.
    task_ids: Optional[List[TaskID]] = None
    before: Optional[datetime] = None
    settings: Optional[GCSettings] = None


# Outputs

@dataclass
Expand Down Expand Up @@ -133,12 +184,13 @@ class GCResultState(StrEnum):


@dataclass
class GCResultSet(JsonSchemaMixin):
deleted: List[uuid.UUID] = field(default_factory=list)
missing: List[uuid.UUID] = field(default_factory=list)
running: List[uuid.UUID] = field(default_factory=list)
class GCResult(RemoteResult):
logs: List[LogMessage] = field(default_factory=list)
deleted: List[TaskID] = field(default_factory=list)
missing: List[TaskID] = field(default_factory=list)
running: List[TaskID] = field(default_factory=list)

def add_result(self, task_id: uuid.UUID, status: GCResultState):
def add_result(self, task_id: TaskID, status: GCResultState):
if status == GCResultState.Missing:
self.missing.append(task_id)
elif status == GCResultState.Running:
Expand All @@ -150,18 +202,6 @@ def add_result(self, task_id: uuid.UUID, status: GCResultState):
f'Got invalid status in add_result: {status}'
)


@dataclass
class GCSettings(JsonSchemaMixin):
    # Settings controlling garbage collection of the task table.

    # table size at which the tasks that ended longest ago start being
    # evicted
    maxsize: int
    # table size at which every task that ended before
    # (now - auto_reap_age) starts being evicted
    reapsize: int
    # how far back the reap cutoff reaches; must be a positive timedelta
    auto_reap_age: timedelta


# Task management types


Expand Down Expand Up @@ -220,7 +260,7 @@ def finished(self) -> bool:

@dataclass
class TaskRow(JsonSchemaMixin):
task_id: uuid.UUID
task_id: TaskID
request_id: Union[str, int]
request_source: str
method: str
Expand All @@ -233,7 +273,7 @@ class TaskRow(JsonSchemaMixin):


@dataclass
class PSResult(JsonSchemaMixin):
class PSResult(RemoteResult):
rows: List[TaskRow]


Expand All @@ -245,8 +285,9 @@ class KillResultStatus(StrEnum):


@dataclass
class KillResult(JsonSchemaMixin):
status: KillResultStatus
class KillResult(RemoteResult):
status: KillResultStatus = KillResultStatus.Missing
logs: List[LogMessage] = field(default_factory=list)


# this is kind of carefully structured: BlocksManifestTasks is implied by
Expand All @@ -256,13 +297,14 @@ class RemoteMethodFlags(enum.Flag):
BlocksManifestTasks = 1
RequiresConfigReloadBefore = 3
RequiresManifestReloadAfter = 5
Builtin = 8


# Polling types


@dataclass
class PollResult(JsonSchemaMixin):
class PollResult(RemoteResult):
tags: TaskTags = None
status: TaskHandlerState = TaskHandlerState.NotStarted

Expand Down Expand Up @@ -416,9 +458,9 @@ class ManifestStatus(StrEnum):


@dataclass
class LastParse(JsonSchemaMixin):
status: ManifestStatus
class LastParse(RemoteResult):
status: ManifestStatus = ManifestStatus.Init
logs: List[LogMessage] = field(default_factory=list)
error: Optional[Dict[str, Any]] = None
logs: Optional[List[Dict[str, Any]]] = None
timestamp: datetime = field(default_factory=datetime.utcnow)
pid: int = field(default_factory=os.getpid)
227 changes: 227 additions & 0 deletions core/dbt/rpc/builtins.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
import os
import signal
from datetime import datetime
from typing import Type, Union, Any, List

import dbt.exceptions
from dbt.contracts.rpc import (
TaskTags,
StatusParameters,
LastParse,
GCParameters,
GCResult,
KillParameters,
KillResult,
KillResultStatus,
PSParameters,
TaskRow,
PSResult,
RemoteExecutionResult,
RemoteRunResult,
RemoteCompileResult,
RemoteCatalogResults,
RemoteEmptyResult,
PollParameters,
PollResult,
PollInProgressResult,
PollKilledResult,
PollExecuteCompleteResult,
PollRunCompleteResult,
PollCompileCompleteResult,
PollCatalogCompleteResult,
PollRemoteEmptyCompleteResult,
TaskHandlerState,
)
from dbt.logger import LogMessage
from dbt.rpc.error import dbt_error, RPCException
from dbt.rpc.method import RemoteBuiltinMethod
from dbt.rpc.task_handler import RequestTaskHandler


class GC(RemoteBuiltinMethod[GCParameters, GCResult]):
    """Builtin ``gc`` RPC method.

    Forwards the request's task IDs, cutoff datetime and settings to the
    task manager's ``gc_safe`` and returns whatever it produces.
    """
    METHOD_NAME = 'gc'

    def set_args(self, params: GCParameters):
        """Store the parsed request parameters via the base class."""
        super().set_args(params)

    def handle_request(self) -> GCResult:
        """Run garbage collection with the stored parameters.

        Raises:
            dbt.exceptions.InternalException: if no parameters were set.
        """
        params = self.params
        if params is None:
            raise dbt.exceptions.InternalException('GC: params not set')

        gc_kwargs = {
            'task_ids': params.task_ids,
            'before': params.before,
            'settings': params.settings,
        }
        return self.task_manager.gc_safe(**gc_kwargs)


class Kill(RemoteBuiltinMethod[KillParameters, KillResult]):
    """Builtin ``kill`` RPC method: interrupt a task's process if it is alive."""
    METHOD_NAME = 'kill'

    def set_args(self, params: KillParameters):
        """Store the parsed request parameters via the base class."""
        super().set_args(params)

    def handle_request(self) -> KillResult:
        """Try to kill the requested task and report what happened.

        The returned status is:
        - left at the KillResult default when the task ID is unknown,
        - ``NotStarted`` when the task has no process or the process has no
          pid,
        - ``Killed`` when the process was alive and was sent SIGINT,
        - ``Finished`` when the process exists but is no longer alive.

        Raises:
            dbt.exceptions.InternalException: if no parameters were set.
        """
        if self.params is None:
            raise dbt.exceptions.InternalException('Kill: params not set')

        outcome = KillResult()
        handler: RequestTaskHandler
        try:
            handler = self.task_manager.get_request(self.params.task_id)
        except dbt.exceptions.UnknownAsyncIDException:
            # unknown task: nothing to kill, report the default status
            return outcome

        process = handler.process
        pid = None if process is None else process.pid
        if pid is None:
            # no process, or a process without a pid yet
            outcome.status = KillResultStatus.NotStarted
            return outcome

        if not process.is_alive():
            # the process already ended on its own
            outcome.status = KillResultStatus.Finished
            return outcome

        outcome.status = KillResultStatus.Killed
        # Keep this order: record the end time, deliver the signal, and only
        # then mark the handler as killed.
        handler.ended = datetime.utcnow()
        os.kill(pid, signal.SIGINT)
        handler.state = TaskHandlerState.Killed
        return outcome


class Status(RemoteBuiltinMethod[StatusParameters, LastParse]):
    """Builtin ``status`` RPC method: return the task manager's last parse."""
    METHOD_NAME = 'status'

    def set_args(self, params: StatusParameters):
        """Store the parsed request parameters via the base class."""
        super().set_args(params)

    def handle_request(self) -> LastParse:
        """Return the ``last_parse`` value held by the task manager."""
        manager = self.task_manager
        return manager.last_parse


class PS(RemoteBuiltinMethod[PSParameters, PSResult]):
    """Builtin ``ps`` RPC method: list rows of the task table."""
    METHOD_NAME = 'ps'

    def set_args(self, params: PSParameters):
        """Store the parsed request parameters via the base class."""
        super().set_args(params)

    def keep(self, row: TaskRow):
        """Return True if ``row`` should appear in the listing.

        Finished rows are kept when ``completed`` was requested; unfinished
        rows are kept when ``active`` was requested.

        Raises:
            dbt.exceptions.InternalException: if no parameters were set.
        """
        params = self.params
        if params is None:
            raise dbt.exceptions.InternalException('PS: params not set')
        wanted = params.completed if row.state.finished else params.active
        return bool(wanted)

    def handle_request(self) -> PSResult:
        """Collect the kept rows, ordered by (state, start, method)."""
        selected = sorted(
            filter(self.keep, self.task_manager.task_table()),
            key=lambda entry: (entry.state, entry.start, entry.method),
        )
        return PSResult(rows=selected, logs=[])


def poll_complete(
    status: TaskHandlerState, result: Any, tags: TaskTags
) -> PollResult:
    """Wrap a finished task's result in the matching poll result type.

    Args:
        status: final task state; must be ``Success`` or ``Failed``.
        result: the task's result object, used to pick the poll result class.
        tags: the task's tags, passed through to the poll result.

    Returns:
        The value of ``from_result(status, result, tags)`` on the poll result
        class paired with the first matching result type.

    Raises:
        dbt.exceptions.InternalException: if ``status`` is not a completed
            state, or ``result`` is none of the supported result types.
    """
    completed_states = (TaskHandlerState.Success, TaskHandlerState.Failed)
    if status not in completed_states:
        raise dbt.exceptions.InternalException(
            'got invalid result status in poll_complete: {}'.format(status)
        )

    # Checked top to bottom, first match wins. Order matters here, as
    # RemoteRunResult subclasses RemoteCompileResult.
    dispatch = (
        (RemoteExecutionResult, PollExecuteCompleteResult),
        (RemoteRunResult, PollRunCompleteResult),
        (RemoteCompileResult, PollCompileCompleteResult),
        (RemoteCatalogResults, PollCatalogCompleteResult),
        (RemoteEmptyResult, PollRemoteEmptyCompleteResult),
    )
    for result_type, poll_type in dispatch:
        if isinstance(result, result_type):
            return poll_type.from_result(status, result, tags)

    raise dbt.exceptions.InternalException(
        'got invalid result in poll_complete: {}'.format(result)
    )


class Poll(RemoteBuiltinMethod[PollParameters, PollResult]):
    """Builtin ``poll`` RPC method: report the state of a task."""
    METHOD_NAME = 'poll'

    def set_args(self, params: PollParameters):
        """Store the parsed request parameters via the base class."""
        super().set_args(params)

    def handle_request(self) -> PollResult:
        """Look up the requested task and describe its current state.

        Returns:
            An in-progress result while the state is at or below ``Running``,
            a completed result built by ``poll_complete`` for ``Success`` or
            ``Failed``, or a killed result for ``Killed``.

        Raises:
            dbt.exceptions.InternalException: if no parameters were set.
            RPCException: if the task's state and stored error/result are
                inconsistent, or the state is not a recognised value.
            The task's own stored error, re-raised, when the state is
            ``Error``.
        """
        if self.params is None:
            raise dbt.exceptions.InternalException('Poll: params not set')

        token = self.params.request_token
        handler = self.task_manager.get_request(token)

        captured: List[LogMessage] = (
            handler.logs[self.params.logs_start:] if self.params.logs else []
        )

        def internal_failure(message: str) -> RPCException:
            # Build the RPC error for an inconsistent task, attaching the
            # logs captured for this request.
            cause = dbt.exceptions.InternalException(message)
            serialized = [entry.to_dict() for entry in captured]
            return RPCException.from_error(dbt_error(cause, logs=serialized))

        # Read the state once and work from that snapshot so later updates
        # cannot confuse the branches below. States only move forward, so if
        # the task has already advanced to error/result the captured logs are
        # still valid.
        snapshot = handler.state

        if snapshot <= TaskHandlerState.Running:
            return PollInProgressResult(
                status=snapshot,
                tags=handler.tags,
                logs=captured,
            )

        if snapshot == TaskHandlerState.Error:
            failure = handler.error
            if failure is None:
                raise internal_failure(
                    f'At end of task {token}, error state but error is None'
                )
            # the stored exception already carries the child's logs, so it
            # is re-raised untouched
            raise failure

        if snapshot in (TaskHandlerState.Success, TaskHandlerState.Failed):
            if handler.result is None:
                raise internal_failure(
                    f'At end of task {token}, state={snapshot} but result is '
                    'None'
                )
            return poll_complete(
                status=snapshot,
                result=handler.result,
                tags=handler.tags,
            )

        if snapshot == TaskHandlerState.Killed:
            return PollKilledResult(
                status=snapshot, tags=handler.tags, logs=captured
            )

        raise internal_failure(
            f'Got unknown value state={snapshot} for task {token}'
        )
Loading