HTSP Server: Maintain stats about streaming queues and send periodic
updates to client.
This commit is contained in:
parent
39227c209a
commit
acee2bc9cc
2 changed files with 78 additions and 9 deletions
86
htsp.c
86
htsp.c
|
@ -73,6 +73,7 @@ typedef struct htsp_msg_q {
|
||||||
struct htsp_msg_queue hmq_q;
|
struct htsp_msg_queue hmq_q;
|
||||||
|
|
||||||
TAILQ_ENTRY(htsp_msg_q) hmq_link;
|
TAILQ_ENTRY(htsp_msg_q) hmq_link;
|
||||||
|
int hmq_strict_prio; /* Serve this queue 'til it's empty */
|
||||||
int hmq_length;
|
int hmq_length;
|
||||||
int hmq_payload; /* Bytes of streaming payload that's enqueued */
|
int hmq_payload; /* Bytes of streaming payload that's enqueued */
|
||||||
} htsp_msg_q_t;
|
} htsp_msg_q_t;
|
||||||
|
@ -106,6 +107,7 @@ typedef struct htsp_connection {
|
||||||
|
|
||||||
htsp_msg_q_t htsp_hmq_ctrl;
|
htsp_msg_q_t htsp_hmq_ctrl;
|
||||||
htsp_msg_q_t htsp_hmq_epg;
|
htsp_msg_q_t htsp_hmq_epg;
|
||||||
|
htsp_msg_q_t htsp_hmq_qstatus;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -126,6 +128,10 @@ typedef struct htsp_stream {
|
||||||
|
|
||||||
htsp_msg_q_t hs_q;
|
htsp_msg_q_t hs_q;
|
||||||
|
|
||||||
|
time_t hs_last_report; /* Last queue status report sent */
|
||||||
|
|
||||||
|
int hs_dropstats[PKT_NTYPES];
|
||||||
|
|
||||||
} htsp_stream_t;
|
} htsp_stream_t;
|
||||||
|
|
||||||
|
|
||||||
|
@ -159,10 +165,11 @@ htsp_msg_destroy(htsp_msg_t *hm)
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
htsp_init_queue(htsp_msg_q_t *hmq)
|
htsp_init_queue(htsp_msg_q_t *hmq, int strict_prio)
|
||||||
{
|
{
|
||||||
TAILQ_INIT(&hmq->hmq_q);
|
TAILQ_INIT(&hmq->hmq_q);
|
||||||
hmq->hmq_length = 0;
|
hmq->hmq_length = 0;
|
||||||
|
hmq->hmq_strict_prio = strict_prio;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -203,7 +210,11 @@ htsp_send(htsp_connection_t *htsp, htsmsg_t *m, th_pktref_t *pkt,
|
||||||
|
|
||||||
if(hmq->hmq_length == 0) {
|
if(hmq->hmq_length == 0) {
|
||||||
/* Activate queue */
|
/* Activate queue */
|
||||||
TAILQ_INSERT_TAIL(&htsp->htsp_active_output_queues, hmq, hmq_link);
|
|
||||||
|
if(hmq->hmq_strict_prio)
|
||||||
|
TAILQ_INSERT_HEAD(&htsp->htsp_active_output_queues, hmq, hmq_link);
|
||||||
|
else
|
||||||
|
TAILQ_INSERT_TAIL(&htsp->htsp_active_output_queues, hmq, hmq_link);
|
||||||
}
|
}
|
||||||
|
|
||||||
hmq->hmq_length++;
|
hmq->hmq_length++;
|
||||||
|
@ -569,14 +580,21 @@ htsp_write_scheduler(void *aux)
|
||||||
|
|
||||||
TAILQ_REMOVE(&htsp->htsp_active_output_queues, hmq, hmq_link);
|
TAILQ_REMOVE(&htsp->htsp_active_output_queues, hmq, hmq_link);
|
||||||
if(hmq->hmq_length) {
|
if(hmq->hmq_length) {
|
||||||
/* Still messages to be sent, put back at the end of active queues */
|
/* Still messages to be sent, put back in active queues */
|
||||||
TAILQ_INSERT_TAIL(&htsp->htsp_active_output_queues, hmq, hmq_link);
|
if(hmq->hmq_strict_prio)
|
||||||
|
TAILQ_INSERT_HEAD(&htsp->htsp_active_output_queues, hmq, hmq_link);
|
||||||
|
else
|
||||||
|
TAILQ_INSERT_TAIL(&htsp->htsp_active_output_queues, hmq, hmq_link);
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_unlock(&htsp->htsp_out_mutex);
|
pthread_mutex_unlock(&htsp->htsp_out_mutex);
|
||||||
|
|
||||||
r = htsmsg_binary_serialize(hm->hm_msg, &dptr, &dlen, INT32_MAX);
|
r = htsmsg_binary_serialize(hm->hm_msg, &dptr, &dlen, INT32_MAX);
|
||||||
|
|
||||||
|
if(hm->hm_pktref) {
|
||||||
|
usleep(hm->hm_payloadsize * 3);
|
||||||
|
}
|
||||||
|
|
||||||
htsp_msg_destroy(hm);
|
htsp_msg_destroy(hm);
|
||||||
|
|
||||||
write(htsp->htsp_fd, dptr, dlen);
|
write(htsp->htsp_fd, dptr, dlen);
|
||||||
|
@ -605,8 +623,9 @@ htsp_serve(int fd, void *opaque, struct sockaddr_in *source)
|
||||||
|
|
||||||
TAILQ_INIT(&htsp.htsp_active_output_queues);
|
TAILQ_INIT(&htsp.htsp_active_output_queues);
|
||||||
|
|
||||||
htsp_init_queue(&htsp.htsp_hmq_ctrl);
|
htsp_init_queue(&htsp.htsp_hmq_ctrl, 0);
|
||||||
htsp_init_queue(&htsp.htsp_hmq_epg);
|
htsp_init_queue(&htsp.htsp_hmq_qstatus, 1);
|
||||||
|
htsp_init_queue(&htsp.htsp_hmq_epg, 0);
|
||||||
|
|
||||||
htsp.htsp_name = strdup(buf);
|
htsp.htsp_name = strdup(buf);
|
||||||
|
|
||||||
|
@ -796,7 +815,23 @@ htsp_stream_deliver(void *opaque, struct th_pktref *pr)
|
||||||
{
|
{
|
||||||
htsp_stream_t *hs = opaque;
|
htsp_stream_t *hs = opaque;
|
||||||
th_pkt_t *pkt = pr->pr_pkt;
|
th_pkt_t *pkt = pr->pr_pkt;
|
||||||
htsmsg_t *m = htsmsg_create();
|
htsmsg_t *m = htsmsg_create(), *n;
|
||||||
|
htsp_msg_t *hm;
|
||||||
|
htsp_connection_t *htsp = hs->hs_htsp;
|
||||||
|
uint64_t ts;
|
||||||
|
int qlen = hs->hs_q.hmq_payload;
|
||||||
|
|
||||||
|
if((qlen > 500000 && pkt->pkt_frametype == PKT_B_FRAME) ||
|
||||||
|
(qlen > 750000 && pkt->pkt_frametype == PKT_P_FRAME) ||
|
||||||
|
(qlen > 1500000)) {
|
||||||
|
|
||||||
|
hs->hs_dropstats[pkt->pkt_frametype]++;
|
||||||
|
|
||||||
|
/* Queue size protection */
|
||||||
|
pkt_ref_dec(pr->pr_pkt);
|
||||||
|
free(pr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
htsmsg_add_str(m, "method", "muxpkt");
|
htsmsg_add_str(m, "method", "muxpkt");
|
||||||
htsmsg_add_u32(m, "channelId", hs->hs_channelid);
|
htsmsg_add_u32(m, "channelId", hs->hs_channelid);
|
||||||
|
@ -812,7 +847,40 @@ htsp_stream_deliver(void *opaque, struct th_pktref *pr)
|
||||||
* object that just points to data, thus avoiding a copy.
|
* object that just points to data, thus avoiding a copy.
|
||||||
*/
|
*/
|
||||||
htsmsg_add_binptr(m, "payload", pkt->pkt_payload, pkt->pkt_payloadlen);
|
htsmsg_add_binptr(m, "payload", pkt->pkt_payload, pkt->pkt_payloadlen);
|
||||||
htsp_send(hs->hs_htsp, m, pr, &hs->hs_q, pkt->pkt_payloadlen);
|
htsp_send(htsp, m, pr, &hs->hs_q, pkt->pkt_payloadlen);
|
||||||
|
|
||||||
|
if(hs->hs_last_report != dispatch_clock) {
|
||||||
|
/* Send a queue status report every second */
|
||||||
|
|
||||||
|
hs->hs_last_report = dispatch_clock;
|
||||||
|
|
||||||
|
m = htsmsg_create();
|
||||||
|
htsmsg_add_str(m, "method", "queueStatus");
|
||||||
|
htsmsg_add_u32(m, "channelId", hs->hs_channelid);
|
||||||
|
htsmsg_add_u32(m, "packets", hs->hs_q.hmq_length);
|
||||||
|
htsmsg_add_u32(m, "bytes", hs->hs_q.hmq_payload);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Figure out real time queue delay
|
||||||
|
*/
|
||||||
|
|
||||||
|
pthread_mutex_lock(&htsp->htsp_out_mutex);
|
||||||
|
if((hm = TAILQ_FIRST(&hs->hs_q.hmq_q)) != NULL &&
|
||||||
|
(n = hm->hm_msg) != NULL && !htsmsg_get_u64(n, "dts", &ts) &&
|
||||||
|
pkt->pkt_dts != AV_NOPTS_VALUE && ts != AV_NOPTS_VALUE) {
|
||||||
|
htsmsg_add_u64(m, "delay", pkt->pkt_dts - ts);
|
||||||
|
}
|
||||||
|
pthread_mutex_unlock(&htsp->htsp_out_mutex);
|
||||||
|
|
||||||
|
htsmsg_add_u32(m, "Bdrops", hs->hs_dropstats[PKT_B_FRAME]);
|
||||||
|
htsmsg_add_u32(m, "Pdrops", hs->hs_dropstats[PKT_P_FRAME]);
|
||||||
|
htsmsg_add_u32(m, "Idrops", hs->hs_dropstats[PKT_I_FRAME]);
|
||||||
|
|
||||||
|
/* We use a special queue for queue status message so they're not
|
||||||
|
blocked by anything else */
|
||||||
|
|
||||||
|
htsp_send_message(hs->hs_htsp, m, &hs->hs_htsp->htsp_hmq_qstatus);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -835,7 +903,7 @@ htsp_subscription_start(htsp_connection_t *htsp, th_subscription_t *s,
|
||||||
hs = calloc(1, sizeof(htsp_stream_t));
|
hs = calloc(1, sizeof(htsp_stream_t));
|
||||||
hs->hs_htsp = htsp;
|
hs->hs_htsp = htsp;
|
||||||
hs->hs_channelid = ch->ch_id;
|
hs->hs_channelid = ch->ch_id;
|
||||||
htsp_init_queue(&hs->hs_q);
|
htsp_init_queue(&hs->hs_q, 0);
|
||||||
|
|
||||||
m = htsmsg_create();
|
m = htsmsg_create();
|
||||||
htsmsg_add_u32(m, "channelId", ch->ch_id);
|
htsmsg_add_u32(m, "channelId", ch->ch_id);
|
||||||
|
|
1
packet.h
1
packet.h
|
@ -26,6 +26,7 @@
|
||||||
#define PKT_I_FRAME 1
|
#define PKT_I_FRAME 1
|
||||||
#define PKT_P_FRAME 2
|
#define PKT_P_FRAME 2
|
||||||
#define PKT_B_FRAME 3
|
#define PKT_B_FRAME 3
|
||||||
|
#define PKT_NTYPES 4
|
||||||
|
|
||||||
typedef struct th_pkt {
|
typedef struct th_pkt {
|
||||||
int64_t pkt_dts;
|
int64_t pkt_dts;
|
||||||
|
|
Loading…
Add table
Reference in a new issue