# 1. Load data

import os

import numpy as np


def _build_batches(indices, num_batches, batch_size, seq_length):
    """Arrange a token stream into next-character prediction batches.

    The first ``num_batches * batch_size * seq_length`` tokens are cut into
    ``batch_size`` contiguous streams, and each stream is cut into
    ``num_batches`` chunks of ``seq_length`` tokens. Row ``j`` of batch
    ``i + 1`` therefore continues row ``j`` of batch ``i``, so a recurrent
    hidden state can be carried from one batch to the next.

    Args:
        indices: Sequence of integer token ids. Must contain at least
            ``num_batches * batch_size * seq_length + 1`` entries so that
            every input position has a following token to predict.
        num_batches: Number of batches to produce.
        batch_size: Number of sequences per batch.
        seq_length: Number of tokens per sequence.

    Returns:
        A pair ``(inputs, targets)`` of int32 arrays, both shaped
        ``(num_batches, batch_size, seq_length)``; ``targets`` is ``inputs``
        shifted one token to the right in the original stream.
    """
    used = num_batches * batch_size * seq_length
    # One extra token so the very last input position still has a target.
    tokens = np.asarray(indices[:used + 1], dtype=np.int32)
    stream_shape = (batch_size, num_batches, seq_length)
    inputs = tokens[:used].reshape(stream_shape).transpose(1, 0, 2)
    targets = tokens[1:used + 1].reshape(stream_shape).transpose(1, 0, 2)
    return np.ascontiguousarray(inputs), np.ascontiguousarray(targets)


# Load the pinyin corpus; the file name is 'pinyin.txt'.
with open('pinyin.txt', 'r', encoding='utf-8') as f:
    data = f.read()

# Strip line breaks so the corpus is one continuous character stream.
data = data.replace('\n', '')

# Vocabulary: the distinct characters, sorted for a deterministic id order.
chars = sorted(set(data))

# Character -> index mapping.
char_to_idx = {char: idx for idx, char in enumerate(chars)}

# Index -> character mapping.
idx_to_char = dict(enumerate(chars))

# Encode the whole corpus as a sequence of indices.
data_idx = [char_to_idx[char] for char in data]

# Length of each training sample (in characters).
seq_length = 50

# Number of samples per batch.
batch_size = 32

# Number of full sequences. One character is held back because every input
# position needs a "next character" as its target.
num_seqs = max(len(data_idx) - 1, 0) // seq_length

# Number of full batches.
num_batches = num_seqs // batch_size

# Inputs and one-step-shifted targets,
# each of shape (num_batches, batch_size, seq_length).
data_x, data_y = _build_batches(data_idx, num_batches, batch_size, seq_length)

# Split into training and test sets (first 80% of the batches for training).
train_idx = int(num_batches * 0.8)
train_x, test_x = data_x[:train_idx], data_x[train_idx:]
train_y, test_y = data_y[:train_idx], data_y[train_idx:]

# Print a summary of the data set.
print('数据集大小:', len(data_idx))
print('字符数:', len(chars))
print('每个样本长度:', seq_length)
print('每个batch大小:', batch_size)
print('序列数量:', num_seqs)
print('batch数量:', num_batches)
print('训练集大小:', len(train_x))
print('测试集大小:', len(test_x))

# 2. Define the model

import torch
import torch.nn as nn


class RNN(nn.Module):
    """Character-level recurrent model: embedding -> nn.RNN -> linear layer.

    Inputs are time-major, i.e. shaped ``(seq_length, batch_size)``, which
    matches the default (``batch_first=False``) layout of ``nn.RNN``.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        """Build the layers.

        Args:
            input_size: Vocabulary size (number of embedding rows).
            hidden_size: Embedding width and RNN hidden-state width.
            output_size: Number of output classes produced per time step.
            num_layers: Number of stacked RNN layers.
        """
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.rnn = nn.RNN(hidden_size, hidden_size, num_layers)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, h0):
        """Run the network over a batch of index sequences.

        Args:
            x: Long tensor of token indices, shape ``(seq_length, batch_size)``.
            h0: Initial hidden state, shape
                ``(num_layers, batch_size, hidden_size)``.

        Returns:
            A pair ``(out, hn)``: ``out`` holds the per-step logits flattened
            to ``(seq_length * batch_size, output_size)`` in time-major order
            (row ``t * batch_size + b`` is step ``t`` of sequence ``b``), and
            ``hn`` is the final hidden state, shaped like ``h0``.
        """
        # x: (seq_length, batch_size) -> (seq_length, batch_size, hidden_size)
        x = self.embedding(x)
        # out: (seq_length, batch_size, hidden_size)
        # hn:  (num_layers, batch_size, hidden_size)
        out, hn = self.rnn(x, h0)
        # Flatten time and batch so the linear layer sees one row per step.
        out = out.reshape(-1, self.hidden_size)
        # out: (seq_length * batch_size, output_size)
        out = self.fc(out)
        return out, hn

    def init_hidden(self, batch_size):
        """Return an all-zero hidden state for ``batch_size`` sequences.

        The tensor is created with the same device and dtype as the model's
        parameters, so it stays usable after the model has been moved.
        """
        reference = next(self.parameters())
        return reference.new_zeros(
            self.num_layers, batch_size, self.hidden_size)

# 3. Train the model

import math
import time

# Hyper-parameters.
input_size = len(chars)
hidden_size = 128
output_size = len(chars)
num_layers = 2
lr = 0.001
epochs = 50

# Create the model.
model = RNN(input_size, hidden_size, output_size, num_layers)

# Loss function and optimiser.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)


def _to_time_major(batch):
    """Convert a (batch_size, seq_length) index array to a
    (seq_length, batch_size) long tensor, the layout the model expects."""
    return torch.from_numpy(batch).long().transpose(0, 1)


def _accuracy(model, batches_x, batches_y):
    """Return the fraction of correctly predicted next characters.

    Every batch is evaluated from a fresh zero hidden state, with gradient
    tracking disabled. Returns NaN when there are no batches to evaluate.
    """
    if len(batches_x) == 0:
        return math.nan
    was_training = model.training
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_x, batch_y in zip(batches_x, batches_y):
            x = _to_time_major(batch_x)
            y = _to_time_major(batch_y)
            outputs, _ = model(x, model.init_hidden(x.size(1)))
            # The logits are flattened in time-major order, so the targets
            # must be flattened the same way before comparing.
            preds = torch.argmax(outputs, dim=1)
            correct += torch.sum(preds == y.reshape(-1)).item()
            total += y.numel()
    model.train(was_training)
    return correct / total


if len(train_x) == 0:
    raise ValueError('training set is empty: not enough data for one batch')

# Training loop.
for epoch in range(epochs):
    start_time = time.time()
    model.train()
    # Start every epoch from a zero hidden state.
    h0 = model.init_hidden(batch_size)
    # Sum of the per-batch losses, used to report the epoch mean.
    total_loss = 0.0
    # Iterate over the training batches only (not all num_batches batches).
    for batch_x, batch_y in zip(train_x, train_y):
        x = _to_time_major(batch_x)
        y = _to_time_major(batch_y)
        # Reset gradients.
        optimizer.zero_grad()
        # Forward pass.
        outputs, hn = model(x, h0)
        # Loss over every time step of every sequence.
        loss = criterion(outputs, y.reshape(-1))
        # Backward pass.
        loss.backward()
        # Gradient clipping.
        nn.utils.clip_grad_norm_(model.parameters(), 5)
        # Parameter update.
        optimizer.step()
        # Carry the hidden state to the next batch, detached so gradients do
        # not flow across batch boundaries.
        # NOTE(review): this is only meaningful if row j of consecutive
        # batches holds contiguous text -- confirm against the data layout.
        h0 = hn.detach()
        total_loss += loss.item()

    end_time = time.time()
    mean_loss = total_loss / len(train_x)
    # Accuracy on the training set.
    train_acc = _accuracy(model, train_x, train_y)
    # Accuracy on the test set.
    test_acc = _accuracy(model, test_x, test_y)
    # Report progress for this epoch.
    print('Epoch [{}/{}], Loss: {:.4f}, Train Acc: {:.4f}, Test Acc: {:.4f}, Time: {:.2f}s'.format(
        epoch + 1, epochs, mean_loss, train_acc, test_acc, end_time - start_time))

# 4. Predict with the model

# Prefix used to prime the model.
prefix = 'ni hao'
# Raises KeyError if the prefix contains a character absent from the corpus.
prefix_ids = [char_to_idx[char] for char in prefix]
if not prefix_ids:
    raise ValueError('prefix must contain at least one character')

# Number of characters to generate in the K-step prediction.
num_predict = 50

model.eval()

# Single-step prediction: for every prefix character, print the character the
# model expects to come next.
with torch.no_grad():
    h0 = model.init_hidden(1)
    # x: (len(prefix), 1), i.e. time-major with a batch of one.
    x = torch.tensor(prefix_ids, dtype=torch.long).unsqueeze(1)
    for i in range(len(prefix_ids)):
        output, h0 = model(x[i].unsqueeze(0), h0)
        pred = torch.argmax(output, dim=1).item()
        print(idx_to_char[pred], end='')
# End the line so the K-step output below starts on its own line.
print()

# K-step prediction: warm the hidden state up on the prefix, then feed each
# prediction back in as the next input.
with torch.no_grad():
    h0 = model.init_hidden(1)
    generated = list(prefix_ids)
    # Consume all but the last prefix character; the predictions made while
    # warming up are discarded rather than appended to the output.
    for idx in prefix_ids[:-1]:
        _, h0 = model(torch.tensor([[idx]], dtype=torch.long), h0)
    last = prefix_ids[-1]
    for _ in range(num_predict):
        output, h0 = model(torch.tensor([[last]], dtype=torch.long), h0)
        last = torch.argmax(output, dim=1).item()
        generated.append(last)
# `generated` holds plain ints, so the dictionary lookup works
# (0-dim tensors are not valid keys for idx_to_char).
print(''.join(idx_to_char[idx] for idx in generated))


# Source: https://www.cveoy.top/t/topic/fDZF (copyright belongs to the author; do not repost or scrape).
#
# Free AI: click here, no registration or login required.