Socket changes for portability
Billy Newport
bnewport@eps.agfa.be
Tue, 2 Dec 1997 13:18:38 +0100
This is a multi-part message in MIME format.
------=_NextPart_000_01EF_01BCFF24.CDE73B00
Content-Type: multipart/alternative;
boundary="----=_NextPart_001_01F0_01BCFF24.CDE73B00"
------=_NextPart_001_01F0_01BCFF24.CDE73B00
Content-Type: text/plain;
charset="iso-8859-1"
Content-Transfer-Encoding: quoted-printable
I was wondering if the following changes could be included in omniORB's =
standard release.
I've abstracted all socket operations to a base class (baseSocket). I =
then changed tcpSocket_NT to use this abstraction instead of bsd calls. =
I then implemented a subclass of baseSocket for the normal NT =
bsd/WinSock calls. If you implement Sun ATMOS versions of this abstract =
class they should all work with the new tcpSocket_NT code.
There is a factory method baseSocket::getSocket which returns a new =
socket object for the platform.
This should improve the portability and maintainability of the code. =
I've attached the changes. Also, if someone could do something similar =
to OmniThread to avoid the IMPLEMENTATION macros in it, that would also =
improve matters. I mean make OmniThread a pure abstract base class along =
with all omni_xxx mutex classes. Include a factory method on each one to =
construct the platform specific version of it.
Billy Newport
------=_NextPart_001_01F0_01BCFF24.CDE73B00
Content-Type: text/html;
charset="iso-8859-1"
Content-Transfer-Encoding: quoted-printable
<!DOCTYPE HTML PUBLIC "-//W3C//DTD W3 HTML//EN">
<HTML>
<HEAD>
<META content=3Dtext/html;charset=3Diso-8859-1 =
http-equiv=3DContent-Type>
<META content=3D'"MSHTML 4.71.1712.3"' name=3DGENERATOR>
</HEAD>
<BODY bgColor=3D#ffffff>
<DIV><FONT color=3D#000000 face=3DArial size=3D2>I was wondering if the =
following=20
changes could be included in omniORB's standard release.</FONT></DIV>
<DIV><FONT color=3D#000000 face=3DArial size=3D2></FONT> </DIV>
<DIV><FONT color=3D#000000 face=3DArial size=3D2>I've abstracted all =
socket operations=20
to a base class (baseSocket). I then changed tcpSocket_NT to use this=20
abstraction instead of bsd calls. I then implemented a subclass of =
baseSocket=20
for the normal NT bsd/WinSock calls. If you implement Sun ATMOS versions =
of this=20
abstract class they should all work with the new tcpSocket_NT =
code.</FONT></DIV>
<DIV><FONT color=3D#000000 face=3DArial size=3D2></FONT> </DIV>
<DIV><FONT face=3DArial size=3D2>There is a factory method =
baseSocket::getSocket=20
which returns a new socket object for the platform.</FONT></DIV>
<DIV><FONT face=3DArial size=3D2></FONT> </DIV>
<DIV><FONT face=3DArial size=3D2>This should improve the portability and =
maintainability of the code. I've attached the changes. Also, if =
someone could=20
do something similar to OmniThread to avoid the IMPLEMENTATION macros =
in it,=20
that would also improve matters. I mean make OmniThread a pure abstract =
base=20
class along with all omni_xxx mutex classes. Include a factory method on =
each=20
one to construct the platform specific version of it.</FONT></DIV>
<DIV><FONT face=3DArial size=3D2></FONT> </DIV>
<DIV><FONT color=3D#000000 face=3DArial size=3D2>Billy =
Newport</FONT></DIV>
<DIV><FONT color=3D#000000 face=3DArial size=3D2></FONT> </DIV>
<DIV> </DIV></BODY></HTML>
------=_NextPart_001_01F0_01BCFF24.CDE73B00--
------=_NextPart_000_01EF_01BCFF24.CDE73B00
Content-Type: application/octet-stream;
name="baseSocket.h"
Content-Transfer-Encoding: 7bit
Content-Disposition: attachment;
filename="baseSocket.h"
#ifndef BASE_SOCKET_H
#define BASE_SOCKET_H
// The interface only handles endpoints by pointer, so a forward declaration
// is enough and the header no longer relies on the includer declaring the
// type first.
class tcpSocketEndpoint;

// Abstract, platform-neutral socket. Concrete subclasses implement the
// operations for one platform; getSocket() is the factory that returns the
// implementation for the current platform.
class baseSocket
{
public:
  // Platform-independent classification of the most recent failure.
  enum SocketErrors { eOK, eINTERRUPTED, eERROR, eINVALID};

  // Value recv() and send() return on failure; callers compare against it
  // and then consult getErrNo().
  static const int SocketError;

protected:
  // Reason for the last failed operation, set by the subclass.
  SocketErrors m_errno;

  // Start in the "no error" state so getErrNo() never reads an
  // uninitialised value on a socket that has not failed yet.
  baseSocket() : m_errno(eOK) {}

public:
  // Returns the classification recorded by the last failed operation.
  SocketErrors getErrNo() const
  {
    return m_errno;
  }

  // Connect to the remote endpoint; returns false on failure.
  virtual bool connect(tcpSocketEndpoint *pPoint) = 0;

  // Bind to the local endpoint; returns false on failure.
  virtual bool bind(tcpSocketEndpoint *pPoint) = 0;

  virtual ~baseSocket();

  // Accept an incoming connection; returns 0 on failure.
  virtual baseSocket *accept() = 0;

  // Transfer up to NumBytes bytes; return the number of bytes moved or
  // SocketError on failure.
  virtual int recv(char *pBuffer, int NumBytes) = 0;
  virtual int send(const char *pBuffer, int NumBytes) = 0;

  virtual void shutdown() = 0;

  // Factory for the platform's socket implementation.
  static baseSocket *getSocket();

  // Process-wide start-up and tear-down hooks for the socket layer.
  static bool Initialise();
  static bool Terminate();
};
#endif
------=_NextPart_000_01EF_01BCFF24.CDE73B00
Content-Type: application/octet-stream;
name="bsdSocket.cc"
Content-Transfer-Encoding: quoted-printable
Content-Disposition: attachment;
filename="bsdSocket.cc"
#include <omniORB2/CORBA.h>
#include "tcpSocket_NT.h"
#include "bsdSocket.h"
#include <winsock.h>
#include <sys/types.h>
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include "libcWrapper.h"
#ifdef QWERTY
// Platform hooks of baseSocket for the WinSock implementation. They are
// compiled only when QWERTY is defined; nothing in this file defines it.

