From 4f44db4f718dc60d2590cc57413c188e1d83536b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96man?= Date: Sat, 27 Sep 2008 13:24:00 +0000 Subject: [PATCH] Revive HTSP server. No streaming support yet but we are able to do these things: * Notify clients about channel add/update/delete * Notify clients about tag add/update/delete * Respond to EPG queries --- Makefile | 2 + channels.c | 39 ++- epg.c | 2 + htsp.c | 811 ++++++++++++++++++++++++++++++++++++----------------- htsp.h | 34 +-- main.c | 3 + 6 files changed, 610 insertions(+), 281 deletions(-) diff --git a/Makefile b/Makefile index be9503c3..5df684cc 100644 --- a/Makefile +++ b/Makefile @@ -7,6 +7,8 @@ SRCS += packet.c streaming.c VPATH += dvr SRCS += dvr_db.c dvr_rec.c dvr_autorec.c +SRCS += htsp.c + SRCS += channels.c subscriptions.c transports.c SRCS += psi.c parsers.c parser_h264.c tsdemux.c bitstream.c diff --git a/channels.c b/channels.c index e5240626..05f64d44 100644 --- a/channels.c +++ b/channels.c @@ -40,6 +40,7 @@ #include "dtable.h" #include "notify.h" #include "dvr/dvr.h" +#include "htsp.h" struct channel_list channels_not_xmltv_mapped; @@ -186,6 +187,8 @@ channel_create(const char *name) if(xc->xc_icon != NULL) channel_set_icon(ch, xc->xc_icon); } + + htsp_channel_add(ch); return ch; } @@ -365,6 +368,7 @@ channel_rename(channel_t *ch, const char *newname) } channel_save(ch); + htsp_channel_update(ch); return 0; } @@ -380,6 +384,8 @@ channel_delete(channel_t *ch) lock_assert(&global_lock); + htsp_channel_delete(ch); + while((ctm = LIST_FIRST(&ch->ch_ctms)) != NULL) channel_tag_mapping_destroy(ctm); @@ -462,6 +468,7 @@ channel_set_icon(channel_t *ch, const char *icon) free(ch->ch_icon); ch->ch_icon = strdup(icon); channel_save(ch); + htsp_channel_update(ch); } @@ -583,6 +590,9 @@ channel_tag_map(channel_t *ch, channel_tag_t *ct, int check) LIST_INSERT_HEAD(&ct->ct_ctms, ctm, ctm_tag_link); ctm->ctm_mark = 0; + + if(ct->ct_enabled && !ct->ct_internal) + htsp_tag_update(ct); } @@ -592,9 +602,13 @@ channel_tag_map(channel_t *ch, channel_tag_t *ct, int check) static void channel_tag_mapping_destroy(channel_tag_mapping_t *ctm) { + channel_tag_t *ct = ctm->ctm_tag; LIST_REMOVE(ctm, ctm_channel_link); LIST_REMOVE(ctm, ctm_tag_link); free(ctm); + + if(ct->ct_enabled && !ct->ct_internal) + htsp_tag_update(ct); } @@ -663,6 +677,9 @@ channel_tag_destroy(channel_tag_t *ct) channel_save(ch); } + if(ct->ct_enabled && !ct->ct_internal) + htsp_tag_delete(ct); + free(ct->ct_identifier); free(ct->ct_name); free(ct->ct_comment); @@ -737,6 +754,7 @@ channel_tag_record_update(void *opaque, const char *id, htsmsg_t *values, { channel_tag_t *ct; uint32_t u32; + int was_exposed, is_exposed; if((ct = channel_tag_find(id, maycreate)) == NULL) return NULL; @@ -744,13 +762,28 @@ channel_tag_record_update(void *opaque, const char *id, htsmsg_t *values, tvh_str_update(&ct->ct_name, htsmsg_get_str(values, "name")); tvh_str_update(&ct->ct_comment, htsmsg_get_str(values, "comment")); - if(!htsmsg_get_u32(values, "enabled", &u32)) + was_exposed = ct->ct_enabled && !ct->ct_internal; + + if(!htsmsg_get_u32(values, "enabled", &u32)) ct->ct_enabled = u32; - if(!htsmsg_get_u32(values, "internal", &u32)) + if(!htsmsg_get_u32(values, "internal", &u32)) ct->ct_internal = u32; - return channel_tag_record_build(ct); + is_exposed = ct->ct_enabled && !ct->ct_internal; + + /* We only export tags to HTSP if enabled == true and internal == false, + thus, it's not as simple as just sending updates here. + Depending on how the flags transition we add update or delete tags */ + + if(was_exposed == 0 && is_exposed == 1) + htsp_tag_add(ct); + else if(was_exposed == 1 && is_exposed == 1) + htsp_tag_update(ct); + else if(was_exposed == 1 && is_exposed == 0) + htsp_tag_delete(ct); + + return channel_tag_record_build(ct); } diff --git a/epg.c b/epg.c index b3cad17d..080618f8 100644 --- a/epg.c +++ b/epg.c @@ -27,6 +27,7 @@ #include "channels.h" #include "epg.h" #include "dvr/dvr.h" +#include "htsp.h" #define EPG_MAX_AGE 86400 @@ -56,6 +57,7 @@ epg_set_current(channel_t *ch, event_t *e) if(e != NULL) dvr_autorec_check(e); + htsp_event_update(ch, e); } diff --git a/htsp.c b/htsp.c index 495cd00f..3cd640e0 100644 --- a/htsp.c +++ b/htsp.c @@ -32,327 +32,632 @@ #include "tvhead.h" #include "channels.h" #include "subscriptions.h" -#include "dispatch.h" -#include "rpc.h" -#include "htsp.h" -#include "htsp_muxer.h" #include "tcp.h" -#include "epg.h" +#include "packet.h" #include "access.h" +#include "htsp.h" #include -static LIST_HEAD(, htsp) htsp_sessions; +extern const char *htsversion; -/* +LIST_HEAD(htsp_connection_list, htsp_connection); + +TAILQ_HEAD(htsp_msg_queue, htsp_msg); +TAILQ_HEAD(htsp_msg_q_queue, htsp_msg_q); + +static struct htsp_connection_list htsp_async_connections; + +/** * */ -int -htsp_send_msg(htsp_t *htsp, htsmsg_t *m, int media) +typedef struct htsp_msg { + TAILQ_ENTRY(htsp_msg) hm_link; + + htsmsg_t *hm_msg; + + th_pktref_t *hm_pktref; /* For keeping reference to packet. + hm_msg can contain messages that points + to packet payload so to avoid copy we + keep a reference here */ +} htsp_msg_t; + + +/** + * + */ +typedef struct htsp_msg_q { + struct htsp_msg_queue hmq_q; + + TAILQ_ENTRY(htsp_msg_q) hmq_link; + int hmq_length; +} htsp_msg_q_t; + + +/** + * + */ +typedef struct htsp_connection { + int htsp_fd; + struct sockaddr_in *htsp_peer; + char *htsp_name; + + /** + * Async mode + */ + int htsp_async_mode; + LIST_ENTRY(htsp_connection) htsp_async_link; + + /** + * Writer thread + */ + pthread_t htsp_writer_thread; + + int htsp_writer_run; + + struct htsp_msg_q_queue htsp_active_output_queues; + + pthread_mutex_t htsp_out_mutex; + pthread_cond_t htsp_out_cond; + + htsp_msg_q_t htsp_hmq_ctrl; + htsp_msg_q_t htsp_hmq_epg; + + /** + * + */ + struct th_subscription_list htsp_subscriptions; + +} htsp_connection_t; + + +/** + * + */ +static void +htsp_msg_destroy(htsp_msg_t *hm) { - tcp_session_t *tcp = &htsp->htsp_tcp_session; - htsbuf_queue_t *hq; - void *data; - size_t datalen; - int hiprio = !media; - int max, r = -1; + htsmsg_destroy(hm->hm_msg); + if(hm->hm_pktref != NULL) { + pkt_ref_dec(hm->hm_pktref->pr_pkt); + free(hm->hm_pktref); + } + free(hm); +} - hq = &tcp->tcp_q[hiprio]; - - max = hq->hq_maxsize - hq->hq_size; /* max size we are able to enqueue */ - - if(htsmsg_binary_serialize(m, &data, &datalen, max) == 0) - r = tcp_send_msg(tcp, hiprio, data, datalen); - htsmsg_destroy(m); +/** + * + */ +static void +htsp_init_queue(htsp_msg_q_t *hmq) +{ + TAILQ_INIT(&hmq->hmq_q); + hmq->hmq_length = 0; +} + + +/** + * + */ +#if 0 +static void +htsp_flush_queue(htsp_msg_q_t *hmq) +{ + abort(); +} +#endif + +/** + * + */ +static void +htsp_send(htsp_connection_t *htsp, htsmsg_t *m, th_pktref_t *pkt, + htsp_msg_q_t *hmq) +{ + htsp_msg_t *hm = malloc(sizeof(htsp_msg_t)); + + hm->hm_msg = m; + hm->hm_pktref = pkt; + + pthread_mutex_lock(&htsp->htsp_out_mutex); + + TAILQ_INSERT_TAIL(&hmq->hmq_q, hm, hm_link); + + if(hmq->hmq_length == 0) { + /* Activate queue */ + TAILQ_INSERT_TAIL(&htsp->htsp_active_output_queues, hmq, hmq_link); + } + + hmq->hmq_length++; + pthread_cond_signal(&htsp->htsp_out_cond); + pthread_mutex_unlock(&htsp->htsp_out_mutex); +} + +/** + * + */ +static void +htsp_send_message(htsp_connection_t *htsp, htsmsg_t *m, htsp_msg_q_t *hmq) +{ + htsp_send(htsp, m, NULL, hmq ?: &htsp->htsp_hmq_ctrl); +} + +#if 0 +/** + * + */ +static void +htsp_send_message(htsp_connection_t *htsp, htsmsg_t *m, int queue) +{ + htsp_msg_q_t *hm = malloc(sizeof(htsp_msg_q_t)); + + hm->hm_msg = m; + hm->hm_pktref = NULL; + + +} +#endif + +/** + * + */ +static htsmsg_t * +htsp_build_channel(channel_t *ch, const char *method) +{ + htsmsg_t *out = htsmsg_create(); + + htsmsg_add_u32(out, "channelId", ch->ch_id); + + htsmsg_add_str(out, "channelName", ch->ch_name); + if(ch->ch_icon != NULL) + htsmsg_add_str(out, "channelIcon", ch->ch_icon); + + htsmsg_add_u32(out, "eventId", + ch->ch_epg_current != NULL ? ch->ch_epg_current->e_id : 0); + + htsmsg_add_str(out, "method", method); + return out; +} + + +/** + * + */ +static htsmsg_t * +htsp_build_tag(channel_tag_t *ct, const char *method) +{ + channel_tag_mapping_t *ctm; + htsmsg_t *out = htsmsg_create(); + htsmsg_t *members = htsmsg_create_array(); + + htsmsg_add_str(out, "tagId", ct->ct_identifier); + + htsmsg_add_str(out, "tagName", ct->ct_name); + + LIST_FOREACH(ctm, &ct->ct_ctms, ctm_tag_link) + htsmsg_add_u32(members, NULL, ctm->ctm_channel->ch_id); + + htsmsg_add_msg(out, "members", members); + htsmsg_add_str(out, "method", method); + return out; +} + + +/** + * Simple function to respond with an error + */ +static htsmsg_t * +htsp_error(const char *err) +{ + htsmsg_t *r = htsmsg_create(); + htsmsg_add_str(r, "_error", err); return r; } /** - * build a channel message - */ -static htsmsg_t * -htsp_build_channel_msg(channel_t *ch, const char *method) -{ - htsmsg_t *msg = htsmsg_create(); - event_t *e; - - htsmsg_add_str(msg, "method", method); - htsmsg_add_str(msg, "channelName", ch->ch_name); - htsmsg_add_u32(msg, "channelId", ch->ch_id); - if(ch->ch_icon != NULL) - htsmsg_add_str(msg, "channelIcon", ch->ch_icon); - - if((e = epg_event_get_current(ch)) != NULL) - htsmsg_add_u32(msg, "currentEvent", e->e_tag); - - return msg; -} - - - - -/** - * channels_list + * */ static void -htsp_send_all_channels(htsp_t *htsp) +htsp_reply(htsp_connection_t *htsp, htsmsg_t *in, htsmsg_t *out) { - htsmsg_t *msg; - channel_t *ch; + uint32_t seq; - RB_FOREACH(ch, &channel_name_tree, ch_name_link) { - if(LIST_FIRST(&ch->ch_transports) == NULL) - continue; + if(!htsmsg_get_u32(in, "seq", &seq)) + htsmsg_add_u32(out, "seq", seq); - msg = htsp_build_channel_msg(ch, "channelAdd"); - htsp_send_msg(htsp, msg, 0); - } -} - -/** - * - */ -void -htsp_async_channel_update(channel_t *ch) -{ - htsp_t *htsp; - htsmsg_t *msg; - - LIST_FOREACH(htsp, &htsp_sessions, htsp_global_link) { - if(!htsp->htsp_rpc.rs_is_async) - continue; - - msg = htsp_build_channel_msg(ch, "channelUpdate"); - htsp_send_msg(htsp, msg, 0); - } + htsp_send_message(htsp, out, NULL); } /** - * Subscribe to channel + * Switch the HTSP connection into async mode */ static htsmsg_t * -htsp_subscribe(rpc_session_t *ses, htsmsg_t *in, void *opaque) +htsp_method_async(htsp_connection_t *htsp, htsmsg_t *in) { - htsp_t *htsp = opaque; channel_t *ch; - th_subscription_t *s; - htsmsg_t *r; - uint32_t u32; + channel_tag_t *ct; - if(htsmsg_get_u32(in, "channelId", &u32)) - return rpc_error(ses, "missing argument: channelId"); + /* First, just OK the async request */ + htsp_reply(htsp, in, htsmsg_create()); - if((ch = channel_find_by_identifier(u32)) == NULL) - return rpc_error(ses, "Channel not found"); + if(htsp->htsp_async_mode) + return NULL; /* already in async mode */ + + htsp->htsp_async_mode = 1; + + /* Send all channels */ + RB_FOREACH(ch, &channel_name_tree, ch_name_link) + htsp_send_message(htsp, htsp_build_channel(ch, "channelAdd"), NULL); + + /* Send all enabled and external tags */ + TAILQ_FOREACH(ct, &channel_tags, ct_link) + if(ct->ct_enabled && !ct->ct_internal) + htsp_send_message(htsp, htsp_build_tag(ct, "tagAdd"), NULL); - LIST_FOREACH(s, &htsp->htsp_subscriptions, ths_subscriber_link) { - if(s->ths_channel == ch) { - subscription_set_weight(s, 200); - return rpc_ok(ses); - } - } + /* Insert in list so it will get all updates */ + LIST_INSERT_HEAD(&htsp_async_connections, htsp, htsp_async_link); - - r = htsmsg_create(); - htsp_send_msg(htsp, r, 0); - - htsp_muxer_subscribe(htsp, ch, 200); - return NULL; } /** - * Unsubscribe from channel + * Get information about the given event */ static htsmsg_t * -htsp_unsubscribe(rpc_session_t *ses, htsmsg_t *in, void *opaque) +htsp_method_getEvent(htsp_connection_t *htsp, htsmsg_t *in) { - htsp_t *htsp = opaque; - uint32_t id; + htsmsg_t *out; + uint32_t eventid; + event_t *e, *n; - if(htsmsg_get_u32(in, "channelTag", &id)) - return rpc_error(ses, "missing argument: channelTag"); + if(htsmsg_get_u32(in, "eventId", &eventid)) + return htsp_error("Missing argument 'eventId'"); + + if((e = epg_event_find_by_id(eventid)) == NULL) + return htsp_error("Event does not exist"); + + out = htsmsg_create(); + + htsmsg_add_u32(out, "start", e->e_start); + htsmsg_add_u32(out, "stop", e->e_stop); + htsmsg_add_str(out, "title", e->e_title); + htsmsg_add_str(out, "description", e->e_desc); + + n = RB_NEXT(e, e_channel_link); + if(n != NULL) + htsmsg_add_u32(out, "nextEventId", n->e_id); + + return out; +} + + + + + +/** + * HTSP methods + */ +struct { + const char *name; + htsmsg_t *(*fn)(htsp_connection_t *htsp, htsmsg_t *in); + int privmask; +} htsp_methods[] = { + { "async", htsp_method_async, ACCESS_STREAMING}, + { "getEvent", htsp_method_getEvent, ACCESS_STREAMING}, + +}; + +#define NUM_METHODS (sizeof(htsp_methods) / sizeof(htsp_methods[0])) + + + +/** + * timeout is in ms, 0 means infinite timeout + */ +static int +htsp_read_message(htsp_connection_t *htsp, htsmsg_t **mp, int timeout) +{ + int v; + size_t len; + uint8_t data[4]; + void *buf; + + v = timeout ? tcp_read_timeout(htsp->htsp_fd, data, 4, timeout) : + tcp_read(htsp->htsp_fd, data, 4); + + if(v != 0) + return v; + + len = (data[0] << 24) | (data[1] << 16) | (data[2] << 8) | data[3]; + if(len > 1024 * 1024) + return EMSGSIZE; + if((buf = malloc(len)) == NULL) + return ENOMEM; + + v = timeout ? tcp_read_timeout(htsp->htsp_fd, buf, len, timeout) : + tcp_read(htsp->htsp_fd, buf, len); - htsp_muxer_unsubscribe(htsp, id); + if(v != 0) { + free(buf); + return v; + } - return rpc_ok(ses); + /* buf will be tied to the message. + * NB: If the message can not be deserialized buf will be free'd by the + * function. + */ + *mp = htsmsg_binary_deserialize(buf, len, buf); + if(*mp == NULL) + return EBADMSG; + + return 0; +} + +/** + * + */ +static int +htsp_read_loop(htsp_connection_t *htsp) +{ + htsmsg_t *m, *reply; + int r, i; + const char *method, *username, *password; + + while(1) { + if((r = htsp_read_message(htsp, &m, 0)) != 0) + return r; + + username = htsmsg_get_str(m, "username"); + password = htsmsg_get_str(m, "password"); + + pthread_mutex_lock(&global_lock); + + if((method = htsmsg_get_str(m, "method")) != NULL) { + for(i = 0; i < NUM_METHODS; i++) { + if(!strcmp(method, htsp_methods[i].name)) { + + if(access_verify(username, password, + (struct sockaddr *)htsp->htsp_peer, + htsp_methods[i].privmask)) { + + reply = htsmsg_create(); + + htsmsg_add_u32(reply, "_noaccess", 1); + } else { + reply = htsp_methods[i].fn(htsp, m); + } + break; + } + } + + if(i == NUM_METHODS) { + reply = htsp_error("Method not found"); + } + + } else { + reply = htsp_error("No 'method' argument"); + } + + pthread_mutex_unlock(&global_lock); + + if(reply != NULL) /* Methods can do all the replying inline */ + htsp_reply(htsp, m, reply); + + htsmsg_destroy(m); + } } /** - * HTSP specific methods + * */ -static rpc_cmd_t htsp_rpc[] = { - { "subscribe", htsp_subscribe, ACCESS_STREAMING }, - { "unsubscribe", htsp_unsubscribe, ACCESS_STREAMING }, - { NULL, NULL, 0 }, -}; +static void * +htsp_write_scheduler(void *aux) +{ + htsp_connection_t *htsp = aux; + int r; + htsp_msg_q_t *hmq; + htsp_msg_t *hm; + void *dptr; + size_t dlen; + + pthread_mutex_lock(&htsp->htsp_out_mutex); + + while(1) { + + if((hmq = TAILQ_FIRST(&htsp->htsp_active_output_queues)) == NULL) { + /* No active queues at all */ + if(!htsp->htsp_writer_run) + break; /* Should not run anymore, bail out */ + + /* Nothing to be done, go to sleep */ + pthread_cond_wait(&htsp->htsp_out_cond, &htsp->htsp_out_mutex); + continue; + } + + hm = TAILQ_FIRST(&hmq->hmq_q); + TAILQ_REMOVE(&hmq->hmq_q, hm, hm_link); + hmq->hmq_length--; + + TAILQ_REMOVE(&htsp->htsp_active_output_queues, hmq, hmq_link); + if(hmq->hmq_length) { + /* Still messages to be sent, put back at the end of active queues */ + TAILQ_INSERT_TAIL(&htsp->htsp_active_output_queues, hmq, hmq_link); + } + + pthread_mutex_unlock(&htsp->htsp_out_mutex); + + r = htsmsg_binary_serialize(hm->hm_msg, &dptr, &dlen, INT32_MAX); + + htsp_msg_destroy(hm); + + write(htsp->htsp_fd, dptr, dlen); + free(dptr); + pthread_mutex_lock(&htsp->htsp_out_mutex); + } + + pthread_mutex_unlock(&htsp->htsp_out_mutex); + return NULL; +} -/* +/** * */ static void -htsp_input(htsp_t *htsp, const void *buf, int len) +htsp_serve(int fd, void *opaque, struct sockaddr_in *source) { - htsmsg_t *in, *out; - int i, was_async; - const uint8_t *v = buf; + htsp_connection_t htsp; + char buf[30]; - printf("Got %d bytes\n", len); - for(i =0 ; i < len; i++) - printf("%02x.", v[i]); - printf("\n"); + snprintf(buf, sizeof(buf), "%s", inet_ntoa(source->sin_addr)); - if((in = htsmsg_binary_deserialize(buf, len, NULL)) == NULL) { - printf("deserialize failed\n"); - return; - } + memset(&htsp, 0, sizeof(htsp_connection_t)); - was_async = htsp->htsp_rpc.rs_is_async; + TAILQ_INIT(&htsp.htsp_active_output_queues); - printf("INPUT:\n"); - htsmsg_print(in); + htsp_init_queue(&htsp.htsp_hmq_ctrl); + htsp_init_queue(&htsp.htsp_hmq_epg); - out = rpc_dispatch(&htsp->htsp_rpc, in, htsp_rpc, htsp, - (struct sockaddr *)&htsp->htsp_tcp_session.tcp_peer_addr); + htsp.htsp_name = strdup(buf); - htsmsg_destroy(in); + htsp.htsp_fd = fd; + htsp.htsp_peer = source; + htsp.htsp_writer_run = 1; - if(out != NULL) { - printf("OUTPUT:\n"); - htsmsg_print(out); - htsp_send_msg(htsp, out, 0); - } + pthread_create(&htsp.htsp_writer_thread, NULL, htsp_write_scheduler, &htsp); - if(!was_async && htsp->htsp_rpc.rs_is_async) { - printf("Session went into async mode\n"); - /* Session went into async state */ - htsp_send_all_channels(htsp); - } + htsp_read_loop(&htsp); + + pthread_mutex_lock(&global_lock); + if(htsp.htsp_async_mode) + LIST_REMOVE(&htsp, htsp_async_link); + pthread_mutex_unlock(&global_lock); + + free(htsp.htsp_name); + + pthread_mutex_lock(&htsp.htsp_out_mutex); + htsp.htsp_writer_run = 0; + pthread_cond_signal(&htsp.htsp_out_cond); + pthread_mutex_unlock(&htsp.htsp_out_mutex); + + pthread_join(htsp.htsp_writer_thread, NULL); + close(fd); } - - -/* - * data available on socket - */ -static void -htsp_data_input(htsp_t *htsp) -{ - int r, l; - tcp_session_t *tcp = &htsp->htsp_tcp_session; - - if(htsp->htsp_bufptr < 4) { - r = read(tcp->tcp_fd, htsp->htsp_buf + htsp->htsp_bufptr, - 4 - htsp->htsp_bufptr); - if(r < 1) { - tcp_disconnect(tcp, r == 0 ? ECONNRESET : errno); - return; - } - - htsp->htsp_bufptr += r; - if(htsp->htsp_bufptr < 4) - return; - - htsp->htsp_msglen = (htsp->htsp_buf[0] << 24) + (htsp->htsp_buf[1] << 16) + - (htsp->htsp_buf[2] << 8) + htsp->htsp_buf[3] + 4; - - if(htsp->htsp_msglen < 12 || htsp->htsp_msglen > 16 * 1024 * 1024) { - tcp_disconnect(tcp, EBADMSG); - return; - } - - if(htsp->htsp_bufsize < htsp->htsp_msglen) { - htsp->htsp_bufsize = htsp->htsp_msglen; - free(htsp->htsp_buf); - htsp->htsp_buf = malloc(htsp->htsp_bufsize); - } - } - - l = htsp->htsp_msglen - htsp->htsp_bufptr; - r = read(tcp->tcp_fd, htsp->htsp_buf + htsp->htsp_bufptr, l); - if(r < 1) { - tcp_disconnect(tcp, r == 0 ? ECONNRESET : errno); - return; - } - - htsp->htsp_bufptr += r; - - if(htsp->htsp_bufptr == htsp->htsp_msglen) { - htsp_input(htsp, htsp->htsp_buf + 4, htsp->htsp_msglen - 4); - htsp->htsp_bufptr = 0; - htsp->htsp_msglen = 0; - } -} - - - -/* - * - */ -static void -htsp_disconnect(htsp_t *htsp) -{ - htsp_muxer_cleanup(htsp); - - LIST_REMOVE(htsp, htsp_global_link); - - free(htsp->htsp_buf); - rpc_deinit(&htsp->htsp_rpc); -} - - -/* - * - */ -static void -htsp_connect(htsp_t *htsp) -{ - rpc_init(&htsp->htsp_rpc, "htsp"); - - htsp->htsp_bufsize = 1000; - htsp->htsp_buf = malloc(htsp->htsp_bufsize); - - LIST_INSERT_HEAD(&htsp_sessions, htsp, htsp_global_link); -} - -/* - * - */ -static void -htsp_tcp_callback(tcpevent_t event, void *tcpsession) -{ - htsp_t *htsp = tcpsession; - - switch(event) { - case TCP_CONNECT: - htsp_connect(htsp); - break; - - case TCP_DISCONNECT: - htsp->htsp_zombie = 1; - htsp_disconnect(htsp); - break; - - case TCP_INPUT: - htsp_data_input(htsp); - break; - } -} - - -/* +/** * Fire up HTSP server */ - void -htsp_start(int port) +htsp_init(void) { - tcp_create_server(port, sizeof(htsp_t), "htsp", htsp_tcp_callback); + tcp_server_create(9982, htsp_serve, NULL); +} + +/** + * + */ +static void +htsp_async_send(htsmsg_t *m) +{ + htsp_connection_t *htsp; + + LIST_FOREACH(htsp, &htsp_async_connections, htsp_async_link) + htsp_send_message(htsp, htsmsg_copy(m), NULL); + htsmsg_destroy(m); +} + + +/** + * EPG subsystem calls this function when the current event + * changes for a channel, e may be NULL if there is no current event. + * + * global_lock is held + */ +void +htsp_event_update(channel_t *ch, event_t *e) +{ + htsmsg_t *m; + time_t now; + + time(&now); + m = htsmsg_create(); + htsmsg_add_str(m, "method", "channelUpdate"); + htsmsg_add_u32(m, "channelId", ch->ch_id); + + if(e == NULL) + e = epg_event_find_by_time(ch, now); + + htsmsg_add_u32(m, "eventId", e ? e->e_id : 0); + htsp_async_send(m); +} + +/** + * Called from channel.c when a new channel is created + */ +void +htsp_channel_add(channel_t *ch) +{ + htsp_async_send(htsp_build_channel(ch, "channelAdd")); +} + + +/** + * Called from channel.c when a channel is updated + */ +void +htsp_channel_update(channel_t *ch) +{ + htsp_async_send(htsp_build_channel(ch, "channelUpdate")); +} + + +/** + * Called from channel.c when a channel is deleted + */ +void +htsp_channel_delete(channel_t *ch) +{ + htsmsg_t *m = htsmsg_create(); + htsmsg_add_u32(m, "channelId", ch->ch_id); + htsmsg_add_str(m, "method", "channelDelete"); + htsp_async_send(m); +} + + +/** + * Called from channel.c when a tag is exported + */ +void +htsp_tag_add(channel_tag_t *ct) +{ + htsp_async_send(htsp_build_tag(ct, "tagAdd")); +} + + +/** + * Called from channel.c when an exported tag is changed + */ +void +htsp_tag_update(channel_tag_t *ct) +{ + htsp_async_send(htsp_build_tag(ct, "tagUpdate")); +} + + +/** + * Called from channel.c when an exported tag is deleted + */ +void +htsp_tag_delete(channel_tag_t *ct) +{ + htsmsg_t *m = htsmsg_create(); + htsmsg_add_str(m, "tagId", ct->ct_identifier); + htsmsg_add_str(m, "method", "tagDelete"); + htsp_async_send(m); } diff --git a/htsp.h b/htsp.h index 03244afa..5856948d 100644 --- a/htsp.h +++ b/htsp.h @@ -19,34 +19,18 @@ #ifndef HTSP_H_ #define HTSP_H_ -#include +#include "epg.h" -#include "rpc.h" -#include "tcp.h" +void htsp_init(void); -typedef struct htsp { - tcp_session_t htsp_tcp_session; /* Must be first */ +void htsp_event_update(channel_t *ch, event_t *e); - LIST_ENTRY(htsp) htsp_global_link; +void htsp_channel_add(channel_t *ch); +void htsp_channel_update(channel_t *ch); +void htsp_channel_delete(channel_t *ch); - int htsp_bufsize; - int htsp_bufptr; - int htsp_msglen; - uint8_t *htsp_buf; - - rpc_session_t htsp_rpc; - - int htsp_async_init_sent; - int htsp_zombie; - - struct th_subscription_list htsp_subscriptions; - -} htsp_t; - -void htsp_start(int port); - -int htsp_send_msg(htsp_t *htsp, htsmsg_t *m, int media); - -void htsp_async_channel_update(channel_t *ch); +void htsp_tag_add(channel_tag_t *ct); +void htsp_tag_update(channel_tag_t *ct); +void htsp_tag_delete(channel_tag_t *ct); #endif /* HTSP_H_ */ diff --git a/main.c b/main.c index d6279d52..b9ace746 100644 --- a/main.c +++ b/main.c @@ -48,6 +48,7 @@ #include "serviceprobe.h" #include "cwc.h" #include "dvr/dvr.h" +#include "htsp.h" #include #include @@ -287,6 +288,8 @@ main(int argc, char **argv) dvr_init(); + htsp_init(); + pthread_mutex_unlock(&global_lock);