iptv/iptv/downloadchannels.py
2025-09-20 11:57:23 +08:00

98 lines
2.3 KiB
Python

# Playlist source: https://m3uplaylist.download/free-iptv-m3u-playlists
# The m3u playlists loaded by this script are taken from that page.
import sys
import os
import asyncio
from datetime import datetime
from appPublic.jsonConfig import getConfig
from appPublic.streamhttpclient import StreamHttpClient
from appPublic.uniqueID import getID
from appPublic.log import debug
from sqlor.dbpools import DBPools
from iptv import m3u
from iptv.m3u8test import test_channels
async def download(url):
    """Fetch the m3u playlist at ``url`` and parse it.

    Args:
        url: Address of the playlist to request with HTTP GET.

    Returns:
        The result of ``m3u.m3uParser`` applied to the decoded playlist
        text, or ``None`` when the request returns no body.
    """
    client = StreamHttpClient()
    payload = await client.request('GET', url)
    if payload is None:
        debug(f'request {url=} error')
        return None
    # 'utf-8-sig' strips a leading byte-order mark if one is present, so it
    # does not end up glued to the first playlist line; errors='replace'
    # keeps a single malformed byte from aborting the whole download.
    txt = payload.decode('utf-8-sig', errors='replace')
    return m3u.m3uParser(txt)
# Length limits applied before inserting.
# NOTE(review): presumably these mirror the iptvchannels column widths -
# confirm against the table schema.
_MAX_TEXT_LEN = 500
_MAX_URL_LEN = 1000


async def saveChannels(media_type, clist, dbname):
    """Insert channels that are not yet stored into ``iptvchannels``.

    Each record in ``clist`` is looked up by its ``url``; records already
    present are counted as duplicates and left untouched.  New records are
    completed in place (``id``, ``media_type``, ``tv_group``, ``tv_name``,
    ``logo_url``, ``download_date``) and inserted with ``del_flg`` = '0'.
    Records whose URL is 1000 characters or longer are skipped, and a logo
    URL longer than 1000 characters is stored as NULL.

    Args:
        media_type: Value written to the ``media_type`` column of every
            inserted row.
        clist: Iterable of dict-like channel records; each must carry a
            ``url`` key and may carry ``group-title``, ``name`` and
            ``tvg-logo``.
        dbname: Name of the database handed to ``DBPools.sqlorContext``.
    """
    pool = DBPools()
    sql = """insert into iptvchannels
(
id,
tv_group,
tv_name,
logo_url,
url,
media_type,
download_date,
del_flg
)
values
(
${id}$,
${tv_group}$,
${tv_name}$,
${logo_url}$,
${url}$,
${media_type}$,
${download_date}$,
'0'
)"""
    query = """select * from iptvchannels where url=${url}$"""
    dup = 0
    # One YYYY-MM-DD stamp for the whole batch instead of one per row.
    download_date = datetime.now().date().isoformat()
    async with pool.sqlorContext(dbname) as sor:
        for r in clist:
            existing = await sor.sqlExe(query, r)
            if len(existing) >= 1:
                dup += 1
                continue
            # Reject over-long URLs before touching the record or
            # spending an id on a row that will never be written.
            if len(r['url']) >= _MAX_URL_LEN:
                continue
            logo_url = r.get('tvg-logo', None)
            if logo_url and len(logo_url) > _MAX_URL_LEN:
                logo_url = None
            r['media_type'] = media_type
            r['id'] = getID()
            # "or ''" also covers a key that is present but set to None.
            r['tv_group'] = (r.get('group-title') or '')[:_MAX_TEXT_LEN]
            r['tv_name'] = (r.get('name') or '')[:_MAX_TEXT_LEN]
            r['logo_url'] = logo_url
            r['download_date'] = download_date
            await sor.sqlExe(sql, r)
    debug(f'{dup} exists')
async def load_url_iptv(media_type, url, dbname):
    """Download a playlist, keep the channels that pass testing, store them.

    Args:
        media_type: Media type recorded with every stored channel.
        url: Address of the m3u playlist to download.
        dbname: Name of the database the channels are saved to.
    """
    clist = await download(url)
    if clist is None:
        # download() already logged the request failure; do not hand None
        # to test_channels.
        debug(f'{url} return None')
        return
    # Only the first element of the pair (the good channels) is stored.
    goodchannels, _ = test_channels(clist)
    if goodchannels:
        debug('%d channels' % len(goodchannels))
        await saveChannels(media_type, goodchannels, dbname)
    else:
        debug(f'{url} return None')
def main():
    """Command-line entry point.

    Usage: ``script workdir m3u_playlist_url [media_type]``.  ``workdir``
    is passed to ``getConfig``; ``media_type`` defaults to ``'iptv'``.
    Exits with status 1 when fewer than two arguments are given.
    """
    if len(sys.argv) < 3:
        print('Usage:\n%s workdir m3u_playlist_url [media_type]\n' % sys.argv[0])
        sys.exit(1)
    workdir = sys.argv[1]
    playlist_url = sys.argv[2]
    media_type = sys.argv[3] if len(sys.argv) > 3 else 'iptv'
    dbname = 'iptvdb'
    config = getConfig(workdir, {'workdir': workdir})
    # Relying on asyncio.get_event_loop() to create a loop implicitly is
    # deprecated in recent Python versions, so the loop is created and
    # installed explicitly.  It is installed before DBPools is constructed
    # in case the pool picks up the current loop at construction time
    # (not visible from this file - confirm against sqlor).
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        DBPools(config.databases)
        loop.run_until_complete(load_url_iptv(media_type, playlist_url, dbname))
    finally:
        loop.close()


if __name__ == '__main__':
    main()