Skip to content

Commit 2d3e5fa

Browse files
authored
scheduler: fix region scatter may transfer leader to removed peer (tikv#1482)
* scheduler: fix region scatter may transfer leader to removed peer
Signed-off-by: nolouch <nolouch@gmail.com>
1 parent 656ed4b commit 2d3e5fa

File tree

7 files changed

+141
-29
lines changed

7 files changed

+141
-29
lines changed

server/grpc_service.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -602,10 +602,15 @@ func (s *Server) ScatterRegion(ctx context.Context, request *pdpb.ScatterRegionR
602602
}
603603
region = core.NewRegionInfo(request.GetRegion(), request.GetLeader())
604604
}
605+
605606
cluster.RLock()
606607
defer cluster.RUnlock()
607608
co := cluster.coordinator
608-
if op := co.regionScatterer.Scatter(region); op != nil {
609+
op, err := co.regionScatterer.Scatter(region)
610+
if err != nil {
611+
return nil, err
612+
}
613+
if op != nil {
609614
co.opController.AddOperator(op)
610615
}
611616

server/handler.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -642,7 +642,11 @@ func (h *Handler) AddScatterRegionOperator(regionID uint64) error {
642642
return ErrRegionNotFound(regionID)
643643
}
644644

645-
op := c.regionScatterer.Scatter(region)
645+
op, err := c.regionScatterer.Scatter(region)
646+
if err != nil {
647+
return err
648+
}
649+
646650
if op == nil {
647651
return nil
648652
}

server/schedule/mockcluster.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,9 @@ func (mc *MockCluster) ApplyOperator(op *Operator) {
398398
if region.GetStorePeer(s.FromStore) == nil {
399399
panic("Remove peer that doesn't exist")
400400
}
401+
if region.GetLeader().GetStoreId() == s.FromStore {
402+
panic("Cannot remove the leader peer")
403+
}
401404
region = region.Clone(core.WithRemoveStorePeer(s.FromStore))
402405
case AddLearner:
403406
if region.GetStorePeer(s.ToStore) != nil {

server/schedule/operator.go

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -436,12 +436,8 @@ func CreateRemovePeerOperator(desc string, cluster Cluster, kind OperatorKind, r
436436
return NewOperator(desc, region.GetID(), region.GetRegionEpoch(), removeKind|kind, steps...), nil
437437
}
438438

439-
// CreateMovePeerOperator creates an Operator that replaces an old peer with a new peer.
440-
func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInfo, kind OperatorKind, oldStore, newStore uint64, peerID uint64) (*Operator, error) {
441-
removeKind, steps, err := removePeerSteps(cluster, region, oldStore, append(getRegionFollowerIDs(region), newStore))
442-
if err != nil {
443-
return nil, err
444-
}
439+
// CreateAddPeerSteps creates an OperatorStep list that adds a new peer.
440+
func CreateAddPeerSteps(newStore uint64, peerID uint64, cluster Cluster) []OperatorStep {
445441
var st []OperatorStep
446442
if cluster.IsRaftLearnerEnabled() {
447443
st = []OperatorStep{
@@ -453,6 +449,16 @@ func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInf
453449
AddPeer{ToStore: newStore, PeerID: peerID},
454450
}
455451
}
452+
return st
453+
}
454+
455+
// CreateMovePeerOperator creates an Operator that replaces an old peer with a new peer.
456+
func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInfo, kind OperatorKind, oldStore, newStore uint64, peerID uint64) (*Operator, error) {
457+
removeKind, steps, err := removePeerSteps(cluster, region, oldStore, append(getRegionFollowerIDs(region), newStore))
458+
if err != nil {
459+
return nil, err
460+
}
461+
st := CreateAddPeerSteps(newStore, peerID, cluster)
456462
steps = append(st, steps...)
457463
return NewOperator(desc, region.GetID(), region.GetRegionEpoch(), removeKind|kind|OpRegion, steps...), nil
458464
}
@@ -630,3 +636,24 @@ func getIntersectionStores(a []*metapb.Peer, b []*metapb.Peer) map[uint64]struct
630636

631637
return intersection
632638
}
639+
640+
// CheckOperatorValid checks if the operator is valid.
641+
func CheckOperatorValid(op *Operator) bool {
642+
removeStores := []uint64{}
643+
for _, step := range op.steps {
644+
if tr, ok := step.(TransferLeader); ok {
645+
for _, store := range removeStores {
646+
if store == tr.FromStore {
647+
return false
648+
}
649+
if store == tr.ToStore {
650+
return false
651+
}
652+
}
653+
}
654+
if rp, ok := step.(RemovePeer); ok {
655+
removeStores = append(removeStores, rp.FromStore)
656+
}
657+
}
658+
return true
659+
}

server/schedule/region_scatterer.go

Lines changed: 86 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/pingcap/kvproto/pkg/metapb"
2121
"github.com/pingcap/pd/server/core"
2222
"github.com/pingcap/pd/server/namespace"
23+
"github.com/pkg/errors"
2324
)
2425

