Skip to content

Commit 2c9d95b

Browse files
committed
feat: Add dagrun status progress to backfills
1 parent bfcb77f commit 2c9d95b

14 files changed

Lines changed: 267 additions & 14 deletions

File tree

airflow-core/src/airflow/api_fastapi/core_api/datamodels/backfills.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ class BackfillResponse(BaseModel):
5454
completed_at: datetime | None
5555
updated_at: datetime
5656
dag_display_name: str = Field(validation_alias=AliasPath("dag_model", "dag_display_name"))
57+
num_runs: int = 0
58+
dag_run_state_counts: dict[str, int] = {}
5759

5860

5961
class BackfillCollectionResponse(BaseModel):

airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1483,6 +1483,16 @@ components:
14831483
dag_display_name:
14841484
type: string
14851485
title: Dag Display Name
1486+
num_runs:
1487+
type: integer
1488+
title: Num Runs
1489+
default: 0
1490+
dag_run_state_counts:
1491+
additionalProperties:
1492+
type: integer
1493+
type: object
1494+
title: Dag Run State Counts
1495+
default: {}
14861496
type: object
14871497
required:
14881498
- id

airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9296,6 +9296,16 @@ components:
92969296
dag_display_name:
92979297
type: string
92989298
title: Dag Display Name
9299+
num_runs:
9300+
type: integer
9301+
title: Num Runs
9302+
default: 0
9303+
dag_run_state_counts:
9304+
additionalProperties:
9305+
type: integer
9306+
type: object
9307+
title: Dag Run State Counts
9308+
default: {}
92999309
type: object
93009310
required:
93019311
- id

airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py

Lines changed: 54 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,14 @@
1616
# under the License.
1717
from __future__ import annotations
1818

19+
from collections import defaultdict
1920
from typing import Annotated
2021

2122
from fastapi import Depends, HTTPException, status
2223
from fastapi.exceptions import RequestValidationError
2324
from pydantic import NonNegativeInt
24-
from sqlalchemy import select, update
25-
from sqlalchemy.orm import joinedload
25+
from sqlalchemy import func, select, update
26+
from sqlalchemy.orm import Session, joinedload
2627

