如何利用Spring Cloud构建起自我修复型分布式系统

发布网友 发布时间:2022-04-19 22:16

我来回答

2个回答

懂视网 时间:2022-05-02 11:05

com.netflix.loadbalancer; public interface IRule{ /* * choose one alive server from lb.allServers or * lb.upServers according to key * * @return choosen Server object. NULL is returned if none * server is available */ public Server choose(Object key); public void setLoadBalancer(ILoadBalancer lb); public ILoadBalancer getLoadBalancer(); }

IRule接口的实现类有以下几种:

技术分享

技术分享

 

其中RandomRule表示随机策略、RoundRobin表示轮询策略、WeightedResponseTimeRule表示加权策略、BestAvailableRule表示请求数最少策略等等。

随机策略很简单,就是从服务器中随机选择一个服务器,RandomRule的实现代码如下:

public Server choose(ILoadBalancer lb, Object key) {
 if (lb == null) {
 return null;
 }
 Server server = null;
 
 while (server == null) {
 if (Thread.interrupted()) {
  return null;
 }
 List<Server> upList = lb.getReachableServers();
 List<Server> allList = lb.getAllServers();
 int serverCount = allList.size();
 if (serverCount == 0) {
  return null;
 }
 int index = rand.nextInt(serverCount); // 使用jdk内部的Random类随机获取索引值index
 server = upList.get(index); // 得到服务器实例
 
 if (server == null) {
  Thread.yield();
  continue;
 }
 
 if (server.isAlive()) {
  return (server);
 }
 
 server = null;
 Thread.yield();
 }
 return server;
}

RoundRobin轮询策略表示每次都取下一个服务器,比如一共有5台服务器,第1次取第1台,第2次取第2台,第3次取第3台,以此类推:

public Server choose(ILoadBalancer lb, Object key) {
 if (lb == null) {
 log.warn("no load balancer");
 return null;
 }
 
 Server server = null;
 int count = 0;
 while (server == null && count++ < 10) { // retry 10 次
 List<Server> reachableServers = lb.getReachableServers();
 List<Server> allServers = lb.getAllServers();
 int upCount = reachableServers.size();
 int serverCount = allServers.size();
 
 if ((upCount == 0) || (serverCount == 0)) {
  log.warn("No up servers available from load balancer: " + lb);
  return null;
 }
 
 int nextServerIndex = incrementAndGetModulo(serverCount); // incrementAndGetModulo方法内部使用nextServerCyclicCounter这个AtomicInteger属性原子递增对serverCount取模得到索引值
 server = allServers.get(nextServerIndex); // 得到服务器实例
 
 if (server == null) {
  Thread.yield();
  continue;
 }
 
 if (server.isAlive() && (server.isReadyToServe())) {
  return (server);
 }
 
 server = null;
 }
 
 if (count >= 10) {
 log.warn("No available alive servers after 10 tries from load balancer: "
  + lb);
 }
 return server;
}
 private int incrementAndGetModulo(int modulo) {
 for (;;) {
  int current = nextServerCyclicCounter.get();
  int next = (current + 1) % modulo;
  if (nextServerCyclicCounter.compareAndSet(current, next))
  return next;
 }
 }

BestAvailableRule策略用来选取最少并发量请求的服务器:

public Server choose(Object key) {
 if (loadBalancerStats == null) {
 return super.choose(key);
 }
 List<Server> serverList = getLoadBalancer().getAllServers(); // 获取所有的服务器列表
 int minimalConcurrentConnections = Integer.MAX_VALUE;
 long currentTime = System.currentTimeMillis();
 Server chosen = null;
 for (Server server: serverList) { // 遍历每个服务器
 ServerStats serverStats = loadBalancerStats.getSingleServerStat(server); // 获取各个服务器的状态
 if (!serverStats.isCircuitBreakerTripped(currentTime)) { // 没有触发断路器的话继续执行
  int concurrentConnections = serverStats.getActiveRequestsCount(currentTime); // 获取当前服务器的请求个数
  if (concurrentConnections < minimalConcurrentConnections) { // 比较各个服务器之间的请求数,然后选取请求数最少的服务器并放到chosen变量中
  minimalConcurrentConnections = concurrentConnections;
  chosen = server;
  }
 }
 }
 if (chosen == null) { // 如果没有选上,调用父类ClientConfigEnabledRoundRobinRule的choose方法,也就是使用RoundRobinRule轮询的方式进行负载均衡 
 return super.choose(key);
 } else {
 return chosen;
 }
}

