diff --git a/cwc.c b/cwc.c index 12916051..3cffe88a 100644 --- a/cwc.c +++ b/cwc.c @@ -41,9 +41,12 @@ #include "dtable.h" +/** + * + */ static int cwc_tally; -#define CWC_KEEPALIVE_INTERVAL 600 +#define CWC_KEEPALIVE_INTERVAL 30 #define CWS_NETMSGSIZE 240 #define CWS_FIRSTCMDNO 0xe0 @@ -71,12 +74,18 @@ typedef enum { MSG_KEEPALIVE = CWS_FIRSTCMDNO + 0x1d } net_msg_type_t; -struct cwc_queue cwcs; -extern char *cwc_krypt(const char *key, const char *salt); +/** + * + */ +TAILQ_HEAD(cwc_queue, cwc); +LIST_HEAD(cwc_transport_list, cwc_transport); +static struct cwc_queue cwcs; -static LIST_HEAD(, cwc_transport) cwc_pending_requests; +/** + * + */ typedef struct cwc_transport { th_descrambler_t ct_head; @@ -84,25 +93,39 @@ typedef struct cwc_transport { struct cwc *ct_cwc; - LIST_ENTRY(cwc_transport) ct_cwc_link; /* Always linked */ + LIST_ENTRY(cwc_transport) ct_link; - LIST_ENTRY(cwc_transport) ct_link; /* linked if we are waiting - on a reply from server */ - int ct_pending; + /** + * Sequence number generated on write (based on cwc_seq), + * used to pair reply message (i.e when i CT_STATE_WAIT_REPLY) + * with cwc_transport + */ + uint16_t ct_seq; + + + /** + * Current ECM + */ + uint8_t ct_ecm[256]; + int ct_ecmsize; + + int ct_ecm_reply_pending; /* Waiting for a ECM reply */ + + /** + * Status of the key(s) in ct_keys + */ enum { CT_UNKNOWN, CT_RESOLVED, CT_FORBIDDEN } ct_keystate; - uint16_t ct_seq; - - uint8_t ct_ecm[256]; - int ct_ecmsize; - void *ct_keys; + /** + * CSA + */ int ct_cluster_size; uint8_t *ct_tsbcluster; int ct_fill; @@ -111,6 +134,63 @@ typedef struct cwc_transport { + + + +/** + * + */ +typedef struct cwc { + int cwc_fd; + + pthread_mutex_t cwc_send_mutex; + + pthread_cond_t cwc_cond; + + pthread_cond_t cwc_writer_cond; /* Used to wakeup writer */ + int cwc_writer_running; + + + TAILQ_ENTRY(cwc) cwc_link; /* Linkage protected via global_lock */ + + struct cwc_transport_list cwc_transports; + + uint16_t cwc_caid; + + uint16_t cwc_seq; + + uint8_t cwc_key[16]; + + uint8_t cwc_buf[256]; + int cwc_bufptr; + + /* From configuration */ + + uint8_t cwc_confedkey[14]; + char *cwc_username; + char *cwc_password; + char *cwc_password_salted; /* salted version */ + char *cwc_comment; + char *cwc_hostname; + int cwc_port; + char *cwc_id; + + const char *cwc_errtxt; + + int cwc_enabled; + int cwc_running; + int cwc_reconfigure; +} cwc_t; + + +/** + * + */ + +static void cwc_transport_destroy(th_descrambler_t *td); +extern char *cwc_krypt(const char *key, const char *salt); + + /** * */ @@ -269,6 +349,8 @@ cwc_send_msg(cwc_t *cwc, const uint8_t *msg, size_t len, int sid) { uint8_t *buf = malloc(CWS_NETMSGSIZE); + pthread_mutex_lock(&cwc->cwc_send_mutex); + memset(buf, 0, 12); memcpy(buf + 12, msg, len); @@ -283,6 +365,7 @@ cwc_send_msg(cwc_t *cwc, const uint8_t *msg, size_t len, int sid) if((len = des_encrypt(buf, len, cwc->cwc_key)) < 0) { free(buf); + pthread_mutex_unlock(&cwc->cwc_send_mutex); return -1; } @@ -291,6 +374,7 @@ cwc_send_msg(cwc_t *cwc, const uint8_t *msg, size_t len, int sid) write(cwc->cwc_fd, buf, len); free(buf); + pthread_mutex_unlock(&cwc->cwc_send_mutex); return cwc->cwc_seq; } @@ -316,11 +400,9 @@ cwc_send_data_req(cwc_t *cwc) /** * Send keep alive */ -#if 0 static void -cwc_send_ka(void *aux, int64_t now) +cwc_send_ka(cwc_t *cwc) { - cwc_t *cwc = aux; uint8_t buf[CWS_NETMSGSIZE]; buf[0] = MSG_KEEPALIVE; @@ -329,7 +411,6 @@ cwc_send_ka(void *aux, int64_t now) cwc_send_msg(cwc, buf, 3, 0); } -#endif /** @@ -389,40 +470,12 @@ cwc_send_login(cwc_t *cwc) cwc_send_msg(cwc, buf, ul + pl + 3, 0); } -#if 0 /** - * Handle reply to login + * Handle running reply + * global_lock is held */ static int -cwc_dispatch_login_reply(cwc_t *cwc, uint8_t msgtype, uint8_t *msg, int len) -{ - switch(msgtype) { - case MSG_CLIENT_2_SERVER_LOGIN_ACK: - tvhlog(LOG_INFO, "cwc", "%s: Login ok", - cwc->cwc_tcp_session.tcp_hostname); - des_make_session_key(cwc); - cwc_send_data_req(cwc); - return 0; - - case MSG_CLIENT_2_SERVER_LOGIN_NAK: - tvhlog(LOG_INFO, "cwc", "%s: Authentication denied", - cwc->cwc_tcp_session.tcp_hostname); - return EACCES; - - default: - tvhlog(LOG_INFO, "cwc", "%s: Invalid response (msgcode = %d) during login", - cwc->cwc_tcp_session.tcp_hostname, msgtype); - - return EBADMSG; - } -} - - -/** - * Handle reply to login - */ -static int -cwc_dispatch_running_reply(cwc_t *cwc, uint8_t msgtype, uint8_t *msg, int len) +cwc_running_reply(cwc_t *cwc, uint8_t msgtype, uint8_t *msg, int len) { cwc_transport_t *ct; uint16_t seq = (msg[2] << 8) | msg[3]; @@ -434,8 +487,8 @@ cwc_dispatch_running_reply(cwc_t *cwc, uint8_t msgtype, uint8_t *msg, int len) switch(msgtype) { case 0x80: case 0x81: - LIST_FOREACH(ct, &cwc_pending_requests, ct_link) { - if(ct->ct_seq == seq) + LIST_FOREACH(ct, &cwc->cwc_transports, ct_link) { + if(ct->ct_seq == seq && ct->ct_ecm_reply_pending) break; } @@ -443,16 +496,14 @@ cwc_dispatch_running_reply(cwc_t *cwc, uint8_t msgtype, uint8_t *msg, int len) return 0; t = ct->ct_transport; - - LIST_REMOVE(ct, ct_link); - ct->ct_pending = 0; + ct->ct_ecm_reply_pending = 0; if(len < 19) { if(ct->ct_keystate != CT_FORBIDDEN) { tvhlog(LOG_ERR, "cwc", - "Can not descramble \"%s\" for service \"%s\", access denied", - t->tht_identifier, t->tht_svcname); + "Can not descramble service \"%s\", access denied", + t->tht_svcname); ct->ct_keystate = CT_FORBIDDEN; } @@ -461,11 +512,11 @@ cwc_dispatch_running_reply(cwc_t *cwc, uint8_t msgtype, uint8_t *msg, int len) if(ct->ct_keystate != CT_RESOLVED) tvhlog(LOG_INFO, "cwc", - "Obtained key for \"%s\" for service \"%s\"", - t->tht_identifier, t->tht_svcname); - + "Obtained key for for service \"%s\"",t->tht_svcname); ct->ct_keystate = CT_RESOLVED; + pthread_mutex_lock(&t->tht_stream_mutex); set_control_words(ct->ct_keys, msg + 3, msg + 3 + 8); + pthread_mutex_unlock(&t->tht_stream_mutex); break; } return 0; @@ -475,97 +526,41 @@ cwc_dispatch_running_reply(cwc_t *cwc, uint8_t msgtype, uint8_t *msg, int len) /** * */ -static void -cwc_data_input(cwc_t *cwc) +static int +cwc_must_break(cwc_t *cwc) { - int msglen, r; - tcp_session_t *ses = &cwc->cwc_tcp_session; - int fd = ses->tcp_fd; - - if(cwc->cwc_state == CWC_STATE_WAIT_LOGIN_KEY) { - - r = read(fd, cwc->cwc_buf + cwc->cwc_bufptr, 14 - cwc->cwc_bufptr); - if(r < 1) { - tcp_disconnect(ses, r == 0 ? ECONNRESET : errno); - return; - } - - cwc->cwc_bufptr += r; - if(cwc->cwc_bufptr < 14) - return; - - des_make_login_key(cwc, cwc->cwc_buf); - cwc_send_login(cwc); - cwc->cwc_bufptr = 0; - - } else { - - if(cwc->cwc_bufptr < 2) { - msglen = 2; - } else { - msglen = ((cwc->cwc_buf[0] << 8) | cwc->cwc_buf[1]) + 2; - if(msglen >= CWS_NETMSGSIZE) { - tcp_disconnect(ses, EMSGSIZE); - return; - } - } - - r = read(fd, cwc->cwc_buf + cwc->cwc_bufptr, msglen - cwc->cwc_bufptr); - if(r < 1) { - tcp_disconnect(ses, r == 0 ? ECONNRESET : errno); - return; - } - - cwc->cwc_bufptr += r; - - if(msglen > 2 && cwc->cwc_bufptr == msglen) { - if((msglen = des_decrypt(cwc->cwc_buf, msglen, cwc->cwc_key)) < 15) { - tcp_disconnect(ses, EILSEQ); - return; - } - cwc->cwc_bufptr = 0; - - switch(cwc->cwc_state) { - case CWC_STATE_WAIT_LOGIN_ACK: - r = cwc_dispatch_login_reply(cwc, cwc->cwc_buf[12], - cwc->cwc_buf, msglen); - break; - - case CWC_STATE_WAIT_CARD_DATA: - r = cwc_dispatch_card_data_reply(cwc, cwc->cwc_buf[12], - cwc->cwc_buf, msglen); - break; - - case CWC_STATE_RUNNING: - r = cwc_dispatch_running_reply(cwc, cwc->cwc_buf[12], - cwc->cwc_buf, msglen); - break; - - default: - r = EBADMSG; - break; - } - - if(r != 0) { - tcp_disconnect(ses, r); - return; - } - } - } + return !cwc->cwc_running || !cwc->cwc_enabled || cwc->cwc_reconfigure; +} + +/** + * + */ +static int +cwc_read(cwc_t *cwc, void *buf, size_t len, int timeout) +{ + int r; + + pthread_mutex_unlock(&global_lock); + r = tcp_read_timeout(cwc->cwc_fd, buf, len, timeout); + pthread_mutex_lock(&global_lock); + + if(cwc_must_break(cwc)) + return ECONNABORTED; + + return r; } -#endif /** * */ static int -cwc_read_message(cwc_t *cwc, const char *state) +cwc_read_message(cwc_t *cwc, const char *state, int timeout) { char buf[2]; int msglen, r; - if((r = tcp_read(cwc->cwc_fd, buf, 2))) { + if((r = cwc_read(cwc, buf, 2, timeout))) { tvhlog(LOG_INFO, "cwc", "%s: %s: Read error (header): %s", cwc->cwc_hostname, state, strerror(r)); return -1; @@ -578,20 +573,51 @@ cwc_read_message(cwc_t *cwc, const char *state) return -1; } - if((r = tcp_read(cwc->cwc_fd, cwc->cwc_buf + 2, msglen))) { + /* We expect the rest of the message to arrive fairly quick, + so just wait 1 second here */ + + if((r = cwc_read(cwc, cwc->cwc_buf + 2, msglen, 1000))) { tvhlog(LOG_INFO, "cwc", "%s: %s: Read error: %s", cwc->cwc_hostname, state, strerror(r)); return -1; } if((msglen = des_decrypt(cwc->cwc_buf, msglen + 2, cwc->cwc_key)) < 15) { - tvhlog(LOG_INFO, "cwc", "%s: %s: Decrypt failed", state, cwc->cwc_hostname); + tvhlog(LOG_INFO, "cwc", "%s: %s: Decrypt failed", + state, cwc->cwc_hostname); return -1; } - return msglen; } +/** + * + */ +static void * +cwc_writer_thread(void *aux) +{ + cwc_t *cwc = aux; + struct timespec ts; + int r; + + pthread_mutex_lock(&global_lock); + + while(cwc->cwc_writer_running) { + + /* If nothing is to be sent in CWC_KEEPALIVE_INTERVAL seconds we + need to send a keepalive */ + ts.tv_sec = time(NULL) + CWC_KEEPALIVE_INTERVAL; + ts.tv_nsec = 0; + + r = pthread_cond_timedwait(&cwc->cwc_writer_cond, &global_lock, &ts); + cwc_send_ka(cwc); + } + + pthread_mutex_unlock(&global_lock); + return NULL; +} + + /** * @@ -600,11 +626,12 @@ static void cwc_session(cwc_t *cwc) { int r; - + pthread_t writer_thread_id; + /** * Get login key */ - if((r = tcp_read(cwc->cwc_fd, cwc->cwc_buf, 14))) { + if((r = cwc_read(cwc, cwc->cwc_buf, 14, 5000))) { tvhlog(LOG_INFO, "cwc", "%s: No login key received: %s", cwc->cwc_hostname, strerror(r)); return; @@ -617,10 +644,7 @@ cwc_session(cwc_t *cwc) */ cwc_send_login(cwc); - if(cwc_read_message(cwc, "Wait login response") < 0) - return; - - if(!cwc->cwc_running || !cwc->cwc_enabled) + if(cwc_read_message(cwc, "Wait login response", 5000) < 0) return; if(cwc->cwc_buf[12] != MSG_CLIENT_2_SERVER_LOGIN_ACK) { @@ -634,7 +658,7 @@ cwc_session(cwc_t *cwc) * Request card data */ cwc_send_data_req(cwc); - if((r = cwc_read_message(cwc, "Request card data")) < 0) + if((r = cwc_read_message(cwc, "Request card data", 5000)) < 0) return; if(cwc->cwc_buf[12] != MSG_CARD_DATA) { @@ -642,13 +666,36 @@ cwc_session(cwc_t *cwc) return; } - if(!cwc->cwc_running || !cwc->cwc_enabled) + if(cwc_decode_card_data_reply(cwc, cwc->cwc_buf, r) < 0) return; - cwc_decode_card_data_reply(cwc, cwc->cwc_buf, r); - sleep(4); + /** + * We do all requests from now on in a separate thread + */ + cwc->cwc_writer_running = 1; + pthread_cond_init(&cwc->cwc_writer_cond, NULL); + pthread_create(&writer_thread_id, NULL, cwc_writer_thread, cwc); + /** + * Mainloop + */ + while(!cwc_must_break(cwc)) { + + if((r = cwc_read_message(cwc, "Decoderloop", + CWC_KEEPALIVE_INTERVAL * 2 * 1000)) < 0) + break; + cwc_running_reply(cwc, cwc->cwc_buf[12], cwc->cwc_buf, r); + } + + /** + * Collect the writer thread + */ + cwc->cwc_writer_running = 0; + pthread_cond_signal(&cwc->cwc_writer_cond); + pthread_mutex_unlock(&global_lock); + pthread_join(writer_thread_id, NULL); + pthread_mutex_lock(&global_lock); } @@ -658,49 +705,77 @@ cwc_session(cwc_t *cwc) static void * cwc_thread(void *aux) { + cwc_transport_t *ct; cwc_t *cwc = aux; int fd; char errbuf[100]; + th_transport_t *t; - pthread_mutex_lock(&cwc->cwc_mutex); + char hostname[256]; + int port; + + pthread_mutex_lock(&global_lock); while(cwc->cwc_running) { while(cwc->cwc_running && cwc->cwc_enabled == 0) - pthread_cond_wait(&cwc->cwc_cond, &cwc->cwc_mutex); + pthread_cond_wait(&cwc->cwc_cond, &global_lock); - tvhlog(LOG_INFO, "cwc", "Attemping to connect to %s:%d", - cwc->cwc_hostname, cwc->cwc_port); + snprintf(hostname, sizeof(hostname), "%s", cwc->cwc_hostname); + port = cwc->cwc_port; - fd = tcp_connect(cwc->cwc_hostname, cwc->cwc_port, errbuf, - sizeof(errbuf), 10); + tvhlog(LOG_INFO, "cwc", "Attemping to connect to %s:%d", hostname, port); + + pthread_mutex_unlock(&global_lock); + + fd = tcp_connect(hostname, port, errbuf, sizeof(errbuf), 10); + + pthread_mutex_lock(&global_lock); if(fd == -1) { tvhlog(LOG_INFO, "cwc", "Connection attempt to %s:%d failed: %s", - cwc->cwc_hostname, cwc->cwc_port, errbuf); + hostname, port, errbuf); continue; } - if(cwc->cwc_running == 0) + if(cwc->cwc_running == 0) { + close(fd); break; + } - tvhlog(LOG_INFO, "cwc", "Connected to %s:%d", - cwc->cwc_hostname, cwc->cwc_port); + tvhlog(LOG_INFO, "cwc", "Connected to %s:%d", hostname, port); cwc->cwc_fd = fd; - pthread_mutex_unlock(&cwc->cwc_mutex); + cwc->cwc_reconfigure = 0; cwc_session(cwc); - pthread_mutex_lock(&cwc->cwc_mutex); cwc->cwc_fd = -1; close(fd); - + cwc->cwc_caid = 0; tvhlog(LOG_INFO, "cwc", "Disconnected from %s", cwc->cwc_hostname); + + pthread_mutex_unlock(&global_lock); sleep(1); + pthread_mutex_lock(&global_lock); } - pthread_mutex_unlock(&cwc->cwc_mutex); + tvhlog(LOG_INFO, "cwc", "%s destroyed", cwc->cwc_hostname); + + while((ct = LIST_FIRST(&cwc->cwc_transports)) != NULL) { + t = ct->ct_transport; + pthread_mutex_lock(&t->tht_stream_mutex); + cwc_transport_destroy(&ct->ct_head); + pthread_mutex_unlock(&t->tht_stream_mutex); + } + + free((void *)cwc->cwc_password); + free((void *)cwc->cwc_password_salted); + free((void *)cwc->cwc_username); + free((void *)cwc->cwc_hostname); + free(cwc); + + pthread_mutex_unlock(&global_lock); return NULL; } @@ -727,26 +802,23 @@ cwc_table_input(struct th_descrambler *td, struct th_transport *t, case 0x81: /* ECM */ - if(ct->ct_pending) - return; + if(ct->ct_ecm_reply_pending) + break; if(ct->ct_ecmsize == len && !memcmp(ct->ct_ecm, data, len)) - return; /* key already sent */ + break; /* key already sent */ - abort(); - /* - if(cwc->cwc_state != CWC_STATE_RUNNING) { + + if(cwc->cwc_fd == -1) { // New key, but we are not connected (anymore), can not descramble ct->ct_keystate = CT_UNKNOWN; break; } -*/ memcpy(ct->ct_ecm, data, len); ct->ct_ecmsize = len; ct->ct_seq = cwc_send_msg(cwc, data, len, sid); - LIST_INSERT_HEAD(&cwc_pending_requests, ct, ct_link); - ct->ct_pending = 1; + ct->ct_ecm_reply_pending = 1; break; default: @@ -800,37 +872,29 @@ cwc_descramble(th_descrambler_t *td, th_transport_t *t, struct th_stream *st, return 0; } - /** - * + * global_lock is held + * tht_stream_mutex is held */ -static void -cwc_transport_destroy(cwc_transport_t *ct) +static void +cwc_transport_destroy(th_descrambler_t *td) { - if(ct->ct_pending) - LIST_REMOVE(ct, ct_link); + cwc_transport_t *ct = (cwc_transport_t *)td; - LIST_REMOVE(ct, ct_cwc_link); + LIST_REMOVE(td, td_transport_link); + + LIST_REMOVE(ct, ct_link); free_key_struct(ct->ct_keys); free(ct->ct_tsbcluster); free(ct); } -/** - * - */ -static void -cwc_transport_stop(th_descrambler_t *td) -{ - LIST_REMOVE(td, td_transport_link); - - cwc_transport_destroy((cwc_transport_t *)td); -} - /** * Check if our CAID's matches, and if so, link + * + * global_lock is held */ void cwc_transport_start(th_transport_t *t) @@ -860,13 +924,14 @@ cwc_transport_start(th_transport_t *t) ct->ct_keys = get_key_struct(); ct->ct_cwc = cwc; ct->ct_transport = t; - LIST_INSERT_HEAD(&cwc->cwc_transports, ct, ct_cwc_link); + td = &ct->ct_head; - - td->td_stop = cwc_transport_stop; + td->td_stop = cwc_transport_destroy; td->td_table = cwc_table_input; td->td_descramble = cwc_descramble; LIST_INSERT_HEAD(&t->tht_descramblers, td, td_transport_link); + + LIST_INSERT_HEAD(&cwc->cwc_transports, ct, ct_link); } } @@ -877,24 +942,10 @@ cwc_transport_start(th_transport_t *t) static void cwc_destroy(cwc_t *cwc) { - cwc_transport_t *ct; - - pthread_mutex_lock(&cwc->cwc_mutex); + lock_assert(&global_lock); + TAILQ_REMOVE(&cwcs, cwc, cwc_link); cwc->cwc_running = 0; pthread_cond_signal(&cwc->cwc_cond); - pthread_mutex_unlock(&cwc->cwc_mutex); - - pthread_join(cwc->cwc_thread_id, NULL); - - while((ct = LIST_FIRST(&cwc->cwc_transports)) != NULL) - cwc_transport_destroy(ct); - - TAILQ_REMOVE(&cwcs, cwc, cwc_link); - free((void *)cwc->cwc_password); - free((void *)cwc->cwc_password_salted); - free((void *)cwc->cwc_username); - free((void *)cwc->cwc_hostname); - free(cwc); } @@ -904,6 +955,8 @@ cwc_destroy(cwc_t *cwc) static cwc_t * cwc_entry_find(const char *id, int create) { + pthread_attr_t attr; + pthread_t ptid; char buf[20]; cwc_t *cwc; @@ -924,11 +977,17 @@ cwc_entry_find(const char *id, int create) } cwc = calloc(1, sizeof(cwc_t)); + pthread_cond_init(&cwc->cwc_cond, NULL); + pthread_mutex_init(&cwc->cwc_send_mutex, NULL); cwc->cwc_id = strdup(id); cwc->cwc_running = 1; TAILQ_INSERT_TAIL(&cwcs, cwc, cwc_link); - pthread_create(&cwc->cwc_thread_id, NULL, cwc_thread, cwc); + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + pthread_create(&ptid, &attr, cwc_thread, cwc); + pthread_attr_destroy(&attr); + return cwc; } @@ -1010,7 +1069,7 @@ cwc_entry_update(void *opaque, const char *id, htsmsg_t *values, int maycreate) if((cwc = cwc_entry_find(id, maycreate)) == NULL) return NULL; - pthread_mutex_lock(&cwc->cwc_mutex); + lock_assert(&global_lock); if((s = htsmsg_get_str(values, "username")) != NULL) { free(cwc->cwc_username); @@ -1055,8 +1114,12 @@ cwc_entry_update(void *opaque, const char *id, htsmsg_t *values, int maycreate) memcpy(cwc->cwc_confedkey, key, 14); } + cwc->cwc_reconfigure = 1; + + if(cwc->cwc_fd != -1) + shutdown(cwc->cwc_fd, SHUT_RDWR); + pthread_cond_signal(&cwc->cwc_cond); - pthread_mutex_unlock(&cwc->cwc_mutex); return cwc_record_build(cwc); } diff --git a/cwc.h b/cwc.h index 47ab3131..afcf5ab3 100644 --- a/cwc.h +++ b/cwc.h @@ -19,60 +19,8 @@ #ifndef CWC_H_ #define CWC_H_ -#include "tcp.h" - -TAILQ_HEAD(cwc_queue, cwc); - -extern struct cwc_queue cwcs; - -typedef struct cwc { - int cwc_fd; - - pthread_mutex_t cwc_mutex; - pthread_cond_t cwc_cond; - pthread_t cwc_thread_id; - - TAILQ_ENTRY(cwc) cwc_link; /* Linkage protected via global_lock */ - - LIST_HEAD(, cwc_transport) cwc_transports; - - uint16_t cwc_caid; - - uint16_t cwc_seq; - - uint8_t cwc_key[16]; - - uint8_t cwc_buf[256]; - int cwc_bufptr; - - /* From configuration */ - - uint8_t cwc_confedkey[14]; - char *cwc_username; - char *cwc_password; - char *cwc_password_salted; /* salted version */ - char *cwc_comment; - char *cwc_hostname; - int cwc_port; - char *cwc_id; - - const char *cwc_errtxt; - - int cwc_enabled; - int cwc_running; -} cwc_t; - - void cwc_init(void); void cwc_transport_start(th_transport_t *t); -const char *cwc_status_to_text(struct cwc *cwc); - -//cwc_t *cwc_find(int id); - -void cwc_delete(cwc_t *cwc); - -void cwc_set_enable(cwc_t *cwc, int enabled); - #endif /* CWC_H_ */ diff --git a/main.c b/main.c index 200cf2d9..a0f46113 100644 --- a/main.c +++ b/main.c @@ -46,6 +46,7 @@ #include "spawn.h" #include "subscriptions.h" #include "serviceprobe.h" +#include "cwc.h" #include #include @@ -269,6 +270,8 @@ main(int argc, char **argv) serviceprobe_init(); + cwc_init(); + pthread_mutex_unlock(&global_lock); diff --git a/tcp.c b/tcp.c index 3040cae4..f3fb1cf4 100644 --- a/tcp.c +++ b/tcp.c @@ -304,6 +304,44 @@ tcp_read(int fd, void *buf, size_t len) } +/** + * + */ +int +tcp_read_timeout(int fd, void *buf, size_t len, int timeout) +{ + int x, tot = 0; + struct pollfd fds; + + assert(timeout > 0); + + fds.fd = fd; + fds.events = POLLIN; + fds.revents = 0; + + while(tot != len) { + + x = poll(&fds, 1, timeout); + if(x == 0) + return ETIMEDOUT; + + x = recv(fd, buf + tot, len - tot, MSG_DONTWAIT); + if(x == -1) { + if(errno == EAGAIN) + continue; + return errno; + } + + if(x == 0) + return ECONNRESET; + + tot += x; + } + + return 0; + +} + /** * */ diff --git a/tcp.h b/tcp.h index 7b486e0f..f92e1d55 100644 --- a/tcp.h +++ b/tcp.h @@ -123,4 +123,6 @@ int tcp_read_data(int fd, char *buf, const size_t bufsize, int tcp_write_queue(int fd, htsbuf_queue_t *q); +int tcp_read_timeout(int fd, void *buf, size_t len, int timeout); + #endif /* TCP_H_ */ diff --git a/transports.c b/transports.c index 0532babf..ac8d7ad5 100644 --- a/transports.c +++ b/transports.c @@ -73,6 +73,8 @@ transport_stop(th_transport_t *t) t->tht_stop_feed(t); + pthread_mutex_lock(&t->tht_stream_mutex); + while((td = LIST_FIRST(&t->tht_descramblers)) != NULL) td->td_stop(td); @@ -81,8 +83,6 @@ transport_stop(th_transport_t *t) assert(LIST_FIRST(&t->tht_muxers) == NULL); assert(LIST_FIRST(&t->tht_subscriptions) == NULL); - pthread_mutex_lock(&t->tht_stream_mutex); - /** * Clean up each stream */ @@ -274,7 +274,7 @@ transport_start(th_transport_t *t, unsigned int weight, int force_start) } } - // cwc_transport_start(t); + cwc_transport_start(t); t->tht_packets = 0; gtimer_arm(&t->tht_receive_timer, transport_data_timeout, t, 4);