I am quite new to parallel computing and Dask. In addition, this is my first question here on Stack Overflow and I hope everything works out.
Problem Description
I want to set up a bias correction of climate data (e.g. total precipitation, tp). For this I have three datasets:
- Actual predictions (ds_pred), containing several ensemble members for a period of 215 days from an initial date. (Dimensions: ens x days x lat x lon)
- Historical model data (ds_mdl_hist) for a given period of time (e.g. 1981 - 2016). (Dimensions: year x days x ens x lat x lon)
- Observational data (ds_obs). (Dimensions: year x days x lat x lon)
I created some dummy data to illustrate my problem. Please note: my real datasets contain much more data.
import numpy as np
import xarray as xr

# create dummy data
days          = np.arange(0, 215)
lat           = np.arange(35, 50)
lon           = np.arange(10, 20)
ens           = np.arange(0, 10)
year          = np.arange(1981, 2016)
data_pred     = np.random.randint(0, 100, size=(len(days), len(ens), len(lat), len(lon)))
data_mdl_hist = np.random.randint(0, 100, size=(len(year), len(days), len(ens), len(lat), len(lon)))
data_obs      = np.random.randint(0, 100, size=(len(year), len(days), len(lat), len(lon)))
# create dataset
ds_pred     = xr.Dataset({'tp': xr.DataArray(
                data   = data_pred,   # enter data here
                dims   = ['days', 'ens', 'lat', 'lon'],
                coords = {'days': days, 'ens': ens, 'lat': lat, 'lon': lon},
                attrs  = {'units'     : 'mm/day'})})
ds_mdl_hist = xr.Dataset({'tp': xr.DataArray(
                data   = data_mdl_hist,   # enter data here
                dims   = ['year', 'days','ens', 'lat', 'lon'],
                coords = {'year': year, 'days': days, 'ens': ens, 'lat': lat, 'lon': lon},
                attrs  = {'units'     : 'mm/day'})})
ds_obs      = xr.Dataset({'tp': xr.DataArray(
                data   = data_obs,   # enter data here
                dims   = ['year', 'days', 'lat', 'lon'],
                coords = {'year': year, 'days': days, 'lat': lat, 'lon': lon},
                attrs  = {'units'     : 'mm/day'})})
For each day in ds_pred, I slice the corresponding days in ds_mdl_hist and ds_obs over each year. Furthermore, I select not only the single day but a 30-day window around it. Example for day k=20:
# Slice data corresponding to time step
k = 20
# Predictions
ds_pred_sub = ds_pred.isel(days=k)
# Pool 15 days before and after the time step k
window = 15
day_range = np.arange(k-window, k+window)
# Historical model
ds_mdl_hist_sub = ds_mdl_hist.isel(days=day_range)
# Observational
ds_obs_sub = ds_obs.isel(days=day_range)
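One thing I noticed near the edges of the series: for k < 15, np.arange(k - window, k + window) produces negative indices, which isel() silently interprets as counting from the end of the days axis. A minimal sketch of a clipped window (the helper name safe_day_range is my own, not part of the code above):

```python
import numpy as np

def safe_day_range(k, window, n_days):
    """Window indices around day k, clipped to the valid range [0, n_days).

    A plain np.arange(k - window, k + window) yields negative indices
    for k < window, which isel() would wrap around to the end of the
    'days' axis instead of raising an error.
    """
    return np.arange(max(0, k - window), min(n_days, k + window))

# For k=5 the window is clipped at the first day instead of wrapping
idx = safe_day_range(5, 15, 215)
```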
In order to run apply_ufunc, I stack some dimensions.
# Stack dimensions in order to use apply_ufunc over only one dimension
ds_mdl_hist_sub = ds_mdl_hist_sub.stack(days_ens=("ens", "days", "year"))
ds_obs_sub = ds_obs_sub.stack(days_year=('days', 'year'))
My function for the bias correction includes the calculation of the distribution for both the subset of historical model data (ds_mdl_hist_sub) and observations (ds_obs_sub), as well as the interpolation with the subset of the predictions (ds_pred_sub). The function returns the corrected predictions (pred_corr).
from scipy.interpolate import interp1d

def bc_module(pred, mdl_hist, obs):
    # Define number of grid points, at which quantiles are calculated
    p     = np.linspace(0, 1, 1000)
    # Calculate quantiles of the historical model
    # (note: 'interpolation' was renamed to 'method' in NumPy 1.22)
    q_mdl = np.quantile(mdl_hist, p, interpolation='midpoint')
    # Calculate quantiles of the observations
    q_obs = np.quantile(obs, p, interpolation='midpoint')

    # Do the bias correction: map predictions into quantile space
    Y_pred = interp1d(q_mdl, p)(pred)
    # Do the back-transformation to the data space of the observations
    pred_corr = interp1d(p, q_obs, bounds_error=False)(Y_pred)

    return pred_corr
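To make sure the function itself behaves as intended, I ran a quick stand-alone check on random data (input sizes are made up; here I use np.quantile's newer method= keyword, which replaced interpolation= in NumPy 1.22):

```python
import numpy as np
from scipy.interpolate import interp1d

def bc_module(pred, mdl_hist, obs):
    # Quantile levels at which both distributions are evaluated
    p = np.linspace(0, 1, 1000)
    q_mdl = np.quantile(mdl_hist, p, method='midpoint')
    q_obs = np.quantile(obs, p, method='midpoint')
    # Map predictions to quantile space, then back to the obs data space
    Y_pred = interp1d(q_mdl, p)(pred)
    return interp1d(p, q_obs, bounds_error=False)(Y_pred)

rng = np.random.default_rng(0)
pred     = rng.uniform(10, 90, size=10)     # keep inside the model range
mdl_hist = rng.uniform(0, 100, size=10500)  # 10 ens * 30 days * 35 years
obs      = rng.uniform(0, 100, size=1050)   # 30 days * 35 years
corrected = bc_module(pred, mdl_hist, obs)
# corrected has the same shape as pred and stays inside the observed range
```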
Because my real datasets contain much more data, I want to apply bc_module in parallel by using apply_ufunc with dask="parallelized" over my "time" dimension, vectorizing over each lat, lon. This is my call:
# Xarray apply_ufunc
pred_corr = xr.apply_ufunc(
    bc_module, ds_pred_sub, ds_mdl_hist_sub, ds_obs_sub,
    input_core_dims=[['ens'], ['days_ens'], ['days_year']],
    output_core_dims=[['ens']],
    vectorize=True,
    dask="parallelized",
    output_dtypes=[np.float64],
).compute()
This works, but it seems to me that my code is not that fast.
Open Questions
- I really do not know how I can loop over the 215 days (k) in an efficient way. A simple for loop is not sufficient, because I have several variables to correct and the run time of my real dataset is about 1 hour per variable. Is it possible to use dask for this as well? Maybe something like dask.delayed, because the problem is embarrassingly parallel.
- In particular, I wonder if I can use dask to parallelize my outer for loop over the 215 days when apply_ufunc already uses dask="parallelized".
- Is there a more efficient way than using apply_ufunc? I really want to avoid nested loops over first lat and lon and second the 215 days.
- Is it possible to add some arguments to my apply_ufunc call so that the looping over the 215 days is included?
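For reference, the dask.delayed pattern I have in mind for the outer loop looks roughly like this (correct_one_day is a hypothetical stand-in for the per-day slicing plus the apply_ufunc call above; I am unsure whether nesting this around dask="parallelized" makes the two schedulers interfere, which is part of my question):

```python
import dask

def correct_one_day(k):
    """Hypothetical stand-in for the per-day correction.

    In the real code this would slice ds_pred / ds_mdl_hist / ds_obs
    around day k, stack, and run apply_ufunc on the slices.
    """
    return k * 2  # placeholder result

# Build one lazy task per day; nothing is computed yet
tasks = [dask.delayed(correct_one_day)(k) for k in range(215)]
# Trigger all 215 tasks in parallel
results = dask.compute(*tasks)
```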
