mirror of
				https://github.com/peeringdb/peeringdb.git
				synced 2024-05-11 05:55:09 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			307 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			307 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import os
 | 
						|
import pytest
 | 
						|
 | 
						|
from django.test import Client, TestCase, RequestFactory
 | 
						|
 | 
						|
import peeringdb_server.models as models
 | 
						|
import peeringdb_server.admin as admin
 | 
						|
 | 
						|
 | 
						|
class AdminTests(TestCase):
 | 
						|
    """
 | 
						|
    Test peeringdb django admin functionality
 | 
						|
    """
 | 
						|
 | 
						|
    asn_count = 0
 | 
						|
 | 
						|
    @classmethod
 | 
						|
    def entity_data(cls, org, tag):
 | 
						|
        kwargs = {
 | 
						|
            "name": "%s %s" % (org.name, tag),
 | 
						|
            "status": "ok",
 | 
						|
            "org": org
 | 
						|
        }
 | 
						|
        if tag == "net":
 | 
						|
            cls.asn_count += 1
 | 
						|
            kwargs.update(asn=cls.asn_count)
 | 
						|
        return kwargs
 | 
						|
 | 
						|
    @classmethod
 | 
						|
    def setUpTestData(cls):
 | 
						|
 | 
						|
        cls.entities = {}
 | 
						|
 | 
						|
        # set up organizations
 | 
						|
        cls.entities["org"] = [
 | 
						|
            org
 | 
						|
            for org in [
 | 
						|
                models.Organization.objects.create(
 | 
						|
                    name="Org %d" % i, status="ok") for i in range(0, 9)
 | 
						|
            ]
 | 
						|
        ]
 | 
						|
 | 
						|
        # set up a network,facility and ix under each org
 | 
						|
        for tag in ["ix", "net", "fac"]:
 | 
						|
            cls.entities[tag] = [
 | 
						|
                models.REFTAG_MAP[tag].objects.create(**cls.entity_data(
 | 
						|
                    org, tag)) for org in cls.entities["org"]
 | 
						|
            ]
 | 
						|
 | 
						|
        # create a user under each org
 | 
						|
        cls.entities["user"] = [
 | 
						|
            models.User.objects.create_user(
 | 
						|
                "user " + org.name, "%s@localhost" % org.name,
 | 
						|
                first_name="First", last_name="Last")
 | 
						|
            for org in cls.entities["org"]
 | 
						|
        ]
 | 
						|
        i = 0
 | 
						|
        for user in cls.entities["user"]:
 | 
						|
            cls.entities["org"][i].usergroup.user_set.add(user)
 | 
						|
            i += 1
 | 
						|
 | 
						|
        cls.admin_user = models.User.objects.create_user(
 | 
						|
            "admin", "admin@localhost", first_name="admin", last_name="admin")
 | 
						|
        cls.admin_user.is_superuser = True
 | 
						|
        cls.admin_user.is_staff = True
 | 
						|
        cls.admin_user.save()
 | 
						|
        cls.admin_user.set_password("admin")
 | 
						|
        cls.admin_user.save()
 | 
						|
 | 
						|
        #set up some ixlans
 | 
						|
        cls.entities["ixlan"] = [
 | 
						|
            models.IXLan.objects.create(ix=ix, status="ok")
 | 
						|
            for ix in cls.entities["ix"]
 | 
						|
        ]
 | 
						|
 | 
						|
        #set up some netixlans
 | 
						|
        cls.entities["netixlan"] = [
 | 
						|
            models.NetworkIXLan.objects.create(
 | 
						|
                network=cls.entities["net"][0], ixlan=cls.entities["ixlan"][0],
 | 
						|
                ipaddr4=addr, status="ok", asn=cls.entities["net"][0].asn,
 | 
						|
                speed=1000)
 | 
						|
            for addr in ["207.41.110.37", "207.41.110.38", "207.41.110.39"]
 | 
						|
        ]
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        self.factory = RequestFactory()
 | 
						|
 | 
						|
    def test_views(self):
 | 
						|
        """
 | 
						|
        Test that all views are still functional
 | 
						|
 | 
						|
        Note: this only tests for HTTP status and is a quick and dirty
 | 
						|
        way that none of the views got broken for GET requests. This will
 | 
						|
        need to be replaced by something more extensive
 | 
						|
        """
 | 
						|
 | 
						|
        m = [
 | 
						|
            models.Facility, models.InternetExchange, models.Network,
 | 
						|
            models.Organization, models.User
 | 
						|
        ]
 | 
						|
 | 
						|
        c = Client()
 | 
						|
        c.login(username="admin", password="admin")
 | 
						|
        for model in m:
 | 
						|
            url = "/cp/%s/%s/" % (model._meta.app_label,
 | 
						|
                                  model._meta.model_name)
 | 
						|
            response = c.get(url, follow=True)
 | 
						|
            self.assertEqual(response.status_code, 200)
 | 
						|
 | 
						|
            url_add = "%sadd" % url
 | 
						|
            response = c.get(url_add, follow=True)
 | 
						|
            self.assertEqual(response.status_code, 200)
 | 
						|
 | 
						|
            url_id = "%s%s" % (url, model.objects.first().id)
 | 
						|
            response = c.get(url_id, follow=True)
 | 
						|
            self.assertEqual(response.status_code, 200)
 | 
						|
 | 
						|
    def test_org_merge(self):
 | 
						|
        """
 | 
						|
        Test the org merge functionality, which should merge 1 or more
 | 
						|
        organizations into a target organization, moving all entities
 | 
						|
        to the target organization
 | 
						|
        """
 | 
						|
        request = self.factory.post("/cp")
 | 
						|
        request.user = None
 | 
						|
        # TEST 1
 | 
						|
 | 
						|
        # merge orgs 1 and 2 into org 0
 | 
						|
        t_org = self.entities["org"][0]
 | 
						|
        admin.merge_organizations(self.entities["org"][1:3], t_org, request)
 | 
						|
 | 
						|
        # check that all entities moved
 | 
						|
        for tag in ["ix", "net", "fac"]:
 | 
						|
            for ent in self.entities[tag][0:3]:
 | 
						|
                ent.refresh_from_db()
 | 
						|
                self.assertEqual(ent.org, t_org)
 | 
						|
 | 
						|
        # check that all users moved
 | 
						|
        i = 1
 | 
						|
        for user in self.entities["user"][1:3]:
 | 
						|
            org = self.entities["org"][i]
 | 
						|
            self.assertEqual(user.is_org_member(t_org), True)
 | 
						|
            self.assertEqual(user.is_org_admin(t_org), False)
 | 
						|
            self.assertEqual(user.is_org_member(org), False)
 | 
						|
            self.assertEqual(user.is_org_admin(org), False)
 | 
						|
            i += 1
 | 
						|
 | 
						|
        # check that all merged orgs are deleted
 | 
						|
        for org in self.entities["org"][1:3]:
 | 
						|
            org.refresh_from_db()
 | 
						|
            self.assertEqual(org.status, "deleted")
 | 
						|
 | 
						|
        # check that target org is still in tact
 | 
						|
        t_org.refresh_from_db()
 | 
						|
        self.assertEqual(t_org.status, "ok")
 | 
						|
 | 
						|
        # TEST 2 - Dont allow merging of target org into target org
 | 
						|
        with self.assertRaises(ValueError) as inst:
 | 
						|
            admin.merge_organizations([t_org], t_org, request)
 | 
						|
 | 
						|
    def test_org_unmerge(self):
 | 
						|
        """
 | 
						|
        Test undoing an organization merge
 | 
						|
        """
 | 
						|
 | 
						|
        request = self.factory.post("/cp")
 | 
						|
        request.user = None
 | 
						|
 | 
						|
        # merge orgs 4 and 5 into org 3
 | 
						|
        t_org = self.entities["org"][3]
 | 
						|
        admin.merge_organizations(self.entities["org"][4:6], t_org, request)
 | 
						|
 | 
						|
        print t_org
 | 
						|
 | 
						|
        # check that merge log exists
 | 
						|
        merges = models.OrganizationMerge.objects.filter(to_org=t_org)
 | 
						|
        self.assertEqual(merges.count(), 2)
 | 
						|
 | 
						|
        # undo merges
 | 
						|
        i = 4
 | 
						|
        for merge in [m for m in merges]:
 | 
						|
            self.assertEqual(merge.from_org, self.entities["org"][i])
 | 
						|
            merge.undo()
 | 
						|
            i += 1
 | 
						|
 | 
						|
        # check that all entities moved back
 | 
						|
        for tag in ["ix", "net", "fac"]:
 | 
						|
            i = 4
 | 
						|
            for ent in self.entities[tag][4:6]:
 | 
						|
                ent.refresh_from_db()
 | 
						|
                self.assertEqual(ent.org, self.entities["org"][i])
 | 
						|
                i += 1
 | 
						|
 | 
						|
        # check that all users moved back
 | 
						|
        i = 4
 | 
						|
        for user in self.entities["user"][4:6]:
 | 
						|
            org = self.entities["org"][i]
 | 
						|
            self.assertEqual(user.is_org_member(t_org), False)
 | 
						|
            self.assertEqual(user.is_org_admin(t_org), False)
 | 
						|
            self.assertEqual(user.is_org_member(org), True)
 | 
						|
            self.assertEqual(user.is_org_admin(org), False)
 | 
						|
            i += 1
 | 
						|
 | 
						|
        # check that all merged orgs are deleted
 | 
						|
        for org in self.entities["org"][4:6]:
 | 
						|
            org.refresh_from_db()
 | 
						|
            self.assertEqual(org.status, "ok")
 | 
						|
 | 
						|
        # check that target org is still in tact
 | 
						|
        t_org.refresh_from_db()
 | 
						|
        self.assertEqual(t_org.status, "ok")
 | 
						|
 | 
						|
    def test_commandline_tool(self):
 | 
						|
        c = Client()
 | 
						|
        c.login(username="admin", password="admin")
 | 
						|
 | 
						|
        # test form that lets user select which command run
 | 
						|
        url = "/cp/peeringdb_server/commandlinetool/prepare"
 | 
						|
        response = c.get(url, follow=True)
 | 
						|
        self.assertEqual(response.status_code, 200)
 | 
						|
        for i, n in models.COMMANDLINE_TOOLS:
 | 
						|
            self.assertGreater(
 | 
						|
                response.content.find('<option value="{}">{}</option>'.format(
 | 
						|
                    i, n)), -1)
 | 
						|
 | 
						|
    def test_commandline_tool_renumber_lans(self):
 | 
						|
        # test the form that runs the renumer ip space tool
 | 
						|
        c = Client()
 | 
						|
        c.login(username="admin", password="admin")
 | 
						|
 | 
						|
        # test renumber lans command form
 | 
						|
        data = {"tool": "pdb_renumber_lans"}
 | 
						|
        url = "/cp/peeringdb_server/commandlinetool/prepare/"
 | 
						|
        response = c.post(url, data, follow=True)
 | 
						|
        cont = response.content
 | 
						|
        self.assertEqual(response.status_code, 200)
 | 
						|
        self.assertGreater(
 | 
						|
            cont.find(
 | 
						|
                '<label class="required" for="id_old_prefix">Old prefix:</label>'
 | 
						|
            ), -1)
 | 
						|
        self.assertGreater(
 | 
						|
            cont.find(
 | 
						|
                '<label class="required" for="id_new_prefix">New prefix:</label>'
 | 
						|
            ), -1)
 | 
						|
        self.assertGreater(
 | 
						|
            cont.find(
 | 
						|
                '<label class="required" for="id_exchange">Exchange:</label>'),
 | 
						|
            -1)
 | 
						|
 | 
						|
        # test post to renumber lans command form (preview)
 | 
						|
        data = {
 | 
						|
            "tool": "pdb_renumber_lans",
 | 
						|
            "exchange": self.entities["ix"][0].id,
 | 
						|
            "old_prefix": "207.41.110",
 | 
						|
            "new_prefix": "207.41.111"
 | 
						|
        }
 | 
						|
        url = "/cp/peeringdb_server/commandlinetool/preview/"
 | 
						|
        response = c.post(url, data, follow=True)
 | 
						|
        cont = response.content
 | 
						|
        self.assertEqual(response.status_code, 200)
 | 
						|
        self.assertGreater(
 | 
						|
            cont.find(
 | 
						|
                '[pretend] netixlan: Org 0 ix (IXLAN:1) 207.41.110.37 -> 207.41.111.37'
 | 
						|
            ), -1)
 | 
						|
        self.assertGreater(
 | 
						|
            cont.find(
 | 
						|
                '[pretend] netixlan: Org 0 ix (IXLAN:1) 207.41.110.38 -> 207.41.111.38'
 | 
						|
            ), -1)
 | 
						|
        self.assertGreater(
 | 
						|
            cont.find(
 | 
						|
                '[pretend] netixlan: Org 0 ix (IXLAN:1) 207.41.110.39 -> 207.41.111.39'
 | 
						|
            ), -1)
 | 
						|
 | 
						|
        # test post to renumber lans command form
 | 
						|
        data = {
 | 
						|
            "tool": "pdb_renumber_lans",
 | 
						|
            "exchange": self.entities["ix"][0].id,
 | 
						|
            "old_prefix": "207.41.110",
 | 
						|
            "new_prefix": "207.41.111"
 | 
						|
        }
 | 
						|
        url = "/cp/peeringdb_server/commandlinetool/run/"
 | 
						|
        response = c.post(url, data, follow=True)
 | 
						|
        cont = response.content
 | 
						|
        self.assertEqual(response.status_code, 200)
 | 
						|
        self.assertGreater(
 | 
						|
            cont.find(
 | 
						|
                '>netixlan: Org 0 ix (IXLAN:1) 207.41.110.37 -> 207.41.111.37'
 | 
						|
            ), -1)
 | 
						|
        self.assertGreater(
 | 
						|
            cont.find(
 | 
						|
                '>netixlan: Org 0 ix (IXLAN:1) 207.41.110.38 -> 207.41.111.38'
 | 
						|
            ), -1)
 | 
						|
        self.assertGreater(
 | 
						|
            cont.find(
 | 
						|
                '>netixlan: Org 0 ix (IXLAN:1) 207.41.110.39 -> 207.41.111.39'
 | 
						|
            ), -1)
 | 
						|
 | 
						|
        for netixlan in self.entities["netixlan"]:
 | 
						|
            netixlan.refresh_from_db()
 | 
						|
 | 
						|
        self.assertEqual(
 | 
						|
            str(self.entities["netixlan"][0].ipaddr4), "207.41.111.37")
 | 
						|
        self.assertEqual(
 | 
						|
            str(self.entities["netixlan"][1].ipaddr4), "207.41.111.38")
 | 
						|
        self.assertEqual(
 | 
						|
            str(self.entities["netixlan"][2].ipaddr4), "207.41.111.39")
 |