Redis 缓存预热、穿透、击穿和雪崩

一、缓存预热

1. 定义

缓存预热是指在系统上线或重启后,提前将热点数据加载到 Redis 缓存中,避免用户请求直接穿透到数据库,从而提升系统响应速度、降低数据库压力。

2. 业务场景

  • 电商大促:618 / 双 11 前,提前将热门商品(如秒杀商品、爆款商品)的库存、价格、详情等数据加载到缓存。
  • 资讯类 APP:每日早高峰前,将热搜新闻、头条内容预热到缓存,应对早间流量高峰。
  • 游戏开服:新服务器上线前,将玩家初始数据、游戏配置、道具信息等加载到缓存。

3. 实现方式

(1)手动触发

// 缓存预热:加载热门商品数据  
func cacheWarmupHotProducts() error {  
	// 1. 查询热门商品ID(可从配置/数据库/埋点数据获取)  
	hotProductIDs := []uint64{1001, 1002, 1003, 1004}  

	// 2. 批量查询数据库  
	var products []Product  
	if err := db.Where("id IN ?", hotProductIDs).Find(&products).Error; err != nil {  
		return fmt.Errorf("query hot products failed: %v", err)  
	}  

	// 3. 批量写入Redis(使用Pipeline提升效率)  
	pipe := rdb.Pipeline()  
	for _, p := range products {  
		key := fmt.Sprintf("product:%d", p.ID)  
		// Hash结构存储商品信息,设置过期时间(避免缓存永不过期)  
		pipe.HSet(ctx, key, "id", p.ID, "name", p.Name, "price", p.Price, "stock", p.Stock)  
		pipe.Expire(ctx, key, 24*time.Hour)  
	}  

	// 执行Pipeline  
	_, err := pipe.Exec(ctx)  
	if err != nil {  
		return fmt.Errorf("write redis failed: %v", err)  
	}  

	fmt.Printf("缓存预热完成,共加载%d个热门商品\n", len(products))  
	return nil  
}  

(2)定时任务触发

// 定时预热任务  
func scheduledWarmup() {  
	c := cron.New(cron.WithSeconds()) // 支持秒级调度  
	// 每天凌晨2点执行预热(低峰期)  
	_, err := c.AddFunc("0 0 2 * * ?", func() {  
		fmt.Printf("开始定时缓存预热,时间:%s\n", time.Now().Format(time.DateTime))  
		if err := cacheWarmupHotProducts(); err != nil {  
			fmt.Printf("定时预热失败: %v\n", err)  
		}  
	})  
	if err != nil {  
		panic(fmt.Sprintf("cron add func failed: %v", err))  
	}  
	c.Start()  

	// 阻塞主线程  
	select {}  
}  

4. 注意事项

  • 预热时机:选择系统低峰期(如凌晨)执行,避免占用高峰期资源。
  • 分批加载:海量数据避免一次性加载,分批执行防止 Redis / 数据库压力过大。
  • 过期时间:预热数据需设置合理过期时间,避免缓存数据长期失效。
  • 一致性校验:预热后可校验缓存数据与数据库一致性,避免加载错误数据。
  • 资源限制:控制预热进程的内存 / CPU 占用,避免影响核心业务。

5. 面试题

  • 缓存预热的核心目的是什么?如何避免预热过程中对数据库造成压力?
  • 海量数据场景下,缓存预热有哪些优化策略?(分批、异步、增量预热)
  • 预热数据与数据库数据不一致该如何处理?(定时校验、增量更新)

二、缓存穿透

1. 定义

缓存穿透是指用户请求查询一个缓存和数据库中都不存在的数据,导致每次请求都穿透到数据库,数据库反复查询不存在的数据,最终引发数据库压力过大甚至宕机。

示意图:简单说,就是 Redis 查不到
image

2. 产生原因

  • 业务逻辑漏洞:如数据被删除后,缓存未同步清理,且数据库中已无该数据。
  • 恶意攻击:攻击者构造大量不存在的 key,高频发起请求,利用缓存穿透压垮数据库。

