SAT>IP Server: Finish service descrambling

This commit is contained in:
Jaroslav Kysela 2015-03-13 19:18:46 +01:00
parent 385c4167c0
commit 0138a37aee
8 changed files with 225 additions and 29 deletions

View file

@ -21,6 +21,7 @@
#include "caclient.h"
#include "ffdecsa/FFdecsa.h"
#include "input.h"
#include "input/mpegts/tsdemux.h"
struct caid_tab {
const char *name;
@ -375,8 +376,19 @@ descrambler_descramble ( service_t *t,
lock_assert(&t->s_stream_mutex);
if (dr == NULL)
if (dr == NULL) {
if ((tsb[3] & 0x80) == 0) {
ts_recv_packet2((mpegts_service_t *)t, tsb);
return 1;
}
return -1;
}
if (dr->dr_csa.csa_type == DESCRAMBLER_NONE && dr->dr_buf.sb_ptr == 0)
if ((tsb[3] & 0x80) == 0) {
ts_recv_packet2((mpegts_service_t *)t, tsb);
return 1;
}
count = failed = resolved = 0;
LIST_FOREACH(td, &t->s_descramblers, td_service_link) {

View file

@ -1011,7 +1011,7 @@ idnode_set_find_index
return -1;
}
void
int
idnode_set_remove
( idnode_set_t *is, idnode_t *in )
{
@ -1020,7 +1020,9 @@ idnode_set_remove
memmove(&is->is_array[i], &is->is_array[i+1],
(is->is_count - i - 1) * sizeof(idnode_t *));
is->is_count--;
return 1;
}
return 0;
}
void

View file

@ -203,7 +203,7 @@ static inline idnode_set_t * idnode_set_create(int sorted)
is->is_sorted = sorted; return is; }
void idnode_set_add
( idnode_set_t *is, idnode_t *in, idnode_filter_t *filt );
void idnode_set_remove ( idnode_set_t *is, idnode_t *in );
int idnode_set_remove ( idnode_set_t *is, idnode_t *in );
ssize_t idnode_set_find_index( idnode_set_t *is, idnode_t *in );
static inline int idnode_set_exists ( idnode_set_t *is, idnode_t *in )
{ return idnode_set_find_index(is, in) >= 0; }

View file

@ -977,6 +977,9 @@ mpegts_service_t *mpegts_service_create_raw(mpegts_mux_t *mm);
mpegts_service_t *mpegts_service_find
( mpegts_mux_t *mm, uint16_t sid, uint16_t pmt_pid, int create, int *save );
mpegts_service_t *
mpegts_service_find_by_pid ( mpegts_mux_t *mm, int pid );
static inline mpegts_service_t *mpegts_service_find_by_uuid(const char *uuid)
{ return idnode_find(uuid, &mpegts_service_class, NULL); }

View file

