[omniORB-dev] Poll based implementation of the SocketCollection
Serguei Kolos
Serguei.Kolos at cern.ch
Fri Mar 26 09:46:06 GMT 2004
Skipped content of type multipart/alternative-------------- next part --------------
// -*- Mode: C++; -*-
// Package : omniORB
// SocketCollection.cc Created on: 23 Jul 2003
// Author : Serguei Kolos
//
// Copyright (C) 2001 AT&T Laboratories Cambridge
//
// This file is part of the omniORB library
//
// The omniORB library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the Free
// Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
// 02111-1307, USA
//
//
// Description:
// *** PROPRIETARY INTERFACE ***
//
/*
$Log: SocketCollection.cc,v $
Revision 1.3 2004/02/20 17:39:21 kolos
Apply the omniORB patch: now FD_CLOEXEC flag is set for the TCP sockets, used by the omniORB.
Revision 1.2 2004/02/19 17:10:21 kolos
Implement new connection multiplexor, which uses the poll function.
Revision 1.1.2.15 2003/07/25 16:04:57 dgrisby
vxWorks patches.
Revision 1.1.2.14 2003/02/17 10:39:52 dgrisby
Fix inevitable Windows problem.
Revision 1.1.2.13 2003/02/17 01:46:23 dgrisby
Pipe to kick select thread (on Unix).
Revision 1.1.2.12 2003/01/28 12:17:09 dgrisby
Bug with Select() ignoring data in buffer indications.
Revision 1.1.2.11 2002/10/14 15:27:41 dgrisby
Typo in fcntl error check.
Revision 1.1.2.10 2002/08/21 06:23:15 dgrisby
Properly clean up bidir connections and ropes. Other small tweaks.
Revision 1.1.2.9 2002/03/18 16:50:18 dpg1
New threadPoolWatchConnection parameter.
Revision 1.1.2.8 2002/03/14 12:21:49 dpg1
Undo accidental scavenger period change, remove invalid assertion.
Revision 1.1.2.7 2002/03/13 16:05:38 dpg1
Transport shutdown fixes. Reference count SocketCollections to avoid
connections using them after they are deleted. Properly close
connections when in thread pool mode.
Revision 1.1.2.6 2002/02/26 14:06:45 dpg1
Recent changes broke Windows.
Revision 1.1.2.5 2002/02/13 16:02:38 dpg1
Stability fixes thanks to Bastiaan Bakker, plus threading
optimisations inspired by investigating Bastiaan's bug reports.
Revision 1.1.2.4 2001/08/24 15:56:44 sll
Fixed code which made the wrong assumption about the semantics of
do { ...; continue; } while(0)
Revision 1.1.2.3 2001/08/02 13:00:53 sll
Do not use select(0,0,0,0,&timeout), it doesn't work on win32.
Revision 1.1.2.2 2001/08/01 15:56:07 sll
Workaround MSVC++ bug. It generates wrong code with FD_ISSET and FD_SET
under certain conditions.
Revision 1.1.2.1 2001/07/31 16:16:26 sll
New transport interface to support the monitoring of active connections.
*/
#include <omniORB4/CORBA.h>
#include <omniORB4/giopEndpoint.h>
#include <SocketCollection.h>
#if defined(__vxWorks__)
# include "pipeDrv.h"
# include "selectLib.h"
# include "iostream.h"
#endif
# include "iostream"
# include "stdio.h"
OMNI_NAMESPACE_BEGIN(omni)
#define GDB_DEBUG
/////////////////////////////////////////////////////////////////////////
// Convert an absolute deadline (abs_sec, abs_nsec) into the relative
// interval from the current time and store it in t.
// If the deadline is already in the past, t is set to zero.
void
SocketSetTimeOut(unsigned long abs_sec,
                 unsigned long abs_nsec,struct timeval& t)
{
  unsigned long cur_sec, cur_nsec;
  omni_thread::get_time(&cur_sec,&cur_nsec);

  // The deadline has passed if it lies in an earlier second, or in the
  // current second at an earlier nanosecond.
  const bool expired = (abs_sec < cur_sec) ||
                       (abs_sec == cur_sec && abs_nsec < cur_nsec);
  if (expired) {
    t.tv_usec = 0;
    t.tv_sec = 0;
    return;
  }

  t.tv_sec = abs_sec - cur_sec;
  if (abs_nsec < cur_nsec) {
    // Borrow one second so that the nanosecond difference is positive.
    t.tv_sec -= 1;
    t.tv_usec = (1000000000 + abs_nsec - cur_nsec) / 1000;
  }
  else {
    t.tv_usec = (abs_nsec - cur_nsec) / 1000;
  }
}
/////////////////////////////////////////////////////////////////////////
// Switch the socket to non-blocking mode.
// Returns 0 on success and RC_INVALID_SOCKET on failure.
int
SocketSetnonblocking(SocketHandle_t sock) {
# if defined(__vxWorks__)
  int fl = TRUE;
  if (ioctl(sock, FIONBIO, (int)&fl) == ERROR) {
    return RC_INVALID_SOCKET;
  }
  return 0;
# elif defined(__WIN32__)
  u_long v = 1;
  if (ioctlsocket(sock,FIONBIO,&v) == RC_SOCKET_ERROR) {
    return RC_INVALID_SOCKET;
  }
  return 0;
# else
  // F_SETFL replaces the whole set of file status flags, so fetch the
  // current flags first and only add O_NONBLOCK to them.
  int fl = fcntl(sock,F_GETFL,0);
  if (fl == RC_SOCKET_ERROR) {
    return RC_INVALID_SOCKET;
  }
  if (fcntl(sock,F_SETFL,fl | O_NONBLOCK) == RC_SOCKET_ERROR) {
    return RC_INVALID_SOCKET;
  }
  return 0;
# endif
}
/////////////////////////////////////////////////////////////////////////
// Switch the socket back to blocking mode.
// Returns 0 on success and RC_INVALID_SOCKET on failure.
int
SocketSetblocking(SocketHandle_t sock) {
# if defined(__vxWorks__)
  int fl = FALSE;
  if (ioctl(sock, FIONBIO, (int)&fl) == ERROR) {
    return RC_INVALID_SOCKET;
  }
  return 0;
# elif defined(__WIN32__)
  u_long v = 0;
  if (ioctlsocket(sock,FIONBIO,&v) == RC_SOCKET_ERROR) {
    return RC_INVALID_SOCKET;
  }
  return 0;
# else
  // F_SETFL replaces the whole set of file status flags, so fetch the
  // current flags first and clear only O_NONBLOCK, leaving the other
  // flags as they were.
  int fl = fcntl(sock,F_GETFL,0);
  if (fl == RC_SOCKET_ERROR) {
    return RC_INVALID_SOCKET;
  }
  if (fcntl(sock,F_SETFL,fl & ~O_NONBLOCK) == RC_SOCKET_ERROR) {
    return RC_INVALID_SOCKET;
  }
  return 0;
# endif
}
/////////////////////////////////////////////////////////////////////////
// Private marker bits kept in the events field of a pollfd entry.
// NOTE(review): the events field is also what is handed to poll(), so
// these values must not collide with event flags defined by the
// platform's <poll.h> - confirm for every supported target.
//
// FDCLRD: the fd was removed while poll was using the array; the entry
// must be ignored when the poll result is processed and is physically
// removed afterwards.
#define FDCLRD 0x1000
// FDMVD: the fd must be moved to the end of the pollfd array (out of
// the part that is passed to the next poll) once poll has returned.
#define FDMVD 0x2000
// FDDIB: the fd already has data in its buffer that is waiting to be
// read, so it is treated as readable whatever poll reports.
#define FDDIB 0x4000
/////////////////////////////////////////////////////////////////////////
// Defines the maximum number of connections. It is also the size of the
// fd -> array position lookup table, so every fd must be below it.
#define MAX_FD_SIZE 4096
// Exchange entries i1 and i2 of the pollfd array 'data' and update the
// fd -> position table 'index' for both of them.
// The expansion is a complete { } block, so it is used without a
// trailing semicolon. The final backslash splices the line that follows
// the macro (a comment line in this file) onto the definition.
#define SWAP( data, index, i1, i2 ) { \
pollfd tmp = data[i1]; \
data[i1] = data[i2]; \
index[data[i1].fd] = i1; \
data[i2] = tmp; \
index[data[i2].fd] = i2; \
} \
/////////////////////////////////////////////////////////////////////////
// Builds an empty set: allocates MAX_FD_SIZE pollfd slots, all zeroed,
// and a lookup table of the same size in which every fd maps to -1
// (meaning "not in the set").
SocketHandleSet::SocketHandleSet()
  : pd_length_full( 0 ),
    pd_length_now( 0 ),
    pd_used_by_poll_n( 0 ),
    pd_to_be_removed_n( 0 ),
    pd_to_be_moved_n( 0 ),
    pd_dib_n( 0 )
{
  pd_data = new pollfd[MAX_FD_SIZE];
  pd_index = new short[MAX_FD_SIZE];
  for (int slot = 0; slot < MAX_FD_SIZE; ++slot) {
    pd_data[slot].fd = 0;
    pd_data[slot].events = 0;
    pd_data[slot].revents = 0;
    pd_index[slot] = -1;
  }
}
// Releases the lookup table and the pollfd array allocated by the
// constructor.
SocketHandleSet::~SocketHandleSet()
{
  delete[] pd_index;
  delete[] pd_data;
}
/////////////////////////////////////////////////////////////////////////
// Adds fd to the set, or updates its state if it is already there.
//
// fd             - socket handle; must be in the range [0, MAX_FD_SIZE),
//                  handles outside that range are logged and ignored.
// now            - if set, fd is placed in the first part of the array,
//                  which is handed to the next poll invocation.
// data_in_buffer - if set, fd is marked with FDDIB and is also placed
//                  in the first part of the array.
//
// While poll is using the array (pd_used_by_poll_n != 0) the entries
// below pd_used_by_poll_n are never relocated; they are only marked
// and the marks are dealt with by the caller once poll has returned.
void
SocketHandleSet::Add( SocketHandle_t fd, CORBA::Boolean now, CORBA::Boolean data_in_buffer )
{
  // pd_index has MAX_FD_SIZE entries. The unsigned comparison rejects
  // negative handles as well as handles which are too large.
  if ((unsigned long)fd >= (unsigned long)MAX_FD_SIZE) {
    omniORB::logs(1, "SocketHandleSet::Add: socket handle is outside the supported range, ignored.");
    return;
  }

  short i = pd_index[fd];
  if ( i != -1 ) {
    // fd is already in the set.
    // Compare with 0 explicitly: if CORBA::Boolean is an 8-bit type on
    // this build, assigning the raw 0x1000 mask would truncate to 0.
    CORBA::Boolean was_removed = (pd_data[i].events & FDCLRD) != 0;

    // fd was marked to be removed while we were in the poll
    // we will set it again
    if (was_removed) {
      pd_to_be_removed_n--;
      pd_data[i].events &= ~FDCLRD;
    }

    if (data_in_buffer && !(pd_data[i].events & FDDIB)) {
      pd_data[i].events |= FDDIB;
      pd_dib_n++;
    }

    if (now || data_in_buffer) {
      // fd has to be in the part of the set, which will
      // be scheduled for the next poll invocation.
      // Cancel a pending request to move it out of that part, otherwise
      // this request would be undone when the mark is processed.
      if (pd_data[i].events & FDMVD) {
        pd_data[i].events &= ~FDMVD;
        pd_to_be_moved_n--;
      }
      if (i >= pd_length_now) {
        // it is not there,
        // so we will move it to that part
        SWAP( pd_data, pd_index, pd_length_now, i )
        pd_length_now++;
      }
      return;
    }

    // fd has to be in the part of the set, which will NOT
    // be scheduled for the next poll invocation
    if ( was_removed && i < pd_length_now) {
      // fd is now in the part of the set, which will
      // be scheduled for the next poll invocation
      // and it was marked as removed
      if (i < pd_used_by_poll_n) {
        // the poll is running
        // we mark the fd as to be moved to another part of the fd set
        pd_data[i].events |= FDMVD;
        pd_to_be_moved_n++;
      }
      else {
        // the poll is NOT using this entry
        // we move the fd to another part of the fd set
        if (pd_data[i].events & FDMVD) {
          pd_data[i].events &= ~FDMVD;
          pd_to_be_moved_n--;
        }
        pd_length_now--;
        if (i != pd_length_now) {
          SWAP( pd_data, pd_index, pd_length_now, i )
        }
      }
    }
    return;
  }

  // fd is NOT in the set
  // lets put it there
  if (now || data_in_buffer) {
    // put fd to the first part of the array
    // it will be used for the next poll invocation
    if (pd_length_full != pd_length_now) {
      // Free the slot at the boundary by relocating its current
      // occupant to the very end of the array.
      pd_data[pd_length_full] = pd_data[pd_length_now];
      pd_index[pd_data[pd_length_full].fd] = pd_length_full;
    }
    pd_data[pd_length_now].fd = fd;
    pd_data[pd_length_now].events = POLLIN;
    if (data_in_buffer) {
      pd_data[pd_length_now].events |= FDDIB;
      pd_dib_n++;
    }
    pd_data[pd_length_now].revents = 0;
    pd_index[fd] = pd_length_now;
    pd_length_full++;
    pd_length_now++;
  }
  else {
    // put fd to the last part of the array
    // it will not be used for the next poll invocation
    pd_data[pd_length_full].fd = fd;
    pd_data[pd_length_full].events = POLLIN;
    pd_data[pd_length_full].revents = 0;
    pd_index[fd] = pd_length_full;
    pd_length_full++;
  }
}
/////////////////////////////////////////////////////////////////////////
// Removes fd from the set. Does nothing if fd is not in the set or is
// outside the range [0, MAX_FD_SIZE).
//
// If the entry is currently being used by poll it is only marked with
// FDCLRD; the caller removes it for real once poll has returned.
void
SocketHandleSet::Remove( SocketHandle_t fd )
{
  // pd_index has MAX_FD_SIZE entries. The unsigned comparison rejects
  // negative handles as well as handles which are too large.
  if ((unsigned long)fd >= (unsigned long)MAX_FD_SIZE)
    return;

  short i = pd_index[fd];
  if ( i == -1 )
    return;

  // Drop every marker the entry carries, keeping the counters in step.
  if (pd_data[i].events & FDDIB) {
    pd_data[i].events &= ~FDDIB;
    pd_dib_n--;
  }
  if (pd_data[i].events & FDMVD) {
    pd_data[i].events &= ~FDMVD;
    pd_to_be_moved_n--;
  }
  if (pd_data[i].events & FDCLRD) {
    pd_data[i].events &= ~FDCLRD;
    pd_to_be_removed_n--;
  }

  if ( i < pd_used_by_poll_n ) {
    // this fd is used by the poll function - be careful
    // don't modify array, which is now used by poll
    // just mark the fd by setting the FDCLRD bit in the events field
    // it will be removed later in the SocketCollection::Select
    pd_data[i].events |= FDCLRD;
    pd_to_be_removed_n++;
  }
  else {
    // fd is NOT used by the poll function - simply remove fd
    pd_index[fd] = -1;
    pd_length_full--;
    if (i < pd_length_now) {
      // The entry is in the first part: fill the hole with the last
      // entry of the first part, then fill that hole with the last
      // entry of the whole array.
      pd_length_now--;
      if (i != pd_length_now) {
        pd_data[i] = pd_data[pd_length_now];
        pd_index[pd_data[i].fd] = i;
      }
      if (pd_length_now != pd_length_full) {
        pd_data[pd_length_now] = pd_data[pd_length_full];
        pd_index[pd_data[pd_length_now].fd] = pd_length_now;
      }
    }
    else {
      // The entry is in the last part: fill the hole with the last
      // entry of the array.
      if (i != pd_length_full) {
        pd_data[i] = pd_data[pd_length_full];
        pd_index[pd_data[i].fd] = i;
      }
    }
  }
}
/////////////////////////////////////////////////////////////////////////
// Completes a move which was requested (FDMVD) while poll was running:
// fd is taken out of the first part of the array, i.e. the part that is
// handed to the next poll invocation.
// Does nothing if fd is outside the range [0, MAX_FD_SIZE), is not in
// the set, or is not marked with FDMVD.
void
SocketHandleSet::Move( SocketHandle_t fd )
{
  // pd_index has MAX_FD_SIZE entries. The unsigned comparison rejects
  // negative handles as well as handles which are too large.
  if ((unsigned long)fd >= (unsigned long)MAX_FD_SIZE)
    return;

  int i = pd_index[fd];
  // -1 means that fd is not in the set; it must not be used as an index.
  if (i == -1)
    return;

  if (!(pd_data[i].events & FDMVD))
    return;

  // socket has been marked as moved when the poll was running
  // now move it to the end of the first part and shrink that part
  pd_data[i].events &= ~FDMVD;
  pd_to_be_moved_n--;
  pd_length_now--;
  if (i != pd_length_now) {
    SWAP( pd_data, pd_index, pd_length_now, i )
  }
}
/////////////////////////////////////////////////////////////////////////
// Returns 1 if fd is in the set and is neither marked as removed
// (FDCLRD) nor marked as to be moved (FDMVD); returns 0 otherwise.
// If data_in_buffer is set, fd must in addition carry the FDDIB mark.
// A handle outside the range [0, MAX_FD_SIZE) is never in the set.
CORBA::Boolean
SocketHandleSet::isSet( SocketHandle_t fd, CORBA::Boolean data_in_buffer )
{
  // pd_index has MAX_FD_SIZE entries. The unsigned comparison rejects
  // negative handles as well as handles which are too large.
  if ((unsigned long)fd >= (unsigned long)MAX_FD_SIZE)
    return 0;

  int i = pd_index[fd];
  if ( ( i != -1 )
       && ( data_in_buffer == 0 || (pd_data[i].events & FDDIB) )
       && !( pd_data[i].events & FDCLRD )
       && !( pd_data[i].events & FDMVD ) )
    return 1;
  else
    return 0;
}
/////////////////////////////////////////////////////////////////////////
// Request that the socket is not inherited by child processes.
// Returns 0 on success and RC_INVALID_SOCKET if fcntl fails.
// On vxWorks this is a no-op, on Windows the result of
// SetHandleInformation is not checked.
int
SocketSetCloseOnExec(SocketHandle_t sock) {
# if defined(__vxWorks__)
  // Not supported on vxWorks
  return 0;
# elif defined(__WIN32__)
  SetHandleInformation((HANDLE)sock, HANDLE_FLAG_INHERIT, 0);
  return 0;
# else
  const int rc = fcntl(sock,F_SETFD,FD_CLOEXEC);
  return (rc == RC_SOCKET_ERROR) ? RC_INVALID_SOCKET : 0;
# endif
}
/////////////////////////////////////////////////////////////////////////
// Scan interval used by Select() and Peek(): seconds part ...
unsigned long SocketCollection::scan_interval_sec = 0;
// ... and nanoseconds part (50,000,000 ns = 50 ms by default).
unsigned long SocketCollection::scan_interval_nsec = 50*1000*1000;
// Number of buckets in the socket hash table (pd_hash_table).
CORBA::ULong SocketCollection::hashsize = 103;
/////////////////////////////////////////////////////////////////////////
// Creates an empty collection with a reference count of 1.
// On Unix a pipe is created and its read end is put into the poll set,
// so that setSelectable() can wake up a thread blocked in poll by
// writing to the other end. Failure to create the pipe is only logged.
SocketCollection::SocketCollection() :
  pd_poll_cond(&pd_fdset_lock),
  pd_abs_sec(0), pd_abs_nsec(0),
  pd_pipe_read(-1), pd_pipe_write(-1), pd_pipe_full(0),
  pd_refcount(1)
{
#ifdef UnixArchitecture
#  ifdef __vxWorks__
  if (pipeDrv() == OK &&
      pipeDevCreate("/pipe/SocketCollection",10,sizeof(int)) == OK) {
    // A single descriptor serves as both ends of the pipe.
    pd_pipe_read = pd_pipe_write = open("/pipe/SocketCollection",
                                        O_RDWR,0);
  }
  if (pd_pipe_read <= 0) {
    omniORB::logs(5, "Unable to create pipe for SocketCollection.");
  }
#  else
  int ends[2];
  if (pipe(ends) == -1) {
    omniORB::logs(5, "Unable to create pipe for SocketCollection.");
  }
  else {
    pd_pipe_read = ends[0];
    pd_pipe_write = ends[1];
  }
#  endif
#endif

  if (pd_pipe_read > 0) {
    // The read end is monitored from the very first poll invocation.
    omni_tracedmutex_lock guard(pd_fdset_lock);
    pd_fdset.Add( pd_pipe_read, 1, 0 );
  }

  // Start with an empty hash table.
  pd_hash_table = new SocketLink* [hashsize];
  CORBA::ULong bucket = 0;
  while (bucket < hashsize) {
    pd_hash_table[bucket] = 0;
    bucket++;
  }
}
/////////////////////////////////////////////////////////////////////////
// Marks the collection as dead (reference count -1), frees the hash
// table and, on Unix other than vxWorks, closes both ends of the pipe.
SocketCollection::~SocketCollection()
{
  pd_refcount = -1;
  delete [] pd_hash_table;

  // *** How do we clean up on vxWorks?
#if defined(UnixArchitecture) && !defined(__vxWorks__)
  close(pd_pipe_read);
  close(pd_pipe_write);
#endif
}
/////////////////////////////////////////////////////////////////////////
// Registers sock in the poll set.
//
// now            - make sock part of the very next poll invocation.
// data_in_buffer - treat sock as if data were already available.
// hold_lock      - set if the caller already holds pd_fdset_lock.
//
// If now or data_in_buffer is set, the thread blocked in poll (or
// waiting on pd_poll_cond when there is nothing to poll) is woken up.
void
SocketCollection::setSelectable(SocketHandle_t sock,
                                CORBA::Boolean now,
                                CORBA::Boolean data_in_buffer,
                                CORBA::Boolean hold_lock) {

  ASSERT_OMNI_TRACEDMUTEX_HELD(pd_fdset_lock, hold_lock);

  if (!hold_lock) pd_fdset_lock.lock();

  pd_fdset.Add( sock, now, data_in_buffer );

  if (now || data_in_buffer) {
    // Wake up the thread blocked in poll() if we can.
    if (pd_pipe_write > 0) {
#ifdef UnixArchitecture
      if (!pd_pipe_full) {
        char data = '\0';
        // Only record the pipe as full when the byte has really been
        // written. Otherwise a failed write would leave pd_pipe_full
        // set with nothing to read, and no later call would ever try
        // to wake up the polling thread again.
        if (write(pd_pipe_write, &data, 1) == 1) {
          pd_pipe_full = 1;
        }
      }
#endif
    }
    else {
      pd_poll_cond.signal();
    }
  }

  if (!hold_lock) pd_fdset_lock.unlock();
}
/////////////////////////////////////////////////////////////////////////
// Stops monitoring sock: takes pd_fdset_lock and removes sock from the
// poll set.
void
SocketCollection::clearSelectable(SocketHandle_t sock) {
  omni_tracedmutex_lock guard(pd_fdset_lock);
  pd_fdset.Remove( sock );
}
#ifdef GDB_DEBUG
// Thin out-of-line wrapper around poll(); forwards its arguments
// unchanged and returns whatever poll() returns.
static
int
do_poll(pollfd * fds, unsigned int nfds, int timeout_ms) {
  const int nready = poll(fds,nfds,timeout_ms);
  return nready;
}
#endif
/////////////////////////////////////////////////////////////////////////
// Performs one scan of the monitored sockets.
//
// Polls the sockets scheduled for this round, waiting at most until the
// end of the current scan interval, then processes the result while
// holding pd_fdset_lock: entries marked as removed or moved during the
// poll are dealt with, the wake-up pipe is drained, and for every other
// entry which is readable or marked as having data in buffer the entry
// is removed from the set and notifyReadable() is called.
//
// Returns 1 if the scan was done (also when poll was interrupted),
// and 0 if poll failed with another error or notifyReadable() returned
// 0, in which case this function should not be called again.
CORBA::Boolean
SocketCollection::Select() {

  struct timeval timeout;
  unsigned int total;
  pollfd * rfds;

 again:
  // (pd_abs_sec,pd_abs_nsec) define the absolute time when we switch fdset
  SocketSetTimeOut(pd_abs_sec,pd_abs_nsec,timeout);

  if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
    // The scan interval has expired: start a new one.
    omni_thread::get_time(&pd_abs_sec,&pd_abs_nsec,
                          scan_interval_sec,scan_interval_nsec);
    timeout.tv_sec = scan_interval_sec;
    timeout.tv_usec = scan_interval_nsec / 1000;

    omni_tracedmutex_lock sync(pd_fdset_lock);
    // NOTE(review): enterPoll() captures the length before Reschedule()
    // widens it, so the widened set appears to reach poll() only on the
    // following call - confirm that this ordering is intended.
    rfds = pd_fdset.enterPoll(total);
    pd_fdset.Reschedule();
  }
  else {
    omni_tracedmutex_lock sync(pd_fdset_lock);
    rfds = pd_fdset.enterPoll(total);
  }

  int nready;
  int poll_errno = 0;

  if (total != 0) {
#ifndef GDB_DEBUG
    nready = poll(rfds,total,timeout.tv_sec*1000+(timeout.tv_usec/1000));
#else
    nready = do_poll(rfds,total,timeout.tv_sec*1000+(timeout.tv_usec/1000));
#endif
    // Capture the error code straight away: taking the mutex and the
    // calls made below may change it before it is examined.
    if (nready == RC_SOCKET_ERROR) {
      poll_errno = ERRNO;
    }
  }
  else {
    omni_tracedmutex_lock sync(pd_fdset_lock);
    pd_poll_cond.timedwait(pd_abs_sec,pd_abs_nsec);
    // The condition variable should be poked so we are woken up
    // immediately when there is something to monitor.  We cannot use
    // select(0,0,0,0,&timeout) because win32 doesn't like it.
    nready = 0; // simulate a timeout
  }

  if (nready == RC_SOCKET_ERROR) {
    omni_tracedmutex_lock sync(pd_fdset_lock);
    pd_fdset.exitPoll();
    if (poll_errno == RC_EBADF) {
      omniORB::logs(20, "poll() returned EBADF, retrying");
      goto again;
    }
    else if (poll_errno != RC_EINTR) {
      return 0;
    }
    else {
      return 1;
    }
  }

  if (total != 0) {
    omni_tracedmutex_lock sync(pd_fdset_lock);
    pd_fdset.exitPoll();

    // Walk the array from the start until every ready entry has been
    // seen and no marked entries are left. An entry is only stepped
    // over (rfds++) when it stays in place; Remove() and Move() put a
    // different entry into the current slot, which is then examined in
    // the next iteration.
    while (pd_fdset.needsProcessing() || nready) {

      if (rfds->revents) {
        nready--;
      }

      if (rfds->events & FDCLRD) {
        // Removal was requested while poll was running: do it now.
        pd_fdset.Remove(rfds->fd);
        continue;
      }

      if (rfds->events & FDMVD) {
        // A move out of the polled part was requested while poll was
        // running; whatever poll reported for this entry is discarded.
        rfds->revents = 0;
        pd_fdset.Move(rfds->fd);
        continue;
      }

      if (rfds->revents || (rfds->events & FDDIB)) {
        if (rfds->fd == pd_pipe_read) {
#ifdef UnixArchitecture
          // Drain the wake-up byte written by setSelectable().
          char data;
          read(pd_pipe_read, &data, 1);
          pd_pipe_full = 0;
#endif
        }
        else {
          int fd = rfds->fd;
          pd_fdset.Remove(fd);
          if (!notifyReadable(fd)) return 0;
          continue;
        }
      }
      rfds++;
    }
  }
  return 1;
}
/////////////////////////////////////////////////////////////////////////
// Watches a single socket for a short time.
//
// Returns 0 immediately if sock is not currently being monitored.
// Returns 1 immediately if sock is marked as having data in buffer.
// Otherwise polls sock for half of the scan interval and returns 1 if
// it becomes readable while it is still being monitored, 0 if not.
// Whenever 1 is returned, sock has been removed from the poll set.
CORBA::Boolean
SocketCollection::Peek(SocketHandle_t sock) {

  {
    omni_tracedmutex_lock guard(pd_fdset_lock);

    // Do nothing if this socket is not set to be monitored.
    if (!pd_fdset.isSet(sock,0)) {
      return 0;
    }

    // If data in buffer is set, do callback straight away.
    if (pd_fdset.isSet(sock,1)) {
      pd_fdset.Remove(sock);
      return 1;
    }
  }

  // poll on the socket for half the time of scan_interval, if no request
  // arrives in this interval, we just let AcceptAndMonitor take care
  // of it.
  struct timeval timeout;
  timeout.tv_sec = scan_interval_sec / 2;
  timeout.tv_usec = scan_interval_nsec / 1000 / 2;
  if (scan_interval_sec % 2) timeout.tv_usec += 500000;

  for (;;) {
    pollfd probe;
    probe.fd = sock;
    probe.events = POLLIN;
#ifndef GDB_DEBUG
    int nready = poll(&probe,1,timeout.tv_sec*1000+(timeout.tv_usec/1000));
#else
    int nready = do_poll(&probe,1,timeout.tv_sec*1000+(timeout.tv_usec/1000));
#endif

    if (nready == RC_SOCKET_ERROR) {
      // Retry if the call was interrupted, give up on any other error.
      if (ERRNO == RC_EINTR) {
        continue;
      }
      return 0;
    }

    // Reach here if nready >= 0
    if (!probe.revents) {
      return 0;
    }

    omni_tracedmutex_lock guard(pd_fdset_lock);
    // Are we still interested?
    if (pd_fdset.isSet(sock,0)) {
      pd_fdset.Remove(sock);
      return 1;
    }
    return 0;
  }
}
/////////////////////////////////////////////////////////////////////////
// Adds one reference to this collection. The count is protected by
// pd_fdset_lock and must still be positive.
void
SocketCollection::incrRefCount()
{
  omni_tracedmutex_lock guard(pd_fdset_lock);
  OMNIORB_ASSERT(pd_refcount > 0);
  ++pd_refcount;
}
/////////////////////////////////////////////////////////////////////////
void
SocketCollection::decrRefCount()
{
int refcount;
{
omni_tracedmutex_lock sync(pd_fdset_lock);
OMNIORB_ASSERT(pd_refcount > 0);
refcount = --pd_refcount;
}
if (refcount == 0) delete this;
}
/////////////////////////////////////////////////////////////////////////
// Inserts conn at the head of the hash chain selected by its socket
// number and adds one reference to this collection.
void
SocketCollection::addSocket(SocketLink* conn)
{
  omni_tracedmutex_lock guard(pd_fdset_lock);

  SocketLink*& chain = pd_hash_table[conn->pd_socket % hashsize];
  conn->pd_next = chain;
  chain = conn;

  OMNIORB_ASSERT(pd_refcount > 0);
  ++pd_refcount;
}
/////////////////////////////////////////////////////////////////////////
// Unlinks the entry for sock from the hash table and returns it, or
// returns 0 if there is no such entry. Removing an entry drops one
// reference to this collection; the collection is deleted (after the
// lock has been released) when that was the last reference.
SocketLink*
SocketCollection::removeSocket(SocketHandle_t sock)
{
  SocketLink* found = 0;
  int remaining = 0; // Initialise to stop over-enthusiastic compiler warnings
  {
    omni_tracedmutex_lock guard(pd_fdset_lock);

    // pp always points at the link which leads to the current entry,
    // so that the entry can be unlinked without tracking a predecessor.
    for (SocketLink** pp = &(pd_hash_table[sock % hashsize]);
         *pp;
         pp = &((*pp)->pd_next)) {

      if ((*pp)->pd_socket != sock) {
        continue;
      }
      found = *pp;
      *pp = found->pd_next;
      OMNIORB_ASSERT(pd_refcount > 0);
      remaining = --pd_refcount;
      break;
    }
  }
  if (found && remaining == 0) {
    delete this;
  }
  return found;
}
/////////////////////////////////////////////////////////////////////////
// Looks up the entry for sock in the hash table and returns it, or
// returns 0 if there is no such entry.
// If hold_lock is set the caller already holds pd_fdset_lock, otherwise
// the lock is taken for the duration of the lookup.
SocketLink*
SocketCollection::findSocket(SocketHandle_t sock,
                             CORBA::Boolean hold_lock) {

  if (!hold_lock) pd_fdset_lock.lock();

  // Walk the chain until the socket is found or the chain ends.
  SocketLink* cursor = pd_hash_table[sock % hashsize];
  while (cursor && cursor->pd_socket != sock) {
    cursor = cursor->pd_next;
  }

  if (!hold_lock) pd_fdset_lock.unlock();

  return cursor;
}
OMNI_NAMESPACE_END(omni)
-------------- next part --------------
// -*- Mode: C++; -*-
// Package : omniORB
// SocketCollection.h Created on: 23 Jul 2003
// Author : Serguei Kolos
//
// Copyright (C) 2001 AT&T Laboratories Cambridge
//
// This file is part of the omniORB library
//
// The omniORB library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the Free
// Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
// 02111-1307, USA
//
//
// Description:
// *** PROPRIETARY INTERFACE ***
//
/*
*/
#ifndef __SOCKETCOLLECTION_H__
#define __SOCKETCOLLECTION_H__
////////////////////////////////////////////////////////////////////////
// Platform feature selection
#define SOCKNAME_SIZE_T OMNI_SOCKNAME_SIZE_T
#define USE_NONBLOCKING_CONNECT
#if defined(__linux__)
# define USE_POLL
#endif
#if defined(__sunos__)
# define USE_POLL
#endif
#if defined(__hpux__)
# if __OSVERSION__ >= 11
# define USE_POLL
# endif
# define USE_FAKE_INTERRUPTABLE_RECV
#endif
#if defined(__freebsd__)
# define USE_POLL
#endif
#if defined(__WIN32__)
# define USE_FAKE_INTERRUPTABLE_RECV
#endif
#if defined(__irix__)
# define USE_POLL
#endif
////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
// win32 API
//
#if defined(__WIN32__)
# include <sys/types.h>
# include <libcWrapper.h>
# define RC_INADDR_NONE INADDR_NONE
# define RC_INVALID_SOCKET INVALID_SOCKET
# define RC_SOCKET_ERROR SOCKET_ERROR
# define INETSOCKET PF_INET
# define CLOSESOCKET(sock) closesocket(sock)
# define SHUTDOWNSOCKET(sock) ::shutdown(sock,2)
# define ERRNO ::WSAGetLastError()
# define EINPROGRESS WSAEWOULDBLOCK
# define RC_EINTR WSAEINTR
# define RC_EBADF WSAENOTSOCK
# define NEED_SOCKET_SHUTDOWN_FLAG 1
// Socket handle type for the win32 API: the native SOCKET type.
OMNI_NAMESPACE_BEGIN(omni)
typedef SOCKET SocketHandle_t;
OMNI_NAMESPACE_END(omni)
#else
////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
// unix(ish)
//
# if defined(__vxWorks__)
# include <sockLib.h>
# include <hostLib.h>
# include <ioLib.h>
# include <netinet/tcp.h>
# else
# include <sys/time.h>
# endif
# include <sys/socket.h>
# include <netinet/in.h>
# include <netinet/tcp.h>
# include <arpa/inet.h>
# include <unistd.h>
# include <sys/types.h>
# include <errno.h>
# include <libcWrapper.h>
# if defined(USE_POLL)
# include <poll.h>
# endif
# if !defined(__VMS)
# include <fcntl.h>
# endif
# if defined (__uw7__)
# ifdef shutdown
# undef shutdown
# endif
# endif
# if defined(__VMS) && defined(USE_tcpSocketVaxRoutines)
# include "tcpSocketVaxRoutines.h"
# undef accept
# undef recv
# undef send
# define accept(a,b,c) tcpSocketVaxAccept(a,b,c)
# define recv(a,b,c,d) tcpSocketVaxRecv(a,b,c,d)
# define send(a,b,c,d) tcpSocketVaxSend(a,b,c,d)
# endif
# ifdef __rtems__
extern "C" int select (int,fd_set*,fd_set*,fd_set*,struct timeval *);
# endif
# define RC_INADDR_NONE ((CORBA::ULong)-1)
# define RC_INVALID_SOCKET (-1)
# define RC_SOCKET_ERROR (-1)
# define INETSOCKET AF_INET
# define CLOSESOCKET(sock) close(sock)
# if defined(__sunos__) && defined(__sparc__) && __OSVERSION__ >= 5
# define SHUTDOWNSOCKET(sock) ::shutdown(sock,2)
# elif defined(__osf1__) && defined(__alpha__)
# define SHUTDOWNSOCKET(sock) ::shutdown(sock,2)
# else
// XXX none of the above, calling shutdown() may not have the
// desired effect.
# define SHUTDOWNSOCKET(sock) ::shutdown(sock,2)
# endif
# define ERRNO errno
# define RC_EINTR EINTR
# define RC_EBADF EBADF
// Socket handle type for unix(ish) platforms: a plain int, with
// RC_INVALID_SOCKET (-1) as the invalid value.
OMNI_NAMESPACE_BEGIN(omni)
typedef int SocketHandle_t;
OMNI_NAMESPACE_END(omni)
#endif
#if defined(NEED_GETHOSTNAME_PROTOTYPE)
extern "C" int gethostname(char *name, int namelen);
#endif
OMNI_NAMESPACE_BEGIN(omni)
// This class is a replacement for the two fd sets used
// by the select-based SocketCollection implementation.
// It holds an array of pollfd structures, which consists
// of two parts:
// 1. elements [0..length_now-1] will be used
//    for the next poll invocation
// 2. elements [length_now..length_full-1] will be passed
//    to poll only if the Reschedule method has been called
//
// The class does no locking of its own; SocketCollection calls it while
// holding its pd_fdset_lock.
class SocketHandleSet
{
public:
  SocketHandleSet();
  ~SocketHandleSet();