// Returns a new, unconnected NTsocket allocated with new.
baseSocket *baseSocket::getSocket()
{
return new NTsocket();
}
// Performs no start-up work and always reports success.
bool baseSocket::Initialise()
{
return true;
}
// Performs no tear-down work and always reports success.
bool baseSocket::Terminate()
{
return true;
}
#endif
// Creates a wrapper that holds no descriptor (-1) and reports no error.
NTsocket::NTsocket()
  : m_socket(-1)
{
  // m_errno is a base-class member that nothing else initialises, so
  // getErrNo() would otherwise read an indeterminate value.
  m_errno =3D eOK;
}
// Wraps an already open descriptor (used by accept()) and starts with a
// clean error state.
NTsocket::NTsocket(int socket)
  : m_socket(socket)
{
  // m_errno is a base-class member that nothing else initialises, so
  // getErrNo() would otherwise read an indeterminate value.
  m_errno =3D eOK;
}
// Closes the descriptor if one is held and marks the wrapper as empty.
NTsocket::~NTsocket()
{
  if (m_socket =3D=3D -1)
    return;
  closesocket(m_socket);
  m_socket =3D -1;
}
// Resolves r->host() (dotted-quad address or host name), creates a TCP
// socket and connects it to that host on r->port().
// Returns true on success. On failure returns false and records the
// reason in m_errno; a descriptor whose ::connect() failed is closed
// again before returning.
bool NTsocket::connect(tcpSocketEndpoint *r)
{
  struct sockaddr_in raddr;
  // Clear the whole structure so the sin_zero padding handed to
  // ::connect() is not uninitialised stack data.
  memset((void*)&raddr, 0, sizeof(raddr));

  if (! LibcWrapper::isipaddr( (char*) r->host()))
  {
    LibcWrapper::hostent_var h;
    int rc;
    if (LibcWrapper::gethostbyname((char *)r->host(),h,rc) < 0)
    {
      // XXX look at rc to decide what to do or if to give up what errno
      // XXX to return
      // XXX For the moment, just return EINVAL
      m_errno =3D Convert_errno(EINVAL);
      return false;
    }
    // We just pick the first address in the list, may be we should go
    // through the list and if possible pick the one that is on the same
    // subnet.
    memcpy((void*)&raddr.sin_addr,
           (void*)h.hostent()->h_addr_list[0],
           sizeof(raddr.sin_addr));
  }
  else
  {
    // The machine name is already an IP address
    CORBA::ULong ip_p;
    if ( (ip_p =3D inet_addr( (char*) r->host() )) =3D=3D INADDR_NONE)
    {
      // inet_addr() signals a malformed address only through its return
      // value, so there is no error code to translate here.
      m_errno =3D Convert_errno(EINVAL);
      return false;
    }
    memcpy((void*) &raddr.sin_addr, (void*) &ip_p,
           sizeof(raddr.sin_addr));
  }
  raddr.sin_family =3D PF_INET;
  raddr.sin_port =3D htons(r->port());

  if ((m_socket =3D socket(PF_INET,SOCK_STREAM,0)) =3D=3D INVALID_SOCKET)
  {
    // WinSock reports failures through WSAGetLastError(); it does not
    // set errno.
    m_errno =3D Convert_errno(WSAGetLastError());
    return false;
  }
  if (::connect(m_socket,(struct sockaddr *)&raddr,
                sizeof(struct sockaddr_in)) =3D=3D baseSocket::SocketError)
  {
    // Fetch the error code before closesocket() can replace it.
    m_errno =3D Convert_errno(WSAGetLastError());
    ::closesocket(m_socket);
    m_socket =3D -1;
    return false;
  }
  return true;
}
// Accepts one incoming connection on the listening descriptor.
// Returns a new NTsocket wrapping the accepted descriptor, or 0 on
// failure with the reason recorded in m_errno.
baseSocket *NTsocket::accept()
{
  struct sockaddr_in raddr;
  int l =3D sizeof(struct sockaddr_in);
  int new_sock =3D ::accept(m_socket,(struct sockaddr *)&raddr,&l);
  if (new_sock =3D=3D INVALID_SOCKET)
  {
    // WinSock reports failures through WSAGetLastError(); it does not
    // set errno.
    m_errno =3D Convert_errno(WSAGetLastError());
    return 0;
  }
  return new NTsocket(new_sock);
}
// Maps a C runtime errno value or a WinSock error code onto the
// platform-independent SocketErrors classification. Anything that is not
// recognised is reported as eERROR.
baseSocket::SocketErrors NTsocket::Convert_errno(int err) const
{
  switch(err)
  {
  case EINTR:
  case WSAEINTR:    // WinSock code for an interrupted blocking call
    return eINTERRUPTED;
  case EINVAL:
  case WSAEINVAL:   // WinSock code for an invalid argument
    return eINVALID;
  default:
    return eERROR;
  }
}
int NTsocket::recv(char *pBuffer, int NumBytes)
{
int rx;
printf("Reading %d from %p\n", NumBytes, pBuffer);
if ((rx =3D ::recv(m_socket,pBuffer,NumBytes,0)) =3D=3D =
baseSocket::SocketError)
m_errno =3D Convert_errno(errno);
return rx;
}
int NTsocket::send(const char *pBuffer, int NumBytes)
{
int tx;
printf("Sending %d from %p\n", NumBytes, pBuffer);
if ((tx =3D ::send(m_socket,pBuffer,NumBytes,0)) =3D=3D =
baseSocket::SocketError)
m_errno =3D Convert_errno(errno);
return tx;
}
// Calls ::shutdown() on the descriptor with a how value of 2. The
// descriptor itself stays open; the destructor closes it.
// NOTE(review): the result of ::shutdown() is ignored.
void NTsocket::shutdown()
{
::shutdown(m_socket,2);
}
// Creates a listening TCP socket on me->port() (0 lets the system pick a
// port) bound to INADDR_ANY, then writes the port actually obtained and
// this host's IP address, as a dotted-quad string, back into <me>.
// Returns true on success. On a socket-level failure returns false with
// the reason in m_errno and no descriptor left open.
// Throws omniORB::fatalException if the local host name or address
// cannot be determined.
bool NTsocket::bind(tcpSocketEndpoint *me)
{
  struct sockaddr_in myaddr;
  // Clear the whole structure so the sin_zero padding handed to
  // ::bind() is not uninitialised stack data.
  memset((void *)&myaddr, 0, sizeof(myaddr));

  if ((m_socket =3D socket(PF_INET,SOCK_STREAM,0)) =3D=3D INVALID_SOCKET)
  {
    // WinSock reports failures through WSAGetLastError(); it does not
    // set errno. The same applies to every failure path below.
    m_errno =3D Convert_errno(WSAGetLastError());
    return false;
  }
  myaddr.sin_family =3D PF_INET;
  myaddr.sin_addr.s_addr =3D INADDR_ANY;
  // NOTE(review): the printf calls look like leftover debugging output.
  printf("Port number wanted is %d\n", me->port());
  myaddr.sin_port =3D htons(me->port());

  if (me->port())
  {
    // A specific port was requested: allow it to be reused.
    int valtrue =3D 1;
    if (setsockopt(m_socket,SOL_SOCKET,SO_REUSEADDR,
                   (char*)&valtrue,sizeof(int))
        =3D=3D baseSocket::SocketError)
    {
      m_errno =3D Convert_errno(WSAGetLastError());
      closesocket(m_socket);
      m_socket =3D -1;
      return false;
    }
  }
  if (::bind(m_socket,(struct sockaddr *)&myaddr,
             sizeof(struct sockaddr_in)) =3D=3D baseSocket::SocketError)
  {
    m_errno =3D Convert_errno(WSAGetLastError());
    closesocket(m_socket);
    m_socket =3D -1;
    return false;
  }
  // Make it a passive socket
  if (listen(m_socket,5) =3D=3D baseSocket::SocketError)
  {
    m_errno =3D Convert_errno(WSAGetLastError());
    closesocket(m_socket);
    m_socket =3D -1;
    return false;
  }

  // Find out which port was really assigned and report it to the caller.
  int l =3D sizeof(struct sockaddr_in);
  if (getsockname(m_socket,(struct sockaddr *)&myaddr,&l)
      =3D=3D baseSocket::SocketError)
  {
    m_errno =3D Convert_errno(WSAGetLastError());
    closesocket(m_socket);
    m_socket =3D -1;
    return false;
  }
  me->port(ntohs(myaddr.sin_port));
  printf("Port number given is %d\n", me->port());

  // Publish this host's first address as the endpoint's host string.
  char self[64];
  if (gethostname(&self[0],64) =3D=3D baseSocket::SocketError)
  {
    throw omniORB::fatalException(__FILE__,__LINE__,
                                  "Cannot get the name of this host");
  }
  LibcWrapper::hostent_var h;
  int rc;
  if (LibcWrapper::gethostbyname(self,h,rc) < 0)
  {
    throw omniORB::fatalException(__FILE__,__LINE__,
                                  "Cannot get the address of this host");
  }
  memcpy((void *)&myaddr.sin_addr,
         (void *)h.hostent()->h_addr_list[0],
         sizeof(myaddr.sin_addr));
  // "255.255.255.255" plus the terminator is exactly 16 characters.
  char ipaddr[16];
  sprintf(ipaddr,"%d.%d.%d.%d",
          (int)((ntohl(myaddr.sin_addr.s_addr) & 0xff000000) >> 24),
          (int)((ntohl(myaddr.sin_addr.s_addr) & 0x00ff0000) >> 16),
          (int)((ntohl(myaddr.sin_addr.s_addr) & 0x0000ff00) >> 8),
          (int)((ntohl(myaddr.sin_addr.s_addr) & 0x000000ff)));
  me->host((const CORBA::Char *) ipaddr);
  return true;
}
------=_NextPart_000_01EF_01BCFF24.CDE73B00
Content-Type: application/octet-stream;
name="bsdSocket.h"
Content-Transfer-Encoding: 7bit
Content-Disposition: attachment;
filename="bsdSocket.h"
#ifndef bsdSocket_H
#define bsdSocket_H
// Implementation of the baseSocket interface on top of the WinSock/BSD
// socket calls.
// NOTE(review): this header uses baseSocket and tcpSocketEndpoint without
// including their declarations; it relies on the includer providing them.
class NTsocket : public baseSocket
{
protected:
// Underlying socket descriptor; -1 when no descriptor is held.
int m_socket;
// Translates an error number into a baseSocket::SocketErrors value.
SocketErrors Convert_errno(int ErrorNumber) const;
// Wraps an existing descriptor; accept() uses it for new connections.
NTsocket(int socket);
public:
// The members below implement the corresponding baseSocket operations.
bool connect(tcpSocketEndpoint *pPoint);
bool bind(tcpSocketEndpoint *pPoint);
NTsocket();
~NTsocket();
baseSocket *accept();
int recv(char *pBuffer, int NumBytes);
int send(const char *pBuffer, int NumBytes);
void shutdown();
};
#endif
------=_NextPart_000_01EF_01BCFF24.CDE73B00
Content-Type: application/octet-stream;
name="tcpSocket_NT.cc"
Content-Transfer-Encoding: quoted-printable
Content-Disposition: attachment;
filename="tcpSocket_NT.cc"
// -*- Mode: C++; -*-
// Package : omniORB2
// tcpSocket_NT.cc Created on: 4/2/97
// Author : Eoin Carroll (ewc)
//
// Copyright (C) 1996, 1997 Olivetti & Oracle Research Laboratory
//
// This file is part of the omniORB library
//
// The omniORB library is free software; you can redistribute it =
and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the Free
// Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA =
=20
// 02111-1307, USA
//
//
// Description:
// Implementation of the Strand using TCP/IP and WinSock interface
//=09
/*
$Log: /Apogee/dev/OmniORB/lib/omniORB2/tcpSocket_NT.cc $
*=20
* 2 11/28/97 3:12p Bnewport
* Now works with AGS for possible Mac port
*=20
* 1 11/26/97 2:51p Bnewport
// Revision 1.6 1997/05/06 15:30:32 sll
// Public release.
//
*/
#include <omniORB2/CORBA.h>
#include "tcpSocket_NT.h"
#include <limits.h>
#include <stdio.h>
#include "libcWrapper.h"
#define DO_NOT_AVOID_MISALIGNMENT =20
// Size in bytes of each strand's transmit buffer and receive buffer:
// 8192 bytes plus omni::max_alignment bytes of slack, which lets the
// buffer pointers be shifted to any requested alignment.
const=20
unsigned int=20
tcpSocketStrand::buffer_size =3D 8192 + (int)omni::max_alignment;
// Active-side constructor: obtains a socket from the platform factory,
// connects it to <r> and allocates the transmit and receive buffers.
// Throws CORBA::COMM_FAILURE (COMPLETED_NO) if the connect fails.
tcpSocketStrand::tcpSocketStrand(tcpSocketRope *rope,
                                 tcpSocketEndpoint *r,
                                 CORBA::Boolean heapAllocated)
  : Strand(rope,heapAllocated)
{
  pd_socket =3D baseSocket::getSocket();
  if (!pd_socket->connect(r))
  {
    // The destructor does not run for an object whose constructor
    // throws, so the socket must be released here or it is leaked.
    baseSocket::SocketErrors err =3D pd_socket->getErrNo();
    delete pd_socket;
    pd_socket =3D 0;
    throw CORBA::COMM_FAILURE(err,CORBA::COMPLETED_NO);
  }
  pd_tx_buffer =3D (void *) new char[tcpSocketStrand::buffer_size];
  pd_tx_begin =3D pd_tx_end =3D pd_tx_reserved_end =3D pd_tx_buffer;
  pd_rx_buffer =3D (void *) new char[tcpSocketStrand::buffer_size];
  pd_rx_begin =3D pd_rx_end =3D pd_rx_received_end =3D pd_rx_buffer;
  return;
}
// Passive-side constructor: adopts an already connected socket <sock>
// and allocates empty transmit and receive buffers. The strand deletes
// the socket in its destructor.
tcpSocketStrand::tcpSocketStrand(tcpSocketRope *r,
                                 tcpSocketHandle_t sock,
                                 CORBA::Boolean heapAllocated)
  : Strand(r,heapAllocated)
{
  pd_socket =3D sock;

  // Transmit side: every cursor starts at the beginning of the buffer.
  char *tx_storage =3D new char[tcpSocketStrand::buffer_size];
  pd_tx_buffer =3D (void *) tx_storage;
  pd_tx_reserved_end =3D pd_tx_buffer;
  pd_tx_end =3D pd_tx_buffer;
  pd_tx_begin =3D pd_tx_buffer;

  // Receive side: likewise.
  char *rx_storage =3D new char[tcpSocketStrand::buffer_size];
  pd_rx_buffer =3D (void *) rx_storage;
  pd_rx_received_end =3D pd_rx_buffer;
  pd_rx_end =3D pd_rx_buffer;
  pd_rx_begin =3D pd_rx_buffer;
}
// Releases the socket object and both buffers. When the trace level is
// 5 or more, the socket pointer is written to cerr first.
tcpSocketStrand::~tcpSocketStrand()
{
  if (omniORB::traceLevel >=3D 5) {
    cerr << "tcpSocketStrand::~Strand() close socket no. " << pd_socket
         << endl;
  }

  // Deleting a null pointer is a no-op, so no guards are needed.
  delete pd_socket;
  pd_socket =3D 0;

  delete [] (char *)pd_tx_buffer;
  pd_tx_buffer =3D 0;

  delete [] (char *)pd_rx_buffer;
  pd_rx_buffer =3D 0;
}
// Largest message this strand accepts; UINT_MAX stands for no limit.
size_t=20
tcpSocketStrand::MaxMTU() const {
// No limit
return UINT_MAX;
}
// Hands out a view of received data inside the internal receive buffer.
//   size    - number of bytes wanted
//   exactly - if true the result holds exactly <size> bytes, fetching
//             from the network as needed; otherwise it may hold fewer
//   align   - alignment wanted for the start of the data, expressed as
//             the address modulo omni::max_alignment, where a multiple
//             of max_alignment is written as max_alignment itself
// Returns an sbuf pointing at pd_rx_begin. Any buffer handed out before
// is first given back in full through giveback_received(0).
// Throws CORBA::INTERNAL if <align> exceeds omni::max_alignment or if an
// exact request is larger than max_receive_buffer_size(). Exceptions
// raised by fetch() propagate.
Strand::sbuf
tcpSocketStrand::receive(size_t size,
CORBA::Boolean exactly,
int align)=20
{
giveback_received(0);
// bsz: bytes already received and not yet consumed
size_t bsz =3D ((omni::ptr_arith_t) pd_rx_end -=20
(omni::ptr_arith_t) pd_rx_begin);
int current_alignment;
omni::ptr_arith_t new_align_ptr;
if (!bsz) {
// No data left in receive buffer, fetch() and try again
// rewind the buffer pointers to the beginning of the buffer and
// at the same alignment as it is requested in <align>
new_align_ptr =3D omni::align_to((omni::ptr_arith_t) pd_rx_buffer,
omni::max_alignment) + align;
if (new_align_ptr >=3D ((omni::ptr_arith_t)pd_rx_buffer +=20
(int)omni::max_alignment)) {
new_align_ptr -=3D (int) omni::max_alignment;
}
pd_rx_begin =3D pd_rx_received_end =3D pd_rx_end =3D (void =
*)new_align_ptr;
// DO_NOT_AVOID_MISALIGNMENT is defined earlier in this file, so the
// unbounded fetch() branch is the one that is compiled.
#ifndef DO_NOT_AVOID_MISALIGNMENT =20
fetch(size);
#else
fetch();
#endif
return receive(size,exactly,align);
}
if (align > (int)omni::max_alignment) {
throw CORBA::INTERNAL(0,CORBA::COMPLETED_MAYBE);
}
current_alignment =3D (omni::ptr_arith_t) pd_rx_begin &
((int)omni::max_alignment - 1);
if (current_alignment =3D=3D 0) {
current_alignment =3D (int) omni::max_alignment;
}
if (current_alignment !=3D align) {
// alignment is not right, move the data to the correct alignment
new_align_ptr =3D omni::align_to((omni::ptr_arith_t) pd_rx_buffer,
omni::max_alignment) + align;
if (new_align_ptr >=3D ((omni::ptr_arith_t)pd_rx_buffer +=20
(int)omni::max_alignment)) {
new_align_ptr -=3D (int) omni::max_alignment;
}
memmove((void *)new_align_ptr,(void *)pd_rx_begin,bsz);
pd_rx_begin =3D pd_rx_received_end =3D (void *)new_align_ptr;
pd_rx_end =3D (void *)(new_align_ptr + bsz);
}
if (bsz < size) {
if (exactly) {
if (size > max_receive_buffer_size()) {
throw CORBA::INTERNAL(0,CORBA::COMPLETED_MAYBE);
}
// Not enough data to satisfy the request, fetch() and try again
// Check if there is enough empty space for fetch() to satisfy =
this
// request. If necessary, make some space by moving existing data =
to
// the beginning of the buffer. Always keep the buffer pointers at
// the same alignment as they were previously
=20
size_t avail =3D tcpSocketStrand::buffer_size -=20
((omni::ptr_arith_t) pd_rx_end -=20
(omni::ptr_arith_t) pd_rx_buffer) + bsz;
if (avail < size) {
// Not enough empty space, got to move existing data
current_alignment =3D (omni::ptr_arith_t) pd_rx_begin &
((int)omni::max_alignment - 1);
if (current_alignment =3D=3D 0) {
current_alignment =3D (int) omni::max_alignment;
}
new_align_ptr =3D omni::align_to((omni::ptr_arith_t) pd_rx_buffer,
omni::max_alignment) +=20
current_alignment;
if (new_align_ptr >=3D ((omni::ptr_arith_t)pd_rx_buffer +=20
(int)omni::max_alignment)) {
new_align_ptr -=3D (int) omni::max_alignment;
}
memmove((void *)new_align_ptr,pd_rx_begin,bsz);
pd_rx_begin =3D pd_rx_received_end =3D (void *)new_align_ptr;
pd_rx_end =3D (void *)(new_align_ptr + bsz);
}
#ifndef DO_NOT_AVOID_MISALIGNMENT
fetch(size-bsz);
#else
fetch();
#endif
return receive(size,exactly,align);
}
else {
// A partial result is acceptable: hand out what is buffered.
size =3D bsz;
}
}
// Mark the handed-out region; giveback_received() consumes it later.
pd_rx_received_end =3D (void *)((omni::ptr_arith_t)pd_rx_begin + =
size);
Strand::sbuf result;
result.buffer =3D pd_rx_begin;
result.size =3D size;
return result;
}
void=20
tcpSocketStrand::giveback_received(size_t leftover)=20
{
size_t total =3D (omni::ptr_arith_t)pd_rx_received_end -
(omni::ptr_arith_t)pd_rx_begin;
if (total < leftover) {
throw CORBA::MARSHAL(0,CORBA::COMPLETED_MAYBE);
}
total -=3D leftover;
pd_rx_begin =3D (void *)((omni::ptr_arith_t)pd_rx_begin + total);
pd_rx_received_end =3D pd_rx_begin;
return;
}
// Largest size receive() accepts for an exact request: the buffer size
// less the omni::max_alignment bytes kept as alignment slack.
size_t=20
tcpSocketStrand::max_receive_buffer_size()=20
{
return tcpSocketStrand::buffer_size - (int)omni::max_alignment;
}
// Fills the caller's buffer <b> with exactly b.size bytes. Data already
// in the internal receive buffer is copied first; the remainder is read
// from the socket straight into the caller's buffer.
// Throws CORBA::COMM_FAILURE, after marking the strand as dying, when
// recv fails for a reason other than an interrupt or returns 0.
void
tcpSocketStrand::receive_and_copy(Strand::sbuf b)
{
giveback_received(0);
size_t sz =3D b.size;
char *p =3D (char *)b.buffer;
// bsz: bytes currently held in the internal receive buffer
size_t bsz =3D ((omni::ptr_arith_t)pd_rx_end -=20
(omni::ptr_arith_t)pd_rx_begin);
if (bsz) {
if (bsz > sz) {
bsz =3D sz;
}
memcpy((void *)p,pd_rx_begin,bsz);
pd_rx_begin =3D (void *)((omni::ptr_arith_t) pd_rx_begin + bsz);
pd_rx_received_end =3D pd_rx_begin;
sz -=3D bsz;
p +=3D bsz;
}
// Read whatever is still missing directly from the socket.
while (sz) {
int rx;
#ifdef TRACE_RECV
if (omniORB::traceLevel>=3D10) {
cerr << "tcpSocketStrand::receive_and_copy--- recv " << pd_socket =
<< endl;
}
#endif
rx =3D pd_socket->recv(p, sz);
if (rx =3D=3D baseSocket::SocketError) {
// An interrupted call is simply retried.
if(pd_socket->getErrNo() =3D=3D baseSocket::eINTERRUPTED)
continue;
else
{
setStrandIsDying();
throw =
CORBA::COMM_FAILURE(pd_socket->getErrNo(),CORBA::COMPLETED_MAYBE);
}
}
else
if (rx =3D=3D 0) {
// A zero-byte read is treated as a failed connection.
setStrandIsDying();
throw CORBA::COMM_FAILURE(0,CORBA::COMPLETED_MAYBE);
}
#ifdef TRACE_RECV
if (omniORB::traceLevel >=3D 10) {
cerr << "tcpSocketStrand::receive_and_copy-- recv " << pd_socket =
<< " "
<< rx << " bytes" << endl;
}
#endif
sz -=3D rx;
p +=3D rx;
}
}
// Discards the next <size> bytes of incoming data by receiving them in
// pieces of at most max_receive_buffer_size() bytes and ignoring the
// contents.
void
tcpSocketStrand::skip(size_t size)
{
giveback_received(0);
while (size) {
size_t sz;
sz =3D max_receive_buffer_size();
if (sz > size) {
sz =3D size;
}
// Ask for the alignment the buffer already has so that receive()
// does not need to move any data.
int current_alignment =3D (omni::ptr_arith_t) pd_rx_begin &
((int)omni::max_alignment - 1);
if (current_alignment =3D=3D 0) {
current_alignment =3D (int) omni::max_alignment;
}
// Non-exact receive: it may return fewer than sz bytes.
Strand::sbuf sb =3D receive(sz,0,current_alignment);
size -=3D sb.size;
}
return;
}
// Reads from the socket into the free space after pd_rx_end and
// advances pd_rx_end by the number of bytes read. One successful recv
// is performed; an interrupted recv is retried.
//   max - upper bound on the bytes requested; 0 means as much as fits
// Returns without reading when the buffer has no free space left.
// Throws CORBA::COMM_FAILURE, after marking the strand as dying, when
// recv fails for any other reason or returns 0.
void
tcpSocketStrand::fetch(CORBA::ULong max)
{
// bsz: free bytes between pd_rx_end and the end of the buffer
size_t bsz =3D tcpSocketStrand::buffer_size -
((omni::ptr_arith_t) pd_rx_end - (omni::ptr_arith_t) pd_rx_buffer);
bsz =3D (max !=3D 0 && bsz > max) ? max : bsz;
if (!bsz) return;
int rx;
again:
#ifdef TRACE_RECV
if (omniORB::traceLevel >=3D 10) {
cerr << "tcpSocketStrand::fetch--- recv " << pd_socket << endl;
}
#endif
rx =3D pd_socket->recv((char *)pd_rx_end, bsz);
if (rx =3D=3D baseSocket::SocketError) {
if (pd_socket->getErrNo() =3D=3D baseSocket::eINTERRUPTED)
goto again;
else=20
{
setStrandIsDying();
throw =
CORBA::COMM_FAILURE(pd_socket->getErrNo(),CORBA::COMPLETED_MAYBE);
}
}
else=20
if (rx =3D=3D 0) {
// A zero-byte read is treated as a failed connection.
setStrandIsDying();
throw CORBA::COMM_FAILURE(0,CORBA::COMPLETED_MAYBE);
}
#ifdef TRACE_RECV
if (omniORB::traceLevel >=3D 10) {
cerr << "tcpSocketStrand::fetch-- recv " << pd_socket << " "
<< rx << " bytes" << endl;
}
#endif
pd_rx_end =3D (void *)((omni::ptr_arith_t) pd_rx_end + rx);
return;
}
// Reserves space in the transmit buffer for the caller to marshal into.
//   size    - number of bytes wanted
//   exactly - if true the result holds exactly <size> bytes, flushing
//             the buffer first when needed; otherwise it may hold fewer
//   align   - alignment wanted for the start of the space, expressed as
//             the address modulo omni::max_alignment, where a multiple
//             of max_alignment is written as max_alignment itself
//   tx      - passed to giveback_reserved() for the previous reservation
// Returns an sbuf starting at pd_tx_end.
// Throws CORBA::INTERNAL if <align> exceeds omni::max_alignment or if an
// exact request is larger than max_reserve_buffer_size(). Exceptions
// raised by transmit() propagate.
Strand::sbuf=20
tcpSocketStrand::reserve(size_t size,
CORBA::Boolean exactly,
int align,
CORBA::Boolean tx)=20
{
giveback_reserved(0,tx);
=20
// bsz: free bytes between pd_tx_end and the end of the buffer
size_t bsz =3D tcpSocketStrand::buffer_size -
((omni::ptr_arith_t) pd_tx_end - (omni::ptr_arith_t) pd_tx_buffer);
=20
if (!bsz) {
// No space left, transmit and try again
transmit();
return reserve(size,exactly,align,tx);
}
if (align > (int)omni::max_alignment) {
throw CORBA::INTERNAL(0,CORBA::COMPLETED_MAYBE);
}
int current_alignment =3D (omni::ptr_arith_t) pd_tx_end &=20
((int)omni::max_alignment - 1);
if (current_alignment =3D=3D 0) {
current_alignment =3D (int)omni::max_alignment;
}
if (current_alignment !=3D align) {
// alignment is not right
if (pd_tx_end =3D=3D pd_tx_begin) {
// There is nothing in the buffer, we could adjust
// pd_tx_begin and pd_tx_end to the required alignment
omni::ptr_arith_t new_align_ptr;
new_align_ptr =3D omni::align_to((omni::ptr_arith_t) pd_tx_buffer,
omni::max_alignment) + align;
if (new_align_ptr >=3D ((omni::ptr_arith_t)pd_tx_buffer +=20
(int)omni::max_alignment)) {
new_align_ptr -=3D (int) omni::max_alignment;
}
pd_tx_begin =3D pd_tx_end =3D pd_tx_reserved_end =3D (void =
*)new_align_ptr;
bsz =3D tcpSocketStrand::buffer_size - ((omni::ptr_arith_t) =
pd_tx_end=20
- (omni::ptr_arith_t) pd_tx_buffer);
}
else {
// transmit what is left and try again
transmit();
return reserve(size,exactly,align,tx);
}
}
if (bsz < size) {
if (exactly) {
if (size > max_reserve_buffer_size()) {
throw CORBA::INTERNAL(0,CORBA::COMPLETED_MAYBE);
}
// Not enough space to satisfy the request, transmit what is
// left and try again
transmit();
return reserve(size,exactly,align,tx);
}
else {
// A partial reservation is acceptable: hand out what is free.
size =3D bsz;
}
}
// Mark the reserved region; giveback_reserved() commits it later.
pd_tx_reserved_end =3D (void *)((omni::ptr_arith_t)pd_tx_end + size);
Strand::sbuf result;
result.buffer =3D pd_tx_end;
result.size =3D size;
return result;
}
// Commits the space handed out by the last reserve(), except for its
// final <leftover> bytes, which are returned unused. When <tx> is true
// the transmit buffer is flushed afterwards.
// Throws CORBA::MARSHAL if <leftover> is larger than the reservation.
void
tcpSocketStrand::giveback_reserved(size_t leftover,
                                   CORBA::Boolean tx)
{
  size_t reserved =3D (omni::ptr_arith_t)pd_tx_reserved_end -
                    (omni::ptr_arith_t)pd_tx_end;
  if (leftover > reserved) {
    throw CORBA::MARSHAL(0,CORBA::COMPLETED_MAYBE);
  }

  size_t committed =3D reserved - leftover;
  pd_tx_end =3D (void *)((omni::ptr_arith_t)pd_tx_end + committed);
  pd_tx_reserved_end =3D pd_tx_end;

  if (tx) {
    transmit();
  }
}
// Sends the caller's buffer <b> in full. Pending data in the transmit
// buffer is flushed first, then <b> is sent directly from the caller's
// memory without being copied into the internal buffer.
// The <transmit> parameter is not read in this body.
// Throws CORBA::COMM_FAILURE, after marking the strand as dying, when
// send fails for a reason other than an interrupt or returns 0.
void=20
tcpSocketStrand::reserve_and_copy(Strand::sbuf b,
CORBA::Boolean transmit)
{
// transmit anything that is left in the transmit buffer
giveback_reserved(0,1);
=20
// Transmit directly from the supplied buffer
int tx;
size_t sz =3D b.size;
char *p =3D (char *)b.buffer;
while (sz) {
#ifdef TRACE_SEND
if (omniORB::traceLevel >=3D 10) {
cerr << "tcpSocketStrand::reserve_and_copy-- send " <<
pd_socket << " " << sz << " bytes" << endl;
}
#endif
tx =3D pd_socket->send(p, sz);
if (tx =3D=3D baseSocket::SocketError) {
// An interrupted call is simply retried.
if (pd_socket->getErrNo() =3D=3D baseSocket::eINTERRUPTED)
continue;
else=20
{
setStrandIsDying();
throw =
CORBA::COMM_FAILURE(pd_socket->getErrNo(),CORBA::COMPLETED_MAYBE);
}
}
else
if (tx =3D=3D 0) {
setStrandIsDying();
throw CORBA::COMM_FAILURE(0,CORBA::COMPLETED_MAYBE);
}
// A short send is continued from where it stopped.
sz -=3D tx;
p +=3D tx;
}
}
// Largest size reserve() accepts for an exact request: the buffer size
// less the omni::max_alignment bytes kept as alignment slack.
size_t
tcpSocketStrand::max_reserve_buffer_size()=20
{
return tcpSocketStrand::buffer_size - (int)omni::max_alignment;
}
// Sends everything between pd_tx_begin and pd_tx_end, then rewinds all
// transmit cursors to the start of the buffer.
// Throws CORBA::COMM_FAILURE, after marking the strand as dying, when
// send fails for a reason other than an interrupt or returns 0.
void
tcpSocketStrand::transmit()=20
{
size_t sz =3D (omni::ptr_arith_t)pd_tx_end -=20
(omni::ptr_arith_t)pd_tx_begin;
int tx;
char *p =3D (char *)pd_tx_begin;
while (sz) {
#ifdef TRACE_SEND
if (omniORB::traceLevel >=3D 10) {
cerr << "tcpSocketStrand::transmit-- send " <<
pd_socket << " " << sz << " bytes" << endl;
}
#endif
tx =3D pd_socket->send(p, sz);
if (tx =3D=3D baseSocket::SocketError) {
// An interrupted call is simply retried.
if (pd_socket->getErrNo() =3D=3D baseSocket::eINTERRUPTED)
continue;
else=20
{
setStrandIsDying();
throw =
CORBA::COMM_FAILURE(pd_socket->getErrNo(),CORBA::COMPLETED_MAYBE);
}
}
else
if (tx =3D=3D 0)=20
{
setStrandIsDying();
throw CORBA::COMM_FAILURE(0,CORBA::COMPLETED_MAYBE);
}
// A short send is continued from where it stopped.
sz -=3D tx;
p +=3D tx;
}
pd_tx_begin =3D pd_tx_end =3D pd_tx_reserved_end =3D pd_tx_buffer;
return;
}
// Marks the strand as dying and shuts the connection down.
// The socket object is deliberately kept alive: receive, fetch and
// transmit dereference pd_socket without a null check, so deleting it
// here would make any later or overlapping call on this strand use a
// dangling or null pointer. The destructor deletes the socket.
// Calling shutdown() more than once is harmless.
void
tcpSocketStrand::shutdown()
{
  setStrandIsDying();
  if (pd_socket) {
    pd_socket->shutdown();
  }
  return;
}
// Creates the listening socket for rope <r> and binds it to <me>. The
// socket's bind() also updates <me> with the port and host in use.
// Throws CORBA::COMM_FAILURE (COMPLETED_NO) if the bind fails; any
// exception raised by bind() itself propagates unchanged.
tcpSocketRendezvous::tcpSocketRendezvous(tcpSocketRope *r,
                                         tcpSocketEndpoint *me)
{
  pd_rope =3D r;
  pd_socket =3D baseSocket::getSocket();

  // The destructor does not run for an object whose constructor throws,
  // so on every failure path the socket is released here.
  CORBA::Boolean bound;
  try {
    bound =3D pd_socket->bind(me);
  }
  catch (...) {
    delete pd_socket;
    pd_socket =3D 0;
    throw;
  }
  if (!bound) {
    baseSocket::SocketErrors err =3D pd_socket->getErrNo();
    delete pd_socket;
    pd_socket =3D 0;
    throw CORBA::COMM_FAILURE(err,CORBA::COMPLETED_NO);
  }
  return;
}
// Adopts an existing socket <sock> as the listening socket of rope <r>.
// The destructor deletes the socket.
tcpSocketRendezvous::tcpSocketRendezvous(tcpSocketRope =
*r,tcpSocketHandle_t sock)=20
{
pd_socket =3D sock;
pd_rope =3D r;
return;
}
// Releases the listening socket object.
tcpSocketRendezvous::~tcpSocketRendezvous()
{
  // Deleting a null pointer is a no-op, so no guard is needed.
  delete pd_socket;
  pd_socket =3D 0;
}
// Accepts one incoming connection and wraps it in a new heap-allocated
// tcpSocketStrand attached to pd_rope. The strand is created and its
// reference count incremented while pd_rope->pd_lock is held.
// Returns the new strand.
// Throws CORBA::COMM_FAILURE (COMPLETED_NO) if the accept fails and
// CORBA::NO_MEMORY if the strand cannot be allocated. The lock is
// released exactly once on every path.
tcpSocketStrand *
tcpSocketRendezvous::accept()
{
  tcpSocketHandle_t new_sock =3D pd_socket->accept();
  if (new_sock =3D=3D 0)
  {
    throw CORBA::COMM_FAILURE(pd_socket->getErrNo(),CORBA::COMPLETED_NO);
  }

  tcpSocketStrand *ns =3D 0;
  pd_rope->pd_lock.lock();
  try
  {
    ns =3D new tcpSocketStrand(pd_rope,new_sock,1);
    if (!ns)
    {
      // The handler below releases the lock; unlocking here as well
      // would unlock the mutex twice.
      throw CORBA::NO_MEMORY(0,CORBA::COMPLETED_NO);
    }
    ns->incrRefCount(1);
  }
  catch (...)
  {
    pd_rope->pd_lock.unlock();
    // If no strand was created, nothing owns the accepted socket yet,
    // so it has to be released here.
    if (!ns)
    {
      delete new_sock;
    }
    throw;
  }
  pd_rope->pd_lock.unlock();
  return ns;
}
// Creates a rope for endpoint <e>, which must be a TCP/IP endpoint.
// Active rope (passive equals 0): keeps a copy of <e> as the remote
// endpoint.
// Passive rope: keeps a copy of <e> as the local endpoint, creates the
// listening rendezvous on it and copies the resulting host and port
// back into <e>.
// Throws CORBA::INTERNAL if <e> is not a TCP/IP endpoint; exceptions
// from creating the rendezvous propagate.
tcpSocketRope::tcpSocketRope(Anchor *a,
                             unsigned int maxStrands,
                             Endpoint *e,
                             CORBA::Boolean passive,
                             CORBA::Boolean heapAllocated)
  : Rope(a,maxStrands,heapAllocated)
{
  tcpSocketEndpoint *te =3D tcpSocketEndpoint::castup(e);
  if (!te) {
    throw CORBA::INTERNAL(0,CORBA::COMPLETED_NO);
  }
  pd_is_passive =3D passive;
  // getRendezvous() returns this member for any rope, so it must not be
  // left uninitialised when the rope is active.
  pd_rendezvous =3D 0;

  if (!passive) {
    pd_endpoint.remote =3D new tcpSocketEndpoint(te);
  }
  else {
    pd_endpoint.me =3D new tcpSocketEndpoint(te);
    try {
      pd_rendezvous =3D new tcpSocketRendezvous(this,pd_endpoint.me);
    }
    catch (...) {
      // The destructor does not run for a failed constructor, so the
      // endpoint copy has to be released here.
      delete pd_endpoint.me;
      pd_endpoint.me =3D 0;
      throw;
    }
    // Report the port and host actually in use back to the caller.
    *te =3D *pd_endpoint.me;
  }
  return;
}
// Releases whatever the constructor created for this kind of rope: the
// remote endpoint for an active rope, the rendezvous and the local
// endpoint for a passive one.
tcpSocketRope::~tcpSocketRope()
{
  if (!is_passive()) {
    if (pd_endpoint.remote) {
      delete pd_endpoint.remote;
      pd_endpoint.remote =3D 0;
    }
    return;
  }

  if (pd_rendezvous) {
    delete pd_rendezvous;
    pd_rendezvous =3D 0;
  }
  if (pd_endpoint.me) {
    delete pd_endpoint.me;
    pd_endpoint.me =3D 0;
  }
}
// Query or compare the remote endpoint of an active rope.
// A passive rope always answers 0.
// If <e> is null it is set to a new copy of the remote endpoint and 1
// is returned. Otherwise 1 is returned only when <e> is a TCP/IP
// endpoint with the same host and port as the remote endpoint.
CORBA::Boolean
tcpSocketRope::remote_is(Endpoint *&e)
{
  if (is_passive())
    return 0;

  if (!e) {
    e =3D new tcpSocketEndpoint(pd_endpoint.remote);
    if (!e)
      throw CORBA::NO_MEMORY(0,CORBA::COMPLETED_NO);
    return 1;
  }

  tcpSocketEndpoint *candidate =3D tcpSocketEndpoint::castup(e);
  if (!candidate)
    return 0;
  return (*candidate =3D=3D pd_endpoint.remote) ? 1 : 0;
}
// Query or compare the local endpoint of a passive rope.
// An active rope always answers 0.
// If <e> is null it is set to a new copy of the local endpoint and 1 is
// returned. Otherwise 1 is returned only when <e> is a TCP/IP endpoint
// with the same host and port as the local endpoint.
CORBA::Boolean
tcpSocketRope::this_is(Endpoint *&e)
{
  if (!is_passive())
    return 0;

  if (!e) {
    e =3D new tcpSocketEndpoint(pd_endpoint.me);
    if (!e)
      throw CORBA::NO_MEMORY(0,CORBA::COMPLETED_NO);
    return 1;
  }

  tcpSocketEndpoint *candidate =3D tcpSocketEndpoint::castup(e);
  if (!candidate)
    return 0;
  return (*candidate =3D=3D pd_endpoint.me) ? 1 : 0;
}
// Fills <p> with an IIOP tagged profile for the given object key.
// The profile carries the current IIOP version, the host and port of
// this rope's endpoint (the local one for a passive rope, the remote
// one otherwise) and a copy of the <objkeysize> bytes at <objkey>.
// NOTE(review): &b.object_key[0] is evaluated even when objkeysize is
// 0 - confirm that indexing an empty sequence is safe.
void
tcpSocketRope::iopProfile(const _CORBA_Octet *objkey,const size_t =
objkeysize,
IOP::TaggedProfile &p)
{
p.tag =3D IOP::TAG_INTERNET_IOP;
IIOP::ProfileBody b;
b.iiop_version.major =3D IIOP::current_major;
b.iiop_version.minor =3D IIOP::current_minor;
if (is_passive()) {
b.host =3D pd_endpoint.me->host();
b.port =3D pd_endpoint.me->port();
}
else {
b.host =3D pd_endpoint.remote->host();
b.port =3D pd_endpoint.remote->port();
}
b.object_key.length((CORBA::ULong)objkeysize);
memcpy((void *)&b.object_key[0],(void *)objkey,objkeysize);
// Encode the body into the profile's encapsulated data.
IIOP::profileToEncapStream(b,p.profile_data);
return;
}
// Creates a new heap-allocated strand connected to the remote endpoint.
// Throws CORBA::INTERNAL when called on a passive rope; exceptions from
// the strand constructor propagate.
Strand *
tcpSocketRope::newStrand()
{
if (is_passive()) {
throw CORBA::INTERNAL(0,CORBA::COMPLETED_NO);
}
return new tcpSocketStrand(this,pd_endpoint.remote,1);
}
// Failure value returned by baseSocket recv() and send().
const int baseSocket::SocketError =3D -1;
// Out-of-line definition of the virtual destructor; it has nothing to
// release.
baseSocket::~baseSocket()
{
}
------=_NextPart_000_01EF_01BCFF24.CDE73B00
Content-Type: application/octet-stream;
name="tcpSocket_NT.h"
Content-Transfer-Encoding: quoted-printable
Content-Disposition: attachment;
filename="tcpSocket_NT.h"
// -*- Mode: C++; -*-
// Package : omniORB2
// TCPSocket_NT.h Created on: 4/2/97
// Author : Eoin Carroll (ewc)
//
// Copyright (C) 1996, 1997 Olivetti & Oracle Research Laboratory
//
// This file is part of the omniORB library
//
// The omniORB library is free software; you can redistribute it =
and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the Free
// Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA =
=20
// 02111-1307, USA
//
//
// Description:
// Implementation of the Strand using TCP/IP and WinSock interface
//=09
/*
$Log: /Apogee/dev/OmniORB/lib/omniORB2/tcpSocket_NT.h $
*=20
* 2 11/28/97 3:12p Bnewport
* Now works with AGS for possible Mac port
*=20
* 1 11/26/97 2:51p Bnewport
* Revision 1.3 1997/05/06 15:29:54 sll
* Public release.
*
*/
#ifndef __TCPSOCKET_NT_H__
#define __TCPSOCKET_NT_H__
class tcpSocketRope;
class tcpSocketStrand;
class tcpSocketEndpoint;
#include "baseSocket.h"
typedef baseSocket *tcpSocketHandle_t;
class tcpSocketEndpoint : public Endpoint {
public:
tcpSocketEndpoint(CORBA::Char *h,CORBA::UShort p)=20
: Endpoint((CORBA::Char *)"TCPIP")=20
{
pd_host =3D 0;
pd_port =3D 0;
host(h);
port(p);
return;
}
tcpSocketEndpoint(const tcpSocketEndpoint *e)=20
: Endpoint((CORBA::Char *)"TCPIP")=20
{
pd_host =3D 0;
pd_port =3D 0;
host(e->host());
port(e->port());
return;
}
tcpSocketEndpoint &operator=3D(const tcpSocketEndpoint &e)=20
{
host(e.host());
port(e.port());
return *this;
}
=20
CORBA::Boolean operator=3D=3D(const tcpSocketEndpoint *e)
{
if ((strcmp((const char *)pd_host,
(const char *)e->host())=3D=3D0) && (pd_port =3D=3D e->port()))
return 1;
else
return 0;
}=09
virtual ~tcpSocketEndpoint() {
if (pd_host) delete [] pd_host;
return;
}
CORBA::Char * host() const { return pd_host; }
void host(const CORBA::Char *p) {
if (pd_host) delete [] pd_host;
if (p) {
pd_host =3D new CORBA::Char [strlen((char *)p) + 1];
strcpy((char *)pd_host,(char *)p);
}
else {
pd_host =3D new CORBA::Char [1];
pd_host[0] =3D '\0';
}
return;
}
CORBA::UShort port() const { return pd_port; }
void port(const CORBA::UShort p) { pd_port =3D p; }
=20
static tcpSocketEndpoint *castup(Endpoint *e) {
if (e->is_protocol((CORBA::Char *)"TCPIP")) {
return (tcpSocketEndpoint *)e;
}
else {
return 0;
}
}
=20
private:
CORBA::Char *pd_host;
CORBA::UShort pd_port;
=20
tcpSocketEndpoint();
};
// The listening side of a passive rope: owns the listening socket and
// turns incoming connections into strands of its rope.
class tcpSocketRendezvous {
public:
// Creates a listening socket bound to <me>.
tcpSocketRendezvous(tcpSocketRope *r,tcpSocketEndpoint *me);
// Adopts an existing listening socket.
tcpSocketRendezvous(tcpSocketRope *r,tcpSocketHandle_t sock);
virtual ~tcpSocketRendezvous();
// Accepts one connection and returns the strand created for it.
tcpSocketStrand * accept();
// NOTE(review): not defined or used in the sources shown here.
static CORBA::Boolean has_spawned_rendevous_threads;
private:
tcpSocketRope *pd_rope;
// Listening socket; deleted by the destructor.
tcpSocketHandle_t pd_socket;
};
// A Strand carried over one TCP connection. It owns the socket object
// and one transmit and one receive buffer of buffer_size bytes each.
class tcpSocketStrand : public Strand {
public:
// Size in bytes of each of the two buffers.
static const unsigned int buffer_size;
// Creates a strand by connecting to <remote>.
tcpSocketStrand(tcpSocketRope *r,
tcpSocketEndpoint *remote,
_CORBA_Boolean heapAllocated =3D 0);
// Concurrency Control:
// MUTEX =3D r->pd_lock
// Pre-condition:
// Must hold <MUTEX> on entry
// Post-condition:
// Still hold <MUTEX> on exit, even if an exception is raised
// Creates a strand around an already connected socket.
tcpSocketStrand(tcpSocketRope *r,
tcpSocketHandle_t sock,
_CORBA_Boolean heapAllocated =3D 0);
// Concurrency Control:
// MUTEX =3D r->pd_lock
// Pre-condition:
// Must hold <MUTEX> on entry
// Post-condition:
// Still hold <MUTEX> on exit, even if an exception is raised
virtual ~tcpSocketStrand();
// MUTEX:
// pd_rope->pd_lock
// Pre-condition:
// Must hold <MUTEX> on entry
// Post-condition:
// Still hold <MUTEX> on exit
virtual size_t MaxMTU() const;
// Receive side: receive() hands out a view of buffered data and
// giveback_received() consumes it; receive_and_copy() fills a caller
// buffer; skip() discards incoming bytes.
virtual Strand::sbuf receive(size_t size,
CORBA::Boolean exactly,
int align);
virtual void giveback_received(size_t leftover);
virtual size_t max_receive_buffer_size();
virtual void receive_and_copy(Strand::sbuf b);
virtual void skip(size_t size);
// Transmit side: reserve() hands out space to marshal into and
// giveback_reserved() commits it; reserve_and_copy() sends a caller
// buffer directly.
virtual Strand::sbuf reserve(size_t size,
CORBA::Boolean exactly,
int align,
CORBA::Boolean transmit=3D0);
virtual void giveback_reserved(size_t leftover,
CORBA::Boolean transmit=3D0);
virtual size_t max_reserve_buffer_size();
virtual void reserve_and_copy(Strand::sbuf b,
CORBA::Boolean transmit=3D0);
virtual void shutdown();
private:
// Sends the pending contents of the transmit buffer.
void transmit();
void fetch(CORBA::ULong max=3D0);
// fetch data from the network to the internal buffer.
// If <max>=3D0, fetch as much as possible, otherwise fetch at most =
<max>
// bytes.
tcpSocketHandle_t pd_socket;
// Transmit buffer: pending data lies between pd_tx_begin and
// pd_tx_end; the current reservation ends at pd_tx_reserved_end.
void *pd_tx_buffer;
void *pd_tx_begin;
void *pd_tx_end;
void *pd_tx_reserved_end;
// Receive buffer: unread data lies between pd_rx_begin and pd_rx_end;
// the region last handed out ends at pd_rx_received_end.
void *pd_rx_buffer;
void *pd_rx_begin;
void *pd_rx_end;
void *pd_rx_received_end;
=20
};
// A Rope whose strands are TCP connections. An active rope remembers
// the remote endpoint it connects to; a passive rope owns a listening
// rendezvous and remembers its own local endpoint.
class tcpSocketRope : public Rope {
public:
tcpSocketRope(Anchor *a,
unsigned int maxStrands,
Endpoint *e,=20
_CORBA_Boolean passive =3D 0,
_CORBA_Boolean heapAllocated =3D 0);
// Create a tcpSocket Rope.
// If passive =3D=3D 1,
// create a passive socket. e->port() specifies the port number to
// bind to (or 0 if an arbitrary port number can be assigned).
// e->host() can either be the host's fully qualified domain name
// (FQDN) or a 0 length string. If it is the latter, the constructor
// will initialise the host field with the host's IP address.
// This is a dot separated numeric string of the form
// "xxx.xxx.xxx.xxx".
// If passive =3D=3D 0,
// The endpoint <e> is the remote endpoint and should contain the
// host name in FQDN form or as a dot separated numeric string.
// Concurrency Control:
// MUTEX =3D a->pd_lock
// Pre-condition:
// Must hold <MUTEX> on entry
// Post-condition:
// Still hold <MUTEX> on exit, even if an exception is raised
virtual ~tcpSocketRope();
// Concurrency Control:
// MUTEX =3D pd_anchor->pd_lock
// Pre-condition:
// Must hold <MUTEX> on entry
// Post-condition:
// Still hold <MUTEX> on exit
// Compare <e> with, or copy out, the remote endpoint (active ropes).
virtual CORBA::Boolean remote_is(Endpoint *&e);
=20
// Compare <e> with, or copy out, the local endpoint (passive ropes).
virtual CORBA::Boolean this_is(Endpoint *&e);
=20
// Fill <p> with an IIOP profile for the given object key.
virtual void iopProfile(const _CORBA_Octet *objkey,const size_t =
objkeysize,
IOP::TaggedProfile &p);
// Create a new strand connected to the remote endpoint.
virtual Strand *newStrand();
CORBA::Boolean is_passive() { return pd_is_passive; }
// NOTE(review): pd_rendezvous is only assigned for passive ropes in
// the constructor shown in tcpSocket_NT.cc - confirm that callers
// never use this on an active rope.
tcpSocketRendezvous * getRendezvous() { return pd_rendezvous; }
friend class tcpSocketRendezvous;
private:
CORBA::Boolean pd_is_passive;
// Only one member is meaningful: remote for an active rope, me for a
// passive one.
union {
tcpSocketEndpoint *remote;
tcpSocketEndpoint *me;
} pd_endpoint;
tcpSocketRendezvous * pd_rendezvous;
};
#endif // __TCPSOCKET_NT_H__
------=_NextPart_000_01EF_01BCFF24.CDE73B00--