# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import with_statement import os import sys import unittest import datetime import mock import pytest from libcloud.utils.iso8601 import UTC try: import simplejson as json except ImportError: import json from mock import Mock, patch import requests_mock from libcloud.utils.py3 import httplib from libcloud.utils.py3 import method_type from libcloud.utils.py3 import u from libcloud.common.base import LibcloudConnection from libcloud.common.types import InvalidCredsError, MalformedResponseError, \ LibcloudError from libcloud.compute.types import Provider, KeyPairDoesNotExistError, StorageVolumeState, \ VolumeSnapshotState, NodeImageMemberState from libcloud.compute.providers import get_driver from libcloud.compute.drivers.openstack import ( OpenStack_1_0_NodeDriver, OpenStack_1_1_NodeDriver, OpenStackSecurityGroup, OpenStackSecurityGroupRule, OpenStack_1_1_FloatingIpPool, OpenStack_1_1_FloatingIpAddress, OpenStackKeyPair, OpenStack_1_0_Connection, OpenStack_2_FloatingIpPool, OpenStackNodeDriver, OpenStack_2_NodeDriver, OpenStack_2_PortInterfaceState, OpenStackNetwork, OpenStackException) from libcloud.compute.base import Node, NodeImage, NodeSize from libcloud.pricing import set_pricing, clear_pricing_data from libcloud.test import MockHttp, XML_HEADERS from libcloud.test.file_fixtures import ComputeFileFixtures, OpenStackFixtures from libcloud.test.compute import TestCaseMixin from libcloud.test.secrets import OPENSTACK_PARAMS BASE_DIR = os.path.abspath(os.path.split(__file__)[0]) def test_driver_instantiation_invalid_auth(): with pytest.raises(LibcloudError): d = OpenStackNodeDriver( 'user', 'correct_password', ex_force_auth_version='5.0', ex_force_auth_url='http://x.y.z.y:5000', ex_tenant_name='admin') d.list_nodes() class OpenStackAuthTests(unittest.TestCase): def setUp(self): OpenStack_1_0_NodeDriver.connectionCls = OpenStack_1_0_Connection OpenStack_1_0_NodeDriver.connectionCls.conn_class = LibcloudConnection def test_auth_host_passed(self): forced_auth = 'http://x.y.z.y:5000' d = OpenStack_1_0_NodeDriver( 'user', 'correct_password', ex_force_auth_version='2.0_password', ex_force_auth_url='http://x.y.z.y:5000', ex_tenant_name='admin') self.assertEqual(d._ex_force_auth_url, forced_auth) with requests_mock.Mocker() as mock: body2 = ComputeFileFixtures('openstack').load('_v2_0__auth.json') mock.register_uri('POST', 'http://x.y.z.y:5000/v2.0/tokens', text=body2, headers={'content-type': 'application/json; charset=UTF-8'}) d.connection._populate_hosts_and_request_paths() self.assertEqual(d.connection.host, 'test_endpoint.com') class OpenStack_1_0_Tests(TestCaseMixin, unittest.TestCase): should_list_locations = False should_list_volumes = False driver_klass = OpenStack_1_0_NodeDriver driver_args = OPENSTACK_PARAMS driver_kwargs = {} # driver_kwargs = {'ex_force_auth_version': '1.0'} @classmethod def create_driver(self): if self is not OpenStack_1_0_FactoryMethodTests: self.driver_type = self.driver_klass return self.driver_type(*self.driver_args, **self.driver_kwargs) def setUp(self): # monkeypatch get_endpoint because the base openstack driver doesn't actually # work with old devstack but this class/tests are still used by the rackspace # driver def get_endpoint(*args, **kwargs): return "https://servers.api.rackspacecloud.com/v1.0/slug" self.driver_klass.connectionCls.get_endpoint = get_endpoint self.driver_klass.connectionCls.conn_class = OpenStackMockHttp self.driver_klass.connectionCls.auth_url = "https://auth.api.example.com" OpenStackMockHttp.type = None self.driver = self.create_driver() # normally authentication happens lazily, but we force it here self.driver.connection._populate_hosts_and_request_paths() clear_pricing_data() @patch('libcloud.common.openstack.OpenStackServiceCatalog') def test_populate_hosts_and_requests_path(self, _): tomorrow = datetime.datetime.today() + datetime.timedelta(1) cls = self.driver_klass.connectionCls count = 5 # Test authentication and token re-use con = cls('username', 'key') osa = con.get_auth_class() mocked_auth_method = Mock() osa.authenticate = mocked_auth_method # Valid token returned on first call, should be reused. for i in range(0, count): con._populate_hosts_and_request_paths() if i == 0: osa.auth_token = '1234' osa.auth_token_expires = tomorrow self.assertEqual(mocked_auth_method.call_count, 1) osa.auth_token = None osa.auth_token_expires = None # ex_force_auth_token provided, authenticate should never be called con = cls('username', 'key', ex_force_base_url='http://ponies', ex_force_auth_token='1234') osa = con.get_auth_class() mocked_auth_method = Mock() osa.authenticate = mocked_auth_method for i in range(0, count): con._populate_hosts_and_request_paths() self.assertEqual(mocked_auth_method.call_count, 0) def test_auth_token_is_set(self): self.driver.connection._populate_hosts_and_request_paths() self.assertEqual( self.driver.connection.auth_token, "aaaaaaaaaaaa-bbb-cccccccccccccc") def test_auth_token_expires_is_set(self): self.driver.connection._populate_hosts_and_request_paths() expires = self.driver.connection.auth_token_expires self.assertEqual(expires.isoformat(), "2999-11-23T21:00:14-06:00") def test_auth(self): if self.driver.connection._auth_version == '2.0': return OpenStackMockHttp.type = 'UNAUTHORIZED' try: self.driver = self.create_driver() self.driver.list_nodes() except InvalidCredsError as e: self.assertEqual(True, isinstance(e, InvalidCredsError)) else: self.fail('test should have thrown') def test_auth_missing_key(self): if self.driver.connection._auth_version == '2.0': return OpenStackMockHttp.type = 'UNAUTHORIZED_MISSING_KEY' try: self.driver = self.create_driver() self.driver.list_nodes() except MalformedResponseError as e: self.assertEqual(True, isinstance(e, MalformedResponseError)) else: self.fail('test should have thrown') def test_auth_server_error(self): if self.driver.connection._auth_version == '2.0': return OpenStackMockHttp.type = 'INTERNAL_SERVER_ERROR' try: self.driver = self.create_driver() self.driver.list_nodes() except MalformedResponseError as e: self.assertEqual(True, isinstance(e, MalformedResponseError)) else: self.fail('test should have thrown') def test_error_parsing_when_body_is_missing_message(self): OpenStackMockHttp.type = 'NO_MESSAGE_IN_ERROR_BODY' try: self.driver.list_images() except Exception as e: self.assertEqual(True, isinstance(e, Exception)) else: self.fail('test should have thrown') def test_list_locations(self): locations = self.driver.list_locations() self.assertEqual(len(locations), 1) def test_list_nodes(self): OpenStackMockHttp.type = 'EMPTY' ret = self.driver.list_nodes() self.assertEqual(len(ret), 0) OpenStackMockHttp.type = None ret = self.driver.list_nodes() self.assertEqual(len(ret), 1) node = ret[0] self.assertEqual('67.23.21.33', node.public_ips[0]) self.assertTrue('10.176.168.218' in node.private_ips) self.assertEqual(node.extra.get('flavorId'), '1') self.assertEqual(node.extra.get('imageId'), '11') self.assertEqual(type(node.extra.get('metadata')), type(dict())) OpenStackMockHttp.type = 'METADATA' ret = self.driver.list_nodes() self.assertEqual(len(ret), 1) node = ret[0] self.assertEqual(type(node.extra.get('metadata')), type(dict())) self.assertEqual(node.extra.get('metadata').get('somekey'), 'somevalue') OpenStackMockHttp.type = None def test_list_images(self): ret = self.driver.list_images() expected = {10: {'serverId': None, 'status': 'ACTIVE', 'created': '2009-07-20T09:14:37-05:00', 'updated': '2009-07-20T09:14:37-05:00', 'progress': None, 'minDisk': None, 'minRam': None}, 11: {'serverId': '91221', 'status': 'ACTIVE', 'created': '2009-11-29T20:22:09-06:00', 'updated': '2009-11-29T20:24:08-06:00', 'progress': '100', 'minDisk': '5', 'minRam': '256'}} for ret_idx, extra in list(expected.items()): for key, value in list(extra.items()): self.assertEqual(ret[ret_idx].extra[key], value) def test_create_node(self): image = NodeImage(id=11, name='Ubuntu 8.10 (intrepid)', driver=self.driver) size = NodeSize(1, '256 slice', None, None, None, None, driver=self.driver) node = self.driver.create_node(name='racktest', image=image, size=size) self.assertEqual(node.name, 'racktest') self.assertEqual(node.extra.get('password'), 'racktestvJq7d3') def test_create_node_without_adminPass(self): OpenStackMockHttp.type = 'NO_ADMIN_PASS' image = NodeImage(id=11, name='Ubuntu 8.10 (intrepid)', driver=self.driver) size = NodeSize(1, '256 slice', None, None, None, None, driver=self.driver) node = self.driver.create_node(name='racktest', image=image, size=size) self.assertEqual(node.name, 'racktest') self.assertIsNone(node.extra.get('password')) def test_create_node_ex_shared_ip_group(self): OpenStackMockHttp.type = 'EX_SHARED_IP_GROUP' image = NodeImage(id=11, name='Ubuntu 8.10 (intrepid)', driver=self.driver) size = NodeSize(1, '256 slice', None, None, None, None, driver=self.driver) node = self.driver.create_node(name='racktest', image=image, size=size, ex_shared_ip_group_id='12345') self.assertEqual(node.name, 'racktest') self.assertEqual(node.extra.get('password'), 'racktestvJq7d3') def test_create_node_with_metadata(self): OpenStackMockHttp.type = 'METADATA' image = NodeImage(id=11, name='Ubuntu 8.10 (intrepid)', driver=self.driver) size = NodeSize(1, '256 slice', None, None, None, None, driver=self.driver) metadata = {'a': 'b', 'c': 'd'} files = {'/file1': 'content1', '/file2': 'content2'} node = self.driver.create_node(name='racktest', image=image, size=size, ex_metadata=metadata, ex_files=files) self.assertEqual(node.name, 'racktest') self.assertEqual(node.extra.get('password'), 'racktestvJq7d3') self.assertEqual(node.extra.get('metadata'), metadata) def test_reboot_node(self): node = Node(id=72258, name=None, state=None, public_ips=None, private_ips=None, driver=self.driver) ret = node.reboot() self.assertTrue(ret is True) def test_destroy_node(self): node = Node(id=72258, name=None, state=None, public_ips=None, private_ips=None, driver=self.driver) ret = node.destroy() self.assertTrue(ret is True) def test_ex_limits(self): limits = self.driver.ex_limits() self.assertTrue("rate" in limits) self.assertTrue("absolute" in limits) def test_create_image(self): node = Node(id=444222, name=None, state=None, public_ips=None, private_ips=None, driver=self.driver) image = self.driver.create_image(node, "imgtest") self.assertEqual(image.name, "imgtest") self.assertEqual(image.id, "12345") def test_delete_image(self): image = NodeImage(id=333111, name='Ubuntu 8.10 (intrepid)', driver=self.driver) ret = self.driver.delete_image(image) self.assertTrue(ret) def test_ex_list_ip_addresses(self): ret = self.driver.ex_list_ip_addresses(node_id=72258) self.assertEqual(2, len(ret.public_addresses)) self.assertTrue('67.23.10.131' in ret.public_addresses) self.assertTrue('67.23.10.132' in ret.public_addresses) self.assertEqual(1, len(ret.private_addresses)) self.assertTrue('10.176.42.16' in ret.private_addresses) def test_ex_list_ip_groups(self): ret = self.driver.ex_list_ip_groups() self.assertEqual(2, len(ret)) self.assertEqual('1234', ret[0].id) self.assertEqual('Shared IP Group 1', ret[0].name) self.assertEqual('5678', ret[1].id) self.assertEqual('Shared IP Group 2', ret[1].name) self.assertTrue(ret[0].servers is None) def test_ex_list_ip_groups_detail(self): ret = self.driver.ex_list_ip_groups(details=True) self.assertEqual(2, len(ret)) self.assertEqual('1234', ret[0].id) self.assertEqual('Shared IP Group 1', ret[0].name) self.assertEqual(2, len(ret[0].servers)) self.assertEqual('422', ret[0].servers[0]) self.assertEqual('3445', ret[0].servers[1]) self.assertEqual('5678', ret[1].id) self.assertEqual('Shared IP Group 2', ret[1].name) self.assertEqual(3, len(ret[1].servers)) self.assertEqual('23203', ret[1].servers[0]) self.assertEqual('2456', ret[1].servers[1]) self.assertEqual('9891', ret[1].servers[2]) def test_ex_create_ip_group(self): ret = self.driver.ex_create_ip_group('Shared IP Group 1', '5467') self.assertEqual('1234', ret.id) self.assertEqual('Shared IP Group 1', ret.name) self.assertEqual(1, len(ret.servers)) self.assertEqual('422', ret.servers[0]) def test_ex_delete_ip_group(self): ret = self.driver.ex_delete_ip_group('5467') self.assertEqual(True, ret) def test_ex_share_ip(self): ret = self.driver.ex_share_ip('1234', '3445', '67.23.21.133') self.assertEqual(True, ret) def test_ex_unshare_ip(self): ret = self.driver.ex_unshare_ip('3445', '67.23.21.133') self.assertEqual(True, ret) def test_ex_resize(self): node = Node(id=444222, name=None, state=None, public_ips=None, private_ips=None, driver=self.driver) size = NodeSize(1, '256 slice', None, None, None, None, driver=self.driver) self.assertTrue(self.driver.ex_resize(node=node, size=size)) def test_ex_confirm_resize(self): node = Node(id=444222, name=None, state=None, public_ips=None, private_ips=None, driver=self.driver) self.assertTrue(self.driver.ex_confirm_resize(node=node)) def test_ex_revert_resize(self): node = Node(id=444222, name=None, state=None, public_ips=None, private_ips=None, driver=self.driver) self.assertTrue(self.driver.ex_revert_resize(node=node)) def test_list_sizes(self): sizes = self.driver.list_sizes() self.assertEqual(len(sizes), 7, 'Wrong sizes count') for size in sizes: self.assertTrue(isinstance(size.price, float), 'Wrong size price type') if self.driver.api_name == 'openstack': self.assertEqual(size.price, 0, 'Size price should be zero by default') def test_list_sizes_with_specified_pricing(self): if self.driver.api_name != 'openstack': return pricing = dict((str(i), i) for i in range(1, 8)) set_pricing(driver_type='compute', driver_name='openstack', pricing=pricing) sizes = self.driver.list_sizes() self.assertEqual(len(sizes), 7, 'Wrong sizes count') for size in sizes: self.assertTrue(isinstance(size.price, float), 'Wrong size price type') self.assertEqual(float(size.price), float(pricing[size.id])) class OpenStack_1_0_FactoryMethodTests(OpenStack_1_0_Tests): should_list_locations = False should_list_volumes = False driver_klass = OpenStack_1_0_NodeDriver driver_type = get_driver(Provider.OPENSTACK) driver_args = OPENSTACK_PARAMS + ('1.0',) def test_factory_method_invalid_version(self): try: self.driver_type(*(OPENSTACK_PARAMS + ('15.5',))) except NotImplementedError: pass else: self.fail('Exception was not thrown') class OpenStackMockHttp(MockHttp, unittest.TestCase): fixtures = ComputeFileFixtures('openstack') auth_fixtures = OpenStackFixtures() json_content_headers = {'content-type': 'application/json; charset=UTF-8'} # fake auth token response def _v1_0(self, method, url, body, headers): headers = { 'x-server-management-url': 'https://servers.api.rackspacecloud.com/v1.0/slug', 'x-auth-token': 'FE011C19-CF86-4F87-BE5D-9229145D7A06', 'x-cdn-management-url': 'https://cdn.clouddrive.com/v1/MossoCloudFS_FE011C19-CF86-4F87-BE5D-9229145D7A06', 'x-storage-token': 'FE011C19-CF86-4F87-BE5D-9229145D7A06', 'x-storage-url': 'https://storage4.clouddrive.com/v1/MossoCloudFS_FE011C19-CF86-4F87-BE5D-9229145D7A06'} return (httplib.NO_CONTENT, "", headers, httplib.responses[httplib.NO_CONTENT]) def _v1_0_UNAUTHORIZED(self, method, url, body, headers): return (httplib.UNAUTHORIZED, "", {}, httplib.responses[httplib.UNAUTHORIZED]) def _v1_0_INTERNAL_SERVER_ERROR(self, method, url, body, headers): return (httplib.INTERNAL_SERVER_ERROR, "