加权响应时间负载均衡 (WeightedResponseTime)

区域感知轮询负载均衡(ZoneAware):

区域感知负载均衡内置电路跳闸逻辑,可被配置基于区域同源关系(Zone Affinity,也就是更倾向于选择发出调用的服务所在的托管区域内,这样可以降低延迟,节省成本)选择目标服务实例。它监控每个区域中运行实例的行为,而且能够实时的快速丢弃一整个区域。这样在面对整个区域故障时,帮我们提升了弹性。

实例验证Ribbon中的LoadBalance功能

ServerList中提供了3个instance,分别是:

compute-service:2222
compute-service:2223
compute-service:2224

然后使用不同的IRule策略查看负载均衡的实现。

package org.springframework.cloud.client.loadbalancer;

import org.springframework.beans.factory.annotation.Qualifier;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient
 * @author Spencer Gibb
 */
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}

LoadBalancerAutoConfiguration.java为实现客户端负载均衡器的自动化配置类。

package org.springframework.cloud.client.loadbalancer;



/**
 * Auto configuration for Ribbon (client side load balancing).
 *
 * @author Spencer Gibb
 * @author Dave Syer
 * @author Will Tran
 */
@Configuration
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {

 @LoadBalanced
 @Autowired(required = false)
 private List<RestTemplate> restTemplates = Collections.emptyList();

 @Bean
 public SmartInitializingSingleton loadBalancedRestTemplateInitializer(
  final List<RestTemplateCustomizer> customizers) {
 return new SmartInitializingSingleton() {
  @Override
  public void afterSingletonsInstantiated() {
  for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
   for (RestTemplateCustomizer customizer : customizers) {
   customizer.customize(restTemplate);
   }
  }
  }
 };
 }

