From 2e4c99b6782d094be03403aebb2dbf92294f001e Mon Sep 17 00:00:00 2001 From: Jaroslav Kysela Date: Sat, 9 Aug 2014 18:10:09 +0200 Subject: [PATCH] support: added dbus_test.py utility --- support/dbus_test.py | 101 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) create mode 100755 support/dbus_test.py diff --git a/support/dbus_test.py b/support/dbus_test.py new file mode 100755 index 00000000..a4a03caf --- /dev/null +++ b/support/dbus_test.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python + +""" +A Python DBus Test Utility + +Commands: + + -h,--help,help This help + monitor Monitor signals + +Signal Commands: +""" + +import dbus +import sys + +def help(): + print sys.modules[__name__].__doc__ + g = [x for x in globals() if x.startswith('cmd_')] + g.sort() + for i in g: + print ' %-20s %s' % (i[4:], eval(i + '.__doc__')) + +def rpc(bus, obj_path, method, signature, *args): + return bus.call_blocking( + bus_name='org.tvheadend.server', + object_path=obj_path, + dbus_interface='org.tvheadend', + method=method, + signature=signature, + timeout=1, + args=args) + +def cmd_ping(args): + """A test ping (argument = string)""" + s = 'Hello from python!!!' + if s == rpc(bus, '/', 'ping', 's', 'Hello from python!!!'): + print 'TVHeadend is live!!!' + +def cmd_postpone(args): + """Postpone subcriptions (argument = value in seconds)""" + postpone = rpc(bus, '/org/tvheadend/set', 'postpone', 'x', long(args)) + print 'Subscription postpone set to', postpone + +def cmd_satip_addr_allow(args): + """Allow to use this SAT>IP address (argument = IP address)""" + result = rpc(bus, '/org/tvheadend/allow', 'satip_addr', 's', args) + print 'SAT>IP address %s blocked: %s' % (args, result) + +def cmd_satip_addr_disable(args): + """Disable to use this SAT>IP address (argument = IP address)""" + result = rpc(bus, '/org/tvheadend/disable', 'satip_addr', 's', args) + print 'SAT>IP address %s blocked: %s' % (args, result) + +def cmd_satip_addr_stop(args): + """Stop to use this SAT>IP address immediatelly (argument = IP address)""" + result = rpc(bus, '/org/tvheadend/stop', 'satip_addr', 's', args) + print 'SAT>IP address %s blocked: %s' % (args, result) + +def received_msg(*args, **kwargs): + print "Received signal" + for arg in kwargs: + print "%10s = %s" % (arg, repr(kwargs[arg])) + print "Arguments:" + for arg in args: + print " " + str(arg) + print "-----" + +def monitor(): + import gobject + from dbus.mainloop.glib import DBusGMainLoop + loop = gobject.MainLoop(is_running=True) + dloop = DBusGMainLoop() + bus = dbus.SessionBus(mainloop=dloop) + bus.add_signal_receiver(received_msg, + dbus_interface='org.tvheadend.notify', + sender_keyword='sender', + interface_keyword='interface', + member_keyword='member', + path_keyword='path') + while loop.is_running(): + loop.run() + +if 'help' in sys.argv or '-h' in sys.argv or '--help' in sys.argv: + help() + sys.exit(0) +if 'monitor' in sys.argv: + monitor() + sys.exit(0) +bus = None +if '--session' in sys.argv: + bus = dbus.SessionBus() +if not bus: + bus = dbus.SystemBus() +cmds = [x for x in sys.argv[1:] if not x.startswith('--')] +if not cmds: + cmds = ['ping'] +for cmd in cmds: + a = cmd.split(':') + args = len(a) > 1 and a[1] or None + globals()['cmd_' + a[0]](args)