dbus: create interface for rpc calls

This commit is contained in:
Jaroslav Kysela 2014-08-08 09:59:35 +02:00
parent 069090e1d9
commit 9d6ab33b98
4 changed files with 108 additions and 17 deletions

View file

@ -38,9 +38,18 @@ typedef struct dbus_sig {
htsmsg_t *msg;
} dbus_sig_t;
typedef struct dbus_rpc {
LIST_ENTRY(dbus_rpc) link;
char *call_name;
void *opaque;
int64_t (*rpc_s64)(void *opaque, const char *path, int64_t value);
char *(*rpc_str)(void *opaque, const char *path, char *value);
} dbus_rpc_t;
TAILQ_HEAD(dbus_signal_queue, dbus_sig);
LIST_HEAD(dbus_rpc_list, dbus_rpc);
static struct dbus_signal_queue dbus_signals;
static struct dbus_rpc_list dbus_rpcs;
static th_pipe_t dbus_pipe;
static pthread_mutex_t dbus_lock;
static int dbus_running;
@ -207,28 +216,76 @@ dbus_reply_to_ping(DBusMessage *msg, DBusConnection *conn)
* Set the subscription postpone delay
*/
static void
dbus_reply_to_postpone(DBusMessage *msg, DBusConnection *conn)
dbus_reply_to_rpc(dbus_rpc_t *rpc, DBusMessage *msg, DBusConnection *conn)
{
DBusMessageIter args;
DBusMessage *reply;
int64_t param;
const char *path;
int64_t param_s64;
char *param_str;
path = dbus_message_get_path(msg);
if (path == NULL)
return;
if (!dbus_message_iter_init(msg, &args))
return;
if (DBUS_TYPE_INT64 != dbus_message_iter_get_arg_type(&args))
if (rpc->rpc_s64) {
if (DBUS_TYPE_INT64 != dbus_message_iter_get_arg_type(&args))
return;
dbus_message_iter_get_basic(&args, &param_s64);
param_s64 = rpc->rpc_s64(rpc->opaque, path, param_s64);
reply = dbus_message_new_method_return(msg);
dbus_message_iter_init_append(reply, &args);
dbus_message_iter_append_basic(&args, DBUS_TYPE_INT64, &param_s64);
} else if (rpc->rpc_s64) {
if (DBUS_TYPE_INT64 != dbus_message_iter_get_arg_type(&args))
return;
dbus_message_iter_get_basic(&args, &param_str);
param_str = rpc->rpc_str(rpc->opaque, path, param_str);
reply = dbus_message_new_method_return(msg);
dbus_message_iter_init_append(reply, &args);
dbus_message_iter_append_basic(&args, DBUS_TYPE_STRING, &param_str);
free(param_str);
} else {
return;
dbus_message_iter_get_basic(&args, &param);
param = subscription_set_postpone(param);
reply = dbus_message_new_method_return(msg);
dbus_message_iter_init_append(reply, &args);
dbus_message_iter_append_basic(&args, DBUS_TYPE_INT64, &param);
}
dbus_connection_send(conn, reply, NULL);
dbus_connection_flush(conn);
dbus_message_unref(reply);
}
/**
*
*/
void
dbus_register_rpc_s64(const char *call_name, void *opaque,
int64_t (*fcn)(void *, const char *, int64_t))
{
dbus_rpc_t *rpc = calloc(1, sizeof(*rpc));
rpc->call_name = strdup(call_name);
rpc->rpc_s64 = fcn;
rpc->opaque = opaque;
pthread_mutex_lock(&dbus_lock);
LIST_INSERT_HEAD(&dbus_rpcs, rpc, link);
pthread_mutex_unlock(&dbus_lock);
}
/**
*
*/
void
dbus_register_rpc_str(const char *call_name, void *opaque,
char *(*fcn)(void *, const char *, char *))
{
dbus_rpc_t *rpc = calloc(1, sizeof(*rpc));
rpc->call_name = strdup(call_name);
rpc->rpc_str = fcn;
rpc->opaque = opaque;
pthread_mutex_lock(&dbus_lock);
LIST_INSERT_HEAD(&dbus_rpcs, rpc, link);
pthread_mutex_unlock(&dbus_lock);
}
/**
*
*/
@ -283,6 +340,7 @@ dbus_server_thread(void *aux)
DBusConnection *conn, *notify;
tvhpoll_t *poll;
tvhpoll_event_t ev;
dbus_rpc_t *rpc;
int n;
uint8_t c;
@ -338,10 +396,19 @@ dbus_server_thread(void *aux)
if (msg == NULL)
continue;
if (dbus_message_is_method_call(msg, "org.tvheadend", "ping"))
if (dbus_message_is_method_call(msg, "org.tvheadend", "ping")) {
dbus_reply_to_ping(msg, conn);
else if (dbus_message_is_method_call(msg, "org.tvheadend", "postpone"))
dbus_reply_to_postpone(msg, conn);
continue;
}
pthread_mutex_lock(&dbus_lock);
LIST_FOREACH(rpc, &dbus_rpcs, link)
if (dbus_message_is_method_call(msg, "org.tvheadend", rpc->call_name))
break;
pthread_mutex_unlock(&dbus_lock);
if (rpc)
dbus_reply_to_rpc(rpc, msg, conn);
dbus_message_unref(msg);
}
@ -363,6 +430,7 @@ dbus_server_init(void)
{
pthread_mutex_init(&dbus_lock, NULL);
TAILQ_INIT(&dbus_signals);
LIST_INIT(&dbus_rpcs);
tvh_pipe(O_NONBLOCK, &dbus_pipe);
dbus_threads_init_default();
dbus_running = 1;
@ -378,10 +446,17 @@ dbus_server_start(void)
void
dbus_server_done(void)
{
dbus_rpc_t *rpc;
dbus_emit_signal_str("/main", "stop", "bye");
dbus_running = 0;
tvh_write(dbus_pipe.wr, "", 1);
pthread_kill(dbus_tid, SIGTERM);
pthread_join(dbus_tid, NULL);
dbus_flush_queue(NULL);
while ((rpc = LIST_FIRST(&dbus_rpcs)) != NULL) {
LIST_REMOVE(rpc, link);
free(rpc->call_name);
free(rpc);
}
}

View file

@ -31,6 +31,14 @@ dbus_emit_signal_str(const char *obj_name, const char *sig_name, const char *val
void
dbus_emit_signal_s64(const char *obj_name, const char *sig_name, int64_t value);
void
dbus_register_rpc_s64(const char *call_name, void *opaque,
int64_t (*fcn)(void *, const char *, int64_t));
void
dbus_register_rpc_str(const char *call_name, void *opaque,
char *(*fcn)(void *, const char *, char *));
void dbus_server_init(void);
void dbus_server_start(void);
void dbus_server_done(void);
@ -44,7 +52,16 @@ dbus_emit_signal_str(const char *sig_name, const char *value) { }
static inline void
dbus_emit_signal_s64(const char *sig_name, int64_t value) { }
static inline void
dbus_register_rpc_s64(const char *call_name, void *opaque,
int64_t (*fcn)(void *, const char *, int64_t)) { }
static inline void
dbus_register_rpc_str(const char *call_name, void *opaque,
char *(*fcn)(void *, const char *, char *)) { }
static inline void dbus_server_init(void) { }
static inline void dbus_server_start(void) { }
static inline void dbus_server_done(void) { }
#endif

View file

@ -340,8 +340,8 @@ subscription_reschedule(void)
/**
*
*/
int
subscription_set_postpone(int postpone)
static int64_t
subscription_set_postpone(void *aux, const char *path, int64_t postpone)
{
th_subscription_t *s;
time_t now = time(NULL);
@ -882,6 +882,7 @@ void
subscription_init(void)
{
subscription_status_callback(NULL);
dbus_register_rpc_s64("postpone", NULL, subscription_set_postpone);
}
/**

View file

@ -118,8 +118,6 @@ void subscription_set_weight(th_subscription_t *s, unsigned int weight);
void subscription_reschedule(void);
int subscription_set_postpone(int postpone);
th_subscription_t *subscription_create_from_channel(struct channel *ch,
unsigned int weight,
const char *name,