[mythtvnz] Updated EPGSnoop Script
Andrew Bruce
mythtvnz@lists.linuxnut.co.nz
Tue, 24 Oct 2006 17:43:47 +1300
This is a multi-part message in MIME format.
--------------020307090600070409090906
Content-Type: text/plain; charset=ISO-8859-1; format=flowed
Content-Transfer-Encoding: 7bit
>> This information was provided to me and I was asked to repost the
following to this list, in the hope that it may be of some use to the
community:
This script has a slight fix to the previously posted version.
--------------020307090600070409090906
Content-Type: text/plain;
name="epgsnoop.py"
Content-Transfer-Encoding: 7bit
Content-Disposition: inline;
filename="epgsnoop.py"
#!/usr/bin/python
import sys
import shutil
import os, os.path
import popen2, signal
import datetime
import string
import cgi
import re
# Convert DVB dates, ETSI EN 300 468 (DVB SI), Annex C
def mjd_to_date(dvb):
    """Decode a DVB SI time string (ETSI EN 300 468, Annex C).

    The last six characters of ``dvb`` are read as two-digit hour, minute
    and second fields (BCD, so the digits are taken as plain decimal).
    Whatever precedes them is parsed as a hexadecimal Modified Julian Date.

    Returns ``(hour, minute, second)`` when the date part is zero (the
    value is a duration), otherwise
    ``(year, month, day, hour, minute, second)``.

    Raises ValueError when a field is not numeric or the date part is
    missing.
    """
    # Time of day: BCD digits, two per field
    hh = int(dvb[-6:-4])
    mm = int(dvb[-4:-2])
    ss = int(dvb[-2:])

    # Everything in front of the time is the MJD in hex
    mjd = int(dvb[:-6], 16)
    if mjd == 0:
        # No date at all: this is a duration
        return (hh, mm, ss)

    # Annex C intermediate values (Y', M')
    years = int((mjd - 15078.2) / 365.25)
    year_days = int(years * 365.25)
    months = int((mjd - 14956.1 - year_days) / 30.6001)

    # M' of 14 or 15 means January/February of the following year
    carry = 0
    if months in (14, 15):
        carry = 1

    day = mjd - 14956 - year_days - int(months * 30.6001)
    month = months - 1 - carry * 12
    year = years + carry + 1900
    return (year, month, day, hh, mm, ss)
# Output the programme XML
def output_programme(f, p, hoffset=0):
    """Write one event to ``f`` as an XMLTV-style <programme> element.

    f       -- open file-like object; receives the finished record in a
               single write().
    p       -- event dictionary. 'start', 'duration', 'channel_id',
               'channel' and 'title' are required. 'language',
               'description', 'rating', 'country', 'ratinginfo',
               'content_1', 'content_2' and 'contentinfo' are optional.
    hoffset -- whole hours added to the broadcast start time.

    The record is assembled completely before anything is written, so an
    event that raises (missing key, undecodable time, ...) leaves ``f``
    untouched instead of leaving a half-written, unclosed element behind.
    """
    def attr(value):
        # Escape text destined for a double-quoted XML attribute
        return cgi.escape(str(value), quote=True)

    # Decode start and duration
    offset = datetime.timedelta(hours=hoffset)
    s = mjd_to_date(p['start'])
    d = mjd_to_date(p['duration'])
    startdt = datetime.datetime(s[0], s[1], s[2], s[3], s[4], s[5]) + offset
    duration = datetime.timedelta(hours=d[0], minutes=d[1], seconds=d[2])
    # startdt already carries the offset; adding it a second time here
    # would push every stop time hoffset hours too late.
    stopdt = startdt + duration

    # Convert to xml string forms
    startstr = startdt.strftime("%Y%m%d%H%M%S")
    stopstr = stopdt.strftime("%Y%m%d%H%M%S")
    durationstr = "%02d:%02d:%02d" % d

    lines = []

    # Start programme record
    lines.append('<programme channel="%s.%s" start="%s" stop="%s">'
                 % (attr(p['channel_id']), attr(p['channel']),
                    startstr, stopstr))

    # We will always have a title
    title = '\t<title'
    if "language" in p:
        title = title + ' lang="%s"' % attr(p['language'])
    title = title + '>%s</title>' % cgi.escape(p['title'])
    lines.append(title)

    # The description element is always emitted, empty if there is none
    description = ""
    if "description" in p:
        description = cgi.escape(p['description'])
    lines.append('\t<desc length="%d">%s</desc>'
                 % (len(description), description))

    # Duration .. informational
    lines.append('\t<duration mjd="%s" from="%s" to="%s">%s</duration>'
                 % (attr(p['start']), startdt, stopdt, durationstr))

    # Rating
    if "rating" in p:
        rating = '\t<rating'
        if "country" in p:
            rating = rating + ' country="%s"' % attr(p['country'])
        if "ratinginfo" in p:
            rating = rating + ' parental="%s"' % attr(p['ratinginfo'])
        rating = rating + '>' + cgi.escape(p['rating']) + '</rating>'
        lines.append(rating)

    # Content: both nibbles are needed to form the two-digit hex code
    if "content_1" in p and "content_2" in p:
        hexvalue = "%x%x" % (int(p["content_1"]), int(p["content_2"]))
        content = '\t<content'
        if "contentinfo" in p:
            content = content + ' genre="%s"' % attr(p['contentinfo'])
        content = content + '>0x' + hexvalue + '</content>'
        lines.append(content)

    # Done
    lines.append('</programme>')
    f.write('\n'.join(lines) + '\n')
