目录
- CRUD
- 1、基本的put/get/del
- 2、获取当前所有的key
- 3、获取/删除带有前缀的键
- lease使用
- 1、创建lease,续租lease,撤销lease
- 2、将lease attach到key上
- watch使用
- watch、watch_once
- replace
- delete
- watch_prefix、watch_prefix_once
- cancel_watch
- add_watch_callback
- 参考
CRUD
1、基本的put/get/del
#普通的get与delete
res = etcd.put('bar')
value, kv_msg = etcd.get('bar')
print(value)
print(etcd.delete('bar'))
print(etcd.get('bar'))
2、获取当前所有的key
print(etcd.put('demo/bar1', 'doot'))
print(etcd.put('demo/bar2', 'doot'))
list_of_kv = list(etcd.get_all())
for value, kv_msg in list_of_kv:print("key:{}, value:{}" .format(kv_msg.key, value))
3、获取/删除带有前缀的键
# [2]获取带有前缀的键
list_of_kv = list(etcd.get_prefix('demo/'))
for value, kv_msg in list_of_kv:print("key:{}, value:{}" .format(kv_msg.key, value))
# [3]删除带有前缀的键
res = etcd.delete_prefix('demo/')
print(res)
list_of_kv = list(etcd.get_prefix('demo/'))
for value, kv_msg in list_of_kv:print("key:{}, value:{}" .format(kv_msg.key, value))
lease使用
1、创建lease,续租lease,撤销lease
import etcd3
import json
import time
etcd = etcd3.client(host='127.0.0.1', port=2379)# lease使用
# 创建新租约。返回lease对象
new_lease = etcd.lease(5)
print("new_lease:{}" .format(new_lease.__str__()))
etcd.get_lease_info(new_lease.id)
print("get_lease_info:{}" .format(etcd.get_lease_info(new_lease.id)))
# 如果租约到期,附加到该租约的所有密钥都将过期并删除。
# 可以发送租约保持活动消息以刷新 ttl。如果超时了的话续租仍然是可以成功的
for i in range(3):# 续租print("=========={}============" .format(i))res = list(etcd.refresh_lease(new_lease.id))# 打印续租是否成功信息time.sleep(3)print("refresh_lease resp:{}" .format(res))
# time.sleep(5) # 超时的话,打印error
# 撤销租约
try:res = etcd.revoke_lease(new_lease.id)print("revoke_lease resp:{}" .format(res.__str__()))
except:print("revoke_lease error")# 打印信息如下:
'''
new_lease:<etcd3.leases.Lease object at 0x7fdb5f709220>
get_lease_info:header {cluster_id: 17237436991929493444member_id: 9372538179322589801revision: 2raft_term: 4
}
ID: 3632582335366158369
TTL: 4
grantedTTL: 5==========0============
refresh_lease resp:[header {cluster_id: 17237436991929493444member_id: 9372538179322589801revision: 2raft_term: 4
}
ID: 3632582335366158369
TTL: 5
]
==========1============
refresh_lease resp:[header {cluster_id: 17237436991929493444member_id: 9372538179322589801revision: 2raft_term: 4
}
ID: 3632582335366158369
TTL: 5
]
==========2============
refresh_lease resp:[header {cluster_id: 17237436991929493444member_id: 9372538179322589801revision: 2raft_term: 4
}
ID: 3632582335366158369
TTL: 5
]
revoke_lease resp:None
'''
2、将lease attach到key上
import etcd3
import json
import time
etcd = etcd3.client(host='127.0.0.1', port=2379)# 基础操作 put get delete
# 获取当前所有存储的key
print(list(etcd.get_all()))# 创建新租约。返回lease对象
new_lease = etcd.lease(5)
print("new_lease:{}" .format(new_lease.__str__()))
etcd.get_lease_info(new_lease.id)
print("get_lease_info:{}" .format(etcd.get_lease_info(new_lease.id)))
# 将lease attach 到某个key上
etcd.put('/demo/key', 'hello world',lease=new_lease)
# 查看key是否存在
print(list(etcd.get_all()))
time.sleep(6)
# 查看key是否存在
print(list(etcd.get_all()))
etcd.close()# 打印结果如下:
# [(b'hello', <etcd3.client.KVMetadata object at 0x7f0011933370>)]
# new_lease:<etcd3.leases.Lease object at 0x7f0011933280>
# get_lease_info:header {
# cluster_id: 17237436991929493444
# member_id: 9372538179322589801
# revision: 2
# raft_term: 4
# }
# ID: 3632582335366158376
# TTL: 4
# grantedTTL: 5# [(b'hello world', <etcd3.client.KVMetadata object at 0x7f0011933430>), (b'hello', <etcd3.client.KVMetadata object at 0x7f00119333a0>)]
# [(b'hello', <etcd3.client.KVMetadata object at 0x7f0011933400>)]
watch使用
watch、watch_once
我们这里开一个进程watch某一个key,在另外一个进程里进行替换或者删除key。
import etcd3
import json
import time
etcd = etcd3.client(host='127.0.0.1', port=2379)# 查看当前集群的kv
list_of_kv = list(etcd.get_all())
for value, kv_msg in list_of_kv:print("key:{}, value:{}" .format(kv_msg.key, value))
# key:b'key', value:b'hello'# 对一个key进行watch
# 这里是阻塞的,我们在其他的程序中进行key的修改
events_iterator, cancel = etcd.watch('key')
for event in events_iterator:print(event)
print(cancel)
etcd.close()
import etcd3
import json
etcd = etcd3.client(host='127.0.0.1', port=2379)# 修改key的value
print(etcd.replace('key', initial_value='hello', new_value='hello2'))
print(etcd.get('key'))
# 删除key
print(etcd.delete('key'))
print(etcd.get('key'))
etcd.close()
replace
delete
可以看到在接收到事件后watch进程仍在监听状态
我们可以使用watch_once在第一个事件后停止
watch_prefix、watch_prefix_once
监视一系列key
import etcd3
import json
import time
etcd = etcd3.client(host='127.0.0.1', port=2379)print(etcd.put('demo/bar1', 'doot'))
print(etcd.put('demo/bar2', 'doot'))
print(etcd.put('demo/bar3', 'doot'))
print(etcd.put('demo/bar4', 'doot'))
# 查看当前集群的kv
list_of_kv = list(etcd.get_all())
for value, kv_msg in list_of_kv:print("key:{}, value:{}" .format(kv_msg.key, value))
# key:b'key', value:b'hello'# 对一个key进行watch
# 这里是阻塞的,我们在其他的程序中进行key的修改
events_iterator, cancel = etcd.watch_prefix('demo/')
for event in events_iterator:print(event)
print(cancel)
etcd.close()
cancel_watch
当我们不想继续watch后,我们可以显式取消watch
这里我们在第3次处理watch事件后进行取消
import etcd3
import json
import time
etcd = etcd3.client(host='127.0.0.1', port=2379)# 查看当前集群的kv
list_of_kv = list(etcd.get_all())
for value, kv_msg in list_of_kv:print("key:{}, value:{}" .format(kv_msg.key, value))
# key:b'key', value:b'hello'# 对一个key进行watch
# 这里是阻塞的,我们在其他的程序中进行key的修改
cnt = 0
events_iterator, cancel = etcd.watch('key')
for event in events_iterator:cnt += 1print(event)print("handle times:{}" .format(cnt))if 3 == cnt:cancel() # cancel底层调用的就是cancel_watch(watchid)print("cancel watch")
etcd.close()
很显然,如果我们在主线程进行event监控,会造成主线程阻塞。
add_watch_callback
我们可以向watch加入回调函数,这样就不会阻塞主进程了。
import etcd3
import time
etcd = etcd3.client(host='127.0.0.1', port=2379)def callback(resp):# resp is a events_iteratorfor event in resp.events:print("Key:{}发生改变, 新的value是:{}" .format(event.key, event.value))etcd.add_watch_callback('key', callback)
# 程序主流程
while True:time.sleep(1)
参考
https://www.modb.pro/db/53046