sniffer2.py 14 KB


  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. # File : sniffer2.py
  4. # Author: DaShenHan&道长-----先苦后甜,任凭晚风拂柳颜------
  5. # Author's Blog: https://blog.csdn.net/qq_32394351
  6. # Date : 2024/3/24
  7. # 容器版selenium
  8. # https://codecopy.cn/post/i7re9p
  9. # desc 利用selenium实现的简易播放地址嗅探器
  10. # webdriver_manager 各个浏览器使用案例 https://blog.csdn.net/caixiangting/article/details/132049306
  11. import ujson
  12. from urllib.parse import urlparse
  13. from time import time, sleep
  14. from selenium import webdriver
  15. from selenium.webdriver.chrome.options import Options
  16. from selenium.webdriver.chrome.service import Service as ChromeService
  17. from webdriver_manager.chrome import ChromeDriverManager
  18. from webdriver_manager.microsoft import EdgeChromiumDriverManager
  19. import re
  20. import requests
  21. # 储存驱动器列表,给接口缓存用
  22. browser_drivers = []
  23. class Sniffer:
  24. # 正则嗅探匹配表达式
  25. urlRegex: str = 'http((?!http).){12,}?\\.(m3u8|mp4|flv|avi|mkv|rm|wmv|mpg|m4a|mp3)\\?.*|http((?!http).){12,}\\.(m3u8|mp4|flv|avi|mkv|rm|wmv|mpg|m4a|mp3)|http((?!http).)*?video/tos*'
  26. urlNoHead: str = 'http((?!http).){12,}?(ac=dm&url=)'
  27. # 每次嗅探间隔毫秒
  28. delta: int = 250
  29. def __init__(self,
  30. driver_path=None,
  31. _type=0,
  32. wait=5,
  33. head_timeout=200,
  34. timeout=10000, user_agent=None, custom_regex=None):
  35. """
  36. 初始化
  37. @param driver_path: 驱动器路径
  38. @param _type: 使用的浏览器 0:谷歌 1:edge
  39. @param wait:默认等待页面时间
  40. @param head_timeout:head请求超时
  41. @param timeout:嗅探超时
  42. @param user_agent:请求头
  43. @param custom_regex: 自定义嗅探正则
  44. """
  45. if driver_path is None:
  46. driver_path = r'C:\Users\dashen\.wdm\drivers\chromedriver\win64\123.0.6312.58\chromedriver-win32/chromedriver.exe'
  47. if user_agent is None:
  48. user_agent = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.50 Safari/537.36'
  49. options = webdriver.ChromeOptions()
  50. # 无痕模式
  51. options.add_argument('--incognito')
  52. # 开启性能监听
  53. options.set_capability('goog:loggingPrefs', {'performance': 'ALL'})
  54. options.add_experimental_option('perfLoggingPrefs', {'enableNetwork': True})
  55. # 忽略证书错误
  56. options.add_argument("--ignore-certificate-errors")
  57. # 禁止加载图片
  58. options.add_argument("--blink-settings=imagesEnabled=false")
  59. # 禁用不安全的外链
  60. options.add_argument("--no-displaying-insecure-content")
  61. # 跳过首次运行检查
  62. options.add_argument("--no-first-run")
  63. # 不做浏览器默认检查
  64. options.add_argument("no-default-browser-check")
  65. # 禁用扩展
  66. options.add_argument("--disable-extensions")
  67. # 允许Https加载http内容
  68. options.add_argument("--allow-running-insecure-content")
  69. # 规避自动化检测
  70. options.add_experimental_option('excludeSwitches', ['enable-logging', 'enable-automation'])
  71. # 规避滑块检测
  72. options.add_argument('--disable-blink-features=AutomationControlled')
  73. # 解决加载速度慢的问题
  74. options.page_load_strategy = 'none'
  75. # 模拟手机
  76. # mobile_emulation = {'deviceName': 'iPhone 12 Pro'}
  77. # options.add_experimental_option('mobileEmulation', mobile_emulation)
  78. # 启动时全屏
  79. # options.add_argument("--start-maximized")
  80. options.add_argument("profile-directory={profile}")
  81. # 不使用GPU,有的机器不支持GPU
  82. options.add_argument('--disable-gpu')
  83. # 使用无头模式,无 GUI的Linux服务器必须添加
  84. options.add_argument('--no-sandbox')
  85. options.add_argument('--disable-dev-shm-usage')
  86. # options.add_argument("--headless")
  87. # options.add_argument("--remote-debugging-port=9222")
  88. # 使用代理
  89. options.add_argument('--proxy-server=http://127.0.0.1:7890')
  90. # 使用UA
  91. options.add_argument(f'user-agent={user_agent}')
  92. self.options = options
  93. self.wait = wait
  94. self.timeout = timeout
  95. self.head_timeout = head_timeout
  96. self.driver_path = driver_path
  97. self._type = _type
  98. self.custom_regex = custom_regex
  99. self.driver = self.init_driver()
  100. @classmethod
  101. def get_driver_path(cls, _type=0):
  102. """
  103. 智能化获取驱动路径
  104. @return:
  105. """
  106. driver_path = None
  107. if _type == 0:
  108. driver_path = ChromeDriverManager().install()
  109. elif _type == 1:
  110. driver_path = EdgeChromiumDriverManager().install()
  111. return driver_path
  112. def init_driver(self):
  113. """
  114. 初始化驱动程序
  115. @return:
  116. """
  117. _driver = None
  118. driver = None
  119. # if self._type == 0:
  120. # _driver = webdriver.Chrome
  121. # elif self._type == 1:
  122. # _driver = webdriver.Edge
  123. # if _driver:
  124. # if self.driver_path == 'auto':
  125. # self.driver_path = self.get_driver_path(self._type)
  126. if self.driver_path:
  127. # service = ChromeService(self.driver_path)
  128. # driver = _driver(service=service, options=self.options)
  129. driver = webdriver.Remote(command_executor=self.driver_path, options=self.options)
  130. driver.implicitly_wait(5) # 隐式等待时间
  131. # 设置窗口大小
  132. # driver.set_window_size(1, 0)
  133. # 设置要屏蔽的URL
  134. # driver.execute_cdp_cmd('Network.setBlockedURLs',
  135. # {"urls": ["*.googleapis.com", "www.google-analytics.com", "*.facebook.net"]})
  136. return driver
  137. def setCookie(self, _dict):
  138. """
  139. 设置cookie。可以在嗅探前或者获取源码前设置
  140. @param _dict:
  141. @return:
  142. """
  143. self.driver.add_cookie(_dict)
  144. def fetCodeByWebView(self, url):
  145. """
  146. 利用webview请求得到渲染完成后的源码
  147. @param url: 待获取源码的url
  148. @return:
  149. """
  150. self.driver.get(url)
  151. content = self.driver.page_source
  152. url = self.driver.current_url
  153. return {'content': content, 'headers': {'location': url}}
  154. def snifferMediaUrl(self, playUrl, mode=0, custom_regex=None, timeout=None):
  155. """
  156. 输入播放地址,返回嗅探到的真实视频链接
  157. @param playUrl: 播放网页地址
  158. @param mode: 模式:0 嗅探到一个就返回 1:在10秒内嗅探所有的返回列表
  159. @param custom_regex: 自定义嗅探正则
  160. @return:
  161. """
  162. if custom_regex is None:
  163. custom_regex = self.custom_regex
  164. realUrl = ''
  165. realUrls = []
  166. realHeaders = {}
  167. headUrls = []
  168. t1 = time()
  169. if timeout is None:
  170. timeout = self.timeout
  171. cost = 0
  172. # 必须这行代码,配置最后的设置about:blank防止串数据
  173. # self.driver.execute_cdp_cmd('Network.enable', {})
  174. # self.driver.execute_script(f"window.open('{playUrl}')")
  175. # handles = self.driver.window_handles
  176. # self.driver.switch_to.window(handles[-1])
  177. # # 获取主窗口句柄
  178. # main_window = self.driver.current_window_handle
  179. print(playUrl)
  180. self.driver.get(playUrl)
  181. while cost < self.timeout and (not realUrl or mode == 1):
  182. messages = []
  183. urls = []
  184. # 获取性能数据
  185. performance_logs = self.driver.get_log('performance')
  186. for entry in performance_logs:
  187. # 获取message的数据
  188. message = ujson.loads(entry.get('message')).get('message')
  189. if message.get('params') and message['params'].get('request'):
  190. messages.append(message)
  191. url = message['params']['request']['url']
  192. method = message['params']['request']['method']
  193. headers = message['params']['request']['headers']
  194. urls.append(url)
  195. if str(method).lower() == 'get' and str(url).startswith('http') and url != playUrl:
  196. parsed_url = urlparse(url)
  197. path = parsed_url.path
  198. filename = str(path.split('/')[-1])
  199. # 链接不含.并且正则匹配不在不head列表 或者 链接有.但是.后面没内容,也算空后缀
  200. if (filename and '.' not in filename and not re.search(self.urlNoHead, url, re.M | re.I)) or (
  201. '.' in filename and len(filename) > 1 and not filename.split('.')[1]):
  202. # 如果链接没有进行过head请求。防止多次嗅探的时候重复去head请求
  203. if url not in headUrls:
  204. try:
  205. r = requests.head(url=url, headers=headers,
  206. timeout=round(self.head_timeout / 1000, 2))
  207. rheaders = r.headers
  208. if rheaders.get('Content-Type') and rheaders[
  209. 'Content-Type'] == 'application/octet-stream' and '.m3u8' in rheaders[
  210. 'Content-Disposition']:
  211. realUrl = url
  212. if headers.get('Referer'):
  213. realHeaders['referer'] = headers['Referer']
  214. if headers.get('User-Agent'):
  215. realHeaders['user-agent'] = headers['User-Agent']
  216. if mode == 0:
  217. break
  218. else:
  219. realUrls.append({
  220. 'url': realUrl,
  221. 'headers': headers,
  222. })
  223. except Exception as e:
  224. print(f'head请求访问: {url} 发生了错误:{e}')
  225. headUrls.append(url)
  226. if custom_regex and re.search(custom_regex, url, re.M | re.I):
  227. # print(message)
  228. realUrl = url
  229. if headers.get('Referer'):
  230. realHeaders['referer'] = headers['Referer']
  231. if headers.get('User-Agent'):
  232. realHeaders['user-agent'] = headers['User-Agent']
  233. if mode == 0:
  234. break
  235. else:
  236. realUrls.append({
  237. 'url': realUrl,
  238. 'headers': headers,
  239. })
  240. if re.search(self.urlRegex, url, re.M | re.I):
  241. if url.find('url=http') < 0 and url.find('v=http') < 0 and url.find('.css') < 0 and url.find(
  242. '.html') < 0:
  243. realUrl = url
  244. if headers.get('Referer'):
  245. realHeaders['referer'] = headers['Referer']
  246. if headers.get('User-Agent'):
  247. realHeaders['user-agent'] = headers['User-Agent']
  248. if mode == 0:
  249. break
  250. else:
  251. realUrls.append({
  252. 'url': realUrl,
  253. 'headers': headers,
  254. })
  255. # print(len(urls), urls)
  256. sleep(round(self.delta / 1000, 2))
  257. t2 = time()
  258. cost = round((t2 - t1) * 1000, 2)
  259. cost_str = str(round(cost * 1000, 2)) + 'ms'
  260. self.driver.get('about:blank')
  261. # self.driver.close()
  262. # self.driver.get('http://localhost:5707/blank')
  263. # 循环遍历所有窗口句柄,关闭非主窗口句柄的窗口
  264. # for handle in handles:
  265. # if handle != main_window:
  266. # self.driver.switch_to.window(handle)
  267. # self.driver.close()
  268. if mode == 0 and realUrl:
  269. return {'url': realUrl, 'headers': realHeaders, 'from': playUrl, 'cost': cost_str, 'code': 200,
  270. 'msg': '嗅探成功'}
  271. elif mode == 1 and realUrls:
  272. return {'urls': realUrls, 'code': 200, 'from': playUrl, 'cost': cost_str, 'msg': '嗅探成功'}
  273. else:
  274. return {'url': realUrl, 'headers': realHeaders, 'from': playUrl, 'cost': cost_str, 'code': 404,
  275. 'msg': '嗅探失败'}
  276. def close(self):
  277. """
  278. 用完记得关闭驱动器
  279. @return:
  280. """
  281. self.driver.quit()
  282. if __name__ == '__main__':
  283. t1 = time()
  284. remote_url = 'http://127.0.0.1:9516/wd/hub'
  285. # url = 'https://www.cs1369.com/play/2-1-94.html'
  286. url = 'https://v.qq.com/x/page/i3038urj2mt.html'
  287. # url = 'http://www.mgtv.com/v/1/290346/f/3664551.html'
  288. browser = Sniffer(driver_path=remote_url)
  289. # ret = browser.snifferMediaUrl(url)
  290. ret = browser.snifferMediaUrl('https://www.freeok.pro/xplay/63170-8-12.html')
  291. print(ret)
  292. # ret = browser.snifferMediaUrl('http://www.mgtv.com/v/1/290346/f/3664551.html')
  293. # print(ret)
  294. ret = browser.snifferMediaUrl('https://jx.jsonplayer.com/player/?url=https://m.iqiyi.com/v_1pj3ayb1n70.html')
  295. print(ret)
  296. ret = browser.snifferMediaUrl('https://jx.yangtu.top/?url=https://m.iqiyi.com/v_1pj3ayb1n70.html',
  297. custom_regex='http((?!http).){12,}?(download4|pcDownloadFile)')
  298. print(ret)
  299. browser.close()
  300. t2 = time()
  301. print(f'共计耗时:{round(t2 - t1, 2)}s')