From 98202725cb89ea00a0b949eb309772d628d66627 Mon Sep 17 00:00:00 2001 From: sb1066 Date: Fri, 9 Jul 2010 20:29:11 +0000 Subject: [PATCH] Moved the http streaming to the webui --- src/http.c | 192 -------------------------------------------- src/webui/webui.c | 197 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 197 insertions(+), 192 deletions(-) diff --git a/src/http.c b/src/http.c index cb813b9c..c8866888 100644 --- a/src/http.c +++ b/src/http.c @@ -34,10 +34,6 @@ #include "http.h" #include "rtsp.h" #include "access.h" -#include "channels.h" -#include "subscriptions.h" -#include "streaming.h" -#include "psi.h" static void *http_server; @@ -792,193 +788,6 @@ http_serve(int fd, void *opaque, struct sockaddr_in *peer, close(fd); } -static void -http_stream_run(http_connection_t *hc, streaming_queue_t *sq) -{ - streaming_message_t *sm; - int run = 1; - int start = 1; - int timeouts = 0; - pthread_mutex_lock(&sq->sq_mutex); - - while(run) { - sm = TAILQ_FIRST(&sq->sq_queue); - if(sm == NULL) { - struct timespec ts; - struct timeval tp; - - gettimeofday(&tp, NULL); - ts.tv_sec = tp.tv_sec + 1; - ts.tv_nsec = tp.tv_usec * 1000; - - if(pthread_cond_timedwait(&sq->sq_cond, &sq->sq_mutex, &ts) == ETIMEDOUT) { - int err = 0; - socklen_t errlen = sizeof(err); - - timeouts++; - - //Check socket status - getsockopt(hc->hc_fd, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen); - - //Abort upon socket error, or after 5 seconds of silence - if(err || timeouts > 4){ - run = 0; - } - } - continue; - } - - timeouts = 0; //Reset timeout counter - TAILQ_REMOVE(&sq->sq_queue, sm, sm_link); - - pthread_mutex_unlock(&sq->sq_mutex); - - switch(sm->sm_type) { - case SMT_PACKET: - //printf("SMT_PACKET\n"); - break; - - case SMT_START: - if (start) { - struct streaming_start *ss = sm->sm_data; - uint8_t pat_ts[188]; - uint8_t pmt_ts[188]; - int pcrpid = ss->ss_pcr_pid; - int pmtpid = 0x0fff; - - http_output_content(hc, "video/mp2t"); - - //Send PAT - memset(pat_ts, 0xff, 188); - psi_build_pat(NULL, pat_ts+5, 183, pmtpid); - pat_ts[0] = 0x47; - pat_ts[1] = 0x40; - pat_ts[2] = 0x00; - pat_ts[3] = 0x10; - pat_ts[4] = 0x00; - run = (write(hc->hc_fd, pat_ts, 188) == 188); - - //Send PMT - memset(pmt_ts, 0xff, 188); - psi_build_pmt(ss, pmt_ts+5, 183, pcrpid); - pmt_ts[0] = 0x47; - pmt_ts[1] = 0x40 | (pmtpid >> 8); - pmt_ts[2] = pmtpid; - pmt_ts[3] = 0x10; - pmt_ts[4] = 0x00; - run = (write(hc->hc_fd, pmt_ts, 188) == 188); - - start = 0; - } - break; - - case SMT_STOP: - run = 0; - break; - - case SMT_TRANSPORT_STATUS: - //printf("SMT_TRANSPORT_STATUS\n"); - break; - - case SMT_NOSTART: - run = 0; - break; - - case SMT_MPEGTS: - run = (write(hc->hc_fd, sm->sm_data, 188) == 188); - break; - - case SMT_EXIT: - run = 0; - break; - } - - streaming_msg_free(sm); - pthread_mutex_lock(&sq->sq_mutex); - } - - pthread_mutex_unlock(&sq->sq_mutex); -} - - -static void -http_stream_playlist(http_connection_t *hc) -{ - htsbuf_queue_t *hq = &hc->hc_reply; - channel_t *ch = NULL; - const char *host = http_arg_get(&hc->hc_args, "Host"); - - pthread_mutex_lock(&global_lock); - - htsbuf_qprintf(hq, "#EXTM3U\n"); - RB_FOREACH(ch, &channel_name_tree, ch_name_link) { - htsbuf_qprintf(hq, "#EXTINF:-1,%s\n", ch->ch_name); - htsbuf_qprintf(hq, "http://%s/stream/channelid/%d\n", host, ch->ch_id); - } - - http_output_content(hc, "application/x-mpegURL"); - - pthread_mutex_unlock(&global_lock); -} - - -static int -http_stream_channel(http_connection_t *hc, int chid) -{ - channel_t *ch; - streaming_queue_t sq; - th_subscription_t *s; - int priority = 150; //Default value, Compute this somehow - - pthread_mutex_lock(&global_lock); - - if((ch = channel_find_by_identifier(chid)) == NULL) { - pthread_mutex_unlock(&global_lock); - http_error(hc, HTTP_STATUS_BAD_REQUEST); - return HTTP_STATUS_BAD_REQUEST; - } - - streaming_queue_init(&sq, ~SMT_TO_MASK(SUBSCRIPTION_RAW_MPEGTS)); - - s = subscription_create_from_channel(ch, priority, - "HTTP", &sq.sq_st, - SUBSCRIPTION_RAW_MPEGTS); - - - pthread_mutex_unlock(&global_lock); - - http_stream_run(hc, &sq); - - pthread_mutex_lock(&global_lock); - subscription_unsubscribe(s); - pthread_mutex_unlock(&global_lock); - streaming_queue_deinit(&sq); - - return 0; -} - - -/** - * Handle the http request. http://tvheadend/stream/channelid/ - */ -static int -http_stream(http_connection_t *hc, const char *remain, void *opaque) -{ - if(http_access_verify(hc, ACCESS_STREAMING)) { - http_error(hc, HTTP_STATUS_UNAUTHORIZED); - return HTTP_STATUS_UNAUTHORIZED; - } - - hc->hc_keep_alive = 0; - - if(remain == NULL) { - http_stream_playlist(hc); - return 0; - } - - return http_stream_channel(hc, atoi(remain)); -} - /** * Fire up HTTP server @@ -987,5 +796,4 @@ void http_server_init(void) { http_server = tcp_server_create(9981, http_serve, NULL); - http_path_add("/stream/channelid", NULL, http_stream, ACCESS_STREAMING); } diff --git a/src/webui/webui.c b/src/webui/webui.c index e3394b28..ffd6fb89 100644 --- a/src/webui/webui.c +++ b/src/webui/webui.c @@ -35,6 +35,7 @@ #include "webui.h" #include "dvr/dvr.h" #include "filebundle.h" +#include "psi.h" struct filebundle *filebundles; @@ -149,6 +150,201 @@ page_rtsp_playlist(http_connection_t *hc, const char *remain, void *opaque) return 0; } +/** + * HTTP stream loop + */ +static void +http_stream_run(http_connection_t *hc, streaming_queue_t *sq) +{ + streaming_message_t *sm; + int run = 1; + int start = 1; + int timeouts = 0; + pthread_mutex_lock(&sq->sq_mutex); + + while(run) { + sm = TAILQ_FIRST(&sq->sq_queue); + if(sm == NULL) { + struct timespec ts; + struct timeval tp; + + gettimeofday(&tp, NULL); + ts.tv_sec = tp.tv_sec + 1; + ts.tv_nsec = tp.tv_usec * 1000; + + if(pthread_cond_timedwait(&sq->sq_cond, &sq->sq_mutex, &ts) == ETIMEDOUT) { + int err = 0; + socklen_t errlen = sizeof(err); + + timeouts++; + + //Check socket status + getsockopt(hc->hc_fd, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen); + + //Abort upon socket error, or after 5 seconds of silence + if(err || timeouts > 4){ + run = 0; + } + } + continue; + } + + timeouts = 0; //Reset timeout counter + TAILQ_REMOVE(&sq->sq_queue, sm, sm_link); + + pthread_mutex_unlock(&sq->sq_mutex); + + switch(sm->sm_type) { + case SMT_PACKET: + //printf("SMT_PACKET\n"); + break; + + case SMT_START: + if (start) { + struct streaming_start *ss = sm->sm_data; + uint8_t pat_ts[188]; + uint8_t pmt_ts[188]; + int pcrpid = ss->ss_pcr_pid; + int pmtpid = 0x0fff; + + http_output_content(hc, "video/mp2t"); + + //Send PAT + memset(pat_ts, 0xff, 188); + psi_build_pat(NULL, pat_ts+5, 183, pmtpid); + pat_ts[0] = 0x47; + pat_ts[1] = 0x40; + pat_ts[2] = 0x00; + pat_ts[3] = 0x10; + pat_ts[4] = 0x00; + run = (write(hc->hc_fd, pat_ts, 188) == 188); + + //Send PMT + memset(pmt_ts, 0xff, 188); + psi_build_pmt(ss, pmt_ts+5, 183, pcrpid); + pmt_ts[0] = 0x47; + pmt_ts[1] = 0x40 | (pmtpid >> 8); + pmt_ts[2] = pmtpid; + pmt_ts[3] = 0x10; + pmt_ts[4] = 0x00; + run = (write(hc->hc_fd, pmt_ts, 188) == 188); + + start = 0; + } + break; + + case SMT_STOP: + run = 0; + break; + + case SMT_TRANSPORT_STATUS: + //printf("SMT_TRANSPORT_STATUS\n"); + break; + + case SMT_NOSTART: + run = 0; + break; + + case SMT_MPEGTS: + run = (write(hc->hc_fd, sm->sm_data, 188) == 188); + break; + + case SMT_EXIT: + run = 0; + break; + } + + streaming_msg_free(sm); + pthread_mutex_lock(&sq->sq_mutex); + } + + pthread_mutex_unlock(&sq->sq_mutex); +} + +/** + * Playlist with http streams (.m3u format) + */ +static void +http_stream_playlist(http_connection_t *hc) +{ + htsbuf_queue_t *hq = &hc->hc_reply; + channel_t *ch = NULL; + const char *host = http_arg_get(&hc->hc_args, "Host"); + + pthread_mutex_lock(&global_lock); + + htsbuf_qprintf(hq, "#EXTM3U\n"); + RB_FOREACH(ch, &channel_name_tree, ch_name_link) { + htsbuf_qprintf(hq, "#EXTINF:-1,%s\n", ch->ch_name); + htsbuf_qprintf(hq, "http://%s/stream/channelid/%d\n", host, ch->ch_id); + } + + http_output_content(hc, "application/x-mpegURL"); + + pthread_mutex_unlock(&global_lock); +} + +/** + * Subscribes to a channel and starts the streaming loop + */ +static int +http_stream_channel(http_connection_t *hc, int chid) +{ + channel_t *ch; + streaming_queue_t sq; + th_subscription_t *s; + int priority = 150; //Default value, Compute this somehow + + pthread_mutex_lock(&global_lock); + + if((ch = channel_find_by_identifier(chid)) == NULL) { + pthread_mutex_unlock(&global_lock); + http_error(hc, HTTP_STATUS_BAD_REQUEST); + return HTTP_STATUS_BAD_REQUEST; + } + + streaming_queue_init(&sq, ~SMT_TO_MASK(SUBSCRIPTION_RAW_MPEGTS)); + + s = subscription_create_from_channel(ch, priority, + "HTTP", &sq.sq_st, + SUBSCRIPTION_RAW_MPEGTS); + + + pthread_mutex_unlock(&global_lock); + + http_stream_run(hc, &sq); + + pthread_mutex_lock(&global_lock); + subscription_unsubscribe(s); + pthread_mutex_unlock(&global_lock); + streaming_queue_deinit(&sq); + + return 0; +} + + +/** + * Handle the http request. http://tvheadend/stream/channelid/ + */ +static int +http_stream(http_connection_t *hc, const char *remain, void *opaque) +{ + if(http_access_verify(hc, ACCESS_STREAMING)) { + http_error(hc, HTTP_STATUS_UNAUTHORIZED); + return HTTP_STATUS_UNAUTHORIZED; + } + + hc->hc_keep_alive = 0; + + if(remain == NULL) { + http_stream_playlist(hc); + return 0; + } + + return http_stream_channel(hc, atoi(remain)); +} + + /** * Static download of a file from an embedded filebundle */ @@ -335,6 +531,7 @@ webui_init(const char *contentpath) http_path_add("/state", NULL, page_statedump, ACCESS_ADMIN); + http_path_add("/stream/channelid", NULL, http_stream, ACCESS_STREAMING); webui_static_content(contentpath, "/static", "src/webui/static"); webui_static_content(contentpath, "/docs", "docs/html");