Spaces:
Runtime error
Runtime error
| # Copyright (c) OpenMMLab. All rights reserved. | |
| import os | |
| import os.path as osp | |
| import sys | |
| import tempfile | |
| from contextlib import contextmanager | |
| from copy import deepcopy | |
| from pathlib import Path | |
| from unittest.mock import MagicMock, patch | |
| import pytest | |
| import mmcv | |
| from mmcv import BaseStorageBackend, FileClient | |
| from mmcv.utils import has_method | |
| sys.modules['ceph'] = MagicMock() | |
| sys.modules['petrel_client'] = MagicMock() | |
| sys.modules['petrel_client.client'] = MagicMock() | |
| sys.modules['mc'] = MagicMock() | |
| def build_temporary_directory(): | |
| """Build a temporary directory containing many files to test | |
| ``FileClient.list_dir_or_file``. | |
| . \n | |
| | -- dir1 \n | |
| | -- | -- text3.txt \n | |
| | -- dir2 \n | |
| | -- | -- dir3 \n | |
| | -- | -- | -- text4.txt \n | |
| | -- | -- img.jpg \n | |
| | -- text1.txt \n | |
| | -- text2.txt \n | |
| """ | |
| with tempfile.TemporaryDirectory() as tmp_dir: | |
| text1 = Path(tmp_dir) / 'text1.txt' | |
| text1.open('w').write('text1') | |
| text2 = Path(tmp_dir) / 'text2.txt' | |
| text2.open('w').write('text2') | |
| dir1 = Path(tmp_dir) / 'dir1' | |
| dir1.mkdir() | |
| text3 = dir1 / 'text3.txt' | |
| text3.open('w').write('text3') | |
| dir2 = Path(tmp_dir) / 'dir2' | |
| dir2.mkdir() | |
| jpg1 = dir2 / 'img.jpg' | |
| jpg1.open('wb').write(b'img') | |
| dir3 = dir2 / 'dir3' | |
| dir3.mkdir() | |
| text4 = dir3 / 'text4.txt' | |
| text4.open('w').write('text4') | |
| yield tmp_dir | |
| def delete_and_reset_method(obj, method): | |
| method_obj = deepcopy(getattr(type(obj), method)) | |
| try: | |
| delattr(type(obj), method) | |
| yield | |
| finally: | |
| setattr(type(obj), method, method_obj) | |
| class MockS3Client: | |
| def __init__(self, enable_mc=True): | |
| self.enable_mc = enable_mc | |
| def Get(self, filepath): | |
| with open(filepath, 'rb') as f: | |
| content = f.read() | |
| return content | |
| class MockPetrelClient: | |
| def __init__(self, enable_mc=True, enable_multi_cluster=False): | |
| self.enable_mc = enable_mc | |
| self.enable_multi_cluster = enable_multi_cluster | |
| def Get(self, filepath): | |
| with open(filepath, 'rb') as f: | |
| content = f.read() | |
| return content | |
| def put(self): | |
| pass | |
| def delete(self): | |
| pass | |
| def contains(self): | |
| pass | |
| def isdir(self): | |
| pass | |
| def list(self, dir_path): | |
| for entry in os.scandir(dir_path): | |
| if not entry.name.startswith('.') and entry.is_file(): | |
| yield entry.name | |
| elif osp.isdir(entry.path): | |
| yield entry.name + '/' | |
| class MockMemcachedClient: | |
| def __init__(self, server_list_cfg, client_cfg): | |
| pass | |
| def Get(self, filepath, buffer): | |
| with open(filepath, 'rb') as f: | |
| buffer.content = f.read() | |
| class TestFileClient: | |
| def setup_class(cls): | |
| cls.test_data_dir = Path(__file__).parent / 'data' | |
| cls.img_path = cls.test_data_dir / 'color.jpg' | |
| cls.img_shape = (300, 400, 3) | |
| cls.text_path = cls.test_data_dir / 'filelist.txt' | |
| def test_error(self): | |
| with pytest.raises(ValueError): | |
| FileClient('hadoop') | |
| def test_disk_backend(self): | |
| disk_backend = FileClient('disk') | |
| # test `name` attribute | |
| assert disk_backend.name == 'HardDiskBackend' | |
| # test `allow_symlink` attribute | |
| assert disk_backend.allow_symlink | |
| # test `get` | |
| # input path is Path object | |
| img_bytes = disk_backend.get(self.img_path) | |
| img = mmcv.imfrombytes(img_bytes) | |
| assert self.img_path.open('rb').read() == img_bytes | |
| assert img.shape == self.img_shape | |
| # input path is str | |
| img_bytes = disk_backend.get(str(self.img_path)) | |
| img = mmcv.imfrombytes(img_bytes) | |
| assert self.img_path.open('rb').read() == img_bytes | |
| assert img.shape == self.img_shape | |
| # test `get_text` | |
| # input path is Path object | |
| value_buf = disk_backend.get_text(self.text_path) | |
| assert self.text_path.open('r').read() == value_buf | |
| # input path is str | |
| value_buf = disk_backend.get_text(str(self.text_path)) | |
| assert self.text_path.open('r').read() == value_buf | |
| with tempfile.TemporaryDirectory() as tmp_dir: | |
| # test `put` | |
| filepath1 = Path(tmp_dir) / 'test.jpg' | |
| disk_backend.put(b'disk', filepath1) | |
| assert filepath1.open('rb').read() == b'disk' | |
| # test the `mkdir_or_exist` behavior in `put` | |
| _filepath1 = Path(tmp_dir) / 'not_existed_dir1' / 'test.jpg' | |
| disk_backend.put(b'disk', _filepath1) | |
| assert _filepath1.open('rb').read() == b'disk' | |
| # test `put_text` | |
| filepath2 = Path(tmp_dir) / 'test.txt' | |
| disk_backend.put_text('disk', filepath2) | |
| assert filepath2.open('r').read() == 'disk' | |
| # test the `mkdir_or_exist` behavior in `put_text` | |
| _filepath2 = Path(tmp_dir) / 'not_existed_dir2' / 'test.txt' | |
| disk_backend.put_text('disk', _filepath2) | |
| assert _filepath2.open('r').read() == 'disk' | |
| # test `isfile` | |
| assert disk_backend.isfile(filepath2) | |
| assert not disk_backend.isfile(Path(tmp_dir) / 'not/existed/path') | |
| # test `remove` | |
| disk_backend.remove(filepath2) | |
| # test `exists` | |
| assert not disk_backend.exists(filepath2) | |
| # test `get_local_path` | |
| # if the backend is disk, `get_local_path` just return the input | |
| with disk_backend.get_local_path(filepath1) as path: | |
| assert str(filepath1) == path | |
| assert osp.isfile(filepath1) | |
| # test `join_path` | |
| disk_dir = '/path/of/your/directory' | |
| assert disk_backend.join_path(disk_dir, 'file') == \ | |
| osp.join(disk_dir, 'file') | |
| assert disk_backend.join_path(disk_dir, 'dir', 'file') == \ | |
| osp.join(disk_dir, 'dir', 'file') | |
| # test `list_dir_or_file` | |
| with build_temporary_directory() as tmp_dir: | |
| # 1. list directories and files | |
| assert set(disk_backend.list_dir_or_file(tmp_dir)) == { | |
| 'dir1', 'dir2', 'text1.txt', 'text2.txt' | |
| } | |
| # 2. list directories and files recursively | |
| assert set(disk_backend.list_dir_or_file( | |
| tmp_dir, recursive=True)) == { | |
| 'dir1', | |
| osp.join('dir1', 'text3.txt'), 'dir2', | |
| osp.join('dir2', 'dir3'), | |
| osp.join('dir2', 'dir3', 'text4.txt'), | |
| osp.join('dir2', 'img.jpg'), 'text1.txt', 'text2.txt' | |
| } | |
| # 3. only list directories | |
| assert set( | |
| disk_backend.list_dir_or_file( | |
| tmp_dir, list_file=False)) == {'dir1', 'dir2'} | |
| with pytest.raises( | |
| TypeError, | |
| match='`suffix` should be None when `list_dir` is True'): | |
| # Exception is raised among the `list_dir_or_file` of client, | |
| # so we need to invode the client to trigger the exception | |
| disk_backend.client.list_dir_or_file( | |
| tmp_dir, list_file=False, suffix='.txt') | |
| # 4. only list directories recursively | |
| assert set( | |
| disk_backend.list_dir_or_file( | |
| tmp_dir, list_file=False, recursive=True)) == { | |
| 'dir1', 'dir2', | |
| osp.join('dir2', 'dir3') | |
| } | |
| # 5. only list files | |
| assert set(disk_backend.list_dir_or_file( | |
| tmp_dir, list_dir=False)) == {'text1.txt', 'text2.txt'} | |
| # 6. only list files recursively | |
| assert set( | |
| disk_backend.list_dir_or_file( | |
| tmp_dir, list_dir=False, recursive=True)) == { | |
| osp.join('dir1', 'text3.txt'), | |
| osp.join('dir2', 'dir3', 'text4.txt'), | |
| osp.join('dir2', 'img.jpg'), 'text1.txt', 'text2.txt' | |
| } | |
| # 7. only list files ending with suffix | |
| assert set( | |
| disk_backend.list_dir_or_file( | |
| tmp_dir, list_dir=False, | |
| suffix='.txt')) == {'text1.txt', 'text2.txt'} | |
| assert set( | |
| disk_backend.list_dir_or_file( | |
| tmp_dir, list_dir=False, | |
| suffix=('.txt', '.jpg'))) == {'text1.txt', 'text2.txt'} | |
| with pytest.raises( | |
| TypeError, | |
| match='`suffix` must be a string or tuple of strings'): | |
| disk_backend.client.list_dir_or_file( | |
| tmp_dir, list_dir=False, suffix=['.txt', '.jpg']) | |
| # 8. only list files ending with suffix recursively | |
| assert set( | |
| disk_backend.list_dir_or_file( | |
| tmp_dir, list_dir=False, suffix='.txt', | |
| recursive=True)) == { | |
| osp.join('dir1', 'text3.txt'), | |
| osp.join('dir2', 'dir3', 'text4.txt'), 'text1.txt', | |
| 'text2.txt' | |
| } | |
| # 7. only list files ending with suffix | |
| assert set( | |
| disk_backend.list_dir_or_file( | |
| tmp_dir, | |
| list_dir=False, | |
| suffix=('.txt', '.jpg'), | |
| recursive=True)) == { | |
| osp.join('dir1', 'text3.txt'), | |
| osp.join('dir2', 'dir3', 'text4.txt'), | |
| osp.join('dir2', 'img.jpg'), 'text1.txt', 'text2.txt' | |
| } | |
| def test_ceph_backend(self): | |
| ceph_backend = FileClient('ceph') | |
| # test `allow_symlink` attribute | |
| assert not ceph_backend.allow_symlink | |
| # input path is Path object | |
| with pytest.raises(NotImplementedError): | |
| ceph_backend.get_text(self.text_path) | |
| # input path is str | |
| with pytest.raises(NotImplementedError): | |
| ceph_backend.get_text(str(self.text_path)) | |
| # input path is Path object | |
| img_bytes = ceph_backend.get(self.img_path) | |
| img = mmcv.imfrombytes(img_bytes) | |
| assert img.shape == self.img_shape | |
| # input path is str | |
| img_bytes = ceph_backend.get(str(self.img_path)) | |
| img = mmcv.imfrombytes(img_bytes) | |
| assert img.shape == self.img_shape | |
| # `path_mapping` is either None or dict | |
| with pytest.raises(AssertionError): | |
| FileClient('ceph', path_mapping=1) | |
| # test `path_mapping` | |
| ceph_path = 's3://user/data' | |
| ceph_backend = FileClient( | |
| 'ceph', path_mapping={str(self.test_data_dir): ceph_path}) | |
| ceph_backend.client._client.Get = MagicMock( | |
| return_value=ceph_backend.client._client.Get(self.img_path)) | |
| img_bytes = ceph_backend.get(self.img_path) | |
| img = mmcv.imfrombytes(img_bytes) | |
| assert img.shape == self.img_shape | |
| ceph_backend.client._client.Get.assert_called_with( | |
| str(self.img_path).replace(str(self.test_data_dir), ceph_path)) | |
| def test_petrel_backend(self, backend, prefix): | |
| petrel_backend = FileClient(backend=backend, prefix=prefix) | |
| # test `allow_symlink` attribute | |
| assert not petrel_backend.allow_symlink | |
| # input path is Path object | |
| img_bytes = petrel_backend.get(self.img_path) | |
| img = mmcv.imfrombytes(img_bytes) | |
| assert img.shape == self.img_shape | |
| # input path is str | |
| img_bytes = petrel_backend.get(str(self.img_path)) | |
| img = mmcv.imfrombytes(img_bytes) | |
| assert img.shape == self.img_shape | |
| # `path_mapping` is either None or dict | |
| with pytest.raises(AssertionError): | |
| FileClient('petrel', path_mapping=1) | |
| # test `_map_path` | |
| petrel_dir = 's3://user/data' | |
| petrel_backend = FileClient( | |
| 'petrel', path_mapping={str(self.test_data_dir): petrel_dir}) | |
| assert petrel_backend.client._map_path(str(self.img_path)) == \ | |
| str(self.img_path).replace(str(self.test_data_dir), petrel_dir) | |
| petrel_path = f'{petrel_dir}/test.jpg' | |
| petrel_backend = FileClient('petrel') | |
| # test `_format_path` | |
| assert petrel_backend.client._format_path('s3://user\\data\\test.jpg')\ | |
| == petrel_path | |
| # test `get` | |
| with patch.object( | |
| petrel_backend.client._client, 'Get', | |
| return_value=b'petrel') as mock_get: | |
| assert petrel_backend.get(petrel_path) == b'petrel' | |
| mock_get.assert_called_once_with(petrel_path) | |
| # test `get_text` | |
| with patch.object( | |
| petrel_backend.client._client, 'Get', | |
| return_value=b'petrel') as mock_get: | |
| assert petrel_backend.get_text(petrel_path) == 'petrel' | |
| mock_get.assert_called_once_with(petrel_path) | |
| # test `put` | |
| with patch.object(petrel_backend.client._client, 'put') as mock_put: | |
| petrel_backend.put(b'petrel', petrel_path) | |
| mock_put.assert_called_once_with(petrel_path, b'petrel') | |
| # test `put_text` | |
| with patch.object(petrel_backend.client._client, 'put') as mock_put: | |
| petrel_backend.put_text('petrel', petrel_path) | |
| mock_put.assert_called_once_with(petrel_path, b'petrel') | |
| # test `remove` | |
| assert has_method(petrel_backend.client._client, 'delete') | |
| # raise Exception if `delete` is not implemented | |
| with delete_and_reset_method(petrel_backend.client._client, 'delete'): | |
| assert not has_method(petrel_backend.client._client, 'delete') | |
| with pytest.raises(NotImplementedError): | |
| petrel_backend.remove(petrel_path) | |
| with patch.object(petrel_backend.client._client, | |
| 'delete') as mock_delete: | |
| petrel_backend.remove(petrel_path) | |
| mock_delete.assert_called_once_with(petrel_path) | |
| # test `exists` | |
| assert has_method(petrel_backend.client._client, 'contains') | |
| assert has_method(petrel_backend.client._client, 'isdir') | |
| # raise Exception if `delete` is not implemented | |
| with delete_and_reset_method(petrel_backend.client._client, | |
| 'contains'), delete_and_reset_method( | |
| petrel_backend.client._client, | |
| 'isdir'): | |
| assert not has_method(petrel_backend.client._client, 'contains') | |
| assert not has_method(petrel_backend.client._client, 'isdir') | |
| with pytest.raises(NotImplementedError): | |
| petrel_backend.exists(petrel_path) | |
| with patch.object( | |
| petrel_backend.client._client, 'contains', | |
| return_value=True) as mock_contains: | |
| assert petrel_backend.exists(petrel_path) | |
| mock_contains.assert_called_once_with(petrel_path) | |
| # test `isdir` | |
| assert has_method(petrel_backend.client._client, 'isdir') | |
| with delete_and_reset_method(petrel_backend.client._client, 'isdir'): | |
| assert not has_method(petrel_backend.client._client, 'isdir') | |
| with pytest.raises(NotImplementedError): | |
| petrel_backend.isdir(petrel_path) | |
| with patch.object( | |
| petrel_backend.client._client, 'isdir', | |
| return_value=True) as mock_isdir: | |
| assert petrel_backend.isdir(petrel_dir) | |
| mock_isdir.assert_called_once_with(petrel_dir) | |
| # test `isfile` | |
| assert has_method(petrel_backend.client._client, 'contains') | |
| with delete_and_reset_method(petrel_backend.client._client, | |
| 'contains'): | |
| assert not has_method(petrel_backend.client._client, 'contains') | |
| with pytest.raises(NotImplementedError): | |
| petrel_backend.isfile(petrel_path) | |
| with patch.object( | |
| petrel_backend.client._client, 'contains', | |
| return_value=True) as mock_contains: | |
| assert petrel_backend.isfile(petrel_path) | |
| mock_contains.assert_called_once_with(petrel_path) | |
| # test `join_path` | |
| assert petrel_backend.join_path(petrel_dir, 'file') == \ | |
| f'{petrel_dir}/file' | |
| assert petrel_backend.join_path(f'{petrel_dir}/', 'file') == \ | |
| f'{petrel_dir}/file' | |
| assert petrel_backend.join_path(petrel_dir, 'dir', 'file') == \ | |
| f'{petrel_dir}/dir/file' | |
| # test `get_local_path` | |
| with patch.object(petrel_backend.client._client, 'Get', | |
| return_value=b'petrel') as mock_get, \ | |
| patch.object(petrel_backend.client._client, 'contains', | |
| return_value=True) as mock_contains: | |
| with petrel_backend.get_local_path(petrel_path) as path: | |
| assert Path(path).open('rb').read() == b'petrel' | |
| # exist the with block and path will be released | |
| assert not osp.isfile(path) | |
| mock_get.assert_called_once_with(petrel_path) | |
| mock_contains.assert_called_once_with(petrel_path) | |
| # test `list_dir_or_file` | |
| assert has_method(petrel_backend.client._client, 'list') | |
| with delete_and_reset_method(petrel_backend.client._client, 'list'): | |
| assert not has_method(petrel_backend.client._client, 'list') | |
| with pytest.raises(NotImplementedError): | |
| list(petrel_backend.list_dir_or_file(petrel_dir)) | |
| with build_temporary_directory() as tmp_dir: | |
| # 1. list directories and files | |
| assert set(petrel_backend.list_dir_or_file(tmp_dir)) == { | |
| 'dir1', 'dir2', 'text1.txt', 'text2.txt' | |
| } | |
| # 2. list directories and files recursively | |
| assert set( | |
| petrel_backend.list_dir_or_file(tmp_dir, recursive=True)) == { | |
| 'dir1', '/'.join(('dir1', 'text3.txt')), 'dir2', '/'.join( | |
| ('dir2', 'dir3')), '/'.join( | |
| ('dir2', 'dir3', 'text4.txt')), '/'.join( | |
| ('dir2', 'img.jpg')), 'text1.txt', 'text2.txt' | |
| } | |
| # 3. only list directories | |
| assert set( | |
| petrel_backend.list_dir_or_file( | |
| tmp_dir, list_file=False)) == {'dir1', 'dir2'} | |
| with pytest.raises( | |
| TypeError, | |
| match=('`list_dir` should be False when `suffix` is not ' | |
| 'None')): | |
| # Exception is raised among the `list_dir_or_file` of client, | |
| # so we need to invode the client to trigger the exception | |
| petrel_backend.client.list_dir_or_file( | |
| tmp_dir, list_file=False, suffix='.txt') | |
| # 4. only list directories recursively | |
| assert set( | |
| petrel_backend.list_dir_or_file( | |
| tmp_dir, list_file=False, recursive=True)) == { | |
| 'dir1', 'dir2', '/'.join(('dir2', 'dir3')) | |
| } | |
| # 5. only list files | |
| assert set( | |
| petrel_backend.list_dir_or_file( | |
| tmp_dir, list_dir=False)) == {'text1.txt', 'text2.txt'} | |
| # 6. only list files recursively | |
| assert set( | |
| petrel_backend.list_dir_or_file( | |
| tmp_dir, list_dir=False, recursive=True)) == { | |
| '/'.join(('dir1', 'text3.txt')), '/'.join( | |
| ('dir2', 'dir3', 'text4.txt')), '/'.join( | |
| ('dir2', 'img.jpg')), 'text1.txt', 'text2.txt' | |
| } | |
| # 7. only list files ending with suffix | |
| assert set( | |
| petrel_backend.list_dir_or_file( | |
| tmp_dir, list_dir=False, | |
| suffix='.txt')) == {'text1.txt', 'text2.txt'} | |
| assert set( | |
| petrel_backend.list_dir_or_file( | |
| tmp_dir, list_dir=False, | |
| suffix=('.txt', '.jpg'))) == {'text1.txt', 'text2.txt'} | |
| with pytest.raises( | |
| TypeError, | |
| match='`suffix` must be a string or tuple of strings'): | |
| petrel_backend.client.list_dir_or_file( | |
| tmp_dir, list_dir=False, suffix=['.txt', '.jpg']) | |
| # 8. only list files ending with suffix recursively | |
| assert set( | |
| petrel_backend.list_dir_or_file( | |
| tmp_dir, list_dir=False, suffix='.txt', | |
| recursive=True)) == { | |
| '/'.join(('dir1', 'text3.txt')), '/'.join( | |
| ('dir2', 'dir3', 'text4.txt')), 'text1.txt', | |
| 'text2.txt' | |
| } | |
| # 7. only list files ending with suffix | |
| assert set( | |
| petrel_backend.list_dir_or_file( | |
| tmp_dir, | |
| list_dir=False, | |
| suffix=('.txt', '.jpg'), | |
| recursive=True)) == { | |
| '/'.join(('dir1', 'text3.txt')), '/'.join( | |
| ('dir2', 'dir3', 'text4.txt')), '/'.join( | |
| ('dir2', 'img.jpg')), 'text1.txt', 'text2.txt' | |
| } | |
| def test_memcached_backend(self): | |
| mc_cfg = dict(server_list_cfg='', client_cfg='', sys_path=None) | |
| mc_backend = FileClient('memcached', **mc_cfg) | |
| # test `allow_symlink` attribute | |
| assert not mc_backend.allow_symlink | |
| # input path is Path object | |
| with pytest.raises(NotImplementedError): | |
| mc_backend.get_text(self.text_path) | |
| # input path is str | |
| with pytest.raises(NotImplementedError): | |
| mc_backend.get_text(str(self.text_path)) | |
| # input path is Path object | |
| img_bytes = mc_backend.get(self.img_path) | |
| img = mmcv.imfrombytes(img_bytes) | |
| assert img.shape == self.img_shape | |
| # input path is str | |
| img_bytes = mc_backend.get(str(self.img_path)) | |
| img = mmcv.imfrombytes(img_bytes) | |
| assert img.shape == self.img_shape | |
| def test_lmdb_backend(self): | |
| lmdb_path = self.test_data_dir / 'demo.lmdb' | |
| # db_path is Path object | |
| lmdb_backend = FileClient('lmdb', db_path=lmdb_path) | |
| # test `allow_symlink` attribute | |
| assert not lmdb_backend.allow_symlink | |
| with pytest.raises(NotImplementedError): | |
| lmdb_backend.get_text(self.text_path) | |
| img_bytes = lmdb_backend.get('baboon') | |
| img = mmcv.imfrombytes(img_bytes) | |
| assert img.shape == (120, 125, 3) | |
| # db_path is str | |
| lmdb_backend = FileClient('lmdb', db_path=str(lmdb_path)) | |
| with pytest.raises(NotImplementedError): | |
| lmdb_backend.get_text(str(self.text_path)) | |
| img_bytes = lmdb_backend.get('baboon') | |
| img = mmcv.imfrombytes(img_bytes) | |
| assert img.shape == (120, 125, 3) | |
| def test_http_backend(self, backend, prefix): | |
| http_backend = FileClient(backend=backend, prefix=prefix) | |
| img_url = 'https://raw.githubusercontent.com/open-mmlab/mmcv/' \ | |
| 'master/tests/data/color.jpg' | |
| text_url = 'https://raw.githubusercontent.com/open-mmlab/mmcv/' \ | |
| 'master/tests/data/filelist.txt' | |
| # test `allow_symlink` attribute | |
| assert not http_backend.allow_symlink | |
| # input is path or Path object | |
| with pytest.raises(Exception): | |
| http_backend.get(self.img_path) | |
| with pytest.raises(Exception): | |
| http_backend.get(str(self.img_path)) | |
| with pytest.raises(Exception): | |
| http_backend.get_text(self.text_path) | |
| with pytest.raises(Exception): | |
| http_backend.get_text(str(self.text_path)) | |
| # input url is http image | |
| img_bytes = http_backend.get(img_url) | |
| img = mmcv.imfrombytes(img_bytes) | |
| assert img.shape == self.img_shape | |
| # input url is http text | |
| value_buf = http_backend.get_text(text_url) | |
| assert self.text_path.open('r').read() == value_buf | |
| # test `_get_local_path` | |
| # exist the with block and path will be released | |
| with http_backend.get_local_path(img_url) as path: | |
| assert mmcv.imread(path).shape == self.img_shape | |
| assert not osp.isfile(path) | |
| def test_new_magic_method(self): | |
| class DummyBackend1(BaseStorageBackend): | |
| def get(self, filepath): | |
| return filepath | |
| def get_text(self, filepath, encoding='utf-8'): | |
| return filepath | |
| FileClient.register_backend('dummy_backend', DummyBackend1) | |
| client1 = FileClient(backend='dummy_backend') | |
| client2 = FileClient(backend='dummy_backend') | |
| assert client1 is client2 | |
| # if a backend is overwrote, it will disable the singleton pattern for | |
| # the backend | |
| class DummyBackend2(BaseStorageBackend): | |
| def get(self, filepath): | |
| pass | |
| def get_text(self, filepath): | |
| pass | |
| FileClient.register_backend('dummy_backend', DummyBackend2, force=True) | |
| client3 = FileClient(backend='dummy_backend') | |
| client4 = FileClient(backend='dummy_backend') | |
| assert client2 is not client3 | |
| assert client3 is client4 | |
| def test_parse_uri_prefix(self): | |
| # input path is None | |
| with pytest.raises(AssertionError): | |
| FileClient.parse_uri_prefix(None) | |
| # input path is list | |
| with pytest.raises(AssertionError): | |
| FileClient.parse_uri_prefix([]) | |
| # input path is Path object | |
| assert FileClient.parse_uri_prefix(self.img_path) is None | |
| # input path is str | |
| assert FileClient.parse_uri_prefix(str(self.img_path)) is None | |
| # input path starts with https | |
| img_url = 'https://raw.githubusercontent.com/open-mmlab/mmcv/' \ | |
| 'master/tests/data/color.jpg' | |
| assert FileClient.parse_uri_prefix(img_url) == 'https' | |
| # input path starts with s3 | |
| img_url = 's3://your_bucket/img.png' | |
| assert FileClient.parse_uri_prefix(img_url) == 's3' | |
| # input path starts with clusterName:s3 | |
| img_url = 'clusterName:s3://your_bucket/img.png' | |
| assert FileClient.parse_uri_prefix(img_url) == 's3' | |
| def test_infer_client(self): | |
| # HardDiskBackend | |
| file_client_args = {'backend': 'disk'} | |
| client = FileClient.infer_client(file_client_args) | |
| assert client.name == 'HardDiskBackend' | |
| client = FileClient.infer_client(uri=self.img_path) | |
| assert client.name == 'HardDiskBackend' | |
| # PetrelBackend | |
| file_client_args = {'backend': 'petrel'} | |
| client = FileClient.infer_client(file_client_args) | |
| assert client.name == 'PetrelBackend' | |
| uri = 's3://user_data' | |
| client = FileClient.infer_client(uri=uri) | |
| assert client.name == 'PetrelBackend' | |
| def test_register_backend(self): | |
| # name must be a string | |
| with pytest.raises(TypeError): | |
| class TestClass1: | |
| pass | |
| FileClient.register_backend(1, TestClass1) | |
| # module must be a class | |
| with pytest.raises(TypeError): | |
| FileClient.register_backend('int', 0) | |
| # module must be a subclass of BaseStorageBackend | |
| with pytest.raises(TypeError): | |
| class TestClass1: | |
| pass | |
| FileClient.register_backend('TestClass1', TestClass1) | |
| class ExampleBackend(BaseStorageBackend): | |
| def get(self, filepath): | |
| return filepath | |
| def get_text(self, filepath, encoding='utf-8'): | |
| return filepath | |
| FileClient.register_backend('example', ExampleBackend) | |
| example_backend = FileClient('example') | |
| assert example_backend.get(self.img_path) == self.img_path | |
| assert example_backend.get_text(self.text_path) == self.text_path | |
| assert 'example' in FileClient._backends | |
| class Example2Backend(BaseStorageBackend): | |
| def get(self, filepath): | |
| return b'bytes2' | |
| def get_text(self, filepath, encoding='utf-8'): | |
| return 'text2' | |
| # force=False | |
| with pytest.raises(KeyError): | |
| FileClient.register_backend('example', Example2Backend) | |
| FileClient.register_backend('example', Example2Backend, force=True) | |
| example_backend = FileClient('example') | |
| assert example_backend.get(self.img_path) == b'bytes2' | |
| assert example_backend.get_text(self.text_path) == 'text2' | |
| class Example3Backend(BaseStorageBackend): | |
| def get(self, filepath): | |
| return b'bytes3' | |
| def get_text(self, filepath, encoding='utf-8'): | |
| return 'text3' | |
| example_backend = FileClient('example3') | |
| assert example_backend.get(self.img_path) == b'bytes3' | |
| assert example_backend.get_text(self.text_path) == 'text3' | |
| assert 'example3' in FileClient._backends | |
| # force=False | |
| with pytest.raises(KeyError): | |
| class Example4Backend(BaseStorageBackend): | |
| def get(self, filepath): | |
| return b'bytes4' | |
| def get_text(self, filepath, encoding='utf-8'): | |
| return 'text4' | |
| class Example5Backend(BaseStorageBackend): | |
| def get(self, filepath): | |
| return b'bytes5' | |
| def get_text(self, filepath, encoding='utf-8'): | |
| return 'text5' | |
| example_backend = FileClient('example3') | |
| assert example_backend.get(self.img_path) == b'bytes5' | |
| assert example_backend.get_text(self.text_path) == 'text5' | |
| # prefixes is a str | |
| class Example6Backend(BaseStorageBackend): | |
| def get(self, filepath): | |
| return b'bytes6' | |
| def get_text(self, filepath, encoding='utf-8'): | |
| return 'text6' | |
| FileClient.register_backend( | |
| 'example4', | |
| Example6Backend, | |
| force=True, | |
| prefixes='example4_prefix') | |
| example_backend = FileClient('example4') | |
| assert example_backend.get(self.img_path) == b'bytes6' | |
| assert example_backend.get_text(self.text_path) == 'text6' | |
| example_backend = FileClient(prefix='example4_prefix') | |
| assert example_backend.get(self.img_path) == b'bytes6' | |
| assert example_backend.get_text(self.text_path) == 'text6' | |
| example_backend = FileClient('example4', prefix='example4_prefix') | |
| assert example_backend.get(self.img_path) == b'bytes6' | |
| assert example_backend.get_text(self.text_path) == 'text6' | |
| # prefixes is a list of str | |
| class Example7Backend(BaseStorageBackend): | |
| def get(self, filepath): | |
| return b'bytes7' | |
| def get_text(self, filepath, encoding='utf-8'): | |
| return 'text7' | |
| FileClient.register_backend( | |
| 'example5', | |
| Example7Backend, | |
| force=True, | |
| prefixes=['example5_prefix1', 'example5_prefix2']) | |
| example_backend = FileClient('example5') | |
| assert example_backend.get(self.img_path) == b'bytes7' | |
| assert example_backend.get_text(self.text_path) == 'text7' | |
| example_backend = FileClient(prefix='example5_prefix1') | |
| assert example_backend.get(self.img_path) == b'bytes7' | |
| assert example_backend.get_text(self.text_path) == 'text7' | |
| example_backend = FileClient(prefix='example5_prefix2') | |
| assert example_backend.get(self.img_path) == b'bytes7' | |
| assert example_backend.get_text(self.text_path) == 'text7' | |
| # backend has a higher priority than prefixes | |
| class Example8Backend(BaseStorageBackend): | |
| def get(self, filepath): | |
| return b'bytes8' | |
| def get_text(self, filepath, encoding='utf-8'): | |
| return 'text8' | |
| FileClient.register_backend( | |
| 'example6', | |
| Example8Backend, | |
| force=True, | |
| prefixes='example6_prefix') | |
| example_backend = FileClient('example6') | |
| assert example_backend.get(self.img_path) == b'bytes8' | |
| assert example_backend.get_text(self.text_path) == 'text8' | |
| example_backend = FileClient('example6', prefix='example4_prefix') | |
| assert example_backend.get(self.img_path) == b'bytes8' | |
| assert example_backend.get_text(self.text_path) == 'text8' | |