基本操作
go get go.etcd.io/etcd/client/v3# 此处使用的 版本是:
# go.etcd.io/etcd/client/v3 v3.5.8
这里使用的是
"go.etcd.io/etcd/client/v3"
而不是"go.etcd.io/etcd/clientv3"
我们不使用 etcd/clientv3,因为它与grpc 最新版本不兼容,官方最新推荐的方式 etcd/client/v3
package mainimport ("context""fmt""log""time"clientv3 "go.etcd.io/etcd/client/v3"
)func main() {client, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"},DialTimeout: 5 * time.Second,})if err != nil {log.Fatalln(err)}ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)defer cancel()// PUT = add + editkey := "/ns/service" // key 的位置在 /ns/ 目录下的 名为 service 文件 中value := "127.0.0.1:80001"_, err = client.Put(ctx, key, value)if err != nil {log.Printf("etcd put error,%v\n", err)return}// GETgetResponse, err := client.Get(ctx, key)if err != nil {log.Printf("etcd GET error,%v\n", err)return}for _, kv := range getResponse.Kvs {fmt.Printf("Key:%s ===> Val:%s \n", kv.Key, kv.Value)}// DEL_, err = client.Delete(ctx, key)if err != nil {// 处理错误log.Printf("etcd GET error,%v\n", err)return}
}
监听
键的变化
如果值一直没有变化,程序不会退出,而是一直等待。这是因为Watch()方法是一个阻塞方法,会一直等待etcd中指定键的值发生变化或者超时。
如果没有设置超时时间,程序将一直等待,直到键值对发生变化或者程序被强制退出。
如果需要设置超时时间,可以使用context.WithTimeout()
方法创建一个带有超时时间的上下文对象,例如:
key := "/ns/service" // key 的位置在 /ns/ 目录下的 名为 service 文件 中// 发布者go func() {for {time.Sleep(time// 使用cli.Txn()方法创建了一个事务txn := client.Txn(context.Background())// 事务包含了一个条件判断和两个操作:// 条件判断使用 clientv3.Compare() 方法,判断键值对的值是否等于"old_value",// 如果满足条件,则执行clientv3.OpPut()方法,将键值对的值修改为"new_value",// 否则执行clientv3.OpGet()方法,读取键值对的值。txn.If(clientv3.Compare(clientv3.Value("/wtt/key"), "=", "value")).Then(clientv3.OpPut("/wtt/key", "new_value")).Else(clientv3.OpGet("/wtt/key"))// 最后,使用txn.Commit()方法提交事务,如果事务执行成功,则返回resp.Succeeded为true,否则为false。resp, err := txn.Commit()if err != nil {fmt.Println(err)return}if resp.Succeeded {fmt.Println("事务执行成功")} else {fmt.Println("事务执行失败")}.Second * 1)_, err = client.Put(context.Background(), key, "value")if err != nil {log.Printf("etcd put error,%v\n", err)return} else {fmt.Println("have change")}}}()// 订阅者rch := client.Watch(context.Background(), key)for wresp := range rch {for _, ev := range wresp.Events {fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)}}
**注意: **
需要注意的是,如果Watch()方法返回错误,例如etcd server连接失败或者上下文对象超时,
程序也会退出。因此,在使用Watch()方法时,需要处理可能出现的错误,以避免程序异常退出
租约
etcd的租约是etcd中一种重要的机制,用于管理键值对的生命周期。
在etcd中,每个键值对都可以关联
一个租约,租约可以设置一个过期时间
,当租约过期时,etcd会自动删除
该键值对。
租约的作用类似于一个定时器,当租约到期时,etcd会自动删除与该租约关联的键值对。租约可以用于实现一些高级功能,例如:
- 实现分布式锁:使用租约来实现分布式锁,当一个客户端获取锁时,可以创建一个带有租约的键值对,当锁被释放时,租约到期,etcd会自动删除该键值对,从而释放锁。
- 实现健康检查:使用租约来实现健康检查,当一个服务启动时,可以创建一个带有租约的键值对,当服务停止时,租约到期,etcd会自动删除该键值对,从而通知其他服务该服务已经停止。
- 实现自动续租:使用租约来实现自动续租,当一个客户端获取租约时,可以定期向etcd发送心跳,从而保持租约的有效性,避免租约过期导致键值对被删除。
client, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"},DialTimeout: 5 * time.Second,})if err != nil {log.Fatalln(err)}// 创建租约leaseResp, err := client.Grant(context.Background(), 30)if err != nil {fmt.Println(err)return}// 绑定租约到键_, err = client.Put(context.Background(), "/wtt/who", "whero", clientv3.WithLease(leaseResp.ID))if err != nil {fmt.Println(err)return} else {fmt.Println("绑定租约到键 成功")}time.Sleep(time.Second * 10)// 续约租约 (相当于 给 绑定租约 的 键 重新绑定 租约)ch, err := client.KeepAlive(context.Background(), leaseResp.ID)if err != nil {fmt.Println(err)return}ka, ok := <-chif ok {fmt.Println("ttl:", ka.TTL)}// 撤销租约/*当租约被撤销时,etcd会自动删除与该租约关联的键值对。需要注意的是,撤销租约是一个异步操作,etcd会在后台删除与该租约关联的键值对,因此,在撤销租约后,键值对可能不会立即被删除,需要等待一段时间才能生效。*/_, err = client.Revoke(context.Background(), leaseResp.ID)if err != nil {fmt.Println(err)return} else {fmt.Println("撤销租约 成功")}
事务
etcd的事务是指一组操作,这些操作要么全部执行成功,要么全部执行失败,保证了数据的一致性和可靠性。
etcd的事务支持多个操作,包括读取、写入、修改和删除等操作,可以在一个事务中同时执行多个操作,
从而避免了多个操作之间的竞态条件和数据不一致的问题。
client, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"},DialTimeout: 5 * time.Second,})if err != nil {log.Fatalln(err)}_, err = client.Put(context.Background(), "/wtt/key", "value")if err != nil {log.Printf("etcd put error,%v\n", err)return}// 使用cli.Txn(etcd的分布式锁是一种基于etcd实现的分布式锁,可以用于多个进程或多台机器之间的协调。etcd的分布式锁使用了etcd的事务操作和租约机制,可以保证锁的正确性和高可用性。)方法创建了一个事务txn := client.Txn(context.Background())// 事务包含了一个条件判断和两个操作:// 条件判断使用 clientv3.Compare() 方法,判断键值对的值是否等于"old_value",// 如果满足条件,则执行clientv3.OpPut()方法,将键值对的值修改为"new_value",// 否则执行clientv3.OpGet()方法,读取键值对的值。txn.If(clientv3.Compare(clientv3.Value("/wtt/key"), "=", "value")).Then(clientv3.OpPut("/wtt/key", "new_value")).Else(clientv3.OpGet("/wtt/key"))// 最后,使用txn.Commit()方法提交事务,如果事务执行成功,则返回resp.Succeeded为true,否则为false。resp, err := txn.Commit()if err != nil {fmt.Println(err)return}if resp.Succeeded {fmt.Println("事务执行成功")} else {fmt.Println("事务执行失败")}
需要注意的是,etcd的事务是原子性的,即要么全部执行成功,要么全部执行失败,不支持部分执行成功的情况。
因此,在使用etcd的事务时,需要仔细考虑每个操作的顺序和条件,避免出现竞态条件和数据不一致的问题。
分布式
分布式锁
etcd的分布式锁是一种基于etcd实现的分布式锁,可以用于 多个进程 或 多台机器 之间的协调。
etcd的分布式锁使用了etcd的 事务操作 和 租约机制,可以保证锁的正确性和高可用性。
package mainimport ("context""fmt""time"clientv3 "go.etcd.io/etcd/client/v3""go.etcd.io/etcd/client/v3/concurrency"
)func main() {// 创建etcd客户端client, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"},DialTimeout: 5 * time.Second,})if err != nil {panic(err)}defer client.Close()go func() {time.Sleep(time.Second * 2)fmt.Println("son start")// 创建一个sessionsession, err := concurrency.NewSession(client)if err != nil {panic(err)}defer session.Close()// 创建一个锁mutex := concurrency.NewMutex(session, "/my-lock")// 加锁err = mutex.Lock(context.Background())if err != nil {panic(err)}fmt.Println("son lock acquired")// 执行业务逻辑time.Sleep(5 * time.Second)// 解锁err = mutex.Unlock(context.Background())if err != nil {panic(err)}fmt.Println("son lock released")}()// 创建一个sessionsession, err := concurrency.NewSession(client)if err != nil {panic(err)}defer session.Close()// 创建一个锁mutex := concurrency.NewMutex(session, "/my-lock")// 加锁err = mutex.Lock(context.Background())if err != nil {panic(err)}fmt.Println("dad lock acquired")// 执行业务逻辑time.Sleep(5 * time.Second)// 解锁err = mutex.Unlock(context.Background())if err != nil {panic(err)}fmt.Println("dad lock released")time.Sleep(time.Hour)
}/*
输出:dad lock acquiredson startdad lock releasedson lock acquiredson lock released
*/
分布式事务
分布式事务是指在分布式系统中,多个节点之间进行的事务操作。
在分布式系统中,不同的节点可能位于不同的物理机器上,它们之间通过网络进行通信。
由于网络通信的不确定性和不可靠性,分布式事务的实现比较复杂。
分布式事务需要满足ACID原则,即原子性、一致性、隔离性和持久性。
其中,原子性指事务是一个不可分割的操作序列,要么全部执行成功,要么全部执行失败;
一致性指事务执行前后,数据库的状态必须保持一致;
隔离性指多个事务之间应该相互隔离,互不干扰;
持久性指事务执行成功后,对数据库的修改应该永久保存。
常见的分布式事务实现方式包括两阶段提交
和 三阶段提交
。
两阶段提交是指在分布式系统中,事务的提交分为两个阶段,第一个阶段是准备阶段
,第二个阶段是提交阶段
;
三阶段提交是在两阶段提交的基础上,增加了一个预提交阶段
,用于解决两阶段提交中的 阻塞问题。