import os
from statistics import mean

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import torchvision.utils as vutils
from torch.autograd import Variable
# Human-readable class names; position in the tuple is the integer label id.
classes = (
    'plane', 'car', 'bird', 'cat', 'deer',
    'dog', 'frog', 'horse', 'ship', 'truck',
)

# Training-time preprocessing: random crop (with 4 px padding) and random
# horizontal flip for augmentation, then per-channel (x - 0.5) / 0.5
# normalization of the tensor image.
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

# CIFAR-10 dataset stored under ./data (downloaded when missing), served in
# shuffled mini-batches of 64 by two worker processes.
trainset = torchvision.datasets.CIFAR10(
    root='./data',
    download=True,
    transform=transform_train,
)
trainloader = torch.utils.data.DataLoader(
    trainset,
    batch_size=64,
    shuffle=True,
    num_workers=2,
)
# Preview grid: show `samples_per_class` randomly chosen training images for
# every class, one column per class with the class name as the column title.
# A single large batch (up to 50000 images) is pulled so that the random
# per-class selection draws from the whole loaded set.
displayloader = torch.utils.data.DataLoader(trainset, batch_size=50000, shuffle=True, num_workers=2)
dataiter = iter(displayloader)
# Use the built-in next(): DataLoader iterators do not expose a .next()
# method in current PyTorch releases.
images, labels = next(dataiter)
# Convert once so the per-class comparison below is plain NumPy.
labels_np = labels.numpy()

num_classes = len(classes)
samples_per_class = 7
for y, cls in enumerate(classes):
    # Indices of all loaded images whose label is this class ...
    idxs = np.flatnonzero(labels_np == y)
    # ... of which `samples_per_class` distinct ones are picked at random.
    idxs = np.random.choice(idxs, samples_per_class, replace=False)
    for i, idx in enumerate(idxs):
        # Subplots are numbered row-major: row i, column y.
        plt_idx = i * num_classes + y + 1
        plt.subplot(samples_per_class, num_classes, plt_idx)
        img = images[idx] / 2 + 0.5  # un-normalize the image for display
        npimg = img.numpy()
        # Channel-first (C, H, W) -> channel-last (H, W, C) for imshow.
        plt.imshow(np.transpose(npimg, (1, 2, 0)))
        plt.axis('off')
        if i == 0:
            # Only the top row carries the class name.
            plt.title(cls)
plt.show()
# Generator: 1 fully connected layer followed by 3 transposed-convolution layers and 1 final convolution layer.
class Generator(nn.Module):
    """Map a latent noise vector to a 3 x 32 x 32 image with values in [-1, 1].

    A fully connected layer expands the noise to a 256 x 4 x 4 feature map,
    three transposed convolutions upsample it (4 -> 9 -> 17 -> 32 pixels per
    side) and a final convolution followed by tanh produces the RGB output.

    Args:
        latent_dim: Length of the input noise vector. Defaults to 100, the
            size previously hard-coded in this class.
    """

    def __init__(self, latent_dim=100):
        super().__init__()
        self.latent_dim = latent_dim
        self.model = nn.Sequential(
            # input z = noise, going into the FC network latent_dim --> 4096
            nn.Linear(latent_dim, 4 * 4 * 256),
            nn.LeakyReLU(),
        )
        self.cnn = nn.Sequential(
            # DConv 1 [input state size = (4 x 4 x 256)]
            nn.ConvTranspose2d(in_channels=256, out_channels=128, kernel_size=3,
                               stride=2, padding=0, output_padding=0),
            nn.LeakyReLU(),
            # DConv 2 [input state size = (9 x 9 x 128)]
            nn.ConvTranspose2d(in_channels=128, out_channels=64, kernel_size=3,
                               stride=2, padding=1, output_padding=0),
            nn.LeakyReLU(),
            # DConv 3 [input state size = (17 x 17 x 64)]
            nn.ConvTranspose2d(in_channels=64, out_channels=64, kernel_size=3,
                               stride=2, padding=2, output_padding=1),
            nn.LeakyReLU(),
            # Conv 4 [input state size = (32 x 32 x 64)]
            nn.Conv2d(in_channels=64, out_channels=3, kernel_size=3,
                      stride=1, padding=1),
            nn.Tanh(),
            # [output size = (32 x 32 x 3)]
        )

    def forward(self, z):
        """Generate images from noise.

        Args:
            z: Noise tensor whose last dimension is ``latent_dim``.

        Returns:
            Tensor of shape (batch, 3, 32, 32) with values in [-1, 1].
        """
        x = self.model(z)
        x = x.view(-1, 256, 4, 4)  # un-flatten into a 256-channel 4 x 4 map
        return self.cnn(x)
class Discriminator(nn.Module):
    """Score 3 x 32 x 32 images with the probability that they are real.

    Three conv / LeakyReLU / max-pool / dropout stages reduce the image to a
    256 x 4 x 4 feature map, which a two-layer fully connected head with a
    sigmoid output turns into one value in [0, 1] per image.
    """

    def __init__(self):
        super().__init__()
        self.cnn = nn.Sequential(
            # Conv 1 [input size = (32 x 32 x 3)]
            nn.Conv2d(in_channels=3, out_channels=64, kernel_size=3, stride=1, padding=1),
            nn.LeakyReLU(),
            # MAX-POOL [input state size = (32 x 32 x 64)]
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(0.4),
            # Conv 2 [input state size = (16 x 16 x 64)]
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1),
            nn.LeakyReLU(),
            # MAX-POOL [input state size = (16 x 16 x 128)]
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(0.4),
            # Conv 3 [input state size = (8 x 8 x 128)]
            nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=1, padding=1),
            nn.LeakyReLU(),
            # MAX-POOL [input state size = (8 x 8 x 256)]
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(0.4),
            # [output state size = (4 x 4 x 256)]
        )
        self.fc = nn.Sequential(
            # FC network 4096 --> 128
            nn.Linear(4 * 4 * 256, 128),
            nn.LeakyReLU(),
            nn.Dropout(0.5),
            # FC network 128 --> 1
            nn.Linear(128, 1),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Classify a batch of images.

        Args:
            x: Image tensor of shape (batch, 3, 32, 32).

        Returns:
            Tensor of shape (batch, 1) with values in [0, 1].

        Raises:
            RuntimeError: If the spatial size of ``x`` does not reduce to the
                4096 features the fully connected head expects.
        """
        x = self.cnn(x)
        # Flatten everything except the batch dimension. Unlike
        # view(-1, 4096), this cannot silently fold extra spatial positions
        # into the batch dimension when the input is not 32 x 32.
        x = torch.flatten(x, 1)
        return self.fc(x)
# Run on the first CUDA GPU when one is available, otherwise on the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
netD = Discriminator().to(device)
netG = Generator().to(device)
# Assuming that we are on a CUDA machine, this should print a CUDA device:
print(device)

criterion = nn.BCELoss()  # binary cross-entropy on the discriminator's sigmoid output

# One Adam optimizer per network, sharing the same hyper-parameters.
lr = 0.0002
optimizerD = optim.Adam(netD.parameters(), lr=lr, betas=(0.5, 0.999))
optimizerG = optim.Adam(netG.parameters(), lr=lr, betas=(0.5, 0.999))

num_epochs = 120
D_loss = []    # mean discriminator loss, one entry per epoch
G_loss = []    # mean generator loss, one entry per epoch
img_list = []  # image grids generated from `fixed_noise` during training

# 64 fixed noise vectors of length 100, reused throughout training to follow
# how the generator's output for the same inputs evolves. Plain tensors are
# used directly; the autograd Variable wrapper is deprecated and not needed.
fixed_noise = torch.randn(64, 100, device=device)
# Make sure the directory that receives the periodic sample grids exists;
# save_image does not create it and would otherwise fail at the first save.
os.makedirs('V1', exist_ok=True)

for epoch in range(num_epochs):  # loop over the dataset multiple times
    D_loss_it = []  # per-batch discriminator losses of this epoch
    G_loss_it = []  # per-batch generator losses of this epoch
    for i, data in enumerate(trainloader, 0):
        #####################################
        # (1) TRAIN DISCRIMINATOR
        #####################################
        netD.zero_grad()

        ##### (a) Train with real images #####
        # data is a list of [inputs, labels]; the class labels are not used
        real_images = data[0].to(device)
        # The last batch may be smaller, so read the size from the data.
        batch_size = real_images.size(0)
        real_labels = torch.ones(batch_size, 1, device=device)   # target 1 = real
        fake_labels = torch.zeros(batch_size, 1, device=device)  # target 0 = fake
        outputs = netD(real_images)
        real_loss = criterion(outputs, real_labels)

        ##### (b) Train with fake images #####
        noise = torch.randn(batch_size, 100, device=device)
        fake_images = netG(noise)
        # detach(): this step must not back-propagate into the generator
        outputs = netD(fake_images.detach())
        fake_loss = criterion(outputs, fake_labels)

        # Total loss + backward + optimize
        lossD = real_loss + fake_loss
        lossD.backward()
        optimizerD.step()

        #####################################
        # (2) TRAIN GENERATOR
        #####################################
        netG.zero_grad()
        # Fresh noise; the generator is trained so that the discriminator
        # labels its output as real.
        noise = torch.randn(batch_size, 100, device=device)
        fake_images = netG(noise)
        outputs = netD(fake_images)
        lossG = criterion(outputs, real_labels)
        lossG.backward()
        optimizerG.step()

        # record loss
        D_loss_it.append(lossD.item())
        G_loss_it.append(lossG.item())

    # average loss over all batches in one epoch (computed once, reused below)
    epoch_D_loss = mean(D_loss_it)
    epoch_G_loss = mean(G_loss_it)

    # report status
    print('Iteration: %2d | Discriminator loss: %.3f | Generator loss: %.3f '
          % (epoch + 1, epoch_D_loss, epoch_G_loss))
    D_loss.append(epoch_D_loss)
    G_loss.append(epoch_G_loss)

    # Every 5th epoch, save the generator output for the fixed noise.
    if epoch % 5 == 4:
        # No gradients are needed for sampling; skip building the graph.
        with torch.no_grad():
            fake = netG(fixed_noise)
        vutils.save_image(fake, 'V1/my_samples_epoch_%03d.png' % (epoch), normalize=True)
        img_list.append(vutils.make_grid(fake.cpu(), padding=2, normalize=True))

print('Finished Training')
import matplotlib.animation as animation
from IPython.display import HTML

#%%capture
# Build an animation with one frame per grid stored in `img_list`, shown for
# one second each, and render it as an HTML/JavaScript player.
fig = plt.figure(figsize=(8, 8))
plt.axis("off")
ims = []
for grid in img_list:
    # (C, H, W) -> (H, W, C); each frame is a one-element list of artists.
    frame = plt.imshow(np.transpose(grid, (1, 2, 0)), animated=True)
    ims.append([frame])
ani = animation.ArtistAnimation(
    fig,
    ims,
    interval=1000,
    repeat_delay=1000,
    blit=True,
)
HTML(ani.to_jshtml())
# Plot the per-epoch mean generator and discriminator losses.
# Open a fresh figure first: otherwise the curves are drawn onto whichever
# figure is still current, which may have its axes switched off.
plt.figure()
plt.plot(G_loss)
plt.plot(D_loss)
plt.ylabel('cost')
plt.xlabel('epochs')
plt.title('Loss vs Epoch (CIFAR-10)')
plt.grid()
# Legend entries follow the order in which the curves were plotted.
plt.legend(['Generator Loss', 'Discriminator Loss'], loc='upper right')
plt.show()
# Side-by-side comparison of real training images and generated images.
real_batch = next(iter(trainloader))  # one batch of real images
plt.figure(figsize=(15, 15))

# Left panel: grid of (up to) the first 64 real images of the batch.
plt.subplot(1, 2, 1)
plt.axis("off")
plt.title("Real Images")
real_grid = vutils.make_grid(
    real_batch[0].to(device)[:64],
    padding=5,
    normalize=True,
)
plt.imshow(np.transpose(real_grid.cpu(), (1, 2, 0)))

# Right panel: the most recently stored grid of generated images.
plt.subplot(1, 2, 2)
plt.axis("off")
plt.title("Fake Images")
last_fake_grid = img_list[-1]
plt.imshow(np.transpose(last_fake_grid, (1, 2, 0)))

plt.show()