3. 解决方法

(思路:拦截无效请求)

(1)接口层参数校验(前置拦截)

在请求到达缓存 / 数据库前,先通过业务规则过滤无效请求。

(2)缓存空值或默认值

对于数据库中不存在的数据,在 Redis 中缓存一个空值(如 “”、null)或默认值,并设置较短的过期时间。

代码示例:

// 查询商品信息,处理缓存穿透(空值缓存)  
func getProductByID(id uint64) (*Product, error) {  
	key := fmt.Sprintf("product:%d", id)  

	// 1. 查询缓存  
	productCache, err := rdb.HGetAll(ctx, key).Result()  
	if err != nil && err != redis.Nil {  
		return nil, fmt.Errorf("redis query failed: %v", err)  
	}  

	// 2. 缓存存在:非空值直接返回,空值返回nil  
	if len(productCache) > 0 {  
		if productCache["id"] == "" { // 标记空值  
			return nil, nil  
		}  
		// 解析缓存数据  
		product := &Product{  
			ID:    id,  
			Name:  productCache["name"],  
			Price: parseFloat(productCache["price"]),  
			Stock: parseInt(productCache["stock"]),  
		}  
		return product, nil  
	}  

	// 3. 缓存不存在,查询数据库  
	var product Product  
	if err := db.Where("id = ?", id).First(&product).Error; err != nil {  
		if err == gorm.ErrRecordNotFound {  
			// 缓存空值,设置短过期时间(如5分钟),避免恶意攻击占满缓存  
			rdb.HSet(ctx, key, "id", "")  
			rdb.Expire(ctx, key, 5*time.Minute)  
			return nil, nil  
		}  
		return nil, fmt.Errorf("mysql query failed: %v", err)  
	}  

	// 4. 数据库存在,写入缓存  
	rdb.HSet(ctx, key, "id", product.ID, "name", product.Name, "price", product.Price, "stock", product.Stock)  
	rdb.Expire(ctx, key, 24*time.Hour)  

	return &product, nil  
}  

// 辅助函数:字符串转浮点数  
func parseFloat(s string) float64 {  
	f, _ := strconv.ParseFloat(s, 64)  
	return f  
}  

// 辅助函数:字符串转整数  
func parseInt(s string) int {  
	i, _ := strconv.Atoi(s)  
	return i  
}  

示意图:
image

(3)布隆过滤器(BloomFilter)

一种空间效率极高的概率型数据结构,可快速判断一个 key 是否存在于数据库中(存在误判,但不存在漏判),适合海量数据场景。

1)原理

提前将数据库中所有有效 key 通过哈希函数映射到布隆过滤器的比特数组中;请求到达时,先通过布隆过滤器判断 key 是否存在:

  • 若不存在,直接返回,无需查缓存和数据库。
  • 若存在,再走正常缓存 - 数据库流程。
2)代码示例
package main  

import (  
	"fmt"  
	"github.com/redis/go-redis/v9"  
	"github.com/tysonmote/gomemcached/client"  
	"math/rand"  
	"time"  
)  

// 布隆过滤器(基于Redis Bitmap实现)  
type BloomFilter struct {  
	rdb    *redis.Client  
	ctx    context.Context  
	key    string   // Redis Bitmap key  
	hashFn []func([]byte) uint64 // 多个哈希函数  
	size   uint64   // Bitmap大小  
}  

// 初始化布隆过滤器  
func NewBloomFilter(rdb *redis.Client, ctx context.Context, key string, size uint64) *BloomFilter {  
	return &BloomFilter{  
		rdb:  rdb,  
		ctx:  ctx,  
		key:  key,  
		size: size,  
		hashFn: []func([]byte) uint64{  
			hashFn1,  
			hashFn2,  
			hashFn3,  
		},  
	}  
}  

