首页IT科技Webflux原理((WebFlux)004、WebFilter踩坑记录)

Webflux原理((WebFlux)004、WebFilter踩坑记录)

时间2025-06-19 01:52:07分类IT科技浏览5056
导读:一、背景 使用SpringWebFlux的WebFilter时,由于不熟悉或一些思考疏忽,容易出现未知的异常。记录一下排查与解决方案,给大家分享一下。...

一             、背景

使用SpringWebFlux的WebFilter时            ,由于不熟悉或一些思考疏忽                   ,容易出现未知的异常            。记录一下排查与解决方案        ,给大家分享一下                   。

二                    、问题

2.1 问题描述

在测试接口方法时         ,出现的错误信息如下(对一些项目路径做了修改):

java.lang.IllegalStateException: COMPLETED at org.springframework.http.server.reactive.AbstractListenerReadPublisher$State.subscribe(AbstractListenerReadPublisher.java:451) Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Error has been observed at the following site(s): *__checkpoint ⇢ springfox.boot.starter.autoconfigure.SwaggerUiWebFluxConfiguration$CustomWebFilter [DefaultWebFilterChain] *__checkpoint ⇢ com.xxx.config.LoginWebFilter$$EnhancerBySpringCGLIB$$f3da6bdf [DefaultWebFilterChain] *__checkpoint ⇢ com.xxx.config.TraceIdFilter [DefaultWebFilterChain] *__checkpoint ⇢ HTTP POST "/abc/test/testMethod" [ExceptionHandlingWebHandler] Original Stack Trace: at org.springframework.http.server.reactive.AbstractListenerReadPublisher$State.subscribe(AbstractListenerReadPublisher.java:451) at org.springframework.http.server.reactive.AbstractListenerReadPublisher.subscribe(AbstractListenerReadPublisher.java:105)

2.2 解决问题

通过查看错误信息描述                  ,checkpoint点都在webfilter中           ,由于对webflux也不是特别熟      ,所以就只有一个个测试        。

通过一系列操作                  , 把swagger移除              ,细读TraceIdFilter(内容不多)   ,主要归功于原方案是正确的                  ,修改后错误                 ,最后才定位问题出现在LoginWebFilter         。

说说插曲,原实现方式(有阻塞逻辑               ,没出现上述异常)                    ,代码如下:

@Configuration @Slf4j @Order(-10) public class LoginWebFilter implements WebFilter { // 略... @Override public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { ServerHttpRequest request = exchange.getRequest(); if (!enableGateway) { String token = Optional.ofNullable(request.getHeaders().getFirst(Constants.TOKEN)) .orElse(""); // 获取用户信息 User user = getUser(token); if (user != null) { ServerHttpRequest mutateRequest = exchange.getRequest().mutate() .build(); exchange = exchange.mutate().request(mutateRequest).build(); } } return chain.filter(exchange); } private User getUser(String token) { if (StringUtils.isNotBlank(token)) { return redisTemplate.opsForValue().get("xxx:tk:" + token) .flatMap(str -> Mono.justOrEmpty(JsonUtils.toObj(str, User.class))).block(); } return null; } }

这样写    ,没有复杂的业务逻辑            ,从上到下                   ,完全OJBK        ,但是调整后         ,就出现了上述异常                  。

改完后的问题代码如下:

// 错误 public class LoginWebFilter implements WebFilter { /...略 @Autowired private ReactiveStringRedisTemplate redisTemplate; @Override public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { if (!enableGateway) { ServerHttpRequest request = exchange.getRequest(); String token = Optional.ofNullable(request.getHeaders().getFirst(Constants.TOKEN)) .orElse(""); return getUser(token).flatMap(user -> { ServerHttpRequest mutateRequest = exchange.getRequest().mutate() .header(UserUtils.MEMBER_ID, user.getMemId()) .header(UserUtils.MOBILE, user.getMobile()) .build(); ServerWebExchange newexchange = exchange.mutate().request(mutateRequest).build(); return chain.filter(newexchange); // 问题点 }).switchIfEmpty(chain.filter(exchange)); } return chain.filter(exchange); } // 不在用block private Mono<User> getUser(String token) { if (StringUtils.isNotBlank(token)) { return redisTemplate.opsForValue().get("xxx:tk:" + token) .flatMap(str -> Mono.justOrEmpty(JsonUtils.toObj(str, User.class))); } return Mono.empty(); } }

2.3 如何解决

对比改造前和改造后的代码                  ,其实差异不大           ,那问题出现在哪呢?

由于对webflux也不是特别熟      ,那就只能一点点试(太蠢了)           。 最后发现问题出现在了switchIfEmpty(chain.filter(exchange))                  ,在去掉了switchIfEmpty(chain.filter(exchange))              ,就不会在出现上述异常      。

修改后部分代码如下:

// 半正确 @Override public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { if (!enableGateway) { ServerHttpRequest request = exchange.getRequest(); String token = Optional.ofNullable(request.getHeaders().getFirst(Constants.TOKEN)) .orElse(“             ”); return getUser(token).flatMap(user -> { ServerHttpRequest mutateRequest = exchange.getRequest().mutate() .header(UserUtils.MEMBER_ID, user.getMemId()) .header(UserUtils.MOBILE, user.getMobile()) .build(); ServerWebExchange newexchange = exchange.mutate().request(mutateRequest).build(); return chain.filter(newexchange); }); } return chain.filter(exchange); }

虽然现在不回在出现异常   ,但是去掉switchIfEmpty后                  ,代码逻辑是不完整的                 ,当获取不到User时,返回Mono.emtpy               ,那会直接结束流程                    ,不在执行剩下的filter或其他逻辑                  。真是连环坑    ,一坑接一坑              。所以对代码需要调整一番            ,调整后如下:

// 有点正确 但是不多 @Override public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { if (!enableGateway) { ServerHttpRequest request = exchange.getRequest(); String token = Optional.ofNullable(request.getHeaders().getFirst(Constants.TOKEN)) .orElse(“                    ”); return getUser(token).switchIfEmpty(Mono.error(() -> new BizException(ErrorCode.USER_IS_NULL_ERROR))) .flatMap(user -> { ServerHttpRequest mutateRequest = exchange.getRequest().mutate() .header(UserUtils.MEMBER_ID, user.getMemId()) .header(UserUtils.MOBILE, user.getMobile()) .build(); ServerWebExchange newexchange = exchange.mutate().request(mutateRequest).build(); return chain.filter(newexchange); }).onErrorResume(e -> chain.filter(exchange)); } return chain.filter(exchange); }

当获取用户为空后                   ,抛出异常        ,然后在兜底         ,当异常的时候执行chain.filter(exchange)(好蠢的方式.. 但是解决问题了)   。

2.4 意外之喜

各位看官                  ,就在我写完上完上面的代码修改方案之后           ,读了一下修改完后的代码      ,突然发现问题出在哪了                  ,所以连夜修改了代码方式                  。现在我听我细细道来                 。

2.4.1 问题点

原因点:chain.filter(exchange)重复执行

switchIfEmpty(chain.filter(exchange))这个点本意是想用在当getUser 方法为空时              ,执行其它WebFilter的逻辑   ,从而不影响主流程。

忽略了一点是:当chain.filter(newexchange)这个方法执行完后                  ,返回的也是Mono<Void>                 ,也是为空               。所以无论如何,代码最后的逻辑都会走到switchIfEmpty(chain.filter(exchange))                    。

但是当getUser获取到用户后               ,会重复执行chain.filter(exchange)                    ,如下

return chain.filter(newexchange) switchIfEmpty(chain.filter(exchange))

由于第一次执行完chain.filter(exchange)    ,request      、response都已经关闭            ,所以出现了xx COMPLETE                   ,那看来的确符合逻辑    。

2.4.2 验证猜想

这个验证方式还是挺简单的        ,那就是分别传入正常的TOKEN和错误的TOKEN            。

具体操作:.....(本人已完成)

结论:

当传入错误的token的时候         ,确实没有抛出异常                  ,完美执行                   。但是当传入正确的token           ,出现了熟悉的异常        。

2.4.3 代码调整

知道问题的原因      ,那就好调整代码了         。修改后如下:

public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { if (!enableGateway) { ServerHttpRequest request = exchange.getRequest(); String token = Optional.ofNullable(request.getHeaders().getFirst(Constants.TOKEN)) .orElse(request.getHeaders().getFirst("suuid")); return getUser(token).map(user -> { ServerHttpRequest mutateRequest = exchange.getRequest().mutate() .header(UserUtils.MEMBER_ID, user.getMemId()) .header(UserUtils.MOBILE, user.getMobile()) .build(); return exchange.mutate().request(mutateRequest).build(); // 调整当getUser为空时                  ,返回的内容 }).switchIfEmpty(Mono.just(exchange)).flatMap(chain::filter); } return chain.filter(exchange); }

至此              ,问题就完全解决拉!心里美滋滋!

三         、总结

1                    、遇到问题   ,还是要多看看呀                  ,细细思考一下

2         、多看代码                 ,发现问题,实现完美的解决方案

创心域SEO版权声明:以上内容作者已申请原创保护,未经允许不得转载,侵权必究!授权事宜、对本内容有异议或投诉,敬请联系网站管理员,我们将尽快回复您,谢谢合作!

展开全文READ MORE
intellij idea maven配置(Maven安装与配置,Idea配置Maven) 文件打开方式默认了怎么改(文件打开方式设置默认教程)