【源码分析-Spring Boot】-9.Spring Boot Admin 请求和通信原理


Spring Boot Admin:【从零开始学Spring Boot】-9.Spring Boot Admin

1.admin server 的请求原理

Spring Boot Admin 组件中,通过 @EnableAdminServer 标识一个 admin server 组件,并启动。EnableAdminServer 的内容如下.

/**
 * Type-level annotation, retained at runtime, whose only effect is to import
 * {@link AdminServerMarkerConfiguration} into the annotated application.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AdminServerMarkerConfiguration.class)
public @interface EnableAdminServer {

}

EnableAdminServer 通过 @Import 导入一个 AdminServerMarkerConfiguration 配置类。

/**
 * Configuration whose single job is to register a {@link Marker} bean in the
 * application context.
 */
@Configuration(proxyBeanMethods = false)
public class AdminServerMarkerConfiguration {

   // Publishes the marker as a bean so that bean-presence conditions can look it up by type.
   @Bean
   public Marker adminServerMarker() {
      return new Marker();
   }

   // Empty type: it carries no state or behaviour and is only used as a bean type.
   public static class Marker {

   }
}

AdminServerMarkerConfiguration 中定义了一个静态内部类 Marker,并将它声明为 Bean。那么这个 admin server 又是如何生效的呢?实际上,admin server 有一个自动装配类 AdminServerAutoConfiguration,这个类的内容如下。

