From 34931b460f728cc49a9ba5987181aac7bb131f91 Mon Sep 17 00:00:00 2001 From: "testemail@qq.com" Date: Fri, 19 Mar 2021 02:56:01 +0800 Subject: [PATCH] fix#37 --- .../service/impl/ExampleServiceImpl.java | 36 ++++++------------- 1 file changed, 11 insertions(+), 25 deletions(-) diff --git a/raft-java-example/src/main/java/com/github/wenweihu86/raft/example/server/service/impl/ExampleServiceImpl.java b/raft-java-example/src/main/java/com/github/wenweihu86/raft/example/server/service/impl/ExampleServiceImpl.java index 5fe801c..feb0bcf 100644 --- a/raft-java-example/src/main/java/com/github/wenweihu86/raft/example/server/service/impl/ExampleServiceImpl.java +++ b/raft-java-example/src/main/java/com/github/wenweihu86/raft/example/server/service/impl/ExampleServiceImpl.java @@ -14,6 +14,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -27,35 +29,13 @@ public class ExampleServiceImpl implements ExampleService { private RaftNode raftNode; private ExampleStateMachine stateMachine; - private int leaderId = -1; - private RpcClient leaderRpcClient = null; - private Lock leaderLock = new ReentrantLock(); + private Map serviceMap = new HashMap<>(); public ExampleServiceImpl(RaftNode raftNode, ExampleStateMachine stateMachine) { this.raftNode = raftNode; this.stateMachine = stateMachine; } - private void onLeaderChangeEvent() { - if (raftNode.getLeaderId() != -1 - && raftNode.getLeaderId() != raftNode.getLocalServer().getServerId() - && leaderId != raftNode.getLeaderId()) { - leaderLock.lock(); - if (leaderId != -1 && leaderRpcClient != null) { - leaderRpcClient.stop(); - leaderRpcClient = null; - leaderId = -1; - } - leaderId = raftNode.getLeaderId(); - Peer peer = raftNode.getPeerMap().get(leaderId); - Endpoint endpoint = new Endpoint(peer.getServer().getEndpoint().getHost(), - peer.getServer().getEndpoint().getPort()); - RpcClientOptions rpcClientOptions = new RpcClientOptions(); - rpcClientOptions.setGlobalThreadPoolSharing(true); - leaderRpcClient = new RpcClient(endpoint, rpcClientOptions); - leaderLock.unlock(); - } - } @Override public ExampleProto.SetResponse set(ExampleProto.SetRequest request) { @@ -64,8 +44,14 @@ public ExampleProto.SetResponse set(ExampleProto.SetRequest request) { if (raftNode.getLeaderId() <= 0) { responseBuilder.setSuccess(false); } else if (raftNode.getLeaderId() != raftNode.getLocalServer().getServerId()) { - onLeaderChangeEvent(); - ExampleService exampleService = BrpcProxy.getProxy(leaderRpcClient, ExampleService.class); + Peer leaderPeer = raftNode.getPeerMap().get(raftNode.getLeaderId()); + String host = leaderPeer.getServer().getEndpoint().getHost(); + int port = leaderPeer.getServer().getEndpoint().getPort(); + ExampleService exampleService = serviceMap.get(leaderPeer); + if (exampleService == null) { + exampleService = BrpcProxy.getProxy(new RpcClient(new Endpoint(host, port)), ExampleService.class); + serviceMap.put(leaderPeer, exampleService); + } ExampleProto.SetResponse responseFromLeader = exampleService.set(request); responseBuilder.mergeFrom(responseFromLeader); } else {