Source code for ceda_datapoint.core.cloud

__author__    = "Daniel Westwood"
__contact__   = "daniel.westwood@stfc.ac.uk"
__copyright__ = "Copyright 2024 United Kingdom Research and Innovation"

import json
import logging
import os
from typing import Any, Union

import fsspec
import requests
import rioxarray as rxr
import xarray as xr

from ceda_datapoint.core.asset import BasicAsset, DataPointMapper
from ceda_datapoint.mixins import PropertiesMixin, UIMixin
from ceda_datapoint.utils import hash_id, logstream

logger = logging.getLogger(__name__)
logger.addHandler(logstream)
logger.propagate = False

def _decode_polygon(spatial_dims: list, coordinates: list) -> dict:
    """
    Decode GeoJSON Polygon to Xarray Selection.
    """
    selects = {}
    for i, dim in enumerate(spatial_dims):
        corners = [c[i] for c in coordinates]
        dim_range = slice(min(corners),max(corners))
        selects[dim] = dim_range
    return selects

def _decode_datetime(datetime):
    """
    Decode pystac datetime to xarray select.
    """
    dt = datetime.split('/')
    if len(dt) > 1:
        return slice(dt[0],dt[1])
    elif len(dt) == 1:
        return dt[0]
    else:
        return dt

def _find_spatial_dims(ds) -> Union[list,None]:
    """
    Determine the names of the spatial dims.
    """

    accepted_lats = ['lat','latitude','Lat','Latitude']
    accepted_lons = ['lon','longitude','Lon','Longitude']

    convention = None

    for convent in range(len(accepted_lats)):
        lat = accepted_lats[convent]
        lon = accepted_lons[convent]
        if lat in ds.dims and lon in ds.dims:
            convention = convent

    if convention is None:
        logger.warning(
            'Spatial AOI Skipped - Could not identify spatial dims. '
            f'Accepted dimensions are {accepted_lats} and {accepted_lons}'
        )
        return None
    
    lat = accepted_lats[convention]
    lon = accepted_lons[convention]

    # Determine lat/lon ordering in dataset.
    lat_ind = list(ds.dims).index(lat)
    lon_ind = list(ds.dims).index(lon)
    if lat_ind > lon_ind:
        return [lon, lat]
    else:
        return [lat, lon]

