Skip to content

Commit 12957a2

Browse files
committed
Have ert show hooked workflows in GUI
This commit changes the GUI to: * Show iterations and updates in tabs to accommodate showing pre/post run/update workflows * Show pre-experiment and post-experiment workflows on main tab-bar * Pressing terminate experiment during hooked workflows cancels the workflow, and marks it as cancelled by user. Following workflows in the same hook will be marked as cancelled, as to not mislead the user.
1 parent beb9d13 commit 12957a2

25 files changed

Lines changed: 2166 additions & 117 deletions
-452 Bytes
Loading

src/_ert/events.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
from datetime import UTC, datetime
2+
from enum import StrEnum
23
from typing import Annotated, Any, Final, Literal
34

45
from pydantic import BaseModel, ConfigDict, Field, TypeAdapter
56

7+
from _ert.hook_runtime import HookRuntime
8+
69

710
class Id:
811
FORWARD_MODEL_STEP_START_TYPE = Literal["forward_model_step.start"]
@@ -239,3 +242,56 @@ def dispatcher_event_from_json(raw_msg: str | bytes) -> DispatcherEvent:
239242

240243
def dispatcher_event_to_json(event: DispatcherEvent) -> str:
241244
return event.model_dump_json()
245+
246+
247+
class WorkflowStatus(StrEnum):
248+
PENDING = "Pending"
249+
RUNNING = "Running"
250+
FINISHED = "Finished"
251+
FAILED = "Failed"
252+
CANCELLED = "Cancelled"
253+
254+
255+
class _WorkflowEvent(BaseModel):
256+
hook: HookRuntime | None = None
257+
iteration: int | None = None
258+
259+
260+
class WorkflowBatchStartedEvent(_WorkflowEvent):
261+
event_type: Literal["WorkflowBatchStartedEvent"] = "WorkflowBatchStartedEvent"
262+
workflow_names: list[str]
263+
264+
265+
class WorkflowBatchFinishedEvent(_WorkflowEvent):
266+
event_type: Literal["WorkflowBatchFinishedEvent"] = "WorkflowBatchFinishedEvent"
267+
status: WorkflowStatus
268+
workflow_names: list[str]
269+
270+
271+
class WorkflowStartedEvent(_WorkflowEvent):
272+
event_type: Literal["WorkflowStartedEvent"] = "WorkflowStartedEvent"
273+
workflow_name: str
274+
275+
276+
class WorkflowFinishedEvent(_WorkflowEvent):
277+
event_type: Literal["WorkflowFinishedEvent"] = "WorkflowFinishedEvent"
278+
workflow_name: str
279+
status: WorkflowStatus
280+
stdout: str | None = None
281+
stderr: str | None = None
282+
283+
284+
class WorkflowCancelledEvent(_WorkflowEvent):
285+
event_type: Literal["WorkflowCancelledEvent"] = "WorkflowCancelledEvent"
286+
workflow_name: str
287+
stdout: str | None = None
288+
stderr: str | None = None
289+
290+
291+
WorkflowEvent = (
292+
WorkflowBatchStartedEvent
293+
| WorkflowBatchFinishedEvent
294+
| WorkflowStartedEvent
295+
| WorkflowFinishedEvent
296+
| WorkflowCancelledEvent
297+
)

src/_ert/hook_runtime.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
from enum import StrEnum
2+
3+
4+
class HookRuntime(StrEnum):
5+
PRE_SIMULATION = "PRE_SIMULATION"
6+
POST_SIMULATION = "POST_SIMULATION"
7+
PRE_UPDATE = "PRE_UPDATE"
8+
POST_UPDATE = "POST_UPDATE"
9+
PRE_FIRST_UPDATE = "PRE_FIRST_UPDATE"
10+
PRE_EXPERIMENT = "PRE_EXPERIMENT"
11+
POST_EXPERIMENT = "POST_EXPERIMENT"
12+
13+
def workflow_tab_title(self) -> str:
14+
return {
15+
HookRuntime.PRE_EXPERIMENT: "Pre-experiment workflows",
16+
HookRuntime.POST_EXPERIMENT: "Post-experiment workflows",
17+
HookRuntime.PRE_SIMULATION: "Pre-simulation workflows",
18+
HookRuntime.POST_SIMULATION: "Post-simulation workflows",
19+
HookRuntime.PRE_FIRST_UPDATE: "Pre-first-update workflows",
20+
HookRuntime.PRE_UPDATE: "Pre-update workflows",
21+
HookRuntime.POST_UPDATE: "Post-update workflows",
22+
}[self]

