Hyiki's 技术博客 Back-end Dev Engineer

微服务迁移过程需要注意的地方

2022-06-22

【调用与响应】

feign(Http远程方法调用)标准化Response响应体(这里定为BaseFeignResponse)

  • 用于适配被调用端Server的响应体结构,使其既能提供RPC的服务也能给前端提供服务

    @Data
    public class BaseFeignResponse<T> implements Serializable {
      
        private static final long serialVersionUID = 1L;
      
        /**
         * 响应状态码
         */
        private String code;
      
        /**
         * 响应描述
         */
        private String msg;
      
        /**
         * 响应业务数据
         */
        private T data;
      
    }
    
    {"code":"SUCCESS","msg":"success","data":"success"}
    
  • 定义好响应体之后,需要通过切面Aspect对RestController作增强处理:将调用结果转成目标体,并做好空值与异常的自适应

    @Aspect
    @Component
    @Slf4j
    public class HadesFeignAspect {
      
        public static final String SUCCESS = "SUCCESS";
      
        @Pointcut("execution(* com.shop*hyiki*.plutus.app.hades.logistics.*.*(..))")
        private void returnResult() {
        }
      
        @Around("returnResult()")
        private Object handleReturnResult(ProceedingJoinPoint pjp) throws Throwable {
            try {
                Object proceed = pjp.proceed();
                /* null值 */
                if (proceed == null) {
                    return null;
                }
                /* void */
                Signature s = pjp.getSignature();
                MethodSignature ms = (MethodSignature)s;
                Method m = ms.getMethod();
                if (m.getReturnType() == Void.class) {
                    return proceed;
                }
                /* Response */
                if (m.getReturnType() == Response.class) {
                    return proceed;
                }
                /* 标准转换 */
                BaseFeignResponse feignResponse = (BaseFeignResponse) proceed;
                if (log.isDebugEnabled()) {
                    log.debug(feignResponse.toString());
                }
                if (Objects.equals(feignResponse.getCode(), SUCCESS)) {
                    return feignResponse;
                } else {
                    throw new BizException(feignResponse.getCode(), feignResponse.getMsg());
                }
            } catch (FeignException e) {
                log.error("feign exception, body:{}", e.contentUTF8(), e);
                throw new BizException("调用外部系统异常:" + e.getMessage());
            } catch (BizException e) {
                log.error("feign bizException", e);
                throw e;
            } catch (Exception e) {
                log.error("feign system exception", e);
                throw e;
            }
      
        }
    }
    

【Response的解码器】executeAndDecode(RequestTemplate template)

public interface Decoder {

  /**
   * Decodes an http response into an object corresponding to its
   * {@link java.lang.reflect.Method#getGenericReturnType() generic return type}. If you need to
   * wrap exceptions, please do so via {@link DecodeException}.
   *
   * @param response the response to decode
   * @param type {@link java.lang.reflect.Method#getGenericReturnType() generic return type} of the
   *        method corresponding to this {@code response}.
   * @return instance of {@code type}
   * @throws IOException will be propagated safely to the caller.
   * @throws DecodeException when decoding failed due to a checked exception besides IOException.
   * @throws FeignException when decoding succeeds, but conveys the operation failed.
   */
  // ☆☆☆☆☆☆☆☆☆☆☆☆这句是关键代码☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆
  Object decode(Response response, Type type) throws IOException, DecodeException, FeignException;

  /** Default implementation of {@code Decoder}. */
  public class Default extends StringDecoder {

    @Override
    public Object decode(Response response, Type type) throws IOException {
      if (response.status() == 404)
        return Util.emptyValueOf(type);
      if (response.body() == null)
        return null;
      if (byte[].class.equals(type)) {
        return Util.toByteArray(response.body().asInputStream());
      }
      return super.decode(response, type);
    }
  }
}
/* FeignConfig - ErrorDecoder */
@Bean
public *hyiki*ErrorDecoder errorDecoder() {
  return new *hyiki*ErrorDecoder();
}