@ -633,6 +633,31 @@ mpegts_service_find
return s;
}
/*
* Find PID
*/
mpegts_service_t *
mpegts_service_find_by_pid ( mpegts_mux_t *mm, int pid )
{
mpegts_service_t *s;
lock_assert(&global_lock);
/* Find existing service */
LIST_FOREACH(s, &mm->mm_services, s_dvb_mux_link) {
pthread_mutex_lock(&s->s_stream_mutex);
if (pid == s->s_pmt_pid || pid == s->s_pcr_pid)
goto ok;
if (service_stream_find((service_t *)s, pid))
goto ok;
pthread_mutex_unlock(&s->s_stream_mutex);
}
return NULL;
ok:
pthread_mutex_unlock(&s->s_stream_mutex);
return s;
}
/*
* Raw MPEGTS Service
*/
@ -713,7 +738,6 @@ static int
mpegts_service_link ( mpegts_service_t *master, mpegts_service_t *slave )
{
pthread_mutex_lock(&master->s_stream_mutex);
assert(slave->s_status == SERVICE_IDLE);
LIST_INSERT_HEAD(&slave->s_masters, master, s_masters_link);
LIST_INSERT_HEAD(&master->s_slaves, slave, s_slaves_link);
pthread_mutex_unlock(&master->s_stream_mutex);
@ -724,7 +748,6 @@ static int
mpegts_service_unlink ( mpegts_service_t *master, mpegts_service_t *slave )
{
pthread_mutex_lock(&master->s_stream_mutex);
assert(slave->s_status == SERVICE_IDLE);
LIST_SAFE_REMOVE(master, s_masters_link);
LIST_SAFE_REMOVE(slave, s_slaves_link);
pthread_mutex_unlock(&master->s_stream_mutex);

View file

@ -86,32 +86,25 @@ ts_recv_packet0
ts_remux(t, tsb, error);
LIST_FOREACH(m, &t->s_masters, s_masters_link) {
pthread_mutex_lock(&t->s_stream_mutex);
if(streaming_pad_probe_type(&t->s_streaming_pad, SMT_MPEGTS))
ts_remux(t, tsb, error);
pthread_mutex_unlock(&t->s_stream_mutex);
pthread_mutex_lock(&m->s_stream_mutex);
if(streaming_pad_probe_type(&m->s_streaming_pad, SMT_MPEGTS))
ts_remux(m, tsb, error);
pthread_mutex_unlock(&m->s_stream_mutex);
}
off = tsb[3] & 0x20 ? tsb[4] + 5 : 4;
switch(st->es_type) {
if (st->es_type == SCT_CA)
return;
case SCT_CA:
break;
if(!streaming_pad_probe_type(&t->s_streaming_pad, SMT_PACKET))
return;
default:
if(!streaming_pad_probe_type(&t->s_streaming_pad, SMT_PACKET))
break;
if(st->es_type == SCT_TELETEXT)
teletext_input(t, st, tsb);
if(st->es_type == SCT_TELETEXT)
teletext_input(t, st, tsb);
if(off > 188)
break;
if(t->s_status == SERVICE_RUNNING)
parse_mpeg_ts((service_t*)t, st, tsb + off, 188 - off, pusi, error);
break;
}
if(off <= 188 && t->s_status == SERVICE_RUNNING)
parse_mpeg_ts((service_t*)t, st, tsb + off, 188 - off, pusi, error);
}
/**

View file

@ -11,6 +11,11 @@
* Complete missing LIST-ops
*/
#ifndef LIST_ENTRY_INIT
#define LIST_ENTRY_INIT(elm, field) \
(elm)->field.le_next = NULL, (elm)->field.le_prev = NULL
#endif
#ifndef LIST_FOREACH
#define LIST_FOREACH(var, head, field) \
for ((var) = ((head)->lh_first); \
@ -32,8 +37,10 @@
#ifndef LIST_SAFE_REMOVE
#define LIST_SAFE_REMOVE(elm, field) \
if ((elm)->field.le_next != NULL || (elm)->field.le_prev != NULL) \
LIST_REMOVE(elm, field)
if ((elm)->field.le_next != NULL || (elm)->field.le_prev != NULL) { \
LIST_REMOVE(elm, field); \
LIST_ENTRY_INIT(elm, field); \
}
#endif
#ifndef LIST_INSERT_BEFORE

View file

@ -29,6 +29,13 @@
#define RTP_BUFSIZE (256*1024)
#define RTCP_BUFSIZE (16*1024)
typedef struct slave_subscription {
LIST_ENTRY(slave_subscription) link;
mpegts_service_t *service;
th_subscription_t *ths;
profile_chain_t prch;
} slave_subscription_t;
typedef struct session {
TAILQ_ENTRY(session) link;
int delsys;
@ -49,6 +56,7 @@ typedef struct session {
int rtp_peer_port;
udp_connection_t *udp_rtp;
udp_connection_t *udp_rtcp;
LIST_HEAD(, slave_subscription) slaves;
} session_t;
static uint32_t session_number;
@ -110,7 +118,7 @@ rtsp_new_session(int delsys, uint32_t nsession, int session)
if (rs == NULL)
return NULL;
rs->delsys = delsys;
rs->nsession = nsession ?: session_number;
snprintf(rs->session, sizeof(rs->session), "%08X", session_number);
if (nsession) {
@ -220,13 +228,82 @@ rtsp_parse_args(http_connection_t *hc, char *u)
return stream;
}
/*
*
*/
static void
rtsp_slave_add
(session_t *rs, mpegts_service_t *master, mpegts_service_t *slave)
{
char buf[128];
slave_subscription_t *sub = calloc(1, sizeof(*sub));
pthread_mutex_lock(&master->s_stream_mutex);
if (master->s_slaves_pids == NULL)
master->s_slaves_pids = mpegts_pid_alloc();
pthread_mutex_unlock(&master->s_stream_mutex);
master->s_link(master, slave);
sub->service = slave;
profile_chain_init(&sub->prch, NULL, NULL);
sub->prch.prch_st = &sub->prch.prch_sq.sq_st;
sub->prch.prch_id = slave;
snprintf(buf, sizeof(buf), "SAT>IP Slave/%s", slave->s_nicename);
sub->ths = subscription_create_from_service(&sub->prch, NULL,
SUBSCRIPTION_NONE,
buf, 0, NULL, NULL,
buf, NULL);
if (sub->ths == NULL) {
tvherror("satips", "%i/%s/%i: unable to subscribe service %s\n",
rs->frontend, rs->session, rs->stream, slave->s_nicename);
profile_chain_close(&sub->prch);
free(sub);
master->s_unlink(master, slave);
} else {
LIST_INSERT_HEAD(&rs->slaves, sub, link);
tvhdebug("satips", "%i/%s/%i: slave service %s subscribed",
rs->frontend, rs->session, rs->stream, slave->s_nicename);
}
}
/*
*
*/
static void
rtsp_slave_remove
(session_t *rs, mpegts_service_t *master, mpegts_service_t *slave)
{
slave_subscription_t *sub;
if (master == NULL)
return;
LIST_FOREACH(sub, &rs->slaves, link)
if (sub->service == slave)
break;
if (sub == NULL)
return;
tvhdebug("satips", "%i/%s/%i: slave service %s unsubscribed",
rs->frontend, rs->session, rs->stream, slave->s_nicename);
master->s_unlink(master, slave);
if (sub->ths)
subscription_unsubscribe(sub->ths, 0);
if (sub->prch.prch_id)
profile_chain_close(&sub->prch);
LIST_REMOVE(sub, link);
free(sub);
}
/*
*
*/
static void
rtsp_clean(session_t *rs)
{
slave_subscription_t *sub;
if (rs->subs) {
while ((sub = LIST_FIRST(&rs->slaves)) != NULL)
rtsp_slave_remove(rs, (mpegts_service_t *)rs->subs->ths_service,
sub->service);
subscription_unsubscribe(rs->subs, 0);
rs->subs = NULL;
}
@ -238,6 +315,84 @@ rtsp_clean(session_t *rs)
rs->mux_created = 0;
}
/*
*
*/
static int
rtsp_validate_service(mpegts_service_t *s)
{
elementary_stream_t *st;
pthread_mutex_lock(&s->s_stream_mutex);
if (s->s_pmt_pid <= 0 || s->s_pcr_pid <= 0) {
pthread_mutex_unlock(&s->s_stream_mutex);
return 0;
}
TAILQ_FOREACH(st, &s->s_components, es_link)
if (st->es_pid > 0 &&
(SCT_ISVIDEO(st->es_type) || SCT_ISAUDIO(st->es_type)))
break;
pthread_mutex_unlock(&s->s_stream_mutex);
return st != NULL;
}
/*
*
*/
static void
rtsp_manage_descramble(session_t *rs)
{
idnode_set_t *found;
mpegts_service_t *s, *snext;
mpegts_service_t *master = (mpegts_service_t *)rs->subs->ths_service;
size_t si;
int i, used = 0;
if (rtsp_descramble <= 0)
return;
found = idnode_set_create(1);
if (rs->mux == NULL || rs->subs == NULL)
goto end;
if (rs->pids.all) {
LIST_FOREACH(s, &rs->mux->mm_services, s_dvb_mux_link)
if (rtsp_validate_service(s))
idnode_set_add(found, &s->s_id, NULL);
} else {
for (i = 0; i < rs->pids.count; i++) {
s = mpegts_service_find_by_pid((mpegts_mux_t *)rs->mux, rs->pids.pids[i]);
if (s != NULL && rtsp_validate_service(s))
if (!idnode_set_exists(found, &s->s_id))
idnode_set_add(found, &s->s_id, NULL);
}
}
/* Remove already used or no-longer required services */
for (s = LIST_FIRST(&master->s_slaves); s; s = snext) {
snext = LIST_NEXT(s, s_slaves_link);
if (idnode_set_remove(found, &s->s_id))
used++;
else if (!idnode_set_exists(found, &s->s_id))
rtsp_slave_remove(rs, master, s);
}
/* Add new ones */
for (si = 0; used < rtsp_descramble && si < found->is_count; si++, used++) {
s = (mpegts_service_t *)found->is_array[si];
rtsp_slave_add(rs, master, s);
idnode_set_remove(found, &s->s_id);
}
if (si < found->is_count)
tvhwarn("satips", "%i/%s/%i: limit for descrambled services reached (wanted %zd allowed %d)",
rs->frontend, rs->session, rs->stream,
(used - si) + found->is_count, rtsp_descramble);
end:
idnode_set_free(found);
}
/*
*
*/
@ -322,6 +477,7 @@ pids:
svc->s_update_pids(svc, &rs->pids);
rs->run = 1;
}
rtsp_manage_descramble(rs);
pthread_mutex_unlock(&global_lock);
return 0;
@ -334,7 +490,6 @@ endclean:
return res;
}
/*
*
*/
@ -957,7 +1112,8 @@ play:
}
if (setup)
tvhdebug("satips", "setup from %s:%d, RTP: %d, RTCP: %d, pids ",
tvhdebug("satips", "%i/%s/%d: setup from %s:%d, RTP: %d, RTCP: %d, pids ",
rs->frontend, rs->session, rs->stream,
addrbuf, IP_PORT(*hc->hc_peer),
rs->rtp_peer_port, rs->rtp_peer_port + 1);