Source code for ceda_datapoint.core.client

__author__    = "Daniel Westwood"
__contact__   = "daniel.westwood@stfc.ac.uk"
__copyright__ = "Copyright 2024 United Kingdom Research and Innovation"

import pystac_client
from pystac_client.stac_api_io import StacApiIO
import logging
import xarray as xr

from ceda_datapoint.mixins import UIMixin
from ceda_datapoint.utils import urls, hash_id, generate_id, logstream
from .cloud import DataPointCluster, DataPointMapper
from .item import DataPointItem

logger = logging.getLogger(__name__)
logger.addHandler(logstream)
logger.propagate = False

[docs]class DataPointSearch(UIMixin): """ Search instance created upon searching using the client.""" def __init__( self, pystac_search: object, mappings: dict = None, search_terms: dict = None, meta: dict = None, parent_id: str = None ) -> None: """ Initialise the search object - used by the DataPointClient upon searching. :param pystac_search: (object) The returned search object from pystac_client (to be abstracted). :param search_terms: (dict) The search terms used in the search query. :param meta: (dict) Metadata about the Client (url/organisation etc.) :param parent_id: (str) ID of the parent client. """ self._search_terms = search_terms or None self._meta = meta or None self._mappings = mappings self._search = pystac_search self._item_set = None self._meta['search_terms'] = self._search_terms self._id = f'{parent_id}-{hash_id(parent_id)}' def __str__(self) -> str: """ String representation of this search. """ terms = {k: v for k, v in self._search_terms.items() if k != 'query'} if 'query' in self._search_terms: terms['query'] = len(self._search_terms['query']) return f'<DataPointSearch: {self._id} ({terms})>' def __getitem__(self, index) -> DataPointItem: """ Public method to index the dict of items. :param index: (int|str) The index or ID from which to pull an item from the search. """ if not self._item_set: self._load_item_set() if isinstance(index, str): if index not in self._item_set: logger.warning( f'Item "{index}" not present in the set of items.' ) return None return self._item_set[index] elif isinstance(index, int): if index > len(self._item_set.keys()): logger.warning( f'Could not return item "{index}" from the set ' f'of {len(self._item_set)} items.' ) return None key = list(self._item_set.keys())[index] return self._item_set[key] else: logger.warning( f'Unrecognised index type for {index} - ' f'must be one of ("int","str")' ) return None @property def items(self) -> dict[str, DataPointItem]: """ Get the set of ``DataPointItem`` objects described by this search. """ if not self._item_set: self._load_item_set() return self._item_set @property def assets(self) -> dict: """ Get the set of assets under each item in this search, returned as a set of nested dictionaries. """ if not self._asset_set: self._load_asset_set() return self._asset_set
[docs] def help(self) -> None: """Helper function - lists methods that can be utilised for this class""" print('DataPointSearch Help:') print(' > search.info() - General information about this search') print(' > search.collect_cloud_assets() - Collect the cloud products into a `cluster`') print(' > search.display_assets() - List the names of assets for each item in this search') print(' > search.display_cloud_assets() - List the cloud format types for each item in this search') super().help(additionals=['items','assets'])
[docs] def info(self) -> None: """ Provide information about this search """ print(self.__repr__())
[docs] def open_dataset( self, id : str, mode : str = 'xarray', combine: bool = False, priority: list[str] = [], mappings: dict = None, **kwargs, ) -> xr.Dataset: """Open a dataset directly from the search result :param id: (str) The ID or index of the dataset in the resulting cluster. :param mode: (str) The type of dataset to be returned, currently only Xarray is supported (0.4.X) :param combine: (bool) Combine multiple datasets to a single dataset - not implemented (0.4.X) :param priority: (list) Order by which to open a set of datasets. """ return self.collect_cloud_assets( mode=mode, combine=combine, priority=priority, mappings=mappings).open_dataset(id,**kwargs)
[docs] def collect_cloud_assets( self, mode: str = 'xarray', combine: bool = False, priority: list[str] = [], show_unreachable: bool = False, asset_mappings: dict = None, **kwargs, ) -> DataPointCluster: """ Open a DataPointCluster object from the cloud assets for each item in this search. :param mode: (str) The type of dataset to be returned, currently only Xarray is supported (0.4.X) :param combine: (bool) Combine multiple datasets to a single dataset - not implemented (0.4.X) :param priority: (list) Order by which to open a set of datasets. :param show_unreachable: (bool) Show the hidden assets that DataPoint has determined are currently unreachable. """ if combine: raise NotImplementedError( '"Combine" feature has not yet been implemented' ) if not self._item_set: self._load_item_set(mappings=self._mappings) assets = [] for item in self._item_set.values(): assets.append( item.collect_cloud_assets( priority=priority, show_unreachable=show_unreachable, asset_mappings=asset_mappings ) ) return DataPointCluster(assets, meta=self._meta, parent_id=self._id)
[docs] def display_assets(self) -> None: """ Display the number of assets attributed to each item in the itemset. """ for item in self.items.values(): assets = item.get_assets() print(item) print(' - ' + ', '.join(assets.keys()))
[docs] def display_cloud_assets(self) -> None: """ Display the cloud assets attributed to each item in the itemset. """ if not self._item_set: self._load_item_set() for item in self._item_set.values(): assets = item.list_cloud_formats() if not assets: print(item) print(' <No Cloud Assets>') else: print(item) print(' - ' + ', '.join(assets))
def _load_item_set(self, mappings: dict = None) -> None: """ Load the set of items for this search into self-describing DataPointItem instances. """ mappings = mappings or self._mappings mapper=None if mappings is not None: mapper = DataPointMapper(mappings=mappings) items = {} for item in self._search.items(): items[item.id] = DataPointItem(item, meta=self._meta, mapper=mapper) self._item_set = items def _load_asset_set(self) -> None: """ Load the set of assets under each item for this search as a dictionary """ assets = {} for item in self.items.values(): assets[item.id] = item.get_assets() self._asset_set = assets
[docs]class DataPointClient(UIMixin): """ Client for searching STAC collections, returns self-describing components at all points.""" def __init__( self, org: str = 'CEDA', url: str = None, hash_token: str = None, mappings: dict = None, ) -> None: """ Initialise a DataPointClient. Default organisation/url corresponds to CEDA from config information. A hash token can be provided for setting the ID (mostly for testing). :param org: (str) Organisation with a known API endpoint. :param url: (str) Bare API endpoint (outside organisation mapper) to search. :param hash_token (str) Token to use when generating IDs for client and other objects.""" if hash_token is None: hash_token = generate_id() self._url = url self._mappings = mappings if url and org != 'CEDA': self._org = org elif url: self._org = None else: # Not provided a url so just use the org if org not in urls: raise ValueError( f'Organisation "{org}" not recognised - please select from ' f'{list(urls.keys())}' ) self._url = urls[org] self._org = org if self._url is None: raise ValueError( 'API URL could not be resolved' ) self._client = pystac_client.Client.open(self._url) self._meta = { 'url' : self._url, 'organisation': self._org } self._id = self._org or '' self._id += f'-{hash_id(hash_token)}' def __str__(self) -> str: """ String representation of this class. """ org = '' if self._org: org = f'{self._org}' return f'<DataPointClient: {self._id}>'
[docs] def help(self) -> None: """Helper function - lists methods that can be utilised for this class""" print('DataPointClient Help:') print(' > client.info() - Get information about this client.') print(' > client.list_query_terms() - List of queryable terms for a specific collection') print(' > client.display_query_terms() - Prints query terms to the terminal.') print(' > client.list_collections() - Get list of all collections known to this client.') print(' > client.display_collections() - Print collections and their descriptions') print(' > client.search() - perform a search operation. For example syntax see the documentation.') super().help()
[docs] def info(self) -> None: """Display information about this class object""" print(f'{str(self)}') print(f' - Client for DataPoint searches via {self._url}')
def __getitem__(self, collection: str): """ Public method for getting a collection from this client """ return DataPointSearch(self.search(collections=[collection]))
[docs] def list_query_terms(self, collection: str) -> list | None: """ List the possible query terms for all or a particular collection. """ dps = self.search(collections=[collection], max_items=1) item = dps[0] if item is not None: return list(item.attributes.keys()) else: logger.warning(f'Collection {collection} returned no search terms.') return None
[docs] def display_query_terms(self, collection: str = None) -> None: """ Display query terms for all collections or just a specific collection. """ colls = self.list_collections() if collection is not None: if collection in colls: print(f'{collection}: {self.list_query_terms(collection)}') else: logger.warning(f'Collection {collection} was not found.') return for coll in colls: print(f'{coll}: {self.list_query_terms(coll)}')
[docs] def list_collections(self) -> list: """ Return a list of the names of collections for this Client """ return [coll.id for coll in self._client.get_collections()]
[docs] def display_collections(self): """ Display the list of collections with their descriptions""" for coll in self._client.get_collections(): print(f'{coll.id}: {coll.description}')
[docs] def search(self, mappings: dict = None, **kwargs) -> DataPointSearch: """ Perform a search operation, creates a ``DataPointSearch`` object which is also self-describing.""" mappings = mappings or self._mappings search = self._client.search(**kwargs) return DataPointSearch(search, search_terms=kwargs, meta=self._meta, parent_id=self._id, mappings=mappings)