18  MongoDB 数据库

大语言模型的数据,都是以 JSON 格式存储的。选择合适的数据库,对于数据存储和查询的性能,至关重要。经过调研,MongoDB 是存储 JSON 格式数据的最佳选择。

18.1 选择 MongoDB 的理由

18.1.1 可选的数据库架构

对于缓存大模型的响应,推荐使用以下几种数据库,具体选择取决于对缓存效率、持久性、以及查询能力的需求:

18.1.1.1 Redis

Redis 是一个内存数据库,速度快,适合实时缓存大模型的响应,尤其是在对数据进行频繁读写操作时。可以通过设置过期时间来自动清除旧数据,适合需要高吞吐和低延迟的场景。

优势:

  • 高速缓存,响应时间极快。
  • 支持自动过期策略,有助于管理缓存空间。
  • 可以使用 RedisJSON 插件来更好地支持 JSON 数据结构。

18.1.1.2 Memcached

Memcached 是一种高性能的分布式内存缓存系统,适用于纯缓存用途。和 Redis 类似,它可以快速存取数据,但不支持持久化存储,适合在系统重启后可以清空缓存的场景。

优势:

  • 极快的读写速度。
  • 占用内存小,适合简单的缓存场景。
  • 适用于缓存结构简单且无需持久化的响应。

18.1.1.3 MongoDB

如果希望缓存数据并在应用重启后保持响应数据持久化,MongoDB 是一个选择。它对 JSON 数据支持良好,且能持久化存储。适合那些不仅需要缓存,还希望在缓存中做简单数据分析和检索的场景。

优势:

  • JSON 存储和持久化能力。
  • 支持灵活查询和索引,适合做更复杂的检索。

18.1.1.4 PostgreSQL

PostgreSQL 也支持 JSON 数据格式,并且提供良好的查询和索引能力。如果需要持久性和较复杂的数据操作,也可以选择 PostgreSQL,但速度会稍逊于内存数据库。

18.1.2 MongoDB 的并行查询能力

MongoDB 的并行查询能力较强,得益于其架构设计和内置的并行处理机制。以下是 MongoDB 并行查询能力的几个主要方面:

18.1.2.1 多线程处理

MongoDB 是多线程的,支持同时处理多个查询请求。当并发请求增加时,MongoDB 会自动为每个请求分配独立的线程,并行执行不同的查询,提升了处理效率。

18.1.2.2 分片(Sharding)支持

对于大规模的数据集,MongoDB 提供了分片功能,将数据分散到多个服务器(分片)上。每个分片可以独立处理查询请求,从而显著提高查询性能。查询请求会并行分发到不同的分片上,减少了单一节点的压力,提升了系统的可扩展性和查询效率。

18.1.2.3 索引优化

MongoDB 支持多种索引,包括单字段索引、复合索引、地理空间索引、全文索引等。合适的索引能够极大地优化查询性能,使查询操作更快地锁定目标数据。MongoDB 的并行查询在有适当索引的情况下表现会更好,因为索引能降低每个查询的处理时间,允许系统更快地响应并发请求。

18.1.2.4 聚合管道的并行处理

MongoDB 的聚合框架能够高效地并行处理复杂的分析查询。聚合管道的每个阶段可以由不同的线程执行,并发处理数据流。尤其在分片集群中,MongoDB 会在各个分片上并行执行聚合查询,最后合并结果,显著提升查询速度。

18.1.2.5 内存与锁管理优化

MongoDB 使用锁的粒度较小,最新版本(4.2 及更高)支持文档级锁,而不是数据库或集合级锁,这减少了并发查询时的锁争用。此外,MongoDB 会将常用数据缓存到内存中,并支持对内存进行并行管理,从而提升查询速度。

18.1.2.6 写入与查询分离

在主从(Primary-Secondary)架构中,读写请求可以分离,写入操作只在主节点上进行,查询可以在从节点上执行。通过将查询请求分散到不同的从节点,可以提升并行查询能力。

18.2 安装 MongoDB

在 macOS 中部署 MongoDB 可以通过 Homebrew 安装并配置以快速启动服务。

18.2.1 使用 Homebrew 安装 MongoDB

首先,确保 macOS 上安装了 Homebrew。执行以下命令来安装 MongoDB 社区版:

brew tap mongodb/brew
brew install mongodb-community

18.2.2 启动 MongoDB 服务

安装完成后,可以通过以下命令启动 MongoDB 服务:

brew services start mongodb/brew/mongodb-community

这将 MongoDB 作为服务启动,默认会监听本地的 27017 端口。

18.2.3 验证 MongoDB 是否启动成功

可以使用以下命令检查 MongoDB 是否正在运行:

brew services list

你还可以直接使用 MongoDB Shell (mongosh) 连接到本地的 MongoDB 实例:

mongosh

如果成功连接,说明 MongoDB 已正常运行。

18.2.4 停止 MongoDB 服务

如果需要停止 MongoDB 服务,可以执行以下命令:

brew services stop mongodb/brew/mongodb-community

18.2.5 配置 MongoDB 以支持远程连接(可选)

默认情况下,MongoDB 只允许本地连接。如果你想配置 MongoDB 允许远程连接,可以修改 MongoDB 的配置文件:

  1. 打开 MongoDB 配置文件:

    nano /opt/homebrew/etc/mongod.conf
  2. 找到 bindIp 字段,将其从 127.0.0.1 修改为 0.0.0.0

    net:
      bindIp: 0.0.0.0
      port: 27017
  3. 保存文件并重新启动 MongoDB 服务:

    brew services restart mongodb/brew/mongodb-community

注意:开启远程连接可能会带来安全风险,建议在生产环境中配置防火墙或限制 IP 访问。

以上步骤完成后,MongoDB 已成功在 macOS 上安装并部署。

18.3 在阿里云配置 MongoDB

在阿里云 Ubuntu 服务器上安装 MongoDB 并对外提供服务的步骤如下:

18.3.1 安装 MongoDB

MongoDB 提供了官方的 APT 仓库,可以直接通过包管理器安装最新版本。

# 更新本地包索引
sudo apt update

# 安装依赖包
sudo apt install -y gnupg

# 添加 MongoDB 官方的公钥
wget -qO - https://www.mongodb.org/static/pgp/server-6.0.asc | sudo apt-key add -

# 创建 MongoDB 的 APT 源文件
echo "deb [ arch=amd64,arm64 ] https://repo.mongodb.org/apt/ubuntu focal/mongodb-org/6.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-6.0.list

# 更新本地包索引
sudo apt update

# 安装 MongoDB
sudo apt install -y mongodb-org

18.3.2 启动 MongoDB 服务

sudo systemctl start mongod
sudo systemctl enable mongod

18.3.3 配置 MongoDB 对外服务

默认情况下,MongoDB 只监听本地接口 127.0.0.1。为了让 MongoDB 对外提供服务,需要修改其配置文件,监听所有网络接口。

# 编辑 MongoDB 配置文件
sudo nano /etc/mongod.conf

找到 bindIp 字段,将其从 127.0.0.1 修改为 0.0.0.0

# /etc/mongod.conf
net:
  port: 27017
  bindIp: 0.0.0.0

保存文件并重启 MongoDB 服务:

sudo systemctl restart mongod

18.3.4 配置防火墙和安全组

18.3.4.1 阿里云安全组配置

  1. 登录到阿里云控制台。
  2. 进入 云服务器 ECS,选择目标实例。
  3. 进入 安全组 配置,添加一条规则:
    • 端口范围:27017
    • 授权对象:设置为允许的 IP(如 0.0.0.0/0 表示允许所有 IP 访问),但建议仅对可信的 IP 开放。

18.3.4.2 配置 UFW 防火墙(可选)

若启用了 UFW 防火墙,需要添加 MongoDB 端口允许规则:

sudo ufw allow 27017

18.3.5 创建用户并启用认证(可选)

为了提高安全性,可以为 MongoDB 创建用户并启用身份验证。

# 进入 MongoDB shell
mongosh

# 切换到 admin 数据库
use admin

# 创建管理员用户
db.createUser({
  user: "admin",
  pwd: "yourpassword",
  roles: [{ role: "root", db: "admin" }]
})

/etc/mongod.conf 文件中启用身份验证,将 security 设置如下:

security:
  authorization: "enabled"

重启 MongoDB:

sudo systemctl restart mongod

