Lazy evaluation on Dask arrays
If you are unfamiliar with Dask, read Parallel computing with Dask in the Xarray documentation first.
Recall that the regridding process is divided into two steps: computing the weights and applying the weights. Dask support is much more advanced for the latter, and that is what the first part of this notebook covers.
Dask enables lazy evaluation and out-of-core computing, making it possible to process volumes of data larger than the available memory. You may also get a speed-up by parallelizing the process in some cases, but a general rule of thumb is that if the data fits in memory, regridding will be faster without dask.
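As a rough, hypothetical check (not a rule from xESMF), you can compare the dataset's in-memory size against the available RAM before deciding whether to chunk. The sketch below assumes psutil is installed; the 0.5 safety factor is an arbitrary illustration.
[ ]:
import psutil  # assumption: psutil is available
import xarray as xr

ds = xr.tutorial.open_dataset("air_temperature")
# Chunk only when the data does not fit comfortably in memory;
# the 0.5 safety factor is an arbitrary choice for illustration.
fits_in_memory = ds.nbytes < 0.5 * psutil.virtual_memory().available
chunks = None if fits_in_memory else {"time": 500}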
[1]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import dask.array as da  # dask must be installed; used further below to build a chunked mask
import xarray as xr
import xesmf as xe
A simple example
Prepare input data
[2]:
ds = xr.tutorial.open_dataset("air_temperature", chunks={"time": 500})
ds
[2]:
<xarray.Dataset>
Dimensions:  (lat: 25, time: 2920, lon: 53)
Coordinates:
  * lat      (lat) float32 75.0 72.5 70.0 67.5 65.0 ... 25.0 22.5 20.0 17.5 15.0
  * lon      (lon) float32 200.0 202.5 205.0 207.5 ... 322.5 325.0 327.5 330.0
  * time     (time) datetime64[ns] 2013-01-01 ... 2014-12-31T18:00:00
Data variables:
    air      (time, lat, lon) float32 dask.array<chunksize=(500, 25, 53), meta=np.ndarray>
Attributes:
    Conventions:  COARDS
    title:        4x daily NMC reanalysis (1948)
    description:  Data is from NMC initialized reanalysis\n(4x/day). These a...
    platform:     Model
    references:   http://www.esrl.noaa.gov/psd/data/gridded/data.ncep.reanaly...
[3]:
ds.chunks
[3]:
Frozen({'time': (500, 500, 500, 500, 500, 420), 'lat': (25,), 'lon': (53,)})
[4]:
ds["air"].data
[4]:
dask.array<shape=(2920, 25, 53), dtype=float32, chunksize=(500, 25, 53), chunktype=numpy.ndarray>
Build regridder
[5]:
ds_out = xr.Dataset(
{
"lat": (["lat"], np.arange(16, 75, 1.0)),
"lon": (["lon"], np.arange(200, 330, 1.5)),
}
)
regridder = xe.Regridder(ds, ds_out, "bilinear")
regridder
[5]:
xESMF Regridder
Regridding algorithm: bilinear
Weight filename: bilinear_25x53_59x87.nc
Reuse pre-computed weights? False
Input grid shape: (25, 53)
Output grid shape: (59, 87)
Periodic in longitude? False
Apply to xarray Dataset/DataArray
[7]:
# only build the dask graph; actual computation happens later when calling compute()
%time ds_out = regridder(ds)
ds_out
CPU times: user 8.17 ms, sys: 2.32 ms, total: 10.5 ms
Wall time: 10.2 ms
[7]:
<xarray.Dataset>
Dimensions:  (time: 2920, lat: 59, lon: 87)
Coordinates:
  * time     (time) datetime64[ns] 2013-01-01 ... 2014-12-31T18:00:00
  * lat      (lat) float64 16.0 17.0 18.0 19.0 20.0 ... 70.0 71.0 72.0 73.0 74.0
  * lon      (lon) float64 200.0 201.5 203.0 204.5 ... 324.5 326.0 327.5 329.0
Data variables:
    air      (time, lat, lon) float32 dask.array<chunksize=(500, 59, 87), meta=np.ndarray>
Attributes:
    regrid_method:  bilinear
[8]:
ds_out["air"].data
[8]:
dask.array<shape=(2920, 59, 87), dtype=float32, chunksize=(500, 59, 87), chunktype=numpy.ndarray>
[9]:
%time result = ds_out['air'].compute() # actually applies regridding
CPU times: user 756 ms, sys: 155 ms, total: 911 ms
Wall time: 600 ms
[10]:
type(result.data), result.data.shape
[10]:
(numpy.ndarray, (2920, 59, 87))
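If the regridded result were too large to load at once, it could instead be streamed straight to disk: calling to_netcdf on a dask-backed dataset makes xarray compute and write the chunks incrementally. The file name below is illustrative.
[ ]:
# Stream the lazy result to disk chunk by chunk instead of loading it all;
# "air_regridded.nc" is an illustrative file name.
ds_out.to_netcdf("air_regridded.nc")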
Chunking behaviour
xESMF will adjust its default behaviour according to the input data. On spatial dimensions where the input has only one chunk, the output of a Regridder
call will also have only one chunk, no matter the new dimension size. This is what happened in the previous example.
However, if the input has more than one chunk along a spatial dimension, the regridder will try to preserve that chunk size. When upscaling data, this means the number of chunks increases, and with it the number of tasks added to the dask graph. This can actually decrease performance if the graph becomes too large and is filled with many small tasks.
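One rough way to gauge the size of the graph is to count the chunks and tasks behind the lazy result, as in this sketch (using the regridder and ds defined above):
[ ]:
# Inspect the dask graph behind a lazy regridding result.
out = regridder(ds)
print(out["air"].data.npartitions)  # number of output chunks
print(len(out["air"].data.dask))    # total number of tasks in the graph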
One can always override xESMF’s default behaviour by passing output_chunks
to the Regridder
call.
In the example below, the input has three chunks along lon
:
[13]:
ds_spatial = ds.chunk({"lat": 25, "lon": 25, "time": -1})
ds_spatial.air.data
[13]:
dask.array<shape=(2920, 25, 53), dtype=float32, chunksize=(2920, 25, 25), chunktype=numpy.ndarray>
In this case, the output DataArray will have the same chunk size on longitude, but still only one chunk along latitude.
[14]:
ds_spatial_out = regridder(ds_spatial)  # lon chunk size is preserved; lat stays in one chunk
ds_spatial_out["air"].data
[14]:
dask.array<shape=(2920, 59, 87), dtype=float32, chunksize=(2920, 59, 25), chunktype=numpy.ndarray>
Unless the output_chunks
argument is passed to the regridder:
[15]:
ds_spatial_out = regridder(ds_spatial, output_chunks={"lat": 10, "lon": 10})
ds_spatial_out["air"].data
[15]:
dask.array<shape=(2920, 59, 87), dtype=float32, chunksize=(2920, 10, 10), chunktype=numpy.ndarray>
Parallel weight generation with Dask
Dask can also be used to build the regridder and compute its weights in parallel. To do so, xESMF uses the chunks on the destination grid and computes subsets of weights on each chunk in parallel.
This feature is currently experimental, and it forces dask to use processes to parallelize the computation. Moreover, it is slower than the normal method and is thus only useful when the destination grid does not fit in memory. Recall that the parallelization is done over chunks of the destination grid, and each iteration needs to load the source grid into memory.
For a more performant way to generate weights in parallel, it might be better to use ESMF
directly instead, assuming you have an MPI-enabled version. See the “Solving large problems using HPC” notebook.
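For reference, here is a minimal, hypothetical sketch of that route, assuming an MPI-enabled ESMF build with ESMF_RegridWeightGen on the PATH; the grid and weight file names are placeholders. The resulting weights file can then be handed to xe.Regridder through its weights argument.
[ ]:
import subprocess

# Hypothetical sketch: generate weights with ESMF's MPI-enabled CLI tool.
# File names are placeholders; the source and destination grids must be
# stored in files ESMF understands.
subprocess.run(
    [
        "mpirun", "-np", "4", "ESMF_RegridWeightGen",
        "--source", "src_grid.nc",
        "--destination", "dst_grid.nc",
        "--method", "bilinear",
        "--weight", "weights.nc",
    ],
    check=True,
)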
Parallel weight generation example
Prepare input data
[16]:
ds = xr.tutorial.open_dataset("air_temperature", chunks={"time": 500})
ds
[16]:
<xarray.Dataset>
Dimensions:  (lat: 25, time: 2920, lon: 53)
Coordinates:
  * lat      (lat) float32 75.0 72.5 70.0 67.5 65.0 ... 25.0 22.5 20.0 17.5 15.0
  * lon      (lon) float32 200.0 202.5 205.0 207.5 ... 322.5 325.0 327.5 330.0
  * time     (time) datetime64[ns] 2013-01-01 ... 2014-12-31T18:00:00
Data variables:
    air      (time, lat, lon) float32 dask.array<chunksize=(500, 25, 53), meta=np.ndarray>
Attributes:
    Conventions:  COARDS
    title:        4x daily NMC reanalysis (1948)
    description:  Data is from NMC initialized reanalysis\n(4x/day). These a...
    platform:     Model
    references:   http://www.esrl.noaa.gov/psd/data/gridded/data.ncep.reanaly...
Prepare output dataset and chunk it
[17]:
ds_out = xr.tutorial.open_dataset("rasm")
ds_out = ds_out.chunk({"y": 50, "x": 50})
ds_out.chunks
[17]:
Frozen({'time': (36,), 'y': (50, 50, 50, 50, 5), 'x': (50, 50, 50, 50, 50, 25)})
Create regridder, generating the weights in parallel
[18]:
para_regridder = xe.Regridder(ds, ds_out, "bilinear", parallel=True)
para_regridder
[WARNING] yaksa: 10 leaked handle pool objects
[WARNING] yaksa: 10 leaked handle pool objects
[WARNING] yaksa: 10 leaked handle pool objects
[WARNING] yaksa: 10 leaked handle pool objects
[18]:
xESMF Regridder
Regridding algorithm: bilinear
Weight filename: bilinear_25x53_205x275.nc
Reuse pre-computed weights? False
Input grid shape: (25, 53)
Output grid shape: (205, 275)
Periodic in longitude? False
Attempting to build the Regridder using the option parallel=True
with either reuse_weights=True
or with weights != None
will produce a warning. In both cases, since the weights are already generated, the regridder will be built skipping the parallel part.
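For instance, if the weights have already been computed, they can be saved and reused without the parallel step, along these lines (the file name is illustrative, and this assumes your xESMF version provides Regridder.to_netcdf):
[ ]:
# Save the computed weights, then rebuild the regridder from them;
# since the weights already exist, the parallel generation is skipped.
para_regridder.to_netcdf("bilinear_weights.nc")  # illustrative file name
regridder2 = xe.Regridder(ds, ds_out, "bilinear", weights="bilinear_weights.nc")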
Using a mask to chunk an empty Dataset
If the destination grid has no variables and contains 1D lat/lon coordinates, using xarray’s .chunk()
method will not work:
[19]:
ds_out = xr.Dataset(
{
"lat": (["lat"], np.arange(16, 75, 1.0), {"units": "degrees_north"}),
"lon": (["lon"], np.arange(200, 330, 1.5), {"units": "degrees_east"}),
}
)
ds_out
[19]:
<xarray.Dataset>
Dimensions:  (lat: 59, lon: 87)
Coordinates:
  * lat      (lat) float64 16.0 17.0 18.0 19.0 20.0 ... 70.0 71.0 72.0 73.0 74.0
  * lon      (lon) float64 200.0 201.5 203.0 204.5 ... 324.5 326.0 327.5 329.0
Data variables:
    *empty*
[20]:
ds_out = ds_out.chunk({"lat": 25, "lon": 25})  # has no effect: index coordinates are never chunked
ds_out.chunks
[20]:
Frozen({})
To deal with this issue, we can create a mask
and add it to ds_out
. Using a boolean mask ensures ds_out
is not bloated by data, and setting the mask to True
everywhere will not affect regridding.
[21]:
mask = da.ones((ds_out.lat.size, ds_out.lon.size), dtype=bool, chunks=(25, 25))
ds_out["mask"] = (ds_out.dims, mask)
# Now we check the chunks of ds_out
ds_out.chunks
[21]:
Frozen({'lat': (25, 25, 9), 'lon': (25, 25, 25, 12)})
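With the mask in place, ds_out now carries spatial chunks, so it can be fed to the parallel weight generation exactly as in the earlier example:
[ ]:
# ds_out is now chunked through its mask variable, so parallel weight
# generation works as before.
para_regridder = xe.Regridder(ds, ds_out, "bilinear", parallel=True)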