17  批量异步任务

17.1 跟踪进度

在 R 中,apply 函数本身并不提供进度条的功能,但可以结合 pbapply 包来实现进度显示。pbapplyapply 家族函数的增强版,支持显示进度条。

17.1.1 安装和使用 pbapply

  1. 安装 pbapply

    install.packages("pbapply")
  2. 使用 pbapply::pblapply() 代替 apply()

    pbapply 包中的函数与 apply 系列函数接口相同,只是多了进度条显示功能。以下是一个示例,展示如何使用 pblapply() 处理每一行并显示进度:

    library(pbapply)
    
    # 创建一个示例 data.frame
    df = data.frame(a = 1:1000, b = rnorm(1000))
    
    # 使用 pblapply 显示 apply 的进度
    result = pbapply(df, 1, function(row) {
      Sys.sleep(0.01)  # 模拟一些需要时间的计算
      sum(row)         # 计算每一行的和
    })

17.1.2 pbapply 包中的其他常用函数:

  • pblapply():用于 lapply 类的操作。
  • pbapply():用于 apply 类操作(与 apply() 类似)。
  • pbmclapply():用于并行化 lapply 操作(仅适用于 UNIX-like 系统)。

通过这种方法,你可以方便地在 R 中显示 apply 执行的进度,尤其适用于处理大型数据集时跟踪计算进度。

17.2 缓存

为了自动缓存 pbapply 的执行进度,你可以使用 memoise 包,它能将函数的执行结果进行缓存,避免重复计算。它可以缓存到内容、本地文件系统、Google cloud storage、AWS cloud storage 等。结合 pbapply,你可以将每次处理的结果缓存起来,以便下次调用时可以直接使用缓存的数据。

17.2.1 安装和使用 memoise

  1. 安装 memoise

    install.packages("memoise")
  2. 将处理函数缓存起来

    使用 memoise::memoise() 对函数进行缓存处理。这样,pbapply 在每次执行时,都会检查是否已经缓存过该结果。

    library(pbapply)
    library(memoise)
    
    # 示例:对每一行的计算进行缓存
    my_function = function(row) {
      Sys.sleep(0.01)  # 模拟一些耗时操作
      sum(row)         # 简单计算每行的和
    }
    
    # 对函数进行缓存
    cached_function = memoise(my_function)

    定义好了缓存函数后,就可以使用 pbapply 并结合缓存来处理数据了。在第一次运行时,函数会执行并缓存结果。在后续的运行中,如果输入相同,函数会直接从缓存中返回结果,从而提高效率。

    下面是第一次运行的结果:

    # 使用 pbapply 并结合缓存
    set.seed(123)
    df = data.frame(a = 1:1000, b = rnorm(1000))
    system.time(result <- pbapply(df, 1, cached_function))
       user  system elapsed 
      0.318   0.032  12.187 

    再次运行同样的代码,结果会从缓存中直接返回:

    set.seed(123)
    df = data.frame(a = 1:1000, b = rnorm(1000))
    system.time(result <- pbapply(df, 1, cached_function))
       user  system elapsed 
      0.019   0.001   0.019 

    第二次运行时,由于结果已经缓存,所以速度更快。不过,需要注意的是,缓存只对同一个函数有效,如果函数定义不同,缓存不会生效。重新定义缓存函数后,哪怕函数的内部逻辑完全相同,缓存也会失效。

17.2.2 工作原理:

  • memoise 将函数 my_function 的结果缓存起来。当 pbapply 再次调用该函数处理相同的输入时,系统会直接从缓存中读取结果,而不是重新执行。
  • 对于每一行处理的结果,都会被缓存。即使你多次运行相同的操作,已经计算过的部分会从缓存中直接加载,大幅提高效率。

17.2.3 可选的存储方式:

memoise 还支持不同的存储方式(例如文件系统、数据库、内存等),你可以将缓存的数据存储在持久化的存储中,这样即使 R 会话关闭,也能保留缓存结果:

# 使用本地文件系统缓存
cached_function = memoise(my_function, cache = cache_filesystem("cache"))

这样可以在未来的会话中继续使用缓存,提高计算的效率并避免重复执行相同的任务。

17.2.4 自定义缓存键

memoise 中,缓存的唯一键是基于输入参数的值自动生成的,使用的是参数的哈希值。这意味着当传递给函数的参数内容相同时,memoise 会识别并使用已经缓存的结果,而无需重复计算。

然而,有时你可能希望更明确地控制如何生成这些缓存键。你可以通过以下方式确保缓存键的唯一性:

17.2.4.1 默认行为(自动基于参数生成键)

memoise 会默认根据函数的输入参数内容来生成哈希值。例如:

library(memoise)

