/*
 * Copyright (c) 2009 Washington University in St. Louis.
 * All rights reserved
 *
 *  Redistribution and use in source and binary forms, with or without
 *  modification, are permitted provided that the following conditions
 *  are met:
 *    1. Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *    2. Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in the
 *       documentation and/or other materials provided with the distribution.
 *    3. The name of the author or Washington University may not be used 
 *       to endorse or promote products derived from this source code 
 *       without specific prior written permission.
 *    4. Conditions of any other entities that contributed to this are also
 *       met. If a copyright notice is present from another entity, it must
 *       be maintained in redistributions of the source code.
 *
 * THIS INTELLECTUAL PROPERTY (WHICH MAY INCLUDE BUT IS NOT LIMITED TO SOFTWARE,
 * FIRMWARE, VHDL, etc) IS PROVIDED BY THE AUTHOR AND WASHINGTON UNIVERSITY 
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR WASHINGTON UNIVERSITY 
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
 * ARISING IN ANY WAY OUT OF THE USE OF THIS INTELLECTUAL PROPERTY, EVEN IF 
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * */
/*
 * File: shaper++.c  
 * Author: Ken Wong
 * Email: kenw@arl.wustl.edu
 * Organization: Applied Research Laboratory
 * 
 * Derived from: priq.c and setNxtBlk.c
 *
 * 	Modified control msgs; changed queueing to be more like delay
 * 	plugin queueing.
 *
 * Date Created: 2/24/2009 
 * 
 * Description: traffic shaper using token bucket
 *
 * Modification History:
 * 	3/19/09	version v0
 *
 */


#include <memory.h>
#include "plugin_api.h"
#include "plugin_dl.h"

#include "scratch_rings_WU.h"
#include "sram_rings_WU.h"


//-----------------------------------------------------------
// constants
//-----------------------------------------------------------

			// defaults
#ifndef TEST_MODE
    #define DEF_RATE	1000	// 1 Mbps
#else
    #define DEF_RATE	100	// 100 Kbps (when in test mode)
#endif
#define DEF_BUCKETSZ	3000	// Bytes

			// plugin counters
#define	PKT_COUNT	0	// #pkts received by handle_pkt_user()
#define CB_COUNT	1	// #pkts sent by callback()
#define ERR_COUNT	3	// #errors

//-----------------------------------------------------------
// typedefs, unions, enums
//-----------------------------------------------------------
// 64-bit timestamp that can also be accessed as two separate words.
// callback() loads tm2.lo and tm2.hi from the local timestamp CSRs and then
// works with tm as a single value.
// NOTE(review): the hi-before-lo member order presumably matches how the
// target lays out a long long -- confirm before building for another target.
union tm_tag {
    long long	tm;		// whole timestamp
    struct {
	unsigned long	hi;	// filled from local_csr_timestamp_high
	unsigned long	lo;	// filled from local_csr_timestamp_low
    }	tm2;
};

// sizeof(struct item_tag) = 32 ==> 32,768 items in 1 MB
// One queued packet: the saved meta-packet, its IP length, and the link to
// the next item.  The same link field chains items on the packet queue and
// on the free list.
struct item_tag {
    plugin_out_data	metapkt;	// meta-packet words copied from ring_in
    unsigned int	iplen;		// ip_len taken from the IPv4 header
    struct item_tag	*next;		// next item (queue or free list)
};

// Queue descriptor: a singly linked FIFO of item_tag plus the free list the
// items are allocated from, and per-queue statistics.
struct queue_tag {
    unsigned long	npkts;		// #pkts in queue
    unsigned long	nbytes;		// #bytes in queue (only zeroed by
					//   queue_init() in this file)
    unsigned long	maxinq;		// max #pkts in queue
    unsigned long	ndrops;		// #overflows from queue
    unsigned long	nerrs;		// #errors other than drops
    struct item_tag	*hd;		// head ptr
    struct item_tag	*tl;		// tail ptr
    struct item_tag *free_hd;		// free list
};


//-----------------------------------------------------------
// Global variables/Registers
//-----------------------------------------------------------
//
// >> thread-specific globals <<
//
// Per-thread routing of packets and control messages (set in plugin_init()
// and plugin_init_user()).
__declspec(gp_reg) int dlNextBlock;  // where to send packets to next
__declspec(gp_reg) int dlFromBlock;  // where to get packets from
__declspec(gp_reg) int msgNextBlock; // where to send control messages to next
__declspec(gp_reg) int msgFromBlock; // where to get control messages from

// see ring_formats.h for struct definitions
volatile __declspec(gp_reg) plc_plugin_data ring_in;	// ring data from PLC
volatile __declspec(gp_reg) plugin_out_data ring_out;	// ring data to nxt blk

const unsigned int SLEEP_CYCLES = 14000;	// cycles between
						//   callbacks (10 usec)
__declspec(gp_reg) unsigned int pluginId;	// plugin id (0...7)

