如何开发一个支持海量分布式锁的应用库

来源: 投稿
作者: donnie4w
2025-01-10 11:44:00
分布式锁是一种用于控制分布式系统中资源访问的同步机制,确保在任意时刻只有一个客户端能够获取到锁,并对共享资源进行操作。

作用

1. 保证数据一致性:在多个节点并发执行的情况下,分布式锁可以防止同时修改同一份数据,从而避免数据不一致的问题。
2. 协调任务执行:确保特定的任务不会被重复执行,特别是在需要幂等性(idempotent)保证的时候。

应用场景例如

库存扣减:在电商系统中,当用户下单时需要扣减库存,为了避免超卖现象,必须确保每次扣减操作都是原子性的。
定时任务调度:在分布式环境中,确保同一个定时任务只在一个节点上运行,防止重复执行。
缓存更新:当多个服务实例试图更新同一个缓存项时,使用分布式锁可以确保更新过程的线程安全。
秒杀活动:对于高并发的抢购活动,如秒杀,使用分布式锁来控制对有限商品资源的访问是至关重要的。
文件上传:在分布式文件系统中,确保同一文件不会被多次上传或覆盖。

常见实现方式

基于数据库:可以使用数据库的唯一索引来实现简单的分布式锁,也可以通过for update等机制来实现分布式锁。例如,在尝试获取锁时插入一条记录,如果插入成功则表示获取到锁;如果违反了唯一索引约束,则说明锁已经被其他客户端持有。这种方法简单直接,但性能可能不如其他专门设计的解决方案,并且需要处理死锁和锁的自动释放等问题。
基于Redis:Redis是一个内存中的键值存储系统,它提供了原子性的SETNX(Set if Not Exists)命令来设置一个键,只有当该键不存在时才会成功。结合EXPIRE或PEXPIRE命令,可以为锁设置一个过期时间,防止死锁的发生。
基于Zookeeper:Zookeeper支持临时顺序节点,这使得它可以实现复杂的分布式锁逻辑,如公平锁、重入锁以及读写锁。客户端创建一个临时顺序节点作为锁对象,然后检查自己创建的节点是否是最小编号的节点,以此判断是否获得锁。
基于Etcd:Etcd是一个高可用的分布式键值存储系统,它也能够提供分布式锁功能。与Zookeeper类似,etcd使用临时键和租约机制来实现锁。
基于Consul:同样可以用来实现分布式锁。Consul利用KV存储和会话机制,可以方便地构建出分布式锁的应用。

 

本文将利用raftx,用简单的方法,编写一个分布式锁的应用库,它的特点是:

使用方式简单并且可用性强
支持海量创建分布式锁,可以同时创建几十万甚至上百万个分布式锁
占用极少量的系统资源
无自旋阻塞策略,不占用CPU资源
抢占式获取锁
支持TTL(time to live), 防止集群节点宕机造成死锁

 

raftx的分布式易失性数据扩展模块实现分布式锁 有比常见分布式锁的实现较为明显的特点

1. 高效,它基于内存。获取与释放分布式锁过程更快
2. 可以创建海量分布式锁。如果系统需要创建海量分布式锁,比如售票系统,电商秒杀活动等, 对于Zookeeper,Etcd,redis等,在创建海量分布式锁时,可能面临大量日志与大量触发机制,导致系统负载过大的问题。而raftx不会有这个问题。可以通过以下的Lockx的实现过程,详细了解。

 

什么是Raftx

raftx 是一种对经典 Raft 协议的扩展,结合了 Multi-Paxos、ZAB(Zookeeper Atomic Broadcast)和 Raft 协议的优势。RaftX 具备快速选举、并发提案、数据同步、数据回滚以及易失性数据同步等特性,适用于高并发和大规模分布式系统场景。

raftx wiki

 

Lockx 分布式锁应用库,支持创建海量分布式锁

Lockx是依赖raftx实现的一个分布式锁应用库,实现方式简单,代码量少,100行左右代码,但是它的功能却十分强大,主要表现在:

高效性与及时性
资源占用极少
支持海量创建分布式锁
API使用简单方便

 

