实现功能:
1.从FTP下载最新线刷包。
2.检测adb连接是否正常。
3.进行adb root
4.push_file()把本地升级包EP40_IMAGES_ALL.zip推送到/data/update/temp/目录下。
5.用updatecmd()升级/data/update/temp/目录下的EP40_IMAGES_ALL.zip升级包。
获取最新升级包到本地
#!/usr/bin/env python
# -*- coding: utf-8 -*-import os
import urllib.request
import re
from ftplib import FTP
from datetime import datetime
import timedef current_time():return datetime.now().strftime('%Y-%m-%d_%H-%M-%S')class FTPDownLoad:def __init__(self): # 初始化self.host = "10.4.12.203" # ftp主机ipself.username = "Anonymous" # ftp用户名self.password = "" # ftp密码self.port = 2112 # ftp端口 (默认21)self.ftp = FTP()def connect(self): # 连接FTP serverself.ftp.encoding = 'GBK' # 设置编码,解决上传的文件包含中文的问题self.ftp.set_debuglevel(0) # 调试模式self.ftp.connect(host=self.host, port=self.port) # 连接ftpself.ftp.login(self.username, self.password) # 登录ftpself.ftp.set_pasv(True) # 打开被动模式# print(self.ftp.getwelcome()) # 显示登录ftp信息def quit(self): # 断开连接FTP serverself.ftp.quit()def download_file(self, ftp_file_path):self.ftp.cwd(ftp_file_path)self.entries = list(self.ftp.mlsd())print(self.entries)# Only interested in directoriesself.entries = [entry for entry in self.entries if entry[1]["type"] == "dir"]# Sort by timestampself.entries.sort(key=lambda entry: entry[1]['modify'], reverse=True)# Pick the first onelatest_name = self.entries[0][0]print(latest_name)dir_name = ftp_file_path + "/" + latest_nameself.ftp.cwd(dir_name)self.entries = list(self.ftp.mlsd())print(self.entries)# Only interested in directoriesentries = [entry for entry in self.entries if entry[1]["type"] == "file" and re.match(r'.*_ALL.zip', entry[0])]print("匹配最新文件:", entries)# Pick the first onelatest_file_name = entries[0][0]print("最新文件名:", latest_file_name)# EP40R项目路径local_EP40 = os.path.join(r'/home/devops/tools/autoupdatesystem/updatePackage', latest_file_name)# EP40项目路径print('self.host:',self.host)print('self.port:',self.port)self.url_EP40 = "ftp://10.4.12.203:2112" + dir_name + "/" + latest_file_nameprint(str(self.url_EP40))urllib.request.urlretrieve(str(self.url_EP40), local_EP40)# 程序入口
if __name__ == '__main__':ftp_file_path = "/EP40/IHU_Release/AndroidR/UAT/" # FTP目录 遍历此目录内的文件# ftp_file_key = ['update.zip', 'ReleaseNote.xls', 'update-mcu.*.zip'] # 以正则表达式形式表示的关键字列表,只下载此列表内的文件FTP_Download = FTPDownLoad()wait_time = 10try:print('start one loop package download process in %s' % current_time())FTP_Download.connect()FTP_Download.download_file(ftp_file_path)FTP_Download.quit()print('finish one loop package download process in %s' % current_time())time.sleep(wait_time)except Exception as e:print(current_time()+': '+str(e))time.sleep(wait_time)
升级安卓车机版本
# -*- coding: utf-8 -*-
import os
import sys
import time
from datetime import datetimeclass AutoUpdateSystem:'''实现功能:通过adb命令的方式将本地的update.zip升级包上传到挂载到qnx的目录下‘/qnx’,然后通过shell脚本将升级命令写入到‘/qnx/update/recovery/command’里;通过命令reboot recovery重启进入升级模式进行升级。'''def adb_root(self):self.adb_root_result = os.system('adb root')if self.adb_root_result != 0:print(current_time(), '----------adb root failed! ----------')print(current_time(), '----------adb root passed! ----------')def mount(self):self.adb_mount = os.system('adb shell /sbin/busybox mount -t nfs -o nolock 172.16.1.69:/ /qnx')time.sleep(3)if self.adb_mount != 0:print(current_time(), '----------adb mount failed! ----------')print(current_time(), '----------adb mount passed! ----------')def create_file(self):self.mkdir_file = os.system('adb shell mkdir -p /qnx/update/temp')if self.mkdir_file != 0:print(current_time(), '----------mkdir temp failed! ----------')print(current_time(), '----------mkdir temp passed! ----------')def chmod_file(self):self.change_power = os.system('adb shell chmod 777 /qnx/update/temp')if self.change_power != 0:print(current_time(), '----------chmod file failed! ----------')print(current_time(), '----------chmod file passed! ----------')def push_package_file(self, fname):self.push_update_package = os.system('adb push {0} /qnx/update/temp/update.zip'.format(os.path.join( sys.path[0], fname)))if self.push_update_package != 0:print(current_time(), '----------push package file failed! ----------')print(current_time(), '----------push package file passed! ----------')def push_script(self):self.rm_script = os.system('adb shell rm -rf /qnx/update/recovery/command')self.push_script = os.system('adb push {0} /qnx/update/recovery/'.format(os.path.join(sys.path[0], 'Script', 'updateCommand.sh')))print(os.path.join(sys.path[0], 'Script', 'updateCommand.sh'))if self.push_script != 0:print(current_time(), '----------push script failed! ----------')print(current_time(), '----------push script passed! ----------')def exec_script(self):self.exec_script = os.system('adb shell sh /qnx/update/recovery/updateCommand.sh')if self.exec_script != 0:print(current_time(), '----------exec script failed! ----------')print(current_time(), '----------exec script passed! ----------')def exec_sync(self):self.sync = os.system('adb shell sync')if self.sync != 0:print(current_time(), '----------exec sync failed! ----------')print(current_time(), '----------exec sync passed! ----------')def reboot_recovery(self):self.rboot_recovery = os.system('adb shell reboot recovery')print(current_time(), '----------reboot_recovery ----------')def current_time():return datetime.now().strftime('%Y-%m-%d_%H-%M-%S')def TestAdbConnect():while True:time.sleep(3)ConnectStatus = os.system('adb shell ls')if ConnectStatus == 0:print('adb可正常连接.')breakelse:print('adb连接失败,请到系统设置->版本->关闭/开启adb模式')class E40UpdateSystem:'''实现功能:1.检测adb连接是否正常。2.进行adb root3.push_file()把本地升级包EP40_IMAGES_ALL.zip推送到/data/update/temp/目录下。4.用updatecmd()升级/data/update/temp/目录下的EP40_IMAGES_ALL.zip升级包。'''def create_file(self, file_path):self.mkdir_file = os.system('adb shell mkdir -p {0}'.format(file_path))if self.mkdir_file != 0:print(current_time(), '----------mkdir temp failed! ----------')print(current_time(), '----------mkdir temp passed! ----------')def chmod_file(self, file_path):self.change_power = os.system('adb shell chmod 777 {0}/*'.format(file_path))if self.change_power != 0:print(current_time(), '----------chmod file failed! ----------')print(current_time(), '----------chmod file passed! ----------')def push_file(self, fname, file_path):self.rm_update_package = os.system('adb shell rm -rf {0}*'.format(file_path))self.push_update_package = os.system('adb push {0} {1}'.format(os.path.join(sys.path[0], fname), file_path))if self.push_update_package != 0:print(current_time(), '----------push package file failed! ----------')print(current_time(), '----------push package file passed! ----------')def updatecmd(self, file_path):self.update_cmd = os.system('adb shell am broadcast -a com.hezon.update.action --es "path" "{0}/EP40_IMAGES_ALL.zip"'.format(file_path))if self.update_cmd != 0:print(current_time(), '----------update package file failed! ----------')print(current_time(), '----------update package file passed! ----------')if __name__ == '__main__':file_path = '/data/update/temp/'print(current_time(), '----------start update system ----------')TestAdbConnect() # 检测adb是否正常连接。time.sleep(2)EP31cmd = AutoUpdateSystem()EP31cmd.adb_root() # 进行adb roottime.sleep(2)EP40cmd = E40UpdateSystem()EP40cmd.create_file(file_path) # 给升级包增加可读可写权限time.sleep(2)fname = os.path.join('updatePackage', 'EP40_IMAGES_ALL.zip')print(fname)if sys.argv[1] is not None and os.path.exists(sys.argv[1]):fname = sys.argv[1]EP40cmd.push_file(sys.argv[1], file_path) # 把updatePackage目录下的升级包推送到‘/mnt/user/0/emulated/’目录里time.sleep(2)EP40cmd.chmod_file(file_path)time.sleep(2)EP40cmd.updatecmd(file_path) # 执行升级指令,进行升级。time.sleep(600)print(current_time(), '----------End update system ----------')