Spring Cloud Gateway: How to Do a Canary Release Based on Nacos
To go straight to the code changes, jump to the Hands-on (动手) section.
This article is based on ReactiveLoadBalancerClientFilter using RoundRobinLoadBalancer.
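Canary release
A canary release (also called a gray release) is a way to roll out a new version smoothly alongside the old one. Two versions of the same API are served at the same time: a subset of users tries the new version first, and if no problems appear, the rollout is gradually widened.
This article describes how to implement a canary release using Nacos metadata together with the gateway's client-side load balancer.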
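Key concepts
The two load balancers in Spring Cloud Gateway (as described in the official documentation)
Gateway has two client-side load-balancing filters: LoadBalancerClientFilter and ReactiveLoadBalancerClientFilter.
LoadBalancerClientFilter uses Ribbon's blocking LoadBalancerClient, so the Gateway documentation recommends ReactiveLoadBalancerClientFilter instead.
You can switch to ReactiveLoadBalancerClientFilter by setting spring.cloud.loadbalancer.ribbon.enabled=false. Whether Ribbon or LoadBalancer is used, the lb scheme configured in a Route stays the same. This article uses ReactiveLoadBalancerClientFilter.
Using ReactiveLoadBalancerClientFilter with RoundRobinLoadBalancer
Canary server selection: a simple diagram
Client --> gateway --> GlobalFilter intercepts the request and picks a canary server (or a regular server if no canary server is available) --> forward to the service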
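The selection flow above can be sketched in plain Java. This is only an illustration under stated assumptions: CanaryFlowSketch and pickServer are hypothetical names, servers are represented as plain strings, and "pick the first one" stands in for whatever load-balancing strategy is actually used.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of the flow in the diagram; not a Spring Cloud type.
class CanaryFlowSketch {

    // Canary users get a canary server when one exists; everyone else,
    // and canary users without a canary server, get a regular server.
    static Optional<String> pickServer(boolean canaryUser,
                                       List<String> canaryServers,
                                       List<String> regularServers) {
        if (canaryUser && !canaryServers.isEmpty()) {
            return Optional.of(canaryServers.get(0));
        }
        // Fallback path: empty Optional means no server is available at all.
        return regularServers.stream().findFirst();
    }

    public static void main(String[] args) {
        List<String> canary = Arrays.asList("canary-1");
        List<String> regular = Arrays.asList("regular-1", "regular-2");
        System.out.println(pickServer(true, canary, regular));
        System.out.println(pickServer(true, Collections.<String>emptyList(), regular));
        System.out.println(pickServer(false, canary, regular));
    }
}
```

The fallback branch is the important design choice: a canary user is never rejected just because no canary instance is registered.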
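Nacos metadata
When registering a service with Nacos Server, we often attach some metadata; see the chapter on integrating Dubbo with Nacos as a registry center in the official documentation.
Making good use of service-instance metadata opens up many interesting practices.
Relevant information such as a version number or a feature name can be placed in the metadata, and the load balancer can then route to different services based on it.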
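To make the idea concrete, here is a minimal sketch of splitting instances by metadata. It assumes the advance key that this article uses as the canary marker; the Instance class, the addresses, and the version values are hypothetical stand-ins for the ServiceInstance objects that a real registry client would return.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class MetadataPartitionSketch {

    // Hypothetical stand-in for a registered instance: an address plus its metadata map.
    static final class Instance {
        final String address;
        final Map<String, String> metadata;

        Instance(String address, Map<String, String> metadata) {
            this.address = address;
            this.metadata = metadata;
        }
    }

    // An instance counts as canary only when its metadata says advance=true.
    static boolean isCanary(Instance instance) {
        return "true".equals(instance.metadata.getOrDefault("advance", ""));
    }

    // Splits addresses into canary (key true) and regular (key false).
    static Map<Boolean, List<String>> partition(List<Instance> instances) {
        return instances.stream().collect(Collectors.partitioningBy(
                MetadataPartitionSketch::isCanary,
                Collectors.mapping((Instance i) -> i.address, Collectors.toList())));
    }

    public static void main(String[] args) {
        Map<String, String> regularMeta = new HashMap<>();
        regularMeta.put("version", "1.14");
        Map<String, String> canaryMeta = new HashMap<>();
        canaryMeta.put("version", "1.15");
        canaryMeta.put("advance", "true");

        Map<Boolean, List<String>> parts = partition(Arrays.asList(
                new Instance("10.0.0.1:8080", regularMeta),
                new Instance("10.0.0.2:8080", canaryMeta)));
        System.out.println("canary:  " + parts.get(true));
        System.out.println("regular: " + parts.get(false));
    }
}
```

Instances that carry no advance key at all fall into the regular group, which matches the getOrDefault("advance", "") check used in the load balancer later in this article.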
    spring.cloud.nacos.discovery.metadata.version=1.15
    spring.cloud.nacos.discovery.metadata.advance=true

Preparation
Deploy Nacos
Deploy the gateway
Deploy two instances of service A

Tracing the code

    // ReactiveLoadBalancerClientFilter (decompiled)
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI url = (URI)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String schemePrefix = (String)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR);
        if (url != null && ("lb".equals(url.getScheme()) || "lb".equals(schemePrefix))) {
            ServerWebExchangeUtils.addOriginalRequestUrl(exchange, url);
            if (log.isTraceEnabled()) {
                log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url);
            }

            return this.choose(exchange).doOnNext((response) -> {
                if (!response.hasServer()) {
                    throw NotFoundException.create(this.properties.isUse404(), "Unable to find instance for " + url.getHost());
                } else {
                    ServiceInstance retrievedInstance = (ServiceInstance)response.getServer();
                    URI uri = exchange.getRequest().getURI();
                    String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";
                    if (schemePrefix != null) {
                        overrideScheme = url.getScheme();
                    }

                    DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance, overrideScheme);
                    URI requestUrl = this.reconstructURI(serviceInstance, uri);
                    if (log.isTraceEnabled()) {
                        log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
                    }

                    exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, requestUrl);
                }
            }).then(chain.filter(exchange));
        } else {
            return chain.filter(exchange);
        }
    }

    protected URI reconstructURI(ServiceInstance serviceInstance, URI original) {
        return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
    }

    private Mono<Response<ServiceInstance>> choose(ServerWebExchange exchange) {
        URI uri = (URI)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        ReactorLoadBalancer<ServiceInstance> loadBalancer = (ReactorLoadBalancer)this.clientFactory.getInstance(uri.getHost(), ReactorServiceInstanceLoadBalancer.class);
        if (loadBalancer == null) {
            throw new NotFoundException("No loadbalancer available for " + uri.getHost());
        } else {
            return loadBalancer.choose(this.createRequest());
        }
    }

    // RoundRobinLoadBalancer
    @SuppressWarnings("rawtypes")
    @Override
    // see original
    // https://github.com/Netflix/ocelli/blob/master/ocelli-core/
    // src/main/java/netflix/ocelli/loadbalancer/RoundRobinLoadBalancer.java
    public Mono<Response<ServiceInstance>> choose(Request request) {
        // TODO: move supplier to Request?
        // Temporary conditional logic till deprecated members are removed.
        if (serviceInstanceListSupplierProvider != null) {
            ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
                    .getIfAvailable(NoopServiceInstanceListSupplier::new);
            return supplier.get().next().map(this::getInstanceResponse);
        }
        ServiceInstanceSupplier supplier = this.serviceInstanceSupplier
                .getIfAvailable(NoopServiceInstanceSupplier::new);
        return supplier.get().collectList().map(this::getInstanceResponse);
    }

    private Response<ServiceInstance> getInstanceResponse(
            List<ServiceInstance> instances) {
        if (instances.isEmpty()) {
            log.warn("No servers available for service: " + this.serviceId);
            return new EmptyResponse();
        }
        // TODO: enforce order?
        int pos = Math.abs(this.position.incrementAndGet());

        ServiceInstance instance = instances.get(pos % instances.size());

        return new DefaultResponse(instance);
    }

Tracing through ReactiveLoadBalancerClientFilter and RoundRobinLoadBalancer shows that, in the end, we only need to modify getInstanceResponse to meet all our needs.
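One detail of getInstanceResponse is worth isolating: the index is computed as Math.abs(counter) % size, and in Java Math.abs(Integer.MIN_VALUE) is still negative, so the index can go negative once the counter wraps around. The sketch below shows the same round-robin idea with the sign bit masked off instead. RoundRobinIndexSketch is a hypothetical name used only for illustration and is not part of Spring Cloud.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, framework-free sketch of round-robin selection.
class RoundRobinIndexSketch {

    private final AtomicInteger position;

    RoundRobinIndexSketch(int seedPosition) {
        this.position = new AtomicInteger(seedPosition);
    }

    // Returns the next item, cycling through the list in order.
    <T> T next(List<T> items) {
        // Masking with Integer.MAX_VALUE clears the sign bit, so the value
        // stays non-negative even after the counter overflows.
        int pos = position.incrementAndGet() & Integer.MAX_VALUE;
        return items.get(pos % items.size());
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("a", "b", "c");
        RoundRobinIndexSketch balancer = new RoundRobinIndexSketch(0);
        for (int i = 0; i < 4; i++) {
            System.out.println(balancer.next(servers)); // b, c, a, b
        }
        // Shows why Math.abs alone is not enough at the overflow boundary.
        System.out.println(Math.abs(Integer.MIN_VALUE) < 0); // true
    }
}
```

The code in this article keeps the Math.abs form of the original class; the masked variant is only an alternative to consider if the counter is expected to overflow.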
Hands-on!
Modifying the code
We only need to add one new GlobalFilter, AdvanceReactiveLoadBalancerClientFilter, that runs before ReactiveLoadBalancerClientFilter, and make a small change to the LoadBalancer's getInstanceResponse.
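"Runs before" comes down to the filter's order value: global filters are sorted by order and lower values run first. The sketch below illustrates this with plain Java, assuming the built-in filter's order is 10150, as the 10150 - 1 constant used later in this article implies. FilterOrderSketch and NamedFilter are hypothetical names, not Gateway classes.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: models only the ordering, not the filters themselves.
class FilterOrderSketch {

    static final class NamedFilter {
        final String name;
        final int order;

        NamedFilter(String name, int order) {
            this.name = name;
            this.order = order;
        }
    }

    // Returns filter names in execution order: lowest order value first.
    static List<String> executionOrder(List<NamedFilter> filters) {
        return filters.stream()
                .sorted(Comparator.comparingInt((NamedFilter f) -> f.order))
                .map(f -> f.name)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> order = executionOrder(Arrays.asList(
                new NamedFilter("ReactiveLoadBalancerClientFilter", 10150),
                new NamedFilter("AdvanceReactiveLoadBalancerClientFilter", 10150 - 1)));
        // The canary filter comes first because 10149 < 10150.
        System.out.println(order);
    }
}
```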
Copy the contents of RoundRobinLoadBalancer and modify the getInstanceResponse() logic:

    package top.lingma.gateway.loadbalancer;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.beans.factory.ObjectProvider;
    import org.springframework.cloud.client.ServiceInstance;
    import org.springframework.cloud.client.loadbalancer.reactive.DefaultResponse;
    import org.springframework.cloud.client.loadbalancer.reactive.EmptyResponse;
    import org.springframework.cloud.client.loadbalancer.reactive.Request;
    import org.springframework.cloud.client.loadbalancer.reactive.Response;
    import org.springframework.cloud.loadbalancer.core.*;
    import reactor.core.publisher.Mono;

    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.stream.Collectors;

    public class AdvanceRoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer {

        private static final Log log = LogFactory.getLog(AdvanceRoundRobinLoadBalancer.class);

        // Separate counters so canary and regular traffic rotate independently
        private final AtomicInteger position;
        private final AtomicInteger positionAdvance;

        @Deprecated
        private ObjectProvider<ServiceInstanceSupplier> serviceInstanceSupplier;

        private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;

        private final String serviceId;

        @Deprecated
        public AdvanceRoundRobinLoadBalancer(String serviceId, ObjectProvider<ServiceInstanceSupplier> serviceInstanceSupplier) {
            this(serviceId, serviceInstanceSupplier, new Random().nextInt(1000));
        }

        public AdvanceRoundRobinLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId) {
            this(serviceInstanceListSupplierProvider, serviceId, new Random().nextInt(1000));
        }

        public AdvanceRoundRobinLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId, int seedPosition) {
            this.serviceId = serviceId;
            this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
            this.position = new AtomicInteger(seedPosition);
            this.positionAdvance = new AtomicInteger(seedPosition);
        }

        @Deprecated
        public AdvanceRoundRobinLoadBalancer(String serviceId, ObjectProvider<ServiceInstanceSupplier> serviceInstanceSupplier, int seedPosition) {
            this.serviceId = serviceId;
            this.serviceInstanceSupplier = serviceInstanceSupplier;
            this.position = new AtomicInteger(seedPosition);
            this.positionAdvance = new AtomicInteger(seedPosition);
        }

        @Override
        public Mono<Response<ServiceInstance>> choose(Request request) {
            if (serviceInstanceListSupplierProvider != null) {
                ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
                return supplier.get().next().map((instances) -> {
                    // The selection logic is modified here
                    if (request instanceof AdvanceRequestContext) {
                        // Canary request: keep only instances with metadata advance=true
                        List<ServiceInstance> advanceInstance = instances.stream().filter(s -> s.getMetadata().getOrDefault("advance", "").equals("true")).collect(Collectors.toList());
                        return getInstanceResponse(advanceInstance, request);
                    } else {
                        // Regular request: keep only instances that are not marked as canary
                        List<ServiceInstance> routineInstance = instances.stream().filter(s -> !s.getMetadata().getOrDefault("advance", "").equals("true")).collect(Collectors.toList());
                        return getInstanceResponse(routineInstance, request);
                    }
                });
            }
            ServiceInstanceSupplier supplier = this.serviceInstanceSupplier.getIfAvailable(NoopServiceInstanceSupplier::new);
            return supplier.get().collectList().map((instances) -> {
                if (request instanceof AdvanceRequestContext) {
                    // The selection logic is modified here
                    List<ServiceInstance> advanceInstance = instances.stream().filter(s -> s.getMetadata().getOrDefault("advance", "").equals("true")).collect(Collectors.toList());
                    return getInstanceResponse(advanceInstance, request);
                } else {
                    List<ServiceInstance> instance = instances.stream().filter(s -> !s.getMetadata().getOrDefault("advance", "").equals("true")).collect(Collectors.toList());
                    return getInstanceResponse(instance, request);
                }
            });
        }

        private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances, Request request) {
            if (instances.isEmpty()) {
                if (request instanceof AdvanceRequestContext) {
                    // No canary instance: signal the filter to fall back to regular servers
                    return new AdvanceEmptyResponse();
                }
                log.warn("No servers available for service: " + this.serviceId);
                return new EmptyResponse();
            }
            int pos = 1;
            // Canary release selection logic
            if (request instanceof AdvanceRequestContext) {
                pos = Math.abs(this.positionAdvance.incrementAndGet());
            } else {
                pos = Math.abs(this.position.incrementAndGet());
            }
            ServiceInstance instance = instances.get(pos % instances.size());
            return new DefaultResponse(instance);
        }
    }

The AdvanceEmptyResponse class marks the case where no canary server is available, in which case the request can go to a regular server:

    package top.lingma.gateway.loadbalancer;

    import org.springframework.cloud.client.ServiceInstance;
    import org.springframework.cloud.client.loadbalancer.reactive.CompletionContext;
    import org.springframework.cloud.client.loadbalancer.reactive.Response;

    public class AdvanceEmptyResponse extends org.springframework.cloud.client.loadbalancer.EmptyResponse implements Response<ServiceInstance> {

        public AdvanceEmptyResponse() {
        }

        public void onComplete(CompletionContext completionContext) {
        }
    }

AdvanceRequestContext exists so that information can be passed from the GlobalFilter to the LoadBalancer:

    package top.lingma.gateway.loadbalancer;

    import org.springframework.cloud.client.loadbalancer.reactive.Request;
    import org.springframework.web.server.ServerWebExchange;

    public class AdvanceRequestContext<T> implements Request {

        private T exchange;

        public AdvanceRequestContext(T exchange) {
            this.exchange = exchange;
        }

        @Override
        public T getContext() {
            return exchange;
        }
    }

AdvanceReactiveLoadBalancerClientFilter is copied from ReactiveLoadBalancerClientFilter. Note two points:
First, canary server selection must happen before ReactiveLoadBalancerClientFilter: LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150 - 1;

    package top.lingma.gateway.loadbalancer;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.cloud.client.ServiceInstance;
    import org.springframework.cloud.client.loadbalancer.LoadBalancerUriTools;
    import org.springframework.cloud.client.loadbalancer.reactive.Response;
    import org.springframework.cloud.gateway.config.LoadBalancerProperties;
    import org.springframework.cloud.gateway.filter.GatewayFilterChain;
    import org.springframework.cloud.gateway.filter.GlobalFilter;
    import org.springframework.cloud.gateway.filter.ReactiveLoadBalancerClientFilter;
    import org.springframework.cloud.gateway.support.DelegatingServiceInstance;
    import org.springframework.cloud.gateway.support.NotFoundException;
    import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer;
    import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
    import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
    import org.springframework.core.Ordered;
    import org.springframework.stereotype.Component;
    import org.springframework.web.server.ServerWebExchange;
    import reactor.core.publisher.Mono;

    import java.net.URI;
    import java.util.List;

    import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.*;

    @Component
    public class AdvanceReactiveLoadBalancerClientFilter implements GlobalFilter, Ordered {

        private static final Log log = LogFactory.getLog(ReactiveLoadBalancerClientFilter.class);

        // One less than the built-in filter's order, so this filter runs first
        private static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150 - 1;

        private final LoadBalancerClientFactory clientFactory;

        private LoadBalancerProperties properties;

        public AdvanceReactiveLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory, LoadBalancerProperties properties) {
            this.clientFactory = clientFactory;
            this.properties = properties;
        }

        @Override
        public int getOrder() {
            return LOAD_BALANCER_CLIENT_FILTER_ORDER;
        }

        @Override
        @SuppressWarnings("Duplicates")
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            // Canary-only servers: decide whether this is a canary user with canary access;
            // otherwise continue straight down the chain.
            // Here the check is whether the first sec-ch-ua header value contains "Edge".
            List<String> secChUa = exchange.getRequest().getHeaders().get("sec-ch-ua");
            if (secChUa == null || secChUa.isEmpty() || !secChUa.stream().findFirst().map(r -> r.contains("Edge")).orElse(false)) {
                return chain.filter(exchange);
            }

            URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
            String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
            if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
                return chain.filter(exchange);
            }
            // preserve the original url
            addOriginalRequestUrl(exchange, url);

            if (log.isTraceEnabled()) {
                log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url);
            }

            return choose(exchange).doOnNext(response -> {
                // No canary server: leave the URL untouched so the regular filter handles it
                if (response instanceof AdvanceEmptyResponse) {
                    return;
                }
                if (!response.hasServer()) {
                    throw NotFoundException.create(properties.isUse404(), "Unable to find instance for " + url.getHost());
                }

                ServiceInstance retrievedInstance = response.getServer();

                URI uri = exchange.getRequest().getURI();

                // if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
                // if the loadbalancer doesnt provide one.
                String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";
                if (schemePrefix != null) {
                    overrideScheme = url.getScheme();
                }

                DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance, overrideScheme);

                URI requestUrl = reconstructURI(serviceInstance, uri);

                if (log.isTraceEnabled()) {
                    log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
                }
                exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
            }).then(chain.filter(exchange));
        }

        protected URI reconstructURI(ServiceInstance serviceInstance, URI original) {
            return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
        }

        @SuppressWarnings("deprecation")
        private Mono<Response<ServiceInstance>> choose(ServerWebExchange exchange) {
            URI uri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
            ReactorLoadBalancer<ServiceInstance> loadBalancer = this.clientFactory.getInstance(uri.getHost(), ReactorServiceInstanceLoadBalancer.class);
            if (loadBalancer == null) {
                throw new NotFoundException("No loadbalancer available for " + uri.getHost());
            }
            return loadBalancer.choose(createRequest(exchange));
        }

        /**
         * Modified here: the exchange is passed in so that the LoadBalancer
         * can use it later when handling the request.
         *
         * @param exchange the current server exchange
         * @return a request context wrapping the exchange
         */
        @SuppressWarnings("deprecation")
        private AdvanceRequestContext<ServerWebExchange> createRequest(ServerWebExchange exchange) {
            return new AdvanceRequestContext<>(exchange);
        }
    }

The above completes the essential parts of the canary release. Next, add the auto-configuration. Note that this class must not be picked up by Spring component scanning.

    package top.lingma.gateway.loadbalancer;

    import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
    import org.springframework.cloud.client.ConditionalOnDiscoveryEnabled;
    import org.springframework.cloud.client.ServiceInstance;
    import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer;
    import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
    import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.core.env.Environment;

    @ConditionalOnDiscoveryEnabled
    public class AdvanceLoadBalancerAutoConfiguration {

        @Bean
        @ConditionalOnMissingBean
        public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) {
            // Name of the service this per-client load balancer is being created for
            String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
            return new AdvanceRoundRobinLoadBalancer(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
        }
    }

Finally, set defaultConfiguration of @LoadBalancerClients on the application class:

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClients;
    import top.lingma.gateway.loadbalancer.AdvanceLoadBalancerAutoConfiguration;

    @SpringBootApplication()
    @LoadBalancerClients(defaultConfiguration = AdvanceLoadBalancerAutoConfiguration.class)
    public class LingmaGatewayApplication {

        public static void main(String[] args) {
            SpringApplication.run(LingmaGatewayApplication.class, args);
        }
    }

Article link: https://blog.lingma.top/2022/12/01/36d5a1ed4a38/spring-cloud-gateway基于nacos如何去做灰度发布/index.html
Copyright notice: Unless otherwise stated, all articles on this blog are licensed under the Anti-996 License Version 1.0. Please credit the source when reposting!