from pymongo import MongoClient
from datetime import datetime, timedelta
# 连接到 MongoDB
= MongoClient("mongodb://localhost:27017/")
client = client["model_cache_db"]
db = db["responses"] collection
18 MongoDB 数据库
大语言模型的数据,都是以 JSON 格式存储的。选择合适的数据库,对于数据存储和查询的性能,至关重要。经过调研,MongoDB 是存储 JSON 格式数据的最佳选择。
18.1 选择 MongoDB 的理由
18.1.1 可选的数据库架构
对于缓存大模型的响应,推荐使用以下几种数据库,具体选择取决于对缓存效率、持久性、以及查询能力的需求:
18.1.1.1 Redis
Redis 是一个内存数据库,速度快,适合实时缓存大模型的响应,尤其是在对数据进行频繁读写操作时。可以通过设置过期时间来自动清除旧数据,适合需要高吞吐和低延迟的场景。
优势:
- 高速缓存,响应时间极快。
- 支持自动过期策略,有助于管理缓存空间。
- 可以使用 RedisJSON 插件来更好地支持 JSON 数据结构。
18.1.1.2 Memcached
Memcached 是一种高性能的分布式内存缓存系统,适用于纯缓存用途。和 Redis 类似,它可以快速存取数据,但不支持持久化存储,适合在系统重启后可以清空缓存的场景。
优势:
- 极快的读写速度。
- 占用内存小,适合简单的缓存场景。
- 适用于缓存结构简单且无需持久化的响应。
18.1.1.3 MongoDB
如果希望缓存数据并在应用重启后保持响应数据持久化,MongoDB 是一个选择。它对 JSON 数据支持良好,且能持久化存储。适合那些不仅需要缓存,还希望在缓存中做简单数据分析和检索的场景。
优势:
- JSON 存储和持久化能力。
- 支持灵活查询和索引,适合做更复杂的检索。
18.1.1.4 PostgreSQL
PostgreSQL 也支持 JSON 数据格式,并且提供良好的查询和索引能力。如果需要持久性和较复杂的数据操作,也可以选择 PostgreSQL,但速度会稍逊于内存数据库。
18.1.2 MongoDB 的并行查询能力
MongoDB 的并行查询能力较强,得益于其架构设计和内置的并行处理机制。以下是 MongoDB 并行查询能力的几个主要方面:
18.1.2.1 多线程处理
MongoDB 是多线程的,支持同时处理多个查询请求。当并发请求增加时,MongoDB 会自动为每个请求分配独立的线程,并行执行不同的查询,提升了处理效率。
18.1.2.2 分片(Sharding)支持
对于大规模的数据集,MongoDB 提供了分片功能,将数据分散到多个服务器(分片)上。每个分片可以独立处理查询请求,从而显著提高查询性能。查询请求会并行分发到不同的分片上,减少了单一节点的压力,提升了系统的可扩展性和查询效率。
18.1.2.3 索引优化
MongoDB 支持多种索引,包括单字段索引、复合索引、地理空间索引、全文索引等。合适的索引能够极大地优化查询性能,使查询操作更快地锁定目标数据。MongoDB 的并行查询在有适当索引的情况下表现会更好,因为索引能降低每个查询的处理时间,允许系统更快地响应并发请求。
18.1.2.4 聚合管道的并行处理
MongoDB 的聚合框架能够高效地并行处理复杂的分析查询。聚合管道的每个阶段可以由不同的线程执行,并发处理数据流。尤其在分片集群中,MongoDB 会在各个分片上并行执行聚合查询,最后合并结果,显著提升查询速度。
18.1.2.5 内存与锁管理优化
MongoDB 使用锁的粒度较小,最新版本(4.2 及更高)支持文档级锁,而不是数据库或集合级锁,这减少了并发查询时的锁争用。此外,MongoDB 会将常用数据缓存到内存中,并支持对内存进行并行管理,从而提升查询速度。
18.1.2.6 写入与查询分离
在主从(Primary-Secondary)架构中,读写请求可以分离,写入操作只在主节点上进行,查询可以在从节点上执行。通过将查询请求分散到不同的从节点,可以提升并行查询能力。
18.2 安装 MongoDB
在 macOS 中部署 MongoDB 可以通过 Homebrew 安装并配置以快速启动服务。
18.2.1 使用 Homebrew 安装 MongoDB
首先,确保 macOS 上安装了 Homebrew。执行以下命令来安装 MongoDB 社区版:
brew tap mongodb/brew
brew install mongodb-community
18.2.2 启动 MongoDB 服务
安装完成后,可以通过以下命令启动 MongoDB 服务:
brew services start mongodb/brew/mongodb-community
这将 MongoDB 作为服务启动,默认会监听本地的 27017
端口。
18.2.3 验证 MongoDB 是否启动成功
可以使用以下命令检查 MongoDB 是否正在运行:
brew services list
你还可以直接使用 MongoDB Shell (mongosh
) 连接到本地的 MongoDB 实例:
mongosh
如果成功连接,说明 MongoDB 已正常运行。
18.2.4 停止 MongoDB 服务
如果需要停止 MongoDB 服务,可以执行以下命令:
brew services stop mongodb/brew/mongodb-community
18.2.5 配置 MongoDB 以支持远程连接(可选)
默认情况下,MongoDB 只允许本地连接。如果你想配置 MongoDB 允许远程连接,可以修改 MongoDB 的配置文件:
打开 MongoDB 配置文件:
nano /opt/homebrew/etc/mongod.conf
找到
bindIp
字段,将其从127.0.0.1
修改为0.0.0.0
:net: bindIp: 0.0.0.0 port: 27017
保存文件并重新启动 MongoDB 服务:
brew services restart mongodb/brew/mongodb-community
注意:开启远程连接可能会带来安全风险,建议在生产环境中配置防火墙或限制 IP 访问。
以上步骤完成后,MongoDB 已成功在 macOS 上安装并部署。
18.3 在阿里云配置 MongoDB
在阿里云 Ubuntu 服务器上安装 MongoDB 并对外提供服务的步骤如下:
18.3.1 安装 MongoDB
MongoDB 提供了官方的 APT 仓库,可以直接通过包管理器安装最新版本。
# 更新本地包索引
sudo apt update
# 安装依赖包
sudo apt install -y gnupg
# 添加 MongoDB 官方的公钥
wget -qO - https://www.mongodb.org/static/pgp/server-6.0.asc | sudo apt-key add -
# 创建 MongoDB 的 APT 源文件
echo "deb [ arch=amd64,arm64 ] https://repo.mongodb.org/apt/ubuntu focal/mongodb-org/6.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-6.0.list
# 更新本地包索引
sudo apt update
# 安装 MongoDB
sudo apt install -y mongodb-org
18.3.2 启动 MongoDB 服务
sudo systemctl start mongod
sudo systemctl enable mongod
18.3.3 配置 MongoDB 对外服务
默认情况下,MongoDB 只监听本地接口 127.0.0.1
。为了让 MongoDB 对外提供服务,需要修改其配置文件,监听所有网络接口。
# 编辑 MongoDB 配置文件
sudo nano /etc/mongod.conf
找到 bindIp
字段,将其从 127.0.0.1
修改为 0.0.0.0
:
# /etc/mongod.conf
net:
port: 27017
bindIp: 0.0.0.0
保存文件并重启 MongoDB 服务:
sudo systemctl restart mongod
18.3.4 配置防火墙和安全组
18.3.4.1 阿里云安全组配置
- 登录到阿里云控制台。
- 进入 云服务器 ECS,选择目标实例。
- 进入 安全组 配置,添加一条规则:
- 端口范围:27017
- 授权对象:设置为允许的 IP(如
0.0.0.0/0
表示允许所有 IP 访问),但建议仅对可信的 IP 开放。
18.3.4.2 配置 UFW 防火墙(可选)
若启用了 UFW 防火墙,需要添加 MongoDB 端口允许规则:
sudo ufw allow 27017
18.3.5 创建用户并启用认证(可选)
为了提高安全性,可以为 MongoDB 创建用户并启用身份验证。
# 进入 MongoDB shell
mongosh
# 切换到 admin 数据库
use admin
# 创建管理员用户
db.createUser({
user: "admin",
pwd: "yourpassword",
roles: [{ role: "root", db: "admin" }]
})
在 /etc/mongod.conf
文件中启用身份验证,将 security
设置如下:
security:
authorization: "enabled"
重启 MongoDB:
sudo systemctl restart mongod
连接时需要提供用户名和密码:
mongosh -u admin -p yourpassword --authenticationDatabase admin
完成以上步骤后,MongoDB 即可在阿里云 Ubuntu 服务器上对外提供服务。
18.4 在 Python 中使用 MongoDB
以下是一个在 MongoDB 中存储和检索大模型响应的示例,假设你要存储模型输入和相应的输出,并支持简单的缓存逻辑(比如通过时间戳自动清理旧响应)。
18.4.1 连接 MongoDB
18.4.2 存储响应
我们可以定义一个函数,将模型的输入和响应输出存储到 MongoDB 中。可以选择存储一个过期时间戳,用于后续自动清理。
def cache_response(input_data, response_data, ttl_hours=24):
# 创建记录
= {
document "input": input_data,
"response": response_data,
"timestamp": datetime.utcnow(),
"expiry": datetime.utcnow() + timedelta(hours=ttl_hours) # 设置过期时间
}# 插入到 MongoDB
collection.insert_one(document)
18.4.3 检索缓存响应
查询 MongoDB 时,可以通过输入数据进行查找,并筛选出未过期的响应。
def get_cached_response(input_data):
= collection.find_one({
document "input": input_data,
"expiry": {"$gt": datetime.utcnow()} # 确保未过期
})if document:
return document["response"]
return None # 若未找到,返回 None
18.4.4 自动清理过期缓存
可以定期执行清理,将已过期的记录删除:
def clean_expired_cache():
= collection.delete_many({"expiry": {"$lt": datetime.utcnow()}})
result print(f"Deleted {result.deleted_count} expired cache entries.")
18.4.5 示例使用
= {"question": "What is AI?"}
input_data = {"answer": "AI stands for Artificial Intelligence."}
response_data
# 缓存模型响应
cache_response(input_data, response_data)
# 查询缓存
= get_cached_response(input_data)
cached_response if cached_response:
print("Cache hit:", cached_response)
else:
print("Cache miss")
Cache hit: {'answer': 'AI stands for Artificial Intelligence.'}
# 清理过期缓存
clean_expired_cache()
Deleted 0 expired cache entries.
18.4.6 注意
ttl_hours
可以根据需求设置。- 可以定期调用
clean_expired_cache
来清理过期数据。
18.5 在 R 中使用 MongoDB
在 R 中使用 MongoDB,可以借助 mongolite
包,该包提供了对 MongoDB 的直接连接和操作接口,并支持数据的插入、查询、更新、删除等功能。以下是一个基本的示例,展示如何在 R 中连接 MongoDB 并执行一些常见操作。
18.5.1 安装 mongolite
包
如果还没有安装 mongolite
包,可以通过以下命令安装:
install.packages("mongolite")
18.5.2 连接 MongoDB
假设 MongoDB 在本地运行,端口为默认的 27017
,并且数据库名称为 mydatabase
,集合名称为 mycollection
。
library(mongolite)
# 连接到 MongoDB 数据库
= mongo(
mongo_connection collection = "mycollection",
db = "mydatabase",
url = "mongodb://localhost:27017"
)
18.5.3 插入数据
可以使用 JSON 字符串或 R 的 data.frame
直接插入数据。
# 插入单条 JSON 数据
$insert('{"name": "Alice", "age": 30, "city": "New York"}')
mongo_connection
# 插入多条数据(使用 data.frame)
= data.frame(
data name = c("Bob", "Charlie"),
age = c(25, 35),
city = c("Los Angeles", "Chicago")
)$insert(data) mongo_connection
18.5.4 查询数据
使用 $find()
方法可以查询数据,并将结果返回为 data.frame
。
# 查询所有数据
= mongo_connection$find('{}')
all_data print(all_data)
# 带条件查询
= mongo_connection$find('{"age": {"$gt": 30}}')
filtered_data print(filtered_data)
18.5.5 更新数据
可以使用 $update()
方法更新文档,传入查询条件和更新内容。
# 将 name 为 "Alice" 的 age 更新为 31
$update(
mongo_connection'{"name": "Alice"}',
'{"$set": {"age": 31}}'
)
18.5.6 删除数据
使用 $remove()
方法删除指定条件的数据。
# 删除 age 小于 30 的文档
$remove('{"age": {"$lt": 30}}') mongo_connection
18.5.7 断开连接
操作完成后,断开连接。
rm(mongo_connection) # 释放连接资源
18.5.8 示例总结
综上,以下代码演示了完整的操作流程:
library(mongolite)
# 连接 MongoDB
= mongo(
mongo_connection collection = "mycollection",
db = "mydatabase",
url = "mongodb://localhost:27017"
)
# 插入数据
$insert('{"name": "Alice", "age": 30, "city": "New York"}') mongo_connection
List of 6
$ nInserted : int 1
$ nMatched : int 0
$ nModified : int 0
$ nRemoved : int 0
$ nUpserted : int 0
$ writeErrors: list()
= data.frame(
data name = c("Bob", "Charlie"),
age = c(25, 35),
city = c("Los Angeles", "Chicago")
)$insert(data) mongo_connection
List of 5
$ nInserted : num 2
$ nMatched : num 0
$ nRemoved : num 0
$ nUpserted : num 0
$ writeErrors: list()
# 查询数据
print(mongo_connection$find('{}'))
name age city
1 Alice 31 New York
2 Charlie 35 Chicago
3 Alice 30 New York
4 Bob 25 Los Angeles
5 Charlie 35 Chicago
# 更新数据
$update('{"name": "Alice"}', '{"$set": {"age": 31}}') mongo_connection
List of 3
$ modifiedCount: int 0
$ matchedCount : int 1
$ upsertedCount: int 0
# 删除数据
$remove('{"age": {"$lt": 30}}')
mongo_connection
# 断开连接
rm(mongo_connection)
这种方法可以让 R 用户便捷地在 MongoDB 上存储、查询和更新数据。
18.6 本地和阿里云 MongoDB 的同步
要将本地的 MongoDB 数据库与阿里云服务器上的 MongoDB 实现同步,通常可以使用以下几种方法:
18.6.1 使用 mongodump
和 mongorestore
手动同步
这是最简单的一种方式,通过导出和导入数据来实现同步。
步骤:
在本地主机上导出数据:
mongodump --host localhost --port 27017 --out /path/to/dump
这会在指定的路径下生成数据库的备份文件。
将备份文件上传到阿里云服务器:
使用
scp
命令将备份文件夹传输到阿里云服务器:scp -r /path/to/dump username@aliyun_ip:/path/on/server
在阿里云服务器上导入数据:
连接到阿里云服务器后,使用
mongorestore
命令将数据导入 MongoDB:mongorestore --host localhost --port 27017 /path/on/server/dump
这种方法适合小规模的数据同步。如果数据量较大或同步频繁,不建议使用此方法。
18.6.2 使用 MongoDB 同步工具 mongosync
mongosync
是一个专门用于同步不同 MongoDB 实例的开源工具。它支持实时同步,并适用于数据量较大且需要持续同步的场景。
步骤:
在本地和阿里云 MongoDB 服务器上安装
mongosync
。配置
mongosync
的源和目标数据库,在本地 MongoDB 上设置为源,阿里云 MongoDB 为目标。启动
mongosync
进行同步。具体的命令和配置可以参考mongosync
的官方文档。
18.6.3 设置 MongoDB 的副本集(Replica Set)
如果需要长期的、实时的同步,推荐使用 MongoDB 的 副本集 功能。
步骤:
配置本地主机和阿里云 MongoDB 为副本集成员:
修改本地主机和阿里云服务器上的 MongoDB 配置文件
/etc/mongod.conf
,设置副本集名称。例如:replication: replSetName: "myReplicaSet"
初始化副本集:
在任意一个 MongoDB 实例上启动副本集:
mongosh --host localhost --port 27017 rs.initiate()
添加成员:
将本地主机和阿里云 MongoDB 作为副本集成员添加。可以在 MongoDB shell 中执行:
.add("阿里云服务器IP:27017") // 阿里云服务器的MongoDB实例 rs.add("localhost:27017") // 本地的MongoDB实例 rs
验证副本集同步:
副本集启动后,MongoDB 会自动保持各个节点的数据同步。
18.7 与 PostgreSQL 的对比
使用 PostgreSQL 存储大语言模型的响应,以便作为缓存,可以按照以下步骤实现。我们可以创建一个包含请求数据和模型响应的表,同时记录创建时间,以便后续根据时间判断缓存是否过期。
18.7.1 创建缓存表
我们首先创建一个表来存储模型请求和响应,结构包含输入参数、响应内容、创建时间等字段。
CREATE TABLE model_cache (
id SERIAL PRIMARY KEY,
VARCHAR(64) NOT NULL,
input_hash NOT NULL,
request_data JSONB NOT NULL,
response_data JSONB DEFAULT NOW()
created_at TIMESTAMPTZ
);
-- 为 input_hash 字段创建唯一索引,用于快速查找缓存
CREATE UNIQUE INDEX idx_input_hash ON model_cache (input_hash);
input_hash
:对输入内容进行哈希,避免因输入内容重复而浪费存储。request_data
:存储模型的输入请求,以 JSON 格式存储。response_data
:存储模型的响应内容。created_at
:记录数据创建的时间,用于设置缓存过期策略。
18.7.2 在 Python 中实现缓存存储和检索
以下是一个基于 Python 的示例,假设使用 psycopg2
库连接 PostgreSQL。此示例会在接收到新的请求时,先检查缓存,若未命中缓存则存储新响应。
18.7.2.1 导入必要库
import psycopg2
import hashlib
import json
from datetime import datetime, timedelta
18.7.2.2 设置数据库连接
# 连接 PostgreSQL
= psycopg2.connect(
connection ="your_postgresql_host",
host="your_database",
database="your_user",
user="your_password"
password )
18.7.2.3 哈希生成函数
为输入请求生成唯一的哈希值,用于缓存键值。
def generate_hash(input_data):
return hashlib.sha256(json.dumps(input_data, sort_keys=True).encode()).hexdigest()
18.7.2.4 缓存存储函数
def cache_response(input_data, response_data, ttl_hours=24):
= generate_hash(input_data)
input_hash = connection.cursor()
cursor
# 插入或更新响应数据
"""
cursor.execute( INSERT INTO model_cache (input_hash, request_data, response_data, created_at)
VALUES (%s, %s, %s, NOW())
ON CONFLICT (input_hash) DO UPDATE
SET response_data = EXCLUDED.response_data,
created_at = NOW();
""", (input_hash, json.dumps(input_data), json.dumps(response_data)))
connection.commit() cursor.close()
18.7.2.5 缓存查询函数
在接收请求时,可以先检查缓存,若命中则直接返回缓存的响应。
def get_cached_response(input_data, ttl_hours=24):
= generate_hash(input_data)
input_hash = connection.cursor()
cursor
# 计算过期时间
= datetime.utcnow() - timedelta(hours=ttl_hours)
ttl_time
"""
cursor.execute( SELECT response_data FROM model_cache
WHERE input_hash = %s AND created_at > %s;
""", (input_hash, ttl_time))
= cursor.fetchone()
result
cursor.close()
if result:
return json.loads(result[0]) # 返回缓存的响应数据
return None # 缓存未命中或已过期
18.7.3 示例使用
# 模拟输入请求数据和模型响应
= {"question": "What is AI?"}
input_data = {"answer": "AI stands for Artificial Intelligence."}
response_data
# 存储模型响应
cache_response(input_data, response_data)
# 查询缓存
= get_cached_response(input_data)
cached_response if cached_response:
print("Cache hit:", cached_response)
else:
print("Cache miss")
18.7.4 定期清理过期缓存
可以创建一个定期任务,清除超过指定时间的缓存数据。
DELETE FROM model_cache WHERE created_at < NOW() - INTERVAL '24 hours';
18.7.5 总结
这种方法可以让 PostgreSQL 有效地作为大语言模型的缓存存储,避免重复计算和存储不必要的数据。