Skip to contents

Introduction

This document demonstrates to parallelize weather/climate geospatial data processing with chopin and what cases users may take advantage of parallel processing or not. We will cover the following formats:

We consider TerraClimate and PRISM data which have its own data format each. Parameter-elevation Regressions on Independent Slopes Model (PRISM) is a high-resolution (800-1,000 meters) gridded climate dataset available in the BIL (band interleaved by line) format which is readable with GDAL. TerraClimate is a high-resolution (5 km) gridded climate dataset in the NetCDF format which is readable with GDAL.

Data Source Resolution File format
TerraClimate University of Idaho 0.0417 degrees NetCDF
PRISM Oregon State University 0.0083 degrees BIL

The spatial resolution of TerraClimate data commensurates 5 km in the equator, whereas that of PRISM data is approximately 1 km. The data are available in the NetCDF format which is readable with GDAL. We will use the terra package to read the data.

Prepare target datasets

We will consider the populated places centroids in the mainland United States (i.e., excluding Alaska, Hawaii, and the territories). We will use the tigris package to download the data.

Data Number of features Source Type
Census places 31,377 US Census Bureau Polygon
Block groups 238,193 US Census Bureau Polygon
Grid points in the southeastern US 1,204,904 Author, US Census Bureau (state polygons) Point

Hardware specification

We used a machine with 112 physical CPU cores with 750 GB of memory, but we used only a portion of the cores (up to 20) for the demonstration. No maximum possible memory usage was set. However, in typical HPC management systems, users are required to request the number of cores and memory for their jobs. The number of cores and memory capacity should be considered when users parallelize the extraction process.

Download and preprocess

Download

# populated places
state <- tigris::states(year = 2022)
statemain <-
  state[!state$STUSPS %in% c("AK", "HI", "PR", "VI", "GU", "MP", "AS"), ]
target_states <- statemain$GEOID

popplace <-
  lapply(target_states, function(x) tigris::places(x, year = 2022)) %>%
  do.call(rbind, .)
saveRDS(popplace, "./input/populated_place_2022.rds", compress = "xz")

Read

state <- tigris::states(year = 2022)
statemain <-
  state[!state$STUSPS %in% c("AK", "HI", "PR", "VI", "GU", "MP", "AS"), ]
target_states <- statemain$GEOID

# download populated places
options(tigris_use_cache = TRUE)
popplace <-
  lapply(target_states, function(x) {
    tigris::places(year = 2022, state = x)
  })
popplace <- do.call(rbind, popplace)


# generate circular buffers with 10 km radius
popplacep <- sf::st_centroid(popplace, of_largest_polygon = TRUE) %>%
  sf::st_transform("EPSG:5070")
popplacep2 <- sf::st_centroid(popplace, of_largest_polygon = TRUE)
popplaceb <- sf::st_buffer(popplacep, dist = units::set_units(10, "km"))

TerraClimate

TerraClimate data are provided in yearly NetCDF files, each of which contains monthly layers. We will read the data with the terra package and preprocess the data to extract the annual mean and sum of the bands by types of columns.

For reproducibility, run the code below with our companion package amadeus to download terraClimate data. Please note that it will take a while depending on the internet speed.

rlang::check_installed("amadeus")

tcli_variables <- c(
  "aet", "def", "pet", "ppt", "q", "soil", "swe",
  "PDSI", "srad", "tmax", "tmin", "vap", "vpd", "ws"
)

amadeus::download_terraclimate(
  variables = tcli_variables,
  year = c(2000, 2022),
  directory_to_save = "./input/terraClimate",
  acknowledgement = TRUE,
  download = TRUE,
  remove_command = TRUE
)
# wbd
ext_mainland <- c(xmin = -126, xmax = -64, ymin = 22, ymax = 51)
ext_mainland <- terra::ext(ext_mainland)

path_tc <- file.path("input", "terraClimate")
path_tc_files <- list.files(
  path = path_tc, pattern = "*.nc$",
  full.names = TRUE
)
path_tc_files
#>  [1] "input/terraClimate/TerraClimate_aet_2000.nc" 
#>  [2] "input/terraClimate/TerraClimate_aet_2001.nc"
#>  [truncated]
#> [321] "input/terraClimate/TerraClimate_ws_2021.nc"  
#> [322] "input/terraClimate/TerraClimate_ws_2022.nc"

Preprocessing

Fourteen variables are available in TerraClimate data. The table below is from the TerraClimate website.

