From 0138a37aee1335818ac6dd91b1b03b611efc9883 Mon Sep 17 00:00:00 2001 From: Jaroslav Kysela Date: Fri, 13 Mar 2015 19:18:46 +0100 Subject: [PATCH] SAT>IP Server: Finish service descrambling --- src/descrambler/descrambler.c | 14 ++- src/idnode.c | 4 +- src/idnode.h | 2 +- src/input/mpegts.h | 3 + src/input/mpegts/mpegts_service.c | 27 ++++- src/input/mpegts/tsdemux.c | 31 +++--- src/queue.h | 11 +- src/satip/rtsp.c | 162 +++++++++++++++++++++++++++++- 8 files changed, 225 insertions(+), 29 deletions(-) diff --git a/src/descrambler/descrambler.c b/src/descrambler/descrambler.c index 850b0af1..8918fd6b 100644 --- a/src/descrambler/descrambler.c +++ b/src/descrambler/descrambler.c @@ -21,6 +21,7 @@ #include "caclient.h" #include "ffdecsa/FFdecsa.h" #include "input.h" +#include "input/mpegts/tsdemux.h" struct caid_tab { const char *name; @@ -375,8 +376,19 @@ descrambler_descramble ( service_t *t, lock_assert(&t->s_stream_mutex); - if (dr == NULL) + if (dr == NULL) { + if ((tsb[3] & 0x80) == 0) { + ts_recv_packet2((mpegts_service_t *)t, tsb); + return 1; + } return -1; + } + + if (dr->dr_csa.csa_type == DESCRAMBLER_NONE && dr->dr_buf.sb_ptr == 0) + if ((tsb[3] & 0x80) == 0) { + ts_recv_packet2((mpegts_service_t *)t, tsb); + return 1; + } count = failed = resolved = 0; LIST_FOREACH(td, &t->s_descramblers, td_service_link) { diff --git a/src/idnode.c b/src/idnode.c index f417be4f..fc91ff67 100644 --- a/src/idnode.c +++ b/src/idnode.c @@ -1011,7 +1011,7 @@ idnode_set_find_index return -1; } -void +int idnode_set_remove ( idnode_set_t *is, idnode_t *in ) { @@ -1020,7 +1020,9 @@ idnode_set_remove memmove(&is->is_array[i], &is->is_array[i+1], (is->is_count - i - 1) * sizeof(idnode_t *)); is->is_count--; + return 1; } + return 0; } void diff --git a/src/idnode.h b/src/idnode.h index e59366fa..9cc8935b 100644 --- a/src/idnode.h +++ b/src/idnode.h @@ -203,7 +203,7 @@ static inline idnode_set_t * idnode_set_create(int sorted) is->is_sorted = sorted; return is; } void idnode_set_add ( idnode_set_t *is, idnode_t *in, idnode_filter_t *filt ); -void idnode_set_remove ( idnode_set_t *is, idnode_t *in ); +int idnode_set_remove ( idnode_set_t *is, idnode_t *in ); ssize_t idnode_set_find_index( idnode_set_t *is, idnode_t *in ); static inline int idnode_set_exists ( idnode_set_t *is, idnode_t *in ) { return idnode_set_find_index(is, in) >= 0; } diff --git a/src/input/mpegts.h b/src/input/mpegts.h index 78c16cf2..8246d435 100644 --- a/src/input/mpegts.h +++ b/src/input/mpegts.h @@ -977,6 +977,9 @@ mpegts_service_t *mpegts_service_create_raw(mpegts_mux_t *mm); mpegts_service_t *mpegts_service_find ( mpegts_mux_t *mm, uint16_t sid, uint16_t pmt_pid, int create, int *save ); +mpegts_service_t * +mpegts_service_find_by_pid ( mpegts_mux_t *mm, int pid ); + static inline mpegts_service_t *mpegts_service_find_by_uuid(const char *uuid) { return idnode_find(uuid, &mpegts_service_class, NULL); } diff --git a/src/input/mpegts/mpegts_service.c b/src/input/mpegts/mpegts_service.c index f003d3f7..29dd0dd9 100644 --- a/src/input/mpegts/mpegts_service.c +++ b/src/input/mpegts/mpegts_service.c @@ -633,6 +633,31 @@ mpegts_service_find return s; } +/* + * Find PID + */ +mpegts_service_t * +mpegts_service_find_by_pid ( mpegts_mux_t *mm, int pid ) +{ + mpegts_service_t *s; + + lock_assert(&global_lock); + + /* Find existing service */ + LIST_FOREACH(s, &mm->mm_services, s_dvb_mux_link) { + pthread_mutex_lock(&s->s_stream_mutex); + if (pid == s->s_pmt_pid || pid == s->s_pcr_pid) + goto ok; + if (service_stream_find((service_t *)s, pid)) + goto ok; + pthread_mutex_unlock(&s->s_stream_mutex); + } + return NULL; +ok: + pthread_mutex_unlock(&s->s_stream_mutex); + return s; +} + /* * Raw MPEGTS Service */ @@ -713,7 +738,6 @@ static int mpegts_service_link ( mpegts_service_t *master, mpegts_service_t *slave ) { pthread_mutex_lock(&master->s_stream_mutex); - assert(slave->s_status == SERVICE_IDLE); LIST_INSERT_HEAD(&slave->s_masters, master, s_masters_link); LIST_INSERT_HEAD(&master->s_slaves, slave, s_slaves_link); pthread_mutex_unlock(&master->s_stream_mutex); @@ -724,7 +748,6 @@ static int mpegts_service_unlink ( mpegts_service_t *master, mpegts_service_t *slave ) { pthread_mutex_lock(&master->s_stream_mutex); - assert(slave->s_status == SERVICE_IDLE); LIST_SAFE_REMOVE(master, s_masters_link); LIST_SAFE_REMOVE(slave, s_slaves_link); pthread_mutex_unlock(&master->s_stream_mutex); diff --git a/src/input/mpegts/tsdemux.c b/src/input/mpegts/tsdemux.c index 21511bd2..2db7720f 100644 --- a/src/input/mpegts/tsdemux.c +++ b/src/input/mpegts/tsdemux.c @@ -86,32 +86,25 @@ ts_recv_packet0 ts_remux(t, tsb, error); LIST_FOREACH(m, &t->s_masters, s_masters_link) { - pthread_mutex_lock(&t->s_stream_mutex); - if(streaming_pad_probe_type(&t->s_streaming_pad, SMT_MPEGTS)) - ts_remux(t, tsb, error); - pthread_mutex_unlock(&t->s_stream_mutex); + pthread_mutex_lock(&m->s_stream_mutex); + if(streaming_pad_probe_type(&m->s_streaming_pad, SMT_MPEGTS)) + ts_remux(m, tsb, error); + pthread_mutex_unlock(&m->s_stream_mutex); } off = tsb[3] & 0x20 ? tsb[4] + 5 : 4; - switch(st->es_type) { + if (st->es_type == SCT_CA) + return; - case SCT_CA: - break; + if(!streaming_pad_probe_type(&t->s_streaming_pad, SMT_PACKET)) + return; - default: - if(!streaming_pad_probe_type(&t->s_streaming_pad, SMT_PACKET)) - break; + if(st->es_type == SCT_TELETEXT) + teletext_input(t, st, tsb); - if(st->es_type == SCT_TELETEXT) - teletext_input(t, st, tsb); - if(off > 188) - break; - - if(t->s_status == SERVICE_RUNNING) - parse_mpeg_ts((service_t*)t, st, tsb + off, 188 - off, pusi, error); - break; - } + if(off <= 188 && t->s_status == SERVICE_RUNNING) + parse_mpeg_ts((service_t*)t, st, tsb + off, 188 - off, pusi, error); } /** diff --git a/src/queue.h b/src/queue.h index 966836b7..5a918f2c 100644 --- a/src/queue.h +++ b/src/queue.h @@ -11,6 +11,11 @@ * Complete missing LIST-ops */ +#ifndef LIST_ENTRY_INIT +#define LIST_ENTRY_INIT(elm, field) \ + (elm)->field.le_next = NULL, (elm)->field.le_prev = NULL +#endif + #ifndef LIST_FOREACH #define LIST_FOREACH(var, head, field) \ for ((var) = ((head)->lh_first); \ @@ -32,8 +37,10 @@ #ifndef LIST_SAFE_REMOVE #define LIST_SAFE_REMOVE(elm, field) \ - if ((elm)->field.le_next != NULL || (elm)->field.le_prev != NULL) \ - LIST_REMOVE(elm, field) + if ((elm)->field.le_next != NULL || (elm)->field.le_prev != NULL) { \ + LIST_REMOVE(elm, field); \ + LIST_ENTRY_INIT(elm, field); \ + } #endif #ifndef LIST_INSERT_BEFORE diff --git a/src/satip/rtsp.c b/src/satip/rtsp.c index 05c4bdb8..390b0773 100644 --- a/src/satip/rtsp.c +++ b/src/satip/rtsp.c @@ -29,6 +29,13 @@ #define RTP_BUFSIZE (256*1024) #define RTCP_BUFSIZE (16*1024) +typedef struct slave_subscription { + LIST_ENTRY(slave_subscription) link; + mpegts_service_t *service; + th_subscription_t *ths; + profile_chain_t prch; +} slave_subscription_t; + typedef struct session { TAILQ_ENTRY(session) link; int delsys; @@ -49,6 +56,7 @@ typedef struct session { int rtp_peer_port; udp_connection_t *udp_rtp; udp_connection_t *udp_rtcp; + LIST_HEAD(, slave_subscription) slaves; } session_t; static uint32_t session_number; @@ -110,7 +118,7 @@ rtsp_new_session(int delsys, uint32_t nsession, int session) if (rs == NULL) return NULL; - rs->delsys = delsys; + rs->nsession = nsession ?: session_number; snprintf(rs->session, sizeof(rs->session), "%08X", session_number); if (nsession) { @@ -220,13 +228,82 @@ rtsp_parse_args(http_connection_t *hc, char *u) return stream; } +/* + * + */ +static void +rtsp_slave_add + (session_t *rs, mpegts_service_t *master, mpegts_service_t *slave) +{ + char buf[128]; + slave_subscription_t *sub = calloc(1, sizeof(*sub)); + + pthread_mutex_lock(&master->s_stream_mutex); + if (master->s_slaves_pids == NULL) + master->s_slaves_pids = mpegts_pid_alloc(); + pthread_mutex_unlock(&master->s_stream_mutex); + master->s_link(master, slave); + sub->service = slave; + profile_chain_init(&sub->prch, NULL, NULL); + sub->prch.prch_st = &sub->prch.prch_sq.sq_st; + sub->prch.prch_id = slave; + snprintf(buf, sizeof(buf), "SAT>IP Slave/%s", slave->s_nicename); + sub->ths = subscription_create_from_service(&sub->prch, NULL, + SUBSCRIPTION_NONE, + buf, 0, NULL, NULL, + buf, NULL); + if (sub->ths == NULL) { + tvherror("satips", "%i/%s/%i: unable to subscribe service %s\n", + rs->frontend, rs->session, rs->stream, slave->s_nicename); + profile_chain_close(&sub->prch); + free(sub); + master->s_unlink(master, slave); + } else { + LIST_INSERT_HEAD(&rs->slaves, sub, link); + tvhdebug("satips", "%i/%s/%i: slave service %s subscribed", + rs->frontend, rs->session, rs->stream, slave->s_nicename); + } +} + +/* + * + */ +static void +rtsp_slave_remove + (session_t *rs, mpegts_service_t *master, mpegts_service_t *slave) +{ + slave_subscription_t *sub; + + if (master == NULL) + return; + LIST_FOREACH(sub, &rs->slaves, link) + if (sub->service == slave) + break; + if (sub == NULL) + return; + tvhdebug("satips", "%i/%s/%i: slave service %s unsubscribed", + rs->frontend, rs->session, rs->stream, slave->s_nicename); + master->s_unlink(master, slave); + if (sub->ths) + subscription_unsubscribe(sub->ths, 0); + if (sub->prch.prch_id) + profile_chain_close(&sub->prch); + LIST_REMOVE(sub, link); + free(sub); +} + /* * */ static void rtsp_clean(session_t *rs) { + slave_subscription_t *sub; + if (rs->subs) { + while ((sub = LIST_FIRST(&rs->slaves)) != NULL) + rtsp_slave_remove(rs, (mpegts_service_t *)rs->subs->ths_service, + sub->service); subscription_unsubscribe(rs->subs, 0); rs->subs = NULL; } @@ -238,6 +315,84 @@ rtsp_clean(session_t *rs) rs->mux_created = 0; } +/* + * + */ +static int +rtsp_validate_service(mpegts_service_t *s) +{ + elementary_stream_t *st; + + pthread_mutex_lock(&s->s_stream_mutex); + if (s->s_pmt_pid <= 0 || s->s_pcr_pid <= 0) { + pthread_mutex_unlock(&s->s_stream_mutex); + return 0; + } + TAILQ_FOREACH(st, &s->s_components, es_link) + if (st->es_pid > 0 && + (SCT_ISVIDEO(st->es_type) || SCT_ISAUDIO(st->es_type))) + break; + pthread_mutex_unlock(&s->s_stream_mutex); + return st != NULL; +} + +/* + * + */ +static void +rtsp_manage_descramble(session_t *rs) +{ + idnode_set_t *found; + mpegts_service_t *s, *snext; + mpegts_service_t *master = (mpegts_service_t *)rs->subs->ths_service; + size_t si; + int i, used = 0; + + if (rtsp_descramble <= 0) + return; + + found = idnode_set_create(1); + + if (rs->mux == NULL || rs->subs == NULL) + goto end; + + if (rs->pids.all) { + LIST_FOREACH(s, &rs->mux->mm_services, s_dvb_mux_link) + if (rtsp_validate_service(s)) + idnode_set_add(found, &s->s_id, NULL); + } else { + for (i = 0; i < rs->pids.count; i++) { + s = mpegts_service_find_by_pid((mpegts_mux_t *)rs->mux, rs->pids.pids[i]); + if (s != NULL && rtsp_validate_service(s)) + if (!idnode_set_exists(found, &s->s_id)) + idnode_set_add(found, &s->s_id, NULL); + } + } + + /* Remove already used or no-longer required services */ + for (s = LIST_FIRST(&master->s_slaves); s; s = snext) { + snext = LIST_NEXT(s, s_slaves_link); + if (idnode_set_remove(found, &s->s_id)) + used++; + else if (!idnode_set_exists(found, &s->s_id)) + rtsp_slave_remove(rs, master, s); + } + + /* Add new ones */ + for (si = 0; used < rtsp_descramble && si < found->is_count; si++, used++) { + s = (mpegts_service_t *)found->is_array[si]; + rtsp_slave_add(rs, master, s); + idnode_set_remove(found, &s->s_id); + } + if (si < found->is_count) + tvhwarn("satips", "%i/%s/%i: limit for descrambled services reached (wanted %zd allowed %d)", + rs->frontend, rs->session, rs->stream, + (used - si) + found->is_count, rtsp_descramble); + +end: + idnode_set_free(found); +} + /* * */ @@ -322,6 +477,7 @@ pids: svc->s_update_pids(svc, &rs->pids); rs->run = 1; } + rtsp_manage_descramble(rs); pthread_mutex_unlock(&global_lock); return 0; @@ -334,7 +490,6 @@ endclean: return res; } - /* * */ @@ -957,7 +1112,8 @@ play: } if (setup) - tvhdebug("satips", "setup from %s:%d, RTP: %d, RTCP: %d, pids ", + tvhdebug("satips", "%i/%s/%d: setup from %s:%d, RTP: %d, RTCP: %d, pids ", + rs->frontend, rs->session, rs->stream, addrbuf, IP_PORT(*hc->hc_peer), rs->rtp_peer_port, rs->rtp_peer_port + 1);