# 定义一个简单的函数
my_function = function(x) {
  Sys.sleep(1)  # 模拟计算延迟
  x^2           # 返回平方
}

# 缓存该函数
cached_function = memoise(my_function)

# 调用函数,memoise 将自动根据 x 的值生成缓存
cached_function(10)  # 第一次调用,会执行
cached_function(10)  # 第二次调用,会从缓存中返回结果

在上面的例子中,memoise 会根据 x=10 生成哈希键,下次调用相同输入时会直接返回缓存结果。

17.2.4.2 自定义键生成方法

如果你需要自定义键的生成方式,可以使用 digest 包来手动生成缓存键。通过将输入参数转换成哈希值作为缓存键来精确控制缓存行为。

例如:

library(memoise)
library(digest)

# 自定义的函数,手动生成缓存键
my_custom_memoise = function(f) {
  mem_f = memoise(function(...) {
    # 使用 digest 手动生成唯一键,基于输入参数的哈希值
    cache_key = digest(list(...))
    message("Cache key: ", cache_key)
    f(...)
  })
  return(mem_f)
}

# 定义一个函数
my_function = function(x) {
  Sys.sleep(1)
  x^2
}

# 将函数缓存,并自定义缓存键生成
cached_function = my_custom_memoise(my_function)

# 调用函数时会生成并输出自定义的缓存键
cached_function(10)
[1] 100
cached_function(10)
[1] 100
cached_function(20)  # 对于不同的输入,缓存键会不同
[1] 400

17.2.5 保存缓存到 sqlite 数据库

使用 cache_filesystem 缓存时,缓存数据会保存在文件系统中,会涉及比较多的磁盘读写运算,当处理量较大时,会严重影响性能。而且每一次函数调用都会生成一个文件,管理起来也不方便。因此,这里扩展了一个 cache_dbi() 函数,使用数据库来缓存数据。

#' SQLite Database Cache
#'
#' Use a cache in a SQLite database that will persist between R sessions.
#'
#' @param con A DBI connection object to a SQLite database.
#' @param table_name Name of the table to use for caching. Default: "cache".
#'
#' @examples
#'
#' \dontrun{
#' library(DBI)
#' library(RSQLite)
#' 
#' # 创建 SQLite 数据库连接
#' con <- dbConnect(SQLite(), dbname = "cache.sqlite")
#' 
#' # 使用 SQLite 缓存
#' db_cache <- cache_dbi(con)
#' 
#' mem_runif <- memoise(runif, cache = db_cache)
#' }
#'
#' @export
#' @inheritParams cache_memory
cache_dbi <- function(con, algo = "xxhash64", table_name = "cache") {
  if (!(requireNamespace("digest"))) { stop("Package `digest` must be installed for `cache_dbi()`.") }
  if (!(requireNamespace("DBI"))) { stop("Package `DBI` must be installed for `cache_dbi()`.") }

  if (!grepl("^[A-Za-z0-9_]+$", table_name)) {
    stop("Invalid table name")
  }

  # 创建缓存表
  DBI::dbExecute(con, sprintf("CREATE TABLE IF NOT EXISTS %s (key TEXT PRIMARY KEY, value BLOB)", table_name))

  # 重置缓存
  cache_reset <- function() {
    DBI::dbExecute(con, sprintf("DELETE FROM %s", table_name))
  }

  # 设置缓存
  cache_set <- function(key, value) {
    serialized_value <- serialize(value, NULL)  # 序列化 R 对象
    DBI::dbExecute(con, sprintf("INSERT OR REPLACE INTO %s (key, value) VALUES (?, ?)", table_name),
                   params = list(key, list(serialized_value)))  # 用 list 包裹 serialized_value
  }

  # 获取缓存
  cache_get <- function(key) {
    result <- DBI::dbGetQuery(con, sprintf("SELECT value FROM %s WHERE key = ?", table_name),
                              params = list(key))
    if (nrow(result) == 0) {
      return(NULL)
    }
    unserialize(result$value[[1]])  # 反序列化第一个值
  }

  # 检查缓存是否存在
  cache_has_key <- function(key) {
    result <- DBI::dbGetQuery(con, sprintf("SELECT 1 FROM %s WHERE key = ? LIMIT 1", table_name),
                              params = list(key))
    nrow(result) > 0
  }

  # 删除缓存
  cache_drop_key <- function(key) {
    DBI::dbExecute(con, sprintf("DELETE FROM %s WHERE key = ?", table_name),
                   params = list(key))
  }

  list(
    digest = function(...) digest::digest(..., algo = algo),
    reset = cache_reset,
    set = cache_set,
    get = cache_get,
    has_key = cache_has_key,
    drop_key = cache_drop_key,
    keys = function() DBI::dbGetQuery(con, sprintf("SELECT key FROM %s", table_name))$key
  )
}

