.. DO NOT EDIT.
.. THIS FILE WAS AUTOMATICALLY GENERATED BY SPHINX-GALLERY.
.. TO MAKE CHANGES, EDIT THE SOURCE PYTHON FILE:
.. "gallery_2d/cifar_small_sample.py"
.. LINE NUMBERS ARE GIVEN BELOW.

.. only:: html

    .. note::
        :class: sphx-glr-download-link-note

        Click :ref:`here <sphx_glr_download_gallery_2d_cifar_small_sample.py>`
        to download the full example code

.. rst-class:: sphx-glr-example-title

.. _sphx_glr_gallery_2d_cifar_small_sample.py:


Classification on CIFAR10 (ResNet)
==================================

Based on pytorch example for CIFAR10

.. GENERATED FROM PYTHON SOURCE LINES 7-244

.. code-block:: default


    import torch.optim
    from torchvision import datasets, transforms
    import torch.nn.functional as F
    from kymatio import Scattering2D
    import torch
    import argparse
    import kymatio.datasets as scattering_datasets
    import torch.nn as nn
    from numpy.random import RandomState
    import numpy as np


    class Identity(nn.Module):
        """Pass-through module whose ``forward`` returns its input unchanged.

        Constructor arguments are accepted and ignored. ``main`` uses it in
        place of the scattering transform when running in 'standard' mode.
        """

        def __init__(self, *args, **kwargs):
            super().__init__()

        def forward(self, x):
            return x


    def conv3x3(in_planes, out_planes, stride=1):
        """Return a bias-free 3x3 ``nn.Conv2d`` with padding 1."""
        return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride,
                         padding=1, bias=False)


    class BasicBlock(nn.Module):
        """Residual block: two 3x3 conv + batch-norm layers with a skip connection.

        Parameters
        ----------
        inplanes : int
            Number of input channels.
        planes : int
            Number of output channels of both convolutions.
        stride : int
            Stride of the first convolution.
        downsample : nn.Module or None
            Optional module applied to the block input before it is added to
            the convolution branch.
        """

        def __init__(self, inplanes, planes, stride=1, downsample=None):
            super().__init__()
            self.conv1 = conv3x3(inplanes, planes, stride)
            self.bn1 = nn.BatchNorm2d(planes)
            self.relu = nn.ReLU(inplace=True)
            self.conv2 = conv3x3(planes, planes)
            self.bn2 = nn.BatchNorm2d(planes)
            self.downsample = downsample
            self.stride = stride

        def forward(self, x):
            residual = x

            out = self.conv1(x)
            out = self.bn1(out)
            out = self.relu(out)

            out = self.conv2(out)
            out = self.bn2(out)

            # Project the input when a downsample module was supplied so it
            # can be added to the convolution branch.
            if self.downsample is not None:
                residual = self.downsample(x)

            out += residual
            out = self.relu(out)

            return out


    class Scattering2dResNet(nn.Module):
        """ResNet applied either to scattering coefficients or to raw images.

        Parameters
        ----------
        in_channels : int
            Number of channels of the scattering input. Ignored when
            ``standard`` is true, where the first convolution is hard-wired
            to 3 input channels.
        k : int
            Width factor; the stages use ``16 * k``, ``32 * k`` and ``64 * k``
            channels.
        n : int
            Number of ``BasicBlock`` modules per stage.
        num_classes : int
            Size of the final linear layer's output.
        standard : bool
            If true, build the image variant (with an extra ``layer1`` stage);
            otherwise build the scattering variant.
        """

        def __init__(self, in_channels, k=2, n=4, num_classes=10, standard=False):
            super().__init__()
            self.inplanes = 16 * k
            self.ichannels = 16 * k
            if standard:
                self.init_conv = nn.Sequential(
                    nn.Conv2d(3, self.ichannels,
                              kernel_size=3, stride=1, padding=1, bias=False),
                    nn.BatchNorm2d(self.ichannels),
                    nn.ReLU(True)
                )
                self.layer1 = self._make_layer(BasicBlock, 16 * k, n)
                self.standard = True
            else:
                self.K = in_channels
                self.init_conv = nn.Sequential(
                    # Non-affine batch norm applied to the input channels
                    # before the first convolution.
                    nn.BatchNorm2d(in_channels, eps=1e-5, affine=False),
                    nn.Conv2d(in_channels, self.ichannels,
                              kernel_size=3, stride=1, padding=1, bias=False),
                    nn.BatchNorm2d(self.ichannels),
                    nn.ReLU(True)
                )
                self.standard = False

            self.layer2 = self._make_layer(BasicBlock, 32 * k, n)
            self.layer3 = self._make_layer(BasicBlock, 64 * k, n)
            # Pool to a 2x2 map, hence the factor 4 in the linear layer input.
            self.avgpool = nn.AdaptiveAvgPool2d(2)
            self.fc = nn.Linear(64 * k * 4, num_classes)

        def _make_layer(self, block, planes, blocks, stride=1):
            """Build a stage of ``blocks`` modules and update ``self.inplanes``."""
            downsample = None
            # A 1x1 conv + batch norm matches the skip path to the new shape
            # whenever the stride or the channel count changes.
            if stride != 1 or self.inplanes != planes:
                downsample = nn.Sequential(
                    nn.Conv2d(self.inplanes, planes,
                              kernel_size=1, stride=stride, bias=False),
                    nn.BatchNorm2d(planes),
                )

            layers = []
            layers.append(block(self.inplanes, planes, stride, downsample))
            self.inplanes = planes
            for _ in range(1, blocks):
                layers.append(block(self.inplanes, planes))

            return nn.Sequential(*layers)

        def forward(self, x):
            if not self.standard:
                # Fold the input into (batch, K, 8, 8).
                # NOTE(review): 8x8 is presumably the spatial size produced by
                # Scattering2D(J=2, shape=(32, 32)) -- confirm before reusing
                # this model with another input size.
                x = x.view(x.size(0), self.K, 8, 8)

            x = self.init_conv(x)

            if self.standard:
                x = self.layer1(x)

            x = self.layer2(x)
            x = self.layer3(x)
            x = self.avgpool(x)
            x = x.view(x.size(0), -1)
            x = self.fc(x)
            return x


    def train(model, device, train_loader, optimizer, epoch, scattering):
        """Run one training epoch, printing the loss every 50 batches.

        ``scattering`` is applied to each batch before it is fed to ``model``.
        ``epoch`` is only used in the progress message.
        """
        model.train()
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(scattering(data))
            loss = F.cross_entropy(output, target)
            loss.backward()
            optimizer.step()
            if batch_idx % 50 == 0:
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    epoch, batch_idx * len(data), len(train_loader.dataset),
                    100. * batch_idx / len(train_loader), loss.item()))


    def test(model, device, test_loader, scattering):
        """Evaluate ``model`` on ``test_loader`` and print average loss and accuracy."""
        model.eval()
        test_loss = 0
        correct = 0
        with torch.no_grad():
            for data, target in test_loader:
                data, target = data.to(device), target.to(device)
                output = model(scattering(data))
                # Sum the per-sample losses of the batch; the mean over the
                # whole dataset is taken after the loop. ``reduction='sum'``
                # replaces the deprecated ``size_average=False``.
                test_loss += F.cross_entropy(output, target, reduction='sum').item()
                pred = output.max(1, keepdim=True)[1]  # index of the largest output score
                correct += pred.eq(target.view_as(pred)).sum().item()

        test_loss /= len(test_loader.dataset)
        print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format(
            test_loss, correct, len(test_loader.dataset),
            100. * correct / len(test_loader.dataset)))


    def main():
        """Train a simple Hybrid Resnet Scattering + CNN model on CIFAR.

        Command-line options select the network type, the number of training
        samples kept per class, the learning-schedule multiplier, the seed
        used for subset selection and the ResNet width factor.
        """
        parser = argparse.ArgumentParser(description='CIFAR scattering  + hybrid examples')
        parser.add_argument('--mode', type=str, default='scattering',
                            choices=['scattering', 'standard'], help='network_type')
        parser.add_argument('--num_samples', type=int, default=50,
                            help='samples per class')
        # The multiplier scales both the total number of epochs and the
        # epochs at which the learning rate is dropped.
        parser.add_argument('--learning_schedule_multi', type=int, default=10,
                            help='multiplier applied to the number of epochs '
                                 'and to the learning rate drop epochs')
        parser.add_argument('--seed', type=int, default=0,
                            help='seed for dataset subselection')
        parser.add_argument('--width', type=int, default=2,
                            help='width factor for resnet')
        args = parser.parse_args()

        use_cuda = torch.cuda.is_available()
        device = torch.device("cuda" if use_cuda else "cpu")

        if args.mode == 'scattering':
            scattering = Scattering2D(J=2, shape=(32, 32))
            K = 81*3
            model = Scattering2dResNet(K, args.width).to(device)
            scattering = scattering.to(device)
        else:
            # The channel argument is ignored in standard mode.
            model = Scattering2dResNet(8, args.width, standard=True).to(device)
            scattering = Identity()

        # DataLoaders
        num_workers = 4
        pin_memory = bool(use_cuda)

        normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                         std=[0.229, 0.224, 0.225])

        # CIFAR data
        cifar_data = datasets.CIFAR10(
            root=scattering_datasets.get_dataset_dir('CIFAR'), train=True,
            transform=transforms.Compose([
                transforms.RandomHorizontalFlip(),
                transforms.RandomCrop(32, 4),
                transforms.ToTensor(),
                normalize,
            ]), download=True)

        # Extract a subset of X samples per class. The same seeded selection
        # of positions is applied to every class; it assumes each class holds
        # 5000 training images.
        prng = RandomState(args.seed)
        random_permute = prng.permutation(np.arange(0, 5000))[0:args.num_samples]
        indx = np.concatenate(
            [np.where(np.array(cifar_data.targets) == classe)[0][random_permute]
             for classe in range(0, 10)])

        cifar_data.data, cifar_data.targets = (
            cifar_data.data[indx], list(np.array(cifar_data.targets)[indx]))
        train_loader = torch.utils.data.DataLoader(
            cifar_data, batch_size=32, shuffle=True,
            num_workers=num_workers, pin_memory=pin_memory)

        test_loader = torch.utils.data.DataLoader(
            datasets.CIFAR10(
                root=scattering_datasets.get_dataset_dir('CIFAR'), train=False,
                transform=transforms.Compose([
                    transforms.ToTensor(),
                    normalize,
                ])),
            batch_size=128, shuffle=False,
            num_workers=num_workers, pin_memory=pin_memory)

        # Optimizer
        lr = 0.1
        M = args.learning_schedule_multi
        drops = [60*M, 120*M, 160*M]
        for epoch in range(0, 200*M):
            # A new SGD optimizer is built at the start and at every drop
            # epoch; the learning rate is then scaled by 0.2 for the next one.
            if epoch in drops or epoch == 0:
                optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9,
                                            weight_decay=0.0005)
                lr *= 0.2

            train(model, device, train_loader, optimizer, epoch+1, scattering)
            if epoch % 10 == 0:
                test(model, device, test_loader, scattering)


    if __name__ == '__main__':
        main()


.. rst-class:: sphx-glr-timing

   **Total running time of the script:** ( 0 minutes  0.000 seconds)


.. _sphx_glr_download_gallery_2d_cifar_small_sample.py:

.. only:: html

  .. container:: sphx-glr-footer sphx-glr-footer-example


    .. container:: sphx-glr-download sphx-glr-download-python

      :download:`Download Python source code: cifar_small_sample.py <cifar_small_sample.py>`

    .. container:: sphx-glr-download sphx-glr-download-jupyter

      :download:`Download Jupyter notebook: cifar_small_sample.ipynb <cifar_small_sample.ipynb>`


.. only:: html

 .. rst-class:: sphx-glr-signature

    `Gallery generated by Sphinx-Gallery <https://sphinx-gallery.github.io>`_