SAT>IP server: more work, add subscriptions and add RTP and RTCP threads

This commit is contained in:
Jaroslav Kysela 2015-02-24 15:27:51 +01:00
parent 83f94f6f0c
commit 78e4ddf86e
7 changed files with 693 additions and 43 deletions

View file

@ -162,7 +162,8 @@ SRCS-${CONFIG_UPNP} += \
# SATIP Server
SRCS-${CONFIG_SATIP_SERVER} += \
src/satip/server.c \
src/satip/rtsp.c
src/satip/rtsp.c \
src/satip/rtp.c
SRCS += \
src/api.c \

526
src/satip/rtp.c Normal file
View file

@ -0,0 +1,526 @@
/*
* Tvheadend - SAT-IP server - RTP part
*
* Copyright (C) 2015 Jaroslav Kysela
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <signal.h>
#include "tvheadend.h"
#include "input.h"
#include "streaming.h"
#include "satip/server.h"
#define RTP_PACKETS 128
#define RTP_PAYLOAD (1356-20-8)
#define RTCP_PAYLOAD (1472-20-8)
typedef struct satip_rtp_session {
TAILQ_ENTRY(satip_rtp_session) link;
pthread_t tid;
void *id;
struct sockaddr_storage peer;
struct sockaddr_storage peer2;
int port;
th_subscription_t *subs;
streaming_queue_t *sq;
int fd_rtp;
int fd_rtcp;
int frontend;
int source;
dvb_mux_conf_t dmc;
int16_t pids[RTSP_PIDS];
udp_multisend_t um;
struct iovec *um_iovec;
int um_packet;
uint16_t seq;
signal_status_t sig;
} satip_rtp_session_t;
static pthread_mutex_t satip_rtp_lock;
static pthread_t satip_rtcp_tid;
static int satip_rtcp_run;
static TAILQ_HEAD(, satip_rtp_session) satip_rtp_sessions;
static void
satip_rtp_header(satip_rtp_session_t *rtp)
{
struct iovec *v = rtp->um_iovec + rtp->um_packet;
uint8_t *data = v->iov_base;
uint32_t tstamp = dispatch_clock + rtp->seq;
rtp->seq++;
v->iov_len = 12;
data[0] = 0x80;
data[1] = 33;
data[2] = (rtp->seq >> 8) & 0xff;
data[3] = rtp->seq & 0xff;
data[4] = (tstamp >> 24) & 0xff;
data[5] = (tstamp >> 16) & 0xff;
data[6] = (tstamp >> 8) & 0xff;
data[7] = tstamp & 0xff;
memset(data + 8, 0xa5, 8);
}
static int
satip_rtp_send(satip_rtp_session_t *rtp)
{
struct iovec *v = rtp->um_iovec, *v2;
int packets, copy, len, r;
if (v->iov_len == RTP_PAYLOAD) {
packets = rtp->um_packet;
v2 = v + packets;
if (v2->iov_len == RTP_PAYLOAD) {
packets++;
copy = 0;
} else
copy = 1;
r = udp_multisend_send(&rtp->um, rtp->fd_rtp, packets);
if (r < 0)
return r;
if (copy)
memcpy(v->iov_base, v2->iov_base, len = v2->iov_len);
else
len = 0;
rtp->um_packet = 0;
udp_multisend_clean(&rtp->um);
v->iov_len = len;
}
if (v->iov_len == 0)
satip_rtp_header(rtp);
return 0;
}
static int
satip_rtp_loop(satip_rtp_session_t *rtp, uint8_t *data, int len)
{
int i, pid, last_pid = -1, r;
int16_t *pids = rtp->pids;
struct iovec *v = rtp->um_iovec + rtp->um_packet;
assert((len % 188) == 0);
for ( ; len >= 188 ; data += 188, len -= 188) {
pid = ((data[1] & 0x1f) << 8) | data[2];
if (pid != last_pid) {
for (i = 0; i < RTSP_PIDS && pids[i] >= 0; i++)
if (pids[i] == pid)
break;
if (i >= RTSP_PIDS) continue; /* skip PID */
last_pid = pid;
}
memcpy(v->iov_base + v->iov_len, data, 188);
v->iov_len += 188;
if (v->iov_len >= RTP_PAYLOAD) {
if ((rtp->um_packet + 1) == RTP_PACKETS) {
r = satip_rtp_send(rtp);
if (r < 0)
return r;
} else
rtp->um_packet++;
}
}
return 0;
}
static void
satip_rtp_signal_status(satip_rtp_session_t *rtp, signal_status_t *sig)
{
rtp->sig = *sig;
}
static void *
satip_rtp_thread(void *aux)
{
satip_rtp_session_t *rtp = aux;
streaming_queue_t *sq = rtp->sq;
streaming_message_t *sm;
th_subscription_t *subs = rtp->subs;
pktbuf_t *pb;
char peername[50];
int alive = 1, fatal = 0, r;
tcp_get_ip_str((struct sockaddr *)&rtp->peer, peername, sizeof(peername));
tvhdebug("satips", "RTP streaming to %s:%d open", peername, rtp->port);
pthread_mutex_lock(&sq->sq_mutex);
while (rtp->sq && !fatal) {
sm = TAILQ_FIRST(&sq->sq_queue);
if (sm == NULL) {
r = satip_rtp_send(rtp);
if (r) {
fatal = 1;
continue;
}
pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex);
continue;
}
TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);
pthread_mutex_unlock(&sq->sq_mutex);
switch (sm->sm_type) {
case SMT_MPEGTS:
pb = sm->sm_data;
atomic_add(&subs->ths_bytes_out, pktbuf_len(pb));
r = satip_rtp_loop(rtp, pktbuf_ptr(pb), pktbuf_len(pb));
if (r) fatal = 1;
break;
case SMT_SIGNAL_STATUS:
satip_rtp_signal_status(rtp, sm->sm_data);
break;
case SMT_NOSTART:
case SMT_EXIT:
alive = 0;
break;
case SMT_START:
case SMT_STOP:
case SMT_PACKET:
case SMT_GRACE:
case SMT_SKIP:
case SMT_SPEED:
case SMT_SERVICE_STATUS:
case SMT_TIMESHIFT_STATUS:
break;
}
streaming_msg_free(sm);
pthread_mutex_lock(&sq->sq_mutex);
}
pthread_mutex_unlock(&sq->sq_mutex);
tvhdebug("satips", "RTP streaming to %s:%d closed (%s request)",
peername, rtp->port, alive ? "remote" : "streaming");
return NULL;
}
/*
*
*/
static satip_rtp_session_t *
satip_rtp_find(void *id)
{
satip_rtp_session_t *rtp;
pthread_mutex_lock(&satip_rtp_lock);
TAILQ_FOREACH(rtp, &satip_rtp_sessions, link)
if (rtp == id)
break;
pthread_mutex_unlock(&satip_rtp_lock);
return rtp;
}
/*
*
*/
void satip_rtp_queue(void *id, th_subscription_t *subs,
streaming_queue_t *sq,
struct sockaddr_storage *peer, int port,
int fd_rtp, int fd_rtcp,
int frontend, int source, dvb_mux_conf_t *dmc,
int16_t *pids)
{
satip_rtp_session_t *rtp = calloc(1, sizeof(*rtp));
if (rtp == NULL)
return;
rtp->id = id;
rtp->peer = *peer;
rtp->peer2 = *peer;
IP_PORT_SET(rtp->peer2, port + 1);
rtp->port = port;
rtp->fd_rtp = fd_rtp;
rtp->fd_rtcp = fd_rtcp;
rtp->subs = subs;
rtp->sq = sq;
memcpy(rtp->pids, pids, sizeof(*pids)*RTSP_PIDS);
udp_multisend_init(&rtp->um, RTP_PACKETS, RTP_PAYLOAD, &rtp->um_iovec);
satip_rtp_header(rtp);
rtp->frontend = frontend;
rtp->dmc = *dmc;
rtp->source = source;
pthread_mutex_lock(&satip_rtp_lock);
TAILQ_INSERT_TAIL(&satip_rtp_sessions, rtp, link);
tvhthread_create(&rtp->tid, NULL, satip_rtp_thread, rtp);
pthread_mutex_unlock(&satip_rtp_lock);
}
void satip_rtp_close(void *id)
{
satip_rtp_session_t *rtp;
streaming_queue_t *sq;
pthread_mutex_lock(&satip_rtp_lock);
rtp = satip_rtp_find(id);
if (rtp) {
sq = rtp->sq;
pthread_mutex_lock(&sq->sq_mutex);
rtp->sq = NULL;
pthread_cond_signal(&sq->sq_cond);
pthread_mutex_unlock(&sq->sq_mutex);
pthread_join(rtp->tid, NULL);
pthread_mutex_lock(&satip_rtp_lock);
udp_multisend_free(&rtp->um);
free(rtp);
}
pthread_mutex_unlock(&satip_rtp_lock);
}
/*
*
*/
static const char *
satip_rtcp_fec(int fec)
{
static char buf[16];
const char *s = dvb_fec2str(fec);
char *p = buf;
if (s == NULL)
return "";
strncpy(buf, s, sizeof(buf));
buf[sizeof(buf)-1] = '\0';
p = strchr(buf, '/');
while (*p) {
*p = *(p+1);
p++;
}
return s;
}
/*
*
*/
static int
satip_rtcp_build(satip_rtp_session_t *rtp, uint8_t *msg)
{
char buf[1500], pids[1400];
const char *delsys, *msys, *pilot, *rolloff;
const char *bw, *tmode, *gi, *plp, *t2id, *sm, *c2tft, *ds, *specinv;
int i, len, len2, level = 0, lock = 0, quality = 0;
pids[0] = 0;
for (i = len = 00; i < RTSP_PIDS && rtp->pids[i] >= 0; i++)
len += snprintf(pids + len, sizeof(pids) - len, "%d,", rtp->pids[i]);
if (len && pids[len-1] == ',')
pids[len-1] = '\0';
switch (rtp->dmc.dmc_fe_delsys) {
case DVB_SYS_DVBS:
case DVB_SYS_DVBS2:
delsys = rtp->dmc.dmc_fe_delsys == DVB_SYS_DVBS ? "dvbs" : "dvbs2";
switch (rtp->dmc.dmc_fe_modulation) {
case DVB_MOD_QPSK: msys = "qpsk"; break;
case DVB_MOD_PSK_8: msys = "8psk"; break;
default: msys = ""; break;
}
switch (rtp->dmc.dmc_fe_pilot) {
case DVB_PILOT_ON: pilot = "on"; break;
case DVB_PILOT_OFF: pilot = "off"; break;
default: pilot = ""; break;
}
switch (rtp->dmc.dmc_fe_rolloff) {
case DVB_ROLLOFF_20: rolloff = "20"; break;
case DVB_ROLLOFF_25: rolloff = "25"; break;
case DVB_ROLLOFF_35: rolloff = "35"; break;
default: rolloff = ""; break;
}
/* ver=<major>.<minor>;src=<srcID>;tuner=<feID>,<level>,<lock>,<quality>,<frequency>,<polarisation>,\
* <system>,<type>,<pilots>,<roll_off>,<symbol_rate>,<fec_inner>;pids=<pid0>,...,<pidn>
*/
snprintf(buf, sizeof(buf),
"vers=1.0;src=%d;tuner=%d,%d,%d,%d,%.f,%s,%s,%s,%s,%s,%.f,%s;pids=%s",
rtp->source, rtp->frontend, level, lock, quality,
(float)rtp->dmc.dmc_fe_freq / 1000.0,
dvb_pol2str(rtp->dmc.u.dmc_fe_qpsk.polarisation),
delsys, msys, pilot, rolloff,
(float)rtp->dmc.u.dmc_fe_qpsk.symbol_rate / 1000.0,
satip_rtcp_fec(rtp->dmc.u.dmc_fe_qpsk.fec_inner),
pids);
break;
case DVB_SYS_DVBT:
case DVB_SYS_DVBT2:
delsys = rtp->dmc.dmc_fe_delsys == DVB_SYS_DVBT ? "dvbt" : "dvbt2";
switch (rtp->dmc.u.dmc_fe_ofdm.bandwidth) {
case DVB_BANDWIDTH_1_712_MHZ: bw = "1.712"; break;
case DVB_BANDWIDTH_5_MHZ: bw = "5"; break;
case DVB_BANDWIDTH_6_MHZ: bw = "6"; break;
case DVB_BANDWIDTH_7_MHZ: bw = "7"; break;
case DVB_BANDWIDTH_8_MHZ: bw = "8"; break;
case DVB_BANDWIDTH_10_MHZ: bw = "10"; break;
default: bw = ""; break;
}
switch (rtp->dmc.u.dmc_fe_ofdm.transmission_mode) {
case DVB_TRANSMISSION_MODE_1K: tmode = "1k"; break;
case DVB_TRANSMISSION_MODE_2K: tmode = "2k"; break;
case DVB_TRANSMISSION_MODE_4K: tmode = "4k"; break;
case DVB_TRANSMISSION_MODE_8K: tmode = "8k"; break;
case DVB_TRANSMISSION_MODE_16K: tmode = "16k"; break;
case DVB_TRANSMISSION_MODE_32K: tmode = "32k"; break;
default: tmode = ""; break;
}
switch (rtp->dmc.dmc_fe_modulation) {
case DVB_MOD_QAM_16: msys = "qam16"; break;
case DVB_MOD_QAM_32: msys = "qam32"; break;
case DVB_MOD_QAM_64: msys = "qam64"; break;
case DVB_MOD_QAM_128: msys = "qam128"; break;
default: msys = ""; break;
}
switch (rtp->dmc.u.dmc_fe_ofdm.guard_interval) {
case DVB_GUARD_INTERVAL_1_4: gi = "14"; break;
case DVB_GUARD_INTERVAL_1_8: gi = "18"; break;
case DVB_GUARD_INTERVAL_1_16: gi = "116"; break;
case DVB_GUARD_INTERVAL_1_32: gi = "132"; break;
case DVB_GUARD_INTERVAL_1_128: gi = "1128"; break;
case DVB_GUARD_INTERVAL_19_128: gi = "19128"; break;
case DVB_GUARD_INTERVAL_19_256: gi = "19256"; break;
default: gi = ""; break;
}
plp = "";
t2id = "";
sm = "";
/* ver=1.1;tuner=<feID>,<level>,<lock>,<quality>,<freq>,<bw>,<msys>,<tmode>,<mtype>,<gi>,\
* <fec>,<plp>,<t2id>,<sm>;pids=<pid0>,...,<pidn>
*/
snprintf(buf, sizeof(buf),
"vers=1.1;tuner=%d,%d,%d,%d,%.f,%s,%s,%s,%s,%s,%s,%s,%s,%s;pids=%s",
rtp->frontend, level, lock, quality,
(float)rtp->dmc.dmc_fe_freq / 1000.0,
bw, delsys, tmode, msys, gi,
satip_rtcp_fec(rtp->dmc.u.dmc_fe_ofdm.code_rate_HP),
plp, t2id, sm, pids);
break;
case DVB_SYS_DVBC_ANNEX_A:
case DVB_SYS_DVBC_ANNEX_C:
delsys = rtp->dmc.dmc_fe_delsys == DVB_SYS_DVBC_ANNEX_A ? "dvbc" : "dvbc2";
bw = "";
switch (rtp->dmc.dmc_fe_modulation) {
case DVB_MOD_QAM_16: msys = "qam16"; break;
case DVB_MOD_QAM_32: msys = "qam32"; break;
case DVB_MOD_QAM_64: msys = "qam64"; break;
case DVB_MOD_QAM_128: msys = "qam128"; break;
default: msys = ""; break;
}
c2tft = "";
ds = "";
plp = "";
specinv = "";
/* ver=1.2;tuner=<feID>,<level>,<lock>,<quality>,<freq>,<bw>,<msys>,<mtype>,<sr>,<c2tft>,<ds>,<plp>,
* <specinv>;pids=<pid0>,...,<pidn>
*/
snprintf(buf, sizeof(buf),
"vers=1.1;tuner=%d,%d,%d,%d,%.f,%s,%s,%s,%.f,%s,%s,%s,%s;pids=%s",
rtp->frontend, level, lock, quality,
(float)rtp->dmc.dmc_fe_freq / 1000.0,
bw, delsys, msys,
(float)rtp->dmc.u.dmc_fe_qam.symbol_rate / 1000.0,
c2tft, ds, plp, specinv, pids);
break;
default:
return 0;
}
len = len2 = strlen(buf);
while ((len % 4) != 0)
buf[len++] = 0;
memcpy(msg + 16, buf, len);
len += 16;
msg[0] = 0x80;
msg[1] = 204;
msg[2] = (len >> 8) & 0xff;
msg[3] = len & 0xff;
msg[4] = 0;
msg[5] = 0;
msg[6] = 0;
msg[7] = 0;
msg[8] = 'S';
msg[9] = 'E';
msg[10] = 'S';
msg[11] = '1';
msg[12] = 0;
msg[13] = 0;
msg[14] = (len2 >> 8) & 0xff;
msg[15] = len2 & 0xff;
return len2;
}
/*
*
*/
static void *
satip_rtcp_thread(void *aux)
{
satip_rtp_session_t *rtp;
struct timespec ts;
uint8_t msg[RTCP_PAYLOAD];
char addrbuf[50];
int r, len, err;
while (satip_rtcp_run) {
ts.tv_sec = 0;
ts.tv_nsec = 150000000;
while (1) {
nanosleep(&ts, &ts);
if (satip_rtcp_run)
goto end;
} while (ts.tv_nsec);
pthread_mutex_lock(&satip_rtp_lock);
TAILQ_FOREACH(rtp, &satip_rtp_sessions, link) {
if (rtp->sq == NULL) continue;
len = satip_rtcp_build(rtp, msg);
if (len <= 0) continue;
r = sendto(rtp->fd_rtcp, msg, len, 0,
(struct sockaddr*)&rtp->peer2,
rtp->peer2.ss_family == AF_INET6 ?
sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
if (r) {
err = errno;
tcp_get_ip_str((struct sockaddr*)&rtp->peer2, addrbuf, sizeof(addrbuf));
tvhwarn("satips", "RTCP send to error %s:%d : %s",
addrbuf, IP_PORT(rtp->peer2), strerror(err));
}
}
pthread_mutex_unlock(&satip_rtp_lock);
}
end:
return NULL;
}
/*
*
*/
void satip_rtp_init(void)
{
TAILQ_INIT(&satip_rtp_sessions);
pthread_mutex_init(&satip_rtp_lock, NULL);
satip_rtcp_run = 1;
tvhthread_create(&satip_rtcp_tid, NULL, satip_rtcp_thread, NULL);
}
/*
*
*/
void satip_rtp_done(void)
{
assert(TAILQ_EMPTY(&satip_rtp_sessions));
satip_rtcp_run = 0;
pthread_kill(satip_rtcp_tid, SIGTERM);
pthread_join(satip_rtcp_tid, NULL);
}

View file

@ -18,36 +18,34 @@
*/
#include "tvheadend.h"
#include "input.h"
#include "htsbuf.h"
#include "htsmsg_xml.h"
#include "upnp.h"
#include "http.h"
#include "settings.h"
#include "config.h"
#include "profile.h"
#include "satip/server.h"
#include <ctype.h>
#include <arpa/inet.h>
#include <openssl/sha.h>
#if defined(PLATFORM_FREEBSD) || ENABLE_ANDROID
#include <sys/types.h>
#include <sys/socket.h>
#endif
#define RTSP_TIMEOUT 30
#define RTSP_PIDS 128
#define RTP_BUFSIZE (256*1024)
#define RTCP_BUFSIZE (16*1024)
typedef struct session {
TAILQ_ENTRY(session) link;
int delsys;
int stream;
int frontend;
int findex;
uint32_t nsession;
char session[9];
dvb_mux_conf_t dmc;
int16_t pids[RTSP_PIDS];
gtimer_t timer;
dvb_mux_t *mux;
int mux_created;
profile_chain_t prch;
th_subscription_t *subs;
udp_connection_t *udp_rtp;
udp_connection_t *udp_rtcp;
} session_t;
static uint32_t session_number;
@ -65,26 +63,36 @@ static void rtsp_free_session(session_t *rs);
*
*/
static int
rtsp_delsys(int fe)
rtsp_delsys(int fe, int *findex)
{
int i;
int res, i;
if (fe < 1)
return DVB_SYS_NONE;
pthread_mutex_lock(&global_lock);
i = config_get_int("satip_dvbt", 0);
if (fe <= i)
return DVB_SYS_DVBT;
if (fe <= i) {
res = DVB_SYS_DVBT;
goto result;
}
fe -= i;
i = config_get_int("satip_dvbs", 0);
if (fe <= i)
return DVB_SYS_DVBS;
if (fe <= i) {
res = DVB_SYS_DVBS;
goto result;
}
fe -= i;
i = config_get_int("satip_dvbc", 0);
if (fe <= i)
return DVB_SYS_DVBC_ANNEX_A;
if (fe <= i) {
res = DVB_SYS_DVBC_ANNEX_A;
goto result;
}
pthread_mutex_unlock(&global_lock);
return DVB_SYS_NONE;
result:
pthread_mutex_unlock(&global_lock);
*findex = i;
return res;
}
/*
@ -97,8 +105,10 @@ rtsp_new_session(int delsys)
if (rs == NULL)
return NULL;
rs->delsys = delsys;
rs->nsession = session_number;
snprintf(rs->session, sizeof(rs->session), "%08X", session_number);
session_number += 9876;
TAILQ_INSERT_TAIL(&rtsp_sessions, rs, link);
return rs;
}
@ -241,10 +251,91 @@ rtsp_delpids(session_t *rs, int16_t *pids)
/*
*
*/
static int
rtsp_start(session_t *rs)
static void
rtsp_clean(session_t *rs)
{
if (rs->subs) {
subscription_unsubscribe(rs->subs);
rs->subs = NULL;
}
if (rs->prch.prch_pro)
profile_chain_close(&rs->prch);
if (rs->mux && rs->mux_created) {
rs->mux->mm_delete((mpegts_mux_t *)rs->mux, 1);
rs->mux = NULL;
rs->mux_created = 0;
}
}
/*
*
*/
static int
rtsp_start(http_connection_t *hc, session_t *rs)
{
mpegts_network_t *mn;
dvb_network_t *ln;
char buf[256], addrbuf[50];
int res = HTTP_STATUS_SERVICE, qsize = 3000000;
if (rs->mux)
return 0;
rs->mux_created = 0;
pthread_mutex_lock(&global_lock);
LIST_FOREACH(mn, &mpegts_network_all, mn_global_link) {
ln = (dvb_network_t *)mn;
if (ln->ln_type == rs->dmc.dmc_fe_type &&
mn->mn_satip_source == rs->findex)
break;
}
if (mn) {
rs->mux = dvb_network_find_mux((dvb_network_t *)mn, &rs->dmc,
MPEGTS_ONID_NONE, MPEGTS_TSID_NONE);
if (rs->mux == NULL) {
rs->mux = (dvb_mux_t *)
mn->mn_create_mux(mn, (void *)(intptr_t)rs->nsession,
MPEGTS_ONID_NONE, MPEGTS_TSID_NONE,
&rs->dmc, 0);
if (rs->mux)
rs->mux_created = 1;
}
}
if (rs->mux == NULL) {
dvb_mux_conf_str(&rs->dmc, buf, sizeof(buf));
tvhwarn("satips", "%i: unable to create mux %s", rs->frontend, buf);
goto end;
}
if (profile_chain_raw_open(&rs->prch, (mpegts_mux_t *)rs->mux, qsize))
goto endclean;
tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, sizeof(addrbuf));
rs->subs = subscription_create_from_mux(&rs->prch, NULL,
config_get_int("satip_weight", 100),
"SAT>IP",
SUBSCRIPTION_FULLMUX | SUBSCRIPTION_STREAMING,
addrbuf, hc->hc_username,
http_arg_get(&hc->hc_args, "User-Agent"), NULL);
if (!rs->subs)
goto endclean;
memset(&rs->udp_rtp, 0, sizeof(rs->udp_rtp));
memset(&rs->udp_rtcp, 0, sizeof(rs->udp_rtcp));
if (udp_bind_double(&rs->udp_rtp, &rs->udp_rtcp,
"satips", "rtsp", "rtcp",
addrbuf, 0, NULL,
4*1024, 4*1024,
RTP_BUFSIZE, RTCP_BUFSIZE))
goto endclean;
satip_rtp_queue((void *)(intptr_t)rs->nsession,
rs->subs, &rs->prch.prch_sq,
hc->hc_peer, ntohs(IP_PORT(rs->udp_rtp->ip)),
rs->udp_rtp->fd, rs->udp_rtcp->fd,
rs->frontend, rs->findex, &rs->mux->lm_tuning, rs->pids);
return 0;
endclean:
rtsp_clean(rs);
end:
pthread_mutex_unlock(&global_lock);
return res;
}
@ -473,13 +564,14 @@ static int
rtsp_process_play(http_connection_t *hc, int setup)
{
session_t *rs;
int errcode = HTTP_STATUS_BAD_REQUEST;
int errcode = HTTP_STATUS_BAD_REQUEST, r, findex = 0;
int stream, delsys = DVB_SYS_NONE, msys, fe, src, freq, pol, sr;
int fec, ro, plts, bw, tmode, mtype, gi, plp, t2id, sm, c2tft, ds, specinv;
char *u, *s;
char *pids, *addpids, *delpids;
int16_t _pids[RTSP_PIDS+1], _addpids[RTSP_PIDS+1], _delpids[RTSP_PIDS+1];
dvb_mux_conf_t *dmc;
char buf[256];
u = tvh_strdupa(hc->hc_url);
if ((u = rtsp_check_urlbase(u)) == NULL ||
@ -507,7 +599,7 @@ rtsp_process_play(http_connection_t *hc, int setup)
rs = rtsp_find_session(hc);
if (fe > 0) {
delsys = rtsp_delsys(fe);
delsys = rtsp_delsys(fe, &findex);
if (delsys == DVB_SYS_NONE)
goto error;
}
@ -533,6 +625,7 @@ rtsp_process_play(http_connection_t *hc, int setup)
dmc = &rs->dmc;
dvb_mux_conf_init(dmc, msys);
rs->frontend = fe;
rs->findex = findex;
pids = http_arg_get_remove(&hc->hc_req_args, "pids");
if (parse_pids(pids, _pids)) goto error;
@ -615,8 +708,8 @@ rtsp_process_play(http_connection_t *hc, int setup)
if (!TAILQ_EMPTY(&hc->hc_req_args))
goto error;
dmc->u.dmc_fe_qpsk.symbol_rate = sr;
dmc->u.dmc_fe_qpsk.fec_inner = DVB_FEC_NONE;
dmc->u.dmc_fe_qam.symbol_rate = sr;
dmc->u.dmc_fe_qam.fec_inner = DVB_FEC_NONE;
dmc->dmc_fe_inversion = specinv;
dmc->dmc_fe_stream_id = plp;
dmc->dmc_fe_pls_code = ds; /* check */
@ -627,6 +720,9 @@ rtsp_process_play(http_connection_t *hc, int setup)
}
dvb_mux_conf_str(dmc, buf, sizeof(buf));
tvhdebug("satips", "%i: setup %s", rs->frontend, buf);
dmc->dmc_fe_freq = freq;
dmc->dmc_fe_modulation = mtype;
@ -641,10 +737,11 @@ play:
rtsp_delpids(rs, _delpids);
if (addpids)
rtsp_addpids(rs, _addpids);
if (rtsp_start(rs) < 0) {
errcode = HTTP_STATUS_SERVICE;;
if ((r = rtsp_start(hc, rs)) < 0) {
errcode = r;
goto error;
}
tvhdebug("satips", "%i: play", rs->frontend);
end:
pthread_mutex_unlock(&rtsp_lock);
@ -742,6 +839,12 @@ rtsp_serve(int fd, void **opaque, struct sockaddr_storage *peer,
static void
rtsp_close_session(session_t *rs)
{
satip_rtp_close((void *)(intptr_t)rs->nsession);
udp_close(rs->udp_rtp);
udp_close(rs->udp_rtcp);
pthread_mutex_lock(&global_lock);
rtsp_clean(rs);
pthread_mutex_unlock(&global_lock);
gtimer_disarm(&rs->timer);
}
@ -786,6 +889,7 @@ void satip_server_rtsp_init(const char *bindaddr, int port)
session_number = *(uint32_t *)rnd;
TAILQ_INIT(&rtsp_sessions);
pthread_mutex_init(&rtsp_lock, NULL);
satip_rtp_init();
}
if (rtsp_port != port && rtsp_server) {
pthread_mutex_lock(&rtsp_lock);
@ -817,5 +921,6 @@ void satip_server_rtsp_done(void)
rtsp_port = -1;
free(rtsp_ip);
rtsp_ip = NULL;
satip_rtp_done();
pthread_mutex_unlock(&global_lock);
}

View file

@ -18,23 +18,11 @@
*/
#include "tvheadend.h"
#include "input.h"
#include "htsbuf.h"
#include "htsmsg_xml.h"
#include "upnp.h"
#include "http.h"
#include "settings.h"
#include "config.h"
#include "satip/server.h"
#include <arpa/inet.h>
#include <openssl/sha.h>
#if defined(PLATFORM_FREEBSD) || ENABLE_ANDROID
#include <sys/types.h>
#include <sys/socket.h>
#endif
#define UPNP_MAX_AGE 1800
static char *http_server_ip;

View file

@ -29,6 +29,26 @@
#include "udp.h"
#include "http.h"
#define RTSP_PIDS 128
void satip_rtp_queue(void *id, th_subscription_t *subs,
streaming_queue_t *sq,
struct sockaddr_storage *peer, int port,
int fd_rtp, int fd_rtcp,
int frontend, int source,
dvb_mux_conf_t *dmc,
int16_t *pids);
void satip_rtp_update(void *id, th_subscription_t *subs,
streaming_queue_t *sq,
int frontend, int source,
dvb_mux_conf_t *dmc,
int16_t *pids);
void satip_rtp_update_pids(void *id, int16_t *pids);
void satip_rtp_close(void *id);
void satip_rtp_init(void);
void satip_rtp_done(void);
void satip_server_rtsp_init(const char *bindaddr, int port);
void satip_server_rtsp_register(void);
void satip_server_rtsp_done(void);

View file

@ -679,7 +679,7 @@ udp_multisend_init( udp_multisend_t *um, int packets, int psize,
((struct mmsghdr *)um->um_msg)[i].msg_hdr.msg_iov = &um->um_iovec[i];
((struct mmsghdr *)um->um_msg)[i].msg_hdr.msg_iovlen = 1;
um->um_iovec[i].iov_base = um->um_data + i * psize;
um->um_iovec[i].iov_len = psize;
um->um_iovec[i].iov_len = 0;
}
*iovec = um->um_iovec;
}
@ -696,6 +696,14 @@ udp_multisend_free( udp_multisend_t *um )
um->um_packets = 0;
}
void
udp_multisend_clean( udp_multisend_t *um )
{
int i;
for (i = 0; i < um->um_packets; i++)
um->um_iovec[i].iov_len = 0;
}
int
udp_multisend_send( udp_multisend_t *um, int fd, int packets )
{

View file

@ -89,6 +89,8 @@ void
udp_multisend_init( udp_multisend_t *um, int packets, int psize,
struct iovec **iovec );
void
udp_multisend_clean( udp_multisend_t *um );
void
udp_multisend_free( udp_multisend_t *um );
int
udp_multisend_send( udp_multisend_t *um, int fd, int packets );