Thursday July 07, 2005 Grown up Do-Nothing STREAMS Module
In my earlier entry I played with a simple STREAMS module that does nothing useful, but just passes messages back and forth. Now I want to extend this to a respectful STREAMS module fully participating in the STREAMS flow control. This means that in addition to the open/close entry points the module should define read and write put procedures and a service procedure. Previous module was called "nullmod", this module will be called "passmod".
Let us start with the put procedure. It can be as simple as
void
passmodput(queue_t *q, mblk_t *mp)
{
putnext(q, mp);
}
What we now want to do is to check that the next module in the STREAM can accept
our message. We do this by calling canputnext(9f) and using
putq(9f) if canputnext() fails:
void
passmodput(queue_t *q, mblk_t *mp)
{
if (canputnext(q)) {
putnext(q, mp);
} else {
(void) putbq(q, mp);
}
}
Here is the service routine. It gets all messages one by one and passes it down down the STREAM observing the flow control:
/* Read/write side service procedure. */
static void
passmodsrv(queue_t *q)
{
mblk_t *mp;
/*
* Get messages from the service queue and pass them forward until flow
* controlled.
*/
while ((mp = getq(q)) != NULL) {
if (canputnext(q)) {
putnext(q, mp);
} else {
(void) putbq(q, mp);
break;
}
}
}
Now, what happens if by the time we enter the put procedure there are already some messages enqueued? We definitely do not want to call putnext() on the new message since it may arrive before the earlier messages and violate the message ordering in the STREAM. To defend against this problem we revise the put procedure a bit:
void
passmodput(queue_t *q, mblk_t *mp)
{
if ((q->q_first == NULL) && canputnext(q)) {
putnext(q, mp);
} else {
(void) putbq(q, mp);
}
}
Now if there are any messages already enqueued we will continue enqueueing all
new messages.This code is very straightforward, but a bit naive. The
complication comes from the high-priority messages (which can be passed using
RS_HIPRI flags to the putmsg(2) function. When you
call putq() with the high-priority message, the STREAMS framework
immediately enables the queue and calls its service procedure which
will cause an infinite loop, so we should be a bit more accurate and always pass
high priority messages. This means that we don't need to enqueue them in the
first place, so we can rewrite the put procedure again to fix the problem:
void
passmodput(queue_t *q, mblk_t *mp)
{
/*
* If the message is a high-priority message or there is no flow control
* and there are no messages in the queue already, pass it forward,
* otherwise enqueue. High priority message should be always passed
* forward.
*/
if (queclass(mp) == QPCTL ||
((q->q_first == NULL) && canputnext(q)))
putnext(q, mp);
else
(void) putq(q, mp);
}
Now we have all the components to construct a fully-functioning STREAMS module which correctly implements flow control. The full code is below.
NOTE: The code below contains a subtle bug. Try to find it before I explain the bug in the next blog entry.
/* * This example demonstrates a minimum STREAMS module that honors flow control. */ /* * Required include files. */ #include#include #include #include #include #include #include /* * Function prototypes. */ static int passmodopen(queue_t *, dev_t *, int, int, cred_t *); static int passmodclose(queue_t *); static void passmodput(queue_t *, mblk_t *); static void passmodsrv(queue_t *); /* * Module linkage data */ static struct module_info passmod_minfo = { 2, /* mi_idnum */ "passmod", /* mi_idname */ 0, /* mi_minpsz */ INFPSZ, /* mi_maxpsz */ 0, /* mi_hiwat */ 0 /* mi_lowat */ }; static struct qinit passmod_rinit = { (int (*)())passmodput, /* qi_putp */ (int (*)())passmodsrv, /* qi_srvp */ passmodopen, /* qi_qopen */ passmodclose, /* qi_qclose */ NULL, /* qi_qadmin */ &passmod_minfo, /* qi_minfo */ }; static struct qinit passmod_winit = { (int (*)())passmodput, /* qi_putp */ (int (*)())passmodsrv, /* qi_srvp */ NULL, /* qi_qopen */ NULL, /* qi_qclose */ NULL, /* qi_qadmin */ &passmod_minfo, /* qi_minfo */ }; static struct streamtab passmod_info = { &passmod_rinit, /* st_rdinit */ &passmod_winit, /* st_wrinit */ }; static struct fmodsw fsw = { "passmod", &passmod_info, D_MP }; /* * Module linkage information for the kernel. */ struct mod_ops mod_strmodops; static struct modlstrmod modlstrmod = { &mod_strmodops, "Example pass-through module 1.0", &fsw }; static struct modlinkage modlinkage = { MODREV_1, (void *)&modlstrmod, NULL }; /* * Standard module entry points. */ int _init(void) { return (mod_install(&modlinkage)); } int _fini(void) { return (mod_remove(&modlinkage)); } int _info(struct modinfo *modinfop) { return (mod_info(&modlinkage, modinfop)); } /* * Actual module code. */ /* * STREAMS entry points. */ /* ARGSUSED */ static int passmodopen(queue_t *rq, dev_t *dev, int oflag, int sflag, cred_t *crp) { if (sflag != MODOPEN) return (EINVAL); /* Prevent duplicate opens */ if (rq->q_ptr != NULL) return (0); rq->q_ptr = WR(rq)->q_ptr = (void *)1; qprocson(rq); /* * At this point module is linked in the STREAM and can send/receive * messages. Its put/service procedures may execute at any time. */ return (0); } static int passmodclose(queue_t *rq) { qprocsoff(rq); rq->q_ptr = WR(rq)->q_ptr = NULL; /* * At this point module is disconnected from the STREAM and can * no longer receive messages. Its put or service procedures are not * running. */ return (0); } /* * Support routines. */ /* Read/write side put procedure. */ static void passmodput(queue_t *q, mblk_t *mp) { /* * If the message is a high-priority message or there is no flow control * and there are no messages in the queue already, pass it forward, * otherwise enqueue. High priority message should be always passed * forward. */ if (queclass(mp) == QPCTL || ((q->q_first == NULL) && canputnext(q))) putnext(q, mp); else (void) putq(q, mp); } /* Read/write side service procedure. */ static void passmodsrv(queue_t *q) { mblk_t *mp; /* * Get messages from the service queue and pass them forward until flow * controlled. */ while ((mp = getq(q)) != NULL) { if (canputnext(q)) { putnext(q, mp); } else { (void) putbq(q, mp); break; } } }