[mythtvnz] Updated EPGSnoop Script

Andrew Bruce mythtvnz@lists.linuxnut.co.nz
Tue, 24 Oct 2006 17:43:47 +1300


This is a multi-part message in MIME format.
--------------020307090600070409090906
Content-Type: text/plain; charset=ISO-8859-1; format=flowed
Content-Transfer-Encoding: 7bit

 >> This information was provided to me and I was asked to repost the 
following to this list, in the hope that it may be of some use to the 
community:

This script has a slight fix to the previously posted version.

--------------020307090600070409090906
Content-Type: text/plain;
 name="epgsnoop.py"
Content-Transfer-Encoding: 7bit
Content-Disposition: inline;
 filename="epgsnoop.py"

#!/usr/bin/python

import sys
import shutil
import os, os.path
import popen2, signal
import datetime
import string
import cgi
import re


# Convert DVB dates, ETSI EN 300 468 (DVB SI), Annex C
def mjd_to_date(dvb):
	"""Decode a DVB time field supplied as a hexadecimal string.

	The last six characters are read as three two-digit decimal (BCD)
	fields: hour, minute and second.  Everything before them is parsed as
	a base-16 Modified Julian Date.

	Returns (hour, minute, second) when the MJD part is zero, i.e. the
	field is a duration, otherwise
	(year, month, day, hour, minute, second).

	Raises ValueError when a part of the string cannot be parsed.
	"""
	# Clock part: two decimal digits per field
	hh, mm, ss = int(dvb[-6:-4]), int(dvb[-4:-2]), int(dvb[-2:])

	# Day count; zero means the field carries no date at all
	days = int(dvb[:-6], 16)
	if not days:
		return (hh, mm, ss)

	# Annex C intermediate values Y', M' and K
	yy = int((days - 15078.2) / 365.25)
	year_days = int(yy * 365.25)
	mp = int((days - 14956.1 - year_days) / 30.6001)
	wrap = 0
	if mp in (14, 15):
		# M' of 14/15 is January/February of the following year
		wrap = 1

	return (yy + wrap + 1900,
		mp - 1 - wrap * 12,
		days - 14956 - year_days - int(mp * 30.6001),
		hh, mm, ss)


# Output the programme XML
def output_programme(f, p, hoffset=0):
	"""Write one <programme> XML record for event p to file f.

	f       -- writable file object
	p       -- event dictionary as collected by process_packet.  The keys
	           'start', 'duration', 'channel_id', 'channel' and 'title'
	           are required; 'language', 'description', 'rating',
	           'country', 'ratinginfo', 'content_1', 'content_2' and
	           'contentinfo' are used when present.
	hoffset -- hours added to the decoded start time before output

	Nothing is written unless the whole record could be built.  Raises
	KeyError for a missing required key, and lets any error from
	mjd_to_date / datetime (malformed start or duration) propagate.
	"""
	# Decode the broadcast times
	s = mjd_to_date(p['start'])
	d = mjd_to_date(p['duration'])
	startdt = datetime.datetime(s[0], s[1], s[2], s[3], s[4], s[5])
	duration = datetime.timedelta(hours=d[0], minutes=d[1], seconds=d[2])

	# Shift the start once.  The stop is derived from the shifted start, so
	# adding the offset again would push it hoffset hours too late.
	startdt = startdt + datetime.timedelta(hours=hoffset)
	stopdt = startdt + duration

	# Convert to xml string forms
	startstr = startdt.strftime("%Y%m%d%H%M%S")
	stopstr = stopdt.strftime("%Y%m%d%H%M%S")
	durationstr = "%02d:%02d:%02d" % d

	# The record is collected here and written in one go at the end, so a
	# failure part-way (e.g. no title) cannot leave an unclosed <programme>
	out = []

	# Start programme record
	out.append('<programme channel="%s.%s" start="%s" stop="%s">' % (
		cgi.escape(p['channel_id'], quote=True),
		cgi.escape(p['channel'], quote=True),
		startstr, stopstr))

	# We will always have a title
	title = '\t<title'
	if "language" in p:
		title = title + ' lang="%s"' % cgi.escape(p['language'], quote=True)
	title = title + '>%s</title>' % cgi.escape(p['title'])
	out.append(title)

	# Sort out the description (if there is one); the length attribute
	# counts the escaped text
	description = ""
	if "description" in p:
		description = cgi.escape(p['description'])
	out.append('\t<desc length="%d">%s</desc>' % (len(description), description))

	# Duration .. informational
	out.append('\t<duration mjd="%s" from="%s" to="%s">%s</duration>' % (
		cgi.escape(p['start'], quote=True), startdt, stopdt, durationstr))

	# Rating
	if "rating" in p:
		rating = '\t<rating'
		if "country" in p:
			rating = rating + ' country="%s"' % cgi.escape(p['country'], quote=True)
		if "ratinginfo" in p:
			rating = rating + ' parental="%s"' % cgi.escape(p['ratinginfo'], quote=True)
		rating = rating + '>' + cgi.escape(p['rating']) + '</rating>'
		out.append(rating)

	# Content: the two nibbles are emitted as a two-digit hex value
	if "content_1" in p and "content_2" in p:
		hexvalue = "%x%x" % (int(p["content_1"]), int(p["content_2"]))
		content = '\t<content'
		if "contentinfo" in p:
			content = content + ' genre="%s"' % cgi.escape(p['contentinfo'], quote=True)
		content = content + '>0x' + hexvalue + '</content>'
		out.append(content)

	# Done
	out.append('</programme>')
	f.write("\n".join(out) + "\n")