/**
 * Auto-configuration of the admin server beans.
 * <p>
 * It is only active when an {@link AdminServerMarkerConfiguration.Marker} bean exists,
 * enables {@link AdminServerProperties}, imports
 * {@link AdminServerInstanceWebClientConfiguration} and {@link AdminServerWebConfiguration},
 * and is ordered after {@code WebClientAutoConfiguration}. Every bean method below carries
 * {@code @ConditionalOnMissingBean}, so it is skipped when a matching bean is already
 * defined.
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnBean(AdminServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties(AdminServerProperties.class)
@ImportAutoConfiguration({ AdminServerInstanceWebClientConfiguration.class, AdminServerWebConfiguration.class })
@AutoConfigureAfter({ WebClientAutoConfiguration.class })
public class AdminServerAutoConfiguration {

   // Bound admin server settings; read by the trigger and endpoint-detector beans below.
   private final AdminServerProperties adminServerProperties;

   public AdminServerAutoConfiguration(AdminServerProperties adminServerProperties) {
      this.adminServerProperties = adminServerProperties;
   }

   // Registry built from the instance repository and the id generator.
   @Bean
   @ConditionalOnMissingBean
   public InstanceRegistry instanceRegistry(InstanceRepository instanceRepository,
         InstanceIdGenerator instanceIdGenerator) {
      return new InstanceRegistry(instanceRepository, instanceIdGenerator);
   }

   // Application-level registry layered on the instance registry and the event publisher.
   @Bean
   @ConditionalOnMissingBean
   public ApplicationRegistry applicationRegistry(InstanceRegistry instanceRegistry,
         InstanceEventPublisher instanceEventPublisher) {
      return new ApplicationRegistry(instanceRegistry, instanceEventPublisher);
   }

   // Default id generator: HashingInstanceUrlIdGenerator.
   @Bean
   @ConditionalOnMissingBean
   public InstanceIdGenerator instanceIdGenerator() {
      return new HashingInstanceUrlIdGenerator();
   }

   // Status updater working on the repository with a freshly built InstanceWebClient.
   @Bean
   @ConditionalOnMissingBean
   public StatusUpdater statusUpdater(InstanceRepository instanceRepository,
         InstanceWebClient.Builder instanceWebClientBulder) {
      return new StatusUpdater(instanceRepository, instanceWebClientBulder.build());
   }

   // Trigger for the status updater; the container calls start()/stop() as init/destroy
   // methods. Interval and lifetime come from the "monitor" section of the properties.
   @Bean(initMethod = "start", destroyMethod = "stop")
   @ConditionalOnMissingBean
   public StatusUpdateTrigger statusUpdateTrigger(StatusUpdater statusUpdater, Publisher<InstanceEvent> events) {
      StatusUpdateTrigger trigger = new StatusUpdateTrigger(statusUpdater, events);
      trigger.setInterval(this.adminServerProperties.getMonitor().getStatusInterval());
      trigger.setLifetime(this.adminServerProperties.getMonitor().getStatusLifetime());
      return trigger;
   }

   // Endpoint detector using a ChainingStrategy of QueryIndexEndpointStrategy followed by
   // ProbeEndpointsStrategy (the latter fed with the configured probed endpoints); both
   // strategies share one InstanceWebClient.
   @Bean
   @ConditionalOnMissingBean
   public EndpointDetector endpointDetector(InstanceRepository instanceRepository,
         InstanceWebClient.Builder instanceWebClientBuilder) {
      InstanceWebClient instanceWebClient = instanceWebClientBuilder.build();
      ChainingStrategy strategy = new ChainingStrategy(new QueryIndexEndpointStrategy(instanceWebClient),
            new ProbeEndpointsStrategy(instanceWebClient, this.adminServerProperties.getProbedEndpoints()));
      return new EndpointDetector(instanceRepository, strategy);
   }

   // Trigger for endpoint detection, started/stopped by the container.
   @Bean(initMethod = "start", destroyMethod = "stop")
   @ConditionalOnMissingBean
   public EndpointDetectionTrigger endpointDetectionTrigger(EndpointDetector endpointDetector,
         Publisher<InstanceEvent> events) {
      return new EndpointDetectionTrigger(endpointDetector, events);
   }

   // Info updater working on the repository with a freshly built InstanceWebClient.
   @Bean
   @ConditionalOnMissingBean
   public InfoUpdater infoUpdater(InstanceRepository instanceRepository,
         InstanceWebClient.Builder instanceWebClientBuilder) {
      return new InfoUpdater(instanceRepository, instanceWebClientBuilder.build());
   }

   // Trigger for the info updater; interval and lifetime come from the "monitor" section
   // of the properties, and the container calls start()/stop().
   @Bean(initMethod = "start", destroyMethod = "stop")
   @ConditionalOnMissingBean
   public InfoUpdateTrigger infoUpdateTrigger(InfoUpdater infoUpdater, Publisher<InstanceEvent> events) {
      InfoUpdateTrigger trigger = new InfoUpdateTrigger(infoUpdater, events);
      trigger.setInterval(this.adminServerProperties.getMonitor().getInfoInterval());
      trigger.setLifetime(this.adminServerProperties.getMonitor().getInfoLifetime());
      return trigger;
   }

   // In-memory event store, created only when no InstanceEventStore bean exists.
   @Bean
   @ConditionalOnMissingBean(InstanceEventStore.class)
   public InMemoryEventStore eventStore() {
      return new InMemoryEventStore();
   }

   // Repository backed by the event store, created only when no InstanceRepository bean
   // exists; started/stopped by the container.
   @Bean(initMethod = "start", destroyMethod = "stop")
   @ConditionalOnMissingBean(InstanceRepository.class)
   public SnapshottingInstanceRepository instanceRepository(InstanceEventStore eventStore) {
      return new SnapshottingInstanceRepository(eventStore);
   }
}

AdminServerAutoConfiguration 通过 @ImportAutoConfiguration 导入了 AdminServerWebConfiguration 和 AdminServerInstanceWebClientConfiguration。下面查看 AdminServerWebConfiguration 的内容。

/**
 * Web-layer configuration of the admin server: a Jackson module, the REST controllers, and
 * (for servlet applications) the instance proxy controller plus the handler mapping that
 * serves the admin controllers.
 */
@Configuration(proxyBeanMethods = false)
public class AdminServerWebConfiguration {

    private final AdminServerProperties adminServerProperties;

    public AdminServerWebConfiguration(AdminServerProperties adminServerProperties) {
        this.adminServerProperties = adminServerProperties;
    }

    // Jackson module that deserializes Registration with RegistrationDeserializer and installs
    // a serializer modifier built around a SanitizingMapSerializer fed with the configured
    // metadata keys to sanitize.
    @Bean
    public SimpleModule adminJacksonModule() {
        SimpleModule module = new SimpleModule();
        module.addDeserializer(Registration.class, new RegistrationDeserializer());
        module.setSerializerModifier(new RegistrationBeanSerializerModifier(
            new SanitizingMapSerializer(this.adminServerProperties.getMetadataKeysToSanitize())));
        return module;
    }

