Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions HOWTO.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,18 @@ or



## Access MinIO console (object storage)

Run
```bash
kubectl port-forward svc/minio 9001:9001 -n minio
```

Then open http://localhost:9001

user: minio_user
password: minio_password

## Troubleshooting

Access the pgsql via local db client
Expand Down
6 changes: 6 additions & 0 deletions backend/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from backend.api import (
auth_routes,
batch_routes,
compliance_report_routes,
demo_routes,
deployed_models_routes,
Expand All @@ -30,6 +31,8 @@
from backend.domain.use_cases.demo_usecases import SimulationManager
from backend.domain.use_cases.ds_simulation_usecases import DSSimulationManager
from backend.infrastructure.grafana_dashboard_adapter import GrafanaDashboardAdapter
from backend.infrastructure.k8s_batch_prediction_adapter import K8sBatchPredictionAdapter
from backend.infrastructure.minio_storage_adapter import MinioStorageAdapter
from backend.infrastructure.mlflow_handler_adapter import MLFlowHandlerAdapter
from backend.infrastructure.model_info_pgsql_db_handler import ModelInfoPostgresDBHandler
from backend.infrastructure.platform_config_pgsql_adapter import PlatformConfigPgsqlAdapter
Expand Down Expand Up @@ -61,7 +64,9 @@ async def lifespan(app: FastAPI):
app.state.model_info_db_handler = ModelInfoPostgresDBHandler(db_config=config.pgsql_db_config)
app.state.user_adapter = UserPgsqlDbAdapter(db_config=config.pgsql_db_config, admin_config=config.mp_admin_config)
app.state.platform_config_handler = PlatformConfigPgsqlAdapter(db_config=config.pgsql_db_config)
app.state.object_storage_handler = MinioStorageAdapter()
app.state.dashboard_handler = GrafanaDashboardAdapter()
app.state.batch_handler = K8sBatchPredictionAdapter()
app.state.simulation_manager = SimulationManager()
app.state.ds_simulation_manager = DSSimulationManager()
app.state.task_status = {}
Expand Down Expand Up @@ -103,6 +108,7 @@ def create_app() -> FastAPI:
app.include_router(model_infos_routes.router, prefix="/model_infos", tags=["Model Infos"])
app.include_router(llm_routes.router, prefix="/ai", tags=["AI Assist"])
app.include_router(compliance_report_routes.router, prefix="/compliance", tags=["Compliance Report"])
app.include_router(batch_routes.router, prefix="/{project_name}/batch", tags=["Batch Predictions"])
app.include_router(demo_routes.router, prefix="/demo", tags=["Demo Simulation"])
return app

Expand Down
246 changes: 246 additions & 0 deletions backend/api/batch_routes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
# Philippe Stepniewski
import inspect
import uuid

from fastapi import APIRouter, BackgroundTasks, Depends, File, HTTPException, Request, UploadFile
from loguru import logger
from starlette.responses import JSONResponse, Response

from backend.domain.entities.batch_prediction import BatchPredictionStatus
from backend.domain.ports.batch_prediction_handler import BatchPredictionHandler
from backend.domain.ports.object_storage_handler import ObjectStorageHandler
from backend.domain.ports.project_db_handler import ProjectDbHandler
from backend.domain.ports.registry_handler import RegistryHandler
from backend.domain.ports.user_handler import UserHandler
from backend.domain.use_cases.auth_usecases import get_current_user, get_user_adapter
from backend.domain.use_cases.batch_predict import (
cleanup_batch_predictions,
delete_batch_prediction,
download_batch_result,
get_batch_prediction_status,
list_batch_predictions,
submit_batch_prediction,
)
from backend.domain.use_cases.user_usecases import user_can_perform_action_for_project
from backend.utils import sanitize_project_name

router = APIRouter()


def get_batch_handler(request: Request) -> BatchPredictionHandler:
return request.app.state.batch_handler


def get_project_db_handler(request: Request) -> ProjectDbHandler:
return request.app.state.project_db_handler


def get_object_storage_handler(request: Request) -> ObjectStorageHandler:
return request.app.state.object_storage_handler


def get_registry_pool(request: Request) -> RegistryHandler:
return request.app.state.registry_pool


def get_tasks_status(request: Request) -> dict:
return request.app.state.task_status


def _get_project_registry_tracking_uri(project_name: str) -> str:
sanitized = sanitize_project_name(project_name)
return f"http://{sanitized}.{sanitized}.svc.cluster.local:5000"


def _run_batch_submission(
tasks_status: dict,
job_id: str,
registry,
project_name: str,
model_name: str,
version: str,
file_content: bytes,
object_storage: ObjectStorageHandler,
batch_handler: BatchPredictionHandler,
project_db_handler: ProjectDbHandler,
):
try:
tasks_status[job_id] = BatchPredictionStatus.BUILDING.value
submit_batch_prediction(
project_name=project_name,
model_name=model_name,
version=version,
file_content=file_content,
job_id=job_id,
object_storage=object_storage,
batch_handler=batch_handler,
project_db_handler=project_db_handler,
registry=registry,
)
del tasks_status[job_id]
except Exception as e:
logger.error(f"Batch submission failed for job {job_id}: {e}")
tasks_status[job_id] = BatchPredictionStatus.FAILED.value


@router.post("/submit/{model_name}/{version}")
async def route_submit_batch(
project_name: str,
model_name: str,
version: str,
request: Request,
background_tasks: BackgroundTasks,
file: UploadFile = File(...),
batch_handler: BatchPredictionHandler = Depends(get_batch_handler),
object_storage: ObjectStorageHandler = Depends(get_object_storage_handler),
project_db_handler: ProjectDbHandler = Depends(get_project_db_handler),
registry_pool: RegistryHandler = Depends(get_registry_pool),
tasks_status: dict = Depends(get_tasks_status),
user_adapter: UserHandler = Depends(get_user_adapter),
current_user: dict = Depends(get_current_user),
):
user_can_perform_action_for_project(
current_user,
project_name=project_name,
action_name=inspect.currentframe().f_code.co_name,
user_adapter=user_adapter,
)
file_content = await file.read()

registry = registry_pool.get_registry_adapter(project_name, _get_project_registry_tracking_uri(project_name))

job_id = str(uuid.uuid4())[:8]
tasks_status[job_id] = BatchPredictionStatus.BUILDING.value

background_tasks.add_task(
_run_batch_submission,
tasks_status,
job_id,
registry,
project_name,
model_name,
version,
file_content,
object_storage,
batch_handler,
project_db_handler,
)

return JSONResponse(
content={"job_id": job_id, "status": BatchPredictionStatus.BUILDING.value},
media_type="application/json",
)


@router.get("/status/{job_id}")
def route_batch_status(
project_name: str,
job_id: str,
batch_handler: BatchPredictionHandler = Depends(get_batch_handler),
tasks_status: dict = Depends(get_tasks_status),
user_adapter: UserHandler = Depends(get_user_adapter),
current_user: dict = Depends(get_current_user),
):
user_can_perform_action_for_project(
current_user,
project_name=project_name,
action_name=inspect.currentframe().f_code.co_name,
user_adapter=user_adapter,
)
# Check if the job is still in the build phase (tracked in-memory)
if job_id in tasks_status:
status = tasks_status[job_id]
if status == BatchPredictionStatus.BUILDING.value:
return JSONResponse(
content={"job_id": job_id, "status": BatchPredictionStatus.BUILDING.value},
media_type="application/json",
)
if status == BatchPredictionStatus.FAILED.value:
return JSONResponse(
content={"job_id": job_id, "status": BatchPredictionStatus.FAILED.value},
media_type="application/json",
)

result = get_batch_prediction_status(project_name, job_id, batch_handler)
return JSONResponse(content=result, media_type="application/json")


@router.get("/list")
def route_list_batch(
project_name: str,
batch_handler: BatchPredictionHandler = Depends(get_batch_handler),
user_adapter: UserHandler = Depends(get_user_adapter),
current_user: dict = Depends(get_current_user),
):
user_can_perform_action_for_project(
current_user,
project_name=project_name,
action_name=inspect.currentframe().f_code.co_name,
user_adapter=user_adapter,
)
result = list_batch_predictions(project_name, batch_handler)
return JSONResponse(content=result, media_type="application/json")


@router.get("/download/{job_id}")
def route_download_batch(
project_name: str,
job_id: str,
batch_handler: BatchPredictionHandler = Depends(get_batch_handler),
object_storage: ObjectStorageHandler = Depends(get_object_storage_handler),
user_adapter: UserHandler = Depends(get_user_adapter),
current_user: dict = Depends(get_current_user),
):
user_can_perform_action_for_project(
current_user,
project_name=project_name,
action_name=inspect.currentframe().f_code.co_name,
user_adapter=user_adapter,
)
try:
content = download_batch_result(project_name, job_id, batch_handler, object_storage)
return Response(
content=content,
media_type="text/csv",
headers={"Content-Disposition": f"attachment; filename=predictions-{job_id}.csv"},
)
except Exception as e:
logger.error(f"Failed to download batch result: {e}")
raise HTTPException(status_code=404, detail="Batch result not found or not yet available")


@router.delete("/{job_id}")
def route_delete_batch(
project_name: str,
job_id: str,
batch_handler: BatchPredictionHandler = Depends(get_batch_handler),
object_storage: ObjectStorageHandler = Depends(get_object_storage_handler),
user_adapter: UserHandler = Depends(get_user_adapter),
current_user: dict = Depends(get_current_user),
):
user_can_perform_action_for_project(
current_user,
project_name=project_name,
action_name=inspect.currentframe().f_code.co_name,
user_adapter=user_adapter,
)
result = delete_batch_prediction(project_name, job_id, batch_handler, object_storage)
return JSONResponse(content={"status": result}, media_type="application/json")


@router.post("/cleanup")
def route_cleanup_batch(
project_name: str,
batch_handler: BatchPredictionHandler = Depends(get_batch_handler),
object_storage: ObjectStorageHandler = Depends(get_object_storage_handler),
user_adapter: UserHandler = Depends(get_user_adapter),
current_user: dict = Depends(get_current_user),
):
user_can_perform_action_for_project(
current_user,
project_name=project_name,
action_name=inspect.currentframe().f_code.co_name,
user_adapter=user_adapter,
)
deleted_count = cleanup_batch_predictions(project_name, batch_handler, object_storage)
return JSONResponse(content={"deleted": deleted_count}, media_type="application/json")
41 changes: 39 additions & 2 deletions backend/api/projects_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from backend.domain.entities.project import Project
from backend.domain.entities.role import Role
from backend.domain.ports.model_registry import ModelRegistry
from backend.domain.ports.object_storage_handler import ObjectStorageHandler
from backend.domain.ports.project_db_handler import ProjectDbHandler
from backend.domain.ports.registry_handler import RegistryHandler
from backend.domain.ports.user_handler import UserHandler
Expand All @@ -24,6 +25,7 @@
list_projects,
list_projects_for_user,
remove_project,
update_project_batch_enabled,
)
from backend.domain.use_cases.user_usecases import user_can_perform_action_for_project