// >> user globals <<
__declspec(shared gp_reg) unsigned int npkts;	// total #pkts
__declspec(shared gp_reg) unsigned int nsent;	// total #pkts sent by callback()
__declspec(shared gp_reg) unsigned int maxinq;	// max #pkts queued
__declspec(shared gp_reg) unsigned int ndrops;	// total #pkts dropped
__declspec(shared gp_reg) unsigned int debug_on;	// toggled by the "debug" msg

// Token bucket state, updated by callback().
__declspec(shared sram) unsigned int rate_Kbps;	// average target rate
__declspec(shared gp_reg) unsigned int bucketsz; // bucket size (bytes)
__declspec(shared gp_reg) union tm_tag told;	// last callback time
__declspec(shared gp_reg) unsigned int token_cnt; // 1 token = 0.0001 bit
						//   (TOKENS_PER_BYTE = 80000)

//------
// Flag guarding the packet queue; queue_enq() and queue_pop() spin on it
// with ctx_swap() until it is UNLOCKED.
#define UNLOCKED 0
#define LOCKED   1
__declspec(shared gp_reg) unsigned int queue_lock;

#define	MAX_QUEUE_SZ	32000		// #items linked by queue_init()
__declspec(shared sram) struct queue_tag queue;	// the shaper's packet queue


//------


#include "plugin_helpers.h"

//-----------------------------------------------------------
// Function prototypes
//-----------------------------------------------------------
// user hooks called by handle_pkt(), handle_msg-related code and plugin_init()
void handle_pkt_user();
void handle_msg_user();
void plugin_init_user();
int queue_init( __declspec(shared, sram) struct queue_tag *qptr );

// NOTE(review): no definitions of these two appear in this file; presumably
// they come from an included header -- confirm.
static void wait_packet_signal(SIGNAL *);
static void send_packet_signal(SIGNAL *);

// forward reference
static __forceinline int
helper_send_from_queue_to_x(
			__declspec(shared sram) struct queue_tag *qptr,
			__declspec(gp_reg) int dlNextBlock );
static __forceinline int
helper_send_from_queue(	__declspec(shared sram) struct queue_tag *qptr,
			__declspec(gp_reg) int dlNextBlock );

// queue primitives (defined at the end of this file)
struct item_tag * queue_alloc( __declspec(shared, sram) struct queue_tag *qptr );
void queue_free( __declspec(shared, sram) struct queue_tag *qptr, struct item_tag *item );
int queue_enq(	__declspec(shared, sram) struct queue_tag *qptr,
		volatile __declspec(gp_reg) plc_plugin_data ring_in,
    		__declspec(local_mem) unsigned int iplen );
int queue_pop( __declspec(shared sram) struct queue_tag *qptr );


//-----------------------------------------------------------
// New helper functions 
//-----------------------------------------------------------

// Build a raw queue id: (out_port + 1) shifted left by 13, OR'ed with qid.
// Both arguments are parenthesized so that expression arguments (e.g. a
// conditional expression) expand with the intended precedence.
#define	FormRawQid(out_port,qid)	((((out_port)+1) << 13) | (qid))

// handle errors
// Error codes stored in errno[] by helper_set_errno().
#define	BAD_QUEUE_INIT_ERR	1	// bad queue_init()
#define	BAD_ENQ_ERR		2	// bad queue_enq()
#define	BAD_POP_EMPTY_ERR	3	// bad queue_pop() - empty queue
#define	BAD_POP_FREE_ERR	4	// bad queue_pop() - free() failed
					//   (reported when queue_pop() returns
					//   -2; queue_pop() in this file never
					//   does)

__declspec(shared gp_reg) unsigned int nerrs;		// #errors
volatile __declspec(shared sram) unsigned int errno[5];	// 1st 5 errors

// record error number
//									<<<<<
// Log error code n: the first five codes are kept in errno[], nerrs counts
// every call, and plugin counter 0 is incremented.
// NOTE(review): counter index 0 is PKT_COUNT in this file while ERR_COUNT is
// 3 -- confirm which external counter was intended here.
static __forceinline void
helper_set_errno( __declspec(local_mem) unsigned int n ) {
    if( nerrs < 5 ) {
	errno[nerrs] = n;		// still room in the error log
    }
    nerrs = nerrs + 1;
    onl_api_plugin_cntr_inc(pluginId, 0);
}

// set ring_out qid given output port# and external qid
//									<<<<<
// Store the raw qid for (out_port, xqid) in ring_out: (out_port + 1) shifted
// left by 13, OR'ed with the external qid.
static __forceinline void
helper_set_meta_qid(	__declspec(gp_reg) unsigned int out_port,
			__declspec(gp_reg) unsigned int xqid ) {
    __declspec(gp_reg) unsigned int port_bits;

    port_bits = (out_port + 1) << 13;
    ring_out.plugin_qm_data_out.qid = port_bits | xqid;
}