Variable Description Units
aet Actual Evapotranspiration, monthly total mm
def Climate Water Deficit, monthly total mm
PDSI Palmer Drought Severity Index, at end of month unitless
pet Potential evapotranspiration, monthly total mm
ppt Precipitation, monthly total mm
q Runoff, monthly total mm
soil Soil Moisture, total column - at end of month mm
srad Downward surface shortwave radiation W/m2
swe Snow water equivalent - at end of month mm
tmax Max Temperature, average for month C
tmin Min Temperature, average for month C
vap Vapor pressure, average for month kPa
vpd Vapor Pressure Deficit, average for month kpa
ws Wind speed, average for month m/s

The variables can be categorized into two types: those that will be summed and those that will be averaged.

  • Sum: aet, def, pet, ppt, q, soil, and swe.
  • Mean: PDSI, srad, tmax, tmin, vap, vpd, and ws.

Following that rationale, we will preprocess the data by summing the first seven layers and averaging the rest of the layers. The code blocks below follow “split-apply-combine” strategy. Note that terra::tapp aggregates layers by its attributes such as time or custom indices.

options(future.globals = FALSE)
# some bands should be summed
bandnames <- c(
  "aet", "def", "PDSI", "pet", "ppt", "q", "soil", "srad",
  "swe", "tmax", "tmin", "vap", "vpd", "ws"
)
bandnames_sorted <- sort(bandnames)

# single nc file, yearly aggregation by fun value
# band for summation
bandnames_sum <- c("aet", "def", "pet", "ppt", "q", "soil", "swe")

# band for averaging
bandnames_avg <- c("PDSI", "srad", "tmax", "tmin", "vap", "vpd", "ws")

# mean: temporally marginal pixel mean (i.e., monthly -> yearly)
# sum: temporally marginal pixel sum (i.e., monthly -> yearly)
# Preprocessed data are stored in
tictoc::tic("sum: 7 layers")
netcdf_read_sum <-
  split(bandnames_sum, bandnames_sum) |>
  lapply(function(x) {
    grep(paste0("(", x, ")"), path_tc_files, value = TRUE)
  }) |>
  lapply(function(x) {
    terra::tapp(terra::rast(x, win = ext_mainland, snap = "out"), index = "years", fun = "sum")
  })
netcdf_read_sum <- Reduce(c, netcdf_read_sum)
tictoc::toc()
#> sum: 7 layers: 177.974 sec elapsed
names(netcdf_read_sum) <- paste0(rep(bandnames_sum, each = 23), "_", rep(2000:2022, 7))
netcdf_read_sum
#> class       : SpatRaster 
#> dimensions  : 696, 1488, 161  (nrow, ncol, nlyr)
#> resolution  : 0.04166667, 0.04166667  (x, y)
#> extent      : -126, -64, 22, 51  (xmin, xmax, ymin, ymax)
#> coord. ref. : +proj=longlat +ellps=WGS84 +no_defs 
#> source(s)   : memory
#> names       : aet_2000,  aet_2001,  aet_2002,  aet_2003, ... 
#> min values  :     22.9,      27.3,        11,      30.8, ... 
#> max values  :   1270.9,    1366.6,      1399,    1411.1, ... 
#> time (years):   2000 to 2022
# tictoc::tic("mean: 7 layers")
# netcdf_read_mean <-
#   split(bandnames_avg, bandnames_avg) |>
#   lapply(function(x) {
#     grep(paste0("(", x, ")"), path_tc_files, value = TRUE)
#   }) |>
#   lapply(function(x) {
#     terra::tapp(terra::rast(x, win = ext_mainland, snap = "out"), index = "years", fun = "mean")
#   }) |>
#   Reduce(f = c, x = _)
# tictoc::toc()

# names(netcdf_read_mean) <-
#   sprintf("%s_%d", rep(bandnames_avg, each = 23), rep(2000:2022, 7))
# netcdf_read_mean

\[!WARNING\] Stacking raster data may take a long time and consume a large amount of memory depending on users’ area of interest and data resolution. Users need to consider the memory capacity of the system before stacking rasters.

We have 14 data elements for 23 years with 12 months each. Below demonstrates the summary of the data layers that were averaged at circular buffer polygons with 10 kilometers (10,000 meters) radius. To leverage multiple cores in your machine, run future::availableCores() to check the number of cores and set the number of workers in future::plan accordingly. Typically, there are two logical cores in each physical core in modern central processing units. The number of workers should be set to up to the number of physical cores in the machine for optimal performance. The example below uses future::multicore which shares the memory across the workers.

