/* test_handler.c */
#include <stdio.h>
#include <string.h>
#include <inttypes.h>

#include "trbase.h"
#include "trcomm.h"
#include "trevent.h"
#include "test_handler.h"

/**
 * Heartbeat handler: print the statistics collected since the previous
 * heartbeat and reset the counters.
 *
 * The line printed contains the dispatcher's n_beats value, the number of
 * handled messages, the accumulated size scaled by 1024 up to three times
 * (K, M, G) and the average size per handled message.
 *
 * @param this   handler instance; accessed as TestHandler for its
 *               handled and size counters
 * @param event  the heartbeat event; its subject is accessed as
 *               TR_EventDispatcher to read n_beats
 * @return always TR_EVENT_DONE
 */
static
TR_EventDone
testHandlerHeartbeat(TR_EventHandler this, TR_Event event)
{
	/*
	 * Unit prefixes as strings. The unscaled case uses an empty string;
	 * printing a zero prefix with %c would write a NUL byte to stdout.
	 */
	static const char * const prefixes[] = { "", "K", "M", "G" };
	enum { MAX_PREFIX = 3 };

	TestHandler self     = (TestHandler)this;
	double      size     = (double)self->size;
	/* Guard on the divisor itself so an idle interval yields 0.0. */
	double      size_msg = self->handled ? size / self->handled : 0.0;
	int         prefix   = 0;

	/* Scale down by 1024 per step, stopping at the largest known prefix. */
	while (size > 1024. && prefix < MAX_PREFIX) {
		size /= 1024.;
		prefix++;
	}

	/* The cast makes the argument match %llu whatever integer type
	 * the handled counter is declared with. */
	printf("%zd beat(s) since last beat / "
			"handled: %llu msg/s %4.2lf %sBytes/s %4.0lf Bytes/msg\n",
			((TR_EventDispatcher)event->subject)->n_beats,
			(unsigned long long)self->handled,
			size, prefixes[prefix], size_msg);

	/* Start a fresh measurement interval. */
	self->handled = 0;
	self->size    = 0;

	return TR_EVENT_DONE;
}

/**
 * New-message handler: account for the received message and re-emit its
 * data as a TR_CEP_EVENT_MSG_READY event on the same subject.
 *
 * @param this   handler instance; accessed as TestHandler to update the
 *               handled and size counters
 * @param event  the new-message event; event->data is read as a
 *               TR_ProtoMessageRaw whose data member is accessed as
 *               TR_SizedData
 * @return always TR_EVENT_DONE
 */
static
TR_EventDone
testHandlerNewMessage(TR_EventHandler this, TR_Event event)
{
	TestHandler        self = (TestHandler)this;
	TR_ProtoMessageRaw raw  = event->data;
	TR_Event           ready;

	/* Update the statistics reported by the heartbeat handler. */
	self->handled++;
	self->size += ((TR_SizedData)raw->data)->size;

	/* Hand the same payload on as a "message ready" event. */
	ready = TR_eventSubjectEmit(
			event->subject,
			TR_CEP_EVENT_MSG_READY,
			event->data);
	TR_eventHandlerIssueEvent(this, ready);

	return TR_EVENT_DONE;
}

/**
 * Close handler: does no work of its own and reports the event as
 * TR_EVENT_PENDING.
 *
 * @param this   handler instance (unused)
 * @param event  the close event (unused)
 * @return always TR_EVENT_PENDING
 */
static
TR_EventDone
testHandlerClose(TR_EventHandler this, TR_Event event)
{
	(void)this;
	(void)event;

	return TR_EVENT_PENDING;
}

#if 0
/**
 * Upgrade handler (currently compiled out together with its registration
 * in testHandlerCvInit): prints the id of the event and reports it as
 * TR_EVENT_PENDING.
 *
 * @param this   handler instance (unused)
 * @param event  the upgrade event; its id is printed
 * @return always TR_EVENT_PENDING
 */
static
TR_EventDone
testHandlerUpgrade(TR_EventHandler this, TR_Event event)
{
	(void)this;

	printf("upgrade: %"PRIdPTR"\n", event->id);

	return TR_EVENT_PENDING;
}
#endif

static
int
testHandlerCtor(void * _this, va_list * params)
{
	TR_PARENTCALL(TestHandler, _this, TR_Class, ctor, params);
	((TestHandler)_this)->handled = 0;

	return 0;
}

/**
 * Destructor: TestHandler has nothing of its own to release here, so
 * this only invokes the parent destructor.
 *
 * @param _this   the object being destroyed
 * @param params  unused
 */
static
void
testHandlerDtor(void * _this, va_list * params)
{
	TR_PARENTCALL(TestHandler, _this, TR_Class, dtor);

	(void)params;
}

/**
 * Class initializer: associates each handled event with its handler
 * function through TR_EVENT_HANDLER_SET_METHOD.
 *
 *   TR_EventDispatcher / TR_DISPATCHER_EVENT_HEARTBEAT -> testHandlerHeartbeat
 *   TR_CommEndPoint    / TR_CEP_EVENT_NEW_MSG          -> testHandlerNewMessage
 *   TR_CommEndPoint    / TR_CEP_EVENT_CLOSE            -> testHandlerClose
 *
 * The TR_CEP_EVENT_UPGRADE registration is compiled out, as is its
 * handler function.
 *
 * @param class  the class the methods are registered on
 */
static
void
testHandlerCvInit(TR_class_ptr class)
{
	TR_EVENT_HANDLER_SET_METHOD(
			class, TR_EventDispatcher,
			TR_DISPATCHER_EVENT_HEARTBEAT, testHandlerHeartbeat);
	TR_EVENT_HANDLER_SET_METHOD(
			class, TR_CommEndPoint,
			TR_CEP_EVENT_NEW_MSG, testHandlerNewMessage);
	TR_EVENT_HANDLER_SET_METHOD(
			class, TR_CommEndPoint,
			TR_CEP_EVENT_CLOSE, testHandlerClose);
#if 0
	TR_EVENT_HANDLER_SET_METHOD(
			class, TR_CommEndPoint,
			TR_CEP_EVENT_UPGRADE, testHandlerUpgrade);
#endif
}

// Class definition for TestHandler.
//
// NOTE(review): the exact expansion of these macros lives in the project
// headers; the descriptions below are read off the arguments only.
//
// - TR_INSTANCE declares the TR_Hash object testHandlerEventMethods; its
//   data member is referenced in the class initializer below
//   (presumably the per-class event -> method table; confirm).
// - TR_INIT_IFACE binds testHandlerCtor / testHandlerDtor into the
//   TR_Class interface; the third slot is left NULL.
// - TR_CREATE_CLASS creates class TestHandler with parent
//   TR_EventHandler, testHandlerCvInit as its class initializer and the
//   TR_Class interface attached.
TR_INSTANCE(TR_Hash, testHandlerEventMethods);
TR_INIT_IFACE(TR_Class, testHandlerCtor, testHandlerDtor, NULL);
TR_CREATE_CLASS(
        TestHandler,
        TR_EventHandler,
        testHandlerCvInit,
        TR_IF(TR_Class)) = {
    { &(_testHandlerEventMethods.data) }
};

// vim: set ts=4 sw=4: