Validation Module

class pipeline.validate.CloudValidator

Encapsulates all validation testing in a single class. Instantiated for a specific project, the object would hold all project info (from detail-cfg), opened only once, plus a copy of the full datasets from both native and cloud sources. Subselections can be passed between class methods along with a variable index (class variables: variable list, dimension list, etc.).

A class-level logger attribute means the logger no longer needs to be passed between functions. The bypass switch, with all its individual switches, is also contained here.
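
A minimal sketch of how such a class could be laid out; the attribute names below are hypothetical and not taken from the pipeline:

    import logging

    class CloudValidator:
        """Illustrative layout only; attribute names are hypothetical."""

        def __init__(self, proj_dir: str, bypass=None):
            self.proj_dir   = proj_dir
            self.logger     = logging.getLogger(f'validate.{proj_dir}')  # shared class logger
            self.bypass     = bypass   # single BypassSwitch object holding all switches
            self.detail     = None     # project info from detail-cfg, opened once
            self.xobjs      = None     # native (NetCDF) datasets
            self.kobj       = None     # cloud (Kerchunk) dataset
            self.variables  = []       # class-level variable list
            self.dimensions = []       # class-level dimension list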

pipeline.validate.add_quality(proj_dir: str)

Add quality_required flag to the detail file for this project.

pipeline.validate.attempt_timestep(args, xobj, kobj, step, nfiles, logger, concat_dims, fullset=False)

Handler for attempting processing on a timestep multiple times; handles error conditions.

pipeline.validate.check_for_nan(box, bypass, logger, label=None)

Special function for assessing if a box selection has non-NaN values within it. Needs further testing using different data types.

pipeline.validate.compare_data(vname: str, xbox: Dataset, kerchunk_box: Dataset, logger, bypass=False) None

Compare a NetCDF-derived ND array to a Kerchunk-derived one. This function takes a netcdf selection box array of n-dimensions and an equally sized kerchunk_box array and tests for elementwise equality within selection. If possible, tests max/mean/min calculations for the selection to ensure cached values are the same.

TypeErrors from summations are expected later and are bypassed; any other errors will exit the run.

Parameters:
  • vname – (str) The name of the variable described by this box selection

  • xbox – (obj) The native dataset selection

  • kerchunk_box – (obj) The cloud-format (Kerchunk) dataset selection

  • logger – (obj) Logging object for info/debug/error messages.

  • bypass – (bool) Single value flag for bypassing numeric data errors (in the case of values which cannot be added).

Returns:

None but will raise error if data comparison fails.
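
A minimal sketch of this kind of comparison, assuming plain NumPy arrays stand in for the two selections (the real function works on xarray selections and routes failures through the bypass switch):

    import numpy as np

    def compare_boxes_sketch(vname, xbox, kerchunk_box, bypass=False):
        # Elementwise equality within the selection (NaNs must match too).
        if not np.array_equal(xbox, kerchunk_box, equal_nan=True):
            raise ValueError(f'Data mismatch for variable {vname}')
        try:
            # Cached statistics should agree between native and cloud copies.
            for stat in (np.nanmax, np.nanmean, np.nanmin):
                if not np.isclose(stat(xbox), stat(kerchunk_box)):
                    raise ValueError(f'{stat.__name__} mismatch for {vname}')
        except TypeError:
            # Non-summable dtypes raise TypeError; only continue if bypassed.
            if not bypass:
                raise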

pipeline.validate.find_dimensions(dimlen: int, divisions: int) int

Determine the slice end-position index given the length of a dimension and the fraction to assess.

Parameters:
  • dimlen – (int) The length of the specific dimension

  • divisions – (int) The number of divisions across this dimension.

Returns:

The size of each division for this dimension, given the number of total divisions.
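
Given the return value described above, this amounts to dividing the dimension length by the number of divisions; a minimal sketch, assuming simple integer division:

    def find_dimensions_sketch(dimlen: int, divisions: int) -> int:
        # Size of one division, i.e. the slice end index for the first division.
        return int(dimlen / divisions)

    # e.g. a 'time' dimension of length 744 split into 8 divisions -> 93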

pipeline.validate.get_concat_dims(xobjs: list, proj_dir) dict

Retrieve the sizes of the concatenation dims.

Parameters:
  • xobjs – (list) A list of xarray Dataset objects for files which are currently being assessed, from which to find the shapes of concat dimensions.

  • proj_dir – (str) The project code directory path.

Returns:

A dictionary of the concatenation dimensions and their array shapes.
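
A sketch of gathering concatenation-dimension sizes from a list of open datasets; summing sizes across files is an assumption about how the totals are combined:

    def get_concat_dims_sketch(xobjs: list, concat_dims=('time',)) -> dict:
        # Total length of each concatenation dimension across all open files,
        # assuming the files are concatenated end-to-end along these dims.
        return {
            dim: sum(xobj.sizes.get(dim, 0) for xobj in xobjs)
            for dim in concat_dims
        }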

pipeline.validate.get_netcdf_list(proj_dir: str, logger, thorough=False) tuple

Open document containing paths to all NetCDF files, make selections and return a list of files.

Parameters:
  • proj_dir – (str) The project code directory path.

  • logger – (obj) Logging object for info/debug/error messages.

  • thorough – (bool) If True will select all files for testing, otherwise standard validation subsetting (0.1% or 3 files) applies.

Returns:

A tuple containing a list of all the files as well as a list of indexes to specific files for testing. The index list should cover at least 3 files, with a maximum of 0.1% of the files selected in the case of > 3000 files.
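
The subsetting rule described above (at least 3 files, at most 0.1% once there are more than 3000) can be sketched as follows; the use of evenly spaced indexes is an assumption about how the subset is picked:

    def pick_test_indexes(nfiles: int, thorough: bool = False) -> list:
        if thorough:
            return list(range(nfiles))
        # At least 3 files; 0.1% of the total once the dataset exceeds 3000 files.
        nselect = max(3, nfiles // 1000)
        step = max(1, nfiles // nselect)
        return list(range(0, nfiles, step))[:nselect]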

pipeline.validate.get_vslice(dimensions: list, dtypes: list, lengths: list, divisions: list, logger) list

Assemble dataset slice given the shape of the array and dimensions involved.

Parameters:
  • dimensions – (list) The dimension names for an array currently being assessed.

  • dtypes – (list) A list of the datatypes corresponding to each dimension for an array.

  • lengths – (list) The lengths of each dimension for an array.

  • divisions – (list) The number of divisions for each of the dimensions for an array.

  • logger – (obj) Logging object for info/debug/error messages.

Returns:

Slice for this particular division set. Special cases for datetime-like objects.
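
A minimal sketch of how such a slice list might be assembled, assuming the division size follows find_dimensions and that datetime-like dimensions are simply selected whole (the real special-case handling is not reproduced here):

    import numpy as np

    def get_vslice_sketch(dimensions, dtypes, lengths, divisions):
        vslice = []
        for dim, dtype, length, divs in zip(dimensions, dtypes, lengths, divisions):
            if np.issubdtype(dtype, np.datetime64):
                # Datetime-like dimensions are a special case; select everything.
                vslice.append(slice(None))
            else:
                # Select the first 1/divs portion of this dimension.
                vslice.append(slice(0, int(length / divs)))
        return vslice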

pipeline.validate.locate_kerchunk(args, logger, get_str=False, attempt=1, remote_protocol='https') Dataset

Get the latest Kerchunk file for this project code, either as a filename or opened as an xarray dataset.

Parameters:
  • args – (obj) Set of command line arguments supplied by argparse.

  • logger – (obj) Logging object for info/debug/error messages.

  • get_str – (bool) If True will return the string filename for the selected Kerchunk file, otherwise the Kerchunk file will be opened as an xarray Virtual Dataset.

  • remote_protocol – (str) Default ‘https’ for accessing files post-compute since these have been reconfigured for remote testing. Override with ‘file’ for Kerchunk files with a local reference.

Returns:

Xarray Virtual Dataset from a Kerchunk file, or the string filename of the Kerchunk file if get_str is enabled.
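
Opening a Kerchunk reference file as an xarray dataset typically goes through an fsspec reference filesystem; a minimal sketch (the filename and protocol below are illustrative):

    import fsspec
    import xarray as xr

    mapper = fsspec.get_mapper(
        'reference://',
        fo='kerchunk-1a.json',      # path to the Kerchunk reference file
        remote_protocol='https',    # or 'file' for local references
    )
    ds = xr.open_dataset(mapper, engine='zarr', backend_kwargs={'consolidated': False})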

pipeline.validate.match_timestamp(xobject: Dataset, kobject: Dataset, logger) tuple

Match timestamp of xarray object to kerchunk object.

Parameters:
  • xobject – (obj) An xarray dataset representing the original files opened natively.

  • kobject – (obj) An xarray dataset representing the Kerchunk file constructed by the pipeline.

  • logger – (obj) Logging object for info/debug/error messages.

Returns:

A tuple containing subselections of both xarray datasets such that both now have matching timestamps.
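
One way to subselect both datasets to their shared timestamps, assuming a 'time' coordinate (a sketch, not necessarily how the pipeline does it):

    import numpy as np

    def match_timestamp_sketch(xobject, kobject):
        # Keep only the timestamps present in both datasets.
        shared = np.intersect1d(xobject['time'].values, kobject['time'].values)
        return xobject.sel(time=shared), kobject.sel(time=shared)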

pipeline.validate.mem_to_value(mem) float

Convert a memory string, e.g. '2G', into a numeric value.

Returns:

Numeric value of e.g. '2G' in bytes.
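
A hedged sketch of such a conversion, assuming single-letter suffixes and powers of 1000:

    def mem_to_value_sketch(mem: str) -> float:
        # '2G' -> 2e9 bytes; suffixes assumed to be powers of 1000.
        suffixes = {'B': 1, 'K': 1e3, 'M': 1e6, 'G': 1e9, 'T': 1e12}
        return float(mem[:-1]) * suffixes[mem[-1].upper()]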

pipeline.validate.open_netcdfs(args, logger, thorough=False, concat_dims='time') list

Open the selected NetCDF files and return xarray objects:

  1. Select a single file and a single timestep from that file.

  2. Verify that a single timestep can be selected (Yes: use this xarray object; No: open all files and select a single timestep from the combined set).

  3. In all cases, return a list of xarray objects.

Parameters:
  • args – (obj) Set of command line arguments supplied by argparse.

  • logger – (obj) Logging object for info/debug/error messages.

  • thorough – (bool) If True will concatenate all selected Datasets to a single combined dataset, rather than a list of individual separate objects.

Returns:

A list of the xarray datasets (or a single concatenated dataset), along with a list of indexes to use for selecting a subset of those files, plus a list of filepaths to the original files.

pipeline.validate.run_backtrack(workdir: str, groupID: str, proj_code: str, logger, quality=False)

Backtrack progress on all output files. If quality is specified as well, files are removed rather than backtracked.

pipeline.validate.run_successful(args, logger)

Move the kerchunk-1a.json file to the complete directory with the proper name.

pipeline.validate.validate_data(xobj, kobj, xv: str, step: int, logger, bypass=<pipeline.utils.BypassSwitch object>, depth_default=128, nfiles=2)

Run the growing-selection test for the specified variable from the xarray and Kerchunk datasets.

pipeline.validate.validate_dataset(args, logger, fh=None, logid=None, **kwargs)

Perform validation steps for the specific dataset defined here: set up a local Dask cluster to limit memory usage, retrieve the set of xarray objects and the Kerchunk dataset, then run validation on each of the selected indexes (a subset of the total number of objects).

Parameters:
  • args – (obj) Set of command line arguments supplied by argparse.

  • logger – (obj) Logging object for info/debug/error messages. Will create a new logger object if not given one.

  • fh – (str) Path to file for logger I/O when defining new logger.

  • logid – (str) If creating a new logger, an id is needed to distinguish this logger from other single processes (typically n of N total processes).
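
Setting up a local Dask cluster to cap memory usage is typically done with dask.distributed; a minimal sketch (the worker count and memory limit below are illustrative, not the pipeline's defaults):

    from dask.distributed import Client, LocalCluster

    # Limit memory use during validation by pinning worker count and per-worker memory.
    cluster = LocalCluster(n_workers=1, threads_per_worker=1, memory_limit='2GB')
    client = Client(cluster)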

pipeline.validate.validate_selection(xvariable, kvariable, vname: str, divs: int, currentdiv: int, logger, bypass=<pipeline.utils.BypassSwitch object>)

Validate this data selection in the xvariable/kvariable objects.

  • Recursive function that tests a growing selection of data until one is found with real data

  • Repeats with exponentially increasing box size (divisions of all data dimensions)

  • Will halt at 1 division, which equates to testing all data

Returns:

Number of attempts to connect to all required data.
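
A sketch of the growing-selection recursion described above, assuming floating-point data and that the box grows by halving the division count each time (the pipeline's exact growth rule and comparison call are not reproduced here):

    import numpy as np

    def validate_selection_sketch(xvariable, kvariable, vname, currentdiv, attempts=1):
        # Select a box covering 1/currentdiv of each dimension of both arrays.
        box = tuple(slice(0, max(1, s // currentdiv)) for s in xvariable.shape)
        xbox, kbox = np.asarray(xvariable[box]), np.asarray(kvariable[box])
        if np.all(np.isnan(xbox)) and currentdiv > 1:
            # No real data yet: grow the box by halving the number of divisions.
            return validate_selection_sketch(xvariable, kvariable, vname,
                                             currentdiv // 2, attempts + 1)
        # Either real data was found or currentdiv reached 1 (whole array tested).
        if not np.array_equal(xbox, kbox, equal_nan=True):
            raise ValueError(f'Selection mismatch for {vname}')
        return attempts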

pipeline.validate.validate_shape_to_tolerance(nfiles: int, xv: str, dims: tuple, xshape: tuple, kshape: tuple, logger, proj_dir=None) None

Special-case function for validating a shaped array to some tolerance. This is an alternative to opening N files and only works if each file has roughly the same total shape. The tolerance is based on the number of files supplied; more files means a lower tolerance.

Parameters:
  • nfiles – (int) The number of native files across the whole dataset.

  • xv – (str) The name of the variable within the dataset.

  • dims – (tuple) A list of the names of the dimensions in this dataset.

  • xshape – (tuple) The shape of the array from the original native files.

  • kshape – (tuple) The shape of the array from the cloud formatted dataset.

  • logger – (obj) Logging object for info/debug/error messages.

  • proj_dir – (str) The project code directory path.
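
A hedged sketch of a tolerance comparison along the concatenation dimension; the 1/nfiles tolerance below is an assumption, since the exact formula is not stated above:

    def shapes_within_tolerance(nfiles: int, xshape: tuple, kshape: tuple) -> bool:
        # Non-concatenated axes (assumed to follow axis 0) must match exactly.
        if xshape[1:] != kshape[1:]:
            return False
        # Along the concat axis, allow a relative difference of at most 1/nfiles:
        # more files means a tighter tolerance.
        tolerance = 1.0 / max(nfiles, 1)
        return abs(xshape[0] - kshape[0]) <= tolerance * kshape[0]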

pipeline.validate.validate_shapes(xobj, kobj, nfiles: int, xv: str, logger, bypass_shape=False, proj_dir=None, concat_dims={}) None

Ensure shapes are equivalent across Kerchunk/NetCDF for each variable. Must account for the number of files opened vs. the total number of files.

Parameters:
  • xobj – (obj) The native dataset selection.

  • kobj – (obj) The cloud-format (Kerchunk) dataset selection

  • nfiles – (int) The number of native files for this whole dataset.

  • xv – (str) The name of the variable within the dataset.

  • logger – (obj) Logging object for info/debug/error messages.

  • bypass_shape – (bool) Switch for bypassing shape errors - diverts to tolerance testing as a backup.

  • proj_dir – (str) The project code directory path.

  • concat_dims – (dict) Dictionary of concatenation dimensions with their appropriate sizes by index. (e.g {‘time’:100})

Returns:

None but will raise error if shape validation fails.

pipeline.validate.validate_timestep(args, xobj, kobj, step: int, nfiles: int, logger, concat_dims={}, index=0)

Run all tests for a single file, which may or may not equate to one timestep.

pipeline.validate.value_to_mem(value) str

Convert a number of bytes, e.g. 1000000000, into a memory string.

Returns:

String value of the above (e.g. 1000000000 -> '1G').

pipeline.validate.verify_connection(logger)

Verify connection to the CEDA archive by opening a test file in Kerchunk and comparing to a known value.