/**
* 自定义错误解码器
*/
public static class *hyiki*ErrorDecoder extends ErrorDecoder.Default {

  @Override
  public Exception decode(String methodKey, Response response) {
    if (response.status() == HttpServletResponse.SC_BAD_REQUEST) {
      // 自定义异常
      BizException exception = null;
      try {
        // 获取原始的返回内容
        String json = Util.toString(response.body().asReader());
        String msg = JSONUtil.parseObject(json, *hyiki*Response.class).map(*hyiki*Response::getMsg).orElse(json);
        // 抛出业务异常
        exception = new BizException(msg);
      } catch (IOException ex) {
        log.error(ex.getMessage(), ex);
      }
      return exception;
    }
    return super.decode(methodKey, response);
  }
}

【签名】在Feign客户端的FeignConfig发起apply,将Signature签名注入到RequestHeader当

/* FeignConfig */
@Override
public void apply(RequestTemplate requestTemplate) {

  String timestamp = String.valueOf(System.currentTimeMillis());
  requestTemplate.header("key", dionysusServiceSign.getKey())
    .header("timestamp", timestamp)
    .header("signature", dionysusServiceSign.getSignature(timestamp));
}

【文件接收】关键点在于将RPC的输出流作为输入流,asInputStream写入响应服务的输出流

  • feign.Response :An immutable response to an http invocation which only returns string content.
  • 只返回String内容的响应会视为feign.Response对象:文件下载
/** String -> feign.Response **/
@Slf4j
public class FeignUtils {

    /**
     * 通过feign下载文件 (feign.Response)
     * An immutable response to an http invocation which only returns string content.
     * 
     *
     * @param response
     * @param feignResponseSupplier
     * @throws IOException
     */
    public static void downloadFile(HttpServletResponse response, ResponseSupplier<Response> feignResponseSupplier) throws Exception {
        downloadFile(response, feignResponseSupplier.request());
    }

    /**
     * 通过feign下载文件
     *
     * @param response
     * @param serviceResponse
     * @throws IOException
     */
    public static void downloadFile(HttpServletResponse response, Response serviceResponse) throws IOException {
        InputStream inputStream = null;
        Response.Body body = serviceResponse.body();
        inputStream = body.asInputStream();
        BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream);
        response.setHeader("Content-Disposition", serviceResponse.headers().get("Content-Disposition").toString().replace("[", "").replace("]", ""));
        BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(response.getOutputStream());
        int length = 0;
        byte[] temp = new byte[1024 * 10];
        while ((length = bufferedInputStream.read(temp)) != -1) {
            bufferedOutputStream.write(temp, 0, length);
        }
        bufferedOutputStream.flush();
        bufferedOutputStream.close();
        bufferedInputStream.close();
        inputStream.close();
    }

    /**
     * 响应方法
     * @param
     */
    public interface ResponseSupplier<T> {
        /**
         * 下载文件
         *
         * @return
         * @throws Exception
         */
        T request() throws Exception;
    }
}

