Wednesday July 27, 2005
STREAMS flow-control implementation
In my previous blog entry I discussed how to write a very simple STREAMS module that participates in STREAMS flow control. It had two bugs in it, one intentional and one unintentional. Both were spotted by Yu Xiangning in the comments. The unintentional bug was in the setting of the flow-control high- and low-water marks. This entry goes into more detail on STREAMS flow control and discusses its actual implementation in Solaris.
STREAMS have a simple flow-control mechanism that is voluntary by design. Participating modules and drivers ask the next queue whether it is willing to accept more messages by calling canputnext(9f). If the next queue is "full" (it holds at least as much data as its high-water mark specifies), the module enqueues the data on its own queue with putq(9f) or putbq(9f). The putq() and putbq() functions place the message on the module's queue and arrange for its service procedure to be run some time later. If the service procedure returns without processing all messages on its queue, it will not be called again unless it is enabled either explicitly by qenable(9f) or implicitly when the amount of data queued in the next module drops below the low-water mark.
All modules participating in flow control must have a service routine. Flow control operates between the two nearest queues in a stream that have service procedures. A detailed description of flow control is contained in the Solaris STREAMS Programming Guide. The excellent UNIX System V Network Programming also contains a very good description of flow control in section 9.2:
A stream is said to be flow-controlled when its queues become full. When the number of bytes of data in the message on a queue becomes greater than the queue's high-water mark, the queue is considered full. Flow control is an advisory state where the processing element passing messages to the full queue stops sending messages and places them on its own queue. This way, flow control can propagate from one end of the stream to the other.
At the stream head, when a process tries to write to a stream whose topmost write queue with a service procedure is full, the process goes to sleep until the number of bytes of data stored in the queue reaches the queue's low-water mark. At this point the queue is no longer flow controlled. Note the distinction between the queue being full and being flow-controlled. The queue is only full as long as the amount of data it contains is over its high-water mark, but the queue remains flow controlled after the amount of data falls below the high-water mark. Of course, if the high and low-water marks are set to the same value, then there is no such distinction.
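The difference between "full" and "flow-controlled" is easy to try out in user space. The following is only a toy sketch: struct toy_queue, toy_add() and toy_remove() are names I made up for illustration and are not part of the STREAMS framework. It follows the wording of the quote and releases flow control once the count reaches the low-water mark; the Solaris code shown later compares slightly differently.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy user-space model; none of these names exist in the STREAMS framework. */
struct toy_queue {
    size_t count;            /* bytes currently held */
    size_t hiwat;            /* high-water mark */
    size_t lowat;            /* low-water mark */
    bool   full;             /* count is at or above hiwat */
    bool   flow_controlled;  /* set when full, cleared only at lowat */
};

/* Add nbytes to the queue and update both states. */
static void toy_add(struct toy_queue *q, size_t nbytes)
{
    q->count += nbytes;
    if (q->count >= q->hiwat) {
        q->full = true;
        q->flow_controlled = true;
    }
}

/* Remove nbytes from the queue and update both states. */
static void toy_remove(struct toy_queue *q, size_t nbytes)
{
    assert(nbytes <= q->count);
    q->count -= nbytes;
    if (q->count < q->hiwat)
        q->full = false;             /* no longer full ... */
    if (q->count <= q->lowat)
        q->flow_controlled = false;  /* ... but released only at lowat */
}
```

With hiwat set to 100 and lowat set to 20, a queue that was filled to 100 bytes and then drained to 50 is no longer full but is still flow-controlled; it is released only when the count drops to 20.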
The canputnext() function is pretty simple. It finds the next queue with a
service procedure and checks whether it has the QFULL flag set. If QFULL is
not set, it returns 1; if it is set, it sets the QWANTW flag and returns 0.
The QWANTW flag records that another queue wants to place messages here, so
that queue should be back-enabled when the flow-control condition clears:
int
canputnext(queue_t *q)
{
    /* get next module forward with a service queue */
    q = q->q_next->q_nfsrv;
    if (!(q->q_flag & QFULL)) {
        return (1);
    } else {
        q->q_flag |= QWANTW;
        return (0);
    }
}
The putq() function puts a message on a module's or driver's queue. The
message is placed after any other messages of the same priority, and the
flow-control parameters are updated. If QNOENB is not set, the service
routine is enabled:
/*
 * Put a message on a queue.
 *
 * Messages are enqueued on a priority basis. The priority classes
 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
 * and B_NORMAL (type < QPCTL && band == 0).
 *
 * Add appropriate weighted data block sizes to queue count.
 * If queue hits high water mark then set QFULL flag.
 *
 * If QNOENAB is not set (putq is allowed to enable the queue),
 * enable the queue only if the message is PRIORITY,
 * or the QWANTR flag is set (indicating that the service procedure
 * is ready to read the queue. This implies that a service
 * procedure must NEVER put a high priority message back on its own
 * queue, as this would result in an infinite loop (!).
 */
int
putq(queue_t *q, mblk_t *bp)
{
    mblk_t *tmp;
    int bytecnt = 0, mblkcnt = 0;

    /*
     * If queue is empty, add the message and initialize the pointers.
     * Otherwise, adjust message pointers and queue pointers.
     */
    if (!q->q_first) {
        bp->b_next = NULL;
        bp->b_prev = NULL;
        q->q_first = bp;
        q->q_last = bp;
    } else {
        tmp = q->q_last;
        bp->b_next = NULL;
        bp->b_prev = tmp;
        tmp->b_next = bp;
        q->q_last = bp;
    }

    /* Get message byte count for q_count accounting */
    for (tmp = bp; tmp; tmp = tmp->b_cont) {
        bytecnt += (tmp->b_wptr - tmp->b_rptr);
        mblkcnt++;
    }
    q->q_count += bytecnt;
    q->q_mblkcnt += mblkcnt;
    if ((q->q_count >= q->q_hiwat) ||
        (q->q_mblkcnt >= q->q_hiwat)) {
        q->q_flag |= QFULL;
    }

    /* Don't enable the queue that was noenable(9f)-ed */
    if ((canenable(q) && (q->q_flag & QWANTR)))
        qenable(q);
    return (1);
}
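The accounting loop in putq() can be exercised outside the kernel. Here is a minimal sketch under my own assumptions: struct toy_mblk mimics only the three mblk_t fields the loop touches, and toy_msgsize() and struct toy_size are hypothetical names, not framework functions. It shows why a message made of zero-length blocks still counts toward flow control through the block count.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in for mblk_t with only the fields the accounting loop uses. */
struct toy_mblk {
    const unsigned char *b_rptr;  /* first unread byte */
    const unsigned char *b_wptr;  /* one past the last written byte */
    struct toy_mblk     *b_cont;  /* next block of the same message */
};

/* Result of walking one message: total bytes and number of blocks. */
struct toy_size {
    size_t bytes;
    size_t blocks;
};

/* Walk the b_cont chain the same way the loop in putq() does. */
static struct toy_size toy_msgsize(const struct toy_mblk *mp)
{
    struct toy_size sz = { 0, 0 };

    for (; mp != NULL; mp = mp->b_cont) {
        assert(mp->b_wptr >= mp->b_rptr);
        sz.bytes += (size_t)(mp->b_wptr - mp->b_rptr);
        sz.blocks++;
    }
    return sz;
}
```

A message consisting of a 4-byte block followed by an empty block yields 4 bytes and 2 blocks, so even a stream of empty messages eventually reaches the high-water mark through the block count.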
The getq() function fetches and returns the first message from the queue. As
a side effect it may trigger back-enabling of queues that were previously
flow-controlled because this queue held too much data. Back-enabling arranges
for the service routine of the previously flow-controlled module to be called.
That service routine in turn calls getq() on its own queue, which may cause
further back-enabling, propagating the release of flow control backwards
through the stream.
/*
 * Get a message off head of queue
 *
 * If queue has no buffers then mark queue
 * with QWANTR. (queue wants to be read by
 * someone when data becomes available)
 *
 * If there is something to take off then do so.
 * If queue falls below hi water mark turn off QFULL
 * flag. Decrement weighted count of queue.
 * Also turn off QWANTR because queue is being read.
 *
 * The queue count is maintained on a per-band basis.
 * Priority band 0 (normal messages) uses q_count,
 * q_lowat, etc.
 *
 * If queue count is below the lo water mark and QWANTW
 * is set, enable the closest backq which has a service
 * procedure and turn off the QWANTW flag.
 *
 * A note on the use of q_count and q_mblkcnt:
 * q_count is the traditional byte count for messages that
 * have been put on a queue. Documentation tells us that
 * we shouldn't rely on that count, but some drivers/modules
 * do. What was needed, however, is a mechanism to prevent
 * runaway streams from consuming all of the resources,
 * and particularly be able to flow control zero-length
 * messages. q_mblkcnt is used for this purpose. It
 * counts the number of mblk's that are being put on
 * the queue. The intention here, is that each mblk should
 * contain one byte of data and, for the purpose of
 * flow-control, logically does. A queue will become
 * full when EITHER of these values (q_count and q_mblkcnt)
 * reach the highwater mark. It will clear when BOTH
 * of them drop below the highwater mark. And it will
 * backenable when BOTH of them drop below the lowwater
 * mark.
 * With this algorithm, a driver/module might be able
 * to find a reasonably accurate q_count, and the
 * framework can still try and limit resource usage.
 */
mblk_t *
getq(queue_t *q)
{
    mblk_t *bp;
    int band = 0;

    bp = getq_noenab(q);
    if (bp != NULL)
        band = bp->b_band;
    qbackenable(q, band);
    return (bp);
}
The getq_noenab() function is internal to the STREAMS framework. It does the
actual job of fetching the message but doesn't deal with back-enabling the
stream.
/*
 * Like getq() but does not backenable. The caller must call qbackenable()
 * after it is done with accessing the queue.
 */
mblk_t *
getq_noenab(queue_t *q)
{
    mblk_t *bp;
    mblk_t *tmp;
    int bytecnt = 0, mblkcnt = 0;

    if ((bp = q->q_first) == 0) {
        q->q_flag |= QWANTR;
    } else {
        if ((q->q_first = bp->b_next) == NULL)
            q->q_last = NULL;
        else
            q->q_first->b_prev = NULL;

        /* Get message byte count for q_count accounting */
        for (tmp = bp; tmp; tmp = tmp->b_cont) {
            bytecnt += (tmp->b_wptr - tmp->b_rptr);
            mblkcnt++;
        }
        q->q_count -= bytecnt;
        q->q_mblkcnt -= mblkcnt;
        if ((q->q_count < q->q_hiwat) &&
            (q->q_mblkcnt < q->q_hiwat)) {
            q->q_flag &= ~QFULL;
        }
        q->q_flag &= ~QWANTR;
        bp->b_next = NULL;
        bp->b_prev = NULL;
    }
    return (bp);
}
The qbackenable() function is another STREAMS internal function. It checks
whether queue back-enabling is required and, if so, calls backenable(), which
does the actual back-enabling.
/*
 * Determine if a backenable is needed after removing a message in the
 * specified band.
 */
void
qbackenable(queue_t *q, int band)
{
    int backenab = 0;

    if (band == 0 && (q->q_flag & QWANTW) == 0)
        return;
    if (band == 0) {
        if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
            q->q_mblkcnt < q->q_lowat)) {
            backenab = q->q_flag & QWANTW;
        }
    } else {
        ...
    }
    if (backenab & QWANTW) {
        q->q_flag &= ~QWANTW;
        backenable(q, band);
    }
}
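The band 0 decision in qbackenable() boils down to a small predicate. The sketch below restates it in user space; toy_need_backenable() and its parameters are hypothetical names of mine, and the priority-band branch that is elided above is not modelled at all.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * Restates the band 0 test from qbackenable(): back-enable only if a
 * writer is waiting (wantw) and either no low-water mark is configured
 * or both counters are below it.
 */
static bool toy_need_backenable(size_t count, size_t mblkcnt,
    size_t lowat, bool wantw)
{
    if (!wantw)
        return false;    /* nobody was refused, nothing to wake up */
    return lowat == 0 || (count < lowat && mblkcnt < lowat);
}
```

Note the special case: with a low-water mark of 0, a waiting writer is back-enabled after every message removed, no matter how much data is still queued.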
The backenable() function is a STREAMS internal function that finds the
nearest back queue with a service procedure and enables it. Enabling means
arranging for the service routine to be run some time in the future. This is
handled by the qenable_locked() function, which is beyond the scope of this
blog.
/*
 * enable first back queue with svc procedure.
 * Use pri == -1 to avoid the setqback
 */
void
backenable(queue_t *q, int pri)
{
    queue_t *nq;

    /* find nearest back queue with service proc */
    for (nq = backq(q); nq && !nq->q_qinfo->qi_srvp; nq = backq(nq))
        ;
    if (nq) {
        if (pri != -1)
            setqback(nq, pri);
        qenable_locked(nq);
    }
}
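The search loop in backenable() skips over queues that have no service procedure. Here is a toy user-space version of just that walk; struct toy_q and toy_nearest_back_srv() are made-up names, and the has_srv flag stands in for the q_qinfo->qi_srvp check.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy queue: only what the backward walk needs. */
struct toy_q {
    const char   *name;
    bool          has_srv;  /* stands in for q_qinfo->qi_srvp != NULL */
    struct toy_q *back;     /* the queue feeding this one, or NULL */
};

/* Return the nearest queue behind q that has a service procedure. */
static struct toy_q *toy_nearest_back_srv(struct toy_q *q)
{
    struct toy_q *nq;

    assert(q != NULL);
    for (nq = q->back; nq != NULL && !nq->has_srv; nq = nq->back)
        ;
    return nq;
}
```

For a chain in which a queue with a service procedure feeds one without, which in turn feeds a driver queue, the walk from the driver queue skips the middle queue and lands on the first one. If no queue behind has a service procedure, the walk returns NULL and nothing is enabled.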
Grown up Do-Nothing STREAMS Module
In my earlier entry I played with a simple STREAMS module that does nothing useful, but just passes messages back and forth. Now I want to extend this to a respectable STREAMS module that fully participates in STREAMS flow control. This means that in addition to the open/close entry points the module should define read and write put procedures and a service procedure. The previous module was called "nullmod"; this module will be called "passmod".
Let us start with the put procedure. It can be as simple as
void
passmodput(queue_t *q, mblk_t *mp)
{
    putnext(q, mp);
}
What we now want to do is to check that the next module in the STREAM can accept
our message. We do this by calling canputnext(9f) and using
putq(9f) if canputnext() fails:
void
passmodput(queue_t *q, mblk_t *mp)
{
    if (canputnext(q)) {
        putnext(q, mp);
    } else {
        /* Append to our own queue; the service procedure will retry. */
        (void) putq(q, mp);
    }
}
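Here is the service routine. It gets the messages one by one and passes them down the STREAM, observing flow control. If the next queue cannot accept a message, the routine puts it back at the head of its own queue with putbq(9f) and stops: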
/* Read/write side service procedure. */
static void
passmodsrv(queue_t *q)
{
    mblk_t *mp;

    /*
     * Get messages from the service queue and pass them forward until flow
     * controlled.
     */
    while ((mp = getq(q)) != NULL) {
        if (canputnext(q)) {
            putnext(q, mp);
        } else {
            (void) putbq(q, mp);
            break;
        }
    }
}
Now, what happens if by the time we enter the put procedure there are already some messages enqueued? We definitely do not want to call putnext() on the new message since it may arrive before the earlier messages and violate the message ordering in the STREAM. To defend against this problem we revise the put procedure a bit:
void
passmodput(queue_t *q, mblk_t *mp)
{
    if ((q->q_first == NULL) && canputnext(q)) {
        putnext(q, mp);
    } else {
        /* Queue behind earlier messages to preserve ordering. */
        (void) putq(q, mp);
    }
}
Now if there are any messages already enqueued, we continue enqueueing all
new messages. This code is very straightforward, but a bit naive. The
complication comes from high-priority messages (which can be sent by passing
the RS_HIPRI flag to the putmsg(2) function). When you call putq() with a
high-priority message, the STREAMS framework immediately enables the queue
and schedules its service procedure. If the service procedure then puts the
message back because the next queue is still flow-controlled, the queue is
enabled again, and the result is an infinite loop. So we should be a bit more
careful and always pass high-priority messages forward. This means that we
don't need to enqueue them in the first place, so we can rewrite the put
procedure again to fix the problem:
void
passmodput(queue_t *q, mblk_t *mp)
{
    /*
     * If the message is a high-priority message or there is no flow control
     * and there are no messages in the queue already, pass it forward,
     * otherwise enqueue. High priority message should be always passed
     * forward.
     */
    if (queclass(mp) == QPCTL ||
        ((q->q_first == NULL) && canputnext(q)))
        putnext(q, mp);
    else
        (void) putq(q, mp);
}
Now we have all the components to construct a fully-functioning STREAMS module which correctly implements flow control. The full code is below.
NOTE: The code below contains a subtle bug. Try to find it before I explain the bug in the next blog entry.
/*
 * This example demonstrates a minimum STREAMS module that honors flow control.
 */

/*
 * Required include files.
 */
#include
#include
#include
#include
#include
#include
#include

/*
 * Function prototypes.
 */
static int passmodopen(queue_t *, dev_t *, int, int, cred_t *);
static int passmodclose(queue_t *);
static void passmodput(queue_t *, mblk_t *);
static void passmodsrv(queue_t *);

/*
 * Module linkage data
 */
static struct module_info passmod_minfo = {
    2,          /* mi_idnum */
    "passmod",  /* mi_idname */
    0,          /* mi_minpsz */
    INFPSZ,     /* mi_maxpsz */
    0,          /* mi_hiwat */
    0           /* mi_lowat */
};

static struct qinit passmod_rinit = {
    (int (*)())passmodput,  /* qi_putp */
    (int (*)())passmodsrv,  /* qi_srvp */
    passmodopen,            /* qi_qopen */
    passmodclose,           /* qi_qclose */
    NULL,                   /* qi_qadmin */
    &passmod_minfo,         /* qi_minfo */
};

static struct qinit passmod_winit = {
    (int (*)())passmodput,  /* qi_putp */
    (int (*)())passmodsrv,  /* qi_srvp */
    NULL,                   /* qi_qopen */
    NULL,                   /* qi_qclose */
    NULL,                   /* qi_qadmin */
    &passmod_minfo,         /* qi_minfo */
};

static struct streamtab passmod_info = {
    &passmod_rinit,  /* st_rdinit */
    &passmod_winit,  /* st_wrinit */
};

static struct fmodsw fsw = {
    "passmod",
    &passmod_info,
    D_MP
};

/*
 * Module linkage information for the kernel.
 */
struct mod_ops mod_strmodops;

static struct modlstrmod modlstrmod = {
    &mod_strmodops,
    "Example pass-through module 1.0",
    &fsw
};

static struct modlinkage modlinkage = {
    MODREV_1,
    (void *)&modlstrmod,
    NULL
};

/*
 * Standard module entry points.
 */
int
_init(void)
{
    return (mod_install(&modlinkage));
}

int
_fini(void)
{
    return (mod_remove(&modlinkage));
}

int
_info(struct modinfo *modinfop)
{
    return (mod_info(&modlinkage, modinfop));
}

/*
 * Actual module code.
 */

/*
 * STREAMS entry points.
 */

/* ARGSUSED */
static int
passmodopen(queue_t *rq, dev_t *dev, int oflag, int sflag, cred_t *crp)
{
    if (sflag != MODOPEN)
        return (EINVAL);

    /* Prevent duplicate opens */
    if (rq->q_ptr != NULL)
        return (0);

    rq->q_ptr = WR(rq)->q_ptr = (void *)1;
    qprocson(rq);
    /*
     * At this point module is linked in the STREAM and can send/receive
     * messages. Its put/service procedures may execute at any time.
     */
    return (0);
}

static int
passmodclose(queue_t *rq)
{
    qprocsoff(rq);
    rq->q_ptr = WR(rq)->q_ptr = NULL;
    /*
     * At this point module is disconnected from the STREAM and can
     * no longer receive messages. Its put or service procedures are not
     * running.
     */
    return (0);
}

/*
 * Support routines.
 */

/* Read/write side put procedure. */
static void
passmodput(queue_t *q, mblk_t *mp)
{
    /*
     * If the message is a high-priority message or there is no flow control
     * and there are no messages in the queue already, pass it forward,
     * otherwise enqueue. High priority message should be always passed
     * forward.
     */
    if (queclass(mp) == QPCTL ||
        ((q->q_first == NULL) && canputnext(q)))
        putnext(q, mp);
    else
        (void) putq(q, mp);
}

/* Read/write side service procedure. */
static void
passmodsrv(queue_t *q)
{
    mblk_t *mp;

    /*
     * Get messages from the service queue and pass them forward until flow
     * controlled.
     */
    while ((mp = getq(q)) != NULL) {
        if (canputnext(q)) {
            putnext(q, mp);
        } else {
            (void) putbq(q, mp);
            break;
        }
    }
}