# -*- coding: utf-8 -*- # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import hmac import os import os.path # pylint: disable-msg=W0404 import math import sys import copy from io import BytesIO import hashlib from hashlib import sha1 import mock from mock import Mock from mock import PropertyMock import libcloud.utils.files from libcloud.utils.py3 import b from libcloud.utils.py3 import httplib from libcloud.utils.py3 import urlquote from libcloud.utils.py3 import StringIO from libcloud.utils.py3 import PY3 from libcloud.utils.files import exhaust_iterator from libcloud.common.types import MalformedResponseError from libcloud.storage.base import CHUNK_SIZE, Container, Object from libcloud.storage.types import ContainerAlreadyExistsError from libcloud.storage.types import ContainerDoesNotExistError from libcloud.storage.types import ContainerIsNotEmptyError from libcloud.storage.types import ObjectDoesNotExistError from libcloud.storage.types import ObjectHashMismatchError from libcloud.storage.types import InvalidContainerNameError from libcloud.storage.drivers.cloudfiles import CloudFilesStorageDriver from libcloud.test import MockHttp # pylint: disable-msg=E0611 from libcloud.test import unittest, generate_random_data, make_response from libcloud.test.file_fixtures import StorageFileFixtures # pylint: disable-msg=E0611 from libcloud.test.storage.base import BaseRangeDownloadMockHttp class CloudFilesTests(unittest.TestCase): driver_klass = CloudFilesStorageDriver driver_args = ('dummy', 'dummy') driver_kwargs = {} region = 'ord' def setUp(self): self.driver_klass.connectionCls.conn_class = CloudFilesMockHttp CloudFilesMockHttp.type = None driver_kwargs = self.driver_kwargs.copy() driver_kwargs['region'] = self.region self.driver = self.driver_klass(*self.driver_args, **driver_kwargs) # normally authentication happens lazily, but we force it here self.driver.connection._populate_hosts_and_request_paths() self._remove_test_file() def tearDown(self): self._remove_test_file() def test_invalid_ex_force_service_region(self): driver = CloudFilesStorageDriver('driver', 'dummy', ex_force_service_region='invalid') try: driver.list_containers() except Exception as e: self.assertEqual(e.value, 'Could not find specified endpoint') else: self.fail('Exception was not thrown') def test_ex_force_service_region(self): driver = CloudFilesStorageDriver('driver', 'dummy', ex_force_service_region='ORD') driver.list_containers() def test_force_auth_token_kwargs(self): base_url = 'https://cdn2.clouddrive.com/v1/MossoCloudFS' kwargs = { 'ex_force_auth_token': 'some-auth-token', 'ex_force_base_url': base_url } driver = CloudFilesStorageDriver('driver', 'dummy', **kwargs) driver.list_containers() self.assertEqual(kwargs['ex_force_auth_token'], driver.connection.auth_token) self.assertEqual('cdn2.clouddrive.com', driver.connection.host) self.assertEqual('/v1/MossoCloudFS', driver.connection.request_path) def test_force_auth_url_kwargs(self): kwargs = { 'ex_force_auth_version': '2.0', 'ex_force_auth_url': 'https://identity.api.rackspace.com' } driver = CloudFilesStorageDriver('driver', 'dummy', **kwargs) self.assertEqual(kwargs['ex_force_auth_url'], driver.connection._ex_force_auth_url) self.assertEqual(kwargs['ex_force_auth_version'], driver.connection._auth_version) def test_invalid_json_throws_exception(self): CloudFilesMockHttp.type = 'MALFORMED_JSON' try: self.driver.list_containers() except MalformedResponseError: pass else: self.fail('Exception was not thrown') def test_service_catalog(self): url = 'https://storage4.%s1.clouddrive.com/v1/MossoCloudFS' % \ (self.region) self.assertEqual( url, self.driver.connection.get_endpoint()) self.driver.connection.cdn_request = True self.assertEqual( 'https://cdn.clouddrive.com/v1/MossoCloudFS', self.driver.connection.get_endpoint()) self.driver.connection.cdn_request = False def test_list_containers(self): CloudFilesMockHttp.type = 'EMPTY' containers = self.driver.list_containers() self.assertEqual(len(containers), 0) CloudFilesMockHttp.type = None containers = self.driver.list_containers() self.assertEqual(len(containers), 3) container = [c for c in containers if c.name == 'container2'][0] self.assertEqual(container.extra['object_count'], 120) self.assertEqual(container.extra['size'], 340084450) def test_list_container_objects(self): CloudFilesMockHttp.type = 'EMPTY' container = Container( name='test_container', extra={}, driver=self.driver) objects = self.driver.list_container_objects(container=container) self.assertEqual(len(objects), 0) CloudFilesMockHttp.type = None objects = self.driver.list_container_objects(container=container) self.assertEqual(len(objects), 4) obj = [o for o in objects if o.name == 'foo test 1'][0] self.assertEqual(obj.hash, '16265549b5bda64ecdaa5156de4c97cc') self.assertEqual(obj.size, 1160520) self.assertEqual(obj.container.name, 'test_container') def test_list_container_object_name_encoding(self): CloudFilesMockHttp.type = 'EMPTY' container = Container(name='test container 1', extra={}, driver=self.driver) objects = self.driver.list_container_objects(container=container) self.assertEqual(len(objects), 0) def test_list_container_objects_with_prefix(self): CloudFilesMockHttp.type = 'EMPTY' container = Container( name='test_container', extra={}, driver=self.driver) objects = self.driver.list_container_objects(container=container, prefix='test_prefix1') self.assertEqual(len(objects), 0) CloudFilesMockHttp.type = None objects = self.driver.list_container_objects(container=container, prefix='test_prefix2') self.assertEqual(len(objects), 4) obj = [o for o in objects if o.name == 'foo test 1'][0] self.assertEqual(obj.hash, '16265549b5bda64ecdaa5156de4c97cc') self.assertEqual(obj.size, 1160520) self.assertEqual(obj.container.name, 'test_container') def test_list_container_objects_iterator(self): CloudFilesMockHttp.type = 'ITERATOR' container = Container( name='test_container', extra={}, driver=self.driver) objects = self.driver.list_container_objects(container=container) self.assertEqual(len(objects), 5) obj = [o for o in objects if o.name == 'foo-test-1'][0] self.assertEqual(obj.hash, '16265549b5bda64ecdaa5156de4c97cc') self.assertEqual(obj.size, 1160520) self.assertEqual(obj.container.name, 'test_container') def test_get_container(self): container = self.driver.get_container(container_name='test_container') self.assertEqual(container.name, 'test_container') self.assertEqual(container.extra['object_count'], 800) self.assertEqual(container.extra['size'], 1234568) def test_get_container_not_found(self): try: self.driver.get_container(container_name='not_found') except ContainerDoesNotExistError: pass else: self.fail('Exception was not thrown') def test_get_object_success(self): obj = self.driver.get_object(container_name='test_container', object_name='test_object') self.assertEqual(obj.container.name, 'test_container') self.assertEqual(obj.size, 555) self.assertEqual(obj.hash, '6b21c4a111ac178feacf9ec9d0c71f17') self.assertEqual(obj.extra['content_type'], 'application/zip') self.assertEqual( obj.extra['last_modified'], 'Tue, 25 Jan 2011 22:01:49 GMT') self.assertEqual(obj.meta_data['foo-bar'], 'test 1') self.assertEqual(obj.meta_data['bar-foo'], 'test 2') def test_get_object_object_name_encoding(self): obj = self.driver.get_object(container_name='test_container', object_name='~/test_object/') self.assertEqual(obj.name, '~/test_object/') def test_get_object_not_found(self): try: self.driver.get_object(container_name='test_container', object_name='not_found') except ObjectDoesNotExistError: pass else: self.fail('Exception was not thrown') def test_create_container_success(self): container = self.driver.create_container( container_name='test_create_container') self.assertTrue(isinstance(container, Container)) self.assertEqual(container.name, 'test_create_container') self.assertEqual(container.extra['object_count'], 0) def test_create_container_already_exists(self): CloudFilesMockHttp.type = 'ALREADY_EXISTS' try: self.driver.create_container( container_name='test_create_container') except ContainerAlreadyExistsError: pass else: self.fail( 'Container already exists but an exception was not thrown') def test_create_container_invalid_name_too_long(self): name = ''.join(['x' for x in range(0, 257)]) try: self.driver.create_container(container_name=name) except InvalidContainerNameError: pass else: self.fail( 'Invalid name was provided (name is too long)' ', but exception was not thrown') def test_create_container_invalid_name_slashes_in_name(self): try: self.driver.create_container(container_name='test/slashes/') except InvalidContainerNameError: pass else: self.fail( 'Invalid name was provided (name contains slashes)' ', but exception was not thrown') def test_delete_container_success(self): container = Container(name='foo_bar_container', extra={}, driver=self) result = self.driver.delete_container(container=container) self.assertTrue(result) def test_delete_container_not_found(self): CloudFilesMockHttp.type = 'NOT_FOUND' container = Container(name='foo_bar_container', extra={}, driver=self) try: self.driver.delete_container(container=container) except ContainerDoesNotExistError: pass else: self.fail( 'Container does not exist but an exception was not thrown') def test_delete_container_not_empty(self): CloudFilesMockHttp.type = 'NOT_EMPTY' container = Container(name='foo_bar_container', extra={}, driver=self) try: self.driver.delete_container(container=container) except ContainerIsNotEmptyError: pass else: self.fail('Container is not empty but an exception was not thrown') def test_download_object_success(self): container = Container(name='foo_bar_container', extra={}, driver=self) obj = Object(name='foo_bar_object', size=1000, hash=None, extra={}, container=container, meta_data=None, driver=CloudFilesStorageDriver) destination_path = os.path.abspath(__file__) + '.temp' result = self.driver.download_object(obj=obj, destination_path=destination_path, overwrite_existing=False, delete_on_failure=True) self.assertTrue(result) def test_download_object_invalid_file_size(self): CloudFilesMockHttp.type = 'INVALID_SIZE' container = Container(name='foo_bar_container', extra={}, driver=self) obj = Object(name='foo_bar_object', size=1000, hash=None, extra={}, container=container, meta_data=None, driver=CloudFilesStorageDriver) destination_path = os.path.abspath(__file__) + '.temp' result = self.driver.download_object(obj=obj, destination_path=destination_path, overwrite_existing=False, delete_on_failure=True) self.assertFalse(result) def test_download_object_success_not_found(self): CloudFilesMockHttp.type = 'NOT_FOUND' container = Container(name='foo_bar_container', extra={}, driver=self) obj = Object(name='foo_bar_object', size=1000, hash=None, extra={}, container=container, meta_data=None, driver=CloudFilesStorageDriver) destination_path = os.path.abspath(__file__) + '.temp' try: self.driver.download_object( obj=obj, destination_path=destination_path, overwrite_existing=False, delete_on_failure=True) except ObjectDoesNotExistError: pass else: self.fail('Object does not exist but an exception was not thrown') def test_download_object_range_success(self): container = Container(name='foo_bar_container', extra={}, driver=self) obj = Object(name='foo_bar_object_range', size=10, hash=None, extra={}, container=container, meta_data=None, driver=CloudFilesStorageDriver) destination_path = os.path.abspath(__file__) + '.temp' result = self.driver.download_object_range(obj=obj, destination_path=destination_path, start_bytes=5, end_bytes=7, overwrite_existing=False, delete_on_failure=True) self.assertTrue(result) with open(destination_path, 'r') as fp: content = fp.read() self.assertEqual(content, '56') def test_download_object_as_stream(self): container = Container(name='foo_bar_container', extra={}, driver=self) obj = Object(name='foo_bar_object', size=1000, hash=None, extra={}, container=container, meta_data=None, driver=CloudFilesStorageDriver) stream = self.driver.download_object_as_stream( obj=obj, chunk_size=None) self.assertTrue(hasattr(stream, '__iter__')) def test_download_object_as_stream_data_is_not_buffered_in_memory(self): # Test case which verifies that response.response attribute is not accessed # and as such, whole body response is not buffered into RAM # If content is consumed and response.content attribute accessed exception # will be thrown and test will fail mock_response = Mock(name='mock response') mock_response.headers = {} mock_response.status = 200 msg1 = '"response" attribute was accessed but it shouldn\'t have been' msg2 = '"content" attribute was accessed but it shouldn\'t have been' type(mock_response).response = PropertyMock(name='mock response attribute', side_effect=Exception(msg1)) type(mock_response).content = PropertyMock(name='mock content attribute', side_effect=Exception(msg2)) mock_response.iter_content.return_value = StringIO('a' * 1000) self.driver.connection.request = Mock() self.driver.connection.request.return_value = mock_response container = Container(name='foo_bar_container', extra={}, driver=self.driver) obj = Object(name='foo_bar_object_NO_BUFFER', size=1000, hash=None, extra={}, container=container, meta_data=None, driver=self.driver) result = self.driver.download_object_as_stream(obj=obj) result = exhaust_iterator(result) if PY3: result = result.decode('utf-8') self.assertEqual(result, 'a' * 1000) def test_download_object_range_as_stream_success(self): container = Container(name='foo_bar_container', extra={}, driver=self) obj = Object(name='foo_bar_object_range', size=2, hash=None, extra={}, container=container, meta_data=None, driver=CloudFilesStorageDriver) stream = self.driver.download_object_range_as_stream( start_bytes=5, end_bytes=7, obj=obj, chunk_size=None) self.assertTrue(hasattr(stream, '__iter__')) consumed_stream = ''.join(chunk.decode('utf-8') for chunk in stream) self.assertEqual(consumed_stream, '56') self.assertEqual(len(consumed_stream), obj.size) def test_upload_object_success(self): def upload_file(self, object_name=None, content_type=None, request_path=None, request_method=None, headers=None, file_path=None, stream=None): return {'response': make_response(201, headers={'etag': '0cc175b9c0f1b6a831c399e269772661'}), 'bytes_transferred': 1000, 'data_hash': '0cc175b9c0f1b6a831c399e269772661'} old_func = CloudFilesStorageDriver._upload_object CloudFilesStorageDriver._upload_object = upload_file file_path = os.path.abspath(__file__) container = Container(name='foo_bar_container', extra={}, driver=self) object_name = 'foo_test_upload' extra = {'meta_data': {'some-value': 'foobar'}} obj = self.driver.upload_object( file_path=file_path, container=container, extra=extra, object_name=object_name) self.assertEqual(obj.name, 'foo_test_upload') self.assertEqual(obj.size, 1000) self.assertTrue('some-value' in obj.meta_data) CloudFilesStorageDriver._upload_object = old_func def test_upload_object_zero_size_object(self): def upload_file(self, object_name=None, content_type=None, request_path=None, request_method=None, headers=None, file_path=None, stream=None): return {'response': make_response(201, headers={'etag': '0cc175b9c0f1b6a831c399e269772661'}), 'bytes_transferred': 0, 'data_hash': '0cc175b9c0f1b6a831c399e269772661'} old_func = CloudFilesStorageDriver._upload_object CloudFilesStorageDriver._upload_object = upload_file old_request = self.driver.connection.request file_path = os.path.join(os.path.dirname(__file__), '__init__.py') container = Container(name='foo_bar_container', extra={}, driver=self) object_name = 'empty' extra = {} def func(*args, **kwargs): self.assertEqual(kwargs['headers']['Content-Length'], 0) func.called = True return old_request(*args, **kwargs) self.driver.connection.request = func func.called = False obj = self.driver.upload_object( file_path=file_path, container=container, extra=extra, object_name=object_name) self.assertEqual(obj.name, 'empty') self.assertEqual(obj.size, 0) CloudFilesStorageDriver._upload_object = old_func self.driver.connection.request = old_request def test_upload_object_invalid_hash(self): CloudFilesMockHttp.type = 'INVALID_HASH' def upload_file(self, object_name=None, content_type=None, request_path=None, request_method=None, headers=None, file_path=None, stream=None): return {'response': make_response(201, headers={'etag': '0cc175b9c0f1b6a831c399e269772661'}), 'bytes_transferred': 1000, 'data_hash': 'blah blah'} old_func = CloudFilesStorageDriver._upload_object CloudFilesStorageDriver._upload_object = upload_file file_path = os.path.abspath(__file__) container = Container(name='foo_bar_container', extra={}, driver=self) object_name = 'foo_test_upload' try: self.driver.upload_object(file_path=file_path, container=container, object_name=object_name, verify_hash=True) except ObjectHashMismatchError: pass else: self.fail( 'Invalid hash was returned but an exception was not thrown') finally: CloudFilesStorageDriver._upload_object = old_func def test_upload_object_no_content_type(self): def no_content_type(name): return None, None old_func = libcloud.utils.files.guess_file_mime_type libcloud.utils.files.guess_file_mime_type = no_content_type file_path = os.path.abspath(__file__) container = Container(name='foo_bar_container', extra={}, driver=self) object_name = 'foo_test_upload' obj = self.driver.upload_object(file_path=file_path, verify_hash=False, container=container, object_name=object_name) self.assertEqual(obj.name, object_name) libcloud.utils.files.guess_file_mime_type = old_func def test_upload_object_inexistent_file(self): def dummy_content_type(name): return 'application/zip', None old_func = libcloud.utils.files.guess_file_mime_type libcloud.utils.files.guess_file_mime_type = dummy_content_type file_path = os.path.abspath(__file__ + '.inexistent') container = Container(name='foo_bar_container', extra={}, driver=self) object_name = 'foo_test_upload' try: self.driver.upload_object( file_path=file_path, container=container, object_name=object_name) except OSError: pass else: self.fail('Inexistent but an exception was not thrown') finally: libcloud.utils.files.guess_file_mime_type = old_func def test_upload_object_via_stream(self): def dummy_content_type(name): return 'application/zip', None old_func = libcloud.utils.files.guess_file_mime_type libcloud.utils.files.guess_file_mime_type = dummy_content_type container = Container(name='foo_bar_container', extra={}, driver=self) object_name = 'foo_test_stream_data' iterator = BytesIO(b('235')) try: self.driver.upload_object_via_stream(container=container, object_name=object_name, iterator=iterator) finally: libcloud.utils.files.guess_file_mime_type = old_func def test_upload_object_via_stream_stream_seek_at_end(self): def dummy_content_type(name): return 'application/zip', None old_func = libcloud.utils.files.guess_file_mime_type libcloud.utils.files.guess_file_mime_type = dummy_content_type container = Container(name='foo_bar_container', extra={}, driver=self) object_name = 'foo_test_stream_data_seek' iterator = BytesIO(b('123456789')) iterator.seek(10) self.assertEqual(iterator.tell(), 10) try: self.driver.upload_object_via_stream(container=container, object_name=object_name, iterator=iterator) finally: libcloud.utils.files.guess_file_mime_type = old_func def test_delete_object_success(self): container = Container(name='foo_bar_container', extra={}, driver=self) obj = Object(name='foo_bar_object', size=1000, hash=None, extra={}, container=container, meta_data=None, driver=CloudFilesStorageDriver) status = self.driver.delete_object(obj=obj) self.assertTrue(status) def test_delete_object_not_found(self): CloudFilesMockHttp.type = 'NOT_FOUND' container = Container(name='foo_bar_container', extra={}, driver=self) obj = Object(name='foo_bar_object', size=1000, hash=None, extra={}, container=container, meta_data=None, driver=CloudFilesStorageDriver) try: self.driver.delete_object(obj=obj) except ObjectDoesNotExistError: pass else: self.fail('Object does not exist but an exception was not thrown') def test_ex_get_meta_data(self): meta_data = self.driver.ex_get_meta_data() self.assertTrue(isinstance(meta_data, dict)) self.assertTrue('object_count' in meta_data) self.assertTrue('container_count' in meta_data) self.assertTrue('bytes_used' in meta_data) self.assertTrue('temp_url_key' in meta_data) def test_ex_purge_object_from_cdn(self): CloudFilesMockHttp.type = 'PURGE_SUCCESS' container = Container(name='foo_bar_container', extra={}, driver=self.driver) obj = Object(name='object', size=1000, hash=None, extra={}, container=container, meta_data=None, driver=self) self.assertTrue(self.driver.ex_purge_object_from_cdn(obj=obj)) def test_ex_purge_object_from_cdn_with_email(self): CloudFilesMockHttp.type = 'PURGE_SUCCESS_EMAIL' container = Container(name='foo_bar_container', extra={}, driver=self.driver) obj = Object(name='object', size=1000, hash=None, extra={}, container=container, meta_data=None, driver=self) self.assertTrue(self.driver.ex_purge_object_from_cdn(obj=obj, email='test@test.com')) @mock.patch('os.path.getsize') def test_ex_multipart_upload_object_for_small_files(self, getsize_mock): getsize_mock.return_value = 0 old_func = CloudFilesStorageDriver.upload_object mocked_upload_object = mock.Mock(return_value="test") CloudFilesStorageDriver.upload_object = mocked_upload_object file_path = os.path.abspath(__file__) container = Container(name='foo_bar_container', extra={}, driver=self) object_name = 'foo_test_upload' obj = self.driver.ex_multipart_upload_object( file_path=file_path, container=container, object_name=object_name) CloudFilesStorageDriver.upload_object = old_func self.assertTrue(mocked_upload_object.called) self.assertEqual(obj, "test") def test_ex_multipart_upload_object_success(self): _upload_object_part = CloudFilesStorageDriver._upload_object_part _upload_object_manifest = CloudFilesStorageDriver._upload_object_manifest mocked__upload_object_part = mock.Mock(return_value="test_part") mocked__upload_object_manifest = mock.Mock( return_value="test_manifest") CloudFilesStorageDriver._upload_object_part = mocked__upload_object_part CloudFilesStorageDriver._upload_object_manifest = mocked__upload_object_manifest parts = 5 file_path = os.path.abspath(__file__) chunk_size = int(math.ceil(float(os.path.getsize(file_path)) / parts)) container = Container(name='foo_bar_container', extra={}, driver=self) object_name = 'foo_test_upload' self.driver.ex_multipart_upload_object( file_path=file_path, container=container, object_name=object_name, chunk_size=chunk_size) CloudFilesStorageDriver._upload_object_part = _upload_object_part CloudFilesStorageDriver._upload_object_manifest = _upload_object_manifest self.assertEqual(mocked__upload_object_part.call_count, parts) self.assertTrue(mocked__upload_object_manifest.call_count, 1) def test__upload_object_part(self): _put_object = CloudFilesStorageDriver._put_object mocked__put_object = mock.Mock(return_value="test") CloudFilesStorageDriver._put_object = mocked__put_object part_number = 7 object_name = "test_object" expected_name = object_name + '/%08d' % part_number container = Container(name='foo_bar_container', extra={}, driver=self) self.driver._upload_object_part(container, object_name, part_number, None) CloudFilesStorageDriver._put_object = _put_object func_kwargs = tuple(mocked__put_object.call_args)[1] self.assertEqual(func_kwargs['object_name'], expected_name) self.assertEqual(func_kwargs['container'], container) def test_upload_object_via_stream_with_cors_headers(self): """ Test we can add some ``Cross-origin resource sharing`` headers to the request about to be sent. """ cors_headers = { 'Access-Control-Allow-Origin': 'http://mozilla.com', 'Origin': 'http://storage.clouddrive.com', } expected_headers = { # Automatically added headers 'Content-Type': 'application/octet-stream' } expected_headers.update(cors_headers) def intercept_request(request_path, method=None, data=None, headers=None, raw=True): # What we're actually testing self.assertDictEqual(expected_headers, headers) raise NotImplementedError('oops') self.driver.connection.request = intercept_request container = Container(name='CORS', extra={}, driver=self.driver) try: self.driver.upload_object_via_stream( iterator=iter(b'blob data like an image or video'), container=container, object_name="test_object", headers=cors_headers, ) except NotImplementedError: # Don't care about the response we'd have to mock anyway # as long as we intercepted the request and checked its headers pass else: self.fail('Expected NotImplementedError to be thrown to ' 'verify we actually checked the expected headers') def test_upload_object_via_stream_python3_bytes_error(self): container = Container(name='py3', extra={}, driver=self.driver) bytes_blob = b'blob data like an image or video' # This is mostly to check we didn't discover other errors along the way mocked_response = container.upload_object_via_stream( iterator=iter(bytes_blob), object_name="img_or_vid", ) self.assertEqual(len(bytes_blob), mocked_response.size) @unittest.skip("Skipping as chunking is disabled in 2.0rc1") def test_upload_object_via_stream_chunked_encoding(self): # Create enough bytes it should get split into two chunks bytes_blob = ''.join(['\0' for _ in range(CHUNK_SIZE + 1)]) hex_chunk_size = ('%X' % CHUNK_SIZE).encode('utf8') expected = [ # Chunk 1 hex_chunk_size + b'\r\n', bytes(bytes_blob[:CHUNK_SIZE].encode('utf8')), b'\r\n', # Chunk 2 b'1\r\n', bytes(bytes_blob[CHUNK_SIZE:].encode('utf8')), b'\r\n', # If chunked, also send a final message b'0\r\n\r\n', ] logged_data = [] class InterceptResponse(MockHttp): def __init__(self, connection, response=None): super(InterceptResponse, self).__init__(connection=connection, response=response) old_send = self.connection.connection.send def intercept_send(data): old_send(data) logged_data.append(data) self.connection.connection.send = intercept_send def _v1_MossoCloudFS_py3_img_or_vid2(self, method, url, body, headers): headers = {'etag': 'd79fb00c27b50494a463e680d459c90c'} headers.update(self.base_headers) _201 = httplib.CREATED return _201, '', headers, httplib.responses[_201] self.driver_klass.connectionCls.rawResponseCls = InterceptResponse container = Container(name='py3', extra={}, driver=self.driver) container.upload_object_via_stream( iterator=iter(bytes_blob), object_name="img_or_vid2", ) self.assertListEqual(expected, logged_data) def test__upload_object_manifest(self): hash_function = self.driver._get_hash_function() hash_function.update(b('')) data_hash = hash_function.hexdigest() fake_response = type('CloudFilesResponse', (), {'headers': {'etag': data_hash} }) _request = self.driver.connection.request mocked_request = mock.Mock(return_value=fake_response) self.driver.connection.request = mocked_request container = Container(name='foo_bar_container', extra={}, driver=self) object_name = "test_object" self.driver._upload_object_manifest(container, object_name) func_args, func_kwargs = tuple(mocked_request.call_args) self.driver.connection.request = _request self.assertEqual( func_args[0], "/" + container.name + "/" + object_name) self.assertEqual(func_kwargs["headers"]["X-Object-Manifest"], container.name + "/" + object_name + "/") self.assertEqual(func_kwargs["method"], "PUT") def test__upload_object_manifest_wrong_hash(self): fake_response = type('CloudFilesResponse', (), {'headers': {'etag': '0000000'}}) _request = self.driver.connection.request mocked_request = mock.Mock(return_value=fake_response) self.driver.connection.request = mocked_request container = Container(name='foo_bar_container', extra={}, driver=self) object_name = "test_object" try: self.driver._upload_object_manifest(container, object_name) except ObjectHashMismatchError: pass else: self.fail('Exception was not thrown') finally: self.driver.connection.request = _request def test_create_container_put_object_name_encoding(self): def upload_file(self, object_name=None, content_type=None, request_path=None, request_method=None, headers=None, file_path=None, stream=None): return {'response': make_response(201, headers={'etag': '0cc175b9c0f1b6a831c399e269772661'}), 'bytes_transferred': 1000, 'data_hash': '0cc175b9c0f1b6a831c399e269772661'} old_func = CloudFilesStorageDriver._upload_object CloudFilesStorageDriver._upload_object = upload_file container_name = 'speci@l_name' object_name = 'm@obj€ct' file_path = os.path.abspath(__file__) container = self.driver.create_container(container_name=container_name) self.assertEqual(container.name, container_name) obj = self.driver.upload_object( file_path=file_path, container=container, object_name=object_name) self.assertEqual(obj.name, object_name) CloudFilesStorageDriver._upload_object = old_func def test_ex_enable_static_website(self): container = Container(name='foo_bar_container', extra={}, driver=self) result = self.driver.ex_enable_static_website(container=container, index_file='index.html') self.assertTrue(result) def test_ex_set_error_page(self): container = Container(name='foo_bar_container', extra={}, driver=self) result = self.driver.ex_set_error_page(container=container, file_name='error.html') self.assertTrue(result) def test_ex_set_account_metadata_temp_url_key(self): result = self.driver.ex_set_account_metadata_temp_url_key("a key") self.assertTrue(result) @mock.patch("libcloud.storage.drivers.cloudfiles.time") def test_ex_get_object_temp_url(self, time): time.return_value = 0 self.driver.ex_get_meta_data = mock.Mock() self.driver.ex_get_meta_data.return_value = {'container_count': 1, 'object_count': 1, 'bytes_used': 1, 'temp_url_key': 'foo'} container = Container(name='foo_bar_container', extra={}, driver=self) obj = Object(name='foo_bar_object', size=1000, hash=None, extra={}, container=container, meta_data=None, driver=self) hmac_body = "%s\n%s\n%s" % ('GET', 60, "/v1/MossoCloudFS/foo_bar_container/foo_bar_object") sig = hmac.new(b('foo'), b(hmac_body), sha1).hexdigest() ret = self.driver.ex_get_object_temp_url(obj, 'GET') temp_url = ('https://storage4.%s1.clouddrive.com/v1/MossoCloudFS/' 'foo_bar_container/foo_bar_object?temp_url_expires=60&temp_url_sig=%s' % (self.region, sig)) self.assertEqual(''.join(sorted(ret)), ''.join(sorted(temp_url))) def test_ex_get_object_temp_url_no_key_raises_key_error(self): self.driver.ex_get_meta_data = mock.Mock() self.driver.ex_get_meta_data.return_value = {'container_count': 1, 'object_count': 1, 'bytes_used': 1, 'temp_url_key': None} container = Container(name='foo_bar_container', extra={}, driver=self) obj = Object(name='foo_bar_object', size=1000, hash=None, extra={}, container=container, meta_data=None, driver=self) self.assertRaises( KeyError, self.driver.ex_get_object_temp_url, obj, 'GET') def _remove_test_file(self): file_path = os.path.abspath(__file__) + '.temp' try: os.unlink(file_path) except OSError: pass class CloudFilesDeprecatedUSTests(CloudFilesTests): driver_klass = CloudFilesStorageDriver region = 'ord' class CloudFilesDeprecatedUKTests(CloudFilesTests): driver_klass = CloudFilesStorageDriver region = 'lon' class CloudFilesMockHttp(BaseRangeDownloadMockHttp, unittest.TestCase): fixtures = StorageFileFixtures('cloudfiles') base_headers = {'content-type': 'application/json; charset=UTF-8'} # fake auth token response def _v2_0_tokens(self, method, url, body, headers): headers = copy.deepcopy(self.base_headers) body = self.fixtures.load('_v2_0__auth.json') return (httplib.OK, body, headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS_MALFORMED_JSON(self, method, url, body, headers): # test_invalid_json_throws_exception body = 'broken: json /*"' return (httplib.NO_CONTENT, body, self.base_headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS_EMPTY(self, method, url, body, headers): return (httplib.NO_CONTENT, body, self.base_headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS(self, method, url, body, headers): headers = copy.deepcopy(self.base_headers) if method == 'GET': # list_containers body = self.fixtures.load('list_containers.json') status_code = httplib.OK elif method == 'HEAD': # get_meta_data body = self.fixtures.load('meta_data.json') status_code = httplib.NO_CONTENT headers.update({'x-account-container-count': '10', 'x-account-object-count': '400', 'x-account-bytes-used': '1234567' }) elif method == 'POST': body = '' status_code = httplib.NO_CONTENT return (status_code, body, headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS_not_found(self, method, url, body, headers): # test_get_object_not_found if method == 'HEAD': body = '' else: raise ValueError('Invalid method') return (httplib.NOT_FOUND, body, self.base_headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS_test_container_EMPTY(self, method, url, body, headers): body = self.fixtures.load('list_container_objects_empty.json') return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS_test_20container_201_EMPTY(self, method, url, body, headers): body = self.fixtures.load('list_container_objects_empty.json') return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS_test_container(self, method, url, body, headers): headers = copy.deepcopy(self.base_headers) if method == 'GET': # list_container_objects if url.find('marker') == -1: body = self.fixtures.load('list_container_objects.json') status_code = httplib.OK else: body = '' status_code = httplib.NO_CONTENT elif method == 'HEAD': # get_container body = self.fixtures.load('list_container_objects_empty.json') status_code = httplib.NO_CONTENT headers.update({'x-container-object-count': '800', 'x-container-bytes-used': '1234568' }) return (status_code, body, headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS_test_container_ITERATOR(self, method, url, body, headers): headers = copy.deepcopy(self.base_headers) # list_container_objects if url.find('foo-test-3') != -1: body = self.fixtures.load( 'list_container_objects_not_exhausted2.json') status_code = httplib.OK elif url.find('foo-test-5') != -1: body = '' status_code = httplib.NO_CONTENT else: # First request body = self.fixtures.load( 'list_container_objects_not_exhausted1.json') status_code = httplib.OK return (status_code, body, headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS_test_container_not_found( self, method, url, body, headers): # test_get_container_not_found if method == 'HEAD': body = '' else: raise ValueError('Invalid method') return (httplib.NOT_FOUND, body, self.base_headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS_test_container_test_object( self, method, url, body, headers): headers = copy.deepcopy(self.base_headers) if method == 'HEAD': # get_object body = self.fixtures.load('list_container_objects_empty.json') status_code = httplib.NO_CONTENT headers.update({'content-length': '555', 'last-modified': 'Tue, 25 Jan 2011 22:01:49 GMT', 'etag': '6b21c4a111ac178feacf9ec9d0c71f17', 'x-object-meta-foo-bar': 'test 1', 'x-object-meta-bar-foo': 'test 2', 'content-type': 'application/zip'}) return (status_code, body, headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS_test_container__7E_test_object( self, method, url, body, headers): headers = copy.deepcopy(self.base_headers) if method == 'HEAD': # get_object_name_encoding body = self.fixtures.load('list_container_objects_empty.json') status_code = httplib.NO_CONTENT headers.update({'content-length': '555', 'last-modified': 'Tue, 25 Jan 2011 22:01:49 GMT', 'etag': '6b21c4a111ac178feacf9ec9d0c71f17', 'x-object-meta-foo-bar': 'test 1', 'x-object-meta-bar-foo': 'test 2', 'content-type': 'application/zip'}) return (status_code, body, headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS_test_create_container( self, method, url, body, headers): # test_create_container_success headers = copy.deepcopy(self.base_headers) body = self.fixtures.load('list_container_objects_empty.json') headers = copy.deepcopy(self.base_headers) headers.update({'content-length': '18', 'date': 'Mon, 28 Feb 2011 07:52:57 GMT' }) status_code = httplib.CREATED return (status_code, body, headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS_speci_40l_name(self, method, url, body, headers): # test_create_container_put_object_name_encoding # Verify that the name is properly url encoded container_name = 'speci@l_name' encoded_container_name = urlquote(container_name) self.assertTrue(encoded_container_name in url) headers = copy.deepcopy(self.base_headers) body = self.fixtures.load('list_container_objects_empty.json') headers = copy.deepcopy(self.base_headers) headers.update({'content-length': '18', 'date': 'Mon, 28 Feb 2011 07:52:57 GMT' }) status_code = httplib.CREATED return (status_code, body, headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS_test_create_container_ALREADY_EXISTS( self, method, url, body, headers): # test_create_container_already_exists headers = copy.deepcopy(self.base_headers) body = self.fixtures.load('list_container_objects_empty.json') headers.update({'content-type': 'text/plain'}) status_code = httplib.ACCEPTED return (status_code, body, headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS_foo_bar_container(self, method, url, body, headers): if method == 'DELETE': # test_delete_container_success body = self.fixtures.load('list_container_objects_empty.json') headers = self.base_headers status_code = httplib.NO_CONTENT elif method == 'POST': # test_ex_enable_static_website body = '' headers = self.base_headers status_code = httplib.ACCEPTED return (status_code, body, headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS_foo_bar_container_object_PURGE_SUCCESS( self, method, url, body, headers): if method == 'DELETE': # test_ex_purge_from_cdn headers = self.base_headers status_code = httplib.NO_CONTENT return (status_code, body, headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS_foo_bar_container_object_PURGE_SUCCESS_EMAIL( self, method, url, body, headers): if method == 'DELETE': # test_ex_purge_from_cdn_with_email self.assertEqual(headers['X-Purge-Email'], 'test@test.com') headers = self.base_headers status_code = httplib.NO_CONTENT return (status_code, body, headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS_foo_bar_container_NOT_FOUND( self, method, url, body, headers): if method == 'DELETE': # test_delete_container_not_found body = self.fixtures.load('list_container_objects_empty.json') headers = self.base_headers status_code = httplib.NOT_FOUND return (status_code, body, headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS_foo_bar_container_NOT_EMPTY( self, method, url, body, headers): if method == 'DELETE': # test_delete_container_not_empty body = self.fixtures.load('list_container_objects_empty.json') headers = self.base_headers status_code = httplib.CONFLICT return (status_code, body, headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS_foo_bar_container_foo_bar_object( self, method, url, body, headers): if method == 'DELETE': # test_delete_object_success body = self.fixtures.load('list_container_objects_empty.json') headers = self.base_headers status_code = httplib.NO_CONTENT return (status_code, body, headers, httplib.responses[httplib.OK]) elif method == 'GET': body = generate_random_data(1000) return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS_foo_bar_container_foo_bar_object_range( self, method, url, body, headers): if method == 'GET': # test_download_object_range_success body = '0123456789123456789' self.assertTrue('Range' in headers) self.assertEqual(headers['Range'], 'bytes=5-6') start_bytes, end_bytes = self._get_start_and_end_bytes_from_range_str(headers['Range'], body) return (httplib.PARTIAL_CONTENT, body[start_bytes:end_bytes + 1], self.base_headers, httplib.responses[httplib.PARTIAL_CONTENT]) def _v1_MossoCloudFS_py3_img_or_vid(self, method, url, body, headers): headers = {'etag': 'e2378cace8712661ce7beec3d9362ef6'} headers.update(self.base_headers) return httplib.CREATED, '', headers, httplib.responses[httplib.CREATED] def _v1_MossoCloudFS_foo_bar_container_foo_test_upload( self, method, url, body, headers): # test_object_upload_success body = '' headers = {} headers.update(self.base_headers) headers['etag'] = 'hash343hhash89h932439jsaa89' return (httplib.CREATED, body, headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS_speci_40l_name_m_40obj_E2_82_ACct(self, method, url, body, headers): # test_create_container_put_object_name_encoding # Verify that the name is properly url encoded object_name = 'm@obj€ct' urlquote(object_name) headers = copy.deepcopy(self.base_headers) body = '' headers['etag'] = 'hash343hhash89h932439jsaa89' return (httplib.CREATED, body, headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS_foo_bar_container_empty(self, method, url, body, headers): # test_upload_object_zero_size_object body = '' headers = {} headers.update(self.base_headers) headers['etag'] = 'hash343hhash89h932439jsaa89' return (httplib.CREATED, body, headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS_foo_bar_container_foo_test_upload_INVALID_HASH( self, method, url, body, headers): # test_object_upload_invalid_hash body = '' headers = {} headers.update(self.base_headers) headers['etag'] = 'foobar' return (httplib.CREATED, body, headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS_foo_bar_container_foo_bar_object_INVALID_SIZE( self, method, url, body, headers): # test_download_object_invalid_file_size body = generate_random_data(100) return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS_foo_bar_container_foo_bar_object_NOT_FOUND( self, method, url, body, headers): body = '' return (httplib.NOT_FOUND, body, self.base_headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS_foo_bar_container_foo_test_stream_data( self, method, url, body, headers): # test_upload_object_via_stream_success hasher = hashlib.md5() hasher.update(b'235') hash_value = hasher.hexdigest() headers = {} headers.update(self.base_headers) headers['etag'] = hash_value body = 'test' return (httplib.CREATED, body, headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS_foo_bar_container_foo_test_stream_data_seek( self, method, url, body, headers): # test_upload_object_via_stream_stream_seek_at_end hasher = hashlib.md5() hasher.update(b'123456789') hash_value = hasher.hexdigest() headers = {} headers.update(self.base_headers) headers['etag'] = hash_value body = 'test' return (httplib.CREATED, body, headers, httplib.responses[httplib.OK]) def _v1_MossoCloudFS_foo_bar_container_foo_bar_object_NO_BUFFER( self, method, url, body, headers): # test_download_object_data_is_not_buffered_in_memory headers = {} headers.update(self.base_headers) headers['etag'] = '577ef1154f3240ad5b9b413aa7346a1e' body = generate_random_data(1000) return (httplib.OK, body, headers, httplib.responses[httplib.OK]) if __name__ == '__main__': sys.exit(unittest.main())