spf_thread_impl.c 4.49 KB
#ifndef WIN32
# include <sys/time.h>
# include <sys/types.h>
# include <unistd.h>
# include <errno.h>
#else
# include <Windows.h>
#endif

#include <scot/stream_pool.h>
#include <scot/stream_pool_fraction.h>

#include <scot/exception.h>
#include <scot/thread.h>
#include <scot/stream.h>
#include <scot/event.h>

#define GEN_LOCAL
# include <scot/list.h>
#undef  GEN_LOCAL

GEN_LIST_FUNC_PROTO (scot_sp_fraction);
GEN_LIST_IMPL (scot_sp_fraction);


static
T_PROC_RET
scot_spf_select_entry_func (void * _sp)
{
	struct scot_sp_fraction * sp = (struct scot_sp_fraction *)_sp;
	fd_set run_rfds, run_wfds, run_efds;
	excenv_t * ee;

	/*
	 * OK, i am not sure if i can do it this way but i hope i will get a 
	 * cancellation point with this, that did the blocking i wanted to
	 * have. I try to initialize an event object with all sockets i
	 * want to observe and use it within pthreadCancelableWait to get an
	 * cancellation point. If it returns i have to figure out wich
	 * socket produced what event and call the callback accordingly.
	 */
	WSAEVENT EventArray [SCOT_MAX_FRACTION];
	SOCKET SocketArray [SCOT_MAX_FRACTION];
	WSANETWORKEVENTS NetworkEvents;
	int Index;

	THREAD_CANCEL_ENABLE;
	THREAD_CANCEL_ASYNC;

	sp->thread_id      = THREAD_ID;

	TRY
	{
		FD_ZERO (&run_rfds);
		FD_ZERO (&run_wfds);
		FD_ZERO (&run_efds);

		while (1)
		{
			uint16_t i;
			excenv_t * ee;

			SCOT_MEM_ZERO (SocketArray, sizeof (SOCKET) * SCOT_MAX_FRACTION);
			SCOT_MEM_ZERO (EventArray, sizeof (WSAEVENT) * SCOT_MAX_FRACTION);

			if (! THREAD_EQUAL (sp->thread_handle, SELF_THREAD))
			{
				scot_stream_pool_cmd (
						(struct scot_stream_pool *)sp->el, 
						SCOT_STREAM_POOL_STOP_FRACTION, sp);
				break;
			}

			for (i=0; i<SCOT_MAX_FRACTION; i++)
			{
				struct scot_stream_pool_event * e = NULL;
				SCOT_EVENT_NO eno = 0;
				struct scot_stream * st = sp->s_list [i];
				long flags = FD_CLOSE;

				if (st == NULL) break;

				if (FD_ISSET (st->handle.sock, &sp->rfds))
					flags |= FD_READ;
				if (FD_ISSET (st->handle.sock, &sp->wfds))
					flags |= FD_WRITE;

				/*
				 * Das ist jetzt etwas gefrickelt, eigentlich sollte unter
				 * Windows die Datenstruktur für eine sp_fraction angepasst werden
				 * und ebenso die add und remove methoden.
				 * Jetzt übertrage ich etwas mühsam die Daten auf windows
				 * was auch dazu führt, das ich öfter als nötig events
				 * closen und createn muß....
				 */
				SocketArray [i] = st->handle.sock;
				if (EventArray [i] == (WSAEVENT) NULL)
					WSACreateEvent (EventArray [i]);
				WSAEventSelect (
						SocketArray [i], 
						EventArray [i], 
						flags);
				EventTotal ++;
			}

			Index = WSAWaitForMultipleEvents (EventTotal, EventArray, 
					FALSE, WSA_INFINITE, TRUE);
			Index = Index - WSA_WAIT_EVENT_0;

			for (i = Index; i < EventTotal; i++) 
			{
				Index = WSAWaitForMultipleEvents (1, &EventArray[i], 
						TRUE, 1000, TRUE);

				if ((Index != WSA_WAIT_FAILED) && (Index != WSA_WAIT_TIMEOUT)) 
				{
					struct scot_stream * st = sp->s_list [i];

					WSAEnumNetworkEvents (SocketArray[i],
							EventArray[i],
							&NetworkEvents);

					if ((NetworkEvents & FD_READ) != 0)
						eno = SCOT_EVENT_STREAM_POOL_READ;
					else if ((NetworkEvents & FD_READ) != 0)
						eno = SCOT_EVENT_STREAM_POOL_WRITE;

					if (eno != 0)
					{
						while (!scot_stream_is_closed (st))
						{
							/* call callback... */
							e = scot_stream_pool_event_new (e, eno, NULL, 0, st);
							scot_event_listener_call_cb (
									sp->el, (struct scot_event *) e);
							scot_event_free ((struct scot_event *) e);
							e = NULL;

							if (scot_stream_read_pending (st) != 0)
								eno = SCOT_EVENT_STREAM_POOL_READ;
							else if (scot_stream_write_pending (st) != 0)
								eno = SCOT_EVENT_STREAM_POOL_WRITE;
							else
								break;
						}
					}

					if (scot_stream_is_closed (st))
					{
						WSACloseEvent (EventArray [i]);

						scot_spf_remove (sp, st, 
								SCOT_STREAM_POOL_FD_ADD_RMASK|
								SCOT_STREAM_POOL_FD_ADD_WMASK|
								SCOT_STREAM_POOL_FD_ADD_EMASK);
						SCOT_STREAM_FREE (st);

						if (sp->s_count == 0)
						{
							scot_stream_pool_cmd (
									(struct scot_stream_pool *)sp->el, 
									SCOT_STREAM_POOL_STOP_FRACTION, sp);
							break;
						}
					}
				}
			}
		}
	}
	CATCH (ee)
	{
		printf ("Exception in %s (%s, %d)\n", __FUNCTION__, __FILE__, __LINE__);
		print_all_exceptions (ee);

		exit (EXIT_FAILURE);
	}
}

scot_thread_entry_fptr scot_spf_thread_func[] = 
{
	scot_spf_select_entry_func, /* socket */
	NULL, /* pipe */
	NULL  /* terminal */
};