Spaces:
Runtime error
Runtime error
| import random | |
| import cv2 | |
| import numpy as np | |
| from PIL import Image | |
| import torch | |
| import torchvision.transforms as TF | |
| import dataloaders.image_transforms as IT | |
| cv2.setNumThreads(0) | |
class Resize(object):
    """Bring every image and label of a sample to a fixed spatial size.

    Args:
        output_size (tuple or int): Target ``(height, width)``. An int is
            expanded to the square ``(output_size, output_size)``.
        use_padding (bool): If True, calling the transform dispatches to
            ``padding`` (constant borders); otherwise to ``rescale``
            (interpolation).
    """

    def __init__(self, output_size, use_padding=False):
        assert isinstance(output_size, (int, tuple))
        is_square = isinstance(output_size, int)
        self.output_size = ((output_size, output_size)
                            if is_square else output_size)
        self.use_padding = use_padding

    def __call__(self, sample):
        if self.use_padding:
            return self.padding(sample)
        return self.rescale(sample)

    def rescale(self, sample):
        """Interpolate all non-meta entries to ``self.output_size``.

        The current size is read from ``sample['prev_img']``; if it already
        equals the target the sample is returned untouched. Image entries
        use bicubic interpolation, everything else nearest-neighbour.
        ``'curr_img'`` and ``'curr_label'`` are iterated as sequences of
        frames. The sample dict is updated in place and returned.
        """
        height, width = sample['prev_img'].shape[:2]
        if self.output_size == (height, width):
            return sample

        target_h, target_w = self.output_size
        for key in sample.keys():
            if 'meta' in key:
                continue
            if key in ('prev_img', 'curr_img', 'ref_img'):
                interp = cv2.INTER_CUBIC
            else:
                interp = cv2.INTER_NEAREST

            value = sample[key]
            if key in ('curr_img', 'curr_label'):
                sample[key] = [
                    cv2.resize(frame,
                               dsize=(target_w, target_h),
                               interpolation=interp) for frame in value
                ]
            else:
                sample[key] = cv2.resize(value,
                                         dsize=(target_w, target_h),
                                         interpolation=interp)
        return sample

    def padding(self, sample):
        """Pad all non-meta entries with constant borders up to the target.

        The missing rows/columns are split randomly between the two sides;
        the same split is applied to every entry. Image entries are filled
        with ``(124, 116, 104)``, all other entries with ``0``. The sample
        dict is updated in place and returned.
        """
        height, width = sample['prev_img'].shape[:2]
        if self.output_size == (height, width):
            return sample

        target_h, target_w = self.output_size

        def split_randomly(total):
            # Random amount goes to the leading side, the rest trails.
            leading = np.random.randint(0, total + 1)
            return leading, total - leading

        top, bottom = split_randomly(target_h - height)
        left, right = split_randomly(target_w - width)

        for key in sample.keys():
            if 'meta' in key:
                continue
            # NOTE(review): (124, 116, 104) is close to 255 * (0.485, 0.456,
            # 0.406), i.e. presumably the dataset mean colour - confirm.
            if key in ('prev_img', 'curr_img', 'ref_img'):
                fill = (124, 116, 104)
            else:
                fill = 0

            value = sample[key]
            if key in ('curr_img', 'curr_label'):
                sample[key] = [
                    cv2.copyMakeBorder(frame,
                                       top,
                                       bottom,
                                       left,
                                       right,
                                       cv2.BORDER_CONSTANT,
                                       value=fill) for frame in value
                ]
            else:
                sample[key] = cv2.copyMakeBorder(value,
                                                 top,
                                                 bottom,
                                                 left,
                                                 right,
                                                 cv2.BORDER_CONSTANT,
                                                 value=fill)
        return sample
