
bigKNN includes checkpointed streaming APIs for exact jobs that are large enough, long enough, or operationally important enough that restarting from scratch would be painful. These jobs stream results into destination big.matrix objects and record their progress in an .rds checkpoint file that can be resumed later.

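This vignette covers when resumable jobs are worth using, checkpointed graph and radius streaming, restarting work with resume_knn_job(), destination sizing and storage types, checkpoint file contents, and failure recovery and operational patterns.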

When resumable jobs are worth using

Resumable jobs are especially useful when:

  • the result is too large to keep comfortably in ordinary R objects
  • the exact job may run long enough that interruptions are realistic
  • you want durable progress on disk rather than all-or-nothing execution
  • the destination should already live in a file-backed big.matrix

These APIs are about operational safety, not approximate computation. The results are still exact. The difference is that progress is streamed into destination matrices and checkpointed along the way.

Build a Durable Working Directory

To make the resume scenario realistic, this vignette uses file-backed references and file-backed destinations. That mirrors the usual production setup, where the reference, destination stores, and checkpoint file all live in a stable working directory.

library(bigKNN)
library(bigmemory)  # provides filebacked.big.matrix()
scratch_dir <- file.path(tempdir(), "bigknn-resumable-jobs")
dir.create(scratch_dir, recursive = TRUE, showWarnings = FALSE)
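
A session-level tempdir() is removed when R exits, so it suits a vignette but not a job that must survive a restart. The following is a minimal sketch of a persistent alternative. The helper name make_job_dir() and the example root path are hypothetical and not part of bigKNN.

```r
# Hypothetical helper: create (or reuse) a persistent job directory and
# return its normalized path. Not part of bigKNN.
make_job_dir <- function(root, job_name) {
  job_dir <- file.path(root, job_name)
  dir.create(job_dir, recursive = TRUE, showWarnings = FALSE)
  if (!dir.exists(job_dir)) {
    stop("could not create job directory: ", job_dir)
  }
  normalizePath(job_dir, winslash = "/", mustWork = TRUE)
}

# Demonstrated under a temporary root so the example leaves nothing behind.
# A real job would use a stable root such as "~/bigknn-jobs" (hypothetical).
demo_root <- tempfile("bigknn-root-")
job_dir <- make_job_dir(demo_root, "graph-run-01")
job_dir_again <- make_job_dir(demo_root, "graph-run-01")  # safe to call twice
```

Calling the helper again with the same arguments returns the same directory, which is the property restart logic depends on.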

Checkpointed graph streaming with knn_graph_stream_bigmatrix()

The graph streaming API writes a directed exact kNN graph. In other words, it streams the same edge set you would get from knn_graph_bigmatrix(..., symmetrize = "none").

For a directed graph job, the destination size is known in advance:

n_edge = n_ref * k
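
Because the edge count is fixed, the disk footprint of the destination stores can be estimated before allocating anything. The sketch below assumes 4 bytes per element for an "integer" big.matrix and 8 bytes per element for a "double" one, and it ignores the small descriptor files. The function name graph_store_bytes() is illustrative only.

```r
# Estimate the number of edges and the backing-file bytes for the
# from/to/value stores of a directed kNN graph.
# Assumption: 4-byte integers (from, to) and 8-byte doubles (value).
graph_store_bytes <- function(n_ref, k, with_value = TRUE) {
  # Convert to double first so n_ref * k cannot overflow R's 32-bit integers.
  n_edge <- as.numeric(n_ref) * as.numeric(k)
  bytes <- n_edge * (4 + 4)
  if (with_value) {
    bytes <- bytes + n_edge * 8
  }
  c(n_edge = n_edge, bytes = bytes)
}

small <- graph_store_bytes(4, 1)      # the toy example in this vignette
large <- graph_store_bytes(1e7, 20)   # 2e8 edges, about 3.2 GB in total
```

For the four-row example this gives 4 edges and 64 bytes. The same arithmetic shows why larger jobs are better written straight to file-backed stores.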

Here is a small file-backed reference with four rows and one outgoing edge per row.

graph_points <- data.frame(
  id = paste0("g", 1:4),
  x1 = c(0, 1, 5, 6),
  x2 = c(0, 0, 0, 0)
)

graph_ref <- filebacked.big.matrix(
  nrow = nrow(graph_points),
  ncol = 2,
  type = "double",
  backingfile = "graph-ref.bin",
  descriptorfile = "graph-ref.desc",
  backingpath = scratch_dir
)

graph_ref[,] <- as.matrix(graph_points[c("x1", "x2")])
graph_points
#>   id x1 x2
#> 1 g1  0  0
#> 2 g2  1  0
#> 3 g3  5  0
#> 4 g4  6  0

k <- 1L
n_edge <- nrow(graph_ref) * k

graph_from <- filebacked.big.matrix(
  nrow = n_edge,
  ncol = 1,
  type = "integer",
  backingfile = "graph-from.bin",
  descriptorfile = "graph-from.desc",
  backingpath = scratch_dir
)

graph_to <- filebacked.big.matrix(
  nrow = n_edge,
  ncol = 1,
  type = "integer",
  backingfile = "graph-to.bin",
  descriptorfile = "graph-to.desc",
  backingpath = scratch_dir
)

graph_value <- filebacked.big.matrix(
  nrow = n_edge,
  ncol = 1,
  type = "double",
  backingfile = "graph-value.bin",
  descriptorfile = "graph-value.desc",
  backingpath = scratch_dir
)

graph_checkpoint <- file.path(scratch_dir, "graph-job.rds")

graph_job <- knn_graph_stream_bigmatrix(
  graph_ref,
  k = k,
  xpFrom = graph_from,
  xpTo = graph_to,
  xpValue = graph_value,
  checkpoint_path = graph_checkpoint
)

job_summary(graph_job)
#>      status    metric queries edges    checkpoint
#> 1 completed euclidean       4     4 graph-job.rds
read_graph_store(graph_from, graph_to, graph_value)
#>   from to value
#> 1    1  2     1
#> 2    2  1     1
#> 3    3  4     1
#> 4    4  3     1

Because the output is written directly to the destination matrices, the returned job object is mostly a status record. The actual graph data lives in graph_from, graph_to, and graph_value.
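
The read_graph_store() helper used above is local to this vignette and its definition is not shown. A minimal sketch of an equivalent reader, assuming each store is a single-column matrix-like object that supports `[, 1]` indexing, might look like this. It is demonstrated on ordinary R matrices holding the edges printed above, so it runs without bigKNN.

```r
# Sketch of a graph-store reader (an assumption, not the vignette's helper).
# Each argument is a single-column matrix-like object; `x[, 1]` returns the
# column as a plain vector.
read_graph_store_sketch <- function(from, to, value = NULL) {
  out <- data.frame(
    from = as.integer(from[, 1]),
    to = as.integer(to[, 1])
  )
  if (!is.null(value)) {
    out$value <- as.numeric(value[, 1])
  }
  out
}

# Stand-in stores with the same contents as the streamed result above.
from_store <- matrix(c(1L, 2L, 3L, 4L), ncol = 1)
to_store <- matrix(c(2L, 1L, 4L, 3L), ncol = 1)
value_store <- matrix(c(1, 1, 1, 1), ncol = 1)

edges <- read_graph_store_sketch(from_store, to_store, value_store)
```

Row i of the three stores describes one directed edge, so the stores are only meaningful when read together.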

Checkpointed radius streaming with radius_stream_job_bigmatrix()

Radius output is more complicated because the total number of matches depends on the data and the radius, so it cannot be computed up front the way n_ref * k can. In practice, the easiest pattern is:

  1. run count_within_radius_bigmatrix() to size the destinations
  2. allocate file-backed stores of the right length
  3. run radius_stream_job_bigmatrix()

The resumable radius job itself still has an internal count phase, so its checkpoint knows the per-query counts and can rebuild the offset vector during resume.
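
The relationship between per-query counts and the offset vector is a cumulative sum. The sketch below assumes the one-based convention that the offsets printed later in this section follow. It illustrates the arithmetic only and is not bigKNN's internal code.

```r
# Convert per-query match counts into one-based offsets.
# The result has length(counts) + 1 entries; entry i is the position of the
# first match for query i, and the last entry is one past the final match.
counts_to_offsets <- function(counts) {
  c(1, 1 + cumsum(as.numeric(counts)))
}

offsets <- counts_to_offsets(c(1, 1, 3, 1))
total_from_offsets <- offsets[length(offsets)] - 1  # equals sum(counts)
```

For counts of 1, 1, 3, and 1 the offsets are 1, 2, 3, 6, and 7, and the total number of matches is 6.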

radius_points <- data.frame(
  id = paste0("r", 1:4),
  x1 = c(1, 0, 1, 2),
  x2 = c(0, 1, 1, 1)
)

radius_ref <- filebacked.big.matrix(
  nrow = nrow(radius_points),
  ncol = 2,
  type = "double",
  backingfile = "radius-ref.bin",
  descriptorfile = "radius-ref.desc",
  backingpath = scratch_dir
)

radius_ref[,] <- as.matrix(radius_points[c("x1", "x2")])
radius_points
#>   id x1 x2
#> 1 r1  1  0
#> 2 r2  0  1
#> 3 r3  1  1
#> 4 r4  2  1

radius_counts <- count_within_radius_bigmatrix(radius_ref, radius = 1.1)
total_matches <- sum(radius_counts)

radius_index <- filebacked.big.matrix(
  nrow = total_matches,
  ncol = 1,
  type = "integer",
  backingfile = "radius-index.bin",
  descriptorfile = "radius-index.desc",
  backingpath = scratch_dir
)

radius_distance <- filebacked.big.matrix(
  nrow = total_matches,
  ncol = 1,
  type = "double",
  backingfile = "radius-distance.bin",
  descriptorfile = "radius-distance.desc",
  backingpath = scratch_dir
)

radius_offset <- filebacked.big.matrix(
  nrow = length(radius_counts) + 1L,
  ncol = 1,
  type = "double",
  backingfile = "radius-offset.bin",
  descriptorfile = "radius-offset.desc",
  backingpath = scratch_dir
)

radius_checkpoint <- file.path(scratch_dir, "radius-job.rds")

radius_job <- radius_stream_job_bigmatrix(
  radius_ref,
  xpIndex = radius_index,
  xpDistance = radius_distance,
  xpOffset = radius_offset,
  radius = 1.1,
  checkpoint_path = radius_checkpoint
)

radius_counts
#> [1] 1 1 3 1
job_summary(radius_job)
#>      status    metric queries edges     checkpoint
#> 1 completed euclidean       4     6 radius-job.rds
as.integer(as.vector(bigmemory::as.matrix(radius_offset)))
#> [1] 1 2 3 6 7
read_radius_store(radius_index, radius_distance, radius_offset, radius_points$id)
#>   query neighbor distance
#> 1    r1        3        1
#> 2    r2        3        1
#> 3    r3        1        1
#> 4    r3        2        1
#> 5    r3        4        1
#> 6    r4        3        1

The flattened index and distance vectors are interpreted through xpOffset. For query i, the relevant slice runs from offset[i] to offset[i + 1] - 1.
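
As a small illustration of that slicing rule, the sketch below extracts the matches for one query from plain vectors that mirror the stored values shown above. The function name radius_slice() is hypothetical. It uses seq_len() rather than the `a:b` operator so that a query with zero matches yields an empty slice instead of counting backwards.

```r
# Extract the neighbors and distances for query i from flattened stores.
# Assumes one-based offsets with length n_query + 1.
radius_slice <- function(index, distance, offset, i) {
  n_i <- offset[i + 1] - offset[i]        # number of matches for query i
  pos <- offset[i] + seq_len(n_i) - 1     # empty when n_i == 0
  data.frame(neighbor = index[pos], distance = distance[pos])
}

# Plain-vector copies of the values stored by the radius job above.
index <- c(3L, 3L, 1L, 2L, 4L, 3L)
distance <- rep(1, 6)
offset <- c(1, 2, 3, 6, 7)

slice_r3 <- radius_slice(index, distance, offset, 3)  # neighbors 1, 2, 4
```

With real stores, the same positions would be read from the first column of xpIndex and xpDistance.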

Restarting work with resume_knn_job()

After a real interruption, the interrupted session leaves the checkpoint in a partially completed state. A vignette build cannot crash the R process on purpose, so the next chunks mimic that situation by rewinding the checkpoint and clearing only the unfinished part of the output.

Graph Job Resume

graph_expected <- as.data.frame(
  knn_graph_bigmatrix(graph_ref, k = 1, format = "edge_list", symmetrize = "none")
)
attr(graph_expected, "bigknn_graph") <- NULL

# Rewind the checkpoint so it looks as if the job stopped after row 2.
graph_spec <- readRDS(graph_checkpoint)
graph_spec$status <- "running"
graph_spec$next_row <- 3L
graph_spec$next_edge <- 3L
saveRDS(graph_spec, graph_checkpoint)

# Clear the destinations, then restore only the two edges already written.
graph_from[, 1] <- 0L
graph_to[, 1] <- 0L
graph_value[, 1] <- 0
graph_from[1:2, 1] <- as.integer(graph_expected$from[1:2])
graph_to[1:2, 1] <- as.integer(graph_expected$to[1:2])
graph_value[1:2, 1] <- graph_expected$distance[1:2]

resumed_graph_job <- resume_knn_job(graph_checkpoint)

job_summary(resumed_graph_job)
#>      status    metric queries edges    checkpoint
#> 1 completed euclidean       4     4 graph-job.rds
read_graph_store(graph_from, graph_to, graph_value)
#>   from to value
#> 1    1  2     1
#> 2    2  1     1
#> 3    3  4     1
#> 4    4  3     1

The resumed job finishes the remaining rows and rewrites the missing tail of the streamed edge list.
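
The row-cursor idea behind this behaviour can be shown with a toy job in base R. This is a deliberately simplified model and not bigKNN's implementation: the "work" is squaring one element at a time, the destination is an .rds file rather than a big.matrix, and the function name run_toy_job() and the stop_after argument are hypothetical.

```r
# Toy checkpointed job: writes x[i]^2 into a durable output, one row at a
# time, and records the next row to process in a checkpoint file.
run_toy_job <- function(x, out_path, checkpoint_path, stop_after = Inf) {
  if (file.exists(checkpoint_path)) {
    state <- readRDS(checkpoint_path)            # resume from saved cursor
  } else {
    state <- list(status = "running", next_row = 1L)
    saveRDS(rep(NA_real_, length(x)), out_path)  # empty destination
    saveRDS(state, checkpoint_path)
  }
  out <- readRDS(out_path)
  processed <- 0
  while (state$next_row <= length(x) && processed < stop_after) {
    i <- state$next_row
    out[i] <- x[i]^2
    saveRDS(out, out_path)            # 1. make the result durable
    state$next_row <- i + 1L
    saveRDS(state, checkpoint_path)   # 2. only then advance the cursor
    processed <- processed + 1
  }
  if (state$next_row > length(x)) {
    state$status <- "completed"
    saveRDS(state, checkpoint_path)
  }
  state
}

x <- c(1, 2, 3, 4)
out_path <- tempfile(fileext = ".rds")
checkpoint_path <- tempfile(fileext = ".rds")

first <- run_toy_job(x, out_path, checkpoint_path, stop_after = 2)  # "interrupted"
partial <- readRDS(out_path)
second <- run_toy_job(x, out_path, checkpoint_path)                 # resumed
final <- readRDS(out_path)
```

Writing the output before advancing the cursor means a crash between the two steps can only cause a row to be recomputed, never skipped.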

Radius Job Resume

Radius jobs checkpoint both the current phase and the next row cursor. Here we simulate a restart after the count phase has already completed but the collect phase stopped partway through.

radius_expected <- radius_bigmatrix(radius_ref, radius = 1.1)

# Rewind the checkpoint to the collect phase, stopped after query 2.
radius_spec <- readRDS(radius_checkpoint)
radius_spec$status <- "running"
radius_spec$phase <- "collect"
radius_spec$next_row <- 3L
saveRDS(radius_spec, radius_checkpoint)

# Clear every destination, including the offsets.
radius_index[, 1] <- 0L
radius_distance[, 1] <- 0
radius_offset[, 1] <- 0

# Restore only the matches that belong to the finished queries 1 and 2.
prefix_end <- radius_expected$offset[3L] - 1L
radius_index[seq_len(prefix_end), 1] <- as.integer(radius_expected$index[seq_len(prefix_end)])
radius_distance[seq_len(prefix_end), 1] <- radius_expected$distance[seq_len(prefix_end)]

resumed_radius_job <- resume_knn_job(radius_checkpoint)

job_summary(resumed_radius_job)
#>      status    metric queries edges     checkpoint
#> 1 completed euclidean       4     6 radius-job.rds
as.integer(as.vector(bigmemory::as.matrix(radius_offset)))
#> [1] 1 2 3 6 7
read_radius_store(radius_index, radius_distance, radius_offset, radius_points$id)
#>   query neighbor distance
#> 1    r1        3        1
#> 2    r2        3        1
#> 3    r3        1        1
#> 4    r3        2        1
#> 5    r3        4        1
#> 6    r4        3        1

Notice that the offset vector is restored as part of the resumed run. That is why the radius checkpoint tracks both the phase and the per-query counts.

Destination matrix sizing and storage types

The destination contracts are strict, and it helps to keep them explicit:

  • For knn_graph_stream_bigmatrix(), xpFrom and xpTo must be writable single-column big.matrix objects with n_ref * k rows.
  • For graph jobs, xpValue is optional, but when supplied it must be a writable single-column double-backed big.matrix.
  • For radius_stream_job_bigmatrix(), xpIndex must be a writable single-column big.matrix that is long enough for the flattened match vector.
  • For radius jobs, xpDistance must be double-backed when supplied.
  • For radius jobs, xpOffset must be single-column and have n_query + 1 rows.

In practice, file-backed destinations are often the most natural choice because they match the persistence model of checkpointed jobs.
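
A pre-flight check can catch sizing mistakes before a long job starts. The sketch below covers only the shape rules for graph destinations listed above. It relies on nrow() and ncol(), so it accepts ordinary matrices as well, and it does not check storage type or writability. The function name check_graph_destinations() is hypothetical.

```r
# Return a character vector of shape problems (empty when everything fits).
# Checks only: single column, and exactly n_ref * k rows.
check_graph_destinations <- function(n_ref, k, from, to, value = NULL) {
  n_edge <- as.numeric(n_ref) * as.numeric(k)
  stores <- list(xpFrom = from, xpTo = to)
  if (!is.null(value)) {
    stores$xpValue <- value
  }
  problems <- character(0)
  for (name in names(stores)) {
    x <- stores[[name]]
    if (ncol(x) != 1L) {
      problems <- c(problems, paste0(name, ": expected 1 column, found ", ncol(x)))
    }
    if (nrow(x) != n_edge) {
      problems <- c(problems, paste0(name, ": expected ", n_edge, " rows, found ", nrow(x)))
    }
  }
  problems
}

ok_store <- matrix(0L, nrow = 4, ncol = 1)
short_store <- matrix(0L, nrow = 3, ncol = 1)

no_problems <- check_graph_destinations(4, 1, ok_store, ok_store)
one_problem <- check_graph_destinations(4, 1, ok_store, short_store)
```

Returning the problems as a vector, rather than stopping at the first one, lets a caller report every mismatch at once.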

What checkpoint files contain

Checkpoint files are ordinary .rds files, but they are not meant to be edited in normal use. At a high level they store:

  • prepared-reference metadata
  • query metadata
  • descriptors for the destination matrices
  • the current phase and row cursor
  • enough state to finish the remaining work

Here are the most informative scalar fields for the graph and radius checkpoints created above:

graph_spec <- readRDS(graph_checkpoint)
radius_spec <- readRDS(radius_checkpoint)

checkpoint_summary(graph_spec, c("type", "status", "k", "next_row", "next_edge"))
#>       field            value
#> 1      type knn_graph_stream
#> 2    status        completed
#> 3         k                1
#> 4  next_row                5
#> 5 next_edge                5
checkpoint_summary(radius_spec, c("type", "status", "phase", "next_row", "total_matches"))
#>           field         value
#> 1          type radius_stream
#> 2        status     completed
#> 3         phase       collect
#> 4      next_row             5
#> 5 total_matches             6

The non-scalar fields in those checkpoint objects hold the serialized prepared reference, query description, and destination descriptors needed for resume.
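
The checkpoint_summary() helper used above is local to this vignette and its definition is not shown. A minimal sketch of a similar diagnostic, assuming the checkpoint is a named list and the requested fields are scalars, is given below. The toy_spec list is a hand-made stand-in that borrows field names from the output above and is not a real checkpoint.

```r
# Sketch of a read-only checkpoint summary (an assumption, not the
# vignette's helper). Missing or empty fields are reported as NA.
checkpoint_summary_sketch <- function(spec, fields) {
  values <- vapply(fields, function(f) {
    v <- spec[[f]]
    if (length(v) == 0L) NA_character_ else as.character(v[[1]])
  }, character(1))
  data.frame(field = fields, value = unname(values), stringsAsFactors = FALSE)
}

# Hand-made stand-in for a graph checkpoint.
toy_spec <- list(
  type = "knn_graph_stream",
  status = "completed",
  k = 1L,
  next_row = 5L,
  next_edge = 5L
)

toy_summary <- checkpoint_summary_sketch(toy_spec, c("type", "status", "k", "next_row"))
```

Because the function only reads the list, it is safe to use on a live checkpoint loaded with readRDS().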

Failure recovery and restart patterns

A few practical patterns make checkpointed workflows more reliable:

  • Keep the checkpoint file, the reference backing files, and the destination backing files together inside a stable workflow directory.
  • After an interruption, call resume_knn_job() with the same checkpoint path rather than starting from scratch.
  • If a checkpoint already shows status = "completed", calling resume_knn_job() again simply returns a completed job object.
  • If the backing files for the reference or destinations are missing, resume cannot reattach the descriptors and the job cannot continue.

The key idea is that the checkpoint is only one part of the state. The backing files for the referenced big.matrix objects matter just as much.
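
These patterns can be folded into a small launcher that checks for backing files and then either starts or resumes. The sketch below is generic base R. The names missing_files() and start_or_resume() are hypothetical, and the start and resume actions are passed in as functions so the example runs without bigKNN. In a real workflow resume_fn would be resume_knn_job, and start_fn would wrap the original streaming call.

```r
# Report which expected files are absent from a job directory.
missing_files <- function(dir, files) {
  files[!file.exists(file.path(dir, files))]
}

# Resume when a checkpoint exists, otherwise start a fresh job.
# start_fn and resume_fn each take the checkpoint path as their argument.
start_or_resume <- function(checkpoint_path, start_fn, resume_fn) {
  if (file.exists(checkpoint_path)) {
    resume_fn(checkpoint_path)
  } else {
    start_fn(checkpoint_path)
  }
}

# Demonstration with stand-in actions that only record what happened.
demo_dir <- tempfile("bigknn-demo-")
dir.create(demo_dir)
demo_checkpoint <- file.path(demo_dir, "job.rds")
start_demo <- function(path) {
  saveRDS(list(status = "running"), path)
  "started"
}
resume_demo <- function(path) "resumed"

first_call <- start_or_resume(demo_checkpoint, start_demo, resume_demo)
second_call <- start_or_resume(demo_checkpoint, start_demo, resume_demo)
absent <- missing_files(demo_dir, c("job.rds", "graph-ref.bin"))
```

Running the missing-file check before resuming turns a descriptor reattachment failure into an explicit, early message about which file is gone.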

Operational tips for long-running jobs

  • Pair resumable jobs with knn_plan_bigmatrix() when you want a repeatable block-size and thread policy across runs.
  • Prefer file-backed destinations for long jobs, especially when the outputs are large enough that you would not want to recompute them.
  • Put checkpoint files in a predictable location so restart logic can find them easily.
  • Use graph streaming when the directed edge list is the real output, and use radius streaming when you want flattened match vectors plus offsets.
  • Treat checkpoint files as internal state. Reading them for diagnostics is fine; hand-editing them should be reserved for controlled recovery scenarios like the simulated example in this vignette.

Used this way, resumable jobs give bigKNN a more production-friendly exact workflow: results are streamed to durable storage, progress survives interruptions, and recovery is a normal part of the API instead of an external hack.