# Source code for cfapyx.creator

__author__    = "Daniel Westwood"
__contact__   = "daniel.westwood@stfc.ac.uk"
__copyright__ = "Copyright 2024 United Kingdom Research and Innovation"

import netCDF4
import numpy as np
import logging
import glob

from collections import OrderedDict

logger = logging.getLogger(__name__)

CONCAT_MSG = 'See individual datasets for more information.'

class CFACreateMixin:
    """
    Mixin class for ``Create`` methods for a CFA-netCDF dataset.
    """

    def _first_pass(self, agg_dims: list = None) -> tuple:
        """
        Perform a first pass across all provided files. Extracts the global
        attributes and information on all variables and dimensions into
        separate python dictionaries. Also collects the set of files arranged
        by aggregated dimension coordinates, to be used later in constructing
        the CFA ``fragment_location`` properties.
        """
        logger.info('Performing first pass on the set of files.')

        arranged_files = {}
        var_info = None
        dim_info = None
        global_attrs = None

        ## First Pass - Determine dimensions
        for x, file in enumerate(self.files):
            logger.info(f'First pass: File {x+1}/{len(self.files)}')

            ds = self._call_file(file)
            if len(file) == 1:
                file = file[0]

            all_dims = ds.dimensions.keys()
            all_vars = ds.variables.keys()

            coord_variables = []
            pure_dimensions = []
            variables = []

            ## Sort dimension/variable types - switch to dict with types?
            for d in all_dims:
                if d in all_vars:
                    coord_variables.append(d)
                else:
                    pure_dimensions.append(d)

            for v in all_vars:
                if v not in all_dims:
                    variables.append(v)

            if not dim_info:
                dim_info = {d: {} for d in all_dims}
            if not var_info:
                var_info = {v: {} for v in variables}

            logger.info(f'Coordinate variables: {coord_variables}')
            logger.info(f'Pure dimensions: {pure_dimensions}')
            logger.info(f'Variables: {variables}')

            if var_info:
                if len(set(variables) ^ set(var_info.keys())) != 0:
                    raise ValueError(
                        'Differing numbers of variables across the fragment files '
                        'is not currently supported.'
                    )

            ## Accumulate global attributes
            ncattrs = {}
            for attr in ds.ncattrs():
                ncattrs[attr] = ds.getncattr(attr)
            global_attrs = self._accumulate_attrs(global_attrs, ncattrs)

            ## Accumulate dimension info
            fcoord = []
            first_time = (x == 0)
            for d in all_dims:
                if dim_info[d] == {} and not first_time:
                    raise ValueError(
                        f'Files contain differing numbers of dimensions. "{d}" '
                        'appears to not be present in all files.'
                    )

                new_info, arr_components = self._collect_dim_info(
                    ds, d, pure_dimensions, coord_variables,
                    agg_dims=agg_dims, first_time=first_time)

                dim_info = self._update_info(ds.dimensions[d], dim_info, new_info)

                if arr_components is not None:
                    if first_time:
                        for attr in arr_components.keys():
                            dim_info[d][attr] = [arr_components[attr]]
                    else:
                        if arr_components['starts'] not in dim_info[d]['starts']:
                            dim_info[d]['starts'] += [arr_components['starts']]
                            dim_info[d]['sizes'] += [arr_components['sizes']]
                            dim_info[d]['arrays'] += [arr_components['arrays']]
                    fcoord.append(arr_components['starts'].item())

            ## Accumulate var_info
            for v in variables:
                try:
                    fill = ds[v].getncattr('_FillValue')
                except AttributeError:
                    fill = None

                vdims = []
                for d in ds[v].dimensions:
                    # Preserving the dimensions per variable
                    if d in coord_variables:
                        vdims.append(d)

                new_info = {
                    'dtype': np.dtype(ds[v].dtype),
                    'dims' : tuple(ds[v].dimensions),
                    'cdims': vdims,
                    'address': v,  # Or match with replacement
                    '_FillValue': fill,
                }
                var_info = self._update_info(ds.variables[v], var_info, new_info)

            arranged_files[tuple(fcoord)] = file
        return arranged_files, global_attrs, var_info, dim_info

    def _second_pass(
            self,
            var_info : dict,
            non_aggregated : list
        ) -> dict:
        """
        Second pass through a subset of the files (2) to collect
        non-aggregated variables which will be stored in the CFA file.
        """
        logger.info('Performing a second pass on a subset of files.')

        second_set = self.files[:2]
        for x, file in enumerate(second_set):
            logger.info(f'Second pass: File {x+1}/{len(second_set)}')

            ds = self._call_file(file)  # Ideally don't want to do this twice.

            for v in non_aggregated:
                new_values = np.array(ds.variables[v])
                if 'data' not in var_info[v]:
                    var_info[v]['data'] = new_values
                else:
                    if not np.array_equal(new_values, var_info[v]['data']):
                        raise ValueError(
                            f'Non-aggregated variable "{v}" differs between sample files.'
                        )
        return var_info

    def _collect_dim_info(
            self,
            ds,
            d : str,
            pure_dimensions : list,
            coord_variables : list,
            agg_dims : list = None,
            first_time : bool = False,
        ):
        """
        Collect new info about each dimension. The collected attributes depend
        mostly on whether the dimension is ``pure`` (i.e. not a coordinate
        variable) or is itself a coordinate variable. Aggregated dimensions
        require collection of all array sequences that have a different
        ``start`` value. If the aggregation dimensions are known, we do not
        have to collect arrays for non-aggregated dimensions from each file.
        """
        arr_components = None
        if first_time:
            agg_dims = coord_variables
        else:
            if agg_dims is None or agg_dims == []:
                agg_dims = coord_variables

        if d in pure_dimensions:
            new_info = {
                'size': ds.dimensions[d].size,
                'type': 'pure',
                'f_size': 1,
            }
        else:
            new_info = {
                'size': None,
                'type': 'coord',
                'dtype': ds[d].dtype,
                'f_size': None,
            }
            if d in agg_dims:
                array = np.array(list(ds[d]), dtype=ds[d].dtype)
                start = array[0]
                size = len(array)
                arr_components = {
                    'sizes': size,
                    'starts': start,
                    'arrays': array,
                }
        return new_info, arr_components

    def _update_info(
            self,
            ncattr_obj,
            info : dict,
            new_info : dict,
        ) -> dict:
        """
        Update the information for a variable/dimension based on the current
        dataset. Certain properties are collected in lists while others are
        explicitly defined and should be equal across all files. Others still
        may differ across files, in which case the ``concat_msg`` is applied,
        which usually indicates to inspect individual files for the correct
        value.
        """
        id = ncattr_obj.name
        logger.debug(f'Concatenating information for {id}')

        attrs = {}
        if hasattr(ncattr_obj, 'ncattrs'):
            for attr in ncattr_obj.ncattrs():
                attrs[attr] = ncattr_obj.getncattr(attr)

        if info[id] != {}:
            info[id]['attrs'] = self._accumulate_attrs(info[id]['attrs'], attrs)
            for attr, value in new_info.items():
                if value != info[id][attr]:
                    raise ValueError(
                        f'Info not matching between files for "{id}": "{attr}"'
                    )
        else:
            info[id] = new_info
            info[id]['attrs'] = attrs
        return info

    def _arrange_dimensions(
            self,
            dim_info : dict,
            agg_dims : list = None
        ) -> tuple:
        """
        Arrange the aggregation dimensions by ordering the start values
        collected from each file. Dimension arrays are aggregated to a single
        array once properly ordered, and the fragment sizes in each dimension
        are recorded in the ``dim_info``.
        """
        logger.info('Performing aggregated dimension sorting')

        ## Arrange Aggregation Dimensions
        aggregation_dims = []
        for cd, info in dim_info.items():
            if 'starts' not in info:
                continue

            starts = info['starts']  # Needed later
            arrays = info.pop('arrays')
            sizes = info['sizes']

            dim_info[cd]['f_size'] = len(starts)

            if len(starts) == 1:
                cdimarr = arrays[0]
                ndimsizes = (sizes[0],)
            else:
                ## Still here means the dimension requires aggregation.
                aggregation_dims.append(cd)

                narr = np.array(starts)
                arr = narr.astype(np.float64)
                sort = np.argsort(arr)

                cdimarr = None
                nds = []
                for s in sort:
                    if cdimarr is None:
                        cdimarr = np.array(arrays[s])
                    else:
                        cdimarr = np.concatenate((cdimarr, np.array(arrays[s])))
                    nds.append(sizes[s])
                ndimsizes = tuple(nds)  # Removed np.array here

            info['size'] = cdimarr.size
            info['array'] = cdimarr
            info['sizes'] = ndimsizes

        if agg_dims is not None:
            if len(agg_dims) != len(aggregation_dims):
                raise ValueError(
                    'Found fewer aggregation dims than the user-provided value. '
                    f'User defined: ({list(agg_dims)}) '
                    f'Derived: ({list(aggregation_dims)})'
                )
        return dim_info, aggregation_dims

    def _assemble_location(
            self,
            arranged_files : dict,
            dim_info : dict
        ) -> np.ndarray:
        """
        Assemble the base CFA ``fragment_location`` from which all the
        locations for different variables are derived. Locations are defined
        by the number of dimensions, and follow the same pattern of definition
        as the ``fragment_shapes``. The combinations of dimensions that
        require their own ``location`` and ``shape`` are recorded in
        ``cdim_opts``.
        """
        logger.debug('Assembling the location variable')

        # Define the location space
        location_space = tuple(i for i in self.fragment_space if i > 1)
        if self.max_files > 1:
            location_space = location_space + (self.max_files,)

        # Assemble the set of named dims
        named_cdims = [k for k, v in dim_info.items() if v['type'] == 'coord']

        # Initialise empty location container
        location = np.empty(location_space, dtype=f'<U{len(self.longest_filename)}')

        # Map collected coords to proper place for location.
        for coord in arranged_files.keys():
            new_coord = []
            for x, c in enumerate(coord):
                if self.fragment_space[x] > 1:
                    new_coord.append(
                        dim_info[named_cdims[x]]['starts'].index(c)
                    )
            location[tuple(new_coord)] = arranged_files[coord]
        return location

    def _apply_agg_dims(self, var_info, agg_dims):
        for var, meta in var_info.items():
            if 'cdims' not in meta:
                continue
            aggs = []
            for cd in meta['cdims']:
                if cd in agg_dims:
                    aggs.append(cd)
            if aggs:
                var_info[var]['adims'] = tuple(aggs)
        return var_info

    def _determine_non_aggregated(
            self,
            var_info : dict,
            agg_dims : list
        ) -> list:
        """
        Determine non-aggregated variables present in the fragment files.
        Non-aggregated variables are equivalent to the ``identical variables``
        in kerchunk jargon. If the non-aggregated variables are later found to
        vary across the fragment files, an error will be raised.
        """
        non_aggregated = []
        for var, info in var_info.items():
            if not (set(info['cdims']) & set(agg_dims)):
                logger.info('Second pass required to extract non-aggregated variable values')
                non_aggregated.append(var)
        logger.debug(f'Non-aggregated variables: {tuple(non_aggregated)}')
        return non_aggregated

    def _determine_size_opts(self, var_info: dict, agg_dims: list) -> list:
        """
        Determine the combinations of dimensions from the information for each
        variable. Each combination requires a different ``location`` and
        ``shape`` fragment array variable in the final CFA-netCDF file.
        """
        cdimopts = []
        for v in var_info.values():
            cds = v['dims']
            if (set(agg_dims) & set(cds)):
                if cds and cds not in cdimopts:
                    cdimopts.append(cds)
        logger.debug(f'Determined {len(cdimopts)} size options:')
        for c in cdimopts:
            logger.debug(f' - {tuple(c)}')
        return cdimopts

    def _accumulate_attrs(self, attrs: dict, ncattrs: dict) -> dict:
        """
        Accumulate attributes from the new source and the existing set. Ignore
        fill value attributes as these are handled elsewhere. If attributes
        are not equal across files, the ``concat_msg`` is used to indicate
        that data users should seek out the source files for correct values.
        """
        first_time = attrs is None
        if first_time:
            attrs = {}

        for attr in ncattrs.keys():
            if attr == '_FillValue':
                continue

            if attr not in attrs:
                if first_time:
                    attrs[attr] = ncattrs[attr]
                else:
                    logger.warning(f'AttributeWarning: Attribute "{attr}" not present in all files')
                    attrs[attr] = self.concat_msg
            else:
                if isinstance(ncattrs[attr], (np.ndarray, np.generic)):
                    if not np.array_equal(attrs[attr], ncattrs[attr]):
                        attrs[attr] = self.concat_msg
                    continue
                if attrs[attr] != ncattrs[attr]:
                    attrs[attr] = self.concat_msg
                else:
                    attrs[attr] = ncattrs[attr]
        return attrs

class CFAWriteMixin:
    """
    Mixin class for ``Write`` methods for a CFA-netCDF dataset.
    """

    def _write_dimensions(self):
        """
        Write the collected dimensions in dim_info as new dimensions in the
        CFA-netCDF file. So-called ``pure`` dimensions which have no variable
        component (no array of values) are defined by size alone, whereas
        coordinate dimensions (coordinate variables) have an associated
        variable component. The so-called ``f-dims`` are also created here as
        the fragmented size of each coordinate dimension.

        Note: if a coordinate dimension is not fragmented, it still has an
        attributed f-dim, equal to 1.
        """
        f_dims = {}
        for dim, di in self.dim_info.items():
            f_size = di['f_size']
            dim_size = di['size']

            real_part = self.ds.createDimension(
                dim,
                dim_size,
            )
            self.ds.createDimension(
                f'f_{dim}',
                f_size,
            )
            f_dims[f'f_{dim}'] = f_size

            if di['type'] == 'coord':
                axis_var = self.ds.createVariable(
                    dim,
                    di['dtype'],
                    (dim,),  # Link to coord dimension
                )
                for k, v in di['attrs'].items():
                    axis_var.setncattr(k, v)
                axis_var[:] = di['array']
            else:
                for k, v in di['attrs'].items():
                    real_part.setncattr(k, v)
        return f_dims

    def _write_variables(self):
        """
        Non-aggregated variables are defined exactly the same as in the
        fragment files, while aggregated variables contain ``aggregated_data``
        and ``aggregated_dimensions`` attributes, which link to the fragment
        array variables.
        """
        for var, meta in self.var_info.items():
            if 'adims' not in meta:
                self._write_nonagg_variable(var, meta)
            else:
                agg_dims = ' '.join(meta['dims'])

                num = None
                for n, opt in enumerate(self.cdim_opts):
                    if opt == meta['dims']:
                        num = n
                if num is None:
                    raise ValueError(
                        'Dimension mismatch issue.'
                    )

                agg_data = ' '.join([
                    f'location: fragment_location_{num}',
                    f'address: fragment_address_{var}',
                    f'shape: fragment_shape_{num}'
                ])
                self._write_aggregated_variable(var, meta, agg_dims, agg_data)

    def _write_fragment_addresses(self):
        """
        Create a ``fragment_address`` variable for each variable which is not
        dimension-less.
        """
        addrs = []
        for variable, meta in self.var_info.items():
            if 'adims' not in meta:
                continue
            addr = self.ds.createVariable(
                f'fragment_address_{variable}',
                str,
                (),
            )
            addr[:] = np.array(meta['address'], dtype=str)
            addrs.append(addr)

    def _write_shape_dims(self, f_dims: dict):
        """
        Construct the shape and location dimensions for each combination of
        dimensions stored in ``cdim_opts``. This utilises the so-called
        ``f-dims`` previously created for each coordinate dimension.
        """
        for x, opt in enumerate(self.cdim_opts):
            self.ds.createDimension(
                f'shape_{x}',
                len(opt),
            )

            vopt = tuple(f'f_{o}' for o in opt)
            if self.max_files > 1:
                vopt = vopt + ('versions',)

            location = self.ds.createVariable(
                f'fragment_location_{x}',
                str,
                vopt,
            )

            vshape = [f_dims[o] for o in vopt]

            loc_data = np.reshape(self.location, vshape)
            location[tuple(slice(0, None) for _ in vopt)] = np.array(loc_data, dtype=str)

    def _write_fragment_shapes(self):
        """
        Construct the ``fragment_shape`` variable part for each combination of
        dimensions stored in ``cdim_opts``. This utilises the ``shape``
        dimensions previously created.
        """
        def fill_empty(array, size):
            array = list(array)
            init_length = int(len(array))
            for x in range(size - init_length):
                array.append(0)
            return tuple(array)

        cdimlens = {
            d: len(meta['sizes'])
            for d, meta in self.dim_info.items()
            if meta['type'] == 'coord'
        }

        for num, cdims in enumerate(self.cdim_opts):
            # cdims is a tuple of the cdimensions for this set of instructions.

            # Find the largest of the dimensions
            largest = 0
            i_dim = ''
            for d in cdims:
                if d not in cdimlens:
                    continue
                if cdimlens[d] > largest:
                    largest = cdimlens[d]
                    i_dim = f'f_{d}'

            # Set dim_sizes accordingly
            shape_name = f'fragment_shape_{num}'
            shapes = []
            for d in cdims:
                if 'sizes' in self.dim_info[d]:
                    sizes = self.dim_info[d]['sizes']
                else:
                    sizes = (self.dim_info[d]['size'],)
                shapes.append(fill_empty(sizes, largest))

            shape = self.ds.createVariable(
                shape_name,
                int,  # Type
                (f'shape_{num}', i_dim),
            )

            shapes = np.array(shapes)
            shapes = np.ma.array(shapes, dtype=int, mask=(shapes == 0))
            shape[:, :] = shapes

    def _write_aggregated_variable(
            self,
            var : str,
            meta : dict,
            agg_dims : str,
            agg_data : str
        ):
        """
        Create the netCDF parameters required for an aggregated variable.

        Note: The dimensions and variables referenced in ``agg_data`` need to
        have already been defined for the dataset by this point.
        """
        var_arr = self.ds.createVariable(
            var,
            meta['dtype'],
            (),
            fill_value=meta.pop('_FillValue', None),
        )

        for k, v in meta['attrs'].items():
            if k == '_FillValue':
                continue
            try:
                var_arr.setncattr(k, v)
            except Exception as err:
                logger.warning(
                    f'Cannot set attribute - {k}: {v} for {var}'
                )
                logger.warning(err)

        var_arr.aggregated_dimensions = agg_dims
        var_arr.aggregated_data = agg_data

    def _write_nonagg_variable(
            self,
            var : str,
            meta : dict
        ):
        """
        Create a non-aggregated variable for the CFA-netCDF file. If this
        variable has some attributed data (which it should), the data is set
        for this variable in the new file.
        """
        var_arr = self.ds.createVariable(
            var,
            meta['dtype'],
            meta['dims'],
            fill_value=meta.pop('_FillValue', None),
        )

        for k, v in meta['attrs'].items():
            if k == '_FillValue':
                continue
            try:
                var_arr.setncattr(k, v)
            except Exception as err:
                logger.warning(
                    f'Cannot set attribute - {k}: {v} for {var}'
                )
                logger.warning(err)

        if 'data' in meta:
            var_arr[:] = meta['data']

class CFANetCDF(CFACreateMixin, CFAWriteMixin):
    """
    CFA-netCDF file constructor class, enables creation and writing of new
    CF-1.12 aggregations.
    """

    description = 'The CFAPyX Constructor class, for creating new CFA-netCDF files.'

    def __init__(self, files: list, concat_msg : str = CONCAT_MSG):
        """
        Initialise this CFANetCDF instance with some basic values, and filter
        the provided set of files. A custom concat message can also be set
        here if needed.
        """
        if isinstance(files, str):
            fileset = glob.glob(files)
            self.files = self._filter_files(fileset)
            if len(self.files) < 2:
                raise ValueError(
                    f'Unable to aggregate fewer than two files; found {len(self.files)}'
                    f' from pattern "{files}"'
                )
        else:
            self.files = self._filter_files(files)
            if len(self.files) < 2:
                raise ValueError(
                    f'Unable to aggregate fewer than two files; only {len(self.files)}'
                    ' files given.'
                )

        self.longest_filename = ''
        self.global_attrs = None
        self.var_info = None
        self.dim_info = None

        self.fragment_space = None
        self.location = None
        self.cdim_opts = None

        self.concat_msg = concat_msg
        self.ds = None

    def create(
            self,
            updates : dict = None,
            removals : list = None,
            agg_dims : list = None,
        ) -> None:
        """
        Perform the operations and passes needed to accumulate the set of
        variable/dimension info and attributes to then construct a CFA-netCDF
        file.
        """
        updates = updates or {}
        removals = removals or []

        # First pass: collect info
        arranged_files, global_attrs, var_info, dim_info = self._first_pass(agg_dims=agg_dims)

        global_attrs, var_info, dim_info = self._apply_filters(
            updates, removals, global_attrs, var_info, dim_info)

        # Arrange aggregation dimensions
        dim_info, agg_dims = self._arrange_dimensions(dim_info, agg_dims=agg_dims)
        var_info = self._apply_agg_dims(var_info, agg_dims)

        # Determine size options and non-aggregated variables
        self.cdim_opts = self._determine_size_opts(var_info, agg_dims)
        non_aggregated = self._determine_non_aggregated(var_info, agg_dims)

        # Perform a second pass to collect non-aggregated variables if present.
        if len(non_aggregated) > 0:
            var_info = self._second_pass(var_info, non_aggregated)

        # Define the fragment space
        self.fragment_space = [v['f_size'] for v in dim_info.values() if 'f_size' in v]

        # Assemble the location with correct dimensions
        location = self._assemble_location(arranged_files, dim_info)

        self.global_attrs = global_attrs
        self.dim_info = dim_info
        self.var_info = var_info
        self.location = location

    def write(self, outfile: str) -> None:
        """
        Use the accumulated dimension/variable info and attributes to
        construct a CFA-netCDF file.
        """
        self.ds = netCDF4.Dataset(outfile, mode='w', format='NETCDF4')

        self.ds.Conventions = 'CF-1.12'

        # Populate global attributes
        for attr, value in self.global_attrs.items():
            self.ds.setncattr(attr, value)

        f_dims = self._write_dimensions()

        if self.max_files > 1:
            self.ds.createDimension(
                'versions',
                self.max_files,
            )
            f_dims['versions'] = self.max_files

        self._write_shape_dims(f_dims)
        self._write_fragment_shapes()
        self._write_fragment_addresses()
        self._write_variables()

        self.ds.close()

    def display_attrs(self):
        """
        Display the global attributes consolidated in the aggregation process.
        """
        print('Global Attributes:')
        _display_attrs(self.global_attrs)

    def display_variables(self):
        """
        Display the variables and some basic properties about each.
        """
        if not self.var_info:
            return
        for v, m in self.var_info.items():
            print(f'{v}: dtype={m["dtype"]}, dimensions={m["dims"]}')

    def display_dimensions(self):
        """
        Display the dimensions and some basic properties about each.
        """
        if not self.dim_info:
            return
        for d, m in self.dim_info.items():
            # Pure dimensions carry no dtype entry, so use .get here.
            print(f'{d}: dtype={m.get("dtype")}, size={m["size"]}', end='')
            if 'f_size' in m:
                print(f', f_size={m["f_size"]}')
            else:
                print()

    def display_variable(self, var):
        """
        Handler for displaying information about a variable.
        """
        if not self.var_info:
            return ''
        if var not in self.var_info:
            return ''
        self._display_item(self.var_info[var])

    def display_dimension(self, dim):
        """
        Handler for displaying information about a dimension.
        """
        if not self.dim_info:
            return ''
        if dim not in self.dim_info:
            return ''
        self._display_item(self.dim_info[dim])

    def _display_item(self, keyset):
        """
        Display the information about a dimension/variable.
        """
        for attr, value in keyset.items():
            if attr == 'attrs':
                print('Attributes:')
                _display_attrs(value)
            else:
                print(f'{attr}: {value}')

    @property
    def agg_dims(self):
        """
        Display the aggregated dimensions identified on creation.
        """
        if not self.dim_info:
            return []
        agg_dims = []
        for dim, meta in self.dim_info.items():
            if 'f_size' not in meta:
                continue
            if meta['f_size'] > 1:
                agg_dims.append(dim)
        return tuple(agg_dims)

    @property
    def pure_dims(self):
        """
        Display the 'pure' dimensions identified on creation. Pure dimensions
        are defined only by a size, with no array of values.
        """
        if not self.dim_info:
            return []
        pure_dims = []
        for dim, meta in self.dim_info.items():
            if meta['type'] == 'pure':
                pure_dims.append(dim)
        return tuple(pure_dims)

    @property
    def coord_dims(self):
        """
        Display the coordinate dimensions identified on creation. Coordinate
        dimensions include an array of values for the dimension as a variable
        with the same name.
        """
        if not self.dim_info:
            return []
        coord_dims = []
        for dim, meta in self.dim_info.items():
            if meta['type'] == 'coord':
                coord_dims.append(dim)
        return tuple(coord_dims)

    @property
    def scalar_vars(self):
        """
        Display the scalar variables identified on creation, which are
        single-valued and dimensionless.
        """
        if not self.var_info:
            return []
        scalar_vars = []
        for var, meta in self.var_info.items():
            if 'dims' not in meta:
                scalar_vars.append(var)
            elif meta['dims'] == ():
                scalar_vars.append(var)
        return tuple(scalar_vars)

    @property
    def aggregated_vars(self):
        """
        Display the variables that vary across the aggregation dimensions.
        """
        if not self.var_info:
            return []
        agg_vars = []
        for var, meta in self.var_info.items():
            if 'adims' in meta:
                agg_vars.append(var)
        return tuple(agg_vars)

    @property
    def identical_vars(self):
        """
        Display the variables that do not vary across the aggregation
        dimensions and must therefore be identical across all files.
        """
        if not self.var_info:
            return []
        id_vars = []
        for var, meta in self.var_info.items():
            if 'adims' not in meta:
                id_vars.append(var)
            elif meta['adims'] == ():
                id_vars.append(var)
        return tuple(id_vars)

    def _apply_filters(self, updates, removals, global_attrs, var_info, dim_info):
        global_attrs, var_info, dim_info = self._apply_updates(
            updates, global_attrs, var_info, dim_info)
        global_attrs, var_info, dim_info = self._apply_removals(
            removals, global_attrs, var_info, dim_info)
        return global_attrs, var_info, dim_info

    def _apply_updates(self, updates, global_attrs, var_info, dim_info):
        global_u, vars_u, dims_u = {}, {}, {}
        for upd in updates.keys():
            if '.' not in upd:
                global_u[upd] = updates[upd]
            else:
                item = upd.split('.')[0]
                if item in var_info.keys():
                    vars_u[upd] = updates[upd]
                elif item in dim_info.keys():
                    dims_u[upd] = updates[upd]
                else:
                    logger.warning(
                        'Attempting to set an attribute for a var/dim that '
                        f'is not present: "{item}"'
                    )

        for attr, upd in global_u.items():
            global_attrs[attr] = upd
        for attr, upd in vars_u.items():
            (v, vattr) = attr.split('.')
            var_info[v]['attrs'][vattr] = upd
        for attr, upd in dims_u.items():
            (d, dattr) = attr.split('.')
            dim_info[d]['attrs'][dattr] = upd
        return global_attrs, var_info, dim_info

    def _apply_removals(self, removals, global_attrs, var_info, dim_info):
        global_r, vars_r, dims_r = [], [], []
        for rem in removals:
            if '.' not in rem:
                global_r.append(rem)
            else:
                item = rem.split('.')[0]
                if item in var_info.keys():
                    vars_r.append(rem)
                elif item in dim_info.keys():
                    dims_r.append(rem)
                else:
                    logger.warning(
                        'Attempting to remove an attribute for a var/dim that '
                        f'is not present: "{item}"'
                    )

        for rem in global_r:
            global_attrs.pop(rem)
        for rem in vars_r:
            (v, vattr) = rem.split('.')
            var_info[v]['attrs'].pop(vattr)
        for rem in dims_r:
            (d, dattr) = rem.split('.')
            dim_info[d]['attrs'].pop(dattr)
        return global_attrs, var_info, dim_info

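    # Key syntax for updates/removals (illustrative names): a plain key such
    # as 'title' targets a global attribute, while a dotted key such as
    # 'tas.units' targets the 'units' attribute of the variable (or
    # dimension) named 'tas'.
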
    def _filter_files(self, files: list) -> list:
        """
        Filter the set of files to identify the trailing dimension indicative
        of multiple file locations. Also identifies the length of the longest
        filename to be used later when storing numpy string arrays.
        """
        filtered = []
        trailing_file = False
        max_files = 0

        for f in files:
            if isinstance(f, tuple):
                trailing_file = True
                if max_files < len(f):
                    max_files = len(f)

        for f in files:
            if trailing_file:
                fileopts = [''] * max_files
                if isinstance(f, tuple):
                    for x, c in enumerate(f):
                        fileopts[x] = c
                else:
                    fileopts[0] = f
                filtered.append(tuple(fileopts))
            else:
                filtered.append((f,))

        self.max_files = max_files
        return filtered

    def _call_file(self, file) -> netCDF4.Dataset:
        """
        Open the file (a filename or tuple of filenames) as a netCDF dataset.
        If there are multiple filenames provided, use the first file. Also
        determine the longest filename, to be used to define the ``location``
        parameter later.
        """
        if isinstance(file, tuple):
            ds = netCDF4.Dataset(file[0])
            for f in file:
                if len(f) > len(self.longest_filename):
                    self.longest_filename = f
        else:
            ds = netCDF4.Dataset(file)
            if len(file) > len(self.longest_filename):
                self.longest_filename = file
        return ds

def _display_attrs(attrs):
    for k, v in attrs.items():
        print(f' - {k}: {v}')
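
# Minimal usage sketch (the glob pattern, aggregation dimension and output
# path are assumptions for illustration; any pattern matching two or more
# fragment files works):
#
#   from cfapyx.creator import CFANetCDF
#
#   cfa = CFANetCDF('/data/rainfall_*.nc')
#   cfa.create(agg_dims=['time'])
#   cfa.write('rainfall_aggregation.nca')
#   print(cfa.agg_dims, cfa.aggregated_vars)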