由于不同版本的springcloub源码可能会有一下差异,以下源代码都取自:springcloub的Greenwich.SR1版本
如果对于eureka基础知识还不是很了解可以看这篇文章:服务治理(EUREKA)
?
服务注册流程分析
首先先看`@EnableDiscoveryClient`这个注解,发现这个注解中并没有实现的代码,但是在这个注解中引用了一个`EnableDiscoveryClientImportSelector.class`这个类,那么我们就去这个类中看看都有些什么东西。
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*??????http://www.51sjk.com/Upload/Articles/1/0/257/257550_20210630001616291.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.client.discovery;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.context.annotation.Import;
/**
* Annotation to enable a DiscoveryClient implementation.
* @author Spencer Gibb
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {
????/**
?????* If true, the ServiceRegistry will automatically register the local server.
?????* @return - {@code true} if you want to automatically register.
?????*/
????boolean autoRegister() default true;
}
?
我们进入`EnableDiscoveryClientImportSelector.class`这个类,在这个类的selectImports方法中会注入一个新的配置类org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*??????http://www.51sjk.com/Upload/Articles/1/0/257/257550_20210630001616291.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.client.discovery;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import org.springframework.cloud.commons.util.SpringFactoryImportSelector;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.AnnotationAttributes;
import org.springframework.core.annotation.Order;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.Environment;
import org.springframework.core.env.MapPropertySource;
import org.springframework.core.type.AnnotationMetadata;
/**
* @author Spencer Gibb
*/
@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableDiscoveryClientImportSelector
????????extends SpringFactoryImportSelector<EnableDiscoveryClient> {
????@Override
????public String[] selectImports(AnnotationMetadata metadata) {
????????String[] imports = super.selectImports(metadata);
????????AnnotationAttributes attributes = AnnotationAttributes.fromMap(
????????????????metadata.getAnnotationAttributes(getAnnotationClass().getName(), true));
????????boolean autoRegister = attributes.getBoolean("autoRegister");
????????if (autoRegister) {
????????????List<String> importsList = new ArrayList<>(Arrays.asList(imports));
????????????importsList.add(
????????????????????"org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration");
????????????imports = importsList.toArray(new String[0]);
????????}
????????else {
????????????Environment env = getEnvironment();
????????????if (ConfigurableEnvironment.class.isInstance(env)) {
????????????????ConfigurableEnvironment configEnv = (ConfigurableEnvironment) env;
????????????????LinkedHashMap<String, Object> map = new LinkedHashMap<>();
????????????????map.put("spring.cloud.service-registry.auto-registration.enabled", false);
????????????????MapPropertySource propertySource = new MapPropertySource(
????????????????????????"springCloudDiscoveryClient", map);
????????????????configEnv.getPropertySources().addLast(propertySource);
????????????}
????????}
????????return imports;
????}
????@Override
????protected boolean isEnabled() {
????????return getEnvironment().getProperty("spring.cloud.discovery.enabled",
????????????????Boolean.class, Boolean.TRUE);
????}
????@Override
????protected boolean hasDefaultFactory() {
????????return true;
????}
}
?
那我们就来看一下这个config中都有哪些内容,在这个类中会自动装配AutoServiceRegistrationProperties.class这个类.
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*??????http://www.51sjk.com/Upload/Articles/1/0/257/257550_20210630001616291.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.client.serviceregistry;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* @author Spencer Gibb
*/
@Configuration
@EnableConfigurationProperties(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
public class AutoServiceRegistrationConfiguration {
}
而在这个类中最终会将EurekaAutoServiceRegistration类注入到ApplicationContext中,这样在springboot启动的时候就会调用EurekaAutoServiceRegistration中的satrt方法。
@Override
????public void start() {
????????// only set the port if the nonSecurePort or securePort is 0 and this.port != 0
????????if (this.port.get() != 0) {
????????????if (this.registration.getNonSecurePort() == 0) {
????????????????this.registration.setNonSecurePort(this.port.get());
????????????}
????????????if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
????????????????this.registration.setSecurePort(this.port.get());
????????????}
????????}
????????// only initialize if nonSecurePort is greater than 0 and it isn't already running
????????// because of containerPortInitializer below
????????if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
????????????this.serviceRegistry.register(this.registration);
????????????this.context.publishEvent(new InstanceRegisteredEvent<>(this,
????????????????????this.registration.getInstanceConfig()));
????????????this.running.set(true);
????????}
????}
?
而在这个方法中会往上线文中注入一个InstanceRegisteredEvent事件,最会就会调用到DiscoveryClient中的register方法,而这个可以说是真正开始进行服务注册
boolean register() throws Throwable {
????logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
????EurekaHttpResponse<Void> httpResponse;
????try {
????????httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
????} catch (Exception e) {
????????logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
????????throw e;
????}
????if (logger.isInfoEnabled()) {
????????logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
????}
????return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
?
这里要提一下,Eureka的服务注册使用的装饰者和代理模式,所有的实现类都实现了EurekaHttpClientDecorator类,大家可以看一下这个类中的register方法,在这个方法中会执行execute方法,而这个方法在EurekaHttpClientDecorator中是一个抽象类,在每一个实现类中都有自己的一套实现方案。
可能大家被绕晕了,这里我用一个图来讲一下,DiscoveryClient是要调用register方法的,register方法需要调用里面的execute方法,在EurekaHttpClientDecorator中execute方法是一个抽象方法,所以需要去实现类中调用execute方法,调用完后在返回register方法中,依次调用。
?
?
?
@Override
public EurekaHttpResponse<Void> register(final InstanceInfo info) {
????return execute(new RequestExecutor<Void>() {
????????@Override
????????public EurekaHttpResponse<Void> execute(EurekaHttpClient delegate) {
????????????return delegate.register(info);
????????}
????????@Override
????????public RequestType getRequestType() {
????????????return RequestType.Register;
????????}
????});
}
?
EurekaHttpClientDecorator实现类总共分为4层,第一层为SessionedEurekaHttpClient,这层主要是获取需要带有session的eureka客户端信息,也是防止一个eureka客户端一直连接一台服务器,为高可用做铺垫
@Override
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
????long now = System.currentTimeMillis();
????long delay = now - lastReconnectTimeStamp;
????if (delay >= currentSessionDurationMs) {
????????logger.debug("Ending a session and starting anew");
????????lastReconnectTimeStamp = now;
????????currentSessionDurationMs = randomizeSessionDuration(sessionDurationMs);
????????TransportUtils.shutdown(eurekaHttpClientRef.getAndSet(null));
????}
????EurekaHttpClient eurekaHttpClient = eurekaHttpClientRef.get();
????if (eurekaHttpClient == null) {
????????eurekaHttpClient = TransportUtils.getOrSetAnotherClient(eurekaHttpClientRef, clientFactory.newClient());
????}
????return requestExecutor.execute(eurekaHttpClient);
}
?
第二层为RetryableEurekaHttpClient,这层主要是用来在服务端的列表中获取一个可用的服务端信息
@Override
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
????List<EurekaEndpoint> candidateHosts = null;
????int endpointIdx = 0;
????for (int retry = 0; retry < numberOfRetries; retry++) {
????????EurekaHttpClient currentHttpClient = delegate.get();
????????EurekaEndpoint currentEndpoint = null;
????????if (currentHttpClient == null) {
????????????if (candidateHosts == null) {
????????????????candidateHosts = getHostCandidates();
????????????????if (candidateHosts.isEmpty()) {
????????????????????throw new TransportException("There is no known eureka server; cluster server list is empty");
????????????????}
????????????}
????????????if (endpointIdx >= candidateHosts.size()) {
????????????????throw new TransportException("Cannot execute request on any known server");
????????????}
????????????currentEndpoint = candidateHosts.get(endpointIdx++);
????????????currentHttpClient = clientFactory.newClient(currentEndpoint);
????????}
????????try {
????????????EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
????????????if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
????????????????delegate.set(currentHttpClient);
????????????????if (retry > 0) {
????????????????????logger.info("Request execution succeeded on retry #{}", retry);
????????????????}
????????????????return response;
????????????}
????????????logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
????????} catch (Exception e) {
????????????logger.warn("Request execution failed with message: {}", e.getMessage());??// just log message as the underlying client should log the stacktrace
????????}
????????// Connection error or 5xx from the server that must be retried on another server
????????delegate.compareAndSet(currentHttpClient, null);
????????if (currentEndpoint != null) {
????????????quarantineSet.add(currentEndpoint);
????????}
????}
????throw new TransportException("Retry limit reached; giving up on completing the request");
}
?
第三层为RedirectingEurekaHttpClient层,这层主要是要来寻找非302重定向的 Eureka-Server 的 EurekaHttpClient 。
@Override
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
????EurekaHttpClient currentEurekaClient = delegateRef.get();
????if (currentEurekaClient == null) {
????????AtomicReference<EurekaHttpClient> currentEurekaClientRef = new AtomicReference<>(factory.newClient(serviceEndpoint));
????????try {
????????????EurekaHttpResponse<R> response = executeOnNewServer(requestExecutor, currentEurekaClientRef);
????????????TransportUtils.shutdown(delegateRef.getAndSet(currentEurekaClientRef.get()));
????????????return response;
????????} catch (Exception e) {
????????????logger.error("Request execution error. endpoint={}", serviceEndpoint, e);
????????????TransportUtils.shutdown(currentEurekaClientRef.get());
????????????throw e;
????????}
????} else {
????????try {
????????????return requestExecutor.execute(currentEurekaClient);
????????} catch (Exception e) {
????????????logger.error("Request execution error. endpoint={}", serviceEndpoint, e);
????????????delegateRef.compareAndSet(currentEurekaClient, null);
????????????currentEurekaClient.shutdown();
????????????throw e;
????????}
????}
}
private <R> EurekaHttpResponse<R> executeOnNewServer(RequestExecutor<R> requestExecutor,
?????????????????????????????????????????????????????AtomicReference<EurekaHttpClient> currentHttpClientRef) {
????URI targetUrl = null;
????for (int followRedirectCount = 0; followRedirectCount < MAX_FOLLOWED_REDIRECTS; followRedirectCount++) {
????????EurekaHttpResponse<R> httpResponse = requestExecutor.execute(currentHttpClientRef.get());
????????if (httpResponse.getStatusCode() != 302) {
????????????if (followRedirectCount == 0) {
????????????????logger.debug("Pinning to endpoint {}", targetUrl);
????????????} else {
????????????????logger.info("Pinning to endpoint {}, after {} redirect(s)", targetUrl, followRedirectCount);
????????????}
????????????return httpResponse;
????????}
????????targetUrl = getRedirectBaseUri(httpResponse.getLocation());
????????if (targetUrl == null) {
????????????throw new TransportException("Invalid redirect URL " + httpResponse.getLocation());
????????}
????????currentHttpClientRef.getAndSet(null).shutdown();
????????currentHttpClientRef.set(factory.newClient(new DefaultEndpoint(targetUrl.toString())));
????}
????String message = "Follow redirect limit crossed for URI " + serviceEndpoint.getServiceUrl();
????logger.warn(message);
????throw new TransportException(message);
}
?
第四层为MetricsCollectingEurekaHttpClient层,这层主要是监控指标收集 EurekaHttpClient ,配合 Netflix Servo 实现监控信息采集。
@Override
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
????EurekaHttpClientRequestMetrics requestMetrics = metricsByRequestType.get(requestExecutor.getRequestType());
????Stopwatch stopwatch = requestMetrics.latencyTimer.start();
????try {
????????EurekaHttpResponse<R> httpResponse = requestExecutor.execute(delegate);
????????requestMetrics.countersByStatus.get(mappedStatus(httpResponse)).increment();
????????return httpResponse;
????} catch (Exception e) {
????????requestMetrics.connectionErrors.increment();
????????exceptionsMetric.count(e);
????????throw e;
????} finally {
????????stopwatch.stop();
????}
}
?
最后调用AbstractJerseyEurekaHttpClient,去真正的向eureka服务端发送请求的方法了。
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
????String urlPath = "apps/" + info.getAppName();
????ClientResponse response = null;
????try {
????????Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
????????addExtraHeaders(resourceBuilder);
????????response = resourceBuilder
????????????????.header("Accept-Encoding", "gzip")
????????????????.type(MediaType.APPLICATION_JSON_TYPE)
????????????????.accept(MediaType.APPLICATION_JSON)
????????????????.post(ClientResponse.class, info);
????????return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
????} finally {
????????if (logger.isDebugEnabled()) {
????????????logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
????????????????????response == null ? "N/A" : response.getStatus());
????????}
????????if (response != null) {
????????????response.close();
????????}
????}
}
?