Lazy evaluation on Dask arrays

If you are unfamiliar with Dask, read Parallel computing with Dask in Xarray documentation first. The current version only supports dask arrays on a single machine. Support of Dask.distributed is in roadmap.

xESMF’s Dask support is mostly for lazy evaluation and out-of-core computing, to allow processing large volumes of data with limited memory. You might also get moderate speed-up on a multi-core machine by choosing proper chunk sizes, but that generally won’t help your entire pipeline too much, because the read-regrid-write pipeline is severely I/O limited (see this issue for more discussions). On a single machine, the disk bandwidth is typically limited to ~500 MB/s, and you cannot process data faster than such rate. If you need much faster data processing rate, you should resort to parallel file systems on HPC clusters or distributed storage on public cloud platforms. Please refer to the Pangeo project for more information.

[1]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import dask.array as da  # need to have dask.array installed, although not directly using it here.
import xarray as xr
import xesmf as xe

A simple example

Prepare input data

[2]:
ds = xr.tutorial.open_dataset("air_temperature", chunks={"time": 500})
ds
[2]:
<xarray.Dataset>
Dimensions:  (lat: 25, time: 2920, lon: 53)
Coordinates:
  * lat      (lat) float32 75.0 72.5 70.0 67.5 65.0 ... 25.0 22.5 20.0 17.5 15.0
  * lon      (lon) float32 200.0 202.5 205.0 207.5 ... 322.5 325.0 327.5 330.0
  * time     (time) datetime64[ns] 2013-01-01 ... 2014-12-31T18:00:00
Data variables:
    air      (time, lat, lon) float32 dask.array<chunksize=(500, 25, 53), meta=np.ndarray>
Attributes:
    Conventions:  COARDS
    title:        4x daily NMC reanalysis (1948)
    description:  Data is from NMC initialized reanalysis\n(4x/day).  These a...
    platform:     Model
    references:   http://www.esrl.noaa.gov/psd/data/gridded/data.ncep.reanaly...
[3]:
ds.chunks
[3]:
Frozen({'time': (500, 500, 500, 500, 500, 420), 'lat': (25,), 'lon': (53,)})
[4]:
ds["air"].data
[4]:
Array Chunk
Bytes 14.76 MiB 2.53 MiB
Shape (2920, 25, 53) (500, 25, 53)
Dask graph 6 chunks in 2 graph layers
Data type float32 numpy.ndarray
53 25 2920

Build regridder

[5]:
ds_out = xr.Dataset(
    {
        "lat": (["lat"], np.arange(16, 75, 1.0)),
        "lon": (["lon"], np.arange(200, 330, 1.5)),
    }
)

regridder = xe.Regridder(ds, ds_out, "bilinear")
regridder
[5]:
xESMF Regridder
Regridding algorithm:       bilinear
Weight filename:            bilinear_25x53_59x87.nc
Reuse pre-computed weights? False
Input grid shape:           (25, 53)
Output grid shape:          (59, 87)
Periodic in longitude?      False

Apply to xarray Dataset/DataArray

[6]:
# only build the dask graph; actual computation happens later when calling compute()
%time ds_out = regridder(ds)
ds_out
CPU times: user 2.04 s, sys: 43.8 ms, total: 2.08 s
Wall time: 2.22 s
[6]:
<xarray.Dataset>
Dimensions:  (time: 2920, lat: 59, lon: 87)
Coordinates:
  * time     (time) datetime64[ns] 2013-01-01 ... 2014-12-31T18:00:00
  * lat      (lat) float64 16.0 17.0 18.0 19.0 20.0 ... 70.0 71.0 72.0 73.0 74.0
  * lon      (lon) float64 200.0 201.5 203.0 204.5 ... 324.5 326.0 327.5 329.0
Data variables:
    air      (time, lat, lon) float32 dask.array<chunksize=(500, 25, 53), meta=np.ndarray>
Attributes:
    regrid_method:  bilinear
[7]:
ds_out["air"].data  # chunks are preserved
[7]:
Array Chunk
Bytes 57.18 MiB 2.53 MiB
Shape (2920, 59, 87) (500, 25, 53)
Dask graph 36 chunks in 8 graph layers
Data type float32 numpy.ndarray
87 59 2920
[8]:
%time result = ds_out['air'].compute()  # actually applies regridding
CPU times: user 1.62 s, sys: 386 ms, total: 2.01 s
Wall time: 908 ms
[9]:
type(result.data), result.data.shape
[9]:
(numpy.ndarray, (2920, 59, 87))

Spatial chunks

Dask support also includes chunking over horizontal/core dimensions (lat, lon, or x, y).

[10]:
# xESMF will take DataArrays that are chunked along the horizontal/core dimensions
ds_spatial = ds.chunk({"lat": 25, "lon": 25, "time": -1})
ds_spatial
[10]:
<xarray.Dataset>
Dimensions:  (lat: 25, time: 2920, lon: 53)
Coordinates:
  * lat      (lat) float32 75.0 72.5 70.0 67.5 65.0 ... 25.0 22.5 20.0 17.5 15.0
  * lon      (lon) float32 200.0 202.5 205.0 207.5 ... 322.5 325.0 327.5 330.0
  * time     (time) datetime64[ns] 2013-01-01 ... 2014-12-31T18:00:00