    // Controller for instances; skipped when an InstancesController bean already exists.
    @Bean
    @ConditionalOnMissingBean
    public InstancesController instancesController(InstanceRegistry instanceRegistry, InstanceEventStore eventStore) {
        return new InstancesController(instanceRegistry, eventStore);
    }

    // Controller for applications; skipped when an ApplicationsController bean already exists.
    @Bean
    @ConditionalOnMissingBean
    public ApplicationsController applicationsController(ApplicationRegistry applicationRegistry) {
        return new ApplicationsController(applicationRegistry);
    }

    // reactive web configuration ...

    /**
     * Servlet-only part of the web configuration; ordered after
     * {@code WebMvcAutoConfiguration}.
     */
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.SERVLET)
    @AutoConfigureAfter(WebMvcAutoConfiguration.class)
    public static class ServletRestApiConfirguation {

        private final AdminServerProperties adminServerProperties;

        public ServletRestApiConfirguation(AdminServerProperties adminServerProperties) {
            this.adminServerProperties = adminServerProperties;
        }

        // Servlet flavour of the instances proxy controller, configured with the admin
        // context path and the headers the instance proxy is set to ignore.
        @Bean
        @ConditionalOnMissingBean
        public de.codecentric.boot.admin.server.web.servlet.InstancesProxyController instancesProxyController(
            InstanceRegistry instanceRegistry, InstanceWebClient.Builder instanceWebClientBuilder) {
            return new de.codecentric.boot.admin.server.web.servlet.InstancesProxyController(
                this.adminServerProperties.getContextPath(),
                this.adminServerProperties.getInstanceProxy().getIgnoredHeaders(), instanceRegistry,
                instanceWebClientBuilder.build());
        }

        // Handler mapping of type AdminControllerHandlerMapping, created with the admin context
        // path, given order 0 and wired to the shared ContentNegotiationManager.
        @Bean
        public org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerMapping adminHandlerMapping(
            ContentNegotiationManager contentNegotiationManager) {
            org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerMapping mapping = new de.codecentric.boot.admin.server.web.servlet.AdminControllerHandlerMapping(
                this.adminServerProperties.getContextPath());
            mapping.setOrder(0);
            mapping.setContentNegotiationManager(contentNegotiationManager);
            return mapping;
        }
    }
}

默认 servlet 情况下,会使用 ServletRestApiConfirguation 配置,生成 web mvc 的服务。在 ServletRestApiConfirguation 声明了 AdminControllerHandlerMapping 的 RequestMappingHandlerMapping 实例对象。

这个 AdminControllerHandlerMapping 用来处理被 @AdminController 修饰的类

/**
 * Decides whether a bean type is handled by this mapping.
 *
 * @param beanType the bean class being inspected
 * @return {@code true} when {@code AnnotatedElementUtils.hasAnnotation} finds
 * {@link AdminController} on {@code beanType}
 */
@Override
protected boolean isHandler(Class<?> beanType) {
    boolean markedAsAdminController = AnnotatedElementUtils.hasAnnotation(beanType, AdminController.class);
    return markedAsAdminController;
}

spring-boot-admin-server.jar 中提供了一些被 @AdminController 修饰的类,比如:ApplicationsController、NotificationFilterController、InstancesController、InstancesProxyController 等。ApplicationsController 内容如下。

/**
 * Admin controller exposing the applications held by the {@link ApplicationRegistry}:
 * listing, lookup by name, a server-sent-event stream and deregistration.
 */
@AdminController
@ResponseBody
public class ApplicationsController {

    private static final Logger log = LoggerFactory.getLogger(ApplicationsController.class);

    // Event that consists of the comment "ping" only.
    private static final ServerSentEvent<?> PING = ServerSentEvent.builder().comment("ping").build();

    // Emits PING without initial delay and then once every 10 seconds.
    private static final Flux<ServerSentEvent<?>> PING_FLUX = Flux.interval(Duration.ZERO, Duration.ofSeconds(10L))
        .map((tick) -> PING);

    private final ApplicationRegistry registry;

