# -*- coding=utf-8 -*- # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import unicode_literals import os import sys import unittest try: import mock except ImportError: from unittest import mock from libcloud.utils.py3 import b from libcloud.utils.py3 import httplib from libcloud.utils.py3 import urlparse from libcloud.utils.py3 import parse_qs from libcloud.common.types import InvalidCredsError from libcloud.storage.base import Container, Object from libcloud.storage.types import ContainerDoesNotExistError from libcloud.storage.types import ContainerError from libcloud.storage.types import ContainerIsNotEmptyError from libcloud.storage.types import InvalidContainerNameError from libcloud.storage.types import ObjectDoesNotExistError from libcloud.storage.types import ObjectHashMismatchError from libcloud.storage.drivers.oss import OSSConnection from libcloud.storage.drivers.oss import OSSStorageDriver from libcloud.storage.drivers.oss import CHUNK_SIZE from libcloud.storage.drivers.dummy import DummyIterator from libcloud.test import MockHttp, generate_random_data, make_response # pylint: disable-msg=E0611 from libcloud.test.file_fixtures import StorageFileFixtures # pylint: disable-msg=E0611 from libcloud.test.secrets import STORAGE_OSS_PARAMS class OSSConnectionTestCase(unittest.TestCase): def setUp(self): self.conn = OSSConnection('44CF9590006BF252F707', 'OtxrzxIsfpFjA7SwPzILwy8Bw21TLhquhboDYROV') def test_signature(self): expected = b('26NBxoKdsyly4EDv6inkoDft/yA=') headers = { 'Content-MD5': 'ODBGOERFMDMzQTczRUY3NUE3NzA5QzdFNUYzMDQxNEM=', 'Content-Type': 'text/html', 'Expires': 'Thu, 17 Nov 2005 18:49:58 GMT', 'X-OSS-Meta-Author': 'foo@bar.com', 'X-OSS-Magic': 'abracadabra', 'Host': 'oss-example.oss-cn-hangzhou.aliyuncs.com' } action = '/oss-example/nelson' actual = OSSConnection._get_auth_signature('PUT', headers, {}, headers['Expires'], self.conn.key, action, 'x-oss-') self.assertEqual(expected, actual) class ObjectTestCase(unittest.TestCase): def test_object_with_chinese_name(self): driver = OSSStorageDriver(*STORAGE_OSS_PARAMS) obj = Object(name='中文', size=0, hash=None, extra=None, meta_data=None, container=None, driver=driver) self.assertTrue(obj.__repr__() is not None) class OSSMockHttp(MockHttp, unittest.TestCase): fixtures = StorageFileFixtures('oss') base_headers = {} def _unauthorized(self, method, url, body, headers): return (httplib.UNAUTHORIZED, '', self.base_headers, httplib.responses[httplib.OK]) def _list_containers_empty(self, method, url, body, headers): body = self.fixtures.load('list_containers_empty.xml') return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK]) def _list_containers(self, method, url, body, headers): body = self.fixtures.load('list_containers.xml') return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK]) def _list_container_objects_empty(self, method, url, body, headers): body = self.fixtures.load('list_container_objects_empty.xml') return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK]) def _list_container_objects(self, method, url, body, headers): body = self.fixtures.load('list_container_objects.xml') return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK]) def _list_container_objects_chinese(self, method, url, body, headers): body = self.fixtures.load('list_container_objects_chinese.xml') return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK]) def _list_container_objects_prefix(self, method, url, body, headers): params = {'prefix': self.test.prefix} self.assertUrlContainsQueryParams(url, params) body = self.fixtures.load('list_container_objects_prefix.xml') return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK]) def _get_container(self, method, url, body, headers): return self._list_containers(method, url, body, headers) def _get_object(self, method, url, body, headers): return self._list_containers(method, url, body, headers) def _notexisted_get_object(self, method, url, body, headers): return (httplib.NOT_FOUND, body, self.base_headers, httplib.responses[httplib.NOT_FOUND]) def _test_get_object(self, method, url, body, headers): self.base_headers.update( {'accept-ranges': 'bytes', 'connection': 'keep-alive', 'content-length': '0', 'content-type': 'application/octet-stream', 'date': 'Sat, 16 Jan 2016 15:38:14 GMT', 'etag': '"D41D8CD98F00B204E9800998ECF8427E"', 'last-modified': 'Fri, 15 Jan 2016 14:43:15 GMT', 'server': 'AliyunOSS', 'x-oss-object-type': 'Normal', 'x-oss-request-id': '569A63E6257784731E3D877F', 'x-oss-meta-rabbits': 'monkeys'}) return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK]) def _invalid_name(self, method, url, body, headers): # test_create_container_bad_request return (httplib.BAD_REQUEST, body, headers, httplib.responses[httplib.OK]) def _already_exists(self, method, url, body, headers): # test_create_container_already_existed return (httplib.CONFLICT, body, headers, httplib.responses[httplib.OK]) def _create_container(self, method, url, body, headers): # test_create_container_success self.assertEqual('PUT', method) self.assertEqual('', body) return (httplib.OK, body, headers, httplib.responses[httplib.OK]) def _create_container_location(self, method, url, body, headers): # test_create_container_success self.assertEqual('PUT', method) location_constraint = ('' '%s' '' % self.test.ex_location) self.assertEqual(location_constraint, body) return (httplib.OK, body, headers, httplib.responses[httplib.OK]) def _delete_container_doesnt_exist(self, method, url, body, headers): # test_delete_container_doesnt_exist return (httplib.NOT_FOUND, body, headers, httplib.responses[httplib.OK]) def _delete_container_not_empty(self, method, url, body, headers): # test_delete_container_not_empty return (httplib.CONFLICT, body, headers, httplib.responses[httplib.OK]) def _delete_container(self, method, url, body, headers): return (httplib.NO_CONTENT, body, self.base_headers, httplib.responses[httplib.NO_CONTENT]) def _foo_bar_object_not_found(self, method, url, body, headers): # test_delete_object_not_found return (httplib.NOT_FOUND, body, headers, httplib.responses[httplib.OK]) def _foo_bar_object_delete(self, method, url, body, headers): # test_delete_object return (httplib.NO_CONTENT, body, headers, httplib.responses[httplib.OK]) def _list_multipart(self, method, url, body, headers): query_string = urlparse.urlsplit(url).query query = parse_qs(query_string) if 'key-marker' not in query: body = self.fixtures.load('ex_iterate_multipart_uploads_p1.xml') else: body = self.fixtures.load('ex_iterate_multipart_uploads_p2.xml') return (httplib.OK, body, headers, httplib.responses[httplib.OK]) def _foo_bar_object(self, method, url, body, headers): # test_download_object_success body = generate_random_data(1000) return (httplib.OK, body, headers, httplib.responses[httplib.OK]) def _foo_bar_object_invalid_size(self, method, url, body, headers): # test_upload_object_invalid_file_size body = '' return (httplib.OK, body, headers, httplib.responses[httplib.OK]) def _foo_test_stream_data_multipart(self, method, url, body, headers): headers = {} body = '' headers = {'etag': '"0cc175b9c0f1b6a831c399e269772661"'} return (httplib.OK, body, headers, httplib.responses[httplib.OK]) class OSSStorageDriverTestCase(unittest.TestCase): driver_type = OSSStorageDriver driver_args = STORAGE_OSS_PARAMS mock_response_klass = OSSMockHttp @classmethod def create_driver(self): return self.driver_type(*self.driver_args) def setUp(self): self.driver_type.connectionCls.conn_class = self.mock_response_klass self.mock_response_klass.type = None self.mock_response_klass.test = self self.driver = self.create_driver() def tearDown(self): self._remove_test_file() def _remove_test_file(self): file_path = os.path.abspath(__file__) + '.temp' try: os.unlink(file_path) except OSError: pass def test_invalid_credentials(self): self.mock_response_klass.type = 'unauthorized' self.assertRaises(InvalidCredsError, self.driver.list_containers) def test_list_containers_empty(self): self.mock_response_klass.type = 'list_containers_empty' containers = self.driver.list_containers() self.assertEqual(len(containers), 0) def test_list_containers_success(self): self.mock_response_klass.type = 'list_containers' containers = self.driver.list_containers() self.assertEqual(len(containers), 2) container = containers[0] self.assertEqual('xz02tphky6fjfiuc0', container.name) self.assertTrue('creation_date' in container.extra) self.assertEqual('2014-05-15T11:18:32.000Z', container.extra['creation_date']) self.assertTrue('location' in container.extra) self.assertEqual('oss-cn-hangzhou-a', container.extra['location']) self.assertEqual(self.driver, container.driver) def test_list_container_objects_empty(self): self.mock_response_klass.type = 'list_container_objects_empty' container = Container(name='test_container', extra={}, driver=self.driver) objects = self.driver.list_container_objects(container=container) self.assertEqual(len(objects), 0) def test_list_container_objects_success(self): self.mock_response_klass.type = 'list_container_objects' container = Container(name='test_container', extra={}, driver=self.driver) objects = self.driver.list_container_objects(container=container) self.assertEqual(len(objects), 2) obj = objects[0] self.assertEqual(obj.name, 'en/') self.assertEqual(obj.hash, 'D41D8CD98F00B204E9800998ECF8427E') self.assertEqual(obj.size, 0) self.assertEqual(obj.container.name, 'test_container') self.assertEqual( obj.extra['last_modified'], '2016-01-15T14:43:15.000Z') self.assertTrue('owner' in obj.meta_data) def test_list_container_objects_with_chinese(self): self.mock_response_klass.type = 'list_container_objects_chinese' container = Container(name='test_container', extra={}, driver=self.driver) objects = self.driver.list_container_objects(container=container) self.assertEqual(len(objects), 2) obj = [o for o in objects if o.name == 'WEB控制台.odp'][0] self.assertEqual(obj.hash, '281371EA1618CF0E645D6BB90A158276') self.assertEqual(obj.size, 1234567) self.assertEqual(obj.container.name, 'test_container') self.assertEqual( obj.extra['last_modified'], '2016-01-15T14:43:06.000Z') self.assertTrue('owner' in obj.meta_data) def test_list_container_objects_with_prefix(self): self.mock_response_klass.type = 'list_container_objects_prefix' container = Container(name='test_container', extra={}, driver=self.driver) self.prefix = 'test_prefix' objects = self.driver.list_container_objects(container=container, prefix=self.prefix) self.assertEqual(len(objects), 2) def test_get_container_doesnt_exist(self): self.mock_response_klass.type = 'get_container' self.assertRaises(ContainerDoesNotExistError, self.driver.get_container, container_name='not-existed') def test_get_container_success(self): self.mock_response_klass.type = 'get_container' container = self.driver.get_container( container_name='xz02tphky6fjfiuc0') self.assertTrue(container.name, 'xz02tphky6fjfiuc0') def test_get_object_container_doesnt_exist(self): self.mock_response_klass.type = 'get_object' self.assertRaises(ObjectDoesNotExistError, self.driver.get_object, container_name='xz02tphky6fjfiuc0', object_name='notexisted') def test_get_object_success(self): self.mock_response_klass.type = 'get_object' obj = self.driver.get_object(container_name='xz02tphky6fjfiuc0', object_name='test') self.assertEqual(obj.name, 'test') self.assertEqual(obj.container.name, 'xz02tphky6fjfiuc0') self.assertEqual(obj.size, 0) self.assertEqual(obj.hash, 'D41D8CD98F00B204E9800998ECF8427E') self.assertEqual(obj.extra['last_modified'], 'Fri, 15 Jan 2016 14:43:15 GMT') self.assertEqual(obj.extra['content_type'], 'application/octet-stream') self.assertEqual(obj.meta_data['rabbits'], 'monkeys') def test_create_container_bad_request(self): # invalid container name, returns a 400 bad request self.mock_response_klass.type = 'invalid_name' self.assertRaises(ContainerError, self.driver.create_container, container_name='invalid_name') def test_create_container_already_exists(self): # container with this name already exists self.mock_response_klass.type = 'already_exists' self.assertRaises(InvalidContainerNameError, self.driver.create_container, container_name='new-container') def test_create_container_success(self): # success self.mock_response_klass.type = 'create_container' name = 'new_container' container = self.driver.create_container(container_name=name) self.assertEqual(container.name, name) def test_create_container_with_ex_location(self): self.mock_response_klass.type = 'create_container_location' name = 'new_container' self.ex_location = 'oss-cn-beijing' container = self.driver.create_container(container_name=name, ex_location=self.ex_location) self.assertEqual(container.name, name) self.assertTrue(container.extra['location'], self.ex_location) def test_delete_container_doesnt_exist(self): container = Container(name='new_container', extra=None, driver=self.driver) self.mock_response_klass.type = 'delete_container_doesnt_exist' self.assertRaises(ContainerDoesNotExistError, self.driver.delete_container, container=container) def test_delete_container_not_empty(self): container = Container(name='new_container', extra=None, driver=self.driver) self.mock_response_klass.type = 'delete_container_not_empty' self.assertRaises(ContainerIsNotEmptyError, self.driver.delete_container, container=container) def test_delete_container_success(self): self.mock_response_klass.type = 'delete_container' container = Container(name='new_container', extra=None, driver=self.driver) self.assertTrue(self.driver.delete_container(container=container)) def test_download_object_success(self): container = Container(name='foo_bar_container', extra={}, driver=self.driver) obj = Object(name='foo_bar_object', size=1000, hash=None, extra={}, container=container, meta_data=None, driver=self.driver_type) destination_path = os.path.abspath(__file__) + '.temp' result = self.driver.download_object(obj=obj, destination_path=destination_path, overwrite_existing=False, delete_on_failure=True) self.assertTrue(result) def test_download_object_invalid_file_size(self): self.mock_response_klass.type = 'invalid_size' container = Container(name='foo_bar_container', extra={}, driver=self.driver) obj = Object(name='foo_bar_object', size=1000, hash=None, extra={}, container=container, meta_data=None, driver=self.driver_type) destination_path = os.path.abspath(__file__) + '.temp' result = self.driver.download_object(obj=obj, destination_path=destination_path, overwrite_existing=False, delete_on_failure=True) self.assertFalse(result) def test_download_object_not_found(self): self.mock_response_klass.type = 'not_found' container = Container(name='foo_bar_container', extra={}, driver=self.driver) obj = Object(name='foo_bar_object', size=1000, hash=None, extra={}, container=container, meta_data=None, driver=self.driver_type) destination_path = os.path.abspath(__file__) + '.temp' self.assertRaises(ObjectDoesNotExistError, self.driver.download_object, obj=obj, destination_path=destination_path, overwrite_existing=False, delete_on_failure=True) def test_download_object_as_stream_success(self): container = Container(name='foo_bar_container', extra={}, driver=self.driver) obj = Object(name='foo_bar_object', size=1000, hash=None, extra={}, container=container, meta_data=None, driver=self.driver_type) stream = self.driver.download_object_as_stream(obj=obj, chunk_size=None) self.assertTrue(hasattr(stream, '__iter__')) def test_upload_object_invalid_hash1(self): def upload_file(self, object_name=None, content_type=None, request_path=None, request_method=None, headers=None, file_path=None, stream=None): return {'response': make_response(200, headers={'etag': '2345'}), 'bytes_transferred': 1000, 'data_hash': 'hash343hhash89h932439jsaa89'} self.mock_response_klass.type = 'INVALID_HASH1' old_func = self.driver_type._upload_object self.driver_type._upload_object = upload_file file_path = os.path.abspath(__file__) container = Container(name='foo_bar_container', extra={}, driver=self.driver) object_name = 'foo_test_upload' try: self.driver.upload_object(file_path=file_path, container=container, object_name=object_name, verify_hash=True) except ObjectHashMismatchError: pass else: self.fail( 'Invalid hash was returned but an exception was not thrown') finally: self.driver_type._upload_object = old_func def test_upload_object_success(self): def upload_file(self, object_name=None, content_type=None, request_path=None, request_method=None, headers=None, file_path=None, stream=None): return {'response': make_response(200, headers={'etag': '0cc175b9c0f1b6a831c399e269772661'}), 'bytes_transferred': 1000, 'data_hash': '0cc175b9c0f1b6a831c399e269772661'} self.mock_response_klass.type = None old_func = self.driver_type._upload_object self.driver_type._upload_object = upload_file file_path = os.path.abspath(__file__) container = Container(name='foo_bar_container', extra={}, driver=self.driver) object_name = 'foo_test_upload' extra = {'meta_data': {'some-value': 'foobar'}} obj = self.driver.upload_object(file_path=file_path, container=container, object_name=object_name, extra=extra, verify_hash=True) self.assertEqual(obj.name, 'foo_test_upload') self.assertEqual(obj.size, 1000) self.assertTrue('some-value' in obj.meta_data) self.driver_type._upload_object = old_func def test_upload_object_with_acl(self): def upload_file(self, object_name=None, content_type=None, request_path=None, request_method=None, headers=None, file_path=None, stream=None): return {'response': make_response(200, headers={'etag': '0cc175b9c0f1b6a831c399e269772661'}), 'bytes_transferred': 1000, 'data_hash': '0cc175b9c0f1b6a831c399e269772661'} self.mock_response_klass.type = None old_func = self.driver_type._upload_object self.driver_type._upload_object = upload_file file_path = os.path.abspath(__file__) container = Container(name='foo_bar_container', extra={}, driver=self.driver) object_name = 'foo_test_upload' extra = {'acl': 'public-read'} obj = self.driver.upload_object(file_path=file_path, container=container, object_name=object_name, extra=extra, verify_hash=True) self.assertEqual(obj.name, 'foo_test_upload') self.assertEqual(obj.size, 1000) self.assertEqual(obj.extra['acl'], 'public-read') self.driver_type._upload_object = old_func def test_upload_object_with_invalid_acl(self): file_path = os.path.abspath(__file__) container = Container(name='foo_bar_container', extra={}, driver=self.driver) object_name = 'foo_test_upload' extra = {'acl': 'invalid-acl'} self.assertRaises(AttributeError, self.driver.upload_object, file_path=file_path, container=container, object_name=object_name, extra=extra, verify_hash=True) def test_upload_empty_object_via_stream(self): if self.driver.supports_multipart_upload: self.mock_response_klass.type = 'multipart' else: self.mock_response_klass.type = None container = Container(name='foo_bar_container', extra={}, driver=self.driver) object_name = 'foo_test_stream_data' iterator = DummyIterator(data=['']) extra = {'content_type': 'text/plain'} obj = self.driver.upload_object_via_stream(container=container, object_name=object_name, iterator=iterator, extra=extra) self.assertEqual(obj.name, object_name) self.assertEqual(obj.size, 0) def test_upload_small_object_via_stream(self): if self.driver.supports_multipart_upload: self.mock_response_klass.type = 'multipart' else: self.mock_response_klass.type = None container = Container(name='foo_bar_container', extra={}, driver=self.driver) object_name = 'foo_test_stream_data' iterator = DummyIterator(data=['2', '3', '5']) extra = {'content_type': 'text/plain'} obj = self.driver.upload_object_via_stream(container=container, object_name=object_name, iterator=iterator, extra=extra) self.assertEqual(obj.name, object_name) self.assertEqual(obj.size, 3) def test_upload_big_object_via_stream(self): if self.driver.supports_multipart_upload: self.mock_response_klass.type = 'multipart' else: self.mock_response_klass.type = None container = Container(name='foo_bar_container', extra={}, driver=self.driver) object_name = 'foo_test_stream_data' iterator = DummyIterator( data=['2' * CHUNK_SIZE, '3' * CHUNK_SIZE, '5']) extra = {'content_type': 'text/plain'} obj = self.driver.upload_object_via_stream(container=container, object_name=object_name, iterator=iterator, extra=extra) self.assertEqual(obj.name, object_name) self.assertEqual(obj.size, CHUNK_SIZE * 2 + 1) def test_upload_object_via_stream_abort(self): if not self.driver.supports_multipart_upload: return self.mock_response_klass.type = 'MULTIPART' def _faulty_iterator(): for i in range(0, 5): yield str(i) raise RuntimeError('Error in fetching data') container = Container(name='foo_bar_container', extra={}, driver=self.driver) object_name = 'foo_test_stream_data' iterator = _faulty_iterator() extra = {'content_type': 'text/plain'} try: self.driver.upload_object_via_stream(container=container, object_name=object_name, iterator=iterator, extra=extra) except Exception: pass return def test_ex_iterate_multipart_uploads(self): if not self.driver.supports_multipart_upload: return self.mock_response_klass.type = 'list_multipart' container = Container(name='foo_bar_container', extra={}, driver=self.driver) for upload in self.driver.ex_iterate_multipart_uploads(container, max_uploads=2): self.assertTrue(upload.key is not None) self.assertTrue(upload.id is not None) self.assertTrue(upload.initiated is not None) def test_ex_abort_all_multipart_uploads(self): if not self.driver.supports_multipart_upload: return self.mock_response_klass.type = 'list_multipart' container = Container(name='foo_bar_container', extra={}, driver=self.driver) with mock.patch('libcloud.storage.drivers.oss.OSSStorageDriver' '._abort_multipart', autospec=True) as mock_abort: self.driver.ex_abort_all_multipart_uploads(container) self.assertEqual(3, mock_abort.call_count) def test_delete_object_not_found(self): self.mock_response_klass.type = 'not_found' container = Container(name='foo_bar_container', extra={}, driver=self.driver) obj = Object(name='foo_bar_object', size=1234, hash=None, extra=None, meta_data=None, container=container, driver=self.driver) self.assertRaises(ObjectDoesNotExistError, self.driver.delete_object, obj=obj) def test_delete_object_success(self): self.mock_response_klass.type = 'delete' container = Container(name='foo_bar_container', extra={}, driver=self.driver) obj = Object(name='foo_bar_object', size=1234, hash=None, extra=None, meta_data=None, container=container, driver=self.driver) result = self.driver.delete_object(obj=obj) self.assertTrue(result) if __name__ == '__main__': sys.exit(unittest.main())