feat: 集成Redis模块并重构限流存储机制

This commit is contained in:
richarjiang
2025-12-05 10:03:59 +08:00
parent 8a43b1795b
commit b46d99fe69
9 changed files with 618 additions and 5 deletions

View File

@@ -8,6 +8,7 @@ import { ConfigModule } from '@nestjs/config';
import { ScheduleModule } from '@nestjs/schedule';
import { ThrottlerModule, ThrottlerGuard } from '@nestjs/throttler';
import { LoggerModule } from './common/logger/logger.module';
import { RedisModule, ThrottlerStorageRedisService } from './redis';
import { CheckinsModule } from './checkins/checkins.module';
import { AiCoachModule } from './ai-coach/ai-coach.module';
import { TrainingPlansModule } from './training-plans/training-plans.module';
@@ -33,10 +34,18 @@ import { HealthProfilesModule } from './health-profiles/health-profiles.module';
envFilePath: '.env',
}),
ScheduleModule.forRoot(),
ThrottlerModule.forRoot([{
ttl: 60000, // 时间窗口60秒
limit: 100, // 每个时间窗口最多100个请求
}]),
// 限流模块必须在 RedisModule 之后导入,以确保 Redis 连接可用
RedisModule,
ThrottlerModule.forRootAsync({
useFactory: (throttlerStorage: ThrottlerStorageRedisService) => ({
throttlers: [{
ttl: 60000, // 时间窗口60秒
limit: 100, // 每个时间窗口最多100个请求
}],
storage: throttlerStorage,
}),
inject: [ThrottlerStorageRedisService],
}),
LoggerModule,
DatabaseModule,
UsersModule,

View File