  // Adds socket to the array of pollfd structures
  // If now == 1, this socket is set to the first part of the
  // array and will be used for the next poll invocation
  // If now == 0 the socket will be stored in the last part and will
  // be passed to poll only if the Reschedule method is called
  // If data_in_buffer == 1 the "data in buffer" bit (FDDIB) will be set
  // in the events field of the corresponding pollfd structure
  void Add( SocketHandle_t fd, CORBA::Boolean now, CORBA::Boolean data_in_buffer );

  // Removes socket from the array of pollfd structures
  void Remove( SocketHandle_t fd );

  // Moves a socket, which was marked to be moved while poll was
  // running, out of the part used for the next poll invocation
  void Move( SocketHandle_t fd );

  // Checks if socket exists in the array of pollfd structures
  // If data_in_buffer == 1, function returns 1 only if socket has the
  // "data in buffer" bit (FDDIB) set in the events field of the
  // corresponding pollfd structure
  // If data_in_buffer == 0, function returns 1 if socket is in
  // the array of pollfd structures
  // Sockets marked as removed or as to be moved are reported as not set
  CORBA::Boolean isSet( SocketHandle_t fd, CORBA::Boolean data_in_buffer );

  // Must be called before entering the poll
  // Returns the array to pass to poll and stores the number of
  // elements to pass in length. Those elements are not relocated by
  // Add and Remove until exitPoll is called
  inline pollfd * enterPoll( unsigned int & length ) {
    length = pd_length_now;
    pd_used_by_poll_n = pd_length_now;
    return pd_data;
  }

  // Must be called after exiting the poll
  inline void exitPoll( ) {
    pd_used_by_poll_n = 0;
  }

  // Schedule all the sockets for the next poll invocation
  inline void Reschedule( ) {
    pd_length_now = pd_length_full;
  }

  // Returns 1 if there are sockets in the array, which were
  // requested to be moved or removed, or if there are
  // unprocessed sockets marked as having data in buffer
  inline CORBA::Boolean needsProcessing( ) {
    return ( pd_to_be_removed_n
             || pd_to_be_moved_n
             || pd_dib_n );
  }

private:
  // The set owns pd_data and pd_index and frees them in its destructor,
  // so copying it would lead to a double delete. Copying is therefore
  // disabled: these two members are declared but never defined.
  SocketHandleSet( const SocketHandleSet& );
  SocketHandleSet& operator=( const SocketHandleSet& );

  pollfd * pd_data;     // the array of pollfd structures
  short * pd_index;     // position of each fd in pd_data, -1 if absent;
                        // takes more memory, but allows to find
                        // sockets in the data array very efficiently
  unsigned short pd_length_full;      // number of elements in the array
  unsigned short pd_length_now;       // size of the first part
  unsigned short pd_used_by_poll_n;   // number of elements poll is using
  unsigned short pd_to_be_removed_n;  // elements marked as removed
  unsigned short pd_to_be_moved_n;    // elements marked as to be moved
  unsigned short pd_dib_n;            // elements marked as data in buffer
};
class SocketCollection;
// Stores in t the time remaining until the absolute time
// (abs_sec, abs_nsec); t is set to zero if that time has passed.
extern void SocketSetTimeOut(unsigned long abs_sec,
unsigned long abs_nsec,struct timeval& t);
// Each of the following returns 0 on success and RC_INVALID_SOCKET if
// the underlying system call reports an error.
extern int SocketSetnonblocking(SocketHandle_t sock);
extern int SocketSetblocking(SocketHandle_t sock);
extern int SocketSetCloseOnExec(SocketHandle_t sock);
// An entry of the hash table kept by SocketCollection. It records the
// socket handle and the link to the next entry of the same hash chain.
class SocketLink {
public:
// Creates an entry for socket s which is not yet linked into any chain.
SocketLink(SocketHandle_t s)
: pd_socket(s),
#ifdef NEED_SOCKET_SHUTDOWN_FLAG
pd_shutdown(0),
#endif
pd_next(0) {}
~SocketLink() {}
// SocketCollection maintains the chains and therefore needs pd_next.
friend class SocketCollection;
protected:
// The socket handle this entry stands for; also the hash key.
SocketHandle_t pd_socket;
#ifdef NEED_SOCKET_SHUTDOWN_FLAG
// select() on Windows does not return an error after the socket has
// shutdown, so we have to store an extra flag here.
CORBA::Boolean pd_shutdown;
#endif
private:
// Next entry in the same hash chain, 0 at the end of the chain.
SocketLink* pd_next;
};
// A reference counted collection of sockets. It keeps a hash table of
// SocketLink entries keyed by socket number and a poll set of the
// sockets that are to be watched for incoming data.
class SocketCollection {
public:
// Creates an empty collection with a reference count of 1.
SocketCollection();
protected:
// The destructor is protected: the collection deletes itself when its
// reference count drops to 0 (see decrRefCount and removeSocket).
virtual ~SocketCollection();
virtual CORBA::Boolean notifyReadable(SocketHandle_t) = 0;
// Callback used by Select(). This method is called while holding
// pd_fdset_lock. Select() stops and returns 0 if the callback
// returns 0.
public:
void setSelectable(SocketHandle_t sock, CORBA::Boolean now,
CORBA::Boolean data_in_buffer,
CORBA::Boolean hold_lock=0);
// Indicates that this socket should be watched by a poll()
// so that any new data arriving on the connection will be noted.
// If now == 1, immediately make this socket part of the poll
// set.
// If data_in_buffer == 1, treat this socket as if there are
// data available from the connection already.
// If hold_lock == 1, pd_fdset_lock is already held.
void clearSelectable(SocketHandle_t);
// Indicates that this connection need not be watched any more.
CORBA::Boolean Select();
// Returns TRUE(1) if the Select() has successfully done a scan.
// otherwise returns false(0) to indicate that an error has been
// detected and this function should not be called again.
//
// For each of the sockets that has been marked watchable and indeed
// has become readable, call notifyReadable() with the socket no.
// as the argument.
CORBA::Boolean Peek(SocketHandle_t sock);
// Do nothing and returns immediately if the socket has not been
// set to be watched by a previous setSelectable().
// Otherwise, monitor the socket's status for a short time.
// Returns TRUE(1) if the socket becomes readable.
// otherwise returns FALSE(0).
void incrRefCount();
void decrRefCount();
// Increment / decrement this collection's refcount. The collection
// is deleted when the refcount reaches 0.
void addSocket(SocketLink* conn);
// Add this socket to the collection. <conn> is associated with the
// socket and should be added to the table hashed by the socket number.
// Increments this collection's refcount.
SocketLink* removeSocket(SocketHandle_t sock);
// Remove the socket from this collection. Return the socket which has
// been removed. Return 0 if the socket is not found.
// Decrements this collection's refcount if a socket is removed.
SocketLink* findSocket(SocketHandle_t sock,
CORBA::Boolean hold_lock=0);
// Returns the connection that is associated with this socket.
// Return 0 if this socket cannot be found in the hash table.
// if hold_lock == 1, the caller has already got the lock on pd_fdset_lock.
// (used purely by member functions.)
static unsigned long scan_interval_sec;
static unsigned long scan_interval_nsec;
// Length of one scan interval, split into seconds and nanoseconds.
static CORBA::ULong hashsize;
// Number of buckets in pd_hash_table.
private:
SocketHandleSet pd_fdset; // the sockets to be watched by poll()
omni_tracedmutex pd_fdset_lock; // protects pd_fdset, the hash table and pd_refcount
omni_tracedcondition pd_poll_cond; // timedwait on if nothing to select
unsigned long pd_abs_sec; // absolute time at which the current
unsigned long pd_abs_nsec; // scan interval ends
int pd_pipe_read; // pipe used to wake up the thread blocked in poll();
int pd_pipe_write; // both ends are -1 if there is no pipe
CORBA::Boolean pd_pipe_full; // set while a wake-up byte is waiting in the pipe
int pd_refcount;
protected:
SocketLink** pd_hash_table; // hashsize chains of SocketLink entries
};
OMNI_NAMESPACE_END(omni)
#endif // __SOCKETCOLLECTION_H__
More information about the omniORB-dev
mailing list