# coding=utf-8
# !/usr/bin/python
import sys, os, json
from base.spider import Spider
from requests import session, utils, get as requests_get
from requests.adapters import HTTPAdapter, Retry
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import hashlib
import time
import random
import base64
from functools import reduce
from urllib.parse import quote, urlencode
sys.path.append('..')
dirname, filename = os.path.split(os.path.abspath(__file__))
sys.path.append(dirname)
class Spider(Spider):
#默认设置
defaultConfig = {
'currentVersion': "20231213_1",
#【建议通过扫码确认】设置Cookie,在双引号内填写
'raw_cookie_line': "",
#如果主cookie没有vip,可以设置第二cookie,仅用于播放会员番剧,所有的操作、记录还是在主cookie,不会同步到第二cookie
'raw_cookie_vip': "",
#主页默认显示3图
'maxHomeVideoContent': '3',
#收藏标签默认显示追番1,追剧2,默认收藏夹0
'favMode': '0',
#部分视频列表分页,限制每次加载数量
'page_size': 12,
#上传播放进度间隔时间,单位秒,b站默认间隔15,0则不上传播放历史
'heartbeatInterval': '15',
#视频默认画质ID
'vodDefaultQn': '80',
#视频默认解码ID
'vodDefaultCodec': '7',
#音频默认码率ID
'vodDefaultAudio': '30280',
#获取视频热门评论
'show_vod_hot_reply': True,
#从正片中拆分出番剧的预告
'hide_bangumi_preview': True,
#登陆会员账号后,影视播放页不显示会员专享的标签,更简洁
'hide_bangumi_vip_badge': True,
#番剧(热门)列表使用横图
'bangumi_horizontal_cover': True,
#非会员播放会员专享视频时,添加一个页面可以使用解析源,解析源自行解决
'bangumi_vip_parse': True,
#付费视频添加一个页面可以使用解析,解析源自行解决
'bangumi_pay_parse': True,
#是否显示直播标签筛选中分区的细化标签, 0为不显示,1为显示
'showLiveFilterTag': '0',
#主页标签排序, 未登录或cookie失效时自动隐藏动态、收藏、关注、历史
'cateManual': [
"推荐",
"影视",
"直播",
"动态",
"频道",
"收藏",
"关注",
"历史",
"搜索",
],
#自定义推荐标签的筛选
'tuijianLis': [
"热门",
"排行榜",
"每周必看",
"入站必刷",
"番剧时间表",
"国创时间表"
],
'rankingLis': [
"动画",
"音乐",
"舞蹈",
"游戏",
"鬼畜",
"知识",
"科技",
"运动",
"生活",
"美食",
"动物",
"汽车",
"时尚",
"娱乐",
"影视",
"原创",
"新人",
],
}
#在动态标签的筛选中固定显示他,n为用户名或任意都可以,v必须为准确的UID
focus_on_up_list = [
#{"n":"徐云流浪中国", "v":"697166795"},
]
#在搜索标签的筛选中固定显示搜索词
focus_on_search_key = []
def getName(self):
return "哔哩哔哩"
def load_config(self):
try:
with open(f"{self.configdir}/config.json",encoding="utf-8") as f:
self.userConfig = json.load(f)
old_config = {
'master': 'cookie_dic',
'vip': 'cookie_vip_dic',
'fake': 'cookie_fake_dic',
}
for _type, old in old_config.items():
old = self.userConfig.get(old)
if old:
if not self.userConfig.get('users'):
self.userConfig['users'] = {}
self.userConfig['users'][_type] = {'cookies_dic': old}
users = self.userConfig.get('users', {})
if users.get('master') and users['master'].get('cookies_dic'):
self.session_master.cookies = utils.cookiejar_from_dict(users['master']['cookies_dic'])
self.userid = users['master']['userid']
if users.get('fake') and users['fake'].get('cookies_dic'):
self.session_fake.cookies = utils.cookiejar_from_dict(users['fake']['cookies_dic'])
except:
self.userConfig = {}
self.userConfig = {**self.defaultConfig, **self.userConfig}
dump_config_lock = threading.Lock()
def dump_config(self):
needSaveConfig = ['users', 'channel_list', 'cateLive', 'cateManualLive', 'cateManualLiveExtra']
userConfig_new = {}
for key, value in self.userConfig.items():
dafalutValue = self.defaultConfig.get(key)
if dafalutValue != None and value != dafalutValue or key in needSaveConfig:
userConfig_new[key] = value
self.dump_config_lock.acquire()
with open(f"{self.configdir}/config.json", 'w', encoding="utf-8") as f:
data = json.dumps(userConfig_new, indent=1, ensure_ascii=False)
f.write(data)
self.dump_config_lock.release()
pool = ThreadPoolExecutor(max_workers=8)
task_pool = []
# 主页
def homeContent(self, filter):
self.pool.submit(self.add_live_filter)
self.pool.submit(self.add_channel_filter)
self.pool.submit(self.add_search_key)
self.pool.submit(self.add_focus_on_up_filter)
self.pool.submit(self.get_tuijian_filter)
self.pool.submit(self.add_fav_filter)
#self.pool.submit(self.homeVideoContent)
needLogin = ['频道', '动态', '收藏', '关注', '历史']
cateManual = self.userConfig['cateManual']
if not self.userid and not 'UP' in cateManual or not '动态' in cateManual and not 'UP' in cateManual:
cateManual += ['UP']
classes = []
for k in cateManual:
if k in needLogin and not self.userid:
continue
classes.append({
'type_name': k,
'type_id': k
})
self.add_focus_on_up_filter_event.wait()
if 'UP' in cateManual:
self.config["filter"].update({'UP': self.config["filter"].pop('动态')})
result = {'class': classes}
self.add_live_filter_event.wait()
self.add_channel_filter_event.wait()
self.add_fav_filter_event.wait()
self.add_search_key_event.wait()
if filter:
result['filters'] = self.config['filter']
self.pool.submit(self.dump_config)
return result
# 用户cookies
userid = csrf = ''
session_master = session()
session_vip = session()
session_fake = session()
con = threading.Condition()
getCookie_event = threading.Event()
retries = Retry(total=5,
#status_forcelist=[ 500, 502, 503, 504 ],
backoff_factor=0.1)
adapter = HTTPAdapter(max_retries=retries)
session_master.mount('https://', adapter)
session_vip.mount('https://', adapter)
session_fake.mount('https://', adapter)
def getCookie_dosth(self, co):
c = co.strip().split('=', 1)
if not '%' in c[1]:
c[1] = quote(c[1])
return c
def getCookie(self, _type='master'):
raw_cookie = 'raw_cookie_line'
if _type == 'vip':
raw_cookie = 'raw_cookie_vip'
raw_cookie = self.userConfig.get(raw_cookie)
users = self.userConfig.get('users', {})
user = users.get(_type, {})
if not raw_cookie and not user:
if _type == 'master':
self.getCookie_event.set()
with self.con:
self.con.notifyAll()
return
cookies_dic = user.get('cookies_dic', {})
if raw_cookie:
cookies_dic = dict(map(self.getCookie_dosth, raw_cookie.split(';')))
cookies = utils.cookiejar_from_dict(cookies_dic)
url = 'https://api.bilibili.com/x/web-interface/nav'
content = self.fetch(url, headers=self.header, cookies=cookies)
res = json.loads(content.text)
user['isLogin'] = 0
if res["code"] == 0:
user['isLogin'] = 1
user['userid'] = res["data"]['mid']
user['face'] = res['data']['face']
user['uname'] = res['data']['uname']
user['cookies_dic'] = cookies_dic
user['isVIP'] = int(res['data']['vipStatus'])
if _type == 'master':
self.session_master.cookies = cookies
self.userid = user['userid']
self.csrf = cookies_dic['bili_jct']
if user['isVIP']:
self.session_vip.cookies = cookies
else:
self.userid = ''
users[_type] = user
with self.con:
if len(user) > 1:
self.userConfig.update({'users': users})
if _type == 'master':
self.getCookie_event.set()
getFakeCookie_event = threading.Event()
def getFakeCookie(self, fromSearch=None):
if self.session_fake.cookies:
self.getFakeCookie_event.set()
header = {}
header['User-Agent'] = self.header['User-Agent']
rsp = self.fetch('https://space.bilibili.com/2/video', headers=header)
self.session_fake.cookies = rsp.cookies
self.getFakeCookie_event.set()
with self.con:
users = self.userConfig.get('users', {})
users['fake'] = {'cookies_dic': dict(rsp.cookies)}
self.userConfig.update({'users': users})
if not fromSearch:
self.getCookie_event.wait()
if not self.session_master.cookies:
self.session_master.cookies = rsp.cookies
def get_fav_list_dict(self, fav):
fav_dict = {
'n': fav['title'].replace("", "").replace("", "").replace(""",'"').strip(),
'v': fav['id']}
return fav_dict
add_fav_filter_event = threading.Event()
def add_fav_filter(self):
users = self.userConfig.get('users', {})
if users.get('master') and users['master'].get('userid'):
userid = self.userConfig['users']['master']['userid']
else:
self.getCookie_event.wait()
userid = self.userid
fav_list = []
if userid:
url = 'https://api.bilibili.com/x/v3/fav/folder/created/list-all?up_mid=%s&jsonp=jsonp' % str(userid)
rsp = self._get_sth(url)
jo = json.loads(rsp.text)
if jo['code'] == 0 and jo.get('data'):
fav = jo['data'].get('list')
fav_list = list(map(self.get_fav_list_dict, fav))
fav_top = [{"n": "追番", "v": "1"},{"n": "追剧", "v": "2"}]
fav_config = self.config["filter"].get('收藏')
if fav_config:
fav_config.insert(0, {
"key": "mlid",
"name": "分区",
"value": fav_top + fav_list,
})
self.add_fav_filter_event.set()
self.userConfig["fav_list"] = fav_list
def get_channel_list_dict(self, channel):
channel_dict = {
'n': channel['name'].replace("", "").replace("", "").replace(""",'"').strip(),
'v': channel['id']}
return channel_dict
def get_channel_list(self):
url = 'https://api.bilibili.com/x/web-interface/web/channel/category/channel/list?id=100&offset=0&page_size=15'
rsp = self._get_sth(url, 'fake')
jo = json.loads(rsp.text)
channel_list = []
if jo['code'] == 0:
channel = jo['data'].get('channels')
self.userConfig['channel_list'] = list(map(self.get_channel_list_dict, channel))
return self.userConfig['channel_list']
add_channel_filter_event = threading.Event()
def add_channel_filter(self):
channel_list = self.userConfig.get('channel_list', '')
channel_list_task = self.pool.submit(self.get_channel_list)
if not channel_list:
channel_list = channel_list_task.result()
channel_config = self.config["filter"].get('频道', [])
if channel_config:
channel_config.insert(0, {
"key": "cid",
"name": "分区",
"value": channel_list,
})
self.config["filter"]['频道'] = channel_config
self.add_channel_filter_event.set()
add_focus_on_up_filter_event = threading.Event()
def add_focus_on_up_filter(self):
first_list = [{"n": "上个视频的UP主", "v": "上个视频的UP主"}]
up_list = self.focus_on_up_list
if not self.session_master.cookies:
self.getCookie_event.wait()
focus_on_up_list_mid = list(map(lambda x: x['v'], up_list))
if self.session_master.cookies:
url = 'https://api.bilibili.com/x/polymer/web-dynamic/v1/feed/all?timezone_offset=-480&type=video&page=1'
rsp = self._get_sth(url)
jo = json.loads(rsp.text)
if jo['code'] == 0 and jo.get('data'):
up = jo['data'].get('items', [])
for u in map(lambda x: {'n': x['modules']["module_author"]['name'], 'v': str(x['modules']["module_author"]['mid'])}, up):
if not u in up_list and not u['v'] in focus_on_up_list_mid:
up_list.append(u)
last_list = [{"n": "登录与设置", "v": "登录"}]
if not self.isFongmi:
up_list = first_list + up_list
up_list += last_list
dynamic_config = self.config["filter"].get('动态', [])
if dynamic_config:
dynamic_config.insert(0, {
"key": "mid",
"name": "UP主",
"value": up_list,
})
self.config["filter"]['动态'] = dynamic_config
self.add_focus_on_up_filter_event.set()
def get_live_parent_area_list(self, parent_area):
name = parent_area['name']
id = str(parent_area['id'])
area = parent_area['list']
area_dict = list(map(lambda area: {'n': area['name'], 'v': str(area['parent_id']) + '_' + str(area['id'])}, area))
live_area = {'key': 'tid', 'name': name, 'value': area_dict}
cateLive_name = {'id': id + '_0', 'value': live_area}
return (name, cateLive_name)
def get_live_list(self):
url = 'https://api.live.bilibili.com/xlive/web-interface/v1/index/getWebAreaList?source_id=2'
rsp = self._get_sth(url, 'fake')
jo = json.loads(rsp.text)
cateLive = {}
if jo['code'] == 0:
parent = jo['data']['data']
self.userConfig['cateLive'] = dict(self.pool.map(self.get_live_parent_area_list, parent))
return self.userConfig['cateLive']
def set_default_cateManualLive(self):
cateManualLive = [{'n': '推荐', 'v': '推荐'},]
for name in self.userConfig['cateLive']:
area_dict = {'n': name, 'v': self.userConfig['cateLive'][name]['id']}
cateManualLive.append(area_dict)
self.defaultConfig['cateManualLive'] = cateManualLive
return cateManualLive
add_live_filter_event = threading.Event()
def add_live_filter(self):
cateLive = self.userConfig.get('cateLive', {})
cateLive_task = self.pool.submit(self.get_live_list)
if not cateLive:
cateLive = cateLive_task.result()
default_cateManualLive_task = self.pool.submit(self.set_default_cateManualLive)
self.config["filter"]['直播'] = []
#分区栏
cateManualLive = self.userConfig.get('cateManualLive', [])
if not cateManualLive:
cateManualLive = default_cateManualLive_task.result()
if cateManualLive:
live_area = {'key': 'tid', 'name': '分区', 'value': cateManualLive}
self.config["filter"]['直播'].append(live_area)
#显示分区细分
if int(self.userConfig['showLiveFilterTag']):
for name in cateLive.values():
if len(name['value']['value']) == 1:
continue
self.config["filter"]['直播'].append(name['value'])
self.add_live_filter_event.set()
add_search_key_event = threading.Event()
def add_search_key(self):
focus_on_search_key = self.focus_on_search_key
url = 'https://api.bilibili.com/x/web-interface/search/square?limit=10&platform=web'
rsp = self._get_sth(url, 'fake')
jo = json.loads(rsp.text)
cateLive = {}
if jo['code'] == 0:
trending = jo['data']['trending'].get('list', [])
focus_on_search_key += list(map(lambda x:x['keyword'], trending))
keyword = {"key": "keyword", "name": "搜索词","value": []}
keyword["value"] = list(map(lambda i: {'n': i, 'v': i}, focus_on_search_key))
self.config["filter"]['搜索'].insert(0, keyword)
self.add_search_key_event.set()
def get_tuijian_filter(self):
tuijian_filter = {"番剧时间表": "10001", "国创时间表": "10004", "排行榜": "0", "动画": "1", "音乐": "3", "舞蹈": "129", "游戏": "4", "鬼畜": "119", "知识": "36", "科技": "188", "运动": "234", "生活": "160", "美食": "211", "动物": "217", "汽车": "223", "时尚": "155", "娱乐": "5", "影视": "181", "原创": "origin", "新人": "rookie"}
_dic = [{'n': 'tuijianLis', 'v': '分区'}, {'n': 'rankingLis', 'v': '排行榜'}]
filter_lis = []
for d in _dic:
_filter = {"key": "tid" ,'name': d['v'],"value": []}
t_lis = self.userConfig.get(d['n'], [])
for t in t_lis:
tf = tuijian_filter.get(t)
if not tf:
tf = t
tf_dict = {'n': t, 'v': tf}
_filter["value"].append(tf_dict)
filter_lis.append(_filter)
self.config["filter"]['推荐'] = filter_lis
isFongmi = False
def __init__(self):
self.configdir = dirname
if dirname.startswith('/data/'):
self.isFongmi = True
configdir = os.path.abspath(os.path.join(dirname, ".."))
configdir = os.path.abspath(os.path.join(configdir, ".."))
self.configdir = f"{configdir}/files"
self.load_config()
self.pool.submit(self.getCookie)
self.pool.submit(self.getFakeCookie)
self.pool.submit(self.getCookie, 'vip')
wts = round(time.time())
self.pool.submit(self.get_wbiKey, wts)
def init(self, extend=""):
print("============{0}============".format(extend))
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
# 降低内存占用
def format_img(self, img):
img += "@672w_378h_1c.webp"
if not img.startswith('http'):
img = 'https:' + img
return img
def pagination(self, array, pg):
max_number = self.userConfig['page_size'] * int(pg)
min_number = max_number - self.userConfig['page_size']
return array[min_number:max_number]
# 将超过10000的数字换成成以万和亿为单位
def zh(self, num):
if int(num) >= 100000000:
p = round(float(num) / float(100000000), 1)
p = str(p) + '亿'
else:
if int(num) >= 10000:
p = round(float(num) / float(10000), 1)
p = str(p) + '万'
else:
p = str(num)
return p
# 将秒数转化为 时分秒的格式
def second_to_time(self, a):
a = int(a)
if a < 3600:
result = time.strftime("%M:%S", time.gmtime(a))
else:
result = time.strftime("%H:%M:%S", time.gmtime(a))
if str(result).startswith('0'):
result = str(result).replace('0', '', 1)
return result
# 字符串时分秒以及分秒形式转换成秒
def str2sec(self, x):
x = str(x)
try:
h, m, s = x.strip().split(':') # .split()函数将其通过':'分隔开,.strip()函数用来除去空格
return int(h) * 3600 + int(m) * 60 + int(s) # int()函数转换成整数运算
except:
m, s = x.strip().split(':') # .split()函数将其通过':'分隔开,.strip()函数用来除去空格
return int(m) * 60 + int(s) # int()函数转换成整数运算
# 按时间过滤
def filter_duration(self, vodlist, key):
if key == '0':
return vodlist
else:
vod_list_new = [i for i in vodlist if
self.time_diff1[key][0] <= self.str2sec(str(i["vod_remarks"])) < self.time_diff1[key][1]]
return vod_list_new
# 提取番剧id
def find_bangumi_id(self, url):
aid = str(url).strip().split(r"/")[-1]
if not aid:
aid = str(url).strip().split(r"/")[-2]
aid = aid.split(r"?")[0]
return aid
# 登录二维码
def get_Login_qrcode(self, pg):
result = {}
if int(pg) != 1:
return result
video = [{
"vod_id": 'setting_tab&filter',
"vod_name": '标签与筛选',
"vod_pic": 'https://www.bilibili.com/favicon.ico'
},{
"vod_id": 'setting_liveExtra',
"vod_name": '查看直播细化标签',
"vod_pic": 'https://www.bilibili.com/favicon.ico'
}]
url = 'https://passport.bilibili.com/x/passport-login/web/qrcode/generate'
rsp = self._get_sth(url, 'fake')
jo = json.loads(rsp.text)
if jo['code'] == 0:
id = jo['data']['qrcode_key']
url = jo['data']['url']
account = {'master': '主账号', 'vip': '副账号'}
isLogin = {0: '未登录', 1: '已登录'}
isVIP = {0: '', 1: '👑'}
users = self.userConfig.get('users', {})
for _type, typeName in account.items():
user = users.get(_type)
if user:
video.append({
"vod_id": 'setting_login_' + id,
"vod_name": user['uname'],
"vod_pic": self.format_img(user['face']),
"vod_remarks": isVIP[user['isVIP']] + typeName + ' ' + isLogin[user['isLogin']]
})
#pic_url = {'qrcode': url}
pic_url = {'qrcode': 'https://passport.bilibili.com/h5-app/passport/login/scan?qrcode_key=' + id + '&navhide=1'}
#if not dirname.startswith('/data/'):
# pic_url['qr_chs'] = '208x117'
pic_url['qr_chs'] = '208x117'
video.append({
"vod_id": 'setting_login_' + id,
#"vod_name": '扫码后点击',
'vod_pic': 'http://jm92swf.s1002.xrea.com/?' + urlencode(pic_url),
})
result['list'] = video
result['page'] = 1
result['pagecount'] = 1
result['limit'] = 1
result['total'] = 1
return result
time_diff1 = {'1': [0, 300],
'2': [300, 900], '3': [900, 1800], '4': [1800, 3600],
'5': [3600, 99999999999999999999999999999999]
}
time_diff = '0'
dynamic_offset = ''
def get_dynamic(self, pg, mid, order):
if mid == '0':
result = {}
if int(pg) == 1:
self.dynamic_offset = ''
url = 'https://api.bilibili.com/x/polymer/web-dynamic/v1/feed/all?timezone_offset=-480&type=video&offset=%s&page=%s' % (self.dynamic_offset, pg)
rsp = self._get_sth(url)
jo = json.loads(rsp.text)
if jo['code'] == 0:
self.dynamic_offset = jo['data'].get('offset')
videos = []
vodList = jo['data']['items']
for vod in vodList:
if not vod['visible']:
continue
up = vod['modules']["module_author"]['name']
ivod = vod['modules']['module_dynamic']['major']['archive']
aid = str(ivod['aid']).strip()
title = ivod['title'].strip().replace("", "").replace("", "")
img = ivod['cover'].strip()
# remark = str(ivod['duration_text']).strip()
remark = str(self.second_to_time(self.str2sec(ivod['duration_text']))).strip() + ' 🆙' + str(
up).strip() # 显示分钟数+up主名字
videos.append({
"vod_id": 'av' + aid,
"vod_name": title,
"vod_pic": self.format_img(img),
"vod_remarks": remark
})
result['list'] = videos
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 99
result['total'] = 999999
return result
else:
return self.get_up_videos(mid=mid, pg=pg, order=order)
def get_found_vod(self, vod):
aid = vod.get('aid', '')
if not aid:
aid = vod.get('id', '')
goto = vod.get('goto', '')
if not goto or goto and goto == 'av':
aid = 'av' + str(aid).strip()
elif goto == 'ad':
return []
title = vod['title'].strip()
img = vod['pic'].strip()
is_followed = vod.get('is_followed')
if goto == 'live':
room_info = vod['room_info']
remark = ''
live_status = room_info.get('live_status', '')
if live_status:
remark = '直播中 '
else:
return []
remark += '👁' + room_info['watched_show']['text_small'] + ' 🆙' + vod['owner']['name'].strip()
else:
rcmd_reason = vod.get('rcmd_reason', '')
if rcmd_reason and type(rcmd_reason) == dict and rcmd_reason.get('content'):
reason= ' 🔥' + rcmd_reason['content'].strip()
if '人气飙升' in reason:
reason= ' 🔥人气飙升'
elif is_followed:
reason = ' 已关注'
else:
#reason = " 💬" + self.zh(vod['stat']['danmaku'])
reason = ' 🆙' + vod['owner']['name'].strip()
remark = str(self.second_to_time(vod['duration'])).strip() + " ▶" + self.zh(vod['stat']['view']) + reason
video = [{
"vod_id": aid,
"vod_name": title,
"vod_pic": self.format_img(img),
"vod_remarks": remark
}]
for v in self.pool.map(self.get_found_vod, vod.get('others', [])):
video.extend(v)
return video
_popSeriesInit = 0
def get_found(self, tid, rid, pg):
result = {}
if tid == '推荐':
query = self.encrypt_wbi(fresh_type=4, feed_version='V3', brush=1, fresh_idx=pg, fresh_idx_1h=pg, ps=self.userConfig['page_size'])
url = f'https://api.bilibili.com/x/web-interface/wbi/index/top/feed/rcmd?{query}'
else:
url = 'https://api.bilibili.com/x/web-interface/ranking/v2?rid={0}&type={1}'.format(rid, tid)
if tid == '热门':
url = 'https://api.bilibili.com/x/web-interface/popular?pn={0}&ps={1}'.format(pg, self.userConfig['page_size'])
elif tid == "入站必刷":
url = 'https://api.bilibili.com/x/web-interface/popular/precious'
elif tid == "每周必看":
if not self._popSeriesInit or int(pg) == 1:
url = 'https://api.bilibili.com/x/web-interface/popular/series/list'
rsp = self._get_sth(url, 'fake')
jo = json.loads(rsp.text)
number = self._popSeriesInit = jo['data']['list'][0]['number']
self._popSeriesNum = [int(number), 1]
else:
number = self._popSeriesNum[0]
url = 'https://api.bilibili.com/x/web-interface/popular/series/one?number=' + str(number)
rsp = self._get_sth(url)
jo = json.loads(rsp.text)
if jo['code'] == 0:
videos = []
vodList = jo['data'].get('item')
if not vodList:
vodList = jo['data']['list']
if len(vodList) > self.userConfig['page_size']:
if tid == "每周必看":
_tmp_pg = int(self._popSeriesNum[1])
value = len(vodList) / self.userConfig['page_size'] - _tmp_pg
if value > 0:
value += 1
if not int(value):
self._popSeriesNum = [int(number) - 1, 1]
else:
self._popSeriesNum[1] = _tmp_pg + 1
else:
_tmp_pg = pg
vodList = self.pagination(vodList, _tmp_pg)
for v in self.pool.map(self.get_found_vod, vodList):
videos.extend(v)
result['list'] = videos
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 99
result['total'] = 999999
return result
def get_bangumi(self, tid, pg, order, season_status):
result = {}
if order == '追番剧':
url = 'https://api.bilibili.com/x/space/bangumi/follow/list?type={0}&vmid={1}&pn={2}&ps={3}'.format(tid, self.userid, pg, self.userConfig['page_size'])
rsp = self._get_sth(url)
else:
url = 'https://api.bilibili.com/pgc/season/index/result?type=1&season_type={0}&page={1}&order={2}&season_status={3}&pagesize={4}'.format(tid, pg, order, season_status, self.userConfig['page_size'])
if order == '热门':
if tid == '1':
url = 'https://api.bilibili.com/pgc/web/rank/list?season_type={0}&day=3'.format(tid)
else:
url = 'https://api.bilibili.com/pgc/season/rank/web/list?season_type={0}&day=3'.format(tid)
rsp = self._get_sth(url, 'fake')
jo = json.loads(rsp.text)
if jo['code'] == 0:
if 'data' in jo:
vodList = jo['data']['list']
else:
vodList = jo['result']['list']
if len(vodList) > self.userConfig['page_size']:
vodList = self.pagination(vodList, pg)
videos = []
for vod in vodList:
aid = str(vod['season_id']).strip()
title = vod['title']
img = vod.get('ss_horizontal_cover')
if not img or tid == '1' and not self.userConfig['bangumi_horizontal_cover']:
if vod.get('first_ep_info') and 'cover' in vod['first_ep_info']:
img = vod['first_ep_info']['cover']
elif vod.get('first_ep') and 'cover' in vod['first_ep']:
img = vod['first_ep']['cover']
else:
img = vod['cover'].strip()
remark = vod.get('index_show', '')
if not remark and vod.get('new_ep') and vod['new_ep'].get('index_show'):
remark = vod['new_ep']['index_show']
remark = remark.replace('更新至', '🆕')
stat = vod.get('stat')
if stat:
remark = '▶' + self.zh(stat.get('view')) + ' ' + remark
videos.append({
"vod_id": 'ss' + aid,
"vod_name": title,
"vod_pic": self.format_img(img),
"vod_remarks": remark
})
result['list'] = videos
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
return result
def get_timeline(self, tid, pg):
result = {}
url = 'https://api.bilibili.com/pgc/web/timeline/v2?season_type={0}&day_before=2&day_after=4'.format(tid)
rsp = self._get_sth(url, 'fake')
content = rsp.text
jo = json.loads(content)
if jo['code'] == 0:
videos1 = []
vodList = jo['result']['latest']
for vod in vodList:
aid = str(vod['season_id']).strip()
title = vod['title'].strip()
img = vod['ep_cover'].strip()
remark = '🆕' + vod['pub_index'] + ' ❤ ' + vod['follows'].replace('系列', '').replace('追番', '')
videos1.append({
"vod_id": 'ss' + aid,
"vod_name": title,
"vod_pic": self.format_img(img),
"vod_remarks": remark
})
videos2 = []
vodList2 = jo['result']['timeline']
for i in range(len(vodList2)):
vodList = vodList2[i]['episodes']
for vod in vodList:
if str(vod['published']) == "0":
aid = str(vod['season_id']).strip()
title = str(vod['title']).strip()
img = str(vod['ep_cover']).strip()
date = str(time.strftime("%m-%d %H:%M", time.localtime(vod['pub_ts'])))
remark = date + " " + vod['pub_index']
videos2.append({
"vod_id": 'ss' + aid,
"vod_name": title,
"vod_pic": self.format_img(img),
"vod_remarks": remark
})
result['list'] = videos2 + videos1
result['page'] = 1
result['pagecount'] = 1
result['limit'] = 90
result['total'] = 999999
return result
def get_live(self, pg, parent_area_id, area_id):
result = {}
if parent_area_id == '推荐':
url = 'https://api.live.bilibili.com/xlive/web-interface/v1/webMain/getList?platform=web&page=%s' % pg
rsp = self._get_sth(url)
else:
url = 'https://api.live.bilibili.com/xlive/web-interface/v1/second/getList?platform=web&parent_area_id=%s&area_id=%s&sort_type=online&page=%s' % (parent_area_id, area_id, pg)
if parent_area_id == '热门':
url = 'https://api.live.bilibili.com/room/v1/room/get_user_recommend?page=%s&page_size=%s' % (pg, self.userConfig['page_size'])
rsp = self._get_sth(url, 'fake')
jo = json.loads(rsp.text)
if jo['code'] == 0:
videos = []
vodList = jo['data']
if 'recommend_room_list' in vodList:
vodList = vodList['recommend_room_list']
elif 'list' in vodList:
vodList = vodList['list']
for vod in vodList:
aid = str(vod['roomid']).strip()
title = vod['title'].replace("", "").replace("", "").replace(""", '"')
img = vod.get('user_cover')
if not img:
img = vod.get('cover')
remark = '👁' + vod['watched_show']['text_small'].strip() + " 🆙" + vod['uname'].strip()
videos.append({
"vod_id": aid,
"vod_name": title,
"vod_pic": self.format_img(img),
"vod_remarks": remark
})
result['list'] = videos
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 99
result['total'] = 999999
return result
def get_up_series(self, mid, pg):
result = {}
url = 'https://api.bilibili.com/x/polymer/web-space/seasons_series_list?mid=%s&page_num=%s&page_size=%s' % (mid, pg, self.userConfig['page_size'])
rsp = self._get_sth(url, 'fake')
jo = json.loads(rsp.text)
if jo['code'] == 0:
videos = []
jo = jo['data']['items_lists']
vodList = jo['seasons_list'] + jo['series_list']
for vod in vodList:
vod = vod.get('meta')
aid = str(vod.get('season_id', '')).strip()
if aid:
aid = 'list_' + str(mid) + '_season_' + aid
else:
aid = 'list_' + str(mid) + '_series_' + str(vod.get('series_id', '')).strip()
title = vod['name'].replace("", "").replace("", "").replace(""", '"')
img = vod.get('cover')
remark = vod.get('description', '').strip()
videos.append({
"vod_id": aid,
"vod_name": title,
"vod_pic": self.format_img(img),
"vod_remarks": remark
})
result['list'] = videos
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 99
result['total'] = 999999
return result
get_up_videos_result = {}
def get_up_videos(self, mid, pg, order):
result = {}
if not mid.isdigit():
if int(pg) == 1:
self.get_up_videos_mid = mid = self.detailContent_args.get('mid', '')
if not mid in self.get_up_videos_result:
self.get_up_videos_result.clear()
self.get_up_videos_result[mid] = []
else:
mid = self.get_up_videos_mid
if int(pg) == 1:
self.get_up_videoNum_event.clear()
self.get_up_info_event.clear()
self.pool.submit(self.get_up_info, mid)
Space = order2 = ''
if order == 'oldest':
order2 = order
order = 'pubdate'
elif order == 'quicksearch':
Space = '投稿: '
videos = self.get_up_videos_result.get(mid, [])
if videos:
result['list'] = videos
return result
elif order == 'series':
return self.get_up_series(mid=mid, pg=pg)
tmp_pg = pg
if order2:
self.get_up_videoNum_event.wait()
tmp_pg = self.up_info[mid]['vod_pc'] - int(pg) + 1
query = self.encrypt_wbi(mid=mid, pn=tmp_pg, ps=self.userConfig['page_size'], order=order)
url = f'https://api.bilibili.com/x/space/wbi/arc/search?{query}'
rsp = self._get_sth(url, 'fake')
content = rsp.text
jo = json.loads(content)
videos = []
if jo['code'] == 0:
vodList = jo['data']['list']['vlist']
for vod in vodList:
aid = str(vod['aid']).strip()
title = vod['title'].strip().replace("", "").replace("", "")
img = vod['pic'].strip()
remark = self.second_to_time(self.str2sec(str(vod['length']).strip())) + " ▶" + self.zh(vod['play'])
if not Space:
remark += " 💬" + self.zh(vod['video_review'])
videos.append({
"vod_id": 'av' + aid,
"vod_name": Space + title,
"vod_pic": self.format_img(img),
"vod_remarks": remark
})
if order2:
videos.reverse()
if int(pg) == 1:
self.get_up_info_event.wait()
vodname = self.up_info[mid]['name'] + " 个人主页"
if Space:
vodname = 'UP: ' + self.up_info[mid]['name']
gotoUPHome={
"vod_id": 'up' + str(mid),
"vod_name": vodname,
"vod_pic": self.format_img(self.up_info[mid]['face']),
"vod_remarks": self.up_info[mid]['following'] + ' 👥' + self.up_info[mid]['fans'] + ' 🎬' + str(self.up_info[mid]['vod_count'])
}
videos.insert(0, gotoUPHome)
if Space:
self.get_up_videos_result[mid] = videos
result['list'] = videos
result['page'] = pg
result['pagecount'] = 99
result['limit'] = 99
result['total'] = 999999
return result
history_view_at = 0
def get_history(self, type, pg):
result = {}
if int(pg) == 1:
self.history_view_at = 0
url = 'https://api.bilibili.com/x/web-interface/history/cursor?ps={0}&view_at={1}&type={2}'.format(self.userConfig['page_size'], self.history_view_at, type)
if type == '稍后再看':
url = 'https://api.bilibili.com/x/v2/history/toview'
rsp = self._get_sth(url)
jo = json.loads(rsp.text)
if jo['code'] == 0:
videos = []
vodList = jo['data'].get('list', [])
if type == '稍后再看':
vodList = self.pagination(vodList, pg)
else:
self.history_view_at = jo['data']['cursor']['view_at']
for vod in vodList:
history = vod.get('history', '')
if history:
business = history['business']
aid = str(history['oid']).strip()
img = vod['cover'].strip()
part = str(history['part']).strip()
else:
business = 'archive'
aid = str(vod["aid"]).strip()
img = vod['pic'].strip()
part = str(vod['page']['part']).strip()
if business == 'article':
continue
elif business == 'pgc':
aid = 'ep' + str(history['epid'])
_total = vod['total']
part = vod.get('show_title')
elif business == 'archive':
aid = 'av' + aid
_total = vod['videos']
title = vod['title'].replace("", "").replace("", "").replace(""", '"')
if business == 'live':
live_status = vod.get('badge', '')
remark = live_status + ' 🆙' + vod['author_name'].strip()
else:
if str(vod['progress']) == '-1':
remark = '已看完'
elif str(vod['progress']) == '0':
remark = '刚开始看'
else:
process = str(self.second_to_time(vod['progress'])).strip()
remark = '看到 ' + process
if not _total in [0, 1] and part:
remark += ' (' + str(part) + ')'
videos.append({
"vod_id": aid,
"vod_name": title,
"vod_pic": self.format_img(img),
"vod_remarks": remark
})
result['list'] = videos
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
return result
def get_fav_detail(self, pg, mlid, order):
result = {}
url = 'https://api.bilibili.com/x/v3/fav/resource/list?media_id=%s&order=%s&pn=%s&ps=10&platform=web&type=0' % (mlid, order, pg)
rsp = self._get_sth(url)
content = rsp.text
jo = json.loads(content)
if jo['code'] == 0:
videos = []
vodList = jo['data'].get('medias', [])
for vod in vodList:
# 只展示类型为 视频的条目
# 过滤去掉收藏中的 已失效视频;如果不喜欢可以去掉这个 if条件
if vod.get('type') in [2] and vod.get('title') != '已失效视频':
aid = str(vod['id']).strip()
title = vod['title'].replace("", "").replace("", "").replace(""",
'"')
img = vod['cover'].strip()
remark = str(self.second_to_time(vod['duration'])).strip() + " ▶" + self.zh(vod['cnt_info']['play']) + " 💬" + self.zh(vod['cnt_info']['danmaku'])
videos.append({
"vod_id": 'av' + aid + '_mlid' + str(mlid),
"vod_name": title,
"vod_pic": self.format_img(img),
"vod_remarks": remark
})
# videos=self.filter_duration(videos, duration_diff)
result['list'] = videos
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 99
result['total'] = 999999
return result
get_up_videoNum_event = threading.Event()
def get_up_videoNum(self, mid):
url = "https://api.bilibili.com/x/space/navnum?mid={0}".format(mid)
rsp = self._get_sth(url)
jRoot = json.loads(rsp.text)
if jRoot['code'] == 0:
return jRoot['data']['video']
else:
return 0
get_up_info_event = threading.Event()
up_info = {}
def get_up_info(self, mid, **kwargs):
if mid in self.up_info:
self.get_up_info_event.set()
get_up_videoNum = self.pool.submit(self.get_up_videoNum, mid)
data = kwargs.get('data')
if not data:
url = "https://api.bilibili.com/x/web-interface/card?mid={0}".format(mid)
rsp = self._get_sth(url)
jRoot = json.loads(rsp.text)
if jRoot['code'] == 0:
data = jRoot['data']
else:
self.get_up_info_event.set()
return
jo = data['card']
info = {}
info['following'] = '未关注'
if data['following']:
info['following'] = '已关注'
info['name'] = info['crname'] = jo['name'].replace("", "").replace("", "")
if self.isFongmi:
info['crname'] = '[a=cr:{"id": "' + mid + '_pubdate_getupvideos","name": "' + info['name'].replace('"', '\\"') + '"}/]' + info['name'] + '[/a]'
info['face'] = jo['face']
info['fans'] = self.zh(jo['fans'])
info['like_num'] = self.zh(data['like_num'])
info['desc'] = jo['Official']['desc'] + " " + jo['Official']['title']
info['vod_count'] = str(data['archive_count']).strip()
self.up_info[mid] = info
self.get_up_info_event.set()
self.up_info[mid]['vod_count'] = str(get_up_videoNum.result()).strip()
pc = divmod(int(info['vod_count']), self.userConfig['page_size'])
vod_pc = pc[0]
if pc[1] != 0:
vod_pc += 1
self.up_info[mid]['vod_pc'] = vod_pc
self.get_up_videoNum_event.set()
def get_vod_relation(self, id):
if id.isdigit():
urlarg = 'aid=' + str(id)
elif '=' in id:
urlarg = id
else:
urlarg = 'bvid=' + id
url = 'https://api.bilibili.com/x/web-interface/archive/relation?' + urlarg
rsp = self._get_sth(url)
jo = json.loads(rsp.text)
relation = []
if jo['code'] == 0:
jo = jo['data']
if jo['attention']:
relation.append('已关注')
else:
relation.append('未关注')
triple = []
if jo['favorite']:
triple.append('⭐')
if jo['like']:
triple.append('👍')
coin = jo.get('coin')
if coin:
triple.append('💰'*coin)
if len(triple) == 3:
relation.append('👍💰⭐')
else:
relation.extend(triple)
if jo['dislike']:
relation.append('👎')
if jo['season_fav']:
relation.append('已订阅合集')
return relation
def get_channel(self, pg, cid, order):
result = {}
if str(pg) == '1':
self.channel_offset = ''
if order == "featured":
url = 'https://api.bilibili.com/x/web-interface/web/channel/featured/list?channel_id={0}&filter_type=0&offset={1}&page_size={2}'.format(cid, self.channel_offset, self.userConfig['page_size'])
else:
url = 'https://api.bilibili.com/x/web-interface/web/channel/multiple/list?channel_id={0}&sort_type={1}&offset={2}&page_size={3}'.format(cid, order, self.channel_offset, self.userConfig['page_size'])
rsp = self._get_sth(url, 'master')
jo = json.loads(rsp.text)
if jo.get('code') == 0:
self.channel_offset = jo['data'].get('offset')
videos = []
vodList = jo['data']['list']
if pg == '1' and 'items' in vodList[0]:
vodList_rank = vodList[0]['items']
del (vodList[0])
vodList = vodList_rank + vodList
for vod in vodList:
if 'uri' in vod and 'bangumi' in vod['uri']:
aid = self.find_bangumi_id(vod['uri'])
else:
aid = 'av' + str(vod['id']).strip()
title = vod['name'].replace("", "").replace("", "").replace(""", '"')
img = vod['cover'].strip()
remark = "▶" + str(vod['view_count'])
duration = vod.get('duration', '')
if duration:
remark = str(self.second_to_time(self.str2sec(duration))).strip() + ' ' + remark
danmaku = vod.get('danmaku', '')
like_count = vod.get('like_count', '')
follow_count = vod.get('follow_count', '')
if danmaku:
remark += " 💬" + self.zh(danmaku)
elif like_count:
remark += " 👍" + str(like_count)
elif follow_count:
remark += " ❤" + str(follow_count)
videos.append({
"vod_id": aid,
"vod_name": title,
"vod_pic": self.format_img(img),
"vod_remarks": remark
})
result['list'] = videos
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 99
result['total'] = 999999
return result
def get_follow(self, pg, sort):
result = {}
if sort == "最常访问":
url = 'https://api.bilibili.com/x/relation/followings?vmid={0}&pn={1}&ps=10&order=desc&order_type=attention' .format(self.userid, pg)
elif sort == "最近关注":
url = 'https://api.bilibili.com/x/relation/followings?vmid={0}&pn={1}&ps=10&order=desc&order_type='.format(self.userid, pg)
elif sort == "正在直播":
url = 'https://api.live.bilibili.com/xlive/web-ucenter/v1/xfetter/GetWebList?page={0}&page_size=10'.format(pg)
elif sort == "最近访问":
url = 'https://api.bilibili.com/x/v2/history?pn={0}&ps=15'.format(pg)
elif sort == "特别关注":
url = 'https://api.bilibili.com/x/relation/tag?mid={0}&tagid=-10&pn={1}&ps=10'.format(self.userid, pg)
elif sort == "悄悄关注":
url = 'https://api.bilibili.com/x/relation/whispers?pn={0}&ps=10'.format(pg)
else:
url = 'https://api.bilibili.com/x/relation/followers?vmid={0}&pn={1}&ps=10&order=desc&order_type=attention'.format(self.userid, pg)
rsp = self._get_sth(url)
jo = json.loads(rsp.text)
if jo['code'] != 0:
return result
if sort == "特别关注" or sort == "最近访问":
vodList = jo['data']
elif sort == "正在直播":
vodList = jo['data']['rooms']
else:
vodList = jo['data']['list']
if int(pg) == 1:
self.recently_up_list = []
follow = []
for f in vodList:
remark = ''
if sort == "最近访问":
mid = 'up' + str(f['owner']['mid'])
if mid in self.recently_up_list:
continue
self.recently_up_list.append(mid)
title = str(f['owner']['name']).strip()
img = str(f['owner']['face']).strip()
elif sort == "正在直播":
mid = str(f['room_id'])
title = f['title'].replace("", "").replace("", "").replace(""", '"')
img = f['cover_from_user'].strip()
remark = f['uname'].strip()
else:
mid = 'up' + str(f['mid'])
title = str(f['uname']).strip()
img = str(f['face']).strip()
if 'special' in f and f['special'] == 1:
remark = '特别关注'
follow.append({
"vod_id": mid,
"vod_name": title,
"vod_pic": self.format_img(img),
"vod_remarks": remark
})
result['list'] = follow
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 99
result['total'] = 999999
return result
homeVideoContent_result = {}
def homeVideoContent(self):
if not self.homeVideoContent_result:
videos = self.get_found(rid='0', tid='all', pg=1)['list'][0:int(self.userConfig['maxHomeVideoContent'])]
self.homeVideoContent_result['list'] = videos
return self.homeVideoContent_result
def categoryContent(self, tid, pg, filter, extend):
self.stop_heartbeat_event.set()
if tid == "推荐":
if 'tid' in extend:
tid = extend['tid']
if tid.isdigit():
tid = int(tid)
if tid > 10000:
tid -= 10000
return self.get_timeline(tid=tid, pg=pg)
rid = tid
tid = 'all'
return self.get_found(tid=tid, rid=rid, pg=pg)
rid = '0'
return self.get_found(tid=tid, rid=rid, pg=pg)
elif tid == "影视":
tid = '1'
order = '热门'
season_status = '-1'
if 'tid' in extend:
tid = extend['tid']
if 'order' in extend:
order = extend['order']
if 'season_status' in extend:
if order == '热门':
order = '2'
season_status = extend['season_status']
return self.get_bangumi(tid, pg, order, season_status)
elif tid == "动态":
mid = '0'
order = 'pubdate'
if 'mid' in extend:
mid = extend['mid']
if 'order' in extend:
order = extend['order']
if mid == '0' and not self.userid or mid == '登录':
return self.get_Login_qrcode(pg)
return self.get_dynamic(pg=pg, mid=mid, order=order)
elif tid == '频道':
order = 'hot'
cid = random.choice(self.userConfig['channel_list'])
cid = cid['v']
if 'order' in extend:
order = extend['order']
if 'cid' in extend:
cid = extend['cid']
return self.get_channel(pg=pg, cid=cid, order=order)
elif tid == '直播':
tid = "热门"
area_id = '0'
if 'tid' in extend:
tid = extend['tid']
if '_' in tid:
tids = tid.split('_')
tid = tids[0]
area_id = tids[1]
return self.get_live(pg=pg, parent_area_id=tid, area_id=area_id)
elif tid == "UP":
mid = self.detailContent_args.get('mid', '')
if 'mid' in extend:
mid = extend['mid']
if not mid or mid == '登录':
return self.get_Login_qrcode(pg)
up_config = self.config["filter"].get('UP')
if not mid and up_config:
for i in up_config:
if i['key'] == 'mid':
if len(i['value']) > 1:
mid = i['value'][1]['v']
break
order = 'pubdate'
if 'order' in extend:
order = extend['order']
return self.get_up_videos(mid=mid, pg=pg, order=order)
elif tid == "关注":
sort = "最常访问"
if 'sort' in extend:
sort = extend['sort']
return self.get_follow(pg, sort)
elif tid == "收藏":
mlid = str(self.userConfig['favMode'])
if 'mlid' in extend:
mlid = extend['mlid']
fav_config = self.config["filter"].get('收藏')
if mlid in ['1', '2']:
return self.get_bangumi(tid=mlid, pg=pg, order='追番剧', season_status='')
elif mlid == '0' and fav_config:
for i in fav_config:
if i['key'] == 'mlid':
if len(i['value']) > 1:
mlid = i['value'][2]['v']
break
order = 'mtime'
if 'order' in extend:
order = extend['order']
return self.get_fav_detail(pg=pg, mlid=mlid, order=order)
elif tid == '历史':
type = 'all'
if 'type' in extend:
type = extend['type']
if type == 'UP主':
return self.get_follow(pg=pg, sort='最近访问')
return self.get_history(type=type, pg=pg)
elif tid.endswith('_getbangumiseasons'):
if int(pg) == 1:
return {'list': self.detailContent_args.get('seasons', [])}
elif tid.endswith('_getupvideos'):
mid, order, clicklink = tid.split('_')
return self.get_up_videos(pg=pg, mid=mid, order=order)
elif tid.endswith('_clicklink'):
keyword = tid.replace('_clicklink', '')
duration_diff = '0'
if 'duration' in extend:
duration_diff = extend['duration']
return self.get_search_content(key=keyword, pg=pg, duration_diff=duration_diff, order='', type='video', ps=self.userConfig['page_size'])
else:
duration_diff = '0'
if 'duration' in extend:
duration_diff = extend['duration']
type = 'video'
if 'type' in extend:
type = extend['type']
order = 'totalrank'
if 'order' in extend:
order = extend['order']
keyword = str(self.search_key)
search_config = self.config["filter"].get('搜索')
if not keyword and search_config:
for i in search_config:
if i['key'] == 'keyword':
if len(i['value']) > 0:
keyword = i['value'][0]['v']
break
if 'keyword' in extend:
keyword = extend['keyword']
return self.get_search_content(key=keyword, pg=pg, duration_diff=duration_diff, order=order, type=type, ps=self.userConfig['page_size'])
def get_search_content(self, key, pg, duration_diff, order, type, ps):
value = None
if not pg.isdigit():
value = pg
pg = 1
query = self.encrypt_wbi(keyword=key, page=pg, duration=duration_diff, order=order, search_type=type, page_size=ps)
url = f'https://api.bilibili.com/x/web-interface/wbi/search/type?{query}'
rsp = self._get_sth(url, 'fake')
content = rsp.text
jo = json.loads(content)
result = {}
if jo.get('code') == 0 and 'result' in jo['data']:
videos = []
vodList = jo['data'].get('result')
if vodList and type == 'live':
vodList = vodList.get('live_room')
if not vodList:
return result
for vod in vodList:
if type != vod['type']:
continue
title = ''
if type == 'bili_user':
aid = 'up' + str(vod['mid']).strip()
img = vod['upic'].strip()
remark = '👥' + self.zh(vod['fans']) + " 🎬" + self.zh(vod['videos'])
title = vod['uname']
elif type == 'live':
aid = str(vod['roomid']).strip()
img = vod['cover'].strip()
remark = '👁' + self.zh(vod['online']) + ' 🆙' + vod['uname']
elif 'media' in type:
aid = str(vod['season_id']).strip()
if self.detailContent_args:
seasons = self.detailContent_args.get('seasons')
if seasons:
bangumi_seasons_id = []
for ss in self.detailContent_args['seasons']:
bangumi_seasons_id.append(ss['vod_id'])
if aid + 'ss' in bangumi_seasons_id:
continue
aid = 'ss' + aid
img = vod['cover'].strip()
remark = str(vod['index_show']).strip().replace('更新至', '🆕')
else:
aid = 'av' + str(vod['aid']).strip()
img = vod['pic'].strip()
remark = str(self.second_to_time(self.str2sec(vod['duration']))).strip() + " ▶" + self.zh(vod['play'])
if value == None:
remark += " 💬" + self.zh(vod['danmaku'])
if not title:
title = vod['title'].replace("", "").replace("", "").replace(""",'"').replace('&', '&')
if value:
title = value + title
videos.append({
"vod_id": aid,
"vod_name": title,
"vod_pic": self.format_img(img),
"vod_remarks": remark
})
result['list'] = videos
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 99
result['total'] = 999999
return result
def cleanSpace(self, str):
return str.replace('\n', '').replace('\t', '').replace('\r', '').replace(' ', '')
def get_normal_episodes(self, episode):
ssid = epid = ''
aid = episode.get('aid', '')
if not aid:
aid = self.detailContent_args['aid']
cid = episode.get('cid', '')
ep_title = episode.get('title', '')
if not ep_title:
ep_title = episode.get('part', '')
duration = episode.get('duration', '')
if not duration:
page = episode.get('page', '')
if page:
duration = page['duration']
badge = long_title = preview = parse = ''
ssid = self.detailContent_args.get('ssid', '')
if ssid:
ssid = '_ss' + ssid
epid = episode.get('id', '')
if epid:
epid = '_ep' + str(epid)
if duration and str(duration).endswith('000'):
duration = int(duration / 1000)
if ep_title.isdigit():
ep_title = '第' + ep_title + self.detailContent_args['title_type']
badge = episode.get('badge', '')
if not self.session_vip.cookies and badge == '会员' and self.userConfig['bangumi_vip_parse'] or badge == '付费' and self.userConfig['bangumi_pay_parse']:
parse = '_parse'
if self.session_vip.cookies and self.userConfig['hide_bangumi_vip_badge']:
badge = badge.replace('会员', '')
if self.userConfig['hide_bangumi_preview'] and badge == '预告':
badge = badge.replace('预告', '')
preview = 1
if badge:
badge = '【' + badge + '】'
long_title = episode.get('long_title', '')
if not badge and long_title:
long_title = ' ' + long_title
title = ep_title + badge + long_title
title = title.replace("#", "﹟").replace("$", "﹩")
if duration:
duration = '_dur' + str(duration)
url = '{0}${1}_{2}{3}{4}{5}'.format(title, aid, cid, ssid, epid, duration)
fromep = self.detailContent_args.get('fromep', '')
if '_' + str(fromep) == epid:
self.detailContent_args['fromep'] = url
replyList = self.detailContent_args.get('Reply')
if '_' + str(fromep) == epid or not fromep and replyList == None:
self.detailContent_args['Reply'] = ''
if self.userConfig['show_vod_hot_reply']:
self.get_vod_hot_reply_event.clear()
self.pool.submit(self.get_vod_hot_reply, aid)
if ssid:
if preview:
return url, ''
if parse:
self.detailContent_args['parse'] = 1
if long_title:
long_title = '【解析】' + long_title
ep_title += long_title
parseurl = '{0}${1}_{2}{3}{4}{5}{6}'.format(ep_title, aid, cid, ssid, epid, duration, parse)
if '_' + str(fromep) == epid:
self.detailContent_args['fromep'] += '#' + parseurl
else:
parseurl = url
return url, parseurl
else:
return url
def get_ugc_season(self, section, sections_len):
if sections_len > 1:
sec_title = self.detailContent_args['season_title'] + ' ' + section['title']
else:
sec_title = self.detailContent_args['season_title']
sec_title = sec_title.replace("#", "﹟").replace("$", "﹩")
episodes = section.get('episodes')
playUrl = '#'.join(map(self.get_normal_episodes, episodes))
result = (sec_title, playUrl)
return result
get_vod_hot_reply_event = threading.Event()
def get_vod_hot_reply(self, oid):
query = self.encrypt_wbi(type=1, ps=30, oid=str(oid))
url = f'http://api.bilibili.com/x/v2/reply/main?{query}'
rsp = self._get_sth(url, 'fake')
jRoot = json.loads(rsp.text)
if jRoot['code'] == 0:
replies = jRoot['data'].get('replies')
top_replies = jRoot['data'].get('top_replies')
if replies and top_replies:
replies = top_replies + replies
if replies:
up_mid = jRoot['data']['upper']['mid']
ReplyList = []
Reply_jump = []
for r in replies:
rpid = r['rpid']
sex = r['member']['sex']
if sex and sex == '女':
sex = '👧'
else:
sex = '👦'
name = sex + r['member']['uname'] + ':'
mid = r['mid']
if mid == up_mid:
name = '🆙' + name
like = '👍' + self.zh(r['like'])
message = r['content']['message']
if '/note-app/' in message:
continue
content = like + ' ' + name + message
content = content.replace("#", "﹟").replace("$", "﹩")
content += '$' + str(oid) + '_' + str(rpid) + '_notplay_reply'
ReplyList.append(content)
jump_url = r['content'].get('jump_url',{})
for key, value in jump_url.items():
if not value.get('app_url_schema') and not value.get('pc_url'):
if key.startswith('https://www.bilibili.com/'):
key = str(key).split('?')[0].split('/')
while key[-1] == '':
key.pop(-1)
key = key[-1]
if key.startswith('https://b23.tv/') or key.startswith('BV') or key.startswith('ep') or key.startswith('ss'):
title = str(value['title']).replace("#", "﹟").replace("$", "﹩")
vod = {'vod_id': str(key), 'vod_name': '评论:' + title}
if not vod in Reply_jump:
Reply_jump.append(vod)
title = '快搜:' + str(key) +' ' + title
content = title + '$ '
ReplyList.append(content)
self.detailContent_args['Reply'] = '#'.join(ReplyList)
self.detailContent_args['Reply_jump'] = Reply_jump
self.get_vod_hot_reply_event.set()
detailContent_args = {}
def detailContent(self, array):
self.stop_heartbeat_event.set()
aid = array[0]
if aid.startswith('edgeid'):
return self.interaction_detailContent(aid)
elif aid.startswith('list'):
return self.series_detailContent(aid)
self.detailContent_args = {}
if aid.startswith('https://b23.tv/'):
try:
r = requests_get(url=aid, headers=self.header, allow_redirects=False)
url = r.headers['Location'].split('?')[0].split('/')
while url[-1] == '':
url.pop(-1)
aid = url[-1]
if not aid.startswith('BV', 0, 2):
return {}
except:
return {}
id = mlid = urlargs = ''
self.get_vod_hot_reply_event.set()
if aid.startswith('setting'):
aid = aid.split('_')
if aid[1] == 'tab&filter':
return self.setting_tab_filter_detailContent()
elif aid[1] == 'liveExtra':
return self.setting_liveExtra_detailContent()
elif aid[1] == 'login':
key = aid[2]
return self.setting_login_detailContent(key)
elif aid.startswith('av') or aid.startswith('BV'):
for i in aid.split('_'):
if i.startswith('av'):
id = i.replace('av', '', 1)
urlargs = 'aid=' + str(id)
elif i.startswith('BV'):
id = i
urlargs = 'bvid=' + id
elif i.startswith('mlid'):
mlid = i.replace('mlid', '', 1)
#获取热门评论
if self.userConfig['show_vod_hot_reply']:
self.detailContent_args['Reply'] = ''
self.get_vod_hot_reply_event.clear()
self.pool.submit(self.get_vod_hot_reply, id)
elif 'up' in aid:
return self.up_detailContent(array)
elif 'ss' in aid or 'ep' in aid:
return self.ysContent(array)
elif aid.isdigit():
return self.live_detailContent(array)
relation = self.pool.submit(self.get_vod_relation, urlargs)
url = 'https://api.bilibili.com/x/web-interface/view/detail?' + urlargs
rsp = self._get_sth(url, 'fake')
jRoot = json.loads(rsp.text)
if jRoot['code'] != 0:
return {}
jo = jRoot['data']['View']
redirect_url = jo.get('redirect_url', '')
if 'bangumi' in redirect_url:
ep_id = self.find_bangumi_id(redirect_url)
new_array = []
for i in array:
new_array.append(i)
new_array[0] = ep_id
return self.ysContent(new_array)
self.detailContent_args['mid'] = up_mid = str(jo['owner']['mid'])
self.detailContent_args['aid'] = aid = jo.get('aid')
self.pool.submit(self.get_up_info, mid=up_mid, data=jRoot['data'].get('Card'))
#相关合集
ugc_season = jo.get('ugc_season')
if ugc_season:
self.detailContent_args['season_title'] = ugc_season['title']
sections = ugc_season['sections']
sections_len = len(sections)
ugc_season_task = []
for section in sections:
t = self.pool.submit(self.get_ugc_season, section, sections_len)
ugc_season_task.append(t)
#相关推荐
jo_Related = jRoot['data'].get('Related')
#正片
pages = jo['pages']
title = jo['title'].replace("", "").replace("", "")
pic = jo['pic']
desc = jo['desc'].strip()
typeName = jo['tname']
date = time.strftime("%Y%m%d", time.localtime(jo['pubdate'])) # 投稿时间本地年月日表示
stat = jo['stat']
# 演员项展示视频状态,包括以下内容:
remark = []
remark.append('▶' + self.zh(stat['view']))
remark.append('💬' + self.zh(stat['danmaku']))
remark.append('👍' + self.zh(stat['like']))
remark.append('💰' + self.zh(stat['coin']))
remark.append('⭐' + self.zh(stat['favorite']))
_is_stein_gate = jo['rights'].get('is_stein_gate', 0)
vod = {
"vod_id": 'av' + str(aid),
"vod_name": title,
"vod_pic": pic,
"type_name": typeName,
"vod_year": date,
"vod_content": desc
}
if self.isFongmi:
vod['vod_remarks'] = " ".join(remark)
vod_tags = ', '.join(sorted(map(lambda x: '[a=cr:{"id": "' + x['tag_name'].replace('"', '\\"') + '_clicklink","name":"' + x['tag_name'].replace('"', '\\"') + '"}/]' + x['tag_name'] + '[/a]', jRoot['data'].get('Tags', [])), key=len))
vod['vod_content'] = vod_tags + ' \n' + desc
self.userConfig['show_vod_hot_reply'] = False
else:
vod['vod_actor'] = " ".join(remark)
vod['vod_area'] = "bilidanmu"
secondP = []
if self.userid:
#做点什么
follow = '➕关注$1_notplay_follow'
unfollow = '➖取关$2_notplay_follow'
like = '👍点赞$1_notplay_like'
unlike = '👍🏻取消点赞$2_notplay_like'
coin1 = '👍💰投币$1_notplay_coin'
coin2 = '👍💰💰$2_notplay_coin'
triple = '👍💰⭐三连$notplay_triple'
secondPList = [follow, triple, like, coin1, coin2, unfollow, unlike]
if mlid:
favdel = f"☆取消收藏${mlid}_del_notplay_fav"
secondPList.append(favdel)
for fav in self.userConfig.get("fav_list", []):
folder = fav['n'].replace("#", "﹟").replace("$", "﹩")
ids = fav['v']
fav = '⭐{}${}_add_notplay_fav'.format(folder, ids)
secondPList.append(fav)
defaultQn = int(self.userConfig['vodDefaultQn'])
if defaultQn > 116:
secondPList.append('⚠️限高1080$116_notplay_vodTMPQn')
secondP = ['#'.join(secondPList)]
AllPt = []
AllPu = []
if pages:
AllPt = ['B站']
if _is_stein_gate:
AllPt = ['互动视频【快搜继续】']
AllPu = ['#'.join(self.pool.map(self.get_normal_episodes, pages))]
if secondP:
AllPt.append('做点什么')
AllPu.extend(secondP)
if jo_Related:
AllPt.append('相关推荐')
AllPu.append('#'.join(self.pool.map(self.get_normal_episodes, jo_Related)))
if self.userConfig['show_vod_hot_reply']:
self.get_vod_hot_reply_event.wait()
replyList = self.detailContent_args.get('Reply', '')
if replyList:
AllPt.append('热门评论')
AllPu.extend([replyList])
if ugc_season:
for t in as_completed(ugc_season_task):
AllPt.append(t.result()[0])
AllPu.append(t.result()[1])
vod['vod_play_from'] = "$$$".join(AllPt)
vod['vod_play_url'] = "$$$".join(AllPu)
#视频关系
vod['vod_director'] = '🆙 ' + self.up_info[up_mid]['crname'] + ' 👥 ' + self.up_info[up_mid]['fans'] + ' ' + ' '.join(relation.result())
#互动视频套用
if _is_stein_gate:
self.detailContent_args['AllPt'] = AllPt.copy()
self.detailContent_args['AllPu'] = AllPu.copy()
self.detailContent_args['vod_list'] = vod.copy()
result = {
'list': [
vod
]
}
return result
def series_detailContent(self, array):
mid, type, sid = array.replace('list_', '').split('_')
pg = 1
ps = 99
vod = {"vod_id": array, "vod_area": "bilidanmu", "vod_play_from": "B站"}
urlL = []
while True:
if type == 'season':
url = 'https://api.bilibili.com/x/polymer/web-space/seasons_archives_list?mid=%s&season_id=%s&page_num=%s&page_size=%s' % (mid, sid, pg, ps)
else:
url = 'https://api.bilibili.com/x/series/archives?mid=%s&series_id=%s&pn=%s&ps=%s' % (mid, sid, pg, ps)
rsp = self._get_sth(url, 'fake')
jo = json.loads(rsp.text)
data = jo.get('data')
if not vod.get("vod_name"):
if data.get('meta'):
vod["vod_name"] = data['meta']['name']
vod["vod_pic"] = data['meta']['cover']
vod["vod_content"] = data['meta']['description']
else:
vod["vod_name"] = data['archives'][0]['title']
playUrl = '#'.join(map(self.get_normal_episodes, data.get('archives')))
urlL.append(playUrl)
total = data['page']['total']
if (ps * pg) >= total:
break
pg += 1
vod['vod_play_url'] = '#'.join(urlL)
result = {
'list': [
vod
]
}
return result
def interaction_detailContent(self, array=''):
array = array.split('_')
cid = edgeid = 0
for i in array:
if i.startswith('edgeid'):
edgeid = i.replace('edgeid', '')
elif i.startswith('cid'):
cid = i.replace('cid', '')
aid = self.detailContent_args.get('aid')
graph_version = self.detailContent_args.get('graph_version')
url = 'https://api.bilibili.com/x/stein/edgeinfo_v2?aid={0}&graph_version={1}&edge_id={2}'.format(aid, graph_version, edgeid)
rsp = self._get_sth(url, 'fake')
jo = json.loads(rsp.text)
data = jo.get('data')
result = {}
if data:
questions = data['edges'].get('questions', [])
choice_lis = []
for question in questions:
q_title = str(question.get('title', ''))
if q_title:
q_title += ' '
for choice in question.get('choices', []):
c_edgeid = str(choice['id'])
c_cid = str(choice['cid'])
option = str(choice.get('option', ''))
choice_lis.append({
"vod_id": 'edgeid' + c_edgeid + '_' + 'cid' + c_cid,
"vod_name": '互动:' + q_title + option,
})
self.detailContent_args['interaction'] = choice_lis.copy()
if edgeid:
AllPt = self.detailContent_args['AllPt'].copy()
if not choice_lis:
AllPt[0] = '互动视频'
AllPu = self.detailContent_args['AllPu'].copy()
title = str(data['title']).replace("#", "﹟").replace("$", "﹩")
url = '{0}${1}_{2}'.format(title, aid, cid)
AllPu[0] = url
vod = self.detailContent_args['vod_list'].copy()
vod['vod_play_from'] = "$$$".join(AllPt)
vod['vod_play_url'] = "$$$".join(AllPu)
result['list'] = [vod]
return result
def up_detailContent(self, array):
self.detailContent_args['mid'] = mid = array[0].replace('up', '')
self.get_up_info_event.clear()
self.pool.submit(self.get_up_info, mid)
first = '是否关注$ '
follow = '关注$1_notplay_follow'
unfollow = '取消关注$2_notplay_follow'
qqfollow = '悄悄关注$3_notplay_follow'
spfollow = '特别关注$-10_notplay_special_follow'
unspfollow = '取消特别关注$0_notplay_special_follow'
doWhat = [first, follow, qqfollow, spfollow, unfollow, unspfollow]
doWhat = '#'.join(doWhat)
self.get_up_info_event.wait()
up_info = self.up_info[mid]
vod = {
"vod_id": 'up' + str(mid),
"vod_name": up_info['name'] + " 个人主页",
"vod_pic": up_info['face'],
"vod_director": '🆙 ' + up_info['name'] + " " + up_info['following'] + ' UID:' + str(mid),
"vod_content": up_info['desc'],
'vod_play_from': '关注TA$$$视频投稿在动态标签——筛选——上个UP,选择后查看',
'vod_play_url': doWhat
}
remark = "👥 " + up_info['fans'] + " 🎬 " + up_info['vod_count'] + " 👍 " + up_info['like_num']
if self.isFongmi:
vod["vod_remarks"] = remark
tabfilter = self.config['filter'].get('动态')
if not tabfilter:
tabfilter = self.config['filter'].get('UP', [])
vod["vod_actor"] = ' '.join(map(lambda x: '[a=cr:{"id": "' + str(mid) + '_' + x['v'] +'_getupvideos","name": "' + up_info['name'].replace('"', '\\"') + ' ' + x['n'] + '"}/]' + x['n'] + '[/a]', tabfilter[-1]['value']))
else:
vod["vod_actor"] = remark
result = {
'list': [
vod
]
}
return result
def setting_login_detailContent(self, key):
cookie_dic_tmp = self.cookie_dic_tmp.get(key, '')
message = ''
if not cookie_dic_tmp:
message = self.get_cookies(key)
if message:
message = f"【{message}】通过手机客户端扫码确认登录后点击相应按钮设置账号"
else:
message = '【已扫码并确认登录】请点击相应按钮设置当前获取的账号为:'
vod = {
"vod_name": "登录与设置",
"vod_content": '通过手机客户端扫码并确认登录后,点击相应按钮设置cookie,设置后不需要管嗅探结果,直接返回二维码页面刷新,查看是否显示已登录,已登录即可重新打开APP以加载全部标签',
}
vod_play_from = ['登录$$$退出登录']
vod_play_url = []
first = message + '$ '
login = '设置为主账号,动态收藏关注等内容源于此$' + str(key) + '_master_login_setting'
login_vip = '设置为备用的VIP账号,仅用于播放会员番剧$' + str(key) + '_vip_login_setting'
vod_play_url.append('#'.join([first, login, login_vip]))
second = '点击相应按钮退出账号>>>$ '
logout = '退出主账号$master_logout_setting'
logout_vip = '退出备用的VIP账号$vip_logout_setting'
vod_play_url.append('#'.join([second, logout, logout_vip]))
cate_lis = [{
'f': '主页站点推荐栏',
'c': 'maxHomeVideoContent',
'd': {
'3': '3图',
'4': '4图',
'6': '6图',
'8': '8图',
'10': '10图',
}
},{
'f': '视频画质',
'c': 'vodDefaultQn',
'd': self.vod_qn_id
},{
'f': '视频编码',
'c': 'vodDefaultCodec',
'd': self.vod_codec_id
},{
'f': '音频码率',
'c': 'vodDefaultAudio',
'd': self.vod_audio_id
},{
'f': '收藏默认显示',
'c': 'favMode',
'd': {
'0': '默认收藏夹',
'1': '追番',
'2': '追剧',
}
},{
'f': '上传播放进度',
'c': 'heartbeatInterval',
'd': {
'0': '关',
'15': '开',
}
},{
'f': '直播筛选细化',
'c': 'showLiveFilterTag',
'd': {
'0': '关',
'1': '开',
}
}]
for cate in cate_lis:
vod_play_from.append(cate['f'])
defaultConfig = cate['d'][str(int(self.userConfig[cate['c']]))]
if 'vodDefaultAudio' == cate['c']:
defaultConfig = str(defaultConfig).replace('000', 'k')
url = ['当前:' + defaultConfig + '$ ']
for id, name in cate['d'].items():
if 'vodDefaultAudio' == cate['c']:
name = str(name).replace('000', 'k')
url.append(name + '$' + str(id) + '_' + cate['c'] + '_setting')
vod_play_url.append('#'.join(url))
vod['vod_play_from'] = '$$$'.join(vod_play_from)
vod['vod_play_url'] = '$$$'.join(vod_play_url)
result = {
'list': [
vod
]
}
return result
def setting_tab_filter_detailContent(self):
vod = {
"vod_name": "标签与筛选",
"vod_content": '依次点击各标签,同一标签第一次点击为添加,第二次删除,可以返回到二维码页后重进本页查看预览,最后点击保存,未选择的将追加到末尾,如果未保存就重启app,将丢失未保存的配置',
}
vod_play_from = []
vod_play_url = []
cate_lis = [
{'n': 'cateManual', 'v': '标签'},
{'n': 'tuijianLis', 'v': '推荐[分区]'},
{'n': 'rankingLis', 'v': '推荐[排行榜]'},
{'n': 'cateManualLive', 'v': '直播'},
]
for cate in cate_lis:
_List = cate['n']
vod_play_from.append(cate['v'])
List_tmp = self.userConfig.get(str(_List) + '_tmp', [])
status = ''
if List_tmp:
status = '【未保存】'
else:
List_tmp = self.userConfig.get(_List, [])
if not List_tmp:
List_tmp = self.defaultConfig.get(_List)
if List_tmp and type(List_tmp[0]) == dict:
List_tmp = list(map(lambda x:x['n'], List_tmp))
url = ['当前: ' + ','.join(List_tmp) + '$ ', f"{status}点击这里保存$_{_List}_save_setting", f"点击这里恢复默认并保存$_{_List}_clear_setting"]
defaultConfig = self.defaultConfig[_List].copy()
if _List == 'cateManual' and not 'UP' in defaultConfig:
defaultConfig.append('UP')
elif _List == 'cateManualLive':
extra_live_filter = self.userConfig.get('cateManualLiveExtra', [])
defaultConfig.extend(extra_live_filter.copy())
for name in defaultConfig:
value = str(name)
if type(name) == dict:
value = name['n'] + '@@@' + name['v'].replace('_', '@@@')
name = name['n']
url.append(f"{name}${value}_{_List}_setting")
vod_play_url.append('#'.join(url))
vod['vod_play_from'] = '$$$'.join(vod_play_from)
vod['vod_play_url'] = '$$$'.join(vod_play_url)
result = {
'list': [
vod
]
}
return result
def setting_liveExtra_detailContent(self):
vod = {
"vod_name": "查看直播细化标签",
"vod_content": '点击想要添加的标签,同一标签第一次点击为添加,第二次删除,完成后在[标签与筛选]页继续操作,以添加到直播筛选分区列中',
}
vod_play_from = ['已添加']
cateManualLiveExtra = self.userConfig.get('cateManualLiveExtra', [])
vod_play_url = ['点击相应标签(只)可以删除$ #清空$clear_liveFilter_setting']
for name in cateManualLiveExtra:
value = name['v']
name = name['n']
vod_play_url.append(name + '$' + 'del_' + name + '_' + value + '_liveFilter_setting')
vod_play_url = ['#'.join(vod_play_url)]
cateLive = self.userConfig.get('cateLive', {})
for parent, parent_dic in cateLive.items():
area_dic = parent_dic['value']['value']
if len(area_dic) == 1:
continue
vod_play_from.append(parent)
url = []
for area in area_dic:
name = str(area['n']).replace('_', '-').replace("#", "﹟").replace("$", "﹩")
id = str(area['v']).replace('_', '@@@').replace("#", "﹟").replace("$", "﹩")
url.append(name + '$add_' + name + '_' + id + '_liveFilter_setting')
vod_play_url.append('#'.join(url))
vod['vod_play_from'] = '$$$'.join(vod_play_from)
vod['vod_play_url'] = '$$$'.join(vod_play_url)
result = {
'list': [
vod
]
}
return result
def get_all_season(self, season):
season_id = str(season['season_id'])
season_title = season['season_title']
if season_id == self.detailContent_args['ssid']:
self.detailContent_args['s_title'] = season_title
pic = season['cover']
remark = season['new_ep']['index_show']
result = {
"vod_id": season_id + 'ss',
"vod_name": '系列:' + season_title,
"vod_pic": self.format_img(pic),
"vod_remarks": remark}
return result
def get_bangumi_section(self, section):
sec_title = section['title'].replace("#", "﹟").replace("$", "﹩")
sec_type = section['type']
if sec_type in [1, 2] and len(section['episode_ids']) == 0:
episodes = section['episodes']
playUrl = '#'.join(map(lambda x: self.get_normal_episodes(x)[0], episodes))
return (sec_title, playUrl)
def ysContent(self, array):
aid = array[0]
if 'ep' in aid:
self.detailContent_args['fromep'] = aid
aid = 'ep_id=' + aid.replace('ep', '')
elif 'ss' in aid:
aid = 'season_id=' + aid.replace('ss', '')
url = "https://api.bilibili.com/pgc/view/web/season?{0}".format(aid)
rsp = self._get_sth(url, 'fake')
jRoot = json.loads(rsp.text)
jo = jRoot['result']
self.detailContent_args['ssid'] = str(jo['season_id'])
title = jo['title']
self.detailContent_args['s_title'] = jo['season_title']
self.detailContent_args['title_type'] = '集'
if jo['type'] in [1, 4]:
self.detailContent_args['title_type'] = '话'
#添加系列到搜索
seasons = jo.get('seasons')
if len(seasons) == 1:
self.detailContent_args['s_title'] = seasons[0]['season_title']
seasons = 0
else:
self.detailContent_args['seasons'] = list(self.pool.map(self.get_all_season, seasons))
#获取正片
episodes = jo.get('episodes')
#获取花絮
section_task = []
for s in jo.get('section', []):
if s:
t = self.pool.submit(self.get_bangumi_section, s)
section_task.append(t)
pic = jo['cover']
typeName = jo['share_sub_title']
date = jo['publish']['pub_time'][0:4]
dec = jo['evaluate']
remark = jo['new_ep']['desc']
stat = jo['stat']
# 演员和导演框展示视频状态,包括以下内容:
status = "▶" + self.zh(stat['views']) + " ❤" + self.zh(stat['favorites'])
vod = {
"vod_id": 'ss' + self.detailContent_args['ssid'],
"vod_name": title,
"vod_pic": pic,
"type_name": typeName,
"vod_year": date,
"vod_actor": status,
"vod_content": dec
}
if self.isFongmi:
if seasons:
remark += ' [a=cr:{"id": "_getbangumiseasons","name": "' + title.replace('"', '\\"') + '"}/]更多系列[/a]'
seasons = 0
if 'rating' in jo:
remark = str(jo['rating']['score']) + '分 ' + remark
self.userConfig['show_vod_hot_reply'] = False
else:
vod["vod_area"] = "bilidanmu"
vod["vod_remarks"] = remark
ZhuiPf = []
ZhuiPu = []
if self.userid:
ZhuiPf = ['做点什么']
ZhuiPu = '是否追番剧$ #❤追番剧$add_notplay_zhui#💔取消追番剧$del_notplay_zhui'
defaultQn = int(self.userConfig['vodDefaultQn'])
if defaultQn > 116:
ZhuiPu += '#⚠️限高1080$116_notplay_vodTMPQn'
ZhuiPu = [ZhuiPu]
if seasons:
ZhuiPf.append('更多系列')
ZhuiPu.append('更多系列在快速搜索中查看$ #')
FirstPf = []
FirstPu = []
PreviewPf = []
PreviewPu = []
ParsePf = []
ParsePu = []
if episodes:
for x, y in self.pool.map(self.get_normal_episodes, episodes):
if y:
FirstPu.append(x)
ParsePu.append(y)
else:
PreviewPu.append(x)
if FirstPu:
FirstPf = [self.detailContent_args['s_title']]
FirstPu = ['#'.join(FirstPu)]
if PreviewPu:
PreviewPf = ['预告']
PreviewPu = ['#'.join(PreviewPu)]
if not self.detailContent_args.get('parse'):
ParsePu = []
if ParsePu:
ParsePf = [str(self.detailContent_args['s_title']) + '【解析】']
ParsePu = ['#'.join(ParsePu)]
fromL = ParsePf + FirstPf + PreviewPf
urlL = ParsePu + FirstPu + PreviewPu
for t in as_completed(section_task):
s = t.result()
if s:
fromL.append(s[0])
urlL.append(s[1])
fromep = self.detailContent_args.get('fromep', '')
if '_' in fromep:
fromL = ['B站'] + fromL
urlL = [fromep] + urlL
if self.userConfig['show_vod_hot_reply']:
self.get_vod_hot_reply_event.wait()
ReplyPu = self.detailContent_args.get('Reply', '')
if ReplyPu:
ZhuiPf.append('热门评论')
ZhuiPu.append(ReplyPu)
fromL.insert(1, '$$$'.join(ZhuiPf))
urlL.insert(1, '$$$'.join(ZhuiPu))
vod['vod_play_from'] = '$$$'.join(fromL)
vod['vod_play_url'] = '$$$'.join(urlL)
result = {
'list': [
vod
]
}
return result
def get_live_api2_playurl(self, room_id):
playFrom = []
playUrl = []
url = 'https://api.live.bilibili.com/xlive/web-room/v2/index/getRoomPlayInfo?room_id={0}&no_playurl=0&mask=1&qn=0&platform=web&protocol=0,1&format=0,1,2&codec=0,1&dolby=5&panorama=1'.format(room_id)
rsp = self._get_sth(url, 'fake')
jo = json.loads(rsp.text)
if jo['code'] == 0:
playurl_info = jo['data'].get('playurl_info', '')
if playurl_info:
stream = playurl_info['playurl']['stream']
liveDic = {
'codec': {'avc': '0', 'hevc': '1'},
'format': {'flv': '0', 'ts': '1', 'fmp4': '2'},
}
liveDic['qn'] = dict(self.pool.map(lambda x:(x['qn'], x['desc']), playurl_info['playurl']['g_qn_desc']))
vodList = []
for i in stream:
vodList.extend(i['format'])
api2_playUrl = {}
for v in vodList:
format = str(v.get('format_name'))
for c in v['codec']:
codec = str(c.get('codec_name'))
accept_qn = c.get('accept_qn')
for qn in accept_qn:
url = format + '_' + codec + '$liveapi2_' + str(qn) + '_' + liveDic['format'][format] + '_' + liveDic['codec'][codec] + '_' + str(room_id)
if not api2_playUrl.get(liveDic['qn'][qn]):
api2_playUrl[liveDic['qn'][qn]] = []
api2_playUrl[liveDic['qn'][qn]].append(url)
for key, value in api2_playUrl.items():
playFrom.append(key)
playUrl.append('#'.join(value))
result = {'From': playFrom, 'url': playUrl}
return result
def live_detailContent(self, array):
room_id = array[0]
get_live_api2_playurl = self.pool.submit(self.get_live_api2_playurl, room_id)
url = "https://api.live.bilibili.com/room/v1/Room/get_info?room_id=" + str(room_id)
rsp = self._get_sth(url, 'fake')
jRoot = json.loads(rsp.text)
result = {}
if jRoot.get('code') == 0:
jo = jRoot['data']
self.detailContent_args['mid'] = mid = str(jo["uid"])
self.get_up_info_event.clear()
self.pool.submit(self.get_up_info, mid)
title = jo['title'].replace("", "").replace("", "")
pic = jo.get("user_cover")
desc = jo.get('description')
typeName = jo.get('parent_area_name') + '--' + jo.get('area_name')
live_status = jo.get('live_status', '')
if live_status:
live_status = "开播时间:" + jo.get('live_time').replace('-', '.')
else:
live_status = "未开播"
vod = {
"vod_id": room_id,
"vod_name": title,
"vod_pic": pic,
"type_name": typeName,
"vod_content": desc,
}
remark = "房间号:" + room_id + " " + live_status
if self.isFongmi:
vod["vod_remarks"] = remark
else:
vod["vod_actor"] = remark
vod["vod_area"] = "bililivedanmu"
secondPFrom = ''
secondP = ''
if self.userid:
secondPFrom = '关注Ta'
first = '是否关注$ '
follow = '➕关注$1_notplay_follow'
unfollow = '➖取关$2_notplay_follow'
secondPList = [first, follow, unfollow]
secondP = '#'.join(secondPList)
playFrom = get_live_api2_playurl.result().get('From', [])
playUrl = get_live_api2_playurl.result().get('url', [])
if playFrom:
api1_playFrom = 'API_1'
api1_playUrl = 'flv线路原画$platform=web&quality=4_' + room_id + '#flv线路高清$platform=web&quality=3_' + room_id + '#h5线路原画$platform=h5&quality=4_' + room_id + '#h5线路高清$platform=h5&quality=3_' + room_id
playFrom.append(api1_playFrom)
playUrl.append(api1_playUrl)
if secondPFrom:
playFrom.insert(1, secondPFrom)
playUrl.insert(1, secondP)
vod['vod_play_from'] = '$$$'.join(playFrom)
vod['vod_play_url'] = '$$$'.join(playUrl)
self.get_up_info_event.wait()
up_info = self.up_info[mid]
vod["vod_director"] = '🆙 ' + up_info['crname'] + " 👥 " + self.zh(jo.get('attention')) + ' ' + up_info['following']
result['list'] = [vod]
return result
search_key = ''
def searchContent(self, key, quick):
if not self.session_fake.cookies:
self.pool.submit(self.getFakeCookie, True)
for t in self.task_pool:
t.cancel()
self.task_pool = []
self.search_key = key
mid = self.detailContent_args.get('mid', '')
if quick and mid:
get_up_videos = self.pool.submit(self.get_up_videos, mid, 1, 'quicksearch')
types = {'video': '','media_bangumi': '番剧: ', 'media_ft': '影视: ', 'bili_user': '用户: ', 'live': '直播: '}
for type, value in types.items():
t = self.pool.submit(self.get_search_content, key = key, pg = value, duration_diff = 0, order = '', type = type, ps = self.userConfig['page_size'])
self.task_pool.append(t)
result = {'list': []}
for t in as_completed(self.task_pool):
res = t.result().get('list', [])
result['list'].extend(res)
self.task_pool.remove(t)
if quick:
if not self.isFongmi:
if mid:
result['list'] = self.detailContent_args.get('interaction', []) + get_up_videos.result().get('list', []) + self.detailContent_args.get('Reply_jump', []) + result['list']
else:
result['list'] = self.detailContent_args.get('seasons', []) + result['list']
return result
stop_heartbeat_event = threading.Event()
def start_heartbeat(self, aid, cid, ids):
duration = ssid = epid = ''
for i in ids:
if 'ss' in i:
ssid = i.replace('ss', '')
elif 'ep' in i:
epid = i.replace('ep', '')
elif 'dur' in i:
duration = int(i.replace('dur', ''))
url = 'https://api.bilibili.com/x/player/v2?aid={0}&cid={1}'.format(aid, cid)
rsp = self._get_sth(url)
jo = json.loads(rsp.text)
data = jo.get('data',{})
interaction = data.get('interaction', {})
if interaction.get('graph_version'):
graph_version = interaction.get('graph_version')
old = self.detailContent_args.get('graph_version')
if old != graph_version:
self.detailContent_args['graph_version'] = graph_version
self.pool.submit(self.interaction_detailContent)
heartbeatInterval = int(self.userConfig['heartbeatInterval'])
if not self.userid or not heartbeatInterval:
return
if not duration:
url = 'https://api.bilibili.com/x/web-interface/view?aid={0}&cid={1}'.format(aid, cid)
rsp = self._get_sth(url, 'fake')
jRoot = json.loads(rsp.text)
duration = jRoot['data']['duration']
played_time = 0
if int(data.get('last_play_cid', 0)) == int(cid):
last_play_time = int(data.get('last_play_time'))
if last_play_time > 0:
played_time = int(last_play_time / 1000)
heartbeat_times = int((duration - played_time) / heartbeatInterval) + 1
url = 'https://api.bilibili.com/x/click-interface/web/heartbeat'
data = {'aid': str(aid), 'cid': str(cid), 'csrf': str(self.csrf)}
if ssid:
data['sid'] = str(ssid)
data['epid'] = str(epid)
data['type'] = '4'
heartbeat_count = 0
self.stop_heartbeat_event.clear()
while True:
if heartbeat_count == heartbeatInterval or self.stop_heartbeat_event.is_set():
played_time += heartbeat_count
heartbeat_count = 0
if not heartbeat_count:
heartbeat_times -= 1
if not heartbeat_times:
#播完为-1
played_time = -1
self.stop_heartbeat_event.set()
data['played_time'] = str(played_time)
self.pool.submit(self._post_sth, url=url, data=data)
if self.stop_heartbeat_event.is_set():
break
time.sleep(1)
heartbeat_count += 1
wbi_key = {}
def get_wbiKey(self, wts):
r = self.fetch("https://api.bilibili.com/x/web-interface/nav", headers=self.header)
wbi_img_url = r.json()['data']['wbi_img']['img_url']
wbi_sub_url = r.json()['data']['wbi_img']['sub_url']
oe = [46, 47, 18, 2, 53, 8, 23, 32, 15, 50, 10, 31, 58, 3, 45, 35, 27, 43, 5, 49, 33, 9, 42, 19, 29, 28, 14, 39, 12,
38, 41, 13, 37, 48, 7, 16, 24, 55, 40, 61, 26, 17, 0, 1, 60, 51, 30, 4, 22, 25, 54, 21, 56, 59, 6, 63, 57, 62,
11, 36, 20, 34, 44, 52]
ae = wbi_img_url.split("/")[-1].split(".")[0] + wbi_sub_url.split("/")[-1].split(".")[0]
le = reduce(lambda s, i: s + ae[i], oe, "")[:32]
self.wbi_key = {
"key": le,
"wts": wts
}
def encrypt_wbi(self, **params):
wts = round(time.time())
if not self.wbi_key or abs(self.wbi_key['wts']) < 120:
self.get_wbiKey(wts)
params["wts"] = wts
params["dm_img_list"] = []
dm_rand = ['QQ','Qg','Qw','RA','RQ']
params["dm_img_str"] = random.choice(dm_rand)
params["dm_cover_img_str"] = random.choice(dm_rand)
params = dict(sorted(params.items()))
params = {k : ''.join(filter(lambda chr: chr not in "!'()*", str(v))) for k, v in params.items()}
Ae = urlencode(params)
return Ae + "&w_rid=" + hashlib.md5((Ae + self.wbi_key['key']).encode(encoding='utf-8')).hexdigest()
def _get_sth(self, url, _type='master'):
if _type == 'vip' and self.session_vip.cookies:
rsp = self.session_vip.get(url, headers=self.header)
elif _type == 'fake':
if not self.session_fake.cookies:
self.getFakeCookie_event.wait()
rsp = self.session_fake.get(url, headers=self.header)
else:
rsp = self.session_master.get(url, headers=self.header)
return rsp
def _post_sth(self, url, data):
return self.session_master.post(url, headers=self.header, data=data)
def post_live_history(self, room_id):
data = {'room_id': str(room_id), 'platform': 'pc', 'csrf': str(self.csrf)}
url = 'https://api.live.bilibili.com/xlive/web-room/v1/index/roomEntryAction'
self._post_sth(url=url, data=data)
def do_notplay(self, ids):
aid = self.detailContent_args.get('aid')
mid = self.detailContent_args.get('mid')
ssid = self.detailContent_args.get('ssid')
data = {'csrf': str(self.csrf)}
url = ''
if 'vodTMPQn' in ids:
self.detailContent_args['vodTMPQn'] = str(ids[0])
return
elif 'follow' in ids:
if 'special' in ids:
data.update({'fids': str(mid), 'tagids': str(ids[0])})
url = 'https://api.bilibili.com/x/relation/tags/addUsers'
else:
data.update({'fid': str(mid), 'act': str(ids[0])})
url = 'https://api.bilibili.com/x/relation/modify'
elif 'zhui' in ids:
data.update({'season_id': str(ssid)})
url = 'https://api.bilibili.com/pgc/web/follow/' + str(ids[0])
elif 'like' in ids:
data.update({'aid': str(aid), 'like': str(ids[0])})
url = 'https://api.bilibili.com/x/web-interface/archive/like'
elif 'coin' in ids:
data.update({'aid': str(aid), 'multiply': str(ids[0]), 'select_like': '1'})
url = 'https://api.bilibili.com/x/web-interface/coin/add'
elif 'fav' in ids:
data.update({'rid': str(aid), 'type': '2'})
data[ids[1] + '_media_ids'] = str(ids[0])
url = 'https://api.bilibili.com/x/v3/fav/resource/deal'
elif 'triple' in ids:
data.update({'aid': str(aid)})
url = 'https://api.bilibili.com/x/web-interface/archive/like/triple'
elif 'reply' in ids:
data.update({'oid': str(ids[0]), 'rpid': str(ids[1]), 'type': '1', 'action': '1'})
url = 'http://api.bilibili.com/x/v2/reply/action'
self._post_sth(url=url, data=data)
def get_cid(self, video):
url = "https://api.bilibili.com/x/web-interface/view?aid=%s" % str(video['aid'])
rsp = self._get_sth(url)
jRoot = json.loads(rsp.text)
jo = jRoot['data']
video['cid'] = jo['cid']
video['duration'] = jo['duration']
if 'redirect_url' in jo and 'bangumi' in jo['redirect_url']:
video['ep'] = self.find_bangumi_id(jo['redirect_url'])
cookie_dic_tmp = {}
def get_cookies(self, key):
url = 'https://passport.bilibili.com/x/passport-login/web/qrcode/poll?qrcode_key=' + key
rsp = self._get_sth(url, 'fake')
jo = json.loads(rsp.text)
if jo['code'] == 0:
message = jo['data']['message']
if not message:
self.cookie_dic_tmp[key] = dict(self.session_fake.cookies)
self.pool.submit(self.getFakeCookie)
return message
return '网络错误'
def set_cookie(self, key, _type):
cookie_dic_tmp = self.cookie_dic_tmp.get(key, '')
if not cookie_dic_tmp:
message = self.get_cookies(key)
if message:
return
users = self.userConfig.get('users', {})
users[_type] = {'cookies_dic': self.cookie_dic_tmp.get(key, {})}
self.userConfig.update({'users': users})
self.getCookie(_type)
self.dump_config()
def unset_cookie(self, _type):
if _type == 'vip':
self.session_vip.cookies.clear()
else:
self.session_master.cookies = self.session_fake.cookies
self.userid = self.csrf = ''
if _type in self.userConfig.get('users', {}):
self.userConfig['users'].pop(_type)
self.dump_config()
def set_normal_default(self, id, type):
self.userConfig[type] = str(id)
self.dump_config()
def set_normal_cateManual(self, name, _List, action):
List_tmp = self.userConfig.get(str(_List) + '_tmp')
if not List_tmp:
List_tmp = self.userConfig[str(_List) + '_tmp'] = []
if action == 'save':
for _item in self.defaultConfig[_List]:
if not _item in List_tmp.copy():
self.userConfig[str(_List) + '_tmp'].append(_item)
self.userConfig[_List] = self.userConfig[str(_List) + '_tmp'].copy()
self.userConfig.pop(_List + '_tmp')
self.dump_config()
elif action == 'clear':
self.userConfig[_List] = self.defaultConfig[_List].copy()
self.userConfig.pop(str(_List) + '_tmp')
self.dump_config()
else:
if _List == 'cateManualLive':
name = name.split('@@@')
if len(name) == 3:
name[1] += '_' + str(name[2])
name = {'n': name[0], 'v': str(name[1])}
if name in List_tmp:
self.userConfig[str(_List) + '_tmp'].remove(name)
else:
self.userConfig[str(_List) + '_tmp'].append(name)
def add_cateManualLiveExtra(self, action, name, id):
_Extra = self.userConfig.get('cateManualLiveExtra', [])
if not _Extra:
_Extra = self.userConfig['cateManualLiveExtra'] = []
if action == 'clear':
for _ext in _Extra:
_ext['v'] = _ext['v'].replace('@@@', '_')
if _ext in self.userConfig.get('cateManualLive', []):
self.userConfig['cateManualLive'].remove(_ext)
if _ext in self.userConfig.get('cateManualLive_tmp', []):
self.userConfig['cateManualLive_tmp'].remove(_ext)
self.userConfig.pop('cateManualLiveExtra')
elif id in list(map(lambda x:x['v'], self.userConfig.get('cateManualLiveExtra', []))):
area_dict = {'n': name, 'v': id}
self.userConfig['cateManualLiveExtra'].remove(area_dict)
area_dict['v'] = id.replace('@@@', '_')
if area_dict in self.userConfig.get('cateManualLive', []):
self.userConfig['cateManualLive'].remove(area_dict)
if area_dict in self.userConfig.get('cateManualLive_tmp', []):
self.userConfig['cateManualLive_tmp'].remove(area_dict)
else:
area_dict = {'n': name, 'v': id}
self.userConfig['cateManualLiveExtra'].append(area_dict)
self.dump_config()
vod_qn_id = {
'127': "8K",
'126': "杜比视界",
'125': "HDR",
'120': "4K",
'116': "1080P60帧",
'112': "1080P+",
'80': "1080P",
'64': "720P",
}
vod_codec_id = {
'7': 'avc',
'12': 'hevc',
'13': 'av1',
}
vod_audio_id = {
'30280': '192000',
'30232': '132000',
'30216': '64000',
}
def get_dash_media(self, video):
qnid = str(video.get('id'))
codecid = video.get('codecid')
media_codecs = video.get('codecs')
media_bandwidth = video.get('bandwidth')
media_startWithSAP = video.get('startWithSap')
media_mimeType = video.get('mimeType')
media_BaseURL = video.get('baseUrl').replace('&', '&')
media_SegmentBase_indexRange = video['SegmentBase'].get('indexRange')
media_SegmentBase_Initialization = video['SegmentBase'].get('Initialization')
mediaType = media_mimeType.split('/')[0]
if mediaType == 'video':
media_frameRate = video.get('frameRate')
media_sar = video.get('sar')
media_width = video.get('width')
media_height = video.get('height')
media_type_params = f"height='{media_height}' width='{media_width}' frameRate='{media_frameRate}' sar='{media_sar}'"
elif mediaType == 'audio':
audioSamplingRate = self.vod_audio_id.get(qnid, '192000')
media_type_params = f"numChannels='2' sampleRate='{audioSamplingRate}'"
if codecid:
qnid += '_' + str(codecid)
result = f"""
{media_BaseURL}
"""
return result
def get_dash_media_list(self, media_lis):
if not media_lis:
return ""
mediaType = media_lis[0]['mimeType'].split('/')[0]
defaultQn = defaultCodec = ''
if mediaType == 'video':
defaultQn = vodTMPQn = self.detailContent_args.get('vodTMPQn', '')
if vodTMPQn:
vodTMPQn = int(vodTMPQn)
else:
defaultQn = str(self.userConfig['vodDefaultQn'])
vodTMPQn = 120
defaultCodec = str(self.userConfig['vodDefaultCodec'])
elif mediaType == 'audio':
defaultQn = str(self.userConfig['vodDefaultAudio'])
vodTMPQn = int(defaultQn)
defaultCodec = '0'
qn_codec = list(map(lambda x: str(x['id']) + '_' + str(x['codecid']), media_lis))
Qn_available_lis = []
#按设定的质量和设定的编码找
if defaultQn + '_' + defaultCodec in qn_codec:
Qn_available_lis.append(media_lis[qn_codec.index(defaultQn + '_' + defaultCodec)])
#按设定的质量找推荐的编码
if not Qn_available_lis and mediaType == 'video':
for c in self.vod_codec_id.keys():
if defaultQn + '_' + str(c) in qn_codec:
Qn_available_lis.append(media_lis[qn_codec.index(defaultQn + '_' + str(c))])
#找4K及以下最高可用画质/音质
if not Qn_available_lis:
qn_top = ''
for q in qn_codec:
q_c = q.split('_')
if qn_top and int(qn_top) > int(q_c[0]):
break
elif mediaType == 'video' and int(q_c[0]) <= vodTMPQn and not qn_top or mediaType == 'audio' and not qn_top or int(q_c[0]) == qn_top:
qn_top = int(q_c[0])
#匹配设定的编码,否则全部
if mediaType == 'video' and str(q_c[1]) == defaultCodec:
Qn_available_lis = [media_lis[qn_codec.index(str(q))]]
break
Qn_available_lis.append(media_lis[qn_codec.index(str(q))])
result = f"""
{''.join(map(self.get_dash_media, Qn_available_lis))}
"""
return result
get_dash_event = threading.Event()
def get_dash(self, ja):
duration = ja.get('duration')
minBufferTime = ja.get('minBufferTime')
video_list = self.pool.submit(self.get_dash_media_list, ja.get('video'))
audio_list = self.pool.submit(self.get_dash_media_list, ja.get('audio'))
mpd = f"""
{video_list.result()}{audio_list.result()}
"""
with open(f"{dirname}/playurl.mpd", 'w', encoding="utf-8") as f:
f.write(mpd)
self.get_dash_event.set()
#time.sleep(3)
#os.remove(f"{dirname}/playurl.mpd")
def get_durl(self, ja):
maxSize = -1
position = -1
for i in range(len(ja)):
tmpJo = ja[i]
if maxSize < int(tmpJo['size']):
maxSize = int(tmpJo['size'])
position = i
url = ''
if len(ja) > 0:
if position == -1:
position = 0
url = ja[position]['url']
return url
def miao(self, m):
m = str(m).partition('.')[2] #取小数部分
if len(m)==0:m = '000' #补齐三位小数
if len(m)==1:m = m + '00'
if len(m)==2:m = m + '0'
return m #返回标准三位的毫秒数
def down_sub(self, url):
rsp = self._get_sth(url, 'fake')
data = json.loads(rsp.text)['body']
srt = ''
for d in data:
f = round(d['from'],3) # 开始时间 (round(n,3)四舍五入为三位小数)
t = round(d['to'],3) # 结束时间
c = d['content'] # 字幕内容
ff = time.strftime("%H:%M:%S",time.gmtime(f)) + ',' + self.miao(f) # 开始时间,秒数转 时:分:秒 格式,加逗号、毫秒修正为三位
tt = time.strftime("%H:%M:%S",time.gmtime(t)) + ',' + self.miao(t) # 结束时间,处理方式同上
srt += '\n' + ff + ' ' + '-->' + ' ' + tt + '\n' + c + '\n\n' # 格式化为Srt字幕
return srt
localProxyUrl = 'http://127.0.0.1:UndCover/proxy'
def get_subs(self, aid, cid):
result = []
query = self.encrypt_wbi(aid=aid, cid=cid)
url = f'https://api.bilibili.com/x/player/wbi/v2?{query}'
rsp = self._get_sth(url, 'master')
jRoot = json.loads(rsp.text)
if jRoot['code'] == 0:
for sub in jRoot['data']['subtitle'].get('subtitles', []):
lanDoc = str(sub.get('lan_doc', ''))
lanUrl = sub.get('subtitle_url')
if lanUrl.startswith('//'):
lanUrl = 'https:' + lanUrl
lanUrl = base64.b64encode(lanUrl.encode('utf-8')).decode('utf-8')
result.append(
{
"url": f"{self.localProxyUrl}&siteType=3&siteKey=py_bilibili&type=subtitle&url={lanUrl}",
"name": lanDoc,
"format": "application/x-subrip"
}
)
return result
def playerContent(self, flag, id, vipFlags):
self.stop_heartbeat_event.set()
result = {'playUrl': '', 'url': ''}
ids = id.split("_")
if 'web' in id or 'liveapi2' == ids[0]:
return self.live_playerContent(flag, id, vipFlags)
if len(ids) < 2:
return result
aid = ids[0]
cid = ids[1]
if 'setting' in ids:
if 'liveFilter' in id:
id = ids[2]
self.add_cateManualLiveExtra(aid, cid, id)
elif cid in ['cateManual', 'cateManualLive', 'tuijianLis', 'rankingLis']:
action = ids[2]
self.set_normal_cateManual(aid, cid, action)
elif 'login' in id:
self.set_cookie(aid, cid)
elif 'logout' in id:
self.unset_cookie(aid)
else:
self.set_normal_default(aid, cid)
return result
elif 'notplay' in ids:
self.pool.submit(self.do_notplay, ids)
return result
elif cid == 'cid' or not cid:
video = {'aid': str(aid)}
self.get_cid(video, )
cid = video['cid']
ids.append('dur' + str(video['duration']))
ep = video.get('ep')
if ep:
id += '_' + ep
ids.append(ep)
query = self.encrypt_wbi(avid=aid, cid=cid, fnval=4048, fnver=0, fourk=1)
url = f'https://api.bilibili.com/x/player/wbi/playurl?{query}'
if 'ep' in id:
if 'parse' in id:
test = list(x for x in map(lambda x: x if 'ep' in x else None, ids) if x is not None)
url = 'https://www.bilibili.com/bangumi/play/' + test[0]
result["url"] = url
result["flag"] = 'bilibili'
result["parse"] = '1'
result['jx'] = '1'
result["header"] = str({"User-Agent": self.header["User-Agent"]})
result["danmaku"] = 'https://api.bilibili.com/x/v1/dm/list.so?oid=' + str(cid)
return result
url = 'https://api.bilibili.com/pgc/player/web/playurl?aid={}&cid={}&fnval=4048&fnver=0&fourk=1'.format(aid, cid)
rsp = self._get_sth(url, 'vip')
jRoot = json.loads(rsp.text)
if jRoot['code'] == 0:
if 'data' in jRoot:
jo = jRoot['data']
elif 'result' in jRoot:
jo = jRoot['result']
else:
return result
else:
return result
ja = jo.get('dash')
if ja:
self.get_dash_event.clear()
get_dash = self.pool.submit(self.get_dash, ja)
self.get_dash_event.wait()
result["url"] = f"{dirname}/playurl.mpd"
else:
result["url"] = self.get_durl(jo.get('durl', {}))
result["parse"] = '0'
result["contentType"] = ''
result["header"] = self.header
result["danmaku"] = 'https://api.bilibili.com/x/v1/dm/list.so?oid=' + str(cid)
if self.isFongmi:
result["subs"] = self.get_subs(aid, cid)
#回传播放记录
self.pool.submit(self.start_heartbeat, aid, cid, ids)
return result
def live_playerContent(self, flag, id, vipFlags):
result = {'playUrl': '', 'url': ''}
ids = id.split("_")
if len(ids) < 2:
return result
# 回传观看直播记录
if self.userid and int(self.userConfig['heartbeatInterval']) > 0:
self.pool.submit(self.post_live_history, ids[-1])
if ids[0] == 'liveapi2':
qn = int(ids[1])
format = int(ids[2])
codec = int(ids[3])
room_id = int(ids[-1])
url = 'https://api.live.bilibili.com/xlive/web-room/v2/index/getRoomPlayInfo?room_id={0}&protocol=0,1&format={1}&codec={2}&qn={3}&ptype=8&platform=web&dolby=5&panorama=1&no_playurl=0&mask=1'.format(room_id, format, codec, qn)
rsp = self._get_sth(url, 'fake')
jo = json.loads(rsp.text)
if jo['code'] == 0:
try:
playurl = jo['data']['playurl_info'].get('playurl')
codec = playurl['stream'][0]['format'][0]['codec'][0]
except:
return result
base_url = str(codec['base_url'])
host = str(codec['url_info'][0]['host'])
extra = str(codec['url_info'][0]['extra'])
playurl = host + base_url + extra
result["url"] = playurl
if ".flv" in playurl:
result["contentType"] = 'video/x-flv'
else:
result["contentType"] = ''
else:
return result
else:
url = 'https://api.live.bilibili.com/room/v1/Room/playUrl?cid=%s&%s' % (ids[1], ids[0])
# raise Exception(url)
try:
rsp = self._get_sth(url)
except:
return result
jRoot = json.loads(rsp.text)
if jRoot['code'] == 0:
jo = jRoot['data']
ja = jo['durl']
if len(ja) > 0:
result["url"] = ja[0]['url']
if "h5" in ids[0]:
result["contentType"] = ''
else:
result["contentType"] = 'video/x-flv'
else:
return result
result["parse"] = '0'
# result['type'] ="m3u8"
result["header"] = {
"Referer": "https://live.bilibili.com",
"User-Agent": self.header["User-Agent"]
}
return result
config = {
"player": {},
"filter": {
"关注": [{"key": "sort", "name": "分类",
"value": [{"n": "正在直播", "v": "正在直播"}, {"n": "最常访问", "v": "最常访问"},
{"n": "最近关注", "v": "最近关注"}, {"n": "特别关注", "v": "特别关注"},
{"n": "悄悄关注", "v": "悄悄关注"}, {"n": "我的粉丝", "v": "我的粉丝"}]}],
"动态": [{"key": "order", "name": "别人投稿排序",
"value": [{"n": "最新发布", "v": "pubdate"}, {"n": "最多播放", "v": "click"},
{"n": "最多收藏", "v": "stow"}, {"n": "最早发布", "v": "oldest"}, {"n": "合集和列表", "v": "series"}]}, ],
"影视": [{"key": "tid", "name": "分类",
"value": [{"n": "番剧", "v": "1"}, {"n": "国创", "v": "4"}, {"n": "电影", "v": "2"},
{"n": "电视剧", "v": "5"}, {"n": "纪录片", "v": "3"}, {"n": "综艺", "v": "7"}]},
{"key": "order", "name": "排序",
"value": [{"n": "热门", "v": "热门"}, {"n": "播放数量", "v": "2"}, {"n": "更新时间", "v": "0"},
{"n": "最高评分", "v": "4"}, {"n": "弹幕数量", "v": "1"}, {"n": "追看人数", "v": "3"},
{"n": "开播时间", "v": "5"}, {"n": "上映时间", "v": "6"}]},
{"key": "season_status", "name": "付费",
"value": [{"n": "全部", "v": "-1"}, {"n": "免费", "v": "1"},
{"n": "付费", "v": "2%2C6"}, {"n": "大会员", "v": "4%2C6"}]}],
"频道": [{"key": "order", "name": "排序",
"value": [{"n": "近期热门", "v": "hot"}, {"n": "月播放量", "v": "view"},
{"n": "最新投稿", "v": "new"}, {"n": "频道精选", "v": "featured"}, ]}, ],
"收藏": [{"key": "order", "name": "排序",
"value": [{"n": "收藏时间", "v": "mtime"}, {"n": "播放量", "v": "view"},
{"n": "投稿时间", "v": "pubtime"}]}, ],
"历史": [{"key": "type", "name": "分类",
"value": [{"n": "全部", "v": "all"}, {"n": "视频", "v": "archive"}, {"n": "直播", "v": "live"}, {"n": "UP主", "v": "UP主"}, {"n": "稍后再看", "v": "稍后再看"}]}, ],
"搜索": [{"key": "type", "name": "类型",
"value": [{"n": "视频", "v": "video"}, {"n": "番剧", "v": "media_bangumi"}, {"n": "影视", "v": "media_ft"},
{"n": "直播", "v": "live"}, {"n": "用户", "v": "bili_user"}]},
{"key": "order", "name": "视频排序",
"value": [{"n": "综合排序", "v": "totalrank"}, {"n": "最多点击", "v": "click"}, {"n": "最新发布", "v": "pubdate"},
{"n": "最多收藏", "v": "stow"}, {"n": "最多弹幕", "v": "dm"}]},
{"key": "duration", "name": "视频时长",
"value": [{"n": "全部", "v": "0"}, {"n": "60分钟以上", "v": "4"}, {"n": "30~60分钟", "v": "3"},
{"n": "5~30分钟", "v": "2"}, {"n": "5分钟以下", "v": "1"}]}],
}
}
header = {
'Origin': 'https://www.bilibili.com',
'Referer': 'https://www.bilibili.com',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 13_2_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.3 Safari/605.1.15'
}
def localProxy(self, param):
action = {
'url': '',
'header': '',
'param': '',
'type': 'string',
'after': ''
}
if param['type'] == 'subtitle':
url = base64.b64decode(param['url']).decode('utf-8')
content = self.down_sub(url)
return [200, "application/octet-stream", action, content]
return [200, "video/MP2T", action, ""]