101 lines
2.9 KiB
Python
Executable file
101 lines
2.9 KiB
Python
Executable file
#!/usr/bin/env python
|
|
|
|
"""
|
|
A Python DBus Test Utility
|
|
|
|
Commands:
|
|
|
|
-h,--help,help This help
|
|
monitor Monitor signals
|
|
|
|
Signal Commands:
|
|
"""
|
|
|
|
import dbus
|
|
import sys
|
|
|
|
def help():
|
|
print sys.modules[__name__].__doc__
|
|
g = [x for x in globals() if x.startswith('cmd_')]
|
|
g.sort()
|
|
for i in g:
|
|
print ' %-20s %s' % (i[4:], eval(i + '.__doc__'))
|
|
|
|
def rpc(bus, obj_path, method, signature, *args):
|
|
return bus.call_blocking(
|
|
bus_name='org.tvheadend.server',
|
|
object_path=obj_path,
|
|
dbus_interface='org.tvheadend',
|
|
method=method,
|
|
signature=signature,
|
|
timeout=1,
|
|
args=args)
|
|
|
|
def cmd_ping(args):
|
|
"""A test ping (argument = string)"""
|
|
s = 'Hello from python!!!'
|
|
if s == rpc(bus, '/', 'ping', 's', 'Hello from python!!!'):
|
|
print 'TVHeadend is live!!!'
|
|
|
|
def cmd_postpone(args):
|
|
"""Postpone subcriptions (argument = value in seconds)"""
|
|
postpone = rpc(bus, '/org/tvheadend/set', 'postpone', 'x', long(args))
|
|
print 'Subscription postpone set to', postpone
|
|
|
|
def cmd_satip_addr_allow(args):
|
|
"""Allow to use this SAT>IP address (argument = IP address)"""
|
|
result = rpc(bus, '/org/tvheadend/allow', 'satip_addr', 's', args)
|
|
print 'SAT>IP address %s blocked: %s' % (args, result)
|
|
|
|
def cmd_satip_addr_disable(args):
|
|
"""Disable to use this SAT>IP address (argument = IP address)"""
|
|
result = rpc(bus, '/org/tvheadend/disable', 'satip_addr', 's', args)
|
|
print 'SAT>IP address %s blocked: %s' % (args, result)
|
|
|
|
def cmd_satip_addr_stop(args):
|
|
"""Stop to use this SAT>IP address immediatelly (argument = IP address)"""
|
|
result = rpc(bus, '/org/tvheadend/stop', 'satip_addr', 's', args)
|
|
print 'SAT>IP address %s blocked: %s' % (args, result)
|
|
|
|
def received_msg(*args, **kwargs):
|
|
print "Received signal"
|
|
for arg in kwargs:
|
|
print "%10s = %s" % (arg, repr(kwargs[arg]))
|
|
print "Arguments:"
|
|
for arg in args:
|
|
print " " + str(arg)
|
|
print "-----"
|
|
|
|
def monitor():
|
|
import gobject
|
|
from dbus.mainloop.glib import DBusGMainLoop
|
|
loop = gobject.MainLoop(is_running=True)
|
|
dloop = DBusGMainLoop()
|
|
bus = dbus.SessionBus(mainloop=dloop)
|
|
bus.add_signal_receiver(received_msg,
|
|
dbus_interface='org.tvheadend.notify',
|
|
sender_keyword='sender',
|
|
interface_keyword='interface',
|
|
member_keyword='member',
|
|
path_keyword='path')
|
|
while loop.is_running():
|
|
loop.run()
|
|
|
|
if 'help' in sys.argv or '-h' in sys.argv or '--help' in sys.argv:
|
|
help()
|
|
sys.exit(0)
|
|
if 'monitor' in sys.argv:
|
|
monitor()
|
|
sys.exit(0)
|
|
bus = None
|
|
if '--session' in sys.argv:
|
|
bus = dbus.SessionBus()
|
|
if not bus:
|
|
bus = dbus.SystemBus()
|
|
cmds = [x for x in sys.argv[1:] if not x.startswith('--')]
|
|
if not cmds:
|
|
cmds = ['ping']
|
|
for cmd in cmds:
|
|
a = cmd.split(':')
|
|
args = len(a) > 1 and a[1] or None
|
|
globals()['cmd_' + a[0]](args)
|