// 哈希函数1  
func hashFn1(data []byte) uint64 {  
	// 简化实现,实际使用MurmurHash/CRC等高效哈希  
	var h uint64  
	for _, b := range data {  
		h = h*31 + uint64(b)  
	}  
	return h  
}  

// 哈希函数2  
func hashFn2(data []byte) uint64 {  
	var h uint64  
	for _, b := range data {  
		h = h*131 + uint64(b)  
	}  
	return h  
}  

// 哈希函数3  
func hashFn3(data []byte) uint64 {  
	var h uint64  
	for _, b := range data {  
		h = h*13131 + uint64(b)  
	}  
	return h  
}  

// 添加数据到布隆过滤器  
func (bf *BloomFilter) Add(data []byte) error {  
	for _, fn := range bf.hashFn {  
		index := fn(data) % bf.size  
		if err := bf.rdb.SetBit(bf.ctx, bf.key, int64(index), 1).Err(); err != nil {  
			return err  
		}  
	}  
	return nil  
}  

// 检查数据是否存在(存在误判,不存在则一定不存在)  
func (bf *BloomFilter) Exists(data []byte) (bool, error) {  
	for _, fn := range bf.hashFn {  
		index := fn(data) % bf.size  
		bit, err := bf.rdb.GetBit(bf.ctx, bf.key, int64(index)).Result()  
		if err != nil {  
			return false, err  
		}  
		if bit == 0 {  
			return false, nil // 任意一个位为0,数据一定不存在  
		}  
	}  
	return true, nil // 所有位为1,数据可能存在  
}  

// 初始化布隆过滤器:加载所有有效商品ID  
func initBloomFilter() (*BloomFilter, error) {  
	bf := NewBloomFilter(rdb, ctx, "bloom:product", 10000000) // 1000万位  

	// 查询所有有效商品ID  
	var productIDs []uint64  
	if err := db.Model(&Product{}).Pluck("id", &productIDs).Error; err != nil {  
		return nil, err  
	}  

	// 批量添加到布隆过滤器  
	for _, id := range productIDs {  
		if err := bf.Add([]byte(fmt.Sprintf("%d", id))); err != nil {  
			return nil, err  
		}  
	}  
	return bf, nil  
}  

// 使用布隆过滤器处理缓存穿透  
func getProductWithBloomFilter(id uint64, bf *BloomFilter) (*Product, error) {  
	// 1. 布隆过滤器快速判断:不存在则直接返回  
	exists, err := bf.Exists([]byte(fmt.Sprintf("%d", id)))  
	if err != nil {  
		return nil, err  
	}  
	if !exists {  
		return nil, nil  
	}  

	// 2. 布隆过滤器命中,查询缓存/数据库(后续逻辑同空值缓存)  
	return getProductByID(id)  
}  

(注:fpp 是误判率,误判率越低,分配的 bit 越大,使用的 hash 函数越多)

示意图:
image

(4)布谷鸟过滤器(GuavaFilter)

(可补充:布谷鸟过滤器在某些场景下比布隆过滤器有优势,如支持删除操作,但实现相对复杂,适用于需要动态删除数据的场景)

4. 业务场景

  • 恶意攻击:黑客构造大量不存在的商品 ID / 用户 ID 请求接口,如 GET /product/999999(999999 不存在)。
  • 业务异常:前端参数校验缺失,用户输入无效 ID(如负数、超长字符串),导致请求穿透到数据库。
  • 数据删除:数据从数据库删除后,缓存未及时清理,后续请求仍穿透查询。

5. 注意事项

  • 空值缓存过期时间:不宜过长(如 5 - 10 分钟),避免正常数据入库后缓存未更新;也不宜过短,否则无法抵御高频攻击。
  • 布隆过滤器误判率:哈希函数数量越多、Bitmap 越大,误判率越低,但占用内存越高,需根据业务平衡。
  • 布隆过滤器更新:数据库数据新增 / 删除时,需同步更新布隆过滤器,避免误判。
  • 参数校验:前端 / 网关层先校验参数合法性(如 ID 范围、格式),从源头拦截无效请求。

