/* stream_pool_base.c */
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>

#include <scot/stream_pool.h>
#include <scot/stream_pool_fraction.h>

#include <scot/event_listener.h>
#include <scot/exception.h>
#include <scot/event.h>
#include <scot/stream.h>
#include <scot/memory.h>
#include <scot/thread.h>
#include <scot/scot_types.h>

/*
 * Pull in the generic list header with GEN_LOCAL defined only for the
 * duration of the include.
 * NOTE(review): presumably GEN_LOCAL makes the generated list code local
 * to this translation unit -- confirm in <scot/list.h>.
 */
#define GEN_LOCAL
# include <scot/list.h>
#undef  GEN_LOCAL

/*
 * Instantiate the list for scot_sp_fraction. Presumably these two macros
 * emit the prototypes and the implementation of the
 * list_scot_sp_fraction_* functions used throughout this file -- confirm
 * in <scot/list.h>.
 */
GEN_LIST_FUNC_PROTO (scot_sp_fraction);
GEN_LIST_IMPL (scot_sp_fraction);

/*
 * static functions for this file
 * ----------------------------------------------------------------------
 */
/*
 * Default callback that scot_stream_pool_new registers for the stream
 * pool READ, WRITE and EXCEP events. It only reports the event on stdout.
 *
 * On a READ event the stream is closed if it is at EOF; otherwise up to
 * sizeof (puffer) bytes are read into a scratch buffer that is not used
 * afterwards. WRITE and EXCEP events are only logged.
 *
 * _e: the event; it is cast to struct scot_stream_pool_event to reach
 *     the stream (e->st).
 *
 * Always returns SCOT_EVENT_END.
 */
static
unsigned short
scot_stream_pool_default_cb (struct scot_event * _e)
{
	/* Starts at 0 so that the WRITE/EXCEP messages, which also print n,
	 * never read an indeterminate value. */
	SSIZE_T                         n = 0;
	char                            puffer[1024];
	struct scot_stream_pool_event * e = (struct scot_stream_pool_event *) _e;

	if (_e->event == SCOT_EVENT_STREAM_POOL_READ)
	{
		if (scot_stream_eof (e->st))
		{
			printf ("Connection closed by foreign host\n");
			scot_stream_close (e->st);
		}
		else
		{
			n = scot_stream_read (e->st, puffer, sizeof (puffer));
			/* "%d" expects an int; n is at most sizeof (puffer),
			 * so the cast cannot truncate. */
			printf ("read possible on %d (%d bytes read)\n",
					e->st->handle, (int) n);
		}
	}
	if (_e->event == SCOT_EVENT_STREAM_POOL_WRITE)
		printf ("write possible on %d (%d bytes read)\n",
				e->st->handle, (int) n);
	if (_e->event == SCOT_EVENT_STREAM_POOL_EXCEP)
		printf ("exception on %d (%d bytes read)\n",
				e->st->handle, (int) n);

	fflush (stdout);

	return SCOT_EVENT_END;
}

/*
 * Cleanup handler for the stream pool main thread.
 *
 * arg: the struct scot_stream_pool handled by that thread, passed as
 *      void *.
 *
 * With the pool mutex held it clears the listener's thread_run_flg and
 * then detaches and cancels every fraction thread whose own
 * thread_run_flg is set. Afterwards it calls exc_end ().
 */
static
void
main_thread_fini (void * arg)
{
	struct scot_stream_pool *      sp   = (struct scot_stream_pool *) arg;
	list_scot_sp_fraction_node_t * node = sp->pool;

	LOCK_THREAD_MUTEX (&sp->mutex);

	/* Per the original author's note: clearing the flag ensures that
	 * thread_fini does not try to wake the main thread for cleanup,
	 * because the main thread is being finalized right here. */
	((struct scot_event_listener *) arg)->thread_run_flg = 0;

	while (! list_scot_sp_fraction_eol (sp->pool, node))
	{
		struct scot_sp_fraction * frac;

		node = list_scot_sp_fraction_next (node);
		frac = list_scot_sp_fraction_retrive (node);

		/* fractions without a running thread need no teardown */
		if (frac->thread_run_flg == 0)
			continue;

		DETACH_THREAD (frac->thread_handle);
		CANCEL_THREAD (frac->thread_handle);
	}

	UNLOCK_THREAD_MUTEX (&sp->mutex);

	exc_end ();
}

