Add HTSP server (disabled as of now)

This commit is contained in:
Andreas Öman 2007-11-25 16:58:55 +00:00
parent b2e3a4fa00
commit fe45de8db6
6 changed files with 743 additions and 0 deletions

View file

@ -5,6 +5,8 @@ SRCS = main.c dispatch.c channels.c transports.c teletext.c psi.c \
SRCS += http.c htmlui.c
SRCS += htsp.c rpc.c
SRCS += pvr.c
SRCS += epg.c epg_xmltv.c

210
htsp.c Normal file
View file

@ -0,0 +1,210 @@
/*
* tvheadend, HTSP interface
* Copyright (C) 2007 Andreas Öman
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <pthread.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <stdarg.h>
#include <fcntl.h>
#include <errno.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include "tvhead.h"
#include "channels.h"
#include "subscriptions.h"
#include "dispatch.h"
#include "rpc.h"
#include "htsp.h"
#include "htsp_muxer.h"
#include "tcp.h"
#include <libhts/htsmsg_binary.h>
/*
*
*/
int
htsp_send_msg(htsp_t *htsp, htsmsg_t *m, int media)
{
tcp_session_t *tcp = &htsp->htsp_tcp_session;
tcp_queue_t *tq;
void *data;
size_t datalen;
int max, r = -1;
tq = media ? &tcp->tcp_q_low : &tcp->tcp_q_hi;
max = tq->tq_maxdepth - tq->tq_depth; /* max size we are able to enqueue */
if(htsmsg_binary_serialize(m, &data, &datalen, max) == 0)
r = tcp_send_msg(tcp, tq, data, datalen);
htsmsg_destroy(m);
return r;
}
/*
*
*/
static void
htsp_input(htsp_t *htsp, const void *buf, int len)
{
htsmsg_t *in, *out;
int i;
const uint8_t *v = buf;
printf("Got %d bytes\n", len);
for(i =0 ; i < len; i++)
printf("%02x.", v[i]);
printf("\n");
if((in = htsmsg_binary_deserialize(buf, len, NULL)) == NULL) {
printf("deserialize failed\n");
return;
}
printf("INPUT:\n");
htsmsg_print(in);
out = rpc_dispatch(&htsp->htsp_rpc, in, NULL, htsp);
htsmsg_destroy(in);
printf("OUTPUT:\n");
htsmsg_print(out);
htsp_send_msg(htsp, out, 0);
}
/*
* data available on socket
*/
static void
htsp_data_input(htsp_t *htsp)
{
int r, l;
tcp_session_t *tcp = &htsp->htsp_tcp_session;
if(htsp->htsp_bufptr < 4) {
r = read(tcp->tcp_fd, htsp->htsp_buf + htsp->htsp_bufptr,
4 - htsp->htsp_bufptr);
if(r < 1) {
tcp_disconnect(tcp, r == 0 ? ECONNRESET : errno);
return;
}
htsp->htsp_bufptr += r;
if(htsp->htsp_bufptr < 4)
return;
htsp->htsp_msglen = (htsp->htsp_buf[0] << 24) + (htsp->htsp_buf[1] << 16) +
(htsp->htsp_buf[2] << 8) + htsp->htsp_buf[3] + 4;
if(htsp->htsp_msglen < 12 || htsp->htsp_msglen > 16 * 1024 * 1024) {
tcp_disconnect(tcp, EBADMSG);
return;
}
if(htsp->htsp_bufsize < htsp->htsp_msglen) {
htsp->htsp_bufsize = htsp->htsp_msglen;
free(htsp->htsp_buf);
htsp->htsp_buf = malloc(htsp->htsp_bufsize);
}
}
l = htsp->htsp_msglen - htsp->htsp_bufptr;
r = read(tcp->tcp_fd, htsp->htsp_buf + htsp->htsp_bufptr, l);
if(r < 1) {
tcp_disconnect(tcp, r == 0 ? ECONNRESET : errno);
return;
}
htsp->htsp_bufptr += r;
if(htsp->htsp_bufptr == htsp->htsp_msglen) {
htsp_input(htsp, htsp->htsp_buf + 4, htsp->htsp_msglen - 4);
htsp->htsp_bufptr = 0;
htsp->htsp_msglen = 0;
}
}
/*
*
*/
static void
htsp_disconnect(htsp_t *htsp)
{
free(htsp->htsp_buf);
rpc_deinit(&htsp->htsp_rpc);
}
/*
*
*/
static void
htsp_connect(htsp_t *htsp)
{
rpc_init(&htsp->htsp_rpc, "htsp");
htsp->htsp_bufsize = 1000;
htsp->htsp_buf = malloc(htsp->htsp_bufsize);
}
/*
*
*/
static void
htsp_tcp_callback(tcpevent_t event, void *tcpsession)
{
htsp_t *htsp = tcpsession;
switch(event) {
case TCP_CONNECT:
htsp_connect(htsp);
break;
case TCP_DISCONNECT:
htsp_disconnect(htsp);
break;
case TCP_INPUT:
htsp_data_input(htsp);
break;
}
}
/*
* Fire up HTSP server
*/
void
htsp_start(int port)
{
tcp_create_server(port, sizeof(htsp_t), "htsp", htsp_tcp_callback);
}