6. 面试题

  • 缓存穿透的根本原因是什么?空值缓存和布隆过滤器各有什么优缺点?
  • 布隆过滤器的误判率如何计算?如何降低误判率?
  • 除了空值缓存和布隆过滤器,还有哪些方式可以解决缓存穿透?(网关拦截、参数校验、限流)

三、缓存击穿

1. 定义

缓存击穿是指一个热点 Key(如秒杀商品、热门资讯)在缓存过期的瞬间,恰好有大量请求命中该 Key,导致所有请求瞬间穿透到数据库,造成数据库瞬时压力过大。

2. 产生原因

  • 热点 key 过期 + 高频访问:时间到了自然清除。
  • delete 掉的 key,刚巧又被访问了(业务逻辑导致)。

3. 解决方法

(思路:保护热点 key 的缓存重建)

(1)互斥锁(分布式锁)

通过分布式锁,保证同一时间只有一个请求能查询数据库并重建缓存,其他请求等待重试。

代码示例:

import (  
	"fmt"  
	"github.com/redis/go-redis/v9"  
	"sync"  
	"time"  
)  

var mutex sync.Mutex // 本地锁(单机),分布式场景用Redis/Zookeeper锁  

// 分布式锁(Redis实现)  
func getDistributedLock(key string, expire time.Duration) (bool, error) {  
	// SET NX EX:不存在则设置,同时设置过期时间,避免死锁  
	ok, err := rdb.SetNX(ctx, key, "lock", expire).Result()  
	if err != nil {  
		return false, err  
	}  
	return ok, nil  
}  

// 释放分布式锁  
func releaseDistributedLock(key string) error {  
	// 用Lua脚本保证原子性:判断锁是否存在,存在则删除  
	script := `  
		if redis.call('get', KEYS[1]) == ARGV[1] then  
			return redis.call('del', KEYS[1])  
		else  
			return 0  
		end  
	`  
	_, err := rdb.Eval(ctx, script, []string{key}, "lock").Result()  
	return err  
}  

// 查询商品(互斥锁解决缓存击穿)  
func getProductWithLock(id uint64) (*Product, error) {  
	key := fmt.Sprintf("product:%d", id)  
	lockKey := fmt.Sprintf("lock:product:%d", id)  

	// 1. 查询缓存  
	productCache, err := rdb.HGetAll(ctx, key).Result()  
	if err != nil && err != redis.Nil {  
		return nil, err  
	}  
	if len(productCache) > 0 {  
		// 解析缓存数据(同前文)  
		product := &Product{  
			ID:    id,  
			Name:  productCache["name"],  
			Price: parseFloat(productCache["price"]),  
			Stock: parseInt(productCache["stock"]),  
		}  
		return product, nil  
	}  

	// 2. 缓存不存在,获取分布式锁  
	lockOk, err := getDistributedLock(lockKey, 5*time.Second)  
	if err != nil {  
		return nil, err  
	}  
	if !lockOk {  
		// 未获取到锁,休眠后重试(避免频繁请求)  
		time.Sleep(100 * time.Millisecond)  
		return getProductWithLock(id)  
	}  
	// 确保锁释放  
	defer releaseDistributedLock(lockKey)  

	// 3. 获取锁成功,查询数据库  
	var product Product  
	if err := db.Where("id = ?", id).First(&product).Error; err != nil {  
		if err == gorm.ErrRecordNotFound {  
			// 缓存空值  
			rdb.HSet(ctx, key, "id", "")  
			rdb.Expire(ctx, key, 5*time.Minute)  
			return nil, nil  
		}  
		return nil, err  
	}  

	// 4. 写入缓存  
	rdb.HSet(ctx, key, "id", product.ID, "name", product.Name, "price", product.Price, "stock", product.Stock)  
	rdb.Expire(ctx, key, 24*time.Hour)  

	return &product, nil  
}  

