# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import with_statement import os import sys import platform import shutil import unittest import tempfile import mock from libcloud.common.types import LibcloudError from libcloud.storage.base import Container from libcloud.storage.base import Object from libcloud.storage.types import ContainerDoesNotExistError from libcloud.storage.types import ContainerAlreadyExistsError from libcloud.storage.types import ContainerIsNotEmptyError from libcloud.storage.types import InvalidContainerNameError from libcloud.utils.files import exhaust_iterator try: from libcloud.storage.drivers.local import LocalStorageDriver from libcloud.storage.drivers.local import LockLocalStorage from lockfile import LockTimeout except ImportError: print('lockfile library is not available, skipping local_storage tests...') LocalStorageDriver = None LockTimeout = None class LocalTests(unittest.TestCase): driver_type = LocalStorageDriver @classmethod def create_driver(self): self.key = tempfile.mkdtemp() return self.driver_type(self.key, None) def setUp(self): self.driver = self.create_driver() def tearDown(self): shutil.rmtree(self.key) self.key = None def make_tmp_file(self, content=None): if not content: content = b'blah' * 1024 _, tmppath = tempfile.mkstemp() with open(tmppath, 'wb') as fp: fp.write(content) return tmppath def remove_tmp_file(self, tmppath): try: os.unlink(tmppath) except Exception as e: msg = str(e) if 'being used by another process' in msg and platform.system().lower() == 'windows': return raise e def test_list_containers_empty(self): containers = self.driver.list_containers() self.assertEqual(len(containers), 0) def test_containers_success(self): self.driver.create_container('test1') self.driver.create_container('test2') containers = self.driver.list_containers() self.assertEqual(len(containers), 2) container = containers[1] self.assertTrue('creation_time' in container.extra) self.assertTrue('modify_time' in container.extra) self.assertTrue('access_time' in container.extra) objects = self.driver.list_container_objects(container=container) self.assertEqual(len(objects), 0) objects = container.list_objects() self.assertEqual(len(objects), 0) for container in containers: self.driver.delete_container(container) def test_objects_success(self): tmppath = self.make_tmp_file() container = self.driver.create_container('test3') obj1 = container.upload_object(tmppath, 'object1') obj2 = container.upload_object(tmppath, 'path/object2') obj3 = container.upload_object(tmppath, 'path/to/object3') obj4 = container.upload_object(tmppath, 'path/to/object4.ext') with open(tmppath, 'rb') as tmpfile: obj5 = container.upload_object_via_stream(tmpfile, 'object5') objects = self.driver.list_container_objects(container=container) self.assertEqual(len(objects), 5) prefix = os.path.join('path', 'to') objects = self.driver.list_container_objects(container=container, prefix=prefix) self.assertEqual(len(objects), 2) for obj in objects: self.assertNotEqual(obj.hash, None) self.assertEqual(obj.size, 4096) self.assertEqual(obj.container.name, 'test3') self.assertTrue('creation_time' in obj.extra) self.assertTrue('modify_time' in obj.extra) self.assertTrue('access_time' in obj.extra) obj1.delete() obj2.delete() objects = container.list_objects() self.assertEqual(len(objects), 3) container.delete_object(obj3) container.delete_object(obj4) container.delete_object(obj5) objects = container.list_objects() self.assertEqual(len(objects), 0) container.delete() self.remove_tmp_file(tmppath) def test_get_container_doesnt_exist(self): try: self.driver.get_container(container_name='container1') except ContainerDoesNotExistError: pass else: self.fail('Exception was not thrown') def test_get_container_success(self): self.driver.create_container('test4') container = self.driver.get_container(container_name='test4') self.assertTrue(container.name, 'test4') container.delete() def test_get_object_container_doesnt_exist(self): try: self.driver.get_object(container_name='test-inexistent', object_name='test') except ContainerDoesNotExistError: pass else: self.fail('Exception was not thrown') def test_get_object_success(self): tmppath = self.make_tmp_file() container = self.driver.create_container('test5') container.upload_object(tmppath, 'test') obj = self.driver.get_object(container_name='test5', object_name='test') self.assertEqual(obj.name, 'test') self.assertEqual(obj.container.name, 'test5') self.assertEqual(obj.size, 4096) self.assertNotEqual(obj.hash, None) self.assertTrue('creation_time' in obj.extra) self.assertTrue('modify_time' in obj.extra) self.assertTrue('access_time' in obj.extra) obj.delete() container.delete() self.remove_tmp_file(tmppath) def test_create_container_invalid_name(self): try: self.driver.create_container(container_name='new/container') except InvalidContainerNameError: pass else: self.fail('Exception was not thrown') def test_create_container_already_exists(self): container = self.driver.create_container( container_name='new-container') try: self.driver.create_container(container_name='new-container') except ContainerAlreadyExistsError: pass else: self.fail('Exception was not thrown') # success self.driver.delete_container(container) def test_create_container_success(self): name = 'new_container' container = self.driver.create_container(container_name=name) self.assertEqual(container.name, name) self.driver.delete_container(container) def test_delete_container_doesnt_exist(self): container = Container(name='new_container', extra=None, driver=self.driver) try: self.driver.delete_container(container=container) except ContainerDoesNotExistError: pass else: self.fail('Exception was not thrown') def test_delete_container_not_empty(self): tmppath = self.make_tmp_file() container = self.driver.create_container('test6') obj = container.upload_object(tmppath, 'test') try: self.driver.delete_container(container=container) except ContainerIsNotEmptyError: pass else: self.fail('Exception was not thrown') # success obj.delete() self.remove_tmp_file(tmppath) self.assertTrue(self.driver.delete_container(container=container)) def test_delete_container_not_found(self): container = Container(name='foo_bar_container', extra={}, driver=self.driver) try: self.driver.delete_container(container=container) except ContainerDoesNotExistError: pass else: self.fail('Container does not exist but an exception was not' + 'thrown') def test_delete_container_success(self): container = self.driver.create_container('test7') self.assertTrue(self.driver.delete_container(container=container)) def test_download_object_success(self): tmppath = self.make_tmp_file() container = self.driver.create_container('test6') obj = container.upload_object(tmppath, 'test') destination_path = tmppath + '.temp' result = self.driver.download_object(obj=obj, destination_path=destination_path, overwrite_existing=False, delete_on_failure=True) self.assertTrue(result) obj.delete() container.delete() self.remove_tmp_file(tmppath) os.unlink(destination_path) def test_download_object_and_overwrite(self): tmppath = self.make_tmp_file() container = self.driver.create_container('test6') obj = container.upload_object(tmppath, 'test') destination_path = tmppath + '.temp' result = self.driver.download_object(obj=obj, destination_path=destination_path, overwrite_existing=False, delete_on_failure=True) self.assertTrue(result) try: self.driver.download_object(obj=obj, destination_path=destination_path, overwrite_existing=False, delete_on_failure=True) except LibcloudError: pass else: self.fail('Exception was not thrown') result = self.driver.download_object(obj=obj, destination_path=destination_path, overwrite_existing=True, delete_on_failure=True) self.assertTrue(result) # success obj.delete() container.delete() self.remove_tmp_file(tmppath) os.unlink(destination_path) def test_download_object_as_stream_success(self): tmppath = self.make_tmp_file() container = self.driver.create_container('test6') obj = container.upload_object(tmppath, 'test') stream = self.driver.download_object_as_stream(obj=obj, chunk_size=1024) self.assertTrue(hasattr(stream, '__iter__')) data = b''.join(stream) self.assertTrue(len(data), 4096) obj.delete() container.delete() self.remove_tmp_file(tmppath) def test_download_object_range_success(self): content = b'0123456789123456789' tmppath = self.make_tmp_file(content=content) container = self.driver.create_container('test6') obj = container.upload_object(tmppath, 'test') destination_path = tmppath + '.temp' # 1. Only start_bytes provided result = self.driver.download_object_range(obj=obj, destination_path=destination_path, start_bytes=4, overwrite_existing=True, delete_on_failure=True) self.assertTrue(result) with open(destination_path, 'rb') as fp: written_content = fp.read() self.assertEqual(written_content, b'456789123456789') self.assertEqual(written_content, content[4:]) # 2. start_bytes and end_bytes is provided result = self.driver.download_object_range(obj=obj, destination_path=destination_path, start_bytes=4, end_bytes=6, overwrite_existing=True, delete_on_failure=True) self.assertTrue(result) with open(destination_path, 'rb') as fp: written_content = fp.read() self.assertEqual(written_content, b'45') self.assertEqual(written_content, content[4:6]) result = self.driver.download_object_range(obj=obj, destination_path=destination_path, start_bytes=0, end_bytes=1, overwrite_existing=True, delete_on_failure=True) self.assertTrue(result) with open(destination_path, 'rb') as fp: written_content = fp.read() self.assertEqual(written_content, b'0') self.assertEqual(written_content, content[0:1]) result = self.driver.download_object_range(obj=obj, destination_path=destination_path, start_bytes=0, end_bytes=2, overwrite_existing=True, delete_on_failure=True) self.assertTrue(result) with open(destination_path, 'rb') as fp: written_content = fp.read() self.assertEqual(written_content, b'01') self.assertEqual(written_content, content[0:2]) result = self.driver.download_object_range(obj=obj, destination_path=destination_path, start_bytes=0, end_bytes=len(content), overwrite_existing=True, delete_on_failure=True) self.assertTrue(result) with open(destination_path, 'rb') as fp: written_content = fp.read() self.assertEqual(written_content, b'0123456789123456789') self.assertEqual(written_content, content[0:len(content)]) obj.delete() container.delete() self.remove_tmp_file(tmppath) os.unlink(destination_path) def test_download_object_range_as_stream_success(self): content = b'0123456789123456789' tmppath = self.make_tmp_file(content=content) container = self.driver.create_container('test6') obj = container.upload_object(tmppath, 'test') # 1. Only start_bytes provided stream = self.driver.download_object_range_as_stream(obj=obj, start_bytes=4, chunk_size=1024) written_content = b''.join(stream) self.assertEqual(written_content, b'456789123456789') self.assertEqual(written_content, content[4:]) # 2. start_bytes and end_bytes is provided stream = self.driver.download_object_range_as_stream(obj=obj, start_bytes=4, end_bytes=7, chunk_size=1024) written_content = b''.join(stream) self.assertEqual(written_content, b'456') self.assertEqual(written_content, content[4:7]) stream = self.driver.download_object_range_as_stream(obj=obj, start_bytes=0, end_bytes=1, chunk_size=1024) written_content = b''.join(stream) self.assertEqual(written_content, b'0') self.assertEqual(written_content, content[0:1]) stream = self.driver.download_object_range_as_stream(obj=obj, start_bytes=1, end_bytes=3, chunk_size=1024) written_content = b''.join(stream) self.assertEqual(written_content, b'12') self.assertEqual(written_content, content[1:3]) stream = self.driver.download_object_range_as_stream(obj=obj, start_bytes=0, end_bytes=len(content), chunk_size=1024) written_content = b''.join(stream) self.assertEqual(written_content, b'0123456789123456789') self.assertEqual(written_content, content[0:len(content)]) obj.delete() container.delete() self.remove_tmp_file(tmppath) def test_download_object_range_invalid_values(self): obj = Object('a', 500, '', {}, {}, None, None) tmppath = self.make_tmp_file(content='') expected_msg = 'start_bytes must be greater than 0' self.assertRaisesRegex(ValueError, expected_msg, self.driver.download_object_range, obj=obj, destination_path=tmppath, start_bytes=-1) expected_msg = 'start_bytes must be smaller than end_bytes' self.assertRaisesRegex(ValueError, expected_msg, self.driver.download_object_range, obj=obj, destination_path=tmppath, start_bytes=5, end_bytes=4) expected_msg = 'start_bytes and end_bytes can\'t be the same' self.assertRaisesRegex(ValueError, expected_msg, self.driver.download_object_range, obj=obj, destination_path=tmppath, start_bytes=5, end_bytes=5) def test_download_object_range_as_stream_invalid_values(self): content = b'0123456789123456789' tmppath = self.make_tmp_file(content=content) container = self.driver.create_container('test6') obj = container.upload_object(tmppath, 'test') expected_msg = 'start_bytes must be greater than 0' stream = self.driver.download_object_range_as_stream( obj=obj, start_bytes=-1, end_bytes=None, chunk_size=1024) self.assertRaisesRegex(ValueError, expected_msg, exhaust_iterator, stream) expected_msg = 'start_bytes must be smaller than end_bytes' stream = self.driver.download_object_range_as_stream( obj=obj, start_bytes=5, end_bytes=4, chunk_size=1024) self.assertRaisesRegex(ValueError, expected_msg, exhaust_iterator, stream) expected_msg = 'end_bytes is larger than file size' stream = self.driver.download_object_range_as_stream( obj=obj, start_bytes=5, end_bytes=len(content) + 1, chunk_size=1024) expected_msg = 'start_bytes and end_bytes can\'t be the same' stream = self.driver.download_object_range_as_stream( obj=obj, start_bytes=5, end_bytes=5, chunk_size=1024) obj.delete() container.delete() self.remove_tmp_file(tmppath) @mock.patch("lockfile.mkdirlockfile.MkdirLockFile.acquire", mock.MagicMock(side_effect=LockTimeout)) def test_proper_lockfile_imports(self): # LockLocalStorage was previously using an un-imported exception # in its __enter__ method, so the following would raise a NameError. lls = LockLocalStorage("blah") self.assertRaises(LibcloudError, lls.__enter__) if not LocalStorageDriver: class LocalTests(unittest.TestCase): # NOQA pass if __name__ == '__main__': sys.exit(unittest.main())