nm_jx.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. # File : nm_jx.py
  4. # Author: DaShenHan&道长-----先苦后甜,任凭晚风拂柳颜------
  5. # Date : 2024/4/11
  6. import asyncio
  7. from asyncSnifferPro import Sniffer
  8. from time import time, localtime, strftime
  9. import requests
  10. import json
  11. from urllib3 import encode_multipart_formdata
  12. import warnings
  13. # 关闭警告
  14. warnings.filterwarnings("ignore")
  15. requests.packages.urllib3.disable_warnings()
  16. nm_get_url = 'https://igdux.top/~nmjx'
  17. nm_put_url = 'https://igdux.top/~nmjx:6Q2bC4BWAayiZ3sifysBpJPE'
  18. timeout = 2000
  19. def update_content(content, boundary=None):
  20. """
  21. 更新剪切板内容为指定的content文本,返回剪切板服务端回应文本
  22. @param content: str
  23. @param boundary:
  24. @return:
  25. """
  26. if boundary is None:
  27. boundary = f'--dio-boundary-{int(time())}'
  28. headers = {'Content-Type': f'multipart/form-data; boundary={boundary}'}
  29. fields = []
  30. data = {
  31. 'c': content
  32. }
  33. for key, value in data.items():
  34. fields.append((key, (None, value, None)))
  35. m = encode_multipart_formdata(fields, boundary=boundary)
  36. data = m[0]
  37. r = requests.put(nm_put_url, data=data, headers=headers, timeout=round(timeout / 1000, 2), verify=False)
  38. print(r.text)
  39. return r.text
  40. def get_content():
  41. """
  42. 获取剪切板内容的字符串
  43. @return:
  44. """
  45. r = requests.get(nm_get_url, timeout=round(timeout / 1000, 2))
  46. ret = r.text
  47. return ret
  48. def get_content_dict():
  49. """
  50. 获取剪切板内容python字典
  51. @return:
  52. """
  53. r = requests.get(nm_get_url, timeout=round(timeout / 1000, 2))
  54. ret = r.json()
  55. return ret
  56. async def demo_test_nm():
  57. """
  58. 自动爬取农民解析链接然后对比剪切板内容,如果发生了改变就重新写到剪切板。
  59. @return:
  60. """
  61. t1 = time()
  62. async with Sniffer(debug=True, headless=True) as browser:
  63. # 在这里,async_func已被调用并已完成
  64. pass
  65. page = await browser.browser.new_page()
  66. await page.set_extra_http_headers(headers={'referer': 'https://m.emsdn.cn/'})
  67. await page.goto('https://api.cnmcom.com/webcloud/nmm.php?url=') #
  68. html = await page.content()
  69. # print(html)
  70. lis = await page.locator('li').count()
  71. print('共计线路路:', lis)
  72. lis = await page.locator('li').all()
  73. urls = []
  74. for li in lis:
  75. await li.click()
  76. # iframe = page.locator('#WANG')
  77. iframe = page.locator('iframe').first
  78. src = await iframe.get_attribute('src')
  79. urls.append(src)
  80. await browser.close_page(page)
  81. await browser.close()
  82. t2 = time()
  83. cost = round((t2 - t1) * 1000, 2)
  84. ctime = localtime(t2)
  85. time_str = strftime("%Y-%m-%d %H:%M:%S", ctime)
  86. print(f'共计耗时{cost}毫秒,解析数:{len(urls)} 列表为: {urls}')
  87. c_data = {
  88. "data": urls,
  89. "code": 200,
  90. "cost": cost,
  91. "msg": "农民解析获取成功",
  92. "from": "https://api.cnmcom.com/webcloud/nmm.php?url=",
  93. "update": time_str,
  94. }
  95. old_data = get_content_dict()
  96. old_urls = old_data.get('data')
  97. if old_urls != urls and len(urls) > 0:
  98. print('检测到农民影视解析地址发生变化,开始写入剪切板')
  99. # 如果剪切板里存的解析接口数据跟爬出来的接口数据不一致就更新剪切板内容
  100. c_data = json.dumps(c_data, ensure_ascii=False)
  101. update_content(c_data)
  102. elif len(urls) == 0:
  103. print('本次没有成功嗅探到农民解析,记录失败时间到剪切板')
  104. old_data['error_update'] = time_str
  105. c_data = json.dumps(old_data, ensure_ascii=False)
  106. update_content(c_data)
  107. elif old_urls == urls:
  108. print('本次成功嗅探到农民解析并与剪切板内容一致,无需进行任何操作')
  109. if __name__ == '__main__':
  110. # 主文件,作为linux定时任务调用,python3 nm_jx.py 建议1分钟一次
  111. # 运行事件循环
  112. asyncio.run(demo_test_nm())
  113. print(get_content())