500: Internal Server Error

", {}, httplib.responses[httplib.INTERNAL_SERVER_ERROR]) def _v1_0_slug_images_detail_NO_MESSAGE_IN_ERROR_BODY(self, method, url, body, headers): body = self.fixtures.load('300_multiple_choices.json') return (httplib.MULTIPLE_CHOICES, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v1_0_UNAUTHORIZED_MISSING_KEY(self, method, url, body, headers): headers = { 'x-server-management-url': 'https://servers.api.rackspacecloud.com/v1.0/slug', 'x-auth-tokenx': 'FE011C19-CF86-4F87-BE5D-9229145D7A06', 'x-cdn-management-url': 'https://cdn.clouddrive.com/v1/MossoCloudFS_FE011C19-CF86-4F87-BE5D-9229145D7A06'} return (httplib.NO_CONTENT, "", headers, httplib.responses[httplib.NO_CONTENT]) def _v2_0_tokens(self, method, url, body, headers): body = self.auth_fixtures.load('_v2_0__auth.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v1_0_slug_servers_detail_EMPTY(self, method, url, body, headers): body = self.fixtures.load('v1_slug_servers_detail_empty.xml') return (httplib.OK, body, XML_HEADERS, httplib.responses[httplib.OK]) def _v1_0_slug_servers_detail(self, method, url, body, headers): body = self.fixtures.load('v1_slug_servers_detail.xml') return (httplib.OK, body, XML_HEADERS, httplib.responses[httplib.OK]) def _v1_0_slug_servers_detail_METADATA(self, method, url, body, headers): body = self.fixtures.load('v1_slug_servers_detail_metadata.xml') return (httplib.OK, body, XML_HEADERS, httplib.responses[httplib.OK]) def _v1_0_slug_images_333111(self, method, url, body, headers): if method != "DELETE": raise NotImplementedError() # this is currently used for deletion of an image # as such it should not accept GET/POST return(httplib.NO_CONTENT, "", {}, httplib.responses[httplib.NO_CONTENT]) def _v1_0_slug_images(self, method, url, body, headers): if method != "POST": raise NotImplementedError() # this is currently used for creation of new image with # POST request, don't handle GET to avoid possible confusion body = self.fixtures.load('v1_slug_images_post.xml') return (httplib.ACCEPTED, body, XML_HEADERS, httplib.responses[httplib.ACCEPTED]) def _v1_0_slug_images_detail(self, method, url, body, headers): body = self.fixtures.load('v1_slug_images_detail.xml') return (httplib.OK, body, XML_HEADERS, httplib.responses[httplib.OK]) def _v1_0_slug_servers(self, method, url, body, headers): body = self.fixtures.load('v1_slug_servers.xml') return (httplib.ACCEPTED, body, XML_HEADERS, httplib.responses[httplib.ACCEPTED]) def _v1_0_slug_servers_NO_ADMIN_PASS(self, method, url, body, headers): body = self.fixtures.load('v1_slug_servers_no_admin_pass.xml') return (httplib.ACCEPTED, body, XML_HEADERS, httplib.responses[httplib.ACCEPTED]) def _v1_0_slug_servers_EX_SHARED_IP_GROUP(self, method, url, body, headers): # test_create_node_ex_shared_ip_group # Verify that the body contains sharedIpGroupId XML element body = u(body) self.assertTrue(body.find('sharedIpGroupId="12345"') != -1) body = self.fixtures.load('v1_slug_servers.xml') return (httplib.ACCEPTED, body, XML_HEADERS, httplib.responses[httplib.ACCEPTED]) def _v1_0_slug_servers_METADATA(self, method, url, body, headers): body = self.fixtures.load('v1_slug_servers_metadata.xml') return (httplib.ACCEPTED, body, XML_HEADERS, httplib.responses[httplib.ACCEPTED]) def _v1_0_slug_servers_72258_action(self, method, url, body, headers): if method != "POST" or body[:8] != "500: Internal Server Error", {'content-type': 'text/html'}, httplib.responses[httplib.INTERNAL_SERVER_ERROR]) class OpenStack_1_1_Tests(unittest.TestCase, TestCaseMixin): should_list_locations = False should_list_volumes = True driver_klass = OpenStack_1_1_NodeDriver driver_type = OpenStack_1_1_NodeDriver driver_args = OPENSTACK_PARAMS driver_kwargs = {'ex_force_auth_version': '2.0'} @classmethod def create_driver(self): if self is not OpenStack_1_1_FactoryMethodTests: self.driver_type = self.driver_klass return self.driver_type(*self.driver_args, **self.driver_kwargs) def setUp(self): self.driver_klass.connectionCls.conn_class = OpenStack_2_0_MockHttp self.driver_klass.connectionCls.auth_url = "https://auth.api.example.com" OpenStackMockHttp.type = None OpenStack_1_1_MockHttp.type = None OpenStack_2_0_MockHttp.type = None self.driver = self.create_driver() # normally authentication happens lazily, but we force it here self.driver.connection._populate_hosts_and_request_paths() clear_pricing_data() self.node = self.driver.list_nodes()[1] def _force_reauthentication(self): """ Trash current auth token so driver will be forced to re-authentication on next request. """ self.driver.connection._ex_force_base_url = 'http://ex_force_base_url.com:666/forced_url' self.driver.connection.auth_token = None self.driver.connection.auth_token_expires = None self.driver.connection._osa.auth_token = None self.driver.connection._osa.auth_token_expires = None def test_auth_token_is_set(self): self._force_reauthentication() self.driver.connection._populate_hosts_and_request_paths() self.assertEqual( self.driver.connection.auth_token, "aaaaaaaaaaaa-bbb-cccccccccccccc") def test_auth_token_expires_is_set(self): self._force_reauthentication() self.driver.connection._populate_hosts_and_request_paths() expires = self.driver.connection.auth_token_expires self.assertEqual(expires.isoformat(), "2999-11-23T21:00:14-06:00") def test_ex_force_base_url(self): # change base url and trash the current auth token so we can # re-authenticate self.driver.connection._ex_force_base_url = 'http://ex_force_base_url.com:666/forced_url' self.driver.connection.auth_token = None self.driver.connection._populate_hosts_and_request_paths() # assert that we use the base url and not the auth url self.assertEqual(self.driver.connection.host, 'ex_force_base_url.com') self.assertEqual(self.driver.connection.port, 666) self.assertEqual(self.driver.connection.request_path, '/forced_url') def test_get_endpoint_populates_host_port_and_request_path(self): # simulate a subclass overriding this method self.driver.connection.get_endpoint = lambda: 'http://endpoint_auth_url.com:1555/service_url' self.driver.connection.auth_token = None self.driver.connection._ex_force_base_url = None self.driver.connection._populate_hosts_and_request_paths() # assert that we use the result of get endpoint self.assertEqual(self.driver.connection.host, 'endpoint_auth_url.com') self.assertEqual(self.driver.connection.port, 1555) self.assertEqual(self.driver.connection.request_path, '/service_url') def test_set_auth_token_populates_host_port_and_request_path(self): # change base url and trash the current auth token so we can # re-authenticate self.driver.connection._ex_force_base_url = 'http://some_other_ex_force_base_url.com:1222/some-service' self.driver.connection.auth_token = "preset-auth-token" self.driver.connection._populate_hosts_and_request_paths() # assert that we use the base url and not the auth url self.assertEqual( self.driver.connection.host, 'some_other_ex_force_base_url.com') self.assertEqual(self.driver.connection.port, 1222) self.assertEqual(self.driver.connection.request_path, '/some-service') def test_auth_token_without_base_url_raises_exception(self): kwargs = { 'ex_force_auth_version': '2.0', 'ex_force_auth_token': 'preset-auth-token' } try: self.driver_type(*self.driver_args, **kwargs) self.fail('Expected failure setting auth token without base url') except LibcloudError: pass else: self.fail('Expected failure setting auth token without base url') def test_ex_force_auth_token_passed_to_connection(self): base_url = 'https://servers.api.rackspacecloud.com/v1.1/slug' kwargs = { 'ex_force_auth_version': '2.0', 'ex_force_auth_token': 'preset-auth-token', 'ex_force_base_url': base_url } driver = self.driver_type(*self.driver_args, **kwargs) driver.list_nodes() self.assertEqual(kwargs['ex_force_auth_token'], driver.connection.auth_token) self.assertEqual('servers.api.rackspacecloud.com', driver.connection.host) self.assertEqual('/v1.1/slug', driver.connection.request_path) self.assertEqual(443, driver.connection.port) def test_list_nodes(self): nodes = self.driver.list_nodes() self.assertEqual(len(nodes), 2) node = nodes[0] self.assertEqual('12065', node.id) # test public IPv4 self.assertTrue('12.16.18.28' in node.public_ips) self.assertTrue('50.57.94.35' in node.public_ips) # fixed public ip self.assertTrue('1.1.1.1' in node.public_ips) # floating public ip self.assertTrue('2.2.2.2' in node.public_ips) # test public IPv6 self.assertTrue( '2001:4801:7808:52:16:3eff:fe47:788a' in node.public_ips) # test private IPv4 self.assertTrue('10.182.64.34' in node.private_ips) # fixed private ip self.assertTrue('10.3.3.3' in node.private_ips) # floating private ip self.assertTrue('192.168.3.3' in node.private_ips) self.assertTrue('172.16.1.1' in node.private_ips) # test private IPv6 self.assertTrue( 'fec0:4801:7808:52:16:3eff:fe60:187d' in node.private_ips) # test creation date self.assertEqual(node.created_at, datetime.datetime(2011, 10, 11, 0, 51, 39, tzinfo=UTC)) self.assertEqual(node.extra.get('flavorId'), '2') self.assertEqual(node.extra.get('imageId'), '7') self.assertEqual(node.extra.get('metadata'), {}) self.assertEqual(node.extra['updated'], '2011-10-11T00:50:04Z') self.assertEqual(node.extra['created'], '2011-10-11T00:51:39Z') self.assertEqual(node.extra.get('userId'), 'rs-reach') self.assertEqual(node.extra.get('hostId'), '912566d83a13fbb357ea' '3f13c629363d9f7e1ba3f' '925b49f3d2ab725') self.assertEqual(node.extra.get('disk_config'), 'AUTO') self.assertEqual(node.extra.get('task_state'), 'spawning') self.assertEqual(node.extra.get('vm_state'), 'active') self.assertEqual(node.extra.get('power_state'), 1) self.assertEqual(node.extra.get('progress'), 25) self.assertEqual(node.extra.get('fault')['id'], 1234) self.assertTrue(node.extra.get('service_name') is not None) self.assertTrue(node.extra.get('uri') is not None) def test_list_nodes_no_image_id_attribute(self): # Regression test for LIBCLOD-455 self.driver_klass.connectionCls.conn_class.type = 'ERROR_STATE_NO_IMAGE_ID' nodes = self.driver.list_nodes() self.assertIsNone(nodes[0].extra['imageId']) def test_list_volumes(self): volumes = self.driver.list_volumes() self.assertEqual(len(volumes), 2) volume = volumes[0] self.assertEqual('cd76a3a1-c4ce-40f6-9b9f-07a61508938d', volume.id) self.assertEqual('test_volume_2', volume.name) self.assertEqual(StorageVolumeState.AVAILABLE, volume.state) self.assertEqual(2, volume.size) self.assertEqual(volume.extra, { 'description': '', 'attachments': [{ 'id': 'cd76a3a1-c4ce-40f6-9b9f-07a61508938d', "device": "/dev/vdb", "serverId": "12065", "volumeId": "cd76a3a1-c4ce-40f6-9b9f-07a61508938d", }], 'snapshot_id': None, 'state': 'available', 'location': 'nova', 'volume_type': 'None', 'metadata': {}, 'created_at': '2013-06-24T11:20:13.000000', }) # also test that unknown state resolves to StorageVolumeState.UNKNOWN volume = volumes[1] self.assertEqual('cfcec3bc-b736-4db5-9535-4c24112691b5', volume.id) self.assertEqual('test_volume', volume.name) self.assertEqual(50, volume.size) self.assertEqual(StorageVolumeState.UNKNOWN, volume.state) self.assertEqual(volume.extra, { 'description': 'some description', 'attachments': [], 'snapshot_id': '01f48111-7866-4cd2-986a-e92683c4a363', 'state': 'some-unknown-state', 'location': 'nova', 'volume_type': 'None', 'metadata': {}, 'created_at': '2013-06-21T12:39:02.000000', }) def test_list_sizes(self): sizes = self.driver.list_sizes() self.assertEqual(len(sizes), 8, 'Wrong sizes count') for size in sizes: self.assertTrue(size.price is None or isinstance(size.price, float), 'Wrong size price type') self.assertTrue(isinstance(size.ram, int)) self.assertTrue(isinstance(size.vcpus, int)) self.assertTrue(isinstance(size.disk, int)) self.assertTrue(isinstance(size.swap, int)) self.assertTrue(isinstance(size.ephemeral_disk, int) or size.ephemeral_disk is None) self.assertTrue(isinstance(size.extra, dict)) if size.id == '1': self.assertEqual(size.ephemeral_disk, 40) self.assertEqual(size.extra, { "policy_class": "standard_flavor", "class": "standard1", "disk_io_index": "2", "number_of_data_disks": "0" }) self.assertEqual(sizes[0].vcpus, 8) def test_list_sizes_with_specified_pricing(self): pricing = dict((str(i), i * 5.0) for i in range(1, 9)) set_pricing(driver_type='compute', driver_name=self.driver.api_name, pricing=pricing) sizes = self.driver.list_sizes() self.assertEqual(len(sizes), 8, 'Wrong sizes count') for size in sizes: self.assertTrue(isinstance(size.price, float), 'Wrong size price type') self.assertEqual(size.price, pricing[size.id], 'Size price should match') def test_list_images(self): images = self.driver.list_images() self.assertEqual(len(images), 13, 'Wrong images count') image = images[0] self.assertEqual(image.id, '13') self.assertEqual(image.name, 'Windows 2008 SP2 x86 (B24)') self.assertEqual(image.extra['updated'], '2011-08-06T18:14:02Z') self.assertEqual(image.extra['created'], '2011-08-06T18:13:11Z') self.assertEqual(image.extra['status'], 'ACTIVE') self.assertEqual(image.extra['metadata']['os_type'], 'windows') self.assertEqual( image.extra['serverId'], '52415800-8b69-11e0-9b19-734f335aa7b3') self.assertEqual(image.extra['minDisk'], 0) self.assertEqual(image.extra['minRam'], 0) def test_create_node(self): image = NodeImage( id=11, name='Ubuntu 8.10 (intrepid)', driver=self.driver) size = NodeSize( 1, '256 slice', None, None, None, None, driver=self.driver) node = self.driver.create_node(name='racktest', image=image, size=size) self.assertEqual(node.id, '26f7fbee-8ce1-4c28-887a-bfe8e4bb10fe') self.assertEqual(node.name, 'racktest') self.assertEqual(node.extra['password'], 'racktestvJq7d3') self.assertEqual(node.extra['metadata']['My Server Name'], 'Apache1') def test_create_node_with_ex_keyname_and_ex_userdata(self): image = NodeImage( id=11, name='Ubuntu 8.10 (intrepid)', driver=self.driver) size = NodeSize( 1, '256 slice', None, None, None, None, driver=self.driver) node = self.driver.create_node(name='racktest', image=image, size=size, ex_keyname='devstack', ex_userdata='sample data') self.assertEqual(node.id, '26f7fbee-8ce1-4c28-887a-bfe8e4bb10fe') self.assertEqual(node.name, 'racktest') self.assertEqual(node.extra['password'], 'racktestvJq7d3') self.assertEqual(node.extra['metadata']['My Server Name'], 'Apache1') self.assertEqual(node.extra['key_name'], 'devstack') def test_create_node_with_availability_zone(self): image = NodeImage( id=11, name='Ubuntu 8.10 (intrepid)', driver=self.driver) size = NodeSize( 1, '256 slice', None, None, None, None, driver=self.driver) node = self.driver.create_node(name='racktest', image=image, size=size, ex_availability_zone='testaz') self.assertEqual(node.id, '26f7fbee-8ce1-4c28-887a-bfe8e4bb10fe') self.assertEqual(node.name, 'racktest') self.assertEqual(node.extra['password'], 'racktestvJq7d3') self.assertEqual(node.extra['metadata']['My Server Name'], 'Apache1') self.assertEqual(node.extra['availability_zone'], 'testaz') def test_create_node_with_ex_disk_config(self): OpenStack_1_1_MockHttp.type = 'EX_DISK_CONFIG' image = NodeImage( id=11, name='Ubuntu 8.10 (intrepid)', driver=self.driver) size = NodeSize( 1, '256 slice', None, None, None, None, driver=self.driver) node = self.driver.create_node(name='racktest', image=image, size=size, ex_disk_config='AUTO') self.assertEqual(node.id, '26f7fbee-8ce1-4c28-887a-bfe8e4bb10fe') self.assertEqual(node.name, 'racktest') self.assertEqual(node.extra['disk_config'], 'AUTO') def test_create_node_with_ex_config_drive(self): OpenStack_1_1_MockHttp.type = 'EX_CONFIG_DRIVE' image = NodeImage( id=11, name='Ubuntu 8.10 (intrepid)', driver=self.driver) size = NodeSize( 1, '256 slice', None, None, None, None, driver=self.driver) node = self.driver.create_node(name='racktest', image=image, size=size, ex_config_drive=True) self.assertEqual(node.id, '26f7fbee-8ce1-4c28-887a-bfe8e4bb10fe') self.assertEqual(node.name, 'racktest') self.assertTrue(node.extra['config_drive']) def test_create_node_from_bootable_volume(self): size = NodeSize( 1, '256 slice', None, None, None, None, driver=self.driver) node = self.driver.create_node( name='racktest', size=size, ex_blockdevicemappings=[ { 'boot_index': 0, 'uuid': 'ee7ee330-b454-4414-8e9f-c70c558dd3af', 'source_type': 'volume', 'destination_type': 'volume', 'delete_on_termination': False }]) self.assertEqual(node.id, '26f7fbee-8ce1-4c28-887a-bfe8e4bb10fe') self.assertEqual(node.name, 'racktest') self.assertEqual(node.extra['password'], 'racktestvJq7d3') self.assertEqual(node.extra['metadata']['My Server Name'], 'Apache1') def test_destroy_node(self): self.assertTrue(self.node.destroy()) def test_reboot_node(self): self.assertTrue(self.node.reboot()) def test_create_volume(self): volume = self.driver.create_volume(1, 'test') self.assertEqual(volume.name, 'test') self.assertEqual(volume.size, 1) def test_create_volume_passes_location_to_request_only_if_not_none(self): with patch.object(self.driver.connection, 'request') as mock_request: self.driver.create_volume(1, 'test', location='mylocation') name, args, kwargs = mock_request.mock_calls[0] self.assertEqual(kwargs["data"]["volume"]["availability_zone"], "mylocation") def test_create_volume_does_not_pass_location_to_request_if_none(self): with patch.object(self.driver.connection, 'request') as mock_request: self.driver.create_volume(1, 'test') name, args, kwargs = mock_request.mock_calls[0] self.assertFalse("availability_zone" in kwargs["data"]["volume"]) def test_create_volume_passes_volume_type_to_request_only_if_not_none(self): with patch.object(self.driver.connection, 'request') as mock_request: self.driver.create_volume(1, 'test', ex_volume_type='myvolumetype') name, args, kwargs = mock_request.mock_calls[0] self.assertEqual(kwargs["data"]["volume"]["volume_type"], "myvolumetype") def test_create_volume_does_not_pass_volume_type_to_request_if_none(self): with patch.object(self.driver.connection, 'request') as mock_request: self.driver.create_volume(1, 'test') name, args, kwargs = mock_request.mock_calls[0] self.assertFalse("volume_type" in kwargs["data"]["volume"]) def test_destroy_volume(self): volume = self.driver.ex_get_volume( 'cd76a3a1-c4ce-40f6-9b9f-07a61508938d') self.assertEqual(self.driver.destroy_volume(volume), True) def test_attach_volume(self): node = self.driver.list_nodes()[0] volume = self.driver.ex_get_volume( 'cd76a3a1-c4ce-40f6-9b9f-07a61508938d') self.assertEqual( self.driver.attach_volume(node, volume, '/dev/sdb'), True) def test_attach_volume_device_auto(self): node = self.driver.list_nodes()[0] volume = self.driver.ex_get_volume( 'cd76a3a1-c4ce-40f6-9b9f-07a61508938d') OpenStack_2_0_MockHttp.type = 'DEVICE_AUTO' self.assertEqual( self.driver.attach_volume(node, volume, 'auto'), True) def test_detach_volume(self): node = self.driver.list_nodes()[0] volume = self.driver.ex_get_volume( 'cd76a3a1-c4ce-40f6-9b9f-07a61508938d') self.assertEqual( self.driver.attach_volume(node, volume, '/dev/sdb'), True) self.assertEqual(self.driver.detach_volume(volume), True) def test_ex_set_password(self): self.assertTrue(self.driver.ex_set_password(self.node, 'New1&53jPass')) def test_ex_rebuild(self): image = NodeImage(id=11, name='Ubuntu 8.10 (intrepid)', driver=self.driver) success = self.driver.ex_rebuild(self.node, image=image) self.assertTrue(success) def test_ex_rebuild_with_ex_disk_config(self): image = NodeImage(id=58, name='Ubuntu 10.10 (intrepid)', driver=self.driver) node = Node(id=12066, name=None, state=None, public_ips=None, private_ips=None, driver=self.driver) success = self.driver.ex_rebuild(node, image=image, ex_disk_config='MANUAL') self.assertTrue(success) def test_ex_rebuild_with_ex_config_drive(self): image = NodeImage(id=58, name='Ubuntu 10.10 (intrepid)', driver=self.driver) node = Node(id=12066, name=None, state=None, public_ips=None, private_ips=None, driver=self.driver) success = self.driver.ex_rebuild(node, image=image, ex_disk_config='MANUAL', ex_config_drive=True) self.assertTrue(success) def test_ex_resize(self): size = NodeSize(1, '256 slice', None, None, None, None, driver=self.driver) try: self.driver.ex_resize(self.node, size) except Exception as e: self.fail('An error was raised: ' + repr(e)) def test_ex_confirm_resize(self): try: self.driver.ex_confirm_resize(self.node) except Exception as e: self.fail('An error was raised: ' + repr(e)) def test_ex_revert_resize(self): try: self.driver.ex_revert_resize(self.node) except Exception as e: self.fail('An error was raised: ' + repr(e)) def test_create_image(self): image = self.driver.create_image(self.node, 'new_image') self.assertEqual(image.name, 'new_image') self.assertEqual(image.id, '4949f9ee-2421-4c81-8b49-13119446008b') def test_ex_set_server_name(self): old_node = Node( id='12064', name=None, state=None, public_ips=None, private_ips=None, driver=self.driver, ) new_node = self.driver.ex_set_server_name(old_node, 'Bob') self.assertEqual('Bob', new_node.name) def test_ex_set_metadata(self): old_node = Node( id='12063', name=None, state=None, public_ips=None, private_ips=None, driver=self.driver, ) metadata = {'Image Version': '2.1', 'Server Label': 'Web Head 1'} returned_metadata = self.driver.ex_set_metadata(old_node, metadata) self.assertEqual(metadata, returned_metadata) def test_ex_get_metadata(self): node = Node( id='12063', name=None, state=None, public_ips=None, private_ips=None, driver=self.driver, ) metadata = {'Image Version': '2.1', 'Server Label': 'Web Head 1'} returned_metadata = self.driver.ex_get_metadata(node) self.assertEqual(metadata, returned_metadata) def test_ex_update_node(self): old_node = Node( id='12064', name=None, state=None, public_ips=None, private_ips=None, driver=self.driver, ) new_node = self.driver.ex_update_node(old_node, name='Bob') self.assertTrue(new_node) self.assertEqual('Bob', new_node.name) self.assertEqual('50.57.94.30', new_node.public_ips[0]) def test_ex_get_node_details(self): node_id = '12064' node = self.driver.ex_get_node_details(node_id) self.assertEqual(node.id, '12064') self.assertEqual(node.name, 'lc-test') def test_ex_get_node_details_returns_none_if_node_does_not_exist(self): node = self.driver.ex_get_node_details('does-not-exist') self.assertTrue(node is None) def test_ex_get_size(self): size_id = '7' size = self.driver.ex_get_size(size_id) self.assertEqual(size.id, size_id) self.assertEqual(size.name, '15.5GB slice') def test_get_image(self): image_id = '13' image = self.driver.get_image(image_id) self.assertEqual(image.id, image_id) self.assertEqual(image.name, 'Windows 2008 SP2 x86 (B24)') self.assertIsNone(image.extra['serverId']) self.assertEqual(image.extra['minDisk'], "5") self.assertEqual(image.extra['minRam'], "256") self.assertIsNone(image.extra['visibility']) def test_delete_image(self): image = NodeImage( id='26365521-8c62-11f9-2c33-283d153ecc3a', name='My Backup', driver=self.driver) result = self.driver.delete_image(image) self.assertTrue(result) def test_extract_image_id_from_url(self): url = 'http://127.0.0.1/v1.1/68/images/1d4a8ea9-aae7-4242-a42d-5ff4702f2f14' url_two = 'http://127.0.0.1/v1.1/68/images/13' image_id = self.driver._extract_image_id_from_url(url) image_id_two = self.driver._extract_image_id_from_url(url_two) self.assertEqual(image_id, '1d4a8ea9-aae7-4242-a42d-5ff4702f2f14') self.assertEqual(image_id_two, '13') def test_ex_rescue_with_password(self): node = Node(id=12064, name=None, state=None, public_ips=None, private_ips=None, driver=self.driver) n = self.driver.ex_rescue(node, 'foo') self.assertEqual(n.extra['password'], 'foo') def test_ex_rescue_no_password(self): node = Node(id=12064, name=None, state=None, public_ips=None, private_ips=None, driver=self.driver) n = self.driver.ex_rescue(node) self.assertEqual(n.extra['password'], 'foo') def test_ex_unrescue(self): node = Node(id=12064, name=None, state=None, public_ips=None, private_ips=None, driver=self.driver) result = self.driver.ex_unrescue(node) self.assertTrue(result) def test_ex_get_node_security_groups(self): node = Node(id='1c01300f-ef97-4937-8f03-ac676d6234be', name=None, state=None, public_ips=None, private_ips=None, driver=self.driver) security_groups = self.driver.ex_get_node_security_groups(node) self.assertEqual( len(security_groups), 2, 'Wrong security groups count') security_group = security_groups[1] self.assertEqual(security_group.id, 4) self.assertEqual(security_group.tenant_id, '68') self.assertEqual(security_group.name, 'ftp') self.assertEqual( security_group.description, 'FTP Client-Server - Open 20-21 ports') self.assertEqual(security_group.rules[0].id, 1) self.assertEqual(security_group.rules[0].parent_group_id, 4) self.assertEqual(security_group.rules[0].ip_protocol, "tcp") self.assertEqual(security_group.rules[0].from_port, 20) self.assertEqual(security_group.rules[0].to_port, 21) self.assertEqual(security_group.rules[0].ip_range, '0.0.0.0/0') def test_ex_list_security_groups(self): security_groups = self.driver.ex_list_security_groups() self.assertEqual( len(security_groups), 2, 'Wrong security groups count') security_group = security_groups[1] self.assertEqual(security_group.id, 4) self.assertEqual(security_group.tenant_id, '68') self.assertEqual(security_group.name, 'ftp') self.assertEqual( security_group.description, 'FTP Client-Server - Open 20-21 ports') self.assertEqual(security_group.rules[0].id, 1) self.assertEqual(security_group.rules[0].parent_group_id, 4) self.assertEqual(security_group.rules[0].ip_protocol, "tcp") self.assertEqual(security_group.rules[0].from_port, 20) self.assertEqual(security_group.rules[0].to_port, 21) self.assertEqual(security_group.rules[0].ip_range, '0.0.0.0/0') def test_ex_create_security_group(self): name = 'test' description = 'Test Security Group' security_group = self.driver.ex_create_security_group( name, description) self.assertEqual(security_group.id, 6) self.assertEqual(security_group.tenant_id, '68') self.assertEqual(security_group.name, name) self.assertEqual(security_group.description, description) self.assertEqual(len(security_group.rules), 0) def test_ex_delete_security_group(self): security_group = OpenStackSecurityGroup( id=6, tenant_id=None, name=None, description=None, driver=self.driver) result = self.driver.ex_delete_security_group(security_group) self.assertTrue(result) def test_ex_create_security_group_rule(self): security_group = OpenStackSecurityGroup( id=6, tenant_id=None, name=None, description=None, driver=self.driver) security_group_rule = self.driver.ex_create_security_group_rule( security_group, 'tcp', 14, 16, '0.0.0.0/0') self.assertEqual(security_group_rule.id, 2) self.assertEqual(security_group_rule.parent_group_id, 6) self.assertEqual(security_group_rule.ip_protocol, 'tcp') self.assertEqual(security_group_rule.from_port, 14) self.assertEqual(security_group_rule.to_port, 16) self.assertEqual(security_group_rule.ip_range, '0.0.0.0/0') self.assertIsNone(security_group_rule.tenant_id) def test_ex_delete_security_group_rule(self): security_group_rule = OpenStackSecurityGroupRule( id=2, parent_group_id=None, ip_protocol=None, from_port=None, to_port=None, driver=self.driver) result = self.driver.ex_delete_security_group_rule(security_group_rule) self.assertTrue(result) def test_list_key_pairs(self): keypairs = self.driver.list_key_pairs() self.assertEqual(len(keypairs), 2, 'Wrong keypairs count') keypair = keypairs[1] self.assertEqual(keypair.name, 'key2') self.assertEqual( keypair.fingerprint, '5d:66:33:ae:99:0f:fb:cb:86:f2:bc:ae:53:99:b6:ed') self.assertTrue(len(keypair.public_key) > 10) self.assertIsNone(keypair.private_key) def test_get_key_pair(self): key_pair = self.driver.get_key_pair(name='test-key-pair') self.assertEqual(key_pair.name, 'test-key-pair') def test_get_key_pair_doesnt_exist(self): self.assertRaises(KeyPairDoesNotExistError, self.driver.get_key_pair, name='doesnt-exist') def test_create_key_pair(self): name = 'key0' keypair = self.driver.create_key_pair(name=name) self.assertEqual(keypair.name, name) self.assertEqual(keypair.fingerprint, '80:f8:03:a7:8e:c1:c3:b1:7e:c5:8c:50:04:5e:1c:5b') self.assertTrue(len(keypair.public_key) > 10) self.assertTrue(len(keypair.private_key) > 10) def test_import_key_pair_from_file(self): name = 'key3' path = os.path.join( os.path.dirname(__file__), 'fixtures', 'misc', 'test_rsa.pub') with open(path, 'r') as fp: pub_key = fp.read() keypair = self.driver.import_key_pair_from_file(name=name, key_file_path=path) self.assertEqual(keypair.name, name) self.assertEqual( keypair.fingerprint, '97:10:a6:e7:92:65:7e:69:fe:e6:81:8f:39:3c:8f:5a') self.assertEqual(keypair.public_key, pub_key) self.assertIsNone(keypair.private_key) def test_import_key_pair_from_string(self): name = 'key3' path = os.path.join( os.path.dirname(__file__), 'fixtures', 'misc', 'test_rsa.pub') with open(path, 'r') as fp: pub_key = fp.read() keypair = self.driver.import_key_pair_from_string(name=name, key_material=pub_key) self.assertEqual(keypair.name, name) self.assertEqual( keypair.fingerprint, '97:10:a6:e7:92:65:7e:69:fe:e6:81:8f:39:3c:8f:5a') self.assertEqual(keypair.public_key, pub_key) self.assertIsNone(keypair.private_key) def test_delete_key_pair(self): keypair = OpenStackKeyPair( name='key1', fingerprint=None, public_key=None, driver=self.driver) result = self.driver.delete_key_pair(key_pair=keypair) self.assertTrue(result) def test_ex_list_floating_ip_pools(self): ret = self.driver.ex_list_floating_ip_pools() self.assertEqual(ret[0].name, 'public') self.assertEqual(ret[1].name, 'foobar') def test_ex_attach_floating_ip_to_node(self): image = NodeImage( id=11, name='Ubuntu 8.10 (intrepid)', driver=self.driver) size = NodeSize( 1, '256 slice', None, None, None, None, driver=self.driver) node = self.driver.create_node(name='racktest', image=image, size=size) node.id = 4242 ip = '42.42.42.42' self.assertTrue(self.driver.ex_attach_floating_ip_to_node(node, ip)) def test_detach_floating_ip_from_node(self): image = NodeImage( id=11, name='Ubuntu 8.10 (intrepid)', driver=self.driver) size = NodeSize( 1, '256 slice', None, None, None, None, driver=self.driver) node = self.driver.create_node(name='racktest', image=image, size=size) node.id = 4242 ip = '42.42.42.42' self.assertTrue(self.driver.ex_detach_floating_ip_from_node(node, ip)) def test_OpenStack_1_1_FloatingIpPool_list_floating_ips(self): pool = OpenStack_1_1_FloatingIpPool('foo', self.driver.connection) ret = pool.list_floating_ips() self.assertEqual(ret[0].id, '09ea1784-2f81-46dc-8c91-244b4df75bde') self.assertEqual(ret[0].pool, pool) self.assertEqual(ret[0].ip_address, '10.3.1.42') self.assertIsNone(ret[0].node_id) self.assertEqual(ret[1].id, '04c5336a-0629-4694-ba30-04b0bdfa88a4') self.assertEqual(ret[1].pool, pool) self.assertEqual(ret[1].ip_address, '10.3.1.1') self.assertEqual( ret[1].node_id, 'fcfc96da-19e2-40fd-8497-f29da1b21143') def test_OpenStack_1_1_FloatingIpPool_get_floating_ip(self): pool = OpenStack_1_1_FloatingIpPool('foo', self.driver.connection) ret = pool.get_floating_ip('10.3.1.42') self.assertEqual(ret.id, '09ea1784-2f81-46dc-8c91-244b4df75bde') self.assertEqual(ret.pool, pool) self.assertEqual(ret.ip_address, '10.3.1.42') self.assertIsNone(ret.node_id) def test_OpenStack_1_1_FloatingIpPool_create_floating_ip(self): pool = OpenStack_1_1_FloatingIpPool('foo', self.driver.connection) ret = pool.create_floating_ip() self.assertEqual(ret.id, '09ea1784-2f81-46dc-8c91-244b4df75bde') self.assertEqual(ret.pool, pool) self.assertEqual(ret.ip_address, '10.3.1.42') self.assertIsNone(ret.node_id) def test_OpenStack_1_1_FloatingIpPool_delete_floating_ip(self): pool = OpenStack_1_1_FloatingIpPool('foo', self.driver.connection) ip = OpenStack_1_1_FloatingIpAddress('foo-bar-id', '42.42.42.42', pool) self.assertTrue(pool.delete_floating_ip(ip)) def test_OpenStack_1_1_FloatingIpAddress_delete(self): pool = OpenStack_1_1_FloatingIpPool('foo', self.driver.connection) pool.delete_floating_ip = Mock() ip = OpenStack_1_1_FloatingIpAddress('foo-bar-id', '42.42.42.42', pool) ip.pool.delete_floating_ip() self.assertEqual(pool.delete_floating_ip.call_count, 1) def test_OpenStack_2_FloatingIpPool_list_floating_ips(self): pool = OpenStack_2_FloatingIpPool(1, 'foo', self.driver.connection) ret = pool.list_floating_ips() self.assertEqual(ret[0].id, '09ea1784-2f81-46dc-8c91-244b4df75bde') self.assertEqual(ret[0].pool, pool) self.assertEqual(ret[0].ip_address, '10.3.1.42') self.assertEqual(ret[0].node_id, None) self.assertEqual(ret[1].id, '04c5336a-0629-4694-ba30-04b0bdfa88a4') self.assertEqual(ret[1].pool, pool) self.assertEqual(ret[1].ip_address, '10.3.1.1') self.assertEqual( ret[1].node_id, 'fcfc96da-19e2-40fd-8497-f29da1b21143') self.assertEqual(ret[2].id, '123c5336a-0629-4694-ba30-04b0bdfa88a4') self.assertEqual(ret[2].pool, pool) self.assertEqual(ret[2].ip_address, '10.3.1.2') self.assertEqual( ret[2].node_id, 'cb4fba64-19e2-40fd-8497-f29da1b21143') self.assertEqual(ret[3].id, '123c5336a-0629-4694-ba30-04b0bdfa88a4') self.assertEqual(ret[3].pool, pool) self.assertEqual(ret[3].ip_address, '10.3.1.3') self.assertEqual( ret[3].node_id, 'cb4fba64-19e2-40fd-8497-f29da1b21143') def test_OpenStack_2_FloatingIpPool_get_floating_ip(self): pool = OpenStack_2_FloatingIpPool(1, 'foo', self.driver.connection) ret = pool.get_floating_ip('10.3.1.42') self.assertEqual(ret.id, '09ea1784-2f81-46dc-8c91-244b4df75bde') self.assertEqual(ret.pool, pool) self.assertEqual(ret.ip_address, '10.3.1.42') self.assertEqual(ret.node_id, None) def test_OpenStack_2_FloatingIpPool_create_floating_ip(self): pool = OpenStack_2_FloatingIpPool(1, 'foo', self.driver.connection) ret = pool.create_floating_ip() self.assertEqual(ret.id, '09ea1784-2f81-46dc-8c91-244b4df75bde') self.assertEqual(ret.pool, pool) self.assertEqual(ret.ip_address, '10.3.1.42') self.assertEqual(ret.node_id, None) def test_OpenStack_2_FloatingIpPool_delete_floating_ip(self): pool = OpenStack_2_FloatingIpPool(1, 'foo', self.driver.connection) ip = OpenStack_1_1_FloatingIpAddress('foo-bar-id', '42.42.42.42', pool) self.assertTrue(pool.delete_floating_ip(ip)) def test_OpenStack_2_FloatingIpAddress_delete(self): pool = OpenStack_2_FloatingIpPool(1, 'foo', self.driver.connection) pool.delete_floating_ip = Mock() ip = OpenStack_1_1_FloatingIpAddress('foo-bar-id', '42.42.42.42', pool) ip.pool.delete_floating_ip() self.assertEqual(pool.delete_floating_ip.call_count, 1) def test_ex_get_metadata_for_node(self): image = NodeImage(id=11, name='Ubuntu 8.10 (intrepid)', driver=self.driver) size = NodeSize(1, '256 slice', None, None, None, None, driver=self.driver) node = self.driver.create_node(name='foo', image=image, size=size) metadata = self.driver.ex_get_metadata_for_node(node) self.assertEqual(metadata['My Server Name'], 'Apache1') self.assertEqual(len(metadata), 1) def test_ex_pause_node(self): node = Node( id='12063', name=None, state=None, public_ips=None, private_ips=None, driver=self.driver, ) ret = self.driver.ex_pause_node(node) self.assertTrue(ret is True) def test_ex_unpause_node(self): node = Node( id='12063', name=None, state=None, public_ips=None, private_ips=None, driver=self.driver, ) ret = self.driver.ex_unpause_node(node) self.assertTrue(ret is True) def test_ex_stop_node(self): node = Node( id='12063', name=None, state=None, public_ips=None, private_ips=None, driver=self.driver, ) ret = self.driver.ex_stop_node(node) self.assertTrue(ret is True) def test_ex_start_node(self): node = Node( id='12063', name=None, state=None, public_ips=None, private_ips=None, driver=self.driver, ) ret = self.driver.ex_start_node(node) self.assertTrue(ret is True) def test_ex_suspend_node(self): node = Node( id='12063', name=None, state=None, public_ips=None, private_ips=None, driver=self.driver, ) ret = self.driver.ex_suspend_node(node) self.assertTrue(ret is True) def test_ex_resume_node(self): node = Node( id='12063', name=None, state=None, public_ips=None, private_ips=None, driver=self.driver, ) ret = self.driver.ex_resume_node(node) self.assertTrue(ret is True) def test_ex_get_console_output(self): node = Node( id='12086', name=None, state=None, public_ips=None, private_ips=None, driver=self.driver, ) resp = self.driver.ex_get_console_output(node) expected_output = 'FAKE CONSOLE OUTPUT\nANOTHER\nLAST LINE' self.assertEqual(resp['output'], expected_output) def test_ex_list_snapshots(self): if self.driver_type.type == 'rackspace': self.conn_class.type = 'RACKSPACE' snapshots = self.driver.ex_list_snapshots() self.assertEqual(len(snapshots), 3) self.assertEqual(snapshots[0].created, datetime.datetime(2012, 2, 29, 3, 50, 7, tzinfo=UTC)) self.assertEqual(snapshots[0].extra['created'], "2012-02-29T03:50:07Z") self.assertEqual(snapshots[0].extra['name'], 'snap-001') self.assertEqual(snapshots[0].name, 'snap-001') self.assertEqual(snapshots[0].state, VolumeSnapshotState.AVAILABLE) # invalid date is parsed as None assert snapshots[2].created is None def test_ex_get_snapshot(self): if self.driver_type.type == 'rackspace': self.conn_class.type = 'RACKSPACE' snapshot = self.driver.ex_get_snapshot('3fbbcccf-d058-4502-8844-6feeffdf4cb5') self.assertEqual(snapshot.created, datetime.datetime(2012, 2, 29, 3, 50, 7, tzinfo=UTC)) self.assertEqual(snapshot.extra['created'], "2012-02-29T03:50:07Z") self.assertEqual(snapshot.extra['name'], 'snap-001') self.assertEqual(snapshot.name, 'snap-001') self.assertEqual(snapshot.state, VolumeSnapshotState.AVAILABLE) def test_list_volume_snapshots(self): volume = self.driver.list_volumes()[0] # rackspace needs a different mocked response for snapshots, but not for volumes if self.driver_type.type == 'rackspace': self.conn_class.type = 'RACKSPACE' snapshots = self.driver.list_volume_snapshots(volume) self.assertEqual(len(snapshots), 1) self.assertEqual(snapshots[0].id, '4fbbdccf-e058-6502-8844-6feeffdf4cb5') def test_create_volume_snapshot(self): volume = self.driver.list_volumes()[0] if self.driver_type.type == 'rackspace': self.conn_class.type = 'RACKSPACE' ret = self.driver.create_volume_snapshot(volume, 'Test Volume', ex_description='This is a test', ex_force=True) self.assertEqual(ret.id, '3fbbcccf-d058-4502-8844-6feeffdf4cb5') def test_ex_create_snapshot(self): volume = self.driver.list_volumes()[0] if self.driver_type.type == 'rackspace': self.conn_class.type = 'RACKSPACE' ret = self.driver.ex_create_snapshot(volume, 'Test Volume', description='This is a test', force=True) self.assertEqual(ret.id, '3fbbcccf-d058-4502-8844-6feeffdf4cb5') def test_ex_create_snapshot_does_not_post_optional_parameters_if_none(self): volume = self.driver.list_volumes()[0] with patch.object(self.driver, '_to_snapshot'): with patch.object(self.driver.connection, 'request') as mock_request: self.driver.create_volume_snapshot(volume, name=None, ex_description=None, ex_force=True) name, args, kwargs = mock_request.mock_calls[0] self.assertFalse("display_name" in kwargs["data"]["snapshot"]) self.assertFalse("display_description" in kwargs["data"]["snapshot"]) def test_destroy_volume_snapshot(self): if self.driver_type.type == 'rackspace': self.conn_class.type = 'RACKSPACE' snapshot = self.driver.ex_list_snapshots()[0] ret = self.driver.destroy_volume_snapshot(snapshot) self.assertTrue(ret) def test_ex_delete_snapshot(self): if self.driver_type.type == 'rackspace': self.conn_class.type = 'RACKSPACE' snapshot = self.driver.ex_list_snapshots()[0] ret = self.driver.ex_delete_snapshot(snapshot) self.assertTrue(ret) class OpenStack_2_Tests(OpenStack_1_1_Tests): driver_klass = OpenStack_2_NodeDriver driver_type = OpenStack_2_NodeDriver driver_kwargs = { 'ex_force_auth_version': '2.0', 'ex_force_auth_url': 'https://auth.api.example.com' } def setUp(self): super(OpenStack_2_Tests, self).setUp() self.driver_klass.image_connectionCls.conn_class = OpenStack_2_0_MockHttp self.driver_klass.image_connectionCls.auth_url = "https://auth.api.example.com" # normally authentication happens lazily, but we force it here self.driver.image_connection._populate_hosts_and_request_paths() self.driver_klass.network_connectionCls.conn_class = OpenStack_2_0_MockHttp self.driver_klass.network_connectionCls.auth_url = "https://auth.api.example.com" # normally authentication happens lazily, but we force it here self.driver.network_connection._populate_hosts_and_request_paths() self.driver_klass.volumev2_connectionCls.conn_class = OpenStack_2_0_MockHttp self.driver_klass.volumev2_connectionCls.auth_url = "https://auth.api.example.com" # normally authentication happens lazily, but we force it here self.driver.volumev2_connection._populate_hosts_and_request_paths() def test__paginated_request_single_page(self): snapshots = self.driver._paginated_request( '/snapshots/detail', 'snapshots', self.driver.volumev2_connection )['snapshots'] self.assertEqual(len(snapshots), 3) self.assertEqual(snapshots[0]['name'], 'snap-001') def test__paginated_request_two_pages(self): snapshots = self.driver._paginated_request( '/snapshots/detail?unit_test=paginate', 'snapshots', self.driver.volumev2_connection )['snapshots'] self.assertEqual(len(snapshots), 6) self.assertEqual(snapshots[0]['name'], 'snap-101') self.assertEqual(snapshots[3]['name'], 'snap-001') # NOTE: We use a smaller limit to speed tests up. @mock.patch('libcloud.compute.drivers.openstack.PAGINATION_LIMIT', 10) def test__paginated_request_raises_if_stuck_in_a_loop(self): with pytest.raises(OpenStackException): self.driver._paginated_request( '/snapshots/detail?unit_test=pagination_loop', 'snapshots', self.driver.volumev2_connection ) def test_ex_force_auth_token_passed_to_connection(self): base_url = 'https://servers.api.rackspacecloud.com/v1.1/slug' kwargs = { 'ex_force_auth_version': '2.0', 'ex_force_auth_token': 'preset-auth-token', 'ex_force_auth_url': 'https://auth.api.example.com', 'ex_force_base_url': base_url } driver = self.driver_type(*self.driver_args, **kwargs) driver.list_nodes() self.assertEqual(kwargs['ex_force_auth_token'], driver.connection.auth_token) self.assertEqual('servers.api.rackspacecloud.com', driver.connection.host) self.assertEqual('/v1.1/slug', driver.connection.request_path) self.assertEqual(443, driver.connection.port) def test_get_image(self): image_id = 'f24a3c1b-d52a-4116-91da-25b3eee8f55e' image = self.driver.get_image(image_id) self.assertEqual(image.id, image_id) self.assertEqual(image.name, 'hypernode') self.assertIsNone(image.extra['serverId']) self.assertEqual(image.extra['minDisk'], 40) self.assertEqual(image.extra['minRam'], 0) self.assertEqual(image.extra['visibility'], "shared") def test_list_images(self): images = self.driver.list_images() self.assertEqual(len(images), 2, 'Wrong images count') image = images[0] self.assertEqual(image.id, 'f24a3c1b-d52a-4116-91da-25b3eee8f55e') self.assertEqual(image.name, 'hypernode') self.assertEqual(image.extra['updated'], '2017-11-28T10:19:49Z') self.assertEqual(image.extra['created'], '2017-09-11T13:00:05Z') self.assertEqual(image.extra['status'], 'active') self.assertEqual(image.extra['os_type'], 'linux') self.assertIsNone(image.extra['serverId']) self.assertEqual(image.extra['minDisk'], 40) self.assertEqual(image.extra['minRam'], 0) def test_ex_update_image(self): image_id = 'f24a3c1b-d52a-4116-91da-25b3eee8f55e' data = { 'op': 'replace', 'path': '/visibility', 'value': 'shared' } image = self.driver.ex_update_image(image_id, data) self.assertEqual(image.name, 'hypernode') self.assertIsNone(image.extra['serverId']) self.assertEqual(image.extra['minDisk'], 40) self.assertEqual(image.extra['minRam'], 0) self.assertEqual(image.extra['visibility'], "shared") def test_ex_list_image_members(self): image_id = 'd9a9cd9a-278a-444c-90a6-d24b8c688a63' image_member_id = '016926dff12345e8b10329f24c99745b' image_members = self.driver.ex_list_image_members(image_id) self.assertEqual(len(image_members), 30, 'Wrong image member count') image_member = image_members[0] self.assertEqual(image_member.id, image_member_id) self.assertEqual(image_member.image_id, image_id) self.assertEqual(image_member.state, NodeImageMemberState.ACCEPTED) self.assertEqual(image_member.created, '2017-01-12T12:31:50Z') self.assertEqual(image_member.extra['updated'], '2017-01-12T12:31:54Z') self.assertEqual(image_member.extra['schema'], '/v2/schemas/member') def test_ex_create_image_member(self): image_id = '9af1a54e-a1b2-4df8-b747-4bec97abc799' image_member_id = 'e2151b1fe02d4a8a2d1f5fc331522c0a' image_member = self.driver.ex_create_image_member( image_id, image_member_id ) self.assertEqual(image_member.id, image_member_id) self.assertEqual(image_member.image_id, image_id) self.assertEqual(image_member.state, NodeImageMemberState.PENDING) self.assertEqual(image_member.created, '2018-03-02T14:19:38Z') self.assertEqual(image_member.extra['updated'], '2018-03-02T14:19:38Z') self.assertEqual(image_member.extra['schema'], '/v2/schemas/member') def test_ex_get_image_member(self): image_id = 'd9a9cd9a-278a-444c-90a6-d24b8c688a63' image_member_id = '016926dff12345e8b10329f24c99745b' image_member = self.driver.ex_get_image_member( image_id, image_member_id ) self.assertEqual(image_member.id, image_member_id) self.assertEqual(image_member.image_id, image_id) self.assertEqual(image_member.state, NodeImageMemberState.ACCEPTED) self.assertEqual(image_member.created, '2017-01-12T12:31:50Z') self.assertEqual(image_member.extra['updated'], '2017-01-12T12:31:54Z') self.assertEqual(image_member.extra['schema'], '/v2/schemas/member') def test_ex_accept_image_member(self): image_id = '8af1a54e-a1b2-4df8-b747-4bec97abc799' image_member_id = 'e2151b1fe02d4a8a2d1f5fc331522c0a' image_member = self.driver.ex_accept_image_member( image_id, image_member_id ) self.assertEqual(image_member.id, image_member_id) self.assertEqual(image_member.image_id, image_id) self.assertEqual(image_member.state, NodeImageMemberState.ACCEPTED) self.assertEqual(image_member.created, '2018-03-02T14:19:38Z') self.assertEqual(image_member.extra['updated'], '2018-03-02T14:20:37Z') self.assertEqual(image_member.extra['schema'], '/v2/schemas/member') def test_ex_list_networks(self): networks = self.driver.ex_list_networks() network = networks[0] self.assertEqual(len(networks), 2) self.assertEqual(network.name, 'net1') self.assertEqual(network.extra['subnets'], ['54d6f61d-db07-451c-9ab3-b9609b6b6f0b']) def test_ex_list_subnets(self): subnets = self.driver.ex_list_subnets() subnet = subnets[0] self.assertEqual(len(subnets), 2) self.assertEqual(subnet.name, 'private-subnet') self.assertEqual(subnet.cidr, '10.0.0.0/24') def test_ex_create_subnet(self): network = self.driver.ex_list_networks()[0] subnet = self.driver.ex_create_subnet('name', network, '10.0.0.0/24', ip_version=4, dns_nameservers=["10.0.0.01"]) self.assertEqual(subnet.name, 'name') self.assertEqual(subnet.cidr, '10.0.0.0/24') def test_ex_delete_subnet(self): subnet = self.driver.ex_list_subnets()[0] self.assertTrue(self.driver.ex_delete_subnet(subnet=subnet)) def test_ex_update_subnet(self): subnet = self.driver.ex_list_subnets()[0] subnet = self.driver.ex_update_subnet(subnet, name='net2') self.assertEqual(subnet.name, 'name') def test_ex_list_network(self): networks = self.driver.ex_list_networks() network = networks[0] self.assertEqual(len(networks), 2) self.assertEqual(network.name, 'net1') def test_ex_create_network(self): network = self.driver.ex_create_network(name='net1', cidr='127.0.0.0/24') self.assertEqual(network.name, 'net1') def test_ex_delete_network(self): network = self.driver.ex_list_networks()[0] self.assertTrue(self.driver.ex_delete_network(network=network)) def test_ex_list_ports(self): ports = self.driver.ex_list_ports() port = ports[0] self.assertEqual(port.id, '126da55e-cfcb-41c8-ae39-a26cb8a7e723') self.assertEqual(port.state, OpenStack_2_PortInterfaceState.BUILD) self.assertEqual(port.created, '2018-07-04T14:38:18Z') self.assertEqual( port.extra['network_id'], '123c8a8c-6427-4e8f-a805-2035365f4d43' ) self.assertEqual( port.extra['project_id'], 'abcdec85bee34bb0a44ab8255eb36abc' ) self.assertEqual( port.extra['tenant_id'], 'abcdec85bee34bb0a44ab8255eb36abc' ) self.assertEqual(port.extra['name'], '') def test_ex_create_port(self): network = OpenStackNetwork(id='123c8a8c-6427-4e8f-a805-2035365f4d43', name='test-network', cidr='1.2.3.4', driver=self.driver) port = self.driver.ex_create_port(network=network, description='Some port description', name='Some port name', admin_state_up=True) self.assertEqual(port.id, '126da55e-cfcb-41c8-ae39-a26cb8a7e723') self.assertEqual(port.state, OpenStack_2_PortInterfaceState.BUILD) self.assertEqual(port.created, '2018-07-04T14:38:18Z') self.assertEqual( port.extra['network_id'], '123c8a8c-6427-4e8f-a805-2035365f4d43' ) self.assertEqual( port.extra['project_id'], 'abcdec85bee34bb0a44ab8255eb36abc' ) self.assertEqual( port.extra['tenant_id'], 'abcdec85bee34bb0a44ab8255eb36abc' ) self.assertEqual(port.extra['admin_state_up'], True) self.assertEqual(port.extra['name'], 'Some port name') self.assertEqual(port.extra['description'], 'Some port description') def test_ex_get_port(self): port = self.driver.ex_get_port('126da55e-cfcb-41c8-ae39-a26cb8a7e723') self.assertEqual(port.id, '126da55e-cfcb-41c8-ae39-a26cb8a7e723') self.assertEqual(port.state, OpenStack_2_PortInterfaceState.BUILD) self.assertEqual(port.created, '2018-07-04T14:38:18Z') self.assertEqual( port.extra['network_id'], '123c8a8c-6427-4e8f-a805-2035365f4d43' ) self.assertEqual( port.extra['project_id'], 'abcdec85bee34bb0a44ab8255eb36abc' ) self.assertEqual( port.extra['tenant_id'], 'abcdec85bee34bb0a44ab8255eb36abc' ) self.assertEqual(port.extra['name'], 'Some port name') def test_ex_delete_port(self): ports = self.driver.ex_list_ports() port = ports[0] ret = self.driver.ex_delete_port(port) self.assertTrue(ret) def test_ex_update_port(self): port = self.driver.ex_get_port('126da55e-cfcb-41c8-ae39-a26cb8a7e723') ret = self.driver.ex_update_port(port, port_security_enabled=False) self.assertEqual(ret.extra['name'], 'Some port name') def test_detach_port_interface(self): node = Node(id='1c01300f-ef97-4937-8f03-ac676d6234be', name=None, state=None, public_ips=None, private_ips=None, driver=self.driver) ports = self.driver.ex_list_ports() port = ports[0] ret = self.driver.ex_detach_port_interface(node, port) self.assertTrue(ret) def test_attach_port_interface(self): node = Node(id='1c01300f-ef97-4937-8f03-ac676d6234be', name=None, state=None, public_ips=None, private_ips=None, driver=self.driver) ports = self.driver.ex_list_ports() port = ports[0] ret = self.driver.ex_attach_port_interface(node, port) self.assertTrue(ret) def test_list_volumes(self): volumes = self.driver.list_volumes() self.assertEqual(len(volumes), 2) volume = volumes[0] self.assertEqual('6edbc2f4-1507-44f8-ac0d-eed1d2608d38', volume.id) self.assertEqual('test-volume-attachments', volume.name) self.assertEqual(StorageVolumeState.INUSE, volume.state) self.assertEqual(2, volume.size) self.assertEqual(volume.extra, { 'description': '', 'attachments': [{ "attachment_id": "3b4db356-253d-4fab-bfa0-e3626c0b8405", "id": '6edbc2f4-1507-44f8-ac0d-eed1d2608d38', "device": "/dev/vdb", "server_id": "f4fda93b-06e0-4743-8117-bc8bcecd651b", "volume_id": "6edbc2f4-1507-44f8-ac0d-eed1d2608d38", }], 'snapshot_id': None, 'state': 'in-use', 'location': 'nova', 'volume_type': 'lvmdriver-1', 'metadata': {}, 'created_at': '2013-06-24T11:20:13.000000' }) # also test that unknown state resolves to StorageVolumeState.UNKNOWN volume = volumes[1] self.assertEqual('cfcec3bc-b736-4db5-9535-4c24112691b5', volume.id) self.assertEqual('test_volume', volume.name) self.assertEqual(50, volume.size) self.assertEqual(StorageVolumeState.UNKNOWN, volume.state) self.assertEqual(volume.extra, { 'description': 'some description', 'attachments': [], 'snapshot_id': '01f48111-7866-4cd2-986a-e92683c4a363', 'state': 'some-unknown-state', 'location': 'nova', 'volume_type': None, 'metadata': {}, 'created_at': '2013-06-21T12:39:02.000000', }) def test_create_volume_passes_location_to_request_only_if_not_none(self): with patch.object(self.driver.volumev2_connection, 'request') as mock_request: self.driver.create_volume(1, 'test', location='mylocation') name, args, kwargs = mock_request.mock_calls[0] self.assertEqual(kwargs["data"]["volume"]["availability_zone"], "mylocation") def test_create_volume_does_not_pass_location_to_request_if_none(self): with patch.object(self.driver.volumev2_connection, 'request') as mock_request: self.driver.create_volume(1, 'test') name, args, kwargs = mock_request.mock_calls[0] self.assertFalse("availability_zone" in kwargs["data"]["volume"]) def test_create_volume_passes_volume_type_to_request_only_if_not_none(self): with patch.object(self.driver.volumev2_connection, 'request') as mock_request: self.driver.create_volume(1, 'test', ex_volume_type='myvolumetype') name, args, kwargs = mock_request.mock_calls[0] self.assertEqual(kwargs["data"]["volume"]["volume_type"], "myvolumetype") def test_create_volume_does_not_pass_volume_type_to_request_if_none(self): with patch.object(self.driver.volumev2_connection, 'request') as mock_request: self.driver.create_volume(1, 'test') name, args, kwargs = mock_request.mock_calls[0] self.assertFalse("volume_type" in kwargs["data"]["volume"]) def test_create_volume_passes_image_ref_to_request_only_if_not_none(self): with patch.object(self.driver.volumev2_connection, 'request') as mock_request: self.driver.create_volume( 1, 'test', ex_image_ref='353c4bd2-b28f-4857-9b7b-808db4397d03') name, args, kwargs = mock_request.mock_calls[0] self.assertEqual( kwargs["data"]["volume"]["imageRef"], "353c4bd2-b28f-4857-9b7b-808db4397d03") def test_create_volume_does_not_pass_image_ref_to_request_if_none(self): with patch.object(self.driver.volumev2_connection, 'request') as mock_request: self.driver.create_volume(1, 'test') name, args, kwargs = mock_request.mock_calls[0] self.assertFalse("imageRef" in kwargs["data"]["volume"]) def test_ex_create_snapshot_does_not_post_optional_parameters_if_none(self): volume = self.driver.list_volumes()[0] with patch.object(self.driver, '_to_snapshot'): with patch.object(self.driver.volumev2_connection, 'request') as mock_request: self.driver.create_volume_snapshot(volume, name=None, ex_description=None, ex_force=True) name, args, kwargs = mock_request.mock_calls[0] self.assertFalse("display_name" in kwargs["data"]["snapshot"]) self.assertFalse("display_description" in kwargs["data"]["snapshot"]) def test_ex_list_routers(self): routers = self.driver.ex_list_routers() router = routers[0] self.assertEqual(len(routers), 2) self.assertEqual(router.name, 'router2') self.assertEqual(router.status, 'ACTIVE') self.assertEqual(router.extra['routes'], [{'destination': '179.24.1.0/24', 'nexthop': '172.24.3.99'}]) def test_ex_create_router(self): router = self.driver.ex_create_router('router1', admin_state_up = True) self.assertEqual(router.name, 'router1') def test_ex_delete_router(self): router = self.driver.ex_list_routers()[1] self.assertTrue(self.driver.ex_delete_router(router=router)) def test_manage_router_interfaces(self): router = self.driver.ex_list_routers()[1] port = self.driver.ex_list_ports()[0] subnet = self.driver.ex_list_subnets()[0] self.assertTrue(self.driver.ex_add_router_port(router, port)) self.assertTrue(self.driver.ex_del_router_port(router, port)) self.assertTrue(self.driver.ex_add_router_subnet(router, subnet)) self.assertTrue(self.driver.ex_del_router_subnet(router, subnet)) def test_detach_volume(self): node = self.driver.list_nodes()[0] volume = self.driver.ex_get_volume( 'abc6a3a1-c4ce-40f6-9b9f-07a61508938d') self.assertEqual( self.driver.attach_volume(node, volume, '/dev/sdb'), True) self.assertEqual(self.driver.detach_volume(volume), True) def test_ex_remove_security_group_from_node(self): security_group = OpenStackSecurityGroup("sgid", None, "sgname", "", self.driver) node = Node("1000", "node", None, [], [], self.driver) ret = self.driver.ex_remove_security_group_from_node(security_group, node) self.assertTrue(ret) class OpenStack_1_1_FactoryMethodTests(OpenStack_1_1_Tests): should_list_locations = False should_list_volumes = True driver_klass = OpenStack_1_1_NodeDriver driver_type = get_driver(Provider.OPENSTACK) driver_args = OPENSTACK_PARAMS + ('1.1',) driver_kwargs = {'ex_force_auth_version': '2.0'} class OpenStack_1_1_MockHttp(MockHttp, unittest.TestCase): fixtures = ComputeFileFixtures('openstack_v1.1') auth_fixtures = OpenStackFixtures() json_content_headers = {'content-type': 'application/json; charset=UTF-8'} def _v2_0_tokens(self, method, url, body, headers): body = self.auth_fixtures.load('_v2_0__auth.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v1_0(self, method, url, body, headers): headers = { 'x-auth-token': 'FE011C19-CF86-4F87-BE5D-9229145D7A06', 'x-server-management-url': 'https://api.example.com/v1.1/slug', } return (httplib.NO_CONTENT, "", headers, httplib.responses[httplib.NO_CONTENT]) def _v1_1_slug_servers_detail(self, method, url, body, headers): body = self.fixtures.load('_servers_detail.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v1_1_slug_servers_detail_ERROR_STATE_NO_IMAGE_ID(self, method, url, body, headers): body = self.fixtures.load('_servers_detail_ERROR_STATE.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v2_1337_servers_does_not_exist(self, *args, **kwargs): return httplib.NOT_FOUND, None, {}, httplib.responses[httplib.NOT_FOUND] def _v1_1_slug_flavors_detail(self, method, url, body, headers): body = self.fixtures.load('_flavors_detail.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v1_1_slug_images_detail(self, method, url, body, headers): body = self.fixtures.load('_images_detail.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v1_1_slug_servers(self, method, url, body, headers): if method == "POST": body = self.fixtures.load('_servers_create.json') elif method == "GET": body = self.fixtures.load('_servers.json') else: raise NotImplementedError() return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v1_1_slug_servers_26f7fbee_8ce1_4c28_887a_bfe8e4bb10fe(self, method, url, body, headers): if method == "GET": body = self.fixtures.load( '_servers_26f7fbee_8ce1_4c28_887a_bfe8e4bb10fe.json') else: raise NotImplementedError() return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v1_1_slug_servers_12065_action(self, method, url, body, headers): if method != "POST": self.fail('HTTP method other than POST to action URL') return (httplib.ACCEPTED, "", {}, httplib.responses[httplib.ACCEPTED]) def _v1_1_slug_servers_12064_action(self, method, url, body, headers): if method != "POST": self.fail('HTTP method other than POST to action URL') if "createImage" in json.loads(body): return (httplib.ACCEPTED, "", {"location": "http://127.0.0.1/v1.1/68/images/4949f9ee-2421-4c81-8b49-13119446008b"}, httplib.responses[httplib.ACCEPTED]) elif "rescue" in json.loads(body): return (httplib.OK, '{"adminPass": "foo"}', {}, httplib.responses[httplib.OK]) return (httplib.ACCEPTED, "", {}, httplib.responses[httplib.ACCEPTED]) def _v1_1_slug_servers_12066_action(self, method, url, body, headers): if method != "POST": self.fail('HTTP method other than POST to action URL') if "rebuild" not in json.loads(body): self.fail("Did not get expected action (rebuild) in action URL") self.assertTrue('\"OS-DCF:diskConfig\": \"MANUAL\"' in body, msg="Manual disk configuration option was not specified in rebuild body: " + body) return (httplib.ACCEPTED, "", {}, httplib.responses[httplib.ACCEPTED]) def _v1_1_slug_servers_12065(self, method, url, body, headers): if method == "DELETE": return (httplib.ACCEPTED, "", {}, httplib.responses[httplib.ACCEPTED]) else: raise NotImplementedError() def _v1_1_slug_servers_12064(self, method, url, body, headers): if method == "GET": body = self.fixtures.load('_servers_12064.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) elif method == "PUT": body = self.fixtures.load('_servers_12064_updated_name_bob.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) elif method == "DELETE": return (httplib.ACCEPTED, "", {}, httplib.responses[httplib.ACCEPTED]) else: raise NotImplementedError() def _v1_1_slug_servers_12062(self, method, url, body, headers): if method == "GET": body = self.fixtures.load('_servers_12064.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v1_1_slug_servers_12063_metadata(self, method, url, body, headers): if method == "GET": body = self.fixtures.load('_servers_12063_metadata_two_keys.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) elif method == "PUT": body = self.fixtures.load('_servers_12063_metadata_two_keys.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v1_1_slug_servers_EX_DISK_CONFIG(self, method, url, body, headers): if method == "POST": body = u(body) self.assertTrue(body.find('\"OS-DCF:diskConfig\": \"AUTO\"')) body = self.fixtures.load('_servers_create_disk_config.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v1_1_slug_flavors_7(self, method, url, body, headers): if method == "GET": body = self.fixtures.load('_flavors_7.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) else: raise NotImplementedError() def _v1_1_slug_images_13(self, method, url, body, headers): if method == "GET": body = self.fixtures.load('_images_13.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) else: raise NotImplementedError() def _v2_1337_v2_images_f24a3c1b_d52a_4116_91da_25b3eee8f55e(self, method, url, body, headers): if method == "GET" or method == "PATCH": body = self.fixtures.load('_images_f24a3c1b-d52a-4116-91da-25b3eee8f55e.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) else: raise NotImplementedError() def _v2_1337_v2_images_d9a9cd9a_278a_444c_90a6_d24b8c688a63_members(self, method, url, body, headers): if method == "GET": body = self.fixtures.load('_images_d9a9cd9a_278a_444c_90a6_d24b8c688a63_members.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) else: raise NotImplementedError() def _v2_1337_v2_images_9af1a54e_a1b2_4df8_b747_4bec97abc799_members(self, method, url, body, headers): if method == "POST": body = self.fixtures.load('_images_9af1a54e_a1b2_4df8_b747_4bec97abc799_members.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) else: raise NotImplementedError() def _v2_1337_v2_images_8af1a54e_a1b2_4df8_b747_4bec97abc799_members_e2151b1fe02d4a8a2d1f5fc331522c0a(self, method, url, body, headers): if method == "PUT": body = self.fixtures.load('_images_8af1a54e_a1b2_4df8_b747_4bec97abc799_members.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) else: raise NotImplementedError() def _v2_1337_v2_images_d9a9cd9a_278a_444c_90a6_d24b8c688a63_members_016926dff12345e8b10329f24c99745b(self, method, url, body, headers): if method == "GET": body = self.fixtures.load( '_images_d9a9cd9a_278a_444c_90a6_d24b8c688a63_members_016926dff12345e8b10329f24c99745b.json' ) return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) else: raise NotImplementedError() def _v2_1337_v2_images(self, method, url, body, headers): if method == "GET": body = self.fixtures.load('_images_v2.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) else: raise NotImplementedError() def _v1_1_slug_images_26365521_8c62_11f9_2c33_283d153ecc3a(self, method, url, body, headers): if method == "DELETE": return (httplib.NO_CONTENT, "", {}, httplib.responses[httplib.NO_CONTENT]) else: raise NotImplementedError() def _v1_1_slug_images_4949f9ee_2421_4c81_8b49_13119446008b(self, method, url, body, headers): if method == "GET": body = self.fixtures.load( '_images_4949f9ee_2421_4c81_8b49_13119446008b.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) else: raise NotImplementedError() def _v2_1337_v2_images_4949f9ee_2421_4c81_8b49_13119446008b(self, method, url, body, headers): if method == "GET": body = self.fixtures.load( '_images_f24a3c1b-d52a-4116-91da-25b3eee8f55d.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) else: raise NotImplementedError() def _v2_1337_v2_0_ports(self, method, url, body, headers): if method == "GET": body = self.fixtures.load('_ports_v2.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) elif method == "POST": body = self.fixtures.load('_port_v2.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) else: raise NotImplementedError() def _v2_1337_v2_0_ports_126da55e_cfcb_41c8_ae39_a26cb8a7e723(self, method, url, body, headers): if method == "DELETE": return (httplib.NO_CONTENT, "", {}, httplib.responses[httplib.NO_CONTENT]) elif method == "GET": body = self.fixtures.load('_port_v2.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) elif method == "PUT": if body: body = self.fixtures.load('_port_v2.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) else: return (httplib.INTERNAL_SERVER_ERROR, "", {}, httplib.responses[httplib.INTERNAL_SERVER_ERROR]) else: raise NotImplementedError() def _v2_1337_servers_12065_os_volume_attachments_DEVICE_AUTO(self, method, url, body, headers): # test_attach_volume_device_auto if method == "POST": if 'rackspace' not in self.__class__.__name__.lower(): body = json.loads(body) self.assertEqual(body['volumeAttachment']['device'], None) return (httplib.NO_CONTENT, "", {}, httplib.responses[httplib.NO_CONTENT]) else: raise NotImplementedError() def _v2_1337_servers_1c01300f_ef97_4937_8f03_ac676d6234be_os_interface_126da55e_cfcb_41c8_ae39_a26cb8a7e723(self, method, url, body, headers): if method == "DELETE": return (httplib.NO_CONTENT, "", {}, httplib.responses[httplib.NO_CONTENT]) else: raise NotImplementedError() def _v2_1337_servers_1c01300f_ef97_4937_8f03_ac676d6234be_os_interface(self, method, url, body, headers): if method == "POST": return (httplib.NO_CONTENT, "", {}, httplib.responses[httplib.NO_CONTENT]) else: raise NotImplementedError() def _v1_1_slug_servers_1c01300f_ef97_4937_8f03_ac676d6234be_os_security_groups(self, method, url, body, headers): if method == "GET": body = self.fixtures.load( '_servers_1c01300f-ef97-4937-8f03-ac676d6234be_os-security-groups.json') else: raise NotImplementedError() return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v1_1_slug_os_security_groups(self, method, url, body, headers): if method == "GET": body = self.fixtures.load('_os_security_groups.json') elif method == "POST": body = self.fixtures.load('_os_security_groups_create.json') else: raise NotImplementedError() return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v1_1_slug_os_security_groups_6(self, method, url, body, headers): if method == "DELETE": return (httplib.NO_CONTENT, "", {}, httplib.responses[httplib.NO_CONTENT]) else: raise NotImplementedError() def _v1_1_slug_os_security_group_rules(self, method, url, body, headers): if method == "POST": body = self.fixtures.load('_os_security_group_rules_create.json') else: raise NotImplementedError() return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v1_1_slug_os_security_group_rules_2(self, method, url, body, headers): if method == "DELETE": return (httplib.NO_CONTENT, "", {}, httplib.responses[httplib.NO_CONTENT]) else: raise NotImplementedError() def _v1_1_slug_os_keypairs(self, method, url, body, headers): if method == "GET": body = self.fixtures.load('_os_keypairs.json') elif method == "POST": if 'public_key' in body: body = self.fixtures.load('_os_keypairs_create_import.json') else: body = self.fixtures.load('_os_keypairs_create.json') else: raise NotImplementedError() return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v1_1_slug_os_keypairs_test_key_pair(self, method, url, body, headers): if method == 'GET': body = self.fixtures.load('_os_keypairs_get_one.json') else: raise NotImplementedError() return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v1_1_slug_os_keypairs_doesnt_exist(self, method, url, body, headers): if method == 'GET': body = self.fixtures.load('_os_keypairs_not_found.json') else: raise NotImplementedError() return (httplib.NOT_FOUND, body, self.json_content_headers, httplib.responses[httplib.NOT_FOUND]) def _v1_1_slug_os_keypairs_key1(self, method, url, body, headers): if method == "DELETE": return (httplib.ACCEPTED, "", {}, httplib.responses[httplib.ACCEPTED]) else: raise NotImplementedError() def _v1_1_slug_os_volumes(self, method, url, body, headers): if method == "GET": body = self.fixtures.load('_os_volumes.json') elif method == "POST": body = self.fixtures.load('_os_volumes_create.json') else: raise NotImplementedError() return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v1_1_slug_os_volumes_cd76a3a1_c4ce_40f6_9b9f_07a61508938d(self, method, url, body, headers): if method == "GET": body = self.fixtures.load( '_os_volumes_cd76a3a1_c4ce_40f6_9b9f_07a61508938d.json') elif method == "DELETE": body = '' else: raise NotImplementedError() return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v1_1_slug_servers_12065_os_volume_attachments(self, method, url, body, headers): if method == "POST": if 'rackspace' not in self.__class__.__name__.lower(): body = json.loads(body) self.assertEqual(body['volumeAttachment']['device'], '/dev/sdb') body = self.fixtures.load( '_servers_12065_os_volume_attachments.json') else: raise NotImplementedError() return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v1_1_slug_servers_12065_os_volume_attachments_cd76a3a1_c4ce_40f6_9b9f_07a61508938d(self, method, url, body, headers): if method == "DELETE": body = '' else: raise NotImplementedError() return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v1_1_slug_os_floating_ip_pools(self, method, url, body, headers): if method == "GET": body = self.fixtures.load('_floating_ip_pools.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) else: raise NotImplementedError() def _v1_1_slug_os_floating_ips_foo_bar_id(self, method, url, body, headers): if method == "DELETE": body = '' return (httplib.ACCEPTED, body, self.json_content_headers, httplib.responses[httplib.OK]) else: raise NotImplementedError() def _v1_1_slug_os_floating_ips(self, method, url, body, headers): if method == "GET": body = self.fixtures.load('_floating_ips.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) elif method == "POST": body = self.fixtures.load('_floating_ip.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) else: raise NotImplementedError() def _v1_1_slug_servers_4242_action(self, method, url, body, headers): if method == "POST": body = '' return (httplib.ACCEPTED, body, self.json_content_headers, httplib.responses[httplib.OK]) else: raise NotImplementedError() def _v1_1_slug_os_networks(self, method, url, body, headers): if method == 'GET': body = self.fixtures.load('_os_networks.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) elif method == 'POST': body = self.fixtures.load('_os_networks_POST.json') return (httplib.ACCEPTED, body, self.json_content_headers, httplib.responses[httplib.OK]) raise NotImplementedError() def _v1_1_slug_os_networks_f13e5051_feea_416b_827a_1a0acc2dad14(self, method, url, body, headers): if method == 'DELETE': body = '' return (httplib.ACCEPTED, body, self.json_content_headers, httplib.responses[httplib.OK]) raise NotImplementedError() def _v1_1_slug_servers_72258_action(self, method, url, body, headers): if method == 'POST': body = self.fixtures.load('_servers_suspend.json') return (httplib.ACCEPTED, body, self.json_content_headers, httplib.responses[httplib.OK]) else: raise NotImplementedError() def _v1_1_slug_servers_12063_action(self, method, url, body, headers): if method == 'POST': body = self.fixtures.load('_servers_unpause.json') return (httplib.ACCEPTED, body, self.json_content_headers, httplib.responses[httplib.OK]) else: raise NotImplementedError() def _v1_1_slug_servers_12086_action(self, method, url, body, headers): if method == 'POST': body = self.fixtures.load('_servers_12086_console_output.json') return (httplib.ACCEPTED, body, self.json_content_headers, httplib.responses[httplib.OK]) else: raise NotImplementedError() def _v1_1_slug_os_snapshots(self, method, url, body, headers): if method == 'GET': body = self.fixtures.load('_os_snapshots.json') elif method == 'POST': body = self.fixtures.load('_os_snapshots_create.json') else: raise NotImplementedError() return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v1_1_slug_os_snapshots_3fbbcccf_d058_4502_8844_6feeffdf4cb5(self, method, url, body, headers): if method == 'GET': body = self.fixtures.load('_os_snapshot.json') status_code = httplib.OK elif method == 'DELETE': body = '' status_code = httplib.NO_CONTENT else: raise NotImplementedError() return (status_code, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v1_1_slug_os_snapshots_3fbbcccf_d058_4502_8844_6feeffdf4cb5_RACKSPACE(self, method, url, body, headers): if method == 'GET': body = self.fixtures.load('_os_snapshot_rackspace.json') status_code = httplib.OK elif method == 'DELETE': body = '' status_code = httplib.NO_CONTENT else: raise NotImplementedError() return (status_code, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v1_1_slug_os_snapshots_RACKSPACE(self, method, url, body, headers): if method == 'GET': body = self.fixtures.load('_os_snapshots_rackspace.json') elif method == 'POST': body = self.fixtures.load('_os_snapshots_create_rackspace.json') else: raise NotImplementedError() return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v2_1337_v2_0_networks(self, method, url, body, headers): if method == 'GET': if "router:external=True" in url: body = self.fixtures.load('_v2_0__networks_public.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) else: body = self.fixtures.load('_v2_0__networks.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) elif method == 'POST': body = self.fixtures.load('_v2_0__networks_POST.json') return (httplib.ACCEPTED, body, self.json_content_headers, httplib.responses[httplib.OK]) raise NotImplementedError() def _v2_1337_v2_0_networks_d32019d3_bc6e_4319_9c1d_6722fc136a22(self, method, url, body, headers): if method == 'GET': body = self.fixtures.load('_v2_0__networks_POST.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) elif method == 'DELETE': body = '' return (httplib.NO_CONTENT, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v2_1337_v2_0_subnets_08eae331_0402_425a_923c_34f7cfe39c1b(self, method, url, body, headers): if method == 'GET': body = self.fixtures.load('_v2_0__subnet.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) if method == 'DELETE': body = '' return (httplib.NO_CONTENT, body, self.json_content_headers, httplib.responses[httplib.OK]) elif method == 'PUT': body = self.fixtures.load('_v2_0__subnet.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v2_1337_v2_0_subnets(self, method, url, body, headers): if method == 'POST': body = self.fixtures.load('_v2_0__subnet.json') return (httplib.CREATED, body, self.json_content_headers, httplib.responses[httplib.OK]) else: body = self.fixtures.load('_v2_0__subnets.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v2_1337_volumes_detail(self, method, url, body, headers): body = self.fixtures.load('_v2_0__volumes.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v2_1337_volumes(self, method, url, body, headers): if method == 'POST': body = self.fixtures.load('_v2_0__volume.json') return (httplib.CREATED, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v2_1337_volumes_cd76a3a1_c4ce_40f6_9b9f_07a61508938d(self, method, url, body, headers): if method == 'GET': body = self.fixtures.load('_v2_0__volume.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) if method == 'DELETE': body = '' return (httplib.NO_CONTENT, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v2_1337_volumes_abc6a3a1_c4ce_40f6_9b9f_07a61508938d(self, method, url, body, headers): if method == 'GET': body = self.fixtures.load('_v2_0__volume_abc6a3a1_c4ce_40f6_9b9f_07a61508938d.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) if method == 'DELETE': body = '' return (httplib.NO_CONTENT, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v2_1337_snapshots_detail(self, method, url, body, headers): if ('unit_test=paginate' in url and 'marker' not in url) or \ 'unit_test=pagination_loop' in url: body = self.fixtures.load('_v2_0__snapshots_paginate_start.json') else: body = self.fixtures.load('_v2_0__snapshots.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v2_1337_snapshots(self, method, url, body, headers): if method == 'POST': body = self.fixtures.load('_v2_0__snapshot.json') return (httplib.CREATED, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v2_1337_snapshots_3fbbcccf_d058_4502_8844_6feeffdf4cb5(self, method, url, body, headers): if method == 'GET': body = self.fixtures.load('_v2_0__snapshot.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) if method == 'DELETE': body = '' return (httplib.NO_CONTENT, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v2_1337_v2_0_security_groups(self, method, url, body, headers): if method == 'POST': body = self.fixtures.load('_v2_0__security_group.json') return (httplib.CREATED, body, self.json_content_headers, httplib.responses[httplib.OK]) if method == 'GET': body = self.fixtures.load('_v2_0__security_groups.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v2_1337_v2_0_security_groups_6(self, method, url, body, headers): if method == 'GET': body = self.fixtures.load('_v2_0__security_group.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) if method == 'DELETE': body = '' return (httplib.NO_CONTENT, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v2_1337_v2_0_security_group_rules(self, method, url, body, headers): if method == 'POST': body = self.fixtures.load('_v2_0__security_group_rule.json') return (httplib.CREATED, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v2_1337_v2_0_security_group_rules_2(self, method, url, body, headers): if method == 'DELETE': body = '' return (httplib.NO_CONTENT, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v2_1337_v2_0_floatingips(self, method, url, body, headers): if method == 'POST': body = self.fixtures.load('_v2_0__floatingip.json') return (httplib.CREATED, body, self.json_content_headers, httplib.responses[httplib.OK]) if method == 'GET': body = self.fixtures.load('_v2_0__floatingips.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v2_1337_v2_0_floatingips_foo_bar_id(self, method, url, body, headers): if method == 'DELETE': body = '' return (httplib.NO_CONTENT, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v2_1337_v2_0_routers_f8a44de0_fc8e_45df_93c7_f79bf3b01c95(self, method, url, body, headers): if method == 'GET': body = self.fixtures.load('_v2_0__router.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) if method == 'DELETE': body = '' return (httplib.NO_CONTENT, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v2_1337_v2_0_routers(self, method, url, body, headers): if method == 'POST': body = self.fixtures.load('_v2_0__router.json') return (httplib.CREATED, body, self.json_content_headers, httplib.responses[httplib.OK]) else: body = self.fixtures.load('_v2_0__routers.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v2_1337_v2_0_routers_f8a44de0_fc8e_45df_93c7_f79bf3b01c95_add_router_interface(self, method, url, body, headers): if method == 'PUT': body = self.fixtures.load('_v2_0__router_interface.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v2_1337_v2_0_routers_f8a44de0_fc8e_45df_93c7_f79bf3b01c95_remove_router_interface(self, method, url, body, headers): if method == 'PUT': body = self.fixtures.load('_v2_0__router_interface.json') return (httplib.OK, body, self.json_content_headers, httplib.responses[httplib.OK]) def _v2_1337_servers_1000_action(self, method, url, body, headers): if method != 'POST' or body != '{"removeSecurityGroup": {"name": "sgname"}}': raise NotImplementedError(body) return httplib.ACCEPTED, None, {}, httplib.responses[httplib.ACCEPTED] # This exists because the nova compute url in devstack has v2 in there but the v1.1 fixtures # work fine. class OpenStack_2_0_MockHttp(OpenStack_1_1_MockHttp): def __init__(self, *args, **kwargs): super(OpenStack_2_0_MockHttp, self).__init__(*args, **kwargs) methods1 = OpenStack_1_1_MockHttp.__dict__ names1 = [m for m in methods1 if m.find('_v1_1') == 0] for name in names1: method = methods1[name] new_name = name.replace('_v1_1_slug_', '_v2_1337_') setattr(self, new_name, method_type(method, self, OpenStack_2_0_MockHttp)) class OpenStack_1_1_Auth_2_0_Tests(OpenStack_1_1_Tests): driver_args = OPENSTACK_PARAMS + ('1.1',) driver_kwargs = {'ex_force_auth_version': '2.0'} def setUp(self): self.driver_klass.connectionCls.conn_class = OpenStack_2_0_MockHttp self.driver_klass.connectionCls.auth_url = "https://auth.api.example.com" OpenStackMockHttp.type = None OpenStack_1_1_MockHttp.type = None OpenStack_2_0_MockHttp.type = None self.driver = self.create_driver() # normally authentication happens lazily, but we force it here self.driver.connection._populate_hosts_and_request_paths() clear_pricing_data() self.node = self.driver.list_nodes()[1] def test_auth_user_info_is_set(self): self.driver.connection._populate_hosts_and_request_paths() self.assertEqual(self.driver.connection.auth_user_info, { 'id': '7', 'name': 'testuser', 'roles': [{'description': 'Default Role.', 'id': 'identity:default', 'name': 'identity:default'}]}) if __name__ == '__main__': sys.exit(unittest.main())