class BalancedRandomCrop(object):
    """Randomly crop a sample, retrying until the crop contains an object.

    Args:
        output_size (tuple or int): Desired ``(height, width)`` of the crop.
            An int gives a square crop. Each side is capped at the size of
            ``sample['prev_img']``.
        max_step (int): Maximum number of crop positions to try.
        max_obj_num (int): Maximum number of object ids kept; a random
            subset is selected when the crop contains more.
        min_obj_pixel_num (int): An object is kept only if it covers more
            than this many pixels of the cropped reference label.
    """

    def __init__(self,
                 output_size,
                 max_step=5,
                 max_obj_num=5,
                 min_obj_pixel_num=100):
        assert isinstance(output_size, (int, tuple))
        if isinstance(output_size, int):
            output_size = (output_size, output_size)
        else:
            assert len(output_size) == 2
        self.output_size = output_size
        self.max_step = max_step
        self.max_obj_num = max_obj_num
        self.min_obj_pixel_num = min_obj_pixel_num

    def __call__(self, sample):
        height, width = sample['prev_img'].shape[:2]
        crop_h = min(height, self.output_size[0])
        crop_w = min(width, self.output_size[1])

        # Order matters: index 0 is the reference label, 1 the previous
        # label, the rest are the current-frame labels.
        label_maps = [sample["ref_label"], sample["prev_label"]
                      ] + sample["curr_label"]

        found_object = False
        attempts = 0
        while not found_object and attempts < self.max_step:
            attempts += 1
            top = np.random.randint(0, height - crop_h + 1)
            left = np.random.randint(0, width - crop_w + 1)
            crops = [
                label[top:top + crop_h, left:left + crop_w]
                for label in label_maps
            ]

            # Sorted unique ids present in the cropped reference label.
            object_ids = list(np.unique(crops[0]))
            if object_ids[-1] == 0:
                # Only background in this crop: try another position.
                continue

            # remove background
            if object_ids[0] == 0:
                object_ids = object_ids[1:]

            # remove small obj
            kept = [
                obj_id for obj_id in object_ids
                if np.sum(crops[0] == obj_id) > self.min_obj_pixel_num
            ]
            found_object = len(kept) != 0

            if len(kept) > self.max_obj_num:
                random.shuffle(kept)
                kept = kept[:self.max_obj_num]

            object_ids = [0] + kept

        # Relabel the kept objects to consecutive ids 1..K (by position in
        # object_ids); every other pixel becomes 0.
        relabeled = []
        for crop in crops:
            remapped = crop * 0
            for new_id, old_id in enumerate(object_ids):
                if old_id == 0:
                    continue
                remapped += ((crop == old_id) * new_id).astype(np.uint8)
            relabeled.append(remapped.astype(np.uint8))

        sample["ref_label"] = relabeled[0]
        sample["prev_label"] = relabeled[1]
        frame_count = len(sample["curr_img"])
        sample["curr_label"] = [
            relabeled[offset + 2] for offset in range(frame_count)
        ]

        # Apply the final crop window to the images.
        for key in sample.keys():
            if 'meta' in key or 'label' in key:
                continue
            if key == 'curr_img':
                sample[key] = [
                    frame[top:top + crop_h, left:left + crop_w]
                    for frame in sample[key]
                ]
            else:
                sample[key] = sample[key][top:top + crop_h,
                                          left:left + crop_w]

        sample['meta']['obj_num'] = len(object_ids) - 1
        return sample
class RandomScale(object):
    """Resize a sample by one random scale factor applied to both axes.

    Args:
        min_scale (float): Lower bound of the uniformly sampled factor.
        max_scale (float): Upper bound of the uniformly sampled factor.
        short_edge (int or None): If given, the factor is additionally
            multiplied by ``short_edge / min(h, w)`` where ``h, w`` is the
            size of ``sample['prev_img']``.
    """

    def __init__(self, min_scale=1., max_scale=1.3, short_edge=None):
        self.min_scale = min_scale
        self.max_scale = max_scale
        self.short_edge = short_edge

    def __call__(self, sample):
        # Fixed range of scales
        factor = np.random.uniform(self.min_scale, self.max_scale)

        # Align short edge
        if self.short_edge is not None:
            height, width = sample['prev_img'].shape[:2]
            factor *= float(self.short_edge) / min(height, width)

        for key in sample.keys():
            if 'meta' in key:
                continue
            # Bicubic for images, nearest-neighbour for everything else.
            if key in ('prev_img', 'curr_img', 'ref_img'):
                interp = cv2.INTER_CUBIC
            else:
                interp = cv2.INTER_NEAREST

            value = sample[key]
            if key in ('curr_img', 'curr_label'):
                # These entries hold one array per frame.
                sample[key] = [
                    cv2.resize(frame,
                               None,
                               fx=factor,
                               fy=factor,
                               interpolation=interp) for frame in value
                ]
            else:
                sample[key] = cv2.resize(value,
                                         None,
                                         fx=factor,
                                         fy=factor,
                                         interpolation=interp)
        return sample
