From ff13b3f3ee38ccfc4793bdde1e35828f2a9bab1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96man?= Date: Wed, 18 Feb 2009 17:25:36 +0000 Subject: [PATCH] Rework HTSP to use subscriptionId:s when communicating with client --- dvr/dvr_rec.c | 3 +- htsp.c | 75 ++++++++++++++++++++----------------------------- subscriptions.c | 5 +++- subscriptions.h | 5 +++- 4 files changed, 41 insertions(+), 47 deletions(-) diff --git a/dvr/dvr_rec.c b/dvr/dvr_rec.c index 3fb7d996..dd78df31 100644 --- a/dvr/dvr_rec.c +++ b/dvr/dvr_rec.c @@ -121,7 +121,8 @@ dvr_rec_subscribe(dvr_entry_t *de) return; de->de_s = subscription_create_from_channel(de->de_channel, 1000, "pvr", - dvr_subscription_callback, de); + dvr_subscription_callback, de, + 0); } diff --git a/htsp.c b/htsp.c index e95fbf82..32cd86d7 100644 --- a/htsp.c +++ b/htsp.c @@ -121,9 +121,9 @@ typedef struct htsp_connection { */ typedef struct htsp_stream { streaming_target_t hs_st; - int hs_channelid; /* We can not deref channel since we don't (and can't) - hold global_lock during delivery */ - + + int hs_sid; /* Subscription ID */ + htsp_connection_t *hs_htsp; htsp_msg_q_t hs_q; @@ -380,21 +380,21 @@ static htsmsg_t * htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in) { th_subscription_t *s; - uint32_t chid; + uint32_t chid, sid; channel_t *ch; if(htsmsg_get_u32(in, "channelId", &chid)) return htsp_error("Missing argument 'channeId'"); + if(htsmsg_get_u32(in, "subscriptionId", &sid)) + return htsp_error("Missing argument 'subscriptionId'"); + if((ch = channel_find_by_identifier(chid)) == NULL) return htsp_error("Requested channel does not exist"); - LIST_FOREACH(s, &htsp->htsp_subscriptions, ths_subscriber_link) - if(s->ths_channel == ch) - return htsmsg_create(); /* We just say ok if already subscribed */ - s = subscription_create_from_channel(ch, 500, "htsp", - htsp_subscription_callback, htsp); + htsp_subscription_callback, htsp, sid); + LIST_INSERT_HEAD(&htsp->htsp_subscriptions, s, ths_subscriber_link); return htsmsg_create(); } @@ -407,21 +407,17 @@ static htsmsg_t * htsp_method_unsubscribe(htsp_connection_t *htsp, htsmsg_t *in) { th_subscription_t *s; - uint32_t chid; - channel_t *ch; + uint32_t sid; - if(htsmsg_get_u32(in, "channelId", &chid)) - return htsp_error("Missing argument 'channeId'"); - - if((ch = channel_find_by_identifier(chid)) == NULL) - return htsp_error("Requested channel does not exist"); + if(htsmsg_get_u32(in, "subscriptionId", &sid)) + return htsp_error("Missing argument 'subscriptionId'"); LIST_FOREACH(s, &htsp->htsp_subscriptions, ths_subscriber_link) - if(s->ths_channel == ch) + if(s->ths_u32 == sid) break; if(s == NULL) - return htsmsg_create(); /* We just say ok if already unsubscribed */ + return htsmsg_create(); /* Just say ok */ LIST_REMOVE(s, ths_subscriber_link); subscription_unsubscribe(s); @@ -723,7 +719,7 @@ htsp_event_update(channel_t *ch, event_t *e) m = htsmsg_create(); htsmsg_add_str(m, "method", "channelUpdate"); htsmsg_add_u32(m, "channelId", ch->ch_id); - + if(e == NULL) e = epg_event_find_by_time(ch, now); @@ -801,17 +797,14 @@ htsp_tag_delete(channel_tag_t *ct) * */ static void -htsp_send_subscription_status(htsp_connection_t *htsp, channel_t *ch, +htsp_send_subscription_status(htsp_connection_t *htsp, th_subscription_t *ths, const char *txt) { htsmsg_t *m; - if(ch == NULL) - return; - m = htsmsg_create(); htsmsg_add_str(m, "method", "subscriptionStatus"); - htsmsg_add_u32(m, "channelId", ch->ch_id); + htsmsg_add_u32(m, "subscriptionId", ths->ths_u32); if(txt != NULL) htsmsg_add_str(m, "status", txt); @@ -852,7 +845,7 @@ htsp_stream_deliver(void *opaque, struct th_pktref *pr) } htsmsg_add_str(m, "method", "muxpkt"); - htsmsg_add_u32(m, "channelId", hs->hs_channelid); + htsmsg_add_u32(m, "subscriptionId", hs->hs_sid); htsmsg_add_u32(m, "frametype", frametypearray[pkt->pkt_frametype]); htsmsg_add_u32(m, "stream", pkt->pkt_componentindex); @@ -875,7 +868,7 @@ htsp_stream_deliver(void *opaque, struct th_pktref *pr) m = htsmsg_create(); htsmsg_add_str(m, "method", "queueStatus"); - htsmsg_add_u32(m, "channelId", hs->hs_channelid); + htsmsg_add_u32(m, "subscriptionId", hs->hs_sid); htsmsg_add_u32(m, "packets", hs->hs_q.hmq_length); htsmsg_add_u32(m, "bytes", hs->hs_q.hmq_payload); @@ -914,7 +907,7 @@ htsp_stream_deliver(void *opaque, struct th_pktref *pr) */ static void htsp_subscription_start(htsp_connection_t *htsp, th_subscription_t *s, - channel_t *ch, streaming_pad_t *sp) + streaming_pad_t *sp) { streaming_component_t *sc; htsp_stream_t *hs; @@ -924,12 +917,12 @@ htsp_subscription_start(htsp_connection_t *htsp, th_subscription_t *s, hs = calloc(1, sizeof(htsp_stream_t)); hs->hs_htsp = htsp; - hs->hs_channelid = ch->ch_id; + hs->hs_sid = s->ths_u32; htsp_init_queue(&hs->hs_q, 0); m = htsmsg_create(); - htsmsg_add_u32(m, "channelId", ch->ch_id); htsmsg_add_str(m, "method", "subscriptionStart"); + htsmsg_add_u32(m, "subscriptionId", s->ths_u32); streaming_target_init(&hs->hs_st, htsp_stream_deliver, hs); @@ -984,7 +977,7 @@ htsp_subscription_stop(htsp_connection_t *htsp, th_subscription_t *s, /* Send a stop message back */ m = htsmsg_create(); - htsmsg_add_u32(m, "channelId", hs->hs_channelid); + htsmsg_add_u32(m, "subscriptionId", hs->hs_sid); htsmsg_add_str(m, "method", "subscriptionStop"); if(reason) @@ -1012,10 +1005,6 @@ htsp_subscription_callback(struct th_subscription *s, subscription_event_t event, void *opaque) { htsp_connection_t *htsp = opaque; - channel_t *ch = s->ths_channel; /* Beware: may be NULL if a channel - is destroyed while we're subscribing - to it. It's not dangerous, just keep - it in mind. */ th_transport_t *t; switch(event) { @@ -1023,37 +1012,35 @@ htsp_subscription_callback(struct th_subscription *s, abort(); case SUBSCRIPTION_TRANSPORT_RUN: - - assert(ch != NULL); /* ch must be valid here */ - htsp_send_subscription_status(htsp, ch, NULL); + htsp_send_subscription_status(htsp, s, NULL); t = s->ths_transport; - htsp_subscription_start(htsp, s, ch, &t->tht_streaming_pad); + htsp_subscription_start(htsp, s, &t->tht_streaming_pad); return; case SUBSCRIPTION_NO_INPUT: - htsp_send_subscription_status(htsp, ch, "No input detected"); + htsp_send_subscription_status(htsp, s, "No input detected"); break; case SUBSCRIPTION_NO_DESCRAMBLER: - htsp_send_subscription_status(htsp, ch, "No descrambler available"); + htsp_send_subscription_status(htsp, s, "No descrambler available"); break; case SUBSCRIPTION_NO_ACCESS: - htsp_send_subscription_status(htsp, ch, "Access denied"); + htsp_send_subscription_status(htsp, s, "Access denied"); break; case SUBSCRIPTION_RAW_INPUT: - htsp_send_subscription_status(htsp, ch, + htsp_send_subscription_status(htsp, s, "Unable to reassemble packets from input"); break; case SUBSCRIPTION_VALID_PACKETS: - htsp_send_subscription_status(htsp, ch, NULL); + htsp_send_subscription_status(htsp, s, NULL); break; case SUBSCRIPTION_TRANSPORT_NOT_AVAILABLE: - htsp_send_subscription_status(htsp, ch, + htsp_send_subscription_status(htsp, s, "No transport available, retrying..."); break; diff --git a/subscriptions.c b/subscriptions.c index 1cce4128..b2d12b1e 100644 --- a/subscriptions.c +++ b/subscriptions.c @@ -148,10 +148,13 @@ subscription_create(int weight, const char *name, th_subscription_t * subscription_create_from_channel(channel_t *ch, unsigned int weight, const char *name, - ths_event_callback_t *cb, void *opaque) + ths_event_callback_t *cb, void *opaque, + uint32_t u32) { th_subscription_t *s = subscription_create(weight, name, cb, opaque); + s->ths_u32 = u32; + s->ths_channel = ch; LIST_INSERT_HEAD(&ch->ch_subscriptions, s, ths_channel_link); s->ths_transport = NULL; diff --git a/subscriptions.h b/subscriptions.h index f0b344d7..07a81603 100644 --- a/subscriptions.h +++ b/subscriptions.h @@ -45,6 +45,8 @@ typedef struct th_subscription { for this link */ void *ths_opaque; + uint32_t ths_u32; + char *ths_title; /* display title */ time_t ths_start; /* time when subscription started */ int ths_total_err; /* total errors during entire subscription */ @@ -72,7 +74,8 @@ th_subscription_t *subscription_create_from_channel(channel_t *ch, unsigned int weight, const char *name, ths_event_callback_t *cb, - void *opaque); + void *opaque, + uint32_t u32); th_subscription_t *subscription_create_from_transport(th_transport_t *t,