首页IT科技抽刀断水水更流打一字(架构高可用之限流-抽刀断水水更流)

抽刀断水水更流打一字(架构高可用之限流-抽刀断水水更流)

时间2025-05-04 00:32:45分类IT科技浏览4450
导读:上图中是一个水坝泄洪的图,那么,对于软件系统,如何使用最方便的可编程的方式增加服务限流能力呢?...

上图中是一个水坝泄洪的图            ,那么                  ,对于软件系统      ,如何使用最方便的可编程的方式增加服务限流能力呢?

下面我结合一个常规的springCloud项目实践了一把         ,希望他山之石可以攻玉            。

背景

简单使用jmeter                  ,压20个并发         ,访问 列表查询接口 /worksheet/findInfo, 对应的服务崩溃                  。【apprun,common】

架构复杂度的一个种类是: 保护API和服务端点免受攻击      ,

比如:拒绝服务                  ,级联失败            ,或者 超额使用资源      。

限流是一种技术   ,来控制API或者服务的消费速度                  ,在分布式系统中               ,没有比集中式的配置和管理API的消费速度更好的选择,

只有这些请求在限定的速度内访问               ,才能保证API的正常                  ,更多的将会产生Http的 请求频繁错误         。

交互模型图:

SpringCloudGateway是一个简单和轻量级的组件   ,也是一种管理限制API的消费速度有效的方式                  。

springCloudGateway的限流模型:

目标

当前企业600人            ,按照两倍估算                  ,即1200人使用      ,高频接口秒并发限制为20         , 即有20个人同时使用同一个接口操作数据         。

需要增加限流和熔断的点:

组件 增加限制 业务说明 openresty 限流                  ,熔断 【统一】 保证流量再nginx的处理阈值         ,参考数据:5W/S gateway 限流      ,熔断 【统一】 保证每个API的访问速度在20/S 峰值40 ; apprun 高频接口限流                  ,每个接口统一分类定制熔断逻辑 限流可以复用封装的组件            ,熔断采用最简单的hystix ; devops 高频接口限流   ,每个接口统一分类定制熔断逻辑 限流可以复用封装的组件                  ,熔断采用最简单的hystix ; common 高频接口限流               ,每个接口统一分类定制熔断逻辑,feign定制熔断逻辑 限流可以复用封装的组件               ,熔断采用最简单的hystix ; job 高频接口限流                  ,每个接口统一分类定制熔断逻辑   ,feign定制熔断逻辑 限流可以复用封装的组件            ,熔断采用最简单的hystix ;

实现路径

网关做整体限制                  ,接口由业务来增加限流      。

gateway

gateway自带过滤器

RequestRateLimiter GatewayFilter工厂使用了RateLimiter实现来决定当前的并发请求是否允许处理      ,

如果不能处理         ,默认返回状态码 429 - 太多请求;

这个过滤器采用了可选的KeyResolver参数和对于速度限制的特殊参数                  ,下面会介绍                  。

keyResolver是一个实体实现了KeyResolver接口         ,配置指向一个bean的名字      ,

使用SpEL表达式            。 #{@myKeyResolver} 是一个SPEL表达式指向了一个叫做myKeyResolver的bean,下面展示了 KeyResolver接口;

public interface KeyResolver { Mono<String> resolve(ServerWebExchange exchange); }

keyResolver接口是的插件策略驱动请求限制                  ,再未来的里程碑版本            ,将会由一些KeyResolver的实现   。

默认实现KeyResolver的类是 PrincipalNameKeyResolver   , 会接受ServerWebExchange的Principal参数                  , 并且会调用 Principal.getName()方法                  。

默认的               ,如果KeyResolver没有找到key, 请求会被拒绝,你可以配置这个行为               。

spring.cloud.gateway.filter.request-rate-limiter.deny-empty-key=true spring.cloud.gateway.filter.request-rate-limiter.empty-key-status-code=xxxx

注意: RequestRateLimiter没有配置短注解               ,下面的例子是非法的。

spring.cloud.gateway.routes[0].filters[0]=RequestRateLimiter=2, 2, #{@userkeyresolver}

RedisLimiter介绍

