summaryrefslogtreecommitdiff
path: root/utils/triplet_loss.py
diff options
context:
space:
mode:
authorJordan Gong <jordan.gong@protonmail.com>2021-03-01 18:23:33 +0800
committerJordan Gong <jordan.gong@protonmail.com>2021-03-01 18:26:47 +0800
commitd88e40217f56d96e568335ccee1f14ff3ea5a696 (patch)
treef24b204794fbb8fc501d2124ae67a73faf82db1f /utils/triplet_loss.py
parent5f75d7ef65f6dcd0e72df320c58b6bd141937b5f (diff)
parent6002b2d2017912f90e8917e6e8b71b78ce58e7c2 (diff)
Merge branch 'master' into data_parallel
# Conflicts: # models/model.py
Diffstat (limited to 'utils/triplet_loss.py')
-rw-r--r--utils/triplet_loss.py117
1 files changed, 83 insertions, 34 deletions
diff --git a/utils/triplet_loss.py b/utils/triplet_loss.py
index 0df2188..e05b69d 100644
--- a/utils/triplet_loss.py
+++ b/utils/triplet_loss.py
@@ -1,32 +1,48 @@
+from typing import Optional
+
import torch
import torch.nn as nn
import torch.nn.functional as F
-class BatchAllTripletLoss(nn.Module):
- def __init__(self, margin: float = 0.2):
+class BatchTripletLoss(nn.Module):
+ def __init__(
+ self,
+ is_hard: bool = True,
+ is_mean: bool = True,
+ margin: Optional[float] = 0.2,
+ ):
super().__init__()
+ self.is_hard = is_hard
+ self.is_mean = is_mean
self.margin = margin
def forward(self, x, y):
p, n, c = x.size()
-
dist = self._batch_distance(x)
- positive_negative_dist = self._hard_distance(dist, y, p, n)
- all_loss = F.relu(self.margin + positive_negative_dist).view(p, -1)
- parted_loss_mean = self._none_zero_parted_mean(all_loss)
-
- return parted_loss_mean
-
- @staticmethod
- def _hard_distance(dist, y, p, n):
- hard_positive_mask = y.unsqueeze(1) == y.unsqueeze(2)
- hard_negative_mask = y.unsqueeze(1) != y.unsqueeze(2)
- all_hard_positive = dist[hard_positive_mask].view(p, n, -1, 1)
- all_hard_negative = dist[hard_negative_mask].view(p, n, 1, -1)
- positive_negative_dist = all_hard_positive - all_hard_negative
-
- return positive_negative_dist
+ flat_dist_mask = torch.tril_indices(n, n, offset=-1, device=dist.device)
+ flat_dist = dist[:, flat_dist_mask[0], flat_dist_mask[1]]
+
+ if self.is_hard:
+ positive_negative_dist = self._hard_distance(dist, y, p, n)
+ else: # is_all
+ positive_negative_dist = self._all_distance(dist, y, p, n)
+
+ if self.margin:
+ losses = F.relu(self.margin + positive_negative_dist).view(p, -1)
+ non_zero_counts = (losses != 0).sum(1).float()
+ if self.is_mean:
+ loss_metric = self._none_zero_mean(losses, non_zero_counts)
+ else: # is_sum
+ loss_metric = losses.sum(1)
+ return loss_metric, flat_dist, non_zero_counts
+ else: # Soft margin
+ losses = F.softplus(positive_negative_dist).view(p, -1)
+ if self.is_mean:
+ loss_metric = losses.mean(1)
+ else: # is_sum
+ loss_metric = losses.sum(1)
+ return loss_metric, flat_dist, None
@staticmethod
def _batch_distance(x):
@@ -38,41 +54,74 @@ class BatchAllTripletLoss(nn.Module):
dist = torch.sqrt(
F.relu(x1_squared_sum - 2 * x1_times_x2_sum + x2_squared_sum)
)
-
return dist
@staticmethod
- def _none_zero_parted_mean(all_loss):
- # Non-zero parted mean
- non_zero_counts = (all_loss != 0).sum(1)
- parted_loss_mean = all_loss.sum(1) / non_zero_counts
- parted_loss_mean[non_zero_counts == 0] = 0
+ def _hard_distance(dist, y, p, n):
+ positive_mask = y.unsqueeze(1) == y.unsqueeze(2)
+ negative_mask = y.unsqueeze(1) != y.unsqueeze(2)
+ hard_positive = dist[positive_mask].view(p, n, -1).max(-1).values
+ hard_negative = dist[negative_mask].view(p, n, -1).min(-1).values
+ positive_negative_dist = hard_positive - hard_negative
- return parted_loss_mean
+ return positive_negative_dist
+
+ @staticmethod
+ def _all_distance(dist, y, p, n):
+ # Unmask identical samples
+ positive_mask = torch.eye(
+ n, dtype=torch.bool, device=y.device
+ ) ^ (y.unsqueeze(1) == y.unsqueeze(2))
+ negative_mask = y.unsqueeze(1) != y.unsqueeze(2)
+ all_positive = dist[positive_mask].view(p, n, -1, 1)
+ all_negative = dist[negative_mask].view(p, n, 1, -1)
+ positive_negative_dist = all_positive - all_negative
+
+ return positive_negative_dist
+
+ @staticmethod
+ def _none_zero_mean(losses, non_zero_counts):
+ # Non-zero parted mean
+ non_zero_mean = losses.sum(1) / non_zero_counts
+ non_zero_mean[non_zero_counts == 0] = 0
+ return non_zero_mean
-class JointBatchAllTripletLoss(BatchAllTripletLoss):
+class JointBatchTripletLoss(BatchTripletLoss):
def __init__(
self,
hpm_num_parts: int,
+ is_hard: bool = True,
+ is_mean: bool = True,
margins: tuple[float, float] = (0.2, 0.2)
):
- super().__init__()
+ super().__init__(is_hard, is_mean)
self.hpm_num_parts = hpm_num_parts
self.margin_hpm, self.margin_pn = margins
def forward(self, x, y):
p, n, c = x.size()
-
dist = self._batch_distance(x)
- positive_negative_dist = self._hard_distance(dist, y, p, n)
+ flat_dist_mask = torch.tril_indices(n, n, offset=-1, device=dist.device)
+ flat_dist = dist[:, flat_dist_mask[0], flat_dist_mask[1]]
+
+ if self.is_hard:
+ positive_negative_dist = self._hard_distance(dist, y, p, n)
+ else: # is_all
+ positive_negative_dist = self._all_distance(dist, y, p, n)
+
hpm_part_loss = F.relu(
self.margin_hpm + positive_negative_dist[:self.hpm_num_parts]
- ).view(self.hpm_num_parts, -1)
+ )
pn_part_loss = F.relu(
self.margin_pn + positive_negative_dist[self.hpm_num_parts:]
- ).view(p - self.hpm_num_parts, -1)
- all_loss = torch.cat((hpm_part_loss, pn_part_loss)).view(p, -1)
- parted_loss_mean = self._none_zero_parted_mean(all_loss)
+ )
+ losses = torch.cat((hpm_part_loss, pn_part_loss)).view(p, -1)
+
+ non_zero_counts = (losses != 0).sum(1).float()
+ if self.is_mean:
+ loss_metric = self._none_zero_mean(losses, non_zero_counts)
+ else: # is_sum
+ loss_metric = losses.sum(1)
- return parted_loss_mean
+ return loss_metric, flat_dist, non_zero_counts