
I was told over at gis.stackexchange.com that this question would be a better fit here.

I already have working code, but it is very slow and my hunch is that it can be optimized. The goal here is to process 11 years of data, i.e. 11 x 12 x 30 x 24 = roughly 95,040 hourly files.

On a RAID/NAS external storage I have NetCDF files from NLDAS-2 products that look like this:

    <xarray.Dataset>
    Dimensions:      (lon: 464, lat: 224, time: 1, bnds: 2)
    Coordinates:
      * lon          (lon) float32 -124.9 -124.8 -124.7 ... -67.31 -67.19 -67.06
      * lat          (lat) float32 25.06 25.19 25.31 25.44 ... 52.69 52.81 52.94
      * time         (time) datetime64[ns] 1992-11-09T15:00:00
        spatial_ref  int32 0
    Dimensions without coordinates: bnds
    Data variables:
        time_bnds    (time, bnds) datetime64[ns] 1992-11-09T14:00:00 1992-11-09T1...
        Tair         (time, lat, lon) float32 ...
        Qair         (time, lat, lon) float32 ...
        PSurf        (time, lat, lon) float32 ...
        Wind_E       (time, lat, lon) float32 ...
        Wind_N       (time, lat, lon) float32 ...
        LWdown       (time, lat, lon) float32 ...
        CRainf_frac  (time, lat, lon) float32 ...
        CAPE         (time, lat, lon) float32 ...
        PotEvap      (time, lat, lon) float32 ...
        Rainf        (time, lat, lon) float32 ...
        SWdown       (time, lat, lon) float32 ...

The files are hourly and I'm aggregating them on a daily basis. Here are the important steps of my code:

Reading

I'm reading the data in chunks of one month at a time.

    forcing_data = xr.open_mfdataset(
        file_list,
        chunks={'time': 24},
        parallel=False,
        combine='nested',
        concat_dim='time',
        autoclose=True,
    )

I tried using parallel=True, but with the files stored on a NAS I got reading errors, and different ones every time I re-ran the code.

I have a little validation step where I use the time_bnds variable to make sure I have the right time boundaries. I think this could be optimized when generating the list of files to read, but I don't think it is the main problem here.

    idx_from, idx_to = get_time_indices_from_time_bnds(forcing_data, start_date, end_date)
    forcing_data = forcing_data.isel(time=slice(idx_from, idx_to + 1))
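For context, get_time_indices_from_time_bnds just turns the requested start/end dates into positional indices along the time dimension using time_bnds. An illustrative sketch only (the inclusive/exclusive handling of the bounds here is an assumption, not necessarily the exact implementation):

    import numpy as np

    def get_time_indices_from_time_bnds(ds, start_date, end_date):
        # Positions of the first and last hourly records whose time bounds
        # fall entirely within [start_date, end_date].
        lower = ds['time_bnds'].isel(bnds=0).values
        upper = ds['time_bnds'].isel(bnds=1).values
        inside = np.where((lower >= np.datetime64(start_date)) &
                          (upper <= np.datetime64(end_date)))[0]
        return int(inside[0]), int(inside[-1])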

Augmenting

I need to generate new variables by performing arithmetic on the existing variables. This step is typically fast.

    forcing_data['RH'] = compute_RH(forcing_data['PSurf'], forcing_data['Tair'], forcing_data['Qair'])
    forcing_data['Tdew'] = compute_dew_point(forcing_data['RH'], forcing_data['Tair'])
    forcing_data['wind_speed'] = compute_wind_speed(forcing_data['Wind_E'], forcing_data['Wind_N'])
    forcing_data['wind_direction'] = compute_wind_direction(forcing_data['Wind_E'], forcing_data['Wind_N'])
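For reference, these helpers are plain element-wise arithmetic on the DataArrays. A minimal sketch of what they might look like (compute_dew_point omitted for brevity; the unit assumptions — Tair in K, PSurf in Pa, Qair in kg/kg — the choice of Bolton's saturation vapour pressure formula, and the meteorological wind-direction convention are illustrative, not necessarily the exact implementation):

    import numpy as np

    def compute_RH(psurf, tair, qair):
        # Relative humidity (%) from specific humidity; assumes PSurf in Pa,
        # Tair in K, Qair in kg/kg, Bolton's formula for saturation vapour pressure.
        e = qair * psurf / (0.622 + 0.378 * qair)
        e_s = 611.2 * np.exp(17.67 * (tair - 273.15) / (tair - 29.65))
        return 100.0 * e / e_s

    def compute_wind_speed(wind_e, wind_n):
        # Magnitude of the horizontal wind vector.
        return np.sqrt(wind_e ** 2 + wind_n ** 2)

    def compute_wind_direction(wind_e, wind_n):
        # Meteorological convention: direction the wind blows FROM,
        # in degrees clockwise from north.
        return np.degrees(np.arctan2(-wind_e, -wind_n)) % 360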

Aggregating (Main Loop)

In a loop, I aggregate the data month by month (I tried different time steps; weekly or daily actually seemed to work better in some cases), reading the files as described above:

    # Get the files for each dataset type requested
    agg_list = []
    curr_date = start_date
    while curr_date < end_date:
        # Read the forcing data
        ds = read_forcing_dataset(files_dir, curr_date, one_month, product=product, augment=True)
        # Aggregate the forcing data
        agg = aggregate_forcing_data_daily(ds)
        # Force dask to compute
        agg = agg.compute()
        agg_list.append(agg)
        curr_date += one_month

    # Concatenate the aggregated data
    agg_ds = xr.concat(agg_list, dim='time')

The aggregate_forcing_data_daily function has a similar while loop with a one-day increment. It basically uses agg_data = ds.isel(time=slice(idx_from, idx_to + 1)).map(agg_func, keep_attrs=True), where agg_func returns x.mean(dim='time') or x.sum(dim='time') depending on x.name. It also adds the minimum and maximum temperature for that day in the following fashion (a simplified sketch of the whole function is included after this snippet):

    agg_data['Tmin'] = ds['Tair'].sel(time=slice(start_date, end_date)).min(dim='time', skipna=True, keep_attrs=True)
    agg_data['Tmax'] = ds['Tair'].sel(time=slice(start_date, end_date)).max(dim='time', skipna=True, keep_attrs=True)
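To make the question more self-contained, here is a simplified sketch of what aggregate_forcing_data_daily boils down to. It is illustrative only: the SUM_VARS set (which variables are accumulated rather than averaged) and the way each day's slice is selected are assumptions, not the exact implementation.

    import pandas as pd
    import xarray as xr

    SUM_VARS = {'Rainf', 'PotEvap'}  # assumption: accumulated variables, everything else is averaged

    def agg_func(x):
        # Sum accumulated fields, average everything else.
        if x.name in SUM_VARS:
            return x.sum(dim='time', skipna=True, keep_attrs=True)
        return x.mean(dim='time', skipna=True, keep_attrs=True)

    def aggregate_forcing_data_daily(ds):
        daily = []
        days = pd.to_datetime(ds['time'].values).normalize().unique()
        for day in days:
            # Select the 24 hourly steps belonging to this day
            sub = ds.sel(time=slice(day, day + pd.Timedelta(hours=23)))
            agg_data = sub.map(agg_func, keep_attrs=True)
            agg_data['Tmin'] = sub['Tair'].min(dim='time', skipna=True, keep_attrs=True)
            agg_data['Tmax'] = sub['Tair'].max(dim='time', skipna=True, keep_attrs=True)
            # Put the (now reduced) time dimension back as a single daily value
            agg_data = agg_data.expand_dims(time=[day])
            daily.append(agg_data)
        return xr.concat(daily, dim='time')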

Performance so far:

This part of the code takes about 5 to 6 minutes per month of data on a laptop with an Intel i9 @ 2.5 GHz and 32 GB of RAM, using 4 processes (16 threads).

Extracting time-series for specific coordinates

    for lat, lon in zip(lats, lons):
        df = agg_ds.sel(lat=lat, lon=lon, method='nearest').to_dataframe()
        df = function_to_rename_and_reorder_columns(df)
        df.to_parquet(f'custom/path/to/savefile/{lat}_{lon}.parquet')

Performance for this step

Before I added agg = agg.compute() in the aggregation loop, this step was very slow: the DataFrame generation took many minutes, if not hours. It is much faster now, but even so, processing the full 11-year dataset would take over a day of non-stop computation. Is that just the way it has to be?

I tried playing with the chunk size and the time step used for reading files, monitoring with the dask dashboard, but I haven't really been able to improve things further.

As I said at the beginning, this works, but it feels like it could be improved. Having the files on a NAS is probably not ideal, but I cannot get around that for now. This is my first big project involving xarray and NetCDF files, so I might be missing obvious things.

  • coiled.io/blog/speed-up-pandas-query-10x-with-dask might help? (just the dask stuff). Also, "CPU" is really "CPU + L1 cache + L2 cache + disk" -- is your disk SSD? Also, do you really need all lon: 464, lat: 224 or could you subset, say, 1/10 of that early?
    – denis, May 26, 2023 at 9:36
