Introduction

Problem Restatement

The tasks in this homework are image classification and face verification. Note that the classification task is a closed-set problem: there are 7000 identities in total, and every identity in the test set also appears in the training set (the test images of an identity are simply different photos from the training ones). Verification, on the other hand, is an open-set problem; the figure below illustrates the difference. For verification we do not care which class a data point belongs to, but rather the metric value by which it can be compared with other data points. Beyond that, verification also differs from classification in that it allows one-to-many matching, i.e. one sample can be compared against multiple identities, whereas classification is one-to-one.

The handout spends quite a few words explaining the difference between the two tasks, which feels a bit redundant 🤔

(Figure: closed-set classification vs. open-set verification)

Solving Problems

The classification problem can be solved in the conventional way:

A face classifier that can extract feature vectors from face images. The face classifier consists of two main parts: the feature extractor and the classification layer.

Your model needs to be able to learn facial features (e.g., skin tone, hair color, nose size, etc.) from an image of a person’s face and represent them as a fixed-length feature vector called face embedding. In order to do this, you will explore architectures consisting of multiple convolutional layers. Stacking several convolutional layers allows for hierarchical decomposition of the input image. For example, if the first layer extracts low-level features such as lines, then the second layer (that acts on the output of the first layer) may extract combinations of low-level features, such as features that comprise multiple lines to express shapes.

Let's discuss verification in more detail.

Probably the most obvious approach is plain multi-class classification, i.e. using softmax to obtain a matching score for each class. But this ignores what the verification task actually needs. As shown below, training with cross-entropy only learns decision boundaries and a classification criterion; it puts no constraint on the metric between individual instances (left figure). What we want is the situation in the right figure: the instance embeddings are structured, so that even the metric between the two farthest instances of the same class (P and Q) is smaller than the metric between the two closest instances of different classes (Q and R).

(Figure: embeddings trained with plain cross-entropy (left) vs. metric-structured embeddings (right); P and Q belong to the same class, R to a different one)

To address this and make the feature vectors more discriminative, we need to maximize intra-class compactness and inter-class separation at the same time. This can be done by combining different loss functions into the final loss (center loss, SphereFace loss, large-margin softmax loss, large-margin Gaussian mixture loss, etc.). Each of these losses is used jointly with cross-entropy loss to obtain high-quality feature vectors.
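As a concrete illustration, here is a minimal sketch of jointly using cross-entropy and a center loss term. The CenterLoss class, the embedding size of 512, and the weight lambda_c are my own assumptions, not something prescribed by the handout:

import torch
import torch.nn as nn

class CenterLoss(nn.Module):
    """Minimal center loss: pulls each embedding toward a learnable center of its class."""
    def __init__(self, num_classes=7000, feat_dim=512):
        super().__init__()
        self.centers = nn.Parameter(torch.randn(num_classes, feat_dim))

    def forward(self, feats, labels):
        # Squared distance between each embedding and the center of its ground-truth class
        return ((feats - self.centers[labels]) ** 2).sum(dim=1).mean()

ce_loss = nn.CrossEntropyLoss()
center_loss = CenterLoss()
lambda_c = 0.01  # assumed weighting factor

def total_loss(logits, feats, labels):
    # Cross-entropy on the classifier logits + weighted center loss on the embeddings
    return ce_loss(logits, labels) + lambda_c * center_loss(feats, labels)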

In practice, the above may still not be enough, because the test set contains many unseen classes, and feature vectors learned only through the training classes are limited: they separate well only within those classes. In that case we can build a model that targets the feature embedding directly, without going through classes at all, which usually works noticeably better.
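One common way to train directly on embeddings, sketched here only as an illustration and not required by this assignment, is a triplet loss, which needs no classification head at all:

import torch.nn as nn
import torch.nn.functional as F

# anchor / positive: two embeddings of the same identity; negative: an embedding of a different identity
triplet = nn.TripletMarginLoss(margin=0.2)

def embedding_loss(anchor, positive, negative):
    # L2-normalizing first makes the margin act on cosine-like distances
    return triplet(F.normalize(anchor), F.normalize(positive), F.normalize(negative))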

A verification system that computes the similarity between feature vectors of two images. The face verification consists of two steps:

  1. Extracting the feature vectors from the images.

  2. Comparing the feature vectors using a similarity metric.

A vanilla verification system looks like this:

  1. image1 -> feature extractor -> feature vector1

  2. image2 -> feature extractor -> feature vector2

  3. feature vector1, feature vector2 -> similarity metric -> similarity score
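With cosine similarity as the metric, step 3 is essentially a one-liner. A sketch, where feat1 and feat2 are the (1, D) embeddings from steps 1 and 2 and the 0.5 threshold is an arbitrary placeholder:

import torch.nn.functional as F

def same_identity(feat1, feat2, threshold=0.5):
    score = F.cosine_similarity(feat1, feat2).item()  # in [-1, 1]; higher = more similar
    return score, score > threshold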

In code, verification is done as follows:

The verification task consists of the following generalized scenario:

  • You are given X unknown identities
  • You are given Y known identities
  • Your goal is to match X unknown identities to Y known identities.

We have given you a verification dataset, that consists of 1000 known identities, and 1000 unknown identities. The 1000 unknown identities are split into dev (200) and test (800). Your goal is to compare the unknown identities to the 1000 known identities and assign an identity to each image from the set of unknown identities.

You will use/finetune your model trained for classification to compare images between known and unknown identities using a similarity metric and assign labels to the unknown identities.

This will judge your model’s performance in terms of the quality of embeddings/features it generates on images/faces it has never seen during training for classification.
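A minimal sketch of this matching step (the names are mine: known_feats is a (1000, D) matrix of embeddings of the known identities, unknown_feats is (N, D) for the unknown images, and known_labels is the list of identity labels):

import torch.nn.functional as F

def match_identities(unknown_feats, known_feats, known_labels):
    u = F.normalize(unknown_feats, dim=1)   # (N, D); unit-norm so the dot product is cosine similarity
    k = F.normalize(known_feats, dim=1)     # (1000, D)
    sims = u @ k.T                          # (N, 1000) similarity matrix
    best = sims.argmax(dim=1)               # index of the most similar known identity per unknown image
    return [known_labels[i] for i in best.tolist()]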

Data Part

This homework uses a subset of the VGGFace2 dataset, organized as shown below:

(Figure: directory layout of the classification and verification data)

For the training and validation sets, the data can be loaded directly with torchvision.datasets.ImageFolder. Taking train as an example, this class treats every subdirectory under the train directory as one class: the first subdirectory, n000002, is mapped to label 0, and so on.

ToTensor() converts a numpy.ndarray (or PIL image) img of shape (H, W, C) into a tensor of shape (C, H, W) and scales every value into [0, 1]; the scaling is simply a division by 255.
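A quick sanity check of both behaviors (the path is a placeholder and the second folder name is assumed):

import torchvision

ds = torchvision.datasets.ImageFolder("classification/train",
                                      transform=torchvision.transforms.ToTensor())
print(list(ds.class_to_idx.items())[:2])              # e.g. [('n000002', 0), ('n000003', 1)]
img, label = ds[0]
print(img.shape, img.min().item(), img.max().item())  # torch.Size([3, H, W]), values within [0, 1]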

import os
import torch
import torchvision

DATA_DIR = '/content/11-785-f22-hw2p2-classification'  # TODO: Path where you have downloaded the data
TRAIN_DIR = os.path.join(DATA_DIR, "classification/train")
VAL_DIR = os.path.join(DATA_DIR, "classification/dev")
TEST_DIR = os.path.join(DATA_DIR, "classification/test")

# Transforms using torchvision - Refer https://pytorch.org/vision/stable/transforms.html

train_transforms = torchvision.transforms.Compose([
    # Implementing the right transforms/augmentation methods is key to improving performance.
    torchvision.transforms.ToTensor(),
])
# Most torchvision transforms are done on PIL images. So you convert it into a tensor at the end with ToTensor()
# But there are some transforms which are performed after ToTensor() : e.g - Normalization
# Normalization Tip - Do not blindly use normalization that is not suitable for this dataset

val_transforms = torchvision.transforms.Compose([torchvision.transforms.ToTensor()])


train_dataset = torchvision.datasets.ImageFolder(TRAIN_DIR, transform=train_transforms)
val_dataset = torchvision.datasets.ImageFolder(VAL_DIR, transform=val_transforms)
# You should NOT have data augmentation on the validation set. Why?


# Create data loaders
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=config['batch_size'],
                                           shuffle=True, num_workers=4, pin_memory=True)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=config['batch_size'],
                                         shuffle=False, num_workers=2)

The test set has no label information, so the test directory only contains the images themselves, with no subdirectories. Loading the test set therefore requires a custom dataset class:

# You can do this with ImageFolder as well, but it requires some tweaking
from PIL import Image

class ClassificationTestDataset(torch.utils.data.Dataset):

    def __init__(self, data_dir, transforms):
        self.data_dir = data_dir
        self.transforms = transforms

        # This one-liner basically generates a sorted list of full paths to each image in the test directory
        self.img_paths = list(map(lambda fname: os.path.join(self.data_dir, fname), sorted(os.listdir(self.data_dir))))

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, idx):
        return self.transforms(Image.open(self.img_paths[idx]))

test_dataset = ClassificationTestDataset(TEST_DIR, transforms=val_transforms)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=config['batch_size'], shuffle=False,
                                          drop_last=False, num_workers=2)

Model Part

Now let's consider how to take convolutions and assemble them into a strong architecture, considering the number of layers, channel sizes, strides, kernel sizes, etc. In this part, I'll first cover 3 architectures:

  • MobileNetV2: A fast, parameter-efficient model
  • ResNet: The “go-to” for CNNs
  • ConvNeXt: The state of the art model

CNN architectures are divided into stages, which are divided into blocks.

  • Each “stage” consists of (almost) equivalent “blocks”
  • Each “block” consists of a few CNN layers, BN, and ReLUs

To understand an architecture, we mostly need to understand its blocks. All that changes for blocks in different stages is the base num of channels. We do need to piece these blocks together into a final model. The general flow is like this:

  • Stem (a few initial conv layers that, e.g., project 3 channels to 64 channels and downsample)
  • Stage 1
  • Stage n
  • Classification layer

The stem usually downsamples the input by 4x. Some stages do downsample. If they do, generally, the first convolution in the stage downsamples by 2x. When you downsample by 2x, you usually increase channel dimension by 2x. So, later stages have smaller spatial resolution, higher num of channels.

SimpleNet

This is just a first submission to get the pipeline running: following the hints, build a 4-layer CNN. The main design decisions are the channels and the output sizes. The channels simply follow 3 -> 128 -> 256 -> 512. For the output size, the principle is to downsample by 32x in total: pad as if it were a plain stride-1 convolution, then set the stride to the desired downsampling factor. Since there are four layers, the strides are 4, 2, 2, 2.

The final output is passed through nn.AdaptiveAvgPool2d((1, 1)); this layer lets you specify the output size and adapts the kernel and stride accordingly. A sketch of the whole baseline follows.
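A minimal sketch of such a baseline. The width of the fourth layer is my assumption, since the text only lists 3 -> 128 -> 256 -> 512:

import torch.nn as nn

class SimpleNet(nn.Module):
    def __init__(self, num_classes=7000):
        super().__init__()
        channels = [3, 128, 256, 512, 512]   # last width assumed
        strides = [4, 2, 2, 2]               # total downsampling: 4 * 2 * 2 * 2 = 32x
        layers = []
        for c_in, c_out, s in zip(channels[:-1], channels[1:], strides):
            layers += [nn.Conv2d(c_in, c_out, kernel_size=3, stride=s, padding=1),
                       nn.BatchNorm2d(c_out),
                       nn.ReLU(inplace=True)]
        self.backbone = nn.Sequential(*layers)
        self.pool = nn.AdaptiveAvgPool2d((1, 1))   # any spatial size -> 1x1
        self.classifier = nn.Linear(channels[-1], num_classes)

    def forward(self, x):
        feats = self.pool(self.backbone(x)).flatten(1)
        return self.classifier(feats)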

The result is roughly 25% accuracy after 10 epochs, which is quite low. Training accuracy is considerably higher, though, so the overfitting probably needs to be addressed before this performs well.

MobileNetV2

reference: link

The goal of MobileNetV2 is to be parameter-efficient. It achieves this by making extensive use of depth-wise convolutions and point-wise convolutions; the intuition is that a normal convolution can be factored into these two parts at a much lower parameter cost (a quick comparison is sketched after the list below).

  • A normal convolution mixes information from both different channels and different spatial locations (pixels).

  • A depth-wise convolution only mixes information over spatial locations: different channels don't interact. "Depth" means each channel.

  • A point-wise convolution only mixes information over different channels: different spatial locations don't interact. "Point" means pixel.
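To make the "cheaper params" claim concrete, here is a small parameter-count check; the channel sizes are chosen arbitrarily for illustration:

import torch.nn as nn

def n_params(m):
    return sum(p.numel() for p in m.parameters())

c_in, c_out = 64, 128
normal = nn.Conv2d(c_in, c_out, kernel_size=3, padding=1, bias=False)
depthwise = nn.Conv2d(c_in, c_in, kernel_size=3, padding=1, groups=c_in, bias=False)
pointwise = nn.Conv2d(c_in, c_out, kernel_size=1, bias=False)

print(n_params(normal))                           # 64*128*3*3 = 73728
print(n_params(depthwise) + n_params(pointwise))  # 64*3*3 + 64*128 = 8768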

MobileNetV2 Block design

First we apply a point-wise convolution that increases the number of channels by an expansion ratio, then a depth-wise convolution that communicates information across spatial locations. Finally, another point-wise convolution reduces the channels again, i.e. bottlenecks them. The intuition is to distill a sparse high-dimensional space back into a condensed, rich feature dimension.

(Figure: MobileNetV2 inverted residual block)
class InvertedResidualBlock(nn.Module):
    """
    Intuitively, layers in MobileNet can be split into "feature mixing"
    and "spatial mixing" layers. You can think of feature mixing as each pixel
    "thinking on its own" about its own features, and you can think of spatial
    mixing as pixels "talking with each other". Alternating these two builds
    up a CNN.
    In a bit more detail:
    - The purpose of the "feature mixing" layers is what you've already seen in
      hw1p2. Remember, in hw1p2, we went from some low-level audio input to
      semantically rich representations of phonemes. Feature mixing is simply a
      linear layer (a weight matrix) that transforms simpler features into
      something more advanced.
    - The purpose of the "spatial mixing" layers is to mix features from different
      spatial locations. You can't figure out a face by looking at each pixel on
      its own, right? So we need 3x3 convolutions to mix features from neighboring
      pixels to build up spatially larger features.
    """
    def __init__(self,
                 in_channels,
                 out_channels,
                 stride,
                 expand_ratio):
        super().__init__()  # Just have to do this for all nn.Module classes

        # Can only do identity residual connection if input & output are the
        # same channel & spatial shape.
        if stride == 1 and in_channels == out_channels:
            self.do_identity = True
        else:
            self.do_identity = False

        # Expand Ratio is like 6, so hidden_dim >> in_channels
        hidden_dim = in_channels * expand_ratio

        """
        What is this doing? It's a 1x1 convolutional layer that drastically
        increases the # of channels (feature dimension). 1x1 means each pixel
        is thinking on its own, and increasing # of channels means the network
        is seeing if it can "see" more clearly in a higher dimensional space.
        Some patterns are just more obvious/separable in higher dimensions.
        Also, note that bias = False since BatchNorm2d has a bias term built-in.
        As you go, note the relationship between kernel_size and padding. As you
        covered in class, padding = kernel_size // 2 (kernel_size being odd) to
        make sure input & output spatial resolution is the same.
        """
        self.feature_mixing = nn.Sequential(
            nn.Conv2d(in_channels=in_channels,
                      out_channels=hidden_dim,
                      kernel_size=1,
                      stride=1,
                      padding=0,
                      bias=False),
            nn.BatchNorm2d(hidden_dim),
            nn.ReLU6(inplace=True)
        )

        """
        What is this doing? Let's break it down.
        - kernel_size = 3 means neighboring pixels are talking with each other.
          This is different from feature mixing, where kernel_size = 1.
        - stride. Remember that we sometimes want to downsample spatially.
          Downsampling is done to reduce # of pixels (less computation to do),
          and also to increase receptive field (if a face was 32x32, and now
          it's 16x16, a 3x3 convolution covers more of the face, right?). It
          makes sense to put the downsampling in the spatial mixing portion
          since this layer is "in charge" of messing around spatially anyway.
          Note that most of the time, stride is 1. It's just the first block of
          every "stage" (layer \subsetof block \subsetof stage) that we have
          stride = 2.
        - groups = hidden_dim. Remember depthwise separable convolutions in
          class? If not, it's fine. Usually, when we go from hidden_dim channels
          to hidden_dim channels, they're densely connected (like a linear
          layer). So you can think of every pixel/grid in an input
          3 x 3 x hidden_dim block being connected to every single pixel/grid
          in the output 3 x 3 x hidden_dim block.
          What groups = hidden_dim does is remove a lot of these connections.
          Now, each input 3 x 3 block/region is densely connected to the
          corresponding output 3 x 3 block/region. This happens for each of the
          hidden_dim input/output channel pairs independently.
          So we're not even mixing different channels together - we're only
          mixing spatial neighborhoods.

          Try to draw this out, or come to my (Jinhyung Park)'s OH if you want
          a more in-depth explanation.
          https://towardsdatascience.com/a-basic-introduction-to-separable-convolutions-b99ec3102728
        """
        self.spatial_mixing = nn.Sequential(
            nn.Conv2d(in_channels=hidden_dim,
                      out_channels=hidden_dim,
                      kernel_size=3,
                      stride=stride,
                      padding=1,
                      groups=hidden_dim,
                      bias=False),
            nn.BatchNorm2d(hidden_dim),
            nn.ReLU6(inplace=True)
        )

        """
        What's this? Remember that hidden_dim is quite large - six times the
        in_channels. So it was nice to do the above operations in this high-dim
        space, where some patterns might be more clear. But we still want to
        bring it back down-to-earth.
        Intuitively, you can take away two reasons for doing this:
        - Reduces computational cost by a lot. 6x in & out channels means 36x
          larger weights, which is crazy. We're okay with just one of input or
          output of a convolutional layer being large when mixing channels, but
          not both.
        - We also want a residual connection from the input to the output. To
          do that without introducing another convolutional layer, we want to
          condense the # of channels back to be the same as the in_channels.
          (out_channels and in_channels are usually the same).
        """
        self.bottleneck_channels = nn.Sequential(
            nn.Conv2d(in_channels=hidden_dim,
                      out_channels=out_channels,
                      kernel_size=1,
                      stride=1,
                      padding=0,
                      bias=False),
            nn.BatchNorm2d(out_channels)
        )

    def forward(self, x):
        out = self.feature_mixing(x)
        out = self.spatial_mixing(out)
        out = self.bottleneck_channels(out)

        if self.do_identity:
            return x + out
        else:
            return out

And the complete MobileNetV2:

class MobileNetV2(nn.Module):
    """
    The heavy lifting is already done in InvertedResidualBlock.
    Why MobileNetV2 and not V3? V2 is the foundation for V3, which uses "neural
    architecture search" to find better configurations of V2. If you understand
    V2 well, you can totally implement V3!
    """
    def __init__(self, num_classes=7000):
        super().__init__()

        self.num_classes = num_classes

        """
        The first couple of layers are special, so we just do them here.
        This is called the "stem". Usually, methods use it to downsample once or twice.
        """
        self.stem = nn.Sequential(
            nn.Conv2d(3, 32, 3, 2, 1, bias=False),
            nn.BatchNorm2d(32),
            nn.ReLU6(inplace=True),
            nn.Conv2d(32, 32, 3, 1, 1, groups=32, bias=False),
            nn.BatchNorm2d(32),
            nn.ReLU6(inplace=True),
            nn.Conv2d(32, 16, 1, 1, 0, bias=False),
            nn.BatchNorm2d(16),
        )

        """
        Since we're just repeating InvertedResidualBlocks again and again, we
        want to specify their parameters like this.
        The four numbers in each row (a stage) are:
        - Expand ratio: We talked about this in InvertedResidualBlock
        - Channels: This specifies the channel size before expansion
        - # blocks: Each stage has many blocks, how many?
        - Stride of first block: For some stages, we want to downsample. In a
          downsampling stage, we set the first block in that stage to have
          stride = 2, and the rest just have stride = 1.
        Again, note that almost every stage here is downsampling! By the time
        we get to the last stage, what is the image resolution? Can it still
        be called an image for our dataset? Think about this, and make changes
        as you want.
        """
        self.stage_cfgs = [
            # expand_ratio, channels, # blocks, stride of first block
            [6, 24, 2, 2],
            [6, 32, 3, 2],
            [6, 64, 4, 2],
            [6, 96, 3, 1],
            [6, 160, 3, 2],
            [6, 320, 1, 1],
        ]

        # Remember that our stem left us off at 16 channels. We're going to
        # keep updating this in_channels variable as we go
        in_channels = 16

        # Let's make the layers
        layers = []
        for curr_stage in self.stage_cfgs:
            expand_ratio, num_channels, num_blocks, stride = curr_stage

            for block_idx in range(num_blocks):
                out_channels = num_channels
                layers.append(InvertedResidualBlock(
                    in_channels=in_channels,
                    out_channels=out_channels,
                    # only have non-trivial stride if first block
                    stride=stride if block_idx == 0 else 1,
                    expand_ratio=expand_ratio
                ))
                # In channels of the next block is the out_channels of the current one
                in_channels = out_channels

        self.layers = nn.Sequential(*layers)  # Done, save them to the class

        # Some final feature mixing
        self.final_block = nn.Sequential(
            nn.Conv2d(in_channels, 1280, kernel_size=1, padding=0, stride=1, bias=False),
            nn.BatchNorm2d(1280),
            nn.ReLU6()
        )

        # Now, we need to build the final classification layer.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.classifier = nn.Linear(1280, self.num_classes)

        self._initialize_weights()

    def _initialize_weights(self):
        """
        Usually, I like to use default pytorch initialization for stuff, but
        MobileNetV2 made a point of putting in some custom ones, so let's just
        use them.
        """
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                m.weight.data.normal_(0, math.sqrt(2. / n))
                if m.bias is not None:
                    m.bias.data.zero_()
            elif isinstance(m, nn.BatchNorm2d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()
            elif isinstance(m, nn.Linear):
                m.weight.data.normal_(0, 0.01)
                m.bias.data.zero_()

    def forward(self, x, return_feats=False):
        out = self.stem(x)
        out = self.layers(out)
        out = self.final_block(out)

        avg_out = self.avgpool(out)
        feats = avg_out.reshape(avg_out.size(0), -1)
        classifier_out = self.classifier(feats)

        if return_feats:
            feats = nn.functional.normalize(feats, p=2.0, dim=1)
            return feats
        else:
            return classifier_out

ResNet

A brief summary of the data flow through the stem and the stages (the spatial sizes below assume the standard ResNet layout with an H × W input; note that the code below uses a stride-1 stem convolution, so before stage1 it actually only downsamples 2x via the max pool):

  • The input image is H × W × 3
  • After the stem, the output is H/2 × W/2 × 64
  • After the max-pool layer, the output is H/4 × W/4 × 64

At this point the input has been downsampled by 4x.

For ResNet 18 and ResNet 34, i.e. the variants using the BasicBlock:

  • After stage1, the output size is H/4 × W/4 × 64 (i.e. unchanged)
  • After stage2, the output size is H/8 × W/8 × 128
  • After stage3, the output size is H/16 × W/16 × 256
  • After stage4, the output size is H/32 × W/32 × 512
  • After the average pool, the output size is 1 × 1 × 512

For ResNet 50 and ResNet 101, i.e. the variants using the Bottleneck block:

  • After stage1, the output size is H/4 × W/4 × 256 (spatial size unchanged)
  • After stage2, the output size is H/8 × W/8 × 512
  • After stage3, the output size is H/16 × W/16 × 1024
  • After stage4, the output size is H/32 × W/32 × 2048
  • After the average pool, the output size is 1 × 1 × 2048

Next, let's look at the structure inside a stage, taking resnet18 and resnet50 as examples.

For resnet18, a stage contains two basic blocks, and each block consists of two convolutional layers. The data flow is:

  • In stage1, both convolutional layers have 64 input channels and 64 output channels.

  • In the later stages, because of downsampling, the first convolutional layer doubles the channel count (output channels = 2 × input channels), and the residual shortcut likewise has to project its input to the block's output channel count.

For resnet50, the difference is that each block contains three convolutional layers, and the data flows through a bottleneck: a 1x1 convolution that reduces the channels, a 3x3 convolution, and a 1x1 convolution that expands the channels by the expansion factor of 4.

In the code, i.e. in _make_stage, this means the input channel count of the second block has to be set to the first block's bottleneck output after the expansion factor is applied.
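For instance, tracing how self.in_channels evolves through _make_stage for resnet50's stage1 can be checked directly with the classes defined in the code below:

stage1 = ResNet(BottleNeck, [3, 4, 6, 3]).stage1
print([(b.residual_function[0].in_channels,     # input width of each block
        b.residual_function[-2].out_channels)   # output width after the 4x expansion
       for b in stage1])
# [(64, 256), (256, 256), (256, 256)]: the first block expands 64 -> 256,
# and every later block in the stage takes and returns 256 channels.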

import torch
import torch.nn as nn

class BasicBlock(nn.Module):
    """Basic Block for ResNet 18 and ResNet 34
    """

    # BasicBlock and BottleNeck blocks have different output sizes;
    # we use the class attribute `expansion` to distinguish them
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()

        # residual function
        self.residual_function = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.GELU(),
            nn.Conv2d(out_channels, out_channels * BasicBlock.expansion, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels * BasicBlock.expansion)
        )

        # shortcut
        self.shortcut = nn.Sequential()

        # if the shortcut output dimension does not match the residual function's,
        # use a 1x1 convolution to match the dimension
        if stride != 1 or in_channels != BasicBlock.expansion * out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels * BasicBlock.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels * BasicBlock.expansion)
            )

    def forward(self, x):
        return nn.GELU()(self.residual_function(x) + self.shortcut(x))

class BottleNeck(nn.Module):
    """Residual block for ResNets with over 50 layers
    """
    expansion = 4

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.residual_function = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.GELU(),
            nn.Conv2d(out_channels, out_channels, stride=stride, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.GELU(),
            nn.Conv2d(out_channels, out_channels * BottleNeck.expansion, kernel_size=1, bias=False),
            nn.BatchNorm2d(out_channels * BottleNeck.expansion),
        )

        self.shortcut = nn.Sequential()

        if stride != 1 or in_channels != out_channels * BottleNeck.expansion:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels * BottleNeck.expansion, stride=stride, kernel_size=1, bias=False),
                nn.BatchNorm2d(out_channels * BottleNeck.expansion)
            )

    def forward(self, x):
        return nn.GELU()(self.residual_function(x) + self.shortcut(x))

class ResNet(nn.Module):

    def __init__(self, block, num_block, num_classes=7000):
        super().__init__()

        self.in_channels = 64  # stage1 input channels

        self.stem = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.GELU()
        )

        self.maxpool = nn.MaxPool2d(
            kernel_size=3,
            stride=2,
            padding=1
        )

        self.stage1 = self._make_stage(block, 64, num_block[0], 1)
        self.stage2 = self._make_stage(block, 128, num_block[1], 2)
        self.stage3 = self._make_stage(block, 256, num_block[2], 2)
        self.stage4 = self._make_stage(block, 512, num_block[3], 2)

        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_stage(self, block, out_channels, num_blocks, stride):
        """Make a ResNet stage (a "stage" here is not a single network layer,
        e.g. a conv layer); one stage may contain more than one residual block.
        Args:
            block: block type, BasicBlock or BottleNeck
            out_channels: output channel count of this stage (before expansion)
            num_blocks: how many blocks per stage
            stride: the stride of the first block of this stage
        Return:
            a ResNet stage as an nn.Sequential
        """

        # We have num_blocks blocks per stage; the stride of the first block
        # can be 1 or 2, and all the other blocks always have stride 1
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_channels, out_channels, stride))
            self.in_channels = out_channels * block.expansion

        return nn.Sequential(*layers)

    def forward(self, x, return_feats=False):
        output = self.stem(x)
        output = self.maxpool(output)
        output = self.stage1(output)
        output = self.stage2(output)
        output = self.stage3(output)
        output = self.stage4(output)
        avg_out = self.avg_pool(output)

        feats = avg_out.reshape(avg_out.size(0), -1)
        classifier_out = self.fc(feats)

        if return_feats:
            feats = nn.functional.normalize(feats, p=2.0, dim=1)
            return feats
        else:
            return classifier_out


def resnet18():
    """return a ResNet 18 object
    """
    return ResNet(BasicBlock, [2, 2, 2, 2])

def resnet34():
    """return a ResNet 34 object
    """
    return ResNet(BasicBlock, [3, 4, 6, 3])

def resnet50():
    """return a ResNet 50 object
    """
    return ResNet(BottleNeck, [3, 4, 6, 3])

def resnet101():
    """return a ResNet 101 object
    """
    return ResNet(BottleNeck, [3, 4, 23, 3])

def resnet152():
    """return a ResNet 152 object
    """
    return ResNet(BottleNeck, [3, 8, 36, 3])

ConvNeXt

A state-of-the-art architecture; its intuitions are very similar to MobileNetV2's.

(Figure: the ConvNeXt block)

The differences:

  1. The depth-wise convolution in ConvNeXt uses a larger kernel size (7x7).
  2. The order of spatial mixing and feature mixing is flipped: in ConvNeXt the depth-wise convolution operates on the lower number of channels, whereas in MobileNetV2 it operates on the higher (expanded) number of channels.
  3. The channel expansion ratio in ConvNeXt is 4; in MobileNetV2 it is 6.
  4. ConvNeXt uses LayerNorm, MobileNetV2 uses BatchNorm.
  5. ConvNeXt recommends training with AdamW, MobileNetV2 recommends SGD.

This implementation uses BatchNorm instead.
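For comparison with the BatchNorm variant used below, here is a rough sketch of the canonical ConvNeXt block (7x7 depth-wise convolution, LayerNorm, 4x point-wise expansion, GELU, point-wise reduction); the layer-scale and stochastic-depth details of the official implementation are omitted:

import torch.nn as nn

class ConvNeXtBlock(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.dwconv = nn.Conv2d(dim, dim, kernel_size=7, padding=3, groups=dim)  # spatial mixing
        self.norm = nn.LayerNorm(dim)             # normalizes over the channel dimension
        self.pwconv1 = nn.Linear(dim, 4 * dim)    # 1x1 conv expressed as a Linear over channels
        self.act = nn.GELU()
        self.pwconv2 = nn.Linear(4 * dim, dim)

    def forward(self, x):                         # x: (N, C, H, W)
        residual = x
        x = self.dwconv(x)
        x = x.permute(0, 2, 3, 1)                 # (N, H, W, C) so LayerNorm/Linear act on C
        x = self.pwconv2(self.act(self.pwconv1(self.norm(x))))
        x = x.permute(0, 3, 1, 2)                 # back to (N, C, H, W)
        return residual + x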

(Figures: design details of the ConvNeXt variant used here)
import math
import torch
import torch.nn as nn
from timm.models.layers import trunc_normal_

class BottleNeck(nn.Module):

    expansion = 4

    def __init__(self, dim):
        super().__init__()

        expanded_channels = dim * self.expansion

        self.conv = nn.Sequential(
            # depth-wise 3x3 convolution (spatial mixing)
            nn.Conv2d(in_channels=dim,
                      out_channels=dim,
                      kernel_size=3,
                      stride=1,
                      padding=1,
                      groups=dim,
                      bias=False),
            nn.BatchNorm2d(dim),
            # point-wise expansion (feature mixing)
            nn.Conv2d(in_channels=dim,
                      out_channels=expanded_channels,
                      kernel_size=1,
                      stride=1,
                      bias=False),
            nn.GELU(),
            # point-wise reduction back to dim
            nn.Conv2d(in_channels=expanded_channels,
                      out_channels=dim,
                      kernel_size=1,
                      stride=1,
                      bias=False),
        )

        self.shortcut = nn.Sequential()

    def forward(self, x):
        out = self.conv(x) + self.shortcut(x)
        return out


class ConvNext(nn.Module):
    def __init__(self, block, block_nums, num_classes=7000, dropout=0) -> None:
        super().__init__()

        self.num_classes = num_classes

        dims = [96, 192, 384, 768]

        # Stem (4x downsample) plus three 2x downsampling layers between stages
        self.downsampling = nn.ModuleList()
        stem = nn.Sequential(
            nn.Conv2d(in_channels=3,
                      out_channels=dims[0],
                      kernel_size=4,
                      stride=4,
                      bias=False),
            nn.BatchNorm2d(dims[0]),
        )
        self.downsampling.append(stem)
        for i in range(3):
            downsample_layer = nn.Sequential(
                nn.BatchNorm2d(dims[i]),
                nn.Conv2d(dims[i], dims[i + 1], kernel_size=2, stride=2),
            )
            self.downsampling.append(downsample_layer)

        stage1 = self._make_stage(dims[0], block, block_nums[0])
        stage2 = self._make_stage(dims[1], block, block_nums[1])
        stage3 = self._make_stage(dims[2], block, block_nums[2])
        stage4 = self._make_stage(dims[3], block, block_nums[3])
        self.stage = nn.ModuleList([stage1, stage2, stage3, stage4])

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        if dropout == 0:
            self.classifier = nn.Linear(dims[3], self.num_classes)
        else:
            self.classifier = nn.Sequential(
                nn.Linear(dims[3], dims[3], bias=False),
                nn.BatchNorm1d(dims[3]),
                nn.GELU(),
                nn.Dropout(dropout),
                nn.Linear(dims[3], self.num_classes)
            )

        self.apply(self._initialize_weights)

    def _make_stage(self, dim, block, block_num):
        stage = []
        for i in range(block_num):
            stage.append(block(dim))
        return nn.Sequential(*stage)

    def _initialize_weights(self, m):
        if isinstance(m, (nn.Conv2d, nn.Linear)):
            # trunc_normal_: https://juejin.cn/post/7129817668350050335
            trunc_normal_(m.weight, std=.02)
            if m.bias is not None:
                nn.init.constant_(m.bias, 0)
        elif isinstance(m, (nn.BatchNorm2d, nn.BatchNorm1d)):
            nn.init.ones_(m.weight)
            nn.init.zeros_(m.bias)

    def forward(self, x, return_feats=False):
        for i in range(4):
            x = self.downsampling[i](x)
            x = self.stage[i](x)

        avg_out = self.avgpool(x)
        feats = avg_out.reshape(avg_out.size(0), -1)
        classifier_out = self.classifier(feats)
        if return_feats:
            feats = nn.functional.normalize(feats, p=2.0, dim=1)
            return classifier_out, feats
        else:
            return classifier_out


def convnext_t(dropout=0):
    return ConvNext(BottleNeck, [3, 3, 9, 3], dropout=dropout)

def my_convnext(dropout=0, block_nums=[5, 9, 9, 2]):
    return ConvNext(BottleNeck, block_nums, dropout=dropout)

Tuning Part

Data Augmentation

You will find that even when using a larger/more advanced model, it may perform the same or even worse. That's because the larger model is severely overfitting.
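A typical augmentation pipeline to try for face images; this is only a sketch, and the specific transforms and parameters are my choices rather than something prescribed by the handout:

import torchvision

train_transforms = torchvision.transforms.Compose([
    torchvision.transforms.RandomHorizontalFlip(p=0.5),
    torchvision.transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    torchvision.transforms.RandomPerspective(distortion_scale=0.2, p=0.2),
    torchvision.transforms.ToTensor(),
    torchvision.transforms.RandomErasing(p=0.25),   # operates on tensors, so it goes after ToTensor()
])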

Label Smoothing