src/ert/config/external_ert_script.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ def run(self, *args: Any) -> None:
3131
sys.stdout.write(self._stdoutdata)
3232

3333
if self.__job.returncode != 0:
34+
if self.isCancelled():
35+
return None
3436
raise RuntimeError(self._stderrdata)
3537

3638
def cancel(self) -> Any:

src/ert/config/parsing/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from _ert.hook_runtime import HookRuntime
2+
13
from ._parse_zonemap import parse_zonemap
24
from .config_dict import ConfigDict
35
from .config_errors import ConfigValidationError, ConfigWarning
@@ -8,7 +10,6 @@
810
from .forward_model_keywords import ForwardModelStepKeys
911
from .forward_model_schema import init_forward_model_schema
1012
from .history_source import HistorySource
11-
from .hook_runtime import HookRuntime
1213
from .lark_parser import parse, parse_contents, read_file
1314
from .observations_parser import (
1415
ObservationConfigError,

src/ert/config/parsing/config_schema.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from _ert.hook_runtime import HookRuntime
2+
13
from .config_dict import ConfigDict
24
from .config_keywords import ConfigKeys
35
from .config_schema_deprecations import deprecated_keywords_list
@@ -15,7 +17,6 @@
1517
string_keyword,
1618
)
1719
from .history_source import HistorySource
18-
from .hook_runtime import HookRuntime
1920
from .observations_parser import parse_observations
2021
from .queue_system import QueueSystem
2122
from .schema_dict import SchemaItemDict

src/ert/config/parsing/hook_runtime.py

Lines changed: 0 additions & 11 deletions
This file was deleted.

src/ert/config/workflow_fixtures.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
from typing_extensions import TypedDict
1010

11-
from ert.config.parsing.hook_runtime import HookRuntime
11+
from _ert.hook_runtime import HookRuntime
1212

1313
if TYPE_CHECKING:
1414
from PyQt6.QtWidgets import QWidget

src/ert/gui/experiments/run_dialog.py

Lines changed: 129 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,15 @@
3131
)
3232
from typing_extensions import override
3333

