Lazy evaluation on Dask arrays
If you are unfamiliar with Dask, read Parallel computing with Dask in the Xarray documentation first. The current version only supports Dask arrays on a single machine; support for dask.distributed is on the roadmap.
xESMF’s Dask support is mostly for lazy evaluation and out-of-core computing, which allow processing large volumes of data with limited memory. You might also get a moderate speed-up on a multi-core machine by choosing proper chunk sizes, but that generally won’t help the entire pipeline much, because the read-regrid-write pipeline is severely I/O-limited (see this issue for more discussion). On a single machine, disk bandwidth is typically limited to ~500 MB/s, and you cannot process data faster than that rate. If you need a much higher data-processing rate, consider parallel file systems on HPC clusters or distributed storage on public cloud platforms. Refer to the Pangeo project for more information.
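To put that bandwidth figure in perspective, here is a back-of-the-envelope sketch (assuming the ~500 MB/s single-disk estimate above and one read plus one write pass over the data, for a hypothetical 1 TB dataset):

```python
# Rough lower bound on wall time for a read-regrid-write pipeline that is
# purely I/O limited. The 500 MB/s figure is the single-disk estimate above.
data_bytes = 1e12                      # a hypothetical 1 TB dataset
bandwidth = 500e6                      # bytes per second
seconds = 2 * data_bytes / bandwidth   # one read pass + one write pass
print(f"{seconds / 60:.0f} minutes")   # about an hour of pure I/O
```

No amount of extra CPU cores changes this bound; only faster storage does.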
[1]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import dask.array as da  # used below to build a chunked mask
import xarray as xr
import xesmf as xe
A simple example
Prepare input data
[2]:
ds = xr.tutorial.open_dataset("air_temperature", chunks={"time": 500})
ds
[2]:
<xarray.Dataset>
Dimensions:  (lat: 25, time: 2920, lon: 53)
Coordinates:
  * lat      (lat) float32 75.0 72.5 70.0 67.5 65.0 ... 25.0 22.5 20.0 17.5 15.0
  * lon      (lon) float32 200.0 202.5 205.0 207.5 ... 322.5 325.0 327.5 330.0
  * time     (time) datetime64[ns] 2013-01-01 ... 2014-12-31T18:00:00
Data variables:
    air      (time, lat, lon) float32 dask.array<chunksize=(500, 25, 53), meta=np.ndarray>
Attributes:
    Conventions:  COARDS
    title:        4x daily NMC reanalysis (1948)
    description:  Data is from NMC initialized reanalysis\n(4x/day). These a...
    platform:     Model
    references:   http://www.esrl.noaa.gov/psd/data/gridded/data.ncep.reanaly...
[3]:
ds.chunks
[3]:
Frozen({'time': (500, 500, 500, 500, 500, 420), 'lat': (25,), 'lon': (53,)})
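Each chunk should comfortably fit in memory. As a quick sanity check (plain arithmetic, no Dask needed), the per-chunk footprint of the float32 air variable is:

```python
# One chunk spans 500 time steps and the full 25 x 53 spatial grid.
chunk_shape = (500, 25, 53)
bytes_per_value = 4            # float32
chunk_bytes = bytes_per_value
for n in chunk_shape:
    chunk_bytes *= n
print(f"{chunk_bytes / 1e6:.2f} MB per chunk")  # 2.65 MB
```

A few megabytes per chunk is on the small side; Dask’s best-practices guide generally suggests chunks on the order of 100 MB to amortize scheduling overhead.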
[4]:
ds["air"].data
[4]:
(dask array HTML repr: 2920 × 25 × 53 float32, chunksize (500, 25, 53))
Build regridder
[5]:
ds_out = xr.Dataset(
{
"lat": (["lat"], np.arange(16, 75, 1.0)),
"lon": (["lon"], np.arange(200, 330, 1.5)),
}
)
regridder = xe.Regridder(ds, ds_out, "bilinear")
regridder
[5]:
xESMF Regridder
Regridding algorithm: bilinear
Weight filename: bilinear_25x53_59x87.nc
Reuse pre-computed weights? False
Input grid shape: (25, 53)
Output grid shape: (59, 87)
Periodic in longitude? False
Apply to xarray Dataset/DataArray
[6]:
# only build the dask graph; actual computation happens later when calling compute()
%time ds_out = regridder(ds)
ds_out
CPU times: user 2.04 s, sys: 43.8 ms, total: 2.08 s
Wall time: 2.22 s
[6]:
<xarray.Dataset>
Dimensions:  (time: 2920, lat: 59, lon: 87)
Coordinates:
  * time     (time) datetime64[ns] 2013-01-01 ... 2014-12-31T18:00:00
  * lat      (lat) float64 16.0 17.0 18.0 19.0 20.0 ... 70.0 71.0 72.0 73.0 74.0
  * lon      (lon) float64 200.0 201.5 203.0 204.5 ... 324.5 326.0 327.5 329.0
Data variables:
    air      (time, lat, lon) float32 dask.array<chunksize=(500, 59, 87), meta=np.ndarray>
Attributes:
    regrid_method:  bilinear
[7]:
ds_out["air"].data # chunks are preserved
[7]:
(dask array HTML repr: 2920 × 59 × 87 float32, time chunks of 500 preserved)
[8]:
%time result = ds_out['air'].compute() # actually applies regridding
CPU times: user 1.62 s, sys: 386 ms, total: 2.01 s
Wall time: 908 ms
[9]:
type(result.data), result.data.shape
[9]:
(numpy.ndarray, (2920, 59, 87))
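Loading the full result with compute() is fine here, but for outputs larger than memory the usual pattern is to write the lazy result straight to disk (e.g. with ds_out.to_netcdf(...) or ds_out.to_zarr(...)), letting Dask stream one chunk at a time. A minimal Dask-only sketch of the same lazy-then-materialize flow, using a stand-in array with the shape and chunking of the regridded output above:

```python
import numpy as np
import dask.array as da

# A lazy stand-in for the regridded output: same shape and chunking,
# but nothing is computed or held in memory yet.
lazy = da.zeros((2920, 59, 87), dtype=np.float32, chunks=(500, 59, 87))

time_mean = lazy.mean(axis=0)   # still lazy: just extends the task graph
result = time_mean.compute()    # now Dask walks the graph chunk by chunk
print(type(result).__name__, result.shape)  # ndarray (59, 87)
```

Only the final reduced array is materialized; the 2920-step stack never exists in memory at once.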
Spatial chunks
Dask support also includes chunking over the horizontal/core dimensions (lat, lon, or x, y).
[10]:
# xESMF will take DataArrays that are chunked along the horizontal/core dimensions
ds_spatial = ds.chunk({"lat": 25, "lon": 25, "time": -1})
ds_spatial
[10]:
<xarray.Dataset>
Dimensions:  (lat: 25, time: 2920, lon: 53)
Coordinates:
  * lat      (lat) float32 75.0 72.5 70.0 67.5 65.0 ... 25.0 22.5 20.0 17.5 15.0
  * lon      (lon) float32 200.0 202.5 205.0 207.5 ... 322.5 325.0 327.5 330.0
  * time     (time) datetime64[ns] 2013-01-01 ... 2014-12-31T18:00:00
Data variables:
    air      (time, lat, lon) float32 dask.array<chunksize=(2920, 25, 25), meta=np.ndarray>
Attributes:
    Conventions:  COARDS
    title:        4x daily NMC reanalysis (1948)
    description:  Data is from NMC initialized reanalysis\n(4x/day). These a...
    platform:     Model
    references:   http://www.esrl.noaa.gov/psd/data/gridded/data.ncep.reanaly...
Specifying spatial chunks will result in the output DataArray having the same chunk size on the spatial dimensions.
[11]:
ds_spatial_out = regridder(ds_spatial) # Regridding ds_spatial
ds_spatial_out["air"].data
[11]:
(dask array HTML repr: output regridded with the input’s (25, 25) spatial chunk sizes)
This default can be overridden by passing the output_chunks argument to the regridder:
[12]:
ds_spatial_out = regridder(ds_spatial, output_chunks={"lat": 10, "lon": 10})
ds_spatial_out["air"].data
[12]:
(dask array HTML repr: output with spatial chunks of size (10, 10))
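With output_chunks={"lat": 10, "lon": 10} on the 59 × 87 output grid, the spatial plane is split into a grid of chunks; quick arithmetic shows how many:

```python
import math

lat_size, lon_size = 59, 87
lat_chunk, lon_chunk = 10, 10

n_lat = math.ceil(lat_size / lat_chunk)   # 6 chunk rows (the last of size 9)
n_lon = math.ceil(lon_size / lon_chunk)   # 9 chunk columns (the last of size 7)
print(n_lat * n_lon, "spatial chunks per time chunk")  # 54
```

Very small spatial chunks multiply the task count quickly, so output_chunks is best kept as coarse as memory allows.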
Parallel weight generation with Dask
Dask can also be used to build the regridder and compute its weights in parallel. To do so, xESMF uses the chunks of the destination grid and computes the subset of weights for each chunk in parallel.
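The idea can be pictured with a toy example (not xESMF’s actual implementation): the weights form a matrix mapping input cells to output cells, and each destination chunk only needs the rows belonging to its own cells, so those row blocks can be computed and applied independently:

```python
import numpy as np

rng = np.random.default_rng(0)
n_out, n_in = 6, 4
weights = rng.random((n_out, n_in))   # toy dense "weight matrix"
data_in = rng.random(n_in)

full = weights @ data_in              # regrid all output cells at once

# Split the destination into two chunks and apply only the matching rows;
# the concatenated result is identical, so chunks can run in parallel.
chunked = np.concatenate([weights[:3] @ data_in, weights[3:] @ data_in])
print(np.allclose(full, chunked))  # True
```

In xESMF the matrix is sparse, which is why per-chunk weight subsets stay small.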
Parallel weight generation example
Prepare input data
[13]:
ds = xr.tutorial.open_dataset("air_temperature", chunks={"time": 500})
ds
[13]:
<xarray.Dataset>
Dimensions:  (lat: 25, time: 2920, lon: 53)
Coordinates:
  * lat      (lat) float32 75.0 72.5 70.0 67.5 65.0 ... 25.0 22.5 20.0 17.5 15.0
  * lon      (lon) float32 200.0 202.5 205.0 207.5 ... 322.5 325.0 327.5 330.0
  * time     (time) datetime64[ns] 2013-01-01 ... 2014-12-31T18:00:00
Data variables:
    air      (time, lat, lon) float32 dask.array<chunksize=(500, 25, 53), meta=np.ndarray>
Attributes:
    Conventions:  COARDS
    title:        4x daily NMC reanalysis (1948)
    description:  Data is from NMC initialized reanalysis\n(4x/day). These a...
    platform:     Model
    references:   http://www.esrl.noaa.gov/psd/data/gridded/data.ncep.reanaly...
Prepare output dataset and chunk it
[14]:
ds_out = xr.tutorial.open_dataset("rasm")
ds_out = ds_out.chunk({"y": 50, "x": 50})
ds_out.chunks
[14]:
Frozen({'time': (36,), 'y': (50, 50, 50, 50, 5), 'x': (50, 50, 50, 50, 50, 25)})
Create regridder, generating the weights in parallel
[15]:
para_regridder = xe.Regridder(ds, ds_out, "bilinear", parallel=True)
para_regridder
[15]:
xESMF Regridder
Regridding algorithm: bilinear
Weight filename: bilinear_25x53_205x275.nc
Reuse pre-computed weights? False
Input grid shape: (25, 53)
Output grid shape: (205, 275)
Periodic in longitude? False
Attempting to build the Regridder with parallel=True together with either reuse_weights=True or weights != None will produce a warning. In both cases the weights already exist, so the regridder is built without the parallel weight-generation step.
Using a mask to chunk an empty Dataset
If the destination grid has no variables and contains 1D lat/lon coordinates, xarray’s .chunk() method will not work:
[16]:
ds_out = xr.Dataset(
{
"lat": (["lat"], np.arange(16, 75, 1.0), {"units": "degrees_north"}),
"lon": (["lon"], np.arange(200, 330, 1.5), {"units": "degrees_east"}),
}
)
ds_out
[16]:
<xarray.Dataset>
Dimensions:  (lat: 59, lon: 87)
Coordinates:
  * lat      (lat) float64 16.0 17.0 18.0 19.0 20.0 ... 70.0 71.0 72.0 73.0 74.0
  * lon      (lon) float64 200.0 201.5 203.0 204.5 ... 324.5 326.0 327.5 329.0
Data variables:
    *empty*
[17]:
ds_out = ds_out.chunk({"lat": 25, "lon": 25})  # no data variables, so nothing gets chunked
ds_out.chunks
[17]:
Frozen({})
To deal with this issue, we can create a mask and add it to ds_out. Using a boolean mask ensures ds_out is not bloated with data, and setting the mask to True everywhere will not affect the regridding.
[18]:
mask = da.ones((ds_out.lat.size, ds_out.lon.size), dtype=bool, chunks=(25, 25))
ds_out["mask"] = (("lat", "lon"), mask)  # explicit dims are safer than ds_out.dims
# Now we check the chunks of ds_out
ds_out.chunks
[18]:
Frozen({'lat': (25, 25, 9), 'lon': (25, 25, 25, 12)})