【文件上传】遇到的已知问题:the request was rejected because no multipart boundary was found

  • 分析问题:

    • StandardMultipartHttpServletRequest#parseRequest -> contentType: multipart/form-data

    • FileUploadBase#getBoundary -> return null

      public byte[] getBoundary(String contentType) {
        ParameterParser parser = new ParameterParser();
        parser.setLowerCaseNames(true);
        Map<String, String> params = parser.parse(contentType, new char[]{';', ','});
        String boundaryStr = (String)params.get("boundary");
        if (boundaryStr == null) {
          return null;
        } else {
          byte[] boundary = boundaryStr.getBytes(StandardCharsets.ISO_8859_1);
          return boundary;
        }
      }
      
    • FileItemIteratorImpl#init -> throw Exception

      multiPartBoundary = fileUploadBase.getBoundary(contentType);
      if (multiPartBoundary == null) {
        IOUtils.closeQuietly(input); // avoid possible resource leak
        throw new FileUploadException("the request was rejected because no multipart boundary was found");
      }
      
  • 解决问题:

    • @PostMapping注解属性添加consumes类型 & 文件类型前的@RequestPart 注解

      @PostMapping(value = "/upload", consumes = MULTIPART_FORM_DATA_VALUE)
      @*hyiki*ResponseBody
      BaseFeignResponse<String> logisticsRead(@RequestPart("file") MultipartFile file,
                                              @RequestParam("courierCompanyId") Long courierCompanyId,
                                              @RequestParam("templateType") String templateType,
                                              @RequestParam(value = "sheetName", required = false) List<String> sheetName) throws IOException;
      
    • 【注意】值得注意的点是序列化Json作为日志输出时,流相关的属性不能被操作:因此需要作filter处理

      Object[] parameters = pjp.getArgs();
      if (log.isInfoEnabled() && parameters != null) {
        List<Object> params = Arrays.stream(parameters)
          .filter(o -> !(o instanceof InputStreamSource)
                  && !(o instanceof ServletResponse)
                  && !(o instanceof ServletRequest))
          .collect(Collectors.toList());
        log.info("class:{}, method:{}, parameters :{}", className, methodName, JSON.toJSONString(params));
      }
      