34-
from _ert.events import EnsembleEvaluationWarning
34+
from _ert.events import (
35+
EnsembleEvaluationWarning,
36+
WorkflowBatchFinishedEvent,
37+
WorkflowBatchStartedEvent,
38+
WorkflowCancelledEvent,
39+
WorkflowFinishedEvent,
40+
WorkflowStartedEvent,
41+
)
42+
from _ert.hook_runtime import HookRuntime
3543
from ert.config import ErrorInfo, WarningInfo
3644
from ert.ensemble_evaluator import (
3745
EndEvent,
@@ -80,10 +88,12 @@
8088
from .queue_emitter import QueueEmitter
8189
from .view import (
8290
DiskSpaceWidget,
91+
IterationWidget,
8392
ProgressWidget,
8493
RealizationWidget,
8594
RunpathProgressWidget,
8695
UpdateWidget,
96+
WorkflowWidget,
8797
)
8898
from .view.disk_space_widget import MountType
8999

@@ -389,10 +399,15 @@ def is_experiment_done(self) -> bool:
389399

390400
def _current_tab_changed(self, index: int) -> None:
391401
widget = self._tab_widget.widget(index)
392-
if isinstance(widget, RealizationWidget):
393-
widget.refresh_current_selection()
402+
if isinstance(widget, IterationWidget):
403+
current_widget = widget.current_widget()
404+
if isinstance(current_widget, RealizationWidget):
405+
current_widget.refresh_current_selection()
406+
407+
self.fm_step_frame.setHidden(isinstance(current_widget, UpdateWidget))
408+
return
394409

395-
self.fm_step_frame.setHidden(isinstance(widget, UpdateWidget))
410+
self.fm_step_frame.setHidden(False)
396411

397412
@Slot(QModelIndex, int, int)
398413
def on_snapshot_new_iteration(
@@ -402,26 +417,21 @@ def on_snapshot_new_iteration(
402417
index = self._snapshot_model.index(start, 0, parent)
403418
iteration = int(cast(IterNode, index.internalPointer()).id_)
404419
self._latest_iteration = iteration
405-
iter_row = start
406420
self._iteration_progress_label.setText(
407421
f"Progress for iteration {iteration}"
408422
if not self.is_everest
409423
else f"Progress for batch {iteration}"
410424
)
411425

412-
widget = RealizationWidget(iter_row)
426+
iteration_widget = self._get_or_create_iteration_tab(
427+
iteration, is_update=False
428+
)
429+
widget = iteration_widget.select_or_create_realization_tab()
413430
widget.setSnapshotModel(self._snapshot_model)
414431
widget.itemClicked.connect(self._select_real)
415432
widget.setProperty("identifier", f"tab-iter-{iteration}")
416433
self._select_real(widget._real_list_model.index(0, 0))
417-
tab_index = self._tab_widget.addTab(
418-
widget,
419-
f"Realizations for iteration {iteration}"
420-
if not self.is_everest
421-
else f"Batch {iteration}...",
422-
)
423-
if self._tab_widget.currentIndex() == self._tab_widget.count() - 2:
424-
self._tab_widget.setCurrentIndex(tab_index)
434+
self._tab_widget.setCurrentWidget(iteration_widget)
425435

426436
if self.is_everest:
427437
self._batch_result_types.append(set())
@@ -579,6 +589,24 @@ def _on_event(self, event: object) -> None:
579589
self.post_experiment_warnings.append(msg)
580590
case EnsembleEvaluationWarning(warning_message=msg):
581591
self._show_warning(msg)
592+
case WorkflowBatchStartedEvent(
593+
hook=hook, iteration=iteration, workflow_names=workflow_names
594+
):
595+
self._select_or_create_workflow_tab(hook, iteration, workflow_names)
596+
case WorkflowStartedEvent(hook=hook, iteration=iteration):
597+
self._select_or_create_workflow_tab(hook, iteration).handle_event(event)
598+
case WorkflowFinishedEvent(
599+
hook=hook,
600+
iteration=iteration,
601+
):
602+
self._select_or_create_workflow_tab(hook, iteration).handle_event(event)
603+
case WorkflowCancelledEvent(
604+
hook=hook,
605+
iteration=iteration,
606+
):
607+
self._select_or_create_workflow_tab(hook, iteration).handle_event(event)
608+
case WorkflowBatchFinishedEvent(hook=hook, iteration=iteration):
609+
self._select_or_create_workflow_tab(hook, iteration).handle_event(event)
582610

583611
case FullSnapshotEvent(
584612
status_count=status_count, realization_count=realization_count
@@ -604,22 +632,23 @@ def _on_event(self, event: object) -> None:
604632
)
605633
self.progress_update_event.emit(status_count, realization_count)
606634
case RunModelUpdateBeginEvent(iteration=iteration):
607-
widget = UpdateWidget(iteration)
608-
tab_index = self._tab_widget.addTab(widget, f"Update {iteration}")
609-
if self._tab_widget.currentIndex() == self._tab_widget.count() - 2:
610-
self._tab_widget.setCurrentIndex(tab_index)
635+
iteration_widget = self._get_or_create_iteration_tab(
636+
iteration, is_update=True
637+
)
638+
widget = iteration_widget.select_or_create_update_tab()
639+
self._tab_widget.setCurrentWidget(iteration_widget)
611640
widget.begin(event)
612641
case RunModelUpdateEndEvent():
613642
self._progress_widget.stop_waiting_progress_bar()
614-
self._get_update_widget(event.iteration).end(event)
643+
self._get_or_create_update_tab(event.iteration).end(event)
615644
event.write_as_csv(self.output_path)
616645
case RunModelStatusEvent() | RunModelTimeEvent():
617-
self._get_update_widget(event.iteration).update_status(event)
646+
self._get_or_create_update_tab(event.iteration).update_status(event)
618647
case RunModelDataEvent():
619-
self._get_update_widget(event.iteration).add_table(event)
648+
self._get_or_create_update_tab(event.iteration).add_table(event)
620649
event.write_as_csv(self.output_path)
621650
case RunModelErrorEvent():
622-
self._get_update_widget(event.iteration).error(event)
651+
self._get_or_create_update_tab(event.iteration).error(event)
623652
event.write_as_csv(self.output_path)
624653
case EverestBatchResultEvent():
625654
batch_types = self._batch_result_types[event.batch]
@@ -652,12 +681,85 @@ def _on_event(self, event: object) -> None:
652681
):
653682
runpath_widget.advance()
654683

655-
def _get_update_widget(self, iteration: int) -> UpdateWidget:
656-
for i in range(self._tab_widget.count()):
657-
widget = self._tab_widget.widget(i)
658-
if isinstance(widget, UpdateWidget) and widget.iteration == iteration:
684+
def _get_or_create_update_tab(self, iteration: int) -> UpdateWidget:
685+
return self._get_or_create_iteration_tab(
686+
iteration, is_update=True
687+
).select_or_create_update_tab()
688+
689+
def _get_or_create_iteration_tab(
690+
self, iteration: int, is_update: bool
691+
) -> IterationWidget:
692+
for index in range(self._tab_widget.count()):
693+
widget = self._tab_widget.widget(index)
694+
if (
695+
isinstance(widget, IterationWidget)
696+
and widget.iteration == iteration
697+
and widget.is_update_page == is_update
698+
):
659699
return widget
660-
raise ValueError("Could not find UpdateWidget")
700+
701+
widget = IterationWidget(iteration, self)
702+
widget.is_update_page = is_update
703+
widget.currentTabChanged.connect(
704+
lambda iteration_widget=widget: self._on_iteration_tab_changed(
705+
iteration_widget
706+
)
707+
)
708+
709+
self._tab_widget.addTab(
710+
widget,
711+
f"update-{iteration}" if is_update else f"iteration-{iteration}",
712+
)
713+
return widget
714+
715+
def _on_iteration_tab_changed(self, iteration_widget: IterationWidget) -> None:
716+
if self._tab_widget.currentWidget() is iteration_widget:
717+
self._current_tab_changed(self._tab_widget.currentIndex())
718+
719+
@staticmethod
720+
def _workflow_belongs_to_update(hook: HookRuntime) -> bool:
721+
return hook in {
722+
HookRuntime.PRE_FIRST_UPDATE,
723+
HookRuntime.PRE_UPDATE,
724+
HookRuntime.POST_UPDATE,
725+
}
726+
727+
def _select_or_create_workflow_tab(
728+
self,
729+
hook: HookRuntime | None,
730+
iteration: int | None,
731+
workflow_names: list[str] | None = None,
732+
) -> WorkflowWidget:
733+
assert hook is not None
734+
if iteration is not None:
735+
iteration_widget = (
736+
self._get_or_create_iteration_tab(iteration, is_update=True)
737+
if self._workflow_belongs_to_update(hook)
738+
else self._get_or_create_iteration_tab(iteration, is_update=False)
739+
)
740+
widget = iteration_widget.select_or_create_workflow_tab(
741+
hook, workflow_names
742+
)
743+
self._tab_widget.setCurrentWidget(iteration_widget)
744+
return widget
745+
746+
for index in range(self._tab_widget.count()):
747+
existing_widget = self._tab_widget.widget(index)
748+
if (
749+
isinstance(existing_widget, WorkflowWidget)
750+
and existing_widget.hook == hook
751+
):
752+
return existing_widget
753+
754+
if workflow_names is None:
755+
raise RuntimeError(
756+
"Workflow tab must be created from WorkflowBatchStartedEvent"
757+
)
758+
759+
widget = WorkflowWidget(hook, workflow_names, parent=self)
760+
tab_index = self._tab_widget.addTab(widget, hook.workflow_tab_title())
761+
self._tab_widget.setCurrentIndex(tab_index)
762+
return widget
661763

662764
def update_total_progress(
663765
self, progress_value: float, iteration_label: str, iteration: int | None = None
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
from .disk_space_widget import DiskSpaceWidget
2+
from .iteration import IterationWidget
23
from .progress_widget import ProgressWidget
34
from .realization import RealizationWidget
45
from .runpath_progress_widget import RunpathProgressWidget
56
from .update import UpdateWidget
7+
from .workflow import WorkflowWidget
68

79
__all__ = [
810
"DiskSpaceWidget",
11+
"IterationWidget",
912
"ProgressWidget",
1013
"RealizationWidget",
1114
"RunpathProgressWidget",
1215
"UpdateWidget",
16+
"WorkflowWidget",
1317
]

0 commit comments

Comments
 (0)