Ribbon要实现负载均衡自动化配置需要满足如下两个条件:

  • @ConditionalOnClass(RestTemplate.class):RestTemplate类必须存在于当前工程的环境中。
  • @ConditionalOnBean(LoadBalancerClient.class):在spring的Bean工程中必须有LoadBalancerClient.class的实现Bean。
  • 在自动化配置中主要做三件事:

  • 创建一个LoadBalancerInterceptor的Bean,用于实现对客户端发起请求时进行拦截,以实现客户端负载均衡。
  • 创建一个RestTemplateCustomizer的Bean,用于给RestTemplate增加LoadBalancerInterceptor拦截器。
  • 维护了一个被@LoadBalanced注解修饰的RestTemplate对象列表,并在这里进行初始化,通过调用RestTemplateCustomizer的实例来给需要客户端负载均衡的RestTemplate增加LoadBalancerInterceptor拦截器。
  • LoadBalancerAutoConfiguration.java里面有2个内部类,如下:

    @Configuration
     @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
     static class LoadBalancerInterceptorConfig {
     @Bean
     public LoadBalancerInterceptor ribbonInterceptor(
      LoadBalancerClient loadBalancerClient,
      LoadBalancerRequestFactory requestFactory) {
      return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
     }
    
     @Bean
     @ConditionalOnMissingBean
     public RestTemplateCustomizer restTemplateCustomizer(
      final LoadBalancerInterceptor loadBalancerInterceptor) {
      return new RestTemplateCustomizer() {
      @Override
      public void customize(RestTemplate restTemplate) {
       List<ClientHttpRequestInterceptor> list = new ArrayList<>(
        restTemplate.getInterceptors());
       list.add(loadBalancerInterceptor);
       restTemplate.setInterceptors(list);
      }
      };
     }
     }
    
     @Configuration
     @ConditionalOnClass(RetryTemplate.class)
     static class RetryAutoConfiguration {
     @Bean
     public RetryTemplate retryTemplate() {
      RetryTemplate template = new RetryTemplate();
      template.setThrowLastExceptionOnExhausted(true);
      return template;
     }
    
     @Bean
     @ConditionalOnMissingBean
     public LoadBalancedRetryPolicyFactory loadBalancedRetryPolicyFactory() {
      return new LoadBalancedRetryPolicyFactory.NeverRetryFactory();
     }
    
     @Bean
     public RetryLoadBalancerInterceptor ribbonInterceptor(
      LoadBalancerClient loadBalancerClient, LoadBalancerRetryProperties properties,
      LoadBalancedRetryPolicyFactory lbRetryPolicyFactory,
      LoadBalancerRequestFactory requestFactory) {
      return new RetryLoadBalancerInterceptor(loadBalancerClient, retryTemplate(), properties,
       lbRetryPolicyFactory, requestFactory);
     }
    
     @Bean
     @ConditionalOnMissingBean
     public RestTemplateCustomizer restTemplateCustomizer(
      final RetryLoadBalancerInterceptor loadBalancerInterceptor) {
      return new RestTemplateCustomizer() {
      @Override
      public void customize(RestTemplate restTemplate) {
       List<ClientHttpRequestInterceptor> list = new ArrayList<>(
        restTemplate.getInterceptors());
       list.add(loadBalancerInterceptor);
       restTemplate.setInterceptors(list);
      }
      };
     }
     }

     接下来,我们看看LoadBalancerInterceptor拦截器是如何将一个普通的RestTemplate变成客户度负载均衡的:

    /*
     * Copyright 2013-2017 the original author or authors.
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package org.springframework.cloud.client.loadbalancer;
    
    import java.io.IOException;
    import java.net.URI;
    
    import org.springframework.http.HttpRequest;
    import org.springframework.http.client.ClientHttpRequestExecution;
    import org.springframework.http.client.ClientHttpRequestInterceptor;
    import org.springframework.http.client.ClientHttpResponse;
    
    /**
     * @author Spencer Gibb
     * @author Dave Syer
     * @author Ryan Baxter
     * @author William Tran
     */
    public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
    
     private LoadBalancerClient loadBalancer;
     private LoadBalancerRequestFactory requestFactory;
    
     public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
     this.loadBalancer = loadBalancer;
     this.requestFactory = requestFactory;
     }
    
     public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
     // for backwards compatibility
     this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
     }
    
     @Override
     public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
      final ClientHttpRequestExecution execution) throws IOException {
     final URI originalUri = request.getURI();
     String serviceName = originalUri.getHost();
     return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution)); //看这里
     }
    }

    LoadBalancerRequestFactory .java  

    /*
     * Copyright 2017 the original author or authors.
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package org.springframework.cloud.client.loadbalancer;
    
    import java.util.List;
    
    import org.springframework.cloud.client.ServiceInstance;
    import org.springframework.http.HttpRequest;
    import org.springframework.http.client.ClientHttpRequestExecution;
    import org.springframework.http.client.ClientHttpResponse;
    
    /**
     * Creates {@link LoadBalancerRequest}s for {@link LoadBalancerInterceptor} and
     * {@link RetryLoadBalancerInterceptor}. Applies
     * {@link LoadBalancerRequestTransformer}s to the intercepted
     * {@link HttpRequest}.
     * 
     * @author William Tran
     *
     */
    public class LoadBalancerRequestFactory {
    
     private LoadBalancerClient loadBalancer;
     private List<LoadBalancerRequestTransformer> transformers;
    
     public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer,
      List<LoadBalancerRequestTransformer> transformers) {
     this.loadBalancer = loadBalancer;
     this.transformers = transformers;
     }
    
     public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer) {
     this.loadBalancer = loadBalancer;
     }
    
     public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request,
      final byte[] body, final ClientHttpRequestExecution execution) {
     return new LoadBalancerRequest<ClientHttpResponse>() {
    
      @Override
      public ClientHttpResponse apply(final ServiceInstance instance)
       throws Exception {
      HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, loadBalancer);
      if (transformers != null) {
       for (LoadBalancerRequestTransformer transformer : transformers) {
       serviceRequest = transformer.transformRequest(serviceRequest, instance);
       }
      }
      return execution.execute(serviceRequest, body);
      }
    
     };
     }
    
    }
    ServiceRequestWrapper.java
    package org.springframework.cloud.client.loadbalancer;
    
    import java.net.URI;
    
    import org.springframework.cloud.client.ServiceInstance;
    import org.springframework.http.HttpRequest;
    import org.springframework.http.client.support.HttpRequestWrapper;
    
    /**
     * @author Ryan Baxter
     */
    public class ServiceRequestWrapper extends HttpRequestWrapper {
     private final ServiceInstance instance;
     private final LoadBalancerClient loadBalancer;
    
     public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance,
         LoadBalancerClient loadBalancer) {
     super(request);
     this.instance = instance;
     this.loadBalancer = loadBalancer;
     }
    
     @Override
     public URI getURI() {
     URI uri = this.loadBalancer.reconstructURI(
      this.instance, getRequest().getURI());
     return uri;
     }
    }

     spring cloud中对应的实现类:

    package org.springframework.cloud.netflix.ribbon;
    
    
    /**
     * @author Spencer Gibb
     * @author Dave Syer
     * @author Ryan Baxter
     */
    public class RibbonLoadBalancerClient implements LoadBalancerClient {
    
     private SpringClientFactory clientFactory;
    
     public RibbonLoadBalancerClient(SpringClientFactory clientFactory) {
     this.clientFactory = clientFactory;
     }
    
     @Override
     public URI reconstructURI(ServiceInstance instance, URI original) {
     Assert.notNull(instance, "instance can not be null");
     String serviceId = instance.getServiceId();
     RibbonLoadBalancerContext context = this.clientFactory
      .getLoadBalancerContext(serviceId);
     Server server = new Server(instance.getHost(), instance.getPort());
     IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
     ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
     URI uri = RibbonUtils.updateToHttpsIfNeeded(original, clientConfig,
      serverIntrospector, server);
     return context.reconstructURIWithServer(server, uri);
     }
    
     @Override
     public ServiceInstance choose(String serviceId) {
     Server server = getServer(serviceId);
     if (server == null) {
      return null;
     }
     return new RibbonServer(serviceId, server, isSecure(server, serviceId),
      serverIntrospector(serviceId).getMetadata(server));
     }
    
     @Override
     public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
     ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
     Server server = getServer(loadBalancer);
     if (server == null) {
      throw new IllegalStateException("No instances available for " + serviceId);
     }
     RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
      serviceId), serverIntrospector(serviceId).getMetadata(server));
    
     return execute(serviceId, ribbonServer, request);
     }
    
     @Override
     public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
     Server server = null;
     if(serviceInstance instanceof RibbonServer) {
      server = ((RibbonServer)serviceInstance).getServer();
     }
     if (server == null) {
      throw new IllegalStateException("No instances available for " + serviceId);
     }
    
     RibbonLoadBalancerContext context = this.clientFactory
      .getLoadBalancerContext(serviceId);
     RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
    
     try {
      T returnVal = request.apply(serviceInstance);
      statsRecorder.recordStats(returnVal);
      return returnVal;
     }
     // catch IOException and rethrow so RestTemplate behaves correctly
     catch (IOException ex) {
      statsRecorder.recordStats(ex);
      throw ex;
     }
     catch (Exception ex) {
      statsRecorder.recordStats(ex);
      ReflectionUtils.rethrowRuntimeException(ex);
     }
     return null;
     }
    
     private ServerIntrospector serverIntrospector(String serviceId) {
     ServerIntrospector serverIntrospector = this.clientFactory.getInstance(serviceId,
      ServerIntrospector.class);
     if (serverIntrospector == null) {
      serverIntrospector = new DefaultServerIntrospector();
     }
     return serverIntrospector;
     }
    
     private boolean isSecure(Server server, String serviceId) {
     IClientConfig config = this.clientFactory.getClientConfig(serviceId);
     ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
     return RibbonUtils.isSecure(config, serverIntrospector, server);
     }
    
     protected Server getServer(String serviceId) {
     return getServer(getLoadBalancer(serviceId));
     }
    
     protected Server getServer(ILoadBalancer loadBalancer) {
     if (loadBalancer == null) {
      return null;
     }
     return loadBalancer.chooseServer("default"); // TODO: better handling of key
     }
    
     protected ILoadBalancer getLoadBalancer(String serviceId) {
     return this.clientFactory.getLoadBalancer(serviceId);
     }

    getServer方法中并没有使用LoadBalancerClient中的choose方法,而是使用Netflix Rion自身的ILoadBalancer接口中定义的chooseServer方法。再看ILoadBalancer 接口:

    package com.netflix.loadbalancer;
    
    public interface ILoadBalancer {
     //向负载均衡器的实例列表中增加实例
     public void addServers(List<Server> newServers);
    
     //通过某种策略,从负载均衡器中选择一个具体的实例 
     public Server chooseServer(Object key);
     //用来通知和标识负载均衡器中某个具体实例已经停止服务,不然负载均衡器在下一次获取服务实例清单前都会认为服务实例均是正常服务的。
     public void markServerDown(Server server);
     
     //获取正常服务列表
     public List<Server> getReachableServers();
    
     //所有已知实例列表
     public List<Server> getAllServers();

     再看实现类,BaseLoadBalancer类是实现了基础的负载均衡,而DynamicServerListLoadBalancer和ZoneAwareLoadBalancer在负载均衡基础上做了一些功能的扩展。

     技术分享

     那么Spring cloud在整合Ribbon的时候采用的哪个具体实现,可以看RibbonClientConfiguration配置类中的代码片段如下,采用的是ZoneAwareLoadBalancer来实现负载均衡器。

     @Bean
     @ConditionalOnMissingBean
     public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
      ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
      IRule rule, IPing ping) {
     if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
      return this.propertiesFactory.get(ILoadBalancer.class, config, name);
     }
     ZoneAwareLoadBalancer<Server> balancer = LoadBalancerBuilder.newBuilder()
      .withClientConfig(config).withRule(rule).withPing(ping)
      .withServerListFilter(serverListFilter).withDynamicServerList(serverList)
      .buildDynamicServerListLoadBalancer();
     return balancer;
     }

     在回到主方法RibbonLoadBalancerClient.execute()

    LoadBalancerClient的execute()
    -->1、ZoneAwareLoadBalancer.chooseServer()获取了负载均衡策略分配到的服务实例对象Server
    -->2、将Server对象封装成RibbonService实例
    -->3、调用LoadBalancerRequest的apply()
    -->4-1、在apply()中,先将request包装成ServiceRequestWrapper,在Wrapper中拼接URI
    -->4-2、拼接URI中,调用RibbonLoadBalancerClient.reconstructURI()
    -->4-3、拼接URI中,调用RibbonLoadBalancerContext.reconstructURIWithServer()
    -->4-4、拼接URI中,调用RibbonLoadBalancerContext.reconstructURIWithServer()
    -->5、拦截器调用

    -->6、执行完成后,Ribbon还通过RibbonStatsRecorder对象对服务的请求进行了记录

     @Override
     public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
     ILoadBalancer loadBalancer = getLoadBalancer(serviceId); //在springcloud中是ZoneAwareLoadBalancer实例
     Server server = getServer(loadBalancer); //1、ZoneAwareLoadBalancer获取了负载均衡策略分配到的服务实例对象Server
     if (server == null) {
      throw new IllegalStateException("No instances available for " + serviceId);
     }
     //2、将Server实例封装成RibbonService实例
     RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
      serviceId), serverIntrospector(serviceId).getMetadata(server)); 
    
     return execute(serviceId, ribbonServer, request);
     }
     @Override
     public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
     Server server = null;
     if(serviceInstance instanceof RibbonServer) {
      server = ((RibbonServer)serviceInstance).getServer();
     }
     if (server == null) {
      throw new IllegalStateException("No instances available for " + serviceId);
     }
    
     RibbonLoadBalancerContext context = this.clientFactory
      .getLoadBalancerContext(serviceId);
     RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
    
     try {
      T returnVal = request.apply(serviceInstance); //3、request看LoadBalancerRequestFactory中的createRequest()方法返回的匿名类
      statsRecorder.recordStats(returnVal);
      return returnVal;
     }
     // catch IOException and rethrow so RestTemplate behaves correctly
     catch (IOException ex) {
      statsRecorder.recordStats(ex);
      throw ex;
     }
     catch (Exception ex) {
      statsRecorder.recordStats(ex);
      ReflectionUtils.rethrowRuntimeException(ex);
     }
     return null;
     }

     

     public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request,
      final byte[] body, final ClientHttpRequestExecution execution) {
     return new LoadBalancerRequest<ClientHttpResponse>() {
    
      @Override
      public ClientHttpResponse apply(final ServiceInstance instance)
       throws Exception {
      HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, loadBalancer); //4-1、中拼接URI,见下面的ServiceRequestWrapper中的getURI()
      if (transformers != null) {
       for (LoadBalancerRequestTransformer transformer : transformers) {
       serviceRequest = transformer.transformRequest(serviceRequest, instance);
       }
      }
      return execution.execute(serviceRequest, body); //5、拦截器调用见InterceptingClientHttpRequest的内部类InterceptingRequestExecution.execute()
      }
    
     };
     }
     @Override
     public URI getURI() {
     URI uri = this.loadBalancer.reconstructURI(
      this.instance, getRequest().getURI()); //4-2、reconstructURI被RibbonLoadBalancerClient重载,看RibbonLoadBalancerClient.reconstructURI()
     return uri;
     }
     @Override
     public URI reconstructURI(ServiceInstance instance, URI original) {
     Assert.notNull(instance, "instance can not be null");
     String serviceId = instance.getServiceId();
     RibbonLoadBalancerContext context = this.clientFactory
      .getLoadBalancerContext(serviceId);
     Server server = new Server(instance.getHost(), instance.getPort());
     IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
     ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
     URI uri = RibbonUtils.updateToHttpsIfNeeded(original, clientConfig,
      serverIntrospector, server);
     return context.reconstructURIWithServer(server, uri);//4-3、构建服务实例的URI,RibbonLoadBalancerContext.reconstructURIWithServer()见下面的LoadBalancerContext.reconstructURIWithServer()
     }
     public URI reconstructURIWithServer(Server server, URI original) {//4-4、LoadBalancerContext的reconstructURIWithServer()
     String host = server.getHost();
     int port = server .getPort();
     if (host.equals(original.getHost()) 
      && port == original.getPort()) {
      return original;
     }
     String scheme = original.getScheme();
     if (scheme == null) {
      scheme = deriveSchemeAndPortFromPartialUri(original).first();
     }
    
     try {
      StringBuilder sb = new StringBuilder();
      sb.append(scheme).append("://");
      if (!Strings.isNullOrEmpty(original.getRawUserInfo())) {
      sb.append(original.getRawUserInfo()).append("@");
      }
      sb.append(host);
      if (port >= 0) {
      sb.append(":").append(port);
      }
      sb.append(original.getRawPath());
      if (!Strings.isNullOrEmpty(original.getRawQuery())) {
      sb.append("?").append(original.getRawQuery());
      }
      if (!Strings.isNullOrEmpty(original.getRawFragment())) {
      sb.append("#").append(original.getRawFragment());
      }
      URI newURI = new URI(sb.toString());
      return newURI;  
     } catch (URISyntaxException e) {
      throw new RuntimeException(e);
     }
     }
     @Override
     public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {//5、拦截器调用,InterceptingClientHttpRequest的内部类InterceptingRequestExecution.execute()
      if (this.iterator.hasNext()) {
      ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
      return nextInterceptor.intercept(request, body, this);
      }
      else {
      ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), request.getMethod());
      for (Map.Entry<String, List<String>> entry : request.getHeaders().entrySet()) {
       List<String> values = entry.getValue();
       for (String value : values) {
       delegate.getHeaders().add(entry.getKey(), value);
       }
      }
      if (body.length > 0) {
       StreamUtils.copy(body, delegate.getBody());
      }
      return delegate.execute();
      }
     }
    public class RibbonStatsRecorder {
    
     private RibbonLoadBalancerContext context;
     private ServerStats serverStats;
     private Stopwatch tracer;
    
     public RibbonStatsRecorder(RibbonLoadBalancerContext context, Server server) {
     this.context = context;
     if (server != null) {
      serverStats = context.getServerStats(server);
      context.noteOpenConnection(serverStats);
      tracer = context.getExecuteTracer().start();
     }
     }
    
     public void recordStats(Object entity) {
     this.recordStats(entity, null);
     }
    
     public void recordStats(Throwable t) {
     this.recordStats(null, t);
     }
    
     protected void recordStats(Object entity, Throwable exception) {
     if (this.tracer != null && this.serverStats != null) {
      this.tracer.stop();
      long duration = this.tracer.getDuration(TimeUnit.MILLISECONDS);
      this.context.noteRequestCompletion(serverStats, entity, exception, duration, null/* errorHandler */);
     }
     }
    }

     

    热心网友 时间:2022-05-02 08:13

      利用Netflix所打造的组件及各类大家熟知的工具,完全可以顺利应对由微服务以及分布式计算所带来的技术挑战。

      为了享受微服务所带来的诸多优势(包括松散耦合、自治服务、分散化治理以及易于持续交付等等),必须避免由单一故障依次递进而最终导致系统崩溃的恐怖状况。

      Spring工程技术团队从建立之初至今一直在努力打造出足以应对Java复杂性的强大武器。Spring Cloud项目的既定目标在于为Spring开发人员提供一整套易于使用的工具集,从而保证其轻松构建起自己需要的分布式系统方案。为了实现这一目标,Spring Cloud以Netflix OSS堆栈为基础将大量实现堆栈加以整合并打包。这些堆栈而后可以通过所熟知的各类基于注释的配置工具、Java配置工具以及基于模板的编程工具实现交付。

      Spring Cloud Config Server

      Spring Cloud Config Server能够提供一项具备横向扩展能力的集中式配置服务。它所使用的数据被保存在一套可插拔库层当中,后者目前能够支持本地存储、Git以及Subversion。通过利用一套版本控制系统作为配置存储方案,开发人员能够轻松实现版本与审计配置的内容调整。

      图一:Spring Cloud Config Server

      配置内容会以Java属性或者YAML文件的形式体现。该Config Server会将这些文件合并为环境对象,其中包含易于理解的Spring属性模型以及作为REST API存在的配置文件。任何应用程序都能够直接调用该REST API当中所包含的配置数据,也可以将智能客户端绑定方案添加到Spring Boot应用程序当中,并由后者自动将接收自Config Server的配置信息分配至任意本地配置当中。

      Spring Cloud Bus

      Spring Cloud Config Server是一套强大的配置分发机制,能够在保障一致性的前提下将配置内容分发到多个应用程序实例当中。然而根据其设计思路的限定,目前只能在应用程序启动时对其配置进行更新。在向Git中的某一属性发送新值时,需要以手动方式重启每个应用程序进程,从而保证该值被切实纳入应用当中。很明显,需要能够在无需重启的前提下完成对应用程序配置内容的更新工作。

      图二: 配备Spring Cloud Bus的Spring Cloud Config Server

      Spring Cloud Bus的任务正是为应用程序实例添加一套管理背板。目前依靠将一套客户端绑定至一组AMQP交换与队列当中来实现,但这一后端在设计上也实现了可插拔特性。Spring Cloud Bus为应用程序带来了更多管理端点。在图二中,可以看到一个面向greeting属性的值被发送至Git当中,而后一条请求被发送至应用A中的/bus/refresh端点。该请求会触发以下三个事件:

      应用A从Config Server处请求获取最新版本的配置内容。任意注明了@RefreshScope的Spring Bean都会被重新初始化并载入新的配置内容。

      应用A向AMQP交换机制发送一条消息,表明其已经收到更新指示。

      通过监听AMQP队列而被纳入Cloud Bus的应用B与应用C会获取到上述消息,并以与应用A同样的方式实现配置更新。

      现在已经有能力在无需重启的情况下对应用程序配置进行更新了。

      Spring Cloud Netflix

      Spring Cloud Netflix针对多种Netflix组件提供打包方案,其中包括Eureka、Ribbon、Hystrix以及Zuul。

      Eureka是一套弹性服务注册实现方案。其中服务注册属于服务发现模式的一种实现机制(如图三所示)。

      图三:利用服务注册实现服务发现

      Spring Cloud Netflix通过直接将spring-cloud-starter-eureka-server关联性添加到Spring Boot应用程序、随后将该应用程序的配置类与@EnableEurekaServer相整合的方式病嵌入式Eureka服务器的部署工作。

      应用程序能够通过添加spring-cloud-starter-eureka关联性并将其配置类与@EnableDiscoveryClient相整合的方式加入到服务发现流程当中。通过整合,能够将经过配置的适合DiscoveryClient实例注入至任意Spring Bean内。在所列举的实例中,DiscoveryClient作为服务发现的一种抽象机制恰好可以通过Eureka实现,不过也可以将其与Consul等其它备选堆栈相集成。DiscoveryClient能够通过服务的逻辑标识符提供位置信息(例如网络地址)以及其它与已注册至Eureka的服务实例相关的元数据。

      Eureka提供的负载均衡机制仅支持单循环条件。而Ribbon提供的客户端IPC库则更为精巧,其同时具备可配置负载均衡机制与故障容错能力。Ribbon能够通过获取自Eureka服务器的动态服务器列表进行内容填充。Spring Cloud Netflix通过将spring-cloud-starter-ribbon关联性添加至Spring Boot应用程序的方式实现与Ribbon的集成。这套额外库允许用户将经过适当配置的LoadBalancerClient实例注入至Spring Bean当中,从而实现客户端负载均衡(如图四所示)。

      图四:使用客户端负载均衡机制

      在此类任务当中,可以利用Ribbon实现额外负载均衡算法,包括可用性过滤、加权响应时间以及可用域亲和等。

      Spring Cloud Netflix还通过自动创建能够被注入至任意Spring Bean的Ribbon强化型RestTemplate实例的方式进一步改进了Spring开发者的Ribbon使用方式。在此之后,开发人员能够轻松将URL所提供的逻辑服务名称递交至RestTemplate:

      @Autowired  @LoadBalanced  private RestTemplate restTemplate;  @RequestMapping("/")  public String consume() {  ProcerResponse response = restTemplate.getForObject("http://procer", ProcerResponse.class);  return String.format("{\"value\": %s}", response.getValue());  }

      Hystrix能够为断路器以及密闭闸门等分布式系统提供一套通用型故障容错实现模式。断路器通常会被作为一台状态机使用,具体如图五所示。

      图五:断路器状态机

      断路器能够介于服务及其远程关联性之间。如果该电路处于闭合状态,则所有指向该关联性的调用通常将直接通过。如果某一调用失败,则故障将被计入计数。而一旦失败次数达到可配置时间区间内的阈值,该电路将被跳闸至断开。在处于断开状态时,调用将不再被发往该关联,而由此产生的结果将可自行定制(包括报告异常、返回虚假数据或者调用其它关联等等)。

      该状态机会定期进入所谓“半开”状态,旨在检测关联性是否处于健康运作状态。在这种状态下,请求一般仍将继续得以通过。当请求成功通过时,该设备会重新回归闭合状态。而如果请求失败,则该设备会重新回归断开状态。

      Spring Cloud应用程序能够通过添加spring-cloud-starter-hystrix关联性并将其配置类与@EnableCircuitBreaker相整合的方式利用Hystrix。在此之后,可以通过与@HystrixCommand整合的方式将断路器机制纳入到任意Spring Bean方法内:

      @HystrixCommand(fallbackMethod = "getProcerFallback")  public ProcerResponse getValue() {  return restTemplate.getForObject("http://procer", ProcerResponse.class);  }  以上实例中指定了一个名为getProcerFallback的备用方法。当该断路器处于断开状态时,此方法将替代getValue接受调用:  private ProcerResponse getProcerFallback() {  return new ProcerResponse(42);  }

      除了实现状态机机制之外,Hystrix还能够提供来自各断路机制的重要遥测指标流,具体包括请求计量、响应时间直方图以及成功、失败与短路请求数量等(如图六所示)。

      图六:Hystrix仪表板

      Zuul能够处理全部指向Netflix边缘服务的输入请求。它能够与Ribbon以及Hystrix等其它Netflix组件相结合,从而提供一个灵活且具有弹性的Netflix服务路由层。

      Netflix公司在Zuul当中加载动态过滤机制,从而实现以下各项功能:

      验证与安全保障: 识别面向各类资源的验证要求并拒绝那些与要求不符的请求。

      审查与监控: 在边缘位置追踪有意义数据及统计结果,从而带来准确的生产状态结论。

      动态路由: 以动态方式根据需要将请求路由至不同后端集群处。

      压力测试: 逐渐增加指向集群的负载流量,从而计算性能水平。

      负载分配: 为每一种负载类型分配对应容量,并弃用超出限定值的请求。

      静态响应处理: 在边缘位置直接建立部分响应,从而避免其流入内部集群。

      多区域弹性: 跨越AWS区域进行请求路由,旨在实现ELB使用多样化并保证边缘位置与使用者尽可能接近。

      除此之外,Netflix公司还利用Zuul的功能通过金丝雀版本实现精确路由与压力测试。

      Spring Cloud已经建立起一套嵌入式Zuul代理机制,从而简化常见用例当中UI应用需要将调用代理至一项或者多项后端服务处的对应开发流程。这项功能对于要求将用户界面代理至后端服务的用例而言极为便捷,其避免了管理CORS(即跨域资源共享)以及为全部后端进行独立验证等复杂流程。Zuul代理机制的一类重要应用在于实现API网关模式(如图七所示)。

      图七:API网关模式

      Spring Cloud对嵌入式Zuul代理进行了强化,从而使其能够自动实现文件上传处理。而与Spring Cloud Security配合之后,其能够轻松实现OAuth2 SSO以及将令牌传递至下游服务等工作。Zuul利用Ribbon作为其客户端与全部出站请求的负载均衡机制。Ribbon的动态服务器列表内容通常由Eureka负责填充,但Spring Cloud也能够通过其它来源填充该列表。Spring Cloud Lattice项目就已经能够通过轮询Cloud Foundry Diego的Receptor API填充Ribbon的服务器列表。

      跨入微服务领域的决定意味着将正式迎接分布式系统所带来的诸多挑战,而分布式系统绝不是那种能够“凑合使用”的方案。因此,系统内各组件的行为及位置始终处于不断变化当中,甚至经常表现出不可预知状态。

      *备注:这八大误区分别为:

      1.网络环境是可靠的

      2.延迟水平为零

      3.传输带宽是无限的

      4.网络环境是安全的

      5.拓扑结构不会变化

      6.总会有管理员帮助解决问题

      7.流量成本为零

      8.网络内各组成部分拥有同质性

    声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。
    E-MAIL:11247931@qq.com