使用 PyTorch 实现文本分类,数据集为 Keras 内置的 IMDB 影评数据
1. 导入相应库、定义常量以及加载IMDB数据
- 导入库,其中IMDB数据从keras中导入
 7- import torch 
 import torch.nn as nn
 import torch.optim as optim
 import torch.nn.functional as F
 from torch.utils.data import *
 from keras.datasets import imdb
 from keras.preprocessing.sequence import pad_sequences
- 需要定义的常量:词汇表大小、句子最大长度、批处理量、嵌入层维度、隐藏层维度、Dropout比例、设备
 7- MAX_WORDS = 10000 #词汇表大小 
 MAX_LEN = 200
 BATCH_SIZE = 256
 EMB_SIZE = 128
 HID_SIZE = 128
 DROPOUT = 0.2
 DEVICE = torch.device("cuda" if torch.cuda.is_available() else 'cpu')
- 加载IMDB数据,TensorDataset->RandomSampler->DataLoader
 14- (x_train, y_train), (x_test, y_test) = imdb.load_data(num_words=MAX_WORDS) # 加载数据 
 # 将训练集、测试集中的文本进行预处理,变成相同长度的文本,这里采用的规则是,在句子后面填充或截断
 x_train = pad_sequences(x_train, maxlen=MAX_LEN, padding='post', truncating='post')
 x_test = pad_sequences(x_test, maxlen=MAX_LEN, padding='post', truncating='post')
 print(x_train.shape, x_test.shape)
 train_data = TensorDataset(torch.LongTensor(x_train), torch.LongTensor(y_train))
 test_data = TensorDataset(torch.LongTensor(x_test), torch.LongTensor(y_test))
 train_sampler = RandomSampler(train_data) # 从一个打乱的数据集进行采样
 train_loader = DataLoader(train_data, sampler=train_sampler, batch_size=BATCH_SIZE) # 将数据打包起来(一个batch_size是一组)
 test_sampler = RandomSampler(test_data)
 test_loader = DataLoader(test_data, sampler=test_sampler, batch_size=BATCH_SIZE)
2. 定义模型
- 使用LSTM模型进行文本分类,模型类中需要初始化函数、前向函数。 
- 初始化函数负责初始化模型的参数(词汇表大小、嵌入层维度、隐藏层维度、Dropout比例)以及模型的架构(本实验使用的是Embedding+LSTM+线性层1+线性层2)。 
- 前向函数负责输出最终分类结果(本实验通过Embedding->dropout->LSTM->dropout->fc1->relu->avg_pool2d->fc2最终得到二分类结果)
 24- class Model(nn.Module): 
 def __init__(self, max_words, emb_size, hid_size, dropout):
 super(Model, self).__init__()
 self.max_words = max_words
 self.emb_size = emb_size
 self.hid_size = hid_size
 self.dropout = dropout
 self.Embedding = nn.Embedding(self.max_words, self.emb_size)
 self.LSTM = nn.LSTM(self.emb_size, self.hid_size, num_layers=2,
 batch_first=True, bidirectional=True) # 两层双向LSTM
 self.dp = nn.Dropout(self.dropout)
 self.fc1 = nn.Linear(self.hid_size*2, self.hid_size)
 self.fc2 = nn.Linear(self.hid_size, 2)
 def forward(self, x):
 x = self.Embedding(x) # x.shape [batch_size, max_len, emb_size]
 x = self.dp(x)
 x, _ = self.LSTM(x) # [batch_size, max_len, hid_size*2]
 x = self.dp(x)
 x = F.relu(self.fc1(x)) # [batch_size, max_len, hid_size]
 x = F.avg_pool2d(x, (x.shape[1], 1)).squeeze() # [batch_size, 1, hid_size] -> [batch_size, hid_size]
 out = self.fc2(x) # [batch_size, 2]
 return out
3. 定义训练函数
- 训练函数中首先定义损失函数(本实验使用交叉熵),然后加载数据,并将数据输入到模型中产生分类结果(y_);计算损失后,通过反向传播(loss.backward())计算梯度,再调用optimizer.step()更新模型参数(只有调用optimizer.step(),模型参数才会被更新)
 15- def train(model, device, train_loader, optimizer, epoch): 
 model.train() # 当有`Dropout`, `BatchNorm`时,需要加上这条
 criterion = nn.CrossEntropyLoss() # 交叉熵
 for batch_idx, (x,y) in enumerate(train_loader):
 x, y = x.to(device), y.to(device) # x 是一个二维tensor,其中每一行是一个句子,一组batch_size个句子
 y_ = model(x)
 loss = criterion(y_, y)
 loss.backward()
 optimizer.step()
 if(batch_idx + 1)%10 == 0:
 print("Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
 epoch, batch_idx * len(x), len(train_loader.dataset),
 100. * batch_idx / len(train_loader), loss.item()
 ))
4. 定义测试函数
- 测试函数与训练函数类似,而测试函数中需要计算准确率。
 19- def test(model, device, test_loader): 
 model.eval()
 criterion = nn.CrossEntropyLoss(reduction='sum')
 test_loss = 0.0
 acc = 0
 for batch_idx, (x, y ) in enumerate(test_loader):
 x, y = x.to(device), y.to(device)
 # torch.no_grad(),对tensor的操作正常进行,但是track不被记录,无法求其梯度
 with torch.no_grad():
 y_ = model(x)
 test_loss += criterion(y_, y)
 pred = y_.max(-1, keepdim=True)[1] # .max()的输出为最大值和最大值的index, 获取index
 acc += pred.eq(y.view_as(pred)).sum().item()
 test_loss /= len(test_loader.dataset)
 print("\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)".format(
 test_loss, acc, len(test_loader.dataset),
 100. * acc/len(test_loader.dataset)
 ))
 return acc/ len(test_loader.dataset)
5. 训练+保存模型
- 初始化模型,定义优化器(本实验使用Adam优化器),定义模型保存路径,然后进行训练,并保存测试准确率最高的模型
 15- model = Model(MAX_WORDS, EMB_SIZE, HID_SIZE, DROPOUT).to(DEVICE) 
 print(model)
 optimizer = optim.Adam(model.parameters())
 best_acc = 0.0
 PATH = './model.pth' # 模型保存路径
 # 训练、测试以及保存模型
 for epoch in range(1, 5):
 train(model,DEVICE,train_loader,optimizer,epoch)
 acc = test(model,DEVICE,test_loader)
 if best_acc < acc:
 best_acc = acc
 torch.save(model.state_dict(), PATH)
 print("acc is {:.4f}, best acc is {:.4f}\n".format(acc, best_acc))
6. 加载+测试模型
- 加载保存的模型,调用测试函数进行测试
 3- best_model = Model(MAX_WORDS, EMB_SIZE, HID_SIZE, DROPOUT) 
 best_model.load_state_dict(torch.load(PATH))
 test(best_model, DEVICE, test_loader)





