【信号处理】基于CNN自编码器的心电信号异常检测识别(tensorflow)

news/2025/1/19 8:51:39/

关于

本项目主要实现卷积自编码器对于异常心电ECG信号的检测和识别,属于无监督学习中的生理信号检测的典型方法之一。

工具

 

方法实现

读取心电信号
normal_df = pd.read_csv("/heartbeat/ptbdb_normal.csv").iloc[:, :-1]
anomaly_df = pd.read_csv("/heartbeat/ptbdb_abnormal.csv").iloc[:, :-1]
normal_df.head()

信号可视化

def plot_sample(normal, anomaly):index = np.random.randint(0, len(normal_df), 2)fig, ax = plt.subplots(1, 2, sharey=True, figsize=(10, 4))ax[0].plot(normal.iloc[index[0], :].values, label=f"Case {index[0]}")ax[0].plot(normal.iloc[index[1], :].values, label=f"Case {index[1]}")ax[0].legend(shadow=True, frameon=True, facecolor="inherit", loc=1, fontsize=9)ax[0].set_title("Normal")ax[1].plot(anomaly.iloc[index[0], :].values, label=f"Case {index[0]}")ax[1].plot(anomaly.iloc[index[1], :].values, label=f"Case {index[1]}")ax[1].legend(shadow=True, frameon=True, facecolor="inherit", loc=1, fontsize=9)ax[1].set_title("Anomaly")plt.tight_layout()plt.show()plot_sample(normal_df, anomaly_df)

 

 信号均值计算及可视化
def plot_smoothed_mean(data, class_name = "normal", step_size=5, ax=None):df = pd.DataFrame(data)roll_df = df.rolling(step_size)smoothed_mean = roll_df.mean().dropna().reset_index(drop=True)smoothed_std = roll_df.std().dropna().reset_index(drop=True)margin = 3*smoothed_stdlower_bound = (smoothed_mean - margin).values.flatten()upper_bound = (smoothed_mean + margin).values.flatten()ax.plot(smoothed_mean.index, smoothed_mean)ax.fill_between(smoothed_mean.index, lower_bound, y2=upper_bound, alpha=0.3, color="red")ax.set_title(class_name, fontsize=9)fig, axes = plt.subplots(1, 2, figsize=(8, 4), sharey=True)
axes = axes.flatten()
for i, label in enumerate(CLASS_NAMES, start=1):data_group = df.groupby("target")data = data_group.get_group(label).mean(axis=0, numeric_only=True).to_numpy()plot_smoothed_mean(data, class_name=label, step_size=20, ax=axes[i-1])
fig.suptitle("Plot of smoothed mean for each class", y=0.95, weight="bold")
plt.tight_layout()

 训练/测试数据划分