Lockx 支持一次性创建成千上万,甚至数十万或数百万个分布式锁,它的实现机制保证了它不会大量占用CPU资源和内存资源;它的锁动作变更触发机制针对的是锁资源,而非分布式对象锁本身,也就是说,即使节点中有100万个锁竞争一个锁资源,每次也只会触发一次锁的释放与竞争的指令;比如锁资源"lockmux",那么在分布式系统中,当资源 “lockmux”被释放时,它将触发节点中的 “lockmux”绑定事件一次,并让等待的资源随机发送一条竞争锁的指令竞争该资源锁,而不是触发100万个等待中的锁对象竞争事件。

 

Lockx 实现方式

lockx主要依赖raftx的易失性数据API实现,它的特点是高效,强一致性,并且可以绑定键值的增删改的触发事件;利用这些特性,可以轻松实现分布式锁的逻辑。

	m.raft.MemWatch([]byte(lockstr), func(key, value []byte, watchType raft.WatchType) {
		//获取锁成功与否
		if watchType == raft.ADD {
			if mb, ok := m.mp.Get(util.BytesToInt64(value)); ok {
				m.del(string(key), util.BytesToInt64(value))
				close(mb.ctx)
			}
		}
		//锁释放,阻塞代码再次重新获取分布式锁
		if watchType == raft.DELETE {
			m.mux.Lock()
			defer m.mux.Unlock()
			if ids, b := m.rmap[string(key)]; b {
				for k := range ids {
					m.raft.MemCommand(key, util.Int64ToBytes(k), timeout, raft.MEM_PUT)
					break
				}
			}
		}
		//TryLock获取锁失败触发
		if watchType == raft.UPDATE {
			if mb, ok := m.mp.Get(util.BytesToInt64(value)); ok {
				if mb.isTry {
					m.del(string(key), util.BytesToInt64(value))
					mb.ctx <- true
					close(mb.ctx)
				}
			}
		}
	}, false, raft.ADD, raft.DELETE, raft.UPDATE)

这是lockx实现的核心代码,主要通过监听raftx易失性数据主键的增删改事件来实现资源锁的锁定与释放

raft.ADD 这是资源锁新增的触发事件,通过它判断哪个对象获取到分布式锁,同时关闭相应阻塞的通道,让获取锁的程序继续执行。
raft.DELETE 这是资源锁删除的触发事件,同时它将再次发送获取资源锁的指令,抢占资源锁
raft.UPDATE 这是资源锁更新的触发事件,它表示资源锁获取失败,用于TryLock,同时关闭相应阻塞的通道并返回false

 

Lockx 使用方式

Lockx 的使用非常简单,并且它可以支持大量创建分布式锁,它一共有3个方法

Lock(string,int) 获取指定资源的分布式锁并设置过期时间,阻塞
TryLock(string,int)bool 获取指定资源的分布式锁并设置过期时间,若获取不到返回false,不阻塞
UnLock(string) 释放指定资源的分布式锁

 

以下模拟3个集群节点

//节点1,创建分布式锁管理器 mutex1
mutex1 = NewMutex(":20001", []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"})

//节点2,创建分布式锁管理器 mutex2
mutex2 = NewMutex(":20002", []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"})

//节点3,创建分布式锁管理器 mutex3
mutex3 = NewMutex(":20003", []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"})
第一个参数 raftx服务地址
第二个参数是所有集群节点都相同的,为所有节点的访问地址 []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"}

这样就完成了分布式锁管理器的创建,并可以直接获取各个自定义资源的分布式锁,这里的资源指的是字符串,比如 “test”

 

示例

//节点1
func lock1(i int) {
	logger.Debugf("mutex1 lock%d lock.....", i)
	mutex1.Lock("test", 10)
	logger.Debugf("mutex1 lock%d get lock successful", i)
	time.Sleep(2 * time.Second)
	mutex1.Unlock("test")
	logger.Debugf("mutex1 lock%d unlock", i)
}

//节点2
func lock2(i int) {
	logger.Debugf("mutex2 lock%d lock.....", i)
	mutex2.Lock("test", 10)
	logger.Debugf("mutex2 lock%d get lock successful", i)
}

//节点3
func lock3(i int) {
	logger.Debugf("mutex3 lock%d lock.....", i)
	mutex3.Lock("test", 10)
	logger.Debugf("mutex3 lock%d get lock successful", i)
	time.Sleep(2 * time.Second)
	mutex3.Unlock("test")
	logger.Debugf("mutex3 lock%d unlock", i)
}

测试调用

func Test_lock(t *testing.T) {
	go lock1(1)
	go lock2(2)
	go lock3(3)
	select {}
}

执行结果:

可以看到:

2024/12/31 22:34:35 三个节点的同时抢占分布式锁
节点mutex2获取到了锁,由于mutex2没有主动释放锁,mutex2.Lock("test", 10) 这里表示10秒后 ,集群自动释放锁
2024/12/31 22:34:45 mutex2持有的分布式锁被服务自动释放,同时mutex1节点获取到分布式锁
2024/12/31 22:34:47 mutex1在2秒后显式调用UnLock释放锁,同时mutex3节点获取到分布式锁
2024/12/31 22:34:49 mutex3在2秒后显式调用UnLock释放锁

 

Lockx 可以海量创建分布式锁,如:

func Test_multi_lock(t *testing.T) {
	for i := 1; i < 1<<15; i++ {  //mutex1节点创建32768个并发任务
		go lock1(i)
	}
	for i := 1; i < 1<<15; i++ { //mutex2节点创建32768个并发任务
		go lock2(i)
	}
	for i := 1; i < 1<<15; i++ { //mutex3节点创建32768个并发任务
		go lock3(i)
	}
	select {}
}
每个节点同时并发创建32768个分布式锁对象

执行结果:

 

可以看到,每2秒有一个对象获取到分布式锁,按顺序依次执行获取分布式锁与解锁。

(注意:mutex2增加了2秒后释放锁,否则mutex2节点获取锁后,将等待10秒后有raftx集群释放锁)

 


 

Lockx 的源码地址

可以直接将其当成第三方分布式锁库在工程中使用

程序中调用示例

import "github.com/donnie4w/lockx"

//创建分布式锁管理器 mutex1
var mutex1 = lockx.NewMutex(":20001", []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"})

结论

Lockx 利用了 raftx 的高效特性和易失性数据存储能力,提供了一种简洁而强大的分布式锁解决方案。它不仅适合常规的分布式锁需求,还能够在高并发环境下保持性能优势,确保系统的稳定性和可靠性。

如果你考虑在项目中引入这样的分布式锁库,可以参考上述信息进行评估和集成。此外,也可以根据自己的具体需求调整 Lockx 的实现,例如实现更复杂的锁行为(如公平锁等)。

展开阅读全文
点击加入讨论🔥(3) 发布并加入讨论🔥
本篇精彩评论
好的,这个问题我详细说一下

raft的pre vote发生在follower阶段,当心跳超时后,经典的raft协议会发起自选与投票协议。
这将可能出现不断拒绝投票其他节点或互投等情况,导致任期不断叠加重选,选举时间就会被拉长。
预选可以有效的避免这种情况,发出预选后,节点可以确定正式投票能不能被选为leader,避免了无效的投票。
这点与zab异曲同工,都是为了能快速选出leader。

raftx的prepare本质已经是leader了,已经过了投票的阶段,它就是一个leader,由于节点理论上存在数据
不完整的情况,这时它在leader阶段上,标记为prepare,并同时检查数据是否完整,同步或处理缺失事务数据
完成后,它标记为leader。这两个阶段对于其他节点,都是leader,它不影响正常处理提案数据。
但是对于通过raftx接口读取状态机数据的客户端来说,在prepare时,可能读取到不完整的数据。这一点,multi-paxos也是一样的,
在实践时,使用multi-paxos或raftx,它们在leader时,如果还在同步数据,可以拒绝客户端读取操作,或如果状态机实现MVCC时,
可以通过判断读取的数据是否新版本数据等多种策略来处理。
2025-01-11 10:58
1
举报
3 评论
6 收藏
分享
返回顶部
顶部