-
-
Notifications
You must be signed in to change notification settings - Fork 50
Expand file tree
/
Copy pathflows.py
More file actions
96 lines (77 loc) · 2.65 KB
/
flows.py
File metadata and controls
96 lines (77 loc) · 2.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
from collections.abc import Sequence
from typing import cast
from sqlalchemy import Row, text
from sqlalchemy.ext.asyncio import AsyncConnection
from database.tagging import insert_tag, remove_tag, select_tag, select_tags
# Table name handed to the shared tagging helpers as their `table=` argument.
_TABLE = "implementation_tag"
# Column name handed to the shared tagging helpers as their `id_column=` argument.
_ID_COLUMN = "id"
async def get_subflows(for_flow: int, expdb: AsyncConnection) -> Sequence[Row]:
    """Fetch the component rows registered under a parent flow.

    Selects `child` (aliased to `child_id`) and `identifier` from
    `implementation_component` for every row whose `parent` equals `for_flow`.

    Args:
        for_flow: Value bound to the `parent` filter of the query.
        expdb: Connection the statement is executed on.

    Returns:
        All matching rows; empty when nothing matches.
    """
    statement = text(
        "\n"
        "SELECT child as child_id, identifier\n"
        "FROM implementation_component\n"
        "WHERE parent = :flow_id\n",
    )
    bound = {"flow_id": for_flow}
    result = await expdb.execute(statement, parameters=bound)
    # `cast` only narrows the static type; the fetched rows pass through unchanged.
    return cast("Sequence[Row]", result.all())
async def get_tags(flow_id: int, expdb: AsyncConnection) -> list[str]:
    """Return the tags for `flow_id` by delegating to `select_tags`.

    The lookup is scoped to this module's tag table (`_TABLE`) and id
    column (`_ID_COLUMN`).

    Args:
        flow_id: Identifier forwarded as `id_`.
        expdb: Connection forwarded to the tagging helper.
    """
    tags = await select_tags(
        table=_TABLE,
        id_column=_ID_COLUMN,
        id_=flow_id,
        expdb=expdb,
    )
    return tags
async def get_parameters(flow_id: int, expdb: AsyncConnection) -> Sequence[Row]:
    """Fetch the `input` rows that belong to a flow.

    Every column of `input` is selected, and `defaultValue` / `dataType`
    are additionally exposed under the aliases `default_value` /
    `data_type`.

    Args:
        flow_id: Value bound to the `implementation_id` filter.
        expdb: Connection the statement is executed on.

    Returns:
        All matching rows; empty when nothing matches.
    """
    statement = text(
        "\n"
        "SELECT *, defaultValue as default_value, dataType as data_type\n"
        "FROM input\n"
        "WHERE implementation_id = :flow_id\n",
    )
    bound = {"flow_id": flow_id}
    result = await expdb.execute(statement, parameters=bound)
    # `cast` only narrows the static type; the fetched rows pass through unchanged.
    return cast("Sequence[Row]", result.all())
async def tag(id_: int, tag_: str, *, user_id: int, expdb: AsyncConnection) -> None:
    """Attach `tag_` to the flow `id_` by delegating to `insert_tag`.

    Args:
        id_: Identifier forwarded as `id_`.
        tag_: Tag text forwarded as `tag_`.
        user_id: Forwarded to `insert_tag` as `user_id` (keyword-only here).
        expdb: Connection forwarded to the tagging helper.
    """
    # Where the tag is stored is fixed by this module's constants.
    await insert_tag(
        expdb=expdb,
        table=_TABLE,
        id_column=_ID_COLUMN,
        id_=id_,
        tag_=tag_,
        user_id=user_id,
    )
async def get_tag(id_: int, tag_: str, expdb: AsyncConnection) -> Row | None:
    """Look up one tag on a flow by delegating to `select_tag`.

    Args:
        id_: Identifier forwarded as `id_`.
        tag_: Tag text forwarded as `tag_`.
        expdb: Connection forwarded to the tagging helper.

    Returns:
        Whatever `select_tag` yields; annotated as a row or `None`
        (presumably `None` when the tag is absent; confirm in
        `database.tagging`).
    """
    found = await select_tag(
        table=_TABLE,
        id_column=_ID_COLUMN,
        id_=id_,
        tag_=tag_,
        expdb=expdb,
    )
    return found
async def delete_tag(id_: int, tag_: str, expdb: AsyncConnection) -> None:
    """Remove `tag_` from the flow `id_` by delegating to `remove_tag`.

    Args:
        id_: Identifier forwarded as `id_`.
        tag_: Tag text forwarded as `tag_`.
        expdb: Connection forwarded to the tagging helper.
    """
    await remove_tag(
        expdb=expdb,
        table=_TABLE,
        id_column=_ID_COLUMN,
        id_=id_,
        tag_=tag_,
    )
async def get_by_name(name: str, external_version: str, expdb: AsyncConnection) -> Row | None:
    """Find the `implementation` row matching a name and external version.

    Every column is selected, and `uploadDate` is additionally exposed
    under the alias `upload_date`.

    Args:
        name: Value bound to the `name` filter.
        external_version: Value bound to the `external_version` filter.
        expdb: Connection the statement is executed on.

    Returns:
        The single matching row, or `None` when there is no match.
        `one_or_none()` raises if more than one row matches.
    """
    statement = text(
        "\n"
        "SELECT *, uploadDate as upload_date\n"
        "FROM implementation\n"
        "WHERE name = :name AND external_version = :external_version\n",
    )
    bound = {"name": name, "external_version": external_version}
    result = await expdb.execute(statement, parameters=bound)
    return result.one_or_none()
async def get(id_: int, expdb: AsyncConnection) -> Row | None:
    """Fetch the `implementation` row whose `id` equals `id_`.

    Every column is selected, and `uploadDate` is additionally exposed
    under the alias `upload_date`.

    Args:
        id_: Value bound to the `id` filter.
        expdb: Connection the statement is executed on.

    Returns:
        The single matching row, or `None` when there is no match.
        `one_or_none()` raises if more than one row matches.
    """
    statement = text(
        "\n"
        "SELECT *, uploadDate as upload_date\n"
        "FROM implementation\n"
        "WHERE id = :flow_id\n",
    )
    bound = {"flow_id": id_}
    result = await expdb.execute(statement, parameters=bound)
    return result.one_or_none()