    public ApplicationsController(ApplicationRegistry registry) {
        this.registry = registry;
    }

    /**
     * Lists all applications as JSON.
     * @return the applications reported by the registry
     */
    @GetMapping(path = "/applications", produces = MediaType.APPLICATION_JSON_VALUE)
    public Flux<Application> applications() {
        return this.registry.getApplications();
    }

    /**
     * Looks up one application by name.
     * @param name the application name taken from the path
     * @return 200 with the application, or 404 when the registry yields nothing
     */
    @GetMapping(path = "/applications/{name}", produces = MediaType.APPLICATION_JSON_VALUE)
    public Mono<ResponseEntity<Application>> application(@PathVariable("name") String name) {
        ResponseEntity<Application> notFound = ResponseEntity.notFound().build();
        Mono<Application> found = this.registry.getApplication(name);
        return found.map(ResponseEntity::ok).defaultIfEmpty(notFound);
    }

    /**
     * Streams application updates as server-sent events, merged with the periodic ping.
     * @return the merged event stream
     */
    @GetMapping(path = "/applications", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<Application>> applicationsStream() {
        Flux<ServerSentEvent<Application>> updates = this.registry.getApplicationStream()
            .map((app) -> ServerSentEvent.builder(app).build());
        return updates.mergeWith(ping());
    }

    /**
     * Deregisters the application with the given name.
     * @param name the application name taken from the path
     * @return 204 when the registry reported at least one deregistered entry, otherwise 404
     */
    @DeleteMapping(path = "/applications/{name}")
    public Mono<ResponseEntity<Void>> unregister(@PathVariable("name") String name) {
        log.debug("Unregister application with name '{}'", name);
        return this.registry.deregister(name).collectList().map((removed) -> {
            if (removed.isEmpty()) {
                return ResponseEntity.notFound().build();
            }
            return ResponseEntity.noContent().build();
        });
    }

    // Re-types the shared ping stream to the caller's event type; the unchecked cast goes
    // through the raw Flux type.
    @SuppressWarnings("unchecked")
    private static <T> Flux<ServerSentEvent<T>> ping() {
        Flux rawPings = PING_FLUX;
        return (Flux<ServerSentEvent<T>>) rawPings;
    }
}

其中 applications() 方法刚好处理 http://localhost:8080/ 跳转后的 http://localhost:8080/applications 请求。

2.server 和 client 之间通信

在 spring-boot-admin-server 中有自动装配类 AdminServerAutoConfiguration,相应地,在 spring-boot-admin-client 也有一个自动装配类。它是 SpringBootAdminClientAutoConfiguration。

这个 SpringBootAdminClientAutoConfiguration 声明了如下主要类型

/**
 * Auto-configuration of the admin client. Active only in a web application and when
 * {@code SpringBootAdminClientEnabledCondition} matches; ordered after the web endpoint,
 * RestTemplate and WebClient auto-configurations; enables the client, instance, server and
 * management-server properties.
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnWebApplication
@Conditional(SpringBootAdminClientEnabledCondition.class)
@AutoConfigureAfter({ WebEndpointAutoConfiguration.class, RestTemplateAutoConfiguration.class, WebClientAutoConfiguration.class })
@EnableConfigurationProperties({ ClientProperties.class, InstanceProperties.class, ServerProperties.class, ManagementServerProperties.class })
public class SpringBootAdminClientAutoConfiguration {

    // Registrator built from the application factory, the registration client, the
    // configured admin URLs and the register-once flag.
    @Bean
    @ConditionalOnMissingBean
    public ApplicationRegistrator registrator(RegistrationClient registrationClient, ClientProperties client, ApplicationFactory applicationFactory) {

        return new ApplicationRegistrator(applicationFactory, registrationClient, client.getAdminUrl(), client.isRegisterOnce());
    }

    // Listener wrapping the registrator; auto-register, auto-deregister and the register
    // period are copied from the client properties.
    @Bean
    @ConditionalOnMissingBean
    public RegistrationApplicationListener registrationListener(ClientProperties client,
                                                                ApplicationRegistrator registrator, Environment environment) {
        RegistrationApplicationListener listener = new RegistrationApplicationListener(registrator);
        listener.setAutoRegister(client.isAutoRegistration());
        listener.setAutoDeregister(client.isAutoDeregistration(environment));
        listener.setRegisterPeriod(client.getPeriod());
        return listener;
    }

    @Bean
    @ConditionalOnMissingBean
    public StartupDateMetadataContributor startupDateMetadataContributor() {
        return new StartupDateMetadataContributor();
    }

    /**
     * Servlet-only configuration; ordered after {@code DispatcherServletAutoConfiguration}.
     */
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnWebApplication(type = Type.SERVLET)
    @AutoConfigureAfter(DispatcherServletAutoConfiguration.class)
    public static class ServletConfiguration {

