import asyncio
import functools
import sys
from typing import Optional

from playwright.async_api import BrowserContext, Page
from playwright.async_api import TimeoutError as PlaywrightTimeoutError
from tenacity import (RetryError, retry, retry_if_result, stop_after_attempt,
                      wait_fixed)

import config
from base.base_crawler import AbstractLogin
from cache.cache_factory import CacheFactory
from tools import utils


class DouYinLogin(AbstractLogin):
    """Drive the Douyin web login flow through a Playwright page.

    Three login types are supported, selected via ``config.LOGIN_TYPE``:
    ``"qrcode"``, ``"phone"`` and ``"cookie"``.
    """

    def __init__(self,
                 login_type: str,
                 browser_context: BrowserContext,  # type: ignore
                 context_page: Page,  # type: ignore
                 login_phone: Optional[str] = "",
                 cookie_str: Optional[str] = ""
                 ):
        """Store the browser handles and credentials used by the login flow.

        :param login_type: login type; written to ``config.LOGIN_TYPE``
        :param browser_context: Playwright browser context holding the cookies
        :param context_page: Playwright page on which the login is performed
        :param login_phone: phone number used by the SMS-code login
        :param cookie_str: raw cookie string used by the cookie login
        """
        # NOTE: this overwrites the module-level config value as a side effect.
        config.LOGIN_TYPE = login_type
        self.browser_context = browser_context
        self.context_page = context_page
        self.login_phone = login_phone
        self.scan_qrcode_time = 60
        self.cookie_str = cookie_str

    async def begin(self) -> None:
        """Start the Douyin website login.

        The slider verification on the intermediate captcha page is not very
        accurate. Unless there is a special requirement, it is recommended to
        leave Douyin login disabled or to log in with cookies.

        :raises ValueError: if ``config.LOGIN_TYPE`` is not a supported type
        :raises SystemExit: if the login state cannot be confirmed
        """
        # popup login dialog
        await self.popup_login_dialog()

        # select login type
        if config.LOGIN_TYPE == "qrcode":
            await self.login_by_qrcode()
        elif config.LOGIN_TYPE == "phone":
            await self.login_by_mobile()
        elif config.LOGIN_TYPE == "cookie":
            await self.login_by_cookies()
        else:
            raise ValueError("[DouYinLogin.begin] Invalid Login Type Currently only supported qrcode or phone or cookie ...")

        # If the page was redirected to the slider-captcha page, the slider
        # has to be solved once more.
        await asyncio.sleep(6)
        current_page_title = await self.context_page.title()
        if "验证码中间页" in current_page_title:
            await self.check_page_display_slider(move_step=3, slider_level="hard")

        # check login state
        utils.logger.info("[DouYinLogin.begin] login finished then check login state ...")
        try:
            await self.check_login_state()
        except RetryError:
            # Raised by tenacity once every retry attempt has returned False.
            utils.logger.info("[DouYinLogin.begin] login failed please confirm ...")
            sys.exit()

        # wait for redirect
        wait_redirect_seconds = 5
        utils.logger.info(f"[DouYinLogin.begin] Login successful then wait for {wait_redirect_seconds} seconds redirect ...")
        await asyncio.sleep(wait_redirect_seconds)

    @retry(stop=stop_after_attempt(600), wait=wait_fixed(1), retry=retry_if_result(lambda value: value is False))
    async def check_login_state(self) -> bool:
        """Check whether the current session is logged in.

        Retried by tenacity (up to 600 attempts, 1 second apart) for as long
        as the result is ``False``.

        :return: ``True`` if any page's localStorage has ``HasUserLogin == "1"``
                 or the ``LOGIN_STATUS`` cookie equals ``"1"``; else ``False``
        """
        current_cookie = await self.browser_context.cookies()
        _, cookie_dict = utils.convert_cookies(current_cookie)

        for page in self.browser_context.pages:
            try:
                local_storage = await page.evaluate("() => window.localStorage")
                if local_storage.get("HasUserLogin", "") == "1":
                    return True
            except Exception:
                # Best effort: a page that cannot be evaluated is skipped
                # after a short pause instead of failing the whole check.
                await asyncio.sleep(0.1)

        if cookie_dict.get("LOGIN_STATUS") == "1":
            return True

        return False

    async def popup_login_dialog(self) -> None:
        """If the login dialog box does not pop up automatically, click the login button manually."""
        dialog_selector = "xpath=//div[@id='login-pannel']"
        try:
            # check dialog box is auto popup and wait for 10 seconds
            await self.context_page.wait_for_selector(dialog_selector, timeout=1000 * 10)
        except Exception as e:
            utils.logger.error(f"[DouYinLogin.popup_login_dialog] login dialog box does not pop up automatically, error: {e}")
            utils.logger.info("[DouYinLogin.popup_login_dialog] login dialog box does not pop up automatically, we will manually click the login button")
            login_button_ele = self.context_page.locator("xpath=//p[text() = '登录']")
            await login_button_ele.click()
            await asyncio.sleep(0.5)

    async def login_by_qrcode(self) -> None:
        """Locate the login QR code on the page and display it for scanning.

        :raises SystemExit: if no QR code image is found
        """
        utils.logger.info("[DouYinLogin.login_by_qrcode] Begin login douyin by qrcode...")
        qrcode_img_selector = "xpath=//article[@class='web-login']//img"
        base64_qrcode_img = await utils.find_login_qrcode(
            self.context_page,
            selector=qrcode_img_selector
        )
        if not base64_qrcode_img:
            utils.logger.info("[DouYinLogin.login_by_qrcode] login qrcode not found please confirm ...")
            sys.exit()

        # Show the QR code in the default executor so the event loop is not
        # blocked while the image is displayed; the future is not awaited.
        partial_show_qrcode = functools.partial(utils.show_qrcode, base64_qrcode_img)
        asyncio.get_running_loop().run_in_executor(executor=None, func=partial_show_qrcode)
        await asyncio.sleep(2)

    async def login_by_mobile(self) -> None:
        """Log in with a phone number and an SMS verification code.

        Fills in the phone number, requests the SMS code, handles a possible
        slider captcha, then polls the cache once per second (for up to two
        minutes) under the key ``dy_<login_phone>`` for the code and submits it.
        """
        utils.logger.info("[DouYinLogin.login_by_mobile] Begin login douyin by mobile ...")
        mobile_tap_ele = self.context_page.locator("xpath=//li[text() = '验证码登录']")
        await mobile_tap_ele.click()
        await self.context_page.wait_for_selector("xpath=//article[@class='web-login-mobile-code']")
        mobile_input_ele = self.context_page.locator("xpath=//input[@placeholder='手机号']")
        await mobile_input_ele.fill(self.login_phone)
        await asyncio.sleep(0.5)
        send_sms_code_btn = self.context_page.locator("xpath=//span[text() = '获取验证码']")
        await send_sms_code_btn.click()

        # Check whether a slider captcha is displayed.
        await self.check_page_display_slider(move_step=10, slider_level="easy")
        cache_client = CacheFactory.create_cache(config.CACHE_TYPE_MEMORY)
        max_get_sms_code_time = 60 * 2  # wait at most 2 minutes for the SMS code
        sms_code_key = f"dy_{self.login_phone}"
        while max_get_sms_code_time > 0:
            utils.logger.info(f"[DouYinLogin.login_by_mobile] get douyin sms code from redis remaining time {max_get_sms_code_time}s ...")
            await asyncio.sleep(1)
            sms_code_value = cache_client.get(sms_code_key)
            if not sms_code_value:
                max_get_sms_code_time -= 1
                continue

            # The cache backend may hand back bytes or an already-decoded
            # string; accept both instead of assuming bytes.
            if isinstance(sms_code_value, (bytes, bytearray)):
                sms_code = sms_code_value.decode()
            else:
                sms_code = str(sms_code_value)

            sms_code_input_ele = self.context_page.locator("xpath=//input[@placeholder='请输入验证码']")
            await sms_code_input_ele.fill(value=sms_code)
            await asyncio.sleep(0.5)
            submit_btn_ele = self.context_page.locator("xpath=//button[@class='web-login-button']")
            await submit_btn_ele.click()  # click the login button
            # TODO: the entered code may be wrong; its correctness should
            # still be verified here.
            break
        else:
            # The loop ran out of time without ever seeing a code.
            utils.logger.error("[DouYinLogin.login_by_mobile] timed out waiting for sms code ...")

    async def check_page_display_slider(self, move_step: int = 10, slider_level: str = "easy") -> None:
        """Check whether a slider captcha is displayed and, if so, solve it.

        Waits up to 30 seconds for the captcha image. If it never becomes
        visible the method returns immediately; otherwise it retries the
        slide up to 20 times.

        :param move_step: forwarded to :meth:`move_slider`
        :param slider_level: forwarded to :meth:`move_slider`
        :raises SystemExit: if all attempts are used without success
        """
        # Wait for the slider captcha to appear.
        back_selector = "#captcha-verify-image"
        try:
            await self.context_page.wait_for_selector(selector=back_selector, state="visible", timeout=30 * 1000)
        except PlaywrightTimeoutError:
            # No slider captcha was shown, nothing to do.
            return

        gap_selector = 'xpath=//*[@id="captcha_container"]/div/div[2]/img[2]'
        max_slider_try_times = 20
        while max_slider_try_times > 0:
            try:
                await self.move_slider(back_selector, gap_selector, move_step, slider_level)
                await asyncio.sleep(1)

                # If the slide was too slow or the verification failed, the
                # page shows a hint; click the refresh button and try again.
                page_content = await self.context_page.content()
                if "操作过慢" in page_content or "提示重新操作" in page_content:
                    utils.logger.info("[DouYinLogin.check_page_display_slider] slider verify failed, retry ...")
                    await self.context_page.click(selector="//a[contains(@class, 'secsdk_captcha_refresh')]")
                    # Count this failure against the budget too; otherwise a
                    # captcha that keeps rejecting us would loop forever.
                    max_slider_try_times -= 1
                    continue

                # After a successful slide, wait for the captcha to vanish.
                # If it does not, the wait raises and the except branch
                # below schedules another attempt.
                await self.context_page.wait_for_selector(selector=back_selector, state="hidden", timeout=1000)
                utils.logger.info("[DouYinLogin.check_page_display_slider] slider verify success ...")
                return
            except Exception as e:
                utils.logger.error(f"[DouYinLogin.check_page_display_slider] slider verify failed, error: {e}")
                await asyncio.sleep(1)
                max_slider_try_times -= 1
                utils.logger.info(f"[DouYinLogin.check_page_display_slider] remaining slider try times: {max_slider_try_times}")

        utils.logger.error("[DouYinLogin.check_page_display_slider] slider verify failed ...")
        sys.exit()

    async def move_slider(self, back_selector: str, gap_selector: str, move_step: int = 10, slider_level="easy") -> None:
        """Move the slider to the right to complete the verification.

        :param back_selector: selector of the captcha background image
        :param gap_selector: selector of the captcha slider (gap) image
        :param move_step: passed as ``steps`` to every Playwright mouse move;
                          per the original author's note, larger is slower
        :param slider_level: slider difficulty, ``"easy"`` or ``"hard"``
                             (used for the SMS-code slider and the
                             intermediate captcha page slider respectively)
        :raises RuntimeError: if the slider element has no bounding box
        """
        # get slider background image
        slider_back_elements = await self.context_page.wait_for_selector(
            selector=back_selector,
            timeout=1000 * 10,  # wait 10 seconds
        )
        slide_back = str(await slider_back_elements.get_property("src"))  # type: ignore

        # get slider gap image
        gap_elements = await self.context_page.wait_for_selector(
            selector=gap_selector,
            timeout=1000 * 10,  # wait 10 seconds
        )
        gap_src = str(await gap_elements.get_property("src"))  # type: ignore

        # Recognise the gap position.
        slide_app = utils.Slide(gap=gap_src, bg=slide_back)
        distance = slide_app.discern()

        # Build the movement track, then correct the last step so that the
        # steps sum exactly to the recognised distance.
        tracks = utils.get_tracks(distance, slider_level)
        new_1 = tracks[-1] - (sum(tracks) - distance)
        tracks.pop()
        tracks.append(new_1)

        # Drag the slider along the track. The handle returned by
        # wait_for_selector above is reused instead of querying again.
        element = gap_elements
        bounding_box = await element.bounding_box()  # type: ignore
        if bounding_box is None:
            # Element is not visible/attached; let the caller's retry logic
            # handle it with a clear message.
            raise RuntimeError("[DouYinLogin.move_slider] slider element has no bounding box")

        # Centre point of the slider element.
        x = bounding_box["x"] + bounding_box["width"] / 2
        y = bounding_box["y"] + bounding_box["height"] / 2
        await self.context_page.mouse.move(x, y)

        # Simulate the drag.
        await element.hover()  # type: ignore
        await self.context_page.mouse.down()

        for track in tracks:
            # Move horizontally by one track step. Mouse coordinates are
            # absolute, so y is held at the slider's centre rather than 0,
            # which would pull the pointer to the top of the viewport.
            await self.context_page.mouse.move(x + track, y, steps=move_step)
            x += track
        await self.context_page.mouse.up()

    async def login_by_cookies(self) -> None:
        """Log in by injecting the cookies parsed from ``self.cookie_str``.

        Every cookie is added for the ``.douyin.com`` domain with path ``/``.
        """
        utils.logger.info("[DouYinLogin.login_by_cookies] Begin login douyin by cookie ...")
        cookies = [
            {
                'name': key,
                'value': value,
                'domain': ".douyin.com",
                'path': "/"
            }
            for key, value in utils.convert_str_cookie_to_dict(self.cookie_str).items()
        ]
        # add_cookies takes a list, so send all cookies in one call.
        if cookies:
            await self.browser_context.add_cookies(cookies)