tic("multi threads (grid)")
future::plan(future::multicore, workers = 8L)
grid_init <-
  chopin::par_pad_grid(
    popplacep2,
    mode = "grid",
    padding = 1e4,
    nx = 4L,
    ny = 2L
  )
multi_grid <-
  chopin::par_grid(
    grids = grid_init,
    fun_dist = extract_at,
    y = popplacep2,
    x = netcdf_read_sum,
    id = "GEOID",
    func = "mean",
    radius = 1e4
  )
toc()
#> multi threads (grid): 16.668 sec elapsed
system.time(
  multi <-
    grep(
      paste0("(", paste(bandnames_sum, collapse = "|"), ")"),
      path_tc_files,
      value = TRUE
    ) %>%
    chopin::par_multirasters(
      filenames = .,
      fun_dist = extract_at,
      y = popplacep2,
      x = rast(), # ignored
      id = "GEOID",
      func = "mean",
      radius = 1e4
    )
)
#>     user   system  elapsed 
#> 5377.495  231.654  762.147
future::plan(future::sequential)


# single thread
tic("single thread")
single <-
  exactextractr::exact_extract(
    netcdf_read_sum,
    sf::st_as_sf(popplaceb),
    fun = "mean",
    stack_apply = TRUE,
    force_df = TRUE,
    append_cols = c("GEOID"),
    progress = FALSE
  )
toc()
#> single thread: 21.161 sec elapsed

\[!CAUTION\] All Windows users and RStudio users (all platforms) will not be able to use future::multicore due to the restriction in future package. Please future::multisession instead and note that this option will runs slower than future::multicore case.

PRISM dataset

PRISM data are provided in monthly 30-year normal BIL files. Assuming that a user wants to summarize 30-year normal precipitation at 10 kilometers circular buffers of the geogrpahic centroids of US Census Places (from p.26), we demonstrate the extraction process with the chopin package.

Download and preprocess

Download

# populated places
# mainland states
state <- tigris::states(year = 2022)
statemain <-
  state[!state$STUSPS %in% c("AK", "HI", "PR", "VI", "GU", "MP", "AS"), ]
target_states <- statemain$GEOID

popplace <-
  lapply(target_states, function(x) tigris::places(x, year = 2022)) %>%
  do.call(rbind, .)
saveRDS(popplace, "./input/populated_place_2022.rds", compress = "xz")

Read

bils <- list.files("input", "bil$", recursive = TRUE, full.names = TRUE)
bilssds <- terra::rast(bils[-13])
popplace2 <- sf::st_transform(popplace, crs = terra::crs(bilssds))
popplaceb2 <- sf::st_transform(popplaceb, crs = terra::crs(bilssds))

\[!IMPORTANT\] chopin::par_pad_grid works the best with point datasets since each grid will clip the input features when parallelized. Polygon inputs will result in duplicate values in the output and lead to take longer to complete.

Grid parallelization

In the same vein as the TerraClimate data, we will use the chopin package to parallelize the extraction process with grid strategy.

exgrid <-
  chopin::par_pad_grid(
    popplacep2,
    mode = "grid",
    padding = 1e4,
    nx = 4L,
    ny = 2L
  )


future::plan(future::multicore, workers = 8L)

system.time(
  exmulti <-
    chopin::par_grid(
      exgrid,
      fun_dist = extract_at,
      y = popplacep2,
      x = bilssds,
      radius = units::set_units(1e4, "meter"),
      id = "GEOID",
      func = "mean"
    )
)
#> Your input function was successfully run at CGRIDID: 1
#> Your input function was successfully run at CGRIDID: 2
#> Your input function was successfully run at CGRIDID: 3
#> Your input function was successfully run at CGRIDID: 4
#> Your input function was successfully run at CGRIDID: 5
#> Your input function was successfully run at CGRIDID: 6
#> Your input function was successfully run at CGRIDID: 7
#> Your input function was successfully run at CGRIDID: 8
#>    user  system elapsed 
#>  33.090  13.254  14.571
system.time(
  exsingle <-
    exactextractr::exact_extract(
      bilssds,
      popplaceb2,
      fun = "mean",
      stack_apply = TRUE,
      force_df = TRUE,
      append_cols = "GEOID",
      max_cells_in_memory = 2.14e9,
      progress = FALSE
    )
)
#>    user  system elapsed 
#>  19.347   1.927  21.716
future::plan(future::sequential)

