Source code for selenium_driverless.webdriver

# Licensed to the Software Freedom Conservancy (SFC) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The SFC licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# modified by kaliiiiiiiiii | Aurin Aegerter
# all modifications are licensed under the license provided at LICENSE.md

"""The WebDriver implementation."""
import os
import shutil
import subprocess
import sys
import tempfile
import time
import typing
import uuid
import warnings
import signal
from typing import List

# io
import asyncio

import cdp_socket.exceptions
import websockets

# interactions
from selenium_driverless.input.pointer import Pointer
from selenium_driverless.types.webelement import WebElement
from selenium_driverless.scripts.switch_to import SwitchTo

# contexts
from selenium_driverless.sync.context import Context as SyncContext
from selenium_driverless.types.context import Context

# Targets
from selenium_driverless.scripts.driver_utils import get_target
from selenium_driverless.types.target import Target, TargetInfo
from selenium_driverless.types.base_target import BaseTarget
from selenium_driverless.sync.base_target import BaseTarget as SyncBaseTarget

# others
from cdp_socket.utils.conn import get_json
from selenium_driverless.types.options import Options as ChromeOptions
from selenium_driverless.utils.utils import sel_driverless_path
from selenium_driverless.types import JSEvalException
from selenium_driverless import EXC_HANDLER