        // Servlet application factory; the available MetadataContributor beans (or an empty
        // list when there are none) are combined into one CompositeMetadataContributor.
        @Bean
        @ConditionalOnMissingBean
        public ApplicationFactory applicationFactory(InstanceProperties instance, ManagementServerProperties management, ServerProperties server, ServletContext servletContext, PathMappedEndpoints pathMappedEndpoints, WebEndpointProperties webEndpoint, ObjectProvider<List<MetadataContributor>> metadataContributors, DispatcherServletPath dispatcherServletPath) {
            return new ServletApplicationFactory(instance, management, server, servletContext, pathMappedEndpoints, webEndpoint, new CompositeMetadataContributor(
                metadataContributors.getIfAvailable(Collections::emptyList)), dispatcherServletPath);
        }

    }

    /**
     * Registration client based on RestTemplate; only considered when a
     * {@code RestTemplateBuilder} bean exists.
     */
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnBean(RestTemplateBuilder.class)
    public static class BlockingRegistrationClientConfig {

        // Builds its own RestTemplateBuilder with the configured connect and read timeouts.
        // Basic authentication is added only when both username and password are set.
        @Bean
        @ConditionalOnMissingBean
        public BlockingRegistrationClient registrationClient(ClientProperties client) {
            RestTemplateBuilder builder = new RestTemplateBuilder().setConnectTimeout(client.getConnectTimeout())
                .setReadTimeout(client.getReadTimeout());
            if (client.getUsername() != null && client.getPassword() != null) {
                builder = builder.basicAuthentication(client.getUsername(), client.getPassword());
            }
            return new BlockingRegistrationClient(builder.build());
        }

    }
    // ...
}

ApplicationRegistrator 用来注册到 admin-server,其中定义了 register 和 deregister 方法

/**
 * Registers the client application at spring-boot-admin-server and deregisters it again.
 * <p>
 * Registration is attempted against each configured admin URL through the
 * {@link RegistrationClient}; the id returned by the first successful registration is kept
 * and later used for deregistration.
 */
public class ApplicationRegistrator {

