Rework HTSP to use subscriptionId:s when communicating with client

This commit is contained in:
Andreas Öman 2009-02-18 17:25:36 +00:00
parent 4a2f481571
commit ff13b3f3ee
4 changed files with 41 additions and 47 deletions

View file

@ -121,7 +121,8 @@ dvr_rec_subscribe(dvr_entry_t *de)
return;
de->de_s = subscription_create_from_channel(de->de_channel, 1000, "pvr",
dvr_subscription_callback, de);
dvr_subscription_callback, de,
0);
}

75
htsp.c
View file

@ -121,9 +121,9 @@ typedef struct htsp_connection {
*/
typedef struct htsp_stream {
streaming_target_t hs_st;
int hs_channelid; /* We can not deref channel since we don't (and can't)
hold global_lock during delivery */
int hs_sid; /* Subscription ID */
htsp_connection_t *hs_htsp;
htsp_msg_q_t hs_q;
@ -380,21 +380,21 @@ static htsmsg_t *
htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
{
th_subscription_t *s;
uint32_t chid;
uint32_t chid, sid;
channel_t *ch;
if(htsmsg_get_u32(in, "channelId", &chid))
return htsp_error("Missing argument 'channeId'");
if(htsmsg_get_u32(in, "subscriptionId", &sid))
return htsp_error("Missing argument 'subscriptionId'");
if((ch = channel_find_by_identifier(chid)) == NULL)
return htsp_error("Requested channel does not exist");
LIST_FOREACH(s, &htsp->htsp_subscriptions, ths_subscriber_link)
if(s->ths_channel == ch)
return htsmsg_create(); /* We just say ok if already subscribed */
s = subscription_create_from_channel(ch, 500, "htsp",
htsp_subscription_callback, htsp);
htsp_subscription_callback, htsp, sid);
LIST_INSERT_HEAD(&htsp->htsp_subscriptions, s, ths_subscriber_link);
return htsmsg_create();
}
@ -407,21 +407,17 @@ static htsmsg_t *
htsp_method_unsubscribe(htsp_connection_t *htsp, htsmsg_t *in)
{
th_subscription_t *s;
uint32_t chid;
channel_t *ch;
uint32_t sid;
if(htsmsg_get_u32(in, "channelId", &chid))
return htsp_error("Missing argument 'channeId'");
if((ch = channel_find_by_identifier(chid)) == NULL)
return htsp_error("Requested channel does not exist");
if(htsmsg_get_u32(in, "subscriptionId", &sid))
return htsp_error("Missing argument 'subscriptionId'");
LIST_FOREACH(s, &htsp->htsp_subscriptions, ths_subscriber_link)
if(s->ths_channel == ch)
if(s->ths_u32 == sid)
break;
if(s == NULL)
return htsmsg_create(); /* We just say ok if already unsubscribed */
return htsmsg_create(); /* Just say ok */
LIST_REMOVE(s, ths_subscriber_link);
subscription_unsubscribe(s);
@ -723,7 +719,7 @@ htsp_event_update(channel_t *ch, event_t *e)
m = htsmsg_create();
htsmsg_add_str(m, "method", "channelUpdate");
htsmsg_add_u32(m, "channelId", ch->ch_id);
if(e == NULL)
e = epg_event_find_by_time(ch, now);
@ -801,17 +797,14 @@ htsp_tag_delete(channel_tag_t *ct)
*
*/
static void
htsp_send_subscription_status(htsp_connection_t *htsp, channel_t *ch,
htsp_send_subscription_status(htsp_connection_t *htsp, th_subscription_t *ths,
const char *txt)
{
htsmsg_t *m;
if(ch == NULL)
return;
m = htsmsg_create();
htsmsg_add_str(m, "method", "subscriptionStatus");
htsmsg_add_u32(m, "channelId", ch->ch_id);
htsmsg_add_u32(m, "subscriptionId", ths->ths_u32);
if(txt != NULL)
htsmsg_add_str(m, "status", txt);
@ -852,7 +845,7 @@ htsp_stream_deliver(void *opaque, struct th_pktref *pr)
}
htsmsg_add_str(m, "method", "muxpkt");
htsmsg_add_u32(m, "channelId", hs->hs_channelid);
htsmsg_add_u32(m, "subscriptionId", hs->hs_sid);
htsmsg_add_u32(m, "frametype", frametypearray[pkt->pkt_frametype]);
htsmsg_add_u32(m, "stream", pkt->pkt_componentindex);
@ -875,7 +868,7 @@ htsp_stream_deliver(void *opaque, struct th_pktref *pr)
m = htsmsg_create();
htsmsg_add_str(m, "method", "queueStatus");
htsmsg_add_u32(m, "channelId", hs->hs_channelid);
htsmsg_add_u32(m, "subscriptionId", hs->hs_sid);
htsmsg_add_u32(m, "packets", hs->hs_q.hmq_length);
htsmsg_add_u32(m, "bytes", hs->hs_q.hmq_payload);
@ -914,7 +907,7 @@ htsp_stream_deliver(void *opaque, struct th_pktref *pr)
*/
static void
htsp_subscription_start(htsp_connection_t *htsp, th_subscription_t *s,
channel_t *ch, streaming_pad_t *sp)
streaming_pad_t *sp)
{
streaming_component_t *sc;
htsp_stream_t *hs;
@ -924,12 +917,12 @@ htsp_subscription_start(htsp_connection_t *htsp, th_subscription_t *s,
hs = calloc(1, sizeof(htsp_stream_t));
hs->hs_htsp = htsp;
hs->hs_channelid = ch->ch_id;
hs->hs_sid = s->ths_u32;
htsp_init_queue(&hs->hs_q, 0);
m = htsmsg_create();
htsmsg_add_u32(m, "channelId", ch->ch_id);
htsmsg_add_str(m, "method", "subscriptionStart");
htsmsg_add_u32(m, "subscriptionId", s->ths_u32);
streaming_target_init(&hs->hs_st, htsp_stream_deliver, hs);
@ -984,7 +977,7 @@ htsp_subscription_stop(htsp_connection_t *htsp, th_subscription_t *s,
/* Send a stop message back */
m = htsmsg_create();
htsmsg_add_u32(m, "channelId", hs->hs_channelid);
htsmsg_add_u32(m, "subscriptionId", hs->hs_sid);
htsmsg_add_str(m, "method", "subscriptionStop");
if(reason)
@ -1012,10 +1005,6 @@ htsp_subscription_callback(struct th_subscription *s,
subscription_event_t event, void *opaque)
{
htsp_connection_t *htsp = opaque;
channel_t *ch = s->ths_channel; /* Beware: may be NULL if a channel
is destroyed while we're subscribing
to it. It's not dangerous, just keep
it in mind. */
th_transport_t *t;
switch(event) {
@ -1023,37 +1012,35 @@ htsp_subscription_callback(struct th_subscription *s,
abort();
case SUBSCRIPTION_TRANSPORT_RUN:
assert(ch != NULL); /* ch must be valid here */
htsp_send_subscription_status(htsp, ch, NULL);
htsp_send_subscription_status(htsp, s, NULL);
t = s->ths_transport;
htsp_subscription_start(htsp, s, ch, &t->tht_streaming_pad);
htsp_subscription_start(htsp, s, &t->tht_streaming_pad);
return;
case SUBSCRIPTION_NO_INPUT:
htsp_send_subscription_status(htsp, ch, "No input detected");
htsp_send_subscription_status(htsp, s, "No input detected");
break;
case SUBSCRIPTION_NO_DESCRAMBLER:
htsp_send_subscription_status(htsp, ch, "No descrambler available");
htsp_send_subscription_status(htsp, s, "No descrambler available");
break;
case SUBSCRIPTION_NO_ACCESS:
htsp_send_subscription_status(htsp, ch, "Access denied");
htsp_send_subscription_status(htsp, s, "Access denied");
break;
case SUBSCRIPTION_RAW_INPUT:
htsp_send_subscription_status(htsp, ch,
htsp_send_subscription_status(htsp, s,
"Unable to reassemble packets from input");
break;
case SUBSCRIPTION_VALID_PACKETS:
htsp_send_subscription_status(htsp, ch, NULL);
htsp_send_subscription_status(htsp, s, NULL);
break;
case SUBSCRIPTION_TRANSPORT_NOT_AVAILABLE:
htsp_send_subscription_status(htsp, ch,
htsp_send_subscription_status(htsp, s,
"No transport available, retrying...");
break;

View file

@ -148,10 +148,13 @@ subscription_create(int weight, const char *name,
th_subscription_t *
subscription_create_from_channel(channel_t *ch,
unsigned int weight, const char *name,
ths_event_callback_t *cb, void *opaque)
ths_event_callback_t *cb, void *opaque,
uint32_t u32)
{
th_subscription_t *s = subscription_create(weight, name, cb, opaque);
s->ths_u32 = u32;
s->ths_channel = ch;
LIST_INSERT_HEAD(&ch->ch_subscriptions, s, ths_channel_link);
s->ths_transport = NULL;

View file

@ -45,6 +45,8 @@ typedef struct th_subscription {
for this link */
void *ths_opaque;
uint32_t ths_u32;
char *ths_title; /* display title */
time_t ths_start; /* time when subscription started */
int ths_total_err; /* total errors during entire subscription */
@ -72,7 +74,8 @@ th_subscription_t *subscription_create_from_channel(channel_t *ch,
unsigned int weight,
const char *name,
ths_event_callback_t *cb,
void *opaque);
void *opaque,
uint32_t u32);
th_subscription_t *subscription_create_from_transport(th_transport_t *t,