class RandomScaleV2(object):
    """Randomly resize a sample with a random area scale and aspect ratio.

    A target size is drawn by ``get_params`` and the sample is resized with
    per-axis factors ``new_w / w`` and ``new_h / h`` (so the aspect ratio
    may change).

    Args:
        min_scale (float): Lower bound of the linear scale; the sampled
            area fraction lies in ``[min_scale**2, max_scale**2]``.
        max_scale (float): Upper bound of the linear scale.
        short_edge (int or None): If given, both factors are additionally
            multiplied by ``short_edge / min(h, w)`` of ``sample['prev_img']``.
        ratio (sequence of two floats): ``(min, max)`` bounds of the
            width/height aspect-ratio distortion, sampled log-uniformly.
    """

    def __init__(self,
                 min_scale=0.36,
                 max_scale=1.0,
                 short_edge=None,
                 ratio=(3. / 4., 4. / 3.)):
        # The default is an immutable tuple so that it cannot be mutated
        # and shared between instances.
        self.min_scale = min_scale
        self.max_scale = max_scale
        self.short_edge = short_edge
        self.ratio = ratio

    def __call__(self, sample):
        image = sample['prev_img']
        h, w = image.shape[:2]

        new_h, new_w = self.get_params(h, w)
        sc_x = float(new_w) / w
        sc_y = float(new_h) / h

        # Align short edge
        if self.short_edge is not None:
            align = float(self.short_edge) / min(h, w)
            sc_x *= align
            sc_y *= align

        for elem in sample.keys():
            if 'meta' in elem:
                continue
            # Bicubic for images, nearest-neighbour for everything else.
            if elem in ('prev_img', 'curr_img', 'ref_img'):
                flagval = cv2.INTER_CUBIC
            else:
                flagval = cv2.INTER_NEAREST

            tmp = sample[elem]
            if elem in ('curr_img', 'curr_label'):
                # These entries hold one array per frame.
                sample[elem] = [
                    cv2.resize(frame,
                               None,
                               fx=sc_x,
                               fy=sc_y,
                               interpolation=flagval) for frame in tmp
                ]
            else:
                sample[elem] = cv2.resize(tmp,
                                          None,
                                          fx=sc_x,
                                          fy=sc_y,
                                          interpolation=flagval)
        return sample

    def get_params(self, height, width):
        """Draw a target ``(h, w)`` that fits inside ``(height, width)``.

        Up to 10 random draws of area fraction and aspect ratio are tried.
        If none fits, the result falls back to the largest size with a
        clamped aspect ratio (or the whole image when the input ratio is
        already within bounds).

        Returns:
            tuple: ``(h, w)`` as ints.
        """
        area = height * width
        log_ratio = [np.log(item) for item in self.ratio]

        for _ in range(10):
            target_area = area * np.random.uniform(self.min_scale**2,
                                                   self.max_scale**2)
            aspect_ratio = np.exp(
                np.random.uniform(log_ratio[0], log_ratio[1]))

            w = int(round(np.sqrt(target_area * aspect_ratio)))
            h = int(round(np.sqrt(target_area / aspect_ratio)))

            if 0 < w <= width and 0 < h <= height:
                return h, w

        # Fallback to central crop
        in_ratio = float(width) / float(height)
        if in_ratio < min(self.ratio):
            w = width
            h = int(round(w / min(self.ratio)))
        elif in_ratio > max(self.ratio):
            h = height
            w = int(round(h * max(self.ratio)))
        else:  # whole image
            w = width
            h = height
        return h, w
