Prepare data in Elasticsearch bulk API format

Usage

docs_bulk_prep(
  x,
  index,
  path,
  type = NULL,
  chunk_size = 1000,
  doc_ids = NULL,
  quiet = FALSE,
  digits = NA,
  sf = NULL
)

Arguments

x

A data.frame or a list. Required.

index

(character) The index name. Required.

path

(character) Path to the file. If the data is broken into chunks, we'll use this path as the prefix, and suffix each file path with a number. Required.

type

(character) The document type. Default: NULL. Note that types are deprecated in Elasticsearch v7 and greater, and removed in Elasticsearch v8.

chunk_size

(integer) Size of each chunk. If your data.frame has fewer rows than chunk_size, this parameter is essentially ignored. We write in chunks because, depending on the size of each document and your Elasticsearch setup, writing a very large number of documents in one go becomes slow, so chunking can help. This parameter is ignored if you pass a file name. Default: 1000. See the chunking sketch after this list.

doc_ids

An optional vector (character or numeric/integer) of document ids to use. The vector must be the same length as the number of documents you are passing in; if it is not, the function errors. Factors are converted to character. Default: not passed

quiet

(logical) Suppress progress bar. Default: FALSE

digits

(numeric) Passed to the digits parameter of jsonlite::toJSON(), controlling how many digits are used when converting your data to JSON for the bulk file. Default: NA (maximum precision)

sf

(character) Passed to jsonlite::toJSON() to control how sf objects are converted; set to "features" for conversion to GeoJSON. Default: "dataframe". See the serialization sketch after this list.
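
Below is a minimal sketch of the chunked file naming described under path and chunk_size (it assumes the elastic package is loaded; exact file names will differ on your machine):

# mtcars has 32 rows, so chunk_size = 10 yields four files, each
# sharing the path prefix and suffixed with a number
ff <- tempfile(fileext = ".json")
paths <- docs_bulk_prep(mtcars, index = "cars", path = ff, chunk_size = 10)
basename(paths)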
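And a quick illustration of what digits controls, using jsonlite::toJSON() directly (this shows only the serialization step; the real bulk files also contain action/metadata lines):

library(jsonlite)
# digits rounds numeric values in the JSON output;
# NA (the docs_bulk_prep default) keeps maximum precision
toJSON(data.frame(x = pi), digits = 2)   # e.g. [{"x":3.14}]
toJSON(data.frame(x = pi), digits = NA)  # e.g. [{"x":3.141592653589793}]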

Value

File path(s). By default we use temporary files; these are cleaned up at the end of your R session
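
Each file is newline-delimited JSON in the Elasticsearch bulk format: every document becomes a pair of lines, an action/metadata line followed by the document source. Here is a hand-built sketch of one such pair (illustrative only; the exact metadata docs_bulk_prep writes depends on index, doc_ids, and any es_action column):

library(jsonlite)
action <- toJSON(list(index = list(`_index` = "hello", `_id` = 1)),
  auto_unbox = TRUE)
doc <- toJSON(as.list(mtcars[1, ]), auto_unbox = TRUE)
cat(action, doc, sep = "\n")
# {"index":{"_index":"hello","_id":1}}
# {"mpg":21,"cyl":6,"disp":160,...}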

Tempfiles

In docs_bulk we create temporary files in some cases, and delete them before the function exits. We don't clean up the files created by this function, however, because its whole point is to produce the newline-delimited JSON files you need. Be aware that tempfiles are still cleaned up when your R session ends; if you want to keep the files, move them outside of the temp directory.
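
A minimal sketch of keeping prepared files beyond the current session by copying them out of the temp directory (the destination directory below is just an example):

ff <- tempfile(fileext = ".json")
paths <- docs_bulk_prep(mtcars, index = "hello", path = ff)
# copy to a directory that survives the end of the R session
keep_dir <- "~/bulk-files"  # hypothetical destination
dir.create(keep_dir, showWarnings = FALSE)
file.copy(paths, keep_dir)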

Examples

if (FALSE) { # \dontrun{
# From a data.frame
ff <- tempfile(fileext = ".json")
docs_bulk_prep(mtcars, index = "hello", path = ff)
readLines(ff)

## field names cannot contain dots
names(iris) <- gsub("\\.", "_", names(iris))
docs_bulk_prep(iris, "iris", path = tempfile(fileext = ".json"))

## type can be missing, but index cannot
docs_bulk_prep(iris, "flowers", path = tempfile(fileext = ".json"))

# From a list
docs_bulk_prep(apply(iris, 1, as.list), index = "iris",
  path = tempfile(fileext = ".json"))
docs_bulk_prep(apply(USArrests, 1, as.list), index = "arrests",
  path = tempfile(fileext = ".json"))

# when chunking
## multiple files created, one for each chunk
bigiris <- do.call("rbind", replicate(30, iris, simplify = FALSE))
docs_bulk_prep(bigiris, index = "big", path = tempfile(fileext = ".json"))

# When using in a loop
## We internally get the last _id counter to know where to start on the next
## bulk insert, but you need to sleep between docs_bulk_prep calls; the
## bigger the data, the longer you should sleep
files <- c(system.file("examples", "test1.csv", package = "elastic"),
           system.file("examples", "test2.csv", package = "elastic"),
           system.file("examples", "test3.csv", package = "elastic"))
paths <- vector("list", length = length(files))
for (i in seq_along(files)) {
  d <- read.csv(files[[i]])
  paths[i] <- docs_bulk_prep(d, index = "stuff",
     path = tempfile(fileext = ".json"))
}
unlist(paths)

# You can include your own document id numbers
## Either pass in as an argument
files <- c(system.file("examples", "test1.csv", package = "elastic"),
           system.file("examples", "test2.csv", package = "elastic"),
           system.file("examples", "test3.csv", package = "elastic"))
tt <- vapply(files, function(z) NROW(read.csv(z)), numeric(1))
ids <- list(1:tt[1],
           (tt[1] + 1):(tt[1] + tt[2]),
           (tt[1] + tt[2] + 1):sum(tt))
paths <- vector("list", length = length(files))
for (i in seq_along(files)) {
  d <- read.csv(files[[i]])
  paths[i] <- docs_bulk_prep(d, index = "testes",
    doc_ids = ids[[i]], path = tempfile(fileext = ".json"))
}
unlist(paths)

## or include in the input data
### from data.frame's
files <- c(system.file("examples", "test1_id.csv", package = "elastic"),
           system.file("examples", "test2_id.csv", package = "elastic"),
           system.file("examples", "test3_id.csv", package = "elastic"))
paths <- vector("list", length = length(files))
for (i in seq_along(files)) {
  d <- read.csv(files[[i]])
  paths[[i]] <- docs_bulk_prep(d, index = "testes",
     path = tempfile(fileext = ".json"))
}
unlist(paths)

### from lists via file inputs
paths <- vector("list", length = length(files))
for (i in seq_along(files)) {
  d <- read.csv(files[[i]])
  d <- apply(d, 1, as.list)
  paths[[i]] <- docs_bulk_prep(d, index = "testes",
      path = tempfile(fileext = ".json"))
}
unlist(paths)


# A mix of actions
## make sure to use a column named 'es_action', or this won't work
## if you need to delete or update, you need document IDs
## set up a connection object first; it's used by the index_*, docs_bulk,
## and Search calls below
x <- connect()
if (index_exists(x, "baz")) index_delete(x, "baz")
df <- data.frame(a = 1:5, b = 6:10, c = letters[1:5], stringsAsFactors = FALSE)
f <- tempfile(fileext = ".json")
invisible(docs_bulk_prep(df, "baz", path = f))
cat(readLines(f), sep = "\n")
docs_bulk(x, f)
Sys.sleep(2)
(res <- Search(x, 'baz', asdf=TRUE)$hits$hits)

df[1, "a"] <- 99
df[1, "c"] <- "aa"
df[3, "c"] <- 33
df[3, "c"] <- "cc"
df$es_action <- c('update', 'delete', 'update', 'delete', 'delete')
df$id <- res$`_id`
df
f <- tempfile(fileext = ".json")
invisible(docs_bulk_prep(df, "baz", path = f, doc_ids = df$id))
cat(readLines(f), sep = "\n")
docs_bulk(x, f)


# suppress progress bar
docs_bulk_prep(mtcars, index = "hello",
  path = tempfile(fileext = ".json"), quiet = TRUE)
## vs.
docs_bulk_prep(mtcars, index = "hello",
  path = tempfile(fileext = ".json"), quiet = FALSE)
} # }