/*
 * Entry point for a stream-pool event-source thread.
 *
 * sp: the stream pool this thread manages. It is also accessed through a
 *     cast to struct scot_event_listener * to record the thread id and
 *     the run flag.
 *
 * The function starts one thread per fraction in sp->pool and then loops
 * forever: it waits on sp->cmd_cond, executes the command found in
 * sp->cmd (start / stop / restart of all fractions, or of the single
 * fraction passed in sp->cmd_arg) and finally removes every fraction
 * whose s_count is 0 from the pool.
 *
 * main_thread_fini is installed through THREAD_ATEXIT_BEGIN. An exception
 * that reaches CATCH is printed and terminates the process with
 * EXIT_FAILURE.
 */
static
void
scot_stream_pool_main_loop (struct scot_stream_pool * sp)
{
	excenv_t * ee;

	/* held until all fraction threads have been started below */
	LOCK_THREAD_MUTEX (&sp->mutex);

	((struct scot_event_listener *) sp)->thread_id      = THREAD_ID;
	((struct scot_event_listener *) sp)->thread_run_flg = 1;

	THREAD_CANCEL_ENABLE;
	THREAD_CANCEL_DEFER;

	TRY
	{
		list_scot_sp_fraction_node_t * f_node = sp->pool;
		struct scot_sp_fraction *      f;

		THREAD_ATEXIT_BEGIN (main_thread_fini, sp);

		/*
		 * Start all threads and then suspend in a cancellation point...
		 * This might be worth rethinking, since this thread could
		 * possibly be used for more. E.g. functions that wake it so
		 * that it stops (or starts) all threads, restarts individual
		 * threads, etc. The logic for that could, I think, live well
		 * in this function.....
		 * But maybe not.
		 */
		while (! list_scot_sp_fraction_eol (sp->pool, f_node))
		{
			f_node = list_scot_sp_fraction_next (f_node);
			f      = list_scot_sp_fraction_retrive (f_node);

			f->thread_handle = NEW_THREAD (f->spf_entry_func, f);
			f->thread_run_flg = 1;
		}

		UNLOCK_THREAD_MUTEX (&sp->mutex);

		while (1)
		{
			THREAD_COND_ENTER_CS (&sp->cmd_cs);

			/*
			 * NOTE(review): the wait is not wrapped in a predicate
			 * loop and sp->cmd is not reset in this function. If
			 * WAIT_THREAD_COND can return spuriously (POSIX
			 * condition waits may), the previous command would be
			 * executed again -- confirm against the macro and
			 * against the code that sets sp->cmd.
			 */
			WAIT_THREAD_COND (&sp->cmd_cond, &sp->cmd_cs, INFINITE);

			switch (sp->cmd)
			{
				case SCOT_STREAM_POOL_RESTART_ALL_FRAC:
					/* end every running fraction thread, then
					 * start a thread for every fraction */
					f_node = sp->pool;
					while (! list_scot_sp_fraction_eol (sp->pool, f_node))
					{
						f_node = list_scot_sp_fraction_next (f_node);
						f      = list_scot_sp_fraction_retrive (f_node);

						if (f->thread_run_flg != 0)
						{
							END_THREAD (f->thread_handle, INFINITE);
							thread_exc_end (f->thread_handle);
						}
						f->thread_handle = NEW_THREAD (f->spf_entry_func, f);
						f->thread_run_flg = 1;
					}
					break;

				case SCOT_STREAM_POOL_START_ALL_FRAC:
					/* start only the fractions that are not running */
					f_node = sp->pool;
					while (! list_scot_sp_fraction_eol (sp->pool, f_node))
					{
						f_node = list_scot_sp_fraction_next (f_node);
						f      = list_scot_sp_fraction_retrive (f_node);

						if (f->thread_run_flg == 0)
						{
							f->thread_handle = NEW_THREAD (f->spf_entry_func, f);
							f->thread_run_flg = 1;
						}
					}
					break;

				case SCOT_STREAM_POOL_STOP_ALL_FRAC:
					/* end every running fraction thread */
					f_node = sp->pool;
					while (! list_scot_sp_fraction_eol (sp->pool, f_node))
					{
						f_node = list_scot_sp_fraction_next (f_node);
						f      = list_scot_sp_fraction_retrive (f_node);

						if (f->thread_run_flg != 0)
						{
							END_THREAD (f->thread_handle, INFINITE);
							thread_exc_end (f->thread_handle);
							f->thread_run_flg = 0;
						}
					}
					break;

				/* the single-fraction commands take the fraction
				 * from sp->cmd_arg */
				case SCOT_STREAM_POOL_RESTART_FRACTION:
					f = (struct scot_sp_fraction *) sp->cmd_arg;
					if (f->thread_run_flg != 0)
					{
						END_THREAD (f->thread_handle, INFINITE);
						thread_exc_end (f->thread_handle);
					}
					f->thread_handle = NEW_THREAD (f->spf_entry_func, f);
					f->thread_run_flg = 1;
					break;

				case SCOT_STREAM_POOL_START_FRACTION:
					f = (struct scot_sp_fraction *) sp->cmd_arg;
					if (f->thread_run_flg == 0)
					{
						f->thread_handle = NEW_THREAD (f->spf_entry_func, f);
						f->thread_run_flg = 1;
					}
					break;

				case SCOT_STREAM_POOL_STOP_FRACTION:
					f = (struct scot_sp_fraction *) sp->cmd_arg;
					if (f->thread_run_flg != 0)
					{
						END_THREAD (f->thread_handle, INFINITE);
						thread_exc_end (f->thread_handle);
						f->thread_run_flg = 0;
					}
					break;

				default:
					/* any other command value: only the cleanup
					 * below is performed */
					break;
			}

			/*
			 * now we do a cleanup. That is
			 * we remove empty fractions from the stream pool.
			 */
			f_node = sp->pool;
			while (!list_scot_sp_fraction_eol (sp->pool, f_node))
			{
				f_node = list_scot_sp_fraction_next (f_node);
				f      = list_scot_sp_fraction_retrive (f_node);

				if (f->s_count == 0)
				{
					if (f->thread_run_flg != 0)
					{
						END_THREAD (f->thread_handle, INFINITE);
						thread_exc_end (f->thread_handle);
						f->thread_run_flg = 0;
					}

					/*
					 * NOTE(review): f_node is handed to the loop
					 * condition again after this delete; confirm
					 * that the list implementation keeps the node
					 * usable for continuing the iteration.
					 */
					list_scot_sp_fraction_delete (f_node);
				}
			}

			THREAD_COND_LEAVE_CS (&sp->cmd_cs);
		}

		THREAD_ATEXIT_END (0);
	}
	CATCH (ee)
	{
		print_all_exceptions (ee);

		exit (EXIT_FAILURE);
	}

	/* NOTE(review): the loop above has no break and CATCH exits the
	 * process, so this call looks unreachable -- confirm against the
	 * TRY/CATCH macros. */
	main_thread_fini (sp);
}