连接时需要提供用户名和密码:

mongosh -u admin -p yourpassword --authenticationDatabase admin

完成以上步骤后,MongoDB 即可在阿里云 Ubuntu 服务器上对外提供服务。

18.4 在 Python 中使用 MongoDB

以下是一个在 MongoDB 中存储和检索大模型响应的示例,假设你要存储模型输入和相应的输出,并支持简单的缓存逻辑(比如通过时间戳自动清理旧响应)。

18.4.1 连接 MongoDB

from pymongo import MongoClient
from datetime import datetime, timedelta

# 连接到 MongoDB
client = MongoClient("mongodb://localhost:27017/")
db = client["model_cache_db"]
collection = db["responses"]

18.4.2 存储响应

我们可以定义一个函数,将模型的输入和响应输出存储到 MongoDB 中。可以选择存储一个过期时间戳,用于后续自动清理。

def cache_response(input_data, response_data, ttl_hours=24):
    # 创建记录
    document = {
        "input": input_data,
        "response": response_data,
        "timestamp": datetime.utcnow(),
        "expiry": datetime.utcnow() + timedelta(hours=ttl_hours)  # 设置过期时间
    }
    # 插入到 MongoDB
    collection.insert_one(document)

18.4.3 检索缓存响应

查询 MongoDB 时,可以通过输入数据进行查找,并筛选出未过期的响应。

def get_cached_response(input_data):
    document = collection.find_one({
        "input": input_data,
        "expiry": {"$gt": datetime.utcnow()}  # 确保未过期
    })
    if document:
        return document["response"]
    return None  # 若未找到,返回 None

18.4.4 自动清理过期缓存

可以定期执行清理,将已过期的记录删除:

def clean_expired_cache():
    result = collection.delete_many({"expiry": {"$lt": datetime.utcnow()}})
    print(f"Deleted {result.deleted_count} expired cache entries.")

18.4.5 示例使用

input_data = {"question": "What is AI?"}
response_data = {"answer": "AI stands for Artificial Intelligence."}

# 缓存模型响应
cache_response(input_data, response_data)

# 查询缓存
cached_response = get_cached_response(input_data)
if cached_response:
    print("Cache hit:", cached_response)
else:
    print("Cache miss")
Cache hit: {'answer': 'AI stands for Artificial Intelligence.'}
# 清理过期缓存
clean_expired_cache()
Deleted 0 expired cache entries.

18.4.6 注意

  • ttl_hours 可以根据需求设置。
  • 可以定期调用 clean_expired_cache 来清理过期数据。

18.5 在 R 中使用 MongoDB

在 R 中使用 MongoDB,可以借助 mongolite 包,该包提供了对 MongoDB 的直接连接和操作接口,并支持数据的插入、查询、更新、删除等功能。以下是一个基本的示例,展示如何在 R 中连接 MongoDB 并执行一些常见操作。

18.5.1 安装 mongolite

如果还没有安装 mongolite 包,可以通过以下命令安装:

install.packages("mongolite")

18.5.2 连接 MongoDB

假设 MongoDB 在本地运行,端口为默认的 27017,并且数据库名称为 mydatabase,集合名称为 mycollection

library(mongolite)

# 连接到 MongoDB 数据库
mongo_connection = mongo(
  collection = "mycollection",
  db = "mydatabase",
  url = "mongodb://localhost:27017"
)

18.5.3 插入数据

可以使用 JSON 字符串或 R 的 data.frame 直接插入数据。

# 插入单条 JSON 数据
mongo_connection$insert('{"name": "Alice", "age": 30, "city": "New York"}')

# 插入多条数据(使用 data.frame)
data = data.frame(
  name = c("Bob", "Charlie"),
  age = c(25, 35),
  city = c("Los Angeles", "Chicago")
)
mongo_connection$insert(data)

18.5.4 查询数据

使用 $find() 方法可以查询数据,并将结果返回为 data.frame

# 查询所有数据
all_data = mongo_connection$find('{}')
print(all_data)

# 带条件查询
filtered_data = mongo_connection$find('{"age": {"$gt": 30}}')
print(filtered_data)

18.5.5 更新数据

可以使用 $update() 方法更新文档,传入查询条件和更新内容。

# 将 name 为 "Alice" 的 age 更新为 31
mongo_connection$update(
  '{"name": "Alice"}', 
  '{"$set": {"age": 31}}'
)

18.5.6 删除数据

使用 $remove() 方法删除指定条件的数据。

# 删除 age 小于 30 的文档
mongo_connection$remove('{"age": {"$lt": 30}}')

18.5.7 断开连接

操作完成后,断开连接。

rm(mongo_connection)  # 释放连接资源

18.5.8 示例总结

综上,以下代码演示了完整的操作流程:

library(mongolite)

# 连接 MongoDB
mongo_connection = mongo(
  collection = "mycollection",
  db = "mydatabase",
  url = "mongodb://localhost:27017"
)

# 插入数据
mongo_connection$insert('{"name": "Alice", "age": 30, "city": "New York"}')
List of 6
 $ nInserted  : int 1
 $ nMatched   : int 0
 $ nModified  : int 0
 $ nRemoved   : int 0
 $ nUpserted  : int 0
 $ writeErrors: list()
data = data.frame(
  name = c("Bob", "Charlie"),
  age = c(25, 35),
  city = c("Los Angeles", "Chicago")
)
mongo_connection$insert(data)
List of 5
 $ nInserted  : num 2
 $ nMatched   : num 0
 $ nRemoved   : num 0
 $ nUpserted  : num 0
 $ writeErrors: list()
# 查询数据
print(mongo_connection$find('{}'))
     name age        city
1   Alice  31    New York
2 Charlie  35     Chicago
3   Alice  30    New York
4     Bob  25 Los Angeles
5 Charlie  35     Chicago
# 更新数据
mongo_connection$update('{"name": "Alice"}', '{"$set": {"age": 31}}')
List of 3
 $ modifiedCount: int 0
 $ matchedCount : int 1
 $ upsertedCount: int 0
# 删除数据
mongo_connection$remove('{"age": {"$lt": 30}}')

# 断开连接
rm(mongo_connection)

这种方法可以让 R 用户便捷地在 MongoDB 上存储、查询和更新数据。

18.6 本地和阿里云 MongoDB 的同步

要将本地的 MongoDB 数据库与阿里云服务器上的 MongoDB 实现同步,通常可以使用以下几种方法:

18.6.1 使用 mongodumpmongorestore 手动同步

这是最简单的一种方式,通过导出和导入数据来实现同步。

步骤:

  1. 在本地主机上导出数据

    mongodump --host localhost --port 27017 --out /path/to/dump

    这会在指定的路径下生成数据库的备份文件。

  2. 将备份文件上传到阿里云服务器

    使用 scp 命令将备份文件夹传输到阿里云服务器:

    scp -r /path/to/dump username@aliyun_ip:/path/on/server
  3. 在阿里云服务器上导入数据

    连接到阿里云服务器后,使用 mongorestore 命令将数据导入 MongoDB:

    mongorestore --host localhost --port 27017 /path/on/server/dump

这种方法适合小规模的数据同步。如果数据量较大或同步频繁,不建议使用此方法。

18.6.2 使用 MongoDB 同步工具 mongosync

mongosync 是一个专门用于同步不同 MongoDB 实例的开源工具。它支持实时同步,并适用于数据量较大且需要持续同步的场景。

步骤:

  1. 在本地和阿里云 MongoDB 服务器上安装 mongosync

  2. 配置 mongosync 的源和目标数据库,在本地 MongoDB 上设置为源,阿里云 MongoDB 为目标。

  3. 启动 mongosync 进行同步。具体的命令和配置可以参考 mongosync 的官方文档。

18.6.3 设置 MongoDB 的副本集(Replica Set)

如果需要长期的、实时的同步,推荐使用 MongoDB 的 副本集 功能。

步骤:

  1. 配置本地主机和阿里云 MongoDB 为副本集成员

    • 修改本地主机和阿里云服务器上的 MongoDB 配置文件 /etc/mongod.conf,设置副本集名称。例如:

      replication:
        replSetName: "myReplicaSet"
  2. 初始化副本集

    在任意一个 MongoDB 实例上启动副本集:

    mongosh --host localhost --port 27017
    rs.initiate()
  3. 添加成员

    将本地主机和阿里云 MongoDB 作为副本集成员添加。可以在 MongoDB shell 中执行:

    rs.add("阿里云服务器IP:27017")  // 阿里云服务器的MongoDB实例
    rs.add("localhost:27017")        // 本地的MongoDB实例
  4. 验证副本集同步

    副本集启动后,MongoDB 会自动保持各个节点的数据同步。

