sipevent: subscriber work in progress

This commit is contained in:
Richard Aas 2011-11-30 08:08:20 +00:00
parent e0a4162a77
commit bf5657c3a6
8 changed files with 536 additions and 39 deletions

View file

@ -301,6 +301,8 @@ int sip_dialog_update(struct sip_dialog *dlg, const struct sip_msg *msg);
bool sip_dialog_rseq_valid(struct sip_dialog *dlg, const struct sip_msg *msg);
const char *sip_dialog_callid(const struct sip_dialog *dlg);
bool sip_dialog_cmp(const struct sip_dialog *dlg, const struct sip_msg *msg);
bool sip_dialog_cmp_half(const struct sip_dialog *dlg,
const struct sip_msg *msg);
/* msg */

View file

@ -4,12 +4,34 @@
* Copyright (C) 2010 Creytiv.com
*/
struct sipevent_sock;
struct sipsub;
int sipevent_subscribe(struct sipsub **subp, struct sip *sip, const char *uri,
const char *from_name, const char *from_uri,
const char *event, uint32_t expires, const char *cuser,
int sipevent_listen(struct sipevent_sock **sockp, struct sip *sip,
uint32_t htsize_not, uint32_t htsize_sub,
sip_msg_h *subh, void *arg);
int sipevent_subscribe(struct sipsub **subp, struct sipevent_sock *sock,
const char *uri, const char *from_name,
const char *from_uri, const char *event,
uint32_t expires, const char *cuser,
const char *routev[], uint32_t routec,
sip_auth_h *authh, void *aarg, bool aref,
sip_resp_h *resph, void *arg,
sip_resp_h *resph, sip_msg_h *noth, void *arg,
const char *fmt, ...);
enum sipevent_subst {
SIPEVENT_ACTIVE = 0,
SIPEVENT_TERMINATED,
};
struct sipevent_substate {
enum sipevent_subst state;
struct pl params;
uint32_t expires;
};
int sipevent_substate_decode(struct sipevent_substate *ss,
const struct pl *pl);
const char *sipevent_substate_name(enum sipevent_subst state);

View file

@ -428,3 +428,22 @@ bool sip_dialog_cmp(const struct sip_dialog *dlg, const struct sip_msg *msg)
return true;
}
bool sip_dialog_cmp_half(const struct sip_dialog *dlg,
const struct sip_msg *msg)
{
if (!dlg || !msg)
return false;
if (pl_strcmp(&msg->callid, dlg->callid))
return false;
if (pl_strcmp(msg->req ? &msg->to.tag : &msg->from.tag, dlg->ltag))
return false;
if (dlg->rtag)
return false;
return true;
}

243
src/sipevent/listen.c Normal file
View file