Scaled up examples

Larger buffer sizes

Examples above showed that the difference between the parallelized and single-threaded extraction process is not significant. We will increase the buffer size to 50 kilometers and compare the performance of the parallelized and single-threaded extraction process.

# make buffers
popplaceb5 <- sf::st_buffer(popplacep, dist = units::set_units(50, "km")) %>%
  sf::st_transform(terra::crs(bilssds))

system.time(
  exsingle5 <-
    exactextractr::exact_extract(
      bilssds,
      popplaceb5,
      fun = "mean",
      stack_apply = TRUE,
      force_df = TRUE,
      append_cols = "GEOID",
      max_cells_in_memory = 2.14e9,
      progress = FALSE
    )
)
#>    user  system elapsed 
#> 140.373   2.302 144.552
exgrid5k <-
  chopin::par_pad_grid(
    popplacep2,
    mode = "grid",
    padding = 5e4,
    nx = 4L,
    ny = 2L
  )


future::plan(future::multicore, workers = 8L)

system.time(
  exmulti5k <-
    chopin::par_grid(
      exgrid5k,
      fun_dist = extract_at,
      y = popplacep2,
      x = bilssds,
      radius = 5e4,
      id = "GEOID",
      func = "mean"
    )
)
#> Your input function was successfully run at CGRIDID: 1
#> Your input function was successfully run at CGRIDID: 2
#> Your input function was successfully run at CGRIDID: 3
#> Your input function was successfully run at CGRIDID: 4
#> Your input function was successfully run at CGRIDID: 5
#> Your input function was successfully run at CGRIDID: 6
#> Your input function was successfully run at CGRIDID: 7
#> Your input function was successfully run at CGRIDID: 8
#>    user  system elapsed 
#> 152.344  11.003  60.409
future::plan(future::sequential)

\[!NOTE\] The example above used strings of raster file paths for surf argument in chopin::extract_at_buffer. terra::rast at multiple file paths will return a SpatRaster with multiple layers only if the rasters have the same extent and resolution.

Larger number of features

This example uses 1,204,934 1-km grid points in the southeastern United States to summarize seven layers of TerraClimate.

## generate 1km grid points in the southeastern US States
stt <- tigris::states(year = 2020)
targ_states <- c("NC", "SC", "GA", "FL", "AL", "MS", "TN", "LA", "AR")
stt_targ <- stt[stt$STUSPS %in% targ_states, ]
stt_t <- sf::st_transform(stt_targ, "EPSG:5070")
stt_g <- sf::st_sample(stt_t, type = "regular", 1204934)
stt_g <- sf::st_as_sf(stt_g)
sf::st_geometry(stt_g) <- "geometry"
stt_g$pid <- seq(1, nrow(stt_g))
stt_gb <- sf::st_buffer(stt_g, units::set_units(10, "km"))

tic()
single_2m <-
  exactextractr::exact_extract(
    netcdf_read_sum,
    stt_gb,
    fun = "mean",
    stack_apply = TRUE,
    force_df = TRUE,
    max_cells_in_memory = 2.14e9,
    progress = FALSE
  )
toc()
#> 855.908 sec elapsed
stt_gbg <-
  chopin::par_pad_grid(
    stt_g,
    mode = "grid",
    padding = 5e3,
    nx = 5L,
    ny = 5L
  )


future::plan(future::multicore, workers = 8L)
system.time(
  stt5k <-
    chopin::par_grid(
      stt_gbg,
      fun_dist = extract_at,
      y = stt_g,
      x = netcdf_read_sum,
      id = "pid",
      radius = 1e4,
      func = "mean",
      max_cells = 2e7
    )
)
#>   user  system elapsed 
#>  6.745   4.102 434.041
future::plan(future::sequential)

Finely resolved vector

Using PRISM data, the example below summarizes the mean values of each data element at census block groups.

Download

# set state=NULL and cb=TRUE will download the block groups for the entire US
bg <- tigris::block_groups(state = NULL, cb = TRUE, year = 2020)
sf::write_sf(bg, file.path("input", "Blockgroups_2020.gpkg"))

Extract