[docs] class DataPointCloudProduct(BasicAsset): """ Object for storing and manipulating a single cloud product i.e Kerchunk/Zarr/CFA. """ def __init__( self, asset_stac: dict, id: str = None, cf: str = None, order: int = None, mode: str = 'xarray', meta: dict = None, stac_attrs: dict = None, properties: dict = None, mapper: DataPointMapper = None, data_selection: dict = None, ) -> None: """ Initialise a single cloud product object. The cloud product has identical properties and attributes to the parent item, but now represents a single reference dataset. :param asset_stac: (dict) The asset as presented in the stac index. :param id: (str) Identifier for this cloud product. :param cf: (str) Cloud format type. :param order: (int) Unused property relating to priority. :param mode: (str) Method to use for opening dataset. :param meta: (dict) DataPoint metadata relating to parent objects. :param stac_attrs: (dict) Attributes of the item outside the ``properties``. :param properties: (dict) Properties of the item in the ``properties`` field. """ if mode != 'xarray': raise NotImplementedError( 'Only "xarray" mode currently implemented - cf-python is a future option' ) super().__init__( asset_stac, id=id, meta=meta, stac_attrs=stac_attrs, properties=properties, mapper=mapper, ) self._order = order self._cloud_format = cf self._data_selection = data_selection or {} self._meta['cloud_format'] = cf self.visibility = 'all' self._set_visibility() def __str__(self) -> str: return f'<DataPointCloudProduct: {self._id} (Format: {self._cloud_format})>' def __repr__(self) -> str: """Representation of this class using the meta components""" repr = super().__repr__().split('\n') repr.append('Attributes:') if self._properties is None: repr.append(' <empty>') return '\n'.join(repr) for k, v in self._properties.items(): repr.append(f' - {k}: {v}') return '\n'.join(repr) @property def cloud_format(self) -> str: """Read-only property""" return self._cloud_format @property def href(self) -> str: """Read-only href property""" return self._mapper.get('href',self._asset_stac)
[docs] @classmethod def help(cls) -> None: """Display public methods for this object.""" print('DataPointCloudProduct Help:') print(' > product.info() - Get information about this cloud product.') print(' > product.open_dataset() - Open the dataset for this cloud product (in xarray)') #print(' > product.open_asset() - Overrides BasicAsset get method') super(DataPointCloudProduct, cls).help(additionals = ['href','cloud_format'])
[docs] def info(self) -> None: """Display information about this object""" print(self.__repr__())
[docs] def open_asset( self, local_only: bool = False, prepare_data: bool = True, **kwargs ) -> Any: """ Override for basic asset get function. """ return self.open_dataset(local_only=local_only, prepare_data=prepare_data)
[docs] def open_dataset( self, local_only: bool = False, prepare_data: bool = True, **kwargs ) -> xr.Dataset: """ Open the dataset for this product (in xarray). Specific methods to open cloud formats are private since the method should be determined by internal values not user input. :param local_only: (bool) Switch to using local-only files - DataPoint will convert all hrefs and internal Kerchunk links to use local paths. """ if not self._cloud_format: raise ValueError( 'No cloud format given for this dataset' ) if self.href is None: raise ValueError( 'Cloud assets with no "href" are not supported' ) if self.visibility == 'local-only' and not local_only: raise ValueError( 'Href not reachable via https, please use `local_only=True` ' 'to open this dataset.' ) try: if self._cloud_format == 'kerchunk': ds = self._open_kerchunk(local_only=local_only, **kwargs) elif self._cloud_format == 'CFA': ds = self._open_cfa(**kwargs) elif self._cloud_format == 'zarr': ds = self._open_zarr(**kwargs) elif self._cloud_format == 'cog': ds = self._open_cog(**kwargs) else: raise ValueError( 'Cloud format not recognised - must be one of ("kerchunk", "CFA", "zarr", "cog")' ) return self._prepare_dataset(ds, prepare_data=prepare_data) except ValueError as err: raise err except FileNotFoundError: raise FileNotFoundError( 'The requested resource could not be located: ' f'{self.href}' )
def _open_kerchunk( self, local_only: bool = False, mapper_kwargs: dict = None, **kwargs, ) -> xr.Dataset: """ Open a kerchunk dataset in xarray :param local_only: (bool) Switch to using local-only files - DataPoint will convert all hrefs and internal Kerchunk links to use local paths. :param mapper_kwargs: (dict) Kwargs to provide to Kerchunk's fsspec mapper. """ mapper_kwargs = mapper_kwargs or {} href = self.href mapper_kwargs = self._mapper.get('mapper_kwargs',self._asset_stac) or mapper_kwargs open_zarr_kwargs = self._mapper.get('open_zarr_kwargs', self._asset_stac) or {} if local_only: href = _fetch_kerchunk_make_local(href) return xr.open_dataset(href, engine='kerchunk',**open_zarr_kwargs) def _open_cfa( self, cfa_options: dict = None, **kwargs, ) -> xr.Dataset: """ Open a CFA dataset in xarray :param cfa_options: (dict) Configuration options to pass to the CFA engine """ cfa_options = cfa_options or {} href = self.href open_xarray_kwargs = (self._mapper.get('open_xarray_kwargs', self._asset_stac) or {}) | kwargs return xr.open_dataset( href, engine='CFA', cfa_options=cfa_options, **open_xarray_kwargs ) def _open_zarr( self, **kwargs, ) -> xr.Dataset: open_zarr_kwargs = (self._mapper.get('open_zarr_kwargs', self._asset_stac) or {}) | kwargs return xr.open_dataset(self.href, engine='zarr', **open_zarr_kwargs) def _open_cog( self, **kwargs, ) -> xr.Dataset: open_cog_kwargs = (self._mapper.get('open_cog_kwargs', self._asset_stac) or {}) | kwargs return rxr.open_rasterio(self.href, **open_cog_kwargs) def _prepare_dataset( self, ds: xr.Dataset, prepare_data: bool = True ) -> Union[xr.Dataset, xr.DataArray]: """Perform any dataset selections here.""" intersects = self._meta['search_terms'].get('intersects',None) # Intersection applies to spatial data datetime = self._meta['search_terms'].get('datetime',None) # Datetime can only be applied to 'time' dimension. query = self._meta['search_terms'].get('query',[]) vq = None for ql in query: if 'variables' in ql: vq = ql.split('=')[-1] variables = self._data_selection.get('variables',None) or vq sel = self._data_selection.get('sel',None) isel = self._data_selection.get('isel',None) spatial_dims = None if intersects: # Order spatial dims correctly based on bbox spatial_dims = _find_spatial_dims(ds) if spatial_dims is not None: if intersects['type'] == 'Polygon': if len(intersects['coordinates']) == 1: coords = intersects['coordinates'][0] else: coords = intersects['coordinates'] select = _decode_polygon(spatial_dims, coords) ds = ds.sel(**select) else: logger.warning( 'Unsupported intersection type for Single Search Selection - ' \ 'AOI not applied.' ) if datetime is not None: if 'time' not in ds: logger.warning( 'Datetime selection could not be applied - ', 'no "time" dimension present.' ) else: if isinstance(datetime, str): time_sel = _decode_datetime(datetime) else: time_sel = slice(datetime[0],datetime[1]) ds = ds.sel(time=time_sel) if variables is not None: if isinstance(variables,str): variables = [variables] keep_vars = [] all_vars = list(ds.variables) for v in variables: if v not in ds: logger.warning( 'Variable selection could not be applied - ', f'no "{v}" variable present.' ) else: keep_vars.append(v) if len(keep_vars) == 0: raise ValueError( f'No variables kept in current selection - {variables}' ) drop_vars = list(set(all_vars).difference(set(keep_vars))) ds = ds.drop_vars(drop_vars) if sel is not None: ds = ds.sel(**sel) if isel is not None: ds = ds.isel(**isel) return ds def _set_visibility(self) -> None: """Determine if this product is reachable""" if self.href.startswith('/'): # Check local path self.visibility = 'local-only' if not os.path.isfile(self.href): self.visibility = 'unreachable' return # Check remote link check_ref = self.href if self._cloud_format == 'zarr': check_ref = f'{self.href}/.zmetadata' status = requests.head(check_ref) if status.status_code != 200: # Check local link self.visibility='local-only' local_ref = self.href.replace('https://dap.ceda.ac.uk','') if not os.path.isfile(local_ref): self.visibility = 'unreachable' return return
[docs] class DataPointCluster(UIMixin): """ A set of non-combined datasets opened using the DataPointSearch ``to_dataset()`` method. Has some additional properties over a list of datasets. """ def __init__( self, products: list, parent_id: str = None, meta: dict = None, local_only: bool = False, data_selection: Union[dict,None] = None, show_unreachable: bool = False, ) -> None: """Initialise a cluster of datasets from a set of assets. :param products: (list) A list of DataPoint cloud product objects. :param parent_id: (str) ID of the parent search/item object. :param meta: (dict) Metadata about the parent object. :param local_only: (bool) Switch to using local-only files - DataPoint will convert all hrefs and internal Kerchunk links to use local paths. :param show_unreachable: (bool) Show the hidden assets that DataPoint has determined are currently unreachable. """ self._id = f'{parent_id}-{hash_id(parent_id)}' self._local_only = local_only self._data_selection = data_selection self.show_unreachable = show_unreachable meta = meta or {} self._products = {} for p in products: if isinstance(p, DataPointCluster): for sub_p in p.products: self._products[sub_p.id] = sub_p elif p is not None: self._products[p.id] = p self._meta = meta self._meta['products'] = len(products) def __str__(self) -> str: """String representation of this class""" return f'<DataPointCluster: {self._id} (Datasets: {len(self._products)})>' def __getitem__(self, index): """ Index this object to obtain a DataPointCloudProduct by ID or position in the cluster. """ if isinstance(index, int): index = list(self._products.keys())[index] if index not in self._products: raise IndexError( f'"{index}" not found in available products.' ) return self._products[index] @property def products(self) -> list[DataPointCloudProduct]: """List of products contained within this cluster""" return [ v for v in self._products.values() if v.visibility != 'unreachable' or self.show_unreachable]
[docs] @classmethod def help(cls) -> None: """Helper function - lists methods that can be utilised for this class""" print('DataPointCluster Help:') print(' > cluster.info() - basic cluster information') print(' > cluster.open_dataset(index/id) - open a specific dataset in xarray') super(DataPointCluster, cls).help(additionals=['products'])
[docs] def info(self) -> None: """Information about this object instance.""" print(self.__repr__())
def __repr__(self) -> str: """Notebooks representation of this class""" repr = super().__repr__().split('\n') repr.append('Products:') for p in self._products.values(): if p.visibility != 'all': repr.append(f' - {p.id}: {p.cloud_format} ({p.visibility})') else: repr.append(f' - {p.id}: {p.cloud_format}') return '\n'.join(repr)
[docs] def open_dataset( self, id : str, mode: str = 'xarray', local_only: bool = False, **kwargs, ) -> xr.Dataset: """ Open a dataset from within this cluster's cloud products. A dataset can be indexed either by id or position within this cluster's set of datasets. :param id: (str) The ID or index of the dataset in the resulting cluster. :param mode: (str) The type of dataset to be returned, currently only Xarray is supported (0.3.X) :param local_only: (bool) Switch to using local-only files - DataPoint will convert all hrefs and internal Kerchunk links to use local paths.""" if mode != 'xarray': raise NotImplementedError( 'Only "xarray" mode currently implemented - cf-python is a future option' ) local_only = local_only or self._local_only if isinstance(id, int): id = list(self._products.keys())[id] if id not in self._products: logger.warning( f'"{id}" not found in available datasets.' ) return None product = self._products[id] return product.open_dataset(local_only=local_only, **kwargs)
def open_datasets(self): raise NotImplementedError( '"Combine" feature has not yet been implemented' )
def _zarr_kwargs_default(add_kwargs: dict = None) -> dict: """Add any default kwargs for specific requests""" add_kwargs = add_kwargs or {} defaults = { 'consolidated':False, } return defaults | add_kwargs def _fetch_kerchunk_make_local(href: str) -> dict: """ Fetch a kerchunk file, open as json content and do find/replace to access local files only. """ href_local = href.replace('https://dap.ceda.ac.uk','') if not os.path.isfile(href_local): attempts = 0 success = False while attempts < 3 and not success: resp = requests.get(href) if resp.status_code == 200: success = True attempts += 1 if attempts >= 3 and not success: raise ValueError( f'File {href}: Download unsuccessful - ' 'could not download the file successfully (tried 3 times)' ) refs = json.loads(resp.text) else: with open(href_local) as f: refs = json.load(f) for key in refs['refs'].keys(): v = refs['refs'][key] if isinstance(v, list) and len(v) == 3: # First character if 'https://' in v[0]: refs['refs'][key][0] = v[0].replace('https://dap.ceda.ac.uk/','/') return refs