Add streaming support to HTSP.

Still needs some polishing, but can deliver multiple channels without 
problems to showtime.
This commit is contained in:
Andreas Öman 2008-09-27 16:49:12 +00:00
parent 7ff46ef6f2
commit 970f692308

329
htsp.c
View file

@ -36,6 +36,7 @@
#include "packet.h"
#include "access.h"
#include "htsp.h"
#include "streaming.h"
#include <libhts/htsmsg_binary.h>
@ -110,6 +111,31 @@ typedef struct htsp_connection {
} htsp_connection_t;
/**
*
*/
typedef struct htsp_stream {
streaming_target_t hs_st;
int hs_channelid; /* We can not deref channel since we don't (and can't)
hold global_lock during delivery */
htsp_connection_t *hs_htsp;
htsp_msg_q_t hs_q;
} htsp_stream_t;
/**
*
*/
static void htsp_subscription_callback(struct th_subscription *s,
subscription_event_t event,
void *opaque);
/**
*
@ -140,13 +166,20 @@ htsp_init_queue(htsp_msg_q_t *hmq)
/**
*
*/
#if 0
static void
htsp_flush_queue(htsp_msg_q_t *hmq)
htsp_destroy_queue(htsp_connection_t *htsp, htsp_msg_q_t *hmq)
{
abort();
htsp_msg_t *hm;
if(hmq->hmq_length)
TAILQ_REMOVE(&htsp->htsp_active_output_queues, hmq, hmq_link);
while((hm = TAILQ_FIRST(&hmq->hmq_q)) != NULL) {
TAILQ_REMOVE(&hmq->hmq_q, hm, hm_link);
htsp_msg_destroy(hm);
}
}
#endif
/**
*
@ -335,6 +368,60 @@ htsp_method_getEvent(htsp_connection_t *htsp, htsmsg_t *in)
}
/**
* Request subscription for a channel
*/
static htsmsg_t *
htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
{
th_subscription_t *s;
uint32_t chid;
channel_t *ch;
if(htsmsg_get_u32(in, "channelId", &chid))
return htsp_error("Missing argument 'channeId'");
if((ch = channel_find_by_identifier(chid)) == NULL)
return htsp_error("Requested channel does not exist");
LIST_FOREACH(s, &htsp->htsp_subscriptions, ths_subscriber_link)
if(s->ths_channel == ch)
return htsmsg_create(); /* We just say ok if already subscribed */
s = subscription_create_from_channel(ch, 500, "htsp",
htsp_subscription_callback, htsp);
LIST_INSERT_HEAD(&htsp->htsp_subscriptions, s, ths_subscriber_link);
return htsmsg_create();
}
/**
* Request unsubscription for a channel
*/
static htsmsg_t *
htsp_method_unsubscribe(htsp_connection_t *htsp, htsmsg_t *in)
{
th_subscription_t *s;
uint32_t chid;
channel_t *ch;
if(htsmsg_get_u32(in, "channelId", &chid))
return htsp_error("Missing argument 'channeId'");
if((ch = channel_find_by_identifier(chid)) == NULL)
return htsp_error("Requested channel does not exist");
LIST_FOREACH(s, &htsp->htsp_subscriptions, ths_subscriber_link)
if(s->ths_channel == ch)
break;
if(s == NULL)
return htsmsg_create(); /* We just say ok if already unsubscribed */
LIST_REMOVE(s, ths_subscriber_link);
subscription_unsubscribe(s);
return htsmsg_create();
}
@ -348,6 +435,8 @@ struct {
} htsp_methods[] = {
{ "async", htsp_method_async, ACCESS_STREAMING},
{ "getEvent", htsp_method_getEvent, ACCESS_STREAMING},
{ "subscribe", htsp_method_subscribe, ACCESS_STREAMING},
{ "unsubscribe", htsp_method_unsubscribe, ACCESS_STREAMING},
};
@ -514,6 +603,7 @@ htsp_serve(int fd, void *opaque, struct sockaddr_in *source)
{
htsp_connection_t htsp;
char buf[30];
th_subscription_t *s;
snprintf(buf, sizeof(buf), "%s", inet_ntoa(source->sin_addr));
@ -532,11 +622,29 @@ htsp_serve(int fd, void *opaque, struct sockaddr_in *source)
pthread_create(&htsp.htsp_writer_thread, NULL, htsp_write_scheduler, &htsp);
/**
* Reader loop
*/
htsp_read_loop(&htsp);
/**
* Ok, we're back, other end disconnected. Clean up stuff.
*/
pthread_mutex_lock(&global_lock);
/* Beware! Closing subscriptions will invoke a lot of callbacks
down in the streaming code. So we do this as early as possible
to avoid any weird lockups */
while((s = LIST_FIRST(&htsp.htsp_subscriptions)) != NULL) {
LIST_REMOVE(s, ths_subscriber_link);
subscription_unsubscribe(s);
}
if(htsp.htsp_async_mode)
LIST_REMOVE(&htsp, htsp_async_link);
pthread_mutex_unlock(&global_lock);
free(htsp.htsp_name);
@ -661,3 +769,216 @@ htsp_tag_delete(channel_tag_t *ct)
htsmsg_add_str(m, "method", "tagDelete");
htsp_async_send(m);
}
/**
*
*/
static void
htsp_send_subscription_status(htsp_connection_t *htsp, channel_t *ch,
const char *txt)
{
htsmsg_t *m;
if(ch == NULL)
return;
m = htsmsg_create();
htsmsg_add_str(m, "method", "subscriptionStatus");
htsmsg_add_u32(m, "channelId", ch->ch_id);
if(txt != NULL)
htsmsg_add_str(m, "status", txt);
htsp_send_message(htsp, m, NULL);
}
/**
* Build a htsmsg from a th_pkt and enqueue it on our HTSP transport
*/
static void
htsp_stream_deliver(void *opaque, struct th_pktref *pr)
{
htsp_stream_t *hs = opaque;
th_pkt_t *pkt = pr->pr_pkt;
htsmsg_t *m = htsmsg_create();
htsmsg_add_str(m, "method", "muxpkt");
htsmsg_add_u32(m, "channelId", hs->hs_channelid);
htsmsg_add_u64(m, "stream", pkt->pkt_componentindex);
htsmsg_add_u64(m, "dts", pkt->pkt_dts);
htsmsg_add_u64(m, "pts", pkt->pkt_pts);
htsmsg_add_u32(m, "duration", pkt->pkt_duration);
htsmsg_add_u32(m, "com", pkt->pkt_commercial);
/**
* Since we will serialize directly we use 'binptr' which is a binary
* object that just points to data, thus avoiding a copy.
*/
htsmsg_add_binptr(m, "payload", pkt->pkt_payload, pkt->pkt_payloadlen);
htsp_send(hs->hs_htsp, m, pr, &hs->hs_q);
}
/**
* Send a 'subscriptionStart' message to client informing about
* delivery start and all components.
*
* Setup a streaming target that will deliver packets to the HTSP transport.
*/
static void
htsp_subscription_start(htsp_connection_t *htsp, th_subscription_t *s,
channel_t *ch, streaming_pad_t *sp)
{
streaming_component_t *sc;
htsp_stream_t *hs;
htsmsg_t *m, *streams, *c;
assert(s->ths_st == NULL);
hs = calloc(1, sizeof(htsp_stream_t));
hs->hs_htsp = htsp;
hs->hs_channelid = ch->ch_id;
htsp_init_queue(&hs->hs_q);
m = htsmsg_create();
htsmsg_add_u32(m, "channelId", ch->ch_id);
htsmsg_add_str(m, "method", "subscriptionStart");
streaming_target_init(&hs->hs_st, htsp_stream_deliver, hs);
/**
* Lock streming pad delivery so we can hook us up.
*/
pthread_mutex_lock(sp->sp_mutex);
/* Setup each stream */
streams = htsmsg_create_array();
LIST_FOREACH(sc, &sp->sp_components, sc_link) {
c = htsmsg_create();
htsmsg_add_u32(c, "index", sc->sc_index);
htsmsg_add_str(c, "type", streaming_component_type2txt(sc->sc_type));
htsmsg_add_str(c, "language", sc->sc_lang);
htsmsg_add_msg(streams, NULL, c);
}
htsmsg_add_msg(m, "streams", streams);
htsp_send(hs->hs_htsp, m, NULL, &hs->hs_q);
/* Link to the pad */
streaming_target_connect(sp, &hs->hs_st);
s->ths_st = &hs->hs_st;
/* Once we unlock here we will start getting the callback */
pthread_mutex_unlock(sp->sp_mutex);
}
/**
* Send a 'subscriptionStart' stop
*
* Tear down all streaming related stuff.
*/
static void
htsp_subscription_stop(htsp_connection_t *htsp, th_subscription_t *s,
const char *reason)
{
htsp_stream_t *hs;
htsmsg_t *m;
assert(s->ths_st != NULL);
hs = (htsp_stream_t *)s->ths_st;
/* Unlink from pad */
streaming_target_disconnect(&hs->hs_st);
/* Send a stop message back */
m = htsmsg_create();
htsmsg_add_u32(m, "channelId", hs->hs_channelid);
htsmsg_add_str(m, "method", "subscriptionStop");
if(reason)
htsmsg_add_str(m, "reason", reason);
/* Send on normal control queue cause we are about do destroy
the per-subscription queue */
htsp_send_message(hs->hs_htsp, m, NULL);
htsp_destroy_queue(htsp, &hs->hs_q);
free(hs);
s->ths_st = NULL;
}
/**
*
*/
static void
htsp_subscription_callback(struct th_subscription *s,
subscription_event_t event, void *opaque)
{
htsp_connection_t *htsp = opaque;
channel_t *ch = s->ths_channel; /* Beware: may be NULL if a channel
is destroyed while we're subscribing
to it. It's not dangerous, just keep
it in mind. */
th_transport_t *t;
switch(event) {
case SUBSCRIPTION_EVENT_INVALID:
abort();
case SUBSCRIPTION_TRANSPORT_RUN:
assert(ch != NULL); /* ch must be valid here */
htsp_send_subscription_status(htsp, ch, NULL);
t = s->ths_transport;
htsp_subscription_start(htsp, s, ch, &t->tht_streaming_pad);
return;
case SUBSCRIPTION_NO_INPUT:
htsp_send_subscription_status(htsp, ch, "No input detected");
break;
case SUBSCRIPTION_NO_DESCRAMBLER:
htsp_send_subscription_status(htsp, ch, "No descrambler available");
break;
case SUBSCRIPTION_NO_ACCESS:
htsp_send_subscription_status(htsp, ch, "Access denied");
break;
case SUBSCRIPTION_RAW_INPUT:
htsp_send_subscription_status(htsp, ch,
"Unable to reassemble packets from input");
break;
case SUBSCRIPTION_VALID_PACKETS:
htsp_send_subscription_status(htsp, ch, NULL);
break;
case SUBSCRIPTION_TRANSPORT_NOT_AVAILABLE:
htsp_send_subscription_status(htsp, ch,
"No transport available, retrying...");
break;
case SUBSCRIPTION_TRANSPORT_LOST:
htsp_subscription_stop(htsp, s, "Transport destroyed");
break;
case SUBSCRIPTION_DESTROYED:
htsp_subscription_stop(htsp, s, NULL);
return;
}
}