diff --git a/src/cwc.c b/src/cwc.c index b3bc8b6c..d1fc046f 100644 --- a/src/cwc.c +++ b/src/cwc.c @@ -37,7 +37,7 @@ #include "transports.h" #include "cwc.h" #include "notify.h" - +#include "atomic.h" #include "dtable.h" /** @@ -78,6 +78,7 @@ typedef enum { */ TAILQ_HEAD(cwc_queue, cwc); LIST_HEAD(cwc_transport_list, cwc_transport); +TAILQ_HEAD(cwc_message_queue, cwc_message); static struct cwc_queue cwcs; static pthread_cond_t cwc_config_changed; @@ -132,7 +133,14 @@ typedef struct cwc_transport { } cwc_transport_t; - +/** + * + */ +typedef struct cwc_message { + TAILQ_ENTRY(cwc_message) cm_link; + int cm_len; + uint8_t cm_data[CWS_NETMSGSIZE]; +} cwc_message_t; @@ -145,13 +153,12 @@ typedef struct cwc { int cwc_retry_delay; - pthread_mutex_t cwc_send_mutex; - pthread_cond_t cwc_cond; - pthread_cond_t cwc_writer_cond; /* Used to wakeup writer */ + pthread_mutex_t cwc_writer_mutex; + pthread_cond_t cwc_writer_cond; int cwc_writer_running; - + struct cwc_message_queue cwc_writeq; TAILQ_ENTRY(cwc) cwc_link; /* Linkage protected via global_lock */ @@ -159,7 +166,7 @@ typedef struct cwc { uint16_t cwc_caid; - uint16_t cwc_seq; + int cwc_seq; uint8_t cwc_key[16]; @@ -350,42 +357,52 @@ des_make_session_key(cwc_t *cwc) } /** - * + * Note, this function is called from multiple threads so beware of + * locking / race issues (Note how we use atomic_add() to generate + * the ID) */ static int -cwc_send_msg(cwc_t *cwc, const uint8_t *msg, size_t len, int sid) +cwc_send_msg(cwc_t *cwc, const uint8_t *msg, size_t len, int sid, int enq) { - uint8_t *buf = malloc(CWS_NETMSGSIZE); - int n; + cwc_message_t *cm = malloc(sizeof(cwc_message_t)); + uint8_t *buf = cm->cm_data; + int seq, n; - pthread_mutex_lock(&cwc->cwc_send_mutex); + if(len + 12 > CWS_NETMSGSIZE) + return -1; memset(buf, 0, 12); memcpy(buf + 12, msg, len); len += 12; - cwc->cwc_seq++; + seq = atomic_add(&cwc->cwc_seq, 1); - buf[2] = cwc->cwc_seq >> 8; - buf[3] = cwc->cwc_seq; + buf[2] = seq >> 8; + buf[3] = seq; buf[4] = sid >> 8; buf[5] = sid; if((len = des_encrypt(buf, len, cwc->cwc_key)) < 0) { free(buf); - pthread_mutex_unlock(&cwc->cwc_send_mutex); return -1; } buf[0] = (len - 2) >> 8; buf[1] = len - 2; - /* ignore return value */ - n = write(cwc->cwc_fd, buf, len); - free(buf); - pthread_mutex_unlock(&cwc->cwc_send_mutex); - return cwc->cwc_seq; + + if(enq) { + cm->cm_len = len; + pthread_mutex_lock(&cwc->cwc_writer_mutex); + TAILQ_INSERT_TAIL(&cwc->cwc_writeq, cm, cm_link); + pthread_cond_signal(&cwc->cwc_writer_cond); + pthread_mutex_unlock(&cwc->cwc_writer_mutex); + } else { + n = write(cwc->cwc_fd, buf, len); + free(cm); + } + return seq & 0xffff; } @@ -403,7 +420,7 @@ cwc_send_data_req(cwc_t *cwc) buf[1] = 0; buf[2] = 0; - cwc_send_msg(cwc, buf, 3, 0); + cwc_send_msg(cwc, buf, 3, 0, 0); } @@ -419,7 +436,7 @@ cwc_send_ka(cwc_t *cwc) buf[1] = 0; buf[2] = 0; - cwc_send_msg(cwc, buf, 3, 0); + cwc_send_msg(cwc, buf, 3, 0, 0); } static void cwc_comet_status_update(cwc_t *cwc){ @@ -513,7 +530,7 @@ cwc_send_login(cwc_t *cwc) memcpy(buf + 3, cwc->cwc_username, ul); memcpy(buf + 3 + ul, cwc->cwc_password_salted, pl); - cwc_send_msg(cwc, buf, ul + pl + 3, 0); + cwc_send_msg(cwc, buf, ul + pl + 3, 0, 0); } /** @@ -588,6 +605,9 @@ cwc_running_reply(cwc_t *cwc, uint8_t msgtype, uint8_t *msg, int len) pthread_mutex_unlock(&t->tht_stream_mutex); break; + default: + // EMM + break; } return 0; } @@ -667,23 +687,37 @@ static void * cwc_writer_thread(void *aux) { cwc_t *cwc = aux; + cwc_message_t *cm; struct timespec ts; int r; - pthread_mutex_lock(&global_lock); + pthread_mutex_lock(&cwc->cwc_writer_mutex); while(cwc->cwc_writer_running) { + if((cm = TAILQ_FIRST(&cwc->cwc_writeq)) != NULL) { + TAILQ_REMOVE(&cwc->cwc_writeq, cm, cm_link); + pthread_mutex_unlock(&cwc->cwc_writer_mutex); + // int64_t ts = getmonoclock(); + r = write(cwc->cwc_fd, cm->cm_data, cm->cm_len); + // printf("Write took %lld usec\n", getmonoclock() - ts); + free(cm); + pthread_mutex_lock(&cwc->cwc_writer_mutex); + continue; + } + + /* If nothing is to be sent in CWC_KEEPALIVE_INTERVAL seconds we need to send a keepalive */ ts.tv_sec = time(NULL) + CWC_KEEPALIVE_INTERVAL; ts.tv_nsec = 0; - - r = pthread_cond_timedwait(&cwc->cwc_writer_cond, &global_lock, &ts); - cwc_send_ka(cwc); + r = pthread_cond_timedwait(&cwc->cwc_writer_cond, + &cwc->cwc_writer_mutex, &ts); + if(r == ETIMEDOUT) + cwc_send_ka(cwc); } - pthread_mutex_unlock(&global_lock); + pthread_mutex_unlock(&cwc->cwc_writer_mutex); return NULL; } @@ -749,6 +783,8 @@ cwc_session(cwc_t *cwc) */ cwc->cwc_writer_running = 1; pthread_cond_init(&cwc->cwc_writer_cond, NULL); + pthread_mutex_init(&cwc->cwc_writer_mutex, NULL); + TAILQ_INIT(&cwc->cwc_writeq); pthread_create(&writer_thread_id, NULL, cwc_writer_thread, cwc); /** @@ -765,11 +801,11 @@ cwc_session(cwc_t *cwc) /** * Collect the writer thread */ + shutdown(cwc->cwc_fd, SHUT_RDWR); cwc->cwc_writer_running = 0; pthread_cond_signal(&cwc->cwc_writer_cond); - pthread_mutex_unlock(&global_lock); pthread_join(writer_thread_id, NULL); - pthread_mutex_lock(&global_lock); + tvhlog(LOG_DEBUG, "cwc", "Write thread joined"); } @@ -891,6 +927,22 @@ verify_provider(cwc_t *cwc, uint32_t providerid) } +/** + * + */ +void +cwc_emm(uint8_t *data, int len) +{ + cwc_t *cwc; + + lock_assert(&global_lock); + + TAILQ_FOREACH(cwc, &cwcs, cwc_link) + if(cwc->cwc_emm && cwc->cwc_writer_running) + cwc_send_msg(cwc, data, len, 0, 1); +} + + /** * */ @@ -933,15 +985,14 @@ cwc_table_input(struct th_descrambler *td, struct th_transport *t, memcpy(ct->ct_ecm, data, len); ct->ct_ecmsize = len; - ct->ct_seq = cwc_send_msg(cwc, data, len, sid); + ct->ct_seq = cwc_send_msg(cwc, data, len, sid, 1); ct->ct_ecm_reply_pending = 1; break; default: - if (cwc->cwc_emm) { - /* EMM */ - cwc_send_msg(cwc, data, len, sid); - } + /* EMM */ + if (cwc->cwc_emm) + cwc_send_msg(cwc, data, len, sid, 1); break; } } @@ -1108,7 +1159,6 @@ cwc_entry_find(const char *id, int create) cwc = calloc(1, sizeof(cwc_t)); pthread_cond_init(&cwc->cwc_cond, NULL); - pthread_mutex_init(&cwc->cwc_send_mutex, NULL); cwc->cwc_id = strdup(id); cwc->cwc_running = 1; TAILQ_INSERT_TAIL(&cwcs, cwc, cwc_link); diff --git a/src/cwc.h b/src/cwc.h index afcf5ab3..3f963e57 100644 --- a/src/cwc.h +++ b/src/cwc.h @@ -23,4 +23,6 @@ void cwc_init(void); void cwc_transport_start(th_transport_t *t); +void cwc_emm(uint8_t *data, int len); + #endif /* CWC_H_ */ diff --git a/src/dvb/dvb_tables.c b/src/dvb/dvb_tables.c index f5f499a2..579814f1 100644 --- a/src/dvb/dvb_tables.c +++ b/src/dvb/dvb_tables.c @@ -41,6 +41,7 @@ #include "channels.h" #include "psi.h" #include "notify.h" +#include "cwc.h" #define TDT_CRC 0x1 #define TDT_QUICKREQ 0x2 @@ -747,6 +748,7 @@ static int dvb_ca_callback(th_dvb_mux_instance_t *tdmi, uint8_t *ptr, int len, uint8_t tableid, void *opaque) { + cwc_emm(ptr, len); return 0; }