diff --git a/src/ert/analysis/_enif_update.py b/src/ert/analysis/_enif_update.py index f457323da83..e0ea463d946 100644 --- a/src/ert/analysis/_enif_update.py +++ b/src/ert/analysis/_enif_update.py @@ -100,7 +100,7 @@ def enif_update( data=smoother_snapshot.csv, extra=smoother_snapshot.extra, ), - posterior_id=str(posterior_storage.id), + ensemble_id=str(posterior_storage.id), ) ) return smoother_snapshot diff --git a/src/ert/analysis/_es_update.py b/src/ert/analysis/_es_update.py index 05ae06c0720..d8ef9282738 100644 --- a/src/ert/analysis/_es_update.py +++ b/src/ert/analysis/_es_update.py @@ -429,7 +429,7 @@ def log_warning( data=smoother_snapshot.csv, extra=smoother_snapshot.extra, ), - posterior_id=str(posterior_storage.id), + ensemble_id=str(posterior_storage.id), ) ) return smoother_snapshot diff --git a/src/ert/analysis/event.py b/src/ert/analysis/event.py index de79c1c7c3d..4f80c0a0912 100644 --- a/src/ert/analysis/event.py +++ b/src/ert/analysis/event.py @@ -72,4 +72,4 @@ class AnalysisErrorEvent(AnalysisEvent): class AnalysisCompleteEvent(AnalysisEvent): event_type: Literal["AnalysisCompleteEvent"] = "AnalysisCompleteEvent" data: DataSection - posterior_id: str + ensemble_id: str diff --git a/src/ert/run_models/update_run_model.py b/src/ert/run_models/update_run_model.py index 9d20155f7a4..3e3f2ee0938 100644 --- a/src/ert/run_models/update_run_model.py +++ b/src/ert/run_models/update_run_model.py @@ -2,6 +2,8 @@ import functools import uuid +import polars as pl + from ert.analysis import build_strategy_map, smoother_update from ert.analysis._update_commons import ErtAnalysisError from ert.analysis.event import ( @@ -192,9 +194,13 @@ def send_smoother_event( ) ) case AnalysisCompleteEvent(): - self._storage.get_ensemble(event.posterior_id).save_transition_data( - f"{AnalysisCompleteEvent.__name__}_{uuid.uuid4().hex[:8]}.json", - event.model_dump_json(), + ensemble = self._storage.get_ensemble(event.ensemble_id) + ensemble.save_parquet_as_blob_data( + pl.DataFrame( + event.data.data, + schema=event.data.header, + orient="row", + ) ) self.send_event( RunModelUpdateEndEvent( diff --git a/src/ert/storage/blob_data.py b/src/ert/storage/blob_data.py new file mode 100644 index 00000000000..06c027268d8 --- /dev/null +++ b/src/ert/storage/blob_data.py @@ -0,0 +1,11 @@ +from __future__ import annotations + +from pydantic import BaseModel, ConfigDict + + +class BlobStorageData(BaseModel): + model_config = ConfigDict(extra="forbid") + blob_type: str + uri: str + file_size: int + ensemble_id: str diff --git a/src/ert/storage/local_ensemble.py b/src/ert/storage/local_ensemble.py index fe8bc3c7194..ae75f64f27a 100644 --- a/src/ert/storage/local_ensemble.py +++ b/src/ert/storage/local_ensemble.py @@ -5,6 +5,7 @@ import logging import os import time +import uuid from collections import Counter from collections.abc import Iterable from datetime import UTC, datetime @@ -31,6 +32,7 @@ from ert.exceptions import StorageError from ert.substitutions import substitute_runpath_name +from .blob_data import BlobStorageData from .load_status import LoadResult from .mode import BaseMode, Mode, require_write from .realization_storage_state import RealizationStorageState @@ -50,7 +52,7 @@ class EverestRealizationInfo(TypedDict): SCALAR_FILENAME = "SCALAR" -TRANSITION_DATA_DIR = "transition" +BLOB_DATA_DIR = "blobs" class BatchDataframes(TypedDict, total=False): @@ -1298,10 +1300,23 @@ def save_everest_realization_info( ) @require_write - def save_transition_data(self, file_name: str, data: str) -> None: - path = self._path / TRANSITION_DATA_DIR / file_name - Path.mkdir(path.parent, exist_ok=True) - self._storage._write_transaction(path, data.encode("utf-8")) + def save_parquet_as_blob_data(self, data: pl.DataFrame) -> None: + blob_dir = self._path / BLOB_DATA_DIR + Path.mkdir(blob_dir, exist_ok=True) + report_id = uuid.uuid4().hex[:8] + file_name = f"observation_report_{report_id}.parquet" + parquet_path = blob_dir / file_name + self._storage._to_parquet_transaction(parquet_path, data) + blob_data = BlobStorageData( + blob_type="observation_report", + uri=file_name, + file_size=parquet_path.stat().st_size, + ensemble_id=str(self.id), + ) + self._storage._write_transaction( + blob_dir / f"{blob_data.uri}.json", + blob_data.model_dump_json(indent=2).encode("utf-8"), + ) @require_write def save_batch_dataframes(self, dataframes: BatchDataframes) -> None: diff --git a/tests/ert/unit_tests/run_models/test_update_run_model.py b/tests/ert/unit_tests/run_models/test_update_run_model.py index 7f11fc28a44..586c181dfd2 100644 --- a/tests/ert/unit_tests/run_models/test_update_run_model.py +++ b/tests/ert/unit_tests/run_models/test_update_run_model.py @@ -1,7 +1,8 @@ -import json import uuid from unittest.mock import MagicMock +import polars as pl + from ert.analysis.event import AnalysisCompleteEvent, DataSection from ert.run_models.update_run_model import UpdateRunModel @@ -10,27 +11,24 @@ def test_that_send_smoother_event_persists_observation_report_on_analysis_comple model = MagicMock(spec=UpdateRunModel) model._storage = MagicMock() mock_ensemble = MagicMock() + ensemble_id = str(uuid.uuid4()) + mock_ensemble.id = uuid.UUID(ensemble_id) model._storage.get_ensemble.return_value = mock_ensemble - posterior_id = str(uuid.uuid4()) data_section = DataSection( header=["observation_key", "status"], data=[("OBS_1", "Active"), ("OBS_2", "Deactivated, outlier")], ) - event = AnalysisCompleteEvent(data=data_section, posterior_id=posterior_id) + event = AnalysisCompleteEvent(data=data_section, ensemble_id=ensemble_id) UpdateRunModel.send_smoother_event( model, iteration=0, run_id=uuid.uuid4(), event=event ) - model._storage.get_ensemble.assert_called_once_with(posterior_id) - mock_ensemble.save_transition_data.assert_called_once() - - _, saved_json = mock_ensemble.save_transition_data.call_args[0] - parsed = json.loads(saved_json) - assert parsed["posterior_id"] == posterior_id - assert parsed["data"]["header"] == ["observation_key", "status"] - assert parsed["data"]["data"] == [ - ["OBS_1", "Active"], - ["OBS_2", "Deactivated, outlier"], - ] + model._storage.get_ensemble.assert_called_once_with(ensemble_id) + mock_ensemble.save_parquet_as_blob_data.assert_called_once() + + (saved_df,) = mock_ensemble.save_parquet_as_blob_data.call_args[0] + assert isinstance(saved_df, pl.DataFrame) + assert saved_df.columns == ["observation_key", "status"] + assert len(saved_df) == 2 diff --git a/tests/ert/unit_tests/storage/test_local_ensemble.py b/tests/ert/unit_tests/storage/test_local_ensemble.py index d40494a5384..c2494062901 100644 --- a/tests/ert/unit_tests/storage/test_local_ensemble.py +++ b/tests/ert/unit_tests/storage/test_local_ensemble.py @@ -1,4 +1,5 @@ import asyncio +import json from contextlib import contextmanager from datetime import datetime from pathlib import Path @@ -683,41 +684,82 @@ async def run_test(): asyncio.run(run_test()) -def test_that_save_transition_data_writes_file_to_disk(tmp_path): +def test_that_save_parquet_as_blob_data_writes_file_to_disk(tmp_path): with open_storage(tmp_path, mode="w") as storage: experiment = storage.create_experiment() ensemble = storage.create_ensemble( experiment, ensemble_size=1, iteration=0, name="prior" ) - ensemble.save_transition_data("report.json", '{"key": "value"}') + df = pl.DataFrame({"key": ["value"]}) + ensemble.save_parquet_as_blob_data(df) - written = (ensemble._path / "transition" / "report.json").read_text( - encoding="utf-8" - ) - assert written == '{"key": "value"}' + blob_dir = ensemble._path / "blobs" + parquet_files = list(blob_dir.glob("observation_report_*.parquet")) + assert len(parquet_files) == 1 + loaded = pl.read_parquet(parquet_files[0]) + assert loaded["key"][0] == "value" -def test_that_save_transition_data_creates_transition_directory(tmp_path): +def test_that_save_parquet_as_blob_data_creates_blobs_directory(tmp_path): with open_storage(tmp_path, mode="w") as storage: experiment = storage.create_experiment() ensemble = storage.create_ensemble( experiment, ensemble_size=1, iteration=0, name="prior" ) - transition_dir = ensemble._path / "transition" - assert not transition_dir.exists() + blob_dir = ensemble._path / "blobs" + assert not blob_dir.exists() - ensemble.save_transition_data("report.json", "data") - assert transition_dir.is_dir() + df = pl.DataFrame({"x": [1]}) + ensemble.save_parquet_as_blob_data(df) + assert blob_dir.is_dir() -def test_that_save_transition_data_raises_in_read_mode(tmp_path): +def test_that_save_parquet_as_blob_data_raises_in_read_mode(tmp_path): with open_storage(tmp_path, mode="w") as storage: experiment = storage.create_experiment() storage.create_ensemble(experiment, ensemble_size=1, iteration=0, name="prior") with open_storage(tmp_path, mode="r") as storage: ensemble = next(iter(storage.ensembles)) + df = pl.DataFrame({"x": [1]}) with pytest.raises(ModeError): - ensemble.save_transition_data("report.json", "data") + ensemble.save_parquet_as_blob_data(df) + + +def test_that_save_parquet_as_blob_data_writes_parquet_and_json_to_disk(tmp_path): + with open_storage(tmp_path, mode="w") as storage: + experiment = storage.create_experiment() + ensemble = storage.create_ensemble( + experiment, ensemble_size=1, iteration=0, name="prior" + ) + + df = pl.DataFrame( + { + "observation_key": ["OBS_1", "OBS_2"], + "status": ["Active", "Deactivated, outlier"], + "value": [1.5, 2.0], + } + ) + + ensemble.save_parquet_as_blob_data(df) + + blob_dir = ensemble._path / "blobs" + parquet_files = list(blob_dir.glob("observation_report_*.parquet")) + assert len(parquet_files) == 1 + parquet_path = parquet_files[0] + json_path = blob_dir / f"{parquet_path.name}.json" + assert parquet_path.exists() + assert json_path.exists() + + loaded_df = pl.read_parquet(parquet_path) + assert loaded_df.columns == ["observation_key", "status", "value"] + assert len(loaded_df) == 2 + assert loaded_df["observation_key"][0] == "OBS_1" + + metadata = json.loads(json_path.read_text(encoding="utf-8")) + assert metadata["blob_type"] == "observation_report" + assert metadata["uri"] == parquet_path.name + assert metadata["ensemble_id"] == str(ensemble.id) + assert metadata["file_size"] == parquet_path.stat().st_size