(2)热点 key 永不过期(或动态续期)

业务主动更新(如商品库存变化时同步更新缓存),或定时任务异步更新(如每小时刷新一次热点商品数据),结合互斥更新、双检加锁。

代码示例:

// 热点Key永不过期 + 后台定时更新  
func getHotProductNeverExpire(id uint64) (*Product, error) {  
	key := fmt.Sprintf("product:hot:%d", id)  

	// 1. 查询缓存(永不过期)  
	productCache, err := rdb.HGetAll(ctx, key).Result()  
	if err != nil && err != redis.Nil {  
		return nil, err  
	}  
	if len(productCache) > 0 {  
		product := &Product{  
			ID:    id,  
			Name:  productCache["name"],  
			Price: parseFloat(productCache["price"]),  
			Stock: parseInt(productCache["stock"]),  
		}  
		return product, nil  
	}  

	// 2. 缓存不存在,查询数据库并写入(永不过期)  
	var product Product  
	if err := db.Where("id = ?", id).First(&product).Error; err != nil {  
		return nil, err  
	}  
	rdb.HSet(ctx, key, "id", product.ID, "name", product.Name, "price", product.Price, "stock", product.Stock)  

	// 3. 后台定时更新缓存(单独goroutine)  
	go func() {  
		ticker := time.NewTicker(5 * time.Minute)  
		defer ticker.Stop()  
		for range ticker.C {  
			var latestProduct Product  
			if err := db.Where("id = ?", id).First(&latestProduct).Error; err != nil {  
				continue  
			}  
			rdb.HSet(ctx, key, "id", latestProduct.ID, "name", latestProduct.Name, "price", latestProduct.Price, "stock", latestProduct.Stock)  
		}  
	}()  

	return &product, nil  
}  

(3)热点 key 预热(Preload)

在业务高峰期,通过定时任务或脚本提前将热点数据加载到 Redis 中,并设置合理的过期时间。

4. 业务场景

  • 秒杀商品:某秒杀商品的缓存过期时间到了,恰好此时上万用户同时抢购,请求全部打到数据库。
  • 热门榜单:APP 首页热门榜单缓存过期,大量用户刷新页面,请求穿透到数据库。
  • 热点接口:某个高频访问的接口缓存过期,瞬时请求量击穿数据库。

5. 注意事项

  • 锁的粒度:分布式锁需按 Key 粒度设置,避免全局锁导致性能瓶颈。
  • 锁过期时间:设置合理的锁过期时间,避免死锁(如 5 - 10 秒,需大于数据库查询耗时)。
  • 重试机制:未获取锁的请求需添加重试 / 休眠逻辑,避免高频重试。
  • 永不过期 Key 的更新:后台定时更新需保证幂等性,避免更新时数据不一致。
  • 热点 Key 识别:通过监控(如 Redis 慢查询、访问频次)识别热点 Key,针对性处理。

6. 面试题

  • 缓存击穿和缓存穿透的区别是什么?如何针对性解决?
  • 分布式锁解决缓存击穿的核心原理?如何避免死锁?
  • 热点 Key 永不过期方案有什么潜在问题?(数据一致性、内存占用)
  • 除了互斥锁和永不过期,还有哪些解决缓存击穿的方案?(预热延长过期时间、熔断降级)

四、缓存雪崩

1. 定义

缓存雪崩是指大量缓存 Key 在同一时间段内过期,或者 Redis 服务宕机,导致所有请求瞬间穿透到数据库,数据库无法承受海量请求而宕机,最终引发整个系统雪崩。

2. 产生原因

  • key 集中过期。
  • Redis 集群不可用。

3. 解决方法

(思路:分散风险 + 保障缓存高可用)

(1)避免 key 集中过期

过期时间加随机值。

代码示例:

// 设置缓存时添加随机过期时间,避免批量过期  
func setCacheWithRandomExpire(key string, fields map[string]interface{}) error {  
	baseExpire := 24 * time.Hour  
	randomExpire := time.Duration(rand.Intn(3600)) * time.Second // 0-1小时随机  
	totalExpire := baseExpire + randomExpire  

	pipe := rdb.Pipeline()  
	pipe.HSet(ctx, key, fields)  
	pipe.Expire(ctx, key, totalExpire)  
	_, err := pipe.Exec(ctx)  
	return err  
}  

(2)缓存分层

引入 “本地缓存 + 分布式缓存” 的双层架构,即使分布式缓存(Redis)失效,本地缓存也能临时扛住部分请求。

(3)保证 Redis 高可用

  • 主从复制 + 哨兵。
  • 分片集群。

(4)服务熔断和降级

  • 熔断:适用 Hystrix、Sentinel 等组件,当数据库请求失败率超过阈值,自动熔断数据库访问,直接返回默认值(如 “服务繁忙,请稍后重试”)。
  • 降级:有限保障核心业务,非核心业务直接返回缓存旧数据或默认值,减少数据库请求。

代码示例:

import (  
	"github.com/gin-gonic/gin"  
	"golang.org/x/time/rate"  
	"net/http"  
	"sync"  
)  

// 限流器  
var limiter = rate.NewLimiter(100, 200) // 每秒100个请求,突发200  
var downgradeLock sync.Mutex  
var isDowngrade bool = false  

// 接口降级+限流处理缓存雪崩  
func productHandler(c *gin.Context) {  
	// 1. 限流:超过阈值直接返回  
	if !limiter.Allow() {  
		c.JSON(http.StatusTooManyRequests, gin.H{"code": 429, "msg": "请求过于频繁"})  
		return  
	}  

	// 2. 降级:Redis宕机时返回默认数据/提示  
	downgradeLock.Lock()  
	defer downgradeLock.Unlock()  
	if isDowngrade {  
		c.JSON(http.StatusOK, gin.H{"code": 200, "msg": "系统临时维护,请稍后重试"})  
		return  
	}  

	// 3. 正常业务逻辑  
	idStr := c.Param("id")  
	id, _ := strconv.ParseUint(idStr, 10, 64)  
	product, err := getProductByID(id)  
	if err != nil {  
		// 检测到Redis/数据库异常,触发降级  
		isDowngrade = true  
		c.JSON(http.StatusInternalServerError, gin.H{"code": 500, "msg": "服务异常"})  
		return  
	}  
	c.JSON(http.StatusOK, gin.H{"code": 200, "data": product})  
}  

4. 业务场景

  • 批量过期:初始化缓存时,所有 Key 设置了相同的过期时间(如 24 小时),导致 24 小时后大量 Key 同时过期。
  • Redis 宕机:Redis 集群故障(如主从切换、网络中断),所有缓存请求无法响应,全部打到数据库。
  • 大促场景:大促前批量预热的缓存,过期时间集中在大促结束后,导致过期瞬间请求穿透。

5. 注意事项

  • 过期时间分散:随机过期时间需控制范围(如 ±1 小时),避免过期时间过长 / 过短。
  • Redis 高可用:采用主从复制 + 哨兵 / 集群模式,避免单点故障;配置持久化(RDB + AOF)防止数据丢失。
  • 降级策略:降级需定义清晰的触发条件(如 Redis 超时、数据库压力阈值)和恢复机制。
  • 限流粒度:按接口 / 用户 / IP 限流,避免恶意请求占满资源。
  • 监控告警:实时监控 Redis 健康状态、缓存命中率、数据库 QPS,异常时及时告警。

6. 面试题

  • 缓存雪崩的核心原因是什么?如何从缓存层、数据库层、系统层全方位解决?
  • 过期时间随机化的实现原理?随机范围如何选择?
  • Redis 集群宕机时,除了降级限流,还有哪些兜底方案?(本地缓存、熔断)
  • 缓存雪崩和缓存击穿的区别?(击穿是单个热点 Key,雪崩是大量 Key / Redis 整体故障)