func (r *Raft) handleTransferLeader(m pb.Message) {
if r.State != StateLeader {
// forward the message to the current leader
r.msgs = append(r.msgs, pb.Message{
From: r.id,
To: r.Lead,
MsgType: pb.MessageType_MsgTransferLeader,
})
return
}
transferee := m.From
if transferee == r.id {
// log.Debugf("[TRANSFER][PEER %+v] transfer leader to itself", r.id)
return
}
_, exists := r.Prs[transferee]
if !exists {
// Ignore if the peer is not part of the cluster
// log.Debugf("[TRANSFER][PEER %+v] transfer leader to unknown peer %+v", r.id, transferee)
return
}
if r.leadTransferee != None && r.leadTransferee != transferee && r.leaderTransfering {
// log.Debugf("[TRANSFER][PEER %+v] interrupt previous leader transfer (%+v -> %+v), new transferee %+v", r.id, r.leadTransferee, transferee)
r.leadTransferee = transferee
r.leaderTransfering = false
}
r.leadTransferee = transferee
// if transferee is up to date, transfer leader to transferee
if r.Prs[transferee].Match == r.RaftLog.LastIndex() && r.commitUpdated[transferee] {
lastIndex, lastTerm := r.getLastIndexAndTerm()
log.Debugf("[LEADER TRANSFER][PEER %+v] transfer leader to %+v", r.id, transferee)
r.msgs = append(r.msgs, pb.Message{
MsgType: pb.MessageType_MsgTimeoutNow,
To: transferee,
From: r.id,
Term: r.Term,
LogTerm: lastTerm,
Index: lastIndex,
Commit: r.RaftLog.committed,
})
} else {
if !r.leaderTransfering {
r.sendAppendForUpdateCommit(transferee)
r.leaderTransfering = true
}
r.msgs = append(r.msgs, m)
}
}