Skip to content

Commit

Permalink
linting changes
Browse files (browse the repository at this point in the history)
  • Loading branch information
dungnmaster committed Nov 3, 2023
1 parent 06050d5 commit 04fab31
Show file tree
Hide file tree
Showing 18 changed files with 183 additions and 109 deletions.
2 changes: 1 addition & 1 deletion evadb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
EvaDBConnection,
EvaDBCursor,
connect,
connect_remote
connect_remote,
)
from evadb.interfaces.relational.relation import EvaDBQuery # noqa: E402,F401

Expand Down
13 changes: 8 additions & 5 deletions evadb/catalog/catalog_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ def check_native_table_exists(self, table_name: str, database_name: str):

return True


"Job catalog services"

def insert_job_catalog_entry(
Expand All @@ -230,7 +229,7 @@ def insert_job_catalog_entry(
end_time: datetime,
repeat_interval: int,
active: bool,
next_schedule_run: datetime
next_schedule_run: datetime,
) -> JobCatalogEntry:
"""A new entry is persisted in the job catalog.
Expand All @@ -250,7 +249,7 @@ def insert_job_catalog_entry(
end_time,
repeat_interval,
active,
next_schedule_run
next_schedule_run,
)

return job_entry
Expand Down Expand Up @@ -291,7 +290,9 @@ def get_next_executable_job(self, only_past_jobs: bool = False) -> JobCatalogEnt
"""
return self._job_catalog_service.get_next_executable_job(only_past_jobs)

def update_job_catalog_entry(self, job_name: str, next_scheduled_run: datetime, active: bool):
def update_job_catalog_entry(
self, job_name: str, next_scheduled_run: datetime, active: bool
):
"""Update the next_scheduled_run and active columns as per the provided values
Arguments:
job_name (str): job which should be updated
Expand All @@ -300,7 +301,9 @@ def update_job_catalog_entry(self, job_name: str, next_scheduled_run: datetime,
active (bool): the active status for the job
"""
self._job_catalog_service.update_next_scheduled_run(job_name, next_scheduled_run, active)
self._job_catalog_service.update_next_scheduled_run(
job_name, next_scheduled_run, active
)

"Table catalog services"

Expand Down
21 changes: 13 additions & 8 deletions evadb/catalog/models/job_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import datetime
import json

from sqlalchemy import Column, Boolean, DateTime, Index, Integer, String
from sqlalchemy import Boolean, Column, DateTime, Index, Integer, String

from evadb.catalog.models.base_model import BaseModel
from evadb.catalog.models.utils import JobCatalogEntry
Expand All @@ -41,26 +41,31 @@ class JobCatalog(BaseModel):

_name = Column("name", String(100), unique=True)
_queries = Column("queries", String, nullable=False)
_start_time = Column("start_time", DateTime, default=datetime.datetime.now)
_end_time= Column("end_ts", DateTime)
_start_time = Column("start_time", DateTime, default=datetime.datetime.now)
_end_time = Column("end_ts", DateTime)
_repeat_interval = Column("repeat_interval", Integer)
_active = Column("active", Boolean, default=True)
_next_scheduled_run = Column("next_scheduled_run", DateTime)

_created_at = Column("created_at", DateTime, default=datetime.datetime.now)
_updated_at = Column("updated_at", DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now)
_updated_at = Column(
"updated_at",
DateTime,
default=datetime.datetime.now,
onupdate=datetime.datetime.now,
)

_next_run_index = Index('_next_run_index', _next_scheduled_run)
_next_run_index = Index("_next_run_index", _next_scheduled_run)

def __init__(
self,
self,
name: str,
queries: str,
start_time: datetime,
end_time: datetime,
repeat_interval: Integer,
active: bool,
next_schedule_run: datetime
next_schedule_run: datetime,
):
self._name = name
self._queries = queries
Expand Down
14 changes: 14 additions & 0 deletions evadb/catalog/models/job_history_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# coding=utf-8
# Copyright 2018-2023 EvaDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
31 changes: 19 additions & 12 deletions evadb/catalog/services/job_catalog_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import datetime
import json

from sqlalchemy import and_, asc, true
from sqlalchemy import and_, true
from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import select

Expand Down Expand Up @@ -58,7 +58,7 @@ def insert_entry(
f"Failed to insert entry into job catalog with exception {str(e)}"
)
raise CatalogError(e)

return job_catalog_obj.as_dataclass()

def get_entry_by_name(self, job_name: str) -> JobCatalogEntry:
Expand Down Expand Up @@ -90,9 +90,7 @@ def delete_entry(self, job_entry: JobCatalogEntry):
job_catalog_obj.delete(self.session)
return True
except Exception as e:
err_msg = (
f"Delete Job failed for {job_entry} with error {str(e)}."
)
err_msg = f"Delete Job failed for {job_entry} with error {str(e)}."
logger.exception(err_msg)
raise CatalogError(err_msg)

Expand All @@ -105,7 +103,10 @@ def get_all_overdue_jobs(self) -> list:
"""
entries = self.session.execute(
select(self.model).filter(
and_(self.model._next_scheduled_run <= datetime.datetime.now(), self.model._active == true())
and_(
self.model._next_scheduled_run <= datetime.datetime.now(),
self.model._active == true(),
)
)
).all()
entry = [row.as_dataclass() for row in entries]
Expand All @@ -124,8 +125,10 @@ def get_next_executable_job(self, only_past_jobs: bool) -> JobCatalogEntry:
.filter(
and_(
self.model._next_scheduled_run <= datetime.datetime.now(),
self.model._active == true()
) if only_past_jobs else self.model._active == true()
self.model._active == true(),
)
if only_past_jobs
else self.model._active == true()
)
.order_by(self.model._next_scheduled_run.asc())
.limit(1)
Expand All @@ -134,7 +137,9 @@ def get_next_executable_job(self, only_past_jobs: bool) -> JobCatalogEntry:
return entry.as_dataclass()
return entry

def update_next_scheduled_run(self, job_name: str, next_scheduled_run : datetime, active: bool):
def update_next_scheduled_run(
self, job_name: str, next_scheduled_run: datetime, active: bool
):
"""Update the next_scheduled_run and active columns as per the provided values
Arguments:
job_name (str): job which should be updated
Expand All @@ -145,8 +150,10 @@ def update_next_scheduled_run(self, job_name: str, next_scheduled_run : datetime
Returns:
void
"""
job = self.session.query(self.model).filter(self.model._name == job_name).first()
job = (
self.session.query(self.model).filter(self.model._name == job_name).first()
)
if job:
job._next_scheduled_run = next_scheduled_run
job._active = active
self.session.commit()
self.session.commit()
55 changes: 34 additions & 21 deletions evadb/executor/create_job_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pandas as pd
import re

from datetime import datetime

import pandas as pd

from evadb.database import EvaDBDatabase
from evadb.executor.abstract_executor import AbstractExecutor
from evadb.executor.executor_utils import ExecutorError
from evadb.models.storage.batch import Batch
from evadb.parser.create_statement import CreateJobStatement
from evadb.third_party.databases.interface import get_database_handler
from evadb.utils.logging_manager import logger


Expand All @@ -33,25 +32,29 @@ def __init__(self, db: EvaDBDatabase, node: CreateJobStatement):
def _parse_datetime_str(self, datetime_str: str) -> datetime:
datetime_format = "%Y-%m-%d %H:%M:%S"
date_format = "%Y-%m-%d"
if re.match(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}', datetime_str):

if re.match(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}", datetime_str):
try:
return datetime.strptime(datetime_str, datetime_format)
except ValueError:
raise ExecutorError(
f"{datetime_str} is not in the correct datetime format. expected format: {datetime_format}."
)
elif re.match(r'\d{4}-\d{2}-\d{2}', datetime_str):
elif re.match(r"\d{4}-\d{2}-\d{2}", datetime_str):
try:
return datetime.strptime(datetime_str, date_format)
except ValueError:
raise ExecutorError(
f"{datetime_str} is not in the correct date format. expected format: {date_format}."
)
else:
raise ValueError(f"{datetime_str} does not match the expected date or datetime format")
raise ValueError(
f"{datetime_str} does not match the expected date or datetime format"
)

def _get_repeat_time_interval_seconds(self, repeat_interval: int, repeat_period: str) -> int:
def _get_repeat_time_interval_seconds(
self, repeat_interval: int, repeat_period: str
) -> int:
unit_to_seconds = {
"second": 1,
"minute": 60,
Expand All @@ -66,32 +69,44 @@ def _get_repeat_time_interval_seconds(self, repeat_interval: int, repeat_period:
"month": 2592000,
"months": 2592000,
}
assert (repeat_period is None) or (repeat_period in unit_to_seconds), "repeat period should be one of these values: minute | minutes | min | hour | hours | day | days | week | weeks | month | months"
assert (repeat_period is None) or (
repeat_period in unit_to_seconds
), "repeat period should be one of these values: minute | minutes | min | hour | hours | day | days | week | weeks | month | months"

repeat_interval = 1 if repeat_interval is None else repeat_interval
return repeat_interval * unit_to_seconds.get(repeat_period, 0)

def exec(self, *args, **kwargs):
# Check if the job already exists.
job_catalog_entry = self.catalog().get_job_catalog_entry(
self.node.job_name
)
job_catalog_entry = self.catalog().get_job_catalog_entry(self.node.job_name)

if job_catalog_entry is not None:
if self.node.if_not_exists:
msg = f"A job with name {self.node.job_name} already exists, nothing added."
yield Batch(pd.DataFrame([msg]))
return
else:
raise ExecutorError(f"A job with name {self.node.job_name} already exists.")
raise ExecutorError(
f"A job with name {self.node.job_name} already exists."
)

logger.debug(f"Creating job {self.node}")

job_name = self.node.job_name
queries = [str(q) for q in self.node.queries]
start_time = self._parse_datetime_str(self.node.start_time) if self.node.start_time is not None else datetime.datetime.now()
end_time = self._parse_datetime_str(self.node.end_time) if self.node.end_time is not None else None
repeat_interval = self._get_repeat_time_interval_seconds(self.node.repeat_interval, self.node.repeat_period)
start_time = (
self._parse_datetime_str(self.node.start_time)
if self.node.start_time is not None
else datetime.datetime.now()
)
end_time = (
self._parse_datetime_str(self.node.end_time)
if self.node.end_time is not None
else None
)
repeat_interval = self._get_repeat_time_interval_seconds(
self.node.repeat_interval, self.node.repeat_period
)
active = True
next_schedule_run = start_time

Expand All @@ -102,13 +117,11 @@ def exec(self, *args, **kwargs):
end_time,
repeat_interval,
active,
next_schedule_run
next_schedule_run,
)

yield Batch(
pd.DataFrame(
[
f"The job {self.node.job_name} has been successfully created."
]
[f"The job {self.node.job_name} has been successfully created."]
)
)
6 changes: 2 additions & 4 deletions evadb/executor/drop_object_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,7 @@ def _handle_drop_database(self, database_name: str, if_exists: bool):
def _handle_drop_job(self, job_name: str, if_exists: bool):
job_catalog_entry = self.catalog().get_job_catalog_entry(job_name)
if not job_catalog_entry:
err_msg = (
f"Job {job_name} does not exist, therefore cannot be dropped."
)
err_msg = f"Job {job_name} does not exist, therefore cannot be dropped."
if if_exists:
logger.warning(err_msg)
return Batch(pd.DataFrame([err_msg]))
Expand All @@ -185,4 +183,4 @@ def _handle_drop_job(self, job_name: str, if_exists: bool):
{f"Job {job_name} successfully dropped"},
index=[0],
)
)
)
4 changes: 3 additions & 1 deletion evadb/interfaces/relational/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.
import asyncio
import multiprocessing

import pandas

from evadb.configuration.constants import EvaDB_DATABASE_DIR
Expand Down Expand Up @@ -43,8 +44,8 @@
)
from evadb.server.command_handler import execute_statement
from evadb.utils.generic_utils import find_nearest_word, is_ray_enabled_and_installed
from evadb.utils.logging_manager import logger
from evadb.utils.job_scheduler import JobScheduler
from evadb.utils.logging_manager import logger


class EvaDBConnection:
Expand Down Expand Up @@ -99,6 +100,7 @@ def stop_jobs(self):
self._jobs_process.join()
logger.debug("Job scheduler process stopped")


class EvaDBCursor(object):
def __init__(self, connection):
self._connection = connection
Expand Down
2 changes: 1 addition & 1 deletion evadb/parser/lark_visitor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
from evadb.parser.lark_visitor._create_statements import (
CreateDatabase,
CreateIndex,
CreateTable,
CreateJob,
CreateTable,
)
from evadb.parser.lark_visitor._delete_statement import Delete
from evadb.parser.lark_visitor._drop_statement import DropObject
Expand Down
Loading

0 comments on commit 04fab31

Please sign in to comment.