// reset global counters
//									<<<<<
static __forceinline void
reset_counters( void ) {
    npkts = 0;
    nsent = 0;
    maxinq = 0;
    ndrops = 0;
    token_cnt = 0;

    nerrs = 0;
    errno[0] = 0;	errno[1] = 0;	errno[2] = 0;	errno[3] = 0;
    errno[4] = 0;

    sleep( SLEEP_CYCLES );		// not sure if I need this
    helper_plugin_cntr_zero( PKT_COUNT );
    helper_plugin_cntr_zero( CB_COUNT );
    helper_plugin_cntr_zero( ERR_COUNT );
}


// Hard coded to send meta-pkts to plugin ME 4
// Used in place of helper_send_from_queue() while debugging
//
// Debug variant of helper_send_from_queue(): always writes the head item's
// meta-packet to PLC_TO_PLUGIN_4_SRAM_RING (dlNextBlock is ignored), then
// pops the head.  A failed pop is recorded as an error; the return value is
// always 0.
static __forceinline int
helper_send_from_queue_to_x(
			__declspec(shared sram) struct queue_tag *qptr,
			__declspec(gp_reg) int dlNextBlock ) {
    int			pop_rc;

    // destination is assumed to be plugin ME 4
    sram_ring_put_buffer_6word( PLC_TO_PLUGIN_4_SRAM_RING,
					qptr->hd->metapkt.i, 0 );

    pop_rc = queue_pop( qptr );
    switch( pop_rc ) {
    case -1:
	helper_set_errno( BAD_POP_EMPTY_ERR );
	onl_api_plugin_cntr_inc(pluginId, ERR_COUNT);
	break;
    case -2:
	helper_set_errno( BAD_POP_FREE_ERR );
	onl_api_plugin_cntr_inc(pluginId, ERR_COUNT);
	break;
    default:
	break;
    }

    return 0;
}


// Same as dl_sink_packet() but don't do any signalling
//
// set ring_out data from item in queue data.
// 	return 0 if OK; -1 otherwise
//
// Forward the item at the head of the queue to dlNextBlock and pop it.
//   - QM: build a QM meta-packet from the stored plugin meta-packet and put
//     it on PLUGIN_TO_QM_RING.
//   - PACKET_IN_RING_1..4: put the stored 6-word meta-packet on the matching
//     PLC_TO_PLUGIN_n_SRAM_RING.
//   - anything else (including PACKET_IN_RING_0): return -1 without popping.
// Returns 0 once the ring write was issued; a failed pop is only recorded as
// an error.  The caller must make sure the queue is not empty.
static __forceinline int
helper_send_from_queue(	__declspec(shared sram) struct queue_tag *qptr,
			__declspec(gp_reg) int dlNextBlock ) {
    int			pop_rc;

    if( dlNextBlock == QM ) {
	plugin_out_data	qm_meta;	// ring data to next block
	__declspec(gp_reg) int	port;

	// output port is carried in bits 3..5 of uc_mc_bits
	port = (qptr->hd->metapkt.plugin_plugin_data_out.uc_mc_bits >> 3)
			& 0x7;
	qm_meta.plugin_qm_data_out.out_port	= port;
	qm_meta.plugin_qm_data_out.qid		=
		qptr->hd->metapkt.plugin_plugin_data_out.qid;
	qm_meta.plugin_qm_data_out.l3_pkt_len	=
		qptr->hd->metapkt.plugin_plugin_data_out.l3_pkt_len;
	qm_meta.plugin_qm_data_out.buf_handle_lo24	=
		qptr->hd->metapkt.plugin_plugin_data_out.buf_handle_lo24;
	scr_ring_put_buffer_3word( PLUGIN_TO_QM_RING, qm_meta.i, 0 );
    } else if( dlNextBlock == PACKET_IN_RING_0 ) {
	return -1;				// not a valid destination
    } else if( dlNextBlock == PACKET_IN_RING_1 ) {
	sram_ring_put_buffer_6word( PLC_TO_PLUGIN_1_SRAM_RING,
					qptr->hd->metapkt.i, 0 );
    } else if( dlNextBlock == PACKET_IN_RING_2 ) {
	sram_ring_put_buffer_6word( PLC_TO_PLUGIN_2_SRAM_RING,
					qptr->hd->metapkt.i, 0 );
    } else if( dlNextBlock == PACKET_IN_RING_3 ) {
	sram_ring_put_buffer_6word( PLC_TO_PLUGIN_3_SRAM_RING,
					qptr->hd->metapkt.i, 0 );
    } else if( dlNextBlock == PACKET_IN_RING_4 ) {
	sram_ring_put_buffer_6word( PLC_TO_PLUGIN_4_SRAM_RING,
					qptr->hd->metapkt.i, 0 );
    } else {
	return -1;				// all other options
    }

    pop_rc = queue_pop( qptr );
    if( (pop_rc == -1) || (pop_rc == -2) ) {
	if( pop_rc == -1 )	helper_set_errno( BAD_POP_EMPTY_ERR );
	else			helper_set_errno( BAD_POP_FREE_ERR );
	onl_api_plugin_cntr_inc(pluginId, ERR_COUNT);
    }

    return 0;
}


