2025-12-24 12:50:50 +00:00
parent e5aa61b161
commit c9a11e855f
5 changed files with 2065 additions and 0 deletions

jtxtv12/py/4K影院.py (new file, 473 lines added)

@@ -0,0 +1,473 @@
# coding=utf-8
# !/usr/bin/python
"""
Author: 丢丢喵 (Diudiumiao) 🚓 All content is collected from the internet and is intended for learning and exchange only. Copyright belongs to the original creators; if your rights have been infringed, please notify the author and the infringing content will be removed promptly.
====================Diudiumiao====================
"""
from Crypto.Util.Padding import unpad
from Crypto.Util.Padding import pad
from urllib.parse import unquote
from Crypto.Cipher import ARC4
from urllib.parse import quote
from base.spider import Spider
from Crypto.Cipher import AES
from datetime import datetime
from bs4 import BeautifulSoup
from base64 import b64decode
import urllib.request
import urllib.parse
import binascii
import requests
import base64
import json
import time
import sys
import re
import os
sys.path.append('..')
xurl = "https://www.4kvm.net"
headerx = {
'User-Agent': 'Mozilla/5.0 (Linux; U; Android 8.0.0; zh-cn; Mi Note 2 Build/OPR1.170623.032) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/61.0.3163.128 Mobile Safari/537.36 XiaoMi/MiuiBrowser/10.1.1'
}
class Spider(Spider):
global xurl
global headerx
def getName(self):
return "首页"
def init(self, extend):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def extract_middle_text(self, text, start_str, end_str, pl, start_index1: str = '', end_index2: str = ''):
if pl == 3:
plx = []
while True:
start_index = text.find(start_str)
if start_index == -1:
break
end_index = text.find(end_str, start_index + len(start_str))
if end_index == -1:
break
middle_text = text[start_index + len(start_str):end_index]
plx.append(middle_text)
text = text.replace(start_str + middle_text + end_str, '')
if len(plx) > 0:
purl = ''
for i in range(len(plx)):
matches = re.findall(start_index1, plx[i])
output = ""
for match in matches:
match3 = re.search(r'(?:^|[^0-9])(\d+)(?:[^0-9]|$)', match[1])
if match3:
number = match3.group(1)
else:
number = 0
if 'http' not in match[0]:
output += f"#{match[1]}${number}{xurl}{match[0]}"
else:
output += f"#{match[1]}${number}{match[0]}"
output = output[1:]
purl = purl + output + "$$$"
purl = purl[:-3]
return purl
else:
return ""
else:
start_index = text.find(start_str)
if start_index == -1:
return ""
end_index = text.find(end_str, start_index + len(start_str))
if end_index == -1:
return ""
if pl == 0:
middle_text = text[start_index + len(start_str):end_index]
return middle_text.replace("\\", "")
if pl == 1:
middle_text = text[start_index + len(start_str):end_index]
matches = re.findall(start_index1, middle_text)
if matches:
jg = ' '.join(matches)
return jg
if pl == 2:
middle_text = text[start_index + len(start_str):end_index]
matches = re.findall(start_index1, middle_text)
if matches:
new_list = [f'{item}' for item in matches]
jg = '$$$'.join(new_list)
return jg
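# Usage note (illustrative values, not taken from the live site): with pl=0 this helper
# returns the raw text between the first start_str/end_str pair, e.g.
# self.extract_middle_text('postid: 123,', 'postid:', ',', 0) -> ' 123'
# pl=1 and pl=2 additionally run re.findall(start_index1, ...) on that slice and join the
# matches with a space or with '$$$'; pl=3 repeatedly consumes start/end pairs and rebuilds
# a '#'-separated play list from the regex captures, joining groups with '$$$'.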
def homeContent(self, filter):
result = {"class": []}
detail = requests.get(url=xurl, headers=headerx)
detail.encoding = "utf-8"
res = detail.text
doc = BeautifulSoup(res, "lxml")
soups = doc.find_all('ul', class_="main-header")
for soup in soups:
vods = soup.find_all('li')
for vod in vods:
name = vod.text.strip()
if any(keyword in name for keyword in ["首页", "电视剧", "高分电影", "影片下载", "热门播放"]):
continue
id = vod.find('a')['href']
if 'http' not in id:
id = xurl + id
result["class"].append({"type_id": id, "type_name": name})
return result
def homeVideoContent(self):
videos = []
detail = requests.get(url=xurl, headers=headerx)
detail.encoding = "utf-8"
res = detail.text
doc = BeautifulSoup(res, "lxml")
soups = doc.find_all('article', class_="item movies")
for vod in soups:
name = vod.find('img')['alt']
ids = vod.find('div', class_="poster")
id = ids.find('a')['href']
pic = vod.find('img')['src']
remarks = vod.find('div', class_="rating")
remark = remarks.text.strip()
video = {
"vod_id": id,
"vod_name": name,
"vod_pic": pic,
"vod_remarks": remark
}
videos.append(video)
result = {'list': videos}
return result
def categoryContent(self, cid, pg, filter, ext):
result = {}
videos = []
if 'movies' not in cid:
if '@' in cid:
fenge = cid.split("@")
detail = requests.get(url=fenge[0], headers=headerx)
detail.encoding = "utf-8"
res = detail.text
doc = BeautifulSoup(res, "lxml")
soups = doc.find_all('div', class_="se-c")
for vod in soups:
name = vod.text.strip()
id = vod.find('a')['href']
pic = self.extract_middle_text(str(res), '<meta property="og:image" content="', '"', 0).replace('#038;', '')
remark = "推荐"
video = {
"vod_id": id,
"vod_name": name,
"vod_pic": pic,
"vod_remarks": remark
}
videos.append(video)
else:
if pg:
page = int(pg)
else:
page = 1
url = f'{cid}/page/{str(page)}'
detail = requests.get(url=url, headers=headerx)
detail.encoding = "utf-8"
res = detail.text
doc = BeautifulSoup(res, "lxml")
soups = doc.find_all('article', class_="item tvshows")
for vod in soups:
name = vod.find('img')['alt']
ids = vod.find('div', class_="poster")
id = ids.find('a')['href']
pic = vod.find('img')['src']
remarks = vod.find('div', class_="update")
remark = remarks.text.strip()
video = {
"vod_id": id + '@' + name,
"vod_name": name,
"vod_pic": pic,
"vod_tag": "folder",
"vod_remarks": remark
}
videos.append(video)
else:
if pg:
page = int(pg)
else:
page = 1
url = f'{cid}/page/{str(page)}'
detail = requests.get(url=url, headers=headerx)
detail.encoding = "utf-8"
res = detail.text
doc = BeautifulSoup(res, "lxml")
soups = doc.find_all('div', class_="animation-2")
for item in soups:
vods = item.find_all('article')
for vod in vods:
name = vod.find('img')['alt']
ids = vod.find('div', class_="poster")
id = ids.find('a')['href']
pic = vod.find('img')['src']
remarks = vod.find('div', class_="rating")
remark = remarks.text.strip()
video = {
"vod_id": id,
"vod_name": name,
"vod_pic": pic,
"vod_remarks": remark
}
videos.append(video)
if len(videos)<30:
pagecount=1
else:
pagecount = 9999
result = {'list': videos}
result['page'] = pg
result['pagecount'] = pagecount
result['total'] = 999
result['limit'] = len(videos)
return result
def detailContent(self, ids):
did = ids[0]
result = {}
videos = []
xianlu = ''
bofang = ''
if 'movies' not in did:
res = requests.get(url=did, headers=headerx)
res.encoding = "utf-8"
res = res.text
doc = BeautifulSoup(res, "lxml")
content = '剧情介绍📢' + self.extract_middle_text(res,'<meta name="description" content="','"', 0)
postid = self.extract_middle_text(res, 'postid:', ',', 0)
res1 = self.extract_middle_text(res,'videourls:[','],', 0)
data = json.loads(res1)
for vod in data:
name = str(vod['name'])
id = f"{vod['url']}@{postid}"
bofang = bofang + name + '$' + id + '#'
bofang = bofang[:-1]
xianlu = '4K影院'
else:
res = requests.get(url=did, headers=headerx)
res.encoding = "utf-8"
res = res.text
doc = BeautifulSoup(res, "lxml")
content = '剧情介绍📢' + self.extract_middle_text(res, '<meta name="description" content="', '"', 0)
bofang = self.extract_middle_text(res, "data-postid='", "'", 0)
xianlu = '4K影院'
videos.append({
"vod_id": did,
"vod_content": content,
"vod_play_from": xianlu,
"vod_play_url": bofang
})
result['list'] = videos
return result
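# Format note: vod_play_url assembled above follows the TVBox convention
# "episode_name$play_id#episode_name$play_id#...", where each play_id is
# "<episode_url>@<postid>" so playerContent() can split it on '@' again; for single
# movies only the page's data-postid is passed through.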
def playerContent(self, flag, id, vipFlags):
if '@' in id:
fenge = id.split("@")
url = f'{xurl}/artplayer?id={fenge[1]}&source=0&ep={fenge[0]}'
detail = requests.get(url=url, headers=headerx)
detail.encoding = "utf-8"
res = detail.text
expires = self.extract_middle_text(res, "expires: '", "'", 0)
client = self.extract_middle_text(res, "client: '", "'", 0)
nonce = self.extract_middle_text(res, "nonce: '", "'", 0)
token = self.extract_middle_text(res, "token: '", "'", 0)
source = self.extract_middle_text(res, "source: '", "'", 0)
payload = {
"expires": expires,
"client": client,
"nonce": nonce,
"token": token,
"source": source
}
response = requests.post(url=source, headers=headerx, json=payload)
response_data = json.loads(response.text)
url = response_data['url']
else:
url = f'{xurl}/artplayer?mvsource=0&id={id}&type=hls'
detail = requests.get(url=url, headers=headerx)
detail.encoding = "utf-8"
res = detail.text
expires = self.extract_middle_text(res, "expires: '", "'", 0)
client = self.extract_middle_text(res, "client: '", "'", 0)
nonce = self.extract_middle_text(res, "nonce: '", "'", 0)
token = self.extract_middle_text(res, "token: '", "'", 0)
source = self.extract_middle_text(res, "source: '", "'", 0)
payload = {
"expires": expires,
"client": client,
"nonce": nonce,
"token": token,
"source": source
}
response = requests.post(url=source, headers=headerx, json=payload)
response_data = json.loads(response.text)
url = response_data['url']
result = {}
result["parse"] = 0
result["playUrl"] = ''
result["url"] = url
result["header"] = headerx
return result
def searchContentPage(self, key, quick, pg):
result = {}
videos = []
url = f'{xurl}/xssearch?s={key}'
detail = requests.get(url=url, headers=headerx)
detail.encoding = "utf-8"
res = detail.text
doc = BeautifulSoup(res, "lxml")
soups = doc.find_all('div', class_="result-item")
for vod in soups:
ids = vod.find('div', class_="title")
id = ids.find('a')['href']
if 'movies' not in id:
name = vod.find('img')['alt']
pic = vod.find('img')['src']
remark = "推荐"
video = {
"vod_id": id + '@' + name,
"vod_name": name,
"vod_pic": pic,
"vod_tag": "folder",
"vod_remarks": remark
}
videos.append(video)
else:
name = vod.find('img')['alt']
pic = vod.find('img')['src']
remark = "推荐"
video = {
"vod_id": id,
"vod_name": name,
"vod_pic": pic,
"vod_remarks": remark
}
videos.append(video)
result['list'] = videos
result['page'] = pg
result['pagecount'] = 1
result['limit'] = 90
result['total'] = 999999
return result
def searchContent(self, key, quick, pg="1"):
return self.searchContentPage(key, quick, pg)
def localProxy(self, params):
if params['type'] == "m3u8":
return self.proxyM3u8(params)
elif params['type'] == "media":
return self.proxyMedia(params)
elif params['type'] == "ts":
return self.proxyTs(params)
return None
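# Minimal local smoke test (a sketch, assuming the TVBox base.spider module is importable
# and https://www.4kvm.net is reachable):
# if __name__ == '__main__':
#     sp = Spider()
#     sp.init('')
#     print(json.dumps(sp.homeContent(False), ensure_ascii=False))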

jtxtv12/py/LIBVIO.py (new file, 522 lines added)

@@ -0,0 +1,522 @@
# Search fix based on the work of 嗷呜 (Aowu)
# coding=utf-8
# !/usr/bin/python
import sys
sys.path.append('..')
from base.spider import Spider
import json
import re
try:
import ujson
except ImportError:
ujson = json
try:
from pyquery import PyQuery as pq
except ImportError:
pq = None
try:
from cachetools import TTLCache
except ImportError:
class TTLCache:
def __init__(self, maxsize=100, ttl=600):
self.cache = {}
self.maxsize = maxsize
def __contains__(self, key):
return key in self.cache
def __getitem__(self, key):
return self.cache[key]
def __setitem__(self, key, value):
if len(self.cache) >= self.maxsize:
first_key = next(iter(self.cache))
del self.cache[first_key]
self.cache[key] = value
def __len__(self):
return len(self.cache)
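# Note on the fallback above: unlike cachetools.TTLCache it ignores the ttl argument, so
# entries never expire; it only evicts the oldest key (plain FIFO via dict insertion order)
# once maxsize is reached.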
class Spider(Spider):
def __init__(self):
self.cache = TTLCache(maxsize=100, ttl=600)
def getName(self):
return "Libvio"
def init(self, extend=""):
print("============{0}============".format(extend))
if not hasattr(self, 'cache'):
self.cache = TTLCache(maxsize=100, ttl=600)
pass
def _fetch_with_cache(self, url, headers=None):
cache_key = f"{url}_{hash(str(headers))}"
if cache_key in self.cache:
return self.cache[cache_key]
try:
response = self.fetch(url, headers=headers or self.header)
except Exception as e:
print(f"Fetch failed for {url}: {e}")
response = None # Fallback to None on error
if response:
self.cache[cache_key] = response
return response
def _parse_html_fast(self, html_text):
if not html_text:
return None
if pq is not None:
try:
return pq(html_text)
except:
pass
return self.html(self.cleanText(html_text))
def homeContent(self, filter):
result = {}
cateManual = {"电影": "1", "电视剧": "2", "动漫": "4", "日韩剧": "15", "欧美剧": "16"}
classes = []
for k in cateManual:
classes.append({'type_name': k, 'type_id': cateManual[k]})
result['class'] = classes
if (filter):
result['filters'] = self._generate_filters()
return result
def homeVideoContent(self):
rsp = self._fetch_with_cache("https://www.libvio.site")
if not rsp:
return {'list': []}
doc = self._parse_html_fast(rsp.text)
videos = []
if pq is not None and hasattr(doc, '__call__'):
try:
thumb_links = doc('a.stui-vodlist__thumb.lazyload')
for i in range(thumb_links.length):
try:
thumb = thumb_links.eq(i)
href = thumb.attr('href')
if not href: continue
sid_match = re.search(r'/detail/(\d+)\.html', href)
if not sid_match: continue
sid = sid_match.group(1)
name = thumb.attr('title')
if not name: continue
pic = thumb.attr('data-original') or ""
mark = thumb.text().strip()
videos.append({"vod_id": sid, "vod_name": name.strip(), "vod_pic": pic, "vod_remarks": mark})
except Exception as e: continue
except: pass
if not videos:
try:
thumb_links = doc.xpath("//a[@class='stui-vodlist__thumb lazyload']")
for thumb in thumb_links:
try:
href = thumb.xpath("./@href")[0]
sid_match = re.search(r'/detail/(\d+)\.html', href)
if not sid_match: continue
sid = sid_match.group(1)
name = thumb.xpath("./@title")[0].strip()
if not name: continue
pic_list = thumb.xpath("./@data-original")
pic = pic_list[0] if pic_list else ""
mark_list = thumb.xpath("./text()")
mark = mark_list[0].strip() if mark_list else ""
videos.append({"vod_id": sid, "vod_name": name, "vod_pic": pic, "vod_remarks": mark})
except Exception as e: continue
except Exception as e: print(f"Homepage parse failed: {e}")
result = {'list': videos}
return result
def categoryContent(self, tid, pg, filter, extend):
result = {}
url = 'https://www.libvio.site/type/{0}-{1}.html'.format(tid, pg)
print(url)
rsp = self._fetch_with_cache(url)
if not rsp:
return result
doc = self._parse_html_fast(rsp.text)
videos = []
if pq is not None and hasattr(doc, '__call__'):
try:
thumb_links = doc('a.stui-vodlist__thumb.lazyload')
for i in range(thumb_links.length):
try:
thumb = thumb_links.eq(i)
href = thumb.attr('href')
if not href: continue
sid_match = re.search(r'/detail/(\d+)\.html', href)
if not sid_match: continue
sid = sid_match.group(1)
name = thumb.attr('title')
if not name: continue
pic = thumb.attr('data-original') or ""
mark = thumb.text().strip()
videos.append({"vod_id": sid, "vod_name": name.strip(), "vod_pic": pic, "vod_remarks": mark})
except Exception as e: continue
except: pass
if not videos:
try:
thumb_links = doc.xpath("//a[@class='stui-vodlist__thumb lazyload']")
for thumb in thumb_links:
try:
href = thumb.xpath("./@href")[0]
sid_match = re.search(r'/detail/(\d+)\.html', href)
if not sid_match: continue
sid = sid_match.group(1)
name = thumb.xpath("./@title")[0].strip()
if not name: continue
pic_list = thumb.xpath("./@data-original")
pic = pic_list[0] if pic_list else ""
mark_list = thumb.xpath("./text()")
mark = mark_list[0].strip() if mark_list else ""
videos.append({"vod_id": sid, "vod_name": name, "vod_pic": pic, "vod_remarks": mark})
except Exception as e: continue
except Exception as e: print(f"Category parse failed: {e}")
result['list'] = videos
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
return result
def detailContent(self, array):
tid = array[0]
url = 'https://www.libvio.site/detail/{0}.html'.format(tid)
rsp = self._fetch_with_cache(url)
if not rsp:
return {'list': []}
doc = self._parse_html_fast(rsp.text)
title = doc('h1').text().strip() or ""
pic = doc('img').attr('data-original') or doc('img').attr('src') or ""
detail = ""
try:
detail_content = doc('.detail-content').text().strip()
if detail_content: detail = detail_content
else:
detail_text = doc('*:contains("简介:")').text()
if detail_text and '简介:' in detail_text:
detail_part = detail_text.split('简介:')[1]
if '详情' in detail_part: detail_part = detail_part.replace('详情', '')
detail = detail_part.strip()
except: pass
douban = "0.0"
score_text = doc('.detail-info *:contains("")').text() or ""
score_match = re.search(r'(\d+\.?\d*)\s*分', score_text)
if score_match: douban = score_match.group(1)
vod = {"vod_id": tid, "vod_name": title, "vod_pic": pic, "type_name": "", "vod_year": "", "vod_area": "", "vod_remarks": "", "vod_actor": "", "vod_director": "", "vod_douban_score": douban, "vod_content": detail}
info_text = doc('p').text()
if '类型:' in info_text:
type_match = re.search(r'类型:([^/]+)', info_text)
if type_match: vod['type_name'] = type_match.group(1).strip()
if '主演:' in info_text:
actor_match = re.search(r'主演:([^/]+)', info_text)
if actor_match: vod['vod_actor'] = actor_match.group(1).strip()
if '导演:' in info_text:
director_match = re.search(r'导演:([^/]+)', info_text)
if director_match: vod['vod_director'] = director_match.group(1).strip()
playFrom = []
playList = []
# Improved play-line (source) extraction logic
vodlist_heads = doc('.stui-vodlist__head')
for i in range(vodlist_heads.length):
head = vodlist_heads.eq(i)
h3_elem = head.find('h3')
if h3_elem.length == 0:
continue
header_text = h3_elem.text().strip()
if not any(keyword in header_text for keyword in ['播放', '下载', 'BD5', 'UC', '夸克']):
continue
playFrom.append(header_text)
vodItems = []
# Collect all play links under the current play line
play_links = head.find('a[href*="/play/"]')
for j in range(play_links.length):
try:
link = play_links.eq(j)
href = link.attr('href')
name = link.text().strip()
if not href or not name:
continue
tId_match = re.search(r'/play/([^.]+)\.html', href)
if not tId_match:
continue
tId = tId_match.group(1)
vodItems.append(name + "$" + tId)
except:
continue
playList.append('#'.join(vodItems) if vodItems else "")
vod['vod_play_from'] = '$$$'.join(playFrom) if playFrom else ""
vod['vod_play_url'] = '$$$'.join(playList) if playList else ""
result = {'list': [vod]}
return result
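# Format note: vod_play_from / vod_play_url use the usual TVBox pairing - one source name
# per '$$$' group, and within each group episodes joined with '#', each as "name$play_id"
# where play_id is the slug captured from /play/<id>.html.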
def searchContent(self, key, quick, page=None):
url = 'https://www.libvio.site/index.php/ajax/suggest?mid=1&wd={0}'.format(key)
rsp = self._fetch_with_cache(url, headers=self.header)
if not rsp:
return {'list': []}
try:
jo = ujson.loads(rsp.text)
except:
jo = json.loads(rsp.text)
result = {}
jArray = []
if jo.get('total', 0) > 0:
for j in jo.get('list', []):
jArray.append({
"vod_id": j.get('id', ''),
"vod_name": j.get('name', ''),
"vod_pic": j.get('pic', ''),
"vod_remarks": ""
})
result = {'list': jArray}
return result
def _generate_filters(self):
years = [{"n": "全部", "v": ""}]
for year in range(2025, 1999, -1):
years.append({"n": str(year), "v": str(year)})
movie_filters = [
{
"key": "class", "name": "剧情",
"value": [
{"n": "全部", "v": ""}, {"n": "爱情", "v": "爱情"}, {"n": "恐怖", "v": "恐怖"},
{"n": "动作", "v": "动作"}, {"n": "科幻", "v": "科幻"}, {"n": "剧情", "v": "剧情"},
{"n": "战争", "v": "战争"}, {"n": "警匪", "v": "警匪"}, {"n": "犯罪", "v": "犯罪"},
{"n": "动画", "v": "动画"}, {"n": "奇幻", "v": "奇幻"}, {"n": "武侠", "v": "武侠"},
{"n": "冒险", "v": "冒险"}, {"n": "枪战", "v": "枪战"}, {"n": "悬疑", "v": "悬疑"},
{"n": "惊悚", "v": "惊悚"}, {"n": "经典", "v": "经典"}, {"n": "青春", "v": "青春"},
{"n": "文艺", "v": "文艺"}, {"n": "微电影", "v": "微电影"}, {"n": "古装", "v": "古装"},
{"n": "历史", "v": "历史"}, {"n": "运动", "v": "运动"}, {"n": "农村", "v": "农村"},
{"n": "儿童", "v": "儿童"}, {"n": "网络电影", "v": "网络电影"}
]
},
{
"key": "area", "name": "地区",
"value": [
{"n": "全部", "v": ""}, {"n": "大陆", "v": "中国大陆"}, {"n": "香港", "v": "中国香港"},
{"n": "台湾", "v": "中国台湾"}, {"n": "美国", "v": "美国"}, {"n": "法国", "v": "法国"},
{"n": "英国", "v": "英国"}, {"n": "日本", "v": "日本"}, {"n": "韩国", "v": "韩国"},
{"n": "德国", "v": "德国"}, {"n": "泰国", "v": "泰国"}, {"n": "印度", "v": "印度"},
{"n": "意大利", "v": "意大利"}, {"n": "西班牙", "v": "西班牙"},
{"n": "加拿大", "v": "加拿大"}, {"n": "其他", "v": "其他"}
]
},
{"key": "year", "name": "年份", "value": years}
]
tv_filters = [
{
"key": "class", "name": "剧情",
"value": [
{"n": "全部", "v": ""}, {"n": "战争", "v": "战争"}, {"n": "青春偶像", "v": "青春偶像"},
{"n": "喜剧", "v": "喜剧"}, {"n": "家庭", "v": "家庭"}, {"n": "犯罪", "v": "犯罪"},
{"n": "动作", "v": "动作"}, {"n": "奇幻", "v": "奇幻"}, {"n": "剧情", "v": "剧情"},
{"n": "历史", "v": "历史"}, {"n": "经典", "v": "经典"}, {"n": "乡村", "v": "乡村"},
{"n": "情景", "v": "情景"}, {"n": "商战", "v": "商战"}, {"n": "网剧", "v": "网剧"},
{"n": "其他", "v": "其他"}
]
},
{
"key": "area", "name": "地区",
"value": [
{"n": "全部", "v": ""}, {"n": "大陆", "v": "中国大陆"}, {"n": "台湾", "v": "中国台湾"},
{"n": "香港", "v": "中国香港"}, {"n": "韩国", "v": "韩国"}, {"n": "日本", "v": "日本"},
{"n": "美国", "v": "美国"}, {"n": "泰国", "v": "泰国"}, {"n": "英国", "v": "英国"},
{"n": "新加坡", "v": "新加坡"}, {"n": "其他", "v": "其他"}
]
},
{"key": "year", "name": "年份", "value": years}
]
anime_filters = [
{
"key": "class", "name": "剧情",
"value": [
{"n": "全部", "v": ""}, {"n": "科幻", "v": "科幻"}, {"n": "热血", "v": "热血"},
{"n": "推理", "v": "推理"}, {"n": "搞笑", "v": "搞笑"}, {"n": "冒险", "v": "冒险"},
{"n": "萝莉", "v": "萝莉"}, {"n": "校园", "v": "校园"}, {"n": "动作", "v": "动作"},
{"n": "机战", "v": "机战"}, {"n": "运动", "v": "运动"}, {"n": "战争", "v": "战争"},
{"n": "少年", "v": "少年"}, {"n": "少女", "v": "少女"}, {"n": "社会", "v": "社会"},
{"n": "原创", "v": "原创"}, {"n": "亲子", "v": "亲子"}, {"n": "益智", "v": "益智"},
{"n": "励志", "v": "励志"}, {"n": "其他", "v": "其他"}
]
},
{
"key": "area", "name": "地区",
"value": [
{"n": "全部", "v": ""}, {"n": "中国", "v": "中国"}, {"n": "日本", "v": "日本"},
{"n": "欧美", "v": "欧美"}, {"n": "其他", "v": "其他"}
]
},
{"key": "year", "name": "年份", "value": years}
]
asian_filters = [
{
"key": "class", "name": "剧情",
"value": [
{"n": "全部", "v": ""}, {"n": "剧情", "v": "剧情"}, {"n": "喜剧", "v": "喜剧"},
{"n": "爱情", "v": "爱情"}, {"n": "动作", "v": "动作"}, {"n": "悬疑", "v": "悬疑"},
{"n": "惊悚", "v": "惊悚"}, {"n": "恐怖", "v": "恐怖"}, {"n": "犯罪", "v": "犯罪"}
]
},
{
"key": "area", "name": "地区",
"value": [
{"n": "全部", "v": ""}, {"n": "韩国", "v": "韩国"}, {"n": "日本", "v": "日本"},
{"n": "泰国", "v": "泰国"}
]
},
{"key": "year", "name": "年份", "value": years[:25]}
]
western_filters = [
{
"key": "class", "name": "剧情",
"value": [
{"n": "全部", "v": ""}, {"n": "剧情", "v": "剧情"}, {"n": "喜剧", "v": "喜剧"},
{"n": "爱情", "v": "爱情"}, {"n": "动作", "v": "动作"}, {"n": "科幻", "v": "科幻"},
{"n": "悬疑", "v": "悬疑"}, {"n": "惊悚", "v": "惊悚"}, {"n": "恐怖", "v": "恐怖"},
{"n": "犯罪", "v": "犯罪"}
]
},
{
"key": "area", "name": "地区",
"value": [
{"n": "全部", "v": ""}, {"n": "美国", "v": "美国"}, {"n": "英国", "v": "英国"},
{"n": "加拿大", "v": "加拿大"}, {"n": "其他", "v": "其他"}
]
},
{"key": "year", "name": "年份", "value": years[:25]}
]
return {
"1": movie_filters, # 电影
"2": tv_filters, # 电视剧
"4": anime_filters, # 动漫
"15": asian_filters, # 日韩剧
"16": western_filters # 欧美剧
}
header = {"Referer": "https://www.libvio.site", "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36"}
def playerContent(self, flag, id, vipFlags):
# If the id is already a push:// link, return it as-is
if id.startswith('push://'):
return {"parse": 0, "playUrl": "", "url": id, "header": ""}
result = {}
url = 'https://www.libvio.site/play/{0}.html'.format(id)
try:
rsp = self._fetch_with_cache(url, headers=self.header)
if not rsp:
return {"parse": 1, "playUrl": "", "url": url, "header": ujson.dumps(self.header)}
return self._handle_cloud_drive(url, rsp, id)
except Exception as e:
print(f"Player parse error: {e}")
return {"parse": 1, "playUrl": "", "url": url, "header": ujson.dumps(self.header)}
def _handle_cloud_drive(self, url, rsp, id):
try:
page_text = rsp.text
# First try to extract a cloud-drive link from the JavaScript player variables
script_pattern = r'var player_[^=]*=\s*({[^}]+})'
matches = re.findall(script_pattern, page_text)
for match in matches:
try:
player_data = ujson.loads(match)
from_value = player_data.get('from', '')
url_value = player_data.get('url', '')
if from_value == 'kuake' and url_value:
# Quark drive (kuake)
drive_url = url_value.replace('\\/', '/')
return {"parse": 0, "playUrl": "", "url": f"push://{drive_url}", "header": ""}
elif from_value == 'uc' and url_value:
# UC drive
drive_url = url_value.replace('\\/', '/')
return {"parse": 0, "playUrl": "", "url": f"push://{drive_url}", "header": ""}
except:
continue
except Exception as e:
print(f"Cloud drive parse error: {e}")
# If all cloud-drive parsing fails, fall back to the BD5 play source
return self._handle_bd5_player(url, rsp, id)
def _handle_bd5_player(self, url, rsp, id):
try:
doc = self._parse_html_fast(rsp.text)
page_text = rsp.text
api_match = re.search(r'https://www\.libvio\.site/vid/plyr/vr2\.php\?url=([^&"\s]+)', page_text)
if api_match:
return {"parse": 0, "playUrl": "", "url": api_match.group(1), "header": ujson.dumps({"User-Agent": self.header["User-Agent"], "Referer": "https://www.libvio.site/"})}
iframe_src = doc('iframe').attr('src')
if iframe_src:
try:
iframe_content = self._fetch_with_cache(iframe_src, headers=self.header)
if not iframe_content: raise Exception("Iframe fetch failed")
video_match = re.search(r'https://[^"\s]+\.mp4', iframe_content.text)
if video_match: return {"parse": 0, "playUrl": "", "url": video_match.group(0), "header": ujson.dumps({"User-Agent": self.header["User-Agent"], "Referer": "https://www.libvio.site/"})}
except Exception as e: print(f"iframe视频解析失败: {e}")
script_match = re.search(r'var player_[^=]*=\s*({[^}]+})', page_text)
if script_match:
try:
jo = ujson.loads(script_match.group(1))
if jo:
nid = str(jo.get('nid', ''))
player_from = jo.get('from', '')
if player_from:
scriptUrl = f'https://www.libvio.site/static/player/{player_from}.js'
scriptRsp = self._fetch_with_cache(scriptUrl)
if not scriptRsp: raise Exception("Script fetch failed")
parse_match = re.search(r'src="([^"]+url=)', scriptRsp.text)
if parse_match:
parseUrl = parse_match.group(1)
path = f"{jo.get('url', '')}&next={jo.get('link_next', '')}&id={jo.get('id', '')}&nid={nid}"
parseRsp = self._fetch_with_cache(parseUrl + path, headers=self.header)
if not parseRsp: raise Exception("Parse fetch failed")
url_match = re.search(r"urls\s*=\s*'([^']+)'", parseRsp.text)
if url_match: return {"parse": 0, "playUrl": "", "url": url_match.group(1), "header": ""}
except Exception as e: print(f"JavaScript播放器解析失败: {e}")
except Exception as e: print(f"BD5播放源解析错误: {e}")
return {"parse": 1, "playUrl": "", "url": url, "header": ujson.dumps(self.header)}
def isVideoFormat(self, url):
return False
def manualVideoCheck(self):
pass
def localProxy(self, param):
action = b''
try:
header_dict = json.loads(param.get('header', '{}')) if param.get('header') else {}
resp = self.fetch(param['url'], headers=header_dict)
action = resp.content
except Exception as e:
print(f"Local proxy error: {e}")
return [200, "video/MP2T", action, param.get('header', '')]
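# Minimal local check of the ajax search (a sketch, assuming base.spider is importable and
# https://www.libvio.site is reachable; the search keyword is only an example):
# if __name__ == '__main__':
#     sp = Spider()
#     sp.init()
#     for v in sp.searchContent('误杀', False)['list'][:3]:
#         print(v['vod_id'], v['vod_name'])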

jtxtv12/py/PHP影视.py (new file, 320 lines added)

@@ -0,0 +1,320 @@
# coding=utf-8
# !/usr/bin/python
"""
Author: 丢丢喵 (Diudiumiao) 🚓 All content is collected from the internet and is intended for learning and exchange only. Copyright belongs to the original creators; if your rights have been infringed, please notify the author and the infringing content will be removed promptly.
====================Diudiumiao====================
"""
from Crypto.Util.Padding import unpad
from Crypto.Util.Padding import pad
from urllib.parse import unquote
from Crypto.Cipher import ARC4
from urllib.parse import quote
from base.spider import Spider
from Crypto.Cipher import AES
from datetime import datetime
from bs4 import BeautifulSoup
from base64 import b64decode
import urllib.request
import urllib.parse
import binascii
import requests
import base64
import json
import time
import sys
import re
import os
sys.path.append('..')
xurl = "http://ccczb.top"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.87 Safari/537.36'
}
headerx = {
'User-Agent': 'Mozilla/5.0 (Linux; U; Android 8.0.0; zh-cn; Mi Note 2 Build/OPR1.170623.032) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/61.0.3163.128 Mobile Safari/537.36 XiaoMi/MiuiBrowser/10.1.1',
'Referer': 'http://ccczb.top/ys/index.php'
}
class Spider(Spider):
def getName(self):
return "丢丢喵"
def init(self, extend):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def extract_middle_text(self, text, start_str, end_str, pl, start_index1: str = '', end_index2: str = ''):
if pl == 3:
plx = []
while True:
start_index = text.find(start_str)
if start_index == -1:
break
end_index = text.find(end_str, start_index + len(start_str))
if end_index == -1:
break
middle_text = text[start_index + len(start_str):end_index]
plx.append(middle_text)
text = text.replace(start_str + middle_text + end_str, '')
if len(plx) > 0:
purl = ''
for i in range(len(plx)):
matches = re.findall(start_index1, plx[i])
output = ""
for match in matches:
match3 = re.search(r'(?:^|[^0-9])(\d+)(?:[^0-9]|$)', match[1])
if match3:
number = match3.group(1)
else:
number = 0
if 'http' not in match[0]:
output += f"#{match[1]}${number}{xurl}{match[0]}"
else:
output += f"#{match[1]}${number}{match[0]}"
output = output[1:]
purl = purl + output + "$$$"
purl = purl[:-3]
return purl
else:
return ""
else:
start_index = text.find(start_str)
if start_index == -1:
return ""
end_index = text.find(end_str, start_index + len(start_str))
if end_index == -1:
return ""
if pl == 0:
middle_text = text[start_index + len(start_str):end_index]
return middle_text.replace("\\", "")
if pl == 1:
middle_text = text[start_index + len(start_str):end_index]
matches = re.findall(start_index1, middle_text)
if matches:
jg = ' '.join(matches)
return jg
if pl == 2:
middle_text = text[start_index + len(start_str):end_index]
matches = re.findall(start_index1, middle_text)
if matches:
new_list = [f'{item}' for item in matches]
jg = '$$$'.join(new_list)
return jg
def _fetch_categories_data(self):
url = f"{xurl}/ys/simple_api.php?action=categories"
max_retries = 3
retries = 0
while retries < max_retries:
try:
detail = requests.get(url=url, headers=headerx)
detail.encoding = 'utf-8-sig'
return detail.json()
except requests.exceptions.JSONDecodeError as e:
retries += 1
if retries < max_retries:
time.sleep(1)
else:
raise
except Exception as e:
raise
def _process_category(self, vod):
name = vod['type_name']
id = vod['type_id']
return {"type_id": id, "type_name": name}
def _build_home_content_result(self, data):
result = {"class": []}
for vod in data['data']:
category = self._process_category(vod)
result["class"].append(category)
return result
def homeContent(self, filter):
data = self._fetch_categories_data()
return self._build_home_content_result(data)
def _fetch_movie_data(self):
detail = requests.get(url=f"{xurl}/ys/simple_api.php?action=search&keyword=%E7%94%B5%E5%BD%B1&page=1",headers=headerx)
detail.encoding = "utf-8-sig"
return detail.json()
def homeVideoContent(self):
data = self._fetch_movie_data()
videos = self._process_videos_list(data)
result = {'list': videos}
return result
def _fetch_category_data(self, cid, page):
detail = requests.get(url=f"{xurl}/ys/simple_api.php?action=category&cid={cid}&page={str(page)}",headers=headerx)
detail.encoding = "utf-8-sig"
return detail.json()
def _build_category_result(self, videos, pg):
result = {'list': videos}
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
return result
def categoryContent(self, cid, pg, filter, ext):
page = int(pg) if pg else 1
data = self._fetch_category_data(cid, page)
videos = self._process_videos_list(data)
return self._build_category_result(videos, pg)
def _fetch_detail_data(self, did):
detail = requests.get(url=f"{xurl}/ys/simple_api.php?action=detail&vid={did}", headers=headerx)
detail.encoding = "utf-8-sig"
return detail.json()
def _fetch_didiu_config(self):
url = 'https://fs-im-kefu.7moor-fs1.com/ly/4d2c3f00-7d4c-11e5-af15-41bf63ae4ea0/1732697392729/didiu.txt'
response = requests.get(url)
response.encoding = 'utf-8'
return response.text
def _extract_video_info(self, data, code):
name = self.extract_middle_text(code, "s1='", "'", 0)
Jumps = self.extract_middle_text(code, "s2='", "'", 0)
content = '😸丢丢为您介绍剧情📢' + data.get('data', {}).get('vod_content', '暂无')
director = data.get('data', {}).get('vod_director', '暂无')
actor = data.get('data', {}).get('vod_actor', '暂无')
year = data.get('data', {}).get('vod_year', '暂无')
area = data.get('data', {}).get('vod_area', '暂无')
if name not in content:
bofang = Jumps
xianlu = '1'
else:
bofang = data['data']['vod_play_url']
xianlu = data['data']['vod_play_from']
return {
"director": director,
"actor": actor,
"year": year,
"area": area,
"content": content,
"bofang": bofang,
"xianlu": xianlu
}
def _build_detail_result(self, did, video_info):
videos = []
videos.append({
"vod_id": did,
"vod_director": video_info["director"],
"vod_actor": video_info["actor"],
"vod_year": video_info["year"],
"vod_area": video_info["area"],
"vod_content": video_info["content"],
"vod_play_from": video_info["xianlu"],
"vod_play_url": video_info["bofang"]
})
result = {'list': videos}
return result
def detailContent(self, ids):
did = ids[0]
data = self._fetch_detail_data(did)
code = self._fetch_didiu_config()
video_info = self._extract_video_info(data, code)
return self._build_detail_result(did, video_info)
def _process_videos_list(self, data):
videos = []
for vod in data['data']['list']:
name = vod['vod_name']
id = vod['vod_id']
pic = vod['vod_pic']
remark = vod.get('vod_remarks', '暂无备注')
video = {
"vod_id": id,
"vod_name": name,
"vod_pic": pic,
"vod_remarks": remark
}
videos.append(video)
return videos
def _fetch_player_data(self, id):
detail = requests.get(url=f"{xurl}/ys/simple_api.php?action=play&lid={id}", headers=headerx)
detail.encoding = "utf-8-sig"
return detail.json()
def _extract_play_url(self, data):
return data['data']['url']
def _build_player_result(self, url):
result = {}
result["parse"] = 0
result["playUrl"] = ''
result["url"] = url
result["header"] = headers
return result
def playerContent(self, flag, id, vipFlags):
data = self._fetch_player_data(id)
url = self._extract_play_url(data)
return self._build_player_result(url)
def _fetch_search_data(self, key, page):
detail = requests.get(url=f"{xurl}/ys/simple_api.php?action=search&keyword={key}&page={str(page)}",headers=headerx)
detail.encoding = "utf-8-sig"
return detail.json()
def _build_search_result(self, videos, pg):
result = {}
result['list'] = videos
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
return result
def searchContentPage(self, key, quick, pg):
page = int(pg) if pg else 1
data = self._fetch_search_data(key, page)
videos = self._process_videos_list(data)
return self._build_search_result(videos, pg)
def searchContent(self, key, quick, pg="1"):
return self.searchContentPage(key, quick, pg)
def localProxy(self, params):
if params['type'] == "m3u8":
return self.proxyM3u8(params)
elif params['type'] == "media":
return self.proxyMedia(params)
elif params['type'] == "ts":
return self.proxyTs(params)
return None
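# Endpoint summary for the simple_api used above (all GET against {xurl}/ys/simple_api.php,
# decoded as utf-8-sig JSON):
#   action=categories                      -> home classes
#   action=category&cid=<id>&page=<n>      -> category listing
#   action=detail&vid=<id>                 -> detail data
#   action=play&lid=<id>                   -> {'data': {'url': ...}} play url
#   action=search&keyword=<kw>&page=<n>    -> search listing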

jtxtv12/py/两个BT.py (new file, 599 lines added)

@@ -0,0 +1,599 @@
# coding=utf-8
# !/usr/bin/python
import sys
sys.path.append('..')
from base.spider import Spider
import json
import time
import urllib.parse
import re
import base64
class Spider(Spider):
def getName(self):
return "两个BT"
def init(self, extend=""):
self.host = "https://www.bttwoo.com"
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Connection': 'keep-alive',
'Referer': self.host
}
self.log(f"两个BT爬虫初始化完成主站: {self.host}")
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def homeContent(self, filter):
"""Homepage content - standard TVBox implementation"""
result = {}
# 1. Define categories, based on the actual site structure
classes = [
{'type_id': 'movie_bt_tags/xiju', 'type_name': '喜剧'},
{'type_id': 'movie_bt_tags/aiqing', 'type_name': '爱情'},
{'type_id': 'movie_bt_tags/adt', 'type_name': '冒险'},
{'type_id': 'movie_bt_tags/at', 'type_name': '动作'},
{'type_id': 'movie_bt_tags/donghua', 'type_name': '动画'},
{'type_id': 'movie_bt_tags/qihuan', 'type_name': '奇幻'},
{'type_id': 'movie_bt_tags/xuanni', 'type_name': '悬疑'},
{'type_id': 'movie_bt_tags/kehuan', 'type_name': '科幻'},
{'type_id': 'movie_bt_tags/juqing', 'type_name': '剧情'},
{'type_id': 'movie_bt_tags/kongbu', 'type_name': '恐怖'},
{'type_id': 'meiju', 'type_name': '美剧'},
{'type_id': 'gf', 'type_name': '高分电影'}
]
result['class'] = classes
# 2. Add the filter configuration
result['filters'] = self._get_filters()
# 3. Fetch homepage recommendations
try:
rsp = self.fetch(self.host, headers=self.headers)
doc = self.html(rsp.text)
videos = self._get_videos(doc, limit=50)
result['list'] = videos
except Exception as e:
self.log(f"首页获取出错: {str(e)}")
result['list'] = []
return result
def homeVideoContent(self):
"""Compatibility method - provides the category definitions"""
return {
'class': [
{'type_id': 'movie_bt_tags/xiju', 'type_name': '喜剧'},
{'type_id': 'movie_bt_tags/aiqing', 'type_name': '爱情'},
{'type_id': 'movie_bt_tags/adt', 'type_name': '冒险'},
{'type_id': 'movie_bt_tags/at', 'type_name': '动作'},
{'type_id': 'movie_bt_tags/donghua', 'type_name': '动画'},
{'type_id': 'movie_bt_tags/qihuan', 'type_name': '奇幻'},
{'type_id': 'movie_bt_tags/xuanni', 'type_name': '悬疑'},
{'type_id': 'movie_bt_tags/kehuan', 'type_name': '科幻'},
{'type_id': 'movie_bt_tags/juqing', 'type_name': '剧情'},
{'type_id': 'movie_bt_tags/kongbu', 'type_name': '恐怖'},
{'type_id': 'meiju', 'type_name': '美剧'},
{'type_id': 'gf', 'type_name': '高分电影'}
],
'filters': self._get_filters()
}
def categoryContent(self, tid, pg, filter, extend):
"""Category content - supports filtering"""
try:
# 合并filter和extend参数
if filter and isinstance(filter, dict):
if not extend:
extend = {}
extend.update(filter)
self.log(f"分类请求: tid={tid}, pg={pg}, extend={extend}")
url = self._build_url(tid, pg, extend)
if not url:
return {'list': []}
rsp = self.fetch(url, headers=self.headers)
doc = self.html(rsp.text)
videos = self._get_videos(doc, limit=20)
return {
'list': videos,
'page': int(pg),
'pagecount': 999,
'limit': 20,
'total': 19980
}
except Exception as e:
self.log(f"分类内容获取出错: {str(e)}")
return {'list': []}
def searchContent(self, key, quick, pg="1"):
"""Search - with smart relevance filtering"""
try:
search_url = f"{self.host}/xssssearch?q={urllib.parse.quote(key)}"
if pg and pg != "1":
search_url += f"&p={pg}"
self.log(f"搜索URL: {search_url}")
rsp = self.fetch(search_url, headers=self.headers)
doc = self.html(rsp.text)
videos = []
seen_ids = set()
# 搜索结果选择器
elements = doc.xpath('//li[contains(@class,"") and .//a[contains(@href,"/movie/")]]')
self.log(f"找到 {len(elements)} 个搜索结果元素")
for elem in elements:
video = self._extract_video_info(elem, is_search=True)
if video and video['vod_id'] not in seen_ids:
# 添加相关性检查
if self._is_relevant_search_result(video['vod_name'], key):
videos.append(video)
seen_ids.add(video['vod_id'])
self.log(f"✅ 相关视频: {video['vod_name']} (ID: {video['vod_id']})")
else:
self.log(f"❌ 过滤无关: {video['vod_name']} (搜索: {key})")
self.log(f"最终搜索结果: {len(videos)} 个视频")
return {'list': videos}
except Exception as e:
self.log(f"搜索出错: {str(e)}")
return {'list': []}
def detailContent(self, ids):
"""Detail page"""
try:
vid = ids[0]
detail_url = f"{self.host}/movie/{vid}.html"
rsp = self.fetch(detail_url, headers=self.headers)
doc = self.html(rsp.text)
video_info = self._get_detail(doc, vid)
return {'list': [video_info]} if video_info else {'list': []}
except Exception as e:
self.log(f"详情获取出错: {str(e)}")
return {'list': []}
def playerContent(self, flag, id, vipFlags):
"""Play link"""
try:
self.log(f"获取播放链接: flag={flag}, id={id}")
# Decode the Base64-encoded play id
try:
decoded_id = base64.b64decode(id).decode('utf-8')
self.log(f"解码播放ID: {decoded_id}")
except:
decoded_id = id
play_url = f"{self.host}/v_play/{id}.html"
# Return the play page URL and let the player's parser handle it
return {'parse': 1, 'playUrl': '', 'url': play_url}
except Exception as e:
self.log(f"播放链接获取出错: {str(e)}")
return {'parse': 1, 'playUrl': '', 'url': f"{self.host}/v_play/{id}.html"}
# ========== Helper methods ==========
def _get_filters(self):
"""Filter configuration - TVBox-compatible"""
base_filters = [
{
'key': 'area',
'name': '地区',
'value': [
{'n': '全部', 'v': ''},
{'n': '中国大陆', 'v': '中国大陆'},
{'n': '美国', 'v': '美国'},
{'n': '韩国', 'v': '韩国'},
{'n': '日本', 'v': '日本'},
{'n': '英国', 'v': '英国'},
{'n': '法国', 'v': '法国'},
{'n': '德国', 'v': '德国'},
{'n': '其他', 'v': '其他'}
]
},
{
'key': 'year',
'name': '年份',
'value': [
{'n': '全部', 'v': ''},
{'n': '2025', 'v': '2025'},
{'n': '2024', 'v': '2024'},
{'n': '2023', 'v': '2023'},
{'n': '2022', 'v': '2022'},
{'n': '2021', 'v': '2021'},
{'n': '2020', 'v': '2020'},
{'n': '2019', 'v': '2019'},
{'n': '2018', 'v': '2018'}
]
}
]
# 为每个分类提供筛选配置
filters = {}
category_ids = [
'movie_bt_tags/xiju', 'movie_bt_tags/aiqing', 'movie_bt_tags/adt',
'movie_bt_tags/at', 'movie_bt_tags/donghua', 'movie_bt_tags/qihuan',
'movie_bt_tags/xuanni', 'movie_bt_tags/kehuan', 'movie_bt_tags/juqing',
'movie_bt_tags/kongbu', 'meiju', 'gf'
]
for category_id in category_ids:
filters[category_id] = base_filters
return filters
def _build_url(self, tid, pg, extend):
"""Build the request URL - supports filters"""
try:
# 基础分类URL映射
if tid.startswith('movie_bt_tags/'):
url = f"{self.host}/{tid}"
elif tid == 'meiju':
url = f"{self.host}/meiju"
elif tid == 'gf':
url = f"{self.host}/gf"
else:
url = f"{self.host}/{tid}"
# 添加分页
if pg and pg != '1':
if '?' in url:
url += f"&paged={pg}"
else:
url += f"?paged={pg}"
return url
except Exception as e:
self.log(f"构建URL出错: {str(e)}")
return f"{self.host}/movie_bt_tags/xiju"
def _get_videos(self, doc, limit=None):
"""Collect the video list"""
try:
videos = []
seen_ids = set()
# 尝试多种选择器
selectors = [
'//li[.//a[contains(@href,"/movie/")]]',
'//div[contains(@class,"item")]//li[.//a[contains(@href,"/movie/")]]'
]
for selector in selectors:
elements = doc.xpath(selector)
if elements:
for elem in elements:
video = self._extract_video_info(elem)
if video and video['vod_id'] not in seen_ids:
videos.append(video)
seen_ids.add(video['vod_id'])
break
return videos[:limit] if limit and videos else videos
except Exception as e:
self.log(f"获取视频列表出错: {str(e)}")
return []
def _extract_video_info(self, element, is_search=False):
"""Extract video info from a list element"""
try:
# 提取链接
links = element.xpath('.//a[contains(@href,"/movie/")]/@href')
if not links:
return None
link = links[0]
if link.startswith('/'):
link = self.host + link
vod_id = self.regStr(r'/movie/(\d+)\.html', link)
if not vod_id:
return None
# 提取标题
title_selectors = [
'.//h3/a/text()',
'.//h3/text()',
'.//a/@title',
'.//a/text()'
]
title = ''
for selector in title_selectors:
titles = element.xpath(selector)
for t in titles:
if t and t.strip() and len(t.strip()) > 1:
title = t.strip()
break
if title:
break
if not title:
return None
# 提取图片
pic = self._extract_image(element, is_search, vod_id)
# 提取备注
remarks = self._extract_remarks(element)
return {
'vod_id': vod_id,
'vod_name': title,
'vod_pic': pic,
'vod_remarks': remarks,
'vod_year': ''
}
except Exception as e:
self.log(f"提取视频信息出错: {str(e)}")
return None
def _extract_image(self, element, is_search=False, vod_id=None):
"""Image extraction - handles lazy loading"""
pic_selectors = [
'.//img/@data-original',
'.//img/@data-src',
'.//img/@src'
]
for selector in pic_selectors:
pics = element.xpath(selector)
for p in pics:
# Skip lazy-load placeholders
if (p and not p.endswith('blank.gif') and
not p.startswith('data:image/') and 'base64' not in p):
if p.startswith('//'):
return 'https:' + p
elif p.startswith('/'):
return self.host + p
elif p.startswith('http'):
return p
# Special case for search pages: fall back to the detail page for the image
if is_search and vod_id:
return self._get_image_from_detail(vod_id)
return ''
def _extract_remarks(self, element):
"""Extract the remarks text"""
remarks_selectors = [
'.//span[contains(@class,"rating")]/text()',
'.//div[contains(@class,"rating")]/text()',
'.//span[contains(@class,"status")]/text()',
'.//div[contains(@class,"status")]/text()',
'.//span[contains(text(),"")]/text()',
'.//span[contains(text(),"1080p")]/text()',
'.//span[contains(text(),"HD")]/text()'
]
for selector in remarks_selectors:
remarks_list = element.xpath(selector)
for r in remarks_list:
if r and r.strip():
return r.strip()
return ''
def _get_image_from_detail(self, vod_id):
"""Fetch the poster image from the detail page"""
try:
detail_url = f"{self.host}/movie/{vod_id}.html"
rsp = self.fetch(detail_url, headers=self.headers)
doc = self.html(rsp.text)
# 详情页图片选择器
pic_selectors = [
'//img[contains(@class,"poster")]/@src',
'//div[contains(@class,"poster")]//img/@src',
'//img[contains(@alt,"")]/@src'
]
for selector in pic_selectors:
pics = doc.xpath(selector)
for p in pics:
if p and not p.endswith('blank.gif'):
if p.startswith('//'):
return 'https:' + p
elif p.startswith('/'):
return self.host + p
elif p.startswith('http'):
return p
except:
pass
return ''
def _is_relevant_search_result(self, title, search_key):
"""Check whether a search result is relevant to the query"""
if not title or not search_key:
return False
title_lower = title.lower()
search_key_lower = search_key.lower()
# A title that directly contains the query is definitely relevant
if search_key_lower in title_lower:
return True
# 字符匹配
search_chars = set(search_key_lower.replace(' ', ''))
title_chars = set(title_lower.replace(' ', ''))
if len(search_chars) > 0:
match_ratio = len(search_chars & title_chars) / len(search_chars)
if match_ratio >= 0.6:
return True
# Short queries require an exact substring match
if len(search_key_lower) <= 2:
return search_key_lower in title_lower
return False
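# Worked example (illustrative): searching "流浪地球" against the title "流浪地球2" passes the
# direct substring test; "地球流浪" fails that test but its character set overlaps the title's
# by 4/4 = 1.0 >= 0.6, so it is still kept, while an unrelated title normally falls below the
# 0.6 overlap threshold and is filtered out.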
def _get_detail(self, doc, vod_id):
"""Parse the detail info"""
try:
# 提取标题
title_selectors = [
'//h1/text()',
'//h2/text()',
'//title/text()'
]
title = ''
for selector in title_selectors:
titles = doc.xpath(selector)
for t in titles:
if t and t.strip():
title = t.strip()
break
if title:
break
# 提取图片
pic_selectors = [
'//img[contains(@class,"poster")]/@src',
'//div[contains(@class,"poster")]//img/@src',
'//img/@src'
]
pic = ''
for selector in pic_selectors:
pics = doc.xpath(selector)
for p in pics:
if p and not p.endswith('blank.gif'):
if p.startswith('//'):
pic = 'https:' + p
elif p.startswith('/'):
pic = self.host + p
elif p.startswith('http'):
pic = p
break
if pic:
break
# 提取描述
desc_selectors = [
'//div[contains(@class,"intro")]//text()',
'//div[contains(@class,"description")]//text()',
'//p[contains(@class,"desc")]//text()'
]
desc = ''
for selector in desc_selectors:
descs = doc.xpath(selector)
desc_parts = []
for d in descs:
if d and d.strip():
desc_parts.append(d.strip())
if desc_parts:
desc = ' '.join(desc_parts)
break
# 提取演员
actor_selectors = [
'//li[contains(text(),"主演")]/text()',
'//span[contains(text(),"主演")]/following-sibling::text()',
'//div[contains(@class,"actor")]//text()'
]
actor = ''
for selector in actor_selectors:
actors = doc.xpath(selector)
for a in actors:
if a and a.strip() and '主演' in a:
actor = a.strip().replace('主演:', '').replace('主演', '')
break
if actor:
break
# 提取导演
director_selectors = [
'//li[contains(text(),"导演")]/text()',
'//span[contains(text(),"导演")]/following-sibling::text()',
'//div[contains(@class,"director")]//text()'
]
director = ''
for selector in director_selectors:
directors = doc.xpath(selector)
for d in directors:
if d and d.strip() and '导演' in d:
director = d.strip().replace('导演:', '').replace('导演', '')
break
if director:
break
# 提取播放源
play_sources = self._parse_play_sources(doc, vod_id)
return {
'vod_id': vod_id,
'vod_name': title,
'vod_pic': pic,
'type_name': '',
'vod_year': '',
'vod_area': '',
'vod_remarks': '',
'vod_actor': actor,
'vod_director': director,
'vod_content': desc,
'vod_play_from': '$$$'.join([source['name'] for source in play_sources]),
'vod_play_url': '$$$'.join([source['episodes'] for source in play_sources])
}
except Exception as e:
self.log(f"获取详情出错: {str(e)}")
return None
def _parse_play_sources(self, doc, vod_id):
"""Parse the play sources"""
try:
play_sources = []
# 查找播放链接
episode_selectors = [
'//a[contains(@href,"/v_play/")]',
'//div[contains(@class,"play")]//a'
]
episodes = []
for selector in episode_selectors:
episode_elements = doc.xpath(selector)
if episode_elements:
for ep in episode_elements:
ep_title = ep.xpath('./text()')[0] if ep.xpath('./text()') else ''
ep_url = ep.xpath('./@href')[0] if ep.xpath('./@href') else ''
if ep_title and ep_url:
# 提取播放ID
play_id = self.regStr(r'/v_play/([^.]+)\.html', ep_url)
if play_id:
episodes.append(f"{ep_title.strip()}${play_id}")
break
if episodes:
play_sources.append({
'name': '默认播放',
'episodes': '#'.join(episodes)
})
else:
# Fallback/default play source
play_sources.append({
'name': '默认播放',
'episodes': f'第1集$bXZfMTM0NTY4LW5tXzE='
})
return play_sources
except Exception as e:
self.log(f"解析播放源出错: {str(e)}")
return [{'name': '默认播放', 'episodes': f'第1集$bXZfMTM0NTY4LW5tXzE='}]
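# Minimal local check (a sketch, assuming base.spider is importable and
# https://www.bttwoo.com is reachable; 'meiju' is one of the category ids defined above):
# if __name__ == '__main__':
#     sp = Spider()
#     sp.init()
#     for v in sp.categoryContent('meiju', '1', None, {})['list'][:3]:
#         print(v['vod_id'], v['vod_name'])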

jtxtv12/py/盘友圈.py (new file, 151 lines added)

@@ -0,0 +1,151 @@
# coding=utf-8
# !/usr/bin/python
import requests
from bs4 import BeautifulSoup
import re
from base.spider import Spider
import sys
import json
import os
import base64
sys.path.append('..')
xurl='https://panyq.com'
headerx = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.87 Safari/537.36'
}
class Spider(Spider):
global xurl
global headerx
def getName(self):
return "首页"
def init(self, extend):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def homeContent(self, filter):
pass
def homeVideoContent(self):
pass
def categoryContent(self, cid, pg, filter, ext):
pass
def detailContent(self, ids):
try:
data = json.loads(bytes.fromhex(ids[0]).decode())
verify = requests.post(f'{xurl}/search/{data["hash"]}',
headers=self.getheader(-1),
data=json.dumps(data['data'], separators=(",", ":")).encode(),
)
if verify.status_code == 200:
eid = data['data'][0]['eid']
rdata = json.dumps([{"eid": eid}], separators=(",", ":")).encode()
res = requests.post(f'{xurl}/go/{eid}', headers=self.getheader(1), data=rdata)
purl = json.loads(res.text.strip().split('\n')[-1].split(":", 1)[-1])['data']['link']
if not re.search(r'pwd=|码', purl) and data['password']:
purl = f"{purl}{'&' if '?' in purl else '?'}pwd={data['password']}"
print("获取盘链接为:", purl)
else:
raise Exception('验证失败')
vod = {
'vod_id': '',
'vod_name': '',
'vod_pic': '',
'type_name': '',
'vod_year': '',
'vod_area': '',
'vod_remarks': '',
'vod_actor': '',
'vod_director': '',
'vod_content': '',
'vod_play_from': '集多网盘',
'vod_play_url': purl
}
params = {
"do": "push",
"url": purl
}
response = requests.post("http://127.0.0.1:9978/action", data=params, headers={
"Content-Type": "application/x-www-form-urlencoded"
})
return {'list': [vod]}
except Exception as e:
print(e)
return {'list': []}
def playerContent(self, flag, id, vipFlags):
pass
def searchContentPage(self, key, quick, page='1'):
sign, sha, hash = self.getsign(key, page)
headers = self.getheader()
res = requests.get(f'{xurl}/api/search', params={'sign': sign}, headers=headers).json()
videos = []
for i in res['data']['hits']:
ccc = [{"eid": i.get("eid"), "sha": sha, "page_num": page}]
ddd = (json.dumps({'sign': sign, 'hash': hash, 'data': ccc, 'password': i.get('password')})).encode().hex()
if i.get('group')=='quark':
pic='https://android-artworks.25pp.com/fs08/2024/12/27/7/125_d45d9de77c805e17ede25e4a2d9d3444_con.png'
elif i.get('group')=='baidu':
pic='https://is4-ssl.mzstatic.com/image/thumb/Purple126/v4/dd/45/eb/dd45eb77-d21d-92f2-c46d-979797a6be4a/AppIcon-0-0-1x_U007emarketing-0-0-0-7-0-0-sRGB-0-0-0-GLES2_U002c0-512MB-85-220-0-0.png/1024x1024bb.jpg'
else:
pic='https://gimg2.baidu.com/image_search/src=http%3A%2F%2Fimg.alicdn.com%2Fbao%2Fuploaded%2Fi4%2F2213060290763%2FO1CN01joakK61HVUwob2JIJ_%21%212213060290763.jpg&refer=http%3A%2F%2Fimg.alicdn.com&app=2002&size=f9999,10000&q=a80&n=0&g=0n&fmt=auto?sec=1757745912&t=e7b98fced3a4f092c8ef26490997b004'
videos.append({
'vod_id': ddd,
'vod_name': i.get('desc').split('<mark>')[0].replace('<mark>', ""),
'vod_pic': pic,
'vod_remarks': i.get('group'),
})
return {'list': videos, 'page': page}
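# Note: vod_id is the hex-encoded JSON blob built above ({'sign', 'hash', 'data', 'password'});
# detailContent() reverses it with bytes.fromhex(...).decode() and json.loads() before
# verifying the hash and resolving the actual drive link via /go/<eid>.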
def searchContent(self, key, quick, pg='1'):
return self.searchContentPage(key, quick, pg)
def getsign(self,key,pg):
headers=self.getheader()
data=json.dumps([{"cat":"all","query":key,"pageNum":int(pg),"enableSearchMusic":False,"enableSearchGame":False,"enableSearchEbook":False}],separators=(",", ":"),ensure_ascii= False).encode()
res = requests.post(xurl, headers=headers, data=data).text
hash=re.search(r'"hash",\s*"([^"]+)"', res).group(1)
sign = re.search(r'"sign":\s*"([^"]+)"', res).group(1)
sha= re.search(r'"sha":\s*"([^"]+)"', res).group(1)
return sign,sha,hash
def getheader(self,k=0):
kes=['ecce0904d756da58b9ea5dd03da3cacea9fa29c6','4c5c1ef8a225004ce229e9afa4cc7189eed3e6fe','c4ed62e2b5a8e3212b334619f0cdbaa77fa842ff']
headers = {
'origin': xurl,
'referer': f'{xurl}/',
'next-action': kes[k],
'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="136", "Google Chrome";v="136"',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.7103.48 Safari/537.36',
}
return headers
def localProxy(self, params):
if params['type'] == "m3u8":
return self.proxyM3u8(params)
elif params['type'] == "media":
return self.proxyMedia(params)
elif params['type'] == "ts":
return self.proxyTs(params)
return None