Files
jtx/jtxtv12/py/两个BT.py

600 lines
21 KiB
Python
Raw Normal View History

2025-12-24 12:50:50 +00:00
# coding=utf-8
# !/usr/bin/python
import sys
sys.path.append('..')
from base.spider import Spider
import json
import time
import urllib.parse
import re
import base64
class Spider(Spider):
def getName(self):
return "两个BT"
def init(self, extend=""):
self.host = "https://www.bttwoo.com"
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Connection': 'keep-alive',
'Referer': self.host
}
self.log(f"两个BT爬虫初始化完成主站: {self.host}")
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def homeContent(self, filter):
"""首页内容 - TVBox标准实现"""
result = {}
# 1. 定义分类 - 基于实际网站结构
classes = [
{'type_id': 'movie_bt_tags/xiju', 'type_name': '喜剧'},
{'type_id': 'movie_bt_tags/aiqing', 'type_name': '爱情'},
{'type_id': 'movie_bt_tags/adt', 'type_name': '冒险'},
{'type_id': 'movie_bt_tags/at', 'type_name': '动作'},
{'type_id': 'movie_bt_tags/donghua', 'type_name': '动画'},
{'type_id': 'movie_bt_tags/qihuan', 'type_name': '奇幻'},
{'type_id': 'movie_bt_tags/xuanni', 'type_name': '悬疑'},
{'type_id': 'movie_bt_tags/kehuan', 'type_name': '科幻'},
{'type_id': 'movie_bt_tags/juqing', 'type_name': '剧情'},
{'type_id': 'movie_bt_tags/kongbu', 'type_name': '恐怖'},
{'type_id': 'meiju', 'type_name': '美剧'},
{'type_id': 'gf', 'type_name': '高分电影'}
]
result['class'] = classes
# 2. 添加筛选配置
result['filters'] = self._get_filters()
# 3. 获取首页推荐内容
try:
rsp = self.fetch(self.host, headers=self.headers)
doc = self.html(rsp.text)
videos = self._get_videos(doc, limit=50)
result['list'] = videos
except Exception as e:
self.log(f"首页获取出错: {str(e)}")
result['list'] = []
return result
def homeVideoContent(self):
"""兼容性方法 - 提供分类定义"""
return {
'class': [
{'type_id': 'movie_bt_tags/xiju', 'type_name': '喜剧'},
{'type_id': 'movie_bt_tags/aiqing', 'type_name': '爱情'},
{'type_id': 'movie_bt_tags/adt', 'type_name': '冒险'},
{'type_id': 'movie_bt_tags/at', 'type_name': '动作'},
{'type_id': 'movie_bt_tags/donghua', 'type_name': '动画'},
{'type_id': 'movie_bt_tags/qihuan', 'type_name': '奇幻'},
{'type_id': 'movie_bt_tags/xuanni', 'type_name': '悬疑'},
{'type_id': 'movie_bt_tags/kehuan', 'type_name': '科幻'},
{'type_id': 'movie_bt_tags/juqing', 'type_name': '剧情'},
{'type_id': 'movie_bt_tags/kongbu', 'type_name': '恐怖'},
{'type_id': 'meiju', 'type_name': '美剧'},
{'type_id': 'gf', 'type_name': '高分电影'}
],
'filters': self._get_filters()
}
def categoryContent(self, tid, pg, filter, extend):
"""分类内容 - 支持筛选功能"""
try:
# 合并filter和extend参数
if filter and isinstance(filter, dict):
if not extend:
extend = {}
extend.update(filter)
self.log(f"分类请求: tid={tid}, pg={pg}, extend={extend}")
url = self._build_url(tid, pg, extend)
if not url:
return {'list': []}
rsp = self.fetch(url, headers=self.headers)
doc = self.html(rsp.text)
videos = self._get_videos(doc, limit=20)
return {
'list': videos,
'page': int(pg),
'pagecount': 999,
'limit': 20,
'total': 19980
}
except Exception as e:
self.log(f"分类内容获取出错: {str(e)}")
return {'list': []}
def searchContent(self, key, quick, pg="1"):
"""搜索功能 - 智能过滤"""
try:
search_url = f"{self.host}/xssssearch?q={urllib.parse.quote(key)}"
if pg and pg != "1":
search_url += f"&p={pg}"
self.log(f"搜索URL: {search_url}")
rsp = self.fetch(search_url, headers=self.headers)
doc = self.html(rsp.text)
videos = []
seen_ids = set()
# 搜索结果选择器
elements = doc.xpath('//li[contains(@class,"") and .//a[contains(@href,"/movie/")]]')
self.log(f"找到 {len(elements)} 个搜索结果元素")
for elem in elements:
video = self._extract_video_info(elem, is_search=True)
if video and video['vod_id'] not in seen_ids:
# 添加相关性检查
if self._is_relevant_search_result(video['vod_name'], key):
videos.append(video)
seen_ids.add(video['vod_id'])
self.log(f"✅ 相关视频: {video['vod_name']} (ID: {video['vod_id']})")
else:
self.log(f"❌ 过滤无关: {video['vod_name']} (搜索: {key})")
self.log(f"最终搜索结果: {len(videos)} 个视频")
return {'list': videos}
except Exception as e:
self.log(f"搜索出错: {str(e)}")
return {'list': []}
def detailContent(self, ids):
"""详情页面"""
try:
vid = ids[0]
detail_url = f"{self.host}/movie/{vid}.html"
rsp = self.fetch(detail_url, headers=self.headers)
doc = self.html(rsp.text)
video_info = self._get_detail(doc, vid)
return {'list': [video_info]} if video_info else {'list': []}
except Exception as e:
self.log(f"详情获取出错: {str(e)}")
return {'list': []}
def playerContent(self, flag, id, vipFlags):
"""播放链接"""
try:
self.log(f"获取播放链接: flag={flag}, id={id}")
# 解码Base64播放ID
try:
decoded_id = base64.b64decode(id).decode('utf-8')
self.log(f"解码播放ID: {decoded_id}")
except:
decoded_id = id
play_url = f"{self.host}/v_play/{id}.html"
# 返回播放页面URL让播放器处理
return {'parse': 1, 'playUrl': '', 'url': play_url}
except Exception as e:
self.log(f"播放链接获取出错: {str(e)}")
return {'parse': 1, 'playUrl': '', 'url': f"{self.host}/v_play/{id}.html"}
# ========== 辅助方法 ==========
def _get_filters(self):
"""获取筛选配置 - TVBox兼容版"""
base_filters = [
{
'key': 'area',
'name': '地区',
'value': [
{'n': '全部', 'v': ''},
{'n': '中国大陆', 'v': '中国大陆'},
{'n': '美国', 'v': '美国'},
{'n': '韩国', 'v': '韩国'},
{'n': '日本', 'v': '日本'},
{'n': '英国', 'v': '英国'},
{'n': '法国', 'v': '法国'},
{'n': '德国', 'v': '德国'},
{'n': '其他', 'v': '其他'}
]
},
{
'key': 'year',
'name': '年份',
'value': [
{'n': '全部', 'v': ''},
{'n': '2025', 'v': '2025'},
{'n': '2024', 'v': '2024'},
{'n': '2023', 'v': '2023'},
{'n': '2022', 'v': '2022'},
{'n': '2021', 'v': '2021'},
{'n': '2020', 'v': '2020'},
{'n': '2019', 'v': '2019'},
{'n': '2018', 'v': '2018'}
]
}
]
# 为每个分类提供筛选配置
filters = {}
category_ids = [
'movie_bt_tags/xiju', 'movie_bt_tags/aiqing', 'movie_bt_tags/adt',
'movie_bt_tags/at', 'movie_bt_tags/donghua', 'movie_bt_tags/qihuan',
'movie_bt_tags/xuanni', 'movie_bt_tags/kehuan', 'movie_bt_tags/juqing',
'movie_bt_tags/kongbu', 'meiju', 'gf'
]
for category_id in category_ids:
filters[category_id] = base_filters
return filters
def _build_url(self, tid, pg, extend):
"""构建URL - 支持筛选"""
try:
# 基础分类URL映射
if tid.startswith('movie_bt_tags/'):
url = f"{self.host}/{tid}"
elif tid == 'meiju':
url = f"{self.host}/meiju"
elif tid == 'gf':
url = f"{self.host}/gf"
else:
url = f"{self.host}/{tid}"
# 添加分页
if pg and pg != '1':
if '?' in url:
url += f"&paged={pg}"
else:
url += f"?paged={pg}"
return url
except Exception as e:
self.log(f"构建URL出错: {str(e)}")
return f"{self.host}/movie_bt_tags/xiju"
def _get_videos(self, doc, limit=None):
"""获取视频列表"""
try:
videos = []
seen_ids = set()
# 尝试多种选择器
selectors = [
'//li[.//a[contains(@href,"/movie/")]]',
'//div[contains(@class,"item")]//li[.//a[contains(@href,"/movie/")]]'
]
for selector in selectors:
elements = doc.xpath(selector)
if elements:
for elem in elements:
video = self._extract_video_info(elem)
if video and video['vod_id'] not in seen_ids:
videos.append(video)
seen_ids.add(video['vod_id'])
break
return videos[:limit] if limit and videos else videos
except Exception as e:
self.log(f"获取视频列表出错: {str(e)}")
return []
def _extract_video_info(self, element, is_search=False):
"""提取视频信息"""
try:
# 提取链接
links = element.xpath('.//a[contains(@href,"/movie/")]/@href')
if not links:
return None
link = links[0]
if link.startswith('/'):
link = self.host + link
vod_id = self.regStr(r'/movie/(\d+)\.html', link)
if not vod_id:
return None
# 提取标题
title_selectors = [
'.//h3/a/text()',
'.//h3/text()',
'.//a/@title',
'.//a/text()'
]
title = ''
for selector in title_selectors:
titles = element.xpath(selector)
for t in titles:
if t and t.strip() and len(t.strip()) > 1:
title = t.strip()
break
if title:
break
if not title:
return None
# 提取图片
pic = self._extract_image(element, is_search, vod_id)
# 提取备注
remarks = self._extract_remarks(element)
return {
'vod_id': vod_id,
'vod_name': title,
'vod_pic': pic,
'vod_remarks': remarks,
'vod_year': ''
}
except Exception as e:
self.log(f"提取视频信息出错: {str(e)}")
return None
def _extract_image(self, element, is_search=False, vod_id=None):
"""图片提取 - 处理懒加载"""
pic_selectors = [
'.//img/@data-original',
'.//img/@data-src',
'.//img/@src'
]
for selector in pic_selectors:
pics = element.xpath(selector)
for p in pics:
# 跳过懒加载占位符
if (p and not p.endswith('blank.gif') and
not p.startswith('data:image/') and 'base64' not in p):
if p.startswith('//'):
return 'https:' + p
elif p.startswith('/'):
return self.host + p
elif p.startswith('http'):
return p
# 搜索页面特殊处理:从详情页面获取
if is_search and vod_id:
return self._get_image_from_detail(vod_id)
return ''
def _extract_remarks(self, element):
"""提取备注信息"""
remarks_selectors = [
'.//span[contains(@class,"rating")]/text()',
'.//div[contains(@class,"rating")]/text()',
'.//span[contains(@class,"status")]/text()',
'.//div[contains(@class,"status")]/text()',
'.//span[contains(text(),"")]/text()',
'.//span[contains(text(),"1080p")]/text()',
'.//span[contains(text(),"HD")]/text()'
]
for selector in remarks_selectors:
remarks_list = element.xpath(selector)
for r in remarks_list:
if r and r.strip():
return r.strip()
return ''
def _get_image_from_detail(self, vod_id):
"""从详情页面获取图片"""
try:
detail_url = f"{self.host}/movie/{vod_id}.html"
rsp = self.fetch(detail_url, headers=self.headers)
doc = self.html(rsp.text)
# 详情页图片选择器
pic_selectors = [
'//img[contains(@class,"poster")]/@src',
'//div[contains(@class,"poster")]//img/@src',
'//img[contains(@alt,"")]/@src'
]
for selector in pic_selectors:
pics = doc.xpath(selector)
for p in pics:
if p and not p.endswith('blank.gif'):
if p.startswith('//'):
return 'https:' + p
elif p.startswith('/'):
return self.host + p
elif p.startswith('http'):
return p
except:
pass
return ''
def _is_relevant_search_result(self, title, search_key):
"""检查搜索结果是否与搜索关键词相关"""
if not title or not search_key:
return False
title_lower = title.lower()
search_key_lower = search_key.lower()
# 直接包含搜索关键词的肯定相关
if search_key_lower in title_lower:
return True
# 字符匹配
search_chars = set(search_key_lower.replace(' ', ''))
title_chars = set(title_lower.replace(' ', ''))
if len(search_chars) > 0:
match_ratio = len(search_chars & title_chars) / len(search_chars)
if match_ratio >= 0.6:
return True
# 短搜索词要求严格匹配
if len(search_key_lower) <= 2:
return search_key_lower in title_lower
return False
def _get_detail(self, doc, vod_id):
"""获取详情信息"""
try:
# 提取标题
title_selectors = [
'//h1/text()',
'//h2/text()',
'//title/text()'
]
title = ''
for selector in title_selectors:
titles = doc.xpath(selector)
for t in titles:
if t and t.strip():
title = t.strip()
break
if title:
break
# 提取图片
pic_selectors = [
'//img[contains(@class,"poster")]/@src',
'//div[contains(@class,"poster")]//img/@src',
'//img/@src'
]
pic = ''
for selector in pic_selectors:
pics = doc.xpath(selector)
for p in pics:
if p and not p.endswith('blank.gif'):
if p.startswith('//'):
pic = 'https:' + p
elif p.startswith('/'):
pic = self.host + p
elif p.startswith('http'):
pic = p
break
if pic:
break
# 提取描述
desc_selectors = [
'//div[contains(@class,"intro")]//text()',
'//div[contains(@class,"description")]//text()',
'//p[contains(@class,"desc")]//text()'
]
desc = ''
for selector in desc_selectors:
descs = doc.xpath(selector)
desc_parts = []
for d in descs:
if d and d.strip():
desc_parts.append(d.strip())
if desc_parts:
desc = ' '.join(desc_parts)
break
# 提取演员
actor_selectors = [
'//li[contains(text(),"主演")]/text()',
'//span[contains(text(),"主演")]/following-sibling::text()',
'//div[contains(@class,"actor")]//text()'
]
actor = ''
for selector in actor_selectors:
actors = doc.xpath(selector)
for a in actors:
if a and a.strip() and '主演' in a:
actor = a.strip().replace('主演:', '').replace('主演', '')
break
if actor:
break
# 提取导演
director_selectors = [
'//li[contains(text(),"导演")]/text()',
'//span[contains(text(),"导演")]/following-sibling::text()',
'//div[contains(@class,"director")]//text()'
]
director = ''
for selector in director_selectors:
directors = doc.xpath(selector)
for d in directors:
if d and d.strip() and '导演' in d:
director = d.strip().replace('导演:', '').replace('导演', '')
break
if director:
break
# 提取播放源
play_sources = self._parse_play_sources(doc, vod_id)
return {
'vod_id': vod_id,
'vod_name': title,
'vod_pic': pic,
'type_name': '',
'vod_year': '',
'vod_area': '',
'vod_remarks': '',
'vod_actor': actor,
'vod_director': director,
'vod_content': desc,
'vod_play_from': '$$$'.join([source['name'] for source in play_sources]),
'vod_play_url': '$$$'.join([source['episodes'] for source in play_sources])
}
except Exception as e:
self.log(f"获取详情出错: {str(e)}")
return None
def _parse_play_sources(self, doc, vod_id):
"""解析播放源"""
try:
play_sources = []
# 查找播放链接
episode_selectors = [
'//a[contains(@href,"/v_play/")]',
'//div[contains(@class,"play")]//a'
]
episodes = []
for selector in episode_selectors:
episode_elements = doc.xpath(selector)
if episode_elements:
for ep in episode_elements:
ep_title = ep.xpath('./text()')[0] if ep.xpath('./text()') else ''
ep_url = ep.xpath('./@href')[0] if ep.xpath('./@href') else ''
if ep_title and ep_url:
# 提取播放ID
play_id = self.regStr(r'/v_play/([^.]+)\.html', ep_url)
if play_id:
episodes.append(f"{ep_title.strip()}${play_id}")
break
if episodes:
play_sources.append({
'name': '默认播放',
'episodes': '#'.join(episodes)
})
else:
# 默认播放源
play_sources.append({
'name': '默认播放',
'episodes': f'第1集$bXZfMTM0NTY4LW5tXzE='
})
return play_sources
except Exception as e:
self.log(f"解析播放源出错: {str(e)}")
return [{'name': '默认播放', 'episodes': f'第1集$bXZfMTM0NTY4LW5tXzE='}]