library(pbapply)
# 创建一个示例 data.frame
df = data.frame(a = 1:1000, b = rnorm(1000))
# 使用 pblapply 显示 apply 的进度
result = pbapply(df, 1, function(row) {
Sys.sleep(0.01) # 模拟一些需要时间的计算
sum(row) # 计算每一行的和
})17 批量异步任务
17.1 跟踪进度
在 R 中,apply 函数本身并不提供进度条的功能,但可以结合 pbapply 包来实现进度显示。pbapply 是 apply 家族函数的增强版,支持显示进度条。
17.1.1 安装和使用 pbapply 包
安装
pbapply包:install.packages("pbapply")使用
pbapply::pblapply()代替apply():pbapply包中的函数与apply系列函数接口相同,只是多了进度条显示功能。以下是一个示例,展示如何使用pblapply()处理每一行并显示进度:
17.1.2 pbapply 包中的其他常用函数:
pblapply():用于lapply类的操作。pbapply():用于apply类操作(与apply()类似)。pbmclapply():用于并行化lapply操作(仅适用于 UNIX-like 系统)。
通过这种方法,你可以方便地在 R 中显示 apply 执行的进度,尤其适用于处理大型数据集时跟踪计算进度。
17.2 缓存
为了自动缓存 pbapply 的执行进度,你可以使用 memoise 包,它能将函数的执行结果进行缓存,避免重复计算。它可以缓存到内容、本地文件系统、Google cloud storage、AWS cloud storage 等。结合 pbapply,你可以将每次处理的结果缓存起来,以便下次调用时可以直接使用缓存的数据。
17.2.1 安装和使用 memoise 包
安装
memoise包:install.packages("memoise")将处理函数缓存起来:
使用
memoise::memoise()对函数进行缓存处理。这样,pbapply在每次执行时,都会检查是否已经缓存过该结果。library(pbapply) library(memoise) # 示例:对每一行的计算进行缓存 my_function = function(row) { Sys.sleep(0.01) # 模拟一些耗时操作 sum(row) # 简单计算每行的和 } # 对函数进行缓存 cached_function = memoise(my_function)定义好了缓存函数后,就可以使用
pbapply并结合缓存来处理数据了。在第一次运行时,函数会执行并缓存结果。在后续的运行中,如果输入相同,函数会直接从缓存中返回结果,从而提高效率。下面是第一次运行的结果:
# 使用 pbapply 并结合缓存 set.seed(123) df = data.frame(a = 1:1000, b = rnorm(1000)) system.time(result <- pbapply(df, 1, cached_function))user system elapsed 0.318 0.032 12.187再次运行同样的代码,结果会从缓存中直接返回:
set.seed(123) df = data.frame(a = 1:1000, b = rnorm(1000)) system.time(result <- pbapply(df, 1, cached_function))user system elapsed 0.019 0.001 0.019第二次运行时,由于结果已经缓存,所以速度更快。不过,需要注意的是,缓存只对同一个函数有效,如果函数定义不同,缓存不会生效。重新定义缓存函数后,哪怕函数的内部逻辑完全相同,缓存也会失效。
17.2.2 工作原理:
memoise将函数my_function的结果缓存起来。当pbapply再次调用该函数处理相同的输入时,系统会直接从缓存中读取结果,而不是重新执行。- 对于每一行处理的结果,都会被缓存。即使你多次运行相同的操作,已经计算过的部分会从缓存中直接加载,大幅提高效率。
17.2.3 可选的存储方式:
memoise 还支持不同的存储方式(例如文件系统、数据库、内存等),你可以将缓存的数据存储在持久化的存储中,这样即使 R 会话关闭,也能保留缓存结果:
# 使用本地文件系统缓存
cached_function = memoise(my_function, cache = cache_filesystem("cache"))这样可以在未来的会话中继续使用缓存,提高计算的效率并避免重复执行相同的任务。
17.2.4 自定义缓存键
在 memoise 中,缓存的唯一键是基于输入参数的值自动生成的,使用的是参数的哈希值。这意味着当传递给函数的参数内容相同时,memoise 会识别并使用已经缓存的结果,而无需重复计算。
然而,有时你可能希望更明确地控制如何生成这些缓存键。你可以通过以下方式确保缓存键的唯一性:
17.2.4.1 默认行为(自动基于参数生成键)
memoise 会默认根据函数的输入参数内容来生成哈希值。例如:
library(memoise)
# 定义一个简单的函数
my_function = function(x) {
Sys.sleep(1) # 模拟计算延迟
x^2 # 返回平方
}
# 缓存该函数
cached_function = memoise(my_function)
# 调用函数,memoise 将自动根据 x 的值生成缓存
cached_function(10) # 第一次调用,会执行
cached_function(10) # 第二次调用,会从缓存中返回结果在上面的例子中,memoise 会根据 x=10 生成哈希键,下次调用相同输入时会直接返回缓存结果。
17.2.4.2 自定义键生成方法
如果你需要自定义键的生成方式,可以使用 digest 包来手动生成缓存键。通过将输入参数转换成哈希值作为缓存键来精确控制缓存行为。
例如:
library(memoise)
library(digest)
# 自定义的函数,手动生成缓存键
my_custom_memoise = function(f) {
mem_f = memoise(function(...) {
# 使用 digest 手动生成唯一键,基于输入参数的哈希值
cache_key = digest(list(...))
message("Cache key: ", cache_key)
f(...)
})
return(mem_f)
}
# 定义一个函数
my_function = function(x) {
Sys.sleep(1)
x^2
}
# 将函数缓存,并自定义缓存键生成
cached_function = my_custom_memoise(my_function)
# 调用函数时会生成并输出自定义的缓存键
cached_function(10)[1] 100
cached_function(10)[1] 100
cached_function(20) # 对于不同的输入,缓存键会不同[1] 400
17.2.5 保存缓存到 sqlite 数据库
使用 cache_filesystem 缓存时,缓存数据会保存在文件系统中,会涉及比较多的磁盘读写运算,当处理量较大时,会严重影响性能。而且每一次函数调用都会生成一个文件,管理起来也不方便。因此,这里扩展了一个 cache_dbi() 函数,使用数据库来缓存数据。
#' SQLite Database Cache
#'
#' Use a cache in a SQLite database that will persist between R sessions.
#'
#' @param con A DBI connection object to a SQLite database.
#' @param table_name Name of the table to use for caching. Default: "cache".
#'
#' @examples
#'
#' \dontrun{
#' library(DBI)
#' library(RSQLite)
#'
#' # 创建 SQLite 数据库连接
#' con <- dbConnect(SQLite(), dbname = "cache.sqlite")
#'
#' # 使用 SQLite 缓存
#' db_cache <- cache_dbi(con)
#'
#' mem_runif <- memoise(runif, cache = db_cache)
#' }
#'
#' @export
#' @inheritParams cache_memory
cache_dbi <- function(con, algo = "xxhash64", table_name = "cache") {
if (!(requireNamespace("digest"))) { stop("Package `digest` must be installed for `cache_dbi()`.") }
if (!(requireNamespace("DBI"))) { stop("Package `DBI` must be installed for `cache_dbi()`.") }
if (!grepl("^[A-Za-z0-9_]+$", table_name)) {
stop("Invalid table name")
}
# 创建缓存表
DBI::dbExecute(con, sprintf("CREATE TABLE IF NOT EXISTS %s (key TEXT PRIMARY KEY, value BLOB)", table_name))
# 重置缓存
cache_reset <- function() {
DBI::dbExecute(con, sprintf("DELETE FROM %s", table_name))
}
# 设置缓存
cache_set <- function(key, value) {
serialized_value <- serialize(value, NULL) # 序列化 R 对象
DBI::dbExecute(con, sprintf("INSERT OR REPLACE INTO %s (key, value) VALUES (?, ?)", table_name),
params = list(key, list(serialized_value))) # 用 list 包裹 serialized_value
}
# 获取缓存
cache_get <- function(key) {
result <- DBI::dbGetQuery(con, sprintf("SELECT value FROM %s WHERE key = ?", table_name),
params = list(key))
if (nrow(result) == 0) {
return(NULL)
}
unserialize(result$value[[1]]) # 反序列化第一个值
}
# 检查缓存是否存在
cache_has_key <- function(key) {
result <- DBI::dbGetQuery(con, sprintf("SELECT 1 FROM %s WHERE key = ? LIMIT 1", table_name),
params = list(key))
nrow(result) > 0
}
# 删除缓存
cache_drop_key <- function(key) {
DBI::dbExecute(con, sprintf("DELETE FROM %s WHERE key = ?", table_name),
params = list(key))
}
list(
digest = function(...) digest::digest(..., algo = algo),
reset = cache_reset,
set = cache_set,
get = cache_get,
has_key = cache_has_key,
drop_key = cache_drop_key,
keys = function() DBI::dbGetQuery(con, sprintf("SELECT key FROM %s", table_name))$key
)
}下面,使用 cache_dbi() 函数来缓存数据:
library(memoise)
library(DBI)
library(RSQLite)
# 创建 SQLite 数据库连接
con = dbConnect(SQLite(), dbname = tempfile())
# on.exit(dbDisconnect(con))
# 将缓存数据存储在数据库中
cached_function = memoise(my_function, cache = cache_dbi(con))
# 第一次运行
system.time(cached_function(10)) user system elapsed
0.002 0.001 1.008
# 第二次运行
system.time(cached_function(10)) user system elapsed
0.008 0.000 0.007
# 第三次运行
system.time(cached_function(20)) user system elapsed
0.002 0.002 1.009
17.3 动态控制批次执行
为了动态控制批次的执行,并确保正在执行的运算不超过 batch_size,你可以使用 R 的异步编程或并发工具包,比如 parallel 和 future,结合 Sys.sleep() 控制批次提交的速率。
17.3.1 使用 parallel 包实现动态分批
以下是使用 parallel 包执行 batch_size 个并发操作的示例:
library(parallel)
# 示例数据
df = data.frame(a = 1:100, b = rnorm(100))
# 定义处理函数
process_function = function(row) {
Sys.sleep(runif(1, 0.1, 0.5)) # 模拟耗时任务
sum(row) # 返回每行的和
}
# 比较串行和并行处理的性能
# 串行处理
start_time_serial = Sys.time()
results_serial = lapply(1:nrow(df), function(i) {
process_function(df[i, ])
})
end_time_serial = Sys.time()
time_serial = difftime(end_time_serial, start_time_serial, units = "secs")
# 并行处理
start_time_parallel = Sys.time()
results_parallel = mclapply(1:nrow(df), function(i) {
process_function(df[i, ])
}, mc.cores = parallel::detectCores() - 1) # 使用可用核心数减1
end_time_parallel = Sys.time()
time_parallel = difftime(end_time_parallel, start_time_parallel, units = "secs")
# 计算性能提升
speedup = as.numeric(time_serial) / as.numeric(time_parallel)
# 输出结果
cat("串行处理时间:", time_serial, "秒\n")
cat("并行处理时间:", time_parallel, "秒\n")
cat("性能提升:", round(speedup, 2), "倍\n")
# 验证结果正确性
testthat::expect_equal(rowSums(df), unlist(results_serial))
testthat::expect_equal(rowSums(df), unlist(results_parallel))17.3.2 使用 future 包实现动态分批
future 包允许你异步地运行函数,并通过限制同时进行的任务数量来动态控制批次。通过监控并发执行的数量,如果低于 batch_size,则自动启动新的任务,否则等待 1 秒再检查。
17.3.3 代码示例
library(future)
library(future.apply)
# 设定最大并行执行任务数为 batch_size
batch_size = 10
# 使用多线程计划
plan(multisession, workers = batch_size)
# 示例数据
df = data.frame(a = 1:100, b = rnorm(100))
# 定义处理函数
process_function = function(row) {
Sys.sleep(runif(1, 0.1, 0.5)) # 模拟耗时任务
sum(row) # 返回每行的和
}
# 动态控制批次执行
results = list() # 用于存储结果
futures = list() # 用于存储每个 future
# 将每行分成任务提交
for (i in 1:nrow(df)) {
# 检查当前运行中的任务数量
current_tasks = futures[!sapply(futures, resolved)]
# 如果当前运行中的任务数量达到 batch_size,等待 1 秒
while (length(current_tasks) >= batch_size) {
# 打印当前运行中的任务数量
message("当前运行中的任务数量:", length(current_tasks))
# 等待 1 秒
Sys.sleep(1)
# 更新当前运行中的任务
current_tasks = futures[!sapply(futures, resolved)]
}
# 提交新的任务,并保持追踪
futures[[i]] = future({
process_function(df[i, ])
}, seed = TRUE)
}
# 收集所有结果
results = future::value(futures) |> unlist()
# 展示结果
testthat::expect_equal(rowSums(df), results)17.3.4 代码说明
future和future.apply:future包允许将任务异步提交,通过plan(multisession, workers = batch_size)限制同时运行的任务数。- 动态任务调度:
- 每次新任务提交前,检查当前运行中的任务数量(
length(futures))。 - 如果运行中的任务达到
batch_size,程序会等待 1 秒,再次检查任务数量,确保不超出最大并行数。 - 当低于
batch_size时,新的任务才会被提交。
- 每次新任务提交前,检查当前运行中的任务数量(
- 结果收集:使用
future::value(futures)来获取所有任务的计算结果。 - 错误处理:使用
tryCatch捕获错误,并输出错误信息。
通过这种方法,你可以确保批次执行数不会超过 batch_size,并且动态提交新任务,从而实现更灵活的任务管理。
17.4 包装成一个函数
17.4.1 函数定义
函数采用 pbapply 显示进度,batch_size 设定批次大小,fun 接受命令(如果这个命令是一个 memorise 函数则直接使用,如果不是则将其自动缓存),缓存默认使用 cache_dbi(),缓存文件地址为 db_file 文件,由用户提供。
run_task_with_cache <- function(df, fun, db_file) {
# 检查并创建缓存
con = DBI::dbConnect(RSQLite::SQLite(), dbname = db_file)
on.exit(DBI::dbDisconnect(con))
# 如果fun是memoise函数,则提出 warning,并使用fun
if (inherits(fun, "memoised")) {
warning("Fun is already a memoised function. It will be used directly. `db_file` will be ignored.")
} else {
# 如果fun不是memoise函数,则将其缓存
fun <- memoise::memoise(fun, cache = cache_dbi(con))
}
# 执行计算
results = pbapply::pblapply(1:nrow(df), function(i) {
row = df[i, ]
result = tryCatch(fun(row), error = function(e) {
message("错误发生在第 ", i, " 行: ", e$message)
return(NA)
})
return(result)
})
return(results)
}# 示例数据
set.seed(123456)
df = data.frame(a = 1:100, b = rnorm(100))
# 定义处理函数
process_function = function(row) {
Sys.sleep(0.1) # 模拟耗时任务
sum(row) # 返回每行的和
}# 运行函数
result <- run_task_with_cache(df, process_function, db_file = "cache.sqlite")
testthat::expect_equal(unlist(result), rowSums(df))17.4.2 并行计算
并行计算可以使用 future 包,或者 parallel 包。但是在显示进度条、数据库访问方面实现起来有一定难度,特别是涉及到数据库访问的并发问题,暂时没有很好的解决方案。因此,考虑使用数据拆分的方法,拆成多个批次,每个批次使用一个进程,每个进程使用一个数据库连接,这样就可以避免并发问题。
17.5 进度条
在 R 中,可以使用多个包来显示现代化的进度条,其中最常用的包包括:
progressr:一个强大的进度条包,可以显示并行任务的进度条。progress:一个简单的、可高度自定义的进度条包。cli:可以结合进度条显示,也可以显示丰富的命令行界面元素。pbapply:适用于*apply系列函数的进度条集成。
以下是这些包的基本用法:
- 使用
progressr包
progressr 包可以用来显示并行任务的进度条。以下是一个使用 future 和 furrr 包进行并行计算,并使用 progressr 显示进度的示例:
library(future)
library(furrr)
library(progressr)
plan(multisession, workers = 4)
data <- 1:100
time_consuming_task <- function(x) {
Sys.sleep(0.01)
return(x^2)
}
with_progress({
p <- progressor(steps = length(data))
results <- future_map(data, function(x) {
result <- time_consuming_task(x)
p()
return(result)
})
})- 使用
progress包
progress 包提供了简单易用的进度条,同时可以进行高度自定义。
# 安装并加载 progress 包
# install.packages("progress")
library(progress)
# 创建一个进度条
pb = progress_bar$new(
format = " downloading [:bar] :percent in :elapsed",
total = 100, clear = FALSE, width = 60
)
# 模拟一个任务并更新进度条
for (i in 1:100) {
Sys.sleep(0.01) # 模拟耗时任务
pb$tick() # 每次循环时,进度条前进一格
}- 使用
cli包
cli 包不仅提供进度条,还可以创建富文本命令行界面。
# 安装并加载 cli 包
# install.packages("cli")
library(cli)
# 创建并显示进度条
pb = cli_progress_bar("Computing",
total = 100,
format = " {cli::pb_bar} {cli::pb_percent} ETA: {cli::pb_eta}")
for (i in 1:100) {
Sys.sleep(0.05) # 模拟任务
cli_progress_update() # 更新进度条
}
message("Done!")
cli_progress_done() # 任务完成时清除进度条- 使用
pbapply包
如果你经常使用 apply 系列函数,pbapply 包可以在它们的基础上添加进度条。
# 安装并加载 pbapply 包
#install.packages("pbapply")
library(pbapply)
# 使用 pbapply 中的 pbsapply 代替 sapply,自动添加进度条
result = pbsapply(1:100, function(x) {
Sys.sleep(0.01) # 模拟任务
x^2
})这几个包都提供了现代化的进度条显示,你可以根据自己的需求选择最合适的。