Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Job scheduler implementation #1308

Merged
merged 8 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions evadb/catalog/catalog_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import shutil
from pathlib import Path
from typing import Any, List
Expand Down Expand Up @@ -39,6 +40,8 @@
FunctionIOCatalogEntry,
FunctionMetadataCatalogEntry,
IndexCatalogEntry,
JobCatalogEntry,
JobHistoryCatalogEntry,
TableCatalogEntry,
drop_all_tables_except_catalog,
init_db,
Expand All @@ -61,6 +64,8 @@
FunctionMetadataCatalogService,
)
from evadb.catalog.services.index_catalog_service import IndexCatalogService
from evadb.catalog.services.job_catalog_service import JobCatalogService
from evadb.catalog.services.job_history_catalog_service import JobHistoryCatalogService
from evadb.catalog.services.table_catalog_service import TableCatalogService
from evadb.catalog.sql_config import IDENTIFIER_COLUMN, SQLConfig
from evadb.expression.function_expression import FunctionExpression
Expand All @@ -85,6 +90,10 @@ def __init__(self, db_uri: str):
self._config_catalog_service = ConfigurationCatalogService(
self._sql_config.session
)
self._job_catalog_service = JobCatalogService(self._sql_config.session)
self._job_history_catalog_service = JobHistoryCatalogService(
self._sql_config.session
)
self._table_catalog_service = TableCatalogService(self._sql_config.session)
self._column_service = ColumnCatalogService(self._sql_config.session)
self._function_service = FunctionCatalogService(self._sql_config.session)
Expand Down Expand Up @@ -215,6 +224,137 @@ def check_native_table_exists(self, table_name: str, database_name: str):

return True

"Job catalog services"

def insert_job_catalog_entry(
self,
name: str,
queries: str,
start_time: datetime,
end_time: datetime,
repeat_interval: int,
active: bool,
next_schedule_run: datetime,
) -> JobCatalogEntry:
"""A new entry is persisted in the job catalog.

Args:
name: job name
queries: job's queries
start_time: job start time
end_time: job end time
repeat_interval: job repeat interval
active: job status
next_schedule_run: next run time as per schedule
"""
job_entry = self._job_catalog_service.insert_entry(
name,
queries,
start_time,
end_time,
repeat_interval,
active,
next_schedule_run,
)

return job_entry

def get_job_catalog_entry(self, job_name: str) -> JobCatalogEntry:
"""
Returns the job catalog entry for the given database_name
Arguments:
job_name (str): name of the job

Returns:
JobCatalogEntry
"""

table_entry = self._job_catalog_service.get_entry_by_name(job_name)

return table_entry

def drop_job_catalog_entry(self, job_entry: JobCatalogEntry) -> bool:
"""
This method deletes the job from catalog.

Arguments:
job_entry: job catalog entry to remove

Returns:
True if successfully deleted else False
"""
return self._job_catalog_service.delete_entry(job_entry)

def get_next_executable_job(self, only_past_jobs: bool = False) -> JobCatalogEntry:
"""Get the oldest job that is ready to be triggered by trigger time
Arguments:
only_past_jobs: boolean flag to denote if only jobs with trigger time in
past should be considered
Returns:
Returns the first job to be triggered
"""
return self._job_catalog_service.get_next_executable_job(only_past_jobs)

def update_job_catalog_entry(
self, job_name: str, next_scheduled_run: datetime, active: bool
):
"""Update the next_scheduled_run and active column as per the provided values
Arguments:
job_name (str): job which should be updated

next_run_time (datetime): the next trigger time for the job

active (bool): the active status for the job
"""
self._job_catalog_service.update_next_scheduled_run(
job_name, next_scheduled_run, active
)

"Job history catalog services"

def insert_job_history_catalog_entry(
self,
job_id: str,
job_name: str,
execution_start_time: datetime,
execution_end_time: datetime,
) -> JobCatalogEntry:
"""A new entry is persisted in the job history catalog.

Args:
job_id: job id for the execution entry
job_name: job name for the execution entry
execution_start_time: job execution start time
execution_end_time: job execution end time
"""
job_history_entry = self._job_history_catalog_service.insert_entry(
job_id, job_name, execution_start_time, execution_end_time
)

return job_history_entry

def get_job_history_by_job_id(self, job_id: int) -> list[JobHistoryCatalogEntry]:
"""Returns all the entries present for this job_id on in the history.

Args:
job_id: the id of job whose history should be fetched
"""
return self._job_history_catalog_service.get_entry_by_job_id(job_id)

