Spring Cloud Eureka:【Spring Cloud学习】-2.Spring Cloud Eureka 服务注册中心
1.EurekaServer 如何启动?
在使用 @EnableEurekaServer 时,激活了 EurekaServerMarkerConfiguration 配置类,在 EurekaServer 的自动装配类 EurekaServerAutoConfiguration 中,通过构造函数声明了 EurekaController
@Bean
@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled",
matchIfMissing = true)
public EurekaController eurekaController() {
return new EurekaController(this.applicationInfoManager);
}
这个 EurekaController 实际上就是访问 http://localhost:9090/ 时对应的 eureka 管理后台,它使用spring-mvc 来进行实现。
@Controller
@RequestMapping("${eureka.dashboard.path:/}")
public class EurekaController {
@Value("${eureka.dashboard.path:/}")
private String dashboardPath = "";
private ApplicationInfoManager applicationInfoManager;
public EurekaController(ApplicationInfoManager applicationInfoManager) {
this.applicationInfoManager = applicationInfoManager;
}
@RequestMapping(method = RequestMethod.GET)
public String status(HttpServletRequest request, Map<String, Object> model) {
populateBase(request, model);
populateApps(model);
StatusInfo statusInfo;
try {
statusInfo = new StatusResource().getStatusInfo();
}
catch (Exception e) {
statusInfo = StatusInfo.Builder.newBuilder().isHealthy(false).build();
}
model.put("statusInfo", statusInfo);
populateInstanceInfo(model, statusInfo);
filterReplicas(model, statusInfo);
return "eureka/status";
}
}
可以看到 spring-cloud-starter-netflix-eureka-server 的依赖关系如下
2.EurekaClient如何注册?
服务启动的时候,会在刷新上下文的时候启动 Lifecycle,EurekaAutoServiceRegistration 是 Lifecycle 的一个实现类,所以会调用它的 start 方法,在 start 方法中通过调用 serviceRegistry.register 方法来进行注册。
这里的 serviceRegistry 是 EurekaServiceRegistry,EurekaServiceRegistry 实现了 ServiceRegistry 接口,ServiceRegistry 在 spring-cloud-common 中进行定义,它是一个通用的接口,根据实现方案的不同,它还可以是 ConsulServiceRegistry、NacosServiceRegistry、ZookeeperServiceRegistry 等。
EurekaServiceRegistry#register
@Override
public void register(EurekaRegistration reg) {
maybeInitializeClient(reg);
if (log.isInfoEnabled()) {
log.info("Registering application "
+ reg.getApplicationInfoManager().getInfo().getAppName()
+ " with eureka with status "
+ reg.getInstanceConfig().getInitialStatus());
}
// 设置实例状态
reg.getApplicationInfoManager()
.setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
// 健康检查
reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg
.getEurekaClient().registerHealthCheck(healthCheckHandler));
}
在 setInstanceStatus 中调用 listener.notify 进行通知
public synchronized void setInstanceStatus(InstanceStatus status) {
InstanceStatus next = instanceStatusMapper.map(status);
if (next == null) {
return;
}
InstanceStatus prev = instanceInfo.setStatus(next);
if (prev != null) {
for (StatusChangeListener listener : listeners.values()) {
try {
listener.notify(new StatusChangeEvent(prev, next));
} catch (Exception e) {
logger.warn("failed to notify listener: {}", listener.getId(), e);
}
}
}
}
这里的 listeners 通过 registerStatusChangeListener 方法进行注册
public void registerStatusChangeListener(StatusChangeListener listener) {
listeners.put(listener.getId(), listener);
}
这个方法在 DiscoveryClient 中进行调用
private ApplicationInfoManager.StatusChangeListener statusChangeListener;
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
这里的 listener 为 ApplicationInfoManager.StatusChangeListener,所以调用到它的 notify 方法;然后调用到
instanceInfoReplicator.onDemandUpdate()
public boolean onDemandUpdate() {
if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
if (!scheduler.isShutdown()) {
// 开启一个任务
scheduler.submit(new Runnable() {
@Override
public void run() {
logger.debug("Executing on-demand update of local InstanceInfo");
Future latestPeriodic = scheduledPeriodicRef.get();
if (latestPeriodic != null && !latestPeriodic.isDone()) {
logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
latestPeriodic.cancel(false);
}
InstanceInfoReplicator.this.run();
}
});
return true;
} else {
logger.warn("Ignoring onDemand update due to stopped scheduler");
return false;
}
} else {
logger.warn("Ignoring onDemand update due to rate limiter");
return false;
}
}
最终调用到 InstanceInfoReplicator.this.run(),通过 discoveryClient.register 进行注册
public void run() {
try {
// 刷新实例信息
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
// 注册
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
这里使用 jersey 进行了 http 调用,发送 post 请求
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
// 请求路径为 apps/EUREKA-CLIENT-PROVIDER 或 apps/EUREKA-CLIENT-CONSUMER
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
// Post 请求,媒体类型是 json
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
3.EurekaServer处理注册请求
client 端发送请求后到达 ApplicationResource#addInstance
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// validate that the instanceinfo contains all the necessary required fields
// 参数校验
if (isBlank(info.getId())) {
return Response.status(400).entity("Missing instanceId").build();
} else if (isBlank(info.getHostName())) {
return Response.status(400).entity("Missing hostname").build();
} else if (isBlank(info.getIPAddr())) {
return Response.status(400).entity("Missing ip address").build();
} else if (isBlank(info.getAppName())) {
return Response.status(400).entity("Missing appName").build();
} else if (!appName.equals(info.getAppName())) {
return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
} else if (info.getDataCenterInfo() == null) {
return Response.status(400).entity("Missing dataCenterInfo").build();
} else if (info.getDataCenterInfo().getName() == null) {
return Response.status(400).entity("Missing dataCenterInfo Name").build();
}
// handle cases where clients may be registering with bad DataCenterInfo with missing data
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
if (isBlank(dataCenterInfoId)) {
boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
if (experimental) {
String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
return Response.status(400).entity(entity).build();
} else if (dataCenterInfo instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
if (effectiveId == null) {
amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
}
} else {
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
}
}
}
// 注册
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}
这里的 registry 是 PeerAwareInstanceRegistry,它的类图如下
最终调用的 register 方法在 AbstractInstanceRegistry 中
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
// 从registry中获取当前app的实例信息map
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
// 增加注册次数
REGISTER.increment(isReplication);
// 如果是第一次注册,初始化一个ConcurrentHashMap
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
// 从gMap中获取存在的Lease信息
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to register it, increase the number of clients sending renews
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
// 构建一个Lease
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
synchronized (recentRegisteredQueue) {
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// 得到Lease实例,判断状态是否为UP
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
// 设置注册类型为新增
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
// 缓存过期
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
在 register 完成之后,有一个 replicateToPeers() 方法,它用来实现集群节点之间信息复制
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
// 获取到所有的node
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
// 复制实例信息到每个node
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
replicateInstanceActionsToPeers 实现如下
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
// 判断操作类型
switch (action) {
case Cancel:
// 取消注册
node.cancel(appName, id);
break;
case Heartbeat:
// 心跳
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
// 注册
node.register(info);
break;
case StatusUpdate:
// 状态变更
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
// 删除被重写的实例
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
}