diff --git a/include/Makefile.am b/include/Makefile.am index db6862d..2ba0ece 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -54,6 +54,7 @@ nobase_libnlinclude_HEADERS = \ netlink/route/qdisc/red.h \ netlink/route/qdisc/sfq.h \ netlink/route/qdisc/tbf.h \ + netlink/route/qdisc/plug.h \ netlink/route/addr.h \ netlink/route/class.h \ netlink/route/classifier.h \ diff --git a/include/linux/pkt_sched.h b/include/linux/pkt_sched.h index c533670..7ccc1fd 100644 --- a/include/linux/pkt_sched.h +++ b/include/linux/pkt_sched.h @@ -127,6 +127,27 @@ struct tc_multiq_qopt { __u16 max_bands; /* Maximum number of queues */ }; +/* PLUG section */ + +#define TCQ_PLUG_BUFFER 0 +#define TCQ_PLUG_RELEASE_ONE 1 +#define TCQ_PLUG_RELEASE_INDEFINITE 2 +#define TCQ_PLUG_LIMIT 3 + +struct tc_plug_qopt { + /* TCQ_PLUG_BUFFER: Inset a plug into the queue and + * buffer any incoming packets + * TCQ_PLUG_RELEASE_ONE: Dequeue packets from queue head + * to beginning of the next plug. + * TCQ_PLUG_RELEASE_INDEFINITE: Dequeue all packets from queue. + * Stop buffering packets until the next TCQ_PLUG_BUFFER + * command is received (just act as a pass-thru queue). + * TCQ_PLUG_LIMIT: Increase/decrease queue size + */ + int action; + __u32 limit; +}; + /* TBF section */ struct tc_tbf_qopt { diff --git a/include/netlink-types.h b/include/netlink-types.h index 82481b7..5ced836 100644 --- a/include/netlink-types.h +++ b/include/netlink-types.h @@ -669,6 +669,12 @@ struct rtnl_red uint32_t qr_mask; }; +struct rtnl_plug +{ + int action; + uint32_t limit; +}; + struct flnl_request { NLHDR_COMMON diff --git a/include/netlink/route/qdisc/plug.h b/include/netlink/route/qdisc/plug.h new file mode 100644 index 0000000..ffb1a04 --- /dev/null +++ b/include/netlink/route/qdisc/plug.h @@ -0,0 +1,30 @@ +/* + * netlink/route/qdisc/plug.c PLUG Qdisc + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation version 2.1 + * of the License. + * + * Copyright (c) 2012 Shriram Rajagopalan + */ + +#ifndef NETLINK_PLUG_H_ +#define NETLINK_PLUG_H_ + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +extern int rtnl_qdisc_plug_set_limit(struct rtnl_qdisc *, int); +extern int rtnl_qdisc_plug_buffer(struct rtnl_qdisc *); +extern int rtnl_qdisc_plug_release_one(struct rtnl_qdisc *); +extern int rtnl_qdisc_plug_release_indefinite(struct rtnl_qdisc *); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/lib/Makefile.am b/lib/Makefile.am index c2668ef..aee8d0f 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -72,7 +72,7 @@ libnl_route_3_la_SOURCES = \ route/qdisc/blackhole.c route/qdisc/cbq.c route/qdisc/dsmark.c \ route/qdisc/fifo.c route/qdisc/htb.c route/qdisc/netem.c \ route/qdisc/prio.c route/qdisc/red.c route/qdisc/sfq.c \ - route/qdisc/tbf.c \ + route/qdisc/tbf.c route/qdisc/plug.c \ \ fib_lookup/lookup.c fib_lookup/request.c \ \ @@ -101,6 +101,7 @@ nobase_pkglib_LTLIBRARIES = \ cli/qdisc/htb.la \ cli/qdisc/blackhole.la \ cli/qdisc/pfifo.la \ + cli/qdisc/plug.la \ cli/qdisc/bfifo.la \ cli/cls/basic.la \ cli/cls/cgroup.la @@ -108,6 +109,7 @@ nobase_pkglib_LTLIBRARIES = \ cli_qdisc_htb_la_LDFLAGS = -module -avoid-version cli_qdisc_blackhole_la_LDFLAGS = -module -avoid-version cli_qdisc_pfifo_la_LDFLAGS = -module -avoid-version +cli_qdisc_plug_la_LDFLAGS = -module -avoid-version cli_qdisc_bfifo_la_LDFLAGS = -module -avoid-version cli_cls_basic_la_LDFLAGS = -module -avoid-version cli_cls_cgroup_la_LDFLAGS = -module -avoid-version diff --git a/lib/cli/qdisc/plug.c b/lib/cli/qdisc/plug.c new file mode 100644 index 0000000..2b8d5d6 --- /dev/null +++ b/lib/cli/qdisc/plug.c @@ -0,0 +1,113 @@ + +/* + * src/lib/cli/qdisc/plug.c plug module for CLI lib + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation version 2.1 + * of the License. + * + * Copyright (c) 2012 Shriram Rajagopalan + */ + +#include +#include +#include + +static void print_usage(void) +{ + printf( +"Usage: nl-qdisc-add [...] plug [OPTIONS]...\n" +"\n" +"OPTIONS\n" +" --help Show this help text.\n" +" --limit Maximum queue length in bytes.\n" +" --buffer create a new buffer(plug) and queue incoming traffic into it.\n" +" --release-one release traffic from previous buffer.\n" +" --release-indefinite stop buffering and release all (buffered and new) packets.\n" +"\n" +"EXAMPLE" +" # Attach plug qdisc with 32KB queue size to ifb0\n" +" nl-qdisc-add --dev=ifb0 --parent=root plug --limit=32768\n" +" # Plug network traffic arriving at ifb0\n" +" nl-qdisc-add --dev=ifb0 --parent=root --update plug --buffer\n" +" # Unplug traffic arriving at ifb0 indefinitely\n" +" nl-qdisc-add --dev=ifb0 --parent=root --update plug --release-indefinite\n\n" +" # If operating in output buffering mode:\n" +" # at time t=t0, create a new output buffer b0 to hold network output\n" +" nl-qdisc-add --dev=ifb0 --parent=root --update plug --buffer\n\n" +" # at time t=t1, take a checkpoint c0, create a new output buffer b1\n" +" nl-qdisc-add --dev=ifb0 --parent=root --update plug --buffer\n" +" # at time t=t1+r, after c0 is committed, release b0\n" +" nl-qdisc-add --dev=ifb0 --parent=root --update plug --release-one\n\n" +" # at time t=t2, take a checkpoint c1, create a new output buffer b2\n" +" nl-qdisc-add --dev=ifb0 --parent=root --update plug --buffer\n" +" # at time t=t2+r, after c1 is committed, release b1\n" +" nl-qdisc-add --dev=ifb0 --parent=root --update plug --release-one\n"); +} + +static void plug_parse_argv(struct rtnl_tc *tc, int argc, char **argv) +{ + struct rtnl_qdisc *qdisc = (struct rtnl_qdisc *) tc; + + for (;;) { + int c, optidx = 0; + enum { + ARG_LIMIT = 257, + ARG_BUFFER = 258, + ARG_RELEASE_ONE = 259, + ARG_RELEASE_INDEFINITE = 260, + }; + static struct option long_opts[] = { + { "help", 0, 0, 'h' }, + { "limit", 1, 0, ARG_LIMIT }, + { "buffer", 0, 0, ARG_BUFFER }, + { "release-one", 0, 0, ARG_RELEASE_ONE }, + { "release-indefinite", 0, 0, ARG_RELEASE_INDEFINITE }, + { 0, 0, 0, 0 } + }; + + c = getopt_long(argc, argv, "h", long_opts, &optidx); + if (c == -1) + break; + + switch (c) { + case 'h': + print_usage(); + return; + + case ARG_LIMIT: + rtnl_qdisc_plug_set_limit(qdisc, nl_cli_parse_u32(optarg)); + break; + + case ARG_BUFFER: + rtnl_qdisc_plug_buffer(qdisc); + break; + + case ARG_RELEASE_ONE: + rtnl_qdisc_plug_release_one(qdisc); + break; + + case ARG_RELEASE_INDEFINITE: + rtnl_qdisc_plug_release_indefinite(qdisc); + break; + } + } +} + +static struct nl_cli_tc_module plug_module = +{ + .tm_name = "plug", + .tm_type = RTNL_TC_TYPE_QDISC, + .tm_parse_argv = plug_parse_argv, +}; + +static void __init plug_init(void) +{ + nl_cli_tc_register(&plug_module); +} + +static void __exit plug_exit(void) +{ + nl_cli_tc_unregister(&plug_module); +} diff --git a/lib/route/qdisc/plug.c b/lib/route/qdisc/plug.c new file mode 100644 index 0000000..a99b9be --- /dev/null +++ b/lib/route/qdisc/plug.c @@ -0,0 +1,177 @@ +/* + * lib/route/qdisc/plug.c PLUG Qdisc + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation version 2.1 + * of the License. + * + * Copyright (c) 2012 Shriram Rajagopalan + */ + +/** + * @ingroup qdisc + * @defgroup qdisc_plug Plug/Unplug Traffic (PLUG) + * @brief + * + * Queue traffic until an explicit release command. + * + * There are two ways to use this qdisc: + * 1. A simple "instantaneous" plug/unplug operation, by issuing an alternating + * sequence of TCQ_PLUG_BUFFER & TCQ_PLUG_RELEASE_INDEFINITE commands. + * + * 2. For network output buffering (a.k.a output commit) functionality. + * Output commit property is commonly used by applications using checkpoint + * based fault-tolerance to ensure that the checkpoint from which a system + * is being restored is consistent w.r.t outside world. + * + * Consider for e.g. Remus - a Virtual Machine checkpointing system, + * wherein a VM is checkpointed, say every 50ms. The checkpoint is replicated + * asynchronously to the backup host, while the VM continues executing the + * next epoch speculatively. + * + * The following is a typical sequence of output buffer operations: + * 1.At epoch i, start_buffer(i) + * 2. At end of epoch i (i.e. after 50ms): + * 2.1 Stop VM and take checkpoint(i). + * 2.2 start_buffer(i+1) and Resume VM + * 3. While speculatively executing epoch(i+1), asynchronously replicate + * checkpoint(i) to backup host. + * 4. When checkpoint_ack(i) is received from backup, release_buffer(i) + * Thus, this Qdisc would receive the following sequence of commands: + * TCQ_PLUG_BUFFER (epoch i) + * .. TCQ_PLUG_BUFFER (epoch i+1) + * ....TCQ_PLUG_RELEASE_ONE (epoch i) + * ......TCQ_PLUG_BUFFER (epoch i+2) + * ........ + * + * + * State of the queue, when used for network output buffering: + * + * plug(i+1) plug(i) head + * ------------------+--------------------+----------------> + * | | + * | | + * pkts_current_epoch| pkts_last_epoch |pkts_to_release + * ----------------->|<--------+--------->|+---------------> + * v v + * + * + * @{ + */ + +#include +#include +#include +#include +#include +#include + +static int plug_msg_fill(struct rtnl_tc *tc, void *data, struct nl_msg *msg) +{ + struct rtnl_plug *plug = data; + struct tc_plug_qopt opts; + + if (!plug) + return -NLE_INVAL; + + opts.action = plug->action; + opts.limit = plug->limit; + + return nlmsg_append(msg, &opts, sizeof(opts), NL_DONTPAD); +} + +/** + * @name Attribute Modification + * @{ + */ + +/** + * Insert a plug into the qdisc and buffer any incoming + * network traffic. + * @arg qdisc PLUG qdisc to be modified. + */ +int rtnl_qdisc_plug_buffer(struct rtnl_qdisc *qdisc) +{ + struct rtnl_plug *plug; + + if (!(plug = rtnl_tc_data(TC_CAST(qdisc)))) + return -NLE_NOMEM; + + plug->action = TCQ_PLUG_BUFFER; + return 0; +} + +/** + * Unplug the qdisc, releasing packets from queue head + * to the last complete buffer, while new traffic + * continues to be buffered. + * @arg qdisc PLUG qdisc to be modified. + */ +int rtnl_qdisc_plug_release_one(struct rtnl_qdisc *qdisc) +{ + struct rtnl_plug *plug; + + if (!(plug = rtnl_tc_data(TC_CAST(qdisc)))) + return -NLE_NOMEM; + + plug->action = TCQ_PLUG_RELEASE_ONE; + return 0; +} + +/** + * Indefinitely unplug the qdisc, releasing all packets. + * Network traffic will not be buffered until the next + * buffer command is issued. + * @arg qdisc PLUG qdisc to be modified. + */ +int rtnl_qdisc_plug_release_indefinite(struct rtnl_qdisc *qdisc) +{ + struct rtnl_plug *plug; + + if (!(plug = rtnl_tc_data(TC_CAST(qdisc)))) + return -NLE_NOMEM; + + plug->action = TCQ_PLUG_RELEASE_INDEFINITE; + return 0; +} + +/** + * Set limit of PLUG qdisc. + * @arg qdisc PLUG qdisc to be modified. + * @arg limit New limit. + * @return 0 on success or a negative error code. + */ +int rtnl_qdisc_plug_set_limit(struct rtnl_qdisc *qdisc, int limit) +{ + struct rtnl_plug *plug; + + if (!(plug = rtnl_tc_data(TC_CAST(qdisc)))) + return -NLE_NOMEM; + + plug->action = TCQ_PLUG_LIMIT; + plug->limit = limit; + + return 0; +} + +/** @} */ + +static struct rtnl_tc_ops plug_ops = { + .to_kind = "plug", + .to_type = RTNL_TC_TYPE_QDISC, + .to_size = sizeof(struct rtnl_plug), + .to_msg_fill = plug_msg_fill, +}; + +static void __init plug_init(void) +{ + rtnl_tc_register(&plug_ops); +} + +static void __exit plug_exit(void) +{ + rtnl_tc_unregister(&plug_ops); +} + +/** @} */