iptv/iptv/downloadchannels.py
2025-10-15 16:47:47 +08:00

117 lines
2.9 KiB
Python

# The M3U playlists processed by this script can be obtained from:
# https://m3uplaylist.download/free-iptv-m3u-playlists
import sys
import os
import asyncio
from datetime import datetime
from appPublic.jsonConfig import getConfig
from appPublic.streamhttpclient import StreamHttpClient
from appPublic.uniqueID import getID
from appPublic.log import debug
from sqlor.dbpools import DBPools
from iptv import m3u
from iptv.m3u8test import test_channels, write_goodchannel
async def download(url):
    """Fetch the playlist at ``url`` and parse it with ``m3u.m3uParser``.

    Returns whatever ``m3u.m3uParser`` produces for the decoded text, or
    ``None`` when the HTTP request yields ``None``.
    """
    http = StreamHttpClient()
    payload = await http.request('GET', url)
    if payload is not None:
        # The response body is decoded as UTF-8 before being parsed.
        return m3u.m3uParser(payload.decode('utf-8'))
    debug(f'request {url=} error')
    return None
async def check_if_exists(url):
    """Return True when a channel with this ``url`` is already stored.

    The lookup runs against the ``iptvchannels`` table, the same table
    ``write_goodchannel`` inserts into.
    """
    # NOTE(review): ServerEnv is not imported in the visible part of this
    # file -- confirm where it comes from, otherwise this raises NameError.
    env = ServerEnv()
    dbname = env.get_module_dbname('iptv')
    db = DBPools()
    async with db.sqlorContext(dbname) as sor:
        # Table name fixed: it used to read "iptvchannles", which does not
        # match the table the insert statement targets.
        sql = "select * from iptvchannels where url = ${url}$"
        recs = await sor.sqlExe(sql, {'url': url})
        if len(recs) > 0:
            return True
    return False
async def write_goodchannel(b):
    """Insert channel ``b`` into ``iptvchannels`` unless its URL is already there.

    ``b`` is a mapping-like channel record that must contain ``'url'``.
    Missing ``tv_group`` / ``tv_name`` / ``logo_url`` values are filled from
    the ``group-title`` / ``name`` / ``tvg-logo`` keys. Nothing is written
    when the URL is 1000 characters or longer, or when a row with the same
    URL already exists. The input record itself is not modified.
    """
    # Local import: ``copy`` is not imported at module level in this file.
    from copy import copy

    url = b['url']
    debug(f'write goodchannel({url})')
    if len(url) >= 1000:
        # URL too long to store; skip before touching the database.
        return

    insert_sql = """insert into iptvchannels
(
    id,
    tv_group,
    tv_name,
    logo_url,
    url,
    media_type,
    download_date,
    del_flg
)
values
(
    ${id}$,
    ${tv_group}$,
    ${tv_name}$,
    ${logo_url}$,
    ${url}$,
    ${media_type}$,
    ${download_date}$,
    '0'
)"""
    exists_sql = """select * from iptvchannels where url=${url}$"""

    # NOTE(review): ServerEnv is not imported in the visible part of this
    # file -- confirm where it comes from, otherwise this raises NameError.
    env = ServerEnv()
    dbname = env.get_module_dbname('iptv')
    db = DBPools()
    async with db.sqlorContext(dbname) as sor:
        # Parameters must be a dict; the previous ``{'url', ...}`` was a set.
        found = await sor.sqlExe(exists_sql, {'url': url})
        if len(found) > 0:
            return

        r = copy(b)
        r['media_type'] = 'iptv'
        r['id'] = getID()
        if not r.get('tv_group'):
            # ``or ''`` guards against a key that is present but None.
            r['tv_group'] = (r.get('group-title') or '')[:500]
        if not r.get('tv_name'):
            r['tv_name'] = (r.get('name') or '')[:500]
        if not r.get('logo_url'):
            r['logo_url'] = r.get('tvg-logo', None)
        if r['logo_url'] and len(r['logo_url']) > 1000:
            # An over-long logo URL is dropped rather than rejecting the channel.
            r['logo_url'] = None
        r['download_date'] = datetime.now().strftime('%Y-%m-%d')
        await sor.sqlExe(insert_sql, r)
async def load_url_iptv(url):
    """Download the playlist at ``url`` and test the channels not yet stored.

    Each parsed channel whose ``url`` is unknown to ``check_if_exists`` is
    passed to ``test_channels`` with ``write_goodchannel`` as the ``if_ok``
    callback. When the download fails or there are no new channels, only a
    debug message is emitted.
    """
    clist = await download(url)
    debug(f'clist={clist}')
    # download() returns None on a failed request; treat that as an empty
    # playlist instead of crashing on iteration.
    newchannels = [
        c for c in (clist or [])
        if not await check_if_exists(c.url)
    ]
    if len(newchannels) > 0:
        debug('%d new channels' % len(newchannels))
        good, bad = await test_channels(newchannels, if_ok=write_goodchannel)
        debug(f'{len(good)} new channels add {len(bad)} channels exists')
    else:
        debug(f'{url} return None')
if __name__ == '__main__':
    # Command line: <workdir> <m3u_playlist_url>
    if len(sys.argv) < 3:
        print('Usage:\n%s workdir m3u_playlist_url\n' % sys.argv[0])
        sys.exit(1)
    workdir = sys.argv[1]
    config = getConfig(workdir, {'workdir': workdir})
    # Set up the database pools from the loaded configuration before any
    # coroutine calls DBPools() without arguments.
    DBPools(config.databases)
    loop = asyncio.get_event_loop()
    # load_url_iptv() accepts only the playlist URL; passing extra
    # positional arguments (media type, db name) raises TypeError.
    loop.run_until_complete(load_url_iptv(sys.argv[2]))