class RestrictSize(object):
    """Resize a sample so that one edge of ``ref_img`` meets a size limit.

    Exactly one of the two limits may be active (the other must be None).

    Args:
        max_short_edge (number or None): If set and the short edge is
            smaller than this value, the sample is scaled so the short edge
            reaches it.
        max_long_edge (number or None): Used when ``max_short_edge`` is
            None; if the long edge exceeds this value, the sample is scaled
            so the long edge equals it.

    After scaling, height and width are each reduced to the nearest value
    of the form ``4 * k + 1``.
    """

    def __init__(self, max_short_edge=None, max_long_edge=800 * 1.3):
        self.max_short_edge = max_short_edge
        self.max_long_edge = max_long_edge
        assert ((max_short_edge is None)) or ((max_long_edge is None))

    def __call__(self, sample):
        height, width = sample['ref_img'].shape[:2]

        scale = None
        if self.max_short_edge is not None:
            # Align short edge
            short_side = min(height, width)
            if short_side < self.max_short_edge:
                scale = float(self.max_short_edge) / short_side
        else:
            long_side = max(height, width)
            if long_side > self.max_long_edge:
                scale = float(self.max_long_edge) / long_side

        if scale is None:
            target_h, target_w = height, width
        else:
            target_h, target_w = int(scale * height), int(scale * width)

        # Snap each side down to 4 * k + 1.
        target_h -= (target_h - 1) % 4
        target_w -= (target_w - 1) % 4

        if target_h == height and target_w == width:
            return sample

        for key in sample.keys():
            if 'meta' in key:
                continue
            # Nearest-neighbour keeps label ids intact; images use bicubic.
            interp = cv2.INTER_NEAREST if 'label' in key else cv2.INTER_CUBIC
            sample[key] = cv2.resize(sample[key],
                                     dsize=(target_w, target_h),
                                     interpolation=interp)
        return sample
class RandomHorizontalFlip(object):
    """Horizontally flip all images and labels of a sample with probability ``prob``.

    Args:
        prob (float): Probability of applying the flip.
    """

    def __init__(self, prob):
        self.p = prob

    def __call__(self, sample):
        if not random.random() < self.p:
            return sample

        for key in sample.keys():
            if 'meta' in key:
                continue
            if key in ('curr_img', 'curr_label'):
                # These entries hold one array per frame.
                sample[key] = [
                    cv2.flip(frame, flipCode=1) for frame in sample[key]
                ]
            else:
                sample[key] = cv2.flip(sample[key], flipCode=1)
        return sample
class RandomVerticalFlip(object):
    """Vertically flip all images and labels of a sample with probability ``prob``.

    Args:
        prob (float): Probability of applying the flip (default 0.3).
    """

    def __init__(self, prob=0.3):
        self.p = prob

    def __call__(self, sample):
        if not random.random() < self.p:
            return sample

        for key in sample.keys():
            if 'meta' in key:
                continue
            if key in ('curr_img', 'curr_label'):
                # These entries hold one array per frame.
                sample[key] = [
                    cv2.flip(frame, flipCode=0) for frame in sample[key]
                ]
            else:
                sample[key] = cv2.flip(sample[key], flipCode=0)
        return sample
class RandomGaussianBlur(object):
    """Randomly blur the image entries of a sample.

    The augmentation is stored in ``self.aug`` and applied independently to
    every image (and to every frame of ``'curr_img'``). Entries whose key
    contains ``'meta'`` or ``'label'`` are left untouched.

    Args:
        prob (float): Passed as ``p`` to ``TF.RandomApply``.
        sigma (sequence): Forwarded to ``IT.GaussianBlur``.
            NOTE(review): presumably the (min, max) sigma range - confirm
            against ``dataloaders.image_transforms``.
    """

    def __init__(self, prob=0.3, sigma=(0.1, 2.)):
        # Immutable default plus a fresh list per instance: the blur object
        # keeps whatever it is given, so a shared mutable default could
        # leak changes between instances.
        if isinstance(sigma, (list, tuple)):
            sigma = list(sigma)
        self.aug = TF.RandomApply([IT.GaussianBlur(sigma)], p=prob)

    def __call__(self, sample):
        for elem in sample.keys():
            if 'meta' in elem or 'label' in elem:
                continue

            if elem == 'curr_img':
                # One array per frame; each frame is augmented on its own.
                sample[elem] = [
                    self.apply_augmentation(frame) for frame in sample[elem]
                ]
            else:
                sample[elem] = self.apply_augmentation(sample[elem])
        return sample

    def apply_augmentation(self, x):
        """Run ``self.aug`` on one image array.

        The array is cast to uint8 and wrapped in a PIL image, augmented,
        and returned as a float32 numpy array.
        """
        x = Image.fromarray(np.uint8(x))
        x = self.aug(x)
        x = np.array(x, dtype=np.float32)
        return x
