Make streaming over HTSP work

This commit is contained in:
Andreas Öman 2008-05-18 10:18:05 +00:00
parent 1f145bfa55
commit ab4d284b9c
13 changed files with 223 additions and 136 deletions

View file

@ -7,7 +7,7 @@ SRCS = main.c dispatch.c channels.c transports.c teletext.c psi.c \
SRCS += http.c
SRCS += htsp.c rpc.c
SRCS += htsp.c htsp_muxer.c rpc.c
SRCS += pvr.c autorec.c ffmuxer.c

View file

@ -230,7 +230,7 @@ cr_channel_subscribe(client_t *c, char **argv, int argc)
snprintf(tmp, sizeof(tmp), "HTS Client @ %s",
inet_ntoa(si->sin_addr));
s = subscription_create(ch, weight, tmp, client_subscription_callback, c);
s = subscription_create(ch, weight, tmp, client_subscription_callback, c, 0);
if(s == NULL)
return 1;

84
htsp.c
View file

@ -38,6 +38,7 @@
#include "htsp_muxer.h"
#include "tcp.h"
#include "epg.h"
#include "access.h"
#include <libhts/htsmsg_binary.h>
@ -140,6 +141,75 @@ htsp_async_channel_update(channel_t *ch)
}
/**
* Subscribe to channel
*/
static htsmsg_t *
htsp_subscribe(rpc_session_t *ses, htsmsg_t *in, void *opaque)
{
htsp_t *htsp = opaque;
channel_t *ch;
const char *txt;
th_subscription_t *s;
htsmsg_t *r;
uint32_t tag;
if((txt = htsmsg_get_str(in, "channel")) == NULL)
return rpc_error(ses, "missing argument: channel");
if((ch = channel_find(txt, 0, NULL)) == NULL)
return rpc_error(ses, "Channel not found");
LIST_FOREACH(s, &htsp->htsp_subscriptions, ths_subscriber_link) {
if(s->ths_channel == ch) {
subscription_set_weight(s, 200);
return rpc_ok(ses);
}
}
tag = tag_get();
r = htsmsg_create();
htsmsg_add_u32(r, "seq", ses->rs_seq);
htsmsg_add_u32(r, "id", tag);
htsp_send_msg(htsp, r, 0);
htsp_muxer_subscribe(htsp, ch, 200, tag);
return NULL;
}
/**
* Unsubscribe from channel
*/
static htsmsg_t *
htsp_unsubscribe(rpc_session_t *ses, htsmsg_t *in, void *opaque)
{
htsp_t *htsp = opaque;
uint32_t id;
if(htsmsg_get_u32(in, "id", &id))
return rpc_error(ses, "missing argument: id");
htsp_muxer_unsubscribe(htsp, id);
return rpc_ok(ses);
}
/**
* HTSP specific methods
*/
static rpc_cmd_t htsp_rpc[] = {
{ "subscribe", htsp_subscribe, ACCESS_STREAMING },
{ "unsubscribe", htsp_unsubscribe, ACCESS_STREAMING },
{ NULL, NULL, 0 },
};
/*
*
*/
@ -165,15 +235,16 @@ htsp_input(htsp_t *htsp, const void *buf, int len)
printf("INPUT:\n");
htsmsg_print(in);
out = rpc_dispatch(&htsp->htsp_rpc, in, NULL, htsp,
out = rpc_dispatch(&htsp->htsp_rpc, in, htsp_rpc, htsp,
(struct sockaddr *)&htsp->htsp_tcp_session.tcp_peer_addr);
htsmsg_destroy(in);
printf("OUTPUT:\n");
htsmsg_print(out);
htsp_send_msg(htsp, out, 0);
if(out != NULL) {
printf("OUTPUT:\n");
htsmsg_print(out);
htsp_send_msg(htsp, out, 0);
}
if(!was_async && htsp->htsp_rpc.rs_is_async) {
printf("Session went into async mode\n");
@ -244,6 +315,8 @@ htsp_data_input(htsp_t *htsp)
static void
htsp_disconnect(htsp_t *htsp)
{
htsp_muxer_cleanup(htsp);
LIST_REMOVE(htsp, htsp_global_link);
free(htsp->htsp_buf);
@ -279,6 +352,7 @@ htsp_tcp_callback(tcpevent_t event, void *tcpsession)
break;
case TCP_DISCONNECT:
htsp->htsp_zombie = 1;
htsp_disconnect(htsp);
break;

3
htsp.h
View file

@ -37,6 +37,9 @@ typedef struct htsp {
rpc_session_t htsp_rpc;
int htsp_async_init_sent;
int htsp_zombie;
struct th_subscription_list htsp_subscriptions;
} htsp_t;

View file

@ -28,89 +28,156 @@
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <libhts/htsp.h>
#include "tvhead.h"
#include "channels.h"
#include "subscriptions.h"
#include "dispatch.h"
#include "epg.h"
#include "pvr.h"
#include "htsp.h"
#include "htsp_muxer.h"
#include "mux.h"
static void
htsp_packet_input(void *opaque, th_muxstream_t *tms, th_pkt_t *pkt)
{
htsp_t *htsp = opaque;
htsmsg_t *m = htsmsg_create();
th_muxer_t *tm = tms->tms_muxer;
// th_stream_t *st = tms->tms_stream;
th_subscription_t *s = tm->tm_subscription;
/*
* Build a message for this frame
*/
/*
htsmsg_add_str(m, "method", "muxpkt");
htsmsg_add_u32(m, "id", s->ths_u32);
htsmsg_add_u64(m, "stream", tms->tms_index);
htsmsg_add_u64(m, "dts", pkt->pkt_dts);
htsmsg_add_u64(m, "pts", pkt->pkt_pts);
htsmsg_add_u32(m, "duration", pkt->pkt_duration);
htsmsg_add_u32(m, "com", pkt->pkt_commercial);
/**
* Since we will serialize directly we use 'binptr' which is a binary
* object that just points to data, thus avoiding a copy.
*/
htsmsg_add_binptr(m, "payload", pkt->pkt_payload, pkt->pkt_payloadlen);
htsp_send_msg(htsp, m, 1);
}
/**
* Called when a subscription gets/loses access to a transport
*/
static void
client_subscription_callback(struct th_subscription *s,
subscription_event_t event, void *opaque)
htsp_subscription_callback(struct th_subscription *s,
subscription_event_t event, void *opaque)
{
htsp_connection_t *hc = opaque;
htsp_t *htsp = opaque;
th_muxer_t *tm;
th_muxstream_t *tms;
int index = 0;
htsmsg_t *m, *sub;
th_stream_t *st;
switch(event) {
case TRANSPORT_AVAILABLE:
tm = muxer_init(s, htsp_packet_input, htsp);
m = htsmsg_create();
htsmsg_add_str(m, "method", "subscription_start");
htsmsg_add_u32(m, "id", s->ths_u32);
LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0) {
tms->tms_index = index++;
st = tms->tms_stream;
sub = htsmsg_create();
htsmsg_add_u32(sub, "index", tms->tms_index);
htsmsg_add_str(sub, "type", htstvstreamtype2txt(st->st_type));
htsmsg_add_str(sub, "language", st->st_lang);
htsmsg_add_msg(m, "stream", sub);
}
htsmsg_print(m);
htsp_send_msg(htsp, m, 0);
muxer_play(tm, AV_NOPTS_VALUE);
break;
case TRANSPORT_UNAVAILABLE:
if(htsp->htsp_zombie == 0) {
m = htsmsg_create();
htsmsg_add_str(m, "method", "subscription_stop");
htsmsg_add_u32(m, "id", s->ths_u32);
htsmsg_add_str(m, "reason", "unknown");
htsp_send_msg(htsp, m, 0);
}
muxer_deinit(s->ths_muxer, s);
break;
}
}
/*
/**
*
*/
void
htsp_muxer_subscribe(htsp_connection_t *hc, th_channel_t *ch, int weight)
htsp_muxer_subscribe(htsp_t *htsp, channel_t *ch, int weight, uint32_t tag)
{
th_subscription_t *s;
th_muxer_t *m;
LIST_FOREACH(s, &hc->hc_subscriptions, ths_subscriber_link) {
if(s->ths_channel == ch) {
subscription_set_weight(s, weight);
return;
}
}
m = calloc(1, sizeof(th_muxer_t));
LIST_INSERT_HEAD(&hc->hc_subscriptions, s, ths_subscriber_link);
s = subscription_create(ch, weight, "HTSP", htsp_subscription_callback, htsp,
tag);
LIST_INSERT_HEAD(&htsp->htsp_subscriptions, s, ths_subscriber_link);
}
/*
/**
*
*/
static void
htsp_subscription_destroy(th_subscription_t *s)
{
LIST_REMOVE(s, ths_subscriber_link);
subscription_unsubscribe(s);
}
/**
*
*/
void
htsp_muxer_unsubscribe(htsp_connection_t *hc, th_channel_t *ch)
htsp_muxer_unsubscribe(htsp_t *htsp, uint32_t id)
{
th_subscription_t *s;
htsp_muxerer_t *htsp;
LIST_FOREACH(s, &hc->hc_subscriptions, ths_subscriber_link) {
if(s->ths_channel == ch)
LIST_FOREACH(s, &htsp->htsp_subscriptions, ths_subscriber_link) {
if(s->ths_u32 == id)
break;
}
if(s != NULL) {
LIST_REMOVE(s, ths_subscriber_link);
free(s->hs_opaque);
subscription_unsubscribe(s);
}
if(s != NULL)
htsp_subscription_destroy(s);
}
/*
/**
*
*/
void
htsp_muxer_cleanup(htsp_connection_t *hc)
htsp_muxer_cleanup(htsp_t *htsp)
{
th_subscription_t *s;
while((s = LIST_FIRST(&hc->hc_subscriptions)) != NULL) {
LIST_REMOVE(s, ths_subscriber_link);
subscription_unsubscribe(s);
}
while((s = LIST_FIRST(&htsp->htsp_subscriptions)) != NULL)
htsp_subscription_destroy(s);
}

View file

@ -19,9 +19,10 @@
#ifndef HTSP_MUXER_H_
#define HTSP_MUXER_H_
void htsp_muxer_subscribe(htsp_t *htsp, channel_t *ch, int weight);
void htsp_muxer_subscribe(htsp_t *htsp, channel_t *ch,
int weight, uint32_t tag);
void htsp_muxer_unsubscribe(htsp_t *htsp, channel_t *ch);
void htsp_muxer_unsubscribe(htsp_t *htsp, uint32_t id);
void htsp_muxer_cleanup(htsp_t *htsp);

View file

@ -187,7 +187,7 @@ output_multicast_load(struct config_head *head)
snprintf(title2, sizeof(title2), "IPTV-OUT: %s", title);
subscription_create(ch, 900, title2, iptv_subscription_callback, om);
subscription_create(ch, 900, title2, iptv_subscription_callback, om, 0);
return;
err:

2
pvr.c
View file

@ -605,7 +605,7 @@ pvr_fsm(pvr_rec_t *pvrr)
pvrr->pvrr_s = subscription_create(pvrr->pvrr_channel, 1000, "pvr",
pvr_subscription_callback,
pvrr);
pvrr, 0);
pvrr->pvrr_error = HTSTV_PVR_STATUS_DONE; /* assume everything will
work out ok */
break;

110
rpc.c
View file

@ -226,11 +226,11 @@ rpc_event_info(rpc_session_t *ses, htsmsg_t *in, void *opaque)
#if 0
/*
/**
* record method
*/
static void
htsp_record(htsp_connection_t *hc, htsp_msg_t *m)
static htsmsg_t *
rpc_record(rpc_session_t *ses, htsmsg_t *in, void *opaque)
{
htsp_msg_t *r;
uint32_t u32;
@ -277,103 +277,39 @@ htsp_record(htsp_connection_t *hc, htsp_msg_t *m)
}
#endif
#if 0
/*
* subscribe
/**
* Common methods and their required auth level
*/
static void
htsp_subscribe(htsp_connection_t *hc, htsp_msg_t *m)
{
htsp_msg_t *r;
const char *str, *errtxt = NULL;
th_channel_t *ch;
int weight;
r = htsp_create_msg();
htsp_add_u32(r, "seq", hc->hc_seq);
if((str = htsp_get_string(m, "channel")) != NULL) {
if((ch = channel_find(str, 0)) != NULL) {
weight = htsp_get_u32(m, "weight") ?: 100;
if(weight > hc->hc_maxweight)
weight = hc->hc_maxweight;
htsp_stream_subscribe(hc, ch, weight);
} else {
errtxt = "Channel not found";
}
} else {
errtxt = "Missing 'channel' field";
}
htsp_send_msg(hc, NULL, r);
htsp_free_msg(r);
}
/*
* unsubscribe
*/
static void
htsp_subscribe(htsp_connection_t *hc, htsp_msg_t *m)
{
htsp_msg_t *r;
const char *str, *errtxt = NULL;
th_channel_t *ch;
r = htsp_create_msg();
htsp_add_u32(r, "seq", hc->hc_seq);
if((str = htsp_get_string(m, "channel")) != NULL) {
if((ch = channel_find(str, 0)) != NULL) {
htsp_stream_unsubscribe(hc, ch);
} else {
errtxt = "Channel not found";
}
} else {
errtxt = "Missing 'channel' field";
}
htsp_send_msg(hc, NULL, r);
htsp_free_msg(r);
}
#endif
/*
* List of all methods and their required auth level
*/
static rpc_cmd_t common_rpc[] = {
{ "auth", rpc_auth, 0 },
{ "async", rpc_async, ACCESS_STREAMING },
{ "channelsList", rpc_channels_list, ACCESS_STREAMING },
{ "eventInfo", rpc_event_info, ACCESS_STREAMING },
#if 0
{ "subscribe", rpc_subscribe, 1 },
{ "unsubscribe", rpc_unsubscribe, 1 },
#endif
// { "record", rpc_record, 2 },
{ NULL, NULL, 0 },
};
/*
/**
*
*/
static htsmsg_t *
static int
rpc_dispatch_set(rpc_session_t *ses, htsmsg_t *in, rpc_cmd_t *cmds,
void *opaque, const char *method, struct sockaddr *peer)
void *opaque, const char *method, struct sockaddr *peer,
htsmsg_t **outp)
{
for(; cmds->rc_name; cmds++) {
if(strcmp(cmds->rc_name, method))
continue;
if(access_verify(ses->rs_username, ses->rs_password, peer,
cmds->rc_privmask))
return rpc_unauthorized(ses);
return cmds->rc_func(ses, in, opaque);
cmds->rc_privmask)) {
*outp = rpc_unauthorized(ses);
return 0;
}
*outp = cmds->rc_func(ses, in, opaque);
return 0;
}
return NULL;
return -1;
}
@ -386,6 +322,7 @@ rpc_dispatch(rpc_session_t *ses, htsmsg_t *in, rpc_cmd_t *cmds, void *opaque,
{
const char *method;
htsmsg_t *out;
int r;
if(htsmsg_get_u32(in, "seq", &ses->rs_seq))
ses->rs_seq = 0;
@ -393,11 +330,14 @@ rpc_dispatch(rpc_session_t *ses, htsmsg_t *in, rpc_cmd_t *cmds, void *opaque,
if((method = htsmsg_get_str(in, "method")) == NULL)
return rpc_error(ses, "Method not specified");
out = rpc_dispatch_set(ses, in, common_rpc, opaque, method, peer);
if(out == NULL && cmds != NULL)
out = rpc_dispatch_set(ses, in, cmds, opaque, method, peer);
r = rpc_dispatch_set(ses, in, common_rpc, opaque, method, peer, &out);
if(r == -1 && cmds != NULL)
r = rpc_dispatch_set(ses, in, cmds, opaque, method, peer, &out);
return out ?: rpc_error(ses, "Method not found");
if(r == -1)
return rpc_error(ses, "Method not found");
return out;
}

2
rtsp.c
View file

@ -193,7 +193,7 @@ rtsp_session_create(channel_t *ch, struct sockaddr_in *dst)
LIST_INSERT_HEAD(&rtsp_sessions, rs, rs_global_link);
rs->rs_s = subscription_create(ch, 600, "RTSP",
rtsp_subscription_callback, rs);
rtsp_subscription_callback, rs, 0);
/* Initialize RTP */

View file

@ -127,7 +127,7 @@ subscription_sort(th_subscription_t *a, th_subscription_t *b)
th_subscription_t *
subscription_create(channel_t *ch, unsigned int weight, const char *name,
subscription_callback_t *cb, void *opaque)
subscription_callback_t *cb, void *opaque, uint32_t u32)
{
th_subscription_t *s;
@ -137,6 +137,7 @@ subscription_create(channel_t *ch, unsigned int weight, const char *name,
s->ths_title = strdup(name);
s->ths_total_err = 0;
s->ths_weight = weight;
s->ths_u32 = u32;
time(&s->ths_start);
LIST_INSERT_SORTED(&subscriptions, s, ths_global_link, subscription_sort);

View file

@ -26,7 +26,8 @@ void subscription_set_weight(th_subscription_t *s, unsigned int weight);
th_subscription_t *subscription_create(channel_t *ch, unsigned int weight,
const char *name,
subscription_callback_t *cb,
void *opaque);
void *opaque,
uint32_t u32);
void subscriptions_init(void);

View file

@ -814,7 +814,7 @@ xbmsp_input_file_open(xbmsp_t *xbmsp, uint32_t msgid, uint8_t *buf, int len)
xs->xs_xbmsp = xbmsp;
xs->xs_subscription = subscription_create(ch, 100, xbmsp->xbmsp_logname,
xbmsp_subscription_callback, xs);
xbmsp_subscription_callback, xs, 0);
LIST_INSERT_HEAD(&xbmsp->xbmsp_subscriptions, xs, xs_link);
xbmsp_send_handle(xbmsp, msgid, xs->xs_handle);