Weblog

20050707 Thursday July 07, 2005

Grown up Do-Nothing STREAMS Module

In my earlier entry I played with a simple STREAMS module that does nothing useful but just passes messages back and forth. Now I want to extend it into a well-behaved STREAMS module that fully participates in STREAMS flow control. This means that, in addition to the open and close entry points, the module should define read-side and write-side put procedures and service procedures (passmod uses the same pair of routines on both sides). The previous module was called "nullmod"; this one will be called "passmod".

Let us start with the put procedure. It can be as simple as

void
passmodput(queue_t *q, mblk_t *mp)
{
	putnext(q, mp);
}

What we want to do now is check that the next module in the STREAM can accept our message. We do this by calling canputnext(9F) and, if canputnext() fails, placing the message on our own queue with putq(9F) so that the service procedure can forward it later:

void
passmodput(queue_t *q, mblk_t *mp)
{
	if (canputnext(q)) {
		putnext(q, mp);
	} else {
		(void) putq(q, mp);
	}
}

Here is the service procedure. It takes the queued messages off one at a time and passes them along the STREAM, observing flow control:

/* Read/write side service procedure. */
static void
passmodsrv(queue_t *q)
{
	mblk_t *mp;

	/*
	 * Get messages from the service queue and pass them forward until flow
	 * controlled.
	 */
	while ((mp = getq(q)) != NULL) {
		if (canputnext(q)) {
			putnext(q, mp);
		} else {
			(void) putbq(q, mp);
			break;
		}
	}
}
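To see what this loop does with a backlog, here is a small user-space model. It is not the Solaris STREAMS API: `struct model`, `next_room` and the `m_*` helpers are hypothetical stand-ins, where `next_room` is the number of messages the next queue still accepts before canputnext() would fail. `m_srv()` has the same shape as the service procedure above.

```c
#include <stdbool.h>
#include <stddef.h>

/*
 * Hypothetical user-space model of a STREAMS queue; none of these names
 * are part of the Solaris DDI/DKI.
 */
#define QMAX 16

struct model {
	int	q[QMAX];	/* our queue, oldest message at qhead */
	size_t	qhead;
	size_t	qcount;
	size_t	next_room;	/* messages the next queue still accepts */
	int	delivered[QMAX]; /* messages passed to the next module */
	size_t	ndelivered;
};

/* Stand-in for putq(): append at the tail. */
static void
m_putq(struct model *m, int msg)
{
	m->q[(m->qhead + m->qcount) % QMAX] = msg;
	m->qcount++;
}

/* Stand-in for getq(): remove from the head; false if the queue is empty. */
static bool
m_getq(struct model *m, int *msg)
{
	if (m->qcount == 0)
		return (false);
	*msg = m->q[m->qhead];
	m->qhead = (m->qhead + 1) % QMAX;
	m->qcount--;
	return (true);
}

/* Stand-in for putbq(): put a message back at the head. */
static void
m_putbq(struct model *m, int msg)
{
	m->qhead = (m->qhead + QMAX - 1) % QMAX;
	m->q[m->qhead] = msg;
	m->qcount++;
}

/* Stand-ins for canputnext() and putnext(). */
static bool
m_canputnext(struct model *m)
{
	return (m->next_room > 0);
}

static void
m_putnext(struct model *m, int msg)
{
	m->next_room--;
	m->delivered[m->ndelivered++] = msg;
}

/* Same shape as passmodsrv(): forward messages until flow controlled. */
static void
m_srv(struct model *m)
{
	int msg;

	while (m_getq(m, &msg)) {
		if (m_canputnext(m)) {
			m_putnext(m, msg);
		} else {
			m_putbq(m, msg);	/* back at the head, order kept */
			break;
		}
	}
}
```

With four messages queued and room for two downstream, messages 1 and 2 are forwarded and 3 and 4 stay on the queue in their original order, because the message that could not be sent goes back to the head rather than the tail.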

Now, what happens if messages are already enqueued by the time we enter the put procedure? We definitely do not want to call putnext() on the new message, since it would overtake the earlier messages and violate message ordering in the STREAM. To guard against this problem, we revise the put procedure a bit:

void
passmodput(queue_t *q, mblk_t *mp)
{
	if ((q->q_first == NULL) && canputnext(q)) {
		putnext(q, mp);
	} else {
		(void) putq(q, mp);
	}
}
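The ordering problem can be replayed in a user-space model. Again, this is not the STREAMS API: `struct model`, `next_full` (true when canputnext() would fail), `put_naive()`, `put_ordered()` and `scenario()` are hypothetical. Messages 1 and 2 arrive while the next queue is full, flow control then lifts, and message 3 arrives before the service procedure has run.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical user-space model; not the Solaris STREAMS API. */
#define QMAX 16

struct model {
	int	q[QMAX];	/* our queue; q_first != NULL <=> qcount > 0 */
	size_t	qhead, qcount;
	bool	next_full;	/* true <=> canputnext() would fail */
	int	delivered[QMAX];
	size_t	ndelivered;
};

static void
m_putq(struct model *m, int msg)
{
	m->q[(m->qhead + m->qcount) % QMAX] = msg;
	m->qcount++;
}

static void
m_putnext(struct model *m, int msg)
{
	m->delivered[m->ndelivered++] = msg;
}

/* Earlier version: checks only canputnext(). */
static void
put_naive(struct model *m, int msg)
{
	if (!m->next_full)
		m_putnext(m, msg);
	else
		m_putq(m, msg);
}

/* Revised version: also requires an empty queue (q_first == NULL). */
static void
put_ordered(struct model *m, int msg)
{
	if (m->qcount == 0 && !m->next_full)
		m_putnext(m, msg);
	else
		m_putq(m, msg);
}

/* Service procedure after flow control has lifted: drain in FIFO order. */
static void
m_srv(struct model *m)
{
	while (m->qcount > 0) {
		m_putnext(m, m->q[m->qhead]);
		m->qhead = (m->qhead + 1) % QMAX;
		m->qcount--;
	}
}

/* 1 and 2 arrive while flow controlled; 3 arrives before the service runs. */
static void
scenario(struct model *m, void (*put)(struct model *, int))
{
	m->next_full = true;
	put(m, 1);
	put(m, 2);
	m->next_full = false;
	put(m, 3);
	m_srv(m);
}
```

With `put_naive()` the next module sees 3, 1, 2; with `put_ordered()` it sees 1, 2, 3.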

Now, if any messages are already enqueued, we keep enqueueing all new messages. This code is very straightforward, but a bit naive. The complication comes from high-priority messages (which a user process can send, for example, by passing the RS_HIPRI flag to putmsg(2)). High-priority messages are not subject to flow control, and when putq() is given one, the STREAMS framework immediately enables the queue and schedules its service procedure. If canputnext() fails there, the service procedure puts the message back with putbq(), the queue is enabled again, and we end up in an infinite loop. So we should be a bit more careful and always pass high-priority messages forward. This means that we don't need to enqueue them in the first place, so we can rewrite the put procedure again to fix the problem:

void
passmodput(queue_t *q, mblk_t *mp)
{
	/*
	 * If the message is a high-priority message or there is no flow control
	 * and there are no messages in the queue already, pass it forward,
	 * otherwise enqueue. High priority message should be always passed
	 * forward.
	 */
	if (queclass(mp) == QPCTL ||
	    ((q->q_first == NULL) && canputnext(q)))
		putnext(q, mp);
	else
		(void) putq(q, mp);
}
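The high-priority problem can be illustrated with one more user-space model (not the STREAMS API). It assumes that placing a high-priority message on a queue with either putq() or putbq() schedules the service procedure; that is this model's reading of the behavior described above. The hypothetical `run_scheduler()` keeps running the service procedure while it stays scheduled, capped at `MAX_RUNS` so the spin is observable rather than endless.

```c
#include <stdbool.h>
#include <stddef.h>

/*
 * Hypothetical user-space model; not the Solaris STREAMS API. A message is
 * reduced to one flag: is it high priority (queclass() == QPCTL)?
 */
#define QMAX 16
#define MAX_RUNS 1000	/* cap on service runs instead of spinning forever */

struct model {
	bool	q[QMAX];	/* true = high-priority message */
	size_t	qhead, qcount;
	bool	enabled;	/* service procedure is scheduled */
	bool	next_full;	/* canputnext() would fail */
	size_t	delivered;	/* messages passed with "putnext()" */
};

/* putq()/putbq() stand-ins: a high-priority message enables the queue. */
static void
m_putq(struct model *m, bool hipri)
{
	m->q[(m->qhead + m->qcount) % QMAX] = hipri;
	m->qcount++;
	if (hipri)
		m->enabled = true;
}

static void
m_putbq(struct model *m, bool hipri)
{
	m->qhead = (m->qhead + QMAX - 1) % QMAX;
	m->q[m->qhead] = hipri;
	m->qcount++;
	if (hipri)
		m->enabled = true;
}

static bool
m_getq(struct model *m, bool *hipri)
{
	if (m->qcount == 0)
		return (false);
	*hipri = m->q[m->qhead];
	m->qhead = (m->qhead + 1) % QMAX;
	m->qcount--;
	return (true);
}

/* The service loop from the text: flow control applies to every message. */
static void
m_srv(struct model *m)
{
	bool hipri;

	while (m_getq(m, &hipri)) {
		if (!m->next_full) {
			m->delivered++;
		} else {
			m_putbq(m, hipri);
			break;
		}
	}
}

/* Final put procedure: high-priority messages bypass the queue. */
static void
m_put(struct model *m, bool hipri)
{
	if (hipri || (m->qcount == 0 && !m->next_full))
		m->delivered++;
	else
		m_putq(m, hipri);
}

/* Run the service procedure while it stays scheduled; count the runs. */
static size_t
run_scheduler(struct model *m)
{
	size_t runs = 0;

	while (m->enabled && runs < MAX_RUNS) {
		m->enabled = false;
		m_srv(m);
		runs++;
	}
	return (runs);
}
```

If a high-priority message is queued while the next queue is full, `run_scheduler()` hits `MAX_RUNS` without delivering anything; with `m_put()` the same message is delivered at once and the service procedure is never scheduled.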

Now we have all the components to construct a fully functioning STREAMS module that correctly implements flow control. The full code is below.

NOTE: The code below contains a subtle bug. Try to find it before I explain the bug in the next blog entry.

/*
 * This example demonstrates a minimum STREAMS module that honors flow control.
 */

/*
 * Required include files.
 */
#include	<sys/types.h>
#include	<sys/errno.h>
#include	<sys/stream.h>
#include	<sys/conf.h>
#include	<sys/modctl.h>
#include	<sys/ddi.h>
#include	<sys/sunddi.h>

/*
 * Function prototypes.
 */
static int	passmodopen(queue_t *, dev_t *, int, int, cred_t *);
static int	passmodclose(queue_t *);
static void	passmodput(queue_t *, mblk_t *);
static void	passmodsrv(queue_t *);

/*
 * Module linkage data
 */
static struct module_info	passmod_minfo = {
	2,		/* mi_idnum */
	"passmod",	/* mi_idname */
	0,		/* mi_minpsz */
	INFPSZ,		/* mi_maxpsz */
	0,		/* mi_hiwat */
	0		/* mi_lowat */
};

static struct qinit	passmod_rinit = {
	(int (*)())passmodput,	/* qi_putp */
	(int (*)())passmodsrv,	/* qi_srvp  */
	passmodopen,	/* qi_qopen */
	passmodclose,	/* qi_qclose */
	NULL,		/* qi_qadmin */
	&passmod_minfo,	/* qi_minfo */
};

static struct qinit	passmod_winit = {
	(int (*)())passmodput,	/* qi_putp */
	(int (*)())passmodsrv,	/* qi_srvp */
	NULL,		/* qi_qopen */
	NULL,		/* qi_qclose */
	NULL,		/* qi_qadmin */
	&passmod_minfo,	/* qi_minfo */
};

static struct streamtab	passmod_info = {
	&passmod_rinit,	/* st_rdinit */
	&passmod_winit,	/* st_wrinit */
};

static struct fmodsw fsw = {
	"passmod",
	&passmod_info,
	D_MP
};

/*
 * Module linkage information for the kernel.
 */
extern struct mod_ops mod_strmodops;

static struct modlstrmod modlstrmod = {
	&mod_strmodops, "Example pass-through module 1.0", &fsw
};

static struct modlinkage modlinkage = {
	MODREV_1, (void *)&modlstrmod, NULL
};

/*
 * Standard module entry points.
 */
int
_init(void)
{
	return (mod_install(&modlinkage));
}

int
_fini(void)
{
	return (mod_remove(&modlinkage));
}

int
_info(struct modinfo *modinfop)
{
	return (mod_info(&modlinkage, modinfop));
}


/*
 * Actual module code.
 */

/*
 * STREAMS entry points.
 */

/* ARGSUSED */
static int
passmodopen(queue_t *rq, dev_t *dev, int oflag, int sflag, cred_t *crp)
{
	if (sflag != MODOPEN)
		return (EINVAL);

	/* Prevent duplicate opens */
	if (rq->q_ptr != NULL)
		return (0);

	rq->q_ptr = WR(rq)->q_ptr = (void *)1;

	qprocson(rq);
	/*
	 * At this point module is linked in the STREAM and can send/receive
	 * messages. Its put/service procedures may execute at any time.
	 */
	return (0);
}

static int
passmodclose(queue_t *rq)
{
	qprocsoff(rq);
	rq->q_ptr = WR(rq)->q_ptr = NULL;
	/*
	 * At this point module is disconnected from the STREAM and can
	 * no longer receive messages. Its put or service procedures are not
	 * running.
	 */
	return (0);
}

/*
 * Support routines.
 */

/* Read/write side put procedure. */
static void
passmodput(queue_t *q, mblk_t *mp)
{
	/*
	 * If the message is a high-priority message or there is no flow control
	 * and there are no messages in the queue already, pass it forward,
	 * otherwise enqueue. High priority message should be always passed
	 * forward.
	 */
	if (queclass(mp) == QPCTL ||
	    ((q->q_first == NULL) && canputnext(q)))
		putnext(q, mp);
	else
		(void) putq(q, mp);
}

/* Read/write side service procedure. */
static void
passmodsrv(queue_t *q)
{
	mblk_t *mp;

	/*
	 * Get messages from the service queue and pass them forward until flow
	 * controlled.
	 */
	while ((mp = getq(q)) != NULL) {
		if (canputnext(q)) {
			putnext(q, mp);
		} else {
			(void) putbq(q, mp);
			break;
		}
	}
}



(Jul 07 2005, 06:30:42 PM PDT)

Trackback URL: http://blogs.sun.com/akolb/entry/grown_up_do_nothing_streams
Comments:

Great info! Though I cannot spot the bug yet :-) I am thinking about writing user-mode packet processing/filtering. Can STREAMS be used to redirect packets to and from the Ethernet driver and inject them back after they are processed by a user-mode program? To do this, do I need a device file in /dev/* for the userland program to read and write? Or is there an easier way? :-) -Thanks

Posted by sitchai on July 07, 2005 at 09:15 PM PDT #

Is the bug that the high-water mark in passmod_minfo is 0?

Posted by yxn on July 07, 2005 at 10:36 PM PDT #

sitchai: This would be a really cool feature. Are you interested in actually implementing this? Sounds like a perfect project for the OpenSolaris community.

yxn: Good eye! Now, can you spot another, even more subtle problem?

Posted by Alexander Kolbasov on July 08, 2005 at 09:50 AM PDT #

It's a bit difficult for me, but I'll try. The read-side put procedure might be called from an interrupt handler, and the read-side service procedure might be executing concurrently with it. Do we need some locking when checking the queue with "q->q_first == NULL"?

Posted by Yu Xiangning on July 11, 2005 at 06:53 PM PDT #

yxn: Correct. Note that this can happen even without interrupts: service procedures are executed asynchronously and may always execute in parallel with both the read and write put procedures.

Q: What would you suggest to fix the problem?

Posted by Alexander Kolbasov on July 14, 2005 at 02:53 PM PDT #

Well, we can add an r/w lock to do the protection ourselves, or we can simply use a STREAMS perimeter to do it automatically.

In this case we need an inner perimeter for each queue:

static struct fmodsw fsw = {
	"passmod",
	&passmod_info,
	D_MTPERQ | D_MP
};

Posted by Yu Xiangning on July 14, 2005 at 11:00 PM PDT #

Yu, using perimeters is one (and a good) way to fix the problem. There is another one which involves neither perimeters nor locks. Can you find it?

Would you like to outline an r/w lock approach that achieves the same purpose?

Posted by Alexander Kolbasov on July 15, 2005 at 10:44 AM PDT #

WOW, this is more and more interesting. :)

Is the other way to use a synchronous STREAM by setting the D_SYNCSTR flag? If so, I think it involves a lot of work.

I'll try to figure out the r/w approach ASAP.

Posted by Yu Xiangning on July 17, 2005 at 07:50 PM PDT #

Or here is another trick: don't call putnext() for ordinary messages in the put procedure; enqueue them there and call putnext() in the service procedure.

Posted by Yu Xiangning on July 17, 2005 at 08:07 PM PDT #

Yu: Synchronous STREAMS is a hack to support combined copy+checksum; it is not an officially supported mechanism and it doesn't help here.

You correctly suggested calling putq() in the put procedure and putnext() in the service procedure. This always preserves message ordering. So you described both good ways: using inner perimeters and using service routines for all message processing. Congratulations!

I think that achieving message ordering with locks may be quite tricky. Keep in mind that you can't hold locks across putnext().

Posted by Alexander Kolbasov on July 18, 2005 at 10:53 PM PDT #
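The race discussed in this thread can be replayed deterministically in a user-space model (not the STREAMS API; every name below is hypothetical). The service procedure is split into its getq() and putnext() steps, and a put procedure runs between them, as it could on another CPU. Flow control is assumed open throughout, and the model assumes the framework never runs the service procedure of one queue concurrently with itself.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical user-space model; not the Solaris STREAMS API. */
#define QMAX 16

struct model {
	int	q[QMAX];
	size_t	qhead, qcount;
	int	delivered[QMAX];
	size_t	ndelivered;
};

static void
m_putq(struct model *m, int msg)
{
	m->q[(m->qhead + m->qcount) % QMAX] = msg;
	m->qcount++;
}

/* getq() stand-in; callers make sure the queue is not empty. */
static int
m_getq(struct model *m)
{
	int msg = m->q[m->qhead];

	m->qhead = (m->qhead + 1) % QMAX;
	m->qcount--;
	return (msg);
}

static void
m_putnext(struct model *m, int msg)
{
	m->delivered[m->ndelivered++] = msg;
}

/* Put procedure from the post: forward directly if q_first == NULL. */
static void
put_check_first(struct model *m, int msg)
{
	if (m->qcount == 0)
		m_putnext(m, msg);
	else
		m_putq(m, msg);
}

/* Fix from the comments: always enqueue, forward only from service. */
static void
put_always_enqueue(struct model *m, int msg)
{
	m_putq(m, msg);
}

/*
 * Message 1 is queued. The service procedure removes it with getq(), which
 * leaves the queue empty; before it calls putnext(), message 2 arrives via
 * the put procedure. Then the service procedure resumes and drains.
 */
static void
interleave(struct model *m, void (*put)(struct model *, int))
{
	int held;

	m_putq(m, 1);
	held = m_getq(m);		/* service: getq() */
	put(m, 2);			/* concurrent put procedure */
	m_putnext(m, held);		/* service: putnext() */
	while (m->qcount > 0)		/* service: keep draining */
		m_putnext(m, m_getq(m));
}
```

With `put_check_first()` the next module receives 2 before 1, because the put procedure sees an empty queue while the service procedure still holds message 1. With `put_always_enqueue()` only the service procedure calls putnext(), so 1 is delivered before 2.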

Thank you, Sasha.

We can't hold locks across putnext, but we have to hold a lock to make it multithread-safe, so we can add a check for the lock by looking at the value of curthread in the put procedure:

kthread_id_t runner;

runner = STREAM(q)->sd_freezer;
if (runner == curthread)
        ASSERT(MUTEX_HELD(QLOCK(q)));
else 
        mutex_enter(QLOCK(q));

if (queclass(mp) == QPCTL ||
    ((q->q_first == NULL) && canputnext(q)))
        putnext(q, mp);
else
        (void) putq(q, mp);

if (runner != curthread)
        mutex_exit(QLOCK(q));

Please feel free to correct me. :)

Posted by Yu Xiangning on July 19, 2005 at 01:22 AM PDT #

Yu: You are reading too much of the STREAMS framework code :-). A few notes regarding your code snippet:

  • sd_freezer is a private field used specifically by the deprecated freezestr() implementation.
  • QLOCK is private to the STREAMS framework and you can't hold it in your module. In fact, putnext() itself uses it.

Achieving the purpose with private locks may be quite tricky (and I don't have a good solution, BTW).

Posted by Alexander Kolbasov on July 20, 2005 at 12:36 PM PDT #

Hmm, I have to separate the framework from the module.

I wrote the following code, which adds a private lock to protect reads of q_first against the updates made by getq(), putq() and putbq().

But it doesn't look so clean. :(

Sasha, what is your solution?

In put procedure:

if (queclass(mp) == QPCTL) {
        putnext(q, mp);
} else {
        if (bcanputnext(q)) {
                mutex_enter(&private_lock);
                if (q->q_first == NULL) {
                        mutex_exit(&private_lock);
                        putnext(q, mp);
                        return;   
                }           
                mutex_exit(&private_lock); 
        }
        mutex_enter(&private_lock);
        (void) putq(q, mp);
        mutex_exit(&private_lock);
}  

In service procedure:

while (1) {
        mutex_enter(&private_lock);
        mp = getq(q);
        mutex_exit(&private_lock);
        if (mp == NULL)
                break;
        if (canputnext(q)) {
                putnext(q, mp);
        } else {
                mutex_enter(&private_lock);
                (void) putbq(q, mp);
                mutex_exit(&private_lock);
                break;
        }
}

Posted by Yu Xiangning on July 21, 2005 at 08:27 PM PDT #

Let's take a closer look at the part of the code above:

if (bcanputnext(q)) {
        mutex_enter(&private_lock);
        if (q->q_first == NULL) {
                mutex_exit(&private_lock);
                putnext(q, mp);
                return;
        }
        mutex_exit(&private_lock);
}

Note that you drop the lock before calling putnext(), and once you do that, another thread may happily call putq() and beat you in the race.

My solution is the one you suggested - use perimeters.

Posted by Alexander Kolbasov on July 22, 2005 at 03:23 PM PDT #
