问:如何在python3脚本中运行shell的命令?
答:subprocess
import subprocess
cmd="lsscsi"
p = subprocess.run(cmd,shell=True,stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print(p)
看一下这个输出是什么
CompletedProcess(args='lsscsi', returncode=0, stdout=b'[5:0:0:0] disk ATA KINGSTON SA400S3 71E0 /dev/sda \n', stderr=b'')
进一步结构分析:
root@unassigned:~/Desktop/xian# python3
Python 3.6.9 (default, Mar 10 2023, 16:46:00)
[GCC 8.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import subprocess
>>> cmd="lsscsi"
>>> p = subprocess.run(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
>>> print(p)
CompletedProcess(args='lsscsi', returncode=0, stdout=b'[5:0:0:0] disk ATA KINGSTON SA400S3 71E0 /dev/sda \n', stderr=b'')
#它的输出包含args即原始shell cmd, returncode即执行结果,0为True,非0为False,stdout是标准输出,stderr是错误输出
>>> print(p.args)
lsscsi
#可以直接使用 p.关键字 将关键字对应的值取出来
>>> print(p.returncode)
0
#returncode可以让我们判断shell命令的执行是否顺利,一般来是0为True,非0为False
>>> print(p.stdout)
b'[5:0:0:0] disk ATA KINGSTON SA400S3 71E0 /dev/sda \n'
>>> print(p.stdout.decode('utf-8'))
[5:0:0:0] disk ATA KINGSTON SA400S3 71E0 /dev/sda#这里增加 .decode('utf-8') 将原本字节序列转换为字符串
>>> print(p.stdout.decode('utf-8').replace('\n',''))
[5:0:0:0] disk ATA KINGSTON SA400S3 71E0 /dev/sda
#这里则再次增加 .replace('\n','')把字符串内的 \n 换行符替换为空
>>> print(p.stderr)
b''
>>>
问:现在已经知道如何在python脚本中执行shell cmd,我开始想,我得多线程去处理测试,我需要同时执行2个及以上的cmd,这又如何去做?
答:threading & def
import threading
import subprocess
import timedef test1(runtime):print("fio start")cmd="fio --name=test --filename=/dev/nvme0n1 --ioengine=libaio --direct=1 --rw=write --iodepth=128 --numjobs=1 --bs=128k --runtime="+str(runtime)+" --time_based"p = subprocess.run(cmd,shell=True,stdout=subprocess.PIPE, stderr=subprocess.PIPE)print("fio test done")
def test2():for i in range(100):print("-------%d---------"%i)cmd="nvme reset /dev/nvme0n1"p = subprocess.run(cmd,shell=True,stdout=subprocess.PIPE, stderr=subprocess.PIPE)time.sleep(1)def run():mythreads=[]t1=threading.Thread(target=test1,args=(30,))t2=threading.Thread(target=test2)mythreads.append(t1)mythreads.append(t2)for t in mythreads:t.start()for t in mythreads:t.join()if __name__ == "__main__":run()print("done")
如上代码,我们使用 def 定义函数 test1和test2,每个函数内部执行不同的cmd,然后定义函数 run,在run函数中定义thread创建线程
然后让其一起跑起来,实现多条shell cmd 同时执行
执行结果:
fio start
-------0---------
-------1---------
.......
-------30---------
fio test done
-------31---------
.....
-------99---------
done
可以看到test1与test2是同步进行的,且等到所有的线程都结束后,才会继续往下执行剩余的脚本命令
进阶剖析run函数:
def run(): #定义函数mythreads=[] #定义一个空的列表 mythreads,以便稍后存储线程t1=threading.Thread(target=test1,args=(30,)) #定义线程t1需要做的事情,target为需要调用的函数块,args后接的是函数需要给的参数,如无参数可以省略argst2=threading.Thread(target=test2)mythreads.append(t1) #将线程加入到列表中mythreads.append(t2)for t in mythreads: #遍历列表t.start() #启动线程,开始测试for t in mythreads:t.join() #启用多线程,等待所有线程结束后,程序才会继续往下执行
做到这里,我大概知道如何多线程进行命令执行,脚本也跑起来了,但是如果我想要控制所有进程呢?
比如在上述例子之中,test1有自己的执行时间,test2也有,现在的脚本只能保证所有的线程都结束了才能继续往下走,我是否可以指定一个重要项,以它为目标
它做完了,其他线程就直接停掉,继续往下执行呢?
于是我想到了增加一个 flag参数进行进程管控:
内容如下:
import threading
import subprocess
import time
def test1(runtime):print("fio start")cmd="fio --name=test --filename=/dev/nvme0n1 --ioengine=libaio --direct=1 --rw=write --iodepth=128 --numjobs=1 --bs=128k --runtime="+str(runtime)+" --time_based"p = subprocess.run(cmd,shell=True,stdout=subprocess.PIPE, stderr=subprocess.PIPE)print("fio test done")global flagflag=False
def test2():i=0global flagwhile flag and i <100:print("-------%d---------"%i)cmd="nvme reset /dev/nvme0n1"p = subprocess.run(cmd,shell=True,stdout=subprocess.PIPE, stderr=subprocess.PIPE)time.sleep(1)i+=1print("reset done")
def run():mythreads=[]global flagt1=threading.Thread(target=test1,args=(30,))t2=threading.Thread(target=test2)mythreads.append(t1)mythreads.append(t2)for t in mythreads:t.start()for t in mythreads:t.join()if __name__ == "__main__":flag=Truerun()print("done")
在此脚本中,我定义了一个全局变量 flag=True,默认开始为True,然后我希望test1作为我的主线程,因此我在函数末尾增加了
global flag #声明此为全局变量flag=False #重新赋值
于是在test1中的fio执行完毕后,flag会被置为false
我又在test2中加入了一个判断 while flag and i <100: 即当flag为True且 i小于100的时候,while循环内的cmd才会执行,这样来控制test2的执行
我预期的结果是,当test1先执行完毕,test2需要也结束掉,程序继续往下执行,当test2先执行完毕,测试也需要等待test1完全执行完毕才能继续往下执行
,让我们看一下程序执行的结果:
fio start
-------0---------
.....
-------30---------
fio test done
reset done
dnoe
可以看到,我设定了test1执行30s,test2每秒打印一次----*------,当test1 30s执行完毕,随即test2也结束了,并在结束之后继续往下执行 print(“done”)
接下来将test1的执行时间改为110秒,即将 t1=threading.Thread(target=test1,args=(30,)) 修改为 t1=threading.Thread(target=test1,args=(110,))
再看一下执行结果:
fio start
-------0---------
....
-------99---------
reset done
fio test done
dnoe
可以看到,在test2执行完毕打印 reset done以后,test1还未执行完毕,等待test1执行完毕后,打印fio test done,所有线程结束,脚本继续往下进行,打印done
以上结果均符合预期,说明脚本无误,成功实现了测试目标