@ -0,0 +1,243 @@
/**
* @file listen.c SIP Event Listen
*
* Copyright (C) 2010 Creytiv.com
*/
#include <string.h>
#include <re_types.h>
#include <re_mem.h>
#include <re_mbuf.h>
#include <re_sa.h>
#include <re_list.h>
#include <re_hash.h>
#include <re_fmt.h>
#include <re_uri.h>
#include <re_tmr.h>
#include <re_sip.h>
#include <re_sipevent.h>
#include "sipevent.h"
static void destructor(void *arg)
{
struct sipevent_sock *sock = arg;
mem_deref(sock->lsnr);
hash_flush(sock->ht_not);
hash_flush(sock->ht_sub);
mem_deref(sock->ht_not);
mem_deref(sock->ht_sub);
}
static bool not_cmp_handler(struct le *le, void *arg)
{
const struct sip_msg *msg = arg;
struct sipnot *not = le->data;
return sip_dialog_cmp(not->dlg, msg);
}
static bool sub_cmp_handler(struct le *le, void *arg)
{
const struct sip_msg *msg = arg;
struct sipsub *sub = le->data;
return sip_dialog_cmp(sub->dlg, msg);
}
static bool sub_cmp_half_handler(struct le *le, void *arg)
{
const struct sip_msg *msg = arg;
struct sipsub *sub = le->data;
return sip_dialog_cmp_half(sub->dlg, msg);
}
static struct sipnot *sipnot_find(struct sipevent_sock *sock,
const struct sip_msg *msg)
{
return list_ledata(hash_lookup(sock->ht_not,
hash_joaat_pl(&msg->callid),
not_cmp_handler, (void *)msg));
}
static struct sipsub *sipsub_find(struct sipevent_sock *sock,
const struct sip_msg *msg, bool full)
{
return list_ledata(hash_lookup(sock->ht_sub,
hash_joaat_pl(&msg->callid), full ?
sub_cmp_handler : sub_cmp_half_handler,
(void *)msg));
}
static void notify_handler(struct sipevent_sock *sock,
const struct sip_msg *msg)
{
struct sipevent_substate ss;
struct sip *sip = sock->sip;
const struct sip_hdr *hdr;
struct sipsub *sub;
sub = sipsub_find(sock, msg, true);
if (!sub) {
sub = sipsub_find(sock, msg, false);
if (!sub || sub->subscribed) {
(void)sip_reply(sip, msg,
481, "Subsctiption Does Not Exist");
return;
}
}
else {
if (!sip_dialog_rseq_valid(sub->dlg, msg)) {
(void)sip_reply(sip, msg, 500,"Server Internal Error");
return;
}
// todo: check
(void)sip_dialog_update(sub->dlg, msg);
}
hdr = sip_msg_hdr(msg, SIP_HDR_EVENT);
// todo: check case sensitiveness, header syntax and status code
if (!hdr || pl_strcmp(&hdr->val, sub->event)) {
(void)sip_reply(sip, msg, 489, "Bad Event");
return;
}
hdr = sip_msg_hdr(msg, SIP_HDR_SUBSCRIPTION_STATE);
if (sub->subscribed && hdr &&
!sipevent_substate_decode(&ss, &hdr->val)) {
re_printf("substate: %s (%u secs) [%r]\n",
sipevent_substate_name(ss.state),
ss.expires, &ss.params);
switch (ss.state) {
case SIPEVENT_ACTIVE:
if (sub->req || sub->terminated)
break;
sipevent_resubscribe(sub, ss.expires * 900);
break;
case SIPEVENT_TERMINATED:
sub->req = mem_deref(sub->req); /* forget request */
if (sub->terminated) {
mem_deref(sub);
goto reply;
}
sub->subscribed = false;
sub->dlg = mem_deref(sub->dlg);
hash_unlink(&sub->he);
sipevent_resubscribe(sub, 0);
break;
}
}
if (sub->noth(msg, sub->arg))
return;
reply:
(void)sip_treply(NULL, sip, msg, 200, "OK");
}
static void subscribe_handler(struct sipevent_sock *sock,
const struct sip_msg *msg)
{
struct sip *sip = sock->sip;
struct sipnot *not;
not = sipnot_find(sock, msg);
if (!not || not->terminated) {
(void)sip_reply(sip, msg, 481, "Subscription Does Not Exist");
return;
}
if (!sip_dialog_rseq_valid(not->dlg, msg)) {
(void)sip_reply(sip, msg, 500, "Server Internal Error");
return;
}
// todo: check
(void)sip_dialog_update(not->dlg, msg);
// ...
}
static bool request_handler(const struct sip_msg *msg, void *arg)
{
struct sipevent_sock *sock = arg;
if (!pl_strcmp(&msg->met, "SUBSCRIBE")) {
if (pl_isset(&msg->to.tag)) {
subscribe_handler(sock, msg);
return true;
}
return sock->subh ? sock->subh(msg, arg) : false;
}
else if (!pl_strcmp(&msg->met, "NOTIFY")) {
notify_handler(sock, msg);
return true;
}
else {
return false;
}
}
int sipevent_listen(struct sipevent_sock **sockp, struct sip *sip,
uint32_t htsize_not, uint32_t htsize_sub,
sip_msg_h *subh, void *arg)
{
struct sipevent_sock *sock;
int err;
if (!sockp || !sip || !htsize_not || !htsize_sub)
return EINVAL;
sock = mem_zalloc(sizeof(*sock), destructor);
if (!sock)
return ENOMEM;
err = sip_listen(&sock->lsnr, sip, true, request_handler, sock);
if (err)
goto out;
err = hash_alloc(&sock->ht_not, htsize_not);
if (err)
goto out;
err = hash_alloc(&sock->ht_sub, htsize_sub);
if (err)
goto out;
sock->sip = sip;
sock->subh = subh;
sock->arg = arg;
out:
if (err)
mem_deref(sock);
else
*sockp = sock;
return err;
}