【server端服务关闭】server端为保证服务的稳定,当前服务节点从注册中心内摘除应先于Spring容器的关闭

  • 【环境架构】依赖组件:spring-boot + spring-cloud-starter-openfeign + spring-cloud-starter-zookeeper-all

    1. 2.4.1
    2. 3.0.0
      • spring-cloud-openfeign-core:3.0.0
      • spring-cloud-netflix-ribbon:3.0.0
    3. 3.0.0
  • 【背景】服务下线后于容器关闭会导致的问题: 请求进来,由于注册中心中还残留节点(未完全下线),负载均衡会把请求分发给存活的服务节点,但是实际上该节点的容器已经关闭,无法提供正常服务,最终导致这个请求会被拒绝,从而影响了用户的使用体验。

  • 【目标】避免请求会被拒绝,应在容器关闭前将节点从注册中心下线,那么在下线之前容器未完全关闭,服务正常提供,当节点真正下线后,负载均衡不会将请求分发到即将下线的节点时,此时才进行服务容器的关闭,这种做法可以支撑节点优雅(graceful)关闭,从而进行维护操作:代码更新发版、单节点的临时维护或事故。

  • 【做法】

    • 【前提】客户端的参数配置——服务节点列表的缓存问题 值得注意:节点列表缓存间隔ServerListRefreshInterval 换句话说,就是每一个时间间隔,客户端会从注册中心刷新一次服务节点列表;那么在缓存时间内,如果有节点进行下线,ribbon也会通过负载均衡将请求分发到下线节点上,导致出现服务拒绝异常。

      # client
      feign:
        client:
          config:
            default:
              connectTimeout: 10000 #ms
              readTimeout: 60000 #ms
          
      ribbon:
        ServerListRefreshInterval: 1000
        ReadTimeout: 5000
        ConnectTimeout: 2000
        MaxAutoRetries: 0 #同一台实例最大重试次数,不包括首次调用
        MaxAutoRetriesNextServer: 0 #重试负载均衡其他的实例最大重试次数,不包括首次调用
        OkToRetryOnAllOperations: false  #是否所有操作都重试
      
    • 【事件监听】从Spring容器的事件监听入手:ApplicationEvent

      package org.springframework.context;
          
      import java.util.EventObject;
          
      public abstract class ApplicationEvent extends EventObject {
          private static final long serialVersionUID = 7099057708183571937L;
          private final long timestamp = System.currentTimeMillis();
          
          public ApplicationEvent(Object source) {
              super(source);
          }
          
          public final long getTimestamp() {
              return this.timestamp;
          }
      }
      
    • 【监听入口】从抽象类ApplicationEvent中列出实现@Override

    • 【注入容器】定义@Component组件注入用于事件监听的Bean:示例代码

      public class SpringEventListener implements ApplicationListener<E extends ApplicationEvent>, TomcatConnectorCustomizer
      
    • 【选择事件】Spring容器常用的事件监听:

      public abstract class ApplicationContextEvent extends ApplicationEvent
      

      Refreshed 对应 Closed 、Started 对应 Stopped

      • ContextRefreshedEvent初始化或刷新ApplicationContext时发布(例如,通过使用ConfigurableApplicationContext接口上的refresh()方法)。在这里,“已初始化”是指所有Bean都已加载,检测到并激活了后处理器Bean,已预先实例化单例并且可以使用ApplicationContext对象。只要尚未关闭上下文,只要选定的ApplicationContext实际上支持这种“热”刷新,就可以多次触发刷新。例如,XmlWebApplicationContext支持热刷新,但GenericApplicationContext不支持。
      • ContextStartedEvent 使用ConfigurableApplicationContext接口上的start()方法启动ApplicationContext时发布。此处,“已启动”表示所有Lifecycle bean都收到一个明确的启动信号。通常,此信号用于在显式停止后重新启动Bean,但也可以用于启动尚未配置为自动启动的组件(例如,尚未在初始化时启动的组件)。
      • ContextStoppedEvent 通过使用ConfigurableApplicationContext接口上的stop()方法停止ApplicationContext时发布。在这里,“已停止”表示所有Lifecycle bean都收到一个明确的停止信号。停止的上下文可以通过start()调用重新启动。
      • ContextClosedEvent 通过使用ConfigurableApplicationContext接口上的close()方法关闭ApplicationContext时发布。在此,“封闭”表示所有单例Bean都被破坏。封闭的情境到了生命的尽头。无法刷新或重新启动。
    • 【解决方案】容器关闭前从注册中心摘除节点 实际做法:【重点】借助ContextClosedEvent,在Spring容器关闭前摘除注册中心服务节点

      server:
        port: 8080
      #  优雅关闭
        shutdown: graceful
      
      @Configuration
      public class StopConfig {
          
          @Bean
          public SpringClosedListener gracefulShutdown() {
              return new SpringClosedListener();
          }
          
          @Bean
          public ConfigurableServletWebServerFactory webServerFactory(final SpringClosedListener springStopListener) {
              TomcatServletWebServerFactory factory = new TomcatServletWebServerFactory();
              factory.addConnectorCustomizers(springStopListener);
              return factory;
          }
      }
      
      public class SpringStopListener implements ApplicationListener<ContextClosedEvent>, TomcatConnectorCustomizer {
          
          @Autowired
          RedissonClient redisClient;
          
          @Autowired
          private ZookeeperServiceRegistry serviceRegistry;
          
          private static final int TIMEOUT = 30;
          
          /**
           * tomcat才填充,对于本地测试实例存在不启用tomcat
           */
          private volatile Connector connector;
          
          @Override
          public void customize(Connector connector) {
              this.connector = connector;
          }
          
          @Override
          public void onApplicationEvent(ContextClosedEvent event) {
              // Zookeeper 优雅下线服务,先摘除当前服务注册再关闭容器
      //        log.info("ZookeeperServiceRegistry close");
              serviceRegistry.close();
              // 等待5s,防止调用方的缓存导致拒绝访问:等待时间 > 缓存时间
              try {
                  Thread.sleep(5000);
              } catch (InterruptedException e) {
      //            log.error("InterruptedException", e);
              }
          
              Executor executor = null;
              if (Objects.nonNull(this.connector)) {
                  this.connector.pause();
                  executor = this.connector.getProtocolHandler().getExecutor();
              }
          
              // 释放redis锁
      //        RLock lock = redisClient.getLock("");
      //        if(lock.isLocked()) {
      //            lock.unlock();
      //        }
          
                  
              // 优雅关闭线程池
              if (executor instanceof ThreadPoolExecutor) {
                  try {
                      ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
                      // 将状态设置为shutdown,不再接收新的请求,正在跑的任务会执行完
                      threadPoolExecutor.shutdown();
                      if (!threadPoolExecutor.awaitTermination(TIMEOUT, TimeUnit.SECONDS)) {
      //                    log.warn("Tomcat thread pool did not shut down gracefully within "
      //                            + TIMEOUT + " seconds. Proceeding with forceful shutdown");
          
                          // 如果在规定的时间30s之内还未处理完成,那么直接关闭
                          threadPoolExecutor.shutdownNow();
                          if (!threadPoolExecutor.awaitTermination(TIMEOUT, TimeUnit.SECONDS)) {
      //                        log.error("Tomcat thread pool did not terminate");
                          }
                      }
                  } catch (InterruptedException ex) {
                      Thread.currentThread().interrupt();
                  }
              }
          
          }
      }
      