@@ -17,7 +17,7 @@ import * as dayjs from 'dayjs';
@Injectable()
export class MedicationReminderService {
private readonly logger = new Logger(MedicationReminderService.name);
private readonly REMINDER_MINUTES_BEFORE = 2; // 提前5分钟提醒
private readonly REMINDER_MINUTES_BEFORE = 5; // 提前5分钟提醒
private readonly OVERDUE_HOURS_THRESHOLD = 1; // 超过1小时后发送超时提醒
private readonly EXPIRY_ONE_MONTH_DAYS = 30; // 提前一个月预警
private readonly EXPIRY_ONE_WEEK_DAYS = 7; // 提前一周预警

3
src/redis/index.ts Normal file
View File

@@ -0,0 +1,3 @@
export * from './redis.module';
export * from './redis.service';
export * from './throttler-storage-redis.service';

46
src/redis/redis.module.ts Normal file
View File

@@ -0,0 +1,46 @@
import { Global, Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { RedisService } from './redis.service';
import { ThrottlerStorageRedisService } from './throttler-storage-redis.service';
@Global()
@Module({
imports: [ConfigModule],
providers: [
{
provide: 'REDIS_CLIENT',
useFactory: async (configService: ConfigService) => {
const Redis = (await import('ioredis')).default;
const client = new Redis({
host: configService.get<string>('REDIS_HOST', '127.0.0.1'),
port: configService.get<number>('REDIS_PORT', 6379),
password: configService.get<string>('REDIS_PASSWORD', ''),
db: configService.get<number>('REDIS_DB', 0),
keyPrefix: configService.get<string>('REDIS_PREFIX', 'pilates:'),
retryStrategy: (times: number) => {
if (times > 3) {
return null; // 停止重试
}
return Math.min(times * 200, 2000);
},
maxRetriesPerRequest: 3,
});
client.on('connect', () => {
console.log('Redis client connected');
});
client.on('error', (err) => {
console.error('Redis client error:', err);
});
return client;
},
inject: [ConfigService],
},
RedisService,
ThrottlerStorageRedisService,
],
exports: ['REDIS_CLIENT', RedisService, ThrottlerStorageRedisService],
})
export class RedisModule {}

315
src/redis/redis.service.ts Normal file
View File

@@ -0,0 +1,315 @@
import { Injectable, Inject, OnModuleDestroy, Logger } from '@nestjs/common';
import Redis from 'ioredis';
@Injectable()
export class RedisService implements OnModuleDestroy {
private readonly logger = new Logger(RedisService.name);
constructor(
@Inject('REDIS_CLIENT')
private readonly redis: Redis,
) {}
async onModuleDestroy() {
await this.redis.quit();
this.logger.log('Redis connection closed');
}
/**
* 获取原始 Redis 客户端(用于高级操作)
*/
getClient(): Redis {
return this.redis;
}
// ==================== 基础操作 ====================
/**
* 设置键值
* @param key 键
* @param value 值
* @param ttlSeconds 过期时间(秒),可选
*/
async set(key: string, value: string, ttlSeconds?: number): Promise<void> {
if (ttlSeconds) {
await this.redis.setex(key, ttlSeconds, value);
} else {
await this.redis.set(key, value);
}
}
/**
* 获取键值
*/
async get(key: string): Promise<string | null> {
return this.redis.get(key);
}
/**
* 删除键
*/
async del(...keys: string[]): Promise<number> {
return this.redis.del(...keys);
}
/**
* 检查键是否存在
*/
async exists(key: string): Promise<boolean> {
const result = await this.redis.exists(key);
return result === 1;
}
/**
* 设置过期时间
*/
async expire(key: string, seconds: number): Promise<boolean> {
const result = await this.redis.expire(key, seconds);
return result === 1;
}
/**
* 获取剩余过期时间
*/
async ttl(key: string): Promise<number> {
return this.redis.ttl(key);
}
// ==================== JSON 操作 ====================
/**
* 设置 JSON 对象
*/
async setJson<T>(key: string, value: T, ttlSeconds?: number): Promise<void> {
const jsonString = JSON.stringify(value);
await this.set(key, jsonString, ttlSeconds);
}
/**
* 获取 JSON 对象
*/
async getJson<T>(key: string): Promise<T | null> {
const value = await this.get(key);
if (!value) return null;
try {
return JSON.parse(value) as T;
} catch {
return null;
}
}
// ==================== Hash 操作 ====================
/**
* 设置 Hash 字段
*/
async hset(key: string, field: string, value: string): Promise<number> {
return this.redis.hset(key, field, value);
}
/**
* 获取 Hash 字段
*/
async hget(key: string, field: string): Promise<string | null> {
return this.redis.hget(key, field);
}
/**
* 获取所有 Hash 字段
*/
async hgetall(key: string): Promise<Record<string, string>> {
return this.redis.hgetall(key);
}
/**
* 删除 Hash 字段
*/
async hdel(key: string, ...fields: string[]): Promise<number> {
return this.redis.hdel(key, ...fields);
}
// ==================== List 操作 ====================
/**
* 从左侧推入列表
*/
async lpush(key: string, ...values: string[]): Promise<number> {
return this.redis.lpush(key, ...values);
}
/**
* 从右侧推入列表
*/
async rpush(key: string, ...values: string[]): Promise<number> {
return this.redis.rpush(key, ...values);
}
/**
* 获取列表范围
*/
async lrange(key: string, start: number, stop: number): Promise<string[]> {
return this.redis.lrange(key, start, stop);
}
/**
* 获取列表长度
*/
async llen(key: string): Promise<number> {
return this.redis.llen(key);
}
// ==================== Set 操作 ====================
/**
* 添加 Set 成员
*/
async sadd(key: string, ...members: string[]): Promise<number> {
return this.redis.sadd(key, ...members);
}
/**
* 获取所有 Set 成员
*/
async smembers(key: string): Promise<string[]> {
return this.redis.smembers(key);
}
/**
* 检查是否是 Set 成员
*/
async sismember(key: string, member: string): Promise<boolean> {
const result = await this.redis.sismember(key, member);
return result === 1;
}
/**
* 移除 Set 成员
*/
async srem(key: string, ...members: string[]): Promise<number> {
return this.redis.srem(key, ...members);
}
// ==================== 计数器操作 ====================
/**
* 自增
*/
async incr(key: string): Promise<number> {
return this.redis.incr(key);
}
/**
* 自增指定值
*/
async incrby(key: string, increment: number): Promise<number> {
return this.redis.incrby(key, increment);
}
/**
* 自减
*/
async decr(key: string): Promise<number> {
return this.redis.decr(key);
}
// ==================== 分布式锁 ====================
/**
* 获取分布式锁
* @param lockKey 锁的键名
* @param ttlSeconds 锁的过期时间(秒)
* @param retryTimes 重试次数
* @param retryDelay 重试间隔(毫秒)
* @returns 锁的唯一标识,获取失败返回 null
*/
async acquireLock(
lockKey: string,
ttlSeconds: number = 10,
retryTimes: number = 3,
retryDelay: number = 100,
): Promise<string | null> {
const lockValue = `${Date.now()}-${Math.random().toString(36).slice(2)}`;
for (let i = 0; i < retryTimes; i++) {
const result = await this.redis.set(
`lock:${lockKey}`,
lockValue,
'EX',
ttlSeconds,
'NX',
);
if (result === 'OK') {
return lockValue;
}
if (i < retryTimes - 1) {
await new Promise((resolve) => setTimeout(resolve, retryDelay));
}
}
return null;
}
/**
* 释放分布式锁
* @param lockKey 锁的键名
* @param lockValue 锁的唯一标识
*/
async releaseLock(lockKey: string, lockValue: string): Promise<boolean> {
const script = `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`;
const result = await this.redis.eval(script, 1, `lock:${lockKey}`, lockValue);
return result === 1;
}
// ==================== 缓存辅助方法 ====================
/**
* 带缓存的数据获取
* 如果缓存存在则返回缓存,否则执行 factory 函数获取数据并缓存
*/
async getOrSet<T>(
key: string,
factory: () => Promise<T>,
ttlSeconds: number = 300,
): Promise<T> {
const cached = await this.getJson<T>(key);
if (cached !== null) {
return cached;
}
const value = await factory();
await this.setJson(key, value, ttlSeconds);
return value;
}
/**
* 批量删除匹配模式的键
* @param pattern 匹配模式,如 "user:*"
*/
async delByPattern(pattern: string): Promise<number> {
const keys = await this.redis.keys(pattern);
if (keys.length === 0) return 0;
return this.redis.del(...keys);
}
// ==================== 健康检查 ====================
/**
* 检查 Redis 连接状态
*/
async ping(): Promise<boolean> {
try {
const result = await this.redis.ping();
return result === 'PONG';
} catch {
return false;
}
}
}

View File

@@ -0,0 +1,93 @@
import { Injectable, Inject, OnModuleDestroy } from '@nestjs/common';
import { ThrottlerStorage } from '@nestjs/throttler';
import Redis from 'ioredis';
export interface ThrottlerStorageRecord {
totalHits: number;
timeToExpire: number;
isBlocked: boolean;
timeToBlockExpire: number;
}
@Injectable()
export class ThrottlerStorageRedisService
implements ThrottlerStorage, OnModuleDestroy
{
private readonly prefix = 'throttler:';
constructor(
@Inject('REDIS_CLIENT')
private readonly redis: Redis,
) {}
async onModuleDestroy() {
// Redis 连接由 RedisModule 管理,这里不需要关闭
}
/**
* 增加指定 key 的请求计数
* @param key 限流 key通常是 IP 或用户标识)
* @param ttl 过期时间(毫秒)
* @param limit 限制次数
* @param blockDuration 封禁时长(毫秒)
* @param throttlerName 限流器名称
*/
async increment(
key: string,
ttl: number,
limit: number,
blockDuration: number,
throttlerName: string,
): Promise<ThrottlerStorageRecord> {
const redisKey = `${this.prefix}${throttlerName}:${key}`;
const blockKey = `${this.prefix}${throttlerName}:block:${key}`;
// 检查是否被封禁
const blockTtl = await this.redis.pttl(blockKey);
if (blockTtl > 0) {
return {
totalHits: limit + 1,
timeToExpire: ttl,
isBlocked: true,
timeToBlockExpire: blockTtl,
};
}
// 使用 Lua 脚本保证原子性操作
const luaScript = `
local current = redis.call('INCR', KEYS[1])
if current == 1 then
redis.call('PEXPIRE', KEYS[1], ARGV[1])
end
local pttl = redis.call('PTTL', KEYS[1])
return {current, pttl}
`;
const result = (await this.redis.eval(
luaScript,
1,
redisKey,
ttl.toString(),
)) as [number, number];
const totalHits = result[0];
const timeToExpire = result[1] > 0 ? result[1] : ttl;
// 如果超过限制且设置了封禁时长,则设置封禁
let isBlocked = false;
let timeToBlockExpire = 0;
if (totalHits > limit && blockDuration > 0) {
await this.redis.set(blockKey, '1', 'PX', blockDuration);
isBlocked = true;
timeToBlockExpire = blockDuration;
}
return {
totalHits,
timeToExpire,
isBlocked,
timeToBlockExpire,
};
}
}