# Patterns used by process_packet, compiled once instead of per packet.
# Decoded text that dvbsnoop appends to a line: "... [= decoded text]"
_DECODE_REGEX = re.compile(r"""\[= (.*?)\]$""")
# Quoted text of an event_name / text_char line.  (?:char|name) is a true
# alternation; a bracketed [char|name] would be a one-character class.
_DETAIL_REGEX = re.compile(r"""(?:char|name): "(.*?)"  -- Charset""")


# First whitespace-delimited token after the "Label: " part of a line
def _first_token(data):
	return " ".join(data.split(': ')[1:]).split()[0]


# Text inside the trailing "[= ...]" of a line
def _decoded(data):
	return _DECODE_REGEX.findall(data)[0].strip()


# Keep a finished event unless its channel/event pair was already stored;
# returns the number of programmes added (0 or 1)
def _store_event(event, channel_id, event_id, version):
	key = channel_id + "|" + event_id
	if key in unique:
		return 0
	unique[key] = version
	programmes.append(event)
	print("= events %d programmes %d" % (events, len(programmes)))
	return 1


# Process packet
def process_packet(pkt):
	"""Extract the events from one packet of dvbsnoop output.

	pkt -- list of stripped output lines, as assembled by the main loop
	       (from a "SECT-Packet" line up to and including the "CRC" line)

	Each event not yet recorded in the global `unique` map is appended to
	the global `programmes` list.  The global counters `packets` and
	`events` are incremented, and channel names are looked up in the
	global `channels` map ("UNKNOWN" when absent).  Progress is printed
	to standard output.

	Returns the number of programmes newly stored from this packet.
	"""
	global events, packets
	found = 0

	# Process the packet
	packets += 1
	event_id = None
	for data in pkt:
		# Log key details from top of packet
		if data.startswith("Table_ID"):
			print(". %s" % data.split('  [=')[0])
			continue
		if data.startswith("Version_number"):
			print(". %s" % data)
			version = data.split(': ')[1].split()[0]
			continue
		if data.startswith("Service_ID"):
			print(". %s" % data.split('  [=')[0])
			channel_id = data.split(': ')[1].split()[0]
			channel_name = channels.get(channel_id, "UNKNOWN")
			continue

		# A new event begins: store the previous one and start afresh
		if data.startswith("Event_ID"):
			print("* Found %s" % data)
			if event_id is not None:
				found += _store_event(event, channel_id, event_id, version)
			event_id = data.split(': ')[1].split()[0]
			event = {
				'eventid': event_id,
				'version': version,
				'channel_id': channel_id,
				'channel': channel_name,
			}
			events += 1
			continue

		# End of packet: store the last event
		if data.startswith("CRC"):
			if event_id is not None:
				found += _store_event(event, channel_id, event_id, version)
			continue

		# Everything below is event data; ignore it outside an event
		if event_id is None:
			continue

		if data.startswith("Start_time:"):
			event['start'] = _first_token(data)
			event['startinfo'] = _decoded(data)
		elif data.startswith("Duration:"):
			event['duration'] = _first_token(data)
			event['durationinfo'] = _decoded(data)
		elif data.startswith("event_name:"):
			event['title'] = _DETAIL_REGEX.findall(data)[0]
			print("+ %s %s" % (event['title'], channel_name))
		elif data.startswith("text_char:"):
			# A line without quoted text yields an empty description.
			# No blanket except here: it would also swallow the
			# SystemExit raised by the SIGINT handler.
			texts = _DETAIL_REGEX.findall(data)
			if texts:
				event['description'] = texts[0]
			else:
				event['description'] = ""
		elif data.startswith("Rating:"):
			event['rating'] = _first_token(data)
			event['ratinginfo'] = _decoded(data)
		elif data.startswith("Country_code:"):
			event['country'] = " ".join(data.split(': ')[1:]).strip()
		elif "language_code:" in data:
			# Matched anywhere in the line, not only at its start
			event['language'] = " ".join(data.split(': ')[1:]).strip()
		elif data.startswith("Content_nibble_level_1:"):
			event['content_1'] = _first_token(data)
		elif data.startswith("Content_nibble_level_2:"):
			event['content_2'] = _first_token(data)
		elif data.startswith("[= "):
			# A line consisting only of a decoded value is kept as the
			# content description
			event['contentinfo'] = _decoded(data)
		elif data.startswith("User_nibble_1:"):
			event['user_1'] = _first_token(data)
		elif data.startswith("User_nibble_2:"):
			event['user_2'] = _first_token(data)

	# Found how many shows?
	return found


