Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ jobs:
pip install -U -r requirements-test.txt
pip install -U -e .
pip install ${{ matrix.django-version }}
# Todo: Remove before merge
pip install --no-cache-dir --force-reinstall \
"django-loci @ https://github.com/openwisp/django-loci/tarball/feature/191-new-ws-endpoint-for-all-location"

- name: Start redis
if: ${{ !cancelled() && steps.deps.conclusion == 'success' }}
Expand Down
21 changes: 20 additions & 1 deletion openwisp_controller/geo/channels/consumers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import swapper
from django_loci.channels.base import BaseLocationBroadcast
from asgiref.sync import async_to_sync
from django_loci.channels.base import BaseCommonLocationBroadcast, BaseLocationBroadcast

Location = swapper.load_model("geo", "Location")

Expand All @@ -17,3 +18,21 @@ def is_authorized(self, user, location):
):
return False
return result


class CommonLocationBroadcast(BaseCommonLocationBroadcast):
model = Location

def join_groups(self, user):
"""
Subscribe user to all organizations they manage or bypass if superuser.
"""
if user.is_superuser:
super().join_groups(user)
return

self.group_names = []
for org in user.organizations_managed:
group = f"loci.mobile-location.organization.{org}"
self.group_names.append(group)
async_to_sync(self.channel_layer.group_add)(group, self.channel_name)
14 changes: 11 additions & 3 deletions openwisp_controller/geo/channels/routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,27 @@
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator
from django.urls import path
from django_loci.channels.base import location_broadcast_path
from django_loci.channels.base import (
common_location_broadcast_path,
location_broadcast_path,
)
from openwisp_notifications.websockets.routing import (
get_routes as get_notification_routes,
)

from .consumers import LocationBroadcast
from .consumers import CommonLocationBroadcast, LocationBroadcast


def get_routes():
return [
path(
location_broadcast_path, LocationBroadcast.as_asgi(), name="LocationChannel"
)
),
path(
common_location_broadcast_path,
CommonLocationBroadcast.as_asgi(),
name="AllLocationChannel",
),
]


Expand Down
166 changes: 112 additions & 54 deletions openwisp_controller/geo/tests/pytest.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
import importlib
import asyncio
import os
from contextlib import suppress
from unittest import skipIf

import pytest
from channels.db import database_sync_to_async
from channels.layers import get_channel_layer
from channels.routing import ProtocolTypeRouter
from channels.testing import WebsocketCommunicator
from django.conf import settings
from django.contrib.auth import get_user_model, login
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Permission
from django.http.request import HttpRequest
from django.utils.module_loading import import_string
from django_loci.tests import TestChannelsMixin
from swapper import load_model

from openwisp_controller.geo.channels.consumers import (
CommonLocationBroadcast,
LocationBroadcast,
)

from .utils import TestGeoMixin

Device = load_model("config", "Device")
Expand All @@ -23,48 +29,15 @@


@skipIf(os.environ.get("SAMPLE_APP", False), "Running tests on SAMPLE_APP")
class TestChannels(TestGeoMixin):
class TestChannels(TestGeoMixin, TestChannelsMixin):
location_consumer = LocationBroadcast
common_location_consumer = CommonLocationBroadcast
application = import_string(getattr(settings, "ASGI_APPLICATION"))
object_model = Device
location_model = Location
object_location_model = DeviceLocation
user_model = get_user_model()

def _force_login(self, user, backend=None):
engine = importlib.import_module(settings.SESSION_ENGINE)
request = HttpRequest()
request.session = engine.SessionStore()
login(request, user, backend)
request.session.save()
return request.session

async def _get_request_dict(self, pk=None, user=None):
if not pk:
location = await database_sync_to_async(self._create_location)(
is_mobile=True
)
await database_sync_to_async(self._create_object_location)(
location=location
)
pk = location.pk
path = "/ws/loci/location/{0}/".format(pk)
session = None
if user:
session = await database_sync_to_async(self._force_login)(user)
return {"pk": pk, "path": path, "session": session}

def _get_communicator(self, request_vars, user=None):
communicator = WebsocketCommunicator(self.application, request_vars["path"])
if user:
communicator.scope.update(
{
"user": user,
"session": request_vars["session"],
"url_route": {"kwargs": {"pk": request_vars["pk"]}},
}
)
return communicator

@pytest.mark.asyncio
@pytest.mark.django_db(transaction=True)
async def test_consumer_staff_but_no_change_permission(self):
Expand All @@ -74,37 +47,122 @@ async def test_consumer_staff_but_no_change_permission(self):
location = await database_sync_to_async(self._create_location)(is_mobile=True)
await database_sync_to_async(self._create_object_location)(location=location)
pk = location.pk
request_vars = await self._get_request_dict(user=user, pk=pk)
communicator = self._get_communicator(request_vars, user)
request_vars = await self._get_specific_location_request_dict(pk=pk, user=user)
communicator = self._get_specific_location_communicator(request_vars, user)
connected, _ = await communicator.connect()
assert not connected
await communicator.disconnect()
# add permission to change location and repeat
perm = await database_sync_to_async(
(
await database_sync_to_async(Permission.objects.filter)(
name="Can change location"
)
).first
)()
perm = await Permission.objects.filter(
codename=f"change_{self.location_model._meta.model_name}",
content_type__app_label=self.location_model._meta.app_label,
).afirst()
await database_sync_to_async(user.user_permissions.add)(perm)
user = await database_sync_to_async(User.objects.get)(pk=user.pk)
request_vars = await self._get_request_dict(user=user, pk=pk)
communicator = self._get_communicator(request_vars, user)
request_vars = await self._get_specific_location_request_dict(pk=pk, user=user)
communicator = self._get_specific_location_communicator(request_vars, user)
connected, _ = await communicator.connect()
assert not connected
await communicator.disconnect()
# add user to organization
await database_sync_to_async(OrganizationUser.objects.create)(
organization=location.organization, user=user, is_admin=True
organization=location.organization,
user=user,
is_admin=True,
)
await database_sync_to_async(location.organization.save)()
user = await database_sync_to_async(User.objects.get)(pk=user.pk)
request_vars = await self._get_request_dict(user=user, pk=pk)
communicator = self._get_communicator(request_vars, user)
request_vars = await self._get_specific_location_request_dict(pk=pk, user=user)
communicator = self._get_specific_location_communicator(request_vars, user)
connected, _ = await communicator.connect()
assert connected
await communicator.disconnect()

