diff --git a/Makefile b/Makefile index 3f92cf4c..41f9b44a 100644 --- a/Makefile +++ b/Makefile @@ -26,7 +26,7 @@ PROG=${BUILDDIR}/tvheadend CFLAGS = -Wall -Werror -Wwrite-strings -Wno-deprecated-declarations CFLAGS += -Wmissing-prototypes - +LDFLAGS += -lrt # # Core @@ -53,6 +53,8 @@ SRCS = src/main.c \ src/parser_latm.c \ src/tsdemux.c \ src/bitstream.c \ + src/rtsp.c \ + src/rtp.c \ src/htsp.c \ src/serviceprobe.c \ src/htsmsg.c \ diff --git a/src/http.c b/src/http.c index 23eb92f9..7ba118a5 100644 --- a/src/http.c +++ b/src/http.c @@ -34,6 +34,7 @@ #include "tvhead.h" #include "tcp.h" #include "http.h" +#include "rtsp.h" #include "access.h" static void *http_server; @@ -446,6 +447,8 @@ process_request(http_connection_t *hc, htsbuf_queue_t *spill) int n, rval = -1; uint8_t authbuf[150]; + hc->hc_url_orig = tvh_strdupa(hc->hc_url); + /* Set keep-alive status */ v = http_arg_get(&hc->hc_args, "connection"); @@ -489,10 +492,8 @@ process_request(http_connection_t *hc, htsbuf_queue_t *spill) switch(hc->hc_version) { case RTSP_VERSION_1_0: - rval = -1; + rval = rtsp_process_request(hc); break; - // rtsp_process_request(hc); - return 0; case HTTP_VERSION_1_0: case HTTP_VERSION_1_1: @@ -600,7 +601,7 @@ http_path_add(const char *path, void *opaque, http_callback_t *callback, /** * De-escape HTTP URL */ -static void +void http_deescape(char *s) { char v, *d = s; @@ -768,7 +769,7 @@ http_serve(int fd, void *opaque, struct sockaddr_in *source) free(hc.hc_username); free(hc.hc_password); - // rtsp_disconncet(hc); + rtsp_disconncet(&hc); http_arg_flush(&hc.hc_args); http_arg_flush(&hc.hc_req_args); diff --git a/src/http.h b/src/http.h index 00660f4a..bb8122da 100644 --- a/src/http.h +++ b/src/http.h @@ -36,14 +36,13 @@ typedef struct http_arg { #define HTTP_STATUS_NOT_FOUND 404 -LIST_HEAD(rtsp_session_head, rtsp_session); - typedef struct http_connection { int hc_fd; struct sockaddr_in *hc_peer; char *hc_representative; char *hc_url; + char *hc_url_orig; int hc_keep_alive; htsbuf_queue_t hc_reply; @@ -79,8 +78,6 @@ typedef struct http_connection { char *hc_username; char *hc_password; - struct rtsp_session_head hc_rtsp_sessions; - int hc_authenticated; /* Used by RTSP, it seems VLC does not send authentication headers for each command, so we just say that it's ok @@ -93,6 +90,8 @@ typedef struct http_connection { char *hc_post_data; unsigned int hc_post_len; + struct rtsp *hc_rtsp_session; + } http_connection_t; @@ -137,4 +136,6 @@ void http_server_init(void); int http_access_verify(http_connection_t *hc, int mask); +void http_deescape(char *s); + #endif /* HTTP_H_ */ diff --git a/src/rtp.c b/src/rtp.c index b374a3eb..8935badd 100644 --- a/src/rtp.c +++ b/src/rtp.c @@ -17,12 +17,10 @@ */ #include -#include #include #include #include #include -#include #include #include #include @@ -30,74 +28,64 @@ #include #include "tvhead.h" -#include "channels.h" #include "rtp.h" -#include "dispatch.h" - -int -rtp_sendmsg(uint8_t *pkt, int blocks, int64_t pcr, - int fd, struct sockaddr *dst, socklen_t dstlen, - uint16_t seq) -{ - struct msghdr msg; - struct iovec vec[2]; - AVRational mpeg_tc = {1, 90000}; - char hdr[12]; - - pcr = av_rescale_q(pcr, AV_TIME_BASE_Q, mpeg_tc); - - hdr[0] = 0x80; - hdr[1] = 33; /* M2TS */ - hdr[2] = seq >> 8; - hdr[3] = seq; - - hdr[4] = pcr >> 24; - hdr[5] = pcr >> 16; - hdr[6] = pcr >> 8; - hdr[7] = pcr; - - hdr[8] = 0; - hdr[9] = 0; - hdr[10] = 0; - hdr[11] = 0; - - - vec[0].iov_base = hdr; - vec[0].iov_len = 12; - vec[1].iov_base = pkt; - vec[1].iov_len = blocks * 188; - - memset(&msg, 0, sizeof(msg)); - msg.msg_name = dst; - msg.msg_namelen = dstlen; - msg.msg_iov = vec; - msg.msg_iovlen = 2; - - return sendmsg(fd, &msg, 0); -} - +static const AVRational mpeg_tc = {1, 90000}; void -rtp_output_ts(void *opaque, th_subscription_t *s, - uint8_t *pkt, int blocks, int64_t pcr) +rtp_send_mpv(rtp_send_t *sender, void *opaque, rtp_stream_t *rs, + const uint8_t *data, size_t len, + int64_t pts) { - th_rtp_streamer_t *trs = opaque; + uint32_t flags = 0; + int s; + int payloadsize = RTP_MAX_PACKET_SIZE - (4 + 4 + 4 + 4); + uint8_t *buf; + + if(data[0] != 0x00 || data[1] != 0x00 || data[2] != 0x01) + return; // Not a startcode, something is fishy - rtp_sendmsg(pkt, blocks, pcr, trs->trs_fd, - (struct sockaddr *)&trs->trs_dest, sizeof(struct sockaddr_in), - trs->trs_seq); + pts = av_rescale_q(pts, AV_TIME_BASE_Q, mpeg_tc); - trs->trs_seq++; -} - - - - -void -rtp_streamer_init(th_rtp_streamer_t *trs, int fd, struct sockaddr_in *dst) -{ - trs->trs_fd = fd; - trs->trs_dest = *dst; - trs->trs_seq = 0; + if(data[3] == 0xb3) { + // Sequence Start code, set Begin-Of-Sequence + flags |= 1 << 13; + } + while(len > 0) { + + s = len > payloadsize ? payloadsize : len; + + buf = rs->rs_buf; + buf[0] = 0x80; + buf[1] = 32 | (len == payloadsize ? 0x80 : 0); + buf[2] = rs->rs_seq >> 8; + buf[3] = rs->rs_seq; + + buf[4] = pts >> 24; + buf[5] = pts >> 16; + buf[6] = pts >> 8; + buf[7] = pts; + + buf[8] = 0; + buf[9] = 0; + buf[10] = 0; + buf[11] = 0; + + buf[12] = flags >> 24; + buf[13] = flags >> 16; + buf[14] = flags >> 8; + buf[15] = flags; + + memcpy(buf + 16, data, s); + + len -= s; + data += s; + + sender(opaque, buf, s + 16); + rs->rs_seq++; + + flags = 0; + + } + assert(len == 0); } diff --git a/src/rtp.h b/src/rtp.h index ee084c8d..c29cb776 100644 --- a/src/rtp.h +++ b/src/rtp.h @@ -1,6 +1,6 @@ /* - * tvheadend, RTP interface - * Copyright (C) 2007 Andreas Öman + * Tvheadend, RTP streamer + * Copyright (C) 2007, 2009 Andreas Öman * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -19,21 +19,18 @@ #ifndef RTP_H_ #define RTP_H_ -typedef struct th_rtp_streamer { - int trs_fd; - struct sockaddr_in trs_dest; - int16_t trs_seq; +typedef void (rtp_send_t)(void *opaque, void *buf, size_t len); -} th_rtp_streamer_t; +#define RTP_MAX_PACKET_SIZE 1472 -void rtp_streamer_init(th_rtp_streamer_t *trs, int fd, - struct sockaddr_in *dst); +typedef struct rtp_stream { + uint16_t rs_seq; + + int rs_ptr; + uint8_t rs_buf[RTP_MAX_PACKET_SIZE]; +} rtp_stream_t; -void rtp_output_ts(void *opaque, th_subscription_t *s, - uint8_t *pkt, int blocks, int64_t pcr); - -int rtp_sendmsg(uint8_t *pkt, int blocks, int64_t pcr, - int fd, struct sockaddr *dst, socklen_t dstlen, - uint16_t seq); +void rtp_send_mpv(rtp_send_t *sender, void *opaque, rtp_stream_t *rs, + const uint8_t *data, size_t len, int64_t pts); #endif /* RTP_H_ */ diff --git a/src/rtsp.c b/src/rtsp.c index 0a366bdc..bd09db60 100644 --- a/src/rtsp.c +++ b/src/rtsp.c @@ -32,15 +32,102 @@ #include "tvhead.h" #include "channels.h" #include "subscriptions.h" -#include "dispatch.h" -#include "rtp.h" -#include "tsmux.h" #include "tcp.h" #include "http.h" #include "access.h" #include "rtsp.h" +#include "rtp.h" +#include "streaming.h" +#include "transports.h" -#include +TAILQ_HEAD(rtsp_packet_queue, rtsp_packet); + +/** + * + */ +typedef struct rtsp_resource { + enum { + RTSP_RESOURCE_CHANNEL, + RTSP_RESOURCE_SERVICE + } rr_type; + + union { + struct channel *rr_channel; + struct th_transport *rr_service; + void *rr_r; + }; + +} rtsp_resource_t; + + +/** + * + */ +typedef struct rtsp_packet { + TAILQ_ENTRY(rtsp_packet) rp_link; + + int rp_payloadsize; + uint8_t rp_payload[0]; + +} rtsp_packet_t; + + +/** + * + */ +typedef struct rtsp { + int rtsp_refcount; // Should only be modified with global_lock held + + /* Streaming */ + + struct th_subscription *rtsp_sub; + + struct streaming_start *rtsp_ss; + + streaming_target_t rtsp_input; + + LIST_HEAD(, rtsp_stream) rtsp_streams; + + int rtsp_running; + + char rtsp_session_id[65]; + + struct rtsp_packet_queue rtsp_pqueue; + int rtsp_pqueue_size; + + pthread_t rtsp_thread; + int rtsp_run_thread; + int rtsp_tcp_socket; + + pthread_mutex_t rtsp_mutex; + pthread_cond_t rtsp_cond; + +} rtsp_t; + + +/** + * + */ +typedef struct rtsp_stream { + LIST_ENTRY(rtsp_stream) rs_link; + rtsp_t *rs_rtsp; + + int rs_index; // Source index + int rs_output; // Output index (set by RTSP client) + + int rs_send_sock; + int rs_recv_sock; + + int rs_type; + rtp_stream_t rs_rtp; + rtp_send_t *rs_sender; + + int rs_interleaved_stream; + +} rtsp_stream_t; + + +#define rtsp_printf(hc, fmt...) htsbuf_qprintf(&(hc)->hc_reply, fmt) #define RTSP_STATUS_OK 200 #define RTSP_STATUS_UNAUTHORIZED 401 @@ -52,175 +139,6 @@ -#define rcprintf(c, fmt...) tcp_printf(&(rc)->rc_tcp_session, fmt) - -#define rtsp_printf(hc, fmt...) htsbuf_qprintf(&(hc)->hc_reply, fmt) - -static AVRandomState rtsp_rnd; - -static struct rtsp_session_head rtsp_sessions; - - -typedef struct rtsp_session { - LIST_ENTRY(rtsp_session) rs_global_link; - LIST_ENTRY(rtsp_session) rs_con_link; - - uint32_t rs_id; - - int rs_fd[2]; - int rs_server_port[2]; - - th_subscription_t *rs_s; - - th_rtp_streamer_t rs_rtp_streamer; - - void *rs_muxer; - -} rtsp_session_t; - - - - -/** - * Resolve an URL into a channel - */ - -static channel_t * -rtsp_channel_by_url(char *url) -{ - channel_t *ch; - char *c; - - c = strrchr(url, '/'); - if(c != NULL && c[1] == 0) { - /* remove trailing slash */ - *c = 0; - c = strrchr(url, '/'); - } - - if(c == NULL || c[1] == 0 || (url != c && c[-1] == '/')) - return NULL; - c++; - - RB_FOREACH(ch, &channel_name_tree, ch_name_link) - if(!strcasecmp(ch->ch_sname, c)) - return ch; - - return NULL; -} - -/* - * Called when a subscription gets/loses access to a transport - */ -static void -rtsp_subscription_callback(struct th_subscription *s, - subscription_event_t event, void *opaque) -{ - rtsp_session_t *rs = opaque; - - switch(event) { - case TRANSPORT_AVAILABLE: - assert(rs->rs_muxer == NULL); - rs->rs_muxer = ts_muxer_init(s, rtp_output_ts, &rs->rs_rtp_streamer, - TS_SEEK, 0); - break; - - case TRANSPORT_UNAVAILABLE: - assert(rs->rs_muxer != NULL); - ts_muxer_deinit(rs->rs_muxer, s); - rs->rs_muxer = NULL; - break; - } -} - -/** - * Create an RTSP session - */ -static rtsp_session_t * -rtsp_session_create(channel_t *ch, struct sockaddr_in *dst) -{ - rtsp_session_t *rs; - uint32_t id; - struct sockaddr_in sin; - socklen_t slen; - int max_tries = 100; - - /* generate a random id (but make sure we do not collide) */ - - do { - id = av_random(&rtsp_rnd); - id &= 0x7fffffffUL; /* dont want any signed issues */ - - LIST_FOREACH(rs, &rtsp_sessions, rs_global_link) - if(rs->rs_id == id) - break; - } while(rs != NULL); - - rs = calloc(1, sizeof(rtsp_session_t)); - rs->rs_id = id; - - while(--max_tries) { - rs->rs_fd[0] = socket(AF_INET, SOCK_DGRAM, 0); - - memset(&sin, 0, sizeof(struct sockaddr_in)); - sin.sin_family = AF_INET; - - if(bind(rs->rs_fd[0], (struct sockaddr *)&sin, sizeof(sin)) == -1) { - close(rs->rs_fd[0]); - free(rs); - return NULL; - } - - slen = sizeof(struct sockaddr_in); - getsockname(rs->rs_fd[0], (struct sockaddr *)&sin, &slen); - - rs->rs_server_port[0] = ntohs(sin.sin_port); - rs->rs_server_port[1] = rs->rs_server_port[0] + 1; - - sin.sin_port = htons(rs->rs_server_port[1]); - - rs->rs_fd[1] = socket(AF_INET, SOCK_DGRAM, 0); - - if(bind(rs->rs_fd[1], (struct sockaddr *)&sin, sizeof(sin)) == -1) { - close(rs->rs_fd[0]); - close(rs->rs_fd[1]); - continue; - } - - LIST_INSERT_HEAD(&rtsp_sessions, rs, rs_global_link); - - rs->rs_s = subscription_create(ch, 600, "RTSP", - rtsp_subscription_callback, rs, 0); - - /* Initialize RTP */ - - rtp_streamer_init(&rs->rs_rtp_streamer, rs->rs_fd[0], dst); - return rs; - } - - free(rs); - return NULL; -} - -/** - * Destroy an RTSP session - */ -static void -rtsp_session_destroy(rtsp_session_t *rs) -{ - subscription_unsubscribe(rs->rs_s); /* will call subscription_callback - with TRANSPORT_UNAVAILABLE if - we are hooked on a transport */ - close(rs->rs_fd[0]); - close(rs->rs_fd[1]); - LIST_REMOVE(rs, rs_global_link); - LIST_REMOVE(rs, rs_con_link); - - free(rs); -} - - - /* * RTSP return code to string */ @@ -235,301 +153,442 @@ rtsp_err2str(int err) case RTSP_STATUS_TRANSPORT: return "Unsupported transport"; case RTSP_STATUS_INTERNAL: return "Internal Server Error"; case RTSP_STATUS_SERVICE: return "Service Unavailable"; + case 403: return "Permission denied"; + case 459: return "Aggregate operation not allowed"; default: - return "Unknown Error"; - break; + return "Error"; } } /* * Return an error */ -static void -rtsp_reply_error(http_connection_t *hc, int error, const char *errstr) +static int +rtsp_error(http_connection_t *hc, int error, const char *errstr) { char *c; if(errstr == NULL) errstr = rtsp_err2str(error); - tvhlog(LOG_INFO, "rtsp", "%s", errstr); + tvhlog(LOG_ERR, "RTSP", "%s: %s -- %s", + inet_ntoa(hc->hc_peer->sin_addr), hc->hc_url_orig, errstr); - rtsp_printf(hc, "RTSP/1.0 %d %s\r\n", error, errstr); + htsbuf_qprintf(&hc->hc_reply, "RTSP/1.0 %d %s\r\n", error, errstr); if((c = http_arg_get(&hc->hc_args, "cseq")) != NULL) - rtsp_printf(hc, "CSeq: %s\r\n", c); + htsbuf_qprintf(&hc->hc_reply, "CSeq: %s\r\n", c); if(error == HTTP_STATUS_UNAUTHORIZED) - rtsp_printf(hc, "WWW-Authenticate: Basic realm=\"tvheadend\"\r\n"); - rtsp_printf(hc, "\r\n"); + htsbuf_qprintf(&hc->hc_reply, + "WWW-Authenticate: Basic realm=\"tvheadend\"\r\n"); + htsbuf_qprintf(&hc->hc_reply, "\r\n"); + return 0; } -/* - * Find a session pointed do by the current connection +/** + * */ -static rtsp_session_t * +static void * +rtsp_tcp_thread(void *aux) +{ + rtsp_t *rtsp = aux; + rtsp_packet_t *rp; + + pthread_mutex_lock(&rtsp->rtsp_mutex); + + while(1) { + + if(!rtsp->rtsp_run_thread) + break; + + if((rp = TAILQ_FIRST(&rtsp->rtsp_pqueue)) == NULL) { + pthread_cond_wait(&rtsp->rtsp_cond, &rtsp->rtsp_mutex); + continue; + } + + TAILQ_REMOVE(&rtsp->rtsp_pqueue, rp, rp_link); + rtsp->rtsp_pqueue_size -= rp->rp_payloadsize; + + pthread_mutex_unlock(&rtsp->rtsp_mutex); + + if(write(rtsp->rtsp_tcp_socket, rp->rp_payload, rp->rp_payloadsize)) {} + free(rp); + + pthread_mutex_lock(&rtsp->rtsp_mutex); + } + + pthread_mutex_unlock(&rtsp->rtsp_mutex); + return NULL; +} + + +/** + * + */ +static void +start_tcp_writer(rtsp_t *rtsp, int fd) +{ + if(rtsp->rtsp_run_thread) + return; + + rtsp->rtsp_tcp_socket = fd; + rtsp->rtsp_run_thread = 1; + pthread_create(&rtsp->rtsp_thread, NULL, rtsp_tcp_thread, rtsp); +} + + +/** + * + */ +static void +rtsp_stream_teardown(rtsp_stream_t *rs) +{ + if(rs->rs_send_sock != -1) + close(rs->rs_send_sock); + + if(rs->rs_recv_sock != -1) + close(rs->rs_recv_sock); + + LIST_REMOVE(rs, rs_link); + free(rs); +} + + +/** + * + */ +static void +rtsp_destroy_unref(rtsp_t *rtsp) +{ + rtsp_stream_t *rs; + rtsp_packet_t *rp; + + lock_assert(&global_lock); + + if(rtsp->rtsp_refcount > 1) { + rtsp->rtsp_refcount--; + return; + } + + if(rtsp->rtsp_sub != NULL) + subscription_unsubscribe(rtsp->rtsp_sub); + + if(rtsp->rtsp_ss != NULL) + streaming_start_unref(rtsp->rtsp_ss); + + while((rs = LIST_FIRST(&rtsp->rtsp_streams)) != NULL) + rtsp_stream_teardown(rs); + + if(rtsp->rtsp_run_thread) { + rtsp->rtsp_run_thread = 0; + pthread_cond_signal(&rtsp->rtsp_cond); + + pthread_join(rtsp->rtsp_thread, NULL); + } + + while((rp = TAILQ_FIRST(&rtsp->rtsp_pqueue)) != NULL) { + TAILQ_REMOVE(&rtsp->rtsp_pqueue, rp, rp_link); + free(rp); + } + + free(rtsp); +} + + + +/** + * + */ +static int +genrand32(uint8_t *r) +{ + int fd, n; + + if((fd = open("/dev/urandom", O_RDONLY)) < 0) + return -1; + + n = read(fd, r, 32); + close(fd); + return n != 32; +} + + +/** + * + */ +static rtsp_t * rtsp_get_session(http_connection_t *hc) { - char *ses; - int sesid; - rtsp_session_t *rs; - channel_t *ch; + rtsp_t *rtsp; + uint8_t r[32]; + int i; - if((ch = rtsp_channel_by_url(hc->hc_url)) == NULL) { - rtsp_reply_error(hc, RTSP_STATUS_SERVICE, "URL does not resolve"); + if(hc->hc_rtsp_session != NULL) + return hc->hc_rtsp_session; + + rtsp = hc->hc_rtsp_session = calloc(1, sizeof(rtsp_t)); + + if(genrand32(r)) return NULL; + + for(i = 0; i < 32; i++) + sprintf(rtsp->rtsp_session_id + i * 2, "%02x", r[i]); + + TAILQ_INIT(&rtsp->rtsp_pqueue); + pthread_cond_init(&rtsp->rtsp_cond, NULL); + pthread_mutex_init(&rtsp->rtsp_mutex, NULL); + return rtsp; +} + +/** + * Return 0 if OK, ~0 if fail + */ +static int +rtsp_check_session(http_connection_t *hc, rtsp_t *rtsp, int none_is_ok) +{ + char *ses = http_arg_get(&hc->hc_args, "session"); + + if(none_is_ok && ses == NULL) + return 0; + + return ses == NULL || strcmp(rtsp->rtsp_session_id, ses); +} + +/** + * + */ +static void +rtsp_unsubscribe(rtsp_t *rtsp) +{ + if(rtsp->rtsp_sub != NULL) { + subscription_unsubscribe(rtsp->rtsp_sub); + rtsp->rtsp_sub = NULL; } +} - if((ses = http_arg_get(&hc->hc_args, "session")) == NULL) { - rtsp_reply_error(hc, RTSP_STATUS_SESSION, NULL); - return NULL; - } +/** + * + */ +static void +rtsp_send_tcp(void *opaque, void *buf, size_t len) +{ + rtsp_stream_t *rs = opaque; + rtsp_t *rtsp = rs->rs_rtsp; + rtsp_packet_t *rp = malloc(sizeof(rtsp_packet_t) + len + 4); - sesid = atoi(ses); - LIST_FOREACH(rs, &rtsp_sessions, rs_global_link) - if(rs->rs_id == sesid) + rp->rp_payload[0] = '$'; + rp->rp_payload[1] = rs->rs_interleaved_stream; + rp->rp_payload[2] = len >> 8; + rp->rp_payload[3] = len; + + memcpy(rp->rp_payload + 4, buf, len); + rp->rp_payloadsize = len + 4; + + pthread_mutex_lock(&rtsp->rtsp_mutex); + TAILQ_INSERT_TAIL(&rtsp->rtsp_pqueue, rp, rp_link); + rtsp->rtsp_pqueue_size += rp->rp_payloadsize; + pthread_cond_signal(&rtsp->rtsp_cond); + pthread_mutex_unlock(&rtsp->rtsp_mutex); +} + + +/** + * + */ +static void +rtsp_send_udp(void *opaque, void *buf, size_t len) +{ + rtsp_stream_t *rs = opaque; + if(write(rs->rs_send_sock, buf, len)) {} +} + + + + +/** + * + */ +static void +rtsp_streaming_send(rtsp_t *rtsp, th_pkt_t *pkt) +{ + rtsp_stream_t *rs; + + LIST_FOREACH(rs, &rtsp->rtsp_streams, rs_link) + if(rs->rs_index == pkt->pkt_componentindex) break; - - if(rs == NULL) - rtsp_reply_error(hc, RTSP_STATUS_SESSION, NULL); - - return rs; -} - -/* - * RTSP PLAY - */ - -static void -rtsp_cmd_play(http_connection_t *hc) -{ - char *c; - int64_t start; - rtsp_session_t *rs; - - rs = rtsp_get_session(hc); if(rs == NULL) return; - if((c = http_arg_get(&hc->hc_args, "range")) != NULL) { - start = AV_NOPTS_VALUE; - } else { - start = AV_NOPTS_VALUE; + switch(rs->rs_type) { + case SCT_MPEG2VIDEO: + rtp_send_mpv(rs->rs_sender, rs, &rs->rs_rtp, pkt->pkt_payload, + pkt->pkt_payloadlen, pkt->pkt_pts); + break; } - - if(rs->rs_muxer == NULL) { - rtsp_reply_error(hc, RTSP_STATUS_SERVICE, - "No muxer attached (missing SETUP ?)"); - return; - } - - ts_muxer_play(rs->rs_muxer, start); - - if(rs->rs_s->ths_channel != NULL) - tvhlog(LOG_INFO, "rtsp", - "%s: Starting playback of %s", - hc->hc_peername, rs->rs_s->ths_channel->ch_name); - - rtsp_printf(hc, - "RTSP/1.0 200 OK\r\n" - "Session: %u\r\n", - rs->rs_id); - - if((c = http_arg_get(&hc->hc_args, "cseq")) != NULL) - rtsp_printf(hc, "CSeq: %s\r\n", c); - - rtsp_printf(hc, "\r\n"); } -/* - * RTSP PAUSE + + + +/** + * */ - static void -rtsp_cmd_pause(http_connection_t *hc) +rtsp_streaming_input(void *opaque, streaming_message_t *sm) { - char *c; - rtsp_session_t *rs; - - rs = rtsp_get_session(hc); - if(rs == NULL) - return; + rtsp_t *rtsp = opaque; - if(rs->rs_muxer == NULL) { - rtsp_reply_error(hc, RTSP_STATUS_SERVICE, - "No muxer attached (missing SETUP ?)"); - return; + switch(sm->sm_type) { + case SMT_START: + assert(rtsp->rtsp_ss == NULL); + rtsp->rtsp_ss = sm->sm_data; + sm->sm_data = NULL; // steal reference + break; + + case SMT_STOP: + assert(rtsp->rtsp_ss != NULL); + streaming_start_unref(rtsp->rtsp_ss); + rtsp->rtsp_ss = NULL; + break; + + case SMT_PACKET: + if(rtsp->rtsp_running) + rtsp_streaming_send(rtsp, sm->sm_data); + pkt_ref_dec(sm->sm_data); + break; + + case SMT_TRANSPORT_STATUS: + break; + + case SMT_NOSOURCE: + break; + + default: + abort(); } - - ts_muxer_pause(rs->rs_muxer); - - if(rs->rs_s->ths_channel != NULL) - tvhlog(LOG_INFO, "rtsp", - "%s: Pausing playback of %s", - hc->hc_peername, rs->rs_s->ths_channel->ch_name); - - rtsp_printf(hc, - "RTSP/1.0 200 OK\r\n" - "Session: %u\r\n", - rs->rs_id); - - if((c = http_arg_get(&hc->hc_args, "cseq")) != NULL) - rtsp_printf(hc, "CSeq: %s\r\n", c); - - rtsp_printf(hc, "\r\n"); + streaming_msg_free(sm); } -/* - * RTSP SETUP + +/** + * */ - static void -rtsp_cmd_setup(http_connection_t *hc) +rtsp_subscribe(rtsp_t *rtsp, rtsp_resource_t *rr) { - char *transports[10]; - char *params[10]; - char *avp[2]; - char *ports[2]; - char *t, *c; - int nt, i, np, j, navp, nports, ismulticast; - int client_ports[2]; - rtsp_session_t *rs; - channel_t *ch; - struct sockaddr_in dst; - - if((ch = rtsp_channel_by_url(hc->hc_url)) == NULL) { - rtsp_reply_error(hc, RTSP_STATUS_SERVICE, "URL does not resolve"); - return; - } + th_subscription_t *s = NULL; - client_ports[0] = 0; - client_ports[1] = 0; - - if((t = http_arg_get(&hc->hc_args, "transport")) == NULL) { - rtsp_reply_error(hc, RTSP_STATUS_TRANSPORT, NULL); - return; - } - - nt = http_tokenize(t, transports, 10, ','); - - /* Select a transport we can accept */ - - for(i = 0; i < nt; i++) { - np = http_tokenize(transports[i], params, 10, ';'); - - if(np == 0) - continue; - if(strcasecmp(params[0], "RTP/AVP/UDP") && - strcasecmp(params[0], "RTP/AVP")) - continue; - - ismulticast = 1; /* multicast is default according to RFC */ - client_ports[0] = 0; - client_ports[1] = 0; - - - for(j = 1; j < np; j++) { - if((navp = http_tokenize(params[j], avp, 2, '=')) == 0) - continue; - - if(navp == 1 && !strcmp(avp[0], "unicast")) { - ismulticast = 0; - } else if(navp == 2 && !strcmp(avp[0], "client_port")) { - nports = http_tokenize(avp[1], ports, 2, '-'); - if(nports > 0) client_ports[0] = atoi(ports[0]); - if(nports > 1) client_ports[1] = atoi(ports[1]); - } + if(rtsp->rtsp_sub != NULL) { + + switch(rr->rr_type) { + case RTSP_RESOURCE_CHANNEL: + if(rtsp->rtsp_sub->ths_channel == rr->rr_channel) + return; + break; + case RTSP_RESOURCE_SERVICE: + if(rtsp->rtsp_sub->ths_transport == rr->rr_service) + return; } - if(!ismulticast && client_ports[0] && client_ports[1]) - break; + subscription_unsubscribe(rtsp->rtsp_sub); } - if(i == nt) { - /* couldnt find a suitable transport */ - rtsp_reply_error(hc, RTSP_STATUS_TRANSPORT, NULL); - return; - } - - memcpy(&dst, hc->hc_peer, sizeof(struct sockaddr_in)); - dst.sin_port = htons(client_ports[0]); - - if((rs = rtsp_session_create(ch, &dst)) == NULL) { - rtsp_reply_error(hc, RTSP_STATUS_INTERNAL, NULL); - return; - } - - LIST_INSERT_HEAD(&hc->hc_rtsp_sessions, rs, rs_con_link); - - rtsp_printf(hc, - "RTSP/1.0 200 OK\r\n" - "Session: %u\r\n" - "Transport: RTP/AVP/UDP;unicast;client_port=%d-%d;" - "server_port=%d-%d\r\n", - rs->rs_id, - client_ports[0], - client_ports[1], - rs->rs_server_port[0], - rs->rs_server_port[1]); - - if((c = http_arg_get(&hc->hc_args, "cseq")) != NULL) - rtsp_printf(hc, "CSeq: %s\r\n", c); - rtsp_printf(hc, "\r\n"); + + + streaming_target_init(&rtsp->rtsp_input, rtsp_streaming_input, rtsp); + + switch(rr->rr_type) { + case RTSP_RESOURCE_CHANNEL: + s = subscription_create_from_channel(rr->rr_channel, 500, "RTSP", + &rtsp->rtsp_input); + break; + case RTSP_RESOURCE_SERVICE: + s = subscription_create_from_transport(rr->rr_service, "RTSP", + &rtsp->rtsp_input); + break; + } + + rtsp->rtsp_sub = s; } - -/* - * RTSP DESCRIBE +/** + * Resolve an URL into a resource */ - -static void -rtsp_cmd_describe(http_connection_t *hc) +static int +rtsp_resolve_url(char *url, rtsp_resource_t *rr, char **remainp) { - char sdpreply[1000]; - channel_t *ch; - char *c; + char *components[5]; + void *r; + int nc; - if((ch = rtsp_channel_by_url(hc->hc_url)) == NULL) { - rtsp_reply_error(hc, RTSP_STATUS_SERVICE, "URL does not resolve"); - return; + if(!strncasecmp(url, "rtsp://", strlen("rtsp://"))) { + url += strlen("rtsp://"); + url = strchr(url, '/'); + if(url == NULL) + return -1; + url++; } - - snprintf(sdpreply, sizeof(sdpreply), - "v=0\r\n" - "o=- 0 0 IN IPV4 127.0.0.1\r\n" - "s=%s\r\n" - "a=tool:hts tvheadend\r\n" - "m=video 0 RTP/AVP 33\r\n", - ch->ch_name); - - - rtsp_printf(hc, - "RTSP/1.0 200 OK\r\n" - "Content-Type: application/sdp\r\n" - "Content-Length: %d\r\n", - strlen(sdpreply)); - - if((c = http_arg_get(&hc->hc_args, "cseq")) != NULL) - rtsp_printf(hc, "CSeq: %s\r\n", c); - rtsp_printf(hc, "\r\n%s", sdpreply); + nc = http_tokenize(url, components, 5, '/'); + + if(nc < 2 || nc > 3) + return -1; + + http_deescape(components[1]); + + if(!strcmp(components[0], "channel")) { + rr->rr_type = RTSP_RESOURCE_CHANNEL; + r = channel_find_by_name(components[1], 0); + + } else if(!strcmp(components[0], "channelid")) { + rr->rr_type = RTSP_RESOURCE_CHANNEL; + r = channel_find_by_identifier(atoi(components[1])); + + } else if(!strcmp(components[0], "service")) { + rr->rr_type = RTSP_RESOURCE_SERVICE; + r = transport_find_by_identifier(components[1]); + + } else { + return -1; + } + + if(r == NULL) + return -1; + rr->rr_r = r; + + if(remainp == NULL) + return 0; + + if(nc == 3) { + http_deescape(components[2]); + *remainp = components[2]; + } else { + *remainp = NULL; + } + return 0; } -/* +/** * RTSP OPTIONS */ -static void +static int rtsp_cmd_options(http_connection_t *hc) { char *c; + rtsp_resource_t rr; - if(strcmp(hc->hc_url, "*") && rtsp_channel_by_url(hc->hc_url) == NULL) { - rtsp_reply_error(hc, RTSP_STATUS_SERVICE, "URL does not resolve"); - return; + pthread_mutex_lock(&global_lock); + + if(strcmp(hc->hc_url, "*") && rtsp_resolve_url(hc->hc_url, &rr, NULL)) { + rtsp_error(hc, RTSP_STATUS_SERVICE, "URL does not resolve"); + pthread_mutex_unlock(&global_lock); + return 0; } + pthread_mutex_unlock(&global_lock); rtsp_printf(hc, "RTSP/1.0 200 OK\r\n" @@ -538,54 +597,393 @@ rtsp_cmd_options(http_connection_t *hc) if((c = http_arg_get(&hc->hc_args, "cseq")) != NULL) rtsp_printf(hc, "CSeq: %s\r\n", c); rtsp_printf(hc, "\r\n"); + return 0; } -/* - * RTSP TEARDOWN - */ -static void -rtsp_cmd_teardown(http_connection_t *hc) -{ - rtsp_session_t *rs; - char *c; - if((rs = rtsp_get_session(hc)) == NULL) - return; +/** + * Get stream by index + */ +static const streaming_start_component_t * +get_ssc_by_index(rtsp_t *rtsp, int wanted_idx) +{ + const streaming_start_t *ss = rtsp->rtsp_ss; + int i; + for(i = 0; i < ss->ss_num_components; i++) + if(ss->ss_components[i].ssc_index == wanted_idx) + return &ss->ss_components[i]; + return NULL; +} + + + + + + +/** + * RTSP DESCRIBE + */ +static int +rtsp_cmd_describe(http_connection_t *hc, rtsp_t *rtsp) +{ + char sdp[1000]; + char *c; + const char *str; + rtsp_resource_t rr; + extern const char *htsversion; + const streaming_start_t *ss; + int i; + + pthread_mutex_lock(&global_lock); + + if(rtsp_resolve_url(hc->hc_url, &rr, NULL)) { + rtsp_error(hc, RTSP_STATUS_SERVICE, "URL does not resolve"); + pthread_mutex_unlock(&global_lock); + return 0; + } + + rtsp_subscribe(rtsp, &rr); + + if((ss = rtsp->rtsp_ss) == NULL) { + rtsp_error(hc, RTSP_STATUS_SERVICE, "No source available"); + rtsp_unsubscribe(rtsp); + pthread_mutex_unlock(&global_lock); + return 0; + } + + pthread_mutex_unlock(&global_lock); + + if(rr.rr_type == RTSP_RESOURCE_CHANNEL) + str = rr.rr_channel->ch_name; + else + str = rr.rr_service->tht_identifier; + + snprintf(sdp, sizeof(sdp), + "v=0\r\n" + "o=- 0 0 IN IPV4 127.0.0.1\r\n" + "s=%s\r\n" + "a=tool:HTS Tvheadend %s\r\n", + str, htsversion); + + for(i = 0; i < ss->ss_num_components; i++) { + const streaming_start_component_t *ssc = &ss->ss_components[i]; + + switch(ssc->ssc_type) { + case SCT_MPEG2VIDEO: + tvh_strlcatf(sdp, sizeof(sdp), + "m=video 0 RTP/AVP 32\r\n" + "a=control:streamid=%d\r\n", ssc->ssc_index); + break; + } + } rtsp_printf(hc, "RTSP/1.0 200 OK\r\n" - "Session: %u\r\n", - rs->rs_id); + "Content-Type: application/sdp\r\n" + "Content-Length: %d\r\n", + strlen(sdp)); + + if((c = http_arg_get(&hc->hc_args, "cseq")) != NULL) + rtsp_printf(hc, "CSeq: %s\r\n", c); + + rtsp_printf(hc, "\r\n%s", sdp); + return 0; +} + +/** + * Create rs stream from streaming_start_componen + */ +static rtsp_stream_t * +rs_by_ssc(rtsp_t *rtsp, const streaming_start_component_t *ssc) +{ + rtsp_stream_t *rs = calloc(1, sizeof(rtsp_stream_t)); + rs->rs_rtsp = rtsp; + rs->rs_index = ssc->ssc_index; + rs->rs_type = ssc->ssc_type; + return rs; +} + + +/** + * Setup a TCP stream + */ +static int +rtsp_setup_tcp(http_connection_t *hc, rtsp_t *rtsp, + const streaming_start_component_t *ssc, + char *params[], int np) +{ + rtsp_stream_t *rs; + int navp; + char *avp[2]; + int nvalues; + char *values[2]; + int interleaved[2]; + int i; + const char *c; + + interleaved[0] = -1; + interleaved[1] = -1; + + for(i = 1; i < np; i++) { + if((navp = http_tokenize(params[i], avp, 2, '=')) == 0) + continue; + + if(navp == 2 && !strcmp(avp[0], "interleaved")) { + nvalues = http_tokenize(avp[1], values, 2, '-'); + if(nvalues > 0) interleaved[0] = atoi(values[0]); + if(nvalues > 1) interleaved[1] = atoi(values[1]); + } + } + + if(interleaved[0] == -1 || interleaved[1] == -1) + return rtsp_error(hc, RTSP_STATUS_TRANSPORT, + "No interleaved values selected by client"); + + rs = rs_by_ssc(rtsp, ssc); + + rs->rs_interleaved_stream = interleaved[0]; + rs->rs_sender = rtsp_send_tcp; + + rs->rs_send_sock = -1; + rs->rs_recv_sock = -1; + + LIST_INSERT_HEAD(&rtsp->rtsp_streams, rs, rs_link); + + start_tcp_writer(rtsp, hc->hc_fd); + + rtsp_printf(hc, + "RTSP/1.0 200 OK\r\n" + "Session: %s\r\n" + "Transport: RTP/AVP/TCP;interleaved=%d-%d\r\n", + rtsp->rtsp_session_id, + interleaved[0], interleaved[1]); + + if((c = http_arg_get(&hc->hc_args, "cseq")) != NULL) + rtsp_printf(hc, "CSeq: %s\r\n", c); + + rtsp_printf(hc, "\r\n"); + return 0; +} + + +/** + * Setup an UDP stream + */ +static int +rtsp_setup_udp(http_connection_t *hc, rtsp_t *rtsp, + const streaming_start_component_t *ssc, + char *params[], int np) +{ + const char *c; + int i; + int ismulticast = 1; /* multicast is default according to RFC */ + int navp; + char *avp[2]; + int nports; + char *ports[2]; + int client_ports[2]; + int server_ports[2]; + struct sockaddr_in dst; + int attempt = 0; + rtsp_stream_t *rs; + socklen_t slen; + struct sockaddr_in sin; + int fd[2]; + + client_ports[0] = 0; + client_ports[1] = 0; + + for(i = 1; i < np; i++) { + if((navp = http_tokenize(params[i], avp, 2, '=')) == 0) + continue; + + if(navp == 1 && !strcmp(avp[0], "unicast")) { + ismulticast = 0; + + } else if(navp == 2 && !strcmp(avp[0], "client_port")) { + nports = http_tokenize(avp[1], ports, 2, '-'); + if(nports > 0) client_ports[0] = atoi(ports[0]); + if(nports > 1) client_ports[1] = atoi(ports[1]); + } + } + + if(ismulticast) + return rtsp_error(hc, RTSP_STATUS_TRANSPORT, "Multicast is not supported"); + + if(!client_ports[0] || !client_ports[1]) + return rtsp_error(hc, RTSP_STATUS_TRANSPORT, + "No UDP ports selected by client"); + + + retry: + fd[0] = socket(AF_INET, SOCK_DGRAM, 0); + + memset(&sin, 0, sizeof(struct sockaddr_in)); + sin.sin_family = AF_INET; + + if(bind(fd[0], (struct sockaddr *)&sin, sizeof(sin)) == -1) { + close(fd[0]); + // XXX log + return rtsp_error(hc, RTSP_STATUS_TRANSPORT, NULL); + } + + slen = sizeof(struct sockaddr_in); + getsockname(fd[0], (struct sockaddr *)&sin, &slen); + + server_ports[0] = ntohs(sin.sin_port); + server_ports[1] = server_ports[0] + 1; + + sin.sin_port = htons(server_ports[1]); + + fd[1] = socket(AF_INET, SOCK_DGRAM, 0); + + if(bind(fd[1], (struct sockaddr *)&sin, sizeof(sin)) == -1) { + close(fd[0]); + close(fd[1]); + + attempt++; + if(attempt == 100) { + // XXX log + return rtsp_error(hc, RTSP_STATUS_TRANSPORT, NULL); + } + goto retry; + } + + memcpy(&dst, hc->hc_peer, sizeof(struct sockaddr_in)); + dst.sin_port = htons(client_ports[0]); + + if(connect(fd[0], (struct sockaddr *)&dst, sizeof(dst))) { + close(fd[0]); + close(fd[1]); + // XXX log + return rtsp_error(hc, RTSP_STATUS_TRANSPORT, NULL); + } + + rs = rs_by_ssc(rtsp, ssc); + rs->rs_send_sock = fd[0]; + rs->rs_recv_sock = fd[1]; + rs->rs_sender = rtsp_send_udp; + + LIST_INSERT_HEAD(&rtsp->rtsp_streams, rs, rs_link); + + rtsp_printf(hc, + "RTSP/1.0 200 OK\r\n" + "Session: %s\r\n" + "Transport: RTP/AVP/UDP;unicast;client_port=%d-%d;" + "server_port=%d-%d\r\n", + rtsp->rtsp_session_id, + client_ports[0], + client_ports[1], + server_ports[0], + server_ports[1]); + + if((c = http_arg_get(&hc->hc_args, "cseq")) != NULL) + rtsp_printf(hc, "CSeq: %s\r\n", c); + rtsp_printf(hc, "\r\n"); + return 0; +} + + +/** + * RTSP SETUP + */ +static int +rtsp_cmd_setup(http_connection_t *hc, rtsp_t *rtsp) +{ + rtsp_resource_t rr; + char *transports[10]; + char *params[10]; + char *t; + int nt, i, np; + int streamid; + char *remain; + const streaming_start_component_t *ssc; + + if(rtsp_check_session(hc, rtsp, 1)) + return rtsp_error(hc, 459, NULL); + + pthread_mutex_lock(&global_lock); + + if(rtsp_resolve_url(hc->hc_url, &rr, &remain)) { + rtsp_error(hc, RTSP_STATUS_SERVICE, "URL does not resolve"); + pthread_mutex_unlock(&global_lock); + return 0; + } + + if(remain == NULL || strncmp(remain, "streamid=", strlen("streamid="))) { + rtsp_error(hc, RTSP_STATUS_SERVICE, "No streamid component in URL"); + pthread_mutex_unlock(&global_lock); + return 0; + } + + + streamid = atoi(remain + strlen("streamid=")); + if((ssc = get_ssc_by_index(rtsp, streamid)) == NULL) { + pthread_mutex_unlock(&global_lock); + return rtsp_error(hc, RTSP_STATUS_SERVICE, "Stream not found"); + } + + rtsp_subscribe(rtsp, &rr); + pthread_mutex_unlock(&global_lock); + + if(rtsp->rtsp_ss == NULL) { + rtsp_error(hc, RTSP_STATUS_SERVICE, "No source available"); + rtsp_unsubscribe(rtsp); + return 0; + } + + if((t = http_arg_get(&hc->hc_args, "transport")) == NULL) { + rtsp_error(hc, RTSP_STATUS_TRANSPORT, NULL); + return 0; + } + + nt = http_tokenize(t, transports, 10, ','); + + /* Select a transport we can accept */ + + for(i = 0; i < nt; i++) { + np = http_tokenize(transports[i], params, 10, ';'); + if(np == 0) + continue; + + if(!strcasecmp(params[0], "RTP/AVP/UDP") || + !strcasecmp(params[0], "RTP/AVP")) + return rtsp_setup_udp(hc, rtsp, ssc, params, np); + + if(!strcasecmp(params[0], "RTP/AVP/TCP")) + return rtsp_setup_tcp(hc, rtsp, ssc, params, np); + } + + rtsp_error(hc, RTSP_STATUS_TRANSPORT, NULL); + return 0; +} + + +/** + * RTSP PLAY + */ +static int +rtsp_cmd_play(http_connection_t *hc, rtsp_t *rtsp) +{ + char *c; + + if(rtsp_check_session(hc, rtsp, 0)) + return rtsp_error(hc, RTSP_STATUS_SERVICE, "Invalid session ID"); + + rtsp->rtsp_running = 1; + + rtsp_printf(hc, + "RTSP/1.0 200 OK\r\n" + "Session: %s\r\n", + rtsp->rtsp_session_id); if((c = http_arg_get(&hc->hc_args, "cseq")) != NULL) rtsp_printf(hc, "CSeq: %s\r\n", c); rtsp_printf(hc, "\r\n"); - - rtsp_session_destroy(rs); + return 0; } - -/** - * - */ -static int -rtsp_access_list(http_connection_t *hc) -{ - int x; - - if(hc->hc_authenticated) - return 0; - - x = access_verify(hc->hc_username, hc->hc_password, - (struct sockaddr *)hc->hc_peer, - ACCESS_STREAMING); - if(x == 0) - hc->hc_authenticated = 1; - - return x; -} - - + /* * RTSP connection state machine & parser @@ -593,35 +991,45 @@ rtsp_access_list(http_connection_t *hc) int rtsp_process_request(http_connection_t *hc) { - if(rtsp_access_list(hc)) { - rtsp_reply_error(hc, RTSP_STATUS_UNAUTHORIZED, NULL); + int r; + if(0 /* rtsp_access_list(hc) */) { + rtsp_error(hc, RTSP_STATUS_UNAUTHORIZED, NULL); + r = 0; } else { + rtsp_t *rtsp = rtsp_get_session(hc); + switch(hc->hc_cmd) { default: - rtsp_reply_error(hc, RTSP_STATUS_METHOD, NULL); - break; - case RTSP_CMD_DESCRIBE: - rtsp_cmd_describe(hc); - break; - case RTSP_CMD_SETUP: - rtsp_cmd_setup(hc); - break; - case RTSP_CMD_PLAY: - rtsp_cmd_play(hc); - break; - case RTSP_CMD_PAUSE: - rtsp_cmd_pause(hc); + printf("COMMAND: %d\n", hc->hc_cmd); + + rtsp_error(hc, RTSP_STATUS_METHOD, NULL); + r = 0; break; case RTSP_CMD_OPTIONS: - rtsp_cmd_options(hc); + r = rtsp_cmd_options(hc); + break; + case RTSP_CMD_DESCRIBE: + rtsp_cmd_describe(hc, rtsp); + break; + case RTSP_CMD_SETUP: + rtsp_cmd_setup(hc, rtsp); + break; + case RTSP_CMD_PLAY: + rtsp_cmd_play(hc, rtsp); + break; +#if 0 + case RTSP_CMD_PAUSE: + rtsp_cmd_pause(hc); break; case RTSP_CMD_TEARDOWN: rtsp_cmd_teardown(hc); break; +#endif } } - tcp_write_queue(hc->hc_fd, &hc->hc_reply); + if(!r) + tcp_write_queue(hc->hc_fd, &hc->hc_reply); return 0; } @@ -632,17 +1040,10 @@ rtsp_process_request(http_connection_t *hc) void rtsp_disconncet(http_connection_t *hc) { - rtsp_session_t *rs; - - while((rs = LIST_FIRST(&hc->hc_rtsp_sessions)) != NULL) - rtsp_session_destroy(rs); -} - -/* - * - */ -void -rtsp_init(void) -{ - av_init_random(time(NULL), &rtsp_rnd); + printf("Disconnect..\n"); + pthread_mutex_lock(&global_lock); + if(hc->hc_rtsp_session != NULL) + rtsp_destroy_unref(hc->hc_rtsp_session); + pthread_mutex_unlock(&global_lock); + printf("Disconnect.. done\n"); } diff --git a/src/streaming.c b/src/streaming.c index 92225bbf..1c1c1f0f 100644 --- a/src/streaming.c +++ b/src/streaming.c @@ -186,8 +186,8 @@ streaming_msg_clone(streaming_message_t *src) /** * */ -static void -streaming_start_deref(streaming_start_t *ss) +void +streaming_start_unref(streaming_start_t *ss) { if((atomic_add(&ss->ss_refcount, -1)) != 1) return; @@ -209,7 +209,8 @@ streaming_msg_free(streaming_message_t *sm) break; case SMT_START: - streaming_start_deref(sm->sm_data); + if(sm->sm_data) + streaming_start_unref(sm->sm_data); break; case SMT_STOP: diff --git a/src/streaming.h b/src/streaming.h index 060971d7..af8e58c6 100644 --- a/src/streaming.h +++ b/src/streaming.h @@ -78,5 +78,7 @@ streaming_message_t *streaming_msg_create_code(streaming_message_type_t type, streaming_message_t *streaming_msg_create_pkt(th_pkt_t *pkt); #define streaming_target_deliver(st, sm) ((st)->st_cb((st)->st_opaque, (sm))) + +void streaming_start_unref(streaming_start_t *ss); #endif /* STREAMING_H_ */ diff --git a/src/tsdemux.c b/src/tsdemux.c index ce5aeabb..2c27de97 100644 --- a/src/tsdemux.c +++ b/src/tsdemux.c @@ -143,7 +143,7 @@ ts_recv_packet0(th_transport_t *t, th_stream_t *st, uint8_t *tsb) static void ts_extract_pcr(th_transport_t *t, th_stream_t *st, uint8_t *tsb) { - int64_t real = getclock_hires(); + int64_t real = getmonoclock(); int64_t pcr, d; pcr = tsb[6] << 25; diff --git a/src/tsmux.c b/src/tsmux.c index 60a0c786..014674c0 100644 --- a/src/tsmux.c +++ b/src/tsmux.c @@ -1,6 +1,6 @@ /* * tvheadend, MPEG Transport stream muxer - * Copyright (C) 2008 Andreas Öman + * Copyright (C) 2008 - 2009 Andreas Öman * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -15,6 +15,642 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ + +#include + +#include "tvhead.h" +#include "streaming.h" +#include "tsmux.h" + + +#define PID_PMT 1000 +#define PID_ES_BASE 2000 + +static const AVRational mpeg_tc = {1, 90000}; +static const AVRational mpeg_tc_27M = {1, 27000000}; + +LIST_HEAD(tsmuxer_es_list, tsmuxer_es); +TAILQ_HEAD(tsmuxer_pkt_queue, tsmuxer_pkt); + +/** + * + */ +typedef struct tsmuxer_pkt { + TAILQ_ENTRY(tsmuxer_pkt) tsp_link; + int64_t tsp_deadline; + int64_t tsp_dts; + int64_t tsp_pcr; + int tsp_contentsize; + + uint8_t tsp_payload[188]; + +} tsmuxer_pkt_t; + +/** + * + */ +typedef struct tsmuxer_fifo { + struct tsmuxer_pkt_queue tsf_queue; + int tsf_contentsize; + int tsf_len; + +} tsmuxer_fifo_t; + + + +/** + * TS Elementary stream + */ +typedef struct tsmuxer_es { + LIST_ENTRY(tsmuxer_es) te_link; + + int te_type; + + int te_input_index; + + char te_lang[4]; + + int te_pid; + int te_cc; + + int te_startcode; // Startcode to use for PES packetization + + struct streaming_message_queue te_smq; + int te_lookahead_depth; // bytes in te_smq + int te_lookahead_packets; // packets in te_smq + + int64_t te_mux_offset; + + int te_vbv_delay; + + tsmuxer_fifo_t te_delivery_fifo; + +} tsmuxer_es_t; + + + + +/** + * TS Multiplexer + */ +typedef struct tsmuxer { + + int tsm_run; + + struct tsmuxer_es_list tsm_eslist; + + int64_t tsm_pcr_start; + int64_t tsm_pcr_ref; + int64_t tsm_pcr_last; + + tsmuxer_es_t *tsm_pcr_stream; + + streaming_queue_t tsm_input; + + streaming_target_t *tsm_output; + + pthread_t tsm_thread; + +} tsmuxer_t; + + +/** + * + */ +static void +tmf_enq(tsmuxer_fifo_t *tsf, tsmuxer_pkt_t *tsp) +{ + /* record real content size */ + tsf->tsf_contentsize += tsp->tsp_contentsize; + + /* Enqueue packet */ + TAILQ_INSERT_TAIL(&tsf->tsf_queue, tsp, tsp_link); + tsf->tsf_len++; +} + +#if 0 +/** + * + */ +static void +tsf_remove(tsmuxer_fifo_t *tsf, tsmuxer_pkt_t *tsp) +{ + tsf->tsf_contentsize -= tsp->tsp_contentsize; + TAILQ_REMOVE(&tsf->tsf_queue, tsp, tsp_link); + tsf->tsf_len--; +} + +/** + * + */ +static tsmuxer_pkt_t * +tsf_deq(tsmuxer_fifo_t *tsf) +{ + tsmuxer_pkt_t *tm; + + tm = TAILQ_FIRST(&tsf->tsf_queue); + if(tm != NULL) + tsf_remove(tsf, tm); + return tm; +} +#endif + + +/** + * + */ +static void +tsf_init(tsmuxer_fifo_t *tsf) +{ + TAILQ_INIT(&tsf->tsf_queue); +} + + + +#if 0 +/** + * Check if we need to start delivery timer for the given stream + * + * Also, if it is the PCR stream and we're not yet runnig, figure out + * PCR and start generating packets + */ +static void +ts_check_deliver(tsmuxer_t *tsm, tsmuxer_es_t *te) +{ + int64_t now; + tsmuxer_fifo_t *tsf = &te->te_delivery_fifo; + int64_t next; + + if(dtimer_isarmed(&tms->tms_mux_timer)) + return; /* timer already running, we're fine */ + + assert(tms->tms_delivery_fifo.tmf_len != 0); + + now = getclock_hires(); + + if(ts->ts_pcr_ref == AV_NOPTS_VALUE) { + + if(ts->ts_pcr_start == AV_NOPTS_VALUE) + return; /* dont know anything yet */ + + ts->ts_pcr_ref = now - ts->ts_pcr_start + t->tht_pcr_drift; + } + + f = TAILQ_FIRST(&tmf->tmf_queue); /* next packet we are going to send */ + next = f->tm_deadline + ts->ts_pcr_ref - t->tht_pcr_drift; + + if(next < now + 100) + next = now + 100; + + dtimer_arm_hires(&tms->tms_mux_timer, ts_deliver, tms, next - 500); +} +#endif + + + +/** + * Generate TS packet, see comments inline + */ +static void +lookahead_dequeue(tsmuxer_t *tsm, tsmuxer_es_t *te) +{ + streaming_message_t *sm; + th_pkt_t *pkt; + tsmuxer_pkt_t *tsp; + uint8_t *tsb; + uint16_t u16; + int frrem, pad, tsrem, len, off, cc, hlen, flags; + int64_t t, tdur, toff, tlen, dts, pcr, basedelivery; + + + tlen = 0; + TAILQ_FOREACH(sm, &te->te_smq, sm_link) { + pkt = sm->sm_data; + tlen += pkt->pkt_payloadlen; + + if(pkt->pkt_pts != pkt->pkt_dts) { + tlen += 19; /* pes header with DTS and PTS */ + } else { + tlen += 14; /* pes header with PTS only */ + } + } + + if(tlen == 0) + return; + + sm = TAILQ_FIRST(&te->te_smq); + pkt = sm->sm_data; + toff = 0; + + /* XXX: assumes duration is linear, but it's probably ok */ + tdur = pkt->pkt_duration * te->te_lookahead_packets; + + if(te->te_mux_offset == AV_NOPTS_VALUE) { + if(te->te_vbv_delay == -1) + te->te_mux_offset = tdur / 2 - pkt->pkt_duration; + else + te->te_mux_offset = te->te_vbv_delay; + } + + if(tsm->tsm_pcr_start == AV_NOPTS_VALUE && tsm->tsm_pcr_stream == te) + tsm->tsm_pcr_start = pkt->pkt_dts - te->te_mux_offset; + + basedelivery = pkt->pkt_dts - te->te_mux_offset; + + while((sm = TAILQ_FIRST(&te->te_smq)) != NULL) { + + off = 0; + pkt = sm->sm_data; + + if(pkt->pkt_dts == pkt->pkt_pts) { + hlen = 8; + flags = 0x80; + } else { + hlen = 13; + flags = 0xc0; + } + + while(off < pkt->pkt_payloadlen) { + + tsp = malloc(sizeof(tsmuxer_pkt_t)); + tsp->tsp_deadline = basedelivery + tdur * toff / tlen; + + dts = (int64_t)pkt->pkt_duration * + (int64_t)off / (int64_t)pkt->pkt_payloadlen; + dts += pkt->pkt_dts; + + tsp->tsp_dts = dts; + tsb = tsp->tsp_payload; + + /* TS marker */ + *tsb++ = 0x47; + + /* Write PID and optionally payload unit start indicator */ + *tsb++ = te->te_pid >> 8 | (off ? 0 : 0x40); + *tsb++ = te->te_pid; + + cc = te->te_cc & 0xf; + te->te_cc++; + + /* Remaing bytes after 4 bytes of TS header */ + tsrem = 184; + + if(off == 0) { + /* When writing the packet header, shave of a bit of available + payload size */ + tsrem -= hlen + 6; + } + + /* Remaining length of frame */ + frrem = pkt->pkt_payloadlen - off; + + /* Compute amout of padding needed */ + pad = tsrem - frrem; + + pcr = tsp->tsp_deadline; + tsp->tsp_pcr = AV_NOPTS_VALUE; + + if(tsm->tsm_pcr_stream == te && tsm->tsm_pcr_last + 20000 < pcr) { + + tsp->tsp_pcr = pcr; + /* Insert PCR */ + + tlen += 8; /* compensate total length */ + tsrem -= 8; + pad -= 8; + if(pad < 0) + pad = 0; + + *tsb++ = 0x30 | cc; + *tsb++ = 7 + pad; + *tsb++ = 0x10; /* PCR flag */ + + t = av_rescale_q(pcr, AV_TIME_BASE_Q, mpeg_tc); + *tsb++ = t >> 25; + *tsb++ = t >> 17; + *tsb++ = t >> 9; + *tsb++ = t >> 1; + *tsb++ = (t & 1) << 7; + *tsb++ = 0; + + memset(tsb, 0xff, pad); + tsb += pad; + tsrem -= pad; + + tsm->tsm_pcr_last = pcr + 20000; + + } else if(pad > 0) { + /* Must pad TS packet */ + + *tsb++ = 0x30 | cc; + tsrem -= pad; + *tsb++ = --pad; + + memset(tsb, 0x00, pad); + tsb += pad; + } else { + *tsb++ = 0x10 | cc; + } + + + if(off == 0) { + /* Insert PES header */ + + /* Write startcode */ + *tsb++ = 0; + *tsb++ = 0; + *tsb++ = te->te_startcode >> 8; + *tsb++ = te->te_startcode; + + /* Write total frame length (without accounting for startcode and + length field itself */ + len = pkt->pkt_payloadlen + hlen; + + if(te->te_type == SCT_MPEG2VIDEO) { + /* It's okay to write len as 0 in transport streams, + but only for video frames, and i dont expect any of the + audio frames to exceed 64k + */ + len = 0; + } + + *tsb++ = len >> 8; + *tsb++ = len; + + *tsb++ = 0x80; /* MPEG2 */ + *tsb++ = flags; + *tsb++ = hlen - 3; /* length of rest of header (pts & dts) */ + + /* Write PTS */ + + if(flags == 0xc0) { + t = av_rescale_q(pkt->pkt_pts, AV_TIME_BASE_Q, mpeg_tc); + *tsb++ = (((t >> 30) & 7) << 1) | 1; + u16 = (((t >> 15) & 0x7fff) << 1) | 1; + *tsb++ = u16 >> 8; + *tsb++ = u16; + u16 = ((t & 0x7fff) << 1) | 1; + *tsb++ = u16 >> 8; + *tsb++ = u16; + } + + /* Write DTS */ + + t = av_rescale_q(pkt->pkt_dts, AV_TIME_BASE_Q, mpeg_tc); + *tsb++ = (((t >> 30) & 7) << 1) | 1; + u16 = (((t >> 15) & 0x7fff) << 1) | 1; + *tsb++ = u16 >> 8; + *tsb++ = u16; + u16 = ((t & 0x7fff) << 1) | 1; + *tsb++ = u16 >> 8; + *tsb++ = u16; + } + + memcpy(tsb, pkt->pkt_payload + off, tsrem); + + tsp->tsp_contentsize = tsrem; + + tmf_enq(&te->te_delivery_fifo, tsp); + + toff += tsrem; + off += tsrem; + + } + + te->te_lookahead_depth -= pkt->pkt_payloadlen; + te->te_lookahead_packets--; + pkt_ref_dec(pkt); + TAILQ_REMOVE(&te->te_smq, sm, sm_link); + + free(sm); + } + // ts_check_deliver(ts, tms); +} + + + +/** + * Packet input. + * + * We get the entire streaming message cause we might need to hold and + * enqueue the packet. So we can just reuse that allocation from now + * on + */ +static void +tsm_packet_input(tsmuxer_t *tsm, streaming_message_t *sm) +{ + tsmuxer_es_t *te; + th_pkt_t *pkt = sm->sm_data; + + LIST_FOREACH(te, &tsm->tsm_eslist, te_link) + if(te->te_input_index == pkt->pkt_componentindex) + break; + + if(te == NULL) { + // Stream not in use + streaming_msg_free(sm); + return; + } + + lookahead_dequeue(tsm, te); + + te->te_lookahead_depth += pkt->pkt_payloadlen; + te->te_lookahead_packets++; + + TAILQ_INSERT_TAIL(&te->te_smq, sm, sm_link); +} + + + +/** + * + */ +static void +te_destroy(tsmuxer_es_t *te) +{ + streaming_queue_clear(&te->te_smq); + LIST_REMOVE(te, te_link); + free(te); +} + + +/** + * + */ +static void +tsm_start(tsmuxer_t *tsm, const streaming_start_t *ss) +{ + int i, sc; + + tsmuxer_es_t *te; + + tsm->tsm_pcr_start = AV_NOPTS_VALUE; + tsm->tsm_pcr_ref = AV_NOPTS_VALUE; + tsm->tsm_pcr_last = INT64_MIN; + + + for(i = 0; i < ss->ss_num_components; i++) { + const streaming_start_component_t *ssc = &ss->ss_components[i]; + int dopcr = 0; + + switch(ssc->ssc_type) { + + case SCT_MPEG2VIDEO: + sc = 0x1e0; + dopcr = 1; + break; + + case SCT_MPEG2AUDIO: + sc = 0x1c0; + break; + + case SCT_AC3: + sc = 0x1bd; + break; + + case SCT_H264: + sc = 0x1e0; + dopcr = 1; + break; + default: + continue; + } + + te = calloc(1, sizeof(tsmuxer_es_t)); + tsf_init(&te->te_delivery_fifo); + memcpy(te->te_lang, ssc->ssc_lang, 4); + te->te_input_index = ssc->ssc_index; + te->te_type = ssc->ssc_type; + te->te_pid = PID_ES_BASE + i; + te->te_startcode = sc; + te->te_mux_offset = AV_NOPTS_VALUE; + + TAILQ_INIT(&te->te_smq); + + LIST_INSERT_HEAD(&tsm->tsm_eslist, te, te_link); + } +} + + +/** + * + */ +static void +tsm_stop(tsmuxer_t *tsm) +{ + tsmuxer_es_t *te; + + while((te = LIST_FIRST(&tsm->tsm_eslist)) != NULL) + te_destroy(te); +} + +/** + * + */ +static void +tsm_handle_input(tsmuxer_t *tsm, streaming_message_t *sm) +{ + switch(sm->sm_type) { + case SMT_PACKET: + tsm_packet_input(tsm, sm); + sm = NULL; + break; + + case SMT_START: + assert(LIST_FIRST(&tsm->tsm_eslist) == NULL); + tsm_start(tsm, sm->sm_data); + break; + + case SMT_STOP: + tsm_stop(tsm); + break; + + case SMT_TRANSPORT_STATUS: + // htsp_subscription_transport_status(hs, sm->sm_code); + break; + + case SMT_NOSOURCE: + // htsp_subscription_status(hs, "No available sources"); + break; + + case SMT_EXIT: + tsm->tsm_run = 0; + break; + } + + if(sm != NULL) + streaming_msg_free(sm); +} + + +/** + * + */ +static void * +tsm_thread(void *aux) +{ + tsmuxer_t *tsm = aux; + streaming_queue_t *sq = &tsm->tsm_input; + streaming_message_t *sm; + int64_t now, next = 0; + + tsm->tsm_run = 1; + + while(tsm->tsm_run) { + + pthread_mutex_lock(&sq->sq_mutex); + + sm = TAILQ_FIRST(&sq->sq_queue); + if(sm == NULL) { + + if(next == 0) { + pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex); + } else { + struct timespec ts; + + ts.tv_sec = next / 1000000; + ts.tv_nsec = (next % 1000000) * 1000; + pthread_cond_timedwait(&sq->sq_cond, &sq->sq_mutex, &ts); + } + } + + sm = TAILQ_FIRST(&sq->sq_queue); + + if(sm != NULL) + TAILQ_REMOVE(&sq->sq_queue, sm, sm_link); + + pthread_mutex_unlock(&sq->sq_mutex); + + now = getmonoclock(); + + if(sm != NULL) + tsm_handle_input(tsm, sm); + + } + return NULL; +} + + +/** + * + */ +void +tsm_init(void) +{ + tsmuxer_t *tsm = calloc(1, sizeof(tsmuxer_t)); + + streaming_queue_init(&tsm->tsm_input); + + pthread_create(&tsm->tsm_thread, NULL, tsm_thread, tsm); +} + + + + + + +#if 0 + #define _GNU_SOURCE #include @@ -45,11 +681,6 @@ static void lookahead_dequeue(ts_muxer_t *ts, th_muxstream_t *tms); -static const AVRational mpeg_tc = {1, 90000}; -static const AVRational mpeg_tc_27M = {1, 27000000}; - -#define PID_PMT 1000 -#define PID_ES_BASE 2000 /** * Send current packet @@ -769,3 +1400,4 @@ ts_muxer_pause(ts_muxer_t *ts) dtimer_disarm(&ts->ts_patpmt_timer); muxer_pause(ts->ts_muxer); } +#endif diff --git a/src/tsmux.h b/src/tsmux.h index 9f4b500f..a041c923 100644 --- a/src/tsmux.h +++ b/src/tsmux.h @@ -1,6 +1,6 @@ /* * tvheadend, MPEG transport stream muxer - * Copyright (C) 2007 Andreas Öman + * Copyright (C) 2008 - 2009 Andreas Öman * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -19,6 +19,11 @@ #ifndef TSMUX_H #define TSMUX_H +void tsm_init(void); + + +#if 0 + typedef void (ts_mux_output_t)(void *opaque, th_subscription_t *s, uint8_t *pkt, int npackets, int64_t pcr_ref); @@ -64,5 +69,7 @@ void ts_muxer_deinit(ts_muxer_t *ts, th_subscription_t *s); void ts_muxer_play(ts_muxer_t *ts, int64_t toffset); void ts_muxer_pause(ts_muxer_t *ts); +#endif + #endif /* TSMUX_H */ diff --git a/src/tvhead.h b/src/tvhead.h index bf912f17..6e8138e8 100644 --- a/src/tvhead.h +++ b/src/tvhead.h @@ -687,14 +687,13 @@ extern int log_debug; static inline int64_t -getclock_hires(void) +getmonoclock(void) { - int64_t now; - struct timeval tv; + struct timespec tp; - gettimeofday(&tv, NULL); - now = (uint64_t)tv.tv_sec * 1000000ULL + (uint64_t)tv.tv_usec; - return now; + clock_gettime(CLOCK_MONOTONIC, &tp); + + return tp.tv_sec * 1000000ULL + (tp.tv_nsec / 1000); } @@ -717,4 +716,7 @@ extern void scopedunlock(pthread_mutex_t **mtxp); char *tvh_b = alloca(tvh_l + 1); \ memcpy(tvh_b, n, tvh_l + 1); }) +#define tvh_strlcatf(buf, size, fmt...) \ + snprintf((buf) + strlen(buf), (size) - strlen(buf), fmt) + #endif /* TV_HEAD_H */