Monitoring a Child's Study Sessions with PyTorch, the MuMu Emulator, and an Ezviz (萤石) Camera


The idea:

1. Use Ezviz (萤石) cloud monitoring: put a camera in front of the child's study desk so it can see whether the child is actually sitting there studying.

2. Use the MuMu emulator: screenshot the Ezviz app to collect a photo of the study spot every 10 seconds.

3. Use PyTorch to train and test on the collected images and classify whether the child is at the desk.

4. Drive the MuMu emulator from Python: when the child is not at the desk, tap the app's bell button to call them back, and log the study sessions in an SQLite3 database (a sketch of the table it writes to follows this list).
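The monitoring script at the end of this post inserts rows into a table named `jk`, but the CREATE TABLE statement is never shown. Here is a minimal sketch of a schema that matches the columns and queries the script uses; the value meanings are inferred from the script's comments, so treat them as assumptions:

import sqlite3

# One-off setup for jk.db next to the script (path is an assumption)
conn = sqlite3.connect('jk.db')
conn.execute("""
CREATE TABLE IF NOT EXISTS jk (
    id   INTEGER PRIMARY KEY AUTOINCREMENT,  -- the "order by id desc" queries rely on this
    sj   TEXT,     -- sample time, 'YYYY-MM-DD HH:MM:SS'
    furl TEXT,     -- path of the captured image
    xl   INTEGER,  -- bell flag: 0 = silent, 1 = ringing, 2 = break
    zt   INTEGER   -- state: 1 = child present, 0 = absent, 2 = break
)
""")
conn.commit()
conn.close()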

The photo-collection code:

# -*- coding: utf-8 -*-
"""
Created on Sun Dec 22 11:01:23 2024

@author: YBK
"""
import os
import random
import time
import cv2
import numpy as np
import win32api
from apscheduler.schedulers.blocking import BlockingScheduler
from skimage.metrics import structural_similarity as ssim

# Connect to the MuMu emulator: kill the adb server first, then connect
def Connection():
    cmd = 'adb kill-server'
    tmp = os.popen(cmd).readlines()
    cmd = 'adb connect 127.0.0.1:16384'
    tmp = os.popen(cmd).readlines()
    print(tmp)

# Simulate a tap -- the core of the script, and the source of most of the fiddling
def Click(x, y):
    cmd = 'adb shell input tap {x1} {y1}'.format(x1=x, y1=y)
    print(cmd)
    os.system(cmd)

# Take a screenshot through adb
def Screenshot(jpgname):
    if jpgname is None:
        jpgname = nowtime_to_str()
    os.system(f'adb shell screencap -p /sdcard/{jpgname}.png')
    os.system(f'adb pull /sdcard/{jpgname}.png e:/sb/{jpgname}.png')
    # Read the screenshot
    image = cv2.imread(f'e:/sb/{jpgname}.png', cv2.IMREAD_UNCHANGED)
    # Coordinates and size of the crop region
    x, y, w, h = 150, 0, 690, 540  # example values
    # Crop with numpy slicing
    cropped_image = image[y:y+h, x:x+w]
    # Save the cropped image
    cv2.imwrite(f'e:/sb/{jpgname}.jpg', cropped_image, [int(cv2.IMWRITE_JPEG_QUALITY), 90])
    file_path = f'e:/sb/{jpgname}.png'
    if os.path.exists(file_path):  # delete the intermediate PNG if it exists
        os.remove(file_path)
    else:
        print(f"文件 {file_path} 不存在")
    # Read the crop and the two reference images (replace the paths with your own)
    imageA = cv2.imread(f'e:/sb/{jpgname}.jpg', cv2.IMREAD_GRAYSCALE)
    imageB = cv2.imread(r'E:\sb\chick.jpg', cv2.IMREAD_GRAYSCALE)
    imageC = cv2.imread(r'E:\sb\chick0.jpg', cv2.IMREAD_GRAYSCALE)
    # Make sure the images are the same size
    imageA0 = cv2.resize(imageA, (imageB.shape[1], imageB.shape[0]))
    imageA1 = cv2.resize(imageA, (imageC.shape[1], imageC.shape[0]))
    # Compute SSIM against both references
    (score, diff) = ssim(imageA0, imageB, full=True)
    diff = (diff * 255).astype("uint8")
    (score1, diff) = ssim(imageA1, imageC, full=True)
    print(f"SSIM: {score} , {score1}")
    if score > 0.98:
        Click(352, 308)
        time.sleep(1)
    if score1 > 0.98:
        Click(452, 293)
        time.sleep(1)
    os.system(f'adb shell rm /sdcard/{jpgname}.png')
    # Click(446, 867)  # ring the bell

# Start the MuMu emulator automatically
def Lon():
    win32api.ShellExecute(0, 'open', r'D:\Program Files\Netease\MuMuPlayer-12.0\shell\MuMuPlayer.exe', '', '', 1)
    time.sleep(25)
    Connection()
    Click(1349, 674)
    time.sleep(5)
    time.sleep(10)  # wait a bit longer in case the app shows an ad
    Click(400, 400)
    time.sleep(3)

def dec_to_36(num):
    # digits 0-9 as strings, followed by the letters A-Z
    base = [str(x) for x in range(10)] + [chr(x) for x in range(ord('A'), ord('A') + 26)]
    l = []
    if num < 0:
        return "-" + dec_to_36(abs(num))
    while True:
        num, rem = divmod(num, 36)  # quotient and remainder
        l.append(base[rem])
        if num == 0:
            return "".join(l[::-1])

def nowtime_to_str():
    # Convert the current Unix timestamp to base 36 (about 6 characters) to keep file names short
    unix_timestamp = int(time.time())
    return dec_to_36(unix_timestamp)

Lon()  # start the emulator and open the app
Connection()  # connect to MuMu first
# time.sleep(10)  # wait a bit longer in case the app shows an ad
# Click(400,400)
# time.sleep(3)
sched = BlockingScheduler()
sched.add_job(Screenshot, args=[None,], trigger='cron', day_of_week='mon-sun', hour='*', minute='*', second='*/30', )
sched.start()
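The two reference images E:\sb\chick.jpg and E:\sb\chick0.jpg are saved once in advance; judging by the captions near the end of the post, at least one of them is a crop of the app's network-problem pop-up, and when a fresh screenshot matches a reference closely (SSIM above the threshold) the script taps the button that brings the live view back. A hedged sketch of how such a reference crop can be captured, reusing the same adb commands and crop box as the script above (the destination path is an assumption):

import os
import cv2

# Run this once while the app is showing the screen you want as a template.
os.system('adb shell screencap -p /sdcard/ref.png')
os.system('adb pull /sdcard/ref.png e:/sb/ref.png')
img = cv2.imread('e:/sb/ref.png', cv2.IMREAD_UNCHANGED)
x, y, w, h = 150, 0, 690, 540  # same crop box as Screenshot()
cv2.imwrite(r'E:\sb\chick.jpg', img[y:y+h, x:x+w],
            [int(cv2.IMWRITE_JPEG_QUALITY), 90])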

The training code for the images (I train on the GPU):

# -*- coding: utf-8 -*-
"""
Created on Tue Dec 24 09:02:10 2024

@author: YBK
"""
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader

# Check whether a GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Data preprocessing
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Load the datasets
train_dataset = datasets.ImageFolder(root=r'E:\.spyder-py3\jjxjwl\haveman0', transform=transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_dataset = datasets.ImageFolder(root=r'E:\.spyder-py3\jjxjwl\haveman00', transform=transform)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

# Pick a pretrained model and replace the classification head
model = models.resnet18(pretrained=True)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 2)  # 2 = binary classification

# Move the model to the GPU
model = model.to(device)

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train the model
num_epochs = 50
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        # Move data and labels to the GPU
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader)}')

    # Validate the model
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    print(f'Validation Accuracy: {100 * correct / total}%')

# Save the model
torch.save(model.state_dict(), 'model.pth')
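One caveat: on torchvision 0.13 and later, `pretrained=True` still works but emits a deprecation warning; the `weights` argument is the current form. A minimal equivalent:

from torchvision import models

# Same ImageNet weights as pretrained=True, without the deprecation warning
model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)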

The directory layout:

In haveman00 (the validation set above) I keep only 10 'person present' and 'no person' photos.
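Note that `ImageFolder` assigns class indices alphabetically by subfolder name, and the test code below assumes that index 0 means "person present". The actual folder names aren't shown in the post, so it is worth printing the mapping once to confirm that assumption (the names in the comment are placeholders):

from torchvision import datasets

train_dataset = datasets.ImageFolder(root=r'E:\.spyder-py3\jjxjwl\haveman0')
print(train_dataset.class_to_idx)  # e.g. {'man': 0, 'noman': 1} -> index 0 must be the "person" class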

The model test code:

# -*- coding: utf-8 -*-
"""
Created on Tue Dec 24 15:37:49 2024

@author: YBK
"""
import torch
import torch.nn as nn
from torchvision import transforms, models
from PIL import Image

# Load the model
model = models.resnet18()
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 2)  # the classification head must match training
model.load_state_dict(torch.load('model.pth'))
model.eval()  # evaluation mode

# If the model was trained on the GPU, run inference on the GPU as well
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Preprocessing
preprocess = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Load and preprocess the image
img_path = r'E:\sb\man\SOVSBI.jpg'
# img_path = r'E:\sb\noman\SOVR96.jpg'
# img_path = r'E:\sxtpz\OLD\20240820\0820100504.jpg'
image = Image.open(img_path).convert('RGB')
image_tensor = preprocess(image).unsqueeze(0)  # add a batch dimension
image_tensor = image_tensor.to(device)

# Run inference
with torch.no_grad():
    outputs = model(image_tensor)
    _, predicted = torch.max(outputs, 1)

# Interpret the prediction
if predicted.item() == 0:
    print("图片中有人")
else:
    print("图片中没有人")

After training on roughly 300 'person present' and 'no person' photos, I start the monitoring program. It uses the Pomodoro technique -- 25 minutes of study followed by a 5-minute break; at one screenshot every 10 seconds that works out to about 150 study samples and 30 break samples, which is the scale the counting queries below work with:

# -*- coding: utf-8 -*-
"""
Created on Sun Dec 22 11:01:23 2024

@author: YBK
"""
import os
import time
import cv2
import win32api
from apscheduler.schedulers.blocking import BlockingScheduler
from skimage.metrics import structural_similarity as ssim
import sqlite3
import datetime
from pathlib import Path
import torch
import torch.nn as nn
from torchvision import transforms, models
from PIL import Image

# Load the model
model = models.resnet18()
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 2)  # the classification head must match training
model.load_state_dict(torch.load('model.pth'))
model.eval()  # evaluation mode

# If the model was trained on the GPU, run inference on the GPU as well
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Preprocessing
preprocess = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
db_filepath = Path(__file__).joinpath("../jk.db").resolve()

# Insert one row; zt: 1 = child present, 0 = absent
def insertdb(sj, furl, xl, zt):
    conn = sqlite3.connect(db_filepath, timeout=10, check_same_thread=False)
    c = conn.cursor()
    insert_query = "INSERT INTO jk(sj,furl,xl,zt) VALUES(?,?,?,?);"
    insert_data = (sj, furl, xl, zt)
    c.execute(insert_query, insert_data)
    conn.commit()
    c.close()
    conn.close()

def predict(image_path):
    # Load and preprocess the image
    img_path = image_path
    image = Image.open(img_path).convert('RGB')
    image_tensor = preprocess(image).unsqueeze(0)  # add a batch dimension
    image_tensor = image_tensor.to(device)
    # Run inference
    with torch.no_grad():
        outputs = model(image_tensor)
        _, predicted = torch.max(outputs, 1)
    # Interpret the prediction
    if predicted.item() == 0:
        print("图片中有人")
        result = '有人'
    else:
        print("图片中没有人")
        result = '无人'
    return result

# Connect to the MuMu emulator: kill the adb server first, then connect
def Connection():
    cmd = 'adb kill-server'
    tmp = os.popen(cmd).readlines()
    cmd = 'adb connect 127.0.0.1:16384'
    tmp = os.popen(cmd).readlines()
    print(tmp)

# Simulate a tap -- the core of the script, and the source of most of the fiddling
def Click(x, y):
    cmd = 'adb shell input tap {x1} {y1}'.format(x1=x, y1=y)
    print(cmd)
    os.system(cmd)

# Get the most recent zt (state) from the log
def getprezt():
    conn = sqlite3.connect(db_filepath, timeout=10, check_same_thread=False)
    c = conn.cursor()
    cursor = c.execute("select zt from jk order by id desc limit 0,1;")
    row = cursor.fetchone()
    if row:
        zt = row[0]
    else:
        zt = 1
    c.close()
    conn.close()
    return zt

# Get the most recent xl (bell flag) from the log
def getprexl():
    conn = sqlite3.connect(db_filepath, timeout=10, check_same_thread=False)
    c = conn.cursor()
    cursor = c.execute("select xl from jk order by id desc limit 0,1;")
    row = cursor.fetchone()
    if row:
        xl = row[0]
    else:
        xl = 0
    c.close()
    conn.close()
    return xl

# Is the bell currently ringing? The camera rings for one minute, so if the child
# comes back within that minute the bell can be switched off.
def isling():
    conn = sqlite3.connect(db_filepath, timeout=10, check_same_thread=False)
    c = conn.cursor()
    cursor = c.execute("select sj from jk where xl >= 1 order by id desc limit 0,1;")
    row = cursor.fetchone()
    xltime = row[0]  # time of the last bell record
    # If the last bell record is more than a minute old, nothing is ringing
    now = datetime.datetime.now()
    nowtime = now.strftime("%Y-%m-%d %H:%M:%S")
    delta = datetime.datetime.strptime(nowtime, "%Y-%m-%d %H:%M:%S") - datetime.datetime.strptime(xltime, "%Y-%m-%d %H:%M:%S")
    seconds_diff = delta.total_seconds()
    if seconds_diff > 60:
        isling = False
        print(f"“最后响铃时间{xltime}”距离现在大于1分钟")
    else:
        # Otherwise look up when the bell actually started
        cursor = c.execute("select sj from jk where xl = 0 and sj < datetime('" + xltime + "') order by id desc limit 0,1;")
        row = cursor.fetchone()
        realtime = row[0]
        delta = datetime.datetime.strptime(nowtime, "%Y-%m-%d %H:%M:%S") - datetime.datetime.strptime(realtime, "%Y-%m-%d %H:%M:%S")
        seconds_diff = delta.total_seconds()
        if seconds_diff <= 60:
            isling = True  # the actual start of the bell is also within the last minute, so it really is ringing
        else:
            isling = False
            print(f"真正响铃时间{realtime}距离现在大于1分钟")
    # TODO: still need to handle the case of no one present for over a minute while a bell is requested
    c.close()
    conn.close()
    return isling

# How many 10-second samples with no one present (bell records in the last minute)
def nomanmin():
    nomanonemin = 0
    conn = sqlite3.connect(db_filepath, timeout=10, check_same_thread=False)
    c = conn.cursor()
    cursor = c.execute("SELECT count(*) FROM jk WHERE xl = 1 and sj > datetime('now', 'localtime', '-1 minute')")
    row = cursor.fetchone()
    tj = row[0]
    print(f'无人统计={tj}')
    c.close()
    conn.close()
    nomanonemin = tj
    return nomanonemin

# Seconds since the last bell record
def getmiaocha():
    seconds_diff = 0
    xltime = None
    now = datetime.datetime.now()
    nowtime = now.strftime("%Y-%m-%d %H:%M:%S")
    conn = sqlite3.connect(db_filepath, timeout=10, check_same_thread=False)
    c = conn.cursor()
    cursor = c.execute("select sj from jk where xl = 1 order by id desc limit 0,1;")  # last bell time
    row = cursor.fetchone()
    if row:
        xltime = row[0]
    c.close()
    conn.close()
    if xltime:
        delta = datetime.datetime.strptime(nowtime, "%Y-%m-%d %H:%M:%S") - datetime.datetime.strptime(xltime, "%Y-%m-%d %H:%M:%S")
        seconds_diff = delta.total_seconds()
    return seconds_diff

# How many 10-second break samples in the last 5 minutes
def xxms():
    xxms = 0
    conn = sqlite3.connect(db_filepath, timeout=10, check_same_thread=False)
    c = conn.cursor()
    cursor = c.execute("SELECT count(*) FROM jk WHERE xl = 2 and zt = 2 and sj > datetime('now', 'localtime', '-5 minute')")
    row = cursor.fetchone()
    tj = row[0]
    print(f'休息统计={tj}')
    c.close()
    conn.close()
    xxms = tj
    return xxms

# How many 10-second study samples in the last 25 minutes (the Pomodoro window)
def fqms():
    fqms = 0
    conn = sqlite3.connect(db_filepath, timeout=10, check_same_thread=False)
    c = conn.cursor()
    cursor = c.execute("SELECT count(*) FROM jk WHERE zt = 1 and sj > datetime('now', 'localtime', '-25 minute')")
    row = cursor.fetchone()
    tj = row[0]
    print(f'学习统计={tj}')
    c.close()
    conn.close()
    fqms = tj
    return fqms
# Take a screenshot through adb; the crop it produces is what the matching below works on
def Screenshot(jpgname):
    if jpgname is None:
        jpgname = nowtime_to_str()
    os.system(f'adb shell screencap -p /sdcard/{jpgname}.png')
    os.system(f'adb pull /sdcard/{jpgname}.png e:/sb/{jpgname}.png')
    # Read the screenshot
    image = cv2.imread(f'e:/sb/{jpgname}.png', cv2.IMREAD_UNCHANGED)
    # Coordinates and size of the crop region
    x, y, w, h = 150, 0, 690, 540  # example values
    # Crop with numpy slicing
    cropped_image = image[y:y+h, x:x+w]
    # Save the cropped image
    cv2.imwrite(f'e:/sb/{jpgname}.jpg', cropped_image, [int(cv2.IMWRITE_JPEG_QUALITY), 90])
    file_path = f'e:/sb/{jpgname}.png'
    if os.path.exists(file_path):  # delete the intermediate PNG if it exists
        os.remove(file_path)
    else:
        print(f"文件 {file_path} 不存在")
    # Read the crop and the two reference images
    imageA = cv2.imread(f'e:/sb/{jpgname}.jpg', cv2.IMREAD_GRAYSCALE)
    imageB = cv2.imread(r'E:\sb\chick.jpg', cv2.IMREAD_GRAYSCALE)
    imageC = cv2.imread(r'E:\sb\chick0.jpg', cv2.IMREAD_GRAYSCALE)
    # Make sure the images are the same size
    imageA0 = cv2.resize(imageA, (imageB.shape[1], imageB.shape[0]))
    imageA1 = cv2.resize(imageA, (imageC.shape[1], imageC.shape[0]))
    # Compute SSIM against both references
    (score, diff) = ssim(imageA0, imageB, full=True)
    diff = (diff * 255).astype("uint8")
    (score1, diff) = ssim(imageA1, imageC, full=True)
    print(f"SSIM: {score} , {score1}")
    if score > 0.98:
        Click(352, 308)
        time.sleep(1)
    if score1 > 0.94:
        Click(452, 293)
        time.sleep(1)
    os.system(f'adb shell rm /sdcard/{jpgname}.png')
    image_path = f'e:/sb/{jpgname}.jpg'
    if score > 0.98 or score1 > 0.94:
        print('点击激活监控')
    else:
        # Break mode first
        xxmstj = xxms()
        if xxmstj == 30:
            # The 5-minute break is over: ring the bell to call the child back
            # Click(446, 867)
            print('响铃回来')
            xl = 0
            zt = 0
        elif xxmstj > 0 and xxmstj <= 29:
            # Still on break
            print('继续休息')
            xl = 2
            zt = 2
        else:
            # Decide whether a break may start
            fqmstj = fqms()
            if fqmstj > 130:
                # Click(446, 867)
                print('响铃休息')
                xl = 2
                zt = 2
            else:
                # Normal study mode
                prexl = getprexl()
                nomanonemin = nomanmin()
                xl = 0
                zt = 0
                is00 = isling()
                if predict(image_path) == '有人':
                    zt = 1
                    xl = 0  # someone is there, no bell
                    if is00 and prexl == 1:  # a bell is ringing, so cancel it
                        # Click(446, 867)  # tap once more to cancel the bell
                        print('取消响铃')
                else:
                    zt = 0
                    xl = 1  # no one there, so ring
                    # Ring if this is the first bell, or if no one has been seen for over a minute
                    # (5 bell samples within the last minute)
                    if (is00 != True and prexl == 0) and nomanonemin < 1:
                        # Click(446, 867)  # tap to ring the bell
                        print('点击响铃')
                    elif is00 != True and nomanonemin == 5:
                        # Click(446, 867)  # tap to ring the bell again
                        print('无人再次点击响铃')
                    else:
                        print('在响铃了,不点击')
                    if nomanonemin == 5:
                        xl = 0  # after a full minute of absence record the bell flag as 0 -- a synthetic record that keeps the logic going
                # End of normal study mode
        now = datetime.datetime.now()
        insertdb(now.strftime("%Y-%m-%d %H:%M:%S"), f'e:/sb/{jpgname}.jpg', xl, zt)
    # Click(446, 867)  # ring the bell

# Start the MuMu emulator automatically (this is where MuMu lives on my machine)
def Lon():
    win32api.ShellExecute(0, 'open', r'D:\Program Files\Netease\MuMuPlayer-12.0\shell\MuMuPlayer.exe', '', '', 1)
    time.sleep(25)
    Connection()
    Click(1349, 674)
    time.sleep(5)

def dec_to_36(num):
    # digits 0-9 as strings, followed by the letters A-Z
    base = [str(x) for x in range(10)] + [chr(x) for x in range(ord('A'), ord('A') + 26)]
    l = []
    if num < 0:
        return "-" + dec_to_36(abs(num))
    while True:
        num, rem = divmod(num, 36)  # quotient and remainder
        l.append(base[rem])
        if num == 0:
            return "".join(l[::-1])

def nowtime_to_str():
    # Convert the current Unix timestamp to base 36 (about 6 characters) to keep file names short
    unix_timestamp = int(time.time())
    return dec_to_36(unix_timestamp)

# Lon()  # start the emulator and open the app
Connection()  # connect to MuMu first
# time.sleep(10)  # wait a bit longer in case the app shows an ad
# Click(400,400)
# time.sleep(3)
# Screenshot('4')
sched = BlockingScheduler()
sched.add_job(Screenshot, args=[None,], trigger='cron', day_of_week='mon-sun', hour='*', minute='*', second='*/10', )
sched.start()
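Since every 10-second sample ends up in the jk table, the study record mentioned in the approach can be pulled straight out of SQLite. A minimal sketch (assuming the schema sketched near the top; zt = 1 meaning "present" comes from the script's own comments) that turns today's "present" samples into minutes:

import sqlite3

conn = sqlite3.connect('jk.db')
cur = conn.execute(
    "SELECT count(*) FROM jk "
    "WHERE zt = 1 AND sj > datetime('now', 'localtime', 'start of day')"
)
samples = cur.fetchone()[0]
conn.close()
print(f"Study time today: about {samples * 10 / 60:.0f} minutes")  # one sample every 10 seconds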

The chick reference image is, I believe, the app's network-problem screen:

chick0:


