added close handler

This commit is contained in:
Richard Aas 2011-12-02 14:09:27 +00:00
parent 7d24909807
commit a8e32ddace
5 changed files with 196 additions and 224 deletions

View file

@ -4,6 +4,9 @@
* Copyright (C) 2010 Creytiv.com
*/
/* Listener Socket */
struct sipevent_sock;
int sipevent_listen(struct sipevent_sock **sockp, struct sip *sip,
@ -11,25 +14,33 @@ int sipevent_listen(struct sipevent_sock **sockp, struct sip *sip,
sip_msg_h *subh, void *arg);
/* Subscriber */
typedef void (sipevent_notify_h)(struct sip *sip, const struct sip_msg *msg,
void *arg);
typedef void (sipevent_close_h)(int err, const struct sip_msg *msg, void *arg);
struct sipsub;
int sipevent_subscribe(struct sipsub **subp, struct sipevent_sock *sock,
bool retry, const char *uri, const char *from_name,
const char *uri, const char *from_name,
const char *from_uri, const char *event,
uint32_t expires, const char *cuser,
const char *routev[], uint32_t routec,
sip_auth_h *authh, void *aarg, bool aref,
sip_resp_h *resph, sip_msg_h *noth, void *arg,
const char *fmt, ...);
sipevent_notify_h *notifyh, sipevent_close_h *closeh,
void *arg, const char *fmt, ...);
int sipevent_refer(struct sipsub **subp, struct sipevent_sock *sock,
const char *uri, const char *from_name,
const char *from_uri, const char *cuser,
const char *routev[], uint32_t routec,
sip_auth_h *authh, void *aarg, bool aref,
sip_resp_h *resph, sip_msg_h *noth, void *arg,
const char *fmt, ...);
sipevent_notify_h *notifyh, sipevent_close_h *closeh,
void *arg, const char *fmt, ...);
/* Message Components */
struct sipevent_event {
struct pl event;
struct pl params;
@ -37,13 +48,15 @@ struct sipevent_event {
enum sipevent_subst {
SIPEVENT_ACTIVE = 0,
SIPEVENT_PENDING,
SIPEVENT_TERMINATED,
};
struct sipevent_substate {
enum sipevent_subst state;
struct pl params;
uint32_t expires;
struct pl expires;
struct pl reason;
};
int sipevent_event_decode(struct sipevent_event *se, const struct pl *pl);

View file

@ -79,18 +79,33 @@ static struct sipsub *sipsub_find(struct sipevent_sock *sock,
static void notify_handler(struct sipevent_sock *sock,
const struct sip_msg *msg)
{
struct sipevent_substate ss;
struct sipevent_substate state;
struct sipevent_event event;
struct sip *sip = sock->sip;
const struct sip_hdr *hdr;
struct sipevent_event se;
struct sipsub *sub;
uint32_t nrefs;
bool indialog;
hdr = sip_msg_hdr(msg, SIP_HDR_EVENT);
if (!hdr || sipevent_event_decode(&event, &hdr->val)) {
(void)sip_reply(sip, msg, 400, "Bad Event Header");
return;
}
hdr = sip_msg_hdr(msg, SIP_HDR_SUBSCRIPTION_STATE);
if (!hdr || sipevent_substate_decode(&state, &hdr->val)) {
(void)sip_reply(sip, msg, 400,"Bad Subscription-State Header");
return;
}
sub = sipsub_find(sock, msg, true);
if (!sub) {
/* hack: while we are waiting for proper fork handling */
sub = sipsub_find(sock, msg, false);
if (!sub) {
if (!sub || sip_dialog_established(sub->dlg)) {
(void)sip_reply(sip, msg,
481, "Subscription Does Not Exist");
return;
@ -100,7 +115,7 @@ static void notify_handler(struct sipevent_sock *sock,
}
else {
if (!sip_dialog_rseq_valid(sub->dlg, msg)) {
(void)sip_reply(sip, msg, 500,"Server Internal Error");
(void)sip_reply(sip, msg, 500, "Bad Sequence");
return;
}
@ -109,58 +124,46 @@ static void notify_handler(struct sipevent_sock *sock,
indialog = true;
}
hdr = sip_msg_hdr(msg, SIP_HDR_EVENT);
if (!hdr || sipevent_event_decode(&se, &hdr->val) ||
pl_strcasecmp(&se.event, sub->event)) {
if (pl_strcasecmp(&event.event, sub->event)) {
(void)sip_reply(sip, msg, 489, "Bad Event");
return;
}
hdr = sip_msg_hdr(msg, SIP_HDR_SUBSCRIPTION_STATE);
if (indialog && hdr && !sipevent_substate_decode(&ss, &hdr->val)) {
re_printf("dialog substate: %s (%u secs) [%r]\n",
sipevent_substate_name(ss.state),
ss.expires, &ss.params);
switch (ss.state) {
case SIPEVENT_ACTIVE:
sub->subscribed = true;
if (sub->terminated)
break;
sipevent_resubscribe(sub, ss.expires * 900);
break;
case SIPEVENT_TERMINATED:
sub->req = mem_deref(sub->req); /* forget request */
if (sub->terminated) {
mem_deref(sub);
goto reply;
}
sub->subscribed = false;
sub->dlg = mem_deref(sub->dlg);
hash_unlink(&sub->he);
if (sub->retry)
sipevent_resubscribe(sub, 0);
else
tmr_cancel(&sub->tmr);
break;
}
/* hack: while we are waiting for proper fork handling */
if (!indialog) {
sub->notifyh(sip, msg, sub->arg);
return;
}
if (sub->noth(msg, sub->arg))
re_printf("notify: %s (%r)\n", sipevent_substate_name(state.state),
&state.params);
switch (state.state) {
case SIPEVENT_ACTIVE:
case SIPEVENT_PENDING:
sub->subscribed = true;
if (!sub->terminated && pl_isset(&state.expires))
sipsub_reschedule(sub, pl_u32(&state.expires) * 900);
break;
case SIPEVENT_TERMINATED:
sub->subscribed = false;
break;
}
mem_ref(sub);
sub->notifyh(sip, msg, sub->arg);
nrefs = mem_nrefs(sub);
mem_deref(sub);
/* check if subscription was deref'd from notify handler */
if (nrefs == 1)
return;
reply:
(void)sip_treply(NULL, sip, msg, 200, "OK");
if (!sub->terminated && state.state == SIPEVENT_TERMINATED)
sipsub_terminate(sub, 0, msg);
}
@ -177,7 +180,7 @@ static void subscribe_handler(struct sipevent_sock *sock,
}
if (!sip_dialog_rseq_valid(not->dlg, msg)) {
(void)sip_reply(sip, msg, 500, "Server Internal Error");
(void)sip_reply(sip, msg, 500, "Bad Sequence");
return;
}

View file

@ -31,7 +31,7 @@ int sipevent_event_decode(struct sipevent_event *se, const struct pl *pl)
int sipevent_substate_decode(struct sipevent_substate *ss, const struct pl *pl)
{
struct pl state, expires;
struct pl state, param;
int err;
if (!ss || !pl)
@ -44,15 +44,22 @@ int sipevent_substate_decode(struct sipevent_substate *ss, const struct pl *pl)
if (!pl_strcasecmp(&state, "active"))
ss->state = SIPEVENT_ACTIVE;
else if (!pl_strcasecmp(&state, "pending"))
ss->state = SIPEVENT_PENDING;
else if (!pl_strcasecmp(&state, "terminated"))
ss->state = SIPEVENT_TERMINATED;
else
ss->state = -1;
if (!sip_param_decode(&ss->params, "expires", &expires))
ss->expires = pl_u32(&expires);
if (!sip_param_decode(&ss->params, "expires", &param))
ss->expires = param;
else
ss->expires = 0;
ss->expires = pl_null;
if (!sip_param_decode(&ss->params, "reason", &param))
ss->reason = param;
else
ss->reason = pl_null;
return 0;
}
@ -63,6 +70,7 @@ const char *sipevent_substate_name(enum sipevent_subst state)
switch (state) {
case SIPEVENT_ACTIVE: return "active";
case SIPEVENT_PENDING: return "pending";
case SIPEVENT_TERMINATED: return "terminated";
default: return "???";
}

View file

@ -5,6 +5,8 @@
*/
/* Listener Socket */
struct sipevent_sock {
struct sip_lsnr *lsnr;
struct hash *ht_not;
@ -15,6 +17,8 @@ struct sipevent_sock {
};
/* Notifier */
struct sipnot {
struct le he;
struct sip_dialog *dlg;
@ -22,6 +26,8 @@ struct sipnot {
};
/* Subscriber */
struct sipsub {
struct le he;
struct sip_loopstate ls;
@ -31,24 +37,18 @@ struct sipsub {
struct sip_dialog *dlg;
struct sip_auth *auth;
struct sip *sip;
char *uri;
char *from_name;
char *from_uri;
char **routev;
char *event;
char *cuser;
char *hdrs;
sip_resp_h *resph;
sip_msg_h *noth;
sipevent_notify_h *notifyh;
sipevent_close_h *closeh;
void *arg;
uint32_t expires;
uint32_t failc;
uint32_t routec;
bool subscribed;
bool terminated;
bool refer;
bool retry;
};
void sipevent_resubscribe(struct sipsub *sub, uint64_t wait);
void sipsub_reschedule(struct sipsub *sub, uint64_t wait);
void sipsub_terminate(struct sipsub *sub, int err, const struct sip_msg *msg);

View file

@ -3,6 +3,7 @@
*
* Copyright (C) 2010 Creytiv.com
*/
#include <string.h> // todo: remove
#include <re_types.h>
#include <re_mem.h>
#include <re_mbuf.h>
@ -20,14 +21,25 @@
enum {
DEFAULT_EXPIRES = 3600,
RESUB_FAIL_WAIT = 60000,
RESUB_FAILC_MAX = 7,
};
static int request(struct sipsub *sub, bool reset_ls);
static void internal_response_handler(int err, const struct sip_msg *msg,
void *arg)
static void internal_notify_handler(struct sip *sip, const struct sip_msg *msg,
void *arg)
{
(void)arg;
(void)sip_treply(NULL, sip, msg, 200, "OK");
}
static void internal_close_handler(int err, const struct sip_msg *msg,
void *arg)
{
(void)err;
(void)msg;
@ -35,10 +47,21 @@ static void internal_response_handler(int err, const struct sip_msg *msg,
}
static bool internal_notify_handler(const struct sip_msg *msg, void *arg)
static bool terminate(struct sipsub *sub)
{
(void)msg;
(void)arg;
sub->terminated = true;
sub->notifyh = internal_notify_handler;
sub->closeh = internal_close_handler;
if (sub->req) {
mem_ref(sub);
return true;
}
if (sub->subscribed && !request(sub, true)) {
mem_ref(sub);
return true;
}
return false;
}
@ -52,37 +75,14 @@ static void destructor(void *arg)
if (!sub->terminated) {
sub->resph = internal_response_handler;
sub->noth = internal_notify_handler;
sub->terminated = true;
if (sub->req) {
mem_ref(sub);
if (terminate(sub))
return;
}
if (sub->subscribed && !request(sub, true)) {
mem_ref(sub);
return;
}
}
if (sub->routev) {
uint32_t i;
for (i=0; i<sub->routec; i++)
mem_deref(sub->routev[i]);
mem_deref(sub->routev);
}
hash_unlink(&sub->he);
mem_deref(sub->req);
mem_deref(sub->dlg);
mem_deref(sub->auth);
mem_deref(sub->uri);
mem_deref(sub->from_name);
mem_deref(sub->from_uri);
mem_deref(sub->event);
mem_deref(sub->cuser);
mem_deref(sub->hdrs);
@ -91,12 +91,6 @@ static void destructor(void *arg)
}
static uint64_t failwait(uint32_t failc)
{
return min(1800, (30 * (1<<min(failc, 6)))) * (500 + rand_u16() % 501);
}
static void tmr_handler(void *arg)
{
struct sipsub *sub = arg;
@ -105,75 +99,66 @@ static void tmr_handler(void *arg)
if (sub->req)
return;
if (!sub->dlg) {
err = sip_dialog_alloc(&sub->dlg, sub->uri, sub->uri,
sub->from_name, sub->from_uri,
(const char **)sub->routev,
sub->routec);
if (err)
goto out;
hash_append(sub->sock->ht_sub,
hash_joaat_str(sip_dialog_callid(sub->dlg)),
&sub->he, sub);
}
err = request(sub, true);
if (err)
goto out;
out:
if (err) {
tmr_start(&sub->tmr, failwait(++sub->failc), tmr_handler, sub);
sub->resph(err, NULL, sub->arg);
if (++sub->failc < RESUB_FAILC_MAX) {
sipsub_reschedule(sub, RESUB_FAIL_WAIT);
}
else {
sipsub_terminate(sub, err, NULL);
}
}
}
void sipevent_resubscribe(struct sipsub *sub, uint64_t wait)
void sipsub_reschedule(struct sipsub *sub, uint64_t wait)
{
if (!wait)
wait = failwait(++sub->failc);
re_printf("will re-subscribe in %llu ms\n", wait);
re_printf("will re-subscribe in %llu secs\n", wait/1000);
tmr_start(&sub->tmr, wait, tmr_handler, sub);
}
void sipsub_terminate(struct sipsub *sub, int err, const struct sip_msg *msg)
{
sipevent_close_h *closeh;
void *arg;
closeh = sub->closeh;
arg = sub->arg;
tmr_cancel(&sub->tmr);
(void)terminate(sub);
closeh(err, msg, arg);
}
static void response_handler(int err, const struct sip_msg *msg, void *arg)
{
const struct sip_hdr *minexp;
struct sipsub *sub = arg;
uint64_t wait;
wait = failwait(sub->failc + 1);
if (err)
re_printf("reply: %s\n", strerror(err));
else
re_printf("reply: %u %r\n", msg->scode, &msg->reason);
if (err || sip_request_loops(&sub->ls, msg->scode)) {
if (err == ETIMEDOUT) {
sub->subscribed = false;
sub->dlg = mem_deref(sub->dlg);
hash_unlink(&sub->he);
}
sub->failc++;
if (err || sip_request_loops(&sub->ls, msg->scode))
goto out;
}
if (msg->scode < 200) {
return;
}
else if (msg->scode < 300) {
uint32_t wait;
if (!sip_dialog_established(sub->dlg)) {
err = sip_dialog_create(sub->dlg, msg);
if (err) {
sub->dlg = mem_deref(sub->dlg);
hash_unlink(&sub->he);
sub->failc++;
sub->subscribed = false;
goto out;
}
}
@ -181,16 +166,26 @@ static void response_handler(int err, const struct sip_msg *msg, void *arg)
(void)sip_dialog_update(sub->dlg, msg);
}
if (sub->refer && tmr_isrunning(&sub->tmr))
wait = tmr_get_expire(&sub->tmr);
else if (pl_isset(&msg->expires))
wait = pl_u32(&msg->expires) * 900;
else
wait = sub->expires * 900;
sub->subscribed = true;
sub->refer = false;
sub->failc = 0;
if (sub->terminated) {
sub->refer = false;
goto out;
}
if (sub->refer) {
sub->refer = false;
return;
}
if (pl_isset(&msg->expires))
wait = pl_u32(&msg->expires);
else
wait = sub->expires;
sipsub_reschedule(sub, wait * 900);
return;
}
else {
if (sub->terminated && !sub->subscribed)
@ -216,13 +211,6 @@ static void response_handler(int err, const struct sip_msg *msg, void *arg)
sip_auth_reset(sub->auth);
break;
case 408:
case 481:
sub->subscribed = false;
sub->dlg = mem_deref(sub->dlg);
hash_unlink(&sub->he);
break;
case 423:
minexp = sip_msg_hdr(msg, SIP_HDR_MIN_EXPIRES);
if (!minexp || !pl_u32(&minexp->val) || !sub->expires)
@ -235,9 +223,11 @@ static void response_handler(int err, const struct sip_msg *msg, void *arg)
break;
return;
}
++sub->failc;
case 481:
sub->subscribed = false;
break;
}
}
out:
@ -249,15 +239,10 @@ static void response_handler(int err, const struct sip_msg *msg, void *arg)
mem_deref(sub);
}
else {
if (sub->retry || sub->subscribed) {
re_printf("will re-subscribe in %llu ms...\n", wait);
tmr_start(&sub->tmr, wait, tmr_handler, sub);
}
else {
tmr_cancel(&sub->tmr);
}
sub->resph(err, msg, sub->arg);
if (sub->subscribed && ++sub->failc < RESUB_FAILC_MAX)
sipsub_reschedule(sub, RESUB_FAIL_WAIT);
else
sipsub_terminate(sub, err, msg);
}
}
@ -308,13 +293,13 @@ static int request(struct sipsub *sub, bool reset_ls)
static int sipsub_alloc(struct sipsub **subp, struct sipevent_sock *sock,
bool refer, bool retry, const char *uri,
bool refer, const char *uri,
const char *from_name, const char *from_uri,
const char *event, uint32_t expires, const char *cuser,
const char *routev[], uint32_t routec,
sip_auth_h *authh, void *aarg, bool aref,
sip_resp_h *resph, sip_msg_h *noth, void *arg,
const char *fmt, va_list ap)
sipevent_notify_h *notifyh, sipevent_close_h *closeh,
void *arg, const char *fmt, va_list ap)
{
struct sipsub *sub;
int err;
@ -339,41 +324,6 @@ static int sipsub_alloc(struct sipsub **subp, struct sipevent_sock *sock,
if (err)
goto out;
err = str_dup(&sub->uri, uri);
if (err)
goto out;
err = str_dup(&sub->from_uri, from_uri);
if (err)
goto out;
if (from_name) {
err = str_dup(&sub->from_name, from_name);
if (err)
goto out;
}
sub->routec = routec;
if (routec > 0) {
uint32_t i;
sub->routev = mem_zalloc(sizeof(*sub->routev) * routec, NULL);
if (!sub->routev) {
err = ENOMEM;
goto out;
}
for (i=0; i<routec; i++) {
err = str_dup(&sub->routev[i], routev[i]);
if (err)
goto out;
}
}
err = str_dup(&sub->event, event);
if (err)
goto out;
@ -390,12 +340,11 @@ static int sipsub_alloc(struct sipsub **subp, struct sipevent_sock *sock,
}
sub->refer = refer;
sub->retry = retry;
sub->sock = mem_ref(sock);
sub->sip = mem_ref(sock->sip);
sub->expires = expires;
sub->resph = resph ? resph : internal_response_handler;
sub->noth = noth ? noth : internal_notify_handler;
sub->notifyh = notifyh ? notifyh : internal_notify_handler;
sub->closeh = closeh ? closeh : internal_close_handler;
sub->arg = arg;
err = request(sub, true);
@ -417,7 +366,6 @@ static int sipsub_alloc(struct sipsub **subp, struct sipevent_sock *sock,
*
* @param subp Pointer to allocated SIP subscriber client
* @param sock SIP Event socket
* @param retry Re-subscribe if subscription terminates
* @param uri SIP Request URI
* @param from_name SIP From-header Name (optional)
* @param from_uri SIP From-header URI
@ -429,29 +377,29 @@ static int sipsub_alloc(struct sipsub **subp, struct sipevent_sock *sock,
* @param authh Authentication handler
* @param aarg Authentication handler argument
* @param aref True to ref argument
* @param resph SUBSCRIBE response handler
* @param noth Notify handler
* @param closeh Close handler
* @param arg Response handler argument
* @param fmt Formatted strings with extra SIP Headers
*
* @return 0 if success, otherwise errorcode
*/
int sipevent_subscribe(struct sipsub **subp, struct sipevent_sock *sock,
bool retry, const char *uri, const char *from_name,
const char *uri, const char *from_name,
const char *from_uri, const char *event,
uint32_t expires, const char *cuser,
const char *routev[], uint32_t routec,
sip_auth_h *authh, void *aarg, bool aref,
sip_resp_h *resph, sip_msg_h *noth, void *arg,
const char *fmt, ...)
sipevent_notify_h *notifyh, sipevent_close_h *closeh,
void *arg, const char *fmt, ...)
{
va_list ap;
int err;
va_start(ap, fmt);
err = sipsub_alloc(subp, sock, false, retry, uri, from_name, from_uri,
err = sipsub_alloc(subp, sock, false, uri, from_name, from_uri,
event, expires, cuser, routev, routec, authh, aarg,
aref, resph, noth, arg, fmt, ap);
aref, notifyh, closeh, arg, fmt, ap);
va_end(ap);
return err;
@ -472,8 +420,8 @@ int sipevent_subscribe(struct sipsub **subp, struct sipevent_sock *sock,
* @param authh Authentication handler
* @param aarg Authentication handler argument
* @param aref True to ref argument
* @param resph SUBSCRIBE response handler
* @param noth Notify handler
* @param closeh Close handler
* @param arg Response handler argument
* @param fmt Formatted strings with extra SIP Headers
*
@ -484,16 +432,16 @@ int sipevent_refer(struct sipsub **subp, struct sipevent_sock *sock,
const char *from_uri, const char *cuser,
const char *routev[], uint32_t routec,
sip_auth_h *authh, void *aarg, bool aref,
sip_resp_h *resph, sip_msg_h *noth, void *arg,
const char *fmt, ...)
sipevent_notify_h *notifyh, sipevent_close_h *closeh,
void *arg, const char *fmt, ...)
{
va_list ap;
int err;
va_start(ap, fmt);
err = sipsub_alloc(subp, sock, true, false, uri, from_name, from_uri,
err = sipsub_alloc(subp, sock, true, uri, from_name, from_uri,
"refer", DEFAULT_EXPIRES, cuser, routev, routec,
authh, aarg, aref, resph, noth, arg, fmt, ap);
authh, aarg, aref, notifyh, closeh, arg, fmt, ap);
va_end(ap);
return err;