#:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::#
#                                                                               #
# Part of solution to exercise set for week 13                                  #
# INF5860 - Machine Learning for Image analysis                                 #
# University of Oslo                                                            #
#                                                                               #
#                                                                               #
# Ole-Johan Skrede    olejohas at ifi dot uio dot no                            #
# 2018.04.12                                                                    #
#                                                                               #
#:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::#
"""Implements three variants of an autoencoder"""

import sys
import os
import shutil

import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
import matplotlib.pyplot as plt
import numpy as np


def config():
    """Return a dict of configuration settings used in the program"""
    conf = {}

    # Whether you are training or testing
    conf['mode'] = "train"  # {"train", "test"}
    # Select autoencoder variant in {'compression', 'denoising', 'sparse'}
    conf['variant'] = 'denoising'
    # Whether you are restoring variables from a checkpoint, or initializing them from "scratch".
    # Restoring variables can be useful when testing a trained model, or if you want to continue
    # training from a checkpoint
    conf['restore_from_checkpoint'] = False
    if "test" in conf['mode']:
        conf['restore_from_checkpoint'] = True

    conf['job_dir'] = "/tmp/{}_autoencoder/".format(conf['variant'])
    # Relevant dataset will be put in this location after download
    conf['data_dir'] = "/tmp/mnist_data"
    # Location to place checkpoints
    conf['checkpoint_dir'] = os.path.join(conf['job_dir'], "train/checkpoints")
    # Location of the summary events to be used by tensorboard
    conf['summary_dir'] = os.path.join(conf['job_dir'], "{}/events".format(conf['mode']))
    # Location to place output
    conf['output_dir'] = os.path.join(conf['job_dir'], "{}/output".format(conf['mode']))
    # Path of the checkpoint you want to restore variables from (note that the suffix is omitted)
    conf['restore_path'] = os.path.join(conf['job_dir'], "train/checkpoints/model.ckpt-10000")

    # Create directories
    if not os.path.exists(conf['checkpoint_dir']):
        os.makedirs(conf['checkpoint_dir'])
    if not os.path.exists(conf['output_dir']):
        os.makedirs(conf['output_dir'])
    if not os.path.exists(conf['summary_dir']):
        os.makedirs(conf['summary_dir'])
    else:
        # Remove content of old run before creating a new directory
        shutil.rmtree(conf['summary_dir'])
        os.makedirs(conf['summary_dir'])

    # Number of layers and nodes in the autoencoder.
    # Number of nodes in the input (the dimensions of an input example)
    conf['height'] = 28
    conf['width'] = 28
    conf['channels'] = 1
    # This specifies only the hidden layers in the encoder, as the decoder will mirror this setup.
    conf['hidden_dimensions'] = [128, 32]

    # Implemented: {'cross_entropy', 'mean_squared_error'}
    conf['cost_function'] = 'mean_squared_error'
    # Implemented: {'RMSProp', 'Adam', 'sgd'}
    conf['optimization_function'] = 'Adam'
    # Implemented: {'relu', 'sigmoid', 'tanh'}
    conf['activation_function'] = 'sigmoid'

    # The number of steps to run before termination of training. One step is one forward->backward
    # pass of a mini-batch
    conf['max_steps'] = 10000
    # The batch size used in training.
    conf['batch_size'] = 256
    # The step size used by the optimization routine.
    conf['learning_rate'] = 1.0e-2

    # Variant-specific hyperparameters
    conf['gauss_std'] = 0.2   # For the denoising autoencoder
    conf['sparsity'] = 0.01   # For the sparse autoencoder. \rho in the lecture slides
    conf['beta'] = 1.0        # Regularization strength. For the sparse autoencoder

    # How often (in steps) to log the training progress (to stdout)
    conf['monitor_progress'] = 1000
    # How often (in steps) to save checkpoints
    conf['periodic_checkpoint'] = 5000
    # How many test results to show in a plotted mosaic at the end of training. Preferably a
    # square number
    conf['num_display'] = 4**2

    return conf


def gaussian_noise(in_array, mean, std, min_val, max_val):
    """Add gaussian noise to in_array and clip the result to the value range [min_val, max_val]"""
    return np.clip(in_array + np.random.normal(mean, std, in_array.shape), min_val, max_val)


def kl_divergence(rho, rho_hat):
    """
    Computes the KL divergence between two Bernoulli distributions p and q

        D_KL(p || q) = sum_x p(x) log( p(x) / q(x) )

    where
        p ~ Bernoulli(rho)
        q ~ Bernoulli(rho_hat)
    """
    return rho * tf.log(rho) - rho * tf.log(rho_hat) + \
           (1.0 - rho) * tf.log(1.0 - rho) - (1.0 - rho) * tf.log(1.0 - rho_hat)
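

# A small numpy twin of kl_divergence() added here purely as an illustrative
# sanity check; it is not used by the training or test code. For example, with
# rho = 0.01 and rho_hat = 0.2 it returns roughly 0.18, and the value grows as
# the mean activation rho_hat drifts further from the target sparsity rho.
def _kl_divergence_numpy(rho, rho_hat):
    """Numpy version of kl_divergence(), handy for inspecting values outside a tf.Session"""
    return (rho * np.log(rho / rho_hat)
            + (1.0 - rho) * np.log((1.0 - rho) / (1.0 - rho_hat)))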


def activation_function(name):
    """Return an activation function according to the input name"""
    if name == 'relu':
        return tf.nn.relu
    elif name == 'sigmoid':
        return tf.sigmoid
    elif name == 'tanh':
        return tf.tanh
    else:
        print("Please specify a valid activation function")
        sys.exit(1)


def reconstruction_cost(name, references, predictions):
    """Return a cost function value given the input"""
    if name == 'mean_squared_error':
        cost = tf.reduce_mean(tf.pow(predictions - references, 2))
    elif name == 'cross_entropy':
        # Binary cross entropy is used to compare the predictions and the reference
        epsilon = 1e-10  # To avoid log(0)
        loss = -tf.reduce_sum(references * tf.log(epsilon + predictions)
                              + (1.0 - references) * tf.log(epsilon + 1.0 - predictions), axis=1)
        cost = tf.reduce_mean(loss)  # Average over examples
    else:
        print("Please specify an implemented cost function")
        sys.exit(1)
    tf.summary.scalar("Reconstruction cost", cost)
    return cost


def sparsity_cost(latent_vector, rho):
    """Return a cost function on the sparsity in the latent layer"""
    rho_hat = tf.reduce_mean(latent_vector, axis=0)  # Average over all examples
    cost = tf.reduce_sum(kl_divergence(rho, rho_hat))
    tf.summary.scalar("Sparsity cost", cost)
    return cost


def optimization_function(name, learning_rate, cost):
    """Return an optimization function given the input"""
    if name == 'RMSProp':
        return tf.train.RMSPropOptimizer(learning_rate).minimize(cost)
    elif name == 'Adam':
        return tf.train.AdamOptimizer(learning_rate).minimize(cost)
    elif name == 'sgd':
        return tf.train.GradientDescentOptimizer(learning_rate).minimize(cost)
    else:
        print("Please specify an implemented optimizer")
        sys.exit(1)


def dense_layer(name, input_activations, num_nodes, activation):
    """Creates a dense layer with num_nodes nodes"""
    initializer = tf.random_normal_initializer(mean=0.0, stddev=1.0)
    num_nodes_prev = input_activations.get_shape().as_list()[-1]
    print("Layer {0}: {1:>4} -> {2:>4}".format(name, num_nodes_prev, num_nodes))
    weights = tf.get_variable(name="W_" + name,
                              shape=[num_nodes_prev, num_nodes],
                              dtype=tf.float32,
                              initializer=initializer)
    biases = tf.get_variable(name="b_" + name,
                             shape=[num_nodes],
                             dtype=tf.float32,
                             initializer=initializer)
    linear = tf.add(tf.matmul(input_activations, weights), biases)
    if activation is None:
        return linear
    activation_fn = activation_function(activation)
    return activation_fn(linear)
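

# A minimal usage sketch of dense_layer() (hypothetical, not part of the
# exercise solution): a single fully connected 784 -> 128 layer with sigmoid
# activations.
#
#     inputs = tf.placeholder(tf.float32, shape=[None, 784])
#     hidden = dense_layer("example", inputs, 128, 'sigmoid')
#
# encoder() and decoder() below simply chain such layers; with the default
# conf['hidden_dimensions'] = [128, 32] the complete autoencoder becomes
# 784 -> 128 -> 32 -> 128 -> 784, which is also what the print statement in
# dense_layer() reports when the graph is built.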


def encoder(input_batch, conf):
    """Builds the encoder. This assumes that the input image has been flattened"""
    x = input_batch
    for layer_ind, num_nodes in enumerate(conf['hidden_dimensions'], start=1):
        x = dense_layer("encoding_{}".format(layer_ind), x, num_nodes, conf['activation_function'])
    return x


def decoder(latent_batch, conf):
    """Builds the decoder and returns the predictions"""
    x = latent_batch
    for layer_ind, num_nodes in enumerate(reversed(conf['hidden_dimensions'][:-1]), start=1):
        x = dense_layer("decoding_{}".format(layer_ind), x, num_nodes, conf['activation_function'])
    # The output layer maps back to the input dimension, without an activation function
    num_nodes = conf['height'] * conf['width'] * conf['channels']
    layer_ind = len(conf['hidden_dimensions'])
    logits = dense_layer("decoding_{}".format(layer_ind), x, num_nodes, None)
    predictions = tf.sigmoid(logits)
    return predictions


def autoencoder(input_batch, conf):
    """Define the autoencoder model"""
    latent_batch = encoder(input_batch, conf)
    predictions = decoder(latent_batch, conf)
    return predictions, latent_batch


def train(conf, data):
    """Train the model"""
    learning_rate = conf['learning_rate']
    input_ph = tf.placeholder(name="input_batch",
                              shape=[None, conf['height'] * conf['width'] * conf['channels']],
                              dtype=tf.float32)
    predictions, latent_batch = autoencoder(input_ph, conf)
    # The autoencoder is trained to reconstruct its own (possibly noisy) input
    references = input_ph
    cost = reconstruction_cost(conf['cost_function'], references, predictions)
    if conf['variant'] == 'sparse':
        cost += conf['beta'] * sparsity_cost(latent_batch, conf['sparsity'])
    tf.summary.scalar("Total cost", cost)
    train_op = optimization_function(conf['optimization_function'], learning_rate, cost)

    summaries = tf.get_collection(tf.GraphKeys.SUMMARIES)
    summary_op = tf.summary.merge(summaries)
    saver = tf.train.Saver(tf.global_variables(), max_to_keep=20)
    if conf['restore_from_checkpoint']:
        restorer = tf.train.Saver()
    else:
        init = tf.global_variables_initializer()

    with tf.Session() as sess:
        if conf['restore_from_checkpoint']:
            print('Restoring variables from {}'.format(conf['restore_path']))
            restorer.restore(sess, conf['restore_path'])
        else:
            sess.run(init)
        summary_writer = tf.summary.FileWriter(conf['summary_dir'], sess.graph)

        # Training loop
        step = 0
        while step < conf['max_steps']:
            # Get data, we do not need the labels
            input_batch, _ = data.train.next_batch(conf['batch_size'])
            if conf['variant'] == 'denoising':
                input_batch = gaussian_noise(input_batch, 0.0, conf['gauss_std'], 0.0, 1.0)
            # Run optimization and backpropagation
            _, cost_val, summary_str = sess.run([train_op, cost, summary_op],
                                                feed_dict={input_ph: input_batch})
            summary_writer.add_summary(summary_str, step)
            step += 1
            last_step = step == conf['max_steps']
            if (step % conf['monitor_progress'] == 0) or last_step:
                print('Step {0:>6}: Cost: {1:>7,.4e}'.format(step, cost_val))
            if (step % conf['periodic_checkpoint'] == 0) or last_step:
                print('Writing checkpoint')
                checkpoint_path = os.path.join(conf['checkpoint_dir'], 'model.ckpt')
                saver.save(sess, checkpoint_path, global_step=step)
    print("Training finished")
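

# Training progress can be followed with TensorBoard while (or after) training,
# for example (path shown for the default 'denoising' variant; adjust it to the
# variant you are running):
#
#     tensorboard --logdir /tmp/denoising_autoencoder/train/events
#
# The "Reconstruction cost" and "Total cost" scalars (and "Sparsity cost" for
# the sparse variant) written by the summary ops above then show up as scalar
# plots.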


def test(conf, data):
    """Test a trained model"""
    input_ph = tf.placeholder(name="input_batch",
                              shape=[None, conf['height'] * conf['width'] * conf['channels']],
                              dtype=tf.float32)
    predictions, _ = autoencoder(input_ph, conf)
    restorer = tf.train.Saver()
    with tf.Session() as sess:
        print('Restoring variables from {}'.format(conf['restore_path']))
        restorer.restore(sess, conf['restore_path'])
        # Testing
        originals = []
        reconstructed = []
        for _ in range(conf['num_display']):
            input_batch, _ = data.test.next_batch(1)
            if conf['variant'] == 'denoising':
                input_batch = gaussian_noise(input_batch, 0.0, conf['gauss_std'], 0.0, 1.0)
            preds = sess.run(predictions, feed_dict={input_ph: input_batch})
            originals.append(input_batch[0].reshape([conf['height'], conf['width']]))
            reconstructed.append(preds[0].reshape([conf['height'], conf['width']]))
    return originals, reconstructed


def main():
    """Main"""
    print("Start program")
    conf = config()
    mnist = input_data.read_data_sets(conf['data_dir'], one_hot=True)
    if conf['mode'] == 'train':
        train(conf, mnist)
    elif conf['mode'] == 'test':
        originals, reconstructed = test(conf, mnist)
        mosaic_size = int(np.ceil(np.sqrt(conf['num_display'])))
        plt.figure(0, figsize=(mosaic_size, mosaic_size))
        for ind, im in enumerate(originals):
            plt.subplot(mosaic_size, mosaic_size, ind + 1)
            plt.imshow(im, origin="upper", cmap="gray", clim=(0.0, 1.0))
            plt.axis('off')
        plt.savefig(os.path.join(conf['output_dir'], 'mosaic_original.png'))
        plt.figure(1, figsize=(mosaic_size, mosaic_size))
        for ind, im in enumerate(reconstructed):
            plt.subplot(mosaic_size, mosaic_size, ind + 1)
            plt.imshow(im, origin="upper", cmap="gray", clim=(0.0, 1.0))
            plt.axis('off')
        plt.savefig(os.path.join(conf['output_dir'], 'mosaic_reconstructed.png'))
        plt.show()


if __name__ == "__main__":
    main()
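

# Typical usage (a sketch; the file name below is hypothetical, use whatever
# name this script is saved under):
#
#     python autoencoder_mnist.py   # first run, with conf['mode'] = "train"
#     python autoencoder_mnist.py   # second run, after setting conf['mode'] = "test"
#
# The test run restores the checkpoint written during training and saves the
# mosaics of originals and reconstructions under
# /tmp/<variant>_autoencoder/test/output/.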