1. 基础

可以先定义一个注解@Idempotent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Idempotent {

    /**
     * 设置防重令牌 Key 前缀
     */
    String keyPrefix() default "";

    /**
     * 通过 SpEL 表达式生成的唯一 Key
     */
    String key();

    /**
     * 设置防重令牌 Key 过期时间,单位秒,默认 1 小时
     */
    long keyTimeout() default 3600L;
}

通过Aop来进行幂等性保证,这里先提供一个获得方法上注解的方法

1
2
3
4
5
public static Idempotent getIdempotent(ProceedingJoinPoint joinPoint) throws NoSuchMethodException {
    MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
    Method targetMethod = joinPoint.getTarget().getClass().getDeclaredMethod(methodSignature.getName(), methodSignature.getMethod().getParameterTypes());
    return targetMethod.getAnnotation(Idempotent.class);
}

2. Http 接口

这里可以有3种方案:

  • 基于 SpEL 方法验证请求幂等性
  • 基于 Token 验证请求幂等性
  • 基于方法参数验证请求幂等性

2.1 基于 SpEL 方法验证请求幂等性

这个方案主要是配合Spel表达式来生成唯一Key。

例如:

1
2
3
4
5
@NoMQDuplicateConsume(
        keyPrefix = "coupon_task_execute:idempotent:",
        key = "#messageWrapper.message.couponTaskId",
        keyTimeout = 120
)

在切面逻辑中,通过这个来生成唯一Key。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public Object handler(ProceedingJoinPoint joinPoint) {
    Idempotent idempotent = IdempotentAspect.getIdempotent(joinPoint); 
    String key = (String) SpELUtil.parseKey(idempotent.key(), ((MethodSignature) joinPoint.getSignature()).getMethod(), joinPoint.getArgs());    
    String uniqueKey = idempotent.uniqueKeyPrefix() + key;
    Object result = null;
    try {
        if (!lock.tryLock()) {
            throw new ClientException(wrapper.getIdempotent().message());
        }
        result = joinPoint.proceed();
    } finally {
        if (lock != null) {
            lock.unlock();
        }
    }
    return result;
}

具体逻辑就是解析Spel表达式获取到唯一id来生成唯一Key,这个Key作为唯一锁标示进行加锁,获得锁的进行业务处理,否则抛出异常。

2.2 基于方法参数验证请求幂等性

这个方法逻辑和上面其实是一样的,唯一的区别是生成唯一Key的逻辑不一样,这个方法是通过请求的方法参数来生产唯一key,比如说hashcode或者md5处理。

2.3 基于 Token 验证请求幂等性

首先会定义一个获取token的接口,并将生成的token存入Redis,且设置过期时间。

前端在发起请求时会带上接口返回的token,后端则会取出这个token,并直接删除Redis里存放的这个token,方法返回true代表删除成功,false代表Key不存在。根据返回值决定是否抛出异常。

3. MQ消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public Object noMQRepeatConsume(ProceedingJoinPoint joinPoint) throws Throwable {
    Idempotent idempotent = getNoMQDuplicateConsumeAnnotation(joinPoint);
    String uniqueKey = idempotent.keyPrefix() + SpELUtil.parseKey(idempotent.key(), ((MethodSignature) joinPoint.getSignature()).getMethod(), joinPoint.getArgs());

    String absentAndGet = stringRedisTemplate.execute(
            RedisScript.of(LUA_SCRIPT, String.class),
            List.of(uniqueKey),
            IdempotentMQConsumeStatusEnum.CONSUMING.getCode(),
            String.valueOf(TimeUnit.SECONDS.toMillis(idempotent.keyTimeout()))
    );

    // 如果不为空证明已经有
    if (Objects.nonNull(absentAndGet)) {
        boolean errorFlag = IdempotentMQConsumeStatusEnum.isError(absentAndGet);
        log.warn("[{}] MQ repeated consumption, {}.", uniqueKey, errorFlag ? "Wait for the client to delay consumption" : "Status is completed");
        if (errorFlag) {
            throw new ServiceException(String.format("消息消费者幂等异常,幂等标识:%s", uniqueKey));
        }
        return null;
    }

    Object result;
    try {
        // 执行标记了消息队列防重复消费注解的方法原逻辑
        result = joinPoint.proceed();

        // 设置防重令牌 Key 过期时间,单位秒
        stringRedisTemplate.opsForValue().set(uniqueKey, IdempotentMQConsumeStatusEnum.CONSUMED.getCode(), idempotent.keyTimeout(), TimeUnit.SECONDS);
    } catch (Throwable ex) {
        // 删除幂等 Key,让消息队列消费者重试逻辑进行重新消费
        stringRedisTemplate.delete(uniqueKey);
        throw ex;
    }
    return result;
}

这里定义了两种状态:

1
2
3
4
5
6
7
8
9
/**
 * 消费中
 */
CONSUMING("0"),

/**
 * 已消费
 */
CONSUMED("1");

进入方法时,会先执行一段Lua脚本

1
2
3
4
5
6
private static final String LUA_SCRIPT = """
        local key = KEYS[1]
        local value = ARGV[1]
        local expire_time_ms = ARGV[2]
        return redis.call('SET', key, value, 'NX', 'GET', 'PX', expire_time_ms)
        """;

这个脚本尝试在 Redis 中设置一个键值对(key-value),并设置键的过期时间为指定的毫秒数(PX 选项),但仅当键不存在时生效(NX 选项)。如果键已经存在,则返回键的旧值(GET 选项)。

如果absentAndGet不为空说明之前消费过,这时候就要判断absentAndGet的值,如果是CONSUMING,就说明上次消费还未结束,现在这次操作就是重复消费了,直接抛出异常,如果为CONSUMED,就说明已经成功消费了,直接返回null。

如果absentAndGet为空,说明是第一次消费,先执行原始逻辑,最后修改之前设置的Redis key的值为CONSUMED

但是如果执行过程中抛出了异常,就要及时删除这个Key,通过异常的抛出让消息队列进行消息消费重试。