[docs] class Chrome: """Control the chromium based browsers without any driver.""" def __init__( self, options: ChromeOptions = None, timeout: float = 30, debug: bool = False, max_ws_size: int = 2 ** 27 ) -> None: # noinspection GrazieInspection """Creates a new instance of the chrome target. Starts the service and then creates new instance of chrome target. .. code-block:: python options = webdriver.ChromeOptions.rst() async with webdriver.Chrome(options=options) as driver: await driver.get('https://abrahamjuliot.github.io/creepjs/', wait_load=True) print(await driver.title) :param options: this takes an instance of ChromeOptions.rst :param timeout: timeout in seconds to start chrome :param debug: redirect errors from the chromium process output (stderr) to console :param max_ws_size: maximum size for websocket messages in bytes. 2^27 ~= 130 MB by default """ self._prefs = {} self._auth_interception_enabled = None self._mv3_extension = None self._extensions_incognito_allowed = None self._base_context = None self._stderr = None self._stderr_file = None self._process = None self._current_target = None self._host = None self._timeout = timeout self._loop: asyncio.AbstractEventLoop or None = None self.browser_pid: int or None = None self._base_target = None self._debug = debug # noinspection PyTypeChecker self._current_context: Context = None self._contexts: typing.Dict[str, Context] = {} self._temp_dir = tempfile.TemporaryDirectory(prefix="selenium_driverless_").name self._max_ws_size = max_ws_size self._auth = {} if not options: options = ChromeOptions() if not options.binary_location: from selenium_driverless.utils.utils import find_chrome_executable options.binary_location = find_chrome_executable() self._options: ChromeOptions = options self._is_remote = True self._is_remote = False self._has_incognito_contexts: bool = False self._started = False def __repr__(self): return f'<{type(self).__module__}.{type(self).__name__} (session="{self.current_target.id}")>' async def __aenter__(self): await self.start_session() return self def __enter__(self): return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.quit(clean_dirs=self._options.auto_clean_dirs) def __exit__(self, exc_type, exc_val, exc_tb): pass def __await__(self): return self.start_session().__await__() async def start_session(self): if not self._started: from selenium_driverless.utils.utils import read from selenium_driverless.utils.utils import is_first_run, get_default_ua, set_default_ua from selenium_driverless.scripts.prefs import read_prefs, write_prefs await is_first_run() user_agent = await get_default_ua() if self._options.use_extension: # extension self._options.add_extension(sel_driverless_path() + "files/mv3_extension") if not self._options.debugger_address: from selenium_driverless.utils.utils import random_port port = random_port() self._options._debugger_address = f"127.0.0.1:{port}" self._options.add_argument(f"--remote-debugging-port={port}") if self._options.headless and not self._is_remote: # patch useragent if user_agent: self._options.add_argument(f"--user-agent={user_agent}") else: warnings.warn("headless is detectable at first run") # handle prefs if self._options.user_data_dir: prefs_path = self._options.user_data_dir + "/Default/Preferences" if os.path.isfile(prefs_path): self._prefs = await read_prefs(prefs_path) else: os.makedirs(os.path.dirname(prefs_path), exist_ok=True) # write prefs self._prefs.update(self._options.prefs) await write_prefs(self._prefs, prefs_path) elif self._options.user_data_dir is None: self._options.add_argument( "--user-data-dir=" + self._temp_dir + "/data_dir") prefs_path = self._options.user_data_dir + "/Default/Preferences" os.makedirs(os.path.dirname(prefs_path), exist_ok=True) # write prefs self._prefs.update(self._options.prefs) await write_prefs(self._prefs, prefs_path) # noinspection PyProtectedMember # handle extensions if self._options._extension_paths: import zipfile extension_paths = [] loop = asyncio.get_running_loop() # noinspection PyProtectedMember def extractall(): for _path in self._options._extension_paths: if os.path.isfile(_path): with zipfile.ZipFile(_path, 'r') as zip_ref: _path = self._temp_dir + f"/{uuid.uuid4().hex}" zip_ref.extractall(_path) extension_paths.append(_path) await loop.run_in_executor(None, extractall) self._options.arguments.append(f"--load-extension=" + ','.join(extension_paths)) self._options._extension_paths = [] if self._options.startup_url: self._options.add_argument(self._options.startup_url) self._options._startup_url = None options = self._options # noinspection PyProtectedMember self._is_remote = self._options._is_remote if not self._is_remote: path = options.binary_location args = options.arguments if self._debug: self._stderr = sys.stderr else: self._stderr = tempfile.TemporaryFile(prefix="selenium_driverless") self._stderr_file = self._stderr self._process = subprocess.Popen( [path, *args], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=self._stderr, close_fds=True, preexec_fn=os.setsid if os.name == 'posix' else None, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if os.name == 'nt' else 0, shell=False, text=True, env=self._options.env ) host, port = self._options.debugger_address.split(":") port = int(port) if port == 0: path = self._options.user_data_dir + "/DevToolsActivePort" while not os.path.isfile(path): await asyncio.sleep(0.1) port = await read(path, sel_root=False) port = int(port.split("\n")[0]) self._options.debugger_address = f"127.0.0.1:{port}" host, port = self._options.debugger_address.split(":") port = int(port) self._host = f"{host}:{port}" if self._loop: self._base_target = await SyncBaseTarget(host=self._host, is_remote=self._is_remote, timeout=self._timeout, loop=self._loop, max_ws_size=self._max_ws_size) else: self._base_target = await BaseTarget(host=self._host, is_remote=self._is_remote, timeout=self._timeout, loop=self._loop, max_ws_size=self._max_ws_size) # fetch useragent at first headless run # noinspection PyUnboundLocalVariable if not self._is_remote: res = await self._base_target.execute_cdp_cmd("Browser.getVersion") user_agent = res["userAgent"] user_agent = user_agent.replace("HeadlessChrome", "Chrome") await set_default_ua(user_agent) if not self._is_remote: # noinspection PyUnboundLocalVariable self.browser_pid = self._process.pid targets = await get_json(self._host, timeout=self._timeout) for target in targets: if target["type"] == "page": target_id = target["id"] self._current_target = await get_target(target_id=target_id, host=self._host, loop=self._loop, is_remote=self._is_remote, timeout=10, max_ws_size=self._max_ws_size, driver=self, context=None) # handle the context if self._loop: context = await SyncContext(base_target=self._current_target, driver=self, loop=self._loop, max_ws_size=self._max_ws_size) else: context = await Context(base_target=self._current_target, driver=self, loop=self._loop, max_ws_size=self._max_ws_size) _id = context.context_id self._current_target._context = context def remove_context(): if _id in self._contexts: del self._contexts[_id] self._base_context = None # noinspection PyProtectedMember context._closed_callbacks.append(remove_context) self._current_context = context self._base_context = context self._contexts[_id] = context break await self.execute_cdp_cmd("Emulation.setFocusEmulationEnabled", {"enabled": True}) if self._options.single_proxy: await self.set_single_proxy(self._options.single_proxy) downloads_dir = self._options.downloads_dir if self._options.downloads_dir: # ensure download events are dispatched await self.set_download_behaviour("allowAndName", downloads_dir) else: await self.set_download_behaviour("default") self._started = True return self @property async def frame_tree(self) -> dict: """ **async** all nested frames within the current target """ return await self.current_context.frame_tree @property async def targets(self) -> typing.Dict[str, TargetInfo]: """ **async** all targets within the current context """ return await self.current_context.targets @property async def contexts(self) -> typing.Dict[str, Context]: """ **async** all (incognito) contexts on Chrome. """ targets = await self.get_targets(context_id=None) contexts = {} for info in targets.values(): _id = info.browser_context_id if _id: context = self._contexts.get(_id) if not context: if self._loop: context = await SyncContext(base_target=self._current_target, context_id=_id, loop=self._loop, max_ws_size=self._max_ws_size, driver=self) else: context = await Context(base_target=self._current_target, context_id=_id, loop=self._loop, max_ws_size=self._max_ws_size, driver=self) contexts[_id] = context self._contexts.update(contexts) return self._contexts
[docs] async def new_context(self, proxy_bypass_list: typing.List[str] = None, proxy_server: str = True, universal_access_origins: typing.List[str] = None, url: str = "about:blank") -> Context: """ creates a new (incognito) context :param url: the url the first tab will start at. "about:blank" by default :param universal_access_origins: An optional list of origins to grant unlimited cross-origin access to. Parts of the URL other than those constituting origin are ignored .. warning:: The proxy parameter doesn't work on Windows due to `crbug#1310057 <https://bugs.chromium.org/p/chromium/issues/detail?id=1310057>`__. .. code block:: python await driver.set_auth("username", "password", "localhost:5000") context = await driver.new_context(proxy_bypass_list=["localhost"], proxy_server="http://localhost:5000") :param proxy_server: a proxy-server to use for the context :param proxy_bypass_list: a list of proxies to ignore """ await self.ensure_extensions_incognito_allowed() if proxy_bypass_list is None: proxy_bypass_list = ["localhost"] if proxy_server is None: proxy_server = "" args = {"disposeOnDetach": False} if proxy_bypass_list: args["proxyBypassList"] = ",".join(proxy_bypass_list) if not (proxy_server is True): args["proxyServer"] = proxy_server if universal_access_origins: args["originsWithUniversalNetworkAccess"] = universal_access_origins # create context ensuring extension racing conditions self._auth_interception_enabled = False has_incognito_ctxs = not (not self._has_incognito_contexts) self._has_incognito_contexts = True mv3_ext = await self.mv3_extension self._mv3_extension = None try: res = await self.base_target.execute_cdp_cmd("Target.createBrowserContext", args) except Exception as e: self._mv3_extension = mv3_ext self._auth_interception_enabled = True self._has_incognito_contexts = has_incognito_ctxs raise e _id = res["browserContextId"] if self._loop: context = await SyncContext(base_target=self._base_target, context_id=_id, loop=self._loop, is_incognito=True, max_ws_size=self._max_ws_size, driver=self) else: context = await Context(base_target=self._base_target, context_id=_id, loop=self._loop, is_incognito=True, max_ws_size=self._max_ws_size, driver=self) self._contexts[_id] = context def remove_context(): if _id in self._contexts: del self._contexts[_id] # noinspection PyProtectedMember context._closed_callbacks.append(remove_context) await context.switch_to.new_window("window", activate=False, url=url) tabs = await context.get_targets(_type="page", context_id=_id) context._current_target = list(tabs.values())[0].Target context._current_target._timeout = 5 # reload auth & extension target to fix non-applied auth if self._auth: self._mv3_extension = None while True: # ensure racing conditions with extension try: mv3_target = await self.mv3_extension self._auth_interception_enabled = False await self._ensure_auth_interception(timeout=0.5, set_flag=False) await mv3_target.execute_script("globalThis.authCreds = arguments[0]", self._auth, timeout=0.5) except (asyncio.TimeoutError, TimeoutError): await asyncio.sleep(0.1) self._mv3_extension = None else: self._auth_interception_enabled = True self._mv3_extension = mv3_target return context return context
[docs] async def get_targets(self, _type: typing.Literal["page", "background_page", "service_worker", "browser", "other"] = None, context_id: str or None = "self") -> typing.Dict[str, TargetInfo]: """ get all targets within the current context :param _type: filter by target type :param context_id: if ``None``, this function returns all targets for all contexts. """ return await self.current_context.get_targets(_type=_type, context_id=context_id)
@property def current_target(self) -> Target: """ the current Target """ if self.current_context: return self.current_context.current_target return self._current_target @property def base_target(self) -> BaseTarget: """ The connection handle for the global connection to Chrome .. warning:: only the bindings for using the CDP-protocol on BaseTarget supported """ return self._base_target @property async def mv3_extension(self, timeout: float = 10) -> Target: """ **async** the target for the background script of the by default loaded Chrome-extension (manifest-version==3) .. note: for incognito context, the extension uses the "spanning" configuration, as there isn't a way to debug "split" mode over CDP """ if self._has_incognito_contexts: await self.ensure_extensions_incognito_allowed() if not self._mv3_extension: import re import time start = time.perf_counter() extension_target = None while not extension_target: targets = await self.get_targets(context_id=None) for target in targets.values(): if target.type == "service_worker": if re.fullmatch( r"chrome-extension://(.*)/" r"driverless_background_mv3_243ffdd55e32a012b4f253b2879af978\.js", target.url): extension_target = target.Target break if not extension_target: if (time.perf_counter() - start) > timeout: raise asyncio.TimeoutError(f"Couldn't find mv3 extension within {timeout} seconds") while True: try: # fix WebRTC leak await extension_target.execute_script( "chrome.privacy.network.webRTCIPHandlingPolicy.set(arguments[0])", {"value": "disable_non_proxied_udp"}, timeout=2) except (asyncio.TimeoutError, TimeoutError): await asyncio.sleep(0.2) return await self.mv3_extension except JSEvalException: await asyncio.sleep(0.2) except cdp_socket.exceptions.CDPError as e: if e.code == -32000 and e.message == 'Could not find object with given id': await asyncio.sleep(0.2) else: break self._mv3_extension = extension_target return self._mv3_extension
[docs] async def ensure_extensions_incognito_allowed(self): """ ensure that all installed Chrome-extensions are allowed in incognito context. .. warning:: Generally, the extension decides whether to use the ``split``, ``spanning`` or ``not_allowed`` configuration. For changing this behaviour, you'll have to modify the ``manifest.json`` file within the compressed extension or directory. See `developer.chrome.com/docs/extensions/reference/manifest/incognito <https://developer.chrome.com/docs/extensions/reference/manifest/incognito?hl=en>`__. """ if not self._extensions_incognito_allowed: self._extensions_incognito_allowed = True # noinspection PyTypeChecker page = None try: base_ctx = self._base_context page: Context = await base_ctx.new_window("tab", "chrome://extensions", activate=False) script = """ async function make_global(){ const extensions = await chrome.developerPrivate.getExtensionsInfo(); extensions.forEach( async function(extension) { chrome.developerPrivate.updateExtensionConfiguration({ extensionId: extension.id, incognitoAccess: true }) }); }; await make_global() """ await asyncio.sleep(0.1) await page.eval_async(script, timeout=10) except Exception as e: EXC_HANDLER(e) self._extensions_incognito_allowed = False if page: await page.close() await self.ensure_extensions_incognito_allowed() self._extensions_incognito_allowed = True await page.close()
@property def base_context(self) -> Context: """ the Context which isn't incognito """ return self._base_context @property def downloads_dir(self): """the current downloads directory for the current context""" return self.base_target.downloads_dir_for_context(context_id="DEFAULT")
[docs] async def set_download_behaviour(self,behaviour:typing.Literal["deny", "allow", "allowAndName", "default"], path:str=None): """set the download behaviour :param behaviour: the behaviour to set the downloading to :param path: the path to the default download directory .. warning:: setting ``behaviour=allow`` instead of ``allowAndName`` can cause some bugs """ await self.current_context.set_download_behaviour(behaviour, path)
@property def current_context(self) -> Context: """ the current context switched to """ if not self._current_context: if self._contexts: return list(self._contexts.values())[0] return self._current_context @property async def _isolated_context_id(self): # noinspection PyProtectedMember return await self.current_context._isolated_context_id
[docs] async def get_target(self, target_id: str = None, timeout: float = 2) -> Target: """ get a Target by TargetId for advanced usage of the CDP protocol :param target_id: :param timeout: timeout in seconds for connecting to the target if it's not tracked already """ if not target_id: return self.current_target return await self.current_context.get_target(target_id=target_id, timeout=timeout)
[docs] async def get_target_for_iframe(self, iframe: WebElement) -> Target: """ get the Target for a specific iframe .. warning:: only cross-iframes have a Target due to `OOPIF <https://www.chromium.org/developers/design-documents/oop-iframes/>`__. See `site-isolation <https://www.chromium.org/Home/chromium-security/site-isolation/>`__ For a general solution, have a look at ``WebElement.content_document`` instead :param iframe: the iframe to get the Target for """ return await self.current_target.get_target_for_iframe(iframe=iframe)
[docs] async def get_targets_for_iframes(self, iframes: typing.List[WebElement]) -> typing.List[Target]: """ returns a list of targets for iframes see ``webdriver.Chrome.get_target_for_iframe`` for more information :param iframes: the iframe to get the targets for """ return await self.current_target.get_targets_for_iframes(iframes=iframes)
[docs] async def wait_download(self, timeout:float or None=30) -> dict: """ wait for a download on the current tab returns something like .. code-block:: python { "frameId": "2D543B5E8B14945B280C537A4882A695", "guid": "c91df4d5-9b45-4962-84df-3749bd3f926d", "url": "https://www.w3.org/WAI/ER/tests/xhtml/testfiles/resources/pdf/dummy.pdf", "suggestedFilename": "dummy.pdf", # only if options.downloads_dir specified "guid_file": "D:\\System\\AppData\\PyCharm\\scratches\\downloads\\c91df4d5-9b45-4962-84df-3749bd3f926d" } :param timeout: time in seconds to wait for a download .. warning:: downloads from iframes not supported yet """ return await self.current_target.wait_download(timeout=timeout)
[docs] async def get(self, url: str, referrer: str = None, wait_load: bool = True, timeout: float = 30) -> typing.Union[None, dict]: """Loads a web page in the current Target :param url: the url to load. :param referrer: the referrer to load the page with :param wait_load: whether to wait for the webpage to load :param timeout: the maximum time in seconds for waiting on load returns the same as :func:`Target.wait_download <selenium_driverless.types.target.Target.wait_download>` if the url initiates a download """ return await self.current_target.get(url=url, referrer=referrer, wait_load=wait_load, timeout=timeout)
@property async def title(self) -> str: """**async** the title of the current target""" target = await self.current_target_info return target.title @property def current_pointer(self) -> Pointer: """the :class:`Pointer <selenium_driverless.input.pointer.Pointer>` for this target""" target = self.current_target return target.pointer
[docs] async def send_keys(self, text: str): """ send text & keys to the current target :param text: the text to send """ await self.current_target.send_keys(text)
[docs] async def execute_raw_script(self, script: str, *args, await_res: bool = False, serialization: typing.Literal["deep", "json", "idOnly"] = "deep", max_depth: int = None, timeout: int = 2, execution_context_id: str = None, unique_context: bool = False): """executes a JavaScript on ``GlobalThis`` such as .. code-block:: js function(...arguments){return document} ``this`` and ``obj`` refers to ``globalThis`` (=> window) here :param script: the script as a string :param args: the argument which are passed to the function. Those can be either json-serializable or a RemoteObject such as WebElement :param await_res: whether to await the function or the return value of it :param serialization: can be one of ``deep``, ``json``, ``idOnly`` :param max_depth: The maximum depth objects get serialized. :param timeout: the maximum time to wait for the execution to complete :param execution_context_id: the execution context id to run the JavaScript in. Exclusive with unique_context :param unique_context: whether to use an isolated context to run the Script in. see `Runtime.callFunctionOn <https://chromedevtools.github.io/devtools-protocol/tot/Runtime/#method-callFunctionOn>`_ """ return await self.current_target.execute_raw_script(script, *args, await_res=await_res, serialization=serialization, max_depth=max_depth, timeout=timeout, execution_context_id=execution_context_id, unique_context=unique_context)
[docs] async def execute_script(self, script: str, *args, max_depth: int = 2, serialization: str = None, timeout: int = None, target_id: str = None, execution_context_id: str = None, unique_context: bool = False): """executes JavaScript synchronously on ``GlobalThis`` such as .. code-block:: js return document ``this`` and ``obj`` refers to ``globalThis`` (=> window) here see :func:`Target.execute_raw_script <selenium_driverless.types.target.Target.execute_raw_script>` for argument descriptions """ target = await self.get_target(target_id) return await target.execute_script(script, *args, max_depth=max_depth, serialization=serialization, timeout=timeout, execution_context_id=execution_context_id, unique_context=unique_context)
[docs] async def execute_async_script(self, script: str, *args, max_depth: int = 2, serialization: str = None, timeout: int = 2, target_id: str = None, execution_context_id: str = None, unique_context: bool = False): """executes JavaScript asynchronously on ``GlobalThis`` such as .. warning:: using execute_async_script is not recommended as it doesn't handle exceptions correctly. Use :func:`Chrome.eval_async <selenium_driverless.webdriver.Chrome.eval_async>` .. code-block:: js resolve = arguments[arguments.length-1] ``this`` refers to ``globalThis`` (=> window) see :func:`Target.execute_raw_script <selenium_driverless.types.target.Target.execute_raw_script>` for argument descriptions """ target = await self.get_target(target_id) return await target.execute_async_script(script, *args, max_depth=max_depth, serialization=serialization, timeout=timeout, execution_context_id=execution_context_id, unique_context=unique_context)
[docs] async def eval_async(self, script: str, *args, max_depth: int = 2, serialization: str = None, timeout: int = 2, target_id: str = None, execution_context_id: str = None, unique_context: bool = False): """executes JavaScript asynchronously on ``GlobalThis`` such as .. code-block:: js res = await fetch("https://httpbin.org/get"); // mind CORS! json = await res.json() return json ``this`` refers to ``globalThis`` (=> window) see :func:`Target.execute_raw_script <selenium_driverless.types.target.Target.execute_raw_script>` for argument descriptions """ target = await self.get_target(target_id) return await target.eval_async(script, *args, max_depth=max_depth, serialization=serialization, timeout=timeout, execution_context_id=execution_context_id, unique_context=unique_context)
@property async def current_url(self) -> str: """**async** current URL of the current Target """ target = self.current_target return await target.url @property async def page_source(self) -> str: """**async** html of the current page. """ target = self.current_target return await target.page_source
[docs] async def close(self, timeout: float = 2) -> None: """Closes the current target (only works for tabs). :param timeout: timeout in seconds for the tab to close """ await self.current_target.close(timeout=timeout)
[docs] async def focus(self): """focuses the current target (only works for tabs) """ await self.current_target.focus()
[docs] async def quit(self, timeout: float = 30, clean_dirs: bool = True) -> None: """Closes Chrome :param timeout: the maximum time waiting for chrome to quit correctly :param clean_dirs: whether to clean out the user-data-dir directory """ from selenium_driverless import EXC_HANDLER loop = asyncio.get_running_loop() def clean_dirs_sync(dirs: typing.List[str]): for _dir in dirs: while os.path.isdir(_dir): shutil.rmtree(_dir, ignore_errors=True) if self._started: start = time.perf_counter() # noinspection PyUnresolvedReferences try: # assumption: chrome is still running await self.base_target.execute_cdp_cmd("Browser.close", timeout=7) except websockets.ConnectionClosedError: pass except Exception as e: EXC_HANDLER(e) if not self._is_remote: if self._process is not None: # assumption: chrome is being shutdown manually or programmatically try: await loop.run_in_executor(None, lambda: self._process.wait(timeout)) except Exception as e: EXC_HANDLER(e) else: self._process = None try: # assumption: chrome hasn't closed within timeout, killing with force # wait for process to be killed if self._process is not None: if os.name == 'posix': os.killpg(os.getpgid(self._process.pid), signal.SIGTERM) else: self._process.terminate() try: await loop.run_in_executor(None, lambda: self._process.wait(timeout)) except subprocess.TimeoutExpired: if os.name == 'posix': os.killpg(os.getpgid(self._process.pid), signal.SIGKILL) else: self._process.kill() except Exception as e: EXC_HANDLER(e) finally: self._started = False if self._stderr_file: try: self._stderr.close() except Exception as e: EXC_HANDLER(e) # clean temp dir for extensions etc try: await asyncio.wait_for( # wait for loop.run_in_executor(None, lambda: clean_dirs_sync( [self._temp_dir])), timeout=max(5, int(timeout - (time.perf_counter() - start)))) except Exception as e: EXC_HANDLER(e) if clean_dirs: # clean user-data-dir for chrome try: await asyncio.wait_for( # wait for loop.run_in_executor(None, lambda: clean_dirs_sync( [self._options.user_data_dir])), timeout=max(5,int(timeout - (time.perf_counter() - start)))) except Exception as e: warnings.warn( "driver hasn't quit correctly, " "files might be left in your temp folder & chrome might still be running", ResourceWarning) raise e
def __del__(self): try: if self._started: warnings.warn( "driver hasn't quit correctly, " "files might be left in your temp folder & chrome might still be running", ResourceWarning) except AttributeError: pass @property async def current_target_info(self) -> TargetInfo: """**async** TargetInfo of the current target""" return await self.current_target.info @property def current_window_handle(self) -> str: """current TargetId .. warning:: this is deprecated and will be removed use ``webdriver.Chrome.current_target.id`` instead """ warnings.warn(f'"webdriver.Chrome.current_window_handle" is deprecated and will be removed\n' 'use "webdriver.Chrome.current_target.id" instead', DeprecationWarning) if self.current_target: return self.current_target.id @property async def current_window_id(self): """**async** the ``WindowId`` of the window the current Target belongs to """ result = await self.execute_cdp_cmd("Browser.getWindowForTarget", {"targetId": self.current_target.id}) return result["windowId"] @property async def window_handles(self) -> List[TargetInfo]: """**async** TargetInfo on all tabs in the current context .. warning:: the tabs aren't ordered by position in the window. Do not rely on the index, but iterate and filter them. """ warnings.warn( "the tabs aren't ordered by position in the window. Do not rely on the index, but iterate and filter them.") tabs = [] targets = await self.targets for info in list(targets.values()): if info.type == "page": tabs.append(info) return tabs
[docs] async def new_window(self, type_hint: typing.Literal["tab", "window"] = "tab", url="", activate: bool = True) -> Target: """Creates a new window or tab in the current context :param type_hint: whether to create a tab or window :param url: the url which the new window should start on. Defaults to about:blank :param activate: whether to explicitly activate/focus the window .. code-block:: python new_target = driver.new_window('tab') """ return await self.current_context.new_window(type_hint=type_hint, url=url, activate=activate)
[docs] async def set_window_state(self, state: typing.Literal["normal", "minimized", "maximized", "fullscreen"]): """sets the window state on the window the current Target belongs to :param state: the state to set """ window_id = await self.current_window_id bounds = {"windowState": state} await self.execute_cdp_cmd("Browser.setWindowBounds", {"bounds": bounds, "windowId": window_id})
[docs] async def normalize_window(self): """Normalizes the window position and size on the window the current Target belongs to """ await self.set_window_state("normal")
[docs] async def maximize_window(self) -> None: """Maximizes the window the current Target belongs to""" await self.set_window_state("maximized")
[docs] async def fullscreen_window(self) -> None: """enters fullscreen on the window the current Target belongs to""" await self.set_window_state("fullscreen")
[docs] async def minimize_window(self) -> None: """minimizes the window the current Target belongs to. .. warning:: Minimizing isn't recommended as it can throttle some functionalities in chrome. """ await self.set_window_state("maximized")
# noinspection PyUnusedLocal
[docs] async def print_page(self) -> str: """Prints the page (current target => tab) to PDF """ target = self.current_target return await target.print_page()
@property def switch_to(self) -> SwitchTo: """SwitchTo handle """ return self.current_context.switch_to # Navigation
[docs] async def back(self) -> None: """Goes one step backward in the browser history on the current target (has to be a tab). """ await self.current_target.back()
[docs] async def forward(self) -> None: """Goes one step forward in the browser history on the current target (has to be a tab). """ await self.current_target.forward()
[docs] async def refresh(self) -> None: """Refreshes the current tab (target). """ await self.current_target.refresh()
# Options
[docs] async def get_cookies(self) -> List[dict]: """list of cookies for the current tab """ return await self.current_target.get_cookies()
[docs] async def delete_all_cookies(self) -> None: """Delete all cookies in the current (incognito-) context. """ await self.current_target.delete_all_cookies()
# noinspection GrazieInspection # Timeouts
[docs] @staticmethod async def sleep(time_to_wait) -> None: # noinspection GrazieInspection """sleep .. note:: use this one instead of time.sleep in the sync version. :param time_to_wait: time in seconds to sleep """ await asyncio.sleep(time_to_wait)
# noinspection PyUnusedLocal
[docs] async def find_element(self, by: str, value: str, timeout: int or None = None) -> WebElement: """find an element in the current target :param by: one of the locators at :func:`By <selenium_driverless.types.by.By>` :param value: the actual query to find the element by :param timeout: how long to wait for the element to exist """ return await self.current_target.find_element(by=by, value=value, timeout=timeout)
[docs] async def find_elements(self, by: str, value: str) -> typing.List[WebElement]: """find multiple elements in the current target :param by: one of the locators at :func:`By <selenium_driverless.types.by.By>` :param value: the actual query to find the elements by """ return await self.current_target.find_elements(by=by, value=value)
[docs] async def search_elements(self, query: str) -> typing.List[WebElement]: """ find elements similarly to how "CTRL+F" in the DevTools Console works :param query: Plain text to find elements with """ return await self.current_target.search_elements(query=query)
[docs] async def get_screenshot_as_file(self, filename: str) -> None: """Saves a screenshot of the current tab to a PNG image file. :param filename: The path you wish to save your screenshot to. should end with a `.png` extension. .. code-block:: python driver.get_screenshot_as_file('screenshots/test.png') """ return await self.current_target.get_screenshot_as_file(filename=filename)
[docs] async def save_screenshot(self, filename) -> None: """alias to :func: `driver.get_screenshot_as_file <selenium_driverless.webdriver.Chrome.get_screenshot_as_file>`""" return await self.get_screenshot_as_file(filename)
[docs] async def get_screenshot_as_png(self) -> bytes: """Gets the screenshot of the current tab as a binary data. """ return await self.current_target.get_screenshot_as_png()
[docs] async def snapshot(self) -> str: """gets the current snapshot as mhtml""" return await self.current_target.snapshot()
[docs] async def save_snapshot(self, filename: str): """Saves a snapshot of the current window to a MHTML file. :param filename: The full path you wish to save your snapshot to. This should end with a ``.mhtml`` extension. .. code-block:: Python await driver.get_snapshot('snapshot.mhtml') """ return await self.current_target.save_snapshot(filename)
# noinspection PyPep8Naming
[docs] async def set_window_size(self, width: int, height: int) -> None: """Sets the width and height of the window, the current tab is within (unless ``windowHandle`` specified) :param width: the width in pixels to set the window to :param height: the height in pixels to set the window to """ await self.set_window_rect(width=int(width), height=int(height))
# noinspection PyPep8Naming
[docs] async def get_window_size(self) -> dict: """Gets the width and height of the current window. returns something like: .. code-block: json {"width":1280, "height":720} """ size = await self.get_window_rect() if size.get("value"): size = size["value"] return {k: size[k] for k in ("width", "height")}
# noinspection PyPep8Naming
[docs] async def set_window_position(self, x: int, y: int) -> dict: """Sets the x,y position of the window, the current tab is in. :param x: the x-coordinate in pixels to set the window position :param y: the y-coordinate in pixels to set the window position """ return await self.set_window_rect(x=int(x), y=int(y))
# noinspection PyPep8Naming
[docs] async def get_window_position(self) -> dict: """Gets the x,y position of the window, the current tab is in. returns something like: .. code-block: json {"x":0, "y":0} """ position = await self.get_window_rect() return {k: position[k] for k in ("x", "y")}
[docs] async def get_window_rect(self) -> dict: """Gets the x, y, with and height coordinates of the window, the current tab is in. returns something like: .. code-block: json {"x":0, "y":0, "width":1280, "height":720, "windowState":"normal" } .. note:: ``windowState`` can be one of "normal", "minimized", "maximized", "fullscreen" """ json = await self.execute_cdp_cmd("Browser.getWindowBounds", {"windowId": await self.current_window_id}) json = json["bounds"] json["x"] = json["left"] del json["left"] json["y"] = json["top"] del json["top"] return json
[docs] async def set_window_rect(self, x=None, y=None, width=None, height=None) -> dict: """Sets the x, y, width and height coordinates of the window the current target is in. :param x: the x-coordinate in pixels to set the window position :param y: the y-coordinate in pixels to set the window position :param width: the width in pixels to set the window to :param height: the height in pixels to set the window to .. note:: either x and y or with and height have to be specified """ if (x is None and y is None) and (height is None and width is None): raise ValueError("x and y or height and width need values") bounds = {"left": x, "top": y, "width": width, 'height': height} await self.execute_cdp_cmd("Browser.setWindowBounds", {"windowId": await self.current_window_id, "bounds": bounds}) bounds["x"] = bounds["left"] del bounds["left"] bounds["y"] = bounds["top"] del bounds["top"] return bounds
[docs] async def get_network_conditions(self): """Gets Chromium network emulation settings. returns a dict like: .. code-block:: python {"latency": 4, "download_throughput": 2, "upload_throughput": 2, "offline": False} """ return await self.current_target.get_network_conditions()
# noinspection SpellCheckingInspection
[docs] async def set_network_conditions(self, offline: bool, latency: int, download_throughput: int, upload_throughput: int, connection_type: typing.Literal[ "none", "cellular2g", "cellular3g", "cellular4g", "bluetooth", "ethernet", "wifi", "wimax", "other"]) -> None: # noinspection GrazieInspection """Sets Chromium network emulation settings. :param offline: :param latency: additional latency in ms :param download_throughput: maximum throughput, 500 * 1024 for example :param upload_throughput: maximum throughput, 500 * 1024 for example :param connection_type: the connection type .. note:: 'throughput' can be used to set both (for download and upload). """ return await self.current_target.set_network_conditions(offline=offline, latency=latency, download_throughput=download_throughput, upload_throughput=upload_throughput, connection_type=connection_type)
[docs] async def delete_network_conditions(self) -> None: """Resets Chromium network emulation settings.""" await self.current_target.delete_network_conditions()
[docs] async def set_permissions(self, name: str, value: typing.Literal["granted", "denied", "prompt"], origin: str = None) -> None: """Sets Applicable Permission :param name: The item to set the permission on. :param value: The value to set on the item :param origin: the origin the permission for. Applies to any origin if not set .. code-block:: python target.set_permissions('clipboard-read', 'denied') """ settings = ["granted", "denied", "prompt"] if value not in settings: raise ValueError(f"value needs to be within {settings}, but got {value}") args = {"permission": {"name": name}, "setting": value} if origin: args["origin"] = origin await self.execute_cdp_cmd("Browser.setPermission", args)
[docs] async def set_proxy(self, proxy_config): # noinspection GrazieInspection """ set a proxy dynamically Example parameters: .. code-block:: python proxy_config = { "mode": "fixed_servers", "rules": { "proxyForHttp": { "scheme": scheme, "host": host, "port": port }, "proxyForHttps": { "scheme": scheme, "host": host, "port": port }, "proxyForFtp": { "scheme": scheme, "host": host, "port": port }, "fallbackProxy": { "scheme": scheme, "host": host, "port": port }, "bypassList": ["<local>"] } } :param proxy_config: see `developer.chrome.com/docs/extensions/reference/proxy <https://developer.chrome.com/docs/extensions/reference/proxy/>`__ for reference for authentification, see :func:`webdriver.Chrome.set_auth <selenium_driverless.webdriver.Chrome.set_auth>` """ extension = await self.mv3_extension await extension.eval_async("await chrome.proxy.settings.set(arguments[0])", {"value": proxy_config, "scope": 'regular'})
[docs] async def set_single_proxy(self, proxy: str, bypass_list=None): """ Set a single proxy dynamically to be applied in all contexts. .. code-block:: python "http://user1:passwrd1@example.proxy.com:5001/" .. warning:: - Only supported when Chrome has been started with driverless or the extension at ``selenium_driverless/files/mv3_extension`` has been loaded into the browser. - ``Socks5`` doesn't support authentication due to `crbug#1309413 <https://bugs.chromium.org/p/chromium/issues/detail?id=1309413>`__. """ # parse scheme proxy = proxy.split("://") if len(proxy) == 2: scheme, proxy = proxy else: scheme = None proxy = proxy[0] proxy = proxy.split("@") if len(proxy) == 2: creds, proxy = proxy else: proxy = proxy[0] creds = None # parse host & port proxy = proxy.split(":") if len(proxy) == 2: host, port = proxy port = int(port.replace("/", "")) else: port = None host = proxy[0] rule = {"host": host} if scheme: rule["scheme"] = scheme if port: rule["port"] = port if bypass_list is None: bypass_list = ["<local>"] proxy_config = { "mode": "fixed_servers", "rules": { "proxyForHttp": rule, "proxyForHttps": rule, "proxyForFtp": rule, "fallbackProxy": rule, "bypassList": bypass_list } } await self.set_proxy(proxy_config) if creds: user, passw = creds.split(":") await self.set_auth(user, passw, f"{host}:{port}")
[docs] async def clear_proxy(self): """ Clear the applied proxy (=> use no proxy at all) in all contexts. """ extension = await self.mv3_extension await extension.eval_async(""" await chrome.proxy.settings.set( {value: {mode: "direct"}, scope: 'regular'} ); """)
async def _ensure_auth_interception(self, timeout: float = 0.3, set_flag: bool = True): # internal, to re-apply auth interception which is broken when a new context gets opened. Due to how extensions in incognito work if not self._auth_interception_enabled: script = """ if(globalThis.authCreds == undefined){globalThis.authCreds = {}} globalThis.onAuth = function onAuth(details) { return globalThis.authCreds[details.challenger.host+":"+details.challenger.port] } chrome.webRequest.onAuthRequired.addListener( onAuth, {urls: ["<all_urls>"]}, ['blocking'] ); """ mv3_target = await self.mv3_extension await mv3_target.execute_script(script, timeout=timeout) if set_flag: self._auth_interception_enabled = True
[docs] async def set_auth(self, username: str, password: str, host_with_port): """ Set authentication dynamically to be applied in all contexts. .. code-block:: python driver.set_auth("user1","passwrd1", "example.com:5001") .. warning:: - Only supported when Chrome has been started with driverless or the extension at ``selenium_driverless/files/mv3_extension`` has been loaded into the browser. - ``Socks5`` doesn't support authentication due to `crbug#1309413 <https://bugs.chromium.org/p/chromium/issues/detail?id=1309413>`__. :param username: :param password: :param host_with_port: in format "example.com:5001" """ # provide auth await self._ensure_auth_interception() mv3_target = await self.mv3_extension arg = { "authCredentials": { "username": username, "password": password } } await mv3_target.execute_script("globalThis.authCreds[arguments[1]] = arguments[0]", arg, host_with_port) self._auth[host_with_port] = arg
[docs] async def clear_auth(self): """ clear the applied auth from :func:`webdriver.Chrome.set_auth <selenium_driverless.webdriver.Chrome.set_auth>` """ # provide auth mv3_target = await self.mv3_extension script = "chrome.webRequest.onAuthRequired.removeListener(globalThis.onAuth);" self._auth = {} await mv3_target.execute_script(script) self._auth_interception_enabled = False
[docs] async def wait_for_cdp(self, event: str, timeout: float or None = None) -> dict: """ wait for an event on the current target see :func:`Target.wait_for_cdp <selenium_driverless.types.target.Target.wait_for_cdp>` for reference """ return await self.current_target.wait_for_cdp(event=event, timeout=timeout)
[docs] async def add_cdp_listener(self, event: str, callback: typing.Callable[[dict], any]): """ add a listener for a CDP event on the current target see :func:`Target.add_cdp_listener <selenium_driverless.types.target.Target.add_cdp_listener>` for reference """ return await self.current_target.add_cdp_listener(event=event, callback=callback)
[docs] async def remove_cdp_listener(self, event: str, callback: typing.Callable[[dict], any]): """ remove a listener for a CDP event on the current target see :func:`Target.remove_cdp_listener <selenium_driverless.types.target.Target.remove_cdp_listener>` for reference """ return await self.current_target.remove_cdp_listener(event=event, callback=callback)
[docs] async def get_cdp_event_iter(self, event: str, target_id: str = None) -> typing.AsyncIterable[dict]: """ iterate over CDP events on the current target see :func:`Target.get_cdp_event_iter <selenium_driverless.types.target.Target.get_cdp_event_iter>` for reference """ target = await self.get_target(target_id=target_id) return await target.get_cdp_event_iter(event=event)
[docs] async def execute_cdp_cmd(self, cmd: str, cmd_args: dict or None = None, timeout: float or None = 10, target_id: str = None) -> dict: """Execute Chrome Devtools Protocol command on the current target executes it on :class:`Target.execute_cdp_cmd <selenium_driverless.types.base_target.BaseTarget>` if ``message:'Not allowed'`` received see :func:`Target.execute_cdp_cmd <selenium_driverless.types.target.Target.execute_cdp_cmd>` for reference """ return await self.current_context.execute_cdp_cmd(cmd=cmd, cmd_args=cmd_args, timeout=timeout, target_id=target_id)
[docs] async def fetch(self, *args, **kwargs) -> dict: """ executes a JS ``fetch`` request within the current target see :func:`Target.fetch <selenium_driverless.types.target.Target.fetch>` for reference """ return await self.current_target.fetch(*args, **kwargs)
[docs] async def xhr(self, *args, **kwargs) -> dict: """ executes a JS ``XMLHttpRequest`` request within the current target see :func:`Target.fetch <selenium_driverless.types.target.Target.fetch>` for reference """ return await self.current_target.xhr(*args, **kwargs)
# noinspection PyTypeChecker
[docs] async def get_sinks(self, target_id: str = None) -> list: """ :Returns: A list of sinks available for Cast. """ target = await self.get_target(target_id=target_id) return await target.get_sinks()
[docs] async def get_issue_message(self, target_id: str = None): """ :Returns: An error message when there is any issue in a Cast session. """ target = await self.get_target(target_id=target_id) return await target.get_issue_message()
[docs] async def set_sink_to_use(self, sink_name: str) -> dict: """Sets a specific sink, using its name, as a Cast session receiver target. :param sink_name: Name of the sink to use as the target. """ return await self.current_target.set_sink_to_use(sink_name=sink_name)
[docs] async def start_desktop_mirroring(self, sink_name: str) -> dict: """Starts a desktop mirroring session on a specific receiver target. :param sink_name: Name of the sink to use as the target. """ return await self.current_target.start_desktop_mirroring(sink_name=sink_name)
[docs] async def start_tab_mirroring(self, sink_name: str) -> dict: """Starts a tab mirroring session on a specific receiver target. :param sink_name: Name of the sink to use as the target. """ return await self.current_target.start_tab_mirroring(sink_name=sink_name)
[docs] async def stop_casting(self, sink_name: str, target_id: str = None) -> dict: """Stops the existing Cast session on a specific receiver target. :Args: - sink_name: Name of the sink to stop the Cast session. """ target = await self.get_target(target_id=target_id) return await target.stop_casting(sink_name=sink_name)