/* ---------------------------------------------------------------------- */


/*
 * Allocate and set up a new stream pool.
 *
 * sp: the value passed in is not used; it is overwritten by the fresh
 *     allocation right away.
 *
 * The pool is zeroed, gets an empty fraction list, is initialised as an
 * event listener of group SCOT_EG_STREAM_POOL with
 * scot_stream_pool_main_loop as its thread entry, and has
 * scot_stream_pool_default_cb registered for the READ, WRITE and EXCEP
 * events. Finally its mutex, command condition and the condition's
 * critical section are created.
 *
 * Returns the new pool.
 */
struct scot_stream_pool *
scot_stream_pool_new (struct scot_stream_pool * sp)
{
	struct scot_event_listener * listener;

	sp = SCOT_MEM_GET (sizeof (*sp));
	SCOT_MEM_ZERO (sp, sizeof (*sp));

	/* the pool is handed to the listener API through this cast */
	listener = (struct scot_event_listener *) sp;

	sp->pool = list_scot_sp_fraction_new (sp->pool);

	/* install the element destructor for the fraction list only if
	 * none has been set yet */
	if (! list_scot_sp_fraction_elem_free_is_set ())
	{
		list_scot_sp_fraction_set_elem_free (scot_spf_free);
	}

	scot_event_listener_init (listener, SCOT_EG_STREAM_POOL,
			(scot_thread_entry_fptr) scot_stream_pool_main_loop);

	/* the same default callback serves all three pool events */
	scot_event_listener_register_cb (listener,
			SCOT_EVENT_STREAM_POOL_READ,
			scot_stream_pool_default_cb, NULL);
	scot_event_listener_register_cb (listener,
			SCOT_EVENT_STREAM_POOL_WRITE,
			scot_stream_pool_default_cb, NULL);
	scot_event_listener_register_cb (listener,
			SCOT_EVENT_STREAM_POOL_EXCEP,
			scot_stream_pool_default_cb, NULL);

	NEW_THREAD_MUTEX (&sp->mutex);
	NEW_THREAD_COND (&sp->cmd_cond);
	NEW_THREAD_COND_CS (&sp->cmd_cs);

	return sp;
}