class RandomGrayScale(RandomGaussianBlur):
    """Randomly convert the image entries of a sample to grayscale.

    Only ``self.aug`` is replaced; the base-class initializer is not
    called, and the inherited ``__call__`` applies the augmentation.

    Args:
        prob (float): Passed as ``p`` to ``TF.RandomGrayscale``.
    """

    def __init__(self, prob=0.2):
        grayscale = TF.RandomGrayscale(p=prob)
        self.aug = grayscale
class RandomColorJitter(RandomGaussianBlur):
    """Randomly apply colour jitter to the image entries of a sample.

    Only ``self.aug`` is replaced; the base-class initializer is not
    called, and the inherited ``__call__`` applies the augmentation.

    Args:
        prob (float): Passed as ``p`` to ``TF.RandomApply``.
        brightness, contrast, saturation, hue: Forwarded positionally, in
            this order, to ``TF.ColorJitter``.
    """

    def __init__(self,
                 prob=0.8,
                 brightness=0.4,
                 contrast=0.4,
                 saturation=0.2,
                 hue=0.1):
        jitter = TF.ColorJitter(brightness, contrast, saturation, hue)
        self.aug = TF.RandomApply([jitter], p=prob)
class SubtractMeanImage(object):
    """Subtract a mean from every sample entry whose key contains ``'image'``.

    Args:
        mean: Value(s) converted to a float32 array and subtracted from
            each matching entry.
        change_channels (bool): If True, the last axis is reordered with
            indices ``[2, 1, 0]`` before the subtraction.
    """

    def __init__(self, mean, change_channels=False):
        self.mean = mean
        self.change_channels = change_channels

    def __call__(self, sample):
        for key in sample.keys():
            if 'image' not in key:
                continue
            data = sample[key]
            if self.change_channels:
                data = data[:, :, [2, 1, 0]]
                sample[key] = data
            offset = np.array(self.mean, dtype=np.float32)
            sample[key] = np.subtract(data, offset)
        return sample

    def __str__(self):
        return 'SubtractMeanImage{}'.format(self.mean)
class ToTensor(object):
    """Convert ndarrays in sample to Tensors.

    Entries whose key contains ``'meta'`` are skipped. ``'curr_img'`` and
    ``'curr_label'`` are converted frame by frame into lists of tensors.
    """

    @staticmethod
    def _convert(array):
        """Turn one array into a channel-first tensor.

        2-D arrays get a leading channel axis and become int tensors.
        Other arrays are divided by 255, normalized per channel with mean
        ``(0.485, 0.456, 0.406)`` and std ``(0.229, 0.224, 0.225)``, and
        transposed from ``(H, W, C)`` to ``(C, H, W)``.
        """
        if array.ndim == 2:
            chw = array[:, :, np.newaxis].transpose((2, 0, 1))
            return torch.from_numpy(chw).int()

        scaled = array / 255.
        # In-place ops keep the dtype produced by the division above.
        scaled -= (0.485, 0.456, 0.406)
        scaled /= (0.229, 0.224, 0.225)
        return torch.from_numpy(scaled.transpose((2, 0, 1)))

    def __call__(self, sample):
        for key in sample.keys():
            if 'meta' in key:
                continue
            value = sample[key]
            if key in ('curr_img', 'curr_label'):
                sample[key] = [self._convert(frame) for frame in value]
            else:
                sample[key] = self._convert(value)
        return sample
