Source code for selenium_driverless.types.context

# Licensed to the Software Freedom Conservancy (SFC) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The SFC licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
# modified by kaliiiiiiiiii | Aurin Aegerter
# all modifications are licensed under the license provided at LICENSE.md

"""The WebDriver implementation."""
import inspect
import time
import os
import typing
import warnings
import pathlib

from typing import List

# io
import asyncio
import websockets
from cdp_socket.exceptions import CDPError

# selenium
# SwitchTo
from selenium_driverless.scripts.switch_to import SwitchTo
from selenium_driverless.sync.switch_to import SwitchTo as SyncSwitchTo

# targets
from selenium_driverless.types.base_target import BaseTarget
from selenium_driverless.types.target import Target, TargetInfo
from selenium_driverless.scripts.driver_utils import get_targets, get_target

# other
from selenium_driverless.input.pointer import Pointer
from selenium_driverless.types.webelement import WebElement
from selenium_driverless.utils.utils import check_timeout


[docs]class Context: """Allows you to drive the browser without chromedriver.""" _is_incognito: bool # noinspection PyProtectedMember def __init__(self, base_target: Target, driver, context_id: str = None, loop: asyncio.AbstractEventLoop = None, is_incognito: bool = False, max_ws_size: int = 2 ** 20) -> None: self._loop: asyncio.AbstractEventLoop or None = None self.browser_pid: int or None = None self._targets: typing.Dict[str, Target] = {} self._switch_to = None self._started = False self._loop = loop self._current_target = base_target self._host = base_target._host self._is_remote = base_target._is_remote self._max_ws_size: int = max_ws_size self._context_id = context_id self._closed_callbacks: typing.List[callable] = [] self._driver = driver self._is_incognito = is_incognito def __repr__(self): return f'<{type(self).__module__}.{type(self).__name__} (session="{self.current_window_handle}")>' async def __aenter__(self): await self.start_session() return self def __enter__(self): return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.quit() def __exit__(self, exc_type, exc_val, exc_tb): pass def __await__(self): return self.start_session().__await__() async def start_session(self): from selenium_driverless.webdriver import Chrome self._driver: Chrome if not self._started: if not self.context_id: self._context_id = await self._current_target.browser_context_id _type = await self.current_target.type targets = None if not _type == "page:": targets = await self.targets for _id, info in list(targets.items()): if info.type == "page" and not info.url.startswith("chrome-extension://"): self._current_target = info.Target break else: del targets[_id] if self._loop: self._switch_to = await SyncSwitchTo(context=self, loop=self._loop, context_id=self._context_id) else: self._switch_to = await SwitchTo(context=self, loop=self._loop, context_id=self._context_id) if targets: await self.current_target.focus(activate=False) self._started = True return self @property async def frame_tree(self): return await self.current_target.frame_tree @property async def targets(self) -> typing.Dict[str, TargetInfo]: return await self.get_targets() async def get_targets(self, _type: str = None, context_id="self") -> typing.Dict[str, TargetInfo]: if context_id == "self": context_id = self.context_id return await get_targets(cdp_exec=self.base_target.execute_cdp_cmd, target_getter=self.get_target, _type=_type, context_id=context_id, max_ws_size=self._max_ws_size) @property def current_target(self) -> Target: return self._current_target @property def base_target(self) -> BaseTarget: return self._driver.base_target @property async def _isolated_context_id(self): # noinspection PyProtectedMember return await self.current_target._isolated_context_id async def get_target(self, target_id: str = None, timeout: float = 2, max_ws_size: int = None) -> Target: if not max_ws_size: max_ws_size = self._max_ws_size if not target_id: return self._current_target target: Target = self._targets.get(target_id) if not target: target: Target = await get_target(target_id=target_id, host=self._host, loop=self._loop, is_remote=self._is_remote, timeout=timeout, max_ws_size=max_ws_size, driver=self._driver, context=self) self._targets[target_id] = target # noinspection PyUnusedLocal def remove_target(code: str, reason: str): if target_id in self._targets: del self._targets[target_id] # noinspection PyProtectedMember target._on_closed.append(remove_target) return target async def get_target_for_iframe(self, iframe: WebElement): return await self.current_target.get_target_for_iframe(iframe=iframe)
[docs] async def set_download_behaviour(self, behavior: typing.Literal["deny", "allow", "allowAndName", "default"], path: str = None): """set the download behaviour :param behavior: the behaviour to set the downloading to :param path: the path to the default download directory .. warning:: setting ``behaviour=allow`` instead of ``allowAndName`` can cause some bugs """ params = {"behavior": behavior, "eventsEnabled": True} if path: _dir = str(pathlib.Path(path)) if os.path.isfile(_dir): raise OSError("path can't point to a file") params["downloadPath"] = _dir if self._is_incognito: params["browserContextId"] = self.context_id await self.base_target.execute_cdp_cmd("Browser.setDownloadBehavior", params)
@property def downloads_dir(self): """the current downloads directory""" if self._is_incognito: return self.base_target.downloads_dir_for_context(context_id=self.context_id) else: return self.base_target.downloads_dir_for_context(context_id="DEFAULT")
[docs] async def wait_download(self, timeout: float or None = 30) -> dict: """ wait for a download on the current tab returns something like .. code-block:: python { "frameId": "2D543B5E8B14945B280C537A4882A695", "guid": "c91df4d5-9b45-4962-84df-3749bd3f926d", "url": "https://www.w3.org/WAI/ER/tests/xhtml/testfiles/resources/pdf/dummy.pdf", "suggestedFilename": "dummy.pdf", # only if options.downloads_dir specified "guid_file": "D:\\System\\AppData\\PyCharm\\scratches\\downloads\\c91df4d5-9b45-4962-84df-3749bd3f926d" } :param timeout: time in seconds to wait for a download .. warning:: downloads from iframes not supported yet """ return await self.current_target.wait_download(timeout=timeout)
[docs] async def get(self, url: str, referrer: str = None, wait_load: bool = True, timeout: float = 30) -> dict: """Loads a web page in the current Target :param url: the url to load. :param referrer: the referrer to load the page with :param wait_load: whether to wait for the webpage to load :param timeout: the maximum time in seconds for waiting on load returns the same as :func:`Target.wait_download <selenium_driverless.types.target.Target.wait_download>` if the url initiates a download """ if self._is_incognito and url in ["chrome://extensions"]: raise ValueError(f"{url} only supported in non-incognito contexts") target = self.current_target return await target.get(url=url, referrer=referrer, wait_load=wait_load, timeout=timeout)
@property async def title(self) -> str: """Returns the title of the current target""" target = await self.current_target_info return target.title @property def current_pointer(self) -> Pointer: """the :class:`Pointer <selenium_driverless.input.pointer.Pointer>` for the current target""" target = self.current_target return target.pointer
[docs] async def send_keys(self, text: str): """ send text & keys to the current target :param text: the text to send """ await self.current_target.send_keys(text)
[docs] async def execute_raw_script(self, script: str, *args, await_res: bool = False, serialization: str = None, max_depth: int = None, timeout: float = 2, execution_context_id: str = None, unique_context: bool = True): """ example: script= "function(...arguments){obj.click()}" "const obj" will be the Object according to obj_id """ return await self.current_target.execute_raw_script(script, *args, await_res=await_res, serialization=serialization, max_depth=max_depth, timeout=timeout, execution_context_id=execution_context_id, unique_context=unique_context)
[docs] async def execute_script(self, script: str, *args, max_depth: int = 2, serialization: str = None, timeout: float = 2, execution_context_id: str = None, unique_context: bool = True): """executes JavaScript synchronously on ``GlobalThis`` such as .. code-block:: js return document see :func:`Target.execute_raw_script <selenium_driverless.types.target.Target.execute_raw_script>` for argument descriptions """ return await self.current_target.execute_script(script, *args, max_depth=max_depth, serialization=serialization, timeout=timeout, execution_context_id=execution_context_id, unique_context=unique_context)
[docs] async def execute_async_script(self, script: str, *args, max_depth: int = 2, serialization: str = None, timeout: float = 2, execution_context_id: str = None, unique_context: bool = True): """executes JavaScript asynchronously on ``GlobalThis`` such as .. warning:: using execute_async_script is not recommended as it doesn't handle exceptions correctly. Use :func:`Chrome.eval_async <selenium_driverless.webdriver.Chrome.eval_async>` .. code-block:: js resolve = arguments[arguments.length-1] see :func:`Target.execute_raw_script <selenium_driverless.types.target.Target.execute_raw_script>` for argument descriptions """ return await self.current_target.execute_async_script(script, *args, max_depth=max_depth, serialization=serialization, timeout=timeout, execution_context_id=execution_context_id, unique_context=unique_context)
[docs] async def eval_async(self, script: str, *args, max_depth: int = 2, serialization: str = None, timeout: float = 2, execution_context_id: str = None, unique_context: bool = True): """executes JavaScript asynchronously on ``GlobalThis`` such as .. code-block:: js res = await fetch("https://httpbin.org/get"); // mind CORS! json = await res.json() return json see :func:`Target.execute_raw_script <selenium_driverless.types.target.Target.execute_raw_script>` for argument descriptions """ return await self.current_target.eval_async(script, *args, max_depth=max_depth, serialization=serialization, timeout=timeout, execution_context_id=execution_context_id, unique_context=unique_context)
@property async def current_url(self) -> str: """Gets the URL of the current page. :Usage: :: target.current_url """ target = self.current_target return await target.url @property async def page_source(self) -> str: """Gets the docs_source of the current page. :Usage: :: target.page_source """ target = self.current_target return await target.page_source
[docs] async def close(self, timeout: float = 2) -> None: """Closes the current window. :Usage: :: target.close() """ await self.current_target.close(timeout=timeout)
async def focus(self): await self.current_target.focus()
[docs] async def quit(self, timeout: float = 30, start_monotonic: float = None) -> None: """Quits the target and closes every associated window. :Usage: :: target.quit() """ from selenium_driverless import EXC_HANDLER if not start_monotonic: start_monotonic = time.perf_counter() # noinspection PyBroadException try: if self.context_id and self._is_remote: # noinspection PyUnresolvedReferences,PyBroadException try: await self.base_target.execute_cdp_cmd("Target.disposeBrowserContext", {"browserContextId": self.context_id}) except websockets.exceptions.ConnectionClosedError: pass except Exception as e: import sys EXC_HANDLER(e) else: targets = await self.targets for target in list(targets.values()): # noinspection PyUnresolvedReferences try: target = target.Target await target.close(timeout=7) check_timeout(start_monotonic, timeout) except websockets.exceptions.InvalidStatusCode: # already closed pass except ConnectionAbortedError: pass for callback in self._closed_callbacks: res = callback() if inspect.isawaitable(res): await res except Exception as e: EXC_HANDLER(e)
@property async def current_target_info(self): target = self.current_target return await target.info @property def current_window_handle(self) -> str: """Returns the current target_id """ return self.current_target.id @property async def current_window_id(self): result = await self.execute_cdp_cmd("Browser.getWindowForTarget", {"targetId": self.current_window_handle}) return result["windowId"] @property def context_id(self): return self._context_id @property async def window_handles(self) -> List[TargetInfo]: """Returns the handles of all windows within the current session.""" warnings.warn("window_handles aren't ordered") tabs = [] targets = await self.targets for info in list(targets.values()): if info.type == "page": tabs.append(info) return tabs
[docs] async def new_window(self, type_hint: typing.Literal["tab", "window"] = "tab", url="", activate: bool = False, focus: bool = True, background: bool = True) -> Target: """creates a new tab or window :param type_hint: what kind of target to create :param url: url to start the target at :param activate: whether to bring the target to the front :param focus: whether to emulate focus on the target :param background: whether to start the target in the background """ if self._is_incognito and url in ["chrome://extensions"]: raise ValueError(f"{url} only supported in non-incognito contexts") args = {"url": url} if type_hint == "window": args["newWindow"] = True elif type_hint == "tab": pass else: raise ValueError("type hint needs to be 'window' or 'tab'") if not (background is None): args["background"] = background # noinspection PyProtectedMember if self._context_id and self._is_incognito: args["browserContextId"] = self._context_id target = await self.base_target.execute_cdp_cmd("Target.createTarget", args) target_id = target["targetId"] target = await self.get_target(target_id) if activate: await target.activate() if focus: await target.focus() return target
async def set_window_state(self, state): states = ["normal", "minimized", "maximized", "fullscreen"] if state not in states: raise ValueError(f"expected one of {states}, but got: {state}") window_id = await self.current_window_id bounds = {"windowState": state} await self.execute_cdp_cmd("Browser.setWindowBounds", {"bounds": bounds, "windowId": window_id}) async def normalize_window(self): await self.set_window_state("normal")
[docs] async def maximize_window(self) -> None: """Maximizes the current window that webdriver is using.""" await self.set_window_state("maximized")
[docs] async def fullscreen_window(self) -> None: """Invokes the window manager-specific 'full screen' operation.""" await self.set_window_state("fullscreen")
[docs] async def minimize_window(self) -> None: """Invokes the window manager-specific 'minimize' operation.""" await self.set_window_state("minimized")
# noinspection PyUnusedLocal
[docs] async def print_page(self) -> str: """Takes PDF of the current page. The target makes the best effort to return a PDF based on the provided parameters. """ target = self.current_target return await target.print_page()
@property def switch_to(self) -> SwitchTo: """ :Returns: - SwitchTo: an object containing all options to switch focus into :Usage: :: element = target.switch_to.active_element alert = target.switch_to.alert target.switch_to.default_content() target.switch_to.frame('frame_name') target.switch_to.frame(1) target.switch_to.frame(target.find_elements(By.TAG_NAME, "iframe")[0]) target.switch_to.parent_frame() target.switch_to.window('main') """ return self._switch_to # Navigation
[docs] async def back(self) -> None: """Goes one step backward in the browser history. """ await self.current_target.back()
[docs] async def forward(self) -> None: """Goes one step forward in the browser history. """ await self.current_target.forward()
[docs] async def refresh(self) -> None: """Refreshes the current page """ await self.current_target.refresh()
# Options
[docs] async def get_cookies(self) -> List[dict]: """Returns a set of dictionaries, corresponding to cookies visible in the current target. """ return await self.current_target.get_cookies()
[docs] async def delete_all_cookies(self, ) -> None: """Delete all cookies in the scope of the session. """ await self.current_target.delete_all_cookies()
# noinspection GrazieInspection # Timeouts @staticmethod async def sleep(time_to_wait: float) -> None: await asyncio.sleep(time_to_wait) # noinspection PyUnusedLocal
[docs] async def find_element(self, by: str, value: str, timeout: float or None = None) -> WebElement: """find an element in the current target :param by: one of the locators at :func:`By <selenium_driverless.types.by.By>` :param value: the actual query to find the element by :param timeout: how long to wait for the element to exist """ target = await self.get_target() return await target.find_element(by=by, value=value, timeout=timeout)
[docs] async def find_elements(self, by: str, value: str, timeout: float = 3) -> typing.List[WebElement]: """find multiple elements in the current target :param by: one of the locators at :func:`By <selenium_driverless.types.by.By>` :param value: the actual query to find the elements by :param timeout: how long to wait for not being in a page reload loop in seconds """ target = await self.get_target() return await target.find_elements(by=by, value=value, timeout=timeout)
[docs] async def search_elements(self, query: str) -> typing.List[WebElement]: """ query:str | Plain text or query selector or XPath search query. """ return await self.current_target.search_elements(query=query)
[docs] async def get_screenshot_as_file(self, filename: str) -> None: """Saves a screenshot of the current window to a PNG image file. :param filename: The full path you wish to save your screenshot to. This should end with a `.png` extension. """ return await self.current_target.get_screenshot_as_file(filename=filename)
[docs] async def save_screenshot(self, filename) -> None: """alias to :func: `driver.get_screenshot_as_file <selenium_driverless.webdriver.Chrome.get_screenshot_as_file>`""" return await self.current_target.save_screenshot(filename=filename)
[docs] async def get_screenshot_as_png(self) -> bytes: """Gets the screenshot of the current window as a binary data. :Usage: :: target.get_screenshot_as_png() """ return await self.current_target.get_screenshot_as_png()
[docs] async def snapshot(self) -> str: """gets the current snapshot as mhtml""" return await self.current_target.snapshot()
[docs] async def save_snapshot(self, filename: str): """Saves a snapshot of the current window to a MHTML file. :param filename: The full path you wish to save your snapshot to. This should end with a ``.mhtml`` extension. .. code-block:: Python await driver.get_snapshot('snapshot.mhtml') """ return await self.current_target.save_snapshot(filename)
# noinspection PyPep8Naming
[docs] async def set_window_size(self, width, height, windowHandle: str = "current") -> None: """Sets the width and height of the current window. (window.resizeTo) :Args: - width: the width in pixels to set the window to - height: the height in pixels to set the window to :Usage: :: target.set_window_size(800,600) """ if windowHandle != "current": warnings.warn("Only 'current' window is supported for W3C compatible browsers.") await self.set_window_rect(width=int(width), height=int(height))
# noinspection PyPep8Naming
[docs] async def get_window_size(self) -> dict: """Gets the width and height of the current window.""" size = await self.get_window_rect() if size.get("value", None): size = size["value"] return {k: size[k] for k in ("width", "height")}
# noinspection PyPep8Naming
[docs] async def set_window_position(self, x, y, windowHandle: str = "current") -> dict: """Sets the x,y position of the current window. (window.moveTo) :Args: - x: the x-coordinate in pixels to set the window position - y: the y-coordinate in pixels to set the window position :Usage: :: target.set_window_position(0,0) """ if windowHandle != "current": warnings.warn("Only 'current' window is supported for W3C compatible browsers.") return await self.set_window_rect(x=int(x), y=int(y))
# noinspection PyPep8Naming
[docs] async def get_window_position(self, windowHandle="current") -> dict: """Gets the x,y position of the current window. :Usage: :: target.get_window_position() """ if windowHandle != "current": warnings.warn("Only 'current' window is supported for W3C compatible browsers.") position = await self.get_window_rect() return {k: position[k] for k in ("x", "y")}
[docs] async def get_window_rect(self) -> dict: """Gets the x, y coordinates of the window as well as height and width of the current window. :Usage: :: target.get_window_rect() """ json = await self.execute_cdp_cmd("Browser.getWindowBounds", {"windowId": await self.current_window_id}) json = json["bounds"] json["x"] = json["left"] del json["left"] json["y"] = json["top"] del json["top"] return json
[docs] async def set_window_rect(self, x=None, y=None, width=None, height=None) -> dict: """Sets the x, y coordinates of the window as well as height and width of the current window. This method is only supported for W3C compatible browsers; other browsers should use `set_window_position` and `set_window_size`. :Usage: :: target.set_window_rect(x=10, y=10) target.set_window_rect(width=100, height=200) target.set_window_rect(x=10, y=10, width=100, height=200) """ if (x is None and y is None) and (not height and not width): raise ValueError("x and y or height and width need values") bounds = {"left": x, "top": y, "width": width, 'height': height} await self.execute_cdp_cmd("Browser.setWindowBounds", {"windowId": await self.current_window_id, "bounds": bounds}) bounds["x"] = bounds["left"] del bounds["left"] bounds["y"] = bounds["top"] del bounds["top"] return bounds
[docs] async def get_network_conditions(self): """Gets Chromium network emulation settings. :Returns: A dict. For example: {'latency': 4, 'download_throughput': 2, 'upload_throughput': 2, 'offline': False} """ return await self.current_target.get_network_conditions()
[docs] async def set_network_conditions(self, offline: bool, latency: int, download_throughput: int, upload_throughput: int, connection_type: typing.Literal[ "none", "cellular2g", "cellular3g", "cellular4g", "bluetooth", "ethernet", "wifi", "wimax", "other"]) -> None: """Sets Chromium network emulation settings. :Args: - network_conditions: A dict with conditions specification. :Usage: :: target.set_network_conditions( offline=False, latency=5, # additional latency (ms) download_throughput=500 * 1024, # maximal throughput upload_throughput=500 * 1024, # maximal throughput connection_type="wifi") Note: 'throughput' can be used to set both (for download and upload). """ return await self.current_target.set_network_conditions(offline=offline, latency=latency, download_throughput=download_throughput, upload_throughput=upload_throughput, connection_type=connection_type)
[docs] async def delete_network_conditions(self) -> None: """Resets Chromium network emulation settings.""" await self.current_target.delete_network_conditions()
[docs] async def set_permissions(self, name: str, value: str, origin: str = None) -> None: """Sets Applicable Permission. :Args: - name: The item to set the permission on. - value: The value to set on the item :Usage: :: target.set_permissions('clipboard-read', 'denied') """ settings = ["granted", "denied", "prompt"] if value not in settings: raise ValueError(f"value needs to be within {settings}, but got {value}") args = {"permission": {"name": name}, "setting": value} if self.context_id: args["browserContextId"] = self.context_id if origin: args["origin"] = origin await self.execute_cdp_cmd("Browser.setPermission", args)
[docs] async def wait_for_cdp(self, event: str, timeout: float or None = None) -> dict: """ wait for an event on the current target see :func:`Target.wait_for_cdp <selenium_driverless.types.target.Target.wait_for_cdp>` for reference """ return await self.current_target.wait_for_cdp(event=event, timeout=timeout)
[docs] async def add_cdp_listener(self, event: str, callback: typing.Callable[[dict], any]): """ add a listener for a CDP event on the current target see :func:`Target.add_cdp_listener <selenium_driverless.types.target.Target.add_cdp_listener>` for reference """ return await self.current_target.add_cdp_listener(event=event, callback=callback)
[docs] async def remove_cdp_listener(self, event: str, callback: typing.Callable[[dict], any]): """ remove a listener for a CDP event on the current target see :func:`Target.remove_cdp_listener <selenium_driverless.types.target.Target.remove_cdp_listener>` for reference """ return await self.current_target.remove_cdp_listener(event=event, callback=callback)
[docs] async def get_cdp_event_iter(self, event: str) -> typing.AsyncIterable[dict]: """ iterate over CDP events on the current target see :func:`Target.get_cdp_event_iter <selenium_driverless.types.target.Target.get_cdp_event_iter>` for reference """ return await self.current_target.get_cdp_event_iter(event=event)
[docs] async def execute_cdp_cmd(self, cmd: str, cmd_args: dict or None = None, timeout: float or None = 10) -> dict: """Execute Chrome Devtools Protocol command on the current target executes it on :class:`Target.execute_cdp_cmd <selenium_driverless.types.base_target.BaseTarget>` if ``message:'Not allowed'`` received see :func:`Target.execute_cdp_cmd <selenium_driverless.types.target.Target.execute_cdp_cmd>` for reference """ try: return await self.current_target.execute_cdp_cmd(cmd=cmd, cmd_args=cmd_args, timeout=timeout) except CDPError as e: if e.code == -32000 and e.message == 'Not allowed': return await self.base_target.execute_cdp_cmd(cmd=cmd, cmd_args=cmd_args, timeout=timeout)
[docs] async def fetch(self, *args, **kwargs) -> dict: """ executes a JS ``fetch`` request within the current target see :func:`Target.fetch <selenium_driverless.types.target.Target.fetch>` for reference """ return await self.current_target.fetch(*args, **kwargs)
[docs] async def xhr(self, *args, **kwargs) -> dict: """ executes a JS ``XMLHttpRequest`` request within the current target see :func:`Target.fetch <selenium_driverless.types.target.Target.fetch>` for reference """ return await self.current_target.xhr(*args, **kwargs)
# noinspection PyTypeChecker
[docs] async def get_sinks(self) -> list: """ :Returns: A list of sinks available for Cast. """ return await self.current_target.get_sinks()
[docs] async def get_issue_message(self): """ :Returns: An error message when there is any issue in a Cast session. """ return await self.current_target.get_issue_message()
[docs] async def set_sink_to_use(self, sink_name: str) -> dict: """Sets a specific sink, using its name, as a Cast session receiver target. :Args: - sink_name: Name of the sink to use as the target. """ return await self.current_target.set_sink_to_use(sink_name=sink_name)
[docs] async def start_desktop_mirroring(self, sink_name: str) -> dict: """Starts a desktop mirroring session on a specific receiver target. :Args: - sink_name: Name of the sink to use as the target. """ return await self.current_target.start_desktop_mirroring(sink_name=sink_name)
[docs] async def start_tab_mirroring(self, sink_name: str) -> dict: """Starts a tab mirroring session on a specific receiver target. :Args: - sink_name: Name of the sink to use as the target. """ return await self.current_target.start_tab_mirroring(sink_name=sink_name)
[docs] async def stop_casting(self, sink_name: str) -> dict: """Stops the existing Cast session on a specific receiver target. :Args: - sink_name: Name of the sink to stop the Cast session. """ return await self.current_target.stop_casting(sink_name=sink_name)