Compute Module

class pipeline.compute.KerchunkConverter(clogger=None, bypass_driver=False, ctype=None, verbose=1)

Bases: object

Class for converting a single file to a Kerchunk reference object

convert_to_zarr(nfile: str, extension=False, **kwargs) None

Perform conversion to zarr with exceptions for bypassing driver errors.

Parameters:
  • nfile – (str) Path to a local native file of an appropriate type to be converted.

  • extension – (str) File extension relating to the file type, if known. For the first file, all extensions/drivers are tried; subsequent files in the same dataset start with whichever extension worked for the first file.

Returns:

The output of the driver if successful, or None if the driver is unsuccessful. Errors are bypassed if the bypass_driver option is set for this class.

grib_to_zarr(gfile: str, **kwargs) dict

Wrapper for converting GRIB type files to Kerchunk

hdf5_to_zarr(nfile: str, **kwargs) dict

Wrapper for converting NetCDF4/HDF5 type files to Kerchunk

load_individual_ref(cache_ref: str) dict

Wrapper for getting proj_file cache_ref contents

Returns:

Dictionary of refs if successful; otherwise None is returned or an error is raised.

ncf3_to_zarr(nfile: str, **kwargs) dict

Wrapper for converting NetCDF3 type files to Kerchunk

save_individual_ref(ref: dict, cache_ref: str, forceful=False) None

Save each individual set of refs created for each file immediately to reduce loss of progress in the event of a failure somewhere in processing.
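The caching idea described for save_individual_ref and load_individual_ref can be sketched roughly as follows. This is not the pipeline's implementation: the function names save_ref_sketch and load_ref_sketch are hypothetical, JSON is assumed as the cache format, and forceful is assumed to mean "overwrite an existing cache file".

```python
import json
import os


def save_ref_sketch(ref: dict, cache_ref: str, forceful: bool = False) -> None:
    """Write one file's refs to a JSON cache file as soon as they exist.

    Assumption: an existing cache file is left alone unless `forceful` is set.
    """
    if os.path.isfile(cache_ref) and not forceful:
        return
    with open(cache_ref, "w") as f:
        json.dump(ref, f)


def load_ref_sketch(cache_ref: str):
    """Return the cached refs, or None if no cache file exists yet."""
    if not os.path.isfile(cache_ref):
        return None
    with open(cache_ref) as f:
        return json.load(f)
```

Writing each set of refs immediately means that a failure on a later file only costs the work for that file; earlier refs can be reloaded from the cache on the next run.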

tiff_to_zarr(tfile: str, **kwargs) dict

Wrapper for converting GeoTiff type files to Kerchunk

try_all_drivers(nfile: str, **kwargs) dict

Safe creation that allows for known issues and tries multiple drivers.

Returns:

Dictionary of Kerchunk references if successful; raises an error otherwise.
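A minimal sketch of the "try each driver in turn" behaviour described for convert_to_zarr and try_all_drivers is given below. It is an illustration under assumptions, not the class's code: try_drivers_sketch, its drivers mapping, and the first and bypass_errors parameters are hypothetical stand-ins for the driver wrappers, the extension argument and the bypass_driver option.

```python
from typing import Callable, Dict, Optional

# A driver takes a file path and returns a dict of references (hypothetical type).
Driver = Callable[[str], dict]


def try_drivers_sketch(nfile: str, drivers: Dict[str, Driver],
                       first: Optional[str] = None,
                       bypass_errors: bool = False):
    """Return (extension, refs) from the first driver that succeeds."""
    order = list(drivers)
    if first in drivers:
        # Start with the extension that worked for an earlier file.
        order.remove(first)
        order.insert(0, first)

    errors = {}
    for ext in order:
        try:
            refs = drivers[ext](nfile)
        except Exception as err:  # a failing driver just means "try the next one"
            errors[ext] = err
            continue
        if refs:
            return ext, refs

    if bypass_errors:
        return None, None
    raise RuntimeError(f"No driver could convert {nfile}: {errors}")
```

Returning the extension alongside the refs lets the caller pass it back as first for the next file in the dataset, so the working driver is tried before any others.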

class pipeline.compute.KerchunkDSProcessor(proj_code, **kwargs)

Bases: ProjectProcessor

Kerchunk Dataset Processor Class, capable of processing a single dataset’s worth of input files into a single concatenated Kerchunk file.

Add the download link to each of the Kerchunk references

add_kerchunk_history(attrs: dict) dict

Add kerchunk variables to the metadata for this dataset, including creation/update date and version/revision number.

combine_and_save(refs: dict, zattrs: dict) None

Concatenation of ref data for different kerchunk schemes

construct_virtual_dim(refs: dict) None

Construct a Virtual dimension for stacking multiple files where no suitable concatenation dimension is present.

create_refs() None

Organise creation and loading of refs:
  • Load existing cached refs.

  • Create new refs.

  • Combine metadata and global attributes into a single set.

  • Coordinate combining and saving of data.
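The load-or-create part of this workflow might look roughly like the sketch below. The function create_refs_sketch and its three callables (load_cached, create_new, save_cached) are hypothetical and only stand in for the converter and cache methods documented above; the real method's internals are not shown in this reference.

```python
def create_refs_sketch(files, load_cached, create_new, save_cached):
    """Collect one set of refs per file, reusing cached refs where they exist.

    Returns (refs, loaded), where `loaded` records whether any refs came
    from the cache rather than being created in this run.
    """
    refs, loaded = [], False
    for index, path in enumerate(files):
        ref = load_cached(index)
        if ref is None:
            # Nothing cached for this file: create refs and cache them at once.
            ref = create_new(path)
            save_cached(index, ref)
        else:
            loaded = True
        refs.append(ref)
    return refs, loaded
```

Tracking whether anything was loaded matters for timing: as get_timings notes, timings are only meaningful when every set of refs was created from scratch.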

data_to_json(refs: dict, zattrs: dict) None

Concatenating to JSON-format Kerchunk file

data_to_parq(refs: dict) None

Concatenating to Parquet-format Kerchunk store

load_temp_zattrs() dict

Load global attributes from a ‘temporary’ cache file.

perform_shape_checks(ref: dict) None

Check the shape of each variable for inconsistencies which will require a thorough validation process.

save_metadata(zattrs: dict) dict

Cache metadata global attributes in a temporary file.

class pipeline.compute.ProjectProcessor(proj_code, workdir=None, thorough=False, forceful=False, verb=0, mode=None, version_no='trial-', concat_msg='See individual files for more details', bypass=<pipeline.utils.BypassSwitch object>, groupID=None, limiter=None, dryrun=True, ctype=None, fh=None, logid=None, skip_concat=False, logger=None, new_version=None, **kwargs)

Bases: object

Processing for a single project using Zarr/Kerchunk/COG, with all class attributes common to these three processes kept in one place.

check_time_attributes(times: dict) dict

Takes a dict of time attributes with lists of values:
  • Sort time arrays.

  • Assume time_coverage_start, time_coverage_end, duration (2 or 3 variables).

This Class method is common to all zarr-like conversion types.
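One plausible reading of check_time_attributes is sketched below: sort the per-file values and keep the earliest start and latest end. The function name summarise_time_attributes is hypothetical, and the sketch assumes the values are ISO 8601 strings in one consistent format and time zone, so that string sorting matches chronological order. It does not attempt to handle duration.

```python
def summarise_time_attributes(times: dict) -> dict:
    """Reduce per-file lists of timestamps to one overall start and end.

    Assumption: all timestamps share one ISO 8601 format, so sorting the
    strings is the same as sorting the times they represent.
    """
    combined = {}
    starts = sorted(times.get("time_coverage_start", []))
    ends = sorted(times.get("time_coverage_end", []))
    if starts:
        combined["time_coverage_start"] = starts[0]   # earliest start
    if ends:
        combined["time_coverage_end"] = ends[-1]      # latest end
    return combined
```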

clean_attr_array(allzattrs: dict) dict

Collect global attributes from all refs, determine which differ between refs and apply changes.

This Class method is common to all zarr-like conversion types.
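As a rough illustration of this kind of attribute cleaning, the sketch below keeps attributes whose value is the same in every file and replaces those that differ with a message. The function merge_global_attrs is hypothetical, and using the concat_msg default from the ProjectProcessor signature as the replacement is an assumption about how that message is applied.

```python
def merge_global_attrs(allzattrs: dict,
                       concat_msg: str = "See individual files for more details") -> dict:
    """Collapse {attr: [value per file]} into {attr: single value}."""
    merged = {}
    for attr, values in allzattrs.items():
        unique = []
        for value in values:
            # List membership keeps first-seen order and copes with
            # unhashable values such as lists.
            if value not in unique:
                unique.append(value)
        if len(unique) == 1:
            merged[attr] = unique[0]      # same in every file: keep as-is
        else:
            merged[attr] = concat_msg     # differs between files (assumed handling)
    return merged
```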

clean_attrs(zattrs: dict) dict

Amend any saved attributes post-combining. Not currently implemented; may be unnecessary.

This Class method is common to all zarr-like conversion types.

collect_details() dict

Collect kwargs for combining and any special attributes, and save them to the detail file. Common class method for all conversion types.

correct_metadata(allzattrs: dict) dict

General function for correcting metadata:
  • Combine all existing metadata in a standard way (cleaning arrays).

  • Apply the updates and removals specified by configuration.

This Class method is common to all zarr-like conversion types.

determine_dim_specs(objs: list) None

Perform identification of identical_dims and concat_dims here.

find_concat_dims(ds_examples: list, logger=<pipeline.logs.FalseLogger object>) None

Find dimensions to use when combining for concatenation:
  • Dimensions which change over the set of files must be concatenated together.

  • Dimensions which do not change (typically lat/lon) are instead identified as identical_dims.

This Class method is common to all conversion types.

find_identical_dims(ds_examples: list, logger=<pipeline.logs.FalseLogger object>) None

Find dimensions and variables that are identical across the set of files:
  • Variables which do not change (typically lat/lon) are identified as identical_dims and not concatenated over the set of files.

  • Variables which do change are concatenated as usual.

This Class method is common to all conversion types.
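The distinction between concat_dims and identical_dims can be illustrated with a small sketch. It is not the pipeline's implementation: classify_dims is a hypothetical helper, and each example dataset is represented as a plain dict mapping a name to its list of values rather than as an opened dataset object.

```python
def classify_dims(ds_examples: list) -> tuple:
    """Split names into (concat_dims, identical_dims) by comparing values.

    A name whose values match in every example is treated as identical;
    anything that changes between examples is a concatenation candidate.
    """
    first = ds_examples[0]
    concat_dims, identical_dims = [], []
    for name, values in first.items():
        if all(ds.get(name) == values for ds in ds_examples[1:]):
            identical_dims.append(name)
        else:
            concat_dims.append(name)
    return concat_dims, identical_dims


file_a = {"time": [0, 1], "lat": [10, 20], "lon": [5, 6]}
file_b = {"time": [2, 3], "lat": [10, 20], "lon": [5, 6]}
concat_dims, identical_dims = classify_dims([file_a, file_b])
# time changes between the two files; lat and lon do not.
```

Note that with a single example nothing can be seen to change, so every name is classed as identical; at least two files are needed for the comparison to be meaningful.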

get_timings() dict

Export timed values if all refs were created from scratch. Loading refs invalidates the timings, so None is returned if any refs were loaded rather than created. Common class method for all conversion types.

Returns:

Dictionary of timing values if successful and refs were not loaded. If refs were loaded, timings are invalid so returns None.

set_filelist() None

Get the list of files from the filelist for this dataset and assign it to the ‘filelist’ list. Common class method for all conversion types.

class pipeline.compute.ZarrDSRechunker(proj_code, mem_allowed='100MB', preferences=None, **kwargs)

Bases: ProjectProcessor

Rechunk input data types directly into Zarr using Pangeo Rechunker:
  • If refs already exist from previous Kerchunk runs, these can be used to inform the rechunker.

  • Otherwise, the process has to start from scratch.

obtain_file_subset() None

Quick function for obtaining a subset of the whole fileset. Originally used to open all the files using Xarray for concatenation later.

pipeline.compute.compute_config(args, logger, fh=None, logid=None, **kwargs) None

Serves as the main point of configuration for processing/conversion runs. Can set up Kerchunk or Zarr configurations and check that required files are present.

Parameters:
  • args – (obj) Set of command line arguments supplied by argparse.

  • logger – (obj) Logging object for info/debug/error messages. Will create a new logger object if not given one.

  • fh – (str) Path to file for logger I/O when defining new logger.

  • logid – (str) If creating a new logger, will need an id to distinguish this logger from other single processes (typically n of N total processes.)

  • overide_type – (str) Set as JSON/parq/zarr to specify output cloud format type to use.

Returns:

None

pipeline.compute.configure_kerchunk(args, logger, fh=None, logid=None)

Configure all required steps for Kerchunk processing:
  • Check if output files already exist.

  • Configure timings post-run.