// Same as dl_sink_packet() but don't do any signalling and assume pkt
// goes to QM
// Used during debugging
//
// Debug variant: build a QM meta-packet from the head item, pop the head
// (recording any pop failure as an error) and then put the meta-packet on
// PLUGIN_TO_QM_RING.  The destination is always the QM.
static __forceinline void
helper_send_from_queue_to_QM( __declspec(shared,sram) struct queue_tag *qptr ) {
    plugin_out_data	qm_meta;	// ring data to next block
    __declspec(gp_reg) int	port;
    int			pop_rc;

    // copy what the QM needs before the item goes back to the free list
    port = (qptr->hd->metapkt.plugin_plugin_data_out.uc_mc_bits >> 3) & 0x7;
    qm_meta.plugin_qm_data_out.out_port	= port;
    qm_meta.plugin_qm_data_out.qid		=
		qptr->hd->metapkt.plugin_plugin_data_out.qid;
    qm_meta.plugin_qm_data_out.l3_pkt_len	=
		qptr->hd->metapkt.plugin_plugin_data_out.l3_pkt_len;
    qm_meta.plugin_qm_data_out.buf_handle_lo24	=
		qptr->hd->metapkt.plugin_plugin_data_out.buf_handle_lo24;

    pop_rc = queue_pop( qptr );
    switch( pop_rc ) {
    case -1:
	helper_set_errno( BAD_POP_EMPTY_ERR );
	onl_api_plugin_cntr_inc(pluginId, ERR_COUNT);
	break;
    case -2:
	helper_set_errno( BAD_POP_FREE_ERR );
	onl_api_plugin_cntr_inc(pluginId, ERR_COUNT);
	break;
    default:
	break;
    }

#ifdef DEBUG3
    if( debug_on )	helper_check_meta( qm_meta );	// DEBUG3
#endif

    scr_ring_put_buffer_3word( PLUGIN_TO_QM_RING, qm_meta.i, 0 );
}

//-----------------------------------------------------------
// Begin Normal Functions
//-----------------------------------------------------------
//									<<<<<
// Per-packet hook: count the packet, read ip_len from its IPv4 header and
// append the meta-packet (ring_in) to the shaper queue.  callback() forwards
// queued packets later as tokens allow.
//   - enqueue failed: record BAD_ENQ_ERR, count a drop and mark the packet
//     to be dropped.
//   - enqueue OK: track the maximum queue length and call
//     helper_set_out_to_DO_NOTHING() (presumably so that dl_sink_packet()
//     does not forward the packet now -- confirm in plugin_helpers.h).
void handle_pkt_user( )  {
    __declspec(gp_reg) buf_handle_t buf_handle;
    __declspec(gp_reg) onl_api_buf_desc	bufDescriptor;
    __declspec(local_mem) unsigned int	bufDescPtr;
    __declspec(local_mem) unsigned int	ipv4HdrPtr;
    __declspec(local_mem) unsigned int	dramBufferPtr;
    __declspec(gp_reg) onl_api_ip_hdr	ipv4_hdr;
    int			ninq;		// #pkts queued, or -1 on failure
					//   (signed: queue_enq() returns int)

    // accounting
    ++npkts;
    onl_api_plugin_cntr_inc(pluginId, PKT_COUNT);

    // prepare to read IPv4 header
    onl_api_get_buf_handle(&buf_handle);			// rd handle
    bufDescPtr = onl_api_getBufferDescriptorPtr(buf_handle);	// descr addr
    onl_api_readBufferDescriptor(bufDescPtr, &bufDescriptor);	// rd descriptor
    dramBufferPtr = onl_api_getBufferPtr(buf_handle);		// dram addr
    ipv4HdrPtr = onl_api_getIpv4HdrPtr(dramBufferPtr, bufDescriptor.offset);
    onl_api_readIpv4Hdr(ipv4HdrPtr, &ipv4_hdr);

    ninq = queue_enq( &queue, ring_in, ipv4_hdr.ip_len );

    if( ninq < 0 ) {			// no free item: drop the packet
	helper_set_errno( BAD_ENQ_ERR );
	onl_api_plugin_cntr_inc(pluginId, ERR_COUNT);
	++ndrops;
	helper_set_out_to_DROP( );
	return;
    }

    if ( (unsigned int) ninq > maxinq )	maxinq = (unsigned int) ninq;
    helper_set_out_to_DO_NOTHING( );
}

//									<<<<<
// Empty user message hook; handle_msg() in this file does not call it.
void handle_msg_user(){}				// NOT USED

