sipevent: subscribe fork handling

This commit is contained in:
Richard Aas 2011-12-06 13:44:37 +00:00
parent a8e32ddace
commit 2f8a580fc3
6 changed files with 285 additions and 62 deletions

View file

@ -301,6 +301,8 @@ int sip_dialog_alloc(struct sip_dialog **dlgp,
const char *routev[], uint32_t routec);
int sip_dialog_accept(struct sip_dialog **dlgp, const struct sip_msg *msg);
int sip_dialog_create(struct sip_dialog *dlg, const struct sip_msg *msg);
int sip_dialog_fork(struct sip_dialog **dlgp, struct sip_dialog *odlg,
const struct sip_msg *msg);
int sip_dialog_update(struct sip_dialog *dlg, const struct sip_msg *msg);
bool sip_dialog_rseq_valid(struct sip_dialog *dlg, const struct sip_msg *msg);
const char *sip_dialog_callid(const struct sip_dialog *dlg);

View file

@ -16,11 +16,14 @@ int sipevent_listen(struct sipevent_sock **sockp, struct sip *sip,
/* Subscriber */
struct sipsub;
typedef int (sipevent_fork_h)(struct sipsub **subp, struct sipsub *osub,
const struct sip_msg *msg, void *arg);
typedef void (sipevent_notify_h)(struct sip *sip, const struct sip_msg *msg,
void *arg);
typedef void (sipevent_close_h)(int err, const struct sip_msg *msg, void *arg);
struct sipsub;
int sipevent_subscribe(struct sipsub **subp, struct sipevent_sock *sock,
const char *uri, const char *from_name,
@ -28,15 +31,22 @@ int sipevent_subscribe(struct sipsub **subp, struct sipevent_sock *sock,
uint32_t expires, const char *cuser,
const char *routev[], uint32_t routec,
sip_auth_h *authh, void *aarg, bool aref,
sipevent_notify_h *notifyh, sipevent_close_h *closeh,
void *arg, const char *fmt, ...);
sipevent_fork_h *forkh, sipevent_notify_h *notifyh,
sipevent_close_h *closeh, void *arg,
const char *fmt, ...);
int sipevent_refer(struct sipsub **subp, struct sipevent_sock *sock,
const char *uri, const char *from_name,
const char *from_uri, const char *cuser,
const char *routev[], uint32_t routec,
const char *from_uri, const char *refer_to,
const char *cuser, const char *routev[], uint32_t routec,
sip_auth_h *authh, void *aarg, bool aref,
sipevent_notify_h *notifyh, sipevent_close_h *closeh,
void *arg, const char *fmt, ...);
sipevent_fork_h *forkh, sipevent_notify_h *notifyh,
sipevent_close_h *closeh, void *arg,
const char *fmt, ...);
int sipevent_fork(struct sipsub **subp, struct sipsub *osub,
const struct sip_msg *msg,
sip_auth_h *authh, void *aarg, bool aref,
sipevent_notify_h *notifyh, sipevent_close_h *closeh,
void *arg);
/* Message Components */

View file

