接口幂等如何保证
1. 基础
可以先定义一个注解@Idempotent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Idempotent {
/**
* 设置防重令牌 Key 前缀
*/
String keyPrefix() default "";
/**
* 通过 SpEL 表达式生成的唯一 Key
*/
String key();
/**
* 设置防重令牌 Key 过期时间,单位秒,默认 1 小时
*/
long keyTimeout() default 3600L;
}通过Aop来进行幂等性保证,这里先提供一个获得方法上注解的方法
1
2
3
4
5
public static Idempotent getIdempotent(ProceedingJoinPoint joinPoint) throws NoSuchMethodException {
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
Method targetMethod = joinPoint.getTarget().getClass().getDeclaredMethod(methodSignature.getName(), methodSignature.getMethod().getParameterTypes());
return targetMethod.getAnnotation(Idempotent.class);
}2. Http 接口
这里可以有3种方案:
- 基于 SpEL 方法验证请求幂等性
- 基于 Token 验证请求幂等性
- 基于方法参数验证请求幂等性
2.1 基于 SpEL 方法验证请求幂等性
这个方案主要是配合Spel表达式来生成唯一Key。
例如:
1
2
3
4
5
@NoMQDuplicateConsume(
keyPrefix = "coupon_task_execute:idempotent:",
key = "#messageWrapper.message.couponTaskId",
keyTimeout = 120
)在切面逻辑中,通过这个来生成唯一Key。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public Object handler(ProceedingJoinPoint joinPoint) {
Idempotent idempotent = IdempotentAspect.getIdempotent(joinPoint);
String key = (String) SpELUtil.parseKey(idempotent.key(), ((MethodSignature) joinPoint.getSignature()).getMethod(), joinPoint.getArgs());
String uniqueKey = idempotent.uniqueKeyPrefix() + key;
Object result = null;
try {
if (!lock.tryLock()) {
throw new ClientException(wrapper.getIdempotent().message());
}
result = joinPoint.proceed();
} finally {
if (lock != null) {
lock.unlock();
}
}
return result;
}具体逻辑就是解析Spel表达式获取到唯一id来生成唯一Key,这个Key作为唯一锁标示进行加锁,获得锁的进行业务处理,否则抛出异常。
2.2 基于方法参数验证请求幂等性
这个方法逻辑和上面其实是一样的,唯一的区别是生成唯一Key的逻辑不一样,这个方法是通过请求的方法参数来生产唯一key,比如说hashcode或者md5处理。
2.3 基于 Token 验证请求幂等性
首先会定义一个获取token的接口,并将生成的token存入Redis,且设置过期时间。
前端在发起请求时会带上接口返回的token,后端则会取出这个token,并直接删除Redis里存放的这个token,方法返回true代表删除成功,false代表Key不存在。根据返回值决定是否抛出异常。
3. MQ消费
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public Object noMQRepeatConsume(ProceedingJoinPoint joinPoint) throws Throwable {
Idempotent idempotent = getNoMQDuplicateConsumeAnnotation(joinPoint);
String uniqueKey = idempotent.keyPrefix() + SpELUtil.parseKey(idempotent.key(), ((MethodSignature) joinPoint.getSignature()).getMethod(), joinPoint.getArgs());
String absentAndGet = stringRedisTemplate.execute(
RedisScript.of(LUA_SCRIPT, String.class),
List.of(uniqueKey),
IdempotentMQConsumeStatusEnum.CONSUMING.getCode(),
String.valueOf(TimeUnit.SECONDS.toMillis(idempotent.keyTimeout()))
);
// 如果不为空证明已经有
if (Objects.nonNull(absentAndGet)) {
boolean errorFlag = IdempotentMQConsumeStatusEnum.isError(absentAndGet);
log.warn("[{}] MQ repeated consumption, {}.", uniqueKey, errorFlag ? "Wait for the client to delay consumption" : "Status is completed");
if (errorFlag) {
throw new ServiceException(String.format("消息消费者幂等异常,幂等标识:%s", uniqueKey));
}
return null;
}
Object result;
try {
// 执行标记了消息队列防重复消费注解的方法原逻辑
result = joinPoint.proceed();
// 设置防重令牌 Key 过期时间,单位秒
stringRedisTemplate.opsForValue().set(uniqueKey, IdempotentMQConsumeStatusEnum.CONSUMED.getCode(), idempotent.keyTimeout(), TimeUnit.SECONDS);
} catch (Throwable ex) {
// 删除幂等 Key,让消息队列消费者重试逻辑进行重新消费
stringRedisTemplate.delete(uniqueKey);
throw ex;
}
return result;
}这里定义了两种状态:
1
2
3
4
5
6
7
8
9
/**
* 消费中
*/
CONSUMING("0"),
/**
* 已消费
*/
CONSUMED("1");进入方法时,会先执行一段Lua脚本
1
2
3
4
5
6
private static final String LUA_SCRIPT = """
local key = KEYS[1]
local value = ARGV[1]
local expire_time_ms = ARGV[2]
return redis.call('SET', key, value, 'NX', 'GET', 'PX', expire_time_ms)
""";这个脚本尝试在 Redis 中设置一个键值对(key-value),并设置键的过期时间为指定的毫秒数(PX 选项),但仅当键不存在时生效(NX 选项)。如果键已经存在,则返回键的旧值(GET 选项)。
如果absentAndGet不为空说明之前消费过,这时候就要判断absentAndGet的值,如果是CONSUMING,就说明上次消费还未结束,现在这次操作就是重复消费了,直接抛出异常,如果为CONSUMED,就说明已经成功消费了,直接返回null。
如果absentAndGet为空,说明是第一次消费,先执行原始逻辑,最后修改之前设置的Redis key的值为CONSUMED。
但是如果执行过程中抛出了异常,就要及时删除这个Key,通过异常的抛出让消息队列进行消息消费重试。