   private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationRegistrator.class);

   // Per admin URL: number of failed attempts since the last success (reset on success).
   private final ConcurrentHashMap<String, LongAdder> attempts = new ConcurrentHashMap<>();

   // Id handed out by the admin server; null while not registered.
   private final AtomicReference<String> registeredId = new AtomicReference<>();

   private final ApplicationFactory applicationFactory;

   private final String[] adminUrls;

   // When true, register() and deregister() stop after the first admin URL that succeeds.
   private final boolean registerOnce;

   private final RegistrationClient registrationClient;

   public ApplicationRegistrator(ApplicationFactory applicationFactory, RegistrationClient registrationClient,
         String[] adminUrls, boolean registerOnce) {
      this.applicationFactory = applicationFactory;
      this.adminUrls = adminUrls;
      this.registerOnce = registerOnce;
      this.registrationClient = registrationClient;
   }

   /**
    * Registers the client application at spring-boot-admin-server.
    * @return true if successful registration on at least one admin server
    */
   public boolean register() {
      Application application = this.applicationFactory.createApplication();
      boolean isRegistrationSuccessful = false;

      for (String adminUrl : this.adminUrls) {
         LongAdder attempt = this.attempts.computeIfAbsent(adminUrl, (k) -> new LongAdder());
         // A zero counter means no failure has been recorded for this URL since its last
         // success, so a failure now is treated as the "first attempt" for logging.
         boolean successful = register(application, adminUrl, attempt.intValue() == 0);

         if (!successful) {
            attempt.increment();
         }
         else {
            attempt.reset();
            isRegistrationSuccessful = true;
            if (this.registerOnce) {
               break;
            }
         }
      }

      return isRegistrationSuccessful;
   }

   /**
    * Registers the application at a single admin URL.
    * @param application the application description to send
    * @param adminUrl the admin server URL to register at
    * @param firstAttempt whether a failure should be logged on WARN (true) or DEBUG (false)
    * @return true if the registration client returned without throwing, false otherwise
    */
   protected boolean register(Application application, String adminUrl, boolean firstAttempt) {
      try {
         String id = this.registrationClient.register(adminUrl, application);
         // The id is stored only when none is stored yet; any later success is logged as a
         // refresh and leaves the stored id unchanged.
         if (this.registeredId.compareAndSet(null, id)) {
            LOGGER.info("Application registered itself as {}", id);
         }
         else {
            LOGGER.debug("Application refreshed itself as {}", id);
         }
         return true;
      }
      catch (Exception ex) {
         // NOTE(review): both messages log the whole adminUrls array rather than the adminUrl
         // that failed, whereas deregister() logs the single URL; confirm whether intentional.
         if (firstAttempt) {
            LOGGER.warn(
                  "Failed to register application as {} at spring-boot-admin ({}): {}. Further attempts are logged on DEBUG level",
                  application, this.adminUrls, ex.getMessage());
         }
         else {
            LOGGER.debug("Failed to register application as {} at spring-boot-admin ({}): {}", application,
                  this.adminUrls, ex.getMessage());
         }
         return false;
      }
   }

   /**
    * Deregisters the stored id at the admin servers. Does nothing when no id is stored; a
    * failure for one admin URL is logged on WARN and the loop continues with the next URL.
    */
   public void deregister() {
      String id = this.registeredId.get();
      if (id == null) {
         return;
      }

      for (String adminUrl : this.adminUrls) {
         try {
            this.registrationClient.deregister(adminUrl, id);
            // Clear the stored id only if it is still the one that was just deregistered.
            this.registeredId.compareAndSet(id, null);
            if (this.registerOnce) {
               break;
            }
         }
         catch (Exception ex) {
            LOGGER.warn("Failed to deregister application (id={}) at spring-boot-admin ({}): {}", id, adminUrl,
                  ex.getMessage());
         }
      }
   }

   /**
    * @return the id of this client as given by the admin server. Returns null if the
    * client has not registered against the admin server yet.
    */
   public String getRegisteredId() {
      return this.registeredId.get();
   }
}

RegistrationApplicationListener 则是用来触发开始注册和停止注册任务,它实现了 InitializingBean 和 DisposableBean 接口,分别定义了初始化完成后和销毁前的处理方式。

private final ThreadPoolTaskScheduler taskScheduler;

/**
 * Creates a listener for the given registrator, using the scheduler produced by
 * {@code registrationTaskScheduler()}.
 * @param registrator the registrator this listener drives
 */
public RegistrationApplicationListener(ApplicationRegistrator registrator) {
    this(registrator, registrationTaskScheduler());
}

/**
 * Builds the scheduler used for the registration task: pool size 1, remove-on-cancel
 * enabled, thread name prefix "registrationTask".
 * @return the configured scheduler; this method does not initialize it
 */
private static ThreadPoolTaskScheduler registrationTaskScheduler() {
    ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
    scheduler.setThreadNamePrefix("registrationTask");
    scheduler.setRemoveOnCancelPolicy(true);
    scheduler.setPoolSize(1);
    return scheduler;
}

/**
 * Package-private constructor that stores both collaborators exactly as passed in.
 * @param registrator the registrator this listener drives
 * @param taskScheduler the scheduler that runs the registration task
 */
RegistrationApplicationListener(ApplicationRegistrator registrator, ThreadPoolTaskScheduler taskScheduler) {
    this.taskScheduler = taskScheduler;
    this.registrator = registrator;
}

/**
 * InitializingBean callback: forwards the initialization to the task scheduler.
 */
@Override
public void afterPropertiesSet() {
    this.taskScheduler.afterPropertiesSet();
}