/*
 * Tear down a stream pool created by scot_stream_pool_new.
 *
 * sp: the pool to release; NULL is accepted and ignored.
 *
 * Issues SCOT_STREAM_POOL_STOP_ALL_FRAC to the pool, finalizes the event
 * listener, frees the fraction list, releases the mutex, the command
 * condition and its critical section, and finally frees the pool memory.
 * sp must not be used afterwards.
 */
void
scot_stream_pool_free (struct scot_stream_pool * sp)
{
	/* behave like free (): releasing nothing is not an error */
	if (sp == NULL)
		return;

	scot_stream_pool_cmd (sp, SCOT_STREAM_POOL_STOP_ALL_FRAC, NULL);

	scot_event_listener_fini ((struct scot_event_listener *) sp);

	list_scot_sp_fraction_free (sp->pool);

	FREE_THREAD_MUTEX (&sp->mutex);
	FREE_THREAD_COND (&sp->cmd_cond);
	FREE_THREAD_COND_CS (&sp->cmd_cs);

	SCOT_MEM_FREE (sp);
}

/*
 * Call scot_spf_free_s on every fraction of the pool.
 *
 * sp:     the pool whose fractions are visited.
 * free_s: passed unchanged to scot_spf_free_s for each fraction; its
 *         meaning is defined there.
 *
 * Neither the fraction list nor the pool itself is released here.
 */
void
scot_stream_pool_free_s (struct scot_stream_pool * sp, int free_s)
{
	list_scot_sp_fraction_node_t * cur;

	for (cur = sp->pool; ! list_scot_sp_fraction_eol (sp->pool, cur); )
	{
		struct scot_sp_fraction * frac;

		/* advance first: traversal starts at sp->pool itself, and
		 * elements are fetched from the nodes after it */
		cur  = list_scot_sp_fraction_next (cur);
		frac = list_scot_sp_fraction_retrive (cur);

		scot_spf_free_s (frac, free_s);
	}
}