View file

@ -4,4 +4,6 @@
# Copyright (C) 2010 Creytiv.com
#
SRCS += sipevent/listen.c
SRCS += sipevent/sub.c
SRCS += sipevent/substate.c

52
src/sipevent/sipevent.h Normal file
View file

@ -0,0 +1,52 @@
/**
* @file sipevent.h SIP Event Private Interface
*
* Copyright (C) 2010 Creytiv.com
*/
struct sipevent_sock {
struct sip_lsnr *lsnr;
struct hash *ht_not;
struct hash *ht_sub;
struct sip *sip;
sip_msg_h *subh;
void *arg;
};
struct sipnot {
struct le he;
struct sip_dialog *dlg;
bool terminated;
};
struct sipsub {
struct le he;
struct sip_loopstate ls;
struct tmr tmr;
struct sipevent_sock *sock;
struct sip_request *req;
struct sip_dialog *dlg;
struct sip_auth *auth;
struct sip *sip;
char *uri;
char *from_name;
char *from_uri;
char **routev;
char *event;
char *cuser;
char *hdrs;
sip_resp_h *resph;
sip_msg_h *noth;
void *arg;
uint32_t expires;
uint32_t failc;
uint32_t routec;
bool subscribed;
bool terminated;
};
void sipevent_resubscribe(struct sipsub *sub, uint32_t wait);

View file

