Make RTSP work again

This commit is contained in:
Andreas Öman 2010-01-26 21:07:07 +00:00
parent b70afbc941
commit 0a94874528

View file

@ -42,26 +42,6 @@
TAILQ_HEAD(rtsp_packet_queue, rtsp_packet); TAILQ_HEAD(rtsp_packet_queue, rtsp_packet);
/**
*
*/
typedef struct rtsp_resource {
enum {
RTSP_RESOURCE_CHANNEL,
RTSP_RESOURCE_SERVICE
} rr_type;
int rr_mpegts;
union {
struct channel *rr_channel;
struct th_transport *rr_service;
void *rr_r;
};
} rtsp_resource_t;
/** /**
* *
*/ */
@ -80,11 +60,16 @@ typedef struct rtsp_packet {
typedef struct rtsp { typedef struct rtsp {
int rtsp_refcount; // Should only be modified with global_lock held int rtsp_refcount; // Should only be modified with global_lock held
/* Streaming */
struct th_subscription *rtsp_sub; struct th_subscription *rtsp_sub;
/* Startup */
const char *rtsp_start_error;
struct streaming_start *rtsp_ss; struct streaming_start *rtsp_ss;
pthread_mutex_t rtsp_start_mutex;
pthread_cond_t rtsp_start_cond;
streaming_target_t rtsp_input; streaming_target_t rtsp_input;
@ -339,6 +324,9 @@ rtsp_get_session(http_connection_t *hc)
TAILQ_INIT(&rtsp->rtsp_pqueue); TAILQ_INIT(&rtsp->rtsp_pqueue);
pthread_cond_init(&rtsp->rtsp_cond, NULL); pthread_cond_init(&rtsp->rtsp_cond, NULL);
pthread_mutex_init(&rtsp->rtsp_mutex, NULL); pthread_mutex_init(&rtsp->rtsp_mutex, NULL);
pthread_cond_init(&rtsp->rtsp_start_cond, NULL);
pthread_mutex_init(&rtsp->rtsp_start_mutex, NULL);
return rtsp; return rtsp;
} }
@ -356,19 +344,6 @@ rtsp_check_session(http_connection_t *hc, rtsp_t *rtsp, int none_is_ok)
return ses == NULL || strcmp(rtsp->rtsp_session_id, ses); return ses == NULL || strcmp(rtsp->rtsp_session_id, ses);
} }
/**
*
*/
static void
rtsp_unsubscribe(rtsp_t *rtsp)
{
if(rtsp->rtsp_sub != NULL) {
subscription_unsubscribe(rtsp->rtsp_sub);
rtsp->rtsp_sub = NULL;
}
}
/** /**
* *
*/ */
@ -448,15 +423,27 @@ rtsp_streaming_input(void *opaque, streaming_message_t *sm)
switch(sm->sm_type) { switch(sm->sm_type) {
case SMT_START: case SMT_START:
pthread_mutex_lock(&rtsp->rtsp_start_mutex);
assert(rtsp->rtsp_ss == NULL); assert(rtsp->rtsp_ss == NULL);
rtsp->rtsp_ss = sm->sm_data; rtsp->rtsp_ss = sm->sm_data;
pthread_cond_signal(&rtsp->rtsp_start_cond);
pthread_mutex_unlock(&rtsp->rtsp_start_mutex);
sm->sm_data = NULL; // steal reference sm->sm_data = NULL; // steal reference
break; break;
case SMT_STOP: case SMT_STOP:
pthread_mutex_lock(&rtsp->rtsp_start_mutex);
assert(rtsp->rtsp_ss != NULL); assert(rtsp->rtsp_ss != NULL);
streaming_start_unref(rtsp->rtsp_ss); streaming_start_unref(rtsp->rtsp_ss);
rtsp->rtsp_ss = NULL; rtsp->rtsp_ss = NULL;
pthread_cond_signal(&rtsp->rtsp_start_cond);
pthread_mutex_unlock(&rtsp->rtsp_start_mutex);
break; break;
case SMT_PACKET: case SMT_PACKET:
@ -465,9 +452,22 @@ rtsp_streaming_input(void *opaque, streaming_message_t *sm)
break; break;
case SMT_TRANSPORT_STATUS: case SMT_TRANSPORT_STATUS:
if(sm->sm_code & TSS_PACKETS) {
} else if(sm->sm_code & (TSS_GRACEPERIOD | TSS_ERRORS)) {
pthread_mutex_lock(&rtsp->rtsp_start_mutex);
rtsp->rtsp_start_error = transport_tss2text(sm->sm_code);
pthread_cond_signal(&rtsp->rtsp_start_cond);
pthread_mutex_unlock(&rtsp->rtsp_start_mutex);
}
break; break;
case SMT_NOSOURCE: case SMT_NOSOURCE:
pthread_mutex_lock(&rtsp->rtsp_start_mutex);
rtsp->rtsp_start_error = transport_nostart2txt(sm->sm_code);
pthread_cond_signal(&rtsp->rtsp_start_cond);
pthread_mutex_unlock(&rtsp->rtsp_start_mutex);
break; break;
case SMT_MPEGTS: case SMT_MPEGTS:
@ -479,68 +479,39 @@ rtsp_streaming_input(void *opaque, streaming_message_t *sm)
streaming_msg_free(sm); streaming_msg_free(sm);
} }
/** /**
* * Attach the url to an internal resource (channel or transport)
*/ */
static int static int
rtsp_subscribe(rtsp_t *rtsp, rtsp_resource_t *rr) rtsp_subscribe(http_connection_t *hc, rtsp_t *rtsp,
{ char *title, size_t titlelen,
th_subscription_t *s = NULL; char *baseurl, size_t baseurllen)
if(rtsp->rtsp_sub != NULL) {
switch(rr->rr_type) {
case RTSP_RESOURCE_CHANNEL:
if(rtsp->rtsp_sub->ths_channel == rr->rr_channel)
return 0;
break;
case RTSP_RESOURCE_SERVICE:
if(rtsp->rtsp_sub->ths_transport == rr->rr_service)
return 0;
}
subscription_unsubscribe(rtsp->rtsp_sub);
}
streaming_target_init(&rtsp->rtsp_input, rtsp_streaming_input, rtsp, 0);
int flags = rr->rr_mpegts ? SUBSCRIPTION_RAW_MPEGTS : 0;
switch(rr->rr_type) {
case RTSP_RESOURCE_CHANNEL:
s = subscription_create_from_channel(rr->rr_channel, 500, "RTSP",
&rtsp->rtsp_input, flags);
break;
case RTSP_RESOURCE_SERVICE:
s = subscription_create_from_transport(rr->rr_service, "RTSP",
&rtsp->rtsp_input, flags);
break;
}
rtsp->rtsp_sub = s;
return s == NULL;
}
/**
* Resolve an URL into a resource
*/
static int
rtsp_resolve_url(http_connection_t *hc, rtsp_resource_t *rr, char **remainp)
{ {
char *components[5]; char *components[5];
void *r;
int nc; int nc;
char *url = hc->hc_url; char *url = hc->hc_url;
int pri = 500;
int subflags = 0;
th_subscription_t *s;
char urlprefix[128];
char buf[INET_ADDRSTRLEN + 1];
inet_ntop(AF_INET, &hc->hc_self->sin_addr, buf, sizeof(buf));
snprintf(urlprefix, sizeof(urlprefix),
"rtsp://%s:%d", buf, ntohs(hc->hc_self->sin_port));
if(rtsp->rtsp_sub != NULL) {
rtsp_error(hc, 400, "Please teardown first");
return -1;
}
if(!strncasecmp(url, "rtsp://", strlen("rtsp://"))) { if(!strncasecmp(url, "rtsp://", strlen("rtsp://"))) {
url += strlen("rtsp://"); url += strlen("rtsp://");
url = strchr(url, '/'); url = strchr(url, '/');
if(url == NULL) if(url == NULL) {
rtsp_error(hc, RTSP_STATUS_SERVICE, "Invalid URL");
return -1; return -1;
}
url++; url++;
} }
@ -551,36 +522,79 @@ rtsp_resolve_url(http_connection_t *hc, rtsp_resource_t *rr, char **remainp)
http_deescape(components[1]); http_deescape(components[1]);
rtsp->rtsp_start_error = NULL;
rtsp->rtsp_ss = NULL;
streaming_target_init(&rtsp->rtsp_input, rtsp_streaming_input, rtsp, 0);
if(!strcmp(components[0], "channel")) { if(!strcmp(components[0], "channel")) {
rr->rr_type = RTSP_RESOURCE_CHANNEL; channel_t *ch;
r = channel_find_by_name(components[1], 0); scopedgloballock();
if((ch = channel_find_by_name(components[1], 0)) == NULL) {
rtsp_error(hc, RTSP_STATUS_SERVICE, "Channel name not found");
return -1;
}
s = subscription_create_from_channel(ch, pri, "RTSP", &rtsp->rtsp_input,
subflags);
snprintf(baseurl, baseurllen, "channel/%s", ch->ch_name);
snprintf(title, titlelen, "%s", ch->ch_name);
} else if(!strcmp(components[0], "channelid")) { } else if(!strcmp(components[0], "channelid")) {
rr->rr_type = RTSP_RESOURCE_CHANNEL; channel_t *ch;
r = channel_find_by_identifier(atoi(components[1])); scopedgloballock();
} else if(!strcmp(components[0], "service")) {
rr->rr_type = RTSP_RESOURCE_SERVICE;
r = transport_find_by_identifier(components[1]);
} else {
return -1;
}
if(r == NULL) if((ch = channel_find_by_identifier(atoi(components[1]))) == NULL) {
return -1; rtsp_error(hc, RTSP_STATUS_SERVICE, "Channel ID not found");
rr->rr_r = r; return -1;
rr->rr_mpegts = 0; }
if(remainp == NULL) s = subscription_create_from_channel(ch, pri, "RTSP", &rtsp->rtsp_input,
return 0; subflags);
snprintf(baseurl, baseurllen, "channel/%s", ch->ch_name);
snprintf(title, titlelen, "%s", ch->ch_name);
} else if(!strcmp(components[0], "service")) {
th_transport_t *t;
scopedgloballock();
if((t = transport_find_by_identifier(components[1])) == NULL) {
rtsp_error(hc, RTSP_STATUS_SERVICE, "Transport ID not found");
return -1;
}
s = subscription_create_from_transport(t, "RTSP", &rtsp->rtsp_input,
subflags);
snprintf(baseurl, baseurllen, "service/%s", t->tht_identifier);
snprintf(title, titlelen, "%s", t->tht_identifier);
if(nc == 3) {
http_deescape(components[2]);
*remainp = components[2];
} else { } else {
*remainp = NULL; rtsp_error(hc, RTSP_STATUS_SERVICE, "Invalid URL");
return -1;
} }
pthread_mutex_lock(&rtsp->rtsp_start_mutex);
while(rtsp->rtsp_start_error == NULL && rtsp->rtsp_ss == NULL)
pthread_cond_wait(&rtsp->rtsp_start_cond, &rtsp->rtsp_start_mutex);
if(rtsp->rtsp_start_error != NULL) {
pthread_mutex_unlock(&rtsp->rtsp_start_mutex);
pthread_mutex_lock(&global_lock);
subscription_unsubscribe(s);
pthread_mutex_unlock(&global_lock);
rtsp_error(hc, RTSP_STATUS_SERVICE, rtsp->rtsp_start_error);
return -1;
}
pthread_mutex_unlock(&rtsp->rtsp_start_mutex);
rtsp->rtsp_sub = s;
return 0; return 0;
} }
@ -592,20 +606,10 @@ static int
rtsp_cmd_options(http_connection_t *hc) rtsp_cmd_options(http_connection_t *hc)
{ {
char *c; char *c;
rtsp_resource_t rr;
pthread_mutex_lock(&global_lock);
if(strcmp(hc->hc_url, "*") && rtsp_resolve_url(hc, &rr, NULL)) {
rtsp_error(hc, RTSP_STATUS_SERVICE, "URL does not resolve");
pthread_mutex_unlock(&global_lock);
return 0;
}
pthread_mutex_unlock(&global_lock);
rtsp_printf(hc, rtsp_printf(hc,
"RTSP/1.0 200 OK\r\n" "RTSP/1.0 200 OK\r\n"
"Public: DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE\r\n"); "Public: DESCRIBE, SETUP, TEARDOWN, PLAY\r\n");
if((c = http_arg_get(&hc->hc_args, "cseq")) != NULL) if((c = http_arg_get(&hc->hc_args, "cseq")) != NULL)
rtsp_printf(hc, "CSeq: %s\r\n", c); rtsp_printf(hc, "CSeq: %s\r\n", c);
@ -640,68 +644,31 @@ static int
rtsp_cmd_describe(http_connection_t *hc, rtsp_t *rtsp) rtsp_cmd_describe(http_connection_t *hc, rtsp_t *rtsp)
{ {
char sdp[1000]; char sdp[1000];
char urlprefix[128]; char baseurl[128];
char *c; char *c;
const char *str;
rtsp_resource_t rr;
extern const char *htsversion; extern const char *htsversion;
const streaming_start_t *ss; const streaming_start_t *ss;
int i; int i;
char title[128];
char buf[INET_ADDRSTRLEN + 1]; if(rtsp_subscribe(hc, rtsp, title, sizeof(title), baseurl, sizeof(baseurl)))
inet_ntop(AF_INET, &hc->hc_self->sin_addr, buf, sizeof(buf));
snprintf(urlprefix, sizeof(urlprefix),
"rtsp://%s:%d", buf, ntohs(hc->hc_self->sin_port));
pthread_mutex_lock(&global_lock);
if(rtsp_resolve_url(hc, &rr, NULL)) {
rtsp_error(hc, RTSP_STATUS_SERVICE, "URL does not resolve");
pthread_mutex_unlock(&global_lock);
return 0; return 0;
}
rtsp_subscribe(rtsp, &rr);
if((ss = rtsp->rtsp_ss) == NULL) {
rtsp_error(hc, RTSP_STATUS_SERVICE, "No source available");
rtsp_unsubscribe(rtsp);
pthread_mutex_unlock(&global_lock);
return 0;
}
pthread_mutex_unlock(&global_lock);
if(rr.rr_type == RTSP_RESOURCE_CHANNEL)
str = rr.rr_channel->ch_name;
else
str = rr.rr_service->tht_identifier;
snprintf(sdp, sizeof(sdp), snprintf(sdp, sizeof(sdp),
"v=0\r\n" "v=0\r\n"
"o=- 0 0 IN IPV4 127.0.0.1\r\n" "o=- 0 0 IN IPV4 127.0.0.1\r\n"
"s=%s\r\n" "s=%s\r\n"
"a=tool:HTS Tvheadend %s\r\n", "a=tool:HTS Tvheadend %s\r\n",
str, htsversion); title, htsversion);
ss = rtsp->rtsp_ss;
for(i = 0; i < ss->ss_num_components; i++) { for(i = 0; i < ss->ss_num_components; i++) {
const streaming_start_component_t *ssc = &ss->ss_components[i]; const streaming_start_component_t *ssc = &ss->ss_components[i];
char controlurl[256]; char controlurl[256];
snprintf(controlurl, sizeof(controlurl), "%s/streamid=%d",
switch(rr.rr_type) { baseurl, ssc->ssc_index);
case RTSP_RESOURCE_CHANNEL:
snprintf(controlurl, sizeof(controlurl), "%s/channelid/%d/streamid=%d",
urlprefix, rr.rr_channel->ch_id, ssc->ssc_index);
break;
case RTSP_RESOURCE_SERVICE:
snprintf(controlurl, sizeof(controlurl), "%s/service/%s/streamid=%d",
urlprefix, rr.rr_service->tht_identifier, ssc->ssc_index);
break;
}
switch(ssc->ssc_type) { switch(ssc->ssc_type) {
case SCT_MPEG2VIDEO: case SCT_MPEG2VIDEO:
@ -935,7 +902,6 @@ rtsp_setup_udp(http_connection_t *hc, rtsp_t *rtsp,
static int static int
rtsp_cmd_setup(http_connection_t *hc, rtsp_t *rtsp) rtsp_cmd_setup(http_connection_t *hc, rtsp_t *rtsp)
{ {
rtsp_resource_t rr;
char *transports[10]; char *transports[10];
char *params[10]; char *params[10];
char *t; char *t;
@ -943,40 +909,23 @@ rtsp_cmd_setup(http_connection_t *hc, rtsp_t *rtsp)
int streamid; int streamid;
char *remain; char *remain;
const streaming_start_component_t *ssc; const streaming_start_component_t *ssc;
if(rtsp_check_session(hc, rtsp, 1)) remain = strstr(hc->hc_url, "streamid=");
return rtsp_error(hc, 459, NULL); if(remain == NULL) {
pthread_mutex_lock(&global_lock);
if(rtsp_resolve_url(hc, &rr, &remain)) {
rtsp_error(hc, RTSP_STATUS_SERVICE, "URL does not resolve"); rtsp_error(hc, RTSP_STATUS_SERVICE, "URL does not resolve");
pthread_mutex_unlock(&global_lock);
return 0; return 0;
} }
if(remain == NULL || strncmp(remain, "streamid=", strlen("streamid="))) { if(rtsp->rtsp_ss == NULL) {
rtsp_error(hc, RTSP_STATUS_SERVICE, "No streamid component in URL"); rtsp_error(hc, RTSP_STATUS_SERVICE, "Not described");
pthread_mutex_unlock(&global_lock);
return 0; return 0;
} }
streamid = atoi(remain + strlen("streamid=")); streamid = atoi(remain + strlen("streamid="));
if((ssc = get_ssc_by_index(rtsp, streamid)) == NULL) { if((ssc = get_ssc_by_index(rtsp, streamid)) == NULL) {
pthread_mutex_unlock(&global_lock);
return rtsp_error(hc, RTSP_STATUS_SERVICE, "Stream not found"); return rtsp_error(hc, RTSP_STATUS_SERVICE, "Stream not found");
} }
rtsp_subscribe(rtsp, &rr);
pthread_mutex_unlock(&global_lock);
if(rtsp->rtsp_ss == NULL) {
rtsp_error(hc, RTSP_STATUS_SERVICE, "No source available");
rtsp_unsubscribe(rtsp);
return 0;
}
if((t = http_arg_get(&hc->hc_args, "transport")) == NULL) { if((t = http_arg_get(&hc->hc_args, "transport")) == NULL) {
rtsp_error(hc, RTSP_STATUS_TRANSPORT, NULL); rtsp_error(hc, RTSP_STATUS_TRANSPORT, NULL);
return 0; return 0;