# ensure we don't leave process
def snoop_finished(signum, frame):
	"""Signal handler: stop the dvbsnoop child, remove the pid file, exit 1.

	signum and frame are supplied by the signal module and are not used.
	Always ends by raising SystemExit through sys.exit(1).
	"""
	try:
		os.kill(snoop.pid, signal.SIGTERM)
	except (NameError, OSError):
		# NameError: the signal arrived before the child was started, so
		# `snoop` is not bound yet.  OSError: the child is already gone.
		# Either way there is nothing to kill, and the pid file below
		# must still be removed or later runs refuse to start.
		pass
	if os.path.exists(pidfile):
		os.remove(pidfile)
	sys.exit(1)


# Key information arrays
programmes = []		# event dictionaries, in the order they were found
channels = {}		# first token of a chanidents line -> second token
unique = {}		# "channel_id|event_id" -> version, for de-duplication

# Key counters
events = 0
packets = 0

# Setup maximums: stop after this many consecutive packets with no new show
nilpkts = 2500

# Setup files
epgfile = "epgsnoop.xml"
pidfile = "/tmp/epgsnoop.pid"

# Refuse to run while another instance holds the pid file
if os.path.exists(pidfile):
	sys.stderr.write("Aborted: already snooping...\n")
	sys.exit(2)

# Read in the channel idents.  This happens before the pid file is written
# and before dvbsnoop is started, so a missing or unreadable file cannot
# leave a stale pid file or an orphaned child behind.
chanfile = open('chanidents', 'r')
try:
	for chan in chanfile:
		channel = chan.split()
		# Length first: a blank line splits to [] and has no channel[0]
		if len(channel) == 2 and channel[0] != '#':
			channels[channel[0]] = channel[1]
finally:
	chanfile.close()

# write the pid file
pidf = open(pidfile, "w")
pidf.write("%d\n" % os.getpid())
pidf.close()

# Setup sigint handler, kill subprocess on ^C
signal.signal(signal.SIGINT, snoop_finished)

# open stream
# (debugging variant that also dumps the raw output to data.dmp)
#snoop = popen2.Popen3("dvbsnoop -nph 0x12 | /bin/tee data.dmp")
snoop = popen2.Popen3("dvbsnoop -softcrc -nph 0x12")

# Loop packets until nilpkts packets in a row produced nothing new
check = i = 0
while check < nilpkts:
	i = i + 1
	print("")
	print("Packet %010d" % i)

	# Get packet start
	out = snoop.fromchild.readline()
	while out and out[:11] != "SECT-Packet":
		out = snoop.fromchild.readline()

	# Packetize: collect lines up to and including the CRC line.  At end
	# of stream readline() returns '' and the packet is just ['']
	pkt = [out.strip()]
	while out and out[:3] != "CRC":
		out = snoop.fromchild.readline()
		pkt.append(out.strip())

	# Process the packet; any new show resets the idle counter
	if process_packet(pkt) > 0:
		check = 0
	else:
		check += 1

# Had enough
print("")
print("Nil shows in last %d packets, consumed %d total" % (nilpkts, packets))

# childerr is None unless stderr capture was requested from Popen3, in
# which case the attribute access fails.  Only the expected failures are
# caught so that SystemExit from the signal handler still gets through.
try:
	print(snoop.childerr.readlines())
except (AttributeError, IOError):
	print("No errors reported")

# Natural completion ... kill snoop
os.kill(snoop.pid, signal.SIGTERM)

# Statistics
print("")
print("Packets %d" % packets)
print("Events %d" % events)
print("Programmes %d" % len(programmes))

# Unleash the output
bad = []
f = open(epgfile, 'w')

# start
f.write('<?xml version="1.0" encoding="ISO-8859-1"?>\n')
f.write('<tv generator-info-name="%s">\n' % sys.argv[0].split('/')[-1])
f.write('\n')

# Output the programme records; events that cannot be rendered are set
# aside.  hoffset=12 is hard-coded -- presumably the local offset from the
# broadcast time base; confirm before reusing elsewhere.
for p in programmes:
	try:
		output_programme(f, p, hoffset=12)
	except (KeyboardInterrupt, SystemExit):
		# Never treat an interrupt / exit request as a bad event
		raise
	except Exception:
		bad.append(p)

# Finish
f.write('\n')
f.write('</tv>\n')

# Bad events are dumped into a trailing XML comment for inspection
print("%d Bad events" % len(bad))
if bad:
	f.write('\n')
	f.write("<!--\n")
	for b in bad:
		# "--" may not appear inside an XML comment
		f.write("\t%s\n" % str(b).replace("--", "- -"))
	f.write("-->\n")

# Done
f.close()
print("")

# Is there a request to copy the file somewhere?
if len(sys.argv) > 1:
	print("Copying output to %s" % sys.argv[1])
	shutil.copy2(epgfile, sys.argv[1])

# clean up the pid file
if os.path.exists(pidfile):
	os.remove(pidfile)

sys.exit(0)


--------------020307090600070409090906--