/**
 * DisposableBean callback: forwards the destruction to the task scheduler.
 */
@Override
public void destroy() {
    this.taskScheduler.destroy();
}

同时,它还接收 ApplicationReadyEvent 事件和 ContextClosedEvent 事件

/**
 * Handles {@code ApplicationReadyEvent}: starts the registration task, but only when
 * auto-registration is switched on.
 * @param event the application-ready event (not inspected)
 */
@EventListener
@Order(Ordered.LOWEST_PRECEDENCE)
public void onApplicationReady(ApplicationReadyEvent event) {
    if (!autoRegister) {
        return;
    }
    startRegisterTask();
}

/**
 * Handles {@code ContextClosedEvent}. Acts only when the closing context has no parent or
 * its parent's id is "bootstrap": the registration task is stopped and, when
 * auto-deregistration is switched on, the application is deregistered.
 * @param event the context-closed event
 */
@EventListener
@Order(Ordered.LOWEST_PRECEDENCE)
public void onClosedContext(ContextClosedEvent event) {
    // Ignore contexts whose parent exists and is not the "bootstrap" context.
    if (event.getApplicationContext().getParent() != null
        && !"bootstrap".equals(event.getApplicationContext().getParent().getId())) {
        return;
    }

    // Stop the periodic registration first ...
    stopRegisterTask();

    // ... then remove the registration if configured to do so.
    if (autoDeregister) {
        registrator.deregister();
    }
}

/**
 * Starts the periodic registration task unless one is already scheduled and not yet done.
 * The task calls {@code registrator::register} at a fixed rate of {@code registerPeriod}.
 */
public void startRegisterTask() {
    boolean taskStillActive = scheduledTask != null && !scheduledTask.isDone();
    if (!taskStillActive) {
        scheduledTask = taskScheduler.scheduleAtFixedRate(registrator::register, registerPeriod);
        LOGGER.debug("Scheduled registration task for every {}ms", registerPeriod);
    }
}

/**
 * Cancels the periodic registration task if one exists and is not done yet; otherwise
 * does nothing.
 */
public void stopRegisterTask() {
    if (scheduledTask == null || scheduledTask.isDone()) {
        return;
    }
    // cancel(true): cancellation may interrupt a run that is in progress
    scheduledTask.cancel(true);
    LOGGER.debug("Canceled registration task");
}

其中 registrator.register 实现如下

/**
 * Registers the client application at every configured admin URL.
 * @return true if the registration succeeded on at least one admin server
 */
public boolean register() {
    Application application = this.applicationFactory.createApplication();
    boolean isRegistrationSuccessful = false;

    for (String adminUrl : this.adminUrls) {
        LongAdder attempt = this.attempts.computeIfAbsent(adminUrl, (k) -> new LongAdder());
        // Register at this admin URL; a zero counter marks the call as a "first attempt".
        boolean successful = register(application, adminUrl, attempt.intValue() == 0);

        if (!successful) {
            attempt.increment();
        }
        else {
            attempt.reset();
            isRegistrationSuccessful = true;
            // With registerOnce, stop after the first admin URL that accepted the registration.
            if (this.registerOnce) {
                break;
            }
        }
    }

    return isRegistrationSuccessful;
}

/**
 * Registers the application at a single admin URL.
 * @param application the application description to send
 * @param adminUrl the admin server URL to register at
 * @param firstAttempt whether a failure should be logged on WARN (true) or DEBUG (false)
 * @return true if the registration client returned without throwing, false otherwise
 */
protected boolean register(Application application, String adminUrl, boolean firstAttempt) {
    try {
        // Delegate to the registration client's register method.
        String id = this.registrationClient.register(adminUrl, application);
        // The id is stored only when none is stored yet; otherwise this is logged as a refresh.
        if (this.registeredId.compareAndSet(null, id)) {
            LOGGER.info("Application registered itself as {}", id);
        }
        else {
            LOGGER.debug("Application refreshed itself as {}", id);
        }
        return true;
    }
    catch (Exception ex) {
        // NOTE(review): both messages log the whole adminUrls array rather than the adminUrl
        // that failed; confirm whether intentional.
        if (firstAttempt) {
            LOGGER.warn(
                "Failed to register application as {} at spring-boot-admin ({}): {}. Further attempts are logged on DEBUG level",
                application, this.adminUrls, ex.getMessage());
        }
        else {
            LOGGER.debug("Failed to register application as {} at spring-boot-admin ({}): {}", application,
                         this.adminUrls, ex.getMessage());
        }
        return false;
    }
}

