[omniORB-dev] Poll based implementation of the SocketCollection

Serguei Kolos Serguei.Kolos at cern.ch
Fri Mar 26 09:46:06 GMT 2004


Skipped content of type multipart/alternative-------------- next part --------------
// -*- Mode: C++; -*-
//                      Package   : omniORB
// SocketCollection.cc	Created on: 23 Jul 2003
//                      Author    : Serguei Kolos
//
//    Copyright (C) 2001 AT&T Laboratories Cambridge
//
//    This file is part of the omniORB library
//
//    The omniORB library is free software; you can redistribute it and/or
//    modify it under the terms of the GNU Library General Public
//    License as published by the Free Software Foundation; either
//    version 2 of the License, or (at your option) any later version.
//
//    This library is distributed in the hope that it will be useful,
//    but WITHOUT ANY WARRANTY; without even the implied warranty of
//    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
//    Library General Public License for more details.
//
//    You should have received a copy of the GNU Library General Public
//    License along with this library; if not, write to the Free
//    Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  
//    02111-1307, USA
//
//
// Description:
//	*** PROPRIETORY INTERFACE ***
// 

/*
  $Log: SocketCollection.cc,v $
  Revision 1.3  2004/02/20 17:39:21  kolos
  Apply the omniORB patch: now FD_CLOEXEC flag is set for the TCP sockets, used by the omniORB.

  Revision 1.2  2004/02/19 17:10:21  kolos
  Implement new connection multiplexor, which uses the poll function.

  Revision 1.1.2.15  2003/07/25 16:04:57  dgrisby
  vxWorks patches.

  Revision 1.1.2.14  2003/02/17 10:39:52  dgrisby
  Fix inevitable Windows problem.

  Revision 1.1.2.13  2003/02/17 01:46:23  dgrisby
  Pipe to kick select thread (on Unix).

  Revision 1.1.2.12  2003/01/28 12:17:09  dgrisby
  Bug with Select() ignoring data in buffer indications.

  Revision 1.1.2.11  2002/10/14 15:27:41  dgrisby
  Typo in fcntl error check.

  Revision 1.1.2.10  2002/08/21 06:23:15  dgrisby
  Properly clean up bidir connections and ropes. Other small tweaks.

  Revision 1.1.2.9  2002/03/18 16:50:18  dpg1
  New threadPoolWatchConnection parameter.

  Revision 1.1.2.8  2002/03/14 12:21:49  dpg1
  Undo accidental scavenger period change, remove invalid assertion.

  Revision 1.1.2.7  2002/03/13 16:05:38  dpg1
  Transport shutdown fixes. Reference count SocketCollections to avoid
  connections using them after they are deleted. Properly close
  connections when in thread pool mode.

  Revision 1.1.2.6  2002/02/26 14:06:45  dpg1
  Recent changes broke Windows.

  Revision 1.1.2.5  2002/02/13 16:02:38  dpg1
  Stability fixes thanks to Bastiaan Bakker, plus threading
  optimisations inspired by investigating Bastiaan's bug reports.

  Revision 1.1.2.4  2001/08/24 15:56:44  sll
  Fixed code which made the wrong assumption about the semantics of
  do { ...; continue; } while(0)

  Revision 1.1.2.3  2001/08/02 13:00:53  sll
  Do not use select(0,0,0,0,&timeout), it doesn't work on win32.

  Revision 1.1.2.2  2001/08/01 15:56:07  sll
  Workaround MSVC++ bug. It generates wrong code with FD_ISSET and FD_SET
  under certain conditions.

  Revision 1.1.2.1  2001/07/31 16:16:26  sll
  New transport interface to support the monitoring of active connections.

*/

#include <omniORB4/CORBA.h>
#include <omniORB4/giopEndpoint.h>
#include <SocketCollection.h>

#if defined(__vxWorks__)
#  include "pipeDrv.h"
#  include "selectLib.h"
#  include "iostream.h"
#endif

#  include "iostream"
#  include "stdio.h"

OMNI_NAMESPACE_BEGIN(omni)

#define GDB_DEBUG

/////////////////////////////////////////////////////////////////////////
void
SocketSetTimeOut(unsigned long abs_sec,
		 unsigned long abs_nsec,struct timeval& t)
{
  unsigned long now_sec, now_nsec;
  omni_thread::get_time(&now_sec,&now_nsec);

  if ((abs_sec <= now_sec) && ((abs_sec < now_sec) || (abs_nsec < now_nsec))) {
    t.tv_sec = t.tv_usec = 0;
  }
  else {
    t.tv_sec = abs_sec - now_sec;
    if (abs_nsec >= now_nsec) {
      t.tv_usec = (abs_nsec - now_nsec) / 1000;
    }
    else {
      t.tv_usec = (1000000000 + abs_nsec - now_nsec) / 1000;
      t.tv_sec -= 1;
    }
  }
}

/////////////////////////////////////////////////////////////////////////
int
SocketSetnonblocking(SocketHandle_t sock) {
# if defined(__vxWorks__)
  int fl = TRUE;
  if (ioctl(sock, FIONBIO, (int)&fl) == ERROR) {
    return RC_INVALID_SOCKET;
  }
  return 0;
# elif defined(__WIN32__)
  u_long v = 1;
  if (ioctlsocket(sock,FIONBIO,&v) == RC_SOCKET_ERROR) {
    return RC_INVALID_SOCKET;
  }
  return 0;
# else
  int fl = O_NONBLOCK;
  if (fcntl(sock,F_SETFL,fl) == RC_SOCKET_ERROR) {
    return RC_INVALID_SOCKET;
  }
  return 0;
# endif
}

/////////////////////////////////////////////////////////////////////////
int
SocketSetblocking(SocketHandle_t sock) {
# if defined(__vxWorks__)
  int fl = FALSE;
  if (ioctl(sock, FIONBIO, (int)&fl) == ERROR) {
    return RC_INVALID_SOCKET;
  }
  return 0;
# elif defined(__WIN32__)
  u_long v = 0;
  if (ioctlsocket(sock,FIONBIO,&v) == RC_SOCKET_ERROR) {
    return RC_INVALID_SOCKET;
  }
  return 0;
# else
  int fl = 0;
  if (fcntl(sock,F_SETFL,fl) == RC_SOCKET_ERROR) {
    return RC_INVALID_SOCKET;
  }
  return 0;
# endif
}

/////////////////////////////////////////////////////////////////////////
// This bit is set in the events field of the pollfd 
// structure to indicate that this fd has not to be 
// taken into account while processing the poll result
#define FDCLRD		0x1000
// This bit is set in the events field of the pollfd 
// structure to indicate that this fd has to be 
// moved to the end of the pollfd array
#define FDMVD		0x2000
// This bit is set in the events field of the pollfd 
// structure to indicate that this fd has data to be read
#define FDDIB		0x4000

/////////////////////////////////////////////////////////////////////////
// Defines the maximum number of connections 
#define MAX_FD_SIZE	4096

#define SWAP( data, index, i1, i2 ) { \
    pollfd tmp = data[i1];	\
    data[i1] = data[i2];	\
    index[data[i1].fd] = i1;	\
    data[i2] = tmp;		\
    index[data[i2].fd] = i2;	\
} \


/////////////////////////////////////////////////////////////////////////
SocketHandleSet::SocketHandleSet() 
  : pd_length_full( 0 ),
    pd_length_now( 0 ),
    pd_used_by_poll_n( 0 ),
    pd_to_be_removed_n( 0 ),
    pd_to_be_moved_n( 0 ),
    pd_dib_n( 0 )
{
    pd_data = new pollfd[MAX_FD_SIZE];
    pd_index = new short[MAX_FD_SIZE];
    memset( pd_data, 0, sizeof( pollfd ) * MAX_FD_SIZE );
    memset( pd_index, -1, sizeof( short ) * MAX_FD_SIZE );
}

SocketHandleSet::~SocketHandleSet()
{
    delete[] pd_data;
    delete[] pd_index;
}

/////////////////////////////////////////////////////////////////////////
void
SocketHandleSet::Add( SocketHandle_t fd, CORBA::Boolean now, CORBA::Boolean data_in_buffer )
{
    short i = pd_index[fd];
	    
    if ( i != -1 ) {
	// fd is already in the set
	
	CORBA::Boolean was_removed = pd_data[i].events & FDCLRD;
	
	// fd was marked to be removed while we were in the poll
	// we will set it again
	if (was_removed) {
	    pd_to_be_removed_n--;
	    pd_data[i].events &= ~FDCLRD;
	}
	
	if (data_in_buffer && !(pd_data[i].events & FDDIB)) {
	    pd_data[i].events |= FDDIB;
	    pd_dib_n++;
	}
	
	if (now || data_in_buffer) {
	// fd has to be in the part of the set, which will 
	// be scheduled for the next poll invocation
	    if (i >= pd_length_now) {
		// it is not there,
		// so we will move it to that part
		SWAP( pd_data, pd_index, pd_length_now, i )
		pd_length_now++;
	    }
	    return;
	}
	
	// fd has to be in the part of the set, which will NOT
	// be scheduled for the next poll invocation
	if ( was_removed && i < pd_length_now) {
	// fd is now in the part of the set, which will 
	// be scheduled for the next poll invocation
	// and it is marked as removed
	    
	    if (i < pd_used_by_poll_n) {
	    // the poll is running 
	    // we mark the fd as to be moved to another part of the fd set
		pd_data[i].events |= FDMVD;
    		pd_to_be_moved_n++;
	    }
	    else {
	    // the poll is NOT running 
	    // we move the fd to another part of the fd set
	   	if (pd_data[i].events & FDMVD) {
		    pd_data[i].events &= ~FDMVD;
		    pd_to_be_moved_n--;
		}
			
		pd_length_now--;
		if (i != pd_length_now) {
		    SWAP( pd_data, pd_index, pd_length_now, i )
		}
	    }
	}
	return;
    }
    
    // fd is NOT in the set
    // lets put it there
    if (now || data_in_buffer) {
    	// put fd to the first part of the array
	// it will be used for the next poll invocation
	if (pd_length_full != pd_length_now) {
	    pd_data[pd_length_full] = pd_data[pd_length_now];
	    pd_index[pd_data[pd_length_full].fd] = pd_length_full;
	}
    	pd_data[pd_length_now].fd = fd;
	pd_data[pd_length_now].events = POLLIN;
    	if (data_in_buffer) {
	    pd_data[pd_length_now].events |= FDDIB;
	    pd_dib_n++;
	}
    	pd_data[pd_length_now].revents = 0;
	pd_index[fd] = pd_length_now;
	pd_length_full++;
	pd_length_now++;
    }
    else {
    	// put fd to the last part of the array
	// it will not be used for the next poll invocation
    	pd_data[pd_length_full].fd = fd;
	pd_data[pd_length_full].events = POLLIN;
    	pd_data[pd_length_full].revents = 0;
	pd_index[fd] = pd_length_full;
	pd_length_full++;
    }	
}

/////////////////////////////////////////////////////////////////////////
void
SocketHandleSet::Remove( SocketHandle_t fd )
{
    short i = pd_index[fd];

    if ( i == -1 )
    	return;
    
    if (pd_data[i].events & FDDIB) {
	pd_data[i].events &= ~FDDIB;
	pd_dib_n--;
    }
    if (pd_data[i].events & FDMVD) {
	pd_data[i].events &= ~FDMVD;
	pd_to_be_moved_n--;
    }
    if (pd_data[i].events & FDCLRD) {
	pd_data[i].events &= ~FDCLRD;
	pd_to_be_removed_n--;
    }
    
    if ( i < pd_used_by_poll_n ) {
	// this fd is used by the poll function - be careful
	// don't modify array, which is now used by poll
	// just mark the fd by setting the FDCLRD bit in the events field
	// it will be removed later in the SocketCollection::Select
	pd_data[i].events |= FDCLRD;
	pd_to_be_removed_n++;
    }
    else {
	// fd is NOT used by the poll function - simply remove fd
	pd_index[fd] = -1;
		
    	pd_length_full--;
	if (i < pd_length_now) {
	   pd_length_now--;
	   if (i != pd_length_now) {
		pd_data[i] = pd_data[pd_length_now];
		pd_index[pd_data[i].fd] = i;
	   }
	   if (pd_length_now != pd_length_full) {
		pd_data[pd_length_now] = pd_data[pd_length_full];
		pd_index[pd_data[pd_length_now].fd] = pd_length_now;
	   }
	}
	else {
	   if (i != pd_length_full) {
		pd_data[i] = pd_data[pd_length_full];
		pd_index[pd_data[i].fd] = i;
	   }
	}
    }
}

/////////////////////////////////////////////////////////////////////////
void
SocketHandleSet::Move( SocketHandle_t fd )
{
    int i = pd_index[fd];
    
    if (!(pd_data[i].events & FDMVD))
    	return; 
    
    // socket has been marked as moved when the poll was running
    // now move it to the end of the array
    pd_data[i].events &= ~FDMVD;
			
    pd_to_be_moved_n--;
    pd_length_now--;
    if (i != pd_length_now) {
	SWAP( pd_data, pd_index, pd_length_now, i )
    }
}

/////////////////////////////////////////////////////////////////////////
CORBA::Boolean
SocketHandleSet::isSet( SocketHandle_t fd, CORBA::Boolean data_in_buffer )
{
    int i = pd_index[fd];
    
    if (      ( i != -1 )
	  &&  ( data_in_buffer == 0 || (pd_data[i].events & FDDIB) ) 
	  && !( pd_data[i].events & FDCLRD )
	  && !( pd_data[i].events & FDMVD ) )
	return 1;
    else
	return 0;
}

/////////////////////////////////////////////////////////////////////////
int
SocketSetCloseOnExec(SocketHandle_t sock) {
# if defined(__vxWorks__)
  // Not supported on vxWorks
  return 0;
# elif defined(__WIN32__)
  SetHandleInformation((HANDLE)sock, HANDLE_FLAG_INHERIT, 0);
  return 0;
# else
  int fl = FD_CLOEXEC;
  if (fcntl(sock,F_SETFD,fl) == RC_SOCKET_ERROR) {
    return RC_INVALID_SOCKET;
  }
  return 0;
# endif
}

/////////////////////////////////////////////////////////////////////////
unsigned long SocketCollection::scan_interval_sec  = 0;
unsigned long SocketCollection::scan_interval_nsec = 50*1000*1000;
CORBA::ULong  SocketCollection::hashsize           = 103;

/////////////////////////////////////////////////////////////////////////
SocketCollection::SocketCollection() :
  pd_poll_cond(&pd_fdset_lock),
  pd_abs_sec(0), pd_abs_nsec(0),
  pd_pipe_read(-1), pd_pipe_write(-1), pd_pipe_full(0),
  pd_refcount(1)
{

#ifdef UnixArchitecture
#  ifdef __vxWorks__
    if (pipeDrv() == OK) {
      if (pipeDevCreate("/pipe/SocketCollection",10,sizeof(int)) == OK) {
	pd_pipe_read = pd_pipe_write = open("/pipe/SocketCollection",
					    O_RDWR,0);
      }
    }
    if (pd_pipe_read <= 0) {
      omniORB::logs(5, "Unable to create pipe for SocketCollection.");
    }
#  else
    int filedes[2];
    int r = pipe(filedes);
    if (r != -1) {
      pd_pipe_read  = filedes[0];
      pd_pipe_write = filedes[1];
    }
    else {
      omniORB::logs(5, "Unable to create pipe for SocketCollection.");
    }
#  endif
#endif

  if (pd_pipe_read > 0) {
    omni_tracedmutex_lock sync(pd_fdset_lock);
    pd_fdset.Add( pd_pipe_read, 1, 0 );
  }

  pd_hash_table = new SocketLink* [hashsize];
  for (CORBA::ULong i=0; i < hashsize; i++)
    pd_hash_table[i] = 0;
}


/////////////////////////////////////////////////////////////////////////
SocketCollection::~SocketCollection()
{
  pd_refcount = -1;
  delete [] pd_hash_table;

#ifdef UnixArchitecture
#  ifdef __vxWorks__
  // *** How do we clean up on vxWorks?
#  else
  close(pd_pipe_read);
  close(pd_pipe_write);
#  endif
#endif
}


/////////////////////////////////////////////////////////////////////////
void
SocketCollection::setSelectable(SocketHandle_t sock, 
				CORBA::Boolean now,
				CORBA::Boolean data_in_buffer,
				CORBA::Boolean hold_lock) {

  ASSERT_OMNI_TRACEDMUTEX_HELD(pd_fdset_lock, hold_lock);

  if (!hold_lock) pd_fdset_lock.lock();

  pd_fdset.Add( sock, now, data_in_buffer );
  
  if (now || data_in_buffer) {
    // Wake up the thread blocked in select() if we can.
    if (pd_pipe_write > 0) {
#ifdef UnixArchitecture
      if (!pd_pipe_full) {
	char data = '\0';
	pd_pipe_full = 1;
	write(pd_pipe_write, &data, 1);
      }
#endif
    }
    else {
      pd_poll_cond.signal();
    }
  }
  if (!hold_lock) pd_fdset_lock.unlock();
}

/////////////////////////////////////////////////////////////////////////
void
SocketCollection::clearSelectable(SocketHandle_t sock) {

  omni_tracedmutex_lock sync(pd_fdset_lock);
  pd_fdset.Remove( sock );
}

#ifdef GDB_DEBUG

static
int
do_poll(pollfd * r, unsigned int l, int t) {
  return poll(r,l,t);
}

#endif

/////////////////////////////////////////////////////////////////////////
CORBA::Boolean
SocketCollection::Select() {

  struct timeval timeout;
  unsigned int	 total;
  pollfd *	 rfds;

 again:

  // (pd_abs_sec,pd_abs_nsec) define the absolute time when we switch fdset
  SocketSetTimeOut(pd_abs_sec,pd_abs_nsec,timeout);

  if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {

    omni_thread::get_time(&pd_abs_sec,&pd_abs_nsec,
			  scan_interval_sec,scan_interval_nsec);
    timeout.tv_sec  = scan_interval_sec;
    timeout.tv_usec = scan_interval_nsec / 1000;

    omni_tracedmutex_lock sync(pd_fdset_lock);
    rfds = pd_fdset.enterPoll(total);
    pd_fdset.Reschedule();
  }
  else {
    omni_tracedmutex_lock sync(pd_fdset_lock);
    rfds = pd_fdset.enterPoll(total);
  }

  int nready;

  if (total != 0) {
#ifndef GDB_DEBUG
    nready = poll(rfds,total,timeout.tv_sec*1000+(timeout.tv_usec/1000));
#else
    nready = do_poll(rfds,total,timeout.tv_sec*1000+(timeout.tv_usec/1000));
#endif
  }
  else {
    omni_tracedmutex_lock sync(pd_fdset_lock);
    pd_poll_cond.timedwait(pd_abs_sec,pd_abs_nsec);
    // The condition variable should be poked so we are woken up
    // immediately when there is something to monitor.  We cannot use
    // select(0,0,0,0,&timeout) because win32 doesn't like it.
    nready = 0; // simulate a timeout
  }

  if (nready == RC_SOCKET_ERROR) {
    omni_tracedmutex_lock sync(pd_fdset_lock);
    pd_fdset.exitPoll();
    if (ERRNO == RC_EBADF) {
      omniORB::logs(20, "poll() returned EBADF, retrying");
      goto again;
    }
    else if (ERRNO != RC_EINTR) {
      return 0;
    }
    else {
      return 1;
    }
  }

  if (total != 0) {
    omni_tracedmutex_lock sync(pd_fdset_lock);
    pd_fdset.exitPoll();

    while (pd_fdset.needsProcessing() || nready) {
	if (rfds->revents) {
	    nready--;
	}
		
	if (rfds->events & FDCLRD) {
	    pd_fdset.Remove(rfds->fd);
	    continue;	
	}
	
	if (rfds->events & FDMVD) {
	    rfds->revents = 0;
	    pd_fdset.Move(rfds->fd);
	    continue;	
	}
	
	if (rfds->revents || (rfds->events & FDDIB)) {
	    if (rfds->fd == pd_pipe_read) {
#ifdef UnixArchitecture
		char data;
		read(pd_pipe_read, &data, 1);
		pd_pipe_full = 0;
#endif
            }
	    else {
		int fd = rfds->fd;
		pd_fdset.Remove(fd);

		if (!notifyReadable(fd)) return 0;
	      
		continue;
	    }
	}
	rfds++;
    }
  }  
  return 1;

}

/////////////////////////////////////////////////////////////////////////
CORBA::Boolean
SocketCollection::Peek(SocketHandle_t sock) {

  {
    omni_tracedmutex_lock sync(pd_fdset_lock);
   
    // Do nothing if this socket is not set to be monitored.
    if (!pd_fdset.isSet(sock,0))
      return 0;

    // If data in buffer is set, do callback straight away.
    if (pd_fdset.isSet(sock,1)) {
      pd_fdset.Remove(sock);
      return 1;
    }
  }

  struct timeval timeout;
  // select on the socket for half the time of scan_interval, if no request
  // arrives in this interval, we just let AcceptAndMonitor take care
  // of it.
  timeout.tv_sec  = scan_interval_sec / 2;
  timeout.tv_usec = scan_interval_nsec / 1000 / 2;
  if (scan_interval_sec % 2) timeout.tv_usec += 500000;
  
  pollfd rfds;

  do {
    rfds.fd = sock;
    rfds.events = POLLIN;
#ifndef GDB_DEBUG
    int nready = poll(&rfds,1,timeout.tv_sec*1000+(timeout.tv_usec/1000));
#else
    int nready = do_poll(&rfds,1,timeout.tv_sec*1000+(timeout.tv_usec/1000));
#endif

    if (nready == RC_SOCKET_ERROR) {
      if (ERRNO != RC_EINTR) {
	break;
      }
      else {
	continue;
      }
    }

    // Reach here if nready >= 0

    if (rfds.revents) {
      omni_tracedmutex_lock sync(pd_fdset_lock);

      // Are we still interested?
      if (pd_fdset.isSet(sock,0)) {
	pd_fdset.Remove(sock);
	return 1;
      }
    }
    break;

  } while(1);

  return 0;
}


/////////////////////////////////////////////////////////////////////////
void
SocketCollection::incrRefCount()
{
  omni_tracedmutex_lock sync(pd_fdset_lock);
  OMNIORB_ASSERT(pd_refcount > 0);
  pd_refcount++;
}

/////////////////////////////////////////////////////////////////////////
void
SocketCollection::decrRefCount()
{
  int refcount;
  {
    omni_tracedmutex_lock sync(pd_fdset_lock);
    OMNIORB_ASSERT(pd_refcount > 0);
    refcount = --pd_refcount;
  }
  if (refcount == 0) delete this;
}

/////////////////////////////////////////////////////////////////////////
void
SocketCollection::addSocket(SocketLink* conn)
{
  omni_tracedmutex_lock sync(pd_fdset_lock);
  SocketLink** head = &(pd_hash_table[conn->pd_socket % hashsize]);
  conn->pd_next = *head;
  *head = conn;
  OMNIORB_ASSERT(pd_refcount > 0);
  pd_refcount++;
}

/////////////////////////////////////////////////////////////////////////
SocketLink*
SocketCollection::removeSocket(SocketHandle_t sock)
{
  int refcount  = 0; // Initialise to stop over-enthusiastic compiler warnings
  SocketLink* l = 0;
  {
    omni_tracedmutex_lock sync(pd_fdset_lock);
    SocketLink** head = &(pd_hash_table[sock % hashsize]);
    while (*head) {
      if ((*head)->pd_socket == sock) {
	l = *head;
	*head = (*head)->pd_next;
	OMNIORB_ASSERT(pd_refcount > 0);
	refcount = --pd_refcount;
	break;
      }
      head = &((*head)->pd_next);
    }
  }
  if (l && refcount == 0) delete this;
  return l;
}

/////////////////////////////////////////////////////////////////////////
SocketLink*
SocketCollection::findSocket(SocketHandle_t sock,
				CORBA::Boolean hold_lock) {

  if (!hold_lock) pd_fdset_lock.lock();

  SocketLink* l = 0;
  SocketLink** head = &(pd_hash_table[sock % hashsize]);
  while (*head) {
    if ((*head)->pd_socket == sock) {
      l = *head;
      break;
    }
    head = &((*head)->pd_next);
  }

  if (!hold_lock) pd_fdset_lock.unlock();

  return l;
}

OMNI_NAMESPACE_END(omni)
-------------- next part --------------
// -*- Mode: C++; -*-
//                            	Package   : omniORB
// SocketCollection.h		Created on: 23 Jul 2003
//				Author    : Serguei Kolos
//
//    Copyright (C) 2001 AT&T Laboratories Cambridge
//
//    This file is part of the omniORB library
//
//    The omniORB library is free software; you can redistribute it and/or
//    modify it under the terms of the GNU Library General Public
//    License as published by the Free Software Foundation; either
//    version 2 of the License, or (at your option) any later version.
//
//    This library is distributed in the hope that it will be useful,
//    but WITHOUT ANY WARRANTY; without even the implied warranty of
//    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
//    Library General Public License for more details.
//
//    You should have received a copy of the GNU Library General Public
//    License along with this library; if not, write to the Free
//    Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
//    02111-1307, USA
//
//
// Description:
//	*** PROPRIETORY INTERFACE ***
//

/*
*/

#ifndef __SOCKETCOLLECTION_H__
#define __SOCKETCOLLECTION_H__

////////////////////////////////////////////////////////////////////////
//  Platform feature selection

#define SOCKNAME_SIZE_T OMNI_SOCKNAME_SIZE_T

#define USE_NONBLOCKING_CONNECT

#if defined(__linux__)
#   define USE_POLL
#endif

#if defined(__sunos__)
#   define USE_POLL
#endif

#if defined(__hpux__)
#   if __OSVERSION__ >= 11
#       define USE_POLL
#   endif
#   define USE_FAKE_INTERRUPTABLE_RECV
#endif

#if defined(__freebsd__)
#  define USE_POLL
#endif

#if defined(__WIN32__)
#   define USE_FAKE_INTERRUPTABLE_RECV
#endif

#if defined(__irix__)
#   define USE_POLL
#endif

////////////////////////////////////////////////////////////////////////


////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
//             win32 API
//
#if defined(__WIN32__)

#  include <sys/types.h>
#  include <libcWrapper.h>

#  define RC_INADDR_NONE     INADDR_NONE
#  define RC_INVALID_SOCKET  INVALID_SOCKET
#  define RC_SOCKET_ERROR    SOCKET_ERROR
#  define INETSOCKET         PF_INET
#  define CLOSESOCKET(sock)  closesocket(sock)
#  define SHUTDOWNSOCKET(sock) ::shutdown(sock,2)
#  define ERRNO              ::WSAGetLastError()
#  define EINPROGRESS        WSAEWOULDBLOCK
#  define RC_EINTR           WSAEINTR
#  define RC_EBADF           WSAENOTSOCK
#  define NEED_SOCKET_SHUTDOWN_FLAG 1

OMNI_NAMESPACE_BEGIN(omni)

typedef SOCKET SocketHandle_t;

OMNI_NAMESPACE_END(omni)

#else

////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
//             unix(ish)
//
#  if defined(__vxWorks__)
#    include <sockLib.h>
#    include <hostLib.h>
#    include <ioLib.h>
#    include <netinet/tcp.h>
#  else
#    include <sys/time.h>
#  endif
#  include <sys/socket.h>
#  include <netinet/in.h>
#  include <netinet/tcp.h>
#  include <arpa/inet.h>
#  include <unistd.h>
#  include <sys/types.h>
#  include <errno.h>
#  include <libcWrapper.h>

#  if defined(USE_POLL)
#    include <poll.h>
#  endif

#  if !defined(__VMS)
#    include <fcntl.h>
#  endif

#  if defined (__uw7__)
#    ifdef shutdown
#      undef shutdown
#    endif
#  endif

#  if defined(__VMS) && defined(USE_tcpSocketVaxRoutines)
#    include "tcpSocketVaxRoutines.h"
#    undef accept
#    undef recv
#    undef send
#    define accept(a,b,c) tcpSocketVaxAccept(a,b,c)
#    define recv(a,b,c,d) tcpSocketVaxRecv(a,b,c,d)
#    define send(a,b,c,d) tcpSocketVaxSend(a,b,c,d)
#  endif

#  ifdef __rtems__
extern "C" int select (int,fd_set*,fd_set*,fd_set*,struct timeval *);
#  endif

#  define RC_INADDR_NONE     ((CORBA::ULong)-1)
#  define RC_INVALID_SOCKET  (-1)
#  define RC_SOCKET_ERROR    (-1)
#  define INETSOCKET         AF_INET
#  define CLOSESOCKET(sock)  close(sock)

#  if defined(__sunos__) && defined(__sparc__) && __OSVERSION__ >= 5
#    define SHUTDOWNSOCKET(sock)  ::shutdown(sock,2)
#  elif defined(__osf1__) && defined(__alpha__)
#    define SHUTDOWNSOCKET(sock)  ::shutdown(sock,2)
#  else
     // XXX none of the above, calling shutdown() may not have the
     // desired effect.
#    define SHUTDOWNSOCKET(sock)  ::shutdown(sock,2)
#  endif

#  define ERRNO              errno
#  define RC_EINTR           EINTR
#  define RC_EBADF           EBADF

OMNI_NAMESPACE_BEGIN(omni)

typedef int    SocketHandle_t;

OMNI_NAMESPACE_END(omni)

#endif

#if defined(NEED_GETHOSTNAME_PROTOTYPE)
extern "C" int gethostname(char *name, int namelen);
#endif

OMNI_NAMESPACE_BEGIN(omni)

// This class is a replacement for the two fd sets used
// by the select-based SocketCollection implementation
// It holds an array of pollfd structures, which consits
// from two parts:
// 1. elements [0..length_now-1] will be used 
//    for the next poll invocation
// 2. elements [length_now..length_full-1] will be passed 
//    to poll only if the Reschedule method has been called
class SocketHandleSet
{
public:
  
  SocketHandleSet();
  ~SocketHandleSet();
	
  // Adds socket to the array of pollfd structures 
  // If now == 1, this socket is set to the first part of the 
  //   array and will be used for the next poll invocation
  // If now == 0 the socket will be stored in the last part and will
  //   be passed to poll only if the Reschedule method is called
  // If data_in_buffer == 1 the FDDTNBFFR bit will be set in the
  //   events field of the corresponding pollfd structure
  void Add( SocketHandle_t fd, CORBA::Boolean now, CORBA::Boolean data_in_buffer );
  
  // Removes socket from the array of pollfd structures 
  void Remove( SocketHandle_t fd );

  // Moves socket from the current position to the end of array of pollfd structures 
  void Move( SocketHandle_t fd );

  // Checks if socket exist in the array of pollfd structures 
  // If data_in_buffer == 1, function returns 1 only if socket has the FDDTNBFFR
  //   bit set in the events field of the corresponding pollfd structure
  // If data_in_buffer == 0, function returns 1 if socket is in 
  //   the array of pollfd structures
  CORBA::Boolean isSet( SocketHandle_t fd, CORBA::Boolean data_in_buffer );
	  
  // Must be called before entering the poll
  inline pollfd * enterPoll( unsigned int & length ) {
    length = pd_length_now;
    pd_used_by_poll_n = pd_length_now;
    return pd_data;
  }

  // Must be called after exiting the poll
  inline void exitPoll( ) {
    pd_used_by_poll_n = 0;
  }
  
  // Schedule all the sockets for the next poll invocation
  inline void Reschedule( ) {
    pd_length_now = pd_length_full;
  }
  
  // Returns 1 if there are sockets in the array, which were 
  // requested to be moved or removed, or if there are 
  // unprocessed sockets marked as having data in buffer
  inline CORBA::Boolean needsProcessing( ) {
    return (    pd_to_be_removed_n
	     || pd_to_be_moved_n
	     || pd_dib_n );
  }
	
private:	
  pollfd *		pd_data;
  short	*		pd_index;	// takes more memory, but allows to find
  					// sockets int the data array very efficiently
  unsigned short	pd_length_full;
  unsigned short	pd_length_now;
  unsigned short	pd_used_by_poll_n;
  unsigned short	pd_to_be_removed_n;
  unsigned short	pd_to_be_moved_n;
  unsigned short	pd_dib_n;
};


class SocketCollection;

extern void SocketSetTimeOut(unsigned long abs_sec,
			     unsigned long abs_nsec,struct timeval& t);

extern int SocketSetnonblocking(SocketHandle_t sock);

extern int SocketSetblocking(SocketHandle_t sock);

extern int SocketSetCloseOnExec(SocketHandle_t sock);

class SocketLink {

public:
  SocketLink(SocketHandle_t s)
    : pd_socket(s),
#ifdef NEED_SOCKET_SHUTDOWN_FLAG
      pd_shutdown(0),
#endif
      pd_next(0) {}

  ~SocketLink() {}

  friend class SocketCollection;

protected:
  SocketHandle_t pd_socket;

#ifdef NEED_SOCKET_SHUTDOWN_FLAG
  // select() on Windows does not return an error after the socket has
  // shutdown, so we have to store an extra flag here.
  CORBA::Boolean pd_shutdown;
#endif

private:
  SocketLink*    pd_next;
};

class SocketCollection {
public:

  SocketCollection();

protected:
  virtual ~SocketCollection();

  virtual CORBA::Boolean notifyReadable(SocketHandle_t) = 0;
  // Callback used by Select(). This method is called while holding
  // pd_fdset_lock.

public:
  void setSelectable(SocketHandle_t sock, CORBA::Boolean now,
		     CORBA::Boolean data_in_buffer,
		     CORBA::Boolean hold_lock=0);
  // Indicates that this socket should be watched by a poll()
  // so that any new data arriving on the connection will be noted.
  // If now == 1, immediately make this socket part of the poll
  // set.
  // If data_in_buffer == 1, treat this socket as if there are
  // data available from the connection already.
  // If hold_lock == 1, pd_fdset_lock is already held.

  void clearSelectable(SocketHandle_t);
  // Indicates that this connection need not be watched any more.

  CORBA::Boolean Select();
  // Returns TRUE(1) if the Select() has successfully done a scan.
  // otherwise returns false(0) to indicate that an error has been
  // detected and this function should not be called again.
  //
  // For each of the sockets that has been marked watchable and indeed
  // has become readable, call notifyReadable() with the socket no.
  // as the argument.

  CORBA::Boolean Peek(SocketHandle_t sock);
  // Do nothing and returns immediately if the socket has not been
  // set to be watched by a previous setSelectable().
  // Otherwise, monitor the socket's status for a short time.
  // Returns TRUE(1) if the socket becomes readable.
  // otherwise returns FALSE(0).

  void incrRefCount();
  void decrRefCount();

  void addSocket(SocketLink* conn);
  // Add this socket to the collection. <conn> is associated with the
  // socket and should be added to the table hashed by the socket number.
  // Increments this collection's refcount.

  SocketLink* removeSocket(SocketHandle_t sock);
  // Remove the socket from this collection. Return the socket which has
  // been removed. Return 0 if the socket is not found.
  // Decrements this collection's refcount if a socket is removed.

  SocketLink* findSocket(SocketHandle_t sock,
			 CORBA::Boolean hold_lock=0);
  // Returns the connection that is associated with this socket.
  // Return 0 if this socket cannot be found in the hash table.
  // if hold_lock == 1, the caller has already got the lock on pd_fdset_lock.
  // (use purely by member functions.)

  static unsigned long scan_interval_sec;
  static unsigned long scan_interval_nsec;

  static CORBA::ULong  hashsize;

private:
  SocketHandleSet      pd_fdset;
  omni_tracedmutex     pd_fdset_lock;
  omni_tracedcondition pd_poll_cond; // timedwait on if nothing to select
  unsigned long        pd_abs_sec;
  unsigned long        pd_abs_nsec;
  int                  pd_pipe_read;
  int                  pd_pipe_write;
  CORBA::Boolean       pd_pipe_full;
  int                  pd_refcount;

protected:
  SocketLink**         pd_hash_table;

};

OMNI_NAMESPACE_END(omni)

#endif // __SOCKETCOLLECTION_H__


More information about the omniORB-dev mailing list