mpegts: starting to get table input working for tsfile
I have realised that for iptv style setups the current arrangement will be problematic. The main issue is having the table filter and top level processing based on the mpegts_input. Since for IPTV its most likely that we'll only have one mpegts_input with a bunch of different muxes currently active.
This commit is contained in:
parent
1abb1f7736
commit
4b73a651cf
10 changed files with 360 additions and 45 deletions
1
Makefile
1
Makefile
|
@ -152,6 +152,7 @@ SRCS-$(CONFIG_MPEGTS) += \
|
|||
src/input/mpegts/mpegts_network.c \
|
||||
src/input/mpegts/mpegts_mux.c \
|
||||
src/input/mpegts/mpegts_service.c \
|
||||
src/input/mpegts/mpegts_table.c \
|
||||
src/input/mpegts/psi.c \
|
||||
src/input/mpegts/tsdemux.c \
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#define __TVH_MPEGTS_H__
|
||||
|
||||
#include "service.h"
|
||||
#include "src/input/mpegts/psi.h"
|
||||
|
||||
#define MM_ONID_NONE 0xFFFF
|
||||
#define MM_TSID_NONE 0xFFFF
|
||||
|
@ -32,15 +33,20 @@ typedef struct mpegts_mux mpegts_mux_t;
|
|||
typedef struct mpegts_service mpegts_service_t;
|
||||
typedef struct mpegts_mux_instance mpegts_mux_instance_t;
|
||||
typedef struct mpegts_input mpegts_input_t;
|
||||
typedef struct mpegts_table_feed mpegts_table_feed_t;
|
||||
|
||||
/* Lists */
|
||||
typedef TAILQ_HEAD(mpegts_mux_queue,mpegts_mux) mpegts_mux_queue_t;
|
||||
typedef LIST_HEAD (mpegts_mux_list,mpegts_mux) mpegts_mux_list_t;
|
||||
TAILQ_HEAD(mpegts_table_feed_queue, mpegts_table_feed);
|
||||
|
||||
/* **************************************************************************
|
||||
* SI processing
|
||||
* *************************************************************************/
|
||||
|
||||
typedef int (*mpegts_table_callback)
|
||||
( mpegts_table_t*, const uint8_t *buf, int len, int tableid );
|
||||
|
||||
struct mpegts_table
|
||||
{
|
||||
/**
|
||||
|
@ -49,6 +55,10 @@ struct mpegts_table
|
|||
*/
|
||||
int mt_flags;
|
||||
|
||||
#define MT_CRC 0x1
|
||||
#define MT_FULL 0x2
|
||||
#define MT_QUICKREQ 0x4
|
||||
|
||||
/**
|
||||
* Cycle queue
|
||||
* Tables that did not get a fd or filter in hardware will end up here
|
||||
|
@ -63,13 +73,12 @@ struct mpegts_table
|
|||
int mt_fd;
|
||||
|
||||
LIST_ENTRY(mpegts_table) mt_link;
|
||||
mpegts_mux_t *mt_mux;
|
||||
mpegts_mux_instance_t *mt_mux;
|
||||
|
||||
char *mt_name;
|
||||
|
||||
void *mt_opaque;
|
||||
int (*mt_callback)(mpegts_mux_t *m, uint8_t *buf, int len,
|
||||
uint8_t tableid, void *opaque);
|
||||
mpegts_table_callback mt_callback;
|
||||
|
||||
|
||||
// TODO: remind myself of what each field is for
|
||||
|
@ -84,10 +93,22 @@ struct mpegts_table
|
|||
int mt_destroyed; // Refcounting
|
||||
int mt_refcount;
|
||||
|
||||
//psi_section_t mt_sect; // Manual reassembly
|
||||
psi_section_t mt_sect; // Manual reassembly
|
||||
|
||||
};
|
||||
|
||||
/**
|
||||
* When in raw mode we need to enqueue raw TS packet
|
||||
* to a different thread because we need to hold
|
||||
* global_lock when doing delivery of the tables
|
||||
*/
|
||||
|
||||
struct mpegts_table_feed {
|
||||
TAILQ_ENTRY(mpegts_table_feed) mtf_link;
|
||||
uint8_t mtf_tsb[188];
|
||||
};
|
||||
|
||||
|
||||
/* **************************************************************************
|
||||
* Logical network
|
||||
* *************************************************************************/
|
||||
|
@ -163,20 +184,21 @@ struct mpegts_mux
|
|||
MM_SCAN_CURRENT, // Currently tuned for initial scan
|
||||
} mm_initial_scan_status;
|
||||
|
||||
/*
|
||||
* Input processing
|
||||
*/
|
||||
|
||||
int mm_num_tables;
|
||||
LIST_HEAD(, mpegts_table) mm_tables;
|
||||
TAILQ_HEAD(, mpegts_table) mm_table_queue;
|
||||
// TODO: remind myself of what the queue/list's are for
|
||||
|
||||
/*
|
||||
* Physical instances
|
||||
*/
|
||||
|
||||
LIST_HEAD(, mpegts_mux_instance) mm_instances;
|
||||
mpegts_mux_instance_t *mm_active;
|
||||
|
||||
/*
|
||||
* Table processing
|
||||
*/
|
||||
|
||||
int mm_num_tables;
|
||||
LIST_HEAD(, mpegts_table) mm_tables;
|
||||
TAILQ_HEAD(, mpegts_table) mm_table_queue;
|
||||
uint8_t mm_table_filter;
|
||||
|
||||
/*
|
||||
* Functions
|
||||
|
@ -321,15 +343,36 @@ struct mpegts_input
|
|||
|
||||
LIST_ENTRY(mpegts_input) mi_global_link;
|
||||
|
||||
LIST_HEAD(,service) mi_transports;
|
||||
|
||||
pthread_mutex_t mi_delivery_mutex;
|
||||
|
||||
mpegts_network_t *mi_network; // TODO: this may need altering for DVB-S
|
||||
mpegts_mux_instance_t *mi_mux_current;
|
||||
|
||||
/*
|
||||
* Input processing
|
||||
*/
|
||||
|
||||
pthread_mutex_t mi_delivery_mutex;
|
||||
|
||||
LIST_HEAD(,service) mi_transports;
|
||||
|
||||
|
||||
int mi_bytes;
|
||||
|
||||
// Full mux streaming, protected via the delivery mutex
|
||||
|
||||
streaming_pad_t mi_streaming_pad;
|
||||
|
||||
|
||||
struct mpegts_table_feed_queue mi_table_feed;
|
||||
pthread_cond_t mi_table_feed_cond; // Bound to mi_delivery_mutex
|
||||
|
||||
|
||||
pthread_t mi_thread_id;
|
||||
th_pipe_t mi_thread_pipe;
|
||||
|
||||
/*
|
||||
* Functions
|
||||
*/
|
||||
|
||||
int (*mi_start_mux) (mpegts_input_t*,mpegts_mux_instance_t*);
|
||||
void (*mi_stop_mux) (mpegts_input_t*);
|
||||
|
@ -362,10 +405,21 @@ mpegts_mux_instance_t *mpegts_mux_instance_create0
|
|||
|
||||
void mpegts_mux_initial_scan_done ( mpegts_mux_t *mm );
|
||||
|
||||
void mpegts_input_recv_packets
|
||||
(mpegts_input_t *mi, const uint8_t *tsb, size_t len,
|
||||
size_t mpegts_input_recv_packets
|
||||
(mpegts_input_t *mi, uint8_t *tsb, size_t len,
|
||||
int64_t *pcr, uint16_t *pcr_pid);
|
||||
|
||||
void mpegts_table_dispatch
|
||||
(mpegts_table_t *mt, const uint8_t *sec, int r);
|
||||
void mpegts_table_release
|
||||
(mpegts_table_t *mt);
|
||||
void mpegts_table_add
|
||||
(mpegts_mux_t *mm, int tableid, int mask,
|
||||
mpegts_table_callback callback, void *opaque,
|
||||
const char *name, int flags, int pid);
|
||||
void mpegts_table_flush_all
|
||||
(mpegts_mux_t *mm);
|
||||
|
||||
/******************************************************************************
|
||||
* Editor Configuration
|
||||
*
|
||||
|
|
|
@ -17,6 +17,10 @@
|
|||
*/
|
||||
|
||||
#include "input/mpegts.h"
|
||||
#include "tsdemux.h"
|
||||
#include "packet.h"
|
||||
#include "streaming.h"
|
||||
#include "atomic.h"
|
||||
|
||||
#include <pthread.h>
|
||||
|
||||
|
@ -28,11 +32,95 @@ const idclass_t mpegts_input_class =
|
|||
}
|
||||
};
|
||||
|
||||
void
|
||||
size_t
|
||||
mpegts_input_recv_packets
|
||||
( mpegts_input_t *mi, const uint8_t *tsb, size_t len,
|
||||
( mpegts_input_t *mi, uint8_t *tsb, size_t l,
|
||||
int64_t *pcr, uint16_t *pcr_pid )
|
||||
{
|
||||
int len = l; // TODO: fix ts_resync() to remove this
|
||||
int i = 0, table_wakeup = 0;
|
||||
tvhtrace("mpegts", "recv_packets tsb=%p, len=%d, pcr=%p, pcr_pid=%p",
|
||||
tsb, (int)len, pcr, pcr_pid);
|
||||
|
||||
/* Not enough data */
|
||||
if (len < 188) return 0;
|
||||
|
||||
/* Streaming - lock mutex */
|
||||
pthread_mutex_lock(&mi->mi_delivery_mutex);
|
||||
|
||||
/* Raw stream */
|
||||
if (LIST_FIRST(&mi->mi_streaming_pad.sp_targets) != NULL) {
|
||||
streaming_message_t sm;
|
||||
pktbuf_t *pb = pktbuf_alloc(tsb, len);
|
||||
memset(&sm, 0, sizeof(sm));
|
||||
sm.sm_type = SMT_MPEGTS;
|
||||
sm.sm_data = pb;
|
||||
streaming_pad_deliver(&mi->mi_streaming_pad, &sm);
|
||||
pktbuf_ref_dec(pb);
|
||||
}
|
||||
|
||||
/* Process */
|
||||
while ( len >= 188 ) {
|
||||
//printf("tsb[%d] = %02X\n", i, tsb[i]);
|
||||
|
||||
/* Sync */
|
||||
if ( tsb[i] == 0x47 ) {
|
||||
int pid = ((tsb[i+1] & 0x1f) << 8) | tsb[i+2];
|
||||
|
||||
/* SI data */
|
||||
if (mi->mi_table_filter[pid]) {
|
||||
printf("pid = %04X\n", pid);
|
||||
if (!(tsb[i+1] & 0x80)) {
|
||||
mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t));
|
||||
memcpy(mtf->mtf_tsb, tsb+i, 188);
|
||||
TAILQ_INSERT_TAIL(&mi->mi_table_feed, mtf, mtf_link);
|
||||
table_wakeup = 1;
|
||||
}
|
||||
|
||||
/* Other */
|
||||
} else {
|
||||
service_t *s;
|
||||
int64_t tpcr = PTS_UNSET;
|
||||
LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
|
||||
ts_recv_packet1((mpegts_service_t*)s, tsb+i, NULL);
|
||||
}
|
||||
if (tpcr != PTS_UNSET) {
|
||||
// printf("pcr = %p, %"PRId64" pcr_pid == %p, %d, pid = %d\n", pcr, tpcr, pcr_pid, *pcr_pid, pid);
|
||||
if (pcr && pcr_pid){// && (*pcr_pid == 0 || *pcr_pid == pid)) {
|
||||
printf("SET\n");
|
||||
*pcr_pid = pid;
|
||||
*pcr = tpcr;
|
||||
printf("pcr set to %"PRId64"\n", tpcr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
i += 188;
|
||||
len -= 188;
|
||||
|
||||
/* Re-sync */
|
||||
} else {
|
||||
// TODO: set flag (to avoid spam)
|
||||
tvhlog(LOG_DEBUG, "mpegts", "%s ts sync lost", "TODO");
|
||||
if (ts_resync(tsb, &len, &i)) break;
|
||||
tvhlog(LOG_DEBUG, "mpegts", "%s ts sync found", "TODO");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/* Wake table */
|
||||
if (table_wakeup)
|
||||
pthread_cond_signal(&mi->mi_table_feed_cond);
|
||||
|
||||
pthread_mutex_unlock(&mi->mi_delivery_mutex);
|
||||
|
||||
/* Bandwidth monitoring */
|
||||
atomic_add(&mi->mi_bytes, i);
|
||||
|
||||
/* Reset buffer */
|
||||
if (len) memmove(tsb, tsb+i, len);
|
||||
|
||||
return len;
|
||||
}
|
||||
|
||||
mpegts_input_t*
|
||||
|
@ -42,6 +130,10 @@ mpegts_input_create0 ( const char *uuid )
|
|||
|
||||
/* Init mutex */
|
||||
pthread_mutex_init(&mi->mi_delivery_mutex, NULL);
|
||||
|
||||
/* Table input */
|
||||
TAILQ_INIT(&mi->mi_table_feed);
|
||||
pthread_cond_init(&mi->mi_table_feed_cond, NULL);
|
||||
|
||||
/* Init input thread control */
|
||||
mi->mi_thread_pipe.rd = mi->mi_thread_pipe.wr = -1;
|
||||
|
|
|
@ -110,9 +110,8 @@ mpegts_mux_start ( mpegts_mux_t *mm, const char *reason, int weight )
|
|||
printf("mpegts_mux_start(%p, %s, %d)\n", mm, reason, weight);
|
||||
|
||||
/* Already tuned */
|
||||
LIST_FOREACH(mmi, &mm->mm_instances, mmi_mux_link)
|
||||
if (mmi->mmi_input->mi_mux_current == mmi)
|
||||
return 0;
|
||||
if (mm->mm_active)
|
||||
return 0;
|
||||
printf("not already tuned\n");
|
||||
|
||||
/* Find */
|
||||
|
|
|
@ -30,6 +30,66 @@
|
|||
#include "parsers/parser_teletext.h" // TODO: only for PID
|
||||
#include "lang_codes.h"
|
||||
|
||||
int
|
||||
psi_pat_callback
|
||||
(mpegts_table_t *mt, const uint8_t *ptr, int len, int tableid)
|
||||
{
|
||||
uint16_t sid, pid, tsid;
|
||||
//mpegts_mux_instance_t *mmi = mt->mt_mux;
|
||||
//mpegts_mux_t *mm = mmi->mmi_mux;
|
||||
|
||||
/* Not enough data */
|
||||
if(len < 5)
|
||||
return -1;
|
||||
|
||||
/* Ignore next */
|
||||
if((ptr[2] & 1) == 0)
|
||||
return -1;
|
||||
|
||||
/* Multiplex */
|
||||
tsid = (ptr[0] << 8) | ptr[1];
|
||||
printf("tsid = %04X\n", tsid);
|
||||
#if 0 // TODO: process this
|
||||
mpegts_mux_set_tsid(mm, tsid, 0);
|
||||
if (mm->mm_tsid != tsid)
|
||||
return -1;
|
||||
#endif
|
||||
|
||||
/* Process each programme */
|
||||
ptr += 5;
|
||||
len -= 5;
|
||||
while(len >= 4) {
|
||||
sid = ptr[0] << 8 | ptr[1];
|
||||
pid = (ptr[2] & 0x1f) << 8 | ptr[3];
|
||||
|
||||
/* NIT PID */
|
||||
if (sid == 0) {
|
||||
printf("NIT on pid %04X\n", pid);
|
||||
#if 0
|
||||
if (pid != 0x10 && pid != 0x00)
|
||||
mpegts_table_add(mm, 0, 0, psi_nit_callback, NULL, "nit",
|
||||
TDT_CRC | TDT_QUICKREQ, pid)
|
||||
#endif
|
||||
|
||||
/* Service */
|
||||
} else if (pid) {
|
||||
printf("SID %04X on pid %04X\n", sid, pid);
|
||||
#if 0
|
||||
int save = 0;
|
||||
mpegts_service_find(mm, sid, pid, NULL, &save);
|
||||
// TODO: option to disable PMT monitor
|
||||
if (save)
|
||||
psi_table_add_pmt(mm, pid);
|
||||
#endif
|
||||
}
|
||||
|
||||
/* Next */
|
||||
ptr += 4;
|
||||
len -= 4;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
psi_section_reassemble0(psi_section_t *ps, const uint8_t *data,
|
||||
int len, int start, int crc,
|
||||
|
|
|
@ -21,10 +21,13 @@
|
|||
|
||||
#include "htsmsg.h"
|
||||
#include "streaming.h"
|
||||
#include "input/mpegts.h"
|
||||
|
||||
#define PSI_SECTION_SIZE 5000
|
||||
|
||||
// TODO: tidy this up
|
||||
struct mpegts_service;
|
||||
struct mpegts_table;
|
||||
|
||||
typedef void (section_handler_t)(const uint8_t *data, size_t len, void *opaque);
|
||||
|
||||
typedef struct psi_section {
|
||||
|
@ -46,4 +49,7 @@ const char *psi_caid2name(uint16_t caid);
|
|||
void psi_load_service_settings(htsmsg_t *m, struct mpegts_service *t);
|
||||
void psi_save_service_settings(htsmsg_t *m, struct mpegts_service *t);
|
||||
|
||||
int psi_pat_callback
|
||||
(struct mpegts_table *mt, const uint8_t *ptr, int len, int tableid);
|
||||
|
||||
#endif /* PSI_H_ */
|
||||
|
|
|
@ -138,20 +138,10 @@ ts_recv_packet0(mpegts_service_t *t, elementary_stream_t *st, const uint8_t *tsb
|
|||
* than the stream PCR
|
||||
*/
|
||||
static void
|
||||
ts_extract_pcr(mpegts_service_t *t, elementary_stream_t *st, const uint8_t *tsb,
|
||||
int64_t *pcrp)
|
||||
ts_process_pcr(mpegts_service_t *t, elementary_stream_t *st, int64_t pcr)
|
||||
{
|
||||
int64_t real, pcr, d;
|
||||
|
||||
pcr = (uint64_t)tsb[6] << 25;
|
||||
pcr |= (uint64_t)tsb[7] << 17;
|
||||
pcr |= (uint64_t)tsb[8] << 9;
|
||||
pcr |= (uint64_t)tsb[9] << 1;
|
||||
pcr |= ((uint64_t)tsb[10] >> 7) & 0x01;
|
||||
int64_t real, d;
|
||||
|
||||
if(pcrp != NULL)
|
||||
*pcrp = pcr;
|
||||
|
||||
if(st == NULL)
|
||||
return;
|
||||
|
||||
|
@ -192,7 +182,31 @@ ts_recv_packet1(mpegts_service_t *t, const uint8_t *tsb, int64_t *pcrp)
|
|||
int pid, n, m, r;
|
||||
th_descrambler_t *td;
|
||||
int error = 0;
|
||||
int64_t pcr = PTS_UNSET;
|
||||
|
||||
/* Error */
|
||||
if (tsb[1] & 0x80)
|
||||
error = 1;
|
||||
|
||||
#if 0
|
||||
printf("%02X %02X %02X %02X %02X %02X\n",
|
||||
tsb[0], tsb[1], tsb[2], tsb[3], tsb[4], tsb[5]);
|
||||
#endif
|
||||
|
||||
/* Extract PCR (do this early for tsfile) */
|
||||
if(tsb[3] & 0x20 && tsb[4] > 0 && tsb[5] & 0x10 && !error) {
|
||||
pcr = (uint64_t)tsb[6] << 25;
|
||||
pcr |= (uint64_t)tsb[7] << 17;
|
||||
pcr |= (uint64_t)tsb[8] << 9;
|
||||
pcr |= (uint64_t)tsb[9] << 1;
|
||||
pcr |= ((uint64_t)tsb[10] >> 7) & 0x01;
|
||||
if (*pcrp) *pcrp = pcr;
|
||||
}
|
||||
|
||||
/* Nothing - special case for tsfile to get PCR */
|
||||
if (!t) return;
|
||||
|
||||
/* Service inactive - ignore */
|
||||
if(t->s_status != SERVICE_RUNNING)
|
||||
return;
|
||||
|
||||
|
@ -200,11 +214,10 @@ ts_recv_packet1(mpegts_service_t *t, const uint8_t *tsb, int64_t *pcrp)
|
|||
|
||||
service_set_streaming_status_flags((service_t*)t, TSS_INPUT_HARDWARE);
|
||||
|
||||
if(tsb[1] & 0x80) {
|
||||
if(error) {
|
||||
/* Transport Error Indicator */
|
||||
limitedlog(&t->s_loglimit_tei, "TS", service_nicename((service_t*)t),
|
||||
"Transport error indicator");
|
||||
error = 1;
|
||||
}
|
||||
|
||||
pid = (tsb[1] & 0x1f) << 8 | tsb[2];
|
||||
|
@ -212,8 +225,8 @@ ts_recv_packet1(mpegts_service_t *t, const uint8_t *tsb, int64_t *pcrp)
|
|||
st = service_stream_find((service_t*)t, pid);
|
||||
|
||||
/* Extract PCR */
|
||||
if(tsb[3] & 0x20 && tsb[4] > 0 && tsb[5] & 0x10 && !error)
|
||||
ts_extract_pcr(t, st, tsb, pcrp);
|
||||
if (pcr != PTS_UNSET)
|
||||
ts_process_pcr(t, st, pcr);
|
||||
|
||||
if(st == NULL) {
|
||||
pthread_mutex_unlock(&t->s_stream_mutex);
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
|
||||
#include "tvheadend.h"
|
||||
#include "tsfile_private.h"
|
||||
#include "input.h"
|
||||
#include "input/mpegts/psi.h"
|
||||
|
||||
#include <sys/epoll.h>
|
||||
#include <sys/types.h>
|
||||
|
@ -31,7 +33,7 @@
|
|||
static void *
|
||||
tsfile_input_thread ( void *aux )
|
||||
{
|
||||
int fd = -1, efd, nfds;
|
||||
int pos = 0, fd = -1, efd, nfds;
|
||||
size_t len, rem;
|
||||
ssize_t c;
|
||||
struct epoll_event ev;
|
||||
|
@ -43,7 +45,10 @@ tsfile_input_thread ( void *aux )
|
|||
tsfile_mux_instance_t *mmi;
|
||||
|
||||
/* Open file */
|
||||
printf("waiting for lock..\n");
|
||||
pthread_mutex_lock(&global_lock);
|
||||
printf("got lock\n");
|
||||
printf("cur mux = %p\n", mi->mi_mux_current);
|
||||
if (mi->mi_mux_current) {
|
||||
mmi = (tsfile_mux_instance_t*)mi->mi_mux_current;
|
||||
fd = tvh_open(mmi->mmi_tsfile_path, O_RDONLY | O_NONBLOCK, 0);
|
||||
|
@ -54,6 +59,7 @@ tsfile_input_thread ( void *aux )
|
|||
}
|
||||
pthread_mutex_unlock(&global_lock);
|
||||
if (fd == -1) return NULL;
|
||||
printf("file opened = %d\n", fd);
|
||||
|
||||
/* Polling */
|
||||
memset(&ev, 0, sizeof(ev));
|
||||
|
@ -72,6 +78,7 @@ tsfile_input_thread ( void *aux )
|
|||
/* Check for extra (incomplete) packet at end */
|
||||
rem = st.st_size % 188;
|
||||
len = 0;
|
||||
printf("file size = %lu, rem = %lu\n", st.st_size, rem);
|
||||
|
||||
/* Process input */
|
||||
while (1) {
|
||||
|
@ -81,7 +88,7 @@ tsfile_input_thread ( void *aux )
|
|||
if (nfds == 1) break;
|
||||
|
||||
/* Read */
|
||||
c = read(fd, tsb, sizeof(tsb));
|
||||
c = read(fd, tsb+pos, sizeof(tsb)-pos);
|
||||
if (c < 0) {
|
||||
if (errno == EAGAIN || errno == EINTR)
|
||||
continue;
|
||||
|
@ -102,7 +109,7 @@ tsfile_input_thread ( void *aux )
|
|||
/* Process */
|
||||
if (c >= 0) {
|
||||
pcr = PTS_UNSET;
|
||||
mpegts_input_recv_packets(mi, tsb, c, &pcr, &pcr_pid);
|
||||
pos = mpegts_input_recv_packets(mi, mmi, tsb, c, &pcr, &pcr_pid);
|
||||
|
||||
/* Delay */
|
||||
if (pcr != PTS_UNSET) {
|
||||
|
@ -145,15 +152,22 @@ tsfile_input_stop_mux ( mpegts_input_t *mi )
|
|||
tvh_pipe_close(&mi->mi_thread_pipe);
|
||||
tvhtrace("tsfile", "adapter %d stopped thread", mi->mi_instance);
|
||||
}
|
||||
|
||||
mi->mi_mux_current->mmi_mux->mm_active = NULL;
|
||||
mi->mi_mux_current = NULL;
|
||||
}
|
||||
|
||||
static int
|
||||
tsfile_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *t )
|
||||
{
|
||||
struct stat st;
|
||||
mpegts_mux_t *mm = t->mmi_mux;
|
||||
tsfile_mux_instance_t *mmi = (tsfile_mux_instance_t*)t;
|
||||
printf("tsfile_input_start_mux(%p, %p)\n", mi, t);
|
||||
|
||||
/* Already tuned */
|
||||
assert(mmi->mmi_mux->mm_active == NULL);
|
||||
|
||||
/* Check file is accessible */
|
||||
if (lstat(mmi->mmi_tsfile_path, &st)) {
|
||||
printf("could not stat %s\n", mmi->mmi_tsfile_path);
|
||||
|
@ -172,9 +186,22 @@ tsfile_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *t )
|
|||
pthread_create(&mi->mi_thread_id, NULL, tsfile_input_thread, mi);
|
||||
}
|
||||
|
||||
/* Current */
|
||||
mi->mi_mux_current = mmi->mmi_mux->mm_active = t;
|
||||
|
||||
/* Install table handlers */
|
||||
mpegts_table_add(mm, 0x0, 0xff, psi_pat_callback, NULL, "pat",
|
||||
MT_QUICKREQ| MT_CRC, 0);
|
||||
#if 0
|
||||
mpegts_table_add(mm, 0x1, 0xff, dvb_cat_callback, NULL, "cat",
|
||||
MT_CRC, 1);
|
||||
#endif
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* TODO: I think most of these can be moved to mpegts */
|
||||
|
||||
static void
|
||||
tsfile_input_open_service ( mpegts_input_t *mi, mpegts_service_t *t )
|
||||
{
|
||||
|
@ -188,17 +215,77 @@ tsfile_input_close_service ( mpegts_input_t *mi, mpegts_service_t *t )
|
|||
static void
|
||||
tsfile_input_open_table ( mpegts_input_t *mi, mpegts_table_t *mt )
|
||||
{
|
||||
if (mt->mt_pid >= 0x2000)
|
||||
return;
|
||||
mi->mi_table_filter[mt->mt_pid] = 1;
|
||||
printf("table opened %04X\n", mt->mt_pid);
|
||||
}
|
||||
|
||||
static void
|
||||
tsfile_input_close_table ( mpegts_input_t *mi, mpegts_table_t *mt )
|
||||
{
|
||||
if (mt->mt_pid >= 0x2000)
|
||||
return;
|
||||
mi->mi_table_filter[mt->mt_pid] = 0;
|
||||
}
|
||||
|
||||
static void
|
||||
tsfile_table_dispatch ( mpegts_mux_t *mm, mpegts_table_feed_t *mtf )
|
||||
{
|
||||
int i = 0;
|
||||
int len = mm->mm_num_tables;
|
||||
uint16_t pid = ((mtf->mtf_tsb[1] & 0x1f) << 8) | mtf->mtf_tsb[2];
|
||||
mpegts_table_t *mt, *vec[len];
|
||||
|
||||
/* Collate - tables may be removed during callbacks */
|
||||
LIST_FOREACH(mt, &mm->mm_tables, mt_link) {
|
||||
vec[i++] = mt;
|
||||
mt->mt_refcount++;
|
||||
}
|
||||
assert(i == len);
|
||||
|
||||
/* Process */
|
||||
for (i = 0; i < len; i++) {
|
||||
mt = vec[i];
|
||||
if (!mt->mt_destroyed && mt->mt_pid == pid)
|
||||
psi_section_reassemble(&mt->mt_sect, mtf->mtf_tsb, 0, NULL, mt);
|
||||
mpegts_table_release(mt);
|
||||
}
|
||||
}
|
||||
|
||||
static void *
|
||||
tsfile_table_thread ( void *aux )
|
||||
{
|
||||
mpegts_table_feed_t *mtf;
|
||||
mpegts_mux_instance_t *mmi;
|
||||
mpegts_input_t *mi = aux;
|
||||
|
||||
while (1) {
|
||||
|
||||
/* Wait for data */
|
||||
pthread_mutex_lock(&mi->mi_delivery_mutex);
|
||||
while(!(mtf = TAILQ_FIRST(&mi->mi_table_feed)))
|
||||
pthread_cond_wait(&mi->mi_table_feed_cond, &mi->mi_delivery_mutex);
|
||||
TAILQ_REMOVE(&mi->mi_table_feed, mtf, mtf_link);
|
||||
pthread_mutex_unlock(&mi->mi_delivery_mutex);
|
||||
|
||||
/* Process */
|
||||
pthread_mutex_lock(&global_lock);
|
||||
if ((mmi = mi->mi_mux_current))
|
||||
tsfile_table_dispatch(mmi->mmi_mux, mtf);
|
||||
pthread_mutex_unlock(&global_lock);
|
||||
free(mtf);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
mpegts_input_t *
|
||||
tsfile_input_create ( void )
|
||||
{
|
||||
pthread_t tid;
|
||||
mpegts_input_t *mi;
|
||||
|
||||
/* Create object */
|
||||
mi = mpegts_input_create0(NULL);
|
||||
mi->mi_start_mux = tsfile_input_start_mux;
|
||||
mi->mi_stop_mux = tsfile_input_stop_mux;
|
||||
|
@ -206,6 +293,9 @@ tsfile_input_create ( void )
|
|||
mi->mi_close_table = tsfile_input_close_table;
|
||||
mi->mi_open_service = tsfile_input_open_service;
|
||||
mi->mi_close_service = tsfile_input_close_service;
|
||||
|
||||
/* Start table thread */
|
||||
pthread_create(&tid, NULL, tsfile_table_thread, mi);
|
||||
return mi;
|
||||
}
|
||||
|
||||
|
|
|
@ -513,7 +513,7 @@ int tvh_write(int fd, const void *buf, size_t len);
|
|||
|
||||
void hexdump(const char *pfx, const uint8_t *data, int len);
|
||||
|
||||
uint32_t tvh_crc32(uint8_t *data, size_t datalen, uint32_t crc);
|
||||
uint32_t tvh_crc32(const uint8_t *data, size_t datalen, uint32_t crc);
|
||||
|
||||
int base64_decode(uint8_t *out, const char *in, int out_size);
|
||||
|
||||
|
|
|
@ -77,7 +77,7 @@ static uint32_t crc_tab[256] = {
|
|||
};
|
||||
|
||||
uint32_t
|
||||
tvh_crc32(uint8_t *data, size_t datalen, uint32_t crc)
|
||||
tvh_crc32(const uint8_t *data, size_t datalen, uint32_t crc)
|
||||
{
|
||||
while(datalen--)
|
||||
crc = (crc << 8) ^ crc_tab[((crc >> 24) ^ *data++) & 0xff];
|
||||
|
|
Loading…
Add table
Reference in a new issue