2728
from airflow._shared.timezones import timezone
2829
from airflow.api_fastapi.common.db.common import (
@@ -61,6 +62,40 @@
6162
backfills_router = AirflowRouter(tags=["Backfill"], prefix="/backfills")
6263

6364

65+
def _enrich_backfill_responses(
66+
backfills: list[BackfillResponse],
67+
*,
68+
session: Session,
69+
) -> list[BackfillResponse]:
70+
"""Populate num_runs and dag_run_state_counts on each backfill response."""
71+
ids = [b.id for b in backfills]
72+
if not ids:
73+
return backfills
74+
# Single query: get state counts per backfill, derive num_runs by summing counts.
75+
rows = session.execute(
76+
select(
77+
BackfillDagRun.backfill_id,
78+
DagRun.state,
79+
func.count().label("count"),
80+
)
81+
.join(DagRun, BackfillDagRun.dag_run_id == DagRun.id)
82+
.where(
83+
BackfillDagRun.backfill_id.in_(ids),
84+
DagRun.backfill_id == BackfillDagRun.backfill_id,
85+
)
86+
.group_by(BackfillDagRun.backfill_id, DagRun.state)
87+
).all()
88+
counts: dict[int, dict[str, int]] = defaultdict(dict)
89+
num_runs: dict[int, int] = defaultdict(int)
90+
for backfill_id, state, count in rows:
91+
counts[backfill_id][state] = count
92+
num_runs[backfill_id] += count
93+
for backfill in backfills:
94+
backfill.num_runs = num_runs.get(backfill.id, 0)
95+
backfill.dag_run_state_counts = counts.get(backfill.id, {})
96+
return backfills
97+
98+
6499
@backfills_router.get(
65100
path="",
66101
dependencies=[
@@ -84,8 +119,10 @@ def list_backfills(
84119
limit=limit,
85120
session=session,
86121
)
122+
backfills = [BackfillResponse.model_validate(b) for b in session.scalars(select_stmt)]
123+
_enrich_backfill_responses(backfills, session=session)
87124
return BackfillCollectionResponse(
88-
backfills=session.scalars(select_stmt),
125+
backfills=backfills,
89126
total_entries=total_entries,
90127
)
91128

@@ -104,9 +141,11 @@ def get_backfill(
104141
backfill = session.scalars(
105142
select(Backfill).where(Backfill.id == backfill_id).options(joinedload(Backfill.dag_model))
106143
).one_or_none()
107-
if backfill:
108-
return backfill
109-
raise HTTPException(status.HTTP_404_NOT_FOUND, "Backfill not found")
144+
if not backfill:
145+
raise HTTPException(status.HTTP_404_NOT_FOUND, "Backfill not found")
146+
response = BackfillResponse.model_validate(backfill)
147+
_enrich_backfill_responses([response], session=session)
148+
return response
110149

111150

112151
@backfills_router.put(
@@ -133,7 +172,9 @@ def pause_backfill(backfill_id: NonNegativeInt, session: SessionDep) -> Backfill
133172
if b.is_paused is False:
134173
b.is_paused = True
135174
session.commit()
136-
return b
175+
response = BackfillResponse.model_validate(b)
176+
_enrich_backfill_responses([response], session=session)
177+
return response
137178

138179

139180
@backfills_router.put(
@@ -159,7 +200,9 @@ def unpause_backfill(backfill_id: NonNegativeInt, session: SessionDep) -> Backfi
159200
raise HTTPException(status.HTTP_409_CONFLICT, "Backfill is already completed.")
160201
if b.is_paused:
161202
b.is_paused = False
162-
return b
203+
response = BackfillResponse.model_validate(b)
204+
_enrich_backfill_responses([response], session=session)
205+
return response
163206

164207

165208
@backfills_router.put(
@@ -210,7 +253,9 @@ def cancel_backfill(backfill_id: NonNegativeInt, session: SessionDep) -> Backfil
210253
# this is in separate transaction just to avoid potential conflicts
211254
session.refresh(b)
212255
b.completed_at = timezone.utcnow()
213-
return b
256+
response = BackfillResponse.model_validate(b)
257+
_enrich_backfill_responses([response], session=session)
258+
return response
214259

215260

216261
@backfills_router.post(

airflow-core/src/airflow/api_fastapi/core_api/routes/ui/backfills.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
from airflow.api_fastapi.core_api.openapi.exceptions import (
3737
create_openapi_http_exception_doc,
3838
)
39+
from airflow.api_fastapi.core_api.routes.public.backfills import _enrich_backfill_responses
3940
from airflow.api_fastapi.core_api.security import ReadableBackfillsFilterDep, requires_access_backfill
4041
from airflow.models.backfill import Backfill
4142

@@ -73,9 +74,12 @@ def list_backfills_ui(
7374
session=session,
7475
)
7576
backfills = [
76-
BackfillResponse(**row._mapping) if not isinstance(row, Backfill) else row
77+
BackfillResponse(**row._mapping)
78+
if not isinstance(row, Backfill)
79+
else BackfillResponse.model_validate(row)
7780
for row in session.scalars(select_stmt)
7881
]
82+
_enrich_backfill_responses(backfills, session=session)
7983
return BackfillCollectionResponse(
8084
backfills=backfills,
8185
total_entries=total_entries,

airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,19 @@ export const $BackfillResponse = {
541541
dag_display_name: {
542542
type: 'string',
543543
title: 'Dag Display Name'
544+
},
545+
num_runs: {
546+
type: 'integer',
547+
title: 'Num Runs',
548+
default: 0
549+
},
550+
dag_run_state_counts: {
551+
additionalProperties: {
552+
type: 'integer'
553+
},
554+
type: 'object',
555+
title: 'Dag Run State Counts',
556+
default: {}
544557
}
545558
},
546559
type: 'object',

airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,10 @@ export type BackfillResponse = {
147147
completed_at: string | null;
148148
updated_at: string;
149149
dag_display_name: string;
150+
num_runs?: number;
151+
dag_run_state_counts?: {
152+
[key: string]: (number);
153+
};
150154
};
151155

152156
/**

airflow-core/src/airflow/ui/public/i18n/locales/en/common.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
},
2929
"collapseAllExtra": "Collapse all extra JSON",
3030
"collapseDetailsPanel": "Collapse Details Panel",
31+
"completed": "Completed",
3132
"createdAssetEvent_one": "Created Asset Event",
3233
"createdAssetEvent_other": "Created Asset Events",
3334
"dag_one": "Dag",
@@ -232,6 +233,7 @@
232233
"from": "From",
233234
"maxActiveRuns": "Max Active Runs",
234235
"noTagsFound": "No tags found",
236+
"progress": "Progress",
235237
"tagMode": {
236238
"all": "All",
237239
"any": "Any"
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*!
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
import { Box, HStack, Text } from "@chakra-ui/react";
20+
21+
type Props = {
22+
readonly stateCounts: Record<string, number>;
23+
readonly total: number;
24+
readonly trackColor?: string;
25+
};
26+
27+
export const BackfillProgressBar = ({ stateCounts, total, trackColor = "bg.emphasized" }: Props) => {
28+
const successCount = stateCounts.success ?? 0;
29+
const failedCount = stateCounts.failed ?? 0;
30+
const successPct = (successCount / total) * 100;
31+
const failedPct = (failedCount / total) * 100;
32+
const remainingPct = 100 - successPct - failedPct;
33+
34+
return (
35+
<HStack gap="2" minWidth="60px">
36+
<HStack flex="1" gap={0}>
37+
{successPct > 0 ? (
38+
<Box bg="success.solid" borderLeftRadius={5} height="5px" width={`${successPct}%`} />
39+
) : undefined}
40+
{failedPct > 0 ? (
41+
<Box
42+
bg="failed.solid"
43+
borderLeftRadius={successCount === 0 ? 5 : 0}
44+
height="5px"
45+
width={`${failedPct}%`}
46+
/>
47+
) : undefined}
48+
{remainingPct > 0 ? (
49+
<Box
50+
bg={trackColor}
51+
borderLeftRadius={successCount === 0 && failedCount === 0 ? 5 : 0}
52+
borderRightRadius={5}
53+
height="5px"
54+
width={`${remainingPct}%`}
55+
/>
56+
) : undefined}
57+
</HStack>
58+
<Text fontSize="sm" whiteSpace="nowrap">
59+
{successCount + failedCount}/{total}
60+
</Text>
61+
</HStack>
62+
);
63+
};

airflow-core/src/airflow/ui/src/components/Banner/BackfillBanner.tsx

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ import {
3232
import type { BackfillResponse } from "openapi/requests/types.gen";
3333
import { useAutoRefresh } from "src/utils";
3434

35+
import { BackfillProgressBar } from "../BackfillProgressBar";
3536
import Time from "../Time";
36-
import { ProgressBar } from "../ui";
3737

3838
type Props = {
3939
readonly dagId: string;
@@ -113,7 +113,13 @@ const BackfillBanner = ({ dagId }: Props) => {
113113
</Text>
114114

115115
<Spacer flex="max-content" />
116-
<ProgressBar size="xs" visibility="visible" />
116+
{(backfill.num_runs ?? 0) > 0 ? (
117+
<BackfillProgressBar
118+
stateCounts={backfill.dag_run_state_counts ?? {}}
119+
total={backfill.num_runs ?? 0}
120+
trackColor="whiteAlpha.400"
121+
/>
122+
) : undefined}
117123
<Button
118124
aria-label={backfill.is_paused ? translate("banner.unpause") : translate("banner.pause")}
119125
loading={isPausePending || isUnPausePending}

0 commit comments

Comments
 (0)