diff --git a/Framework/Built_In_Automation/Sequential_Actions/action_declarations/playwright.py b/Framework/Built_In_Automation/Sequential_Actions/action_declarations/playwright.py index d0aca46a..3964a271 100644 --- a/Framework/Built_In_Automation/Sequential_Actions/action_declarations/playwright.py +++ b/Framework/Built_In_Automation/Sequential_Actions/action_declarations/playwright.py @@ -11,7 +11,9 @@ declarations = ( # Browser Management { "name": "open browser", "function": "Open_Browser", "screenshot": "web" }, + { "name": "open electron app", "function": "Open_Electron_App", "screenshot": "web" }, { "name": "go to link", "function": "Go_To_Link", "screenshot": "web" }, + { "name": "go to link v2", "function": "Go_To_Link_V2", "screenshot": "web" }, { "name": "tear down browser", "function": "Tear_Down_Playwright", "screenshot": "none" }, { "name": "teardown", "function": "Tear_Down_Playwright", "screenshot": "none" }, { "name": "switch browser", "function": "Switch_Browser", "screenshot": "none" }, @@ -21,6 +23,7 @@ { "name": "double click", "function": "Double_Click_Element", "screenshot": "web" }, { "name": "right click", "function": "Right_Click_Element", "screenshot": "web" }, { "name": "hover", "function": "Hover_Over_Element", "screenshot": "web" }, + { "name": "click and download", "function": "Click_and_Download", "screenshot": "web" }, # Text Input { "name": "text", "function": "Enter_Text_In_Text_Box", "screenshot": "web" }, @@ -34,6 +37,8 @@ # Element Information { "name": "save attribute", "function": "Save_Attribute", "screenshot": "web" }, + { "name": "change attribute value", "function": "Change_Attribute_Value", "screenshot": "web" }, + { "name": "capture network log", "function": "capture_network_log", "screenshot": "web" }, { "name": "get element info", "function": "get_element_info", "screenshot": "web" }, { "name": "extract table data", "function": "Extract_Table_Data", "screenshot": "web" }, @@ -45,6 +50,11 @@ { "name": "scroll", "function": "Scroll", "screenshot": "web" }, { "name": "scroll to element", "function": "scroll_to_element", "screenshot": "web" }, { "name": "scroll element to top", "function": "scroll_to_element", "screenshot": "web" }, + { "name": "scroll to top", "function": "scroll_to_top", "screenshot": "web" }, + + # Lists / attributes + { "name": "save attribute values in list", "function": "save_attribute_values_in_list", "screenshot": "web" }, + { "name": "save web elements in list", "function": "save_web_elements_in_list", "screenshot": "web" }, # Selection (Dropdowns/Checkboxes) { "name": "select by visible text", "function": "Select_Deselect", "screenshot": "web" }, @@ -55,6 +65,9 @@ { "name": "deselect by index", "function": "Select_Deselect", "screenshot": "web" }, { "name": "deselect all", "function": "Select_Deselect", "screenshot": "web" }, { "name": "check uncheck", "function": "check_uncheck", "screenshot": "web" }, + { "name": "check uncheck all", "function": "check_uncheck_all", "screenshot": "web" }, + { "name": "multiple check uncheck", "function": "multiple_check_uncheck", "screenshot": "web" }, + { "name": "slider bar", "function": "slider_bar", "screenshot": "web" }, # Window/Tab Management { "name": "switch window", "function": "switch_window_or_tab", "screenshot": "web" }, @@ -80,6 +93,7 @@ # File Upload { "name": "upload file", "function": "upload_file", "screenshot": "web" }, + { "name": "copy image into browser", "function": "copy_image_into_browser", "screenshot": "web" }, # Window Management { "name": "resize window", "function": "resize_window", "screenshot": "web" }, diff --git a/Framework/Built_In_Automation/Sequential_Actions/common_functions.py b/Framework/Built_In_Automation/Sequential_Actions/common_functions.py index c2b361a6..4fb23b5c 100755 --- a/Framework/Built_In_Automation/Sequential_Actions/common_functions.py +++ b/Framework/Built_In_Automation/Sequential_Actions/common_functions.py @@ -3723,7 +3723,7 @@ def _print(*args, sep=' ', end='\n', file=None, dont_send=False): @logger -def execute_python_code(data_set): +async def execute_python_code(data_set): try: sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME inp, out_var, main_function, Code, filepath_code = "", "", "", "", "" @@ -3749,10 +3749,31 @@ def execute_python_code(data_set): Code = filepath_code if filepath_code else Code sr.shared_variables["print"] = _print + try: + from Framework.Built_In_Automation.Web import utils as WebUtils + + await WebUtils.hydrate_browser_compatibility_globals(data_set) + except Exception: + CommonUtil.ExecLog( + sModuleInfo, + "Could not hydrate browser compatibility globals before executing python code", + 2, + ) + CommonUtil.Exception_Handler(sys.exc_info()) previous_vars = set(sr.shared_variables) - try: exec(Code, sr.shared_variables) - except: return CommonUtil.Exception_Handler(sys.exc_info()) + try: + compiled_code = compile( + Code, + "", + "exec", + flags=ast.PyCF_ALLOW_TOP_LEVEL_AWAIT, + ) + exec_result = eval(compiled_code, sr.shared_variables) + if inspect.isawaitable(exec_result): + await exec_result + except: + return CommonUtil.Exception_Handler(sys.exc_info()) try: current_vars = set(sr.shared_variables) diff --git a/Framework/Built_In_Automation/Web/Playwright/BuiltInFunctions.py b/Framework/Built_In_Automation/Web/Playwright/BuiltInFunctions.py index 6652809f..69af0cee 100644 --- a/Framework/Built_In_Automation/Web/Playwright/BuiltInFunctions.py +++ b/Framework/Built_In_Automation/Web/Playwright/BuiltInFunctions.py @@ -24,26 +24,29 @@ import sys import os import inspect +import platform import time -import re +import base64 from pathlib import Path +from urllib.parse import urlparse +import requests from playwright.async_api import ( async_playwright, Page, Browser, BrowserContext, - Locator, TimeoutError as PlaywrightTimeoutError, Error as PlaywrightError, ) -from Framework.Utilities import CommonUtil +from Framework.Utilities import CommonUtil, ConfigModule from Framework.Utilities.decorators import logger +from settings import ZEUZ_NODE_DOWNLOADS_DIR from Framework.Built_In_Automation.Shared_Resources import ( BuiltInFunctionSharedResources as sr, ) -from Framework.Utilities.CommonUtil import passed_tag_list, failed_tag_list +from Framework.Utilities.CommonUtil import failed_tag_list from . import locator as PlaywrightLocator from . import utils as PlaywrightUtils from Framework.Built_In_Automation.Web.utils import ( @@ -62,7 +65,7 @@ def _get_frame_locator(): if frame_locator in failed_tag_list: return None return frame_locator - except: + except Exception: # Variable doesn't exist yet return None @@ -143,6 +146,34 @@ def _save_current_playwright_frame(frame_locator): sr.Set_Shared_Variables("browser_sessions", sessions) +def _get_installed_cft_chromedriver(browser_version): + system = platform.system().lower() + arch = platform.machine().lower() + + if system == "windows": + platform_key = "win64" if arch in ("amd64", "x86_64") else "win32" + executable = "chromedriver.exe" + elif system == "darwin": + platform_key = "mac-arm64" if arch == "arm64" else "mac-x64" + executable = "chromedriver" + elif system == "linux": + platform_key = "linux64" + executable = "chromedriver" + else: + return None + + driver_path = ( + ZEUZ_NODE_DOWNLOADS_DIR + / "chrome_for_testing" + / "versions" + / browser_version + / "driver" + / f"chromedriver-{platform_key}" + / executable + ) + return str(driver_path) if driver_path.exists() else None + + async def _activate_browser_session_for_action(step_data, function_name=None): """Select the requested browser session before running Playwright actions.""" @@ -177,11 +208,45 @@ def connect_selenium_to_playwright(port=9222): try: from selenium import webdriver from selenium.webdriver.chrome.options import Options + from selenium.webdriver.chrome.service import Service + from webdriver_manager.chrome import ChromeDriverManager options = Options() options.add_experimental_option("debuggerAddress", f"127.0.0.1:{port}") - - driver = webdriver.Chrome(options=options) + + service = None + try: + response = requests.get(f"http://127.0.0.1:{port}/json/version", timeout=5) + response.raise_for_status() + browser_version = ( + response.json() + .get("Browser", "") + .split("/", 1)[-1] + .strip() + ) + if browser_version: + driver_path = _get_installed_cft_chromedriver(browser_version) + if not driver_path: + driver_path = ChromeDriverManager( + driver_version=browser_version + ).install() + service = Service(executable_path=driver_path) + CommonUtil.ExecLog( + "connect_selenium_to_playwright", + f"Using ChromeDriver matching browser version {browser_version}", + 1, + ) + except Exception: + CommonUtil.ExecLog( + "connect_selenium_to_playwright", + "Could not resolve matching ChromeDriver for Playwright browser; falling back to Selenium Manager", + 2, + ) + + if service: + driver = webdriver.Chrome(service=service, options=options) + else: + driver = webdriver.Chrome(options=options) from Framework.Built_In_Automation.Web.Selenium import BuiltInFunctions as SeleniumBuiltInFunctions SeleniumBuiltInFunctions.selenium_driver = driver @@ -203,6 +268,12 @@ def connect_selenium_to_playwright(port=9222): MODULE_NAME = inspect.getmodulename(__file__) +temp_config = str( + Path(os.path.abspath(__file__).split("Framework")[0]) + / "AutomationLog" + / ConfigModule.get_config_value("Advanced Options", "_file") +) + # Playwright instances playwright_instance = None browser: Browser = None @@ -212,12 +283,77 @@ def connect_selenium_to_playwright(port=9222): # Multi-page/context support playwright_details = {} # {"page_id": {"page": Page, "context": Context, "browser": Browser}} current_page_id = None +network_log_details = {} # Default settings default_timeout = 30000 # 30 seconds default_viewport = {"width": 1920, "height": 1080} +def _compact(value): + return str(value).replace(" ", "").replace("_", "").replace("-", "").lower() + + +def _is_action_mid(mid): + return "action" in str(mid).strip().lower() + + +def _truthy(value): + return str(value).strip().lower() in ("true", "yes", "ok", "1", "accept") + + +def _is_placeholder(value, *placeholders): + value_l = str(value).strip().lower() + return not value_l or value_l in placeholders or value_l == "default" + + +def _has_element_rows(step_data): + return any(_is_element_parameter_mid(mid) for _, mid, _ in step_data) + + +def _action_row_value(step_data, *action_names): + names = {name.strip().lower() for name in action_names} + for left, mid, right in step_data: + if _is_action_mid(mid) and (not names or left.strip().lower() in names): + return str(right) + return None + + +def _save_variable_from_action_or_save_parameter(step_data, *action_names): + save_variable = None + for left, mid, right in step_data: + mid_l = str(mid).strip().lower() + if mid_l == "save parameter": + save_variable = str(left).strip() + elif _is_action_mid(mid) and (not action_names or left.strip().lower() in action_names): + value = str(right).strip() + if not _is_placeholder(value, left.strip().lower()): + save_variable = value + return save_variable + + +def _screenshot_folder(): + try: + folder = ConfigModule.get_config_value("sectionOne", "screen_capture_folder", temp_config) + if folder: + Path(folder).mkdir(parents=True, exist_ok=True) + return folder + except Exception: + pass + return os.getcwd() + + +def _download_folder(): + try: + folder = sr.Get_Shared_Variables("zeuz_download_folder") + if folder not in failed_tag_list: + Path(folder).mkdir(parents=True, exist_ok=True) + return folder + except Exception: + pass + return os.getcwd() + + ######################### # # # Browser Management # @@ -291,7 +427,10 @@ async def Open_Browser(step_data): try: # Parse parameters url = None + dependency = sr.Get_Shared_Variables("dependency") browser_name = "chromium" + if isinstance(dependency, dict) and dependency.get("Browser"): + browser_name = dependency["Browser"].strip().lower().replace("headless", "").strip() or browser_name headless = False viewport = default_viewport.copy() args = [] @@ -318,7 +457,7 @@ async def Open_Browser(step_data): url = right_v elif left_l in ("browser", "browser name"): browser_name = right_v.lower() - elif left_l in ("driver id", "page id", "driver tag", "session"): + elif _compact(left_l) in ("driverid", "pageid", "drivertag", "session"): page_id = right_v elif mid_l == "optional parameter": @@ -349,7 +488,7 @@ async def Open_Browser(step_data): color_scheme = right_v elif left_l == "permission": permissions.append(right_v) - elif left_l in ("driver id", "page id", "driver tag", "session"): + elif _compact(left_l) in ("driverid", "pageid", "drivertag", "session"): page_id = right_v elif mid_l == "shared capability": @@ -404,7 +543,7 @@ async def Open_Browser(step_data): browser = await playwright_instance.chromium.launch(**launch_options) # Context options - context_options = {"viewport": viewport} + context_options = {"viewport": viewport, "accept_downloads": True} if record_video: context_options["record_video_dir"] = video_dir or "videos/" if locale: @@ -471,17 +610,186 @@ async def Open_Browser(step_data): @logger -async def Go_To_Link(step_data): +async def Open_Electron_App(step_data): """ - Navigate to a URL. + Launch an Electron desktop app via Playwright's Electron API. - Example: + Example - Basic (per-OS binary paths, like Selenium): Field Sub Field Value - go to link input parameter https://example.com - wait until optional parameter networkidle - go to link playwright action go to link + windows input parameter C:\\Path\\To\\MyApp.exe + mac input parameter /Applications/MyApp.app/Contents/MacOS/MyApp + linux input parameter /opt/myapp/myapp + open electron app playwright action open electron app + + Example - With optional parameters: + Field Sub Field Value + mac input parameter /Applications/MyApp.app/Contents/MacOS/MyApp + session optional parameter electron_1 + add argument optional parameter --no-sandbox + cwd optional parameter /tmp/working_dir + timeout optional parameter 30 + open electron app playwright action open electron app + + Notes: + - Only the path matching the current OS is used; other rows are ignored. + - The first Electron BrowserWindow becomes the active page, so subsequent + element / click / text actions work the same as in a normal browser session. + """ + sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME + global playwright_instance, browser, context, current_page + global current_page_id, playwright_details + + try: + desktop_app_path = "" + driver_id = "" + args = [] + cwd = None + env_vars = {} + timeout = None + record_video = False + video_dir = None + + for left, mid, right in step_data: + left_compact = left.replace(" ", "").replace("_", "").replace("-", "").lower() + mid_l = mid.strip().lower() + right_v = right.strip() + + if "windows" in left_compact and platform.system() == "Windows": + desktop_app_path = right_v + elif "mac" in left_compact and platform.system() == "Darwin": + desktop_app_path = right_v + elif "linux" in left_compact and platform.system() == "Linux": + desktop_app_path = right_v + elif left_compact == "driverid": + driver_id = right_v + elif left_compact == "session" and mid_l == "optional parameter": + driver_id = right_v + elif mid_l == "optional parameter": + if left_compact in ("addargument", "arg", "argument"): + args.append(right_v) + elif left_compact == "cwd": + cwd = right_v + elif left_compact == "env": + # Format: KEY=VALUE + if "=" in right_v: + k, v = right_v.split("=", 1) + env_vars[k.strip()] = v.strip() + elif left_compact == "timeout": + try: + timeout = int(float(right_v) * 1000) + except ValueError: + pass + elif left_compact == "recordvideo": + record_video = right_v.lower() in ("true", "yes", "1") + elif left_compact == "videodir": + video_dir = right_v - wait until options: load, domcontentloaded, networkidle, commit + if not desktop_app_path: + CommonUtil.ExecLog( + sModuleInfo, + f"You did not provide an Electron app path for {platform.system()} OS", + 3, + ) + return "zeuz_failed" + + if not driver_id: + driver_id = "default" + + desktop_app_path = CommonUtil.path_parser(desktop_app_path) + + # Reserve a debug port for the session even though Playwright drives Electron via CDP automatically. + electron_port = get_debug_port(driver_id or "electron", start=9230, stop=9320) + + launch_options = {"executable_path": desktop_app_path} + if args: + launch_options["args"] = args + if cwd: + launch_options["cwd"] = cwd + if env_vars: + launch_options["env"] = env_vars + if timeout: + launch_options["timeout"] = timeout + if record_video: + launch_options["record_video_dir"] = video_dir or "videos/" + + playwright_instance = await async_playwright().start() + try: + electron_app = await playwright_instance._electron.launch(**launch_options) + except Exception: + return CommonUtil.Exception_Handler(sys.exc_info()) + + try: + current_page = await electron_app.first_window() + except Exception: + # Some Electron apps create no visible BrowserWindow at startup. + current_page = None + + # In Electron there is no BrowserContext we own - bind the app object in its place so + # downstream session-aware code keeps working. + context = electron_app.context if hasattr(electron_app, "context") else None + browser = electron_app # `browser` slot holds the launched app for teardown. + current_page_id = driver_id + + playwright_details[driver_id] = { + "page": current_page, + "context": context, + "browser": electron_app, + "playwright": playwright_instance, + "remote-debugging-port": electron_port, + } + + sr.Set_Shared_Variables("playwright_page", current_page) + sr.Set_Shared_Variables("playwright_context", context) + sr.Set_Shared_Variables("playwright_browser", electron_app) + sr.Set_Shared_Variables("active_web_driver_type", "playwright") + if timeout: + sr.Set_Shared_Variables("element_wait", timeout / 1000) + CommonUtil.set_screenshot_vars(sr.Shared_Variable_Export()) + + create_browser_session( + session_name=driver_id, + playwright_page=current_page, + playwright_browser=electron_app, + playwright_context=context, + playwright_frame=None, + playwright_instance=playwright_instance, + remote_debugging_port=electron_port, + ) + + CommonUtil.ExecLog(sModuleInfo, "Started Electron App", 1) + return "passed" + + except Exception: + return CommonUtil.Exception_Handler(sys.exc_info()) + + +@logger +async def Go_To_Link(step_data): + """ + Navigate to a URL (and open browser if not already open). + + Example 1 - Basic: + Field Sub Field Value + go to link input parameter https://example.com + go to link playwright action go to link + + Example 2 - Selenium-compatible options: + Field Sub Field Value + go to link input parameter https://example.com + wait time to appear element optional parameter 20 + wait time to page load optional parameter 60 + resolution optional parameter 1920,1080 + wait until optional parameter networkidle + go to link playwright action go to link + + Options: + - wait until (load | domcontentloaded | networkidle | commit) + - timeout / wait time to page load: page load timeout in seconds + - wait for element / wait time to appear element: element wait timeout + (seconds) saved to the "element_wait" shared variable so subsequent + element lookups use it + - resolution: WIDTHxHEIGHT or WIDTH,HEIGHT (applied to the current page) + - session: reuse or create a named browser session """ sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME global current_page @@ -493,31 +801,30 @@ async def Go_To_Link(step_data): left_l = left.strip().lower() mid_l = mid.strip().lower() right_v = right.strip() - - if mid_l == "optional parameter" and left_l == "session": + + if mid_l == "optional parameter" and _compact(left_l) in ("session", "driverid", "driver", "drivertag", "pageid"): session_name = right_v break - + # Check if session exists and use it if session_name: existing_session = get_browser_session(session_name) - if existing_session and await _ensure_playwright_session(session_name, existing_session) not in failed_tag_list: CommonUtil.ExecLog(sModuleInfo, f"Using existing browser session: {session_name}", 1) else: # Session doesn't exist, open new browser with session name CommonUtil.ExecLog(sModuleInfo, f"Session '{session_name}' not found. Opening new browser.", 2) - + # Add session parameter to step_data for Open_Browser step_data_with_session = step_data.copy() if not any(left.strip().lower() == "session" and mid.strip().lower() == "optional parameter" for left, mid, right in step_data_with_session): step_data_with_session.append(("session", "optional parameter", session_name)) - + result = await Open_Browser(step_data_with_session) if result == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Failed to open browser for new session", 3) return "zeuz_failed" - + elif current_page is None: default_session = get_browser_session("default") if default_session and default_session.get("selenium_driver"): @@ -535,42 +842,102 @@ async def Go_To_Link(step_data): url = None wait_until = "domcontentloaded" timeout = None + element_wait_sec = None + window_size_x = None + window_size_y = None for left, mid, right in step_data: - left_l = left.strip().lower() + left_raw = left.strip() + left_l = left_raw.lower() + left_compact = left_l.replace(" ", "").replace("_", "").replace("-", "") mid_l = mid.strip().lower() right_v = right.strip() if left_l in ("go to link", "url", "link"): - url = right_v + url = right_v elif mid_l == "optional parameter": - if left_l == "session": - # Skip session parameter - already processed above + if _compact(left_l) in ("session", "driverid", "driver", "drivertag", "pageid"): continue - elif left_l in ("wait until", "wait_until", "waituntil", "wait time"): + if left_l in ("wait until", "wait_until", "waituntil"): wait_until = right_v.lower() - elif left_l == "timeout": - timeout = int(float(right_v) * 1000) + elif left_compact in ("timeout", "waittimetopageload", "pageloadtimeout"): + try: + timeout = int(float(right_v) * 1000) + except ValueError: + pass + elif left_compact in ("waittimetoappearelement", "waitforelement", "elementwait"): + try: + element_wait_sec = float(right_v) + except ValueError: + pass + elif left_l == "resolution": + try: + parts = right_v.replace("x", ",").split(",") + window_size_x = int(parts[0].strip()) + window_size_y = int(parts[1].strip()) + except (ValueError, IndexError): + pass if not url: CommonUtil.ExecLog(sModuleInfo, "No URL provided", 3) return "zeuz_failed" + if element_wait_sec is not None: + sr.Set_Shared_Variables("element_wait", element_wait_sec) + + if timeout: + try: + current_page.set_default_navigation_timeout(timeout) + current_page.set_default_timeout(timeout) + except Exception: + pass + + if window_size_x and window_size_y: + try: + await current_page.set_viewport_size({"width": window_size_x, "height": window_size_y}) + except Exception: + pass + goto_options = {"wait_until": wait_until} if timeout: goto_options["timeout"] = timeout - await current_page.goto(url, **goto_options) - + try: + await current_page.goto(url, **goto_options) + except PlaywrightTimeoutError: + CommonUtil.ExecLog(sModuleInfo, "Maximum page load time reached. Loading and proceeding", 2) + # Reset frame context when navigating to a new URL sr.Set_Shared_Variables("playwright_frame", None) _save_current_playwright_frame(None) - - CommonUtil.ExecLog(sModuleInfo, f"Navigated to: {url}", 1) + + CommonUtil.ExecLog(sModuleInfo, f"Successfully opened your link: {url}", 1) return "passed" except Exception: - return CommonUtil.Exception_Handler(sys.exc_info()) + return CommonUtil.Exception_Handler(sys.exc_info(), None, "failed to open your link") + + +@logger +async def Go_To_Link_V2(step_data): + """Selenium-compatible v2 navigation wrapper.""" + + translated = [] + for left, mid, right in step_data: + left_l = left.strip().lower() + if left_l == "go to link v2": + translated.append(("go to link", mid, right)) + elif left_l == "driver tag": + translated.append(("session", "optional parameter", right)) + elif left_l == "page load timeout": + translated.append(("wait time to page load", "optional parameter", right)) + elif left_l == "wait for element": + translated.append(("wait time to appear element", "optional parameter", right)) + elif left_l == "page load strategy": + translated.append(("wait until", "optional parameter", right)) + else: + translated.append((left, mid, right)) + return await Go_To_Link(translated) @logger @@ -600,7 +967,7 @@ async def Tear_Down_Playwright(step_data=None): mid_l = mid.strip().lower() right_v = right.strip() - if mid_l == "optional parameter" and left_l == "session": + if mid_l == "optional parameter" and _compact(left_l) in ("session", "driverid", "driver", "drivertag", "pageid"): session_name = right_v break @@ -632,7 +999,7 @@ async def Tear_Down_Playwright(step_data=None): pass CommonUtil.ExecLog(sModuleInfo, f"Teared down session '{session_name}'", 1) - except Exception as e: + except Exception: errMsg = f"Unable to tear down session '{session_name}'. may already been killed" CommonUtil.ExecLog(sModuleInfo, errMsg, 2) @@ -766,12 +1133,11 @@ async def Switch_Browser(step_data): right_v = right.strip() if mid_l in ("input parameter", "optional parameter"): - if left_l in ("driver id", "page id", "driver tag", "session"): + if _compact(left_l) in ("driverid", "pageid", "drivertag", "session"): target_id = right_v if not target_id: - CommonUtil.ExecLog(sModuleInfo, "No driver/page ID provided", 3) - return "zeuz_failed" + target_id = "default" existing_session = get_browser_session(target_id) if existing_session and existing_session.get("playwright_page"): @@ -814,7 +1180,7 @@ async def Switch_Browser(step_data): ######################### @logger -async def Click_Element(step_data): +async def Click_Element(step_data, retry=0): """ Click an element. @@ -823,19 +1189,24 @@ async def Click_Element(step_data): id element parameter submit-btn click playwright action click - Example 2 - With options: + Example 2 - With JS click (forces click via JS .click()): Field Sub Field Value id element parameter submit-btn use js optional parameter true - offset optional parameter 10,5 click playwright action click - Example 3 - Double click: + Example 3 - Click at offset (Selenium-compatible: percent from element center): + Field Sub Field Value + id element parameter submit-btn + offset optional parameter 20,30 + click playwright action click + + Example 4 - Double click: Field Sub Field Value id element parameter item double click playwright action double click - Example 4 - Right click: + Example 5 - Right click: Field Sub Field Value id element parameter item right click playwright action right click @@ -846,14 +1217,13 @@ async def Click_Element(step_data): try: # Handle session parameter session_name, current_page, current_page_id, context, browser = await _handle_playwright_session(step_data) - if current_page is None: CommonUtil.ExecLog(sModuleInfo, "No browser open", 3) return "zeuz_failed" # Parse options use_js = False - offset = None + offset_value = "" double_click = False right_click = False click_count = 1 @@ -874,8 +1244,7 @@ async def Click_Element(step_data): if left_l == "use js": use_js = right_v.lower() in ("true", "yes", "1") elif left_l == "offset": - parts = right_v.split(",") - offset = {"x": float(parts[0].strip()), "y": float(parts[1].strip())} + offset_value = right_v elif left_l == "click count": click_count = int(right_v) elif left_l == "modifier": @@ -885,7 +1254,7 @@ async def Click_Element(step_data): elif left_l == "timeout": timeout = int(float(right_v) * 1000) - elif mid_l == "action": + elif "action" in mid_l: if "double" in left_l: double_click = True elif "right" in left_l: @@ -894,15 +1263,51 @@ async def Click_Element(step_data): # Get element locator = await PlaywrightLocator.Get_Element(step_data, current_page, frame_locator=_get_frame_locator()) if locator == "zeuz_failed": - CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) + CommonUtil.ExecLog(sModuleInfo, "Could not find element", 3) return "zeuz_failed" + # Click using offset (Selenium-compatible: percentage of half element size from center) + if offset_value: + try: + box = await locator.bounding_box() + if not box: + CommonUtil.ExecLog(sModuleInfo, "Cannot determine element bounding box for offset click", 3) + return "zeuz_failed" + parts = offset_value.replace(" ", "").split(",") + pct_x = float(parts[0]) + pct_y = float(parts[1]) + # Selenium-style: percent of half-size from center, anchored at top-left of element + offset_x = (box["width"] / 2.0) + (box["width"] / 2.0) * (pct_x / 100.0) + offset_y = (box["height"] / 2.0) + (box["height"] / 2.0) * (pct_y / 100.0) + click_options = {"position": {"x": offset_x, "y": offset_y}} + if modifiers: + click_options["modifiers"] = modifiers + if delay: + click_options["delay"] = delay + if timeout: + click_options["timeout"] = timeout + if right_click: + click_options["button"] = "right" + if double_click: + await locator.dblclick(**click_options) + else: + await locator.click(**click_options) + CommonUtil.ExecLog(sModuleInfo, "Click on location successful", 1) + return "passed" + except Exception: + return CommonUtil.Exception_Handler(sys.exc_info(), None, "Error clicking location") + + # JS click - matches Selenium use_js behavior (true HTMLElement.click() via JS) + if use_js: + try: + await locator.evaluate("el => el.click()") + CommonUtil.ExecLog(sModuleInfo, "Successfully clicked the element via JS", 1) + return "passed" + except Exception: + return CommonUtil.Exception_Handler(sys.exc_info()) + # Build click options click_options = {} - if use_js: - click_options["force"] = True - if offset: - click_options["position"] = offset if modifiers: click_options["modifiers"] = modifiers if delay: @@ -913,18 +1318,52 @@ async def Click_Element(step_data): click_options["click_count"] = click_count # Perform click - if double_click: - await locator.dblclick(**{k: v for k, v in click_options.items() if k != "click_count"}) - CommonUtil.ExecLog(sModuleInfo, "Double click performed", 1) - elif right_click: - click_options["button"] = "right" - await locator.click(**click_options) - CommonUtil.ExecLog(sModuleInfo, "Right click performed", 1) - else: - await locator.click(**click_options) - CommonUtil.ExecLog(sModuleInfo, "Click performed", 1) - - return "passed" + try: + if double_click: + await locator.dblclick(**{k: v for k, v in click_options.items() if k != "click_count"}) + CommonUtil.ExecLog(sModuleInfo, "Double click performed", 1) + elif right_click: + click_options["button"] = "right" + await locator.click(**click_options) + CommonUtil.ExecLog(sModuleInfo, "Right click performed", 1) + else: + await locator.click(**click_options) + CommonUtil.ExecLog(sModuleInfo, "Successfully clicked the element", 1) + return "passed" + except PlaywrightTimeoutError: + # Click intercepted or element not actionable - fall back to JS click (matches Selenium behavior) + try: + await locator.evaluate("el => el.click()") + CommonUtil.ExecLog( + sModuleInfo, + "Your element is overlapped with another sibling element. Clicked the element successfully by executing JavaScript", + 2, + ) + return "passed" + except Exception: + return CommonUtil.Exception_Handler(sys.exc_info()) + except PlaywrightError as e: + err_msg = str(e).lower() + # Stale element: retry up to 5 times with 1s delay + if ("stale" in err_msg or "detached" in err_msg) and retry < 5: + CommonUtil.ExecLog( + sModuleInfo, + "Javascript of the element is not fully loaded. Trying again after 1 second delay", + 2, + ) + await asyncio.sleep(1) + return await Click_Element(step_data, retry + 1) + # Try JS click fallback + try: + await locator.evaluate("el => el.click()") + CommonUtil.ExecLog( + sModuleInfo, + "Click failed natively; clicked successfully via JavaScript", + 2, + ) + return "passed" + except Exception: + return CommonUtil.Exception_Handler(sys.exc_info()) except Exception: return CommonUtil.Exception_Handler(sys.exc_info()) @@ -944,7 +1383,7 @@ async def Double_Click_Element(step_data): modified_step_data = list(step_data) # Ensure the action indicates double click for i, (left, mid, right) in enumerate(modified_step_data): - if mid.strip().lower() == "action": + if "action" in mid.strip().lower(): modified_step_data[i] = ("double click", mid, right) break @@ -963,7 +1402,7 @@ async def Right_Click_Element(step_data): """ modified_step_data = list(step_data) for i, (left, mid, right) in enumerate(modified_step_data): - if mid.strip().lower() == "action": + if "action" in mid.strip().lower(): modified_step_data[i] = ("right click", mid, right) break @@ -1027,6 +1466,60 @@ async def Hover_Over_Element(step_data): return CommonUtil.Exception_Handler(sys.exc_info()) +@logger +async def Click_and_Download(data_set): + """Click an element and wait for a browser download.""" + + sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME + global current_page + + try: + if current_page is None: + CommonUtil.ExecLog(sModuleInfo, "No browser open", 3) + return "zeuz_failed" + + wait_download = 20 + target_path = "" + click_dataset = [] + for left, mid, right in data_set: + left_c = _compact(left) + mid_l = mid.strip().lower() + if left_c == "waitfordownload": + wait_download = float(right.strip()) + elif left_c in ("folderpath", "directory", "filepath", "file", "folder") and mid_l == "optional parameter": + target_path = CommonUtil.path_parser(right.strip()) + elif left_c == "automatefirefoxsavewindow": + continue + else: + click_dataset.append((left, mid, right)) + + locator = await PlaywrightLocator.Get_Element(click_dataset, current_page, frame_locator=_get_frame_locator()) + if locator == "zeuz_failed": + CommonUtil.ExecLog(sModuleInfo, "Unable to locate your element with given data.", 3) + return "zeuz_failed" + + CommonUtil.ExecLog(sModuleInfo, f"Download started. Will wait max {wait_download} seconds...", 1) + async with current_page.expect_download(timeout=int(wait_download * 1000)) as download_info: + await locator.click() + download = await download_info.value + + if target_path: + parsed_path = Path(target_path) + if parsed_path.suffix: + parsed_path.parent.mkdir(parents=True, exist_ok=True) + save_path = parsed_path + else: + parsed_path.mkdir(parents=True, exist_ok=True) + save_path = parsed_path / download.suggested_filename + else: + save_path = Path(_download_folder()) / download.suggested_filename + await download.save_as(str(save_path)) + CommonUtil.ExecLog(sModuleInfo, f"File downloaded to '{save_path}'", 1) + return "passed" + except Exception: + return CommonUtil.Exception_Handler(sys.exc_info()) + + ######################### # # # Text Input # @@ -1044,7 +1537,7 @@ async def Enter_Text_In_Text_Box(step_data): text action my_username text playwright action text - Example 2 - With options: + Example 2 - With options (Selenium-compatible): Field Sub Field Value id element parameter username text action my_username @@ -1059,7 +1552,6 @@ async def Enter_Text_In_Text_Box(step_data): try: # Handle session parameter session_name, current_page, current_page_id, context, browser = await _handle_playwright_session(step_data) - if current_page is None: CommonUtil.ExecLog(sModuleInfo, "No browser open", 3) return "zeuz_failed" @@ -1078,7 +1570,7 @@ async def Enter_Text_In_Text_Box(step_data): if mid_l == "optional parameter" and left_l == "session": continue - if mid_l == "action": + if "action" in mid_l: text_value = right # Don't strip - preserve whitespace elif mid_l == "optional parameter": if left_l == "delay": @@ -1092,38 +1584,78 @@ async def Enter_Text_In_Text_Box(step_data): locator = await PlaywrightLocator.Get_Element(step_data, current_page, frame_locator=_get_frame_locator()) if locator == "zeuz_failed": - CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) + CommonUtil.ExecLog(sModuleInfo, "Unable to locate your element with given data.", 3) return "zeuz_failed" # Enter text based on options if use_js: - # Use JavaScript to set value directly - await locator.evaluate(f"el => {{ el.value = `{text_value}`; }}") - # Trigger events + # JS mode mirrors Selenium: click, set value, dispatch input/change events, click again. + try: + await locator.evaluate("el => el.click()") + except Exception: + CommonUtil.ExecLog(sModuleInfo, "Entering text without clicking the element", 2) + # Use JS template-literal so embedded quotes/newlines are preserved (matches Selenium). + escaped = text_value.replace("\\", "\\\\").replace("`", "\\`").replace("${", "\\${") + await locator.evaluate(f"el => {{ el.value = `{escaped}`; }}") await locator.dispatch_event("input") await locator.dispatch_event("change") - CommonUtil.ExecLog(sModuleInfo, f"Text entered via JS: {text_value[:50]}{'...' if len(text_value) > 50 else ''}", 1) - elif clear: - # fill() clears and sets value - recommended approach - fill_options = {} - if timeout: - fill_options["timeout"] = timeout - await locator.fill(text_value, **fill_options) - CommonUtil.ExecLog(sModuleInfo, f"Text filled: {text_value[:50]}{'...' if len(text_value) > 50 else ''}", 1) + try: + await locator.evaluate("el => el.click()") + except Exception: + pass + CommonUtil.ExecLog(sModuleInfo, f"Successfully set the value of to text to: {text_value}", 1) + return "passed" + + # Non-JS path: click first to focus (best-effort), clear if requested, then type/fill. + try: + await locator.click() + except Exception: + CommonUtil.ExecLog(sModuleInfo, "Entering text without clicking the element", 2) + + if clear: + try: + # Select-all + delete pattern matches Selenium clear logic across platforms. + if sys.platform == "darwin": + await locator.press("Meta+A") + else: + await locator.press("Control+A") + await locator.press("Delete") + except Exception: + pass + try: + # fill() always clears first; also handles inputs where Select-All didn't apply. + fill_options = {} + if timeout: + fill_options["timeout"] = timeout + if delay == 0: + await locator.fill(text_value, **fill_options) + else: + # Caller wants per-keystroke delay -> type after clearing. + type_options = {"delay": int(delay * 1000)} + if timeout: + type_options["timeout"] = timeout + await locator.type(text_value, **type_options) + except Exception: + return CommonUtil.Exception_Handler(sys.exc_info()) else: - # type() appends to existing value type_options = {} if delay > 0: type_options["delay"] = int(delay * 1000) if timeout: type_options["timeout"] = timeout await locator.type(text_value, **type_options) - CommonUtil.ExecLog(sModuleInfo, f"Text typed: {text_value[:50]}{'...' if len(text_value) > 50 else ''}", 1) + # Some text fields become unclickable after entering text - best-effort click. + try: + await locator.click() + except Exception: + pass + + CommonUtil.ExecLog(sModuleInfo, f"Successfully set the value of to text to: {text_value}", 1) return "passed" except Exception: - return CommonUtil.Exception_Handler(sys.exc_info()) + return CommonUtil.Exception_Handler(sys.exc_info(), None, "Could not select/click your element.") @logger @@ -1171,7 +1703,7 @@ async def Keystroke_For_Element(step_data): mid_l = mid.strip().lower() right_v = right.strip() - if mid_l == "action": + if "action" in mid_l: if left_l == "keystroke keys": keystroke_type = "keys" keystroke_value = right_v @@ -1202,8 +1734,13 @@ async def Keystroke_For_Element(step_data): key_map = { "CTRL": "Control", "CONTROL": "Control", + "CMD": "Meta", + "COMMAND": "Meta", "ALT": "Alt", "SHIFT": "Shift", + "PLUS": "+", + "MINUS": "-", + "DASH": "-", "ENTER": "Enter", "RETURN": "Enter", "TAB": "Tab", @@ -1223,6 +1760,39 @@ async def Keystroke_For_Element(step_data): } if keystroke_type == "keys": + normalized_keystroke = keystroke_value.replace(" ", "").replace("_", "").lower() + if normalized_keystroke in ("ctrl+v", "control+v", "ctrlv", "controlv", "cmd+v", "cmdv", "command+v", "commandv"): + try: + import pyperclip + + paste_text = pyperclip.paste() + if has_element: + locator = await PlaywrightLocator.Get_Element(step_data, current_page, frame_locator=_get_frame_locator()) + if locator == "zeuz_failed": + CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) + return "zeuz_failed" + await locator.evaluate("""(el, text) => { + el.focus(); + const setter = Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, 'value')?.set; + if (setter) setter.call(el, text); else el.value = text; + el.dispatchEvent(new Event('input', { bubbles: true })); + el.dispatchEvent(new Event('change', { bubbles: true })); + }""", paste_text) + else: + await current_page.evaluate("""text => { + const el = document.activeElement; + if (el && 'value' in el) { + const setter = Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, 'value')?.set; + if (setter) setter.call(el, text); else el.value = text; + el.dispatchEvent(new Event('input', { bubbles: true })); + el.dispatchEvent(new Event('change', { bubbles: true })); + } + }""", paste_text) + CommonUtil.ExecLog(sModuleInfo, "Paste successfully executed via JavaScript with events", 1) + return "passed" + except Exception: + CommonUtil.ExecLog(sModuleInfo, "JavaScript paste execution failed. Trying keypress.", 2) + # Convert key names key = keystroke_value.upper() if "+" in key: @@ -1313,14 +1883,13 @@ async def Validate_Text(step_data): expected_text = "" partial_match = False case_insensitive = False - timeout = None for left, mid, right in step_data: left_l = left.strip().lower() mid_l = mid.strip().lower() right_v = right.strip() - if mid_l == "action": + if "action" in mid_l: if left_l.startswith("**"): partial_match = True case_insensitive = True @@ -1331,29 +1900,33 @@ async def Validate_Text(step_data): expected_text = right_v elif mid_l == "optional parameter": - if left_l == "timeout": - timeout = int(float(right_v) * 1000) + if left_l == "ignore case": + case_insensitive = _truthy(right_v) locator = await PlaywrightLocator.Get_Element(step_data, current_page, frame_locator=_get_frame_locator()) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" - # Get actual text - actual_text = await locator.text_content() or "" + # Get visible text, matching Selenium Element.text behavior more closely. + try: + actual_text = await locator.inner_text() or "" + except Exception: + actual_text = await locator.text_content() or "" + actual_lines = [line for line in actual_text.split("\n") if line != ""] # Compare match = False if case_insensitive: if partial_match: - match = expected_text.lower() in actual_text.lower() + match = any(expected_text.lower() in line.lower() for line in actual_lines) else: - match = expected_text.lower() == actual_text.lower() + match = expected_text.lower() in [line.lower() for line in actual_lines] else: if partial_match: - match = expected_text in actual_text + match = any(expected_text in line for line in actual_lines) else: - match = expected_text == actual_text + match = expected_text in actual_lines if match: CommonUtil.ExecLog(sModuleInfo, f"Text validation passed: '{expected_text}'", 1) @@ -1361,7 +1934,7 @@ async def Validate_Text(step_data): else: CommonUtil.ExecLog( sModuleInfo, - f"Text validation failed.\nExpected: '{expected_text}'\nActual: '{actual_text}'", + f"Text validation failed.\nExpected: '{expected_text}'\nActual: '{actual_lines}'", 3 ) return "zeuz_failed" @@ -1373,14 +1946,22 @@ async def Validate_Text(step_data): @logger async def if_element_exists(step_data): """ - Check if an element exists on the page. + Check whether an element exists (true/false). - Example: + Selenium-compatible form (writes the result to a shared variable, always returns "passed"): + Field Sub Field Value + id element parameter optional-element + if element exists playwright action true=my_flag + + - If found: shared variable my_flag is set to "true" + - If not found: shared variable my_flag is set to "false" + + Plain form (no save): Field Sub Field Value id element parameter optional-element if element exists playwright action if element exists - Returns "passed" if element exists, "zeuz_failed" if not. + - Returns "passed" if found, "zeuz_failed" if not. """ sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME global current_page @@ -1390,35 +1971,59 @@ async def if_element_exists(step_data): CommonUtil.ExecLog(sModuleInfo, "No browser open", 3) return "zeuz_failed" + variable_name = "" + value = "" timeout = 1000 # Short timeout for existence check + for left, mid, right in step_data: left_l = left.strip().lower() mid_l = mid.strip().lower() right_v = right.strip() - if mid_l == "optional parameter" and left_l == "timeout": + if "action" in mid_l and "=" in right_v: + try: + value_part, var_part = right_v.split("=", 1) + value = value_part.strip() + variable_name = var_part.strip() + except ValueError: + pass + elif mid_l == "optional parameter" and left_l == "timeout": timeout = int(float(right_v) * 1000) - locator = await PlaywrightLocator.Get_Element(step_data, current_page, element_wait=timeout/1000, frame_locator=_get_frame_locator()) + locator = await PlaywrightLocator.Get_Element( + step_data, + current_page, + element_wait=timeout / 1000, + frame_locator=_get_frame_locator(), + ) - if locator == "zeuz_failed": - CommonUtil.ExecLog(sModuleInfo, "Element does not exist", 1) - return "zeuz_failed" + found = False + if locator != "zeuz_failed": + try: + if await locator.count() > 0: + found = True + except Exception: + found = False - try: - count = await locator.count() - if count > 0: - CommonUtil.ExecLog(sModuleInfo, f"Element exists ({count} found)", 1) - return "passed" - else: - CommonUtil.ExecLog(sModuleInfo, "Element does not exist", 1) - return "zeuz_failed" - except Exception: - CommonUtil.ExecLog(sModuleInfo, "Element does not exist", 1) - return "zeuz_failed" + if variable_name: + # Selenium-compatible: always returns "passed"; the truthiness lives in the variable. + sr.Set_Shared_Variables(variable_name, value if found else "false") + CommonUtil.ExecLog( + sModuleInfo, + f"Element {'found' if found else 'not found'} - saved '{value if found else 'false'}' to '{variable_name}'", + 1, + ) + return "passed" + + if found: + CommonUtil.ExecLog(sModuleInfo, "Element exists", 1) + return "passed" + CommonUtil.ExecLog(sModuleInfo, "Element does not exist", 1) + return "zeuz_failed" except Exception: - return CommonUtil.Exception_Handler(sys.exc_info()) + errMsg = "Failed to parse data/locate element. Data format: variableName = value" + return CommonUtil.Exception_Handler(sys.exc_info(), None, errMsg) @logger @@ -1426,7 +2031,13 @@ async def Save_Attribute(step_data): """ Save an element's attribute value to a shared variable. - Example: + Selenium-compatible form (recommended): + Field Sub Field Value + id element parameter my-link + href save parameter my_variable + save attribute playwright action save attribute + + Alternative form (attribute via input parameter): Field Sub Field Value id element parameter my-link href input parameter attribute_name @@ -1434,13 +2045,15 @@ async def Save_Attribute(step_data): save attribute playwright action save attribute Special attribute names: - - text: Get text content - - innertext: Get inner text - - innerhtml: Get inner HTML - - outerhtml: Get outer HTML - - value: Get input value - - checked: Get checkbox state - - selected: Get select option state + - text: text content (Selenium .text) + - tag: tag name (Selenium .tag_name) + - checked: checkbox/radio selected state + - innertext: inner text + - innerhtml: inner HTML + - outerhtml: outer HTML + - value: input value + - selected: