diff --git a/playwright/_impl/_assertions.py b/playwright/_impl/_assertions.py index aea37d35c..d616d397d 100644 --- a/playwright/_impl/_assertions.py +++ b/playwright/_impl/_assertions.py @@ -12,8 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import collections.abc -from typing import Any, List, Optional, Pattern, Sequence, Union +import inspect +import math +import time +from typing import Any, Callable, List, Optional, Pattern, Sequence, Union from urllib.parse import urljoin from playwright._impl._api_structures import ( @@ -983,6 +987,482 @@ async def not_to_be_ok(self) -> None: await self._not.to_be_ok() +def _normalize_poll_intervals(intervals: Optional[Sequence[float]]) -> List[float]: + resolved = ( + [float(interval) for interval in intervals] + if intervals is not None + else [100.0, 250.0, 500.0, 1000.0] + ) + if not resolved: + raise Error('"intervals" must be a non-empty sequence of positive numbers') + if any(interval <= 0 for interval in resolved): + raise Error('"intervals" must be a sequence of positive numbers') + return resolved + + +class _PollAssertionsBase: + def __init__( + self, + callback: Callable[[], Any], + timeout: float = None, + intervals: Optional[Sequence[float]] = None, + is_not: bool = False, + message: Optional[str] = None, + ) -> None: + if not callable(callback): + raise Error('"callback" must be callable') + self._callback = callback + self._timeout = timeout + self._intervals = _normalize_poll_intervals(intervals) + self._is_not = is_not + self._custom_message = message + + def _effective_timeout(self, timeout: float = None) -> float: + if timeout is not None: + return timeout + if self._timeout is not None: + return self._timeout + return 5_000 + + def _format_error( + self, + matcher: str, + expected: Any, + last_value: Any, + last_error: Optional[Exception], + timeout_ms: float, + ) -> AssertionError: + if self._custom_message: + out_message = self._custom_message + else: + expectation = f"not {matcher}" if self._is_not else matcher + out_message = f'Expect poll {expectation} "{expected}"' + out_message += f"\nTimeout: {timeout_ms}ms" + if last_error is not None: + out_message += f"\nLast error: {last_error}" + else: + out_message += f"\nLast value: {repr(last_value)}" + return AssertionError(out_message) + + +class SyncPollAssertions(_PollAssertionsBase): + @property + def _not(self) -> "SyncPollAssertions": + return SyncPollAssertions( + self._callback, + timeout=self._timeout, + intervals=self._intervals, + is_not=not self._is_not, + message=self._custom_message, + ) + + def _expect( + self, + matcher: str, + expected: Any, + predicate: Callable[[Any], bool], + timeout: float = None, + ) -> None: + timeout_ms = self._effective_timeout(timeout) + deadline = None if timeout_ms == 0 else time.monotonic() + timeout_ms / 1000 + interval_index = 0 + last_value = None + last_error = None + while True: + evaluation_succeeded = False + try: + last_value = self._callback() + if inspect.isawaitable(last_value): + raise Error( + "Synchronous expect.poll() callback must not return awaitable" + ) + matched = predicate(last_value) + evaluation_succeeded = True + last_error = None + except Exception as exc: + matched = False + last_error = exc + if evaluation_succeeded and matched != self._is_not: + return + if deadline is not None and time.monotonic() >= deadline: + raise self._format_error( + matcher, expected, last_value, last_error, timeout_ms + ) + wait_ms = self._intervals[min(interval_index, len(self._intervals) - 1)] + interval_index += 1 + time.sleep(wait_ms / 1000) + + def to_be(self, expected: Any, timeout: float = None) -> None: + self._expect("to_be", expected, lambda actual: actual == expected, timeout) + + def not_to_be(self, expected: Any, timeout: float = None) -> None: + self._not.to_be(expected, timeout) + + def to_equal(self, expected: Any, timeout: float = None) -> None: + self.to_be(expected, timeout) + + def not_to_equal(self, expected: Any, timeout: float = None) -> None: + self._not.to_equal(expected, timeout) + + def to_contain(self, expected: Any, timeout: float = None) -> None: + def contains(actual: Any) -> bool: + return expected in actual + + self._expect("to_contain", expected, contains, timeout) + + def not_to_contain(self, expected: Any, timeout: float = None) -> None: + self._not.to_contain(expected, timeout) + + def to_be_truthy(self, timeout: float = None) -> None: + self._expect("to_be_truthy", True, bool, timeout) + + def not_to_be_truthy(self, timeout: float = None) -> None: + self._not.to_be_truthy(timeout) + + def to_be_falsy(self, timeout: float = None) -> None: + self._expect("to_be_falsy", False, lambda actual: not actual, timeout) + + def not_to_be_falsy(self, timeout: float = None) -> None: + self._not.to_be_falsy(timeout) + + def to_be_greater_than(self, expected: Any, timeout: float = None) -> None: + self._expect( + "to_be_greater_than", expected, lambda actual: actual > expected, timeout + ) + + def to_be_greater_than_or_equal(self, expected: Any, timeout: float = None) -> None: + self._expect( + "to_be_greater_than_or_equal", + expected, + lambda actual: actual >= expected, + timeout, + ) + + def to_be_less_than(self, expected: Any, timeout: float = None) -> None: + self._expect( + "to_be_less_than", expected, lambda actual: actual < expected, timeout + ) + + def to_be_less_than_or_equal(self, expected: Any, timeout: float = None) -> None: + self._expect( + "to_be_less_than_or_equal", + expected, + lambda actual: actual <= expected, + timeout, + ) + + def to_have_length(self, expected: int, timeout: float = None) -> None: + self._expect( + "to_have_length", expected, lambda actual: len(actual) == expected, timeout + ) + + def to_be_none(self, timeout: float = None) -> None: + self._expect("to_be_none", None, lambda actual: actual is None, timeout) + + def not_to_be_none(self, timeout: float = None) -> None: + self._not.to_be_none(timeout) + + def to_be_null(self, timeout: float = None) -> None: + self.to_be_none(timeout) + + def not_to_be_null(self, timeout: float = None) -> None: + self.not_to_be_none(timeout) + + def to_be_defined(self, timeout: float = None) -> None: + self._expect("to_be_defined", None, lambda actual: actual is not None, timeout) + + def not_to_be_defined(self, timeout: float = None) -> None: + self._not.to_be_defined(timeout) + + def to_be_instance_of(self, cls: type, timeout: float = None) -> None: + self._expect( + "to_be_instance_of", + cls, + lambda actual: isinstance(actual, cls), + timeout, + ) + + def not_to_be_instance_of(self, cls: type, timeout: float = None) -> None: + self._not.to_be_instance_of(cls, timeout) + + def to_match( + self, expected: Union[str, Pattern[str]], timeout: float = None + ) -> None: + def matches(actual: Any) -> bool: + actual_str = str(actual) + if isinstance(expected, Pattern): + return expected.search(actual_str) is not None + return expected in actual_str + + self._expect("to_match", expected, matches, timeout) + + def not_to_match( + self, expected: Union[str, Pattern[str]], timeout: float = None + ) -> None: + self._not.to_match(expected, timeout) + + def to_start_with(self, expected: str, timeout: float = None) -> None: + self._expect( + "to_start_with", + expected, + lambda actual: str(actual).startswith(expected), + timeout, + ) + + def not_to_start_with(self, expected: str, timeout: float = None) -> None: + self._not.to_start_with(expected, timeout) + + def to_end_with(self, expected: str, timeout: float = None) -> None: + self._expect( + "to_end_with", + expected, + lambda actual: str(actual).endswith(expected), + timeout, + ) + + def not_to_end_with(self, expected: str, timeout: float = None) -> None: + self._not.to_end_with(expected, timeout) + + def to_be_close_to( + self, expected: float, num_digits: int = 2, timeout: float = None + ) -> None: + threshold = 0.5 * (10 ** (-num_digits)) + self._expect( + "to_be_close_to", + expected, + lambda actual: abs(actual - expected) < threshold, + timeout, + ) + + def not_to_be_close_to( + self, expected: float, num_digits: int = 2, timeout: float = None + ) -> None: + self._not.to_be_close_to(expected, num_digits, timeout) + + def to_be_nan(self, timeout: float = None) -> None: + self._expect( + "to_be_nan", + "nan", + lambda actual: isinstance(actual, float) and math.isnan(actual), + timeout, + ) + + def not_to_be_nan(self, timeout: float = None) -> None: + self._not.to_be_nan(timeout) + + +class AsyncPollAssertions(_PollAssertionsBase): + @property + def _not(self) -> "AsyncPollAssertions": + return AsyncPollAssertions( + self._callback, + timeout=self._timeout, + intervals=self._intervals, + is_not=not self._is_not, + message=self._custom_message, + ) + + async def _expect( + self, + matcher: str, + expected: Any, + predicate: Callable[[Any], bool], + timeout: float = None, + ) -> None: + timeout_ms = self._effective_timeout(timeout) + deadline = None if timeout_ms == 0 else time.monotonic() + timeout_ms / 1000 + interval_index = 0 + last_value = None + last_error = None + while True: + evaluation_succeeded = False + try: + last_value = self._callback() + if inspect.isawaitable(last_value): + last_value = await last_value + matched = predicate(last_value) + evaluation_succeeded = True + last_error = None + except Exception as exc: + matched = False + last_error = exc + if evaluation_succeeded and matched != self._is_not: + return + if deadline is not None and time.monotonic() >= deadline: + raise self._format_error( + matcher, expected, last_value, last_error, timeout_ms + ) + wait_ms = self._intervals[min(interval_index, len(self._intervals) - 1)] + interval_index += 1 + await asyncio.sleep(wait_ms / 1000) + + async def to_be(self, expected: Any, timeout: float = None) -> None: + await self._expect( + "to_be", expected, lambda actual: actual == expected, timeout + ) + + async def not_to_be(self, expected: Any, timeout: float = None) -> None: + await self._not.to_be(expected, timeout) + + async def to_equal(self, expected: Any, timeout: float = None) -> None: + await self.to_be(expected, timeout) + + async def not_to_equal(self, expected: Any, timeout: float = None) -> None: + await self._not.to_equal(expected, timeout) + + async def to_contain(self, expected: Any, timeout: float = None) -> None: + def contains(actual: Any) -> bool: + return expected in actual + + await self._expect("to_contain", expected, contains, timeout) + + async def not_to_contain(self, expected: Any, timeout: float = None) -> None: + await self._not.to_contain(expected, timeout) + + async def to_be_truthy(self, timeout: float = None) -> None: + await self._expect("to_be_truthy", True, bool, timeout) + + async def not_to_be_truthy(self, timeout: float = None) -> None: + await self._not.to_be_truthy(timeout) + + async def to_be_falsy(self, timeout: float = None) -> None: + await self._expect("to_be_falsy", False, lambda actual: not actual, timeout) + + async def not_to_be_falsy(self, timeout: float = None) -> None: + await self._not.to_be_falsy(timeout) + + async def to_be_greater_than(self, expected: Any, timeout: float = None) -> None: + await self._expect( + "to_be_greater_than", expected, lambda actual: actual > expected, timeout + ) + + async def to_be_greater_than_or_equal( + self, expected: Any, timeout: float = None + ) -> None: + await self._expect( + "to_be_greater_than_or_equal", + expected, + lambda actual: actual >= expected, + timeout, + ) + + async def to_be_less_than(self, expected: Any, timeout: float = None) -> None: + await self._expect( + "to_be_less_than", expected, lambda actual: actual < expected, timeout + ) + + async def to_be_less_than_or_equal( + self, expected: Any, timeout: float = None + ) -> None: + await self._expect( + "to_be_less_than_or_equal", + expected, + lambda actual: actual <= expected, + timeout, + ) + + async def to_have_length(self, expected: int, timeout: float = None) -> None: + await self._expect( + "to_have_length", expected, lambda actual: len(actual) == expected, timeout + ) + + async def to_be_none(self, timeout: float = None) -> None: + await self._expect("to_be_none", None, lambda actual: actual is None, timeout) + + async def not_to_be_none(self, timeout: float = None) -> None: + await self._not.to_be_none(timeout) + + async def to_be_null(self, timeout: float = None) -> None: + await self.to_be_none(timeout) + + async def not_to_be_null(self, timeout: float = None) -> None: + await self.not_to_be_none(timeout) + + async def to_be_defined(self, timeout: float = None) -> None: + await self._expect( + "to_be_defined", None, lambda actual: actual is not None, timeout + ) + + async def not_to_be_defined(self, timeout: float = None) -> None: + await self._not.to_be_defined(timeout) + + async def to_be_instance_of(self, cls: type, timeout: float = None) -> None: + await self._expect( + "to_be_instance_of", + cls, + lambda actual: isinstance(actual, cls), + timeout, + ) + + async def not_to_be_instance_of(self, cls: type, timeout: float = None) -> None: + await self._not.to_be_instance_of(cls, timeout) + + async def to_match( + self, expected: Union[str, Pattern[str]], timeout: float = None + ) -> None: + def matches(actual: Any) -> bool: + actual_str = str(actual) + if isinstance(expected, Pattern): + return expected.search(actual_str) is not None + return expected in actual_str + + await self._expect("to_match", expected, matches, timeout) + + async def not_to_match( + self, expected: Union[str, Pattern[str]], timeout: float = None + ) -> None: + await self._not.to_match(expected, timeout) + + async def to_start_with(self, expected: str, timeout: float = None) -> None: + await self._expect( + "to_start_with", + expected, + lambda actual: str(actual).startswith(expected), + timeout, + ) + + async def not_to_start_with(self, expected: str, timeout: float = None) -> None: + await self._not.to_start_with(expected, timeout) + + async def to_end_with(self, expected: str, timeout: float = None) -> None: + await self._expect( + "to_end_with", + expected, + lambda actual: str(actual).endswith(expected), + timeout, + ) + + async def not_to_end_with(self, expected: str, timeout: float = None) -> None: + await self._not.to_end_with(expected, timeout) + + async def to_be_close_to( + self, expected: float, num_digits: int = 2, timeout: float = None + ) -> None: + threshold = 0.5 * (10 ** (-num_digits)) + await self._expect( + "to_be_close_to", + expected, + lambda actual: abs(actual - expected) < threshold, + timeout, + ) + + async def not_to_be_close_to( + self, expected: float, num_digits: int = 2, timeout: float = None + ) -> None: + await self._not.to_be_close_to(expected, num_digits, timeout) + + async def to_be_nan(self, timeout: float = None) -> None: + await self._expect( + "to_be_nan", + "nan", + lambda actual: isinstance(actual, float) and math.isnan(actual), + timeout, + ) + + async def not_to_be_nan(self, timeout: float = None) -> None: + await self._not.to_be_nan(timeout) + + def expected_regex( pattern: Pattern[str], match_substring: bool, diff --git a/playwright/async_api/__init__.py b/playwright/async_api/__init__.py index c05735fcd..429c1cebf 100644 --- a/playwright/async_api/__init__.py +++ b/playwright/async_api/__init__.py @@ -18,7 +18,7 @@ web automation that is ever-green, capable, reliable and fast. """ -from typing import Any, Optional, Union, overload +from typing import Any, Awaitable, Callable, Optional, Sequence, Union, overload import playwright._impl._api_structures import playwright._impl._errors @@ -26,6 +26,7 @@ from playwright._impl._assertions import ( APIResponseAssertions as APIResponseAssertionsImpl, ) +from playwright._impl._assertions import AsyncPollAssertions as PollAssertions from playwright._impl._assertions import LocatorAssertions as LocatorAssertionsImpl from playwright._impl._assertions import PageAssertions as PageAssertionsImpl from playwright.async_api._context_manager import PlaywrightContextManager @@ -142,6 +143,45 @@ def __call__( ) raise ValueError(f"Unsupported type: {type(actual)}") + def poll( + self, + callback: Callable[[], Union[Any, Awaitable[Any]]], + *, + timeout: Optional[float] = None, + intervals: Optional[Sequence[float]] = None, + message: Optional[str] = None, + ) -> PollAssertions: + """ + Poll the `callback` result until a matcher passes. + + This mirrors Playwright Test's `expect.poll(...)` behavior from JavaScript: + repeatedly evaluate `callback` and retry assertions on the returned value + until success or timeout. + + Args: + callback: Function returning the value to assert. Can be async. + timeout: Timeout in milliseconds. Defaults to 5000. Pass `0` to disable timeout. + intervals: Polling intervals in milliseconds. Defaults to `[100, 250, 500, 1000]`. + message: Custom assertion message shown on failure. + + Example: + ```py + counter = {"value": 0} + async def get_value() -> int: + counter["value"] += 1 + return counter["value"] + + await expect.poll(get_value, timeout=3000).to_be_greater_than_or_equal(3) + await expect.poll(lambda: page.title()).to_match("Example") + ``` + """ + return PollAssertions( + callback=callback, + timeout=timeout if timeout is not None else self._timeout, + intervals=intervals, + message=message, + ) + expect = Expect() diff --git a/playwright/sync_api/__init__.py b/playwright/sync_api/__init__.py index 53dee2cad..b1b3cd759 100644 --- a/playwright/sync_api/__init__.py +++ b/playwright/sync_api/__init__.py @@ -18,7 +18,7 @@ web automation that is ever-green, capable, reliable and fast. """ -from typing import Any, Optional, Union, overload +from typing import Any, Callable, Optional, Sequence, Union, overload import playwright._impl._api_structures import playwright._impl._errors @@ -28,6 +28,7 @@ ) from playwright._impl._assertions import LocatorAssertions as LocatorAssertionsImpl from playwright._impl._assertions import PageAssertions as PageAssertionsImpl +from playwright._impl._assertions import SyncPollAssertions as PollAssertions from playwright.sync_api._context_manager import PlaywrightContextManager from playwright.sync_api._generated import ( APIRequest, @@ -142,6 +143,45 @@ def __call__( ) raise ValueError(f"Unsupported type: {type(actual)}") + def poll( + self, + callback: Callable[[], Any], + *, + timeout: Optional[float] = None, + intervals: Optional[Sequence[float]] = None, + message: Optional[str] = None, + ) -> PollAssertions: + """ + Poll the `callback` result until a matcher passes. + + This mirrors Playwright Test's `expect.poll(...)` behavior from JavaScript: + repeatedly evaluate `callback` and retry assertions on the returned value + until success or timeout. + + Args: + callback: Function returning the value to assert. + timeout: Timeout in milliseconds. Defaults to 5000. Pass `0` to disable timeout. + intervals: Polling intervals in milliseconds. Defaults to `[100, 250, 500, 1000]`. + message: Custom assertion message shown on failure. + + Example: + ```py + counter = {"value": 0} + def get_value() -> int: + counter["value"] += 1 + return counter["value"] + + expect.poll(get_value, timeout=3000).to_be_greater_than_or_equal(3) + expect.poll(lambda: page.title()).to_match("Example") + ``` + """ + return PollAssertions( + callback=callback, + timeout=timeout if timeout is not None else self._timeout, + intervals=intervals, + message=message, + ) + expect = Expect() diff --git a/tests/async/test_expect_misc.py b/tests/async/test_expect_misc.py index 2148b0d9e..8592082b4 100644 --- a/tests/async/test_expect_misc.py +++ b/tests/async/test_expect_misc.py @@ -12,6 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import math +import re + import pytest from playwright.async_api import Page, TimeoutError, expect @@ -78,3 +81,57 @@ async def test_should_have_timeout_error_name(page: Page) -> None: with pytest.raises(TimeoutError) as exc_info: await page.wait_for_selector("#not-found", timeout=1) assert exc_info.value.name == "TimeoutError" + + +async def test_expect_poll_should_eventually_pass() -> None: + value = 0 + + async def callback() -> int: + nonlocal value + value += 1 + return value + + await expect.poll( + callback, timeout=1_000, intervals=[10] + ).to_be_greater_than_or_equal(3) + + +async def test_expect_poll_should_support_not() -> None: + await expect.poll(lambda: "done").not_to_be("pending") + + +async def test_expect_poll_should_use_custom_message_on_failure() -> None: + with pytest.raises(AssertionError) as exc_info: + await expect.poll( + lambda: 1, + timeout=50, + intervals=[10], + message="custom-message", + ).to_be(2) + assert "custom-message" in str(exc_info.value) + assert "Last value: 1" in str(exc_info.value) + + +async def test_expect_poll_should_report_callback_error() -> None: + async def callback() -> None: + raise ValueError("boom") + + with pytest.raises(AssertionError) as exc_info: + await expect.poll(callback, timeout=50, intervals=[10]).to_be(1) + assert "Last error: boom" in str(exc_info.value) + + +async def test_expect_poll_should_support_additional_matchers() -> None: + await expect.poll(lambda: "alpha-beta").to_match(re.compile(r"alpha")) + await expect.poll(lambda: "alpha-beta").to_match("beta") + await expect.poll(lambda: "alpha-beta").to_start_with("alpha") + await expect.poll(lambda: "alpha-beta").to_end_with("beta") + await expect.poll(lambda: [1, 2, 3]).to_have_length(3) + await expect.poll(lambda: None).to_be_none() + await expect.poll(lambda: None).to_be_null() + await expect.poll(lambda: 1).to_be_defined() + await expect.poll(lambda: 1.234).to_be_close_to(1.23, 2) + await expect.poll(lambda: math.nan).to_be_nan() + await expect.poll(lambda: "hello").to_be_instance_of(str) + await expect.poll(lambda: "").not_to_be_truthy() + await expect.poll(lambda: "x").not_to_be_falsy() diff --git a/tests/sync/test_expect_misc.py b/tests/sync/test_expect_misc.py index 4d8b68cc0..314ba0f9f 100644 --- a/tests/sync/test_expect_misc.py +++ b/tests/sync/test_expect_misc.py @@ -12,6 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import math +import re + import pytest from playwright.sync_api import Page, expect @@ -70,3 +73,55 @@ def test_to_be_in_viewport_should_report_intersection_even_if_fully_covered_by_o """ ) expect(page.locator("h1")).to_be_in_viewport() + + +def test_expect_poll_should_eventually_pass() -> None: + value = 0 + + def callback() -> int: + nonlocal value + value += 1 + return value + + expect.poll(callback, timeout=1_000, intervals=[10]).to_be_greater_than_or_equal(3) + + +def test_expect_poll_should_support_not() -> None: + expect.poll(lambda: "done").not_to_be("pending") + + +def test_expect_poll_should_use_custom_message_on_failure() -> None: + with pytest.raises(AssertionError) as exc_info: + expect.poll( + lambda: 1, + timeout=50, + intervals=[10], + message="custom-message", + ).to_be(2) + assert "custom-message" in str(exc_info.value) + assert "Last value: 1" in str(exc_info.value) + + +def test_expect_poll_should_report_callback_error() -> None: + def callback() -> None: + raise ValueError("boom") + + with pytest.raises(AssertionError) as exc_info: + expect.poll(callback, timeout=50, intervals=[10]).to_be(1) + assert "Last error: boom" in str(exc_info.value) + + +def test_expect_poll_should_support_additional_matchers() -> None: + expect.poll(lambda: "alpha-beta").to_match(re.compile(r"alpha")) + expect.poll(lambda: "alpha-beta").to_match("beta") + expect.poll(lambda: "alpha-beta").to_start_with("alpha") + expect.poll(lambda: "alpha-beta").to_end_with("beta") + expect.poll(lambda: [1, 2, 3]).to_have_length(3) + expect.poll(lambda: None).to_be_none() + expect.poll(lambda: None).to_be_null() + expect.poll(lambda: 1).to_be_defined() + expect.poll(lambda: 1.234).to_be_close_to(1.23, 2) + expect.poll(lambda: math.nan).to_be_nan() + expect.poll(lambda: "hello").to_be_instance_of(str) + expect.poll(lambda: "").not_to_be_truthy() + expect.poll(lambda: "x").not_to_be_falsy()