//									<<<<<
// User initialization, run by every thread via plugin_init().
// Context 0 alone sets the default shaper parameters, clears the counters,
// unlocks the queue and builds the free list.  Every thread then chains its
// output to the next plugin: plugins 0..3 feed PACKET_IN_RING_1..4, any
// other plugin id feeds the QM.
void plugin_init_user()
{
    int		init_rc;

    if( ctx() == 0 ) {
	// shaper defaults
	rate_Kbps = DEF_RATE;
	bucketsz  = DEF_BUCKETSZ;
	reset_counters( );
	debug_on = 0;

	// packet queue
	queue_lock = UNLOCKED;
	init_rc = queue_init( &queue );
	if( init_rc != 0 ) {
	    helper_set_errno( BAD_QUEUE_INIT_ERR );
	}

#ifdef DEBUGX
	__set_timestamp( 0 );
#endif
    }

    // plugin chain
    switch( pluginId ) {
    case 0:	dlNextBlock = PACKET_IN_RING_1;	break;
    case 1:	dlNextBlock = PACKET_IN_RING_2;	break;
    case 2:	dlNextBlock = PACKET_IN_RING_3;	break;
    case 3:	dlNextBlock = PACKET_IN_RING_4;	break;
    default:	dlNextBlock = QM;		break;
    }
}



/**
	----------------------------------------------------------------
 @User: YOU SHOULD NOT NEED TO MAKE ANY CHANGES TO THE REST OF THIS FILE
	----------------------------------------------------------------
*/


/* handle packets */
//									<<<<<
// Process one packet: fetch it from dlFromBlock, let handle_pkt_user()
// decide what happens to it, then hand the result to dlNextBlock.
void handle_pkt()
{
    dl_source_packet( dlFromBlock );
  
    handle_pkt_user( );

    dl_sink_packet( dlNextBlock );
}


/* handle control messages */
//									<<<<<
// op codes:
//   set:
//   	params=	rate_Kbps bucketsz(bytes)
//   		set traffic shaper parameters (rate_Kbps, bucketsz)
//   get:
//   	=vers	display version number
//   	=params	display parameters (rate_Kbps, bucketsz, counter)
//	=counts	display counts (npkts, maxinq, nerrs)
//	=errno	display errno[0], ... , errno[4]
//   miscellaneous:
//	reset	reset npkts[], ndrops[], errno[] counters, etc.
//	debug	toggle debug_on
//
// Process one control message: read it from msgFromBlock, match the text
// against the op codes listed above, build a text reply in outmsgstr and
// send it to msgNextBlock.  Messages that are not CM_CONTROLMSG or that do
// not request a response are ignored.
// NOTE(review): several locals (i, lmem_tmpstr, OK_msg) are not referenced
// directly in this function; the helper_* calls may be macros that use them,
// so check plugin_helpers.h before removing any.
void handle_msg()
{
    // assume messages are at most 8 words for now
    __declspec(gp_reg) unsigned int i;
    __declspec(gp_reg) unsigned int message[8];
    __declspec(gp_reg) onl_api_ctrl_msg_hdr hdr;
    __declspec(local_mem) char inmsgstr[28];			// inbound
    __declspec(local_mem) char outmsgstr[28];			// outbound
    __declspec(local_mem) char lmem_tmpstr[8];
    __declspec(sram) char sram_inmsgstr[28];
    __declspec(sram) char vers[4] = "1.0";

    // op code strings
    char SET_params[8]	= "params=";
    char GET_params[8]	= "=params";
    char GET_vers[8]	= "=vers";
    char GET_counts[8]	= "=counts";
    char GET_errno[8]	= "=errno";
    char RESET[8]	= "reset";
    char DEBUG_op[8]	= "debug";

    // fixed reply strings
    char OK_msg[4]	= "OK";
    char BAD_OP_msg[8]	= "BAD OP";
    char NEED_ARG_msg[12]= "NEED ARG";

    // expand for-loop to get rid of the compiler error:
    //					"Incorrect use of register variable"
    message[0] = 0;
    message[1] = 0;
    message[2] = 0;
    message[3] = 0;
    message[4] = 0;
    message[5] = 0;
    message[6] = 0;
    message[7] = 0;

    dl_source_message(msgFromBlock, message);

    // word 0 is the header; only control messages that want a reply matter
    hdr.value = message[0];
    if( hdr.type != CM_CONTROLMSG )	return;
    if( hdr.response_requested != 1 )	return;

    // words 1..7 carry the command text
    onl_api_intarr2str( &message[1], inmsgstr );

    outmsgstr[0] = '\0';
    memcpy_sram_lmem( sram_inmsgstr, inmsgstr, 28 );

    // dispatch on the command prefix
    if( strncmp_sram(sram_inmsgstr, GET_vers, 5) == 0 ) {
	memcpy_lmem_sram( outmsgstr, (void *)vers, 4 );
    } else if( strncmp_sram(sram_inmsgstr, GET_params, 7) == 0 ) {
	helper_sram_outmsg_3ul( rate_Kbps, bucketsz, 0, outmsgstr );
    } else if( strncmp_sram(sram_inmsgstr, GET_counts, 7) == 0 ) {
	helper_sram_outmsg_3ul( npkts, maxinq, nerrs, outmsgstr );
    } else if( strncmp_sram(sram_inmsgstr, GET_errno, 6) == 0 ) {
	helper_sram_outmsg_5ul( errno[0], errno[1], errno[2], errno[3],
							errno[4], outmsgstr );
    } else if( strncmp_sram(sram_inmsgstr, SET_params, 7) == 0 ) {
    	char	*cmnd_word;		// points to input command field
    	char	*rate_word;		// points to input rate(Kbps) field
    	char	*bucketsz_word;		// points to input bucketsz(bytes) field
	unsigned int	nwords;

	// "params= <rate_Kbps> <bucketsz>" needs exactly three words
	nwords = helper_count_words( sram_inmsgstr );
	if( nwords != 3 ) {
	    memcpy_lmem_sram( outmsgstr, NEED_ARG_msg, 12 );
	} else {
	    // each later word is looked up just past the end of the previous
	    cmnd_word = helper_tokenize( sram_inmsgstr );	// get command
	    rate_word = helper_tokenize( cmnd_word+strlen(cmnd_word)+1 );
	    bucketsz_word = helper_tokenize( rate_word+strlen(rate_word)+1 );

	    // the new values are stored as parsed, with no range check
	    rate_Kbps = helper_atou_sram( rate_word );
	    bucketsz = helper_atou_sram( bucketsz_word );
	    helper_sram_outmsg_2ul( rate_Kbps, bucketsz, outmsgstr );
	}
    } else if( strncmp_sram(sram_inmsgstr, RESET, 5) == 0 ) {
	// no reply text is written here, so outmsgstr stays empty
	reset_counters( );
    } else if( strncmp_sram(sram_inmsgstr, DEBUG_op, 5) == 0 ) {
    	debug_on = (debug_on+1) & 0x1;		// toggle 0 <-> 1
	helper_sram_outmsg_1ul( debug_on, outmsgstr );
    } else {
	memcpy_lmem_sram( outmsgstr, BAD_OP_msg, 8 );
    }

    // pack the reply text back into words 1..7; give up if that fails
    if( onl_api_str2intarr(outmsgstr, &message[1]) < 0 )	return;

    // turn the header into a response header and send the reply
    hdr.type = CM_CONTROLMSGRSP;
    hdr.response_requested = 0;
    hdr.num_words = 7;
    message[0] = hdr.value;

    dl_sink_message(msgNextBlock, message);
}

