forked from openml/server-api
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathflows.py
More file actions
91 lines (79 loc) · 3.06 KB
/
flows.py
File metadata and controls
91 lines (79 loc) · 3.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
from typing import Annotated, Literal
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncConnection
import database.flows
from core.conversions import _str_to_num
from core.errors import FlowNotFoundError
from routers.dependencies import expdb_connection
from schemas.flows import Flow, FlowExistsBody, Parameter, Subflow
router = APIRouter(prefix="/flows", tags=["flows"])
@router.post("/exists")
async def flow_exists(
body: FlowExistsBody,
expdb: Annotated[AsyncConnection, Depends(expdb_connection)],
) -> dict[Literal["flow_id"], int]:
"""Check if a Flow with the name and version exists, if so, return the flow id."""
flow = await database.flows.get_by_name(
name=body.name,
external_version=body.external_version,
expdb=expdb,
)
if flow is None:
msg = f"Flow with name {body.name} and external version {body.external_version} not found."
raise FlowNotFoundError(msg)
return {"flow_id": flow.id}
@router.get("/exists/{name}/{external_version}", deprecated=True)
async def flow_exists_get(
name: str,
external_version: str,
expdb: Annotated[AsyncConnection, Depends(expdb_connection)],
) -> dict[Literal["flow_id"], int]:
"""Use POST /flows/exists instead."""
return await flow_exists(FlowExistsBody(name=name, external_version=external_version), expdb)
@router.get("/{flow_id}")
async def get_flow(
flow_id: int,
expdb: Annotated[AsyncConnection, Depends(expdb_connection)],
) -> Flow:
flow = await database.flows.get(flow_id, expdb)
if not flow:
msg = f"Flow with id {flow_id} not found."
raise FlowNotFoundError(msg)
parameter_rows = await database.flows.get_parameters(flow_id, expdb)
parameters = [
Parameter(
name=parameter.name,
# PHP sets the default value to [], not sure where that comes from.
# In the modern interface, `None` is used instead for now, but I think it might
# make more sense to omit it if there is none.
default_value=_str_to_num(parameter.default_value) if parameter.default_value else None,
data_type=parameter.data_type,
description=parameter.description,
)
for parameter in parameter_rows
]
tags = await database.flows.get_tags(flow_id, expdb)
subflow_rows = await database.flows.get_subflows(flow_id, expdb)
subflows = []
for subflow in subflow_rows:
subflows.append( # noqa: PERF401
Subflow(
identifier=subflow.identifier,
flow=await get_flow(flow_id=subflow.child_id, expdb=expdb),
),
)
return Flow(
id_=flow.id,
uploader=flow.uploader,
name=flow.name,
class_name=flow.class_name,
version=flow.version,
external_version=flow.external_version,
description=flow.description,
upload_date=flow.upload_date,
language=flow.language,
dependencies=flow.dependencies,
parameter=parameters,
subflows=subflows,
tag=tags,
)