【RequestBody参数顺序】feign限制了参数顺序,RequestBody在前

【PathVariable使用方式】 PathVariable的value必须写,且path上不能有格式,如(/{id:\d+})不行,需要(/{id})

/* Feign Client Service */
// @PostMapping("/{id:\\d+}")
@PostMapping("/{id}")
@*hyiki*ResponseBody
BaseFeignResponse<ActionEnum> update(@RequestBody @Valid RuleSaveOrUpdateDto ruleDto, @PathVariable("id") Long id);

【GET请求格式】feign如果不标识@RequestParam注解,即使指定了GET方法,feign依然会以POST方法发送请求

/* Feign Client Service */
@GetMapping("/tracking/refresh")
@*hyiki*ResponseBody
BaseFeignResponse<LogisticsNumberDTO> refreshLogisticsTracking(@RequestParam("logisticsNumber") String logisticsNumber);

【@RequestBody报错】已知POST请求进行对象的传递,需要在传递的对象类前加@RequestBody注解

/* Feign Client Service */
@RequestMapping(value = "/department/updateByExampleSelective",method = RequestMethod.PUT)
int updateByExampleSelective(@RequestBody final Department record, @RequestBody final DepartmentExample example);

报错信息:

Caused by: java.lang.IllegalStateException: Method has too many Body parameters: 

解决方式:

  • 使用@RequestParam代替@RequestBody,在某些地方是能够实现的,具体还得看状况对象

  • 【不推荐】将接收参数定义为Map<String, Object>,而后使用map转object工具,转换成须要的对象。

【json解析】fastxml解析会用xml的解析???FeignConfig配置fastjson解析

@Bean
public Encoder feignEncoder(){
  return new SpringEncoder(feignHttpMessageConverter());
}

@Bean
public Decoder feignDecoder(){
  return new SpringDecoder(feignHttpMessageConverter());
}

/**
     * feign和Springboot使用的都是jackson,可以都修改为fastjson解析方式
     */
private ObjectFactory<HttpMessageConverters> feignHttpMessageConverter() {
  final HttpMessageConverters httpMessageConverters = new HttpMessageConverters(this.getFastJsonConverter());
  return () -> httpMessageConverters;
}

private FastJsonHttpMessageConverter getFastJsonConverter() {
  CustomFastJsonHttpMessageConverter converter = new CustomFastJsonHttpMessageConverter();
  List<MediaType> supportedMediaTypes = new ArrayList<>();
  MediaType mediaTypeJson = MediaType.valueOf(MediaType.APPLICATION_JSON_UTF8_VALUE);
  supportedMediaTypes.add(mediaTypeJson);
  FastJsonConfig config = new FastJsonConfig();
  config.getSerializeConfig().put(JSON.class, new SwaggerJsonSerializer());
  config.setSerializerFeatures(SerializerFeature.DisableCircularReferenceDetect);
  converter.setFastJsonConfig(config);
  converter.setSupportedMediaTypes(supportedMediaTypes);
  return converter;
}

【包路径扫描】Springboot-maven引入其他模块无法扫描到spring bean的问题(遇到过新建rpc模块无法扫描@FeignClient)

  • 解决方法:将A模块和B模块的Application置于相同路径下,例如com.xx下
  • 启动模块的包路径在com.hyiki.hades.***,新建模块的代码也需要放在这个路径下

