diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py index 6f24d80f..c46178ab 100644 --- a/dataretrieval/waterdata/api.py +++ b/dataretrieval/waterdata/api.py @@ -29,6 +29,7 @@ SAMPLES_URL, _check_profiles, _default_headers, + _drop_hash_columns, _get_args, get_ogc_data, get_stats_data, @@ -57,6 +58,7 @@ def get_daily( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + include_hash: bool = False, ) -> tuple[pd.DataFrame, BaseMetadata]: """Daily data provide one data value to represent water conditions for the day. @@ -189,6 +191,9 @@ def get_daily( and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. + include_hash : boolean, optional + If False (default), drop the opaque hash-valued ID columns. Set True to + keep the secondary hashes (e.g. ``time_series_id``) that join to metadata. Returns ------- @@ -272,6 +277,7 @@ def get_continuous( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + include_hash: bool = False, ) -> tuple[pd.DataFrame, BaseMetadata]: """ Continuous data provide instantaneous water conditions. @@ -399,6 +405,9 @@ def get_continuous( convert_type : boolean, optional If True, the function will convert the data to dates and qualifier to string vector + include_hash : boolean, optional + If False (default), drop the opaque hash-valued ID columns. Set True to + keep the secondary hashes (e.g. ``time_series_id``) that join to metadata. 
Returns ------- @@ -492,6 +501,7 @@ def get_monitoring_locations( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + include_hash: bool = False, ) -> tuple[pd.DataFrame, BaseMetadata]: """Location information is basic information about the monitoring location including the name, identifier, agency responsible for data collection, and @@ -707,6 +717,9 @@ def get_monitoring_locations( and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. + include_hash : boolean, optional + If False (default), drop the opaque hash-valued ID columns. Set True to + keep the secondary hashes (e.g. ``time_series_id``) that join to metadata. Returns ------- @@ -770,6 +783,7 @@ def get_time_series_metadata( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + include_hash: bool = False, ) -> tuple[pd.DataFrame, BaseMetadata]: """Daily data and continuous measurements are grouped into time series, which represent a collection of observations of a single parameter, @@ -930,6 +944,9 @@ def get_time_series_metadata( and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. + include_hash : boolean, optional + If False (default), drop the opaque hash-valued ID columns. Set True to + keep the secondary hashes (e.g. ``time_series_id``) that join to metadata. Returns ------- @@ -1027,6 +1044,7 @@ def get_combined_metadata( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + include_hash: bool = False, ) -> tuple[pd.DataFrame, BaseMetadata]: """Get combined monitoring-location and time-series metadata. @@ -1127,6 +1145,9 @@ def get_combined_metadata( and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. 
+ include_hash : boolean, optional + If False (default), drop the opaque hash-valued ID columns. Set True to + keep the secondary hashes (e.g. ``time_series_id``) that join to metadata. Returns ------- @@ -1215,6 +1236,7 @@ def get_latest_continuous( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + include_hash: bool = False, ) -> tuple[pd.DataFrame, BaseMetadata]: """This endpoint provides the most recent observation for each time series of continuous data. Continuous data are collected via automated sensors @@ -1344,6 +1366,9 @@ def get_latest_continuous( and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. + include_hash : boolean, optional + If False (default), drop the opaque hash-valued ID columns. Set True to + keep the secondary hashes (e.g. ``time_series_id``) that join to metadata. Returns ------- @@ -1410,6 +1435,7 @@ def get_latest_daily( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + include_hash: bool = False, ) -> tuple[pd.DataFrame, BaseMetadata]: """Daily data provide one data value to represent water conditions for the day. @@ -1541,6 +1567,9 @@ def get_latest_daily( and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. + include_hash : boolean, optional + If False (default), drop the opaque hash-valued ID columns. Set True to + keep the secondary hashes (e.g. ``time_series_id``) that join to metadata. Returns ------- @@ -1608,6 +1637,7 @@ def get_field_measurements( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + include_hash: bool = False, ) -> tuple[pd.DataFrame, BaseMetadata]: """Field measurements are physically measured values collected during a visit to the monitoring location. 
Field measurements consist of measurements @@ -1729,6 +1759,9 @@ def get_field_measurements( and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. + include_hash : boolean, optional + If False (default), drop the opaque hash-valued ID columns. Set True to + keep the secondary hashes (e.g. ``time_series_id``) that join to metadata. Returns ------- @@ -1792,6 +1825,7 @@ def get_field_measurements_metadata( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + include_hash: bool = False, ) -> tuple[pd.DataFrame, BaseMetadata]: """Get field-measurement metadata: one row per (location, parameter) series. @@ -1847,6 +1881,9 @@ def get_field_measurements_metadata( and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. + include_hash : boolean, optional + If False (default), drop the opaque hash-valued ID columns. Set True to + keep the secondary hashes (e.g. ``time_series_id``) that join to metadata. Returns ------- @@ -1913,6 +1950,7 @@ def get_peaks( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + include_hash: bool = False, ) -> tuple[pd.DataFrame, BaseMetadata]: """Get the annual peak streamflow / stage record for a monitoring location. @@ -1971,6 +2009,9 @@ def get_peaks( and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. + include_hash : boolean, optional + If False (default), drop the opaque hash-valued ID columns. Set True to + keep the secondary hashes (e.g. ``time_series_id``) that join to metadata. 
Returns ------- @@ -2148,6 +2189,7 @@ def get_samples( pointLocationWithinMiles: float | None = None, projectIdentifier: str | Iterable[str] | None = None, recordIdentifierUserSupplied: str | Iterable[str] | None = None, + include_hash: bool = False, ) -> tuple[pd.DataFrame, BaseMetadata]: """Search Samples database for USGS water quality data. This is a wrapper function for the Samples database API. All potential @@ -2278,6 +2320,9 @@ def get_samples( recordIdentifierUserSupplied : string or iterable of strings, optional Internal AQS record identifier that returns 1 entry. Only available for the "results" service. + include_hash : boolean, optional + If False (default), drop the opaque per-activity / per-result UUID columns + (``Activity_ActivityIdentifier``, ``Result_MeasureIdentifier``). Returns ------- @@ -2327,7 +2372,7 @@ def get_samples( _check_profiles(service, profile) # Build argument dictionary, omitting None values - params = _get_args(locals(), exclude={"ssl_check", "profile"}) + params = _get_args(locals(), exclude={"ssl_check", "profile", "include_hash"}) params.update({"mimeType": "text/csv"}) @@ -2348,6 +2393,7 @@ def get_samples( df = pd.read_csv(StringIO(response.text), delimiter=",") df = _attach_datetime_columns(df) + df = _drop_hash_columns(df, include_hash) return df, BaseMetadata(response) @@ -2438,6 +2484,7 @@ def get_stats_por( site_type_name: str | Iterable[str] | None = None, parameter_code: str | Iterable[str] | None = None, expand_percentiles: bool = True, + include_hash: bool = False, ) -> tuple[pd.DataFrame, BaseMetadata]: """Get day-of-year and month-of-year water data statistics from the USGS Water Data API. @@ -2516,6 +2563,9 @@ def get_stats_por( argument will return both the "values" column, containing the list of percentile threshold values, and a "value" column, containing the singular summary value for the other statistics. 
+ include_hash : boolean, optional + If False (default), drop the hash columns (``computation_id``, + ``parent_time_series_id``); set True to keep them for joining to metadata. Examples -------- @@ -2540,10 +2590,13 @@ def get_stats_por( ... ) """ # Build argument dictionary, omitting None values - params = _get_args(locals(), exclude={"expand_percentiles"}) + params = _get_args(locals(), exclude={"expand_percentiles", "include_hash"}) return get_stats_data( - args=params, service="observationNormals", expand_percentiles=expand_percentiles + args=params, + service="observationNormals", + expand_percentiles=expand_percentiles, + include_hash=include_hash, ) @@ -2562,6 +2615,7 @@ def get_stats_date_range( site_type_name: str | Iterable[str] | None = None, parameter_code: str | Iterable[str] | None = None, expand_percentiles: bool = True, + include_hash: bool = False, ) -> tuple[pd.DataFrame, BaseMetadata]: """Get monthly and annual water data statistics from the USGS Water Data API. This service (called the "observationIntervals" endpoint on api.waterdata.usgs.gov) @@ -2644,6 +2698,9 @@ def get_stats_date_range( argument will return both the "values" column, containing the list of percentile threshold values, and a "value" column, containing the singular summary value for the other statistics. + include_hash : boolean, optional + If False (default), drop the hash columns (``computation_id``, + ``parent_time_series_id``); set True to keep them for joining to metadata. Examples -------- @@ -2669,12 +2726,13 @@ def get_stats_date_range( ... 
) """ # Build argument dictionary, omitting None values - params = _get_args(locals(), exclude={"expand_percentiles"}) + params = _get_args(locals(), exclude={"expand_percentiles", "include_hash"}) return get_stats_data( args=params, service="observationIntervals", expand_percentiles=expand_percentiles, + include_hash=include_hash, ) @@ -2710,6 +2768,7 @@ def get_channel( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + include_hash: bool = False, ) -> tuple[pd.DataFrame, BaseMetadata]: """ Channel measurements taken as part of streamflow field measurements. @@ -2823,6 +2882,9 @@ def get_channel( convert_type : boolean, optional If True, the function will convert the data to dates and qualifier to string vector + include_hash : boolean, optional + If False (default), drop the opaque hash-valued ID columns. Set True to + keep the secondary hashes (e.g. ``time_series_id``) that join to metadata. Returns ------- diff --git a/dataretrieval/waterdata/types.py b/dataretrieval/waterdata/types.py index f5e1496b..eb4d0a27 100644 --- a/dataretrieval/waterdata/types.py +++ b/dataretrieval/waterdata/types.py @@ -74,3 +74,47 @@ "count", ], } + + +# --- CF / xarray vocabulary mappings --------------------------------------- +# Lookup tables used by :mod:`dataretrieval.waterdata.xarray` to translate +# USGS terms into CF-conventions metadata. Each is intentionally partial: +# anything not listed falls back to a sensible default (raw unit string kept +# verbatim; no standard_name emitted) rather than guessing a wrong CF term. +# They are plain data, so they live here rather than in the (xarray-optional) +# converter module and can be extended without importing xarray. + +# USGS unit strings -> UDUNITS / CF-canonical form. 
+CF_UNIT_MAP = { + "ft^3/s": "ft3 s-1", + "ft3/s": "ft3 s-1", + "ft": "ft", + "in": "in", + "degC": "degC", + "deg C": "degC", + "uS/cm": "uS/cm", + "mg/l": "mg L-1", + "mg/L": "mg L-1", + "tons/day": "short_ton day-1", + "%": "percent", +} + +# USGS statistic_id -> the operator in a CF ``cell_methods`` string. +CF_CELL_METHODS = { + "00001": "maximum", + "00002": "minimum", + "00003": "mean", + "00006": "sum", + "00008": "median", + "00011": "point", # instantaneous +} + +# USGS 5-digit parameter code -> CF standard_name. Deliberately conservative; +# codes without a confident match are left without a standard_name. +CF_STANDARD_NAMES = { + "00060": "water_volume_transport_in_river_channel", + "00010": "water_temperature", + "00065": "water_surface_height_above_reference_datum", + "63160": "water_surface_height_above_reference_datum", + "00045": "lwe_thickness_of_precipitation_amount", +} diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index dd908143..ad9a0005 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -175,6 +175,140 @@ def _switch_properties_id(properties: list[str] | None, id_name: str, service: s # parameters and require POST with CQL2 JSON instead. _CQL2_REQUIRED_SERVICES = frozenset({"monitoring-locations"}) +# Column names whose values are server-generated hashes (UUIDs or hex +# digests): opaque, non-human-meaningful, and a payload-bloat on large +# queries. Dropped by default; opt in with ``include_hash=True``. +# Includes two kinds with different stability: +# - The per-record version UUIDs that are aliased to a service's +# ``output_id`` (``daily_id``, ``continuous_id``, …). These are +# regenerated on every record refresh, so they are NOT stable over +# time and joining/diffing on them produces spurious churn. They get +# mapped to/from ``"id"`` on the wire; both names are listed so the +# filter works on either side of ``_switch_properties_id``. 
+# - Secondary hash columns embedded in record payloads +# (``time_series_id``, ``field_visit_id``, ``parent_time_series_id``, +# ``field_measurements_series_id``). These ARE stable and are the +# join keys back to the metadata endpoints (e.g. ``time_series_id`` +# links a values row to ``get_time_series_metadata``); they're +# dropped only because they're opaque and bloat the payload, so a +# caller who needs to join sets ``include_hash=True`` or names +# the column in ``properties``. +# ``monitoring_location_id`` (AGENCY-ID format, e.g. ``USGS-01646500``) +# and other code columns (``parameter_code``, ``statistic_id``, …) are +# intentionally absent — they're stable, human-meaningful identifiers. +_HASH_ID_COLUMNS = frozenset( + { + "daily_id", + "continuous_id", + "latest_continuous_id", + "latest_daily_id", + "field_measurement_id", + "field_series_id", + "peak_id", + "channel_measurements_id", + "combined_meta_id", + "time_series_id", + "parent_time_series_id", + "field_visit_id", + "field_measurements_series_id", + # ``get_stats_*`` (statistics service) output — per-computation + # UUID; ``parent_time_series_id`` is already listed above. + "computation_id", + # ``get_samples`` (Samples database CSV) — per-activity and + # per-result UUIDs. The Samples service uses CamelCase column + # names rather than snake_case, but the drop logic only needs + # exact name matches so they share this set. + "Activity_ActivityIdentifier", + "Result_MeasureIdentifier", + } +) + +# Cache of per-service queryables column lists, populated on first call +# from each service when computing the default ``properties=`` for +# ``include_hash=False``. Keyed by service name; value is the full +# list of property names the server exposes for that collection. +_queryables_cache: dict[str, list[str]] = {} +# Cache of the derived non-hash property whitelist, keyed by +# ``(service, output_id)``. 
Both inputs determine the result, and +# both are stable per call site — re-deriving on every OGC request +# would do ~30–100 frozenset lookups per call for no reason. +_default_props_cache: dict[tuple[str, str], list[str]] = {} + + +def _service_queryables(service: str) -> list[str]: + """Return the cached queryables property list for ``service``. + + One HTTP GET per service per process; the list is reused for every + subsequent call. Raises ``requests.HTTPError`` on a non-200 — the + caller's ``include_hash=False`` request can't be satisfied + without it, so failing loudly is preferable to silently dropping + the server-side trim. + """ + cached = _queryables_cache.get(service) + if cached is not None: + return cached + body = _check_ogc_requests(endpoint=service, req_type="queryables") + props = list(body.get("properties", {}).keys()) + _queryables_cache[service] = props + return props + + +def _default_non_hash_properties(service: str, output_id: str) -> list[str]: + """Build the ``properties=`` whitelist sent to the server when the + caller didn't supply one and ``include_hash=False``. + + The whitelist is the service's queryables minus :data:`_HASH_ID_COLUMNS`, + minus ``"geometry"`` (the OGC server returns geometry via the feature + envelope, not as a property — some collections reject it as a + property name), and minus the wire-format ``"id"`` column when the + service's ``output_id`` is itself a hash column (e.g. ``daily_id``). + For ``monitoring-locations``, ``id`` becomes the AGENCY-ID + ``monitoring_location_id``, so it's kept. 
+ """ + key = (service, output_id) + cached = _default_props_cache.get(key) + if cached is not None: + return cached + drop_wire_id = output_id in _HASH_ID_COLUMNS + props = [ + p + for p in _service_queryables(service) + if p not in _HASH_ID_COLUMNS + and p != "geometry" + and not (drop_wire_id and p == "id") + ] + _default_props_cache[key] = props + return props + + +def _properties_unspecified(properties) -> bool: + """True when the caller didn't pin a ``properties`` list. + + A ``None``, empty list, or list-of-only-NaN counts as unspecified. + Centralizes the predicate so the (subtly different) ``not properties`` + vs ``properties is None`` variants across call sites stay aligned. + """ + return not properties or all(pd.isna(properties)) + + +def _drop_hash_columns( + df: pd.DataFrame, + include_hash: bool, + keep: set[str] | None = None, +) -> pd.DataFrame: + """Drop hash-valued ID columns from ``df`` when not opting in. + + When ``include_hash`` is True, returns ``df`` unchanged. Otherwise + drops every column whose name is in :data:`_HASH_ID_COLUMNS`, except + those the caller listed in ``keep`` (e.g. names appearing in an + explicit user ``properties=`` request — explicit beats default). + A no-op when no hash columns are present. + """ + if include_hash: + return df + drop = (set(df.columns) & _HASH_ID_COLUMNS) - (keep or set()) + return df.drop(columns=drop) if drop else df + def _parse_datetime(value: str) -> datetime | None: """Parse a single datetime string against the supported formats. @@ -1069,7 +1203,10 @@ def _deal_with_empty( def _arrange_cols( - df: pd.DataFrame, properties: list[str] | None, output_id: str + df: pd.DataFrame, + properties: list[str] | None, + output_id: str, + include_hash: bool = False, ) -> pd.DataFrame: """ Rearranges and renames columns in a DataFrame based on provided @@ -1084,6 +1221,13 @@ def _arrange_cols( only NaN, the function renames 'id' to output_id. 
output_id : str The name to which the 'id' column should be renamed if applicable. + include_hash : bool, optional + If False (default), hash-valued ID columns (see + :data:`_HASH_ID_COLUMNS`) are dropped from the result unless the + caller explicitly named them in ``properties``. If True, the + legacy behavior is preserved: hash columns are kept and the + per-record output_id columns are moved to the end of the + DataFrame when ``properties`` is unspecified. Returns ------- @@ -1095,7 +1239,9 @@ def _arrange_cols( # Rename id column to output_id df = df.rename(columns={"id": output_id}) - if properties and not all(pd.isna(properties)): + user_specified = not _properties_unspecified(properties) + + if user_specified: # Don't alias the caller's list — we mutate below. local_properties = list(properties) if "geometry" in df.columns and "geometry" not in local_properties: @@ -1106,22 +1252,32 @@ def _arrange_cols( local_properties[local_properties.index("id")] = output_id df = df.loc[:, [col for col in local_properties if col in df.columns]] - # Move meaningless-to-user, extra id columns to the end - # of the dataframe, if they exist - extra_id_col = set(df.columns).intersection( - { - "latest_continuous_id", - "latest_daily_id", - "daily_id", - "continuous_id", - "field_measurement_id", - } - ) + # Client-side safety net for the server-side trim done in + # ``get_ogc_data``: no-op on the happy path (server already omitted + # hash columns), drops them here when the queryables fetch failed + # and we fell back to a full payload. An explicit caller + # ``properties`` list — including ``"id"``, which resolved to + # ``output_id`` above — wins over the default. 
+ keep: set[str] = set() + if user_specified: + keep = set(properties) + if "id" in keep: + keep.add(output_id) + df = _drop_hash_columns(df, include_hash, keep=keep) + + # Legacy ordering: when ``include_hash=True`` and ``properties`` + # is unspecified, move the per-record version IDs to the end so they + # don't crowd the front. With ``include_hash=False`` those + # columns are gone above, so this branch is a no-op. + extra_id_col = set(df.columns) & { + "latest_continuous_id", + "latest_daily_id", + "daily_id", + "continuous_id", + "field_measurement_id", + } - # If the arbitrary id column is returned (either due to properties - # being none or NaN), then move it to the end of the dataframe, but - # if part of properties, keep in requested order - if extra_id_col and (properties is None or all(pd.isna(properties))): + if extra_id_col and _properties_unspecified(properties): id_col_order = [col for col in df.columns if col not in extra_id_col] + list( extra_id_col ) @@ -1238,10 +1394,30 @@ def get_ogc_data( # Capture `properties` before the id-switch so post-processing sees # the user-facing names, not the wire-format ones. properties = args.get("properties") - args["properties"] = _switch_properties_id( - properties, id_name=output_id, service=service - ) convert_type = args.pop("convert_type", False) + include_hash = args.pop("include_hash", False) + + # When the caller didn't pin ``properties`` and isn't opting into + # hash IDs, try a server-side whitelist of the non-hash columns so + # the server skips serializing UUID/hex fields. On any queryables + # failure, fall through to the full payload — ``_arrange_cols`` + # post-processes the drop as a safety net. 
+ use_server_trim = not include_hash and _properties_unspecified(properties) + if use_server_trim: + try: + args["properties"] = _default_non_hash_properties(service, output_id) + except (requests.HTTPError, requests.RequestException, ValueError) as exc: + logger.warning( + "Could not fetch queryables for %s (%s); " + "falling back to client-side hash-ID drop.", + service, + exc, + ) + use_server_trim = False + if not use_server_trim: + args["properties"] = _switch_properties_id( + properties, id_name=output_id, service=service + ) args = {k: v for k, v in args.items() if v is not None} with _progress.progress_context(service=service): @@ -1249,7 +1425,9 @@ def get_ogc_data( return_list = _deal_with_empty(return_list, properties, service) if convert_type: return_list = _type_cols(return_list) - return_list = _arrange_cols(return_list, properties, output_id) + return_list = _arrange_cols( + return_list, properties, output_id, include_hash=include_hash + ) return_list = _sort_rows(return_list) return return_list, BaseMetadata(response) @@ -1433,6 +1611,7 @@ def get_stats_data( service: str, expand_percentiles: bool, client: requests.Session | None = None, + include_hash: bool = False, ) -> tuple[pd.DataFrame, BaseMetadata]: """ Retrieves statistical data from a specified endpoint and returns it @@ -1454,6 +1633,9 @@ def get_stats_data( each percentile gets its own row in the returned dataframe. If True and user requests a computation_type other than percentiles, a percentile column is still returned. + include_hash : bool, optional + If False (default), drop the hash columns (``computation_id``, + ``parent_time_series_id``); set True to keep them for joining to metadata. Returns ------- @@ -1500,6 +1682,11 @@ def follow_up(cursor: str, sess: requests.Session) -> requests.Response: if expand_percentiles: df = _expand_percentiles(df) + + # Drop hash IDs after ``_expand_percentiles`` — it merges on + # ``computation_id`` while exploding the percentile lists. 
+ df = _drop_hash_columns(df, include_hash) + return df, BaseMetadata(response) diff --git a/dataretrieval/waterdata/xarray.py b/dataretrieval/waterdata/xarray.py new file mode 100644 index 00000000..87964392 --- /dev/null +++ b/dataretrieval/waterdata/xarray.py @@ -0,0 +1,452 @@ +"""xarray-returning wrappers for the Water Data time-series getters. + +Each public function here mirrors the same-named function in +:mod:`dataretrieval.waterdata`, but returns a CF-conventions +:class:`xarray.Dataset` instead of a :class:`pandas.DataFrame`. The series +descriptors that the plain getters leave behind (parameter name, units, +statistic/computation) are looked up automatically from the metadata +endpoints and written onto the dataset: + +* each observed parameter becomes a data variable with ``long_name``, + ``units`` (UDUNITS where a mapping is known), ``cell_methods`` (derived + from the statistic/computation), and ``standard_name`` where a confident + USGS-parameter-code -> CF mapping exists; +* the monitoring location is the CF "discrete sampling geometry" instance + dimension (``cf_role = "timeseries_id"``), with ``longitude`` / ``latitude`` + coordinates when point geometry is available; +* dataset-level attributes carry ``Conventions``, provenance, and the + request URL. + +The wrappers call the underlying getter with its default, hash-free output +-- so the large per-record UUID column is never fetched or materialized -- +and derive the CF attributes directly from the surviving columns +(``unit_of_measure`` -> ``units``, ``statistic_id`` -> ``cell_methods``, +``parameter_code`` -> ``standard_name``). Only the human-readable parameter +name comes from a small, cached metadata lookup keyed by ``parameter_code``. 
+ +This module requires the optional ``xarray`` dependency:: + + pip install dataretrieval[xarray] +""" + +from __future__ import annotations + +import datetime as _dt +import re as _re +import warnings as _warnings +from functools import wraps as _wraps + +import pandas as _pd + +try: + import xarray as _xr +except ModuleNotFoundError as _exc: # pragma: no cover - exercised only sans xarray + raise ModuleNotFoundError( + "dataretrieval.waterdata.xarray requires the optional 'xarray' " + "dependency. Install it with: pip install dataretrieval[xarray]" + ) from _exc + +from . import api as _api +from .nearest import get_nearest_continuous as _get_nearest_continuous +from .types import CF_CELL_METHODS, CF_STANDARD_NAMES, CF_UNIT_MAP + +__all__ = [ + "get_continuous", + "get_daily", + "get_field_measurements", + "get_latest_continuous", + "get_latest_daily", + "get_nearest_continuous", + "get_peaks", + "get_stats_date_range", + "get_stats_por", +] + + +# The CF vocabulary lookups (USGS units -> UDUNITS, statistic_id -> +# cell_methods operator, parameter_code -> standard_name) are plain data and +# live in ``types`` -- imported as CF_UNIT_MAP / CF_CELL_METHODS / +# CF_STANDARD_NAMES at the top of this module. + +# Columns kept off the value pivot but surfaced as ancillary (flag) variables. +_ANCILLARY = ("qualifier", "approval_status") + + +# --- metadata lookups (cached per process) --------------------------------- + +_TS_META_CACHE: dict[str, dict[str, dict]] = {} +_FIELD_META_CACHE: dict[str, dict[str, dict]] = {} + +# Only the human-readable name is sourced from the metadata endpoint; units, +# statistic, and parameter code all come from the values frame itself. +_NAME_DESCRIPTORS = ("parameter_name", "parameter_description") + + +def _lookup(site_ids, cache, getter): + """Fetch and cache ``{parameter_code: {name descriptors}}`` for sites. + + Keyed by ``parameter_code`` (stable and 1:1 with the parameter name), so + the lookup needs no hash id. 
One metadata call per not-yet-cached batch + of sites; the cache is keyed by site so repeated getter calls reuse it. + """ + sites = sorted({str(s) for s in site_ids if _pd.notna(s)}) + todo = [s for s in sites if s not in cache] + if todo: + meta, _ = getter(monitoring_location_id=todo) + for s in todo: + cache[s] = {} + if not meta.empty and "parameter_code" in meta.columns: + cols = [c for c in _NAME_DESCRIPTORS if c in meta.columns] + for _, row in meta.iterrows(): + site = row.get("monitoring_location_id") + pcode = row.get("parameter_code") + if site in cache and _pd.notna(pcode): + cache[site][str(pcode)] = {c: row.get(c) for c in cols} + out: dict[str, dict] = {} + for s in sites: + out.update(cache.get(s, {})) + return out + + +def _timeseries_metadata(site_ids): + return _lookup(site_ids, _TS_META_CACHE, _api.get_time_series_metadata) + + +def _field_metadata(site_ids): + return _lookup(site_ids, _FIELD_META_CACHE, _api.get_field_measurements_metadata) + + +# --- helpers --------------------------------------------------------------- + + +def _slug(name) -> str: + """A lower_snake_case, identifier-safe variable name.""" + s = _re.sub(r"[^0-9a-zA-Z]+", "_", str(name).strip().lower()).strip("_") + return s or "value" + + +def _first(series): + """First non-null value of a column, or None.""" + nonnull = series.dropna() + return nonnull.iloc[0] if len(nonnull) else None + + +def _var_attrs(desc, *, unit, pcode, stat, default_cell_method, ancillary, name): + """Build the CF attribute dict for one data variable. + + ``unit``, ``pcode`` and ``stat`` are read from the values frame; ``desc`` + supplies only the human-readable name from the metadata lookup. 
def _build_timeseries(
    df,
    base_meta,
    *,
    service,
    series_meta,
    group_cols=("parameter_code", "statistic_id"),
    default_cell_method=None,
):
    """Hash-free values frame -> CF timeSeries Dataset (one var per parameter).

    The frame carries no hash columns (the wrappers fetch the default lean
    output); every CF attribute is derived from ``parameter_code`` /
    ``statistic_id`` / ``unit_of_measure`` already present, plus the
    human-readable name from ``series_meta`` keyed by ``parameter_code``.
    ``time`` and ``monitoring_location_id`` become the coordinates.

    Parameters
    ----------
    df : pandas.DataFrame or None
        Values frame. ``monitoring_location_id``, ``time`` and ``value`` are
        selected unconditionally; the grouping columns, the ancillary
        columns and ``unit_of_measure`` are used only when present.
    base_meta : object
        Getter metadata, forwarded to the dataset-attribute helper.
    service : str
        Service label recorded in the dataset attributes.
    series_meta : dict
        ``str(parameter_code)`` -> descriptor dict. Null descriptor values
        are ignored.
    group_cols : sequence of str, optional
        Columns whose distinct combinations each become one data variable.
        If none of them is in the frame, the whole frame is one variable.
    default_cell_method : str, optional
        Passed to ``_var_attrs`` as the fallback cell method.

    Returns
    -------
    xarray.Dataset
        Empty (attributes only) when there is no row with a usable time.

    Warns
    -----
    UserWarning
        When rows are dropped for an unparseable time, when one variable
        mixes units, or when duplicate (site, time) pairs are collapsed.
    """
    if df is None or len(df) == 0:
        return _empty_dataset(service, base_meta)

    group_cols = [c for c in group_cols if c in df.columns]
    ancillary = [c for c in _ANCILLARY if c in df.columns]
    has_unit = "unit_of_measure" in df.columns
    # Slim to just the columns we convert, so the heavy frame (and any columns
    # we ignore) is not copied wholesale.
    cols = [_INSTANCE, "time", "value", *group_cols, *ancillary]
    if has_unit:
        cols.append("unit_of_measure")
    work = df.loc[:, list(dict.fromkeys(cols))].copy()
    # Normalize to naive-UTC so xarray can store datetime64 (it has no tz dtype).
    work["time"] = _pd.to_datetime(
        work["time"], errors="coerce", utc=True
    ).dt.tz_localize(None)
    work["value"] = _pd.to_numeric(work["value"], errors="coerce")

    # A NaT time can't index the array and would be silently swallowed by
    # ``to_xarray`` (or crash an all-NaT group). Drop such rows explicitly and
    # say so, rather than losing observations without a trace.
    n_before = len(work)
    work = work[work["time"].notna()]
    dropped = n_before - len(work)
    if dropped:
        _warnings.warn(
            f"dropped {dropped} row(s) with an unparseable or missing time.",
            stacklevel=3,
        )
    if work.empty:
        return _empty_dataset(service, base_meta)

    # ``groupby`` raises on an empty key list, so a frame that has none of
    # the grouping columns is converted as a single group.
    if group_cols:
        groups = (g for _, g in work.groupby(group_cols, dropna=False))
    else:
        groups = [work]

    datasets, used = [], set()
    for group in groups:
        pcode = _first(group["parameter_code"]) if "parameter_code" in group else None
        stat = _first(group["statistic_id"]) if "statistic_id" in group else None
        group_units = group["unit_of_measure"].dropna().unique() if has_unit else ()
        unit = group_units[0] if len(group_units) else None
        raw_desc = series_meta.get(str(pcode), {}) if pcode is not None else {}
        # Discard null descriptor values: NaN is truthy, so it would defeat
        # the ``or`` fallbacks below and in ``_var_attrs`` and end up as the
        # literal name "nan".
        desc = {
            k: v
            for k, v in raw_desc.items()
            if not (_pd.api.types.is_scalar(v) and _pd.isna(v))
        }

        label = desc.get("parameter_name") or pcode
        # No readable name and no parameter code: use a neutral placeholder
        # instead of slugging ``None`` into "none".
        name = _slug(label) if label is not None else "value"
        if name in used:  # same parameter, different statistic -> distinct var
            op = CF_CELL_METHODS.get(str(stat)) or (str(stat) if stat else None)
            name = f"{name}_{_slug(op)}" if op else name
            while name in used:
                name += "_x"
        used.add(name)

        if len(group_units) > 1:
            # One variable can carry only one ``units`` attr; surface the
            # mix instead of silently labeling every value with the first.
            _warnings.warn(
                f"'{name}' spans multiple units {list(group_units)}; labeling "
                f"with '{unit}'. Filter by site/parameter to avoid mixing "
                "units in one variable.",
                stacklevel=3,
            )

        sub = group.set_index([_INSTANCE, "time"])[["value", *ancillary]]
        if not sub.index.is_unique:
            _warnings.warn(
                f"'{name}' has multiple values per (site, time) -- two series "
                "share this (site, parameter, statistic); keeping the first. "
                "Filter the query to separate them.",
                stacklevel=3,
            )
            sub = sub[~sub.index.duplicated(keep="first")]
        ds_g = sub.to_xarray().rename(
            {"value": name, **{c: f"{name}_{c}" for c in ancillary}}
        )
        ds_g[name].attrs = _var_attrs(
            desc,
            unit=unit,
            pcode=pcode,
            stat=stat,
            default_cell_method=default_cell_method,
            ancillary=ancillary,
            name=name,
        )
        datasets.append(ds_g)

    # Outer join on time: parameters sampled on different clocks share a
    # union time axis, NaN where a given parameter has no observation.
    ds = _xr.merge(datasets, combine_attrs="drop_conflicts", join="outer")
    ds.attrs = _dataset_attrs(service, base_meta)
    ds["time"].attrs.setdefault("standard_name", "time")
    if _INSTANCE in ds.coords:
        ds[_INSTANCE].attrs.setdefault("cf_role", "timeseries_id")

    coords = _point_coords(df, _INSTANCE)
    if coords is not None:
        lon, lat = coords
        order = list(ds[_INSTANCE].values)
        # A site with no geometry gets NaN rather than None, so the
        # coordinate arrays stay float instead of degrading to object dtype.
        missing = float("nan")
        ds = ds.assign_coords(
            longitude=(_INSTANCE, [lon.get(k, missing) for k in order]),
            latitude=(_INSTANCE, [lat.get(k, missing) for k in order]),
        )
        ds["longitude"].attrs = {"standard_name": "longitude", "units": "degrees_east"}
        ds["latitude"].attrs = {"standard_name": "latitude", "units": "degrees_north"}

    return ds
+ """ + if df is None or len(df) == 0: + return _empty_dataset(service, base_meta) + flat = df.drop(columns=[c for c in ("geometry",) if c in df.columns]) + ds = _xr.Dataset.from_dataframe(flat.reset_index(drop=True)) + ds.attrs = _dataset_attrs(service, base_meta) + ds.attrs["comment"] = "preliminary flat conversion; see module docs" + return ds + + +# --- public wrappers ------------------------------------------------------- + + +def _sites(df): + """Unique monitoring-location ids present in a values frame.""" + if "monitoring_location_id" in df: + return df["monitoring_location_id"].unique() + return [] + + +def _fetch(func, args, kwargs): + """Call the underlying getter with hash IDs forced off. + + The xarray path never surfaces hash columns (neither the per-record + UUID nor the per-series join key), so ``include_hash`` is dropped here: + passing it has no effect and we avoid fetching a column we'd discard. + """ + kwargs.pop("include_hash", None) + return func(*args, **kwargs) + + +def _xr_doc(func, *, cf_metadata=True): + """Prepend an xarray note to the wrapped getter's docstring. + + ``cf_metadata=False`` describes the preliminary stats path, which emits a + flat Dataset without per-variable CF attributes. + """ + returns = ( + "a CF-conventions ``xarray.Dataset`` with series metadata populated" + if cf_metadata + else ( + "a preliminary, flat ``xarray.Dataset`` (dataset-level provenance " + "only; per-variable CF metadata is not yet populated)" + ) + ) + note = ( + " xarray wrapper: same arguments as " + f"``dataretrieval.waterdata.{func.__name__}``, but returns\n" + f" {returns}. 
def _timeseries_wrapper(func, *, service, default_cell_method=None):
    """Build the xarray variant of a time-series getter.

    The returned callable takes the same arguments as ``func``, fetches the
    frame through ``_fetch`` and converts it with ``_build_timeseries``,
    using the time-series metadata lookup for the sites found in the frame.
    ``service`` and ``default_cell_method`` are forwarded to the converter.
    """
    @_wraps(func)
    def wrapper(*args, **kwargs):
        df, base_meta = _fetch(func, args, kwargs)
        return _build_timeseries(
            df,
            base_meta,
            service=service,
            series_meta=_timeseries_metadata(_sites(df)),
            default_cell_method=default_cell_method,
        )

    # Replace the docstring with the xarray-prefixed version of ``func``'s.
    wrapper.__doc__ = _xr_doc(func)
    return wrapper


def _field_wrapper(func, *, service):
    """Build the xarray variant of a field-measurements getter.

    Same flow as the time-series wrapper, but the names come from the
    field-measurements metadata lookup, rows are grouped by
    ``parameter_code`` only, and the default cell method is ``"point"``.
    """
    @_wraps(func)
    def wrapper(*args, **kwargs):
        df, base_meta = _fetch(func, args, kwargs)
        return _build_timeseries(
            df,
            base_meta,
            service=service,
            series_meta=_field_metadata(_sites(df)),
            group_cols=("parameter_code",),
            default_cell_method="point",
        )

    # Replace the docstring with the xarray-prefixed version of ``func``'s.
    wrapper.__doc__ = _xr_doc(func)
    return wrapper


def _stats_wrapper(func, *, service):
    """Build the xarray variant of a statistics getter.

    The returned callable fetches the frame through ``_fetch`` and hands it
    to ``_build_stats``; no per-series metadata lookup is performed.
    """
    @_wraps(func)
    def wrapper(*args, **kwargs):
        df, base_meta = _fetch(func, args, kwargs)
        return _build_stats(df, base_meta, service)

    # ``cf_metadata=False`` selects the note describing the flat stats output.
    wrapper.__doc__ = _xr_doc(func, cf_metadata=False)
    return wrapper
_stats_wrapper(_api.get_stats_date_range, service="statistics") diff --git a/pyproject.toml b/pyproject.toml index 5c9fbda0..651ddc73 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,6 +54,9 @@ doc = [ nldi = [ 'geopandas>=0.10' ] +xarray = [ + 'xarray>=2023.1.0' +] [project.urls] homepage = "https://github.com/DOI-USGS/dataretrieval-python" diff --git a/tests/waterdata_test.py b/tests/waterdata_test.py index 24eb6eff..44385f3b 100644 --- a/tests/waterdata_test.py +++ b/tests/waterdata_test.py @@ -82,8 +82,14 @@ def test_mock_get_samples(requests_mock): monitoringLocationIdentifier="USGS-05406500", ) assert type(df) is DataFrame - # 181 source columns + 6 derived DateTime columns - assert df.shape == (67, 187) + # 181 source columns + 6 derived DateTime columns − 2 hash IDs + # (Activity_ActivityIdentifier, Result_MeasureIdentifier) dropped by default. + assert df.shape == (67, 185) + assert "Activity_ActivityIdentifier" not in df.columns + assert "Result_MeasureIdentifier" not in df.columns + # Stable identifiers are preserved. + assert "Location_Identifier" in df.columns + assert "Org_Identifier" in df.columns assert md.url == request_url assert isinstance(md.query_time, datetime.timedelta) assert md.header == {"mock_header": "value"} @@ -91,6 +97,29 @@ def test_mock_get_samples(requests_mock): assert df["Activity_StartDateTime"].notna().any() +def test_mock_get_samples_include_hash(requests_mock): + """``include_hash=True`` restores the legacy column set.""" + request_url = ( + "https://api.waterdata.usgs.gov/samples-data/results/fullphyschem?" 
+ "activityMediaName=Water&activityStartDateLower=2020-01-01" + "&activityStartDateUpper=2024-12-31&monitoringLocationIdentifier=USGS-05406500&mimeType=text%2Fcsv" + ) + response_file_path = "tests/data/samples_results.txt" + mock_request(requests_mock, request_url, response_file_path) + df, _md = get_samples( + service="results", + profile="fullphyschem", + activityMediaName="Water", + activityStartDateLower="2020-01-01", + activityStartDateUpper="2024-12-31", + monitoringLocationIdentifier="USGS-05406500", + include_hash=True, + ) + assert df.shape == (67, 187) + assert "Activity_ActivityIdentifier" in df.columns + assert "Result_MeasureIdentifier" in df.columns + + def test_mock_get_samples_summary(requests_mock): """Tests USGS Samples summary query""" request_url = ( @@ -216,10 +245,11 @@ def test_samples_results(): activityStartDateLower="2024-10-01", activityStartDateUpper="2025-04-24", ) - assert all( - col in df.columns - for col in ["Location_Identifier", "Activity_ActivityIdentifier"] - ) + # Stable identifiers are kept; hash IDs (Activity_ActivityIdentifier, + # Result_MeasureIdentifier) are dropped by default. + assert "Location_Identifier" in df.columns + assert "Activity_ActivityIdentifier" not in df.columns + assert "Result_MeasureIdentifier" not in df.columns assert len(df) > 0 @@ -231,7 +261,10 @@ def test_samples_activity(): monitoringLocationIdentifier="USGS-06719505", ) assert len(df) > 0 - assert len(df.columns) == 97 + # 97 → 96 cols after dropping Activity_ActivityIdentifier + # (Result_MeasureIdentifier is not in the ``activities`` profile). + assert len(df.columns) == 96 + assert "Activity_ActivityIdentifier" not in df.columns assert "Location_HUCTwelveDigitCode" in df.columns @@ -277,10 +310,14 @@ def test_get_daily(): parameter_code="00060", time="2025-01-01/..", ) - assert "daily_id" in df.columns + # Default: hash-valued ID columns (daily_id, time_series_id) are + # dropped. 
Stable identifiers (monitoring_location_id, + # parameter_code, statistic_id, time) are preserved. + assert "daily_id" not in df.columns + assert "time_series_id" not in df.columns + assert "monitoring_location_id" in df.columns assert "geometry" in df.columns - assert df.columns[-1] == "daily_id" - assert df.shape[1] == 12 + assert df.shape[1] == 10 assert df.parameter_code.unique().tolist() == ["00060"] assert df.monitoring_location_id.unique().tolist() == ["USGS-05427718"] assert df["time"].apply(lambda x: isinstance(x, datetime.date)).all() @@ -290,6 +327,22 @@ def test_get_daily(): assert df["value"].dtype == "float64" +def test_get_daily_include_hash(): + """``include_hash=True`` restores the legacy behavior: the + per-record UUID (``daily_id``) and secondary hashes + (``time_series_id``) are present.""" + df, _ = get_daily( + monitoring_location_id="USGS-05427718", + parameter_code="00060", + time="2025-01-01/..", + include_hash=True, + ) + assert "daily_id" in df.columns + assert "time_series_id" in df.columns + assert df.columns[-1] == "daily_id" + assert df.shape[1] == 12 + + def test_get_daily_properties(): df, _ = get_daily( monitoring_location_id="USGS-05427718", @@ -335,7 +388,8 @@ def test_get_daily_no_geometry(): skip_geometry=True, ) assert "geometry" not in df.columns - assert df.shape[1] == 11 + # 10 default cols minus geometry, with hash IDs dropped by default. + assert df.shape[1] == 9 assert isinstance(df, DataFrame) @@ -351,7 +405,11 @@ def test_get_continuous(): df["time"].dtype.name.startswith("datetime64[") and "UTC" in df["time"].dtype.name ) - assert "continuous_id" in df.columns + # Default: continuous_id (UUID) and time_series_id (hex hash) are + # dropped. Set ``include_hash=True`` to keep them. 
+ assert "continuous_id" not in df.columns + assert "time_series_id" not in df.columns + assert "monitoring_location_id" in df.columns def test_get_monitoring_locations(): @@ -376,7 +434,10 @@ def test_get_latest_continuous(): monitoring_location_id=["USGS-05427718", "USGS-05427719"], parameter_code=["00060", "00065"], ) - assert df.columns[-1] == "latest_continuous_id" + # Default: latest_continuous_id (UUID) and time_series_id are dropped. + assert "latest_continuous_id" not in df.columns + assert "time_series_id" not in df.columns + assert "monitoring_location_id" in df.columns assert df.shape[0] <= 4 assert df.statistic_id.unique().tolist() == ["00011"] assert hasattr(md, "url") @@ -391,8 +452,11 @@ def test_get_latest_daily(): monitoring_location_id=["USGS-05427718", "USGS-05427719"], parameter_code=["00060", "00065"], ) - assert "latest_daily_id" in df.columns - assert df.shape[1] == 12 + # Default: latest_daily_id (UUID) and time_series_id are dropped. + assert "latest_daily_id" not in df.columns + assert "time_series_id" not in df.columns + assert "monitoring_location_id" in df.columns + assert df.shape[1] == 10 assert hasattr(md, "url") assert hasattr(md, "query_time") @@ -420,7 +484,12 @@ def test_get_field_measurements(): time="2025-01-01/2025-10-01", skip_geometry=True, ) - assert "field_measurement_id" in df.columns + # Default: field_measurement_id (UUID), field_measurements_series_id + # (UUID), and field_visit_id (UUID) are dropped. 
+ assert "field_measurement_id" not in df.columns + assert "field_measurements_series_id" not in df.columns + assert "field_visit_id" not in df.columns + assert "monitoring_location_id" in df.columns assert "geometry" not in df.columns assert df.unit_of_measure.unique().tolist() == ["ft^3/s"] assert hasattr(md, "url") @@ -478,7 +547,9 @@ def test_get_field_measurements_metadata(): df, md = get_field_measurements_metadata( monitoring_location_id="USGS-02238500", skip_geometry=True ) - assert "field_series_id" in df.columns + # Default: field_series_id (UUID) is dropped. + assert "field_series_id" not in df.columns + assert "monitoring_location_id" in df.columns assert "begin" in df.columns assert "end" in df.columns assert (df["monitoring_location_id"] == "USGS-02238500").all() @@ -506,7 +577,10 @@ def test_get_field_measurements_metadata_multi_site(): def test_get_peaks(): df, md = get_peaks(monitoring_location_id="USGS-02238500", skip_geometry=True) - assert "peak_id" in df.columns + # Default: peak_id (UUID) and time_series_id are dropped. + assert "peak_id" not in df.columns + assert "time_series_id" not in df.columns + assert "monitoring_location_id" in df.columns assert "value" in df.columns assert "water_year" in df.columns assert (df["monitoring_location_id"] == "USGS-02238500").all() @@ -577,13 +651,31 @@ def test_get_stats_por_expanded_false(): computation_type=["minimum", "percentile"], ) assert df.shape[0] == 4 - assert df.shape[1] == 20 # if geopandas installed, 21 columns if not + # Default: hash IDs (computation_id, parent_time_series_id) dropped → 18 cols. 
+ assert df.shape[1] == 18 + assert "computation_id" not in df.columns + assert "parent_time_series_id" not in df.columns assert "percentile" not in df.columns assert "percentiles" in df.columns assert type(df["percentiles"][2]) is list assert df.loc[~df["percentiles"].isna(), "value"].isnull().all() +def test_get_stats_por_include_hash(): + """``include_hash=True`` preserves the per-computation UUID + and the upstream time-series hex hash that ``get_stats_*`` used + to return unconditionally.""" + df, _ = get_stats_por( + monitoring_location_id="USGS-12451000", + parameter_code="00060", + start_date="01-01", + end_date="01-01", + include_hash=True, + ) + assert "computation_id" in df.columns + assert "parent_time_series_id" in df.columns + + def test_get_stats_date_range(): df, _ = get_stats_date_range( monitoring_location_id="USGS-12451000", @@ -594,7 +686,10 @@ def test_get_stats_date_range(): ) assert df.shape[0] == 3 - assert df.shape[1] == 20 # if geopandas installed, 21 columns if not + # Default: hash IDs (computation_id, parent_time_series_id) dropped → 18 cols. + assert df.shape[1] == 18 + assert "computation_id" not in df.columns + assert "parent_time_series_id" not in df.columns assert "interval_type" in df.columns assert "percentile" in df.columns assert df["interval_type"].isin(["month", "calendar_year", "water_year"]).all() @@ -604,8 +699,12 @@ def test_get_channel(): df, _ = get_channel(monitoring_location_id="USGS-02238500") assert df.shape[0] > 470 - assert df.shape[1] == 27 # if geopandas installed, 21 columns if not - assert "channel_measurements_id" in df.columns + # Default: channel_measurements_id (UUID) and field_visit_id (UUID) + # are dropped. 27 → 25 cols. 
+ assert df.shape[1] == 25 # if geopandas installed, fewer if not + assert "channel_measurements_id" not in df.columns + assert "field_visit_id" not in df.columns + assert "monitoring_location_id" in df.columns class TestCheckMonitoringLocationId: diff --git a/tests/waterdata_utils_test.py b/tests/waterdata_utils_test.py index c135115c..1e198050 100644 --- a/tests/waterdata_utils_test.py +++ b/tests/waterdata_utils_test.py @@ -376,6 +376,88 @@ def test_get_stats_data_warning_includes_next_token(caplog, monkeypatch): assert any("tok2" in m for m in warnings_), warnings_ +def test_get_stats_data_drops_hash_ids_by_default(monkeypatch): + """``get_stats_data`` drops ``computation_id`` and + ``parent_time_series_id`` from the result by default — the + ``include_hash=False`` counterpart for the stats path.""" + from dataretrieval.waterdata.utils import get_stats_data + + monkeypatch.setattr( + _utils_module, + "_handle_stats_nesting", + mock.MagicMock( + return_value=pd.DataFrame( + { + "monitoring_location_id": ["USGS-1"], + "parameter_code": ["00060"], + "computation_id": ["7d70379f-8452-44cd-b026-24dfa11f8503"], + "parent_time_series_id": ["9cca880dec4846ec8cbdd05f3e22603e"], + "value": [1.0], + } + ) + ), + ) + + page1 = mock.MagicMock() + page1.status_code = 200 + page1.json.return_value = {"next": None, "features": []} + page1.elapsed = __import__("datetime").timedelta(milliseconds=1) + page1.headers = {} + page1.url = "https://example/stats" + client = mock.MagicMock(spec=requests.Session) + client.send.return_value = page1 + + df, _ = get_stats_data( + args={"monitoring_location_id": "USGS-1"}, + service="observationNormals", + expand_percentiles=False, + client=client, + ) + assert "computation_id" not in df.columns + assert "parent_time_series_id" not in df.columns + assert "monitoring_location_id" in df.columns + assert "parameter_code" in df.columns + assert "value" in df.columns + + +def test_get_stats_data_keeps_hash_ids_when_opted_in(monkeypatch): + 
"""``include_hash=True`` preserves the legacy stats columns.""" + from dataretrieval.waterdata.utils import get_stats_data + + monkeypatch.setattr( + _utils_module, + "_handle_stats_nesting", + mock.MagicMock( + return_value=pd.DataFrame( + { + "monitoring_location_id": ["USGS-1"], + "computation_id": ["7d70379f-8452-44cd-b026-24dfa11f8503"], + "parent_time_series_id": ["9cca880dec4846ec8cbdd05f3e22603e"], + } + ) + ), + ) + + page1 = mock.MagicMock() + page1.status_code = 200 + page1.json.return_value = {"next": None, "features": []} + page1.elapsed = __import__("datetime").timedelta(milliseconds=1) + page1.headers = {} + page1.url = "https://example/stats" + client = mock.MagicMock(spec=requests.Session) + client.send.return_value = page1 + + df, _ = get_stats_data( + args={"monitoring_location_id": "USGS-1"}, + service="observationNormals", + expand_percentiles=False, + client=client, + include_hash=True, + ) + assert "computation_id" in df.columns + assert "parent_time_series_id" in df.columns + + def test_handle_stats_nesting_tolerates_missing_drop_columns(): """If the upstream stats response shape ever changes such that one of the columns we try to drop ("type", "properties.data") is absent, the @@ -531,6 +613,79 @@ def test_arrange_cols_keeps_geometry_when_present(): assert "geometry" in result.columns +def test_arrange_cols_drops_hash_ids_by_default(): + """Default ``include_hash=False`` drops the per-record UUID + (renamed to ``daily_id``) and secondary hash columns + (``time_series_id``), keeping stable identifiers.""" + df = pd.DataFrame( + { + "id": ["uuid-a"], + "time_series_id": ["hex-1"], + "monitoring_location_id": ["USGS-01"], + "value": [1.0], + } + ) + result = _arrange_cols(df, properties=None, output_id="daily_id") + assert "daily_id" not in result.columns + assert "time_series_id" not in result.columns + assert "monitoring_location_id" in result.columns + assert "value" in result.columns + + +def test_arrange_cols_include_hash_keeps_them(): + 
"""``include_hash=True`` preserves the legacy behavior — hash + columns are kept and the per-record UUID lands at the end of the + column order.""" + df = pd.DataFrame( + { + "id": ["uuid-a"], + "time_series_id": ["hex-1"], + "monitoring_location_id": ["USGS-01"], + "value": [1.0], + } + ) + result = _arrange_cols(df, properties=None, output_id="daily_id", include_hash=True) + assert "daily_id" in result.columns + assert "time_series_id" in result.columns + # Legacy ordering: ``daily_id`` moves to the end. + assert result.columns[-1] == "daily_id" + + +def test_arrange_cols_explicit_properties_keep_hash_ids(): + """A user who lists a hash column in ``properties`` gets it back even + with the default ``include_hash=False`` — explicit beats default.""" + df = pd.DataFrame( + { + "id": ["uuid-a"], + "time_series_id": ["hex-1"], + "monitoring_location_id": ["USGS-01"], + "value": [1.0], + } + ) + result = _arrange_cols( + df, + properties=["daily_id", "time_series_id", "value"], + output_id="daily_id", + ) + assert "daily_id" in result.columns + assert "time_series_id" in result.columns + + +def test_arrange_cols_non_hash_output_id_kept(): + """``monitoring_location_id`` (the output_id for monitoring-locations) + is NOT a hash — the AGENCY-ID format is stable and human-meaningful — + so it must stay even under the default.""" + df = pd.DataFrame( + { + "id": ["USGS-01"], + "agency_code": ["USGS"], + } + ) + result = _arrange_cols(df, properties=None, output_id="monitoring_location_id") + assert "monitoring_location_id" in result.columns + assert result.loc[0, "monitoring_location_id"] == "USGS-01" + + # --- _format_api_dates ------------------------------------------------------- diff --git a/tests/waterdata_xarray_test.py b/tests/waterdata_xarray_test.py new file mode 100644 index 00000000..33fd2675 --- /dev/null +++ b/tests/waterdata_xarray_test.py @@ -0,0 +1,257 @@ +"""Offline unit tests for dataretrieval.waterdata.xarray converters. 
+ +These exercise the pure DataFrame -> xarray.Dataset converters with synthetic +frames, so they run without network access. Live end-to-end behavior is +covered by the waterdata getters' own tests. +""" + +from types import SimpleNamespace + +import pandas as pd +import pytest + +xr = pytest.importorskip("xarray") +from dataretrieval.waterdata import xarray as wdx # noqa: E402 + + +def _meta(url="https://example.test/items"): + return SimpleNamespace(url=url) + + +def _daily_frame( + time_series_id="A", + site="USGS-1", + values=(100, 110), + times=("2024-06-01", "2024-06-02"), +): + n = len(values) + return pd.DataFrame( + { + "time": list(times), + "value": list(values), + "monitoring_location_id": [site] * n, + "parameter_code": ["00060"] * n, + "statistic_id": ["00003"] * n, + "unit_of_measure": ["ft^3/s"] * n, + "qualifier": [None] * n, + "approval_status": ["Approved"] * n, + "time_series_id": [time_series_id] * n, + } + ) + + +# series_meta is keyed by parameter_code and supplies only the readable name; +# units/statistic/parameter_code come from the values frame itself. 
+_DISCHARGE_META = { + "00060": { + "parameter_name": "Discharge", + "parameter_description": "Discharge, cubic feet per second", + } +} + + +def test_build_timeseries_cf_attributes(): + ds = wdx._build_timeseries( + _daily_frame(), _meta(), service="daily", series_meta=_DISCHARGE_META + ) + assert isinstance(ds, xr.Dataset) + assert "discharge" in ds.data_vars + v = ds["discharge"] + assert v.attrs["long_name"] == "Discharge, cubic feet per second" + assert v.attrs["units"] == "ft3 s-1" # UDUNITS-normalized from "ft^3/s" + assert v.attrs["cell_methods"] == "time: mean" + assert v.attrs["standard_name"] == "water_volume_transport_in_river_channel" + assert v.attrs["usgs_parameter_code"] == "00060" + assert v.attrs["usgs_statistic_id"] == "00003" + # dataset-level provenance + assert ds.attrs["Conventions"] == "CF-1.11" + assert ds.attrs["featureType"] == "timeSeries" + assert ds.attrs["references"] == "https://example.test/items" + # site is the DSG instance dimension + assert ds["monitoring_location_id"].attrs.get("cf_role") == "timeseries_id" + assert ds.sizes["time"] == 2 + + +def test_ancillary_variables_linked(): + ds = wdx._build_timeseries( + _daily_frame(), _meta(), service="daily", series_meta=_DISCHARGE_META + ) + assert "discharge_qualifier" in ds.data_vars + assert "discharge_approval_status" in ds.data_vars + assert ds["discharge"].attrs["ancillary_variables"] == ( + "discharge_qualifier discharge_approval_status" + ) + + +def test_unknown_unit_passes_through_verbatim(): + df = _daily_frame() + df["unit_of_measure"] = "widgets/s" # units are read from the frame + ds = wdx._build_timeseries( + df, _meta(), service="daily", series_meta=_DISCHARGE_META + ) + assert ds["discharge"].attrs["units"] == "widgets/s" + + +def test_missing_standard_name_is_omitted(): + # parameter_code with no curated CF mapping -> no standard_name key + meta = {"99999": {"parameter_name": "Mystery", "parameter_description": "Mystery"}} + df = _daily_frame() + 
df["parameter_code"] = "99999" + ds = wdx._build_timeseries(df, _meta(), service="daily", series_meta=meta) + assert "standard_name" not in ds["mystery"].attrs + assert ds["mystery"].attrs["usgs_parameter_code"] == "99999" + + +def test_multiple_parameters_outer_join_on_time(): + # discharge at t1,t2 ; temperature at t2,t3 -> union time, NaN fill + q = _daily_frame(values=(100, 110), times=("2024-06-01", "2024-06-02")) + t = pd.DataFrame( + { + "time": ["2024-06-02", "2024-06-03"], + "value": [18.0, 19.0], + "monitoring_location_id": ["USGS-1", "USGS-1"], + "parameter_code": ["00010", "00010"], + "statistic_id": ["00011", "00011"], + "unit_of_measure": ["degC", "degC"], + "qualifier": [None, None], + "approval_status": ["Approved", "Approved"], + "time_series_id": ["B", "B"], + } + ) + meta = dict(_DISCHARGE_META) + meta["00010"] = { + "parameter_name": "Temperature, water", + "parameter_description": "Temperature, water, degrees Celsius", + } + ds = wdx._build_timeseries( + pd.concat([q, t]), _meta(), service="continuous", series_meta=meta + ) + assert {"discharge", "temperature_water"} <= set(ds.data_vars) + assert ds.sizes["time"] == 3 # union of {t1,t2,t3} + # temperature has no value at t1 -> NaN + assert pd.isna(ds["temperature_water"].sel(time="2024-06-01").item()) + # cell_methods derived from statistic_id 00011 (instantaneous) -> point + assert ds["temperature_water"].attrs["cell_methods"] == "time: point" + + +def test_collision_dedups_with_warning(): + # two values for the same (site, parameter, statistic, time) are ambiguous + # without the hash key -> keep the first and warn; site stays the dim. 
+ a = _daily_frame(values=(100,), times=("2024-06-01",)) + b = _daily_frame(values=(200,), times=("2024-06-01",)) + with pytest.warns(UserWarning, match="multiple values per"): + ds = wdx._build_timeseries( + pd.concat([a, b]), _meta(), service="daily", series_meta=_DISCHARGE_META + ) + assert "monitoring_location_id" in ds.dims + assert ds.sizes["time"] == 1 + assert ( + ds["discharge"].sel(monitoring_location_id="USGS-1", time="2024-06-01").item() + == 100 + ) + + +def test_empty_frame_returns_dataset_with_conventions(): + ds = wdx._build_timeseries(pd.DataFrame(), _meta(), service="daily", series_meta={}) + assert isinstance(ds, xr.Dataset) + assert list(ds.data_vars) == [] + assert ds.attrs["Conventions"] == "CF-1.11" + + +def test_build_stats_flat_dataset(): + df = pd.DataFrame( + { + "monitoring_location_id": ["USGS-1", "USGS-1"], + "parameter_code": ["00060", "00060"], + "month": [1, 2], + "p50_va": [120.0, 130.0], + } + ) + ds = wdx._build_stats(df, _meta(), "statistics") + assert isinstance(ds, xr.Dataset) + assert "p50_va" in ds.data_vars + assert ds.attrs["Conventions"] == "CF-1.11" + + +def test_public_wrappers_exist_and_are_documented(): + for name in wdx.__all__: + fn = getattr(wdx, name) + assert callable(fn) + assert "xarray wrapper" in (fn.__doc__ or "") + + +def test_fetch_strips_include_hash(): + # The xarray path never surfaces hash columns, so include_hash must be + # dropped before the underlying getter is called (no wasted fetch). + captured = {} + + def fake_getter(*args, **kwargs): + captured.update(kwargs) + return "df", "meta" + + df, meta = wdx._fetch( + fake_getter, (), {"include_hash": True, "parameter_code": "00060"} + ) + assert (df, meta) == ("df", "meta") + assert "include_hash" not in captured + assert captured == {"parameter_code": "00060"} + + +def test_every_wrapper_routes_through_fetch(monkeypatch): + # Pin the wiring: each public wrapper must delegate the underlying fetch to + # _fetch (which is what strips include_hash). 
Guards against a wrapper + # quietly reverting to calling the getter directly and leaking the flag. + seen = [] + + def spy(func, args, kwargs): + seen.append(dict(kwargs)) + return pd.DataFrame(), SimpleNamespace(url=None) # empty -> no network + + monkeypatch.setattr(wdx, "_fetch", spy) + for name in wdx.__all__: + seen.clear() + ds = getattr(wdx, name)(monitoring_location_id="USGS-1", include_hash=True) + assert isinstance(ds, xr.Dataset) + assert len(seen) == 1, f"{name} did not route through _fetch" + # the wrapper hands include_hash to _fetch; _fetch is what drops it + # (asserted in test_fetch_strips_include_hash) + assert seen[0].get("include_hash") is True + + +def test_unparseable_time_dropped_with_warning(): + # A bad/missing time must be dropped explicitly (with a warning), not + # silently swallowed by to_xarray. + df = _daily_frame( + values=(100, 110, 120), + times=("2024-06-01", "not-a-date", "2024-06-03"), + ) + with pytest.warns(UserWarning, match="unparseable or missing time"): + ds = wdx._build_timeseries( + df, _meta(), service="daily", series_meta=_DISCHARGE_META + ) + assert ds.sizes["time"] == 2 # the bad-time row is gone, the good ones stay + assert 110 not in ds["discharge"].values # the dropped value did not survive + + +def test_all_unparseable_time_returns_empty_dataset(): + df = _daily_frame(values=(1, 2), times=("bad-a", "bad-b")) + with pytest.warns(UserWarning, match="unparseable or missing time"): + ds = wdx._build_timeseries( + df, _meta(), service="daily", series_meta=_DISCHARGE_META + ) + assert list(ds.data_vars) == [] + assert ds.attrs["Conventions"] == "CF-1.11" + + +def test_mixed_units_in_one_variable_warns(): + # Same (parameter, statistic) across two sites but different units -> one + # variable can hold only one units attr; warn instead of silently mislabeling. 
+ a = _daily_frame(site="USGS-1", values=(100,), times=("2024-06-01",)) + b = _daily_frame(site="USGS-2", values=(3,), times=("2024-06-01",)) + b["unit_of_measure"] = "m3 s-1" + with pytest.warns(UserWarning, match="spans multiple units"): + ds = wdx._build_timeseries( + pd.concat([a, b]), _meta(), service="daily", series_meta=_DISCHARGE_META + ) + assert ds.sizes["monitoring_location_id"] == 2 + assert ds["discharge"].attrs["units"] == "ft3 s-1" # first unit (ft^3/s)