2526
type selectedStores struct {
@@ -79,23 +80,28 @@ func NewRegionScatterer(cluster Cluster, classifier namespace.Classifier) *Regio
7980
}
8081

8182
// Scatter relocates the region.
82-
func (r *RegionScatterer) Scatter(region *core.RegionInfo) *Operator {
83+
func (r *RegionScatterer) Scatter(region *core.RegionInfo) (*Operator, error) {
8384
if r.cluster.IsRegionHot(region.GetID()) {
84-
return nil
85+
return nil, errors.Errorf("region %d is a hot region", region.GetID())
8586
}
8687

8788
if len(region.GetPeers()) != r.cluster.GetMaxReplicas() {
88-
return nil
89+
return nil, errors.Errorf("the number replicas of region %d is not expected", region.GetID())
90+
}
91+
92+
if region.GetLeader() == nil {
93+
return nil, errors.Errorf("region %d has no leader", region.GetID())
8994
}
9095

91-
return r.scatterRegion(region)
96+
return r.scatterRegion(region), nil
9297
}
9398

9499
func (r *RegionScatterer) scatterRegion(region *core.RegionInfo) *Operator {
95-
steps := make([]OperatorStep, 0, len(region.GetPeers()))
96-
97100
stores := r.collectAvailableStores(region)
98-
var kind OperatorKind
101+
var (
102+
targetPeers []*metapb.Peer
103+
replacedPeers []*metapb.Peer
104+
)
99105
for _, peer := range region.GetPeers() {
100106
if len(stores) == 0 {
101107
// Reset selected stores if we have no available stores.
@@ -105,31 +111,93 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo) *Operator {
105111

106112
if r.selected.put(peer.GetStoreId()) {
107113
delete(stores, peer.GetStoreId())
114+
targetPeers = append(targetPeers, peer)
115+
replacedPeers = append(replacedPeers, peer)
108116
continue
109117
}
110118
newPeer := r.selectPeerToReplace(stores, region, peer)
111119
if newPeer == nil {
120+
targetPeers = append(targetPeers, peer)
121+
replacedPeers = append(replacedPeers, peer)
112122
continue
113123
}
114-
115124
// Remove it from stores and mark it as selected.
116125
delete(stores, newPeer.GetStoreId())
117126
r.selected.put(newPeer.GetStoreId())
127+
targetPeers = append(targetPeers, newPeer)
128+
replacedPeers = append(replacedPeers, peer)
129+
}
130+
return r.createOperator(region, replacedPeers, targetPeers)
131+
}
118132

119-
op, err := CreateMovePeerOperator("scatter-peer", r.cluster, region, OpAdmin,
120-
peer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
121-
if err != nil {
122-
continue
133+
func (r *RegionScatterer) createOperator(origin *core.RegionInfo, replacedPeers, targetPeers []*metapb.Peer) *Operator {
134+
// Randomly pick a leader
135+
i := rand.Intn(len(targetPeers))
136+
targetLeaderPeer := targetPeers[i]
137+
originLeaderStoreID := origin.GetLeader().GetStoreId()
138+
139+
originStoreIDs := origin.GetStoreIds()
140+
steps := make([]OperatorStep, 0, len(targetPeers)*3+1)
141+
// deferSteps are appended to the end of steps after all other steps.
142+
deferSteps := make([]OperatorStep, 0, 5)
143+
var kind OperatorKind
144+
sameLeader := targetLeaderPeer.GetStoreId() == originLeaderStoreID
145+
// No need to do anything
146+
if sameLeader {
147+
isSame := true
148+
for _, peer := range targetPeers {
149+
if _, ok := originStoreIDs[peer.GetStoreId()]; !ok {
150+
isSame = false
151+
break
152+
}
153+
}
154+
if isSame {
155+
return nil
123156
}
124-
steps = append(steps, op.steps...)
125-
steps = append(steps, TransferLeader{ToStore: newPeer.GetStoreId()})
126-
kind |= op.Kind()
127157
}
128158

129-
if len(steps) == 0 {
130-
return nil
159+
// Creates the first step
160+
if _, ok := originStoreIDs[targetLeaderPeer.GetStoreId()]; !ok {
161+
st := CreateAddPeerSteps(targetLeaderPeer.GetStoreId(), targetLeaderPeer.GetId(), r.cluster)
162+
steps = append(steps, st...)
163+
// Do not transfer leader to the newly added peer
164+
// Ref: https://github.com/tikv/tikv/issues/3819
165+
deferSteps = append(deferSteps, TransferLeader{FromStore: originLeaderStoreID, ToStore: targetLeaderPeer.GetStoreId()})
166+
deferSteps = append(deferSteps, RemovePeer{FromStore: replacedPeers[i].GetStoreId()})
167+
kind |= OpLeader
168+
kind |= OpRegion
169+
} else {
170+
if !sameLeader {
171+
steps = append(steps, TransferLeader{FromStore: originLeaderStoreID, ToStore: targetLeaderPeer.GetStoreId()})
172+
kind |= OpLeader
173+
}
174+
}
175+
176+
// For the other steps
177+
for j, peer := range targetPeers {
178+
if peer.GetId() == targetLeaderPeer.GetId() {
179+
continue
180+
}
181+
if _, ok := originStoreIDs[peer.GetStoreId()]; ok {
182+
continue
183+
}
184+
if replacedPeers[j].GetStoreId() == originLeaderStoreID {
185+
st := CreateAddPeerSteps(peer.GetStoreId(), peer.GetId(), r.cluster)
186+
st = append(st, RemovePeer{FromStore: replacedPeers[j].GetStoreId()})
187+
deferSteps = append(deferSteps, st...)
188+
kind |= OpRegion | OpLeader
189+
continue
190+
}
191+
st := CreateAddPeerSteps(peer.GetStoreId(), peer.GetId(), r.cluster)
192+
steps = append(steps, st...)
193+
steps = append(steps, RemovePeer{FromStore: replacedPeers[j].GetStoreId()})
194+
kind |= OpRegion
131195
}
132-
return NewOperator("scatter-region", region.GetID(), region.GetRegionEpoch(), kind, steps...)
196+
197+
steps = append(steps, deferSteps...)
198+
op := NewOperator("scatter-region", origin.GetID(), origin.GetRegionEpoch(), kind, steps...)
199+
op.SetPriorityLevel(core.HighPriority)
200+
return op
133201
}
134202

135203
func (r *RegionScatterer) selectPeerToReplace(stores map[uint64]*core.StoreInfo, region *core.RegionInfo, oldPeer *metapb.Peer) *metapb.Peer {

server/schedulers/scheduler_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,10 @@ func (s *testScatterRegionSuite) TestFiveStores(c *C) {
174174
s.scatter(c, 5, 5)
175175
}
176176

177+
func (s *testScatterRegionSuite) checkOperator(op *schedule.Operator, c *C) {
178+
c.Assert(schedule.CheckOperatorValid(op), IsTrue)
179+
}
180+
177181
func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64) {
178182
opt := schedule.NewMockSchedulerOptions()
179183
tc := schedule.NewMockCluster(opt)
@@ -184,7 +188,7 @@ func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64) {
184188
}
185189

186190
// Add regions 1~4.
187-
seq := newSequencer(numStores)
191+
seq := newSequencer(3)
188192
// Region 1 has the same distribution as Region 2, which is used to test selectPeerToReplace.
189193
tc.AddLeaderRegion(1, 1, 2, 3)
190194
for i := uint64(2); i <= numRegions; i++ {
@@ -195,7 +199,8 @@ func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64) {
195199

196200
for i := uint64(1); i <= numRegions; i++ {
197201
region := tc.GetRegion(i)
198-
if op := scatterer.Scatter(region); op != nil {
202+
if op, _ := scatterer.Scatter(region); op != nil {
203+
s.checkOperator(op, c)
199204
tc.ApplyOperator(op)
200205
}
201206
}

tests/cmd/pdctl_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1041,7 +1041,7 @@ func (s *cmdTestSuite) TestOperator(c *C) {
10411041
args = []string{"-u", pdAddr, "operator", "show", "region"}
10421042
_, output, err = executeCommandC(cmd, args...)
10431043
c.Assert(err, IsNil)
1044-
c.Assert(strings.Contains(string(output), "transfer leader from store 0 to store 3"), IsTrue)
1044+
c.Assert(strings.Contains(string(output), "scatter-region"), IsTrue)
10451045
}
10461046

10471047
func (s *cmdTestSuite) TestMember(c *C) {

0 commit comments

Comments
 (0)