Data variables:
    air      (time, lat, lon) float32 dask.array<chunksize=(2920, 25, 25), meta=np.ndarray>
Attributes:
    Conventions:  COARDS
    title:        4x daily NMC reanalysis (1948)
    description:  Data is from NMC initialized reanalysis\n(4x/day).  These a...
    platform:     Model
    references:   http://www.esrl.noaa.gov/psd/data/gridded/data.ncep.reanaly...

Specifying spatial chunks will result in the output DataArray having the same chunk size on the spatial dimensions.

[11]:
ds_spatial_out = regridder(ds_spatial)  # Regridding ds_spatial
ds_spatial_out["air"].data
[11]:
Array Chunk
Bytes 57.18 MiB 6.96 MiB
Shape (2920, 59, 87) (2920, 25, 25)
Dask graph 12 chunks in 10 graph layers
Data type float32 numpy.ndarray
87 59 2920

Unless the output_chunks argument is passed to the regridder

[12]:
ds_spatial_out = regridder(ds_spatial, output_chunks={"lat": 10, "lon": 10})
ds_spatial_out["air"].data
[12]:
Array Chunk
Bytes 57.18 MiB 1.11 MiB
Shape (2920, 59, 87) (2920, 10, 10)
Dask graph 54 chunks in 10 graph layers
Data type float32 numpy.ndarray
87 59 2920

Parallel weight generation with Dask

Dask can also be used to build the regridder and compute its weights in parallel. To do so, xESMF uses the chunks on the destination grid and computes subsets of weights on each chunk in parallel.

Parallel weight generation example

Prepare input data

[13]:
ds = xr.tutorial.open_dataset("air_temperature", chunks={"time": 500})
ds
[13]:
<xarray.Dataset>
Dimensions:  (lat: 25, time: 2920, lon: 53)
Coordinates:
  * lat      (lat) float32 75.0 72.5 70.0 67.5 65.0 ... 25.0 22.5 20.0 17.5 15.0
  * lon      (lon) float32 200.0 202.5 205.0 207.5 ... 322.5 325.0 327.5 330.0
  * time     (time) datetime64[ns] 2013-01-01 ... 2014-12-31T18:00:00
Data variables:
    air      (time, lat, lon) float32 dask.array<chunksize=(500, 25, 53), meta=np.ndarray>
Attributes:
    Conventions:  COARDS
    title:        4x daily NMC reanalysis (1948)
    description:  Data is from NMC initialized reanalysis\n(4x/day).  These a...
    platform:     Model
    references:   http://www.esrl.noaa.gov/psd/data/gridded/data.ncep.reanaly...

Prepare output dataset and chunk it

[14]:
ds_out = xr.tutorial.open_dataset("rasm")
ds_out = ds_out.chunk({"y": 50, "x": 50})
ds_out.chunks
[14]:
Frozen({'time': (36,), 'y': (50, 50, 50, 50, 5), 'x': (50, 50, 50, 50, 50, 25)})

Create regridder, generating the weights in parallel

[15]:
para_regridder = xe.Regridder(ds, ds_out, "bilinear", parallel=True)
para_regridder
[15]:
xESMF Regridder
Regridding algorithm:       bilinear
Weight filename:            bilinear_25x53_205x275.nc
Reuse pre-computed weights? False
Input grid shape:           (25, 53)
Output grid shape:          (205, 275)
Periodic in longitude?      False

Attempting to build the Regridder using the option parallel=True with either reuse_weights=True or with weights != None will produce a warning. In both cases, since the weights are already generated, the regridder will be built skipping the parallel part.

Using a mask to chunk an empty Dataset

If the destination grid has no variables and contains 1D lat/lon coordinates, using xarray’s .chunk() method will not work

[16]:
ds_out = xr.Dataset(
    {
        "lat": (["lat"], np.arange(16, 75, 1.0), {"units": "degrees_north"}),
        "lon": (["lon"], np.arange(200, 330, 1.5), {"units": "degrees_east"}),
    }
)
ds_out
[16]:
<xarray.Dataset>
Dimensions:  (lat: 59, lon: 87)
Coordinates:
  * lat      (lat) float64 16.0 17.0 18.0 19.0 20.0 ... 70.0 71.0 72.0 73.0 74.0
  * lon      (lon) float64 200.0 201.5 203.0 204.5 ... 324.5 326.0 327.5 329.0
Data variables:
    *empty*
[17]:
ds_out.chunk({"lat": 25, "lon": 25})
ds_out.chunks
[17]:
Frozen({})

To deal with this issue, we can create a mask and add it to ds_out. Using a boolean mask ensures ds_out is not bloated by data and setting the mask to be True everywhere will not affect regridding.

[18]:
mask = da.ones((ds_out.lat.size, ds_out.lon.size), dtype=bool, chunks=(25, 25))
ds_out["mask"] = (ds_out.dims, mask)

# Now we check the chunks of ds_out
ds_out.chunks
[18]:
Frozen({'lat': (25, 25, 9), 'lon': (25, 25, 25, 12)})