#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File  : asyncSnifferPro.py
# Author: DaShenHan&道长-----先苦后甜,任凭晚风拂柳颜------
# Date  : 2024/3/28
# Desc  : a simple, high-performance asynchronous play-URL sniffer built on playwright
from playwright.async_api import async_playwright, Playwright
import re
import os
from urllib.parse import urlparse
from time import time
import asyncio
_description = r"""
pip install playwright
Install Google Chrome manually; there is no need to run `playwright install`,
because the three browsers it bundles are not good enough for this use case.
Official API documentation:
https://playwright.dev/python/docs/intro
https://playwright.dev/python/docs/api/class-playwright
"""
# List of stored browser drivers, used as a cache by the API layer
browser_drivers = []


# All times are in milliseconds; no unit conversion is needed
class Sniffer:
    # Regular expressions used for sniffing media URLs
    urlRegex: str = 'http((?!http).){12,}?\\.(m3u8|mp4|flv|avi|mkv|rm|wmv|mpg|m4a|mp3)\\?.*|http((?!http).){12,}\\.(m3u8|mp4|flv|avi|mkv|rm|wmv|mpg|m4a|mp3)|http((?!http).)*?video/tos*'
    urlNoHead: str = 'http((?!http).){12,}?(ac=dm&url=)'
    # Interval between sniffs, in milliseconds
    playwright = None
    browser = None
    main_page = None
    context = None
    requests = None
    user_agent: str = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.50 Safari/537.36'
    pages = []
    web_timeout = 15000  # fetching a page's source takes at most 15 seconds
    sniffer_timeout = 20000  # sniffing a page takes at most 20 seconds
    wait_timeout = 3000  # waits take at most 3 seconds
    def __init__(self,
                 timeout=10000, head_timeout=200, user_agent=None,
                 custom_regex=None, headless=True, debug=False, use_chrome=True, is_pc=False):
        """
        Initialization.
        @param timeout: global sniffing timeout
        @param head_timeout: timeout for HEAD probe requests
        @param user_agent: default user agent
        @param custom_regex: custom sniffing regex
        @param headless: headless mode
        @param debug: enable debug logging
        @param use_chrome: use the manually installed Chrome channel instead of the bundled Chromium
        @param is_pc: emulate a desktop browser instead of a mobile device
        """
        if user_agent is not None:
            self.user_agent = user_agent

        self.timeout = timeout
        self.head_timeout = head_timeout
        self.debug = debug
        self.custom_regex = custom_regex
        self.headless = headless
        self.channel = "chrome" if use_chrome else None
        self.is_pc = is_pc
    def log(self, *args):
        """
        print-style logging helper; only prints when the instance was created with debug=True.
        @param args:
        @return:
        """
        if self.debug:
            print(*args)
    async def __aenter__(self):
        # Called when entering the async context manager
        print('Entering the async context manager')
        self.browser = await self.init_browser()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        # Clean-up work when leaving the context manager
        print('Leaving the async context manager')
        pass
    async def init_browser(self):
        """
        Initialize the browser driver.
        @return:
        """
        self.playwright = await async_playwright().start()
        # Use the manually installed Chrome browser rather than the three bundled ones
        browser = await self.playwright.chromium.launch(channel=self.channel, headless=self.headless)
        # print(self.playwright.devices)
        if not self.is_pc:
            # Emulate an iPhone
            iphone = self.playwright.devices["iPhone 14 Pro"]
            context = await browser.new_context(**iphone)
        else:
            context = await browser.new_context()
        # Open a main page so that later pages can be opened and closed without exiting the program
        self.main_page = await context.new_page()
        # The context's built-in request client; similar to the requests library but with a different API
        self.requests = context.request
        # return browser
        return context
    async def setCookie(self, page, cookie=''):
        """
        Set a cookie. Can be called before sniffing or before fetching the page source.
        @param page: page to set the cookie on
        @param cookie: cookie string
        @return:
        """
        await page.set_extra_http_headers(headers={'Cookie': cookie})
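
    # A hedged note on setCookie (the cookie string and URL below are placeholders, not from the original
    # file): the same effect can be had by passing headers={'Cookie': ...} to snifferMediaUrl or
    # fetCodeByWebView, since those headers reach set_extra_http_headers via _get_page. Direct use looks like:
    #
    #   page = await sniffer._get_page()
    #   await sniffer.setCookie(page, 'uid=1; token=abc')
    #   await page.goto('https://example.com/play/1.html')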
    @staticmethod
    async def _route_interceptor(route):
        """
        Global route interceptor that blocks certain resource types from loading.
        @param route:
        @return:
        """
        excluded_resource_types = ["stylesheet", "image", "font"]
        resource_type = route.request.resource_type
        # print(resource_type)
        if resource_type in excluded_resource_types:
            # print('Blocked resource:', excluded_resource_types, route.request.url, route.request.resource_type)
            await route.abort()
        else:
            await route.continue_()
    @staticmethod
    async def _on_dialog(dialog):
        """
        Global dialog interceptor.
        @param dialog:
        @return:
        """
        # print('on_dialog:', dialog)
        await dialog.accept()
    @staticmethod
    async def _on_pageerror(error):
        """
        Global page-error handler.
        @param error:
        @return:
        """
        # print('on_pageerror:', error)
        pass
    async def _get_page(self, headers=None):
        """
        Create a new page with the required scripts and interceptors injected.
        @param headers:
        @return:
        """
        page = await self.browser.new_page()
        # Set the global navigation timeout
        page.set_default_navigation_timeout(self.timeout)
        # Set the global wait timeout
        page.set_default_timeout(self.timeout)
        # Add init scripts to speed things up and bypass checks that would block playback
        await page.add_init_script(path=os.path.join(os.path.dirname(__file__), './stealth.min.js'))
        await page.add_init_script(path=os.path.join(os.path.dirname(__file__), './devtools.js'))
        # Block the devtools-detector script https://cdn.staticfile.net/devtools-detector/2.0.14/devtools-detector.min.js
        await page.route(re.compile(r"devtools-detector.*\.js$"), lambda route: route.abort())
        # Set request headers
        if headers is not None:
            await page.set_extra_http_headers(headers=headers)
        else:
            await page.set_extra_http_headers(headers={'user-agent': self.user_agent})
        # Enable the static-resource interceptor
        await page.route(re.compile(r"\.(png|jpg|jpeg|css|ttf)$"), self._route_interceptor)
        # await page.route(re.compile(r"\.(png|jpg|jpeg|ttf)$"), self._route_interceptor)
        # Enable the dialog interceptor
        page.on("dialog", self._on_dialog)
        # Enable the page-error listener
        page.on("pageerror", self._on_pageerror)
        # page.set_viewport_size({"width": 360, "height": 540})
        # Add the page to the page list
        self.pages.append(page)
        return page
    @staticmethod
    def remove_element(array, element):
        """
        Remove the given element from a list.
        @param array:
        @param element:
        @return:
        """
        new_array = [x for x in array if x != element]
        return new_array
    async def close_page(self, page):
        """
        Remove the given page from the stored page list and close it.
        @param page:
        @return:
        """
        self.pages = self.remove_element(self.pages, page)
        await page.close()
        self.log('Page closed successfully')
    async def close(self):
        """
        Remember to close the driver when you are done with it.
        @return:
        """
        await self.main_page.close()
        await self.browser.close()
        await self.playwright.stop()
    async def fetCodeByWebView(self, url, headers=None, timeout=None, is_pc=False, css=None, script=None):
        """
        Fetch the fully rendered page source through a webview request.
        @param url: URL of the page whose source is wanted
        @param headers: custom browser request headers
        @param timeout: timeout for the request in milliseconds
        @param is_pc: whether to emulate a desktop browser; currently has no effect, mobile emulation is used by default
        @param css: selector to wait for; if css is not given, wait for the page 'load' state instead
        @param script: script to run once the page has loaded, e.g. to click elements on the page
        @return:
        """
        t1 = time()
        if timeout is None:
            timeout = self.timeout
        else:
            timeout = min([timeout, self.web_timeout])
        if not is_pc:
            is_pc = self.is_pc
        do_css = str(css).strip() if css and str(css).strip() else False
        page = await self._get_page(headers)
        # Set the global navigation timeout
        page.set_default_navigation_timeout(timeout)
        # Set the global wait timeout
        page.set_default_timeout(timeout)
        response = {'content': '', 'headers': {'location': url}}
        try:
            await page.goto(url)
        except Exception as e:
            self.log(f'An error occurred: {e}')
        else:
            if do_css:
                await page.wait_for_selector(do_css)
            else:
                await page.wait_for_load_state('load')

            if script:
                try:
                    await page.evaluate("""(script) => {
                        eval(script);
                    }
                    """, script)
                    self.log(f'Script executed successfully after page load: {script}')
                except Exception as e:
                    self.log(f'Script {script} raised an error after page load: {e}')

            response['content'] = await page.content()
            response['headers']['location'] = page.url
        t2 = time()
        cost = round((t2 - t1) * 1000, 2)
        response['cost'] = cost
        await self.close_page(page)
        return response
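
    # A hedged example of calling fetCodeByWebView (the URL, selector and script below are placeholders,
    # not from the original file): wait for a player element, click it, then read the rendered HTML and
    # the final URL after redirects from the returned dict.
    #
    #   ret = await sniffer.fetCodeByWebView(
    #       'https://example.com/play/1-1-1.html',
    #       css='#player',
    #       script='document.querySelector("#player").click()')
    #   html = ret['content']
    #   final_url = ret['headers']['location']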
    async def snifferMediaUrl(self, playUrl, mode=0, custom_regex=None, timeout=None, css=None, is_pc=False,
                              headers=None,
                              script=None):
        """
        Given a play-page URL, sniff the page's network traffic and return the real video URL(s).
        @param playUrl: URL of the video play page to sniff
        @param mode: 0 = return as soon as one URL is found; 1 = sniff everything within the timeout window and return a list
        @param custom_regex: custom sniffing regex
        @param timeout: timeout in milliseconds
        @param css: selector to wait for; if css is not given but script is, wait for the page 'load' state instead
        @param is_pc: whether to emulate a desktop browser; currently has no effect, mobile emulation is used by default
        @param headers: custom browser request headers
        @param script: script to run once the page has loaded, e.g. to click elements on the page
        @return:
        """
        t1 = time()
        if custom_regex is None:
            custom_regex = self.custom_regex
        if not is_pc:
            is_pc = self.is_pc
        do_css = str(css).strip() if css and str(css).strip() else False
        realUrls = []  # list of real URLs, used when mode=1
        headUrls = []  # URLs that have already been probed with a HEAD request
        page = await self._get_page(headers)
        if timeout is None:
            timeout = self.timeout
        else:
            if mode == 1:  # when sniffing everything, cap the timeout at the global default (10 s)
                timeout = min([timeout, self.timeout])
            else:
                # otherwise a single custom timeout may not exceed 20 s
                timeout = min([timeout, self.sniffer_timeout])
        async def _on_request(request):
            nonlocal realUrls, headUrls
            url = request.url
            method = request.method
            headers = request.headers
            resource_type = request.resource_type
            self.log('on_request:', url, ' method:', method, ' resource_type:', resource_type)
            if custom_regex and re.search(custom_regex, url, re.M | re.I):
                _headers = {}
                if headers.get('referer'):
                    _headers['referer'] = headers['referer']
                if headers.get('user-agent'):
                    _headers['user-agent'] = headers['user-agent']
                realUrls.append({
                    'url': url,
                    'headers': _headers
                })
                await page.evaluate("""([url, _headers, realUrls]) => {
                    window.realUrl = url
                    window.realHeaders = _headers
                    window.realUrls = realUrls
                }
                """, [url, _headers, realUrls])
                self.log('on_request found the real URL via custom_regex:', url)
                if mode == 0:
                    page.remove_listener("request", _on_request)
                return True
            if re.search(self.urlRegex, url, re.M | re.I):
                if url.find('url=http') < 0 and url.find('v=http') < 0 and url.find('.css') < 0 and url.find(
                        '.html') < 0:
                    _headers = {}
                    if headers.get('referer'):
                        _headers['referer'] = headers['referer']
                    if headers.get('user-agent'):
                        _headers['user-agent'] = headers['user-agent']
                    realUrls.append({
                        'url': url,
                        'headers': _headers
                    })
                    await page.evaluate("""([url, _headers, realUrls]) => {
                        window.realUrl = url
                        window.realHeaders = _headers
                        window.realUrls = realUrls
                    }
                    """, [url, _headers, realUrls])
                    self.log('on_request found the real URL via the default regex:', url)
                    if mode == 0:
                        page.remove_listener("request", _on_request)
                    return True
            elif str(method).lower() == 'get' and str(url).startswith('http') and url != playUrl:
                parsed_url = urlparse(url)
                path = parsed_url.path
                filename = str(path.split('/')[-1])
                # The filename has no '.' and the URL does not match the no-HEAD list, or the filename has a
                # '.' with nothing after it, which also counts as an empty suffix
                if (filename and '.' not in filename and not re.search(self.urlNoHead, url, re.M | re.I)) or (
                        '.' in filename and len(filename) > 1 and not filename.split('.')[1]) and resource_type not in [
                    'script']:
                    # Only HEAD-probe a URL once, to avoid repeated HEAD requests across multiple sniffs
                    if url not in headUrls:
                        try:
                            r = await self.requests.head(url=url, timeout=self.head_timeout)
                            rheaders = r.headers
                            if rheaders.get('content-type') and rheaders[
                                'content-type'] == 'application/octet-stream' and '.m3u8' in rheaders[
                                'content-disposition']:
                                _headers = {}
                                if headers.get('referer'):
                                    _headers['referer'] = headers['referer']
                                if headers.get('user-agent'):
                                    _headers['user-agent'] = headers['user-agent']
                                realUrls.append({
                                    'url': url,
                                    'headers': _headers
                                })
                                await page.evaluate("""([url, _headers, realUrls]) => {
                                    window.realUrl = url
                                    window.realHeaders = _headers
                                    window.realUrls = realUrls
                                }
                                """, [url, _headers, realUrls])
                                self.log('on_request found the real URL via a HEAD probe:', url)
                                if mode == 0:
                                    page.remove_listener("request", _on_request)
                                return True
                        except Exception as e:
                            print(f'HEAD request to {url} raised an error: {e}')
                        headUrls.append(url)
        page.on('request', _on_request)
        # Set the global navigation timeout
        page.set_default_navigation_timeout(timeout)
        # Set the global wait timeout
        page.set_default_timeout(timeout)
        await page.expose_function("log", lambda *args: print(*args))
        await page.add_init_script(path=os.path.join(os.path.dirname(__file__), './preload.js'))
        await page.evaluate("""
        window.realUrl = ''
        window.realHeaders = {}
        window.realUrls = []
        """)
        try:
            await page.goto(playUrl)
        except Exception as e:
            self.log('An error occurred while sniffing:', e)
            t2 = time()
            cost = round((t2 - t1) * 1000, 2)
            return {'url': '', 'headers': {}, 'from': playUrl, 'cost': cost, 'code': 404,
                    'msg': f'Sniffing failed: {e}'}
        # No extra branch waiting for the 'load' state is needed here: sniffing does not require the page to
        # finish loading, it just listens asynchronously, and css is normally not passed in.
        if do_css:
            await page.wait_for_selector(do_css)
        if script:
            if not do_css:
                await page.wait_for_load_state('load')
            try:
                await page.evaluate("""(script) => {
                    eval(script);
                }
                """, script)
                self.log(f'Script executed successfully after page load: {script}')
            except Exception as e:
                self.log(f'Script {script} raised an error after page load: {e}')
        is_timeout = False
        if mode == 0:
            try:
                await page.wait_for_function("() => window.realUrl")
            except:
                is_timeout = True
        elif mode == 1:
            try:
                await page.wait_for_timeout(timeout)
            except:
                is_timeout = True
        realUrl = await page.evaluate('window.realUrl')
        realHeaders = await page.evaluate('window.realHeaders')
        realUrls = await page.evaluate('window.realUrls')
        t2 = time()
        cost = round((t2 - t1) * 1000, 2)
        cost_str = f'{cost} ms'
        self.log(f'Total time: {cost} ms | {"timed out" if is_timeout else "did not time out"}')
        self.log('realUrl:', realUrl)
        self.log('realHeaders:', realHeaders)
        await self.close_page(page)
        if mode == 0 and realUrl:
            return {'url': realUrl, 'headers': realHeaders, 'from': playUrl, 'cost': cost_str, 'code': 200,
                    'msg': 'Super sniffer parsed successfully'}
        elif mode == 1 and realUrls:
            return {'urls': realUrls, 'code': 200, 'from': playUrl, 'cost': cost_str, 'msg': 'Super sniffer parsed successfully'}
        else:
            return {'url': realUrl, 'headers': realHeaders, 'from': playUrl, 'cost': cost_str, 'code': 404,
                    'msg': 'Super sniffer failed to parse'}
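

# A hedged sketch of mode=1 sniffing (the play URL is a placeholder, and sniff_all_example is not part of
# the original file): within the timeout window the sniffer collects every matching media request instead
# of returning on the first hit, and the result carries them under the 'urls' key.
async def sniff_all_example():
    async with Sniffer(debug=True, headless=True) as sniffer:
        pass
    ret = await sniffer.snifferMediaUrl('https://example.com/play/1-1-1.html', mode=1, timeout=10000)
    for item in ret.get('urls', []):
        print(item['url'], item['headers'])
    await sniffer.close()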


async def main_test():
    t1 = time()
    urls = [
        # 'https://www.cs1369.com/play/2-1-94.html',
        # 'https://m.ting13.cc/play/19176_1_91258.html',
        'https://v.qq.com/x/page/i3038urj2mt.html',
        'http://www.mgtv.com/v/1/290346/f/3664551.html',
        'https://jx.jsonplayer.com/player/?url=https://m.iqiyi.com/v_1pj3ayb1n70.html',
        'https://jx.yangtu.top/?url=https://m.iqiyi.com/v_1pj3ayb1n70.html',
    ]
    _count = 0
    async with Sniffer(debug=True, headless=True) as browser:
        # The context manager only initializes the browser; __aexit__ does no clean-up,
        # so `browser` stays usable after the with-block and must be closed explicitly below.
        pass
    for url in urls:
        _count += 1
        ret = await browser.snifferMediaUrl(url, timeout=15000)
        print(ret)
    _count += 1
    ret = await browser.snifferMediaUrl('https://jx.yangtu.top/?url=https://m.iqiyi.com/v_1pj3ayb1n70.html',
                                        custom_regex='http((?!http).){12,}?(download4|pcDownloadFile)')
    print(ret)
    # _count += 1
    # ret = await browser.fetCodeByWebView('https://www.freeok.pro/xplay/63170-8-12.html')
    # print(ret)
    await browser.close()
    t2 = time()
    print(f'Sniffing {_count} pages took a total of {round(t2 - t1, 2)}s')


if __name__ == '__main__':
    asyncio.run(main_test())