Redis实现是基于Stripe . 它需要使用 spring-boot-starter-data-redis-reactive 这个starter ;

算法使用的是令牌桶               。

key 业务含义 用途 redis-rate-limiter.replenishRate 一个用户每秒多少请求数                  ,不包含丢弃的请求   ,这个速度就是令牌桶的数量                  。 补充速度 redis-rate-limiter.burstCapacity 用户每秒允许最大的请求数量            ,这个令牌数量就是令牌桶可以持有的数量                  ,设置为0标识阻塞所有请求 突增容量 redis-rate-limiter.requestedTokens 单个请求消耗多少令牌      ,这个数量就是从令牌桶中每个请求获取令牌的数量         ,默认是1 请求消耗令牌数量

如果你把 replenishRate 和 burstCapacity值设置为一样                  ,则完成了一个稳定的速度设置   。

临时突增流量可以允许设置 burstCapacity > replenishRate          ,

这种场景下      ,RateLimiter需要允许一些时间在 burstCapacity和 replenishRate 之间             。

两种连续的徒增会导致丢弃请求                  ,下面的例子配置了一个 redis-rate-limit.

速度限制在1个请求每秒            , replenishRate=1, requestedTokens=60,burstCapacity=60 ;

spring: cloud: gateway: routes: - id: requestratelimiter_route uri: https://example.org filters: - name: RequestRateLimiter args: redis-rate-limiter.replenishRate: 10 redis-rate-limiter.burstCapacity: 20 redis-rate-limiter.requestedTokens: 1

上面的配置补充令牌的速度是10   , 突增容量是20                  ,但是在下一秒               ,只有10个请求是可以进入的;

下面的例子配置了一个KeyResolver                  。简单的从请求参数中获取user(在生产环境不推荐使用),

@Bean KeyResolver userKeyResolver() { return exchange -> Mono.just(exchange.getRequest().getQueryParams().getFirst("user")); }

你也可以定义自己的RateLImiter,作为一个bean,实现RateLimiter接口即可               ,

在下面的配置中      。你可以引用一个bean通过名字                  ,使用SpEL表达式         。

{@myRateLimiter} 是一个表达式   ,引用了一个名字叫做 myRateLimiter的bean ,

下面的例子定义了一个rateLimite并且使用自定义的KeyResolver.

spring: cloud: gateway: routes: - id: requestratelimiter_route uri: https://example.org filters: - name: RequestRateLimiter args: rate-limiter: "#{@myRateLimiter}" key-resolver: "#{@userKeyResolver}"

魔方的限流配置

对所有的请求            ,限制如下                  。

key value 设置值原因 replenishRate 20 每个用户每秒处理请求速度 为20 burstCapacity 40 40                  ,每秒处理请求数量突增容量 ; requestedTokens 1 每个连接耗费1个令牌;

源代码分析: RequestRateLimiterGatewayFilterFactory

public GatewayFilter apply(Config config) { KeyResolver resolver = getOrDefault(config.keyResolver, defaultKeyResolver); RateLimiter<Object> limiter = getOrDefault(config.rateLimiter, defaultRateLimiter); boolean denyEmpty = getOrDefault(config.denyEmptyKey, this.denyEmptyKey); HttpStatusHolder emptyKeyStatus = HttpStatusHolder .parse(getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode)); return (exchange, chain) -> resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY).flatMap(key -> { if (EMPTY_KEY.equals(key)) { if (denyEmpty) { setResponseStatus(exchange, emptyKeyStatus); return exchange.getResponse().setComplete(); } return chain.filter(exchange); } String routeId = config.getRouteId(); if (routeId == null) { Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR); routeId = route.getId(); } return limiter.isAllowed(routeId, key).flatMap(response -> { for (Map.Entry<String, String> header : response.getHeaders().entrySet()) { exchange.getResponse().getHeaders().add(header.getKey(), header.getValue()); } if (response.isAllowed()) { return chain.filter(exchange); } setResponseStatus(exchange, config.getStatusCode()); return exchange.getResponse().setComplete(); }); }); }

处理流程如下:

单个路由的限流配置:

spring: cloud: gateway: routes: - id: account-service uri: http://localhost:8090 predicates: - Path=/account/** filters: - RewritePath=/account/(?<path>.*), /$\{path} - name: RequestRateLimiter args: redis-rate-limiter.replenishRate: 1 redis-rate-limiter.burstCapacity: 60 redis-rate-limiter.requestedTokens: 15

重写429的返回值         。

package com.zengame.cycube.api.gateway.rest.aspect; import cn.hutool.json.JSONUtil; import com.zengame.cycube.api.lib.common.bean.R; import com.zengame.cycube.api.lib.common.util.UUIDUtils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.cloud.gateway.filter.GatewayFilter; import org.springframework.cloud.gateway.filter.factory.RequestRateLimiterGatewayFilterFactory; import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver; import org.springframework.cloud.gateway.filter.ratelimit.RateLimiter; import org.springframework.cloud.gateway.route.Route; import org.springframework.cloud.gateway.support.ServerWebExchangeUtils; import org.springframework.stereotype.Component; import java.util.Map; import java.util.stream.Stream; /** * 魔方自定义限流 * @author Carter.li * @createtime 2022/8/1 17:30 */ @Slf4j @Component public class CubeRequestLimiterGatewayFilterFactory extends RequestRateLimiterGatewayFilterFactory { private final RateLimiter redisRateLimiter; private final KeyResolver keyResolver; private final boolean denyEmptyKey = true; private static final String EMPTY_KEY = "____EMPTY_KEY__"; public CubeRequestLimiterGatewayFilterFactory(RateLimiter redisRateLimiter, KeyResolver keyResolver) { super(redisRateLimiter, keyResolver); this.redisRateLimiter = redisRateLimiter; this.keyResolver = keyResolver; } @Override public GatewayFilter apply(Config config) { KeyResolver resolver = getOrDefault(config.getKeyResolver(), keyResolver); RateLimiter<Object> limiter = getOrDefault(config.getRateLimiter(), redisRateLimiter); boolean denyEmpty = getOrDefault(config.getDenyEmptyKey(), this.denyEmptyKey); return (exchange, chain) -> resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY).flatMap(key -> { if (EMPTY_KEY.equals(key)) { if (denyEmpty) { return TokenCheckGatewayFilterFactory.generateJson(exchange, R.error(9998, "请求key为空")); } return chain.filter(exchange); } String routeId = config.getRouteId(); if (routeId == null) { Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR); routeId = route.getId(); } return limiter.isAllowed(routeId, key).flatMap(response -> { for (Map.Entry<String, String> header : response.getHeaders().entrySet()) { exchange.getResponse().getHeaders().add(header.getKey(), header.getValue()); } if (response.isAllowed()) { return chain.filter(exchange); } R<String> r = R.error(9998, "请求太频繁"); r.setData(key); r.setGuid("请控制请求速度"); r.setTraceId(Stream.of(exchange.getRequest().getHeaders().getFirst("requestId"), exchange.getRequest().getQueryParams().getFirst("requestId")).filter(StringUtils::isNotBlank).findFirst().orElse(UUIDUtils.uuid())); log.warn("too many requests: {}", JSONUtil.toJsonStr(r)); return TokenCheckGatewayFilterFactory.generateJson(exchange, r); }); }); } private <T> T getOrDefault(T configValue, T defaultValue) { return (configValue != null) ? configValue : defaultValue; } }

测试

jmeter脚本

线程配置:

接口配置:

经过测试      ,对高频接口增加了限流能力         ,而且限流能力是可以设定的      。

小结

在网关添加了最低限度的保护限流策略                  。

企业用户数量有限                  ,可以使用最小的资源满足软件系统的需求;

原创不易         ,关注诚可贵      ,转发价更高!转载请注明出处                  ,让我们互通有无            ,共同进步   ,欢迎沟通交流            。

创心域SEO版权声明:以上内容作者已申请原创保护,未经允许不得转载,侵权必究!授权事宜、对本内容有异议或投诉,敬请联系网站管理员,我们将尽快回复您,谢谢合作!

展开全文READ MORE
马云近日已跑出国(马云回国,首谈ChatGPT) win10重新开机后自动打开关机前的程序和文档在哪里(Win10开机或重启自动打开此电脑怎么解决?)