def update_job_history_end_time(
self, job_id: int, execution_start_time: datetime, execution_end_time: datetime
) -> list[JobHistoryCatalogEntry]:
"""Updates the execution_end_time for this job history matching job_id and execution_start_time.

Args:
job_id: id of the job whose history entry which should be updated
execution_start_time: the start time for the job history entry
execution_end_time: the end time for the job history entry
"""
return self._job_history_catalog_service.update_entry_end_time(
job_id, execution_start_time, execution_end_time
)

"Table catalog services"

def insert_table_catalog_entry(
Expand Down
92 changes: 92 additions & 0 deletions evadb/catalog/models/job_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# coding=utf-8
# Copyright 2018-2023 EvaDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import json

from sqlalchemy import Boolean, Column, DateTime, Index, Integer, String
from sqlalchemy.orm import relationship

from evadb.catalog.models.base_model import BaseModel
from evadb.catalog.models.utils import JobCatalogEntry


class JobCatalog(BaseModel):
"""The `JobCatalog` catalog stores information about all the created Jobs.
`_row_id:` an autogenerated unique identifier.
`_name:` the job name.
`_queries:` the queries to run as part of this job
`_start_time:` the job's start time
`_end_time:` the job's end time
`_repeat_interval:` the job's repeat interval
`_repeat_period:` the job's repeat period
`_active:` is the job active/deleted
`_next_scheduled_run:` the next trigger time for the job as per the schedule
`_created_at:` entry creation time
`_updated_at:` entry last update time
"""

__tablename__ = "job_catalog"

_name = Column("name", String(100), unique=True)
_queries = Column("queries", String, nullable=False)
_start_time = Column("start_time", DateTime, default=datetime.datetime.now)
_end_time = Column("end_ts", DateTime)
_repeat_interval = Column("repeat_interval", Integer)
_active = Column("active", Boolean, default=True)
_next_scheduled_run = Column("next_scheduled_run", DateTime)

_created_at = Column("created_at", DateTime, default=datetime.datetime.now)
_updated_at = Column(
"updated_at",
DateTime,
default=datetime.datetime.now,
onupdate=datetime.datetime.now,
)

_next_run_index = Index("_next_run_index", _next_scheduled_run)
_job_history_catalog = relationship("JobHistoryCatalog", cascade="all, delete")

def __init__(
self,
name: str,
queries: str,
start_time: datetime,
end_time: datetime,
repeat_interval: Integer,
active: bool,
next_schedule_run: datetime,
):
self._name = name
self._queries = queries
self._start_time = start_time
self._end_time = end_time
self._repeat_interval = repeat_interval
self._active = active
self._next_scheduled_run = next_schedule_run

def as_dataclass(self) -> "JobCatalogEntry":
return JobCatalogEntry(
row_id=self._row_id,
name=self._name,
queries=json.loads(self._queries),
start_time=self._start_time,
end_time=self._end_time,
repeat_interval=self._repeat_interval,
active=self._active,
next_scheduled_run=self._next_scheduled_run,
created_at=self._created_at,
updated_at=self._updated_at,
)
73 changes: 73 additions & 0 deletions evadb/catalog/models/job_history_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# coding=utf-8
# Copyright 2018-2023 EvaDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime

from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, UniqueConstraint

from evadb.catalog.models.base_model import BaseModel
from evadb.catalog.models.utils import JobHistoryCatalogEntry


class JobHistoryCatalog(BaseModel):
"""The `JobHistoryCatalog` stores the execution history of jobs .
`_row_id:` an autogenerated unique identifier.
`_job_id:` job id.
`_job_name:` job name.
`_execution_start_time:` start time of this run
`_execution_end_time:` end time for this run
`_created_at:` entry creation time
`_updated_at:` entry last update time
"""

__tablename__ = "job_history_catalog"

_job_id = Column(
"job_id", Integer, ForeignKey("job_catalog._row_id", ondelete="CASCADE")
)
_job_name = Column("job_name", String(100))
_execution_start_time = Column("execution_start_time", DateTime)
_execution_end_time = Column("execution_end_time", DateTime)
_created_at = Column("created_at", DateTime, default=datetime.datetime.now)
_updated_at = Column(
"updated_at",
DateTime,
default=datetime.datetime.now,
onupdate=datetime.datetime.now,
)

__table_args__ = (UniqueConstraint("job_id", "execution_start_time"), {})

def __init__(
self,
job_id: int,
job_name: str,
execution_start_time: datetime,
execution_end_time: datetime,
):
self._job_id = job_id
self._job_name = job_name
self._execution_start_time = execution_start_time
self._execution_end_time = execution_end_time

def as_dataclass(self) -> "JobHistoryCatalogEntry":
return JobHistoryCatalogEntry(
row_id=self._row_id,
job_id=self._job_id,
job_name=self._job_name,
execution_start_time=self._execution_start_time,
execution_end_time=self._execution_end_time,
created_at=self._created_at,
updated_at=self._updated_at,
)
Loading