@pytest.mark.asyncio
@pytest.mark.django_db(transaction=True)
async def test_common_location_consumer_staff_but_no_change_permission(self):
user = await database_sync_to_async(User.objects.create_user)(
username="user", password="password", email="[email protected]", is_staff=True
)
location = await database_sync_to_async(self._create_location)(is_mobile=True)
await database_sync_to_async(self._create_object_location)(location=location)
pk = location.pk
request_vars = await self._get_common_location_request_dict(pk=pk, user=user)
communicator = self._get_common_location_communicator(request_vars, user)
connected, _ = await communicator.connect()
assert not connected
await communicator.disconnect()
# After granting change permission, the user can connect to the common
# location endpoint, but must receive updates only for locations
# belonging to their organization.
perm = await Permission.objects.filter(
codename=f"change_{self.location_model._meta.model_name}",
content_type__app_label=self.location_model._meta.app_label,
).afirst()
await database_sync_to_async(user.user_permissions.add)(perm)
user = await database_sync_to_async(User.objects.get)(pk=user.pk)
request_vars = await self._get_common_location_request_dict(pk=pk, user=user)
communicator = self._get_common_location_communicator(request_vars, user)
connected, _ = await communicator.connect()
assert connected
await communicator.disconnect()

@pytest.mark.asyncio
@pytest.mark.django_db(transaction=True)
async def test_common_location_org_isolation(self):
org1 = await database_sync_to_async(self._create_organization)(name="test1")
org2 = await database_sync_to_async(self._create_organization)(name="test2")
location1 = await database_sync_to_async(self._create_location)(
is_mobile=True, organization=org1
)
location2 = await database_sync_to_async(self._create_location)(
is_mobile=True, organization=org2
)
user1 = await database_sync_to_async(User.objects.create_user)(
username="user1", password="password", email="[email protected]", is_staff=True
)
user2 = await database_sync_to_async(User.objects.create_user)(
username="user2", password="password", email="[email protected]", is_staff=True
)
perm = await Permission.objects.filter(
codename=f"change_{self.location_model._meta.model_name}",
content_type__app_label=self.location_model._meta.app_label,
).afirst()
await database_sync_to_async(user1.user_permissions.add)(perm)
await database_sync_to_async(user2.user_permissions.add)(perm)
await database_sync_to_async(OrganizationUser.objects.create)(
organization=org1, user=user1, is_admin=True
)
await database_sync_to_async(OrganizationUser.objects.create)(
organization=org2, user=user2, is_admin=True
)
user1 = await database_sync_to_async(User.objects.get)(pk=user1.pk)
user2 = await database_sync_to_async(User.objects.get)(pk=user2.pk)
channel_layer = get_channel_layer()
communicator1 = self._get_common_location_communicator(
await self._get_common_location_request_dict(pk=location1.pk, user=user1),
user1,
)
communicator2 = self._get_common_location_communicator(
await self._get_common_location_request_dict(pk=location2.pk, user=user2),
user2,
)
connected, _ = await communicator1.connect()
assert connected
connected, _ = await communicator2.connect()
assert connected
await channel_layer.group_send(
f"loci.mobile-location.organization.{org1.pk}",
{"type": "send.message", "message": {"id": str(location1.pk)}},
)
response = await communicator1.receive_json_from(timeout=1)
assert response["id"] == str(location1.pk)
with pytest.raises(asyncio.TimeoutError):
await communicator2.receive_json_from(timeout=1)
# The task is been cancelled if not completed in the given timeout
await communicator1.disconnect()
with suppress(asyncio.CancelledError):
await communicator2.disconnect()

def test_asgi_application_router(self):
assert isinstance(self.application, ProtocolTypeRouter)
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ django-reversion~=6.0.0
django-taggit~=6.1.0
netjsonconfig @ https://github.com/openwisp/netjsonconfig/archive/refs/heads/1.3.tar.gz
django-x509 @ https://github.com/openwisp/django-x509/archive/refs/heads/1.4.tar.gz
django-loci @ https://github.com/openwisp/django-loci/archive/refs/heads/1.3.tar.gz
# Todo: Make path https://github.com/openwisp/django-loci/archive/refs/heads/1.3.tar.gz before merge
django-loci @ https://github.com/openwisp/django-loci/tarball/feature/191-new-ws-endpoint-for-all-location
django-flat-json-widget @ https://github.com/openwisp/django-flat-json-widget/archive/refs/heads/0.5.tar.gz
openwisp-users @ https://github.com/openwisp/openwisp-users/archive/refs/heads/1.3.tar.gz
openwisp-utils[celery,channels] @ https://github.com/openwisp/openwisp-utils/archive/refs/heads/1.3.tar.gz
Expand Down
Loading