import os

import numpy as np
import tensorflow as tf

# Read images and labels

def get_files(file_dir, num_classes=5):
    """Collect image paths and one-hot labels from a directory tree.

    Every file found under ``file_dir`` is treated as an image, and the name
    of the directory that directly contains it is parsed as the file's
    integer class label.

    Args:
        file_dir: Root directory to walk.
        num_classes: Width of the one-hot label rows. Defaults to 5, the
            value that used to be hard-coded here.

    Returns:
        A tuple ``(images, labels)``. ``images`` is a 1-D NumPy array of file
        paths; ``labels`` is a ``(len(images), num_classes)`` array of
        one-hot rows. Both are shuffled with the same random permutation.

    Raises:
        ValueError: If a file sits in a directory whose name is not an
            integer.
        IndexError: If a label is not smaller than ``num_classes``.
    """
    images = []
    labels = []
    for root, _, files in os.walk(file_dir):
        if not files:
            continue
        # basename(normpath(...)) rather than split('/') so that Windows
        # separators and a trailing slash on file_dir are handled.
        folder = os.path.basename(os.path.normpath(root))
        try:
            label = int(folder)
        except ValueError:
            raise ValueError(
                'Directory %r holds files but its name is not an integer '
                'class label' % root)
        for file_name in files:
            images.append(os.path.join(root, file_name))
            labels.append(label)

    # Convert the labels to one-hot rows with a single fancy-index; this also
    # keeps the (0, num_classes) shape when no files were found.
    labels = np.eye(num_classes)[np.asarray(labels, dtype=int)]

    # Shuffle paths and labels together.
    shuffle_index = np.random.permutation(len(images))
    images = np.array(images)[shuffle_index]
    labels = labels[shuffle_index]
    return images, labels

# Generate batches of data

def get_batch(image, label, image_W, image_H, batch_size, capacity,
              num_epochs=None, num_threads=64):
    """Build a queue-based input pipeline that yields image/label batches.

    Args:
        image: Sequence of JPEG file paths (cast to ``tf.string``).
        label: Per-image labels (cast to ``tf.float32``); the callers in
            this file pass the one-hot rows produced by ``get_files``.
        image_W: First spatial size handed to the crop/pad op (see NOTE).
        image_H: Second spatial size handed to the crop/pad op (see NOTE).
        batch_size: Number of examples per batch.
        capacity: Capacity of the batching queue.
        num_epochs: Optional limit on how many times the data is cycled.
            ``None`` (the default, and the previous behavior) cycles
            forever. When set, the producer creates a local epoch counter,
            so the caller must run ``tf.local_variables_initializer()``;
            dequeuing past the limit raises ``tf.errors.OutOfRangeError``.
        num_threads: Number of threads filling the batch queue. Defaults to
            64, the value that used to be hard-coded here.

    Returns:
        A tuple ``(image_batch, label_batch)`` of tensors. The queues only
        produce data once ``tf.train.start_queue_runners`` has been called.
    """
    image = tf.cast(image, tf.string)
    label = tf.cast(label, tf.float32)

    # Slice the path/label arrays into single (path, label) examples.
    input_queue = tf.train.slice_input_producer([image, label],
                                                num_epochs=num_epochs)
    label = input_queue[1]
    image_contents = tf.read_file(input_queue[0])
    image = tf.image.decode_jpeg(image_contents, channels=3)

    # NOTE: this centrally crops or zero-pads to the target size; it does not
    # rescale the picture. The op's signature is (image, target_height,
    # target_width), so image_W ends up as the height axis. That order is
    # kept on purpose: the callers in this file declare their placeholders as
    # [None, image_W, image_H, 3], which matches it.
    image = tf.image.resize_image_with_crop_or_pad(image, image_W, image_H)
    image = tf.image.per_image_standardization(image)

    image_batch, label_batch = tf.train.batch([image, label],
                                              batch_size=batch_size,
                                              num_threads=num_threads,
                                              capacity=capacity)
    # -1 lets the label width follow the input instead of assuming 5 classes.
    label_batch = tf.reshape(label_batch, [batch_size, -1])
    return image_batch, label_batch

# Define the network architecture

def cnn_model(input_images):
    """Build the classification CNN and return its output-layer tensor.

    The network is three 3x3 convolution + 2x2 max-pool stages (32, 64 and
    128 filters), a flatten step, two ReLU dense layers (256 and 128 units)
    and a final 5-unit dense layer without activation.

    Args:
        input_images: Input image tensor fed to the first convolution.

    Returns:
        The output of the final 5-unit dense layer (unnormalized scores).
    """
    conv_stages = (
        ('conv1', 'pool1', 32),
        ('conv2', 'pool2', 64),
        ('conv3', 'pool3', 128),
    )
    dense_stages = (
        ('fc1', 256),
        ('fc2', 128),
    )

    net = input_images

    # Convolution + pooling stages; each lives in its own variable scope.
    for conv_name, pool_name, n_filters in conv_stages:
        with tf.variable_scope(conv_name):
            net = tf.layers.conv2d(net,
                                   filters=n_filters,
                                   kernel_size=[3, 3],
                                   padding='SAME',
                                   activation=tf.nn.relu,
                                   name=conv_name)
            net = tf.layers.max_pooling2d(net,
                                          pool_size=[2, 2],
                                          strides=2,
                                          name=pool_name)

    # Flatten the convolutional feature maps.
    with tf.variable_scope('flatten'):
        net = tf.layers.flatten(net, name='flatten')

    # Fully connected hidden layers.
    for dense_name, n_units in dense_stages:
        with tf.variable_scope(dense_name):
            net = tf.layers.dense(net,
                                  units=n_units,
                                  activation=tf.nn.relu,
                                  name=dense_name)

    # Output layer.
    with tf.variable_scope('output'):
        logits = tf.layers.dense(net, units=5, name='output')
    return logits

# Train the model

def train():
    """Train the CNN on the images under 'train_data'.

    Runs ``max_step`` optimization steps, writes loss/accuracy summaries and
    checkpoints to the 'logs' directory, and prints the total loss every 100
    steps. Takes no arguments and returns nothing.
    """
    # Hyper-parameters.
    learning_rate = 0.01
    batch_size = 32
    capacity = 1000
    max_step = 1000
    image_W = 128
    image_H = 128

    train_dir = 'train_data'
    logs_dir = 'logs'

    # Load the file list and labels.
    train_images, train_labels = get_files(train_dir)

    # Build everything in a private graph so that calling train() again, or
    # building another model afterwards in the same process, does not clash
    # with variables left behind in the default graph.
    with tf.Graph().as_default():
        train_images_batch, train_labels_batch = get_batch(
            train_images, train_labels, image_W, image_H, batch_size,
            capacity)

        # Network definition.
        input_images = tf.placeholder(tf.float32,
                                      [None, image_W, image_H, 3])
        input_labels = tf.placeholder(tf.float32, [None, 5])
        output = cnn_model(input_images)

        # Classification loss.
        loss = tf.reduce_mean(
            tf.nn.softmax_cross_entropy_with_logits(logits=output,
                                                    labels=input_labels))

        # L2 regularization over every trainable variable.
        regularizer = tf.contrib.layers.l2_regularizer(scale=0.1)
        reg_loss = tf.contrib.layers.apply_regularization(
            regularizer, tf.trainable_variables())
        total_loss = loss + reg_loss

        # Optimizer. NOTE: with staircase=True and a decay interval of 1000
        # steps, the rate only drops once global_step reaches 1000.
        global_step = tf.Variable(0, trainable=False)
        learning_rate_decay = tf.train.exponential_decay(
            learning_rate, global_step, 1000, 0.96, staircase=True)
        optimizer = tf.train.AdamOptimizer(learning_rate_decay).minimize(
            total_loss, global_step=global_step)

        # Accuracy.
        correct_pred = tf.equal(tf.argmax(output, 1),
                                tf.argmax(input_labels, 1))
        accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))

        # TensorBoard summaries.
        tf.summary.scalar('loss', loss)
        tf.summary.scalar('accuracy', accuracy)
        merged_summary = tf.summary.merge_all()
        writer = tf.summary.FileWriter(logs_dir)

        saver = tf.train.Saver()
        checkpoint_path = os.path.join(logs_dir, 'model.ckpt')

        with tf.Session() as sess:
            sess.run(tf.global_variables_initializer())
            coord = tf.train.Coordinator()
            threads = tf.train.start_queue_runners(sess=sess, coord=coord)
            try:
                for step in range(max_step):
                    batch_images, batch_labels = sess.run(
                        [train_images_batch, train_labels_batch])
                    _, loss_value, summary = sess.run(
                        [optimizer, total_loss, merged_summary],
                        feed_dict={input_images: batch_images,
                                   input_labels: batch_labels})
                    writer.add_summary(summary, step)
                    if step % 100 == 0:
                        print('Step %d, loss=%.4f' % (step, loss_value))
                    # Also checkpoint after the last step; otherwise the
                    # newest checkpoint would come from step 500 and the
                    # second half of training would be lost.
                    if step % 500 == 0 or step == max_step - 1:
                        saver.save(sess, checkpoint_path,
                                   global_step=global_step)
            finally:
                # Always stop the queue threads and flush the event file,
                # even when a training step raises.
                coord.request_stop()
                coord.join(threads)
                writer.close()

# Test the model

def test():
    """Evaluate the newest checkpoint in 'logs' on the images in 'test_data'.

    Prints the mean per-batch accuracy, or a message when there is nothing
    to evaluate. Takes no arguments and returns nothing.
    """
    # Parameters.
    batch_size = 32
    capacity = 1000
    image_W = 128
    image_H = 128

    # Load the file list and labels.
    test_dir = 'test_data'
    test_images, test_labels = get_files(test_dir)
    if len(test_images) == 0:
        print('No test images found')
        return

    # The input queue is created without an epoch limit, so it never raises
    # OutOfRangeError on its own. Evaluate a fixed number of batches that
    # covers the test set about once (ceiling division); when the set size
    # is not a multiple of batch_size the last batch is topped up with
    # repeated samples.
    num_batches = (len(test_images) + batch_size - 1) // batch_size

    # Use a private graph: rebuilding the model in a default graph that
    # already holds the training model fails on the duplicate variable names.
    with tf.Graph().as_default():
        test_images_batch, test_labels_batch = get_batch(
            test_images, test_labels, image_W, image_H, batch_size, capacity)

        # Network definition.
        input_images = tf.placeholder(tf.float32,
                                      [None, image_W, image_H, 3])
        input_labels = tf.placeholder(tf.float32, [None, 5])
        output = cnn_model(input_images)

        # Accuracy.
        correct_pred = tf.equal(tf.argmax(output, 1),
                                tf.argmax(input_labels, 1))
        accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))

        saver = tf.train.Saver()

        with tf.Session() as sess:
            ckpt = tf.train.get_checkpoint_state('logs')
            if ckpt and ckpt.model_checkpoint_path:
                saver.restore(sess, ckpt.model_checkpoint_path)
            else:
                print('No checkpoint file found')
                return

            coord = tf.train.Coordinator()
            threads = tf.train.start_queue_runners(sess=sess, coord=coord)
            acc_value = 0.0
            test_num = 0
            try:
                for _ in range(num_batches):
                    if coord.should_stop():
                        break
                    batch_images, batch_labels = sess.run(
                        [test_images_batch, test_labels_batch])
                    acc_value += sess.run(
                        accuracy,
                        feed_dict={input_images: batch_images,
                                   input_labels: batch_labels})
                    test_num += 1
            except tf.errors.OutOfRangeError:
                # Only reachable if the input pipeline is epoch-limited;
                # treat it as the normal end of the data.
                pass
            finally:
                coord.request_stop()
                coord.join(threads)

            print('Test Done.')
            # Guard the division: no batch may have been evaluated.
            if test_num:
                acc_value /= test_num
                print('Test Accuracy: %.4f' % acc_value)
            else:
                print('No test batches were evaluated')

if __name__ == '__main__':
    # Train first so that test() finds a checkpoint in the 'logs' directory.
    train()
    test()


# Original source: https://www.cveoy.top/t/topic/eCPw. Copyright belongs to the author; do not repost or scrape.

# Free AI: click here, no registration or login required.