normal_df.drop("target", axis=1, errors="ignore", inplace=True)
normal = normal_df.to_numpy()
anomaly_df.drop("target", axis=1, errors="ignore", inplace=True)
anomaly = anomaly_df.to_numpy()X_train, X_test = train_test_split(normal, test_size=0.15, random_state=45, shuffle=True)
print(f"Train shape: {X_train.shape}, Test shape: {X_test.shape}, anomaly shape: {anomaly.shape}")
搭建自编码器
class AutoEncoder(Model):def __init__(self, input_dim, latent_dim):super(AutoEncoder, self).__init__()self.input_dim = input_dimself.latent_dim = latent_dimself.encoder = tf.keras.Sequential([layers.Input(shape=(input_dim,)),layers.Reshape((input_dim, 1)),  # Reshape to 3D for Conv1Dlayers.Conv1D(128, 3, strides=1, activation='relu', padding="same"),layers.BatchNormalization(),layers.MaxPooling1D(2, padding="same"),layers.Conv1D(128, 3, strides=1, activation='relu', padding="same"),layers.BatchNormalization(),layers.MaxPooling1D(2, padding="same"),layers.Conv1D(latent_dim, 3, strides=1, activation='relu', padding="same"),layers.BatchNormalization(),layers.MaxPooling1D(2, padding="same"),])# Previously, I was using UpSampling. I am trying Transposed Convolution this time around.self.decoder = tf.keras.Sequential([layers.Conv1DTranspose(latent_dim, 3, strides=1, activation='relu', padding="same"),
#             layers.UpSampling1D(2),layers.BatchNormalization(),layers.Conv1DTranspose(128, 3, strides=1, activation='relu', padding="same"),
#             layers.UpSampling1D(2),layers.BatchNormalization(),layers.Conv1DTranspose(128, 3, strides=1, activation='relu', padding="same"),
#             layers.UpSampling1D(2),layers.BatchNormalization(),layers.Flatten(),layers.Dense(input_dim)])def call(self, X):encoded = self.encoder(X)decoded = self.decoder(encoded)return decodedinput_dim = X_train.shape[-1]
latent_dim = 32model = AutoEncoder(input_dim, latent_dim)
model.build((None, input_dim))
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.01), loss="mae")
model.summary()
模型训练
epochs = 100
batch_size = 128
early_stopping = EarlyStopping(patience=10, min_delta=1e-3, monitor="val_loss", restore_best_weights=True)history = model.fit(X_train, X_train, epochs=epochs, batch_size=batch_size,validation_split=0.1, callbacks=[early_stopping])
训练可视化
plt.plot(history.history['loss'], label="Training loss")
plt.plot(history.history['val_loss'], label="Validation loss", ls="--")
plt.legend(shadow=True, frameon=True, facecolor="inherit", loc="best", fontsize=9)
plt.title("Training loss")
plt.ylabel("Loss")
plt.xlabel("Epoch")
plt.show()

 

信号重建可视化
fig, axes = plt.subplots(2, 5, sharey=True, sharex=True, figsize=(12, 6))
random_indexes = np.random.randint(0, len(X_train), size=5)for i, idx in enumerate(random_indexes):data = X_train[[idx]]plot_examples(model, data, ax=axes[0, i], title="Normal")for i, idx in enumerate(random_indexes):data = anomaly[[idx]]plot_examples(model, data, ax=axes[1, i], title="anomaly")plt.tight_layout()
fig.suptitle("Sample plots (Actual vs Reconstructed by the CNN autoencoder)", y=1.04, weight="bold")
fig.savefig("autoencoder.png")
plt.show()

计算重建MAE误差
train_mae = model.evaluate(X_train, X_train, verbose=0)
test_mae = model.evaluate(X_test, X_test, verbose=0)
anomaly_mae = model.evaluate(anomaly_df, anomaly_df, verbose=0)print("Training dataset error: ", train_mae)
print("Testing dataset error: ", test_mae)
print("Anormaly dataset error: ", anomaly_mae)

 异常检测阈值选取

MAE误差阈值=正常数据重建MAE均值+正常数据重建MAE标准差,此阈值可以用来直接检测某信号为正常信号还是异常心电信号。

def predict(model, X):pred = model.predict(X, verbose=False)loss = mae(pred, X)return pred, loss_, train_loss = predict(model, X_train)
_, test_loss = predict(model, X_test)
_, anomaly_loss = predict(model, anomaly)
threshold = np.mean(train_loss) + np.std(train_loss) # Setting threshold for distinguish normal data from anomalous databins = 40
plt.figure(figsize=(9, 5), dpi=100)
sns.histplot(np.clip(train_loss, 0, 0.5), bins=bins, kde=True, label="Train Normal")
sns.histplot(np.clip(test_loss, 0, 0.5), bins=bins, kde=True, label="Test Normal")
sns.histplot(np.clip(anomaly_loss, 0, 0.5), bins=bins, kde=True, label="anomaly")ax = plt.gca()  # Get the current Axes
ylim = ax.get_ylim()
plt.vlines(threshold, 0, ylim[-1], color="k", ls="--")
plt.annotate(f"Threshold: {threshold:.3f}", xy=(threshold, ylim[-1]), xytext=(threshold+0.009, ylim[-1]),arrowprops=dict(facecolor='black', shrink=0.05), fontsize=9)
plt.legend(shadow=True, frameon=True, facecolor="inherit", loc="best", fontsize=9)
plt.show()

