diff --git a/include/re_sipevent.h b/include/re_sipevent.h index c7c0517..9d0a097 100644 --- a/include/re_sipevent.h +++ b/include/re_sipevent.h @@ -4,6 +4,9 @@ * Copyright (C) 2010 Creytiv.com */ + +/* Listener Socket */ + struct sipevent_sock; int sipevent_listen(struct sipevent_sock **sockp, struct sip *sip, @@ -11,25 +14,33 @@ int sipevent_listen(struct sipevent_sock **sockp, struct sip *sip, sip_msg_h *subh, void *arg); +/* Subscriber */ + +typedef void (sipevent_notify_h)(struct sip *sip, const struct sip_msg *msg, + void *arg); +typedef void (sipevent_close_h)(int err, const struct sip_msg *msg, void *arg); + struct sipsub; int sipevent_subscribe(struct sipsub **subp, struct sipevent_sock *sock, - bool retry, const char *uri, const char *from_name, + const char *uri, const char *from_name, const char *from_uri, const char *event, uint32_t expires, const char *cuser, const char *routev[], uint32_t routec, sip_auth_h *authh, void *aarg, bool aref, - sip_resp_h *resph, sip_msg_h *noth, void *arg, - const char *fmt, ...); + sipevent_notify_h *notifyh, sipevent_close_h *closeh, + void *arg, const char *fmt, ...); int sipevent_refer(struct sipsub **subp, struct sipevent_sock *sock, const char *uri, const char *from_name, const char *from_uri, const char *cuser, const char *routev[], uint32_t routec, sip_auth_h *authh, void *aarg, bool aref, - sip_resp_h *resph, sip_msg_h *noth, void *arg, - const char *fmt, ...); + sipevent_notify_h *notifyh, sipevent_close_h *closeh, + void *arg, const char *fmt, ...); +/* Message Components */ + struct sipevent_event { struct pl event; struct pl params; @@ -37,13 +48,15 @@ struct sipevent_event { enum sipevent_subst { SIPEVENT_ACTIVE = 0, + SIPEVENT_PENDING, SIPEVENT_TERMINATED, }; struct sipevent_substate { enum sipevent_subst state; struct pl params; - uint32_t expires; + struct pl expires; + struct pl reason; }; int sipevent_event_decode(struct sipevent_event *se, const struct pl *pl); diff --git a/src/sipevent/listen.c b/src/sipevent/listen.c index 90e24e7..811cf88 100644 --- a/src/sipevent/listen.c +++ b/src/sipevent/listen.c @@ -79,18 +79,33 @@ static struct sipsub *sipsub_find(struct sipevent_sock *sock, static void notify_handler(struct sipevent_sock *sock, const struct sip_msg *msg) { - struct sipevent_substate ss; + struct sipevent_substate state; + struct sipevent_event event; struct sip *sip = sock->sip; const struct sip_hdr *hdr; - struct sipevent_event se; struct sipsub *sub; + uint32_t nrefs; bool indialog; + hdr = sip_msg_hdr(msg, SIP_HDR_EVENT); + if (!hdr || sipevent_event_decode(&event, &hdr->val)) { + (void)sip_reply(sip, msg, 400, "Bad Event Header"); + return; + } + + hdr = sip_msg_hdr(msg, SIP_HDR_SUBSCRIPTION_STATE); + if (!hdr || sipevent_substate_decode(&state, &hdr->val)) { + (void)sip_reply(sip, msg, 400,"Bad Subscription-State Header"); + return; + } + sub = sipsub_find(sock, msg, true); if (!sub) { + /* hack: while we are waiting for proper fork handling */ + sub = sipsub_find(sock, msg, false); - if (!sub) { + if (!sub || sip_dialog_established(sub->dlg)) { (void)sip_reply(sip, msg, 481, "Subscription Does Not Exist"); return; @@ -100,7 +115,7 @@ static void notify_handler(struct sipevent_sock *sock, } else { if (!sip_dialog_rseq_valid(sub->dlg, msg)) { - (void)sip_reply(sip, msg, 500,"Server Internal Error"); + (void)sip_reply(sip, msg, 500, "Bad Sequence"); return; } @@ -109,58 +124,46 @@ static void notify_handler(struct sipevent_sock *sock, indialog = true; } - hdr = sip_msg_hdr(msg, SIP_HDR_EVENT); - - if (!hdr || sipevent_event_decode(&se, &hdr->val) || - pl_strcasecmp(&se.event, sub->event)) { + if (pl_strcasecmp(&event.event, sub->event)) { (void)sip_reply(sip, msg, 489, "Bad Event"); return; } - hdr = sip_msg_hdr(msg, SIP_HDR_SUBSCRIPTION_STATE); - - if (indialog && hdr && !sipevent_substate_decode(&ss, &hdr->val)) { - - re_printf("dialog substate: %s (%u secs) [%r]\n", - sipevent_substate_name(ss.state), - ss.expires, &ss.params); - - switch (ss.state) { - - case SIPEVENT_ACTIVE: - sub->subscribed = true; - - if (sub->terminated) - break; - - sipevent_resubscribe(sub, ss.expires * 900); - break; - - case SIPEVENT_TERMINATED: - sub->req = mem_deref(sub->req); /* forget request */ - - if (sub->terminated) { - mem_deref(sub); - goto reply; - } - - sub->subscribed = false; - sub->dlg = mem_deref(sub->dlg); - hash_unlink(&sub->he); - - if (sub->retry) - sipevent_resubscribe(sub, 0); - else - tmr_cancel(&sub->tmr); - break; - } + /* hack: while we are waiting for proper fork handling */ + if (!indialog) { + sub->notifyh(sip, msg, sub->arg); + return; } - if (sub->noth(msg, sub->arg)) + re_printf("notify: %s (%r)\n", sipevent_substate_name(state.state), + &state.params); + + switch (state.state) { + + case SIPEVENT_ACTIVE: + case SIPEVENT_PENDING: + sub->subscribed = true; + + if (!sub->terminated && pl_isset(&state.expires)) + sipsub_reschedule(sub, pl_u32(&state.expires) * 900); + break; + + case SIPEVENT_TERMINATED: + sub->subscribed = false; + break; + } + + mem_ref(sub); + sub->notifyh(sip, msg, sub->arg); + nrefs = mem_nrefs(sub); + mem_deref(sub); + + /* check if subscription was deref'd from notify handler */ + if (nrefs == 1) return; - reply: - (void)sip_treply(NULL, sip, msg, 200, "OK"); + if (!sub->terminated && state.state == SIPEVENT_TERMINATED) + sipsub_terminate(sub, 0, msg); } @@ -177,7 +180,7 @@ static void subscribe_handler(struct sipevent_sock *sock, } if (!sip_dialog_rseq_valid(not->dlg, msg)) { - (void)sip_reply(sip, msg, 500, "Server Internal Error"); + (void)sip_reply(sip, msg, 500, "Bad Sequence"); return; } diff --git a/src/sipevent/msg.c b/src/sipevent/msg.c index f49b4a1..38f4124 100644 --- a/src/sipevent/msg.c +++ b/src/sipevent/msg.c @@ -31,7 +31,7 @@ int sipevent_event_decode(struct sipevent_event *se, const struct pl *pl) int sipevent_substate_decode(struct sipevent_substate *ss, const struct pl *pl) { - struct pl state, expires; + struct pl state, param; int err; if (!ss || !pl) @@ -44,15 +44,22 @@ int sipevent_substate_decode(struct sipevent_substate *ss, const struct pl *pl) if (!pl_strcasecmp(&state, "active")) ss->state = SIPEVENT_ACTIVE; + else if (!pl_strcasecmp(&state, "pending")) + ss->state = SIPEVENT_PENDING; else if (!pl_strcasecmp(&state, "terminated")) ss->state = SIPEVENT_TERMINATED; else ss->state = -1; - if (!sip_param_decode(&ss->params, "expires", &expires)) - ss->expires = pl_u32(&expires); + if (!sip_param_decode(&ss->params, "expires", ¶m)) + ss->expires = param; else - ss->expires = 0; + ss->expires = pl_null; + + if (!sip_param_decode(&ss->params, "reason", ¶m)) + ss->reason = param; + else + ss->reason = pl_null; return 0; } @@ -63,6 +70,7 @@ const char *sipevent_substate_name(enum sipevent_subst state) switch (state) { case SIPEVENT_ACTIVE: return "active"; + case SIPEVENT_PENDING: return "pending"; case SIPEVENT_TERMINATED: return "terminated"; default: return "???"; } diff --git a/src/sipevent/sipevent.h b/src/sipevent/sipevent.h index f9e3b3b..87ed81a 100644 --- a/src/sipevent/sipevent.h +++ b/src/sipevent/sipevent.h @@ -5,6 +5,8 @@ */ +/* Listener Socket */ + struct sipevent_sock { struct sip_lsnr *lsnr; struct hash *ht_not; @@ -15,6 +17,8 @@ struct sipevent_sock { }; +/* Notifier */ + struct sipnot { struct le he; struct sip_dialog *dlg; @@ -22,6 +26,8 @@ struct sipnot { }; +/* Subscriber */ + struct sipsub { struct le he; struct sip_loopstate ls; @@ -31,24 +37,18 @@ struct sipsub { struct sip_dialog *dlg; struct sip_auth *auth; struct sip *sip; - char *uri; - char *from_name; - char *from_uri; - char **routev; char *event; char *cuser; char *hdrs; - sip_resp_h *resph; - sip_msg_h *noth; + sipevent_notify_h *notifyh; + sipevent_close_h *closeh; void *arg; uint32_t expires; uint32_t failc; - uint32_t routec; bool subscribed; bool terminated; bool refer; - bool retry; }; - -void sipevent_resubscribe(struct sipsub *sub, uint64_t wait); +void sipsub_reschedule(struct sipsub *sub, uint64_t wait); +void sipsub_terminate(struct sipsub *sub, int err, const struct sip_msg *msg); diff --git a/src/sipevent/subscribe.c b/src/sipevent/subscribe.c index 3cd769b..8c6e3df 100644 --- a/src/sipevent/subscribe.c +++ b/src/sipevent/subscribe.c @@ -3,6 +3,7 @@ * * Copyright (C) 2010 Creytiv.com */ +#include // todo: remove #include #include #include @@ -20,14 +21,25 @@ enum { DEFAULT_EXPIRES = 3600, + RESUB_FAIL_WAIT = 60000, + RESUB_FAILC_MAX = 7, }; static int request(struct sipsub *sub, bool reset_ls); -static void internal_response_handler(int err, const struct sip_msg *msg, - void *arg) +static void internal_notify_handler(struct sip *sip, const struct sip_msg *msg, + void *arg) +{ + (void)arg; + + (void)sip_treply(NULL, sip, msg, 200, "OK"); +} + + +static void internal_close_handler(int err, const struct sip_msg *msg, + void *arg) { (void)err; (void)msg; @@ -35,10 +47,21 @@ static void internal_response_handler(int err, const struct sip_msg *msg, } -static bool internal_notify_handler(const struct sip_msg *msg, void *arg) +static bool terminate(struct sipsub *sub) { - (void)msg; - (void)arg; + sub->terminated = true; + sub->notifyh = internal_notify_handler; + sub->closeh = internal_close_handler; + + if (sub->req) { + mem_ref(sub); + return true; + } + + if (sub->subscribed && !request(sub, true)) { + mem_ref(sub); + return true; + } return false; } @@ -52,37 +75,14 @@ static void destructor(void *arg) if (!sub->terminated) { - sub->resph = internal_response_handler; - sub->noth = internal_notify_handler; - sub->terminated = true; - - if (sub->req) { - mem_ref(sub); + if (terminate(sub)) return; - } - - if (sub->subscribed && !request(sub, true)) { - mem_ref(sub); - return; - } - } - - if (sub->routev) { - - uint32_t i; - - for (i=0; iroutec; i++) - mem_deref(sub->routev[i]); - - mem_deref(sub->routev); } hash_unlink(&sub->he); + mem_deref(sub->req); mem_deref(sub->dlg); mem_deref(sub->auth); - mem_deref(sub->uri); - mem_deref(sub->from_name); - mem_deref(sub->from_uri); mem_deref(sub->event); mem_deref(sub->cuser); mem_deref(sub->hdrs); @@ -91,12 +91,6 @@ static void destructor(void *arg) } -static uint64_t failwait(uint32_t failc) -{ - return min(1800, (30 * (1<req) return; - if (!sub->dlg) { - - err = sip_dialog_alloc(&sub->dlg, sub->uri, sub->uri, - sub->from_name, sub->from_uri, - (const char **)sub->routev, - sub->routec); - if (err) - goto out; - - hash_append(sub->sock->ht_sub, - hash_joaat_str(sip_dialog_callid(sub->dlg)), - &sub->he, sub); - } - err = request(sub, true); - if (err) - goto out; - - out: if (err) { - tmr_start(&sub->tmr, failwait(++sub->failc), tmr_handler, sub); - sub->resph(err, NULL, sub->arg); + if (++sub->failc < RESUB_FAILC_MAX) { + sipsub_reschedule(sub, RESUB_FAIL_WAIT); + } + else { + sipsub_terminate(sub, err, NULL); + } } } -void sipevent_resubscribe(struct sipsub *sub, uint64_t wait) +void sipsub_reschedule(struct sipsub *sub, uint64_t wait) { - if (!wait) - wait = failwait(++sub->failc); - - re_printf("will re-subscribe in %llu ms\n", wait); + re_printf("will re-subscribe in %llu secs\n", wait/1000); tmr_start(&sub->tmr, wait, tmr_handler, sub); } +void sipsub_terminate(struct sipsub *sub, int err, const struct sip_msg *msg) +{ + sipevent_close_h *closeh; + void *arg; + + closeh = sub->closeh; + arg = sub->arg; + + tmr_cancel(&sub->tmr); + (void)terminate(sub); + + closeh(err, msg, arg); +} + + static void response_handler(int err, const struct sip_msg *msg, void *arg) { const struct sip_hdr *minexp; struct sipsub *sub = arg; - uint64_t wait; - wait = failwait(sub->failc + 1); + if (err) + re_printf("reply: %s\n", strerror(err)); + else + re_printf("reply: %u %r\n", msg->scode, &msg->reason); - if (err || sip_request_loops(&sub->ls, msg->scode)) { - - if (err == ETIMEDOUT) { - sub->subscribed = false; - sub->dlg = mem_deref(sub->dlg); - hash_unlink(&sub->he); - } - - sub->failc++; + if (err || sip_request_loops(&sub->ls, msg->scode)) goto out; - } if (msg->scode < 200) { return; } else if (msg->scode < 300) { + uint32_t wait; + if (!sip_dialog_established(sub->dlg)) { err = sip_dialog_create(sub->dlg, msg); if (err) { - sub->dlg = mem_deref(sub->dlg); - hash_unlink(&sub->he); - sub->failc++; + sub->subscribed = false; goto out; } } @@ -181,16 +166,26 @@ static void response_handler(int err, const struct sip_msg *msg, void *arg) (void)sip_dialog_update(sub->dlg, msg); } - if (sub->refer && tmr_isrunning(&sub->tmr)) - wait = tmr_get_expire(&sub->tmr); - else if (pl_isset(&msg->expires)) - wait = pl_u32(&msg->expires) * 900; - else - wait = sub->expires * 900; - sub->subscribed = true; - sub->refer = false; sub->failc = 0; + + if (sub->terminated) { + sub->refer = false; + goto out; + } + + if (sub->refer) { + sub->refer = false; + return; + } + + if (pl_isset(&msg->expires)) + wait = pl_u32(&msg->expires); + else + wait = sub->expires; + + sipsub_reschedule(sub, wait * 900); + return; } else { if (sub->terminated && !sub->subscribed) @@ -216,13 +211,6 @@ static void response_handler(int err, const struct sip_msg *msg, void *arg) sip_auth_reset(sub->auth); break; - case 408: - case 481: - sub->subscribed = false; - sub->dlg = mem_deref(sub->dlg); - hash_unlink(&sub->he); - break; - case 423: minexp = sip_msg_hdr(msg, SIP_HDR_MIN_EXPIRES); if (!minexp || !pl_u32(&minexp->val) || !sub->expires) @@ -235,9 +223,11 @@ static void response_handler(int err, const struct sip_msg *msg, void *arg) break; return; - } - ++sub->failc; + case 481: + sub->subscribed = false; + break; + } } out: @@ -249,15 +239,10 @@ static void response_handler(int err, const struct sip_msg *msg, void *arg) mem_deref(sub); } else { - if (sub->retry || sub->subscribed) { - re_printf("will re-subscribe in %llu ms...\n", wait); - tmr_start(&sub->tmr, wait, tmr_handler, sub); - } - else { - tmr_cancel(&sub->tmr); - } - - sub->resph(err, msg, sub->arg); + if (sub->subscribed && ++sub->failc < RESUB_FAILC_MAX) + sipsub_reschedule(sub, RESUB_FAIL_WAIT); + else + sipsub_terminate(sub, err, msg); } } @@ -308,13 +293,13 @@ static int request(struct sipsub *sub, bool reset_ls) static int sipsub_alloc(struct sipsub **subp, struct sipevent_sock *sock, - bool refer, bool retry, const char *uri, + bool refer, const char *uri, const char *from_name, const char *from_uri, const char *event, uint32_t expires, const char *cuser, const char *routev[], uint32_t routec, sip_auth_h *authh, void *aarg, bool aref, - sip_resp_h *resph, sip_msg_h *noth, void *arg, - const char *fmt, va_list ap) + sipevent_notify_h *notifyh, sipevent_close_h *closeh, + void *arg, const char *fmt, va_list ap) { struct sipsub *sub; int err; @@ -339,41 +324,6 @@ static int sipsub_alloc(struct sipsub **subp, struct sipevent_sock *sock, if (err) goto out; - err = str_dup(&sub->uri, uri); - if (err) - goto out; - - err = str_dup(&sub->from_uri, from_uri); - if (err) - goto out; - - if (from_name) { - - err = str_dup(&sub->from_name, from_name); - if (err) - goto out; - } - - sub->routec = routec; - - if (routec > 0) { - - uint32_t i; - - sub->routev = mem_zalloc(sizeof(*sub->routev) * routec, NULL); - if (!sub->routev) { - err = ENOMEM; - goto out; - } - - for (i=0; iroutev[i], routev[i]); - if (err) - goto out; - } - } - err = str_dup(&sub->event, event); if (err) goto out; @@ -390,12 +340,11 @@ static int sipsub_alloc(struct sipsub **subp, struct sipevent_sock *sock, } sub->refer = refer; - sub->retry = retry; sub->sock = mem_ref(sock); sub->sip = mem_ref(sock->sip); sub->expires = expires; - sub->resph = resph ? resph : internal_response_handler; - sub->noth = noth ? noth : internal_notify_handler; + sub->notifyh = notifyh ? notifyh : internal_notify_handler; + sub->closeh = closeh ? closeh : internal_close_handler; sub->arg = arg; err = request(sub, true); @@ -417,7 +366,6 @@ static int sipsub_alloc(struct sipsub **subp, struct sipevent_sock *sock, * * @param subp Pointer to allocated SIP subscriber client * @param sock SIP Event socket - * @param retry Re-subscribe if subscription terminates * @param uri SIP Request URI * @param from_name SIP From-header Name (optional) * @param from_uri SIP From-header URI @@ -429,29 +377,29 @@ static int sipsub_alloc(struct sipsub **subp, struct sipevent_sock *sock, * @param authh Authentication handler * @param aarg Authentication handler argument * @param aref True to ref argument - * @param resph SUBSCRIBE response handler * @param noth Notify handler + * @param closeh Close handler * @param arg Response handler argument * @param fmt Formatted strings with extra SIP Headers * * @return 0 if success, otherwise errorcode */ int sipevent_subscribe(struct sipsub **subp, struct sipevent_sock *sock, - bool retry, const char *uri, const char *from_name, + const char *uri, const char *from_name, const char *from_uri, const char *event, uint32_t expires, const char *cuser, const char *routev[], uint32_t routec, sip_auth_h *authh, void *aarg, bool aref, - sip_resp_h *resph, sip_msg_h *noth, void *arg, - const char *fmt, ...) + sipevent_notify_h *notifyh, sipevent_close_h *closeh, + void *arg, const char *fmt, ...) { va_list ap; int err; va_start(ap, fmt); - err = sipsub_alloc(subp, sock, false, retry, uri, from_name, from_uri, + err = sipsub_alloc(subp, sock, false, uri, from_name, from_uri, event, expires, cuser, routev, routec, authh, aarg, - aref, resph, noth, arg, fmt, ap); + aref, notifyh, closeh, arg, fmt, ap); va_end(ap); return err; @@ -472,8 +420,8 @@ int sipevent_subscribe(struct sipsub **subp, struct sipevent_sock *sock, * @param authh Authentication handler * @param aarg Authentication handler argument * @param aref True to ref argument - * @param resph SUBSCRIBE response handler * @param noth Notify handler + * @param closeh Close handler * @param arg Response handler argument * @param fmt Formatted strings with extra SIP Headers * @@ -484,16 +432,16 @@ int sipevent_refer(struct sipsub **subp, struct sipevent_sock *sock, const char *from_uri, const char *cuser, const char *routev[], uint32_t routec, sip_auth_h *authh, void *aarg, bool aref, - sip_resp_h *resph, sip_msg_h *noth, void *arg, - const char *fmt, ...) + sipevent_notify_h *notifyh, sipevent_close_h *closeh, + void *arg, const char *fmt, ...) { va_list ap; int err; va_start(ap, fmt); - err = sipsub_alloc(subp, sock, true, false, uri, from_name, from_uri, + err = sipsub_alloc(subp, sock, true, uri, from_name, from_uri, "refer", DEFAULT_EXPIRES, cuser, routev, routec, - authh, aarg, aref, resph, noth, arg, fmt, ap); + authh, aarg, aref, notifyh, closeh, arg, fmt, ap); va_end(ap); return err;