## extract prism by par_hierarchy
bgsf <- sf::st_read("input/Blockgroups_2020.gpkg")
#> Reading layer `Blockgroups_2020' from data source 
#>   `/ddn/gs1/home/songi2/projects/chopin/input/Blockgroups_2020.gpkg' 
#>   using driver `GPKG'
#> Simple feature collection with 242298 features and 11 fields
#> Geometry type: MULTIPOLYGON
#> Dimension:     XY
#> Bounding box:  xmin: -179.1467 ymin: -14.5487 xmax: 179.7785 ymax: 71.38782
#> Geodetic CRS:  NAD83
bgsf_main <- bgsf %>%
  dplyr::filter(!STATEFP %in% c("02", "15", "72", "66", "78", "60", "69"))

## extract prism at bg
system.time(
  exsingle <-
    exactextractr::exact_extract(
      bilssds,
      bgsf_main,
      fun = "mean",
      stack_apply = TRUE,
      force_df = TRUE,
      append_cols = "GEOID",
      max_cells_in_memory = 2.14e9,
      progress = FALSE
    )
)
#>    user  system elapsed 
#>  60.387   2.059  64.147
nmain <- c("AS", "AK", "HI", "PR", "VI", "GU", "MP")
stt_nmain <- stt[!stt$STUSPS %in% nmain, ]


future::plan(future.mirai::mirai_multisession, workers = 20L)
system.time(
  exhierarchy <-
    chopin::par_hierarchy(
      bgsf_main,
      regions_id = "STATEFP",
      fun_dist = extract_at,
      y = bgsf_main,
      x = bilssds,
      id = "GEOID"
    )
)
#>    user  system elapsed 
#>   2.205   0.558 158.905
future::plan(future::sequential)

Finely resolved raster

We demonstrate the extraction process with the CropScape dataset which has a resolution of 30 meters. In this case, we use frac option of exact_extract which tabulates the fraction of the area of the cell category that is covered by the polygon after accounting for partial coverage of polygon segments over raster cells.

cdl <- terra::rast("input/2022_cdls/2022_30m_cdls.tif")

system.time(
  bgsf_cdl_single <-
    exactextractr::exact_extract(
      cdl,
      bgsf_main,
      fun = "frac",
      stack_apply = TRUE,
      force_df = TRUE,
      append_cols = "GEOID",
      max_cells_in_memory = 2.14e9,
      progress = FALSE
    )
)
#>     user   system  elapsed 
#> 1013.411   44.322 1369.053

For balancing computation time, we will split the block groups into nine subsets to parallelize. Note that mode = "grid_quantile" is used in par_pad_grid to balance the number of block groups per grid. When input argument of par_pad_grid is polygons, a few polygons will have duplicate rows in the output data.frame since block groups overlapping each grid will be selected.

cdl <- terra::rast("input/2022_cdls/2022_30m_cdls.tif")
bgsf <- sf::st_read("input/Blockgroups_2020.gpkg")
bgsf_main <- bgsf %>%
  dplyr::filter(!STATEFP %in% c("02", "15", "72", "66", "78", "60", "69"))


# balancing the number of features assigned per workers
# by splitting block groups by splitting centroids
bgsf_9grids_plain <- bgsf_main %>%
  chopin::par_pad_grid(
    mode = "grid",
    nx = 3L, ny = 3L, padding = 1e4
  )
bgsf_9grids <- bgsf_main %>%
  chopin::par_pad_grid(
    mode = "grid_quantile",
    padding = 1e4,
    quantiles = chopin::par_def_q(3L)
  )


future::plan(future.mirai::mirai_multisession, workers = 9L)
# Note that bgsf_9grids were converted to sf
# for exporting the objects to the parallel workers
system.time(
  bgsf_cdl_par <-
    chopin::par_grid(
      lapply(bgsf_9grids, sf::st_as_sf),
      fun_dist = extract_at,
      y = bgsf_main,
      x = cdl,
      id = "GEOID",
      func = "frac",
      max_cells = 2e7
    )
)
#> Your input function was successfully run at CGRIDID: 1
#> Your input function was successfully run at CGRIDID: 2
#> Your input function was successfully run at CGRIDID: 3
#> Your input function was successfully run at CGRIDID: 4
#> Your input function was successfully run at CGRIDID: 5
#> Your input function was successfully run at CGRIDID: 6
#> Your input function was successfully run at CGRIDID: 7
#> Your input function was successfully run at CGRIDID: 8
#> Your input function was successfully run at CGRIDID: 9
#>   user  system elapsed 
#>  6.632   1.821 347.638