Expand All @@ -34,6 +36,10 @@ def get_project_db_handler(request: Request) -> ProjectDbHandler:
return request.app.state.project_db_handler


def get_object_storage_handler(request: Request) -> ObjectStorageHandler:
return request.app.state.object_storage_handler


@router.get("/list")
def route_list_projects(
project_sqlite_db_handler: ProjectDbHandler = Depends(get_project_db_handler),
Expand Down Expand Up @@ -68,27 +74,58 @@ def route_project_info(
def route_add_project(
project: Project,
project_sqlite_db_handler: ProjectDbHandler = Depends(get_project_db_handler),
object_storage: ObjectStorageHandler = Depends(get_object_storage_handler),
user_adapter: UserHandler = Depends(get_user_adapter),
current_user: dict = Depends(get_current_user),
) -> JSONResponse:
user_can_perform_action_for_project(
current_user, project_name="", action_name=inspect.currentframe().f_code.co_name, user_adapter=user_adapter
)
status = add_project(project_db_handler=project_sqlite_db_handler, project=project)
status = add_project(project_db_handler=project_sqlite_db_handler, project=project, object_storage=object_storage)
return JSONResponse(content={"status": status}, media_type="application/json")


@router.get("/{project_name}/remove")
def route_remove_project(
project_name: str,
project_sqlite_db_handler: ProjectDbHandler = Depends(get_project_db_handler),
object_storage: ObjectStorageHandler = Depends(get_object_storage_handler),
user_adapter: UserHandler = Depends(get_user_adapter),
current_user: dict = Depends(get_current_user),
):
user_can_perform_action_for_project(
current_user, project_name="", action_name=inspect.currentframe().f_code.co_name, user_adapter=user_adapter
)
return remove_project(project_sqlite_db_handler, project_name=project_name)
return remove_project(project_sqlite_db_handler, project_name=project_name, object_storage=object_storage)


@router.patch("/{project_name}/batch_enabled")
def route_update_batch_enabled(
project_name: str,
body: dict,
project_sqlite_db_handler: ProjectDbHandler = Depends(get_project_db_handler),
object_storage: ObjectStorageHandler = Depends(get_object_storage_handler),
user_adapter: UserHandler = Depends(get_user_adapter),
current_user: dict = Depends(get_current_user),
):
user_can_perform_action_for_project(
current_user,
project_name=project_name,
action_name=inspect.currentframe().f_code.co_name,
user_adapter=user_adapter,
)
batch_enabled = body.get("batch_enabled", False)
try:
status = update_project_batch_enabled(
project_db_handler=project_sqlite_db_handler,
project_name=project_name,
batch_enabled=batch_enabled,
object_storage=object_storage,
)
except Exception as e:
logger.error(f"Failed to update batch_enabled for project '{project_name}': {e}")
raise HTTPException(status_code=500, detail=str(e))
return JSONResponse(content={"status": status}, media_type="application/json")


@router.post("/{project_name}/add_user")
Expand Down
Loading
Loading