@ -219,7 +219,6 @@ int sip_dialog_accept(struct sip_dialog **dlgp, const struct sip_msg *msg)
err |= sip_msg_hdr_apply(msg, true, SIP_HDR_RECORD_ROUTE,
record_route_handler, &renc) ? ENOMEM : 0;
dlg->cpos = dlg->mb->pos;
err |= mbuf_printf(dlg->mb, "To: %r\r\n", &msg->from.val);
err |= mbuf_printf(dlg->mb, "From: %r;tag=%016llx\r\n", &msg->to.val,
msg->tag);
@ -251,13 +250,14 @@ int sip_dialog_accept(struct sip_dialog **dlgp, const struct sip_msg *msg)
int sip_dialog_create(struct sip_dialog *dlg, const struct sip_msg *msg)
{
char *uri = NULL, *rtag = NULL;
const struct sip_hdr *contact;
struct route_enc renc;
struct sip_addr addr;
struct pl pl;
int err;
if (!dlg || dlg->rtag || !msg)
if (!dlg || dlg->rtag || !dlg->cpos || !msg)
return EINVAL;
contact = sip_msg_hdr(msg, SIP_HDR_CONTACT);
@ -268,20 +268,18 @@ int sip_dialog_create(struct sip_dialog *dlg, const struct sip_msg *msg)
if (sip_addr_decode(&addr, &contact->val))
return EBADMSG;
dlg->uri = mem_deref(dlg->uri);
err = pl_strdup(&dlg->uri, &addr.auri);
if (err)
return err;
err = pl_strdup(&dlg->rtag, msg->req ? &msg->from.tag : &msg->to.tag);
if (err)
return err;
renc.mb = mbuf_alloc(512);
if (!renc.mb)
return ENOMEM;
err = pl_strdup(&uri, &addr.auri);
if (err)
goto out;
err = pl_strdup(&rtag, msg->req ? &msg->from.tag : &msg->to.tag);
if (err)
goto out;
renc.end = 0;
err |= sip_msg_hdr_apply(msg, msg->req, SIP_HDR_RECORD_ROUTE,
@ -303,6 +301,106 @@ int sip_dialog_create(struct sip_dialog *dlg, const struct sip_msg *msg)
pl.p = (const char *)mbuf_buf(renc.mb) + ROUTE_OFFSET;
pl.l = renc.end - ROUTE_OFFSET;
err = sip_addr_decode(&addr, &pl);
if (err)
goto out;
dlg->route = addr.uri;
}
else {
struct uri tmp;
pl_set_str(&pl, uri);
err = uri_decode(&tmp, &pl);
if (err)
goto out;
dlg->route = tmp;
}
mem_deref(dlg->mb);
mem_deref(dlg->uri);
dlg->mb = mem_ref(renc.mb);
dlg->rtag = mem_ref(rtag);
dlg->uri = mem_ref(uri);
dlg->rseq = msg->req ? msg->cseq.num : 0;
dlg->cpos = 0;
out:
mem_deref(renc.mb);
mem_deref(rtag);
mem_deref(uri);
return err;
}
int sip_dialog_fork(struct sip_dialog **dlgp, struct sip_dialog *odlg,
const struct sip_msg *msg)
{
const struct sip_hdr *contact;
struct sip_dialog *dlg;
struct route_enc renc;
struct sip_addr addr;
struct pl pl;
int err;
if (!dlgp || !odlg || !odlg->cpos || !msg)
return EINVAL;
contact = sip_msg_hdr(msg, SIP_HDR_CONTACT);
if (!contact || !msg->callid.p)
return EBADMSG;
if (sip_addr_decode(&addr, &contact->val))
return EBADMSG;
dlg = mem_zalloc(sizeof(*dlg), destructor);
if (!dlg)
return ENOMEM;
dlg->callid = mem_ref(odlg->callid);
dlg->ltag = mem_ref(odlg->ltag);
dlg->lseq = odlg->lseq;
dlg->rseq = msg->req ? msg->cseq.num : 0;
err = pl_strdup(&dlg->uri, &addr.auri);
if (err)
goto out;
err = pl_strdup(&dlg->rtag, msg->req ? &msg->from.tag : &msg->to.tag);
if (err)
goto out;
dlg->mb = mbuf_alloc(512);
if (!dlg->mb) {
err = ENOMEM;
goto out;
}
renc.mb = dlg->mb;
renc.end = 0;
err |= sip_msg_hdr_apply(msg, msg->req, SIP_HDR_RECORD_ROUTE,
record_route_handler, &renc) ? ENOMEM : 0;
err |= mbuf_printf(dlg->mb, "To: %r\r\n",
msg->req ? &msg->from.val : &msg->to.val);
odlg->mb->pos = odlg->cpos;
err |= mbuf_write_mem(dlg->mb, mbuf_buf(odlg->mb),
mbuf_get_left(odlg->mb));
odlg->mb->pos = 0;
if (err)
goto out;
dlg->mb->pos = 0;
if (renc.end) {
pl.p = (const char *)mbuf_buf(dlg->mb) + ROUTE_OFFSET;
pl.l = renc.end - ROUTE_OFFSET;
err = sip_addr_decode(&addr, &pl);
dlg->route = addr.uri;
}
else {
@ -310,14 +408,11 @@ int sip_dialog_create(struct sip_dialog *dlg, const struct sip_msg *msg)
err = uri_decode(&dlg->route, &pl);
}
if (err)
goto out;
mem_deref(dlg->mb);
dlg->mb = mem_ref(renc.mb);
out:
mem_deref(renc.mb);
if (err)
mem_deref(dlg);
else
*dlgp = dlg;
return err;
}
@ -327,7 +422,6 @@ int sip_dialog_update(struct sip_dialog *dlg, const struct sip_msg *msg)
{
const struct sip_hdr *contact;
struct sip_addr addr;
struct pl pl;
char *uri;
int err;
@ -347,10 +441,15 @@ int sip_dialog_update(struct sip_dialog *dlg, const struct sip_msg *msg)
if (dlg->route.scheme.p == dlg->uri) {
struct uri tmp;
struct pl pl;
pl_set_str(&pl, uri);
err = uri_decode(&dlg->route, &pl);
err = uri_decode(&tmp, &pl);
if (err)
goto out;
dlg->route = tmp;
}
mem_deref(dlg->uri);

View file

@ -53,7 +53,8 @@ static bool sub_cmp_half_handler(struct le *le, void *arg)
const struct sip_msg *msg = arg;
struct sipsub *sub = le->data;
return sip_dialog_cmp_half(sub->dlg, msg);
return sip_dialog_cmp_half(sub->dlg, msg) &&
!sip_dialog_established(sub->dlg);
}
@ -66,8 +67,8 @@ static struct sipnot *sipnot_find(struct sipevent_sock *sock,
}
static struct sipsub *sipsub_find(struct sipevent_sock *sock,
const struct sip_msg *msg, bool full)
struct sipsub *sipsub_find(struct sipevent_sock *sock,
const struct sip_msg *msg, bool full)
{
return list_ledata(hash_lookup(sock->ht_sub,
hash_joaat_pl(&msg->callid), full ?
@ -85,7 +86,7 @@ static void notify_handler(struct sipevent_sock *sock,
const struct sip_hdr *hdr;
struct sipsub *sub;
uint32_t nrefs;
bool indialog;
int err;
hdr = sip_msg_hdr(msg, SIP_HDR_EVENT);
if (!hdr || sipevent_event_decode(&event, &hdr->val)) {
@ -101,17 +102,36 @@ static void notify_handler(struct sipevent_sock *sock,
sub = sipsub_find(sock, msg, true);
if (!sub) {
/* hack: while we are waiting for proper fork handling */
sub = sipsub_find(sock, msg, false);
if (!sub || sip_dialog_established(sub->dlg)) {
if (!sub) {
(void)sip_reply(sip, msg,
481, "Subscription Does Not Exist");
return;
}
indialog = false;
if (sub->forkh) {
struct sipsub *fsub;
err = sub->forkh(&fsub, sub, msg, sub->arg);
if (err) {
(void)sip_reply(sip, msg, 500, strerror(err));
return;
}
re_printf("*** new subscription forked from NOTIFY\n");
sub = fsub;
}
else {
err = sip_dialog_create(sub->dlg, msg);
if (err) {
(void)sip_reply(sip, msg, 500, strerror(err));
return;
}
re_printf("*** dialog established from NOTIFY\n");
}
}
else {
if (!sip_dialog_rseq_valid(sub->dlg, msg)) {
@ -120,8 +140,6 @@ static void notify_handler(struct sipevent_sock *sock,
}
(void)sip_dialog_update(sub->dlg, msg);
indialog = true;
}
if (pl_strcasecmp(&event.event, sub->event)) {
@ -129,12 +147,6 @@ static void notify_handler(struct sipevent_sock *sock,
return;
}
/* hack: while we are waiting for proper fork handling */
if (!indialog) {
sub->notifyh(sip, msg, sub->arg);
return;
}
re_printf("notify: %s (%r)\n", sipevent_substate_name(state.state),
&state.params);

View file

@ -38,8 +38,10 @@ struct sipsub {
struct sip_auth *auth;
struct sip *sip;
char *event;
char *refer_to;
char *cuser;
char *hdrs;
sipevent_fork_h *forkh;
sipevent_notify_h *notifyh;
sipevent_close_h *closeh;
void *arg;
@ -50,5 +52,7 @@ struct sipsub {
bool refer;
};
struct sipsub *sipsub_find(struct sipevent_sock *sock,
const struct sip_msg *msg, bool full);
void sipsub_reschedule(struct sipsub *sub, uint64_t wait);
void sipsub_terminate(struct sipsub *sub, int err, const struct sip_msg *msg);

View file

@ -50,6 +50,7 @@ static void internal_close_handler(int err, const struct sip_msg *msg,
static bool terminate(struct sipsub *sub)
{
sub->terminated = true;
sub->forkh = NULL;
sub->notifyh = internal_notify_handler;
sub->closeh = internal_close_handler;
@ -84,6 +85,7 @@ static void destructor(void *arg)
mem_deref(sub->dlg);
mem_deref(sub->auth);
mem_deref(sub->event);
mem_deref(sub->refer_to);
mem_deref(sub->cuser);
mem_deref(sub->hdrs);
mem_deref(sub->sock);
@ -154,25 +156,49 @@ static void response_handler(int err, const struct sip_msg *msg, void *arg)
uint32_t wait;
if (!sip_dialog_established(sub->dlg)) {
if (sub->forkh) {
struct sipsub *fsub;
fsub = sipsub_find(sub->sock, msg, true);
if (!fsub) {
err = sub->forkh(&fsub, sub, msg, sub->arg);
if (err)
return;
re_printf("*** new subscription forked\n");
}
else {
(void)sip_dialog_update(fsub->dlg, msg);
}
sub = fsub;
}
else if (!sip_dialog_established(sub->dlg)) {
err = sip_dialog_create(sub->dlg, msg);
if (err) {
sub->subscribed = false;
goto out;
}
re_printf("*** dialog established\n");
}
else {
/* Ignore 2xx responses for other dialogs
* if forking is disabled */
if (!sip_dialog_cmp(sub->dlg, msg))
return;
(void)sip_dialog_update(sub->dlg, msg);
}
sub->subscribed = true;
sub->failc = 0;
if (sub->terminated) {
sub->refer = false;
if (sub->terminated)
goto out;
}
if (sub->refer) {
sub->refer = false;
@ -231,6 +257,8 @@ static void response_handler(int err, const struct sip_msg *msg, void *arg)
}
out:
sub->refer = false;
if (!sub->expires) {
mem_deref(sub);
}
@ -271,9 +299,11 @@ static int request(struct sipsub *sub, bool reset_ls)
return sip_drequestf(&sub->req, sub->sip, true, "REFER",
sub->dlg, 0, sub->auth,
send_handler, response_handler, sub,
"Refer-To: %s\r\n"
"%s"
"Content-Length: 0\r\n"
"\r\n",
sub->refer_to,
sub->hdrs);
}
else {
@ -295,11 +325,13 @@ static int request(struct sipsub *sub, bool reset_ls)
static int sipsub_alloc(struct sipsub **subp, struct sipevent_sock *sock,
bool refer, const char *uri,
const char *from_name, const char *from_uri,
const char *event, uint32_t expires, const char *cuser,
const char *event, uint32_t expires,
const char *refer_to, const char *cuser,
const char *routev[], uint32_t routec,
sip_auth_h *authh, void *aarg, bool aref,
sipevent_notify_h *notifyh, sipevent_close_h *closeh,
void *arg, const char *fmt, va_list ap)
sipevent_fork_h *forkh, sipevent_notify_h *notifyh,
sipevent_close_h *closeh, void *arg,
const char *fmt, va_list ap)
{
struct sipsub *sub;
int err;
@ -307,6 +339,9 @@ static int sipsub_alloc(struct sipsub **subp, struct sipevent_sock *sock,
if (!subp || !sock || !uri || !from_uri || !event || !expires ||!cuser)
return EINVAL;
if (refer && !refer_to)
return EINVAL;
sub = mem_zalloc(sizeof(*sub), destructor);
if (!sub)
return ENOMEM;
@ -328,6 +363,12 @@ static int sipsub_alloc(struct sipsub **subp, struct sipevent_sock *sock,
if (err)
goto out;
if (refer_to) {
err = str_dup(&sub->refer_to, refer_to);
if (err)
goto out;
}
err = str_dup(&sub->cuser, cuser);
if (err)
goto out;
@ -343,6 +384,7 @@ static int sipsub_alloc(struct sipsub **subp, struct sipevent_sock *sock,
sub->sock = mem_ref(sock);
sub->sip = mem_ref(sock->sip);
sub->expires = expires;
sub->forkh = forkh;
sub->notifyh = notifyh ? notifyh : internal_notify_handler;
sub->closeh = closeh ? closeh : internal_close_handler;
sub->arg = arg;
@ -390,16 +432,18 @@ int sipevent_subscribe(struct sipsub **subp, struct sipevent_sock *sock,
uint32_t expires, const char *cuser,
const char *routev[], uint32_t routec,
sip_auth_h *authh, void *aarg, bool aref,
sipevent_notify_h *notifyh, sipevent_close_h *closeh,
void *arg, const char *fmt, ...)
sipevent_fork_h *forkh, sipevent_notify_h *notifyh,
sipevent_close_h *closeh, void *arg,
const char *fmt, ...)
{
va_list ap;
int err;
va_start(ap, fmt);
err = sipsub_alloc(subp, sock, false, uri, from_name, from_uri,
event, expires, cuser, routev, routec, authh, aarg,
aref, notifyh, closeh, arg, fmt, ap);
event, expires, NULL, cuser,
routev, routec, authh, aarg, aref, forkh, notifyh,
closeh, arg, fmt, ap);
va_end(ap);
return err;
@ -429,20 +473,72 @@ int sipevent_subscribe(struct sipsub **subp, struct sipevent_sock *sock,
*/
int sipevent_refer(struct sipsub **subp, struct sipevent_sock *sock,
const char *uri, const char *from_name,
const char *from_uri, const char *cuser,
const char *routev[], uint32_t routec,
const char *from_uri, const char *refer_to,
const char *cuser, const char *routev[], uint32_t routec,
sip_auth_h *authh, void *aarg, bool aref,
sipevent_notify_h *notifyh, sipevent_close_h *closeh,
void *arg, const char *fmt, ...)
sipevent_fork_h *forkh, sipevent_notify_h *notifyh,
sipevent_close_h *closeh, void *arg,
const char *fmt, ...)
{
va_list ap;
int err;
va_start(ap, fmt);
err = sipsub_alloc(subp, sock, true, uri, from_name, from_uri,
"refer", DEFAULT_EXPIRES, cuser, routev, routec,
authh, aarg, aref, notifyh, closeh, arg, fmt, ap);
"refer", DEFAULT_EXPIRES, refer_to, cuser,
routev, routec, authh, aarg, aref, forkh, notifyh,
closeh, arg, fmt, ap);
va_end(ap);
return err;
}
int sipevent_fork(struct sipsub **subp, struct sipsub *osub,
const struct sip_msg *msg,
sip_auth_h *authh, void *aarg, bool aref,
sipevent_notify_h *notifyh, sipevent_close_h *closeh,
void *arg)
{
struct sipsub *sub;
int err;
if (!subp || !osub || !msg)
return EINVAL;
sub = mem_zalloc(sizeof(*sub), destructor);
if (!sub)
return ENOMEM;
err = sip_dialog_fork(&sub->dlg, osub->dlg, msg);
if (err)
goto out;
hash_append(osub->sock->ht_sub,
hash_joaat_str(sip_dialog_callid(sub->dlg)),
&sub->he, sub);
err = sip_auth_alloc(&sub->auth, authh, aarg, aref);
if (err)
goto out;
sub->event = mem_ref(osub->event);
sub->cuser = mem_ref(osub->cuser);
sub->hdrs = mem_ref(osub->hdrs);
sub->refer = osub->refer;
sub->sock = mem_ref(osub->sock);
sub->sip = mem_ref(osub->sip);
sub->expires = osub->expires;
sub->forkh = NULL;
sub->notifyh = notifyh ? notifyh : internal_notify_handler;
sub->closeh = closeh ? closeh : internal_close_handler;
sub->arg = arg;
out:
if (err)
mem_deref(sub);
else
*subp = sub;
return err;
}