45
htsp.h Normal file
View file

@ -0,0 +1,45 @@
/*
* tvheadend, HTSP interface
* Copyright (C) 2007 Andreas Öman
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef HTSP_H_
#define HTSP_H_
#include <libhts/htsmsg_binary.h>
#include "rpc.h"
#include "tcp.h"
typedef struct htsp {
tcp_session_t htsp_tcp_session; /* Must be first */
int htsp_bufsize;
int htsp_bufptr;
int htsp_msglen;
uint8_t *htsp_buf;
rpc_session_t htsp_rpc;
int htsp_async_init_sent;
} htsp_t;
void htsp_start(int port);
int htsp_send_msg(htsp_t *htsp, htsmsg_t *m, int media);
#endif /* HTSP_H_ */

5
main.c
View file

@ -50,6 +50,7 @@
#include "iptv_output.h"
#include "rtsp.h"
#include "http.h"
#include "htsp.h"
#include "buffer.h"
#include "htmlui.h"
@ -205,6 +206,10 @@ main(int argc, char **argv)
if(p)
http_start(p);
p = atoi(config_get_str("htsp-server-port", "0"));
if(p)
htsp_start(p);
}
dispatcher();
}

423
rpc.c Normal file
View file

@ -0,0 +1,423 @@
/*
* tvheadend, RPC
* Copyright (C) 2007 Andreas Öman
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <pthread.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <stdarg.h>
#include <fcntl.h>
#include <errno.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <libhts/htsmsg.h>
#include "tvhead.h"
#include "channels.h"
#include "subscriptions.h"
#include "dispatch.h"
#include "epg.h"
#include "pvr.h"
#include "rpc.h"
/**
* Build a channel information message
*/
static htsmsg_t *
rpc_build_channel_msg(th_channel_t *ch)
{
htsmsg_t *m;
m = htsmsg_create();
htsmsg_add_str(m, "name", ch->ch_name);
htsmsg_add_u32(m, "tag", ch->ch_tag);
if(ch->ch_icon)
htsmsg_add_str(m, "icon", refstr_get(ch->ch_icon));
return m;
}
#if 0
/**
* channels_list
*/
static void
htsp_send_all_channels(htsp_connection_t *hc)
{
htsp_msg_t *msg;
th_channel_t *ch;
TAILQ_FOREACH(ch, &channels, ch_global_link) {
msg = htsp_build_channel_msg(ch);
htsp_add_string(msg, "msgtype", "channelAdd");
htsp_send_msg(hc, NULL, msg);
htsp_free_msg(msg);
}
}
#endif
/**
* Simple function to respond 'ok'
*/
htsmsg_t *
rpc_ok(rpc_session_t *ses)
{
htsmsg_t *r = htsmsg_create();
htsmsg_add_u32(r, "seq", ses->rs_seq);
return r;
}
/**
* Simple function to respond with an error
*/
htsmsg_t *
rpc_error(rpc_session_t *ses, const char *err)
{
htsmsg_t *r = htsmsg_create();
htsmsg_add_u32(r, "seq", ses->rs_seq);
htsmsg_add_str(r, "_error", err);
return r;
}
/**
* Login peer, set auth level and do various other stuff
*/
static htsmsg_t *
rpc_login(rpc_session_t *ses, htsmsg_t *in, void *opaque)
{
const char *user = htsmsg_get_str(in, "username");
const char *pass = htsmsg_get_str(in, "password");
if(user == NULL || pass == NULL) {
ses->rs_authlevel = 1;
ses->rs_maxweight = 100;
} else {
/* FIXME: user/password database */
ses->rs_authlevel = 1;
}
if(htsmsg_get_u32(in, "async", &ses->rs_is_async) < 0)
ses->rs_is_async = 0;
return rpc_ok(ses);
}
/*
* channelsList method
*/
static htsmsg_t *
rpc_channels_list(rpc_session_t *ses, htsmsg_t *in, void *opaque)
{
htsmsg_t *out;
th_channel_t *ch;
out = htsmsg_create();
htsmsg_add_u32(out, "seq", ses->rs_seq);
TAILQ_FOREACH(ch, &channels, ch_global_link)
htsmsg_add_msg(out, "channel", rpc_build_channel_msg(ch));
return out;
}
/*
* eventInfo method
*/
static htsmsg_t *
rpc_event_info(rpc_session_t *ses, htsmsg_t *in, void *opaque)
{
htsmsg_t *out;
th_channel_t *ch;
uint32_t u32;
const char *s, *errtxt = NULL;
event_t *e = NULL, *x;
uint32_t tag, prev, next, pvrstatus;
out = htsmsg_create();
htsmsg_add_u32(out, "seq", ses->rs_seq);
epg_lock();
if(htsmsg_get_u32(in, "tag", &u32) >= 0) {
e = epg_event_find_by_tag(u32);
} else if((s = htsmsg_get_str(in, "channel")) != NULL) {
if((ch = channel_find(s, 0)) == NULL) {
errtxt = "Channel not found";
} else {
if(htsmsg_get_u32(in, "time", &u32) < 0) {
e = epg_event_get_current(ch);
} else {
e = epg_event_find_by_time(ch, u32);
}
}
} else {
errtxt = "Invalid arguments";
}
if(e == NULL) {
errtxt = "Event not found";
}
if(errtxt != NULL) {
htsmsg_add_str(out, "_error", errtxt);
} else {
tag = e->e_tag;
x = TAILQ_PREV(e, event_queue, e_link);
prev = x != NULL ? x->e_tag : 0;
x = TAILQ_NEXT(e, e_link);
next = x != NULL ? x->e_tag : 0;
htsmsg_add_u32(out, "start", e->e_start);
htsmsg_add_u32(out, "stop", e->e_start + e->e_duration);
if(e->e_title != NULL)
htsmsg_add_str(out, "title", e->e_title);
if(e->e_desc != NULL)
htsmsg_add_str(out, "desc", e->e_desc);
htsmsg_add_u32(out, "tag", tag);
if(prev)
htsmsg_add_u32(out, "prev", prev);
if(next)
htsmsg_add_u32(out, "next", next);
if((pvrstatus = pvr_prog_status(e)) != 0)
htsmsg_add_u32(out, "pvrstatus", pvrstatus);
}
epg_unlock();
return out;
}
#if 0
/*
* record method
*/
static void
htsp_record(htsp_connection_t *hc, htsp_msg_t *m)
{
htsp_msg_t *r;
uint32_t u32;
const char *s, *errtxt = NULL;
event_t *e = NULL;
recop_t op;
r = htsp_create_msg();
htsp_add_u32(r, "seq", hc->hc_seq);
epg_lock();
s = htsp_get_string(m, "command");
if(s == NULL) {
errtxt = "Missing 'command' field";
} else {
op = pvr_op2int(s);
if(op == -1) {
errtxt = "Invalid 'command' field";
} else {
if((u32 = htsp_get_u32(m, "tag")) != 0) {
e = epg_event_find_by_tag(u32);
if(e != NULL) {
pvr_event_record_op(e->e_ch, e, op);
errtxt = NULL;
} else {
errtxt = "Event not found";
}
} else {
errtxt = "Missing identifier";
}
}
}
epg_unlock();
if(errtxt)
htsp_add_string(r, "_error", errtxt);
htsp_send_msg(hc, NULL, r);
htsp_free_msg(r);
}
#endif
#if 0
/*
* subscribe
*/
static void
htsp_subscribe(htsp_connection_t *hc, htsp_msg_t *m)
{
htsp_msg_t *r;
const char *str, *errtxt = NULL;
th_channel_t *ch;
int weight;
r = htsp_create_msg();
htsp_add_u32(r, "seq", hc->hc_seq);
if((str = htsp_get_string(m, "channel")) != NULL) {
if((ch = channel_find(str, 0)) != NULL) {
weight = htsp_get_u32(m, "weight") ?: 100;
if(weight > hc->hc_maxweight)
weight = hc->hc_maxweight;
htsp_stream_subscribe(hc, ch, weight);
} else {
errtxt = "Channel not found";
}
} else {
errtxt = "Missing 'channel' field";
}
htsp_send_msg(hc, NULL, r);
htsp_free_msg(r);
}
/*
* unsubscribe
*/
static void
htsp_subscribe(htsp_connection_t *hc, htsp_msg_t *m)
{
htsp_msg_t *r;
const char *str, *errtxt = NULL;
th_channel_t *ch;
r = htsp_create_msg();
htsp_add_u32(r, "seq", hc->hc_seq);
if((str = htsp_get_string(m, "channel")) != NULL) {
if((ch = channel_find(str, 0)) != NULL) {
htsp_stream_unsubscribe(hc, ch);
} else {
errtxt = "Channel not found";
}
} else {
errtxt = "Missing 'channel' field";
}
htsp_send_msg(hc, NULL, r);
htsp_free_msg(r);
}
#endif
/*
* List of all methods and their required auth level
*/
static rpc_cmd_t common_rpc[] = {
{ "login", rpc_login, 0 },
{ "channelsList", rpc_channels_list, 1 },
{ "eventInfo", rpc_event_info, 1 },
#if 0
{ "subscribe", rpc_subscribe, 1 },
{ "unsubscribe", rpc_unsubscribe, 1 },
#endif
// { "record", rpc_record, 2 },
{ NULL, NULL, 0 },
};
/*
*
*/
static htsmsg_t *
rpc_dispatch_set(rpc_session_t *ses, htsmsg_t *in, rpc_cmd_t *cmds,
void *opaque, const char *method)
{
for(; cmds->rc_name; cmds++) {
if(strcmp(cmds->rc_name, method))
continue;
if(cmds->rc_authlevel < ses->rs_authlevel)
return rpc_error(ses, "Insufficient privileges");
return cmds->rc_func(ses, in, opaque);
}
return NULL;
}
/*
*
*/
htsmsg_t *
rpc_dispatch(rpc_session_t *ses, htsmsg_t *in, rpc_cmd_t *cmds, void *opaque)
{
const char *method;
htsmsg_t *out;
if(htsmsg_get_u32(in, "seq", &ses->rs_seq))
ses->rs_seq = 0;
if((method = htsmsg_get_str(in, "method")) == NULL)
return rpc_error(ses, "Method not specified");
out = rpc_dispatch_set(ses, in, common_rpc, opaque, method);
if(out == NULL && cmds != NULL)
out = rpc_dispatch_set(ses, in, cmds, opaque, method);
return out ?: rpc_error(ses, "Method not found");
}
/*
*
*/
void
rpc_init(rpc_session_t *ses, const char *logname)
{
memset(ses, 0, sizeof(rpc_session_t));
ses->rs_logname = strdup(logname);
}
/*
*
*/
void
rpc_deinit(rpc_session_t *ses)
{
free((void *)ses->rs_logname);
}

58
rpc.h Normal file
View file

@ -0,0 +1,58 @@
/*
* tvheadend, RPC interface
* Copyright (C) 2007 Andreas Öman
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef RPC_H_
#define RPC_H_
#include <libhts/htsmsg.h>
typedef struct rpc_session {
const char *rs_logname;
uint32_t rs_seq;
unsigned int rs_authlevel;
unsigned int rs_maxweight;
unsigned int rs_is_async;
struct th_subscription_list rs_subscriptions;
} rpc_session_t;
typedef struct rpc_cmd {
const char *rc_name;
htsmsg_t *(*rc_func)(rpc_session_t *ses, htsmsg_t *in, void *opaque);
int rc_authlevel;
} rpc_cmd_t;
void rpc_init(rpc_session_t *ses, const char *logname);
void rpc_deinit(rpc_session_t *ses);
htsmsg_t *rpc_dispatch(rpc_session_t *ses, htsmsg_t *in, rpc_cmd_t *cmds,
void *opaque);
htsmsg_t *rpc_ok(rpc_session_t *ses);
htsmsg_t *rpc_error(rpc_session_t *ses, const char *err);
#endif /* RPC_H_ */