spf_thread_impl.c 3.03 KB
#ifndef WIN32
# include <sys/time.h>
# include <sys/types.h>
# include <unistd.h>
# include <errno.h>
#else
# include <Windows.h>
#endif

#include <scot/stream_pool.h>
#include <scot/stream_pool_fraction.h>

#include <scot/exception.h>
#include <scot/thread.h>
#include <scot/stream.h>
#include <scot/event.h>

#define GEN_LOCAL
# include <scot/list.h>
#undef  GEN_LOCAL

GEN_LIST_FUNC_PROTO (scot_sp_fraction);
GEN_LIST_IMPL (scot_sp_fraction);


static
T_PROC_RET
scot_spf_select_entry_func (void * _sp)
{
	struct scot_sp_fraction * sp = (struct scot_sp_fraction *)_sp;
	fd_set run_rfds, run_wfds, run_efds;
	excenv_t * ee;

	THREAD_CANCEL_ENABLE;
	THREAD_CANCEL_ASYNC;

	sp->thread_id      = THREAD_ID;

	TRY
	{
		FD_ZERO (&run_rfds);
		FD_ZERO (&run_wfds);
		FD_ZERO (&run_efds);

		while (1)
		{
			uint16_t i;
			excenv_t * ee;

			if (! THREAD_EQUAL (sp->thread_handle, SELF_THREAD))
			{
				scot_stream_pool_cmd (
						(struct scot_stream_pool *)sp->el, 
						SCOT_STREAM_POOL_STOP_FRACTION, sp);
				break;
			}

			for (i=0; i<SCOT_MAX_FRACTION; i++)
			{
				struct scot_stream_pool_event * e = NULL;
				SCOT_EVENT_NO eno = 0;
				struct scot_stream * st = sp->s_list [i];

				if (st == NULL) break;

				if (FD_ISSET (st->handle.sock, &run_rfds))
					eno = SCOT_EVENT_STREAM_POOL_READ;
				else if (FD_ISSET (st->handle.sock, &run_wfds))
					eno = SCOT_EVENT_STREAM_POOL_WRITE;
				else if (FD_ISSET (st->handle.sock, &run_efds))
					eno = SCOT_EVENT_STREAM_POOL_EXCEP;

				if (eno != 0)
				{
					while (!scot_stream_is_closed (st))
					{
						/* call callback... */
						e = scot_stream_pool_event_new (e, eno, NULL, 0, st);
						scot_event_listener_call_cb (sp->el, (struct scot_event *) e);
						scot_event_free ((struct scot_event *) e);
						e = NULL;

						if (scot_stream_read_pending (st) != 0)
							eno = SCOT_EVENT_STREAM_POOL_READ;
						else if (scot_stream_write_pending (st) != 0)
							eno = SCOT_EVENT_STREAM_POOL_WRITE;
						else
							break;
					}
				}

				if (scot_stream_is_closed (st))
				{
					scot_spf_remove (sp, st, SCOT_STREAM_POOL_FD_ADD_RMASK|
					                         SCOT_STREAM_POOL_FD_ADD_WMASK|
					                         SCOT_STREAM_POOL_FD_ADD_EMASK);
					SCOT_STREAM_FREE (st);

					if (sp->s_count == 0)
					{
						scot_stream_pool_cmd (
								(struct scot_stream_pool *)sp->el, 
								SCOT_STREAM_POOL_STOP_FRACTION, sp);
						break;
					}
				}
			}

			run_rfds = sp->rfds;
			run_wfds = sp->wfds;
			run_efds = sp->efds;

			/* evtl. sollte man hier noch timeout support einbauen, sprich
			 * das letzte NULL mit einer time austauschen. */
			if (select (sp->max_fd+1, &run_rfds, &run_wfds, &run_efds, NULL) == -1)
				THROW (EXC (EXC_ERROR, errno, strerror (errno)));
		}
	}
	CATCH (ee)
	{
		printf ("Exception in %s (%s, %d)\n", __FUNCTION__, __FILE__, __LINE__);
		print_all_exceptions (ee);

		exit (EXIT_FAILURE);
	}
}

scot_thread_entry_fptr scot_spf_thread_func[] = 
{
	scot_spf_select_entry_func, /* socket */
	scot_spf_select_entry_func, /* pipe */
	scot_spf_select_entry_func  /* terminal */
};