summaryrefslogtreecommitdiff
path: root/utils/triplet_loss.py
blob: 77c723436b5d656b6f349045dc5d2d2b891bd59c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
from typing import Optional, Tuple

import torch
import torch.nn as nn
import torch.nn.functional as F


class BatchTripletLoss(nn.Module):
    def __init__(
            self,
            is_hard: bool = True,
            is_mean: bool = True,
            margin: Optional[float] = 0.2,
    ):
        super().__init__()
        self.is_hard = is_hard
        self.is_mean = is_mean
        self.margin = margin

    def forward(self, x, y):
        p, n, c = x.size()
        dist = self._batch_distance(x)
        flat_dist = dist.tril(-1)
        flat_dist = flat_dist[flat_dist != 0].view(p, -1)

        if self.is_hard:
            positive_negative_dist = self._hard_distance(dist, y, p, n)
        else:  # is_all
            positive_negative_dist = self._all_distance(dist, y, p, n)

        if self.margin:
            losses = F.relu(self.margin + positive_negative_dist).view(p, -1)
            non_zero_counts = (losses != 0).sum(1).float()
            if self.is_mean:
                loss_metric = self._none_zero_mean(losses, non_zero_counts)
            else:  # is_sum
                loss_metric = losses.sum(1)
            return loss_metric, flat_dist, non_zero_counts
        else:  # Soft margin
            losses = F.softplus(positive_negative_dist).view(p, -1)
            if self.is_mean:
                loss_metric = losses.mean(1)
            else:  # is_sum
                loss_metric = losses.sum(1)
            return loss_metric, flat_dist, None

    @staticmethod
    def _batch_distance(x):
        # Euclidean distance p x n x n
        x_squared_sum = torch.sum(x ** 2, dim=2)
        x1_squared_sum = x_squared_sum.unsqueeze(2)
        x2_squared_sum = x_squared_sum.unsqueeze(1)
        x1_times_x2_sum = x @ x.transpose(1, 2)
        dist = torch.sqrt(
            F.relu(x1_squared_sum - 2 * x1_times_x2_sum + x2_squared_sum)
        )
        return dist

    @staticmethod
    def _hard_distance(dist, y, p, n):
        positive_mask = y.unsqueeze(1) == y.unsqueeze(2)
        negative_mask = y.unsqueeze(1) != y.unsqueeze(2)
        hard_positive = dist[positive_mask].view(p, n, -1).max(-1).values
        hard_negative = dist[negative_mask].view(p, n, -1).min(-1).values
        positive_negative_dist = hard_positive - hard_negative

        return positive_negative_dist

    @staticmethod
    def _all_distance(dist, y, p, n):
        positive_mask = y.unsqueeze(1) == y.unsqueeze(2)
        negative_mask = y.unsqueeze(1) != y.unsqueeze(2)
        all_positive = dist[positive_mask].view(p, n, -1, 1)
        all_negative = dist[negative_mask].view(p, n, 1, -1)
        positive_negative_dist = all_positive - all_negative

        return positive_negative_dist

    @staticmethod
    def _none_zero_mean(losses, non_zero_counts):
        # Non-zero parted mean
        non_zero_mean = losses.sum(1) / non_zero_counts
        non_zero_mean[non_zero_counts == 0] = 0
        return non_zero_mean


class JointBatchTripletLoss(BatchTripletLoss):
    def __init__(
            self,
            hpm_num_parts: int,
            is_hard: bool = True,
            is_mean: bool = True,
            margins: Tuple[float, float] = (0.2, 0.2)
    ):
        super().__init__(is_hard, is_mean)
        self.hpm_num_parts = hpm_num_parts
        self.margin_hpm, self.margin_pn = margins

    def forward(self, x, y):
        p, n, c = x.size()
        dist = self._batch_distance(x)

        if self.is_hard:
            positive_negative_dist = self._hard_distance(dist, y, p, n)
        else:  # is_all
            positive_negative_dist = self._all_distance(dist, y, p, n)

        hpm_part_loss = F.relu(
            self.margin_hpm + positive_negative_dist[:self.hpm_num_parts]
        )
        pn_part_loss = F.relu(
            self.margin_pn + positive_negative_dist[self.hpm_num_parts:]
        )
        losses = torch.cat((hpm_part_loss, pn_part_loss)).view(p, -1)

        non_zero_counts = (losses != 0).sum(1).float()
        if self.is_mean:
            loss_metric = self._none_zero_mean(losses, non_zero_counts)
        else:  # is_sum
            loss_metric = losses.sum(1)

        return loss_metric, dist, non_zero_counts