Skip to content

Commit

Permalink
work on multiprocessing
Browse files Browse the repository at this point in the history
  • Loading branch information
sondreso committed Sep 6, 2024
1 parent 5513112 commit 7fde666
Showing 1 changed file with 13 additions and 8 deletions.
21 changes: 13 additions & 8 deletions src/ert/experiment_server/main.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import queue
import asyncio
import multiprocessing as mp
import uuid
from multiprocessing.queues import Queue

from fastapi import BackgroundTasks, FastAPI, HTTPException, WebSocket
from pydantic import BaseModel, Field
import json
import dataclasses
import asyncio

from ert.ensemble_evaluator import EvaluatorServerConfig
from ert.run_models.model_factory import create_model
Expand All @@ -19,7 +19,6 @@
from ert.storage import open_storage

from typing import Dict, Union, Tuple
import uuid

from ert.config import ErtConfig, QueueSystem
from fastapi.encoders import jsonable_encoder
Expand All @@ -28,22 +27,26 @@ class Experiment(BaseModel):
args: Union[EnsembleExperimentArguments, EnsembleSmootherArguments, EvaluateEnsembleArguments, IteratedEnsembleSmootherArguments, ManualUpdateArguments, MultipleDataAssimilationArguments, SingleTestRunArguments] = Field(..., discriminator='mode')
ert_config: ErtConfig


mp_ctx = mp.get_context('forkserver')
app = FastAPI()


@app.get("/")
async def root():
return {"message": "ping"}

experiments : Dict[str, Tuple[BaseRunModel, queue.SimpleQueue]]= {}
experiments : Dict[str, Tuple[BaseRunModel, "Queue[StatusEvents]"]]= {}

def run_experiment(experiment_id:str, evaluator_server_config: EvaluatorServerConfig):
experiments[experiment_id][0].start_simulations_thread(evaluator_server_config=evaluator_server_config)
p = mp_ctx.Process(target=experiments[experiment_id][0].start_simulations_thread, args=(evaluator_server_config,))
p.start()
p.join()

@app.post("/experiments/")
async def submit_experiment(experiment: Experiment, background_tasks: BackgroundTasks):
storage = open_storage(experiment.ert_config.ens_path, "w")
status_queue: queue.SimpleQueue[StatusEvents] = queue.SimpleQueue()
status_queue: "Queue[StatusEvents]" = mp_ctx.Queue()
try:
model = create_model(
experiment.ert_config,
Expand Down Expand Up @@ -74,6 +77,8 @@ async def cancel_experiment(experiment_id: str):

@app.websocket("/experiments/{experiment_id}/events")
async def websocket_endpoint(websocket: WebSocket, experiment_id: str):
if experiment_id not in experiments:
return
await websocket.accept()
print(experiment_id)
print(experiments)
Expand Down

0 comments on commit 7fde666

Please sign in to comment.