class SeleniumUtil:
    """Small convenience wrapper around a Selenium WebDriver instance.

    Every lookup helper raises ``SpiderException`` (carrying the caller's
    error message and the locator value) instead of returning ``None`` when
    the element cannot be found.
    """

    def __init__(self, driver):
        """Store the WebDriver that all helper methods operate on."""
        self.driver = driver

    def safe_get_elements_(self, by, xpath, err):
        """Return the first element matching the locator.

        Args:
            by: Locator strategy passed to ``find_elements`` (e.g. ``By.NAME``).
            xpath: Locator value; despite the name it is used with whatever
                strategy ``by`` selects.
            err: Error message attached to the exception on failure.

        Raises:
            SpiderException: If no element matches.
        """
        els = self.driver.find_elements(by=by, value=xpath)
        if not els:
            raise SpiderException(err, xpath)
        return els[0]

    def get_page(self, url, sleep=1):
        """Navigate to ``url`` and then sleep for ``sleep`` seconds."""
        self.driver.get(url)
        time.sleep(sleep)

    def input_with_xpath(self, by, xpath, keys, err):
        """Send ``keys`` to the first element matching the locator.

        Raises:
            SpiderException: If no element matches.
        """
        element = self.safe_get_elements_(by, xpath, err)
        return element.send_keys(keys)

    def click_with_xpath(self, by, xpath, err_msg, front_sleep=3):
        """Sleep ``front_sleep`` seconds, then click the first matching element.

        Raises:
            SpiderException: If no element matches or the element is disabled.
        """
        time.sleep(front_sleep)
        element = self.safe_get_elements_(by, xpath, err_msg)
        if not element.is_enabled():
            raise SpiderException(err_msg, xpath)
        return element.click()

    def check_logging_with(self, by, xpath):
        """Verify that the login attempt succeeded.

        Three checks are made, in order:
        1. no element matching ``xpath`` carries non-empty text,
        2. the browser is no longer on ``LOGIN_URL``,
        3. an element matching ``BUTTON_LOGOUT_ELEMENT`` is present.

        Raises:
            SpiderException: On the first check that fails.
        """
        for err in self.driver.find_elements(by, xpath):
            error_message = err.text
            if error_message:
                raise SpiderException(error_message, xpath=xpath)

        if self.driver.current_url == LOGIN_URL:
            raise SpiderException(LogErrorMsg.LOGIN_PAGE_JUMP_FAILED, xpath=xpath)

        # The logout element is looked up with the same strategy as ``xpath``.
        if not self.driver.find_elements(by, BUTTON_LOGOUT_ELEMENT):
            raise SpiderException(LogErrorMsg.LOGIN_PAGE_NOT_REFRESHED, xpath=xpath)


class BaseSpider(ABC):
    """Base class for spiders: owns the driver, login flow and operation log."""

    def __init__(self):
        self.driver = None
        self.browser_ = None
        # Scratch buffer for the row currently being written by add_record().
        self.records = list()
        # Captured once at construction; every record of this spider uses it.
        self.now_time = datetime.datetime.now()

    def set_driver(self, driver):
        """Attach a WebDriver and build the ``SeleniumUtil`` wrapper for it."""
        self.driver = driver
        self.browser_ = SeleniumUtil(self.driver)

    def __del__(self):
        # getattr guards against finalisation of an instance whose __init__
        # never ran to completion (attribute would be missing).
        driver = getattr(self, 'driver', None)
        if driver:
            driver.quit()

    def add_record(self, urn, class_name, operate, exception, ele_message):
        """Write one operation-log row through ``QueryResult.insert_data``.

        The row is ``[now_time, urn, class_name, operate, exception,
        ele_message]``. The scratch buffer is always cleared afterwards, even
        if the insert raises, so a failed insert cannot leak fields into the
        next row.
        """
        self.records.extend([self.now_time, urn, class_name, operate, exception, ele_message])
        try:
            QueryResult.insert_data(INSERT_OPERATION_LOG, self.records)
        finally:
            self.records.clear()

    def get_search_url(self, url):
        """Open ``url`` in the attached browser."""
        self.browser_.get_page(url)

    def check_login(self):
        """Log in with the configured credentials.

        Returns:
            True when every login step and the post-login check pass; False
            when any step raises ``SpiderException`` (the error is logged and
            recorded via ``add_record``).
        """
        self.browser_.get_page(LOGIN_URL)
        try:
            self.browser_.input_with_xpath(By.NAME, TEXTBOX_LOGIN_ELEMENT, settings['dfm_username'],
                                           ErrorMsg.TEXT_BOX_NOT_DISPLAYED)
            self.browser_.input_with_xpath(By.NAME, TEXTBOX_PWD_ELEMENT,
                                           generate_utils.transform_str(settings['dfm_pwd']),
                                           ErrorMsg.TEXT_BOX_NOT_DISPLAYED)
            self.browser_.click_with_xpath(By.NAME, BUTTON_LOGIN_ELEMENT,
                                           LogErrorMsg.LOGIN_CLICK_BUTTON_NOT_DISPLAYED)
            self.browser_.check_logging_with(By.XPATH, CUSTOM_LOGIN_ELEMENT)
            return True

        except SpiderException as e:
            log_error(e)
            self.add_record(False, self.__class__.__name__, Operate.LOGIN.value, e.message, e.xpath)
            return False


