stream_pool_management.c 6.5 KB
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>

#include <scot/stream_pool.h>
#include <scot/stream_pool_fraction.h>

#include <scot/event_listener.h>
#include <scot/exception.h>
#include <scot/stream.h>
#include <scot/memory.h>
#include <scot/thread.h>

#define GEN_LOCAL
# include <scot/list.h>
#undef  GEN_LOCAL

GEN_LIST_FUNC_PROTO (scot_sp_fraction);
GEN_LIST_IMPL (scot_sp_fraction);

/*
 * static functions for this file
 * ----------------------------------------------------------------------
 */
static
int
scot_get_free_spf_by_type (const struct scot_sp_fraction * a,
                           const struct scot_sp_fraction * b)
{
	if (a->stream_type != b->stream_type || 
			b->s_count >= SCOT_MAX_FRACTION) /* b ist der node bei find. */
		return -1;

	return 0;
}

static
int
scot_get_spf_by_stream_handle (const struct scot_sp_fraction * a,
                               const struct scot_sp_fraction * b)
{
	int i;

	for (i = 0; i < SCOT_MAX_FRACTION && b->s_list [i] != a->s_list [0]; i++);
	if (i >= SCOT_MAX_FRACTION)
		return -1;

	return 0;
}

/* ---------------------------------------------------------------------- */


/*
 * Es wäre sicher auch schön eine Funktion zu haben, die einen Stream
 * nach dem anderen (die in dem Stream Pool registriert sind)....oder aber
 * eine komplette Liste aller in einem Pool registrierten Streams zurückgibt.
 * So kann man bevor man einen StreamPool freigibt erstmal alle Streams
 * rausholen (und evtl. auch freigeben.)
 */
struct scot_stream *
scot_sp_get_all_streams (struct scot_stream_pool * sp)
{
}

/*
 * adds the given stream s->handle to the fd_set specified via
 * rwe_mask. If the stream is not already under control of this
 * stream pool it is added to s_list. Else only the flags within
 * rfds, wfds and efds are updated. This function will never clean
 * a flag, but only set them.
 */
void
scot_stream_pool_add (
		struct scot_stream_pool * sp, 
		struct scot_stream *      s,
		int                       rwe_mask)
{
	struct scot_event_listener *   l = (struct scot_event_listener *) sp;
	struct scot_sp_fraction *      f_search;
	list_scot_sp_fraction_node_t * f_node;
	struct scot_sp_fraction *      f;

	/*
	 * soweit ich das in erinnerung habe sind meine listen nicht per se
	 * thread save. Daher sollte hier unbedingt noch ein mutex her,
	 * damit nicht gleichzeitig in die liste geschrieben (hier)
	 * und gelöscht wird (in remove)
	 */
	LOCK_THREAD_MUTEX (&sp->mutex);

	/*
	 * it would be nice to check if the stream is already in the pool before
	 * doing anything else
	 */

	/*
	 * first seek a not full fraction for s->stream_type
	 */
	f_search = scot_spf_new (f_search, l, s->s_type);
	scot_spf_add (f_search, s, rwe_mask);

	list_scot_sp_fraction_set_cmp (scot_get_free_spf_by_type);
	f_node = list_scot_sp_fraction_find (sp->pool, f_search);
	list_scot_sp_fraction_set_cmp (list_scot_sp_fraction_default_cmp);

	if (f_node == NULL)
	{
		f = f_search;
		list_scot_sp_fraction_insert (sp->pool, f);
	}
	else
	{
		scot_spf_free_s (f_search, SCOT_STREAM_POOL_KEEP_STREAMS);
		scot_spf_free (f_search);

		f = list_scot_sp_fraction_retrive (f_node);
		scot_spf_add (f, s, rwe_mask);
	}

	scot_stream_pool_cmd (sp, SCOT_STREAM_POOL_RESTART_FRACTION, f);

	UNLOCK_THREAD_MUTEX (&sp->mutex);
}

/*
 * removes the given stream s->handle from the fd_set specified via
 * rwe_mask. If the stream isn't left in at least one of the sets, either
 * rfds, wfds or efds it is removed from s_list. This function will never
 * set a flag, but the mask is used to determine where to remove fd.
 *
 * Diese Funktion und scot_stream_pool_add sollten sich gegenseitig
 * ausschließen, um die liste immer in einem konsistenten zustand zu halten.
 * Relevant sind da wohl nur threads, also sollte es ein thread_mutex tun.
 */
void
scot_stream_pool_remove (
		struct scot_stream_pool * sp,
		struct scot_stream *      s,
		int                       rwe_mask)
{
	struct scot_event_listener *   l = (struct scot_event_listener *) sp;
	struct scot_sp_fraction *      f_search;
	list_scot_sp_fraction_node_t * f_node;
	excenv_t * ee;

	TRY
	{
		LOCK_THREAD_MUTEX (&sp->mutex);

		/*
		 * first seek a not full fraction for s->stream_type
		 */
		f_search = scot_spf_new (f_search, l, s->s_type);
		scot_spf_add (f_search, s, rwe_mask);

		list_scot_sp_fraction_set_cmp (scot_get_spf_by_stream_handle);
		f_node = list_scot_sp_fraction_find (sp->pool, f_search);
		list_scot_sp_fraction_set_cmp (list_scot_sp_fraction_default_cmp);

		scot_spf_free (f_search);

		if (f_node == NULL)
			THROW (EXC (EXC_ERROR, 345, "Can't find stream in any fraction."));
		else
		{
			struct scot_sp_fraction * f;

			scot_stream_pool_cmd (sp, SCOT_STREAM_POOL_STOP_FRACTION, f);

			f = list_scot_sp_fraction_retrive (f_node);
			scot_spf_remove (f, s, rwe_mask);

			scot_stream_pool_cmd (sp, SCOT_STREAM_POOL_START_FRACTION, f);
		}

		UNLOCK_THREAD_MUTEX (&sp->mutex);
	}
	CATCH (ee)
	{
		printf ("Exception in %s (%s, %d)\n", __FUNCTION__, __FILE__, __LINE__);
		forward_all_exceptions (ee);
	}
}

int /* rwe_mask */
scot_stream_pool_get_rwe (
		struct scot_stream_pool * sp, 
		struct scot_stream *      s)
{
	struct scot_event_listener *   l = (struct scot_event_listener *) sp;
	struct scot_sp_fraction *      f_search;
	list_scot_sp_fraction_node_t * f_node;
	struct scot_sp_fraction *      f;
	int ret = 0;
	excenv_t * ee;

	TRY
	{
		f_search = scot_spf_new (f_search, l, s->s_type);
		scot_spf_add (f_search, s, 0);

		list_scot_sp_fraction_set_cmp (scot_get_spf_by_stream_handle);
		f_node = list_scot_sp_fraction_find (sp->pool, f_search);
		list_scot_sp_fraction_set_cmp (list_scot_sp_fraction_default_cmp);

		scot_spf_free (f_search);

		if (f_node == NULL)
			THROW (EXC (EXC_ERROR, 987, "could not find stream in stream pool"));
		else
			f = list_scot_sp_fraction_retrive (f_node);

		/*
		 * !!!FIXME!!! I will only work with sockets right now..... :-(
		 * Much more abstraction is in the stream code.
		 */
		if (FD_ISSET (s->handle.sock, &f->rfds))
			ret = ret | SCOT_STREAM_POOL_FD_ADD_RMASK;
		if (FD_ISSET (s->handle.sock, &f->wfds))
			ret = ret | SCOT_STREAM_POOL_FD_ADD_WMASK;
		if (FD_ISSET (s->handle.sock, &f->efds))
			ret = ret | SCOT_STREAM_POOL_FD_ADD_EMASK;
	}
	CATCH (ee);
	{
		printf ("Exception in %s (%s, %d)\n", __FUNCTION__, __FILE__, __LINE__);
		forward_all_exceptions (ee);
	}

	return ret;
}

	void
scot_stream_pool_cmd (struct scot_stream_pool * sp, uint16_t cmd, void * arg)
{
	THREAD_COND_ENTER_CS (&sp->cmd_cs);

	sp->cmd     = cmd;
	sp->cmd_arg = arg;
	SIGNAL_THREAD_COND (&sp->cmd_cond);

	THREAD_COND_LEAVE_CS (&sp->cmd_cs);
}