基于通道注意力机制的高光谱图像分类
目录
- 前言
- 步骤
- Python 实现
- 1.定义HybridSN类
- 2.创建数据集
- 3.开始训练
- 4.模型测试
- 5.基于通道注意力机制改进网络
- 6.再次训练
- 7.再次测试
- 完整代码
- 总结
前言
近年来,由于高光谱数据的独特性质以及所包含的海量信息,对于高光谱图像的分析与处理已经成为遥感影像研究领域的热点之一,而其中的高光谱图像分类任务又对地质勘探、农作物检测、国防军事等领域起着实质性的重要作用,值得更加深入的研究。然而高光谱图像分类任务中,数据特征的获取和学习一直是研究的重点与难点,如何提取充分有效的特征直接影响到分类结果的好坏。
文章参考了论文《HybridSN: Exploring 3-D–2-DCNN Feature Hierarchy for Hyperspectral Image Classification》后,这篇论文构建了一个混合网络解决高光谱图像分类问题,首先用 3D卷积,然后使用 2D卷积。
在此基础上,文章尝试加入通道注意力机制,得到了更好的效果。
步骤
- 定义网络和创建数据集
- 训练与测试
- 基于通道注意力机制改进网络
- 再次训练和测试,并进行对比
Python 实现
1.定义HybridSN类
- 模型的网络结构为如下图所示:
三维卷积部分:
- conv1:(1, 30, 25, 25), 8个 7x3x3 的卷积核 ==>(8, 24, 23, 23)
- conv2:(8, 24, 23, 23), 16个 5x3x3 的卷积核 ==>(16, 20, 21, 21)
- conv3:(16, 20, 21, 21),32个 3x3x3 的卷积核 ==>(32, 18, 19, 19)
接下来要进行二维卷积,因此把前面的 32*18 reshape 一下,得到 (576, 19, 19)
二维卷积:(576, 19, 19) 64个 3x3 的卷积核,得到 (64, 17, 17)
接下来是一个 flatten 操作,变为 18496 维的向量,
接下来依次为256,128节点的全连接层,都使用比例为0.4的 Dropout,
最后输出为 16 个节点,是最终的分类类别数。
下面是HybridSN网络层数结构表:
下面是HybridSN网络定义:
class HybridSN(nn.Module): def __init__(self): super(HybridSN, self).__init__() self.conv_3d = nn.Sequential( nn.Conv3d(1, 8, (7, 3, 3)), nn.LeakyReLU(0.2, inplace=True), nn.Conv3d(8, 16, (5, 3, 3)), nn.LeakyReLU(0.2, inplace=True), nn.Conv3d(16, 32, (3, 3, 3)), nn.LeakyReLU(0.2, inplace=True), ) self.conv_2d = nn.Sequential( nn.Conv2d(576, 64, (3, 3)), nn.LeakyReLU(0.2, inplace=True) ) self.linear = nn.Sequential( nn.Linear(18496, 256), nn.LeakyReLU(0.2, inplace=True), nn.Dropout(0.4), nn.Linear(256, 128), nn.LeakyReLU(0.2, inplace=True), nn.Dropout(0.4), nn.Linear(128, CLASS_NUM), nn.LogSoftmax(dim=1) ) def forward(self, x): x = self.conv_3d(x) x = x.view(-1, x.shape[1] * x.shape[2], x.shape[3], x.shape[4]) x = self.conv_2d(x) x = x.view(x.size(0), -1) x = self.linear(x) return x
- 测试一下网络结构
def test_net(): # 随机输入,测试网络结构是否通 x = torch.randn(1, 1, 30, 25, 25) net = HybridSN() y = net(x) print(y.shape)
得到结果如下,可见网络结构是正常的
2.创建数据集
首先对高光谱数据实施PCA降维;然后创建 keras 方便处理的数据格式;然后随机抽取 10% 数据做为训练集,剩余的做为测试集。
- 首先定义基本函数:
# 对高光谱数据 x 应用 PCA 变换def apply_pca(x, num_components): new_x = np.reshape(x, (-1, x.shape[2])) pca = PCA(n_components=num_components, whiten=True) new_x = pca.fit_transform(new_x) new_x = np.reshape(new_x, (x.shape[0], x.shape[1], num_components)) return new_x# 对单个像素周围提取 patch 时,边缘像素就无法取了,因此,给这部分像素进行 padding 操作def padding_with_zeros(x, margin=2): new_x = np.zeros((x.shape[0] 2 * margin, x.shape[1] 2 * margin, x.shape[2])) x_offset = margin y_offset = margin new_x[x_offset:x.shape[0] x_offset, y_offset:x.shape[1] y_offset, :] = x return new_x# 在每个像素周围提取 patch ,然后创建成符合 keras 处理的格式def create_image_cubes(x, y, window_size=5, remove_zero_labels=True): # 给 x 做 padding margin = int((window_size - 1) / 2) zero_padded_x = padding_with_zeros(x, margin=margin) # split patches patches_data = np.zeros((x.shape[0] * x.shape[1], window_size, window_size, x.shape[2])) patches_labels = np.zeros((x.shape[0] * x.shape[1])) patch_index = 0 for r in range(margin, zero_padded_x.shape[0] - margin): for c in range(margin, zero_padded_x.shape[1] - margin): patch = zero_padded_x[r - margin:r margin 1, c - margin:c margin 1] patches_data[patch_index, :, :, :] = patch patches_labels[patch_index] = y[r-margin, c-margin] patch_index = 1 if remove_zero_labels: patches_data = patches_data[patches_labels > 0, :, :, :] patches_labels = patches_labels[patches_labels > 0] patches_labels -= 1 return patches_data, patches_labelsdef split_train_test_set(x, y, test_ratio, random_state=345): x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=test_ratio, random_state=random_state, stratify=y) return x_train, x_test, y_train, y_test
- 下面读取并创建数据集:
def create_data_loader(): x = sio.loadmat('data/Indian_pines_corrected.mat')['indian_pines_corrected'] y = sio.loadmat('data/Indian_pines_gt.mat')['indian_pines_gt'] # 用于测试样本的比例 test_ratio = 0.90 # 每个像素周围提取 patch 的尺寸 patch_size = 25 # 使用 PCA 降维,得到主成分的数量 pca_components = 30 print('Hyperspectral data shape: ', x.shape) print('Label shape: ', y.shape) print('\n... ... PCA tranformation ... ...') x_pca = apply_pca(x, num_components=pca_components) print('Data shape after PCA: ', x_pca.shape) print('\n... ... create data cubes ... ...') x_pca, y = create_image_cubes(x_pca, y, window_size=patch_size) print('Data cube X shape: ', x_pca.shape) print('Data cube y shape: ', y.shape) print('\n... ... create train & test data ... ...') x_train, x_test, y_train, y_test = split_train_test_set(x_pca, y, test_ratio) print('Xtrain shape: ', x_train.shape) print('Xtest shape: ', x_test.shape) # 改变 x_train, y_train 的形状,以符合 keras 的要求 x_train = x_train.reshape(-1, patch_size, patch_size, pca_components, 1) x_test = x_test.reshape(-1, patch_size, patch_size, pca_components, 1) print('before transpose: Xtrain shape: ', x_train.shape) print('before transpose: Xtest shape: ', x_test.shape) # 为了适应 pytorch 结构,数据要做 transpose x_train = x_train.transpose(0, 4, 3, 1, 2) x_test = x_test.transpose(0, 4, 3, 1, 2) print('after transpose: Xtrain shape: ', x_train.shape) print('after transpose: Xtest shape: ', x_test.shape) # 创建 train_loader 和 test_loader train_set = TrainDS(x_train, y_train) test_set = TestDS(x_test, y_test) train_loader = torch.utils.data.DataLoader(dataset=train_set, batch_size=128, shuffle=True, num_workers=2) test_loader = torch.utils.data.DataLoader(dataset=test_set, batch_size=128, shuffle=False, num_workers=2) return train_loader, test_loader, y_test""" Training dataset"""class TrainDS(torch.utils.data.Dataset): def __init__(self, x_train, y_train): self.len = x_train.shape[0] self.x_data = torch.FloatTensor(x_train) self.y_data = torch.LongTensor(y_train) def __getitem__(self, index): # 根据索引返回数据和对应的标签 return self.x_data[index], self.y_data[index] def __len__(self): # 返回文件数据的数目 return self.len""" Testing dataset"""class TestDS(torch.utils.data.Dataset): def __init__(self, x_test, y_test): self.len = x_test.shape[0] self.x_data = torch.FloatTensor(x_test) self.y_data = torch.LongTensor(y_test) def __getitem__(self, index): # 根据索引返回数据和对应的标签 return self.x_data[index], self.y_data[index] def __len__(self): # 返回文件数据的数目 return self.len
运行结果如下
3.开始训练
def train(train_loader): # 使用GPU训练 device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") # 网络放到GPU上 net = HybridSN().to(device) criterion = nn.CrossEntropyLoss() optimizer = optim.Adam(net.parameters(), lr=0.001) # 开始训练 total_loss = 0 for epoch in range(100): loss = 0 for i, (inputs, labels) in enumerate(train_loader): inputs = inputs.to(device) labels = labels.to(device) # 优化器梯度归零 optimizer.zero_grad() # 正向传播 反向传播 优化 outputs = net(inputs) loss = criterion(outputs, labels) loss.backward() optimizer.step() total_loss = loss.item() print('[Epoch: %d] [loss avg: %.4f] [current loss: %.4f]' % (epoch 1, total_loss / (epoch 1), loss.item())) print('Finished Training') return net, device
运行结果如下:
4.模型测试
def test(device, net, test_loader, y_test): count = 0 # 模型测试 y_pred_test = 0 for inputs, _ in test_loader: inputs = inputs.to(device) outputs = net(inputs) outputs = np.argmax(outputs.detach().cpu().numpy(), axis=1) if count == 0: y_pred_test = outputs count = 1 else: y_pred_test = np.concatenate((y_pred_test, outputs)) # 生成分类报告 classification = classification_report(y_test, y_pred_test, digits=4) print(classification)
运行结果如下:
可以看出,准确率为 95.71%,性能还可以
5.基于通道注意力机制改进网络
- 如果想在提升一些性能,可以尝试引入注意力机制,为了方便,此文章中我们引入通道注意力机制SELayer
class SELayer(nn.Module): def __init__(self): super(SELayer, self).__init__() self.fc_41 = nn.Conv2d(64, 4, kernel_size=1) self.fc_42 = nn.Conv2d(4, 64, kernel_size=1) def forward(self, x): x = F.avg_pool2d(x, x.size(2)) x = F.relu(self.fc_41(x)) x = F.sigmoid(self.fc_42(x)) return x
- 在此基础上,我们修改一下网络,加入SELayer:
class HybridSN(nn.Module): def __init__(self): super(HybridSN, self).__init__() self.conv_3d = nn.Sequential( nn.Conv3d(1, 8, (7, 3, 3)), nn.LeakyReLU(0.2, inplace=True), nn.Conv3d(8, 16, (5, 3, 3)), nn.LeakyReLU(0.2, inplace=True), nn.Conv3d(16, 32, (3, 3, 3)), nn.LeakyReLU(0.2, inplace=True), ) self.conv_2d = nn.Sequential( nn.Conv2d(576, 64, (3, 3)), nn.LeakyReLU(0.2, inplace=True) ) self.linear = nn.Sequential( nn.Linear(18496, 256), nn.LeakyReLU(0.2, inplace=True), nn.Dropout(0.4), nn.Linear(256, 128), nn.LeakyReLU(0.2, inplace=True), nn.Dropout(0.4), nn.Linear(128, CLASS_NUM), nn.LogSoftmax(dim=1) ) self.se_layer = SELayer() def forward(self, x): x = self.conv_3d(x) x = x.view(-1, x.shape[1] * x.shape[2], x.shape[3], x.shape[4]) x = self.conv_2d(x) x = x * self.se_layer(x) x = x.view(x.size(0), -1) x = self.linear(x) return x
6.再次训练
基于新网络,重新训练,结果如下:
7.再次测试
再次测试,结果如下:
可以看到,准确率从之前的 95.71%提升到了97.34%,效果显著!!!
完整代码
import numpy as npimport scipy.io as siofrom sklearn.decomposition import PCAfrom sklearn.model_selection import train_test_splitfrom sklearn.metrics import classification_reportimport torchimport torch.nn as nnimport torch.nn.functional as Fimport torch.optim as optimimport torch.utils.dataCLASS_NUM = 16class SELayer(nn.Module): def __init__(self): super(SELayer, self).__init__() self.fc_41 = nn.Conv2d(64, 4, kernel_size=1) self.fc_42 = nn.Conv2d(4, 64, kernel_size=1) def forward(self, x): x = F.avg_pool2d(x, x.size(2)) x = F.relu(self.fc_41(x)) x = F.sigmoid(self.fc_42(x)) return xclass HybridSN(nn.Module): def __init__(self): super(HybridSN, self).__init__() self.conv_3d = nn.Sequential( nn.Conv3d(1, 8, (7, 3, 3)), nn.LeakyReLU(0.2, inplace=True), nn.Conv3d(8, 16, (5, 3, 3)), nn.LeakyReLU(0.2, inplace=True), nn.Conv3d(16, 32, (3, 3, 3)), nn.LeakyReLU(0.2, inplace=True), ) self.conv_2d = nn.Sequential( nn.Conv2d(576, 64, (3, 3)), nn.LeakyReLU(0.2, inplace=True) ) self.linear = nn.Sequential( nn.Linear(18496, 256), nn.LeakyReLU(0.2, inplace=True), nn.Dropout(0.4), nn.Linear(256, 128), nn.LeakyReLU(0.2, inplace=True), nn.Dropout(0.4), nn.Linear(128, CLASS_NUM), nn.LogSoftmax(dim=1) ) self.se_layer = SELayer() def forward(self, x): x = self.conv_3d(x) x = x.view(-1, x.shape[1] * x.shape[2], x.shape[3], x.shape[4]) x = self.conv_2d(x) x = x * self.se_layer(x) x = x.view(x.size(0), -1) x = self.linear(x) return xdef test_net(): # 随机输入,测试网络结构是否通 x = torch.randn(1, 1, 30, 25, 25) net = HybridSN() y = net(x) print(y.shape)# 对高光谱数据 x 应用 PCA 变换def apply_pca(x, num_components): new_x = np.reshape(x, (-1, x.shape[2])) pca = PCA(n_components=num_components, whiten=True) new_x = pca.fit_transform(new_x) new_x = np.reshape(new_x, (x.shape[0], x.shape[1], num_components)) return new_x# 对单个像素周围提取 patch 时,边缘像素就无法取了,因此,给这部分像素进行 padding 操作def padding_with_zeros(x, margin=2): new_x = np.zeros((x.shape[0] 2 * margin, x.shape[1] 2 * margin, x.shape[2])) x_offset = margin y_offset = margin new_x[x_offset:x.shape[0] x_offset, y_offset:x.shape[1] y_offset, :] = x return new_x# 在每个像素周围提取 patch ,然后创建成符合 keras 处理的格式def create_image_cubes(x, y, window_size=5, remove_zero_labels=True): # 给 x 做 padding margin = int((window_size - 1) / 2) zero_padded_x = padding_with_zeros(x, margin=margin) # split patches patches_data = np.zeros((x.shape[0] * x.shape[1], window_size, window_size, x.shape[2])) patches_labels = np.zeros((x.shape[0] * x.shape[1])) patch_index = 0 for r in range(margin, zero_padded_x.shape[0] - margin): for c in range(margin, zero_padded_x.shape[1] - margin): patch = zero_padded_x[r - margin:r margin 1, c - margin:c margin 1] patches_data[patch_index, :, :, :] = patch patches_labels[patch_index] = y[r-margin, c-margin] patch_index = 1 if remove_zero_labels: patches_data = patches_data[patches_labels > 0, :, :, :] patches_labels = patches_labels[patches_labels > 0] patches_labels -= 1 return patches_data, patches_labelsdef split_train_test_set(x, y, test_ratio, random_state=345): x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=test_ratio, random_state=random_state, stratify=y) return x_train, x_test, y_train, y_testdef create_data_loader(): x = sio.loadmat('data/Indian_pines_corrected.mat')['indian_pines_corrected'] y = sio.loadmat('data/Indian_pines_gt.mat')['indian_pines_gt'] # 用于测试样本的比例 test_ratio = 0.90 # 每个像素周围提取 patch 的尺寸 patch_size = 25 # 使用 PCA 降维,得到主成分的数量 pca_components = 30 print('Hyperspectral data shape: ', x.shape) print('Label shape: ', y.shape) print('\n... ... PCA tranformation ... ...') x_pca = apply_pca(x, num_components=pca_components) print('Data shape after PCA: ', x_pca.shape) print('\n... ... create data cubes ... ...') x_pca, y = create_image_cubes(x_pca, y, window_size=patch_size) print('Data cube X shape: ', x_pca.shape) print('Data cube y shape: ', y.shape) print('\n... ... create train & test data ... ...') x_train, x_test, y_train, y_test = split_train_test_set(x_pca, y, test_ratio) print('Xtrain shape: ', x_train.shape) print('Xtest shape: ', x_test.shape) # 改变 x_train, y_train 的形状,以符合 keras 的要求 x_train = x_train.reshape(-1, patch_size, patch_size, pca_components, 1) x_test = x_test.reshape(-1, patch_size, patch_size, pca_components, 1) print('before transpose: Xtrain shape: ', x_train.shape) print('before transpose: Xtest shape: ', x_test.shape) # 为了适应 pytorch 结构,数据要做 transpose x_train = x_train.transpose(0, 4, 3, 1, 2) x_test = x_test.transpose(0, 4, 3, 1, 2) print('after transpose: Xtrain shape: ', x_train.shape) print('after transpose: Xtest shape: ', x_test.shape) # 创建 train_loader 和 test_loader train_set = TrainDS(x_train, y_train) test_set = TestDS(x_test, y_test) train_loader = torch.utils.data.DataLoader(dataset=train_set, batch_size=128, shuffle=True, num_workers=2) test_loader = torch.utils.data.DataLoader(dataset=test_set, batch_size=128, shuffle=False, num_workers=2) return train_loader, test_loader, y_test""" Training dataset"""class TrainDS(torch.utils.data.Dataset): def __init__(self, x_train, y_train): self.len = x_train.shape[0] self.x_data = torch.FloatTensor(x_train) self.y_data = torch.LongTensor(y_train) def __getitem__(self, index): # 根据索引返回数据和对应的标签 return self.x_data[index], self.y_data[index] def __len__(self): # 返回文件数据的数目 return self.len""" Testing dataset"""class TestDS(torch.utils.data.Dataset): def __init__(self, x_test, y_test): self.len = x_test.shape[0] self.x_data = torch.FloatTensor(x_test) self.y_data = torch.LongTensor(y_test) def __getitem__(self, index): # 根据索引返回数据和对应的标签 return self.x_data[index], self.y_data[index] def __len__(self): # 返回文件数据的数目 return self.lendef train(train_loader): # 使用GPU训练 device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") # 网络放到GPU上 net = HybridSN().to(device) criterion = nn.CrossEntropyLoss() optimizer = optim.Adam(net.parameters(), lr=0.001) # 开始训练 total_loss = 0 for epoch in range(100): loss = 0 for i, (inputs, labels) in enumerate(train_loader): inputs = inputs.to(device) labels = labels.to(device) # 优化器梯度归零 optimizer.zero_grad() # 正向传播 反向传播 优化 outputs = net(inputs) loss = criterion(outputs, labels) loss.backward() optimizer.step() total_loss = loss.item() print('[Epoch: %d] [loss avg: %.4f] [current loss: %.4f]' % (epoch 1, total_loss / (epoch 1), loss.item())) print('Finished Training') return net, devicedef test(device, net, test_loader, y_test): count = 0 # 模型测试 y_pred_test = 0 for inputs, _ in test_loader: inputs = inputs.to(device) outputs = net(inputs) outputs = np.argmax(outputs.detach().cpu().numpy(), axis=1) if count == 0: y_pred_test = outputs count = 1 else: y_pred_test = np.concatenate((y_pred_test, outputs)) # 生成分类报告 classification = classification_report(y_test, y_pred_test, digits=4) print(classification)def main(): test_net() train_loader, test_loader, y_test = create_data_loader() net, device = train(train_loader) test(device, net, test_loader, y_test)if __name__ == '__main__': main()
总结
基于通道注意力机制方法,使高光谱图像的分类的准确率更高。