bigKNN includes checkpointed streaming APIs for exact
jobs that are large enough, long enough, or operationally important
enough that restarting from scratch would be painful. These jobs stream
results into destination big.matrix objects and record
their progress in an .rds checkpoint file that can be
resumed later.
This vignette covers:
- directed kNN graph streaming with knn_graph_stream_bigmatrix()
- checkpointed radius streaming with radius_stream_job_bigmatrix()
- resuming interrupted work with resume_knn_job()
- what the destination and checkpoint contracts look like in practice
When resumable jobs are worth using
Resumable jobs are especially useful when:
- the result is too large to keep comfortably in ordinary R objects
- the exact job may run long enough that interruptions are realistic
- you want durable progress on disk rather than all-or-nothing execution
- the destination should already live in a file-backed
big.matrix
These APIs are about operational safety, not approximate computation. The results are still exact. The difference is that progress is streamed into destination matrices and checkpointed along the way.
Build a durable working directory
To make resume realistic, this vignette uses file-backed references and file-backed destinations. That mirrors the usual production setup, where the reference, destination stores, and checkpoint file all live in a stable working directory.
scratch_dir <- file.path(tempdir(), "bigknn-resumable-jobs")
dir.create(scratch_dir, recursive = TRUE, showWarnings = FALSE)
Checkpointed graph streaming with knn_graph_stream_bigmatrix()
The graph streaming API writes a directed exact kNN graph. In other
words, it streams the same edge set you would get from
knn_graph_bigmatrix(..., symmetrize = "none").
For a directed graph job, the destination size is known in advance:
n_edge = n_ref * k
Here is a small file-backed reference with four rows and one outgoing edge per row.
graph_points <- data.frame(
id = paste0("g", 1:4),
x1 = c(0, 1, 5, 6),
x2 = c(0, 0, 0, 0)
)
graph_ref <- filebacked.big.matrix(
nrow = nrow(graph_points),
ncol = 2,
type = "double",
backingfile = "graph-ref.bin",
descriptorfile = "graph-ref.desc",
backingpath = scratch_dir
)
graph_ref[,] <- as.matrix(graph_points[c("x1", "x2")])
graph_points
#> id x1 x2
#> 1 g1 0 0
#> 2 g2 1 0
#> 3 g3 5 0
#> 4 g4 6 0
k <- 1L
n_edge <- nrow(graph_ref) * k
graph_from <- filebacked.big.matrix(
nrow = n_edge,
ncol = 1,
type = "integer",
backingfile = "graph-from.bin",
descriptorfile = "graph-from.desc",
backingpath = scratch_dir
)
graph_to <- filebacked.big.matrix(
nrow = n_edge,
ncol = 1,
type = "integer",
backingfile = "graph-to.bin",
descriptorfile = "graph-to.desc",
backingpath = scratch_dir
)
graph_value <- filebacked.big.matrix(
nrow = n_edge,
ncol = 1,
type = "double",
backingfile = "graph-value.bin",
descriptorfile = "graph-value.desc",
backingpath = scratch_dir
)
graph_checkpoint <- file.path(scratch_dir, "graph-job.rds")
graph_job <- knn_graph_stream_bigmatrix(
graph_ref,
k = k,
xpFrom = graph_from,
xpTo = graph_to,
xpValue = graph_value,
checkpoint_path = graph_checkpoint
)
job_summary(graph_job)
#> status metric queries edges checkpoint
#> 1 completed euclidean 4 4 graph-job.rds
read_graph_store(graph_from, graph_to, graph_value)
#> from to value
#> 1 1 2 1
#> 2 2 1 1
#> 3 3 4 1
#> 4 4 3 1
Because the output is written directly to the destination matrices,
the returned job object is mostly a status record. The actual graph data
lives in graph_from, graph_to, and
graph_value.
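If you prefer to work with the streamed graph as an ordinary data frame, the stores can also be read back with plain big.matrix indexing. A minimal sketch, assuming the job has completed and the edge list is small enough to hold in memory (this is just an alternative to the read_graph_store() helper used above):

```r
# Materialize the three streamed stores as a plain data frame.
# Each store is a single-column big.matrix, so [, 1] pulls the full column.
edges <- data.frame(
  from     = as.integer(graph_from[, 1]),
  to       = as.integer(graph_to[, 1]),
  distance = graph_value[, 1]
)
edges[order(edges$from), ]
```

For graphs too large to materialize at once, the same indexing works on row ranges, e.g. graph_from[1:1e6, 1].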
Checkpointed radius streaming with radius_stream_job_bigmatrix()
Radius output is more complicated because the total number of matches is not known in advance. In practice, the easiest pattern is:
- run count_within_radius_bigmatrix() to size the destinations
- allocate file-backed stores of the right length
- run radius_stream_job_bigmatrix()
The resumable radius job itself still has an internal count phase, so its checkpoint knows the per-query counts and can rebuild the offset vector during resume.
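Rebuilding the offset vector from per-query counts is just a prefix sum. A sketch of the idea (not the package's internal code), using the counts this vignette's radius job produces below:

```r
# Rebuild a 1-based offset vector from per-query match counts.
# Query i's matches occupy rows offset[i] .. offset[i + 1] - 1
# of the flattened index/distance stores.
counts  <- c(1L, 1L, 3L, 1L)
offsets <- cumsum(c(1L, counts))
offsets
#> [1] 1 2 3 6 7
```

This is why storing the counts in the checkpoint is enough: the offsets never need to be persisted separately to survive a restart.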
radius_points <- data.frame(
id = paste0("r", 1:4),
x1 = c(1, 0, 1, 2),
x2 = c(0, 1, 1, 1)
)
radius_ref <- filebacked.big.matrix(
nrow = nrow(radius_points),
ncol = 2,
type = "double",
backingfile = "radius-ref.bin",
descriptorfile = "radius-ref.desc",
backingpath = scratch_dir
)
radius_ref[,] <- as.matrix(radius_points[c("x1", "x2")])
radius_points
#> id x1 x2
#> 1 r1 1 0
#> 2 r2 0 1
#> 3 r3 1 1
#> 4 r4 2 1
radius_counts <- count_within_radius_bigmatrix(radius_ref, radius = 1.1)
total_matches <- sum(radius_counts)
radius_index <- filebacked.big.matrix(
nrow = total_matches,
ncol = 1,
type = "integer",
backingfile = "radius-index.bin",
descriptorfile = "radius-index.desc",
backingpath = scratch_dir
)
radius_distance <- filebacked.big.matrix(
nrow = total_matches,
ncol = 1,
type = "double",
backingfile = "radius-distance.bin",
descriptorfile = "radius-distance.desc",
backingpath = scratch_dir
)
radius_offset <- filebacked.big.matrix(
nrow = length(radius_counts) + 1L,
ncol = 1,
type = "double",
backingfile = "radius-offset.bin",
descriptorfile = "radius-offset.desc",
backingpath = scratch_dir
)
radius_checkpoint <- file.path(scratch_dir, "radius-job.rds")
radius_job <- radius_stream_job_bigmatrix(
radius_ref,
xpIndex = radius_index,
xpDistance = radius_distance,
xpOffset = radius_offset,
radius = 1.1,
checkpoint_path = radius_checkpoint
)
radius_counts
#> [1] 1 1 3 1
job_summary(radius_job)
#> status metric queries edges checkpoint
#> 1 completed euclidean 4 6 radius-job.rds
as.integer(as.vector(bigmemory::as.matrix(radius_offset)))
#> [1] 1 2 3 6 7
read_radius_store(radius_index, radius_distance, radius_offset, radius_points$id)
#> query neighbor distance
#> 1 r1 3 1
#> 2 r2 3 1
#> 3 r3 1 1
#> 4 r3 2 1
#> 5 r3 4 1
#> 6 r4 3 1
The flattened index and distance vectors are interpreted through
xpOffset. For query i, the relevant slice runs from
offset[i] to offset[i + 1] - 1.
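Applied to the stores above, that slicing rule looks like this (a sketch, assuming the job has completed):

```r
# Pull the matches for a single query out of the flattened stores.
# With the offsets shown above, c(1, 2, 3, 6, 7), query 3 maps to rows 3:5.
offset <- as.integer(radius_offset[, 1])
i <- 3L
slice <- offset[i]:(offset[i + 1L] - 1L)
data.frame(
  neighbor = as.integer(radius_index[slice, 1]),
  distance = radius_distance[slice, 1]
)
```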
Restarting work with resume_knn_job()
In a real interruption, the checkpoint would already be left in a partially completed state by the interrupted session. Inside a vignette build we obviously cannot crash the R process on purpose, so the next chunks mimic that situation by rewinding the checkpoint and clearing only the unfinished part of the output.
Graph job resume
graph_expected <- as.data.frame(
knn_graph_bigmatrix(graph_ref, k = 1, format = "edge_list", symmetrize = "none")
)
attr(graph_expected, "bigknn_graph") <- NULL
graph_spec <- readRDS(graph_checkpoint)
graph_spec$status <- "running"
graph_spec$next_row <- 3L
graph_spec$next_edge <- 3L
saveRDS(graph_spec, graph_checkpoint)
graph_from[, 1] <- 0L
graph_to[, 1] <- 0L
graph_value[, 1] <- 0
graph_from[1:2, 1] <- as.integer(graph_expected$from[1:2])
graph_to[1:2, 1] <- as.integer(graph_expected$to[1:2])
graph_value[1:2, 1] <- graph_expected$distance[1:2]
resumed_graph_job <- resume_knn_job(graph_checkpoint)
job_summary(resumed_graph_job)
#> status metric queries edges checkpoint
#> 1 completed euclidean 4 4 graph-job.rds
read_graph_store(graph_from, graph_to, graph_value)
#> from to value
#> 1 1 2 1
#> 2 2 1 1
#> 3 3 4 1
#> 4 4 3 1
The resumed job finishes the remaining rows and rewrites the missing tail of the streamed edge list.
Radius job resume
Radius jobs checkpoint both the current phase and the next row cursor. Here we simulate a restart after the count phase has already completed but the collect phase stopped partway through.
radius_expected <- radius_bigmatrix(radius_ref, radius = 1.1)
radius_spec <- readRDS(radius_checkpoint)
radius_spec$status <- "running"
radius_spec$phase <- "collect"
radius_spec$next_row <- 3L
saveRDS(radius_spec, radius_checkpoint)
radius_index[, 1] <- 0L
radius_distance[, 1] <- 0
radius_offset[, 1] <- 0
prefix_end <- radius_expected$offset[3L] - 1L
radius_index[seq_len(prefix_end), 1] <- as.integer(radius_expected$index[seq_len(prefix_end)])
radius_distance[seq_len(prefix_end), 1] <- radius_expected$distance[seq_len(prefix_end)]
resumed_radius_job <- resume_knn_job(radius_checkpoint)
job_summary(resumed_radius_job)
#> status metric queries edges checkpoint
#> 1 completed euclidean 4 6 radius-job.rds
as.integer(as.vector(bigmemory::as.matrix(radius_offset)))
#> [1] 1 2 3 6 7
read_radius_store(radius_index, radius_distance, radius_offset, radius_points$id)
#> query neighbor distance
#> 1 r1 3 1
#> 2 r2 3 1
#> 3 r3 1 1
#> 4 r3 2 1
#> 5 r3 4 1
#> 6 r4 3 1
Notice that the offset vector is restored as part of the resumed run. That is why the radius checkpoint tracks both the phase and the per-query counts.
Destination matrix sizing and storage types
The destination contracts are strict, and it helps to keep them explicit:
- For knn_graph_stream_bigmatrix(), xpFrom and xpTo must be writable single-column big.matrix objects with n_ref * k rows.
- For graph jobs, xpValue is optional, but when supplied it must be a writable single-column double-backed big.matrix.
- For radius_stream_job_bigmatrix(), xpIndex must be writable, single column, and long enough for the flattened match vector.
- For radius jobs, xpDistance must be double-backed when supplied.
- For radius jobs, xpOffset must be single-column and have n_query + 1 rows.
In practice, file-backed destinations are often the most natural choice because they match the persistence model of checkpointed jobs.
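These contracts can be captured once in a small allocation helper. The make_dest() function below is hypothetical (it is not part of bigKNN); it simply wraps bigmemory::filebacked.big.matrix() so every destination is allocated as a writable single-column file-backed store:

```r
library(bigmemory)

# Hypothetical helper (not part of bigKNN): allocate a single-column
# file-backed destination of the requested length and storage type.
make_dest <- function(name, n_rows, type, dir) {
  filebacked.big.matrix(
    nrow = n_rows, ncol = 1, type = type,
    backingfile    = paste0(name, ".bin"),
    descriptorfile = paste0(name, ".desc"),
    backingpath    = dir
  )
}

# Sizing for a graph job with n_ref reference rows and k neighbours:
# xpFrom  <- make_dest("graph-from",  n_ref * k, "integer", scratch_dir)
# xpTo    <- make_dest("graph-to",    n_ref * k, "integer", scratch_dir)
# xpValue <- make_dest("graph-value", n_ref * k, "double",  scratch_dir)
```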
What checkpoint files contain
Checkpoint files are ordinary .rds files, but they are
not meant to be edited in normal use. At a high level they store:
- prepared-reference metadata
- query metadata
- descriptors for the destination matrices
- the current phase and row cursor
- enough state to finish the remaining work
Here are the most informative scalar fields for the graph and radius checkpoints created above:
graph_spec <- readRDS(graph_checkpoint)
radius_spec <- readRDS(radius_checkpoint)
checkpoint_summary(graph_spec, c("type", "status", "k", "next_row", "next_edge"))
#> field value
#> 1 type knn_graph_stream
#> 2 status completed
#> 3 k 1
#> 4 next_row 5
#> 5 next_edge 5
checkpoint_summary(radius_spec, c("type", "status", "phase", "next_row", "total_matches"))
#> field value
#> 1 type radius_stream
#> 2 status completed
#> 3 phase collect
#> 4 next_row 5
#> 5 total_matches 6
The non-scalar fields in those checkpoint objects hold the serialized prepared reference, query description, and destination descriptors needed for resume.
Failure recovery and restart patterns
A few practical patterns make checkpointed workflows more reliable:
- Keep the checkpoint file, the reference backing files, and the destination backing files together inside a stable workflow directory.
- After an interruption, call resume_knn_job() with the same checkpoint path rather than starting from scratch.
- If a checkpoint already shows status = "completed", calling resume_knn_job() again simply returns a completed job object.
- If the backing files for the reference or destinations are missing, resume cannot reattach the descriptors and the job cannot continue.
The key idea is that the checkpoint is only one part of the state.
The backing files for the referenced big.matrix objects
matter just as much.
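A cheap safeguard, then, is to confirm that those files are still present before attempting a resume. A sketch, using the file names created earlier in this vignette:

```r
# Check that the checkpoint plus the backing and descriptor files it
# depends on all exist before calling resume_knn_job().
needed <- file.path(scratch_dir, c(
  "graph-job.rds",
  "graph-ref.bin",  "graph-ref.desc",
  "graph-from.bin", "graph-from.desc",
  "graph-to.bin",   "graph-to.desc"
))
missing <- needed[!file.exists(needed)]
if (length(missing) > 0) {
  stop("cannot resume; missing files: ", paste(missing, collapse = ", "))
}
resumed <- resume_knn_job(file.path(scratch_dir, "graph-job.rds"))
```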
Operational tips for long-running jobs
- Pair resumable jobs with knn_plan_bigmatrix() when you want a repeatable block-size and thread policy across runs.
- Prefer file-backed destinations for long jobs, especially when the outputs are large enough that you would not want to recompute them.
- Put checkpoint files in a predictable location so restart logic can find them easily.
- Use graph streaming when the directed edge list is the real output, and use radius streaming when you want flattened match vectors plus offsets.
- Treat checkpoint files as internal state. Reading them for diagnostics is fine; hand-editing them should be reserved for controlled recovery scenarios like the simulated example in this vignette.
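These tips combine naturally into a start-or-resume entry point. The run_or_resume() wrapper below is hypothetical (not part of bigKNN) and sketches one way restart logic might look when the checkpoint lives in a predictable location:

```r
# Hypothetical wrapper (not part of bigKNN): resume from an existing
# checkpoint when one is present, otherwise launch a fresh streaming job.
run_or_resume <- function(checkpoint_path, start_fn) {
  if (file.exists(checkpoint_path)) {
    resume_knn_job(checkpoint_path)
  } else {
    start_fn()
  }
}

# job <- run_or_resume(graph_checkpoint, function() {
#   knn_graph_stream_bigmatrix(
#     graph_ref, k = k,
#     xpFrom = graph_from, xpTo = graph_to, xpValue = graph_value,
#     checkpoint_path = graph_checkpoint
#   )
# })
```

Because completed checkpoints just return a completed job object, calling the wrapper repeatedly is safe.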
Used this way, resumable jobs give bigKNN a more
production-friendly exact workflow: results are streamed to durable
storage, progress survives interruptions, and recovery is a normal part
of the API instead of an external hack.