# url=https://m3uplaylist.download/free-iptv-m3u-playlists
# the m3u playlist from here
"""Download an m3u playlist and store its channels in the iptvchannels table.

Usage:
    script workdir m3u_playlist_url [media_type]
"""
import sys
import os
import asyncio
from datetime import datetime

from appPublic.jsonConfig import getConfig
from appPublic.httpclient import HttpClient
from appPublic.uniqueID import getID
from appPublic.log import debug
from sqlor.dbpools import DBPools

import m3u

# Length limits applied to record fields before they are inserted.
_MAX_TEXT_LEN = 500    # tv_group / tv_name are truncated to this length
_MAX_URL_LEN = 1000    # logo_url longer than this is dropped; url at or
                       # beyond this makes the whole record be skipped


async def download(url):
    """Fetch *url* with an HTTP GET and parse the response as an m3u playlist.

    Returns whatever ``m3u.m3uParser`` produces for the downloaded text
    (presumably a list of dict-like channel records -- callers iterate it
    and use ``.get`` / item assignment on its elements).
    """
    client = HttpClient()
    txt = await client.request(url, 'GET')
    clist = m3u.m3uParser(txt)
    return clist


async def saveChannels(media_type, clist, dbname):
    """Insert the channels of *clist* that are not yet in ``iptvchannels``.

    A record is looked up by its ``url``; when a row with the same url
    already exists it is counted as a duplicate and left untouched.
    Records without a url, or whose url is ``_MAX_URL_LEN`` characters or
    longer, are skipped before any database access.

    Args:
        media_type: value stored in the ``media_type`` column of new rows.
        clist: iterable of dict-like channel records; the keys read are
            ``url``, ``group-title``, ``name`` and ``tvg-logo``. Records
            that get inserted are mutated in place (the column values are
            added to them).
        dbname: database name passed to ``DBPools().sqlorContext``.
    """
    pool = DBPools()
    sql = """insert into iptvchannels
(
    id,
    tv_group,
    tv_name,
    logo_url,
    url,
    media_type,
    download_date,
    del_flg
)
values
(
    ${id}$,
    ${tv_group}$,
    ${tv_name}$,
    ${logo_url}$,
    ${url}$,
    ${media_type}$,
    ${download_date}$,
    '0'
)"""
    query = """select * from iptvchannels where url=${url}$"""
    dup = 0
    skipped = 0
    # One date for the whole run: every row of this import gets the same
    # download_date even if the loop happens to cross midnight.
    download_date = datetime.now().strftime('%Y-%m-%d')
    async with pool.sqlorContext(dbname) as sor:
        for r in clist:
            url = r.get('url')
            # Guard first: a missing url used to raise KeyError and abort
            # the import; an over-long one cannot be stored anyway.
            if not url or len(url) >= _MAX_URL_LEN:
                skipped += 1
                continue

            existing = await sor.sqlExe(query, r)
            if len(existing) >= 1:
                dup += 1
                continue

            logo_url = r.get('tvg-logo', None)
            if logo_url and len(logo_url) > _MAX_URL_LEN:
                # Keep the channel, just drop the logo that does not fit.
                logo_url = None

            r['media_type'] = media_type
            r['id'] = getID()
            r['tv_group'] = r.get('group-title', '')[:_MAX_TEXT_LEN]
            r['tv_name'] = r.get('name', '')[:_MAX_TEXT_LEN]
            r['logo_url'] = logo_url
            r['download_date'] = download_date
            await sor.sqlExe(sql, r)
        debug(f'{dup} exists')
        if skipped:
            debug(f'{skipped} skipped')


async def load_url_iptv(media_type, url, dbname):
    """Download the playlist at *url* and save its channels into *dbname*.

    When the download yields nothing (a falsy result), only a debug
    message is written and the database is not touched.
    """
    clist = await download(url)
    if clist:
        debug('%d channels' % len(clist))
        await saveChannels(media_type, clist, dbname)
    else:
        debug(f'{url} return None')


if __name__ == '__main__':
    if len(sys.argv) < 3:
        print('Usage:\n%s workdir m3u_playlist_url [media_type]\n'
              % sys.argv[0])
        sys.exit(1)
    p = sys.argv[1]
    config = getConfig(p, {'workdir': p})
    # Register the configured databases with the pool before any query runs.
    DBPools(config.databases)
    dbname = 'iptvdb'
    media_type = 'iptv'
    if len(sys.argv) > 3:
        media_type = sys.argv[3]
    # NOTE(review): asyncio.get_event_loop() without a running loop is
    # deprecated in recent Python versions. It is kept here because DBPools
    # was constructed above and may have captured the current loop --
    # confirm before switching to asyncio.run().
    loop = asyncio.get_event_loop()
    loop.run_until_complete(load_url_iptv(media_type, sys.argv[2], dbname))