1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
import torch
import torch.nn as nn
from torchvision.models import resnet50
from models.layers import HorizontalPyramidPooling
class HorizontalPyramidMatching(nn.Module):
    """Horizontal pyramid matching on top of a ResNet-50 backbone.

    Every frame is passed through the convolutional stages of a
    torchvision ``resnet50``.  The resulting feature map is then cut into
    horizontal strips at several scales; each strip is fed to its own
    ``HorizontalPyramidPooling`` module and all strip outputs are
    concatenated into one feature vector per frame.

    Args:
        scales: Number of horizontal strips at each pyramid level.  One
            ``HorizontalPyramidPooling`` module is created per strip, so
            the default ``(1, 2, 4, 8)`` creates 15 modules.
        out_channels: Second positional argument given to every
            ``HorizontalPyramidPooling`` (presumably its output channel
            count -- confirm against ``models.layers``).
        use_avg_pool: Forwarded to ``HorizontalPyramidPooling``.
        use_max_pool: Forwarded to ``HorizontalPyramidPooling``.
        **kwargs: Extra keyword arguments forwarded unchanged to every
            ``HorizontalPyramidPooling``.
    """

    def __init__(
            self,
            scales: tuple[int, ...] = (1, 2, 4, 8),
            out_channels: int = 256,
            use_avg_pool: bool = True,
            use_max_pool: bool = True,
            **kwargs
    ):
        super().__init__()
        self.scales = scales
        self.out_channels = out_channels
        self.use_avg_pool = use_avg_pool
        self.use_max_pool = use_max_pool

        # NOTE(review): recent torchvision releases prefer the ``weights=``
        # argument over ``pretrained=``; kept as-is for compatibility with
        # whatever torchvision version the project pins.
        self.backbone = resnet50(pretrained=True)
        # Channel count entering the last block of layer4; used as the
        # input channel count of every pooling module.
        self.in_channels = self.backbone.layer4[-1].conv1.in_channels

        self.pyramids = nn.ModuleList([
            self._make_pyramid(scale, **kwargs) for scale in self.scales
        ])

    def _make_pyramid(self, scale: int, **kwargs):
        """Build one pyramid level: ``scale`` independent pooling modules."""
        return nn.ModuleList([
            HorizontalPyramidPooling(self.in_channels,
                                     self.out_channels,
                                     use_avg_pool=self.use_avg_pool,
                                     use_max_pool=self.use_max_pool,
                                     **kwargs)
            for _ in range(scale)
        ])

    def _extract_feature_map(self, x):
        """Run the backbone up to and including ``layer4``.

        Calling ``self.backbone(x)`` directly would also apply the global
        average pool, flatten and ``fc`` layers and return 2-D logits,
        which cannot be split into horizontal strips.  The stages are
        invoked one by one instead, so the backbone keeps its original
        structure (and parameter names) while the 4-D map is preserved.
        """
        backbone = self.backbone
        x = backbone.conv1(x)
        x = backbone.bn1(x)
        x = backbone.relu(x)
        x = backbone.maxpool(x)
        x = backbone.layer1(x)
        x = backbone.layer2(x)
        x = backbone.layer3(x)
        x = backbone.layer4(x)
        return x

    def forward(self, x):
        """Compute per-frame pyramid features.

        Args:
            x: 5-D tensor unpacked as ``(n, t, c, h, w)`` -- ``n``
                sequences of ``t`` frames each.

        Returns:
            Tensor of shape ``(n, t, d)`` where ``d`` is the total length
            of the concatenated strip features.
        """
        # Flatten frames in all batches
        n, t, c, h, w = x.size()
        x = x.reshape(-1, c, h, w)

        x = self._extract_feature_map(x)
        # Use distinct names here: the batch size ``n`` is needed again
        # below to restore the (n, t, d) layout.
        num_frames, _, feat_h, _ = x.size()

        features = []
        for scale, pyramid in zip(self.scales, self.pyramids):
            # Integer division: rows left over when ``feat_h`` is not a
            # multiple of ``scale`` are not covered by any strip, and a
            # ``scale`` larger than ``feat_h`` yields empty strips.
            strip_h = feat_h // scale
            for strip_index, hpp in enumerate(pyramid):
                start = strip_index * strip_h
                strip = x[:, :, start:start + strip_h, :]
                features.append(hpp(strip).reshape(num_frames, -1))
        x = torch.cat(features, dim=1)

        # Unfold frames to original batch
        return x.view(n, t, -1)
|