【@SpringQueryMap使用】rpc 请求过程中使用DTO作为Get请求的参数需要在RPC Client类中,打上@SpringQueryMap注解

/**
 * 单件成本变化记录弹窗
 */
@GetMapping("listSkuCostLog")
PageResponse<SkuCostLogListVO> listSkuCostLog(@SpringQueryMap SkuCostLogQuery query);
  • SpringQueryMap没有生效?

    • 配置请求端的Fegin,加强SpringQueryMap解析

      @Configuration
      public class FeignConfiguration {
       @Bean
       public Feign.Builder feignBuilder() {
           return Feign.builder()
                   .queryMapEncoder(new BeanQueryMapEncoder())
                   .retryer(Retryer.NEVER_RETRY);
       }
      }
      
    • 使用Fegin加载该配置

      @FeignClient(value = "test", configuration = FeignClientConfiguration.class)
      
    • 附带项目的FeignConfiguration

      @Slf4j
      public class FeignClientConfiguration extends AbstractFeignClientConfiguration {
          
       @Value("xxx")
       private String sign;
          
       @Override
       public String getSignature() {
        return sign;
       }
          
       @Bean
       Logger.Level feignLogger() {
        return Logger.Level.FULL;
       }
          
       @Bean
       public ErrorDecoder errorDecoder() {
        return new RpcCommonConfig.ErrorDecoder();
       }
          
       @Bean
       public Feign.Builder feignBuilder() {
        return Feign.builder().queryMapEncoder(new BeanQueryMapEncoder());
       }
      }
      
    • context设置对象头 & 签名等等

      @Data
      public abstract class AbstractFeignClientConfiguration implements RequestInterceptor {
          
      	@Override
      	public void apply(RequestTemplate template) {
      		String pathMd5 = DigestUtils.md5DigestAsHex(Base64.getEncoder().encode(getURL(template).getBytes()));
      		template.header("sign", getSignature())
      				.header(URL_NAME, pathMd5);
      	}
          
          
      	public String getURL(RequestTemplate template) {
      		// URL拼接规则
      	}
          
      	public abstract String getSignature();
          
      }
      

【@RequestParam使用】Spring MVC 传递值的细节处理:Get请求,非基础类型的入参必须要打上@RequestParam 注解,这个注解作用是强制匹配入参

   /**
   * skuId -> 单件成本
   */
   @GetMapping("cargoCostMap")
   @ResponseBody
   public Map<Long, BigDecimal> listCargoCostMapInTheSameWarehouse(@NotNull(message = "仓库必传") long warehouseId,
   @NotNull(message = "skuId必传") @RequestParam List<Long> skuIds) {
    return skuCostQueryApplicationService.listCargoCostMapInTheSameWarehouse(warehouseId, skuIds);
   }

【优化HTTP客户端】优化Feign默认的HTTP客户端,支持连接池

  1. Feign底层的客户端实现 Feign 发送 HTTP 请求时,底层会使用到别的客户端。下面列出常用的 3 种 HTTP 客户端。

HTTP客户端 特点 URLConnection Feign 的默认实现,不支持连接池 Apache HttpClient 支持连接池 OKHttp 支持连接池 其中,URLConnection 是 Feign 默认使用的 HTTP 客户端,是 JDK 自带的,但是性能不太好,而且不支持连接池。连接池可以减小 HTTP 连接创建和销毁所带来的性能损耗,因为 HTTP 每次创建连接时都要 3 次握手,断开连接时都要 4 次挥手,还是十分消耗性能的。

  1. Feign性能优化思路 使用支持连接池的 HTTP 客户端代替默认的 URLConnection 。 日志级别设置为 BASIC 或 NONE 。

【集成 openfeign 报错】Consider defining a bean of type ‘org.springframework.cloud.openfeign.FeignContext’ in your configuration

ApplicationStarter启动类加上 @EnableFeignClients & @ImportAutoConfiguration({FeignAutoConfiguration.class})注解即可解决


上一篇 异地多活

Comments

Content