# Process packet
def process_packet(pkt):
global events, packets
found = 0
# Create some regex's for extraction
decode_regex = re.compile("""\[= (.*?)\]$""")
detail_regex = re.compile("""[char|name]: "(.*?)" -- Charset""")
# Process the packet
packets += 1
event_id = None
for data in pkt:
#print data
# Log key details from top of packet
if data[:8] == "Table_ID":
print ".", data.split(' [=')[0]
table_id = data.split(': ')[1:][0].split()[0]
continue
if data[:14] == "Version_number":
print ".", data
version = data.split(': ')[1:][0].split()[0]
continue
if data[:10] == "Service_ID":
print ".", data.split(' [=')[0]
channel_id = data.split(': ')[1:][0].split()[0]
if channels.has_key(channel_id):
channel_name = channels[channel_id]
else:
channel_name = "UNKNOWN"
continue
# Are we in en event
if data[:8] == "Event_ID":
print "* Found", data
# Store old event and create a new one
if event_id != None:
key = channel_id + "|" + event_id
if not unique.has_key(key):
unique[key] = version
programmes.append(event)
print "= events", events, "programmes", len(programmes)
found += 1
event_id = data.split(': ')[1:][0].split()[0]
event = {}
event['eventid'] = event_id
event['version'] = version
event['channel_id'] = channel_id
event['channel'] = channel_name
events += 1
continue
# End of packet store last event
if data[:3] == "CRC":
if event_id != None:
key = channel_id + "|" + event_id
if not unique.has_key(key):
unique[key] = version
programmes.append(event)
print "= events", events, "programmes", len(programmes)
found += 1
# Check for event data
if event_id != None:
# Starttime
if data[:11] == "Start_time:":
event['start'] = string.join(data.split(': ')[1:]).split()[0]
event['startinfo'] = decode_regex.findall(data)[0].strip()
continue
# Duration
if data[:9] == "Duration:":
event['duration'] = string.join(data.split(': ')[1:]).split()[0]
event['durationinfo'] = decode_regex.findall(data)[0].strip()
continue
# Name
if data[:11] == "event_name:":
event['title'] = detail_regex.findall(data)[0]
print "+", event['title'], channel_name
continue
# Description
if data[:10] == "text_char:":
try:
event['description'] = detail_regex.findall(data)[0]
except:
event['description'] = ""
continue
# Rating
if data[:7] == "Rating:":
event['rating'] = string.join(data.split(': ')[1:]).split()[0]
event['ratinginfo'] = decode_regex.findall(data)[0].strip()
continue
# Country
if data[:13] == "Country_code:":
event['country'] = string.join(data.split(': ')[1:]).strip()
continue
# Language
if data.find("language_code:") >= 0:
event['language'] = string.join(data.split(': ')[1:]).strip()
continue
# Content
if data[:23] == "Content_nibble_level_1:":
event['content_1'] = string.join(data.split(': ')[1:]).split()[0]
continue
if data[:23] == "Content_nibble_level_2:":
event['content_2'] = string.join(data.split(': ')[1:]).split()[0]
continue
if data[:3] == "[= ":
event['contentinfo'] = decode_regex.findall(data)[0].strip()
continue
# User
if data[:14] == "User_nibble_1:":
event['user_1'] = string.join(data.split(': ')[1:]).split()[0]
continue
if data[:14] == "User_nibble_2:":
event['user_2'] = string.join(data.split(': ')[1:]).split()[0]
continue
# Found how many shows?
return found
# Ensure we don't leave the dvbsnoop child process (or the pid file) behind
def snoop_finished(signum, frame):
    """Signal handler: stop the dvbsnoop child, remove the pid file, exit 1.

    signum and frame are the standard signal-handler arguments and are
    not used.
    """
    try:
        os.kill(snoop.pid, signal.SIGTERM)
    except (NameError, OSError):
        # Either the signal arrived before the child was started (snoop is
        # not bound yet) or the child has already exited.  Neither may
        # prevent the pid file from being removed below, or the next run
        # would refuse to start.
        pass
    if os.path.exists(pidfile):
        os.remove(pidfile)
    sys.exit(1)