模型评估
plot_confusion_matrix(model, X_train, X_test, anomaly, threshold=threshold)ytrue, ypred = prepare_labels(model, X_train, X_test, anomaly, threshold=threshold)
print(classification_report(ytrue, ypred, target_names=CLASS_NAMES))

 

代码获取

相关项目开发和问题,欢迎后台沟通交流。


http://www.ppmy.cn/news/1441842.html

相关文章

Spring Boot 的文件配置

SpringBoot的配置文件,有三种格式 1.properties 2.yaml 3.yml(yaml的简写) 这里主要介绍1和3格式的。 在项目中,同时存在properties和yml配置文件, properties的优先级更高 同时存在时,两个文件都生效 如果两个文件中,都包含同一个配置,以properties为主。 properties的配置…

Python爬虫--Ajax异步抓取腾讯视频评论

在某些网站 ,当我们滑下去的时候才会显示出后面的内容 就像淘宝一样,滑下去才逐渐显示其他商品 这个就是采用 Ajax 做的 然后我们现在就是要编写这样的爬虫。 规律分析: 这个时候就要用到我们的 Fiddler 了 我们需要分析加载评论的规律 …

PotatoPie 4.0 实验教程(21) —— FPGA实现摄像头图像二值化(RGB2Gray2Bin)

PotatoPie 4.0开发板教程目录(2024/04/21) 为什么要进行图像的二值化? 当我们处理图像时,常常需要将其转换为二值图像。这是因为在很多应用中,我们只对图像中的某些特定部分感兴趣,而不需要考虑所有像素的…

引入线程的贪吃蛇风骚走位

1.在main函数中分别引入线程t1 和线程 t2 一个线程用来刷新界面,一个线程用来改变方向 2.刷新界面函数,无限次刷新 3. 也是无限循环while(1) 定义key 从键盘获取输入方向,赋值给dir; 4.在初始化函数中确定蛇向有行走为方向 5.从改变方向的函数…

计算机网络复习(第一章概述)

一、基本概念 1、计算机网络:由若干节点和连接这些结点的链路组成(交换机连接) 2、互连网:路由器连接的多个计算机网络 3、ISP:互联网服务提供商,ISP高级路由器连接全部组成互联网 4、互联网必须使用TC…

图像的矩(MATLAB源码)

颜色矩(Color Moment)是一种用来描述图像颜色分布的统计特征。它可以用来衡量图像中不同颜色之间的关系,以及颜色分布的特征。常见的颜色矩包括一阶矩(Mean)、二阶矩(Variance)、三阶矩(Skewness)和四阶矩(Kurtosis)等。 颜色矩能够提供关于图像颜色分布的信息,例…

FRPC+PHP+MYSQL+APACHE2=个人网站

应用背景有公网需求,但是又不想去买又贵又低配置的服务器,然后方案就应运而生 frp/README_zh.md at dev fatedier/frp (github.com) 在这里, FRPC作为内网穿透服务, PHPMYSQLAPACHE2,作为网站搭建,具体细节不细讲, 但是在我的/var/www/html下面 linaroHinlink:/var/www/h…

世媒讯提供海内外媒体宣发服务,引领企业新媒体发展之路

在这个信息化的时代,软文发稿已经成为企业发展不可或缺的重要工具。随着社会的快速发展,消费者需要更多定制化、个性化的信息。利用软性推广,凭借其细致入微的信息传递,可以迅速抓住消费者的注意力,从而进一步推动企业…