library(pbapply)
# 创建一个示例 data.frame
= data.frame(a = 1:1000, b = rnorm(1000))
df
# 使用 pblapply 显示 apply 的进度
= pbapply(df, 1, function(row) {
result Sys.sleep(0.01) # 模拟一些需要时间的计算
sum(row) # 计算每一行的和
})
17 批量异步任务
17.1 跟踪进度
在 R 中,apply
函数本身并不提供进度条的功能,但可以结合 pbapply
包来实现进度显示。pbapply
是 apply
家族函数的增强版,支持显示进度条。
17.1.1 安装和使用 pbapply
包
安装
pbapply
包:install.packages("pbapply")
使用
pbapply::pblapply()
代替apply()
:pbapply
包中的函数与apply
系列函数接口相同,只是多了进度条显示功能。以下是一个示例,展示如何使用pblapply()
处理每一行并显示进度:
17.1.2 pbapply
包中的其他常用函数:
pblapply()
:用于lapply
类的操作。pbapply()
:用于apply
类操作(与apply()
类似)。pbmclapply()
:用于并行化lapply
操作(仅适用于 UNIX-like 系统)。
通过这种方法,你可以方便地在 R 中显示 apply
执行的进度,尤其适用于处理大型数据集时跟踪计算进度。
17.2 缓存
为了自动缓存 pbapply
的执行进度,你可以使用 memoise
包,它能将函数的执行结果进行缓存,避免重复计算。它可以缓存到内容、本地文件系统、Google cloud storage、AWS cloud storage 等。结合 pbapply
,你可以将每次处理的结果缓存起来,以便下次调用时可以直接使用缓存的数据。
17.2.1 安装和使用 memoise
包
安装
memoise
包:install.packages("memoise")
将处理函数缓存起来:
使用
memoise::memoise()
对函数进行缓存处理。这样,pbapply
在每次执行时,都会检查是否已经缓存过该结果。library(pbapply) library(memoise) # 示例:对每一行的计算进行缓存 = function(row) { my_function Sys.sleep(0.01) # 模拟一些耗时操作 sum(row) # 简单计算每行的和 } # 对函数进行缓存 = memoise(my_function) cached_function
定义好了缓存函数后,就可以使用
pbapply
并结合缓存来处理数据了。在第一次运行时,函数会执行并缓存结果。在后续的运行中,如果输入相同,函数会直接从缓存中返回结果,从而提高效率。下面是第一次运行的结果:
# 使用 pbapply 并结合缓存 set.seed(123) = data.frame(a = 1:1000, b = rnorm(1000)) df system.time(result <- pbapply(df, 1, cached_function))
user system elapsed 0.318 0.032 12.187
再次运行同样的代码,结果会从缓存中直接返回:
set.seed(123) = data.frame(a = 1:1000, b = rnorm(1000)) df system.time(result <- pbapply(df, 1, cached_function))
user system elapsed 0.019 0.001 0.019
第二次运行时,由于结果已经缓存,所以速度更快。不过,需要注意的是,缓存只对同一个函数有效,如果函数定义不同,缓存不会生效。重新定义缓存函数后,哪怕函数的内部逻辑完全相同,缓存也会失效。
17.2.2 工作原理:
memoise
将函数my_function
的结果缓存起来。当pbapply
再次调用该函数处理相同的输入时,系统会直接从缓存中读取结果,而不是重新执行。- 对于每一行处理的结果,都会被缓存。即使你多次运行相同的操作,已经计算过的部分会从缓存中直接加载,大幅提高效率。
17.2.3 可选的存储方式:
memoise
还支持不同的存储方式(例如文件系统、数据库、内存等),你可以将缓存的数据存储在持久化的存储中,这样即使 R 会话关闭,也能保留缓存结果:
# 使用本地文件系统缓存
= memoise(my_function, cache = cache_filesystem("cache")) cached_function
这样可以在未来的会话中继续使用缓存,提高计算的效率并避免重复执行相同的任务。
17.2.4 自定义缓存键
在 memoise
中,缓存的唯一键是基于输入参数的值自动生成的,使用的是参数的哈希值。这意味着当传递给函数的参数内容相同时,memoise
会识别并使用已经缓存的结果,而无需重复计算。
然而,有时你可能希望更明确地控制如何生成这些缓存键。你可以通过以下方式确保缓存键的唯一性:
17.2.4.1 默认行为(自动基于参数生成键)
memoise
会默认根据函数的输入参数内容来生成哈希值。例如:
library(memoise)
# 定义一个简单的函数
= function(x) {
my_function Sys.sleep(1) # 模拟计算延迟
^2 # 返回平方
x
}
# 缓存该函数
= memoise(my_function)
cached_function
# 调用函数,memoise 将自动根据 x 的值生成缓存
cached_function(10) # 第一次调用,会执行
cached_function(10) # 第二次调用,会从缓存中返回结果
在上面的例子中,memoise
会根据 x=10
生成哈希键,下次调用相同输入时会直接返回缓存结果。
17.2.4.2 自定义键生成方法
如果你需要自定义键的生成方式,可以使用 digest
包来手动生成缓存键。通过将输入参数转换成哈希值作为缓存键来精确控制缓存行为。
例如:
library(memoise)
library(digest)
# 自定义的函数,手动生成缓存键
= function(f) {
my_custom_memoise = memoise(function(...) {
mem_f # 使用 digest 手动生成唯一键,基于输入参数的哈希值
= digest(list(...))
cache_key message("Cache key: ", cache_key)
f(...)
})return(mem_f)
}
# 定义一个函数
= function(x) {
my_function Sys.sleep(1)
^2
x
}
# 将函数缓存,并自定义缓存键生成
= my_custom_memoise(my_function)
cached_function
# 调用函数时会生成并输出自定义的缓存键
cached_function(10)
[1] 100
cached_function(10)
[1] 100
cached_function(20) # 对于不同的输入,缓存键会不同
[1] 400
17.2.5 保存缓存到 sqlite 数据库
使用 cache_filesystem
缓存时,缓存数据会保存在文件系统中,会涉及比较多的磁盘读写运算,当处理量较大时,会严重影响性能。而且每一次函数调用都会生成一个文件,管理起来也不方便。因此,这里扩展了一个 cache_dbi()
函数,使用数据库来缓存数据。
#' SQLite Database Cache
#'
#' Use a cache in a SQLite database that will persist between R sessions.
#'
#' @param con A DBI connection object to a SQLite database.
#' @param table_name Name of the table to use for caching. Default: "cache".
#'
#' @examples
#'
#' \dontrun{
#' library(DBI)
#' library(RSQLite)
#'
#' # 创建 SQLite 数据库连接
#' con <- dbConnect(SQLite(), dbname = "cache.sqlite")
#'
#' # 使用 SQLite 缓存
#' db_cache <- cache_dbi(con)
#'
#' mem_runif <- memoise(runif, cache = db_cache)
#' }
#'
#' @export
#' @inheritParams cache_memory
<- function(con, algo = "xxhash64", table_name = "cache") {
cache_dbi if (!(requireNamespace("digest"))) { stop("Package `digest` must be installed for `cache_dbi()`.") }
if (!(requireNamespace("DBI"))) { stop("Package `DBI` must be installed for `cache_dbi()`.") }
if (!grepl("^[A-Za-z0-9_]+$", table_name)) {
stop("Invalid table name")
}
# 创建缓存表
::dbExecute(con, sprintf("CREATE TABLE IF NOT EXISTS %s (key TEXT PRIMARY KEY, value BLOB)", table_name))
DBI
# 重置缓存
<- function() {
cache_reset ::dbExecute(con, sprintf("DELETE FROM %s", table_name))
DBI
}
# 设置缓存
<- function(key, value) {
cache_set <- serialize(value, NULL) # 序列化 R 对象
serialized_value ::dbExecute(con, sprintf("INSERT OR REPLACE INTO %s (key, value) VALUES (?, ?)", table_name),
DBIparams = list(key, list(serialized_value))) # 用 list 包裹 serialized_value
}
# 获取缓存
<- function(key) {
cache_get <- DBI::dbGetQuery(con, sprintf("SELECT value FROM %s WHERE key = ?", table_name),
result params = list(key))
if (nrow(result) == 0) {
return(NULL)
}unserialize(result$value[[1]]) # 反序列化第一个值
}
# 检查缓存是否存在
<- function(key) {
cache_has_key <- DBI::dbGetQuery(con, sprintf("SELECT 1 FROM %s WHERE key = ? LIMIT 1", table_name),
result params = list(key))
nrow(result) > 0
}
# 删除缓存
<- function(key) {
cache_drop_key ::dbExecute(con, sprintf("DELETE FROM %s WHERE key = ?", table_name),
DBIparams = list(key))
}
list(
digest = function(...) digest::digest(..., algo = algo),
reset = cache_reset,
set = cache_set,
get = cache_get,
has_key = cache_has_key,
drop_key = cache_drop_key,
keys = function() DBI::dbGetQuery(con, sprintf("SELECT key FROM %s", table_name))$key
) }
下面,使用 cache_dbi()
函数来缓存数据:
library(memoise)
library(DBI)
library(RSQLite)
# 创建 SQLite 数据库连接
= dbConnect(SQLite(), dbname = tempfile())
con # on.exit(dbDisconnect(con))
# 将缓存数据存储在数据库中
= memoise(my_function, cache = cache_dbi(con))
cached_function
# 第一次运行
system.time(cached_function(10))
user system elapsed
0.002 0.001 1.008
# 第二次运行
system.time(cached_function(10))
user system elapsed
0.008 0.000 0.007
# 第三次运行
system.time(cached_function(20))
user system elapsed
0.002 0.002 1.009
17.3 动态控制批次执行
为了动态控制批次的执行,并确保正在执行的运算不超过 batch_size
,你可以使用 R 的异步编程或并发工具包,比如 parallel
和 future
,结合 Sys.sleep()
控制批次提交的速率。
17.3.1 使用 parallel
包实现动态分批
以下是使用 parallel
包执行 batch_size
个并发操作的示例:
library(parallel)
# 示例数据
= data.frame(a = 1:100, b = rnorm(100))
df
# 定义处理函数
= function(row) {
process_function Sys.sleep(runif(1, 0.1, 0.5)) # 模拟耗时任务
sum(row) # 返回每行的和
}
# 比较串行和并行处理的性能
# 串行处理
= Sys.time()
start_time_serial = lapply(1:nrow(df), function(i) {
results_serial process_function(df[i, ])
})= Sys.time()
end_time_serial = difftime(end_time_serial, start_time_serial, units = "secs")
time_serial
# 并行处理
= Sys.time()
start_time_parallel = mclapply(1:nrow(df), function(i) {
results_parallel process_function(df[i, ])
mc.cores = parallel::detectCores() - 1) # 使用可用核心数减1
}, = Sys.time()
end_time_parallel = difftime(end_time_parallel, start_time_parallel, units = "secs")
time_parallel
# 计算性能提升
= as.numeric(time_serial) / as.numeric(time_parallel)
speedup
# 输出结果
cat("串行处理时间:", time_serial, "秒\n")
cat("并行处理时间:", time_parallel, "秒\n")
cat("性能提升:", round(speedup, 2), "倍\n")
# 验证结果正确性
::expect_equal(rowSums(df), unlist(results_serial))
testthat::expect_equal(rowSums(df), unlist(results_parallel)) testthat
17.3.2 使用 future
包实现动态分批
future
包允许你异步地运行函数,并通过限制同时进行的任务数量来动态控制批次。通过监控并发执行的数量,如果低于 batch_size
,则自动启动新的任务,否则等待 1 秒再检查。
17.3.3 代码示例
library(future)
library(future.apply)
# 设定最大并行执行任务数为 batch_size
= 10
batch_size
# 使用多线程计划
plan(multisession, workers = batch_size)
# 示例数据
= data.frame(a = 1:100, b = rnorm(100))
df
# 定义处理函数
= function(row) {
process_function Sys.sleep(runif(1, 0.1, 0.5)) # 模拟耗时任务
sum(row) # 返回每行的和
}
# 动态控制批次执行
= list() # 用于存储结果
results = list() # 用于存储每个 future
futures
# 将每行分成任务提交
for (i in 1:nrow(df)) {
# 检查当前运行中的任务数量
= futures[!sapply(futures, resolved)]
current_tasks
# 如果当前运行中的任务数量达到 batch_size,等待 1 秒
while (length(current_tasks) >= batch_size) {
# 打印当前运行中的任务数量
message("当前运行中的任务数量:", length(current_tasks))
# 等待 1 秒
Sys.sleep(1)
# 更新当前运行中的任务
= futures[!sapply(futures, resolved)]
current_tasks
}
# 提交新的任务,并保持追踪
= future({
futures[[i]] process_function(df[i, ])
seed = TRUE)
},
}
# 收集所有结果
= future::value(futures) |> unlist()
results
# 展示结果
::expect_equal(rowSums(df), results) testthat
17.3.4 代码说明
future
和future.apply
:future
包允许将任务异步提交,通过plan(multisession, workers = batch_size)
限制同时运行的任务数。- 动态任务调度:
- 每次新任务提交前,检查当前运行中的任务数量(
length(futures)
)。 - 如果运行中的任务达到
batch_size
,程序会等待 1 秒,再次检查任务数量,确保不超出最大并行数。 - 当低于
batch_size
时,新的任务才会被提交。
- 每次新任务提交前,检查当前运行中的任务数量(
- 结果收集:使用
future::value(futures)
来获取所有任务的计算结果。 - 错误处理:使用
tryCatch
捕获错误,并输出错误信息。
通过这种方法,你可以确保批次执行数不会超过 batch_size
,并且动态提交新任务,从而实现更灵活的任务管理。
17.4 包装成一个函数
17.4.1 函数定义
函数采用 pbapply 显示进度,batch_size
设定批次大小,fun
接受命令(如果这个命令是一个 memorise 函数则直接使用,如果不是则将其自动缓存),缓存默认使用 cache_dbi()
,缓存文件地址为 db_file
文件,由用户提供。
<- function(df, fun, db_file) {
run_task_with_cache
# 检查并创建缓存
= DBI::dbConnect(RSQLite::SQLite(), dbname = db_file)
con on.exit(DBI::dbDisconnect(con))
# 如果fun是memoise函数,则提出 warning,并使用fun
if (inherits(fun, "memoised")) {
warning("Fun is already a memoised function. It will be used directly. `db_file` will be ignored.")
else {
} # 如果fun不是memoise函数,则将其缓存
<- memoise::memoise(fun, cache = cache_dbi(con))
fun
}
# 执行计算
= pbapply::pblapply(1:nrow(df), function(i) {
results = df[i, ]
row = tryCatch(fun(row), error = function(e) {
result message("错误发生在第 ", i, " 行: ", e$message)
return(NA)
})return(result)
})
return(results)
}
# 示例数据
set.seed(123456)
= data.frame(a = 1:100, b = rnorm(100))
df
# 定义处理函数
= function(row) {
process_function Sys.sleep(0.1) # 模拟耗时任务
sum(row) # 返回每行的和
}
# 运行函数
<- run_task_with_cache(df, process_function, db_file = "cache.sqlite")
result
::expect_equal(unlist(result), rowSums(df)) testthat
17.4.2 并行计算
并行计算可以使用 future
包,或者 parallel
包。但是在显示进度条、数据库访问方面实现起来有一定难度,特别是涉及到数据库访问的并发问题,暂时没有很好的解决方案。因此,考虑使用数据拆分的方法,拆成多个批次,每个批次使用一个进程,每个进程使用一个数据库连接,这样就可以避免并发问题。
17.5 进度条
在 R 中,可以使用多个包来显示现代化的进度条,其中最常用的包包括:
progressr
:一个强大的进度条包,可以显示并行任务的进度条。progress
:一个简单的、可高度自定义的进度条包。cli
:可以结合进度条显示,也可以显示丰富的命令行界面元素。pbapply
:适用于*apply
系列函数的进度条集成。
以下是这些包的基本用法:
- 使用
progressr
包
progressr
包可以用来显示并行任务的进度条。以下是一个使用 future
和 furrr
包进行并行计算,并使用 progressr
显示进度的示例:
library(future)
library(furrr)
library(progressr)
plan(multisession, workers = 4)
<- 1:100
data
<- function(x) {
time_consuming_task Sys.sleep(0.01)
return(x^2)
}
with_progress({
<- progressor(steps = length(data))
p
<- future_map(data, function(x) {
results <- time_consuming_task(x)
result p()
return(result)
}) })
- 使用
progress
包
progress
包提供了简单易用的进度条,同时可以进行高度自定义。
# 安装并加载 progress 包
# install.packages("progress")
library(progress)
# 创建一个进度条
= progress_bar$new(
pb format = " downloading [:bar] :percent in :elapsed",
total = 100, clear = FALSE, width = 60
)
# 模拟一个任务并更新进度条
for (i in 1:100) {
Sys.sleep(0.01) # 模拟耗时任务
$tick() # 每次循环时,进度条前进一格
pb }
- 使用
cli
包
cli
包不仅提供进度条,还可以创建富文本命令行界面。
# 安装并加载 cli 包
# install.packages("cli")
library(cli)
# 创建并显示进度条
= cli_progress_bar("Computing",
pb total = 100,
format = " {cli::pb_bar} {cli::pb_percent} ETA: {cli::pb_eta}")
for (i in 1:100) {
Sys.sleep(0.05) # 模拟任务
cli_progress_update() # 更新进度条
}
message("Done!")
cli_progress_done() # 任务完成时清除进度条
- 使用
pbapply
包
如果你经常使用 apply
系列函数,pbapply
包可以在它们的基础上添加进度条。
# 安装并加载 pbapply 包
#install.packages("pbapply")
library(pbapply)
# 使用 pbapply 中的 pbsapply 代替 sapply,自动添加进度条
= pbsapply(1:100, function(x) {
result Sys.sleep(0.01) # 模拟任务
^2
x })
这几个包都提供了现代化的进度条显示,你可以根据自己的需求选择最合适的。