# Key information arrays
programmes = []
channels = {}
unique = {}
# Key counters
events= 0
packets = 0
# Setup maximums
nilpkts = 2500
# Setup files
epgfile = "epgsnoop.xml"
pidfile = "/tmp/epgsnoop.pid"
# test and or write the pid file
if os.path.exists(pidfile):
print >>sys.stderr, "Aborted: already snooping..."
sys.exit(2)
else:
pidf = open(pidfile, "w")
print >>pidf, os.getpid()
pidf.close()
# Setup sigint handler, kill subprocess on ^C
signal.signal(signal.SIGINT, snoop_finished)
# open stream
#snoop = popen2.Popen3("dvbsnoop -nph 0x12 | /bin/tee data.dmp")
snoop = popen2.Popen3("dvbsnoop -softcrc -nph 0x12")
# Read in the channel idents
chanidents = open('chanidents', 'r').readlines()
for chan in chanidents:
channel = chan.split()
if channel[0] != '#' and len(channel) == 2:
channels[channel[0]] = channel[1]
# Loop packets
check = i = 0
while check < nilpkts:
i = i + 1
print
print "Packet %010d" % i
# Get packet start
out = snoop.fromchild.readline()
while out and out[:11] != "SECT-Packet":
out = snoop.fromchild.readline()
# Packetize
pkt = []
pkt.append(out.strip())
while out and out[:3] != "CRC":
out = snoop.fromchild.readline()
pkt.append(out.strip())
# Process the packet
found = process_packet(pkt)
if found > 0:
check = 0
else:
check += 1
# Had enoungh
if check >= nilpkts:
print
break
print "Nil shows in last", nilpkts, "packets, consumed", packets, "total"
try:
print snoop.childerr.readlines()
except:
print "No errors reported"
# Natural completion ... kill snoop
os.kill(snoop.pid, signal.SIGTERM)
# Statistics
print
print "Packets", packets
print "Events", events
print "Programmes", len(programmes)
# Unleash the output
bad = []
f = open(epgfile,'w')
# start
print >>f, '<?xml version="1.0" encoding="ISO-8859-1"?>'
print >>f, '<tv generator-info-name="%s">' % sys.argv[0].split('/')[-1]
print >>f
# Output the programme records
for p in programmes:
try:
output_programme(f, p, hoffset=12)
except:
bad.append(p)
# Finish
print >>f
print >>f, '</tv>'
# Bad events
print len(bad), "Bad events"
if len(bad) > 0:
print >>f
print >>f, "<!--"
for b in bad:
print >>f, "\t", b
print >>f, "-->"
# Done
f.close()
print
# Is there a request to copy the file somewhere?
if len(sys.argv) > 1:
print "Copying output to", sys.argv[1]
shutil.copy2(epgfile, sys.argv[1])
# clean up the pid file
if os.path.exists(pidfile):
os.remove(pidfile)
sys.exit(0)
--------------020307090600070409090906--