@ -1,5 +1,5 @@
/**
* @file sub.c SIP Subscription
* @file sub.c SIP Event Subscriber
*
* Copyright (C) 2010 Creytiv.com
*/
@ -15,6 +15,7 @@
#include <re_tmr.h>
#include <re_sip.h>
#include <re_sipevent.h>
#include "sipevent.h"
enum {
@ -22,30 +23,11 @@ enum {
};
/** Defines a SIP subscriber client */
struct sipsub {
struct sip_loopstate ls;
struct tmr tmr;
struct sip *sip;
struct sip_request *req;
struct sip_dialog *dlg;
struct sip_auth *auth;
char *event;
char *cuser;
char *hdrs;
sip_resp_h *resph;
void *arg;
uint32_t expires;
uint32_t failc;
bool subscribed;
bool terminated;
};
static int request(struct sipsub *sub, bool reset_ls);
static void dummy_handler(int err, const struct sip_msg *msg, void *arg)
static void internal_response_handler(int err, const struct sip_msg *msg,
void *arg)
{
(void)err;
(void)msg;
@ -53,6 +35,15 @@ static void dummy_handler(int err, const struct sip_msg *msg, void *arg)
}
static bool internal_notify_handler(const struct sip_msg *msg, void *arg)
{
(void)msg;
(void)arg;
return false;
}
static void destructor(void *arg)
{
struct sipsub *sub = arg;
@ -61,7 +52,8 @@ static void destructor(void *arg)
if (!sub->terminated) {
sub->resph = dummy_handler;
sub->resph = internal_response_handler;
sub->noth = internal_notify_handler;
sub->terminated = true;
if (sub->req) {
@ -75,12 +67,27 @@ static void destructor(void *arg)
}
}
if (sub->routev) {
uint32_t i;
for (i=0; i<sub->routec; i++)
mem_deref(sub->routev[i]);
mem_deref(sub->routev);
}
hash_unlink(&sub->he);
mem_deref(sub->dlg);
mem_deref(sub->auth);
mem_deref(sub->uri);
mem_deref(sub->from_name);
mem_deref(sub->from_uri);
mem_deref(sub->event);
mem_deref(sub->cuser);
mem_deref(sub->sip);
mem_deref(sub->hdrs);
mem_deref(sub->sock);
mem_deref(sub->sip);
}
@ -95,7 +102,25 @@ static void tmr_handler(void *arg)
struct sipsub *sub = arg;
int err;
if (!sub->dlg) {
err = sip_dialog_alloc(&sub->dlg, sub->uri, sub->uri,
sub->from_name, sub->from_uri,
(const char **)sub->routev,
sub->routec);
if (err)
goto out;
hash_append(sub->sock->ht_sub,
hash_joaat_str(sip_dialog_callid(sub->dlg)),
&sub->he, sub);
}
err = request(sub, true);
if (err)
goto out;
out:
if (err) {
tmr_start(&sub->tmr, failwait(++sub->failc), tmr_handler, sub);
sub->resph(err, NULL, sub->arg);
@ -103,6 +128,17 @@ static void tmr_handler(void *arg)
}
void sipevent_resubscribe(struct sipsub *sub, uint32_t wait)
{
if (!wait)
wait = failwait(++sub->failc);
re_printf("will re-subscribe in %u ms\n", wait);
tmr_start(&sub->tmr, wait, tmr_handler, sub);
}
static void response_handler(int err, const struct sip_msg *msg, void *arg)
{
const struct sip_hdr *minexp;
@ -121,8 +157,24 @@ static void response_handler(int err, const struct sip_msg *msg, void *arg)
}
else if (msg->scode < 300) {
sub->subscribed = true;
sub->failc = 0;
if (!sub->subscribed) {
err = sip_dialog_create(sub->dlg, msg);
if (err) {
sub->dlg = mem_deref(sub->dlg);
hash_unlink(&sub->he);
sub->failc++;
goto out;
}
sub->subscribed = true;
}
else {
// todo: check
(void)sip_dialog_update(sub->dlg, msg);
}
sub->failc = 0;
if (pl_isset(&msg->expires))
wait = pl_u32(&msg->expires);
@ -167,6 +219,13 @@ static void response_handler(int err, const struct sip_msg *msg, void *arg)
break;
return;
case 481:
// todo: test
sub->subscribed = false;
sub->dlg = mem_deref(sub->dlg);
hash_unlink(&sub->he);
break;
}
++sub->failc;
@ -181,6 +240,7 @@ static void response_handler(int err, const struct sip_msg *msg, void *arg)
mem_deref(sub);
}
else {
re_printf("will re-subscribe in %u ms...\n", wait);
tmr_start(&sub->tmr, wait, tmr_handler, sub);
sub->resph(err, msg, sub->arg);
}
@ -223,7 +283,7 @@ static int request(struct sipsub *sub, bool reset_ls)
* Allocate a SIP subscriber client
*
* @param subp Pointer to allocated SIP subscriber client
* @param sip SIP Stack instance
* @param sock SIP Event socket
* @param uri SIP Request URI
* @param from_name SIP From-header Name (optional)
* @param from_uri SIP From-header URI
@ -235,24 +295,26 @@ static int request(struct sipsub *sub, bool reset_ls)
* @param authh Authentication handler
* @param aarg Authentication handler argument
* @param aref True to ref argument
* @param resph Response handler
* @param resph SUBSCRIBE response handler
* @param noth Notify handler
* @param arg Response handler argument
* @param fmt Formatted strings with extra SIP Headers
*
* @return 0 if success, otherwise errorcode
*/
int sipevent_subscribe(struct sipsub **subp, struct sip *sip, const char *uri,
const char *from_name, const char *from_uri,
const char *event, uint32_t expires, const char *cuser,
int sipevent_subscribe(struct sipsub **subp, struct sipevent_sock *sock,
const char *uri, const char *from_name,
const char *from_uri, const char *event,
uint32_t expires, const char *cuser,
const char *routev[], uint32_t routec,
sip_auth_h *authh, void *aarg, bool aref,
sip_resp_h *resph, void *arg,
sip_resp_h *resph, sip_msg_h *noth, void *arg,
const char *fmt, ...)
{
struct sipsub *sub;
int err;
if (!subp || !sip || !uri || !from_uri || !event || !expires || !cuser)
if (!subp || !sock || !uri || !from_uri || !event || !expires ||!cuser)
return EINVAL;
sub = mem_zalloc(sizeof(*sub), destructor);
@ -264,10 +326,49 @@ int sipevent_subscribe(struct sipsub **subp, struct sip *sip, const char *uri,
if (err)
goto out;
hash_append(sock->ht_sub,
hash_joaat_str(sip_dialog_callid(sub->dlg)),
&sub->he, sub);
err = sip_auth_alloc(&sub->auth, authh, aarg, aref);
if (err)
goto out;
err = str_dup(&sub->uri, uri);
if (err)
goto out;
err = str_dup(&sub->from_uri, from_uri);
if (err)
goto out;
if (from_name) {
err = str_dup(&sub->from_name, from_name);
if (err)
goto out;
}
sub->routec = routec;
if (routec > 0) {
uint32_t i;
sub->routev = mem_zalloc(sizeof(*sub->routev) * routec, NULL);
if (!sub->routev) {
err = ENOMEM;
goto out;
}
for (i=0; i<routec; i++) {
err = str_dup(&sub->routev[i], routev[i]);
if (err)
goto out;
}
}
err = str_dup(&sub->event, event);
if (err)
goto out;
@ -288,9 +389,11 @@ int sipevent_subscribe(struct sipsub **subp, struct sip *sip, const char *uri,
goto out;
}
sub->sip = mem_ref(sip);
sub->sock = mem_ref(sock);
sub->sip = mem_ref(sock->sip);
sub->expires = expires;
sub->resph = resph ? resph : dummy_handler;
sub->resph = resph ? resph : internal_response_handler;
sub->noth = noth ? noth : internal_notify_handler;
sub->arg = arg;
err = request(sub, true);

54
src/sipevent/substate.c Normal file
View file

@ -0,0 +1,54 @@
/**
* @file substate.c SIP Subscription-State header
*
* Copyright (C) 2010 Creytiv.com
*/
#include <re_types.h>
#include <re_fmt.h>
#include <re_mbuf.h>
#include <re_uri.h>
#include <re_list.h>
#include <re_sa.h>
#include <re_sip.h>
#include <re_sipevent.h>
int sipevent_substate_decode(struct sipevent_substate *ss, const struct pl *pl)
{
struct pl state, expires;
int err;
if (!ss || !pl)
return EINVAL;
err = re_regex(pl->p, pl->l, "[a-z]+[ \t\r\n]*[^]*",
&state, NULL, &ss->params);
if (err)
return EBADMSG;
// todo: check case-sensitiveness
if (!pl_strcasecmp(&state, "active"))
ss->state = SIPEVENT_ACTIVE;
else if (!pl_strcasecmp(&state, "terminated"))
ss->state = SIPEVENT_TERMINATED;
else
ss->state = -1;
if (!sip_param_decode(&ss->params, "expires", &expires))
ss->expires = pl_u32(&expires);
else
ss->expires = 0;
return 0;
}
const char *sipevent_substate_name(enum sipevent_subst state)
{
switch (state) {
case SIPEVENT_ACTIVE: return "active";
case SIPEVENT_TERMINATED: return "terminated";
default: return "???";
}
}