I was told over at gis.stackexchange.com that this question would be a better fit here.
I already have working code, but it is very slow and my hunch is that it could be optimized. The goal is to process 11 years of data: 11 x 12 x 30 x 24 = roughly 95,040 hourly files.
On RAID/NAS external storage I have NetCDF files from the NLDAS-2 products that look like this:
```
<xarray.Dataset>
Dimensions:      (lon: 464, lat: 224, time: 1, bnds: 2)
Coordinates:
  * lon          (lon) float32 -124.9 -124.8 -124.7 ... -67.31 -67.19 -67.06
  * lat          (lat) float32 25.06 25.19 25.31 25.44 ... 52.69 52.81 52.94
  * time         (time) datetime64[ns] 1992-11-09T15:00:00
    spatial_ref  int32 0
Dimensions without coordinates: bnds
Data variables:
    time_bnds    (time, bnds) datetime64[ns] 1992-11-09T14:00:00 1992-11-09T1...
    Tair         (time, lat, lon) float32 ...
    Qair         (time, lat, lon) float32 ...
    PSurf        (time, lat, lon) float32 ...
    Wind_E       (time, lat, lon) float32 ...
    Wind_N       (time, lat, lon) float32 ...
    LWdown       (time, lat, lon) float32 ...
    CRainf_frac  (time, lat, lon) float32 ...
    CAPE         (time, lat, lon) float32 ...
    PotEvap      (time, lat, lon) float32 ...
    Rainf        (time, lat, lon) float32 ...
    SWdown       (time, lat, lon) float32 ...
```
The files are hourly and I'm aggregating them on a daily basis. Here are the important steps of my code:
Reading
I read the files one month at a time, with dask chunks of 24 hourly time steps (one day per chunk):
```
forcing_data = xr.open_mfdataset(
    file_list,
    chunks={'time': 24},
    parallel=False,
    combine='nested',
    concat_dim='time',
    autoclose=True,
)
```
I tried using `parallel=True`, but with the files stored on a NAS I was getting read errors that were different every time I re-ran the code.
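For context, `file_list` covers one month of hourly files and is built roughly like this (a minimal sketch: `build_file_list` and the glob pattern are placeholders, the real naming scheme is specific to my setup):

```
from pathlib import Path

def build_file_list(files_dir, month_start):
    """Collect one month of hourly NetCDF files (placeholder naming pattern)."""
    pattern = f"*{month_start:%Y%m}*.nc"          # all files whose name contains YYYYMM
    return sorted(Path(files_dir).glob(pattern))  # sorted so time increases monotonically
```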
I have a small validation step where I use the `time_bnds` variable to make sure I have the right time boundaries. I think this could be optimized when generating the list of files to read, but I don't think it is the main problem here:
```
idx_from, idx_to = get_time_indices_from_time_bnds(forcing_data, start_date, end_date)
forcing_data = forcing_data.isel(time=slice(idx_from, idx_to + 1))
```
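Conceptually, that helper boils down to a search over the interval bounds, something like this (a simplified sketch, not the exact implementation):

```
import numpy as np

def get_time_indices_from_time_bnds(ds, start_date, end_date):
    """First/last integer indices along 'time' whose bounds fall in [start_date, end_date]."""
    lower = ds['time_bnds'].isel(bnds=0).values  # start of each hourly interval
    upper = ds['time_bnds'].isel(bnds=1).values  # end of each hourly interval
    idx_from = int(np.searchsorted(lower, np.datetime64(start_date), side='left'))
    idx_to = int(np.searchsorted(upper, np.datetime64(end_date), side='right')) - 1
    return idx_from, idx_to
```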
Augmenting
I need to generate new variables by performing arithmetic on the existing variables. This step is typically fast:
```
forcing_data['RH'] = compute_RH(forcing_data['PSurf'], forcing_data['Tair'], forcing_data['Qair'])
forcing_data['Tdew'] = compute_dew_point(forcing_data['RH'], forcing_data['Tair'])
forcing_data['wind_speed'] = compute_wind_speed(forcing_data['Wind_E'], forcing_data['Wind_N'])
forcing_data['wind_direction'] = compute_wind_direction(forcing_data['Wind_E'], forcing_data['Wind_N'])
```
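For reference, the wind helpers are plain element-wise formulas along these lines (a simplified sketch; `compute_RH` and `compute_dew_point` are similarly cheap element-wise expressions), so dask just adds them lazily to the task graph:

```
import numpy as np

def compute_wind_speed(u, v):
    """Wind speed magnitude from eastward (u) and northward (v) components."""
    return np.sqrt(u**2 + v**2)

def compute_wind_direction(u, v):
    """Meteorological wind direction in degrees (direction the wind blows from)."""
    return (180.0 + np.degrees(np.arctan2(u, v))) % 360.0
```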
Aggregating (Main Loop)
In a loop I aggregate the data month by month (I tried different time steps; weekly or daily windows actually seemed to work better in some cases), reading the files as described above:
```
# Get the files for each dataset type requested
agg_list = []
curr_date = start_date
while curr_date < end_date:
    # Read the forcing data
    ds = read_forcing_dataset(files_dir, curr_date, one_month, product=product, augment=True)
    # Aggregate the forcing data
    agg = aggregate_forcing_data_daily(ds)
    # Force dask to compute
    agg = agg.compute()
    agg_list.append(agg)
    curr_date += one_month

# Concatenate the aggregated data
agg_ds = xr.concat(agg_list, dim='time')
```
The `aggregate_forcing_data_daily` function has a similar while loop with a day increment. It basically uses

```
agg_data = ds.isel(time=slice(idx_from, idx_to + 1)).map(agg_func, keep_attrs=True)
```

with `agg_func` returning `x.mean(dim='time')` or `x.sum(dim='time')` depending on `x.name`. It also adds the minimum and maximum temperature for each day in the following fashion:
```
agg_data['Tmin'] = ds['Tair'].sel(time=slice(start_date, end_date)).min(dim='time', skipna=True, keep_attrs=True)
agg_data['Tmax'] = ds['Tair'].sel(time=slice(start_date, end_date)).max(dim='time', skipna=True, keep_attrs=True)
```
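Put together, the daily aggregation is structured roughly like this (a condensed sketch rather than the exact function; `SUM_VARS` is illustrative):

```
import pandas as pd
import xarray as xr

SUM_VARS = {'Rainf', 'PotEvap'}  # illustrative: variables that are accumulated rather than averaged

def agg_func(x):
    """Daily reduction: sum for accumulated variables, mean for everything else."""
    return x.sum(dim='time') if x.name in SUM_VARS else x.mean(dim='time')

def aggregate_forcing_data_daily(ds):
    daily_list = []
    days = pd.to_datetime(ds['time'].values).normalize().unique()
    for day in days:
        day_ds = ds.sel(time=slice(day, day + pd.Timedelta(hours=23)))
        agg_data = day_ds.map(agg_func, keep_attrs=True)
        agg_data['Tmin'] = day_ds['Tair'].min(dim='time', skipna=True, keep_attrs=True)
        agg_data['Tmax'] = day_ds['Tair'].max(dim='time', skipna=True, keep_attrs=True)
        daily_list.append(agg_data.expand_dims(time=[day]))  # re-attach the daily timestamp
    return xr.concat(daily_list, dim='time')
```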
Performance so far:
This part of the code takes about 5 to 6 minutes per month of data on a laptop with an Intel i9 @ 2.5 GHz and 32 GB of RAM, using 4 processes (16 threads).
Extracting time-series for specific coordinates
```
for lat, lon in zip(lats, lons):
    df = agg_ds.sel(lat=lat, lon=lon, method='nearest').to_dataframe()
    df = function_to_rename_and_reorder_columns(df)
    df.to_parquet(f'custom/path/to/savefile/{lat}{lon}.parquet')
```
Performance for this step
Before I added `agg = agg.compute()` in the aggregation loop, this step took a very long time, with the `DataFrame` generation taking many minutes if not hours; now it seems to be much faster. Even so, it would still take over a day of non-stop computation to process the full 11-year dataset. Is that just the way it has to be?
I tried playing with the chunk size and the time step for reading files, and monitoring with the dask dashboard, but I haven't really been able to improve things further.
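For completeness, the dask setup behind those dashboard experiments looks roughly like this (a minimal sketch; the worker/thread split is illustrative of the 4 processes / 16 threads mentioned above):

```
from dask.distributed import Client, LocalCluster

# e.g. 4 worker processes x 4 threads each = 16 threads
cluster = LocalCluster(n_workers=4, threads_per_worker=4)
client = Client(cluster)
print(client.dashboard_link)  # dashboard URL used for monitoring
```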
As I said at the beginning, this works, but it feels like it could be improved. I suspect that storing the files on a NAS doesn't help, but I cannot get around that for now. This is my first big piece of code involving `xarray` and NetCDF files, so I might be missing obvious things.