Weblog

Wednesday July 27, 2005

STREAMS flow-control implementation

In my previous blog entry I discussed how to write a very simple STREAMS module that participates in STREAMS flow control. It had two bugs, one intentional and one unintentional, and Yu Xiangning spotted both in the comments. The unintentional bug was in the setting of the flow-control high and low water marks. This entry goes into more detail about STREAMS flow control and discusses the actual implementation in Solaris.


STREAMS has a simple flow-control mechanism that is voluntary by design. Participating modules and drivers ask the next queue whether it wishes to accept more messages by calling canputnext(9f). If the next queue is "full" (the amount of data on it has reached its high-water mark), the module does not pass the message on but enqueues it on its own queue with putq(9f) or putbq(9f). These functions place the message on the module's queue and may arrange for its service procedure to be run some time later. If the service procedure returns without processing all messages on its queue, it will not be called again unless it is enabled either explicitly by qenable(9f) or implicitly (back-enabled) when the amount of data queued in the next queue drops below that queue's low-water mark.
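To make the voluntary protocol concrete, here is a small user-space model of a module put procedure and service procedure feeding a downstream queue. It is a sketch under heavy assumptions: all sim_* names are hypothetical, messages are reduced to byte counts, there are no locks or priority bands, the downstream queue is a sink whose put procedure just enqueues, and back-enabling calls the upstream service procedure directly instead of scheduling it the way the kernel does.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical flag values for this model only, not the <sys/stream.h> ones. */
#define SIM_QFULL	0x1
#define SIM_QWANTW	0x2
#define SIM_QLEN	16

typedef struct sim_queue {
	int msgs[SIM_QLEN];	/* message sizes in bytes, kept as a ring buffer */
	int head, len;		/* index of the first message, number queued */
	int count;		/* bytes queued, like q_count */
	int hiwat, lowat;	/* like q_hiwat and q_lowat */
	int flag;		/* SIM_QFULL and/or SIM_QWANTW */
	struct sim_queue *next;	/* downstream queue */
	struct sim_queue *prev;	/* upstream queue to back-enable, or NULL */
} sim_queue_t;

static void sim_srv(sim_queue_t *q);

/* Enqueue at the tail (like putq) or the head (like putbq); update QFULL. */
static void
sim_enq(sim_queue_t *q, int size, int at_head)
{
	assert(q->len < SIM_QLEN);
	if (at_head) {
		q->head = (q->head + SIM_QLEN - 1) % SIM_QLEN;
		q->msgs[q->head] = size;
	} else {
		q->msgs[(q->head + q->len) % SIM_QLEN] = size;
	}
	q->len++;
	q->count += size;
	if (q->count >= q->hiwat)
		q->flag |= SIM_QFULL;
}

/* Like canputnext(): ask the next queue and register interest if it is full. */
static int
sim_canputnext(sim_queue_t *q)
{
	if (!(q->next->flag & SIM_QFULL))
		return (1);
	q->next->flag |= SIM_QWANTW;
	return (0);
}

/* Like getq(): dequeue, clear QFULL, back-enable below low water; -1 if empty. */
static int
sim_getq(sim_queue_t *q)
{
	int size;

	if (q->len == 0)
		return (-1);
	size = q->msgs[q->head];
	q->head = (q->head + 1) % SIM_QLEN;
	q->len--;
	q->count -= size;
	if (q->count < q->hiwat)
		q->flag &= ~SIM_QFULL;
	if ((q->flag & SIM_QWANTW) &&
	    (q->lowat == 0 || q->count < q->lowat)) {
		q->flag &= ~SIM_QWANTW;
		if (q->prev != NULL)
			sim_srv(q->prev);	/* the kernel would only schedule it */
	}
	return (size);
}

/* Service procedure: forward queued messages until the next queue fills up. */
static void
sim_srv(sim_queue_t *q)
{
	int size;

	while ((size = sim_getq(q)) >= 0) {
		if (!sim_canputnext(q)) {
			sim_enq(q, size, 1);		/* putbq() */
			return;
		}
		sim_enq(q->next, size, 0);		/* "putnext" into the sink */
	}
}

/* Put procedure: pass the message on, or queue it to preserve ordering. */
static void
sim_put(sim_queue_t *q, int size)
{
	if (q->len == 0 && sim_canputnext(q))
		sim_enq(q->next, size, 0);
	else
		sim_enq(q, size, 0);			/* putq() */
}
```

With a downstream queue whose high and low water marks are 300 and 100 bytes, five 100-byte writes leave three messages downstream (QFULL, with QWANTW set) and two on the upstream queue. The upstream service procedure runs only after the downstream reader drains that queue below 100 bytes, and then it forwards both held messages.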

All modules participating in flow control must have a service routine. Flow control operates between the two nearest queues in a stream that have service procedures. A detailed description of flow control can be found in the Solaris STREAMS Programming Guide. The excellent UNIX System V Network Programming also contains a very good description of flow control in section 9.2:

A stream is said to be flow-controlled when its queues become full. When the number of bytes of data in the message on a queue becomes greater than the queue's high-water mark, the queue is considered full. Flow control is an advisory state where the processing element passing messages to the full queue stops sending messages and places them on its own queue. This way, flow control can propagate from one end of the stream to the other.

At the stream head, when a process tries to write to a stream whose topmost write queue with a service procedure is full, the process goes to sleep until the number of bytes of data stored in the queue reaches the queue's low-water mark. At this point the queue is no longer flow controlled. Note the distinction between the queue being full and being flow-controlled. The queue is only full as long as the amount of data it contains is over its high-water mark, but the queue remains flow controlled after the amount of data falls below the high-water mark. Of course, if the high and low-water marks are set to the same value, then there is no such distinction.

Source: Stephen A. Rago, UNIX System V Network Programming.
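The distinction between a full queue and a flow-controlled one is a simple hysteresis. The sketch below tracks both states as a queue's byte count changes; the fc_* names are hypothetical, it assumes a nonzero low-water mark, and it follows the Solaris code discussed in this entry (full at or above the high-water mark, released strictly below the low-water mark).

```c
#include <assert.h>

/* Hypothetical model of one queue's flow-control state. */
typedef struct {
	int hiwat, lowat;	/* water marks in bytes (lowat assumed > 0) */
	int full;		/* data on the queue is at or above hiwat */
	int flow_controlled;	/* latched when full, released below lowat */
} fc_state_t;

/* Update both states after the queue's byte count changes to `count`. */
static void
fc_update(fc_state_t *s, int count)
{
	assert(s->lowat <= s->hiwat);
	s->full = (count >= s->hiwat);
	if (s->full)
		s->flow_controlled = 1;
	else if (count < s->lowat)
		s->flow_controlled = 0;
}
```

With marks of 300 and 100, a count of 200 on the way down is no longer full but is still flow-controlled. With equal marks the two states always coincide, as Rago notes.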

Now let's look at the actual code that implements all this. I will provide simplified versions of most functions, ignoring locks and priority bands. By all means, look at the actual code to see all the missing parts.

canputnext()

The canputnext() function is pretty simple. It finds the next queue with a service procedure (cached in the q_nfsrv pointer) and checks whether it has the QFULL flag set. If QFULL is not set, canputnext() returns 1; if it is set, canputnext() sets the QWANTW flag and returns 0. QWANTW records that another queue wants to place messages here, so that queue should be back-enabled once this queue drains below its low-water mark:

int
canputnext(queue_t *q)
{
	/* get next module forward with a service queue */
	q = q->q_next->q_nfsrv;

	if (!(q->q_flag & QFULL)) {
		return (1);
	} else {
		q->q_flag |= QWANTW;
		return (0);
	}
}
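canputnext() can jump straight to the right queue because each queue caches q_nfsrv, the nearest queue at or beyond it that has a service procedure. The user-space sketch below models that; sim_q_t, has_srv, and the flag values are hypothetical, and treating the last queue as the target when nothing further downstream has a service procedure is an assumption of the model.

```c
#include <assert.h>
#include <stddef.h>

#define SIM_QFULL	0x1	/* hypothetical values, not from <sys/stream.h> */
#define SIM_QWANTW	0x2

typedef struct sim_q {
	struct sim_q *q_next;	/* next queue downstream */
	struct sim_q *q_nfsrv;	/* nearest queue at/after this one with a service proc */
	int has_srv;		/* stands in for q_qinfo->qi_srvp != NULL */
	int q_flag;
} sim_q_t;

/* Fill in q_nfsrv for an n-element chain qs[0] -> ... -> qs[n - 1]. */
static void
sim_set_nfsrv(sim_q_t *qs, int n)
{
	sim_q_t *nearest;

	assert(n > 0);
	nearest = &qs[n - 1];	/* fallback target: the last queue */
	for (int i = n - 1; i >= 0; i--) {
		if (qs[i].has_srv)
			nearest = &qs[i];
		qs[i].q_nfsrv = nearest;
	}
}

/* Same logic as the simplified canputnext() above. */
static int
sim_canputnext(sim_q_t *q)
{
	q = q->q_next->q_nfsrv;
	if (!(q->q_flag & SIM_QFULL))
		return (1);
	q->q_flag |= SIM_QWANTW;
	return (0);
}
```

In a four-queue chain where only the first and last queues have service procedures, the middle queues are skipped: marking the last queue QFULL makes canputnext() on the first queue return 0 and leaves QWANTW on the last queue, not on its immediate neighbor.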

putq()

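The putq() function puts a message on a module's or driver's own queue. The message is placed after any other messages of the same priority, and the flow-control counters are updated. If QNOENAB is not set and the service procedure is waiting for data (QWANTR is set), the service routine is enabled. The simplified version below handles only normal messages, so it just appends to the tail of the queue: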

/*
 * Put a message on a queue.
 *
 * Messages are enqueued on a priority basis.  The priority classes
 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
 * and B_NORMAL (type < QPCTL && band == 0).
 *
 * Add appropriate weighted data block sizes to queue count.
 * If queue hits high water mark then set QFULL flag.
 *
 * If QNOENAB is not set (putq is allowed to enable the queue),
 * enable the queue only if the message is PRIORITY,
 * or the QWANTR flag is set (indicating that the service procedure
 * is ready to read the queue).  This implies that a service
 * procedure must NEVER put a high priority message back on its own
 * queue, as this would result in an infinite loop (!).
 */
int
putq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	int	bytecnt = 0, mblkcnt = 0;

	/*
	 * If queue is empty, add the message and initialize the pointers.
	 * Otherwise, adjust message pointers and queue pointers.
	 */
	if (!q->q_first) {
		bp->b_next = NULL;
		bp->b_prev = NULL;
		q->q_first = bp;
		q->q_last = bp;
	} else {
		tmp = q->q_last;
		bp->b_next = NULL;
		bp->b_prev = tmp;
		tmp->b_next = bp;
		q->q_last = bp;
	}

	/* Get message byte count for q_count accounting */
	for (tmp = bp; tmp; tmp = tmp->b_cont) {
		bytecnt += (tmp->b_wptr - tmp->b_rptr);
		mblkcnt++;
	}
	q->q_count += bytecnt;
	q->q_mblkcnt += mblkcnt;
	if ((q->q_count >= q->q_hiwat) ||
	    (q->q_mblkcnt >= q->q_hiwat)) {
		q->q_flag |= QFULL;
	}

	/* Don't enable the queue that was noenable(9f)-ed */
	if (canenable(q) && (q->q_flag & QWANTR))
		qenable(q);

	return (1);
}
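putq() charges each message to two counters: q_count (the bytes between b_rptr and b_wptr in every block of the b_cont chain) and q_mblkcnt (the number of blocks). As the q_mblkcnt note in the getq() comment explains, the second counter lets even zero-length messages fill a queue. The user-space sketch below reproduces only that accounting; sim_mblk_t and sim_acct_t are hypothetical cut-down stand-ins for mblk_t and queue_t.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical cut-down mblk_t: only the fields putq() reads here. */
typedef struct sim_mblk {
	struct sim_mblk *b_cont;	/* next block of the same message */
	unsigned char *b_rptr;		/* first byte of data */
	unsigned char *b_wptr;		/* one past the last byte of data */
} sim_mblk_t;

/* Hypothetical cut-down queue_t: just the counters and the QFULL state. */
typedef struct {
	size_t q_count;		/* bytes queued */
	size_t q_mblkcnt;	/* blocks queued */
	size_t q_hiwat;		/* high-water mark, applied to both counters */
	int full;		/* stands in for QFULL */
} sim_acct_t;

/* The accounting half of putq(): charge one message and maybe set QFULL. */
static void
sim_putq_account(sim_acct_t *q, const sim_mblk_t *bp)
{
	size_t bytecnt = 0, mblkcnt = 0;

	assert(bp != NULL);
	for (const sim_mblk_t *tmp = bp; tmp != NULL; tmp = tmp->b_cont) {
		bytecnt += (size_t)(tmp->b_wptr - tmp->b_rptr);
		mblkcnt++;
	}
	q->q_count += bytecnt;
	q->q_mblkcnt += mblkcnt;
	if (q->q_count >= q->q_hiwat || q->q_mblkcnt >= q->q_hiwat)
		q->full = 1;
}
```

With a high-water mark of 4, three empty messages leave q_count at 0 and the queue not full; the fourth pushes q_mblkcnt to 4 and sets the full state even though no data bytes are queued.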

getq()

The getq() function removes and returns the first message on the queue. As a side effect, it may trigger back-enabling of queues that were previously flow-controlled because this queue held too much data. Back-enabling arranges for the service routine of a previously flow-controlled module to be called. That service routine calls getq() on its own queue, which may cause further back-enabling, propagating the release of flow control backwards along the stream.

/*
 * Get a message off head of queue
 *
 * If queue has no buffers then mark queue
 * with QWANTR. (queue wants to be read by
 * someone when data becomes available)
 *
 * If there is something to take off then do so.
 * If queue falls below hi water mark turn off QFULL
 * flag.  Decrement weighted count of queue.
 * Also turn off QWANTR because queue is being read.
 *
 * The queue count is maintained on a per-band basis.
 * Priority band 0 (normal messages) uses q_count,
 * q_lowat, etc.
 *
 * If queue count is below the lo water mark and QWANTW
 * is set, enable the closest backq which has a service
 * procedure and turn off the QWANTW flag.
 *
 * A note on the use of q_count and q_mblkcnt:
 *   q_count is the traditional byte count for messages that
 *   have been put on a queue.  Documentation tells us that
 *   we shouldn't rely on that count, but some drivers/modules
 *   do.  What was needed, however, is a mechanism to prevent
 *   runaway streams from consuming all of the resources,
 *   and particularly be able to flow control zero-length
 *   messages.  q_mblkcnt is used for this purpose.  It
 *   counts the number of mblk's that are being put on
 *   the queue.  The intention here, is that each mblk should
 *   contain one byte of data and, for the purpose of
 *   flow-control, logically does.  A queue will become
 *   full when EITHER of these values (q_count and q_mblkcnt)
 *   reach the highwater mark.  It will clear when BOTH
 *   of them drop below the highwater mark.  And it will
 *   backenable when BOTH of them drop below the lowwater
 *   mark.
 *   With this algorithm, a driver/module might be able
 *   to find a reasonably accurate q_count, and the
 *   framework can still try and limit resource usage.
 */
mblk_t *
getq(queue_t *q)
{
	mblk_t *bp;
	int band = 0;

	bp = getq_noenab(q);
	if (bp != NULL)
		band = bp->b_band;

	qbackenable(q, band);
	return (bp);
}

getq_noenab()

getq_noenab() is an internal STREAMS framework function that does the actual work of fetching the message but does not deal with back-enabling the stream.

/*
 * Like getq() but does not backenable. The caller must call qbackenable()
 * after it is done with accessing the queue.
 */
mblk_t *
getq_noenab(queue_t *q)
{
	mblk_t *bp;
	mblk_t *tmp;
	int	bytecnt = 0, mblkcnt = 0;

	if ((bp = q->q_first) == 0) {
		q->q_flag |= QWANTR;
	} else {
		if ((q->q_first = bp->b_next) == NULL)
			q->q_last = NULL;
		else
			q->q_first->b_prev = NULL;

		/* Get message byte count for q_count accounting */
		for (tmp = bp; tmp; tmp = tmp->b_cont) {
			bytecnt += (tmp->b_wptr - tmp->b_rptr);
			mblkcnt++;
		}

		q->q_count -= bytecnt;
		q->q_mblkcnt -= mblkcnt;
		if ((q->q_count < q->q_hiwat) &&
		    (q->q_mblkcnt < q->q_hiwat)) {
			q->q_flag &= ~QFULL;
		}

		q->q_flag &= ~QWANTR;
		bp->b_next = NULL;
		bp->b_prev = NULL;
	}
	return (bp);
}
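Note the asymmetry between putq() and getq_noenab(): QFULL is set when EITHER counter reaches the high-water mark but cleared only when BOTH are below it, and an empty queue records QWANTR so that the next putq() knows the service procedure is waiting for data. The sketch below models just that bookkeeping in user space; sim_q_t, sim_msg_t, and the size-only messages are hypothetical simplifications.

```c
#include <assert.h>

#define SIM_MAXMSG	8

/* Hypothetical message summary: its data bytes and its number of blocks. */
typedef struct {
	long bytes, mblks;
} sim_msg_t;

/* Hypothetical queue: a ring buffer of message summaries plus state. */
typedef struct {
	sim_msg_t msgs[SIM_MAXMSG];
	int first, n;				/* ring position and length */
	long q_count, q_mblkcnt, q_hiwat;
	int qfull, qwantr;			/* stand-ins for QFULL, QWANTR */
} sim_q_t;

/* Accounting done by putq(): EITHER counter at the mark sets QFULL. */
static void
sim_putq(sim_q_t *q, long bytes, long mblks)
{
	sim_msg_t m = { bytes, mblks };

	assert(q->n < SIM_MAXMSG);
	q->msgs[(q->first + q->n) % SIM_MAXMSG] = m;
	q->n++;
	q->q_count += bytes;
	q->q_mblkcnt += mblks;
	if (q->q_count >= q->q_hiwat || q->q_mblkcnt >= q->q_hiwat)
		q->qfull = 1;
}

/* Like getq_noenab(): returns 0 and sets QWANTR if the queue is empty. */
static int
sim_getq_noenab(sim_q_t *q, sim_msg_t *out)
{
	if (q->n == 0) {
		q->qwantr = 1;		/* the reader wants data */
		return (0);
	}
	*out = q->msgs[q->first];
	q->first = (q->first + 1) % SIM_MAXMSG;
	q->n--;
	q->q_count -= out->bytes;
	q->q_mblkcnt -= out->mblks;
	if (q->q_count < q->q_hiwat && q->q_mblkcnt < q->q_hiwat)
		q->qfull = 0;		/* BOTH counters must be below the mark */
	q->qwantr = 0;
	return (1);
}
```

With a high-water mark of 10, a message of ten empty blocks followed by a 12-byte message keeps the queue full after the first removal, because q_count is still 12; only the second removal clears QFULL.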

qbackenable()

qbackenable() is another internal STREAMS function. It checks whether back-enabling is required and, if so, calls backenable() to do the actual work.

/*
 * Determine if a backenable is needed after removing a message in the
 * specified band.
 */
void
qbackenable(queue_t *q, int band)
{
	int backenab = 0;

	if (band == 0 && (q->q_flag & QWANTW) == 0)
		return;

	if (band == 0) {
		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
		    q->q_mblkcnt < q->q_lowat)) {
			backenab = q->q_flag & QWANTW;
		}
	} else {
		...
	}

	if (backenab & QWANTW) {
		q->q_flag &= ~QWANTW;
		backenable(q, band);
	}
}
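For normal (band 0) messages, qbackenable() boils down to a small predicate: someone must be waiting (QWANTW), and either no low-water mark is set or BOTH counters are below it. Clearing QWANTW before calling backenable() means a waiting writer is back-enabled once per wait. The sketch below isolates that decision; sim_q_t, the flag value, and the backenabled counter (standing in for the call to backenable()) are hypothetical.

```c
#include <assert.h>

#define SIM_QWANTW	0x1	/* hypothetical value, not from <sys/stream.h> */

typedef struct {
	long q_count, q_mblkcnt, q_lowat;
	int q_flag;
	int backenabled;	/* counts where backenable() would be called */
} sim_q_t;

/* Band-0 part of qbackenable(): returns 1 if a back-enable happened. */
static int
sim_qbackenable(sim_q_t *q)
{
	assert(q != 0);
	if (!(q->q_flag & SIM_QWANTW))
		return (0);		/* nobody is waiting to write here */
	if (q->q_lowat == 0 ||
	    (q->q_count < q->q_lowat && q->q_mblkcnt < q->q_lowat)) {
		q->q_flag &= ~SIM_QWANTW;
		q->backenabled++;
		return (1);
	}
	return (0);
}
```

Note the q_lowat == 0 case: with no low-water mark, any removal from a queue that has a waiting writer triggers a back-enable, whatever the remaining count.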

backenable()

backenable() is an internal STREAMS function that finds the nearest back queue with a service procedure and enables it. Enabling means arranging for the service routine to be run sometime in the future; this is handled by qenable_locked(), which is beyond the scope of this entry.

/*
 * enable first back queue with svc procedure.
 * Use pri == -1 to avoid the setqback
 */
void
backenable(queue_t *q, int pri)
{
	queue_t	*nq;

	/* find nearest back queue with service proc */
	for (nq = backq(q); nq && !nq->q_qinfo->qi_srvp; nq = backq(nq))
		;

	if (nq) {
		if (pri != -1)
			setqback(nq, pri);
		qenable_locked(nq);
	}
}
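The search in backenable() skips upstream queues without a service procedure, since only a queue with a service procedure can be scheduled to drain its messages. The sketch below models the loop in user space; sim_q_t, its q_prev pointer (in place of backq()), and the enabled field (in place of qenable_locked()) are hypothetical, and the priority/setqback() handling is left out.

```c
#include <assert.h>
#include <stddef.h>

typedef struct sim_q {
	struct sim_q *q_prev;	/* hypothetical stand-in for backq(q) */
	int has_srv;		/* stands in for q->q_qinfo->qi_srvp != NULL */
	int enabled;		/* set instead of scheduling the service procedure */
} sim_q_t;

/* Enable the nearest upstream queue with a service procedure, if any. */
static sim_q_t *
sim_backenable(sim_q_t *q)
{
	sim_q_t *nq;

	assert(q != NULL);
	for (nq = q->q_prev; nq != NULL && !nq->has_srv; nq = nq->q_prev)
		;
	if (nq != NULL)
		nq->enabled = 1;
	return (nq);
}
```

If no upstream queue has a service procedure, the loop runs off the end of the stream and nothing is enabled, which matches the `if (nq)` check in the code above.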

Conclusion

The main purpose of this entry is to link the theoretical discussion of STREAMS flow control with the actual code. A few lines of C are sometimes worth many words, but words do help in understanding what the C code actually does.

(Jul 27 2005, 05:16:20 PM PDT)

Trackback URL: http://blogs.sun.com/akolb/entry/streams_flow_control_implementation
Comments:

Great! I really learned a lot from your blogs!

Posted by Yu Xiangning on August 02, 2005 at 02:02 AM PDT #

Thank you for your blog. It is very informative and very readable. I am trying to use your first (very simple) STREAMS example to trace STREAMS data exchange between the ip module and others on Solaris 5.9, and I am experiencing some problems. I am doing it the following way: 1 - I configure nullmod to be loaded automatically by autopush when somebody opens the ip module. 2 - I simply run netstat -ns to get the list of routes (according to truss output, it opens the ip module as well), and netstat hangs in the call to open. After a reboot netstat works well, so I think the problem is in this module or in the fact that it exists. What am I doing wrong? Is your sample STREAMS module suitable for this kind of work? Thank you in advance.

Posted by Michael Atlas on July 12, 2006 at 11:06 PM PDT #

Hi Michael, have you used the nullmod module provided here without any modifications? Does it work if you push it on your stdin?

Posted by Alexander Kolbasov on July 13, 2006 at 06:40 PM PDT #

I made some code modifications since yesterday to avoid some problems. The result is the same: the module works fine on the Ethernet adapter (eri0) and on stdin but doesn't work on the ip device, and the trouble is in the open function. If I push this module onto an already opened ip device in my socket program, everything works fine. The modifications are: I raised the water marks to something like 128 and 128 * 1024, which should be enough. I added service routines for both the write and read queues to support flow control. I applied a perimeter (as you discussed with Yu Xiangning, by adding the D_MP | D_MTPERQ flags to the module initialization), so the module now corresponds to your second example with the problems fixed. I added some logging with the cmn_err function. The result remains the same: according to truss output, netstat hangs when opening the ip device. If the module is pushed onto ip after open with ioctl I_PUSH, all requests pass between module instances freely and without any problem. Thank you very much for your answer.

Posted by Michael Atlas on July 14, 2006 at 02:05 AM PDT #

Mike, netstat does some crazy things with STREAMS plumbing. Let's try to figure out what is going on. Once netstat is stuck, open another window and run 'mdb -k' as superuser. You should be able to find all instances of nullmod with
::queue -m nullmod
The following command will show you the STREAMS configuration of every stream that has nullmod in it:
::queue -m nullmod |::q2stream |::stream
I think one of these will stand out and you may be able to find something. As for your module modifications: does the provided nullmod (which uses putnext() as its read and write put procedures), without any modifications, also block netstat?

Posted by Alexander Kolbasov on July 14, 2006 at 09:57 AM PDT #

Good day, Alex. After running netstat there is only one instance of nullmod. I noticed a strange bit in the queue flags, _Q_NEXTLESS (I didn't find it in the STREAMS manual). After adding this "experimental" flag to the module flags (_D_QNEXTLESS) and removing the read and write service routines, netstat still hangs, but at another point: ioctl(x, I_PUSH, "arp"). The original nullmod module hangs netstat both with and without the _D_QNEXTLESS flag. The counters are 0 for all participating queues, so I don't think any message was lost. Thanks for your answer.

Posted by Michael Atlas on July 17, 2006 at 06:17 AM PDT #

Mike, don't set _D_QNEXTLESS. Let's try to debug this offline via e-mail to alexander.kolbasov at sun.com. Please send me the stream configuration of the nullmod stream and (if you can get it) the stack trace of the netstat process. You can get it by running
::ps
in mdb and finding netstat there. Then take the process address (the ADDR column) and run
<addr>::ps -t
This will print the thread pointer. Run
<thread addr>::findstack
This should print the stack trace.

Posted by Alexander Kolbasov on July 17, 2006 at 02:59 PM PDT #

Was there any resolution? Having a nullmod that could sit between selected modules and just trace messages would be very handy, especially on S9 and earlier.

Thanks

--
Vlad

Posted by Vladimir Marek on March 18, 2008 at 08:09 AM PDT #

I forgot to tick "Notify me by email", so I have to create another note. Sorry.

--

Vlad

Posted by Vladimir Marek on March 18, 2008 at 08:10 AM PDT #

This was a while ago; I don't remember whether we diagnosed this, probably not.

BTW, the _D_QNEXTLESS flag was present until S10, and it was used for some important TCP/IP optimizations. Pushing nullmod on top of IP will have a performance impact on IP in pre-S10 versions of Solaris. If you would like to push nullmod on top of ip in pre-S10 releases, you may need to specify _D_QNEXTLESS as well.

Posted by Alexander Kolbasov on March 18, 2008 at 12:10 PM PDT #