下面,使用 cache_dbi() 函数来缓存数据:

library(memoise)
library(DBI)
library(RSQLite)

# 创建 SQLite 数据库连接
con = dbConnect(SQLite(), dbname = tempfile())
# on.exit(dbDisconnect(con))

# 将缓存数据存储在数据库中
cached_function = memoise(my_function, cache = cache_dbi(con))

# 第一次运行
system.time(cached_function(10))
   user  system elapsed 
  0.002   0.001   1.008 
# 第二次运行
system.time(cached_function(10))
   user  system elapsed 
  0.008   0.000   0.007 
# 第三次运行
system.time(cached_function(20))
   user  system elapsed 
  0.002   0.002   1.009 

17.3 动态控制批次执行

为了动态控制批次的执行,并确保正在执行的运算不超过 batch_size,你可以使用 R 的异步编程或并发工具包,比如 parallelfuture,结合 Sys.sleep() 控制批次提交的速率。

17.3.1 使用 parallel 包实现动态分批

以下是使用 parallel 包执行 batch_size 个并发操作的示例:

library(parallel)

# 示例数据
df = data.frame(a = 1:100, b = rnorm(100))

# 定义处理函数
process_function = function(row) {
  Sys.sleep(runif(1, 0.1, 0.5))  # 模拟耗时任务
  sum(row)      # 返回每行的和
}

# 比较串行和并行处理的性能

# 串行处理
start_time_serial = Sys.time()
results_serial = lapply(1:nrow(df), function(i) {
  process_function(df[i, ])
})
end_time_serial = Sys.time()
time_serial = difftime(end_time_serial, start_time_serial, units = "secs")

# 并行处理
start_time_parallel = Sys.time()
results_parallel = mclapply(1:nrow(df), function(i) {
  process_function(df[i, ])
}, mc.cores = parallel::detectCores() - 1)  # 使用可用核心数减1
end_time_parallel = Sys.time()
time_parallel = difftime(end_time_parallel, start_time_parallel, units = "secs")

# 计算性能提升
speedup = as.numeric(time_serial) / as.numeric(time_parallel)

# 输出结果
cat("串行处理时间:", time_serial, "秒\n")
cat("并行处理时间:", time_parallel, "秒\n")
cat("性能提升:", round(speedup, 2), "倍\n")

# 验证结果正确性
testthat::expect_equal(rowSums(df), unlist(results_serial))
testthat::expect_equal(rowSums(df), unlist(results_parallel))

17.3.2 使用 future 包实现动态分批

future 包允许你异步地运行函数,并通过限制同时进行的任务数量来动态控制批次。通过监控并发执行的数量,如果低于 batch_size,则自动启动新的任务,否则等待 1 秒再检查。

17.3.3 代码示例

library(future)
library(future.apply)

# 设定最大并行执行任务数为 batch_size
batch_size = 10

# 使用多线程计划
plan(multisession, workers = batch_size)

# 示例数据
df = data.frame(a = 1:100, b = rnorm(100))

# 定义处理函数
process_function = function(row) {
  Sys.sleep(runif(1, 0.1, 0.5))  # 模拟耗时任务
  sum(row)      # 返回每行的和
}

# 动态控制批次执行
results = list()  # 用于存储结果
futures = list()  # 用于存储每个 future

# 将每行分成任务提交
for (i in 1:nrow(df)) {
  # 检查当前运行中的任务数量
  current_tasks = futures[!sapply(futures, resolved)]

  # 如果当前运行中的任务数量达到 batch_size,等待 1 秒
  while (length(current_tasks) >= batch_size) {
    # 打印当前运行中的任务数量
    message("当前运行中的任务数量:", length(current_tasks))
    # 等待 1 秒
    Sys.sleep(1)
    # 更新当前运行中的任务
    current_tasks = futures[!sapply(futures, resolved)]
  }
  
  # 提交新的任务,并保持追踪
  futures[[i]] = future({
    process_function(df[i, ])
  }, seed = TRUE)

}

# 收集所有结果
results = future::value(futures) |> unlist()

# 展示结果
testthat::expect_equal(rowSums(df), results)

17.3.4 代码说明

  1. futurefuture.applyfuture 包允许将任务异步提交,通过 plan(multisession, workers = batch_size) 限制同时运行的任务数。
  2. 动态任务调度
    • 每次新任务提交前,检查当前运行中的任务数量(length(futures))。
    • 如果运行中的任务达到 batch_size,程序会等待 1 秒,再次检查任务数量,确保不超出最大并行数。
    • 当低于 batch_size 时,新的任务才会被提交。
  3. 结果收集:使用 future::value(futures) 来获取所有任务的计算结果。
  4. 错误处理:使用 tryCatch 捕获错误,并输出错误信息。