class EmailSpider(BaseSpider):
    """Spider that navigates to the email-ingestion data page."""

    def __init__(self):
        super().__init__()

    def get_advanced_and_ingestion(self):
        """Open the search page, then click the advanced-search and ingestion links.

        Raises:
            SpiderException: If either link is missing or disabled.
        """
        self.get_search_url(SEARCH_PAGE)
        self.browser_.click_with_xpath(By.LINK_TEXT, ADVANCED_SEARCH_ELEMENT,
                                       EmailErrorMsg.ADVANCED_SEARCH_NOR_DISPLAYED)
        self.browser_.click_with_xpath(By.LINK_TEXT, EMAIL_INGESTION_DATA_ELEMENT,
                                       EmailErrorMsg.INGESTION_DATA_NOR_DISPLAYED)


class DFMProcessor(BaseProcessor):
    """Template for processors that log in with a spider and then run it."""

    def __init__(self):
        super().__init__()
        self.name = 'DFMProcessor'
        # A real lock by default so start() works even if a subclass does not
        # install its own.
        self.lock = threading.Lock()
        self.driver = None
        self.display = settings['CHROME_DISPLAY']
        self.baseSpider = None

    @abstractmethod
    def get_spider(self):
        """Return the spider instance this processor drives."""
        pass

    def process(self):
        """Run the spider's navigation.

        The spider prepared by ``start()`` (driver attached, logged in) is
        reused; one is created only if none exists yet.
        """
        if self.baseSpider is None:
            self.baseSpider = self.get_spider()
        self.baseSpider.get_advanced_and_ingestion()

    def get_download_path(self):
        """Return the download directory handed to the Chrome driver."""
        return ''

    def start(self):
        """Create spider and driver, log in and, on success, call ``process()``.

        The whole run is serialised by ``self.lock``. Any exception is logged
        through ``log_error`` and not propagated.
        """
        with self.lock:
            try:
                logger.info(r" >>> 开始执行 {}".format(self.name))
                # The spider must exist before a driver can be attached to it.
                # Replacing the previous spider first also drops the reference
                # to it before a new browser is launched.
                self.baseSpider = self.get_spider()
                self.driver = get_chrome_driver(self.get_download_path(), display=self.display)
                self.baseSpider.set_driver(self.driver)
                if self.baseSpider.check_login():
                    self.process()
                logger.info(r" >>> 完成执行 {}".format(self.name))
            except Exception as e:
                log_error(e)


@singleton
class EmailIngestionProcessor(DFMProcessor):
    """Processor that drives ``EmailSpider`` and downloads into a dated folder."""

    def __init__(self):
        super().__init__()
        self.lock = threading.Lock()
        self.name = "EmailIngestionProcessor"
        self.emailSpider = None

    def get_spider(self):
        """Create a new ``EmailSpider``, remember it, and return it."""
        self.emailSpider = EmailSpider()
        return self.emailSpider

    def get_download_path(self):
        """Return ``<email_ingestion_path>/<YYYY-MM-DD>``, creating it if needed."""
        cur_date = get_strtime('%Y-%m-%d')
        download_path = os.path.join(settings['email_ingestion_path'], cur_date)
        # exist_ok avoids the check-then-create race between concurrent runs.
        os.makedirs(download_path, exist_ok=True)
        return download_path


def run():
    """Entry point: start the email-ingestion processor."""
    EmailIngestionProcessor().start()


if __name__ == '__main__':
    run()

SeleniumUtil 和 BaseSpider 类代码优化:简化 Driver 管理和错误处理

原文地址: https://www.cveoy.top/t/topic/qeNM 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录