from pymongo import MongoClient
from datetime import datetime, timedelta
# 连接到 MongoDB
client = MongoClient("mongodb://localhost:27017/")
db = client["model_cache_db"]
collection = db["responses"]18 MongoDB 数据库
大语言模型的数据,都是以 JSON 格式存储的。选择合适的数据库,对于数据存储和查询的性能,至关重要。经过调研,MongoDB 是存储 JSON 格式数据的最佳选择。
18.1 选择 MongoDB 的理由
18.1.1 可选的数据库架构
对于缓存大模型的响应,推荐使用以下几种数据库,具体选择取决于对缓存效率、持久性、以及查询能力的需求:
18.1.1.1 Redis
Redis 是一个内存数据库,速度快,适合实时缓存大模型的响应,尤其是在对数据进行频繁读写操作时。可以通过设置过期时间来自动清除旧数据,适合需要高吞吐和低延迟的场景。
优势:
- 高速缓存,响应时间极快。
- 支持自动过期策略,有助于管理缓存空间。
- 可以使用 RedisJSON 插件来更好地支持 JSON 数据结构。
18.1.1.2 Memcached
Memcached 是一种高性能的分布式内存缓存系统,适用于纯缓存用途。和 Redis 类似,它可以快速存取数据,但不支持持久化存储,适合在系统重启后可以清空缓存的场景。
优势:
- 极快的读写速度。
- 占用内存小,适合简单的缓存场景。
- 适用于缓存结构简单且无需持久化的响应。
18.1.1.3 MongoDB
如果希望缓存数据并在应用重启后保持响应数据持久化,MongoDB 是一个选择。它对 JSON 数据支持良好,且能持久化存储。适合那些不仅需要缓存,还希望在缓存中做简单数据分析和检索的场景。
优势:
- JSON 存储和持久化能力。
- 支持灵活查询和索引,适合做更复杂的检索。
18.1.1.4 PostgreSQL
PostgreSQL 也支持 JSON 数据格式,并且提供良好的查询和索引能力。如果需要持久性和较复杂的数据操作,也可以选择 PostgreSQL,但速度会稍逊于内存数据库。
18.1.2 MongoDB 的并行查询能力
MongoDB 的并行查询能力较强,得益于其架构设计和内置的并行处理机制。以下是 MongoDB 并行查询能力的几个主要方面:
18.1.2.1 多线程处理
MongoDB 是多线程的,支持同时处理多个查询请求。当并发请求增加时,MongoDB 会自动为每个请求分配独立的线程,并行执行不同的查询,提升了处理效率。
18.1.2.2 分片(Sharding)支持
对于大规模的数据集,MongoDB 提供了分片功能,将数据分散到多个服务器(分片)上。每个分片可以独立处理查询请求,从而显著提高查询性能。查询请求会并行分发到不同的分片上,减少了单一节点的压力,提升了系统的可扩展性和查询效率。
18.1.2.3 索引优化
MongoDB 支持多种索引,包括单字段索引、复合索引、地理空间索引、全文索引等。合适的索引能够极大地优化查询性能,使查询操作更快地锁定目标数据。MongoDB 的并行查询在有适当索引的情况下表现会更好,因为索引能降低每个查询的处理时间,允许系统更快地响应并发请求。
18.1.2.4 聚合管道的并行处理
MongoDB 的聚合框架能够高效地并行处理复杂的分析查询。聚合管道的每个阶段可以由不同的线程执行,并发处理数据流。尤其在分片集群中,MongoDB 会在各个分片上并行执行聚合查询,最后合并结果,显著提升查询速度。
18.1.2.5 内存与锁管理优化
MongoDB 使用锁的粒度较小,最新版本(4.2 及更高)支持文档级锁,而不是数据库或集合级锁,这减少了并发查询时的锁争用。此外,MongoDB 会将常用数据缓存到内存中,并支持对内存进行并行管理,从而提升查询速度。
18.1.2.6 写入与查询分离
在主从(Primary-Secondary)架构中,读写请求可以分离,写入操作只在主节点上进行,查询可以在从节点上执行。通过将查询请求分散到不同的从节点,可以提升并行查询能力。
18.2 安装 MongoDB
在 macOS 中部署 MongoDB 可以通过 Homebrew 安装并配置以快速启动服务。
18.2.1 使用 Homebrew 安装 MongoDB
首先,确保 macOS 上安装了 Homebrew。执行以下命令来安装 MongoDB 社区版:
brew tap mongodb/brew
brew install mongodb-community18.2.2 启动 MongoDB 服务
安装完成后,可以通过以下命令启动 MongoDB 服务:
brew services start mongodb/brew/mongodb-community这将 MongoDB 作为服务启动,默认会监听本地的 27017 端口。
18.2.3 验证 MongoDB 是否启动成功
可以使用以下命令检查 MongoDB 是否正在运行:
brew services list你还可以直接使用 MongoDB Shell (mongosh) 连接到本地的 MongoDB 实例:
mongosh如果成功连接,说明 MongoDB 已正常运行。
18.2.4 停止 MongoDB 服务
如果需要停止 MongoDB 服务,可以执行以下命令:
brew services stop mongodb/brew/mongodb-community18.2.5 配置 MongoDB 以支持远程连接(可选)
默认情况下,MongoDB 只允许本地连接。如果你想配置 MongoDB 允许远程连接,可以修改 MongoDB 的配置文件:
打开 MongoDB 配置文件:
nano /opt/homebrew/etc/mongod.conf找到
bindIp字段,将其从127.0.0.1修改为0.0.0.0:net: bindIp: 0.0.0.0 port: 27017保存文件并重新启动 MongoDB 服务:
brew services restart mongodb/brew/mongodb-community
注意:开启远程连接可能会带来安全风险,建议在生产环境中配置防火墙或限制 IP 访问。
以上步骤完成后,MongoDB 已成功在 macOS 上安装并部署。
18.3 在阿里云配置 MongoDB
在阿里云 Ubuntu 服务器上安装 MongoDB 并对外提供服务的步骤如下:
18.3.1 安装 MongoDB
MongoDB 提供了官方的 APT 仓库,可以直接通过包管理器安装最新版本。
# 更新本地包索引
sudo apt update
# 安装依赖包
sudo apt install -y gnupg
# 添加 MongoDB 官方的公钥
wget -qO - https://www.mongodb.org/static/pgp/server-6.0.asc | sudo apt-key add -
# 创建 MongoDB 的 APT 源文件
echo "deb [ arch=amd64,arm64 ] https://repo.mongodb.org/apt/ubuntu focal/mongodb-org/6.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-6.0.list
# 更新本地包索引
sudo apt update
# 安装 MongoDB
sudo apt install -y mongodb-org18.3.2 启动 MongoDB 服务
sudo systemctl start mongod
sudo systemctl enable mongod18.3.3 配置 MongoDB 对外服务
默认情况下,MongoDB 只监听本地接口 127.0.0.1。为了让 MongoDB 对外提供服务,需要修改其配置文件,监听所有网络接口。
# 编辑 MongoDB 配置文件
sudo nano /etc/mongod.conf找到 bindIp 字段,将其从 127.0.0.1 修改为 0.0.0.0:
# /etc/mongod.conf
net:
port: 27017
bindIp: 0.0.0.0保存文件并重启 MongoDB 服务:
sudo systemctl restart mongod18.3.4 配置防火墙和安全组
18.3.4.1 阿里云安全组配置
- 登录到阿里云控制台。
- 进入 云服务器 ECS,选择目标实例。
- 进入 安全组 配置,添加一条规则:
- 端口范围:27017
- 授权对象:设置为允许的 IP(如
0.0.0.0/0表示允许所有 IP 访问),但建议仅对可信的 IP 开放。
18.3.4.2 配置 UFW 防火墙(可选)
若启用了 UFW 防火墙,需要添加 MongoDB 端口允许规则:
sudo ufw allow 2701718.3.5 创建用户并启用认证(可选)
为了提高安全性,可以为 MongoDB 创建用户并启用身份验证。
# 进入 MongoDB shell
mongosh
# 切换到 admin 数据库
use admin
# 创建管理员用户
db.createUser({
user: "admin",
pwd: "yourpassword",
roles: [{ role: "root", db: "admin" }]
})在 /etc/mongod.conf 文件中启用身份验证,将 security 设置如下:
security:
authorization: "enabled"重启 MongoDB:
sudo systemctl restart mongod连接时需要提供用户名和密码:
mongosh -u admin -p yourpassword --authenticationDatabase admin完成以上步骤后,MongoDB 即可在阿里云 Ubuntu 服务器上对外提供服务。
18.4 在 Python 中使用 MongoDB
以下是一个在 MongoDB 中存储和检索大模型响应的示例,假设你要存储模型输入和相应的输出,并支持简单的缓存逻辑(比如通过时间戳自动清理旧响应)。
18.4.1 连接 MongoDB
18.4.2 存储响应
我们可以定义一个函数,将模型的输入和响应输出存储到 MongoDB 中。可以选择存储一个过期时间戳,用于后续自动清理。
def cache_response(input_data, response_data, ttl_hours=24):
# 创建记录
document = {
"input": input_data,
"response": response_data,
"timestamp": datetime.utcnow(),
"expiry": datetime.utcnow() + timedelta(hours=ttl_hours) # 设置过期时间
}
# 插入到 MongoDB
collection.insert_one(document)18.4.3 检索缓存响应
查询 MongoDB 时,可以通过输入数据进行查找,并筛选出未过期的响应。
def get_cached_response(input_data):
document = collection.find_one({
"input": input_data,
"expiry": {"$gt": datetime.utcnow()} # 确保未过期
})
if document:
return document["response"]
return None # 若未找到,返回 None18.4.4 自动清理过期缓存
可以定期执行清理,将已过期的记录删除:
def clean_expired_cache():
result = collection.delete_many({"expiry": {"$lt": datetime.utcnow()}})
print(f"Deleted {result.deleted_count} expired cache entries.")18.4.5 示例使用
input_data = {"question": "What is AI?"}
response_data = {"answer": "AI stands for Artificial Intelligence."}
# 缓存模型响应
cache_response(input_data, response_data)
# 查询缓存
cached_response = get_cached_response(input_data)
if cached_response:
print("Cache hit:", cached_response)
else:
print("Cache miss")Cache hit: {'answer': 'AI stands for Artificial Intelligence.'}
# 清理过期缓存
clean_expired_cache()Deleted 0 expired cache entries.
18.4.6 注意
ttl_hours可以根据需求设置。- 可以定期调用
clean_expired_cache来清理过期数据。
18.5 在 R 中使用 MongoDB
在 R 中使用 MongoDB,可以借助 mongolite 包,该包提供了对 MongoDB 的直接连接和操作接口,并支持数据的插入、查询、更新、删除等功能。以下是一个基本的示例,展示如何在 R 中连接 MongoDB 并执行一些常见操作。
18.5.1 安装 mongolite 包
如果还没有安装 mongolite 包,可以通过以下命令安装:
install.packages("mongolite")18.5.2 连接 MongoDB
假设 MongoDB 在本地运行,端口为默认的 27017,并且数据库名称为 mydatabase,集合名称为 mycollection。
library(mongolite)
# 连接到 MongoDB 数据库
mongo_connection = mongo(
collection = "mycollection",
db = "mydatabase",
url = "mongodb://localhost:27017"
)18.5.3 插入数据
可以使用 JSON 字符串或 R 的 data.frame 直接插入数据。
# 插入单条 JSON 数据
mongo_connection$insert('{"name": "Alice", "age": 30, "city": "New York"}')
# 插入多条数据(使用 data.frame)
data = data.frame(
name = c("Bob", "Charlie"),
age = c(25, 35),
city = c("Los Angeles", "Chicago")
)
mongo_connection$insert(data)18.5.4 查询数据
使用 $find() 方法可以查询数据,并将结果返回为 data.frame。
# 查询所有数据
all_data = mongo_connection$find('{}')
print(all_data)
# 带条件查询
filtered_data = mongo_connection$find('{"age": {"$gt": 30}}')
print(filtered_data)18.5.5 更新数据
可以使用 $update() 方法更新文档,传入查询条件和更新内容。
# 将 name 为 "Alice" 的 age 更新为 31
mongo_connection$update(
'{"name": "Alice"}',
'{"$set": {"age": 31}}'
)18.5.6 删除数据
使用 $remove() 方法删除指定条件的数据。
# 删除 age 小于 30 的文档
mongo_connection$remove('{"age": {"$lt": 30}}')18.5.7 断开连接
操作完成后,断开连接。
rm(mongo_connection) # 释放连接资源18.5.8 示例总结
综上,以下代码演示了完整的操作流程:
library(mongolite)
# 连接 MongoDB
mongo_connection = mongo(
collection = "mycollection",
db = "mydatabase",
url = "mongodb://localhost:27017"
)
# 插入数据
mongo_connection$insert('{"name": "Alice", "age": 30, "city": "New York"}')List of 6
$ nInserted : int 1
$ nMatched : int 0
$ nModified : int 0
$ nRemoved : int 0
$ nUpserted : int 0
$ writeErrors: list()
data = data.frame(
name = c("Bob", "Charlie"),
age = c(25, 35),
city = c("Los Angeles", "Chicago")
)
mongo_connection$insert(data)List of 5
$ nInserted : num 2
$ nMatched : num 0
$ nRemoved : num 0
$ nUpserted : num 0
$ writeErrors: list()
# 查询数据
print(mongo_connection$find('{}')) name age city
1 Alice 31 New York
2 Charlie 35 Chicago
3 Alice 30 New York
4 Bob 25 Los Angeles
5 Charlie 35 Chicago
# 更新数据
mongo_connection$update('{"name": "Alice"}', '{"$set": {"age": 31}}')List of 3
$ modifiedCount: int 0
$ matchedCount : int 1
$ upsertedCount: int 0
# 删除数据
mongo_connection$remove('{"age": {"$lt": 30}}')
# 断开连接
rm(mongo_connection)这种方法可以让 R 用户便捷地在 MongoDB 上存储、查询和更新数据。
18.6 本地和阿里云 MongoDB 的同步
要将本地的 MongoDB 数据库与阿里云服务器上的 MongoDB 实现同步,通常可以使用以下几种方法:
18.6.1 使用 mongodump 和 mongorestore 手动同步
这是最简单的一种方式,通过导出和导入数据来实现同步。
步骤:
在本地主机上导出数据:
mongodump --host localhost --port 27017 --out /path/to/dump这会在指定的路径下生成数据库的备份文件。
将备份文件上传到阿里云服务器:
使用
scp命令将备份文件夹传输到阿里云服务器:scp -r /path/to/dump username@aliyun_ip:/path/on/server在阿里云服务器上导入数据:
连接到阿里云服务器后,使用
mongorestore命令将数据导入 MongoDB:mongorestore --host localhost --port 27017 /path/on/server/dump
这种方法适合小规模的数据同步。如果数据量较大或同步频繁,不建议使用此方法。
18.6.2 使用 MongoDB 同步工具 mongosync
mongosync 是一个专门用于同步不同 MongoDB 实例的开源工具。它支持实时同步,并适用于数据量较大且需要持续同步的场景。
步骤:
在本地和阿里云 MongoDB 服务器上安装
mongosync。配置
mongosync的源和目标数据库,在本地 MongoDB 上设置为源,阿里云 MongoDB 为目标。启动
mongosync进行同步。具体的命令和配置可以参考mongosync的官方文档。
18.6.3 设置 MongoDB 的副本集(Replica Set)
如果需要长期的、实时的同步,推荐使用 MongoDB 的 副本集 功能。
步骤:
配置本地主机和阿里云 MongoDB 为副本集成员:
修改本地主机和阿里云服务器上的 MongoDB 配置文件
/etc/mongod.conf,设置副本集名称。例如:replication: replSetName: "myReplicaSet"
初始化副本集:
在任意一个 MongoDB 实例上启动副本集:
mongosh --host localhost --port 27017 rs.initiate()添加成员:
将本地主机和阿里云 MongoDB 作为副本集成员添加。可以在 MongoDB shell 中执行:
rs.add("阿里云服务器IP:27017") // 阿里云服务器的MongoDB实例 rs.add("localhost:27017") // 本地的MongoDB实例验证副本集同步:
副本集启动后,MongoDB 会自动保持各个节点的数据同步。
18.7 与 PostgreSQL 的对比
使用 PostgreSQL 存储大语言模型的响应,以便作为缓存,可以按照以下步骤实现。我们可以创建一个包含请求数据和模型响应的表,同时记录创建时间,以便后续根据时间判断缓存是否过期。
18.7.1 创建缓存表
我们首先创建一个表来存储模型请求和响应,结构包含输入参数、响应内容、创建时间等字段。
CREATE TABLE model_cache (
id SERIAL PRIMARY KEY,
input_hash VARCHAR(64) NOT NULL,
request_data JSONB NOT NULL,
response_data JSONB NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- 为 input_hash 字段创建唯一索引,用于快速查找缓存
CREATE UNIQUE INDEX idx_input_hash ON model_cache (input_hash);input_hash:对输入内容进行哈希,避免因输入内容重复而浪费存储。request_data:存储模型的输入请求,以 JSON 格式存储。response_data:存储模型的响应内容。created_at:记录数据创建的时间,用于设置缓存过期策略。
18.7.2 在 Python 中实现缓存存储和检索
以下是一个基于 Python 的示例,假设使用 psycopg2 库连接 PostgreSQL。此示例会在接收到新的请求时,先检查缓存,若未命中缓存则存储新响应。
18.7.2.1 导入必要库
import psycopg2
import hashlib
import json
from datetime import datetime, timedelta18.7.2.2 设置数据库连接
# 连接 PostgreSQL
connection = psycopg2.connect(
host="your_postgresql_host",
database="your_database",
user="your_user",
password="your_password"
)18.7.2.3 哈希生成函数
为输入请求生成唯一的哈希值,用于缓存键值。
def generate_hash(input_data):
return hashlib.sha256(json.dumps(input_data, sort_keys=True).encode()).hexdigest()18.7.2.4 缓存存储函数
def cache_response(input_data, response_data, ttl_hours=24):
input_hash = generate_hash(input_data)
cursor = connection.cursor()
# 插入或更新响应数据
cursor.execute("""
INSERT INTO model_cache (input_hash, request_data, response_data, created_at)
VALUES (%s, %s, %s, NOW())
ON CONFLICT (input_hash) DO UPDATE
SET response_data = EXCLUDED.response_data,
created_at = NOW();
""", (input_hash, json.dumps(input_data), json.dumps(response_data)))
connection.commit()
cursor.close()18.7.2.5 缓存查询函数
在接收请求时,可以先检查缓存,若命中则直接返回缓存的响应。
def get_cached_response(input_data, ttl_hours=24):
input_hash = generate_hash(input_data)
cursor = connection.cursor()
# 计算过期时间
ttl_time = datetime.utcnow() - timedelta(hours=ttl_hours)
cursor.execute("""
SELECT response_data FROM model_cache
WHERE input_hash = %s AND created_at > %s;
""", (input_hash, ttl_time))
result = cursor.fetchone()
cursor.close()
if result:
return json.loads(result[0]) # 返回缓存的响应数据
return None # 缓存未命中或已过期18.7.3 示例使用
# 模拟输入请求数据和模型响应
input_data = {"question": "What is AI?"}
response_data = {"answer": "AI stands for Artificial Intelligence."}
# 存储模型响应
cache_response(input_data, response_data)
# 查询缓存
cached_response = get_cached_response(input_data)
if cached_response:
print("Cache hit:", cached_response)
else:
print("Cache miss")18.7.4 定期清理过期缓存
可以创建一个定期任务,清除超过指定时间的缓存数据。
DELETE FROM model_cache WHERE created_at < NOW() - INTERVAL '24 hours';18.7.5 总结
这种方法可以让 PostgreSQL 有效地作为大语言模型的缓存存储,避免重复计算和存储不必要的数据。