class MultiRestrictSize(object):
    """Build size-restricted, multi-scale (and optionally flipped) copies of a sample.

    The size of ``sample['current_img']`` is first limited by
    ``max_short_edge`` and then by ``max_long_edge``, multiplied by each
    entry of ``multi_scale`` and snapped to the stride grid. One sample is
    emitted per scale, followed by a horizontally flipped copy when
    ``flip`` is True.

    Args:
        max_short_edge (number or None): Upper limit for the short edge.
        max_long_edge (number or None): Upper limit for the long edge.
        flip (bool): Also emit a flipped copy after each scale.
        multi_scale (sequence of float): Scale factors to emit.
        align_corners (bool): If True sizes are snapped to
            ``k * max_stride + 1``, otherwise to ``k * max_stride``.
        max_stride (int): Stride used for snapping.

    Returns (from ``__call__``):
        list of dict: the generated samples. Label entries are not resized.
    """

    def __init__(self,
                 max_short_edge=None,
                 max_long_edge=800,
                 flip=False,
                 multi_scale=(1.3, ),
                 align_corners=True,
                 max_stride=16):
        # multi_scale defaults to an immutable tuple so the default cannot
        # be mutated and shared between instances.
        self.max_short_edge = max_short_edge
        self.max_long_edge = max_long_edge
        self.multi_scale = multi_scale
        self.flip = flip
        self.align_corners = align_corners
        self.max_stride = max_stride

    def _restricted_size(self, h, w):
        """Return (h, w) after applying the short- and long-edge limits."""
        # restrict short edge
        sc = 1.
        if self.max_short_edge is not None:
            short_edge = min(h, w)
            if short_edge > self.max_short_edge:
                sc *= float(self.max_short_edge) / short_edge
        new_h, new_w = sc * h, sc * w

        # restrict long edge
        sc = 1.
        if self.max_long_edge is not None:
            long_edge = max(new_h, new_w)
            if long_edge > self.max_long_edge:
                sc *= float(self.max_long_edge) / long_edge
        return sc * new_h, sc * new_w

    def _snap_to_stride(self, length):
        """Round ``length`` to the nearest size allowed by the stride grid."""
        offset = 1 if self.align_corners else 0
        if (length - offset) % self.max_stride != 0:
            length = int(
                np.around((length - offset) / self.max_stride) *
                self.max_stride + offset)
        return length

    def __call__(self, sample):
        samples = []
        image = sample['current_img']
        h, w = image.shape[:2]
        # The restricted size does not depend on the scale: compute it once.
        base_h, base_w = self._restricted_size(h, w)

        for scale in self.multi_scale:
            new_h = self._snap_to_stride(int(base_h * scale))
            new_w = self._snap_to_stride(int(base_w * scale))

            if new_h == h and new_w == w:
                samples.append(sample)
            else:
                new_sample = {}
                for elem in sample.keys():
                    tmp = sample[elem]
                    # Meta, labels and missing (None) entries pass through
                    # unchanged; only images are resized.
                    if 'meta' in elem or 'label' in elem or tmp is None:
                        new_sample[elem] = tmp
                        continue
                    new_sample[elem] = cv2.resize(
                        tmp,
                        dsize=(new_w, new_h),
                        interpolation=cv2.INTER_CUBIC)
                samples.append(new_sample)

            if self.flip:
                now_sample = samples[-1]
                new_sample = {}
                for elem in now_sample.keys():
                    if 'meta' in elem:
                        # Shallow copy so the unflipped sample's meta is
                        # not marked as flipped.
                        new_sample[elem] = now_sample[elem].copy()
                        new_sample[elem]['flip'] = True
                        continue
                    tmp = now_sample[elem]
                    if tmp is None:
                        new_sample[elem] = None
                        continue
                    # Reverse the width axis; copy to get positive strides.
                    new_sample[elem] = tmp[:, ::-1].copy()
                samples.append(new_sample)

        return samples
class MultiToTensor(object):
    """Convert the ndarrays of every sample in a list to Tensors.

    Entries whose key contains ``'meta'`` and entries that are ``None`` are
    left unchanged. 2-D arrays become int tensors with a leading channel
    axis; other arrays are divided by 255, normalized per channel with
    mean ``(0.485, 0.456, 0.406)`` and std ``(0.229, 0.224, 0.225)``, and
    transposed from ``(H, W, C)`` to ``(C, H, W)``. The samples are
    modified in place and the same list is returned.
    """

    def __call__(self, samples):
        for sample in samples:
            for key in sample.keys():
                if 'meta' in key:
                    continue
                array = sample[key]
                if array is None:
                    continue

                if array.ndim == 2:
                    chw = array[:, :, np.newaxis].transpose((2, 0, 1))
                    sample[key] = torch.from_numpy(chw).int()
                    continue

                normalized = array / 255.
                # In-place ops keep the dtype produced by the division.
                normalized -= (0.485, 0.456, 0.406)
                normalized /= (0.229, 0.224, 0.225)
                sample[key] = torch.from_numpy(
                    normalized.transpose((2, 0, 1)))
        return samples