95 lines
2.2 KiB
Python
95 lines
2.2 KiB
Python
# url=https://m3uplaylist.download/free-iptv-m3u-playlists
|
|
# the m3u playlist from here
|
|
|
|
import sys
|
|
import os
|
|
import asyncio
|
|
from datetime import datetime
|
|
from appPublic.jsonConfig import getConfig
|
|
from appPublic.streamhttpclient import StreamHttpClient
|
|
from appPublic.uniqueID import getID
|
|
from appPublic.log import debug
|
|
from sqlor.dbpools import DBPools
|
|
import m3u
|
|
|
|
async def download(url):
|
|
client = StreamHttpClient()
|
|
bin = await client.request('GET', url)
|
|
if bin is None:
|
|
debug(f'request {url=} error')
|
|
return None
|
|
txt = bin.decode('utf-8')
|
|
clist = m3u.m3uParser(txt)
|
|
return clist
|
|
|
|
async def saveChannels(media_type,clist, dbname):
|
|
pool = DBPools()
|
|
sql = """insert into iptvchannels
|
|
(
|
|
id,
|
|
tv_group,
|
|
tv_name,
|
|
logo_url,
|
|
url,
|
|
media_type,
|
|
download_date,
|
|
del_flg
|
|
)
|
|
values
|
|
(
|
|
${id}$,
|
|
${tv_group}$,
|
|
${tv_name}$,
|
|
${logo_url}$,
|
|
${url}$,
|
|
${media_type}$,
|
|
${download_date}$,
|
|
'0'
|
|
)"""
|
|
dup = 0
|
|
query = """select * from iptvchannels where url=${url}$"""
|
|
async with pool.sqlorContext(dbname) as sor:
|
|
for r in clist:
|
|
q = await sor.sqlExe(query,r)
|
|
if len(q) < 1:
|
|
r['media_type'] = media_type
|
|
r['id'] = getID()
|
|
r['tv_group'] = r.get('group-title','')[:500]
|
|
r['tv_name'] = r.get('name','')[:500]
|
|
r['logo_url'] = r.get('tvg-logo',None)
|
|
if r['logo_url'] and len(r['logo_url']) > 1000:
|
|
r['logo_url'] = None
|
|
if len(r['url']) >= 1000:
|
|
continue;
|
|
dt = datetime.now()
|
|
r['download_date'] = '%d-%02d-%02d' % (dt.year,dt.month,dt.day)
|
|
await sor.sqlExe(sql,r)
|
|
else:
|
|
dup += 1
|
|
debug(f'{dup} exists')
|
|
|
|
async def load_url_iptv(media_type,url, dbname):
|
|
clist = await download(url)
|
|
if clist:
|
|
debug('%d channels' % len(clist))
|
|
await saveChannels(media_type,clist, dbname)
|
|
else:
|
|
debug(f'{url} return None')
|
|
|
|
if __name__ == '__main__':
|
|
import sys
|
|
if len(sys.argv) < 3:
|
|
print('Usage:\n%s workdir m3u_playlist_url\n' % sys.argv[0])
|
|
sys.exit(1)
|
|
|
|
p = sys.argv[1]
|
|
config = getConfig(p, {'workdir':p})
|
|
DBPools(config.databases)
|
|
dbname = 'iptvdb'
|
|
media_type = 'iptv'
|
|
if len(sys.argv) > 3:
|
|
media_type = sys.argv[3]
|
|
loop = asyncio.get_event_loop()
|
|
loop.run_until_complete(load_url_iptv(media_type, sys.argv[2], dbname))
|
|
|