最终调用 registrationClient.register 方法,它的实现如下

/**
 * {@link RegistrationClient} that talks to the admin server synchronously through a
 * {@link RestTemplate}.
 */
public class BlockingRegistrationClient implements RegistrationClient {

    // Type token so the response body is read as Map<String, Object>.
    private static final ParameterizedTypeReference<Map<String, Object>> RESPONSE_TYPE = new ParameterizedTypeReference<Map<String, Object>>() {
    };

    private final RestTemplate restTemplate;

    public BlockingRegistrationClient(RestTemplate restTemplate) {
        this.restTemplate = restTemplate;
    }

    /**
     * Sends the application as a POST request to the given admin URL and returns the id
     * contained in the response.
     * @param adminUrl the admin server URL to post to
     * @param application the application description sent as the request body
     * @return the string form of the {@code "id"} entry of the response body
     * @throws IllegalStateException if the response has no body or the body has no
     * {@code "id"} entry
     */
    @Override
    public String register(String adminUrl, Application application) {
        ResponseEntity<Map<String, Object>> response = this.restTemplate.exchange(adminUrl, HttpMethod.POST, new HttpEntity<>(application, this.createRequestHeaders()), RESPONSE_TYPE);
        // The body may be absent and the map may lack "id"; fail with a descriptive message
        // instead of a message-less NullPointerException.
        Map<String, Object> body = response.getBody();
        Object id = (body != null) ? body.get("id") : null;
        if (id == null) {
            throw new IllegalStateException(
                "Registration response from '" + adminUrl + "' did not contain an 'id'");
        }
        return id.toString();
    }

    /**
     * Creates the headers sent with the registration request: JSON content type and a JSON
     * accept header.
     * @return a read-only view of the headers
     */
    protected HttpHeaders createRequestHeaders() {
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        headers.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));
        return HttpHeaders.readOnlyHttpHeaders(headers);
    }
}

registrator.deregister 实现如下

/**
 * Deregisters the stored id at the admin servers. Does nothing when no id is stored; a
 * failure for one admin URL is logged on WARN and the loop continues with the next URL.
 */
public void deregister() {
    String id = this.registeredId.get();
    if (id == null) {
        return;
    }

    for (String adminUrl : this.adminUrls) {
        try {
            // Delegate to the registration client's deregister method.
            this.registrationClient.deregister(adminUrl, id);
            // Clear the stored id only if it is still the one that was just deregistered.
            this.registeredId.compareAndSet(id, null);
            if (this.registerOnce) {
                break;
            }
        }
        catch (Exception ex) {
            LOGGER.warn("Failed to deregister application (id={}) at spring-boot-admin ({}): {}", id, adminUrl, ex.getMessage());
        }
    }
}

最终调用 registrationClient.deregister 方法,它的实现如下

/**
 * Issues a DELETE request for the registered instance.
 * @param adminUrl the admin server URL
 * @param id the instance id, appended to {@code adminUrl} after a '/'
 */
@Override
public void deregister(String adminUrl, String id) {
    String instanceUrl = adminUrl + '/' + id;
    this.restTemplate.delete(instanceUrl);
}

文章作者: Soulballad
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Soulballad !
评论
 上一篇
【从零开始学Spring Boot】-10.Spring Boot Jpa操作数据库 【从零开始学Spring Boot】-10.Spring Boot Jpa操作数据库
1.简介1.1 概述 The Java Persistence API is a standard technology that lets you “map” objects to relational databases. The sp
下一篇 
【源码分析-Spring Boot】-8.Spring Boot WebEndpoint 请求原理 【源码分析-Spring Boot】-8.Spring Boot WebEndpoint 请求原理
Spring Boot Actuator监控:【从零开始学Spring Boot】-8.Spring Boot Actuator监控 1.web endpoint 请求原理spring boot actuator 中有很多 endpoi
2020-07-20
  目录