分布式锁是一种用于控制分布式系统中资源访问的同步机制,确保在任意时刻只有一个客户端能够获取到锁,并对共享资源进行操作。
作用
应用场景例如
常见实现方式
本文将利用raftx,用简单的方法,编写一个分布式锁的应用库,它的特点是:
raftx的分布式易失性数据扩展模块实现分布式锁 有比常见分布式锁的实现较为明显的特点
什么是Raftx
raftx 是一种对经典 Raft 协议的扩展,结合了 Multi-Paxos、ZAB(Zookeeper Atomic Broadcast)和 Raft 协议的优势。RaftX 具备快速选举、并发提案、数据同步、数据回滚以及易失性数据同步等特性,适用于高并发和大规模分布式系统场景。
Lockx 分布式锁应用库,支持创建海量分布式锁
Lockx是依赖raftx实现的一个分布式锁应用库,实现方式简单,代码量少,100行左右代码,但是它的功能却十分强大,主要表现在:
Lockx 支持一次性创建成千上万,甚至数十万或数百万个分布式锁,它的实现机制保证了它不会大量占用CPU资源和内存资源;它的锁动作变更触发机制针对的是锁资源,而非分布式对象锁本身,也就是说,即使节点中有100万个锁竞争一个锁资源,每次也只会触发一次锁的释放与竞争的指令;比如锁资源"lockmux",那么在分布式系统中,当资源 “lockmux”被释放时,它将触发节点中的 “lockmux”绑定事件一次,并让等待的资源随机发送一条竞争锁的指令竞争该资源锁,而不是触发100万个等待中的锁对象竞争事件。
Lockx 实现方式
lockx主要依赖raftx的易失性数据API实现,它的特点是高效,强一致性,并且可以绑定键值的增删改的触发事件;利用这些特性,可以轻松实现分布式锁的逻辑。
m.raft.MemWatch([]byte(lockstr), func(key, value []byte, watchType raft.WatchType) {
//获取锁成功与否
if watchType == raft.ADD {
if mb, ok := m.mp.Get(util.BytesToInt64(value)); ok {
m.del(string(key), util.BytesToInt64(value))
close(mb.ctx)
}
}
//锁释放,阻塞代码再次重新获取分布式锁
if watchType == raft.DELETE {
m.mux.Lock()
defer m.mux.Unlock()
if ids, b := m.rmap[string(key)]; b {
for k := range ids {
m.raft.MemCommand(key, util.Int64ToBytes(k), timeout, raft.MEM_PUT)
break
}
}
}
//TryLock获取锁失败触发
if watchType == raft.UPDATE {
if mb, ok := m.mp.Get(util.BytesToInt64(value)); ok {
if mb.isTry {
m.del(string(key), util.BytesToInt64(value))
mb.ctx <- true
close(mb.ctx)
}
}
}
}, false, raft.ADD, raft.DELETE, raft.UPDATE)
这是lockx实现的核心代码,主要通过监听raftx易失性数据主键的增删改事件来实现资源锁的锁定与释放
Lockx 使用方式
Lockx 的使用非常简单,并且它可以支持大量创建分布式锁,它一共有3个方法
Lock(string,int)
获取指定资源的分布式锁并设置过期时间,阻塞
TryLock(string,int)bool
获取指定资源的分布式锁并设置过期时间,若获取不到返回false,不阻塞
UnLock(string)
释放指定资源的分布式锁
以下模拟3个集群节点
//节点1,创建分布式锁管理器 mutex1
mutex1 = NewMutex(":20001", []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"})
//节点2,创建分布式锁管理器 mutex2
mutex2 = NewMutex(":20002", []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"})
//节点3,创建分布式锁管理器 mutex3
mutex3 = NewMutex(":20003", []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"})
这样就完成了分布式锁管理器的创建,并可以直接获取各个自定义资源的分布式锁,这里的资源指的是字符串,比如 “test”
示例
//节点1
func lock1(i int) {
logger.Debugf("mutex1 lock%d lock.....", i)
mutex1.Lock("test", 10)
logger.Debugf("mutex1 lock%d get lock successful", i)
time.Sleep(2 * time.Second)
mutex1.Unlock("test")
logger.Debugf("mutex1 lock%d unlock", i)
}
//节点2
func lock2(i int) {
logger.Debugf("mutex2 lock%d lock.....", i)
mutex2.Lock("test", 10)
logger.Debugf("mutex2 lock%d get lock successful", i)
}
//节点3
func lock3(i int) {
logger.Debugf("mutex3 lock%d lock.....", i)
mutex3.Lock("test", 10)
logger.Debugf("mutex3 lock%d get lock successful", i)
time.Sleep(2 * time.Second)
mutex3.Unlock("test")
logger.Debugf("mutex3 lock%d unlock", i)
}
测试调用
func Test_lock(t *testing.T) {
go lock1(1)
go lock2(2)
go lock3(3)
select {}
}
执行结果:
可以看到:
2024/12/31 22:34:35
三个节点的同时抢占分布式锁
2024/12/31 22:34:45
mutex2持有的分布式锁被服务自动释放,同时mutex1节点获取到分布式锁
2024/12/31 22:34:47
mutex1在2秒后显式调用UnLock释放锁,同时mutex3节点获取到分布式锁
2024/12/31 22:34:49
mutex3在2秒后显式调用UnLock释放锁
Lockx 可以海量创建分布式锁,如:
func Test_multi_lock(t *testing.T) {
for i := 1; i < 1<<15; i++ { //mutex1节点创建32768个并发任务
go lock1(i)
}
for i := 1; i < 1<<15; i++ { //mutex2节点创建32768个并发任务
go lock2(i)
}
for i := 1; i < 1<<15; i++ { //mutex3节点创建32768个并发任务
go lock3(i)
}
select {}
}
执行结果:
可以看到,每2秒有一个对象获取到分布式锁,按顺序依次执行获取分布式锁与解锁。
(注意:mutex2增加了2秒后释放锁,否则mutex2节点获取锁后,将等待10秒后有raftx集群释放锁)
Lockx 的源码地址
可以直接将其当成第三方分布式锁库在工程中使用
程序中调用示例
import "github.com/donnie4w/lockx"
//创建分布式锁管理器 mutex1
var mutex1 = lockx.NewMutex(":20001", []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"})
结论
Lockx
利用了 raftx
的高效特性和易失性数据存储能力,提供了一种简洁而强大的分布式锁解决方案。它不仅适合常规的分布式锁需求,还能够在高并发环境下保持性能优势,确保系统的稳定性和可靠性。
如果你考虑在项目中引入这样的分布式锁库,可以参考上述信息进行评估和集成。此外,也可以根据自己的具体需求调整 Lockx
的实现,例如实现更复杂的锁行为(如公平锁等)。
raft的pre vote发生在follower阶段,当心跳超时后,经典的raft协议会发起自选与投票协议。
这将可能出现不断拒绝投票其他节点或互投等情况,导致任期不断叠加重选,选举时间就会被拉长。
预选可以有效的避免这种情况,发出预选后,节点可以确定正式投票能不能被选为leader,避免了无效的投票。
这点与zab异曲同工,都是为了能快速选出leader。
raftx的prepare本质已经是leader了,已经过了投票的阶段,它就是一个leader,由于节点理论上存在数据
不完整的情况,这时它在leader阶段上,标记为prepare,并同时检查数据是否完整,同步或处理缺失事务数据
完成后,它标记为leader。这两个阶段对于其他节点,都是leader,它不影响正常处理提案数据。
但是对于通过raftx接口读取状态机数据的客户端来说,在prepare时,可能读取到不完整的数据。这一点,multi-paxos也是一样的,
在实践时,使用multi-paxos或raftx,它们在leader时,如果还在同步数据,可以拒绝客户端读取操作,或如果状态机实现MVCC时,
可以通过判断读取的数据是否新版本数据等多种策略来处理。