首页IT科技Webflux原理((WebFlux)004、WebFilter踩坑记录)

Webflux原理((WebFlux)004、WebFilter踩坑记录)

时间2025-04-29 02:36:15分类IT科技浏览4052
导读:一、背景 使用SpringWebFlux的WebFilter时,由于不熟悉或一些思考疏忽,容易出现未知的异常。记录一下排查与解决方案,给大家分享一下。...

一            、背景

使用SpringWebFlux的WebFilter时          ,由于不熟悉或一些思考疏忽                  ,容易出现未知的异常          。记录一下排查与解决方案      ,给大家分享一下                  。

二                、问题

2.1 问题描述

在测试接口方法时       ,出现的错误信息如下(对一些项目路径做了修改):

java.lang.IllegalStateException: COMPLETED at org.springframework.http.server.reactive.AbstractListenerReadPublisher$State.subscribe(AbstractListenerReadPublisher.java:451) Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Error has been observed at the following site(s): *__checkpoint ⇢ springfox.boot.starter.autoconfigure.SwaggerUiWebFluxConfiguration$CustomWebFilter [DefaultWebFilterChain] *__checkpoint ⇢ com.xxx.config.LoginWebFilter$$EnhancerBySpringCGLIB$$f3da6bdf [DefaultWebFilterChain] *__checkpoint ⇢ com.xxx.config.TraceIdFilter [DefaultWebFilterChain] *__checkpoint ⇢ HTTP POST "/abc/test/testMethod" [ExceptionHandlingWebHandler] Original Stack Trace: at org.springframework.http.server.reactive.AbstractListenerReadPublisher$State.subscribe(AbstractListenerReadPublisher.java:451) at org.springframework.http.server.reactive.AbstractListenerReadPublisher.subscribe(AbstractListenerReadPublisher.java:105)

2.2 解决问题

通过查看错误信息描述                  ,checkpoint点都在webfilter中         ,由于对webflux也不是特别熟    ,所以就只有一个个测试      。

通过一系列操作                 , 把swagger移除            ,细读TraceIdFilter(内容不多)  ,主要归功于原方案是正确的                ,修改后错误               ,最后才定位问题出现在LoginWebFilter       。

说说插曲,原实现方式(有阻塞逻辑             ,没出现上述异常)                  ,代码如下:

@Configuration @Slf4j @Order(-10) public class LoginWebFilter implements WebFilter { // 略... @Override public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { ServerHttpRequest request = exchange.getRequest(); if (!enableGateway) { String token = Optional.ofNullable(request.getHeaders().getFirst(Constants.TOKEN)) .orElse(""); // 获取用户信息 User user = getUser(token); if (user != null) { ServerHttpRequest mutateRequest = exchange.getRequest().mutate() .build(); exchange = exchange.mutate().request(mutateRequest).build(); } } return chain.filter(exchange); } private User getUser(String token) { if (StringUtils.isNotBlank(token)) { return redisTemplate.opsForValue().get("xxx:tk:" + token) .flatMap(str -> Mono.justOrEmpty(JsonUtils.toObj(str, User.class))).block(); } return null; } }

这样写   ,没有复杂的业务逻辑          ,从上到下                  ,完全OJBK      ,但是调整后       ,就出现了上述异常                  。

改完后的问题代码如下:

// 错误 public class LoginWebFilter implements WebFilter { /...略 @Autowired private ReactiveStringRedisTemplate redisTemplate; @Override public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { if (!enableGateway) { ServerHttpRequest request = exchange.getRequest(); String token = Optional.ofNullable(request.getHeaders().getFirst(Constants.TOKEN)) .orElse(""); return getUser(token).flatMap(user -> { ServerHttpRequest mutateRequest = exchange.getRequest().mutate() .header(UserUtils.MEMBER_ID, user.getMemId()) .header(UserUtils.MOBILE, user.getMobile()) .build(); ServerWebExchange newexchange = exchange.mutate().request(mutateRequest).build(); return chain.filter(newexchange); // 问题点 }).switchIfEmpty(chain.filter(exchange)); } return chain.filter(exchange); } // 不在用block private Mono<User> getUser(String token) { if (StringUtils.isNotBlank(token)) { return redisTemplate.opsForValue().get("xxx:tk:" + token) .flatMap(str -> Mono.justOrEmpty(JsonUtils.toObj(str, User.class))); } return Mono.empty(); } }

2.3 如何解决

对比改造前和改造后的代码                  ,其实差异不大         ,那问题出现在哪呢?

由于对webflux也不是特别熟    ,那就只能一点点试(太蠢了)         。 最后发现问题出现在了switchIfEmpty(chain.filter(exchange))                 ,在去掉了switchIfEmpty(chain.filter(exchange))            ,就不会在出现上述异常    。

修改后部分代码如下:

// 半正确 @Override public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { if (!enableGateway) { ServerHttpRequest request = exchange.getRequest(); String token = Optional.ofNullable(request.getHeaders().getFirst(Constants.TOKEN)) .orElse(“            ”); return getUser(token).flatMap(user -> { ServerHttpRequest mutateRequest = exchange.getRequest().mutate() .header(UserUtils.MEMBER_ID, user.getMemId()) .header(UserUtils.MOBILE, user.getMobile()) .build(); ServerWebExchange newexchange = exchange.mutate().request(mutateRequest).build(); return chain.filter(newexchange); }); } return chain.filter(exchange); }

虽然现在不回在出现异常  ,但是去掉switchIfEmpty后                ,代码逻辑是不完整的               ,当获取不到User时,返回Mono.emtpy             ,那会直接结束流程                  ,不在执行剩下的filter或其他逻辑                 。真是连环坑   ,一坑接一坑            。所以对代码需要调整一番          ,调整后如下:

// 有点正确 但是不多 @Override public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { if (!enableGateway) { ServerHttpRequest request = exchange.getRequest(); String token = Optional.ofNullable(request.getHeaders().getFirst(Constants.TOKEN)) .orElse(“                ”); return getUser(token).switchIfEmpty(Mono.error(() -> new BizException(ErrorCode.USER_IS_NULL_ERROR))) .flatMap(user -> { ServerHttpRequest mutateRequest = exchange.getRequest().mutate() .header(UserUtils.MEMBER_ID, user.getMemId()) .header(UserUtils.MOBILE, user.getMobile()) .build(); ServerWebExchange newexchange = exchange.mutate().request(mutateRequest).build(); return chain.filter(newexchange); }).onErrorResume(e -> chain.filter(exchange)); } return chain.filter(exchange); }

当获取用户为空后                  ,抛出异常      ,然后在兜底       ,当异常的时候执行chain.filter(exchange)(好蠢的方式.. 但是解决问题了)  。

2.4 意外之喜

各位看官                  ,就在我写完上完上面的代码修改方案之后         ,读了一下修改完后的代码    ,突然发现问题出在哪了                 ,所以连夜修改了代码方式                。现在我听我细细道来               。

2.4.1 问题点

原因点:chain.filter(exchange)重复执行

switchIfEmpty(chain.filter(exchange))这个点本意是想用在当getUser 方法为空时            ,执行其它WebFilter的逻辑  ,从而不影响主流程。

忽略了一点是:当chain.filter(newexchange)这个方法执行完后                ,返回的也是Mono<Void>               ,也是为空             。所以无论如何,代码最后的逻辑都会走到switchIfEmpty(chain.filter(exchange))                  。

但是当getUser获取到用户后             ,会重复执行chain.filter(exchange)                  ,如下

return chain.filter(newexchange) switchIfEmpty(chain.filter(exchange))

由于第一次执行完chain.filter(exchange)   ,request      、response都已经关闭          ,所以出现了xx COMPLETE                  ,那看来的确符合逻辑   。

2.4.2 验证猜想

这个验证方式还是挺简单的      ,那就是分别传入正常的TOKEN和错误的TOKEN          。

具体操作:.....(本人已完成)

结论:

当传入错误的token的时候       ,确实没有抛出异常                  ,完美执行                  。但是当传入正确的token         ,出现了熟悉的异常      。

2.4.3 代码调整

知道问题的原因    ,那就好调整代码了       。修改后如下:

public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { if (!enableGateway) { ServerHttpRequest request = exchange.getRequest(); String token = Optional.ofNullable(request.getHeaders().getFirst(Constants.TOKEN)) .orElse(request.getHeaders().getFirst("suuid")); return getUser(token).map(user -> { ServerHttpRequest mutateRequest = exchange.getRequest().mutate() .header(UserUtils.MEMBER_ID, user.getMemId()) .header(UserUtils.MOBILE, user.getMobile()) .build(); return exchange.mutate().request(mutateRequest).build(); // 调整当getUser为空时                 ,返回的内容 }).switchIfEmpty(Mono.just(exchange)).flatMap(chain::filter); } return chain.filter(exchange); }

至此            ,问题就完全解决拉!心里美滋滋!

三         、总结

1                、遇到问题  ,还是要多看看呀                ,细细思考一下

2         、多看代码               ,发现问题,实现完美的解决方案

创心域SEO版权声明:以上内容作者已申请原创保护,未经允许不得转载,侵权必究!授权事宜、对本内容有异议或投诉,敬请联系网站管理员,我们将尽快回复您,谢谢合作!

展开全文READ MORE
seo网站排名优化案例(SEO绩效考核:优化网站排名,增加流量转化) 最小巧的全画幅微单(最小全画幅的数码相机是什么)