通过这种方法,你可以确保批次执行数不会超过 batch_size,并且动态提交新任务,从而实现更灵活的任务管理。

17.4 包装成一个函数

17.4.1 函数定义

函数采用 pbapply 显示进度,batch_size 设定批次大小,fun 接受命令(如果这个命令是一个 memorise 函数则直接使用,如果不是则将其自动缓存),缓存默认使用 cache_dbi(),缓存文件地址为 db_file 文件,由用户提供。

run_task_with_cache <- function(df, fun, db_file) {

  # 检查并创建缓存
  con = DBI::dbConnect(RSQLite::SQLite(), dbname = db_file)
  on.exit(DBI::dbDisconnect(con))
  
  # 如果fun是memoise函数,则提出 warning,并使用fun 
  if (inherits(fun, "memoised")) {
    warning("Fun is already a memoised function. It will be used directly. `db_file` will be ignored.")
  } else {
    # 如果fun不是memoise函数,则将其缓存
    fun <- memoise::memoise(fun, cache = cache_dbi(con))
  }
  
  # 执行计算
  results = pbapply::pblapply(1:nrow(df), function(i) {
    row = df[i, ]
    result = tryCatch(fun(row), error = function(e) {
        message("错误发生在第 ", i, " 行: ", e$message)
        return(NA)
      })
    return(result)
  })
  
  return(results)
}
# 示例数据
set.seed(123456)
df = data.frame(a = 1:100, b = rnorm(100))

# 定义处理函数
process_function = function(row) {
  Sys.sleep(0.1)  # 模拟耗时任务
  sum(row)      # 返回每行的和
}
# 运行函数
result <- run_task_with_cache(df, process_function, db_file = "cache.sqlite")

testthat::expect_equal(unlist(result), rowSums(df))

17.4.2 并行计算

并行计算可以使用 future 包,或者 parallel 包。但是在显示进度条、数据库访问方面实现起来有一定难度,特别是涉及到数据库访问的并发问题,暂时没有很好的解决方案。因此,考虑使用数据拆分的方法,拆成多个批次,每个批次使用一个进程,每个进程使用一个数据库连接,这样就可以避免并发问题。

17.5 进度条

在 R 中,可以使用多个包来显示现代化的进度条,其中最常用的包包括:

  1. progressr:一个强大的进度条包,可以显示并行任务的进度条。
  2. progress:一个简单的、可高度自定义的进度条包。
  3. cli:可以结合进度条显示,也可以显示丰富的命令行界面元素。
  4. pbapply:适用于 *apply 系列函数的进度条集成。

以下是这些包的基本用法:

  1. 使用 progressr

progressr 包可以用来显示并行任务的进度条。以下是一个使用 futurefurrr 包进行并行计算,并使用 progressr 显示进度的示例:

library(future)
library(furrr)
library(progressr)

plan(multisession, workers = 4)

data <- 1:100

time_consuming_task <- function(x) {
  Sys.sleep(0.01)
  return(x^2)
}

with_progress({
  p <- progressor(steps = length(data))
  
  results <- future_map(data, function(x) {
    result <- time_consuming_task(x)
    p()
    return(result)
  })
})
  1. 使用 progress

progress 包提供了简单易用的进度条,同时可以进行高度自定义。

# 安装并加载 progress 包
# install.packages("progress")
library(progress)

# 创建一个进度条
pb = progress_bar$new(
  format = "  downloading [:bar] :percent in :elapsed",
  total = 100, clear = FALSE, width = 60
)

# 模拟一个任务并更新进度条
for (i in 1:100) {
  Sys.sleep(0.01)  # 模拟耗时任务
  pb$tick()  # 每次循环时,进度条前进一格
}
  1. 使用 cli

cli 包不仅提供进度条,还可以创建富文本命令行界面。

# 安装并加载 cli 包
# install.packages("cli")
library(cli)

# 创建并显示进度条
pb = cli_progress_bar("Computing", 
                      total = 100, 
                      format = " {cli::pb_bar} {cli::pb_percent} ETA: {cli::pb_eta}")

for (i in 1:100) {
  Sys.sleep(0.05)  # 模拟任务
  cli_progress_update()  # 更新进度条
}

message("Done!")
cli_progress_done()  # 任务完成时清除进度条
  1. 使用 pbapply

如果你经常使用 apply 系列函数,pbapply 包可以在它们的基础上添加进度条。

# 安装并加载 pbapply 包
#install.packages("pbapply")
library(pbapply)

# 使用 pbapply 中的 pbsapply 代替 sapply,自动添加进度条
result = pbsapply(1:100, function(x) {
  Sys.sleep(0.01)  # 模拟任务
  x^2
})

这几个包都提供了现代化的进度条显示,你可以根据自己的需求选择最合适的。