// handle periodic functionality
//									<<<<<
// Called ABOUT every 10 usec
//    - We can only guarantee that this thread will not get control sooner
//   	than 10 usec.
// Min rate of 1 Kbps and 1 token = 0.0001 bits ==>
// 	Add about 100 tokens every callback.  Although we could have chosen
// 	1 token = 0.01 bits, our choice allows greater accuracy.
// Computing number of tokens to add:
//	Let	T = elapsed time in nsec since last token update
//		R = avg rate of regulator (Kbps)
//
//	token_cnt' = token_cnt + T*R/100
//
//	For R = 1 Mbps (1,000 Kbps) and T about 10 usec (10,000 nsec),
//
//	token_cnt' = token_cnt + (10,000*1,000)/100 = token_cnt + 100,000
//
#define TOKENS_PER_BYTE	80000		// 1 token = 0.0001 bits
// Periodic token-bucket service routine.
//   1. Add tokens for the time elapsed since the previous call
//      (elapsed nsec * rate_Kbps / 100), capped at the bucket capacity
//      TOKENS_PER_BYTE * bucketsz.
//   2. Forward packets from the head of the queue while the bucket holds
//      enough tokens for the head packet, charging TOKENS_PER_BYTE tokens
//      per byte of IP length for every packet actually sent.
//   3. Sleep for SLEEP_CYCLES.
// The token update is done in 64 bits and capped BEFORE it is stored back
// into the 32-bit token_cnt; a long gap between calls (including the very
// first call, when told has not been set yet) therefore fills the bucket
// instead of wrapping around to an arbitrary value.
void callback()
{
    __declspec(gp_reg) unsigned int	pktlen_tokens;
    union tm_tag	tnow;
    long long		tdiff_nsec;
    long long		tokens;		// 64-bit working copy of token_cnt
    long long		max_tokens;	// bucket capacity in tokens
    int			rc;

	// update token counter
    tnow.tm2.lo = local_csr_read( local_csr_timestamp_low );
    tnow.tm2.hi = local_csr_read( local_csr_timestamp_high );
    tdiff_nsec = diff_nsec( tnow.tm, told.tm );
    if( tdiff_nsec < 0 )	tdiff_nsec = 0;	// never remove tokens

    max_tokens = (long long) TOKENS_PER_BYTE * bucketsz;
    if( max_tokens > (long long) 0xFFFFFFFF ) {	// token_cnt is 32 bits wide
	max_tokens = (long long) 0xFFFFFFFF;
    }
    tokens = (long long) token_cnt + (tdiff_nsec*rate_Kbps)/100;
    if( tokens > max_tokens )	tokens = max_tokens;
    token_cnt = (unsigned int) tokens;
    told.tm = tnow.tm;

	// forward pkts as long as there are enough tokens
    while( queue.npkts > 0 ) {
	pktlen_tokens = TOKENS_PER_BYTE*queue.hd->iplen;
	if( token_cnt < pktlen_tokens )	break;	// wait for more tokens

	onl_api_plugin_cntr_inc(pluginId, CB_COUNT);
#ifdef DEBUG1
helper_sram_dbgmsg_3ul( token_cnt, pktlen_tokens, nsent );
helper_sram_dbgmsg_3ul( queue.npkts, dlNextBlock, 0 );
#endif
	rc = helper_send_from_queue( &queue, dlNextBlock );
	if( rc != 0 ) {
	    // nothing was sent and the packet is still queued, so the
	    // tokens are not charged
	    onl_api_plugin_cntr_inc(pluginId, ERR_COUNT);
	    break;
	}

	token_cnt -= pktlen_tokens;
	++nsent;
#ifdef DEBUG1
helper_sram_dbgmsg_3ul( token_cnt, pktlen_tokens, nsent );
helper_sram_dbgmsg_3ul( queue.npkts, 0, 0 );
#endif
    }

    sleep( SLEEP_CYCLES );
}


