Rewrite stream start messaging. Using htsmsg internally is a bit obscure

This commit is contained in:
Andreas Öman 2009-10-15 20:10:19 +00:00
parent 9f6183d9b4
commit 02dd003832
12 changed files with 210 additions and 113 deletions

View file

@ -293,33 +293,32 @@ dvb_transport_quality(th_transport_t *t)
/**
* Generate a descriptive name for the source
*/
static htsmsg_t *
dvb_transport_sourceinfo(th_transport_t *t)
static void
dvb_transport_setsourceinfo(th_transport_t *t, struct source_info *si)
{
htsmsg_t *m = htsmsg_create_map();
th_dvb_mux_instance_t *tdmi = t->tht_dvb_mux_instance;
char buf[100];
memset(si, 0, sizeof(struct source_info));
lock_assert(&global_lock);
if(tdmi->tdmi_adapter->tda_rootpath != NULL)
htsmsg_add_str(m, "device", tdmi->tdmi_adapter->tda_rootpath);
si->si_device = strdup(tdmi->tdmi_adapter->tda_rootpath);
htsmsg_add_str(m, "adapter", tdmi->tdmi_adapter->tda_displayname);
si->si_adapter = strdup(tdmi->tdmi_adapter->tda_displayname);
if(tdmi->tdmi_network != NULL)
htsmsg_add_str(m, "network", tdmi->tdmi_network);
si->si_network = strdup(tdmi->tdmi_network);
dvb_mux_nicename(buf, sizeof(buf), tdmi);
htsmsg_add_str(m, "mux", buf);
si->si_mux = strdup(buf);
if(t->tht_provider != NULL)
htsmsg_add_str(m, "provider", t->tht_provider);
si->si_provider = strdup(t->tht_provider);
if(t->tht_svcname != NULL)
htsmsg_add_str(m, "service", t->tht_svcname);
return m;
si->si_service = strdup(t->tht_svcname);
}
@ -363,7 +362,7 @@ dvb_transport_find(th_dvb_mux_instance_t *tdmi, uint16_t sid, int pmt_pid,
t->tht_refresh_feed = dvb_transport_refresh;
t->tht_stop_feed = dvb_transport_stop;
t->tht_config_save = dvb_transport_save;
t->tht_sourceinfo = dvb_transport_sourceinfo;
t->tht_setsourceinfo = dvb_transport_setsourceinfo;
t->tht_dvb_mux_instance = tdmi;
t->tht_quality_index = dvb_transport_quality;

View file

@ -247,8 +247,9 @@ dvr_rec_fatal_error(dvr_entry_t *de, const char *fmt, ...)
*
*/
static void
dvr_rec_start(dvr_entry_t *de, htsmsg_t *m)
dvr_rec_start(dvr_entry_t *de, const streaming_start_t *ss)
{
const source_info_t *si = &ss->ss_si;
dvr_rec_stream_t *drs;
AVOutputFormat *fmt;
AVFormatContext *fctx;
@ -259,13 +260,7 @@ dvr_rec_start(dvr_entry_t *de, htsmsg_t *m)
const char *codec_name;
char urlname[512];
int err, r;
htsmsg_field_t *f;
htsmsg_t *sub, *streams, *srcinfo;
const char *type, *lang;
uint32_t idx;
if((streams = htsmsg_get_list(m, "streams")) == NULL)
abort();
int i;
if(pvr_generate_filename(de) != 0) {
dvr_rec_fatal_error(de, "Unable to create directories");
@ -313,34 +308,31 @@ dvr_rec_start(dvr_entry_t *de, htsmsg_t *m)
/**
* Setup each stream
*/
HTSMSG_FOREACH(f, streams) {
if(f->hmf_type != HMF_MAP)
continue;
sub = &f->hmf_msg;
for(i = 0; i < ss->ss_num_components; i++) {
const streaming_start_component_t *ssc = &ss->ss_components[i];
if((type = htsmsg_get_str(sub, "type")) == NULL)
continue;
if(htsmsg_get_u32(sub, "index", &idx))
continue;
if(!strcmp(type, "AC3")) {
switch(ssc->ssc_type) {
case SCT_AC3:
codec_id = CODEC_ID_AC3;
codec_type = CODEC_TYPE_AUDIO;
codec_name = "AC-3";
} else if(!strcmp(type, "MPEG2AUDIO")) {
break;
case SCT_MPEG2AUDIO:
codec_id = CODEC_ID_MP2;
codec_type = CODEC_TYPE_AUDIO;
codec_name = "MPEG";
} else if(!strcmp(type, "MPEG2VIDEO")) {
break;
case SCT_MPEG2VIDEO:
codec_id = CODEC_ID_MPEG2VIDEO;
codec_type = CODEC_TYPE_VIDEO;
codec_name = "MPEG-2";
} else if(!strcmp(type, "H264")) {
break;
case SCT_H264:
codec_id = CODEC_ID_H264;
codec_type = CODEC_TYPE_VIDEO;
codec_name = "H264";
} else {
break;
default:
continue;
}
@ -353,7 +345,7 @@ dvr_rec_start(dvr_entry_t *de, htsmsg_t *m)
}
drs = calloc(1, sizeof(dvr_rec_stream_t));
drs->drs_source_index = idx;
drs->drs_source_index = ssc->ssc_index;
drs->drs_lavf_stream = av_new_stream(fctx, fctx->nb_streams);
@ -373,8 +365,7 @@ dvr_rec_start(dvr_entry_t *de, htsmsg_t *m)
continue;
}
if((lang = htsmsg_get_str(sub, "language")) != NULL)
memcpy(drs->drs_lavf_stream->language, lang, 4);
memcpy(drs->drs_lavf_stream->language, ssc->ssc_lang, 4);
LIST_INSERT_HEAD(&de->de_streams, drs, drs_link);
}
@ -382,13 +373,17 @@ dvr_rec_start(dvr_entry_t *de, htsmsg_t *m)
de->de_fctx = fctx;
de->de_ts_offset = AV_NOPTS_VALUE;
if((srcinfo = htsmsg_get_map(m, "sourceinfo")) != NULL) {
HTSMSG_FOREACH(f, srcinfo) {
if(f->hmf_type == HMF_STR)
tvhlog(LOG_INFO, "dvr", "%s - Source %s: %s",
de->de_ititle, f->hmf_name, f->hmf_str);
}
}
tvhlog(LOG_INFO, "dvr", "%s from "
"adapter: \"%s\", "
"network: \"%s\", mux: \"%s\", provider: \"%s\", "
"service: \"%s\", quality: %d",
de->de_ititle,
si->si_adapter ?: "<N/A>",
si->si_network ?: "<N/A>",
si->si_mux ?: "<N/A>",
si->si_provider ?: "<N/A>",
si->si_service ?: "<N/A>");
}

View file

@ -1225,8 +1225,36 @@ htsp_stream_deliver(htsp_subscription_t *hs, th_pkt_t *pkt)
* delivery start and all components.
*/
static void
htsp_subscription_start(htsp_subscription_t *hs, htsmsg_t *m)
htsp_subscription_start(htsp_subscription_t *hs, const streaming_start_t *ss)
{
htsmsg_t *m = htsmsg_create_map();
htsmsg_t *streams = htsmsg_create_list();
htsmsg_t *c;
htsmsg_t *sourceinfo = htsmsg_create_map();
int i;
const source_info_t *si = &ss->ss_si;
for(i = 0; i < ss->ss_num_components; i++) {
const streaming_start_component_t *ssc = &ss->ss_components[i];
c = htsmsg_create_map();
htsmsg_add_u32(c, "index", ssc->ssc_index);
htsmsg_add_str(c, "type", streaming_component_type2txt(ssc->ssc_type));
if(ssc->ssc_lang[0])
htsmsg_add_str(c, "language", ssc->ssc_lang);
htsmsg_add_msg(streams, NULL, c);
}
htsmsg_add_msg(m, "streams", streams);
if(si->si_adapter ) htsmsg_add_str(sourceinfo, "adapter", si->si_adapter );
if(si->si_mux ) htsmsg_add_str(sourceinfo, "mux" , si->si_mux );
if(si->si_network ) htsmsg_add_str(sourceinfo, "network", si->si_network );
if(si->si_provider) htsmsg_add_str(sourceinfo, "provider", si->si_provider);
if(si->si_service ) htsmsg_add_str(sourceinfo, "serivce", si->si_service );
htsmsg_add_msg(m, "sourceinfo", sourceinfo);
htsmsg_add_str(m, "method", "subscriptionStart");
htsmsg_add_u32(m, "subscriptionId", hs->hs_sid);
htsp_send(hs->hs_htsp, m, NULL, &hs->hs_q, 0);
@ -1237,8 +1265,9 @@ htsp_subscription_start(htsp_subscription_t *hs, htsmsg_t *m)
* Send a 'subscriptionStart' stop
*/
static void
htsp_subscription_stop(htsp_subscription_t *hs, htsmsg_t *m)
htsp_subscription_stop(htsp_subscription_t *hs)
{
htsmsg_t *m = htsmsg_create_map();
htsmsg_add_str(m, "method", "subscriptionStop");
htsmsg_add_u32(m, "subscriptionId", hs->hs_sid);
htsp_send(hs->hs_htsp, m, NULL, &hs->hs_q, 0);
@ -1289,6 +1318,7 @@ htsp_streaming_input(void *opaque, streaming_message_t *sm)
switch(sm->sm_type) {
case SMT_PACKET:
htsp_stream_deliver(hs, sm->sm_data); // reference is transfered
sm->sm_data = NULL;
break;
case SMT_START:
@ -1296,7 +1326,7 @@ htsp_streaming_input(void *opaque, streaming_message_t *sm)
break;
case SMT_STOP:
htsp_subscription_stop(hs, sm->sm_data);
htsp_subscription_stop(hs);
break;
case SMT_TRANSPORT_STATUS:
@ -1310,6 +1340,5 @@ htsp_streaming_input(void *opaque, streaming_message_t *sm)
case SMT_EXIT:
abort();
}
free(sm);
streaming_msg_free(sm);
}

View file

@ -338,15 +338,13 @@ iptv_transport_quality(th_transport_t *t)
/**
* Generate a descriptive name for the source
*/
static htsmsg_t *
iptv_transport_sourceinfo(th_transport_t *t)
static void
iptv_transport_setsourceinfo(th_transport_t *t, struct source_info *si)
{
htsmsg_t *m = htsmsg_create_map();
memset(si, 0, sizeof(struct source_info));
if(t->tht_iptv_iface != NULL)
htsmsg_add_str(m, "adapter", t->tht_iptv_iface);
htsmsg_add_str(m, "mux", inet_ntoa(t->tht_iptv_group));
return m;
si->si_adapter = t->tht_iptv_iface ? strdup(t->tht_iptv_iface) : NULL;
si->si_mux = strdup(inet_ntoa(t->tht_iptv_group));
}
@ -387,7 +385,7 @@ iptv_transport_find(const char *id, int create)
t->tht_refresh_feed = iptv_transport_refresh;
t->tht_stop_feed = iptv_transport_stop;
t->tht_config_save = iptv_transport_save;
t->tht_sourceinfo = iptv_transport_sourceinfo;
t->tht_setsourceinfo = iptv_transport_setsourceinfo;
t->tht_quality_index = iptv_transport_quality;
t->tht_iptv_fd = -1;

View file

@ -84,10 +84,10 @@ rawts_transport_quality(th_transport_t *t)
/**
* Generate a descriptive name for the source
*/
static htsmsg_t *
rawts_transport_sourceinfo(th_transport_t *t)
static void
rawts_transport_setsourceinfo(th_transport_t *t, struct source_info *si)
{
return htsmsg_create_map();
memset(si, 0, sizeof(struct source_info));
}
@ -119,7 +119,7 @@ rawts_transport_add(rawts_t *rt, uint16_t sid, int pmt_pid)
t->tht_start_feed = rawts_transport_start;
t->tht_stop_feed = rawts_transport_stop;
t->tht_config_save = rawts_transport_save;
t->tht_sourceinfo = rawts_transport_sourceinfo;
t->tht_setsourceinfo = rawts_transport_setsourceinfo;
t->tht_quality_index = rawts_transport_quality;
t->tht_svcname = strdup(tmp);

View file

@ -21,6 +21,8 @@
#include "tvhead.h"
#include "streaming.h"
#include "packet.h"
#include "atomic.h"
#include "transports.h"
void
streaming_pad_init(streaming_pad_t *sp)
@ -122,10 +124,10 @@ streaming_msg_create_pkt(th_pkt_t *pkt)
*
*/
streaming_message_t *
streaming_msg_create_msg(streaming_message_type_t type, htsmsg_t *msg)
streaming_msg_create_data(streaming_message_type_t type, void *data)
{
streaming_message_t *sm = streaming_msg_create(type);
sm->sm_data = msg;
sm->sm_data = data;
return sm;
}
@ -150,6 +152,7 @@ streaming_message_t *
streaming_msg_clone(streaming_message_t *src)
{
streaming_message_t *dst = malloc(sizeof(streaming_message_t));
streaming_start_t *ss;
dst->sm_type = src->sm_type;
@ -161,8 +164,13 @@ streaming_msg_clone(streaming_message_t *src)
break;
case SMT_START:
ss = dst->sm_data = src->sm_data;
atomic_add(&ss->ss_refcount, 1);
break;
case SMT_STOP:
dst->sm_data = htsmsg_copy(src->sm_data);
case SMT_TRANSPORT_STATUS:
dst->sm_code = src->sm_code;
break;
case SMT_EXIT:
@ -175,6 +183,19 @@ streaming_msg_clone(streaming_message_t *src)
}
/**
*
*/
static void
streaming_start_deref(streaming_start_t *ss)
{
if((atomic_add(&ss->ss_refcount, -1)) != 1)
return;
transport_source_info_free(&ss->ss_si);
free(ss);
}
/**
*
*/
@ -183,12 +204,15 @@ streaming_msg_free(streaming_message_t *sm)
{
switch(sm->sm_type) {
case SMT_PACKET:
pkt_ref_dec(sm->sm_data);
if(sm->sm_data)
pkt_ref_dec(sm->sm_data);
break;
case SMT_START:
streaming_start_deref(sm->sm_data);
break;
case SMT_STOP:
htsmsg_destroy(sm->sm_data);
break;
case SMT_EXIT:

View file

@ -23,6 +23,28 @@
#include "packet.h"
#include "htsmsg.h"
typedef struct streaming_start_component {
int ssc_index;
int ssc_type;
char ssc_lang[4];
} streaming_start_component_t;
typedef struct streaming_start {
int ss_refcount;
int ss_num_components;
source_info_t ss_si;
streaming_start_component_t ss_components[0];
} streaming_start_t;
/**
*
*/
@ -47,8 +69,8 @@ streaming_message_t *streaming_msg_clone(streaming_message_t *src);
streaming_message_t *streaming_msg_create(streaming_message_type_t type);
streaming_message_t *streaming_msg_create_msg(streaming_message_type_t type,
htsmsg_t *msg);
streaming_message_t *streaming_msg_create_data(streaming_message_type_t type,
void *data);
streaming_message_t *streaming_msg_create_code(streaming_message_type_t type,
int code);

View file

@ -81,8 +81,8 @@ subscription_link_transport(th_subscription_t *s, th_transport_t *t)
if(LIST_FIRST(&t->tht_components) != NULL) {
// Send a START message to the subscription client
sm = streaming_msg_create_msg(SMT_START,
transport_build_stream_start_msg(t));
sm = streaming_msg_create_data(SMT_START,
transport_build_stream_start(t));
streaming_target_deliver(s->ths_output, sm);
@ -112,7 +112,7 @@ subscription_unlink_transport(th_subscription_t *s)
if(LIST_FIRST(&t->tht_components) != NULL) {
// Send a STOP message to the subscription client
sm = streaming_msg_create_msg(SMT_STOP, htsmsg_create_map());
sm = streaming_msg_create_code(SMT_STOP, 0);
streaming_target_deliver(s->ths_output, sm);
}
@ -243,21 +243,23 @@ subscription_create_from_channel(channel_t *ch, unsigned int weight,
"to channel \"%s\"",
s->ths_title, ch->ch_name);
} else {
htsmsg_t *m = s->ths_transport->tht_sourceinfo(s->ths_transport);
source_info_t si;
s->ths_transport->tht_setsourceinfo(s->ths_transport, &si);
tvhlog(LOG_INFO, "subscription",
"\"%s\" subscribing on \"%s\", weight: %d, adapter: \"%s\", "
"network: \"%s\", mux: \"%s\", provider: \"%s\", "
"service: \"%s\", quality: %d",
s->ths_title, ch->ch_name, weight,
htsmsg_get_str(m, "adapter") ?: "<N/A>",
htsmsg_get_str(m, "network") ?: "<N/A>",
htsmsg_get_str(m, "mux") ?: "<N/A>",
htsmsg_get_str(m, "provider") ?: "<N/A>",
htsmsg_get_str(m, "service") ?: "<N/A>",
si.si_adapter ?: "<N/A>",
si.si_network ?: "<N/A>",
si.si_mux ?: "<N/A>",
si.si_provider ?: "<N/A>",
si.si_service ?: "<N/A>",
s->ths_transport->tht_quality_index(s->ths_transport));
htsmsg_destroy(m);
transport_source_info_free(&si);
}
return s;
}
@ -288,9 +290,7 @@ dummy_callback(void *opauqe, streaming_message_t *sm)
{
switch(sm->sm_type) {
case SMT_START:
fprintf(stderr, "dummysubscription START, message follows\n");
htsmsg_print(sm->sm_data);
fprintf(stderr, "\n");
fprintf(stderr, "dummysubscription START\n");
break;
case SMT_STOP:
fprintf(stderr, "dummysubscription STOP\n");

View file

@ -685,7 +685,7 @@ transport_restart(th_transport_t *t, int had_components)
lock_assert(&t->tht_stream_mutex);
if(had_components) {
sm = streaming_msg_create_msg(SMT_STOP, htsmsg_create_map());
sm = streaming_msg_create_code(SMT_STOP, 0);
streaming_pad_deliver(&t->tht_streaming_pad, sm);
}
@ -693,8 +693,8 @@ transport_restart(th_transport_t *t, int had_components)
if(LIST_FIRST(&t->tht_components) != NULL) {
sm = streaming_msg_create_msg(SMT_START,
transport_build_stream_start_msg(t));
sm = streaming_msg_create_data(SMT_START,
transport_build_stream_start(t));
streaming_pad_deliver(&t->tht_streaming_pad, sm);
}
}
@ -706,31 +706,35 @@ transport_restart(th_transport_t *t, int had_components)
* Note: This is the same as the one in HTSP.subscriptionStart so take
* great care if you change anying. (Just adding is fine)
*/
htsmsg_t *
transport_build_stream_start_msg(th_transport_t *t)
streaming_start_t *
transport_build_stream_start(th_transport_t *t)
{
htsmsg_t *m, *streams, *c;
th_stream_t *st;
int n = 0;
streaming_start_t *ss;
lock_assert(&t->tht_stream_mutex);
m = htsmsg_create_map();
LIST_FOREACH(st, &t->tht_components, st_link)
n++;
/* Setup each stream */
streams = htsmsg_create_list();
ss = calloc(1, sizeof(streaming_start_t) +
sizeof(streaming_start_component_t) * n);
ss->ss_num_components = n;
n = 0;
LIST_FOREACH(st, &t->tht_components, st_link) {
c = htsmsg_create_map();
htsmsg_add_u32(c, "index", st->st_index);
htsmsg_add_str(c, "type", streaming_component_type2txt(st->st_type));
if(st->st_lang[0])
htsmsg_add_str(c, "language", st->st_lang);
htsmsg_add_msg(streams, NULL, c);
streaming_start_component_t *ssc = &ss->ss_components[n++];
ssc->ssc_index = st->st_index;
ssc->ssc_type = st->st_type;
memcpy(ssc->ssc_lang, st->st_lang, 4);
}
htsmsg_add_msg(m, "streams", streams);
htsmsg_add_msg(m, "sourceinfo", t->tht_sourceinfo(t));
return m;
t->tht_setsourceinfo(t, &ss->ss_si);
ss->ss_refcount = 1;
return ss;
}
@ -854,3 +858,18 @@ transport_init(void)
pthread_cond_init(&pending_save_cond, NULL);
pthread_create(&tid, NULL, transport_saver, NULL);
}
/**
*
*/
void
transport_source_info_free(struct source_info *si)
{
free(si->si_device);
free(si->si_adapter);
free(si->si_network);
free(si->si_mux);
free(si->si_provider);
free(si->si_service);
}

View file

@ -76,7 +76,8 @@ transport_find_stream_by_pid(th_transport_t *t, int pid)
return NULL;
}
htsmsg_t *transport_build_stream_start_msg(th_transport_t *t);
struct streaming_start;
struct streaming_start *transport_build_stream_start(th_transport_t *t);
void transport_set_enable(th_transport_t *t, int enabled);
@ -86,4 +87,6 @@ void transport_stream_destroy(th_transport_t *t, th_stream_t *st);
void transport_request_save(th_transport_t *t);
void transport_source_info_free(source_info_t *si);
#endif /* TRANSPORTS_H */

View file

@ -39,6 +39,15 @@
extern pthread_mutex_t global_lock;
extern pthread_mutex_t ffmpeg_lock;
typedef struct source_info {
char *si_device;
char *si_adapter;
char *si_network;
char *si_mux;
char *si_provider;
char *si_service;
} source_info_t;
static inline void
lock_assert0(pthread_mutex_t *l, const char *file, int line)
{
@ -149,8 +158,9 @@ TAILQ_HEAD(streaming_message_queue, streaming_message);
*/
typedef enum {
SMT_PACKET, // sm_data is a th_pkt. Unref when destroying msg
SMT_START, // sm_data is a htsmsg, see transport_build_stream_msg()
SMT_STOP, // sm_data is a htsmsg
SMT_START, // sm_data is a stream_start,
// see transport_build_stream_start()
SMT_STOP , // no extra payload right now
SMT_TRANSPORT_STATUS, // sm_code is TRANSPORT_STATUS_
SMT_EXIT, // Used to signal exit to threads
SMT_NOSOURCE,
@ -454,7 +464,7 @@ typedef struct th_transport {
void (*tht_config_save)(struct th_transport *t);
struct htsmsg *(*tht_sourceinfo)(struct th_transport *t);
void (*tht_setsourceinfo)(struct th_transport *t, struct source_info *si);
int (*tht_quality_index)(struct th_transport *t);

View file

@ -303,18 +303,16 @@ v4l_transport_quality(th_transport_t *t)
/**
* Generate a descriptive name for the source
*/
static htsmsg_t *
v4l_transport_sourceinfo(th_transport_t *t)
static void
v4l_transport_setsourceinfo(th_transport_t *t, struct source_info *si)
{
htsmsg_t *m = htsmsg_create_map();
char buf[30];
char buf[64];
memset(si, 0, sizeof(struct source_info));
htsmsg_add_str(m, "adapter", t->tht_v4l_adapter->va_displayname);
si->si_adapter = strdup(t->tht_v4l_adapter->va_displayname);
snprintf(buf, sizeof(buf), "%d Hz", t->tht_v4l_frequency);
htsmsg_add_str(m, "mux", buf);
return m;
si->si_mux = strdup(buf);
}
@ -356,7 +354,7 @@ v4l_transport_find(v4l_adapter_t *va, const char *id, int create)
t->tht_refresh_feed = v4l_transport_refresh;
t->tht_stop_feed = v4l_transport_stop;
t->tht_config_save = v4l_transport_save;
t->tht_sourceinfo = v4l_transport_sourceinfo;
t->tht_setsourceinfo = v4l_transport_setsourceinfo;
t->tht_quality_index = v4l_transport_quality;
t->tht_iptv_fd = -1;