-
-
Notifications
You must be signed in to change notification settings - Fork 237
[feature] New ws endpoint for all all Location broadcast #1161
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
dee077
wants to merge
6
commits into
gsoc25-map
Choose a base branch
from
feature/1157-new-ws-endpoint-for-all-location
base: gsoc25-map
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+148
−59
Open
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
0121463
[feature] Add websocket endpoint for bulk location updates #1157
dee077 faff6f4
[fix] Deps
dee077 fb09331
[fix] Typo
dee077 75476f6
[tests] Add test for common ws endpoint
dee077 9fcee51
[ci] Force reinstall django-loci
dee077 9697b8b
[fix] Add org specific group and add tests
dee077 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,18 +1,24 @@ | ||
| import importlib | ||
| import asyncio | ||
| import os | ||
| from contextlib import suppress | ||
| from unittest import skipIf | ||
|
|
||
| import pytest | ||
| from channels.db import database_sync_to_async | ||
| from channels.layers import get_channel_layer | ||
| from channels.routing import ProtocolTypeRouter | ||
| from channels.testing import WebsocketCommunicator | ||
| from django.conf import settings | ||
| from django.contrib.auth import get_user_model, login | ||
| from django.contrib.auth import get_user_model | ||
| from django.contrib.auth.models import Permission | ||
| from django.http.request import HttpRequest | ||
| from django.utils.module_loading import import_string | ||
| from django_loci.tests import TestChannelsMixin | ||
| from swapper import load_model | ||
|
|
||
| from openwisp_controller.geo.channels.consumers import ( | ||
| CommonLocationBroadcast, | ||
| LocationBroadcast, | ||
| ) | ||
|
|
||
| from .utils import TestGeoMixin | ||
|
|
||
| Device = load_model("config", "Device") | ||
|
|
@@ -23,48 +29,15 @@ | |
|
|
||
|
|
||
| @skipIf(os.environ.get("SAMPLE_APP", False), "Running tests on SAMPLE_APP") | ||
| class TestChannels(TestGeoMixin): | ||
| class TestChannels(TestGeoMixin, TestChannelsMixin): | ||
| location_consumer = LocationBroadcast | ||
| common_location_consumer = CommonLocationBroadcast | ||
| application = import_string(getattr(settings, "ASGI_APPLICATION")) | ||
| object_model = Device | ||
| location_model = Location | ||
| object_location_model = DeviceLocation | ||
| user_model = get_user_model() | ||
|
|
||
| def _force_login(self, user, backend=None): | ||
| engine = importlib.import_module(settings.SESSION_ENGINE) | ||
| request = HttpRequest() | ||
| request.session = engine.SessionStore() | ||
| login(request, user, backend) | ||
| request.session.save() | ||
| return request.session | ||
|
|
||
| async def _get_request_dict(self, pk=None, user=None): | ||
| if not pk: | ||
| location = await database_sync_to_async(self._create_location)( | ||
| is_mobile=True | ||
| ) | ||
| await database_sync_to_async(self._create_object_location)( | ||
| location=location | ||
| ) | ||
| pk = location.pk | ||
| path = "/ws/loci/location/{0}/".format(pk) | ||
| session = None | ||
| if user: | ||
| session = await database_sync_to_async(self._force_login)(user) | ||
| return {"pk": pk, "path": path, "session": session} | ||
|
|
||
| def _get_communicator(self, request_vars, user=None): | ||
| communicator = WebsocketCommunicator(self.application, request_vars["path"]) | ||
| if user: | ||
| communicator.scope.update( | ||
| { | ||
| "user": user, | ||
| "session": request_vars["session"], | ||
| "url_route": {"kwargs": {"pk": request_vars["pk"]}}, | ||
| } | ||
| ) | ||
| return communicator | ||
|
|
||
| @pytest.mark.asyncio | ||
| @pytest.mark.django_db(transaction=True) | ||
| async def test_consumer_staff_but_no_change_permission(self): | ||
|
|
@@ -74,37 +47,122 @@ async def test_consumer_staff_but_no_change_permission(self): | |
| location = await database_sync_to_async(self._create_location)(is_mobile=True) | ||
| await database_sync_to_async(self._create_object_location)(location=location) | ||
| pk = location.pk | ||
| request_vars = await self._get_request_dict(user=user, pk=pk) | ||
| communicator = self._get_communicator(request_vars, user) | ||
| request_vars = await self._get_specific_location_request_dict(pk=pk, user=user) | ||
| communicator = self._get_specific_location_communicator(request_vars, user) | ||
| connected, _ = await communicator.connect() | ||
| assert not connected | ||
| await communicator.disconnect() | ||
| # add permission to change location and repeat | ||
| perm = await database_sync_to_async( | ||
| ( | ||
| await database_sync_to_async(Permission.objects.filter)( | ||
| name="Can change location" | ||
| ) | ||
| ).first | ||
| )() | ||
| perm = await Permission.objects.filter( | ||
| codename=f"change_{self.location_model._meta.model_name}", | ||
| content_type__app_label=self.location_model._meta.app_label, | ||
| ).afirst() | ||
| await database_sync_to_async(user.user_permissions.add)(perm) | ||
| user = await database_sync_to_async(User.objects.get)(pk=user.pk) | ||
| request_vars = await self._get_request_dict(user=user, pk=pk) | ||
| communicator = self._get_communicator(request_vars, user) | ||
| request_vars = await self._get_specific_location_request_dict(pk=pk, user=user) | ||
| communicator = self._get_specific_location_communicator(request_vars, user) | ||
| connected, _ = await communicator.connect() | ||
| assert not connected | ||
| await communicator.disconnect() | ||
| # add user to organization | ||
| await database_sync_to_async(OrganizationUser.objects.create)( | ||
| organization=location.organization, user=user, is_admin=True | ||
| organization=location.organization, | ||
| user=user, | ||
| is_admin=True, | ||
| ) | ||
| await database_sync_to_async(location.organization.save)() | ||
| user = await database_sync_to_async(User.objects.get)(pk=user.pk) | ||
| request_vars = await self._get_request_dict(user=user, pk=pk) | ||
| communicator = self._get_communicator(request_vars, user) | ||
| request_vars = await self._get_specific_location_request_dict(pk=pk, user=user) | ||
| communicator = self._get_specific_location_communicator(request_vars, user) | ||
| connected, _ = await communicator.connect() | ||
| assert connected | ||
| await communicator.disconnect() | ||
|
|
||
| @pytest.mark.asyncio | ||
| @pytest.mark.django_db(transaction=True) | ||
| async def test_common_location_consumer_staff_but_no_change_permission(self): | ||
| user = await database_sync_to_async(User.objects.create_user)( | ||
| username="user", password="password", email="[email protected]", is_staff=True | ||
| ) | ||
| location = await database_sync_to_async(self._create_location)(is_mobile=True) | ||
| await database_sync_to_async(self._create_object_location)(location=location) | ||
| pk = location.pk | ||
| request_vars = await self._get_common_location_request_dict(pk=pk, user=user) | ||
| communicator = self._get_common_location_communicator(request_vars, user) | ||
| connected, _ = await communicator.connect() | ||
| assert not connected | ||
| await communicator.disconnect() | ||
| # After granting change permission, the user can connect to the common | ||
| # location endpoint, but must receive updates only for locations | ||
| # belonging to their organization. | ||
| perm = await Permission.objects.filter( | ||
| codename=f"change_{self.location_model._meta.model_name}", | ||
| content_type__app_label=self.location_model._meta.app_label, | ||
| ).afirst() | ||
| await database_sync_to_async(user.user_permissions.add)(perm) | ||
| user = await database_sync_to_async(User.objects.get)(pk=user.pk) | ||
| request_vars = await self._get_common_location_request_dict(pk=pk, user=user) | ||
| communicator = self._get_common_location_communicator(request_vars, user) | ||
| connected, _ = await communicator.connect() | ||
| assert connected | ||
| await communicator.disconnect() | ||
|
|
||
| @pytest.mark.asyncio | ||
| @pytest.mark.django_db(transaction=True) | ||
| async def test_common_location_org_isolation(self): | ||
| org1 = await database_sync_to_async(self._create_organization)(name="test1") | ||
| org2 = await database_sync_to_async(self._create_organization)(name="test2") | ||
| location1 = await database_sync_to_async(self._create_location)( | ||
| is_mobile=True, organization=org1 | ||
| ) | ||
| location2 = await database_sync_to_async(self._create_location)( | ||
| is_mobile=True, organization=org2 | ||
| ) | ||
| user1 = await database_sync_to_async(User.objects.create_user)( | ||
| username="user1", password="password", email="[email protected]", is_staff=True | ||
| ) | ||
| user2 = await database_sync_to_async(User.objects.create_user)( | ||
| username="user2", password="password", email="[email protected]", is_staff=True | ||
| ) | ||
| perm = await Permission.objects.filter( | ||
| codename=f"change_{self.location_model._meta.model_name}", | ||
| content_type__app_label=self.location_model._meta.app_label, | ||
| ).afirst() | ||
| await database_sync_to_async(user1.user_permissions.add)(perm) | ||
| await database_sync_to_async(user2.user_permissions.add)(perm) | ||
| await database_sync_to_async(OrganizationUser.objects.create)( | ||
| organization=org1, user=user1, is_admin=True | ||
| ) | ||
| await database_sync_to_async(OrganizationUser.objects.create)( | ||
| organization=org2, user=user2, is_admin=True | ||
| ) | ||
| user1 = await database_sync_to_async(User.objects.get)(pk=user1.pk) | ||
| user2 = await database_sync_to_async(User.objects.get)(pk=user2.pk) | ||
| channel_layer = get_channel_layer() | ||
| communicator1 = self._get_common_location_communicator( | ||
| await self._get_common_location_request_dict(pk=location1.pk, user=user1), | ||
| user1, | ||
| ) | ||
| communicator2 = self._get_common_location_communicator( | ||
| await self._get_common_location_request_dict(pk=location2.pk, user=user2), | ||
| user2, | ||
| ) | ||
| connected, _ = await communicator1.connect() | ||
| assert connected | ||
| connected, _ = await communicator2.connect() | ||
| assert connected | ||
| await channel_layer.group_send( | ||
| f"loci.mobile-location.organization.{org1.pk}", | ||
| {"type": "send.message", "message": {"id": str(location1.pk)}}, | ||
| ) | ||
| response = await communicator1.receive_json_from(timeout=1) | ||
| assert response["id"] == str(location1.pk) | ||
| with pytest.raises(asyncio.TimeoutError): | ||
| await communicator2.receive_json_from(timeout=1) | ||
| # The task is been cancelled if not completed in the given timeout | ||
| await communicator1.disconnect() | ||
| with suppress(asyncio.CancelledError): | ||
| await communicator2.disconnect() | ||
|
|
||
| def test_asgi_application_router(self): | ||
| assert isinstance(self.application, ProtocolTypeRouter) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.