profile/subscription: use profile chain everywhere, add profile refcounting

This commit is contained in:
Jaroslav Kysela 2014-10-22 16:05:18 +02:00
parent 97e7f1a86d
commit c2609312d0
11 changed files with 186 additions and 108 deletions

View file

@ -78,15 +78,15 @@ dvr_rec_subscribe(dvr_entry_t *de)
pro = de->de_config->dvr_profile;
prch = malloc(sizeof(*prch));
if (profile_chain_open(pro, prch, de->de_channel, &de->de_config->dvr_muxcnf, 0, 0)) {
profile_chain_init(prch, pro, de->de_channel);
if (profile_chain_open(prch, &de->de_config->dvr_muxcnf, 0, 0)) {
tvherror("dvr", "unable to create new channel streaming chain for '%s'",
channel_get_name(de->de_channel));
return;
}
de->de_s = subscription_create_from_channel(de->de_channel, pro, weight,
buf, prch->prch_st,
prch->prch_flags,
de->de_s = subscription_create_from_channel(prch, weight,
buf, prch->prch_flags,
NULL, NULL, NULL);
if (de->de_s == NULL) {
tvherror("dvr", "unable to create new channel subcription for '%s'",

View file

@ -1762,7 +1762,8 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
#endif
pro = profile_find_by_list(htsp->htsp_granted_access->aa_profiles, profile_id, "htsp");
if (!profile_work(pro, &hs->hs_prch, ch, &hs->hs_input, timeshiftPeriod, pflags)) {
profile_chain_init(&hs->hs_prch, pro, ch);
if (!profile_chain_work(&hs->hs_prch, &hs->hs_input, timeshiftPeriod, pflags)) {
tvhlog(LOG_ERR, "htsp", "unable to create profile chain '%s'", pro->pro_name);
free(hs);
return htsp_error("Stream setup error");
@ -1795,9 +1796,8 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
LIST_INSERT_HEAD(&htsp->htsp_subscriptions, hs, hs_link);
tvhdebug("htsp", "%s - subscribe to %s\n", htsp->htsp_logname, ch->ch_name ?: "");
hs->hs_s = subscription_create_from_channel(ch, pro, weight,
hs->hs_s = subscription_create_from_channel(&hs->hs_prch, weight,
htsp->htsp_logname,
hs->hs_prch.prch_st,
SUBSCRIPTION_STREAMING,
htsp->htsp_peername,
htsp->htsp_username,

View file

@ -23,6 +23,7 @@
#include "subscriptions.h"
#include "channels.h"
#include "access.h"
#include "profile.h"
#include "dvb_charset.h"
#include <assert.h>
@ -1090,8 +1091,11 @@ mpegts_mux_subscribe
( mpegts_mux_t *mm, const char *name, int weight )
{
int err = 0;
profile_chain_t prch;
th_subscription_t *s;
s = subscription_create_from_mux(mm, NULL, weight, name, NULL,
memset(&prch, 0, sizeof(prch));
prch.prch_id = mm;
s = subscription_create_from_mux(&prch, weight, name,
SUBSCRIPTION_NONE,
NULL, NULL, NULL, &err);
return s ? 0 : err;

View file

@ -22,6 +22,7 @@
#include "input/mpegts/mpegts_mux_sched.h"
#include "streaming.h"
#include "settings.h"
#include "profile.h"
static void mpegts_mux_sched_timer ( void *p );
static void mpegts_mux_sched_input ( void *p, streaming_message_t *sm );
@ -204,10 +205,14 @@ mpegts_mux_sched_timer ( void *p )
if (!mms->mms_active) {
assert(mms->mms_sub == NULL);
if (!mms->mms_prch)
mms->mms_prch = calloc(1, sizeof(mms->mms_prch));
mms->mms_prch->prch_id = mm;
mms->mms_prch->prch_st = &mms->mms_input;
mms->mms_sub
= subscription_create_from_mux(mm, NULL, mms->mms_weight,
= subscription_create_from_mux(mms->mms_prch, mms->mms_weight,
mms->mms_creator ?: "",
&mms->mms_input,
SUBSCRIPTION_NONE,
NULL, NULL, NULL, NULL);
@ -311,6 +316,7 @@ mpegts_mux_sched_delete ( mpegts_mux_sched_t *mms, int delconf )
free(mms->mms_cronstr);
free(mms->mms_mux);
free(mms->mms_creator);
free(mms->mms_prch);
free(mms);
}

View file

@ -25,6 +25,8 @@
#include "idnode.h"
#include "subscriptions.h"
struct profile_chain;
typedef LIST_HEAD(,mpegts_mux_sched) mpegts_mux_sched_list_t;
extern mpegts_mux_sched_list_t mpegts_mux_sched_all;
@ -57,8 +59,9 @@ typedef struct mpegts_mux_sched
/*
* Subscription
*/
th_subscription_t *mms_sub; ///< Subscription handler
streaming_target_t mms_input; ///< Streaming input
struct profile_chain *mms_prch; ///< Dummy profile chain
th_subscription_t *mms_sub; ///< Subscription handler
streaming_target_t mms_input; ///< Streaming input
} mpegts_mux_sched_t;

View file

@ -99,6 +99,7 @@ profile_create
if (!htsmsg_get_bool(conf, "shield", &b))
pro->pro_shield = !!b;
}
pro->pro_refcount = 1;
TAILQ_INSERT_TAIL(&profiles, pro, pro_link);
if (save)
profile_class_save((idnode_t *)pro);
@ -107,6 +108,16 @@ profile_create
return pro;
}
void
profile_release_(profile_t *pro)
{
if (pro->pro_free)
pro->pro_free(pro);
free(pro->pro_name);
free(pro->pro_comment);
free(pro);
}
static void
profile_delete(profile_t *pro, int delconf)
{
@ -119,11 +130,7 @@ profile_delete(profile_t *pro, int delconf)
idnode_unlink(&pro->pro_id);
dvr_config_destroy_by_profile(pro, delconf);
access_destroy_by_profile(pro, delconf);
if (pro->pro_free)
pro->pro_free(pro);
free(pro->pro_name);
free(pro->pro_comment);
free(pro);
profile_release(pro);
}
static void
@ -427,17 +434,58 @@ profile_get_htsp_list(htsmsg_t *array, htsmsg_t *filter)
}
}
/*
*
*/
void
profile_chain_init(profile_chain_t *prch, profile_t *pro, void *id)
{
memset(prch, 0, sizeof(*prch));
if (pro)
profile_grab(pro);
prch->prch_pro = pro;
prch->prch_id = id;
streaming_queue_init(&prch->prch_sq, 0, 0);
}
/*
*
*/
int
profile_chain_raw_open(profile_chain_t *prch, size_t qsize)
profile_chain_work(profile_chain_t *prch, struct streaming_target *dst,
uint32_t timeshift_period, int flags)
{
profile_t *pro = prch->prch_pro;
if (pro && pro->pro_work)
return pro->pro_work(prch, dst, timeshift_period, flags);
return -1;
}
/*
*
*/
int
profile_chain_open(profile_chain_t *prch,
muxer_config_t *m_cfg, int flags, size_t qsize)
{
profile_t *pro = prch->prch_pro;
if (pro && pro->pro_open)
return pro->pro_open(prch, m_cfg, flags, qsize);
return -1;
}
/*
*
*/
int
profile_chain_raw_open(profile_chain_t *prch, void *id, size_t qsize)
{
muxer_config_t c;
memset(&c, 0, sizeof(c));
c.m_type = MC_RAW;
memset(prch, 0, sizeof(*prch));
prch->prch_id = id;
prch->prch_flags = SUBSCRIPTION_RAW_MPEGTS;
streaming_queue_init(&prch->prch_sq, SMT_PACKET, qsize);
prch->prch_st = &prch->prch_sq.sq_st;
@ -467,6 +515,8 @@ profile_chain_close(profile_chain_t *prch)
muxer_destroy(prch->prch_muxer);
streaming_queue_deinit(&prch->prch_sq);
prch->prch_st = NULL;
if (prch->prch_pro)
profile_release(prch->prch_pro);
}
/*
@ -484,13 +534,10 @@ const idclass_t profile_htsp_class =
};
static int
profile_htsp_work(profile_t *_pro, profile_chain_t *prch,
void *id, streaming_target_t *dst,
profile_htsp_work(profile_chain_t *prch,
streaming_target_t *dst,
uint32_t timeshift_period, int flags)
{
if (!(flags & PRCH_FLAG_SKIPZEROING))
memset(prch, 0, sizeof(*prch));
if (flags & PRCH_FLAG_TSFIX)
dst = prch->prch_tsfix = tsfix_create(prch->prch_transcoder);
@ -548,10 +595,10 @@ const idclass_t profile_mpegts_pass_class =
};
static int
profile_mpegts_pass_open(profile_t *_pro, profile_chain_t *prch, void *id,
profile_mpegts_pass_open(profile_chain_t *prch,
muxer_config_t *m_cfg, int flags, size_t qsize)
{
profile_mpegts_t *pro = (profile_mpegts_t *)_pro;
profile_mpegts_t *pro = (profile_mpegts_t *)prch->prch_pro;
muxer_config_t c;
if (m_cfg)
@ -563,7 +610,6 @@ profile_mpegts_pass_open(profile_t *_pro, profile_chain_t *prch, void *id,
c.m_rewrite_pat = pro->pro_rewrite_pat;
c.m_rewrite_pmt = pro->pro_rewrite_pmt;
memset(prch, 0, sizeof(*prch));
prch->prch_flags = SUBSCRIPTION_RAW_MPEGTS;
streaming_queue_init(&prch->prch_sq, SMT_PACKET, qsize);
prch->prch_muxer = muxer_create(&c);
@ -612,10 +658,10 @@ const idclass_t profile_matroska_class =
};
static int
profile_matroska_open(profile_t *_pro, profile_chain_t *prch, void *id,
profile_matroska_open(profile_chain_t *prch,
muxer_config_t *m_cfg, int flags, size_t qsize)
{
profile_matroska_t *pro = (profile_matroska_t *)_pro;
profile_matroska_t *pro = (profile_matroska_t *)prch->prch_pro;
muxer_config_t c;
if (m_cfg)
@ -627,7 +673,6 @@ profile_matroska_open(profile_t *_pro, profile_chain_t *prch, void *id,
if (pro->pro_webm)
c.m_type = MC_WEBM;
memset(prch, 0, sizeof(*prch));
streaming_queue_init(&prch->prch_sq, 0, qsize);
prch->prch_gh = globalheaders_create(&prch->prch_sq.sq_st);
prch->prch_tsfix = tsfix_create(prch->prch_gh);
@ -883,11 +928,11 @@ const idclass_t profile_transcode_class =
};
static int
profile_transcode_work(profile_t *_pro, profile_chain_t *prch,
void *id, streaming_target_t *dst,
profile_transcode_work(profile_chain_t *prch,
streaming_target_t *dst,
uint32_t timeshift_period, int flags)
{
profile_transcode_t *pro = (profile_transcode_t *)_pro;
profile_transcode_t *pro = (profile_transcode_t *)prch->prch_pro;
transcoder_props_t props;
memset(&props, 0, sizeof(props));
@ -899,9 +944,6 @@ profile_transcode_work(profile_t *_pro, profile_chain_t *prch,
props.tp_bandwidth = pro->pro_bandwidth >= 64 ? pro->pro_bandwidth : 64;
strncpy(props.tp_language, pro->pro_language ?: "", 3);
if (!(flags & PRCH_FLAG_SKIPZEROING))
memset(prch, 0, sizeof(*prch));
#if ENABLE_TIMESHIFT
if (timeshift_period > 0)
dst = prch->prch_timeshift = timeshift_create(dst, timeshift_period);
@ -935,10 +977,10 @@ profile_transcode_mc_valid(int mc)
}
static int
profile_transcode_open(profile_t *_pro, profile_chain_t *prch, void *id,
profile_transcode_open(profile_chain_t *prch,
muxer_config_t *m_cfg, int flags, size_t qsize)
{
profile_transcode_t *pro = (profile_transcode_t *)_pro;
profile_transcode_t *pro = (profile_transcode_t *)prch->prch_pro;
muxer_config_t c;
int r;
@ -952,12 +994,10 @@ profile_transcode_open(profile_t *_pro, profile_chain_t *prch, void *id,
c.m_type = MC_MATROSKA;
}
memset(prch, 0, sizeof(*prch));
streaming_queue_init(&prch->prch_sq, 0, qsize);
prch->prch_gh = globalheaders_create(&prch->prch_sq.sq_st);
r = profile_transcode_work(_pro, prch, prch->prch_gh, id, 0,
r = profile_transcode_work(prch, prch->prch_gh, 0,
PRCH_FLAG_SKIPZEROING | PRCH_FLAG_TSFIX);
if (r)
return r;

View file

@ -51,6 +51,8 @@ extern profile_builders_queue profile_builders;
#define PRCH_FLAG_TSFIX (1<<1)
typedef struct profile_chain {
struct profile *prch_pro;
void *prch_id;
int prch_flags;
struct streaming_queue prch_sq;
struct streaming_target *prch_st;
@ -69,6 +71,8 @@ typedef struct profile {
idnode_t pro_id;
TAILQ_ENTRY(profile) pro_link;
int pro_refcount;
LIST_HEAD(,dvr_config) pro_dvr_configs;
LIST_HEAD(,access_entry) pro_accesses;
@ -83,10 +87,9 @@ typedef struct profile {
void (*pro_conf_changed)(struct profile *pro);
muxer_container_type_t (*pro_get_mc)(struct profile *pro);
int (*pro_work)(struct profile *pro, profile_chain_t *prch,
void *id, struct streaming_target *dst,
int (*pro_work)(profile_chain_t *prch, struct streaming_target *dst,
uint32_t timeshift_period, int flags);
int (*pro_open)(struct profile *pro, profile_chain_t *prch, void *id,
int (*pro_open)(profile_chain_t *prch,
muxer_config_t *m_cfg, int flags, size_t qsize);
} profile_t;
@ -95,17 +98,24 @@ void profile_register(const idclass_t *clazz, profile_builder_t builder);
profile_t *profile_create
(const char *uuid, htsmsg_t *conf, int save);
static inline int
profile_work(profile_t *pro, profile_chain_t *prch,
void *id, struct streaming_target *dst,
uint32_t timeshift_period, int flags)
{ return pro && pro->pro_work ? pro->pro_work(pro, prch, id, dst, timeshift_period, flags) : -1; }
static inline void profile_grab( profile_t *pro )
{ pro->pro_refcount++; }
static inline int
profile_chain_open(profile_t *pro, profile_chain_t *prch, void *id,
muxer_config_t *m_cfg, int flags, size_t qsize)
{ return pro && pro->pro_open ? pro->pro_open(pro, prch, id, m_cfg, flags, qsize) : -1; }
int profile_chain_raw_open(profile_chain_t *prch, size_t qsize);
void profile_release_( profile_t *pro );
static inline void profile_release( profile_t *pro )
{
assert(pro->pro_refcount > 0);
if (--pro->pro_refcount == 0) profile_release_(pro);
}
int
profile_chain_work(profile_chain_t *prch, struct streaming_target *dst,
uint32_t timeshift_period, int flags);
int
profile_chain_open(profile_chain_t *prch,
muxer_config_t *m_cfg, int flags, size_t qsize);
void profile_chain_init(profile_chain_t *prch, profile_t *pro, void *id);
int profile_chain_raw_open(profile_chain_t *prch, void *id, size_t qsize);
void profile_chain_close(profile_chain_t *prch);
static inline profile_t *profile_find_by_uuid(const char *uuid)

View file

@ -329,13 +329,15 @@ static void *
service_mapper_thread ( void *aux )
{
service_t *s;
profile_chain_t prch;
th_subscription_t *sub;
int run, working = 0;
streaming_queue_t sq;
streaming_queue_t *sq;
streaming_message_t *sm;
const char *err = NULL;
streaming_queue_init(&sq, 0, 0);
profile_chain_init(&prch, NULL, NULL);
sq = &prch.prch_sq;
pthread_mutex_lock(&global_lock);
@ -362,8 +364,9 @@ service_mapper_thread ( void *aux )
/* Subscribe */
tvhinfo("service_mapper", "checking %s", s->s_nicename);
sub = subscription_create_from_service(s, NULL, SUBSCRIPTION_PRIO_MAPPER,
"service_mapper", &sq.sq_st,
prch.prch_id = s;
sub = subscription_create_from_service(&prch, SUBSCRIPTION_PRIO_MAPPER,
"service_mapper",
0, NULL, NULL, "service_mapper");
/* Failed */
@ -380,20 +383,20 @@ service_mapper_thread ( void *aux )
/* Wait */
run = 1;
pthread_mutex_lock(&sq.sq_mutex);
pthread_mutex_lock(&sq->sq_mutex);
while(tvheadend_running && run) {
/* Wait for message */
while((sm = TAILQ_FIRST(&sq.sq_queue)) == NULL) {
pthread_cond_wait(&sq.sq_cond, &sq.sq_mutex);
while((sm = TAILQ_FIRST(&sq->sq_queue)) == NULL) {
pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex);
if (!tvheadend_running)
break;
}
if (!tvheadend_running)
break;
TAILQ_REMOVE(&sq.sq_queue, sm, sm_link);
pthread_mutex_unlock(&sq.sq_mutex);
TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);
pthread_mutex_unlock(&sq->sq_mutex);
if(sm->sm_type == SMT_PACKET) {
run = 0;
@ -411,13 +414,13 @@ service_mapper_thread ( void *aux )
}
streaming_msg_free(sm);
pthread_mutex_lock(&sq.sq_mutex);
pthread_mutex_lock(&sq->sq_mutex);
}
if (!tvheadend_running)
break;
streaming_queue_clear(&sq.sq_queue);
pthread_mutex_unlock(&sq.sq_mutex);
streaming_queue_clear(&sq->sq_queue);
pthread_mutex_unlock(&sq->sq_mutex);
pthread_mutex_lock(&global_lock);
subscription_unsubscribe(sub);
@ -434,6 +437,7 @@ service_mapper_thread ( void *aux )
}
pthread_mutex_unlock(&global_lock);
profile_chain_close(&prch);
return NULL;
}

View file

@ -577,13 +577,16 @@ subscription_unsubscribe(th_subscription_t *s)
*/
th_subscription_t *
subscription_create
(profile_t *pro, int weight, const char *name, streaming_target_t *st,
(profile_chain_t *prch, int weight, const char *name,
int flags, st_callback_t *cb, const char *hostname,
const char *username, const char *client)
{
th_subscription_t *s = calloc(1, sizeof(th_subscription_t));
profile_t *pro = prch ? prch->prch_pro : NULL;
streaming_target_t *st = prch ? prch->prch_st : NULL;
int reject = 0;
static int tally;
TAILQ_INIT(&s->ths_instances);
if(flags & SUBSCRIPTION_NONE)
@ -601,6 +604,7 @@ subscription_create
streaming_target_init(&s->ths_input, cb, s, reject);
s->ths_prch = prch->prch_st ? prch : NULL;
s->ths_weight = weight;
s->ths_title = strdup(name);
s->ths_hostname = hostname ? strdup(hostname) : NULL;
@ -633,22 +637,29 @@ subscription_create
*
*/
static th_subscription_t *
subscription_create_from_channel_or_service(channel_t *ch,
service_t *t,
profile_t *pro,
subscription_create_from_channel_or_service(profile_chain_t *prch,
unsigned int weight,
const char *name,
streaming_target_t *st,
int flags,
const char *hostname,
const char *username,
const char *client)
const char *client,
int service)
{
th_subscription_t *s;
assert(!ch || !t);
assert(st);
channel_t *ch = NULL;
service_t *t = NULL;
s = subscription_create(pro, weight, name, st, flags, subscription_input,
assert(prch);
assert(prch->prch_id);
assert(prch->prch_st);
if (service)
t = prch->prch_id;
else
ch = prch->prch_id;
s = subscription_create(prch, weight, name, flags, subscription_input,
hostname, username, client);
if (ch)
tvhtrace("subscription", "%04X: creating subscription for %s weight %d",
@ -666,29 +677,29 @@ subscription_create_from_channel_or_service(channel_t *ch,
}
th_subscription_t *
subscription_create_from_channel(channel_t *ch, profile_t *pro,
subscription_create_from_channel(profile_chain_t *prch,
unsigned int weight,
const char *name, streaming_target_t *st,
const char *name,
int flags, const char *hostname,
const char *username, const char *client)
{
return subscription_create_from_channel_or_service
(ch, NULL, pro, weight, name, st, flags, hostname, username, client);
(prch, weight, name, flags, hostname, username, client, 0);
}
/**
*
*/
th_subscription_t *
subscription_create_from_service(service_t *t, profile_t *pro,
subscription_create_from_service(profile_chain_t *prch,
unsigned int weight,
const char *name,
streaming_target_t *st, int flags,
int flags,
const char *hostname, const char *username,
const char *client)
{
return subscription_create_from_channel_or_service
(NULL, t, pro, weight, name, st, flags, hostname, username, client);
(prch, weight, name, flags, hostname, username, client, 1);
}
/**
@ -738,16 +749,16 @@ mux_data_timeout ( void *aux )
}
th_subscription_t *
subscription_create_from_mux(mpegts_mux_t *mm, profile_t *pro,
subscription_create_from_mux(profile_chain_t *prch,
unsigned int weight,
const char *name,
streaming_target_t *st,
int flags,
const char *hostname,
const char *username,
const char *client,
int *err)
{
mpegts_mux_t *mm = prch->prch_id;
th_subscription_t *s;
streaming_message_t *sm;
streaming_start_t *ss;
@ -762,9 +773,9 @@ subscription_create_from_mux(mpegts_mux_t *mm, profile_t *pro,
}
/* Create subscription */
if (!st)
if (!prch->prch_st)
flags |= SUBSCRIPTION_NONE;
s = subscription_create(pro, weight, name, st, flags, NULL,
s = subscription_create(prch, weight, name, flags, NULL,
hostname, username, client);
s->ths_mmi = mm->mm_active;
@ -784,7 +795,7 @@ subscription_create_from_mux(mpegts_mux_t *mm, profile_t *pro,
LIST_INSERT_HEAD(&mm->mm_active->mmi_subs, s, ths_mmi_link);
/* Connect */
if (st)
if (prch->prch_st)
streaming_target_connect(&s->ths_mmi->mmi_streaming_pad, &s->ths_input);
/* Deliver a start message */
@ -1057,6 +1068,7 @@ void
subscription_dummy_join(const char *id, int first)
{
service_t *t = service_find_by_identifier(id);
profile_chain_t *prch;
streaming_target_t *st;
th_subscription_t *s;
@ -1073,9 +1085,12 @@ subscription_dummy_join(const char *id, int first)
return;
}
st = calloc(1, sizeof(streaming_target_t));
prch = calloc(1, sizeof(*prch));
prch->prch_id = t;
st = calloc(1, sizeof(*st));
streaming_target_init(st, dummy_callback, NULL, 0);
s = subscription_create_from_service(t, NULL, 1, "dummy", st, 0, NULL, NULL, "dummy");
prch->prch_st = st;
s = subscription_create_from_service(prch, 1, "dummy", 0, NULL, NULL, "dummy");
tvhlog(LOG_NOTICE, "subscription",
"%04X: Dummy join %s ok", shortid(s), id);

View file

@ -21,7 +21,7 @@
#include "service.h"
struct profile;
struct profile_chain;
extern struct th_subscription_list subscriptions;
@ -46,6 +46,9 @@ typedef struct th_subscription {
LIST_ENTRY(th_subscription) ths_global_link;
LIST_ENTRY(th_subscription) ths_remove_link;
struct profile_chain *ths_prch;
int ths_weight;
enum {
@ -127,11 +130,9 @@ void subscription_set_weight(th_subscription_t *s, unsigned int weight);
void subscription_reschedule(void);
th_subscription_t *
subscription_create_from_channel(struct channel *ch,
struct profile *pro,
subscription_create_from_channel(struct profile_chain *prch,
unsigned int weight,
const char *name,
streaming_target_t *st,
int flags,
const char *hostname,
const char *username,
@ -139,11 +140,9 @@ subscription_create_from_channel(struct channel *ch,
th_subscription_t *
subscription_create_from_service(struct service *t,
struct profile *pro,
subscription_create_from_service(struct profile_chain *prch,
unsigned int weight,
const char *name,
streaming_target_t *st,
int flags,
const char *hostname,
const char *username,
@ -152,20 +151,17 @@ subscription_create_from_service(struct service *t,
#if ENABLE_MPEGTS
struct mpegts_mux;
th_subscription_t *
subscription_create_from_mux(struct mpegts_mux *m,
struct profile *pro,
subscription_create_from_mux(struct profile_chain *prch,
unsigned int weight,
const char *name,
streaming_target_t *st,
int flags,
const char *hostname,
const char *username,
const char *client, int *err);
#endif
th_subscription_t *subscription_create(struct profile *pro,
th_subscription_t *subscription_create(struct profile_chain *prch,
int weight, const char *name,
streaming_target_t *st,
int flags, st_callback_t *cb,
const char *hostname,
const char *username,

View file

@ -732,12 +732,12 @@ http_stream_service(http_connection_t *hc, service_t *service, int weight)
else
qsize = 1500000;
if (!profile_chain_open(pro, &prch, service, NULL, 0, qsize)) {
profile_chain_init(&prch, pro, service);
if (!profile_chain_open(&prch, NULL, 0, qsize)) {
tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, 50);
s = subscription_create_from_service(service, pro, weight ?: 100, "HTTP",
prch.prch_st,
s = subscription_create_from_service(&prch, weight ?: 100, "HTTP",
prch.prch_flags | SUBSCRIPTION_STREAMING,
addrbuf,
hc->hc_username,
@ -786,12 +786,11 @@ http_stream_mux(http_connection_t *hc, mpegts_mux_t *mm, int weight)
else
qsize = 10000000;
if (!profile_chain_raw_open(&prch, qsize)) {
if (!profile_chain_raw_open(&prch, mm, qsize)) {
tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, 50);
s = subscription_create_from_mux(mm, NULL, weight ?: 10, "HTTP",
prch.prch_st,
s = subscription_create_from_mux(&prch, weight ?: 10, "HTTP",
prch.prch_flags |
SUBSCRIPTION_FULLMUX |
SUBSCRIPTION_STREAMING,
@ -846,12 +845,13 @@ http_stream_channel(http_connection_t *hc, channel_t *ch, int weight)
else
qsize = 1500000;
if (!profile_chain_open(pro, &prch, ch, NULL, 0, qsize)) {
profile_chain_init(&prch, pro, ch);
if (!profile_chain_open(&prch, NULL, 0, qsize)) {
tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, 50);
s = subscription_create_from_channel(ch, pro, weight ?: 100, "HTTP",
prch.prch_st, prch.prch_flags | SUBSCRIPTION_STREAMING,
s = subscription_create_from_channel(&prch, weight ?: 100, "HTTP",
prch.prch_flags | SUBSCRIPTION_STREAMING,
addrbuf, hc->hc_username,
http_arg_get(&hc->hc_args, "User-Agent"));