/* take care of any setup that needs to be done before processing begins */
//									<<<<<
// Framework initialization: pick the plugin id and the input/message rings
// from the microengine this thread runs on, default the packet destination
// to the QM, then run the user hook (which may override the destination).
void plugin_init()
{
  /* default next block; plugin_init_user() may change it */
  dlNextBlock = QM;

  /* one-to-one mapping from microengine number to plugin id and rings */
  switch(__ME())
  {
    case 0x10:
      pluginId = 1;
      dlFromBlock  = PACKET_IN_RING_1;
      msgFromBlock = MESSAGE_IN_RING_1;
      msgNextBlock = MESSAGE_OUT_RING_1;
      break;

    case 0x11:
      pluginId = 2;
      dlFromBlock  = PACKET_IN_RING_2;
      msgFromBlock = MESSAGE_IN_RING_2;
      msgNextBlock = MESSAGE_OUT_RING_2;
      break;

    case 0x12:
      pluginId = 3;
      dlFromBlock  = PACKET_IN_RING_3;
      msgFromBlock = MESSAGE_IN_RING_3;
      msgNextBlock = MESSAGE_OUT_RING_3;
      break;

    case 0x13:
      pluginId = 4;
      dlFromBlock  = PACKET_IN_RING_4;
      msgFromBlock = MESSAGE_IN_RING_4;
      msgNextBlock = MESSAGE_OUT_RING_4;
      break;

    case 0x7:
    default:  /* any other microengine is treated like ME 0x7 */
      pluginId = 0;
      dlFromBlock  = PACKET_IN_RING_0;
      msgFromBlock = MESSAGE_IN_RING_0;
      msgNextBlock = MESSAGE_OUT_RING_0;
      break;
  }

  plugin_init_user(); /* user hook */
}


/* entry point */
//									<<<<<
// Entry point for every thread: initialize, then loop forever in the role
// selected by the thread's context number (packet handler, message handler
// or periodic callback).  A context that matches no role simply returns.
void main()
{
  int me_ctx;

  /* do initialization */
  plugin_init();
  dl_sink_init();
  dl_source_init();

  /* this thread's context number (0-7) selects its role */
  me_ctx = ctx();

  if(me_ctx >= FIRST_PACKET_THREAD && me_ctx <= LAST_PACKET_THREAD)
  {
    for(;;)
    {
      handle_pkt();
    }
  }
#ifdef MESSAGE_THREAD
  else if(me_ctx == MESSAGE_THREAD)
  {
    for(;;)
    {
      handle_msg();
    }
  }
#endif
#ifdef CALLBACK_THREAD
  else if(me_ctx == CALLBACK_THREAD)
  {
    for(;;)
    {
      callback();
    }
  }
#endif
}

// --------------------------------------------------------------------------
// queueing functions
//
//	queue_init		initialize free list and queue descriptor
//	queue_enq		enqueue an item onto a queue
//	queue_pop		pop an item from a queue
//	queue_alloc		allocate space for an item from the free list
//	queue_free		put an item back onto the free list
//
// --------------------------------------------------------------------------

