Spring Cloud Eureka手动同步服务实现

Spring Cloud Eureka手动同步服务实现

Scroll Down

前言

当初因为业务需求的缘故,博主在Eureka的基础上做了一定的修改和封装,用来更好的适配我们的业务需求,下面我就写写当初实现手动同步服务的过程。(有段时间不接触eureka有点遗忘,如有错误之处请见谅)

需求:手动指定任意Eureka server上的服务同步到指定Eureka server,可一次同步服务到多个Eureka server

什么是手动同步服务?

顾名思义是将某个服务手动同步到指定的Eureka server上。

Eureka Server之间是如何进行服务同步的?

服务之间同步是使用PeerEurekaNode来维持的

  • 使用PeerEurekaNode#register(InstanceInfo)来向其他Eureka server 上注册服务
  • 使用PeerEurekaNode#heartbeat(args...)来同步心跳的

手动同步服务实现分析

既然Eureka server之间使用PeerEurekaNode来进行同步,那我们也借助PeerEurekaNode来手动同步服务不就可以了?

步骤:

  1. 创建PeerEurekaNode
  2. 使用PeerEurekaNode#register来同步服务到指定注册中心
  3. PeerEurekaNode存储,后续维持心跳
  4. 监听服务心跳事件,使用PeerEurekaNode#heartbeat转发心跳

如何创建PeerEurekaNode?

使用PeerEurekaNodes#createPeerEurekaNode(peerEurekaNodeUrl)来创建的
createPeerEurekaNode方法的访问修饰符是protected的,只允许子类访问,但是我们可以使用反射的方式来调用
那你肯定又问PeerEurekaNodes如何创建吧,只要你使用了Spring cloudPeerEurekaNodes就已经存在你的IOC容器中了,你可以直接注入使用

private PeerEurekaNodes peerEurekaNodes;

@Autowired
public void setPeerEurekaNodes(PeerEurekaNodes peerEurekaNodes){
    this.peerEurekaNodes = peerEurekaNodes;
}

然后使用peerEurekaNodes.createPeerEurekaNode(peerEurekaUrl)来创建PeerEurekaNode

peerEurekaUrl:要同步的eurekaUrl,例如:http://localhost:8761/eureka/

实现手动同步服务

核心代码如下,对不相关部分进行删减

/**
 * @author mikasa
 */
@Component @Slf4j
public class PeerEurekaNodeService {

    @Autowired
    private PeerEurekaNodes peerEurekaNodes;
    @Autowired
    private PeerAwareInstanceRegistry peerAwareInstanceRegistry;

    // 待续约Map
    private static final Map<String, Set<PeerEurekaNode>> renewableNode = new ConcurrentHashMap<>();
    private static final Object lock = new Object();
    /**
     * @param services  服务ID,多个服务ID用英文状态下的逗号分割
     * @param eurekaUrl 要同步到的注册中心
     * @return
     */
    public Result syncServiceToRegisterCenter(String services, String eurekaUrl) {
	...
	// Effective 第三版书中所说,lambda表达式中语句要简短,此处不再优化,懂意思即可
        Stream.of(services.split(",")).forEach(serviceId -> {
            Application application = peerAwareInstanceRegistry.getApplication(serviceId);
            List<InstanceInfo> instances = application.getInstances();
            if (CollectionUtils.isEmpty(instances)) return;
            log.info("【手动同步服务】开始同步服务【{}】到注册中心【{}】", serviceId, eurekaUrl);
            PeerEurekaNode peerEurekaNode = getPeerEurekaNode(eurekaUrl);
            List<Boolean> results = instances.stream().map(instanceInfo -> register(instanceInfo, peerEurekaNode)).collect(Collectors.toList());
            // 维持心跳续约
            putRenewableNode(serviceId, peerEurekaNode);
        });
	...
    }

    private static boolean register(InstanceInfo info, PeerEurekaNode peerNode) {
        try {
            peerNode.register(info);
            return true;
        } catch (Exception e) {
            // do something
            return false;
        }
    }
    
    // 通过反射的方式得到PeerEurekaNode 
    public PeerEurekaNode getPeerEurekaNode(String serverUrl) {
        if (StringUtils.isNoneBlank(serverUrl)) {
            Method method = ReflectionUtils.findMethod(PeerEurekaNodes.class, "createPeerEurekaNode", String.class);
            method.setAccessible(true);
            try {
                PeerEurekaNode peerEurekaNode = (PeerEurekaNode) method.invoke(peerEurekaNodes, serverUrl);
                return peerEurekaNode;
            } catch (Exception e) {
                // do something
            }
        }
        ...
    }

    // 加入待续约map
    public static void putRenewableNode(String serverId, PeerEurekaNode node) {
        if (renewableNode.containsKey(serverId)) {
            renewableNode.get(serverId).add(node);
        } else {
            synchronized (lock) {
                if (renewableNode.containsKey(serverId)) {
                    renewableNode.get(serverId).add(node);
                } else {
                    Set<PeerEurekaNode> nodes = new HashSet<>();
                    nodes.add(node);
                    renewableNode.put(serverId, nodes);
                }
            }
        }
    }

    /**
     * 服务向该Eureka Server续约时,将续约转发给手动同步的Eureka Server
     */
    @EventListener
    public void onEurekaEvent(EurekaInstanceRenewedEvent event) {
        String appName = event.getAppName();                // 服务ID
        String instanceId = event.getServerId();            // 实例ID
        InstanceInfo info = event.getInstanceInfo();
	// 获取要续约的Node
        Set<PeerEurekaNode> nodes = renewableNode.get(appName);
        if (CollectionUtils.isNotEmpty(nodes)) {
            nodes.forEach(node -> {
                try {
                    node.heartbeat(appName, instanceId, info, null, false);
                } catch (Throwable e) {
                    log.error(......);
                }
            });
        }
    }
}