Webflux原理((WebFlux)004、WebFilter踩坑记录)
一 、背景
使用SpringWebFlux的WebFilter时 ,由于不熟悉或一些思考疏忽 ,容易出现未知的异常 。记录一下排查与解决方案 ,给大家分享一下 。
二 、问题
2.1 问题描述
在测试接口方法时 ,出现的错误信息如下(对一些项目路径做了修改):
java.lang.IllegalStateException: COMPLETED at org.springframework.http.server.reactive.AbstractListenerReadPublisher$State.subscribe(AbstractListenerReadPublisher.java:451) Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Error has been observed at the following site(s): *__checkpoint ⇢ springfox.boot.starter.autoconfigure.SwaggerUiWebFluxConfiguration$CustomWebFilter [DefaultWebFilterChain] *__checkpoint ⇢ com.xxx.config.LoginWebFilter$$EnhancerBySpringCGLIB$$f3da6bdf [DefaultWebFilterChain] *__checkpoint ⇢ com.xxx.config.TraceIdFilter [DefaultWebFilterChain] *__checkpoint ⇢ HTTP POST "/abc/test/testMethod" [ExceptionHandlingWebHandler] Original Stack Trace: at org.springframework.http.server.reactive.AbstractListenerReadPublisher$State.subscribe(AbstractListenerReadPublisher.java:451) at org.springframework.http.server.reactive.AbstractListenerReadPublisher.subscribe(AbstractListenerReadPublisher.java:105)2.2 解决问题
通过查看错误信息描述 ,checkpoint点都在webfilter中 ,由于对webflux也不是特别熟 ,所以就只有一个个测试 。
通过一系列操作 , 把swagger移除 ,细读TraceIdFilter(内容不多) ,主要归功于原方案是正确的 ,修改后错误 ,最后才定位问题出现在LoginWebFilter 。
说说插曲,原实现方式(有阻塞逻辑 ,没出现上述异常) ,代码如下:
@Configuration @Slf4j @Order(-10) public class LoginWebFilter implements WebFilter { // 略... @Override public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { ServerHttpRequest request = exchange.getRequest(); if (!enableGateway) { String token = Optional.ofNullable(request.getHeaders().getFirst(Constants.TOKEN)) .orElse(""); // 获取用户信息 User user = getUser(token); if (user != null) { ServerHttpRequest mutateRequest = exchange.getRequest().mutate() .build(); exchange = exchange.mutate().request(mutateRequest).build(); } } return chain.filter(exchange); } private User getUser(String token) { if (StringUtils.isNotBlank(token)) { return redisTemplate.opsForValue().get("xxx:tk:" + token) .flatMap(str -> Mono.justOrEmpty(JsonUtils.toObj(str, User.class))).block(); } return null; } }这样写,没有复杂的业务逻辑 ,从上到下 ,完全OJBK ,但是调整后 ,就出现了上述异常 。
改完后的问题代码如下:
// 错误 public class LoginWebFilter implements WebFilter { /...略 @Autowired private ReactiveStringRedisTemplate redisTemplate; @Override public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { if (!enableGateway) { ServerHttpRequest request = exchange.getRequest(); String token = Optional.ofNullable(request.getHeaders().getFirst(Constants.TOKEN)) .orElse(""); return getUser(token).flatMap(user -> { ServerHttpRequest mutateRequest = exchange.getRequest().mutate() .header(UserUtils.MEMBER_ID, user.getMemId()) .header(UserUtils.MOBILE, user.getMobile()) .build(); ServerWebExchange newexchange = exchange.mutate().request(mutateRequest).build(); return chain.filter(newexchange); // 问题点 }).switchIfEmpty(chain.filter(exchange)); } return chain.filter(exchange); } // 不在用block private Mono<User> getUser(String token) { if (StringUtils.isNotBlank(token)) { return redisTemplate.opsForValue().get("xxx:tk:" + token) .flatMap(str -> Mono.justOrEmpty(JsonUtils.toObj(str, User.class))); } return Mono.empty(); } }2.3 如何解决
对比改造前和改造后的代码 ,其实差异不大 ,那问题出现在哪呢?
由于对webflux也不是特别熟 ,那就只能一点点试(太蠢了) 。 最后发现问题出现在了switchIfEmpty(chain.filter(exchange)) ,在去掉了switchIfEmpty(chain.filter(exchange)) ,就不会在出现上述异常 。
修改后部分代码如下:
// 半正确 @Override public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { if (!enableGateway) { ServerHttpRequest request = exchange.getRequest(); String token = Optional.ofNullable(request.getHeaders().getFirst(Constants.TOKEN)) .orElse(“ ”); return getUser(token).flatMap(user -> { ServerHttpRequest mutateRequest = exchange.getRequest().mutate() .header(UserUtils.MEMBER_ID, user.getMemId()) .header(UserUtils.MOBILE, user.getMobile()) .build(); ServerWebExchange newexchange = exchange.mutate().request(mutateRequest).build(); return chain.filter(newexchange); }); } return chain.filter(exchange); }虽然现在不回在出现异常 ,但是去掉switchIfEmpty后 ,代码逻辑是不完整的 ,当获取不到User时,返回Mono.emtpy ,那会直接结束流程 ,不在执行剩下的filter或其他逻辑 。真是连环坑,一坑接一坑 。所以对代码需要调整一番 ,调整后如下:
// 有点正确 但是不多 @Override public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { if (!enableGateway) { ServerHttpRequest request = exchange.getRequest(); String token = Optional.ofNullable(request.getHeaders().getFirst(Constants.TOKEN)) .orElse(“ ”); return getUser(token).switchIfEmpty(Mono.error(() -> new BizException(ErrorCode.USER_IS_NULL_ERROR))) .flatMap(user -> { ServerHttpRequest mutateRequest = exchange.getRequest().mutate() .header(UserUtils.MEMBER_ID, user.getMemId()) .header(UserUtils.MOBILE, user.getMobile()) .build(); ServerWebExchange newexchange = exchange.mutate().request(mutateRequest).build(); return chain.filter(newexchange); }).onErrorResume(e -> chain.filter(exchange)); } return chain.filter(exchange); }当获取用户为空后 ,抛出异常 ,然后在兜底 ,当异常的时候执行chain.filter(exchange)(好蠢的方式.. 但是解决问题了) 。
2.4 意外之喜
各位看官 ,就在我写完上完上面的代码修改方案之后 ,读了一下修改完后的代码 ,突然发现问题出在哪了 ,所以连夜修改了代码方式 。现在我听我细细道来 。
2.4.1 问题点原因点:chain.filter(exchange)重复执行
switchIfEmpty(chain.filter(exchange))这个点本意是想用在当getUser 方法为空时 ,执行其它WebFilter的逻辑 ,从而不影响主流程。
忽略了一点是:当chain.filter(newexchange)这个方法执行完后 ,返回的也是Mono<Void> ,也是为空 。所以无论如何,代码最后的逻辑都会走到switchIfEmpty(chain.filter(exchange)) 。
但是当getUser获取到用户后 ,会重复执行chain.filter(exchange) ,如下
return chain.filter(newexchange) switchIfEmpty(chain.filter(exchange))由于第一次执行完chain.filter(exchange),request 、response都已经关闭 ,所以出现了xx COMPLETE ,那看来的确符合逻辑。
2.4.2 验证猜想这个验证方式还是挺简单的 ,那就是分别传入正常的TOKEN和错误的TOKEN 。
具体操作:.....(本人已完成)
结论:
当传入错误的token的时候 ,确实没有抛出异常 ,完美执行 。但是当传入正确的token ,出现了熟悉的异常 。
2.4.3 代码调整知道问题的原因 ,那就好调整代码了 。修改后如下:
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { if (!enableGateway) { ServerHttpRequest request = exchange.getRequest(); String token = Optional.ofNullable(request.getHeaders().getFirst(Constants.TOKEN)) .orElse(request.getHeaders().getFirst("suuid")); return getUser(token).map(user -> { ServerHttpRequest mutateRequest = exchange.getRequest().mutate() .header(UserUtils.MEMBER_ID, user.getMemId()) .header(UserUtils.MOBILE, user.getMobile()) .build(); return exchange.mutate().request(mutateRequest).build(); // 调整当getUser为空时 ,返回的内容 }).switchIfEmpty(Mono.just(exchange)).flatMap(chain::filter); } return chain.filter(exchange); }至此 ,问题就完全解决拉!心里美滋滋!
三 、总结
1 、遇到问题 ,还是要多看看呀 ,细细思考一下
2 、多看代码 ,发现问题,实现完美的解决方案
创心域SEO版权声明:以上内容作者已申请原创保护,未经允许不得转载,侵权必究!授权事宜、对本内容有异议或投诉,敬请联系网站管理员,我们将尽快回复您,谢谢合作!