// initialize queue
//									<<<<<
// Reset the queue descriptor and build the free list.
// The item pool is a fixed region chosen by pluginId (0..4); MAX_QUEUE_SZ
// consecutive items are linked in order and the last one ends the list.
//	return 0 if OK; -1 if pluginId has no pool assigned
int
queue_init( __declspec(shared, sram) struct queue_tag *qptr ) {
    struct item_tag *pool;		// first item of this plugin's pool
    struct item_tag *last;		// last item of the pool
    struct item_tag *cur;

    switch( pluginId ) {
    case 0:	pool = (struct item_tag *) 0xC0100000;	break;
    case 1:	pool = (struct item_tag *) 0xC0200000;	break;
    case 2:	pool = (struct item_tag *) 0xC0300000;	break;
    case 3:	pool = (struct item_tag *) 0xC0400000;	break;
    case 4:	pool = (struct item_tag *) 0xC0500000;	break;
    default:	return -1;
    }

    // empty queue, whole pool on the free list, statistics cleared
    qptr->free_hd = pool;
    qptr->hd = qptr->tl = 0;
    qptr->npkts = 0;
    qptr->nbytes = 0;
    qptr->maxinq = 0;
    qptr->ndrops = 0;
    qptr->nerrs = 0;

    // terminate the list, then chain every item to its successor
    last = pool + (MAX_QUEUE_SZ-1);
    last->next = 0;
    for( cur = pool; cur != last; ++cur ) {
	cur->next = cur + 1;
    }

    return 0;
}

// insert item at end of queue
// 	return number of items if OK; else -1
// Append a packet to the tail of the queue.
//	qptr	queue to append to (items come from its free list)
//	ring_in	meta-packet to store (6 words)
//	iplen	IP length of the packet in bytes
// 	return number of items now in the queue if OK; else -1 (no free item,
// 	counted in qptr->ndrops)
// queue_lock is held while the queue is modified and is released on every
// return path, including the failure path.
int
queue_enq(	__declspec(shared, sram) struct queue_tag *qptr,
		volatile __declspec(gp_reg) plc_plugin_data ring_in,
    		__declspec(local_mem) unsigned int iplen ) {
    struct item_tag	*item;
    int			ninq;		// queue length sampled under the lock

    while( queue_lock == LOCKED )	ctx_swap();
    queue_lock = LOCKED;

	item = queue_alloc( qptr );
	if( item == 0 ) {
	    ++qptr->ndrops;
	    queue_lock = UNLOCKED;	// do not leave the queue locked
	    return -1;
	}

	item->metapkt.i[0] = ring_in.i[0];
	item->metapkt.i[1] = ring_in.i[1];
	item->metapkt.i[2] = ring_in.i[2];
	item->metapkt.i[3] = ring_in.i[3];
	item->metapkt.i[4] = ring_in.i[4];
	item->metapkt.i[5] = ring_in.i[5];
	item->iplen = iplen;
	item->next = 0;			// new tail: drop the stale free-list link

	if( qptr->npkts == 0 )	qptr->hd = item;
	else			qptr->tl->next = item;
	qptr->tl = item;

	++(qptr->npkts);
	if( qptr->npkts > qptr->maxinq )	qptr->maxinq = qptr->npkts;
	ninq = qptr->npkts;

    queue_lock = UNLOCKED;

    return ninq;
}

// pop front of list
// Remove the item at the head of the queue and return it to the free list.
//	return 0 if OK; -1 if the queue is empty (counted in qptr->nerrs)
// queue_lock is held while the queue is modified and is released on every
// return path, including the empty-queue path.
int
queue_pop( __declspec(shared, sram) struct queue_tag *qptr ) {
    struct item_tag	*item;

    while( queue_lock == LOCKED )	ctx_swap();
    queue_lock = LOCKED;

	if( qptr->npkts == 0 ) {
	    ++qptr->nerrs;
	    queue_lock = UNLOCKED;	// do not leave the queue locked
	    return -1;
	}

	item = qptr->hd;
	qptr->hd = item->next;
	--(qptr->npkts);
	if( qptr->npkts == 0 )	qptr->tl = 0;
	queue_free( qptr, item );

    queue_lock = UNLOCKED;
    return 0;
}

// allocate an item
// Take the first item off the free list.
//	return the item, or 0 if the free list is empty
// Does no locking of its own.
struct item_tag *
queue_alloc( __declspec(shared, sram) struct queue_tag *qptr ) {
    struct item_tag *first = qptr->free_hd;

    if( first != 0 ) {
	qptr->free_hd = first->next;	// unlink it from the free list
    }
    return first;
}

// free an item
// Push an item back onto the front of the free list; a null item is ignored.
// Does no locking of its own.
void
queue_free(	__declspec(shared, sram) struct queue_tag *qptr,
		struct item_tag *item ) {
    if( item != 0 ) {
	item->next = qptr->free_hd;
	qptr->free_hd = item;
    }
}

