From acee2bc9cc9af05b4a00384bf7910129a20fa0a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96man?= Date: Sun, 28 Sep 2008 17:19:11 +0000 Subject: [PATCH] HTSP Server: Maintain stats about streaming queues and send periodic updates to client. --- htsp.c | 86 ++++++++++++++++++++++++++++++++++++++++++++++++++------ packet.h | 1 + 2 files changed, 78 insertions(+), 9 deletions(-) diff --git a/htsp.c b/htsp.c index 189a72f7..3da6c553 100644 --- a/htsp.c +++ b/htsp.c @@ -73,6 +73,7 @@ typedef struct htsp_msg_q { struct htsp_msg_queue hmq_q; TAILQ_ENTRY(htsp_msg_q) hmq_link; + int hmq_strict_prio; /* Serve this queue 'til it's empty */ int hmq_length; int hmq_payload; /* Bytes of streaming payload that's enqueued */ } htsp_msg_q_t; @@ -106,6 +107,7 @@ typedef struct htsp_connection { htsp_msg_q_t htsp_hmq_ctrl; htsp_msg_q_t htsp_hmq_epg; + htsp_msg_q_t htsp_hmq_qstatus; /** * @@ -126,6 +128,10 @@ typedef struct htsp_stream { htsp_msg_q_t hs_q; + time_t hs_last_report; /* Last queue status report sent */ + + int hs_dropstats[PKT_NTYPES]; + } htsp_stream_t; @@ -159,10 +165,11 @@ htsp_msg_destroy(htsp_msg_t *hm) * */ static void -htsp_init_queue(htsp_msg_q_t *hmq) +htsp_init_queue(htsp_msg_q_t *hmq, int strict_prio) { TAILQ_INIT(&hmq->hmq_q); hmq->hmq_length = 0; + hmq->hmq_strict_prio = strict_prio; } @@ -203,7 +210,11 @@ htsp_send(htsp_connection_t *htsp, htsmsg_t *m, th_pktref_t *pkt, if(hmq->hmq_length == 0) { /* Activate queue */ - TAILQ_INSERT_TAIL(&htsp->htsp_active_output_queues, hmq, hmq_link); + + if(hmq->hmq_strict_prio) + TAILQ_INSERT_HEAD(&htsp->htsp_active_output_queues, hmq, hmq_link); + else + TAILQ_INSERT_TAIL(&htsp->htsp_active_output_queues, hmq, hmq_link); } hmq->hmq_length++; @@ -569,14 +580,21 @@ htsp_write_scheduler(void *aux) TAILQ_REMOVE(&htsp->htsp_active_output_queues, hmq, hmq_link); if(hmq->hmq_length) { - /* Still messages to be sent, put back at the end of active queues */ - TAILQ_INSERT_TAIL(&htsp->htsp_active_output_queues, hmq, hmq_link); + /* Still messages to be sent, put back in active queues */ + if(hmq->hmq_strict_prio) + TAILQ_INSERT_HEAD(&htsp->htsp_active_output_queues, hmq, hmq_link); + else + TAILQ_INSERT_TAIL(&htsp->htsp_active_output_queues, hmq, hmq_link); } pthread_mutex_unlock(&htsp->htsp_out_mutex); r = htsmsg_binary_serialize(hm->hm_msg, &dptr, &dlen, INT32_MAX); + if(hm->hm_pktref) { + usleep(hm->hm_payloadsize * 3); + } + htsp_msg_destroy(hm); write(htsp->htsp_fd, dptr, dlen); @@ -605,8 +623,9 @@ htsp_serve(int fd, void *opaque, struct sockaddr_in *source) TAILQ_INIT(&htsp.htsp_active_output_queues); - htsp_init_queue(&htsp.htsp_hmq_ctrl); - htsp_init_queue(&htsp.htsp_hmq_epg); + htsp_init_queue(&htsp.htsp_hmq_ctrl, 0); + htsp_init_queue(&htsp.htsp_hmq_qstatus, 1); + htsp_init_queue(&htsp.htsp_hmq_epg, 0); htsp.htsp_name = strdup(buf); @@ -796,8 +815,24 @@ htsp_stream_deliver(void *opaque, struct th_pktref *pr) { htsp_stream_t *hs = opaque; th_pkt_t *pkt = pr->pr_pkt; - htsmsg_t *m = htsmsg_create(); + htsmsg_t *m = htsmsg_create(), *n; + htsp_msg_t *hm; + htsp_connection_t *htsp = hs->hs_htsp; + uint64_t ts; + int qlen = hs->hs_q.hmq_payload; + if((qlen > 500000 && pkt->pkt_frametype == PKT_B_FRAME) || + (qlen > 750000 && pkt->pkt_frametype == PKT_P_FRAME) || + (qlen > 1500000)) { + + hs->hs_dropstats[pkt->pkt_frametype]++; + + /* Queue size protection */ + pkt_ref_dec(pr->pr_pkt); + free(pr); + return; + } + htsmsg_add_str(m, "method", "muxpkt"); htsmsg_add_u32(m, "channelId", hs->hs_channelid); @@ -812,7 +847,40 @@ htsp_stream_deliver(void *opaque, struct th_pktref *pr) * object that just points to data, thus avoiding a copy. */ htsmsg_add_binptr(m, "payload", pkt->pkt_payload, pkt->pkt_payloadlen); - htsp_send(hs->hs_htsp, m, pr, &hs->hs_q, pkt->pkt_payloadlen); + htsp_send(htsp, m, pr, &hs->hs_q, pkt->pkt_payloadlen); + + if(hs->hs_last_report != dispatch_clock) { + /* Send a queue status report every second */ + + hs->hs_last_report = dispatch_clock; + + m = htsmsg_create(); + htsmsg_add_str(m, "method", "queueStatus"); + htsmsg_add_u32(m, "channelId", hs->hs_channelid); + htsmsg_add_u32(m, "packets", hs->hs_q.hmq_length); + htsmsg_add_u32(m, "bytes", hs->hs_q.hmq_payload); + + /** + * Figure out real time queue delay + */ + + pthread_mutex_lock(&htsp->htsp_out_mutex); + if((hm = TAILQ_FIRST(&hs->hs_q.hmq_q)) != NULL && + (n = hm->hm_msg) != NULL && !htsmsg_get_u64(n, "dts", &ts) && + pkt->pkt_dts != AV_NOPTS_VALUE && ts != AV_NOPTS_VALUE) { + htsmsg_add_u64(m, "delay", pkt->pkt_dts - ts); + } + pthread_mutex_unlock(&htsp->htsp_out_mutex); + + htsmsg_add_u32(m, "Bdrops", hs->hs_dropstats[PKT_B_FRAME]); + htsmsg_add_u32(m, "Pdrops", hs->hs_dropstats[PKT_P_FRAME]); + htsmsg_add_u32(m, "Idrops", hs->hs_dropstats[PKT_I_FRAME]); + + /* We use a special queue for queue status message so they're not + blocked by anything else */ + + htsp_send_message(hs->hs_htsp, m, &hs->hs_htsp->htsp_hmq_qstatus); + } } @@ -835,7 +903,7 @@ htsp_subscription_start(htsp_connection_t *htsp, th_subscription_t *s, hs = calloc(1, sizeof(htsp_stream_t)); hs->hs_htsp = htsp; hs->hs_channelid = ch->ch_id; - htsp_init_queue(&hs->hs_q); + htsp_init_queue(&hs->hs_q, 0); m = htsmsg_create(); htsmsg_add_u32(m, "channelId", ch->ch_id); diff --git a/packet.h b/packet.h index a4e4035b..f11b4f88 100644 --- a/packet.h +++ b/packet.h @@ -26,6 +26,7 @@ #define PKT_I_FRAME 1 #define PKT_P_FRAME 2 #define PKT_B_FRAME 3 +#define PKT_NTYPES 4 typedef struct th_pkt { int64_t pkt_dts;