18.7 与 PostgreSQL 的对比

使用 PostgreSQL 存储大语言模型的响应,以便作为缓存,可以按照以下步骤实现。我们可以创建一个包含请求数据和模型响应的表,同时记录创建时间,以便后续根据时间判断缓存是否过期。

18.7.1 创建缓存表

我们首先创建一个表来存储模型请求和响应,结构包含输入参数、响应内容、创建时间等字段。

CREATE TABLE model_cache (
    id SERIAL PRIMARY KEY,
    input_hash VARCHAR(64) NOT NULL,
    request_data JSONB NOT NULL,
    response_data JSONB NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- 为 input_hash 字段创建唯一索引,用于快速查找缓存
CREATE UNIQUE INDEX idx_input_hash ON model_cache (input_hash);
  • input_hash:对输入内容进行哈希,避免因输入内容重复而浪费存储。
  • request_data:存储模型的输入请求,以 JSON 格式存储。
  • response_data:存储模型的响应内容。
  • created_at:记录数据创建的时间,用于设置缓存过期策略。

18.7.2 在 Python 中实现缓存存储和检索

以下是一个基于 Python 的示例,假设使用 psycopg2 库连接 PostgreSQL。此示例会在接收到新的请求时,先检查缓存,若未命中缓存则存储新响应。

18.7.2.1 导入必要库

import psycopg2
import hashlib
import json
from datetime import datetime, timedelta

18.7.2.2 设置数据库连接

# 连接 PostgreSQL
connection = psycopg2.connect(
    host="your_postgresql_host",
    database="your_database",
    user="your_user",
    password="your_password"
)

18.7.2.3 哈希生成函数

为输入请求生成唯一的哈希值,用于缓存键值。

def generate_hash(input_data):
    return hashlib.sha256(json.dumps(input_data, sort_keys=True).encode()).hexdigest()

18.7.2.4 缓存存储函数

def cache_response(input_data, response_data, ttl_hours=24):
    input_hash = generate_hash(input_data)
    cursor = connection.cursor()
    
    # 插入或更新响应数据
    cursor.execute("""
        INSERT INTO model_cache (input_hash, request_data, response_data, created_at)
        VALUES (%s, %s, %s, NOW())
        ON CONFLICT (input_hash) DO UPDATE
        SET response_data = EXCLUDED.response_data,
            created_at = NOW();
    """, (input_hash, json.dumps(input_data), json.dumps(response_data)))
    
    connection.commit()
    cursor.close()

18.7.2.5 缓存查询函数

在接收请求时,可以先检查缓存,若命中则直接返回缓存的响应。

def get_cached_response(input_data, ttl_hours=24):
    input_hash = generate_hash(input_data)
    cursor = connection.cursor()
    
    # 计算过期时间
    ttl_time = datetime.utcnow() - timedelta(hours=ttl_hours)
    
    cursor.execute("""
        SELECT response_data FROM model_cache
        WHERE input_hash = %s AND created_at > %s;
    """, (input_hash, ttl_time))
    
    result = cursor.fetchone()
    cursor.close()
    
    if result:
        return json.loads(result[0])  # 返回缓存的响应数据
    return None  # 缓存未命中或已过期

18.7.3 示例使用

# 模拟输入请求数据和模型响应
input_data = {"question": "What is AI?"}
response_data = {"answer": "AI stands for Artificial Intelligence."}

# 存储模型响应
cache_response(input_data, response_data)

# 查询缓存
cached_response = get_cached_response(input_data)
if cached_response:
    print("Cache hit:", cached_response)
else:
    print("Cache miss")

18.7.4 定期清理过期缓存

可以创建一个定期任务,清除超过指定时间的缓存数据。

DELETE FROM model_cache WHERE created_at < NOW() - INTERVAL '24 hours';

18.7.5 总结

这种方法可以让 PostgreSQL 有效地作为大语言模型的缓存存储,避免重复计算和存储不必要的数据。