Skip to content

Commit

Permalink
ENG-382 remove billing worker (#2191)
Browse files Browse the repository at this point in the history
  • Loading branch information
zubenkoivan authored Oct 3, 2024
1 parent 1b110b8 commit afe8ad0
Show file tree
Hide file tree
Showing 21 changed files with 100 additions and 1,614 deletions.
29 changes: 29 additions & 0 deletions alembic/versions/13c284f200c9_remove_jobs_fully_billed_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""remove jobs_fully_billed_index
Revision ID: 13c284f200c9
Revises: 631e12f654a4
Create Date: 2024-10-03 09:52:28.579238
"""

import sqlalchemy as sa

from alembic import op

# revision identifiers, used by Alembic.
revision = "13c284f200c9"
down_revision = "631e12f654a4"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.drop_index("jobs_fully_billed_index", table_name="jobs")


def downgrade() -> None:
op.create_index(
"jobs_fully_billed_index",
"jobs",
[sa.text("(((payload ->> 'fully_billed'::text))::boolean)")],
)
43 changes: 43 additions & 0 deletions alembic/versions/fc114b57a56c_remove_billing_log_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
"""remove billing_log table
Revision ID: fc114b57a56c
Revises: 13c284f200c9
Create Date: 2024-10-03 10:35:09.025409
"""

import sqlalchemy as sa
import sqlalchemy.dialects.postgresql as sapg

from alembic import op

# revision identifiers, used by Alembic.
revision = "fc114b57a56c"
down_revision = "13c284f200c9"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.drop_table("billing_log")
op.drop_table("sync_record")


def downgrade() -> None:
op.create_table(
"billing_log",
sa.Column("id", sa.Integer(), primary_key=True),
sa.Column("job_id", sa.String(), nullable=False),
# All other fields
sa.Column("payload", sapg.JSONB(), nullable=False),
)
op.create_table(
"sync_record",
sa.Column("type", sa.String(), primary_key=True),
sa.Column("last_entry_id", sa.Integer()),
)
op.create_index(
"billing_log_job_index",
"billing_log",
["job_id"],
)
37 changes: 0 additions & 37 deletions platform_api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from neuro_notifications_client import Client as NotificationsClient

from platform_api.orchestrator.job_policy_enforcer import (
BillingEnforcer,
CreditsLimitEnforcer,
CreditsNotificationsEnforcer,
JobPolicyEnforcePoller,
Expand All @@ -40,8 +39,6 @@
from .config_client import ConfigClient
from .config_factory import EnvironConfigFactory
from .handlers import JobsHandler
from .orchestrator.billing_log.service import BillingLogService, BillingLogWorker
from .orchestrator.billing_log.storage import PostgresBillingLogStorage
from .orchestrator.job_request import JobError, JobException
from .orchestrator.jobs_service import (
JobsService,
Expand Down Expand Up @@ -477,46 +474,12 @@ async def _init_app(app: aiohttp.web.Application) -> AsyncIterator[None]:

logger.info("Initializing JobPolicyEnforcePoller")

logger.info("Initializing BillingLogStorage")
billing_log_storage = PostgresBillingLogStorage(engine)

billing_log_new_entry_notifier = ResubscribingNotifier(
PostgresChannelNotifier(engine, "billing_log_new_entry"),
check_interval=15,
)

billing_log_entry_done_notifier = ResubscribingNotifier(
PostgresChannelNotifier(engine, "billing_log_entry_done_notifier"),
check_interval=15,
)

logger.info("Initializing BillingLogService")
billing_log_service = await exit_stack.enter_async_context(
BillingLogService(
storage=billing_log_storage,
new_entry=billing_log_new_entry_notifier,
entry_done=billing_log_entry_done_notifier,
)
)

logger.info("Initializing BillingLogWorker")
await exit_stack.enter_async_context(
BillingLogWorker(
storage=billing_log_storage,
new_entry=billing_log_new_entry_notifier,
entry_done=billing_log_entry_done_notifier,
admin_client=admin_client,
jobs_service=jobs_service,
)
)

await exit_stack.enter_async_context(
JobPolicyEnforcePoller(
config.job_policy_enforcer,
enforcers=[
RuntimeLimitEnforcer(jobs_service),
CreditsLimitEnforcer(jobs_service, admin_client),
BillingEnforcer(jobs_service, billing_log_service),
CreditsNotificationsEnforcer(
jobs_service=jobs_service,
admin_client=admin_client,
Expand Down
2 changes: 1 addition & 1 deletion platform_api/handlers/jobs_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ def convert_job_to_job_response(job: Job) -> dict[str, Any]:
"materialized": job.materialized,
"being_dropped": job.being_dropped,
"logs_removed": job.logs_removed,
"total_price_credits": str(job.total_price_credits),
"total_price_credits": str(job.get_total_price_credits()),
"price_credits_per_hour": str(job.price_credits_per_hour),
"priority": job.priority.to_name(),
}
Expand Down
Empty file.
219 changes: 0 additions & 219 deletions platform_api/orchestrator/billing_log/service.py

This file was deleted.

Loading

0 comments on commit afe8ad0

Please sign in to comment.