From 970f6923083402b32698ca6dbffa941312eb1078 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96man?= Date: Sat, 27 Sep 2008 16:49:12 +0000 Subject: [PATCH] Add streaming support to HTSP. Still needs some polishing, but can deliver multiple channels without problems to showtime. --- htsp.c | 329 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 325 insertions(+), 4 deletions(-) diff --git a/htsp.c b/htsp.c index 3cd640e0..16f20fbb 100644 --- a/htsp.c +++ b/htsp.c @@ -36,6 +36,7 @@ #include "packet.h" #include "access.h" #include "htsp.h" +#include "streaming.h" #include @@ -110,6 +111,31 @@ typedef struct htsp_connection { } htsp_connection_t; +/** + * + */ +typedef struct htsp_stream { + streaming_target_t hs_st; + int hs_channelid; /* We can not deref channel since we don't (and can't) + hold global_lock during delivery */ + + htsp_connection_t *hs_htsp; + + htsp_msg_q_t hs_q; + +} htsp_stream_t; + + + +/** + * + */ +static void htsp_subscription_callback(struct th_subscription *s, + subscription_event_t event, + void *opaque); + + + /** * @@ -140,13 +166,20 @@ htsp_init_queue(htsp_msg_q_t *hmq) /** * */ -#if 0 static void -htsp_flush_queue(htsp_msg_q_t *hmq) +htsp_destroy_queue(htsp_connection_t *htsp, htsp_msg_q_t *hmq) { - abort(); + htsp_msg_t *hm; + + if(hmq->hmq_length) + TAILQ_REMOVE(&htsp->htsp_active_output_queues, hmq, hmq_link); + + while((hm = TAILQ_FIRST(&hmq->hmq_q)) != NULL) { + TAILQ_REMOVE(&hmq->hmq_q, hm, hm_link); + htsp_msg_destroy(hm); + } } -#endif + /** * @@ -335,6 +368,60 @@ htsp_method_getEvent(htsp_connection_t *htsp, htsmsg_t *in) } +/** + * Request subscription for a channel + */ +static htsmsg_t * +htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in) +{ + th_subscription_t *s; + uint32_t chid; + channel_t *ch; + + if(htsmsg_get_u32(in, "channelId", &chid)) + return htsp_error("Missing argument 'channeId'"); + + if((ch = channel_find_by_identifier(chid)) == NULL) + return htsp_error("Requested channel does not exist"); + + LIST_FOREACH(s, &htsp->htsp_subscriptions, ths_subscriber_link) + if(s->ths_channel == ch) + return htsmsg_create(); /* We just say ok if already subscribed */ + + s = subscription_create_from_channel(ch, 500, "htsp", + htsp_subscription_callback, htsp); + LIST_INSERT_HEAD(&htsp->htsp_subscriptions, s, ths_subscriber_link); + return htsmsg_create(); +} + + +/** + * Request unsubscription for a channel + */ +static htsmsg_t * +htsp_method_unsubscribe(htsp_connection_t *htsp, htsmsg_t *in) +{ + th_subscription_t *s; + uint32_t chid; + channel_t *ch; + + if(htsmsg_get_u32(in, "channelId", &chid)) + return htsp_error("Missing argument 'channeId'"); + + if((ch = channel_find_by_identifier(chid)) == NULL) + return htsp_error("Requested channel does not exist"); + + LIST_FOREACH(s, &htsp->htsp_subscriptions, ths_subscriber_link) + if(s->ths_channel == ch) + break; + + if(s == NULL) + return htsmsg_create(); /* We just say ok if already unsubscribed */ + + LIST_REMOVE(s, ths_subscriber_link); + subscription_unsubscribe(s); + return htsmsg_create(); +} @@ -348,6 +435,8 @@ struct { } htsp_methods[] = { { "async", htsp_method_async, ACCESS_STREAMING}, { "getEvent", htsp_method_getEvent, ACCESS_STREAMING}, + { "subscribe", htsp_method_subscribe, ACCESS_STREAMING}, + { "unsubscribe", htsp_method_unsubscribe, ACCESS_STREAMING}, }; @@ -514,6 +603,7 @@ htsp_serve(int fd, void *opaque, struct sockaddr_in *source) { htsp_connection_t htsp; char buf[30]; + th_subscription_t *s; snprintf(buf, sizeof(buf), "%s", inet_ntoa(source->sin_addr)); @@ -532,11 +622,29 @@ htsp_serve(int fd, void *opaque, struct sockaddr_in *source) pthread_create(&htsp.htsp_writer_thread, NULL, htsp_write_scheduler, &htsp); + /** + * Reader loop + */ + htsp_read_loop(&htsp); + /** + * Ok, we're back, other end disconnected. Clean up stuff. + */ + pthread_mutex_lock(&global_lock); + + /* Beware! Closing subscriptions will invoke a lot of callbacks + down in the streaming code. So we do this as early as possible + to avoid any weird lockups */ + while((s = LIST_FIRST(&htsp.htsp_subscriptions)) != NULL) { + LIST_REMOVE(s, ths_subscriber_link); + subscription_unsubscribe(s); + } + if(htsp.htsp_async_mode) LIST_REMOVE(&htsp, htsp_async_link); + pthread_mutex_unlock(&global_lock); free(htsp.htsp_name); @@ -661,3 +769,216 @@ htsp_tag_delete(channel_tag_t *ct) htsmsg_add_str(m, "method", "tagDelete"); htsp_async_send(m); } + + +/** + * + */ +static void +htsp_send_subscription_status(htsp_connection_t *htsp, channel_t *ch, + const char *txt) +{ + htsmsg_t *m; + + if(ch == NULL) + return; + + m = htsmsg_create(); + htsmsg_add_str(m, "method", "subscriptionStatus"); + htsmsg_add_u32(m, "channelId", ch->ch_id); + + if(txt != NULL) + htsmsg_add_str(m, "status", txt); + + htsp_send_message(htsp, m, NULL); +} + + +/** + * Build a htsmsg from a th_pkt and enqueue it on our HTSP transport + */ +static void +htsp_stream_deliver(void *opaque, struct th_pktref *pr) +{ + htsp_stream_t *hs = opaque; + th_pkt_t *pkt = pr->pr_pkt; + htsmsg_t *m = htsmsg_create(); + + htsmsg_add_str(m, "method", "muxpkt"); + htsmsg_add_u32(m, "channelId", hs->hs_channelid); + + htsmsg_add_u64(m, "stream", pkt->pkt_componentindex); + htsmsg_add_u64(m, "dts", pkt->pkt_dts); + htsmsg_add_u64(m, "pts", pkt->pkt_pts); + htsmsg_add_u32(m, "duration", pkt->pkt_duration); + htsmsg_add_u32(m, "com", pkt->pkt_commercial); + + /** + * Since we will serialize directly we use 'binptr' which is a binary + * object that just points to data, thus avoiding a copy. + */ + htsmsg_add_binptr(m, "payload", pkt->pkt_payload, pkt->pkt_payloadlen); + + htsp_send(hs->hs_htsp, m, pr, &hs->hs_q); +} + + +/** + * Send a 'subscriptionStart' message to client informing about + * delivery start and all components. + * + * Setup a streaming target that will deliver packets to the HTSP transport. + */ +static void +htsp_subscription_start(htsp_connection_t *htsp, th_subscription_t *s, + channel_t *ch, streaming_pad_t *sp) +{ + streaming_component_t *sc; + htsp_stream_t *hs; + htsmsg_t *m, *streams, *c; + + assert(s->ths_st == NULL); + + hs = calloc(1, sizeof(htsp_stream_t)); + hs->hs_htsp = htsp; + hs->hs_channelid = ch->ch_id; + htsp_init_queue(&hs->hs_q); + + m = htsmsg_create(); + htsmsg_add_u32(m, "channelId", ch->ch_id); + htsmsg_add_str(m, "method", "subscriptionStart"); + + streaming_target_init(&hs->hs_st, htsp_stream_deliver, hs); + + /** + * Lock streming pad delivery so we can hook us up. + */ + pthread_mutex_lock(sp->sp_mutex); + + /* Setup each stream */ + streams = htsmsg_create_array(); + LIST_FOREACH(sc, &sp->sp_components, sc_link) { + c = htsmsg_create(); + htsmsg_add_u32(c, "index", sc->sc_index); + htsmsg_add_str(c, "type", streaming_component_type2txt(sc->sc_type)); + htsmsg_add_str(c, "language", sc->sc_lang); + htsmsg_add_msg(streams, NULL, c); + } + + htsmsg_add_msg(m, "streams", streams); + + htsp_send(hs->hs_htsp, m, NULL, &hs->hs_q); + + /* Link to the pad */ + streaming_target_connect(sp, &hs->hs_st); + + s->ths_st = &hs->hs_st; + + /* Once we unlock here we will start getting the callback */ + pthread_mutex_unlock(sp->sp_mutex); +} + + +/** + * Send a 'subscriptionStart' stop + * + * Tear down all streaming related stuff. + */ +static void +htsp_subscription_stop(htsp_connection_t *htsp, th_subscription_t *s, + const char *reason) +{ + htsp_stream_t *hs; + htsmsg_t *m; + + assert(s->ths_st != NULL); + + hs = (htsp_stream_t *)s->ths_st; + + /* Unlink from pad */ + streaming_target_disconnect(&hs->hs_st); + + /* Send a stop message back */ + m = htsmsg_create(); + htsmsg_add_u32(m, "channelId", hs->hs_channelid); + htsmsg_add_str(m, "method", "subscriptionStop"); + + if(reason) + htsmsg_add_str(m, "reason", reason); + + /* Send on normal control queue cause we are about do destroy + the per-subscription queue */ + htsp_send_message(hs->hs_htsp, m, NULL); + + htsp_destroy_queue(htsp, &hs->hs_q); + + free(hs); + + s->ths_st = NULL; +} + + + + +/** + * + */ +static void +htsp_subscription_callback(struct th_subscription *s, + subscription_event_t event, void *opaque) +{ + htsp_connection_t *htsp = opaque; + channel_t *ch = s->ths_channel; /* Beware: may be NULL if a channel + is destroyed while we're subscribing + to it. It's not dangerous, just keep + it in mind. */ + th_transport_t *t; + + switch(event) { + case SUBSCRIPTION_EVENT_INVALID: + abort(); + + case SUBSCRIPTION_TRANSPORT_RUN: + + assert(ch != NULL); /* ch must be valid here */ + htsp_send_subscription_status(htsp, ch, NULL); + + t = s->ths_transport; + htsp_subscription_start(htsp, s, ch, &t->tht_streaming_pad); + return; + + case SUBSCRIPTION_NO_INPUT: + htsp_send_subscription_status(htsp, ch, "No input detected"); + break; + + case SUBSCRIPTION_NO_DESCRAMBLER: + htsp_send_subscription_status(htsp, ch, "No descrambler available"); + break; + + case SUBSCRIPTION_NO_ACCESS: + htsp_send_subscription_status(htsp, ch, "Access denied"); + break; + + case SUBSCRIPTION_RAW_INPUT: + htsp_send_subscription_status(htsp, ch, + "Unable to reassemble packets from input"); + break; + + case SUBSCRIPTION_VALID_PACKETS: + htsp_send_subscription_status(htsp, ch, NULL); + break; + + case SUBSCRIPTION_TRANSPORT_NOT_AVAILABLE: + htsp_send_subscription_status(htsp, ch, + "No transport available, retrying..."); + break; + + case SUBSCRIPTION_TRANSPORT_LOST: + htsp_subscription_stop(htsp, s, "Transport destroyed"); + break; + + case SUBSCRIPTION_DESTROYED: + htsp_subscription_stop(htsp, s, NULL); + return; + } +}