[omniORB] SMP bug with omniORB (Sun C++ 5.0 or Forte C++ 6.0)
Sai-Lai Lo
S.Lo@uk.research.att.com
11 Sep 2000 13:24:10 +0100
Arnault,
I think you have found a workaround to the problem.
I've repeated the consrv-conclt test with set_terminate() called
explicitly. The test programs no longer SEGV.
I'm testing with Forte Workshop 6.0 with Solaris 2.7.
In fact, consrv rarely SEGVs if I kill conclt. However, if one kills consrv
while conclt is running, conclt SEGVs every time. With set_terminate()
called explicitly, the program behaves correctly.
Actually, you only have to call set_terminate() once, instead of once per
thread as was done in your patch. The original set_terminate() call in the code is
a workaround for a problem with Sun C++ 4.2.
I suggest you put the set_terminate() call in CORBA::ORB_init().
This will work:
#if defined(__sunos__) && defined(__sparc__) && __OSVERSION__ >= 5
#if defined(__SUNPRO_CC) && __SUNPRO_CC >= 0x500
#include <exception.h>
#include <stdlib.h>
static void omni_abort()
{
abort();
}
#endif
#endif
void ORB_init(...
...
#if defined(__sunos__) && defined(__sparc__) && __OSVERSION__ >= 5
#if defined(__SUNPRO_CC) && __SUNPRO_CC >= 0x500
set_terminate(omni_abort);
#endif
#endif
...
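For comparison, here is the same "install it once, from initialisation code" advice restated in standard C++11. This is not the omniORB code. Sun C++ 5.0 used the pre-standard <exception.h>, whereas this sketch assumes a C++11 compiler. The names omni_abort_sketch() and install_terminate_handler_once() are hypothetical. From C++11 on, the terminate handler is shared by all threads, and a function-local static initializer runs exactly once, so no per-thread call is needed.

```cpp
#include <cstdlib>
#include <exception>

// Hypothetical stand-in for omni_abort(): abort immediately instead of
// relying on whatever terminate handler the runtime installed by default.
static void omni_abort_sketch() {
    std::abort();
}

// Intended to be called from an ORB_init()-like entry point. The static
// initializer below runs exactly once, even if several threads call this
// function at the same time (guaranteed since C++11).
void install_terminate_handler_once() {
    static const bool installed =
        (std::set_terminate(omni_abort_sketch), true);
    (void)installed;
}
```

A second call is a no-op, because the handler has already been installed.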
Thanks.
Sai-Lai
>>>>> Arnault Bonafos writes:
> Hello,
> I was working on a test case from Sai Lai Lo to reproduce a bug dealing
> with omniORB threads and an SMP machine.
> The steps to reproduce the bug are as follows:
> - compile a echo server object (I took eg2_impl from the examples in
> omniORB)
> - port the SLL test so that it compiles without its original test
> environment (file TestClt.cpp, attached)
> - start the server eg2_impl
> - start the client TestClt and kill it while it's running
> The eg2_impl server should then crash with a segmentation fault from time
> to time (everything is running on our SMP machine).
> So I was able to reproduce the bug that SLL described on an SMP machine
> with all the required patches. The test programs (and omniORB) were
> compiled with Sun C++ 5.0 on a machine with all the required patches
> applied, which is bad news.
> As the bug is somewhat difficult to reproduce, I've added abort(); at
> the end of TestClt.cpp, because that's the best way to trigger it (with
> that call the server is more likely to crash).
> So I tried to understand why this happens.
> I first looked at the call stack to see where the server crashes. The
> stack shows that the crash happens while throwing a COMM_FAILURE
> exception in tcpSocketStrand::ll_recv in tcpSocketMTfactory.cc (the last
> throw).
> Some tests showed that this exception should be caught by the
> following code (in tcpSocketMTfactory.cc):
>> try {
>> GIOP_S::dispatcher(s);
>> }
>> catch (CORBA::COMM_FAILURE &ex) {
>> if (omniORB::traceLevel >= 5) {
>> omniORB::log << "#### Communication failure. Connection closed.\n";
>> omniORB::log.flush();
>> }
>> break;
>> }
> Raising the trace level never showed the above message ("Communication
> failure"), which means that the segmentation fault occurred before this
> code was executed.
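The quoted catch block relies on a COMM_FAILURE raised deep inside ll_recv() unwinding back to the worker loop and ending it normally. The sketch below models only that control flow. It uses hypothetical stand-ins: CommFailure plays CORBA::COMM_FAILURE, and dispatch_one() plays GIOP_S::dispatcher(). It is an illustration, not omniORB code. In standard C++, any exception that escapes a thread's function without being caught makes the runtime call terminate(), which is the path the set_terminate() handler controls.

```cpp
#include <stdexcept>

// Hypothetical stand-in for CORBA::COMM_FAILURE.
struct CommFailure : std::runtime_error {
    CommFailure() : std::runtime_error("COMM_FAILURE") {}
};

// Hypothetical stand-in for GIOP_S::dispatcher(): serves `remaining`
// requests, then behaves as if the peer closed the connection.
static void dispatch_one(int& remaining) {
    if (remaining-- == 0) throw CommFailure();
}

// Same shape as the loop in tcpSocketWorker::_realRun: a connection
// failure is expected and simply ends the loop. Returns requests served.
int serve_connection(int requests_before_close) {
    int served = 0;
    for (;;) {
        try {
            dispatch_one(requests_before_close);
            ++served;
        } catch (const CommFailure&) {
            break;  // "Communication failure. Connection closed."
        }
    }
    return served;
}
```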
> This code comes from tcpSocketWorker::_realRun, and at the beginning of
> the function there is the following code:
> #if defined(__sunos__) && defined(__sparc__) && __OSVERSION__ >= 5
> #if defined(__SUNPRO_CC) && __SUNPRO_CC <= 0x420
> set_terminate(omni_abort);
> #endif
> #endif
> which means this code is not compiled for the 5.0 compiler (this is a
> backport I did from omniORB 3.0).
> Then I looked for something equivalent in the platform header files and
> found a matching declaration for exception handling in exception.h. The
> code became:
> #if defined(__sunos__) && defined(__sparc__) && __OSVERSION__ >= 5
> set_terminate(omni_abort);
> #endif
> After this modification I was no longer able to reproduce the bug as
> described above.
> The complete diff of tcpSocketMTfactory.cc against my local tree is as
> follows:
> ===================================================================
> RCS file:
> /opt/cvsroot/External/omniORB_2.6.1_http/src/lib/omniORB2/tcpSocketMTfactory.cc,v
> retrieving revision 1.24
> diff -r1.24 tcpSocketMTfactory.cc
> --> This part renames the function abort() so that it does not clash
> with the C library definition.
> 1044c1044
> < extern "C" void abort()
> ---
>> void omni_abort()
> --> This part replaces the hand-written set_terminate declaration with
> the one from the platform's standard library header.
> 1052,1053c1052
> < typedef void (*PFV)();
> < extern PFV set_terminate(PFV);
> ---
>> #include <exception.h>
> 1063c1062
> < set_terminate(abort);
> ---
>> set_terminate(omni_abort);
> 1268c1267
> < set_terminate(abort);
> ---
>> set_terminate(omni_abort);
> The main difference is that the call to set_terminate is compiled into
> the library using the platform declaration in exception.h.
> Then I ran a test (test_client, attached). When the library is compiled
> without the fix, this test makes the echo server crash after a certain
> number of iterations (fewer than 50 on my test machine). With the new
> library, the same test ran for several hours without crashing the echo
> server, until I stopped it at iteration 8661.
> The current settings of the test_client shell script (one client
> program, restarting the server each time) make the echo server crash
> more often (i.e. sooner).
> I don't have a clear explanation of why calling set_terminate should
> make a difference, but this seems to be a reasonable fix.
> I hope someone will repeat these tests to confirm or refute my
> results.
> Note:
> - I'm working with a modified version of omniORB_2.6.1 which includes
> HTTP tunneling.
> - the test machine is a 4-processor SPARC running Solaris 2.7, with
> patches 106327-08 and 106748-04 applied (see the Sun documentation).
> - the compiler machine is a single-processor SPARC running Solaris 2.7,
> with the two patches above applied plus
> 107295-01, 107311-11, 107354-02, 107355-06, 107357-09, 107740-01,
> 107742-01
> --
> Arnault Bonafos Software Engineer
> Tumbleweed Communications Corp.
> ph: (650) 216-2027 fx: (650) 216-2003
> // Testing code: client of echo objects
> //
> // interface Echo {
> // string echoString(in string mesg);
> // };
> //
> #include <stdlib.h>
> #include <string.h>
> #include <iostream.h>
> #include "echo.hh"
> omni_mutex cerr_sync;
> static
> void
> contact(char* id, Echo_ptr e)
> {
> int loopcount = 200;
> while (loopcount--) {
> try {
> char * echostr;
> echostr = e->echoString((char *)"abcde");
> {
> omni_mutex_lock s(cerr_sync);
> cerr << id << ": reply " << echostr << endl;
> }
> if (strcmp((const char *)echostr,"abcde")) {
> cerr << loopcount << " : echo string differs ('" << "abcde', '"
> << (char *)echostr << "')" << endl;
> //>>>todo: OMNI_FAILED("echo string differs");
> }
> CORBA::string_free(echostr);
> }
> catch (...) {
> cerr << "Caught system exception. Abort" << endl;
> //>>>todo: OMNI_FAILED("Caught system exception. Abort");
> }
> }
> }
> class worker : public omni_thread {
> public:
> worker(char* id,Echo_ptr e) : omni_thread(id) {
> pd_e = e;
> start_undetached();
> return;
> }
> virtual void* run_undetached(void*id) {
> contact((char*)id,pd_e);
> return 0;
> };
> virtual ~worker() {}
> private:
> Echo_var pd_e;
> };
> void usage() {
> cout << "Usage: TestClt <IOR>" << endl;
> exit(-1);
> }
> Echo_ptr CreateEchoClient(int argc, char *argv[])
> {
> CORBA::ORB_ptr orb = CORBA::ORB_init(argc,argv,"omniORB2");
> CORBA::BOA_ptr boa = orb->BOA_init(argc,argv,"omniORB2_BOA");
> Echo_ptr value = Echo::_nil();
> if (argc != 2) {
> usage();
> }
> try {
> CORBA::Object_var obj = orb->string_to_object(argv[1]);
> value = Echo::_narrow(obj);
> }
> catch(CORBA::COMM_FAILURE& ex) {
> cerr << "Caught system exception COMM_FAILURE, unable to contact the "
> << "object." << endl;
> }
> catch(omniORB::fatalException& ex) {
> cerr << "Caught omniORB2 fatalException. This indicates a bug is caught "
> << "within omniORB2.\nPlease send a bug report.\n"
> << "The exception was thrown in file: " << ex.file() << "\n"
> << " line: " << ex.line() << "\n"
> << "The error message is: " << ex.errmsg() << endl;
> }
> catch(...) {
> cerr << "Caught a system exception." << endl;
> }
> return value;
> }
> int main(int argc, char** argv)
> {
> // this should be a template function to create and init a client CORBA object
> //>>>todo: OMNI_SIMPLE_CLIENT_INIT(Echo, e);
> Echo_ptr e = CreateEchoClient(argc, argv);
> if (Echo_Helper::is_nil(e)) {
> cerr << "Client could not be created" << endl;
> exit(-1);
> }
> worker* worker1 = new worker("worker 1:", Echo::_duplicate(e));
> worker* worker2 = new worker("worker 2:", Echo::_duplicate(e));
> worker* worker3 = new worker("worker 3:", Echo::_duplicate(e));
> worker* worker4 = new worker("worker 4:", Echo::_duplicate(e));
> worker* worker5 = new worker("worker 5:", Echo::_duplicate(e));
> worker* worker6 = new worker("worker 6:", Echo::_duplicate(e));
> worker* worker7 = new worker("worker 7:", Echo::_duplicate(e));
> worker* worker8 = new worker("worker 8:", Echo::_duplicate(e));
> worker* worker9 = new worker("worker 9:", Echo::_duplicate(e));
> worker* worker10 = new worker("worker 10:", Echo::_duplicate(e));
> contact("main",e);
> worker1->join(0);
> worker2->join(0);
> worker3->join(0);
> worker4->join(0);
> worker5->join(0);
> worker6->join(0);
> worker7->join(0);
> worker8->join(0);
> worker9->join(0);
> worker10->join(0);
> cout << "test completed" << endl;
> abort();
> //>>>todo: test_complete();
> return 1;
> }
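For readers without omniORB at hand, the structure of TestClt can be sketched in standard C++11. Several threads plus main() each make a fixed number of echo calls and serialise their logging through one mutex (the role of cerr_sync), and main() then joins them all. The names echo_string(), run_stress() and the failure counting are hypothetical. The real client makes CORBA calls on a duplicated Echo reference in each worker and finishes with abort() to provoke the server crash.

```cpp
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for Echo::echoString(); TestClt makes a CORBA call here.
static std::string echo_string(const std::string& msg) { return msg; }

static std::mutex log_mutex;  // plays the role of cerr_sync

// Same shape as contact(): call, check the reply, log under the mutex.
// Returns the number of mismatches and exceptions seen.
static int contact(const std::string& id, int loopcount) {
    int failures = 0;
    while (loopcount--) {
        try {
            std::string reply = echo_string("abcde");
            if (reply != "abcde") ++failures;
            std::lock_guard<std::mutex> lock(log_mutex);
            std::clog << id << ": reply " << reply << '\n';
        } catch (...) {
            ++failures;
        }
    }
    return failures;
}

// Start `nthreads` workers, run contact() on the calling thread as well
// (like main() in TestClt), then join every worker and sum the failures.
int run_stress(int nthreads, int loopcount) {
    std::vector<int> results(nthreads, 0);
    std::vector<std::thread> workers;
    for (int i = 0; i < nthreads; ++i) {
        workers.emplace_back([&results, i, loopcount] {
            results[i] = contact("worker " + std::to_string(i + 1), loopcount);
        });
    }
    int failures = contact("main", loopcount);
    for (std::thread& t : workers) t.join();
    for (int r : results) failures += r;
    return failures;
}
```

Each worker writes only its own slot in `results`, so the counts need no extra locking.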
> // -*- Mode: C++; -*-
> // Package : omniORB2
> // tcpSocketMTfactory.cc Created on: 18/3/96
> // Author : Sai Lai Lo (sll)
> //
> // Copyright (C) 1996, 1997 Olivetti & Oracle Research Laboratory
> //
> // This file is part of the omniORB library
> //
> // The omniORB library is free software; you can redistribute it and/or
> // modify it under the terms of the GNU Library General Public
> // License as published by the Free Software Foundation; either
> // version 2 of the License, or (at your option) any later version.
> //
> // This library is distributed in the hope that it will be useful,
> // but WITHOUT ANY WARRANTY; without even the implied warranty of
> // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
> // Library General Public License for more details.
> //
> // You should have received a copy of the GNU Library General Public
> // License along with this library; if not, write to the Free
> // Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
> // 02111-1307, USA
> //
> //
> // Description:
> // Implementation of the Strand using TCP/IP and BSD socket interface
> //
> /*
> $Log: tcpSocketMTfactory.cc,v $
> Revision 1.24 2000/03/14 22:32:36 abonafos
> 5864, IP translating in tahiti
> Revision 1.23 1999/06/08 03:06:59 edumas
> Some code cleanup.
> Revision 1.22 1999/05/27 23:34:33 edumas
> Do not use proxy when SSL is activated.
> Revision 1.21 1999/05/11 00:30:43 rshoup
> NT compilation
> Revision 1.20 1999/04/29 22:03:13 edumas
> Add some traces.
> Revision 1.19 1999/04/07 17:44:20 edumas
> Encapsulate the global_iop_http object as a static member of IOP_HTTP.
> (Randy's comment).
> Revision 1.18 1999/04/01 03:20:34 edumas
> - Thread locking.
> - Memory management
> - Next set of Randy's comments.
> Revision 1.17 1999/03/26 01:34:54 edumas
> Randy's comments added.
> Revision 1.16 1999/03/25 00:08:26 edumas
> Update the changes done in IOP_HTTP_handshake.h.
> Revision 1.15 1999/03/22 20:43:21 edumas
> - Fix the chunk problem. We are now using the GIOP packet header
> and we reconstruct the "original" packet when it is supposed to
> be sent in multiple chunks. This way, it solves the answer/no
> answer.
> - Simplification of the code.
> - Cleanup.
> Revision 1.14 1999/03/18 02:14:25 edumas
> - Better socket error management
> - tmp fix.
> Revision 1.13 1999/03/16 02:56:46 edumas
> Fix the way we get the bound port of the thread. In the case
> the port is already used, jump to the next one.
> Revision 1.12 1999/03/12 23:03:01 edumas
> Use omniORB trace facilities.
> Revision 1.11 1999/03/10 00:49:48 edumas
> Handshaking is now done in a portable way.
> Revision 1.10 1999/02/22 19:52:52 edumas
> Handshake with the RemoteGateway proxy using different endian model fixed.
> Revision 1.9 1999/02/22 18:57:28 edumas
> No relaying is now working when "standard" omniORB.cfg is provided.
> Revision 1.8 1999/02/19 02:32:26 edumas
> - Comments update,
> - Modify the relaying policies,
> - The thread-proxy is now created *only* if we really need to
> use it, aka, only if relaying is necessary,
> Revision 1.7 1999/02/18 23:43:28 edumas
> - Start the proxy only if we need it.
> - Check if the thread-proxy has already been launched.
> Revision 1.6 1999/02/17 22:04:06 edumas
> NT compilation of libomniORB2 + IOP/HTTP.
> Revision 1.5 1999/02/16 19:14:46 edumas
> - Comments updates.
> Revision 1.4 1999/02/16 01:08:05 edumas
> *** empty log message ***
> Revision 1.3 1999/02/12 03:36:34 edumas
> IP mapping working integration. Better Corba type handling.
> Revision 1.2 1999/02/10 01:57:50 edumas
> Integration of the hook in order to perform IOP_HTTP relaying.
> Revision 1.1.1.1 1999/02/09 16:30:17 edumas
> omniORB 2.6.1 with IOP/HTTP
> Revision 1.13 1998/09/23 15:31:15 sll
> Previously, tcpSocketStrand::shutdown sends an incomplete GIOP
> CloseConnection message (the message length field is missing). Fixed.
> Revision 1.12 1998/09/23 08:48:34 sll
> Use config variable omniORB::maxTcpConnectionPerServer to determine the
> maximum number of outgoing per tcpSocketOutgoingRope.
> Revision 1.11 1998/08/14 13:54:38 sll
> Added pragma hdrstop to control pre-compile header if the compiler feature
> is available.
> Revision 1.10 1998/06/29 17:13:30 sll
> Fixed Solaris specific code in realConnect. Now switch the socket back
> to blocking mode after connect() under all circumstances.
> Revision 1.9 1998/04/22 16:39:50 sll
> Added try-catch loop to guard against exception raised by the thread library
> when it cannot create a new thread for tcpSocketWorker.
> Revision 1.8 1998/04/08 16:06:49 sll
> Added support for Reliant UNIX 5.43
> Revision 1.7 1998/04/07 19:39:40 sll
> Replace cerr with omniORB::log.
> Revision 1.6 1998/03/19 19:53:14 sll
> Delay connect to the remote address space until the first send or recv.
> Previously, connect was made inside the ctor of tcpSocketStrand.
> Revision 1.5 1998/03/04 14:44:36 sll
> Updated to use omniORB::giopServerThreadWrapper.
> // Revision 1.4 1998/01/20 17:32:38 sll
> // Added support for OpenVMS.
> //
> Revision 1.3 1997/12/18 17:27:39 sll
> Updated to work under glibc-2.0.
> Revision 1.2 1997/12/12 18:44:11 sll
> Added call to gateKeeper.
> Revision 1.1 1997/12/09 18:43:18 sll
> Initial revision
> */
> #include <omniORB2/CORBA.h>
> #ifdef HAS_pch
> #pragma hdrstop
> #endif
> #include <ropeFactory.h>
> #include <tcpSocket.h>
> #if defined(__WIN32__)
> #include <winsock.h>
> #include <sys/types.h>
> #define GETERRNO() WSAGetLastError()
> #define RC_INADDR_NONE INADDR_NONE
> #define RC_INVALID_SOCKET INVALID_SOCKET
> #define RC_SOCKET_ERROR SOCKET_ERROR
> #define INETSOCKET PF_INET
> #define CLOSESOCKET(sock) closesocket(sock)
> #define SHUTDOWNSOCKET(sock) ::shutdown(sock,2)
> #else
> #include <sys/time.h>
> #include <sys/socket.h>
> #include <netinet/in.h>
> #include <arpa/inet.h>
> #include <unistd.h>
> #if defined(__sunos__) && defined(__sparc__) && __OSVERSION__ >= 5
> #include <sys/types.h>
> #include <fcntl.h>
> #endif
> #define GETERRNO() errno
> #define RC_INADDR_NONE ((CORBA::ULong)-1)
> #define RC_INVALID_SOCKET (-1)
> #define RC_SOCKET_ERROR (-1)
> #define INETSOCKET AF_INET
> #define CLOSESOCKET(sock) close(sock)
> #if defined(__sunos__) && defined(__sparc__) && __OSVERSION__ >= 5
> #define SHUTDOWNSOCKET(sock) ::shutdown(sock,2)
> #elif defined(__osf1__) && defined(__alpha__)
> #define SHUTDOWNSOCKET(sock) ::shutdown(sock,2)
> #else
> // XXX none of the above, calling shutdown() may not have the
> // desired effect.
> #define SHUTDOWNSOCKET(sock) ::shutdown(sock,2)
> #endif
> #endif
> #include <sys/types.h>
> #include <errno.h>
> #include <stdio.h>
> #include <libcWrapper.h>
> #include <gatekeeper.h>
> #ifdef NEED_GETHOSTNAME_PROTOTYPE
> extern "C" int gethostname(char *name, int namelen);
> #endif
> #ifdef IOP_HTTP
> #include <IOP_HTTP/IOP_HTTP_handshake.h>
> #include "IOP_HTTP.h"
> #endif
> #ifdef IP_TRANSLATING
> #ifndef IP_TRANSLATOR_H
> #include "ip_translator.h"
> #endif
> #endif
> class tcpSocketRendezvouser : public omni_thread {
> public:
> tcpSocketRendezvouser(tcpSocketIncomingRope *r) : omni_thread(r) {
> start_undetached();
> }
> virtual ~tcpSocketRendezvouser() { }
> virtual void* run_undetached(void *arg);
> private:
> tcpSocketRendezvouser();
> };
> class tcpSocketWorker : public omni_thread {
> public:
> tcpSocketWorker(tcpSocketStrand* s) : omni_thread(s), pd_sync(s,0,0) {
> s->decrRefCount();
> start();
> }
> virtual ~tcpSocketWorker() { }
> virtual void run(void *arg);
> static void _realRun(void* arg);
> private:
> Strand::Sync pd_sync;
> };
> /////////////////////////////////////////////////////////////////////////////
> tcpSocketMTincomingFactory::tcpSocketMTincomingFactory() : pd_state(IDLE)
> {
> tcpSocketFactoryType::init();
> }
> CORBA::Boolean
> tcpSocketMTincomingFactory::isIncoming(Endpoint* addr) const
> {
> Rope_iterator next_rope(&pd_anchor);
> Rope* r;
> while ((r = next_rope()))
> {
> if (r->this_is(addr))
> return 1;
> }
> return 0;
> }
> void
> tcpSocketMTincomingFactory::instantiateIncoming(Endpoint* addr,
> CORBA::Boolean exportp)
> {
> tcpSocketEndpoint* te = tcpSocketEndpoint::castup(addr);
> if (!te)
> throw omniORB::fatalException(__FILE__,__LINE__,
> "wrong endpoint argument type");
> Rope_iterator rope_lock(&pd_anchor);
> if (pd_state == ZOMBIE) {
> throw omniORB::fatalException(__FILE__,__LINE__,
> "cannot instantiate incoming in ZOMBIE state");
> }
> tcpSocketIncomingRope* r = new tcpSocketIncomingRope(this,0,te,exportp);
> r->incrRefCount(1);
> if (pd_state == ACTIVE) {
> r->rendezvouser = new tcpSocketRendezvouser(r);
> }
> }
> void
> tcpSocketMTincomingFactory::startIncoming()
> {
> Rope_iterator next_rope(&pd_anchor);
> tcpSocketIncomingRope* r;
> switch (pd_state) {
> case IDLE:
> try {
> pd_state = ACTIVE;
> while ((r = (tcpSocketIncomingRope*)next_rope())) {
> if (r->pd_shutdown == tcpSocketIncomingRope::NO_THREAD) {
> r->pd_shutdown = tcpSocketIncomingRope::ACTIVE;
> r->rendezvouser = new tcpSocketRendezvouser(r);
> }
> }
> }
> catch(...) { }
> break;
> default:
> break;
> }
> }
> void
> tcpSocketMTincomingFactory::stopIncoming()
> {
> Rope_iterator next_rope(&pd_anchor);
> tcpSocketIncomingRope* r;
> switch (pd_state) {
> case ACTIVE:
> try {
> while ((r = (tcpSocketIncomingRope*)next_rope())) {
> r->cancelThreads();
> }
> pd_state = IDLE;
> }
> catch(...) {}
> break;
> default:
> break;
> }
> }
> void
> tcpSocketMTincomingFactory::removeIncoming()
> {
> Rope_iterator next_rope(&pd_anchor);
> tcpSocketIncomingRope* r;
> switch (pd_state) {
> case ACTIVE:
> case IDLE:
> try {
> while ((r = (tcpSocketIncomingRope*)next_rope())) {
> r->cancelThreads();
> if (r->pd_shutdown != tcpSocketIncomingRope::NO_THREAD) {
> // rendezvouser has not been shutdown properly
> continue;
> }
> if (r->pd_rendezvous != RC_INVALID_SOCKET) {
> CLOSESOCKET(r->pd_rendezvous);
> r->pd_rendezvous = RC_INVALID_SOCKET;
> r->decrRefCount(1);
> }
> }
> pd_state = ZOMBIE;
> }
> catch(...) {}
> break;
> default:
> break;
> }
> }
> Rope*
> tcpSocketMTincomingFactory::findIncoming(Endpoint* addr) const
> {
> tcpSocketEndpoint* te = tcpSocketEndpoint::castup(addr);
> if (!te) return 0;
> Rope_iterator next_rope(&pd_anchor);
> Rope* r;
> while ((r = next_rope()))
> {
> if (r->this_is(addr)) {
> r->incrRefCount(1);
> return r;
> }
> }
> return 0;
> }
> void
> tcpSocketMTincomingFactory::getIncomingIOPprofiles(const CORBA::Octet* objkey,
> const size_t objkeysize,
> IOP::TaggedProfileList& profilelist) const
> {
> Rope_iterator next_rope(&pd_anchor);
> tcpSocketIncomingRope* r;
> while ((r = (tcpSocketIncomingRope*) next_rope()) && r->pd_export) {
> CORBA::ULong index = profilelist.length();
> profilelist.length(index+1);
> tcpSocketFactoryType::singleton->encodeIOPprofile(r->me,objkey,objkeysize,
> profilelist[index]);
> }
> }
> tcpSocketIncomingRope::tcpSocketIncomingRope(tcpSocketMTincomingFactory* f,
> unsigned int maxStrands,
> tcpSocketEndpoint *e,
> CORBA::Boolean exportp)
> : Rope(f->anchor(),maxStrands,1), pd_export(exportp),
> pd_shutdown(NO_THREAD), rendezvouser(0)
> {
> struct sockaddr_in myaddr;
> // For the moment, we do not impose a restriction on the maximum
> // no. of strands that can be accepted. In other words, <maxStrands> is
> // ignored.
> if ((pd_rendezvous = socket(INETSOCKET,SOCK_STREAM,0)) == RC_INVALID_SOCKET) {
> throw CORBA::COMM_FAILURE(GETERRNO(),CORBA::COMPLETED_NO);
> }
> myaddr.sin_family = INETSOCKET;
> myaddr.sin_addr.s_addr = INADDR_ANY;
> myaddr.sin_port = htons(e->port());
> if (e->port()) {
> int valtrue = 1;
> if (setsockopt(pd_rendezvous,SOL_SOCKET,
> SO_REUSEADDR,(char*)&valtrue,sizeof(int)) == RC_SOCKET_ERROR)
> {
> CLOSESOCKET(pd_rendezvous);
> throw CORBA::COMM_FAILURE(GETERRNO(),CORBA::COMPLETED_NO);
> }
> }
> if (bind(pd_rendezvous,(struct sockaddr *)&myaddr,
> sizeof(struct sockaddr_in)) == RC_SOCKET_ERROR)
> {
> CLOSESOCKET(pd_rendezvous);
> throw CORBA::COMM_FAILURE(GETERRNO(),CORBA::COMPLETED_NO);
> }
> // Make it a passive socket
> if (listen(pd_rendezvous,5) == RC_SOCKET_ERROR) {
> CLOSESOCKET(pd_rendezvous);
> throw CORBA::COMM_FAILURE(GETERRNO(),CORBA::COMPLETED_NO);
> }
> {
> #if (defined(__GLIBC__) && __GLIBC__ >= 2)
> // GNU C library uses socklen_t * instead of int* in getsockname().
> // This is supposed to be compatible with the upcoming POSIX standard.
> socklen_t l;
> #elif defined(__aix__) || defined(__VMS) || defined(__SINIX__)
> size_t l;
> # else
> int l;
> # endif
> l = sizeof(struct sockaddr_in);
> if (getsockname(pd_rendezvous,
> (struct sockaddr *)&myaddr,&l) == RC_SOCKET_ERROR) {
> CLOSESOCKET(pd_rendezvous);
> throw CORBA::COMM_FAILURE(GETERRNO(),CORBA::COMPLETED_NO);
> }
> e->port(ntohs(myaddr.sin_port));
> if (e->host() == 0 || strlen((const char*)e->host()) == 0) {
> char self[64];
> if (gethostname(&self[0],64) == RC_SOCKET_ERROR) {
> throw omniORB::fatalException(__FILE__,__LINE__,
> "Cannot get the name of this host");
> }
> LibcWrapper::hostent_var h;
> int rc;
> if (LibcWrapper::gethostbyname(self,h,rc) < 0) {
> throw omniORB::fatalException(__FILE__,__LINE__,
> "Cannot get the address of this host");
> }
> memcpy((void *)&myaddr.sin_addr,
> (void *)h.hostent()->h_addr_list[0],
> sizeof(myaddr.sin_addr));
> char ipaddr[16];
> // To prevent purify from generating UMR warnings, use the following temp
> // variables to store the IP address fields.
> int ip1 = (int)((ntohl(myaddr.sin_addr.s_addr) & 0xff000000) >> 24);
> int ip2 = (int)((ntohl(myaddr.sin_addr.s_addr) & 0x00ff0000) >> 16);
> int ip3 = (int)((ntohl(myaddr.sin_addr.s_addr) & 0x0000ff00) >> 8);
> int ip4 = (int)(ntohl(myaddr.sin_addr.s_addr) & 0x000000ff);
> sprintf(ipaddr,"%d.%d.%d.%d",ip1,ip2,ip3,ip4);
> e->host((const CORBA::Char *) ipaddr);
> }
> else {
> // The caller has already specified the host name, we are not going to
> // override it here. However, it may be possible that the host name does
> // not resolve to one of the IP addresses that identified the network
> // interfaces of this machine. There is no way to guard against this
> // mistake.
> // Do nothing here.
> }
> }
> me = new tcpSocketEndpoint(e);
> }
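The rope constructor above follows a common pattern. It creates a socket, sets SO_REUSEADDR only when a fixed port is requested, then binds and listens. Finally it asks getsockname() which port was actually assigned (port 0 lets the kernel choose), and formats the host's IPv4 address octet by octet. Below is a condensed POSIX sketch of both steps. The helpers listen_on() and dotted_quad() are hypothetical, and errors are reported with a -1 return rather than COMM_FAILURE.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstdint>
#include <cstdio>
#include <string>

// Format an IPv4 address held in host byte order, as the constructor does
// before calling e->host().
std::string dotted_quad(uint32_t host_order) {
    char buf[16];  // "255.255.255.255" plus the terminating NUL
    std::snprintf(buf, sizeof buf, "%u.%u.%u.%u",
                  static_cast<unsigned>((host_order >> 24) & 0xff),
                  static_cast<unsigned>((host_order >> 16) & 0xff),
                  static_cast<unsigned>((host_order >> 8) & 0xff),
                  static_cast<unsigned>(host_order & 0xff));
    return buf;
}

// Create a listening TCP socket on `port` (0 = let the kernel choose) and
// store the port actually bound. Returns the socket, or -1 on error.
int listen_on(uint16_t port, uint16_t* bound_port) {
    int s = socket(AF_INET, SOCK_STREAM, 0);
    if (s < 0) return -1;
    if (port != 0) {
        int on = 1;  // only needed when re-binding a well-known port
        if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof on) < 0) {
            close(s);
            return -1;
        }
    }
    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(port);
    socklen_t len = sizeof addr;
    if (bind(s, reinterpret_cast<sockaddr*>(&addr), sizeof addr) < 0 ||
        listen(s, 5) < 0 ||
        getsockname(s, reinterpret_cast<sockaddr*>(&addr), &len) < 0) {
        close(s);
        return -1;
    }
    *bound_port = ntohs(addr.sin_port);
    return s;
}
```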
> tcpSocketIncomingRope::~tcpSocketIncomingRope()
> {
> if (omniORB::traceLevel >= 15) {
> omniORB::log << "tcpSocketIncomingRope::~tcpSocketIncomingRope: called.\n";
> omniORB::log.flush();
> }
> if (me) {
> delete me;
> me = 0;
> }
> if (pd_rendezvous != RC_INVALID_SOCKET) {
> CLOSESOCKET(pd_rendezvous);
> pd_rendezvous = RC_INVALID_SOCKET;
> }
> }
> void
> tcpSocketIncomingRope::cancelThreads()
> {
> if (rendezvouser) {
> pd_lock.lock();
> pd_shutdown = SHUTDOWN;
> pd_lock.unlock();
> }
> CutStrands();
> if (rendezvouser) {
> // Unblock the rendezvouser from its accept() call.
> // Cannot shutdown the rendezvous socket because we want to keep it open
> // and could be served later by another rendezvouser.
> //
> // Instead, unblock the rendezvouser by making a connect() call.
> struct sockaddr_in myaddr;
> {
> Endpoint* e = 0;
> this_is(e);
> tcpSocketEndpoint* te = (tcpSocketEndpoint*)e;
> myaddr.sin_family = INETSOCKET;
> myaddr.sin_port = htons(te->port());
> if (LibcWrapper::isipaddr((char*)te->host()))
> {
> CORBA::ULong ip_p = inet_addr((char*) te->host());
> memcpy((void*) &myaddr.sin_addr, (void*) &ip_p, sizeof(myaddr.sin_addr));
> }
> else
> {
> LibcWrapper::hostent_var h;
> int rc;
> LibcWrapper::gethostbyname((char*)te->host(),h,rc);
> memcpy((void*)&myaddr.sin_addr,
> (void*)h.hostent()->h_addr_list[0],
> sizeof(myaddr.sin_addr));
> }
> delete te;
> }
> tcpSocketHandle_t tmp_sock;
> if ((tmp_sock = socket(INETSOCKET,SOCK_STREAM,0)) == RC_INVALID_SOCKET)
> {
> // If we cannot create a socket, we cannot shutdown the rendezvouser
> if (omniORB::traceLevel > 0) {
> omniORB::log << "tcpSocketIncomingRope::cancelThreads() cannot create a socket to connect to the rendezvous socket.\n"
> << "The rendezvous thread may or may not have exited.\n"
> << "If this is temporary resource exhaustion, try again later.\n";
> omniORB::log.flush();
> }
> return;
> }
> if (connect(tmp_sock,(struct sockaddr *)&myaddr,
> sizeof(struct sockaddr_in)) == RC_SOCKET_ERROR)
> {
> // OK, so we cannot connect to the rendezvouser; it would have
> // unblocked from accept() anyway. That is fine as well.
> }
> else
> {
> // Now we have unblocked the rendezvouser, just close the socket.
> CLOSESOCKET(tmp_sock);
> }
> // Now we have got the rendezvouser's attention. We acknowledge that
> // we have seen it and instruct the rendezvouser to exit.
> pd_lock.lock();
> pd_shutdown = NO_THREAD;
> pd_lock.unlock();
> if (omniORB::traceLevel >= 15) {
> omniORB::log << "tcpSocketMTincomingFactory::stopIncoming: Waiting for tcpSocketMT Rendezvouser to exit...\n";
> omniORB::log.flush();
> }
> rendezvouser->join(0); // Wait until the rendezvouser comes back
> if (omniORB::traceLevel >= 15) {
> omniORB::log << "tcpSocketMTincomingFactory::stopIncoming: tcpSocketMT Rendezvouser has exited\n";
> omniORB::log.flush();
> }
> rendezvouser = 0;
> }
> }
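cancelThreads() relies on a trick worth spelling out. A thread blocked in accept() cannot simply be told to stop, so the canceller first sets a shutdown flag. It then connects to the listening socket itself, which makes accept() return and lets the thread see the flag. The sketch below shows this with POSIX sockets and std::thread, assuming a loopback listener. The function name is hypothetical. Unlike omniORB, it skips the second handshake step through pd_shutdown and closes the listening socket at the end instead of keeping it for a later rendezvouser.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <atomic>
#include <thread>

// Returns true once the acceptor thread has been woken and has exited.
// Assumes the loopback connect() succeeds; otherwise join() would block,
// the same limitation the original code warns about in its log message.
bool stop_acceptor_by_self_connect() {
    int lsock = socket(AF_INET, SOCK_STREAM, 0);
    if (lsock < 0) return false;
    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = 0;  // let the kernel pick a free port
    socklen_t len = sizeof addr;
    if (bind(lsock, reinterpret_cast<sockaddr*>(&addr), sizeof addr) < 0 ||
        listen(lsock, 5) < 0 ||
        getsockname(lsock, reinterpret_cast<sockaddr*>(&addr), &len) < 0) {
        close(lsock);
        return false;
    }

    std::atomic<bool> shutdown_requested{false};
    std::atomic<bool> exited{false};

    // The "rendezvouser": blocks in accept() until someone connects.
    std::thread rendezvouser([&] {
        for (;;) {
            int conn = accept(lsock, nullptr, nullptr);
            if (conn >= 0) close(conn);      // a real server would serve it
            if (shutdown_requested) break;   // woken on purpose: leave
        }
        exited = true;
    });

    shutdown_requested = true;                  // 1. announce the shutdown
    int tmp = socket(AF_INET, SOCK_STREAM, 0);  // 2. unblock accept()
    if (tmp >= 0) {
        connect(tmp, reinterpret_cast<sockaddr*>(&addr), sizeof addr);
        close(tmp);
    }
    rendezvouser.join();                        // 3. wait for it to exit
    close(lsock);
    return exited;
}
```

The flag is set before the connect, so whenever accept() returns because of that connection, the thread is guaranteed to see the request.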
> Strand *
> tcpSocketIncomingRope::newStrand()
> {
> throw omniORB::fatalException(__FILE__,__LINE__,
> "newStrand should not be called.");
> #ifdef NEED_DUMMY_RETURN
> return 0; // dummy return to keep some compilers happy
> #endif
> }
> /////////////////////////////////////////////////////////////////////////////
> tcpSocketMToutgoingFactory::tcpSocketMToutgoingFactory()
> {
> tcpSocketFactoryType::init();
> }
> CORBA::Boolean
> tcpSocketMToutgoingFactory::isOutgoing(Endpoint* addr) const
> {
> Rope_iterator next_rope(&pd_anchor);
> Rope* r;
> while ((r = next_rope()))
> {
> if (r->remote_is(addr))
> return 1;
> }
> return 0;
> }
> Rope*
> tcpSocketMToutgoingFactory::findOrCreateOutgoing(Endpoint* addr)
> {
> tcpSocketEndpoint* te = tcpSocketEndpoint::castup(addr);
> if (!te) return 0;
> Rope_iterator next_rope(&pd_anchor);
> Rope* r;
> while ((r = next_rope()))
> {
> if (r->remote_is(addr)) {
> r->incrRefCount(1);
> return r;
> }
> }
> r = new tcpSocketOutgoingRope(this,omniORB::maxTcpConnectionPerServer,te);
> r->incrRefCount(1);
> return r;
> }
> tcpSocketOutgoingRope::tcpSocketOutgoingRope(tcpSocketMToutgoingFactory* f,
> unsigned int maxStrands,
> tcpSocketEndpoint *e)
> : Rope(f->anchor(),maxStrands,1)
> {
> remote = new tcpSocketEndpoint(e);
> }
> tcpSocketOutgoingRope::~tcpSocketOutgoingRope()
> {
> if (omniORB::traceLevel >= 15) {
> omniORB::log << "tcpSocketOutgoingRope::~tcpSocketOutgoingRope: called.\n";
> omniORB::log.flush();
> }
> if (remote) {
> delete remote;
> remote = 0;
> }
> }
> Strand *
> tcpSocketOutgoingRope::newStrand()
> {
> return new tcpSocketStrand(this,remote,1);
> }
> /////////////////////////////////////////////////////////////////////////////
> // Size of transmit and receive buffers
> const
> unsigned int
> tcpSocketStrand::buffer_size = 8192 + (int)omni::max_alignment;
> static tcpSocketHandle_t realConnect(tcpSocketEndpoint* r);
> tcpSocketStrand::tcpSocketStrand(tcpSocketOutgoingRope *rope,
> tcpSocketEndpoint *r,
> CORBA::Boolean heapAllocated)
> : reliableStreamStrand(tcpSocketStrand::buffer_size,rope,heapAllocated),
> pd_send_giop_closeConnection(0), pd_delay_connect(0)
> {
> // Do not try to connect to the remote host in this ctor.
> // This is to avoid holding the mutex on rope->pd_lock while the connect
> // is in progress. Holding the mutex for an extended period is bad as this
> // can have ***serious*** side effect.
> // One immediate consequence of holding the rope->pd_lock is that the
> // outScavenger will be blocked on rope->pd_lock when it is scanning
> // for idle strands. This in turn blocks out any thread trying to lock
> // rope->pd_anchor->pd_lock. This is really bad because no new rope
> // can be added to the anchor.
> pd_socket = RC_INVALID_SOCKET;
> pd_delay_connect = new tcpSocketEndpoint(r);
> // Do the connect on first call to ll_recv or ll_send.
> }
> tcpSocketStrand::tcpSocketStrand(tcpSocketIncomingRope *r,
> tcpSocketHandle_t sock,
> CORBA::Boolean heapAllocated)
> : reliableStreamStrand(tcpSocketStrand::buffer_size,r,heapAllocated),
> pd_socket(sock), pd_send_giop_closeConnection(1), pd_delay_connect(0)
> {
> }
> tcpSocketStrand::~tcpSocketStrand()
> {
> if (omniORB::traceLevel >= 5) {
> omniORB::log << "tcpSocketStrand::~Strand() close socket no. " << pd_socket << "\n";
> omniORB::log.flush();
> }
> if (pd_socket != RC_INVALID_SOCKET)
> CLOSESOCKET(pd_socket);
> pd_socket = RC_INVALID_SOCKET;
> if (pd_delay_connect)
> delete pd_delay_connect;
> pd_delay_connect = 0;
> }
> size_t
> tcpSocketStrand::ll_recv(void* buf, size_t sz)
> {
> if (pd_delay_connect) {
> // We have not connected to the remote host yet. Do the connect now.
> // Note: May block on connect for some time if the remote host is down
> //
> if ((pd_socket = realConnect(pd_delay_connect)) == RC_INVALID_SOCKET) {
> _setStrandIsDying();
> throw CORBA::COMM_FAILURE(GETERRNO(),CORBA::COMPLETED_NO);
> }
> delete pd_delay_connect;
> pd_delay_connect = 0;
> }
> int rx;
> while (1) {
> if ((rx = ::recv(pd_socket,(char*)buf,sz,0)) == RC_SOCKET_ERROR) {
> if (GETERRNO() == EINTR)
> continue;
> else
> {
> _setStrandIsDying();
> throw CORBA::COMM_FAILURE(GETERRNO(),CORBA::COMPLETED_MAYBE);
> }
> }
> else
> if (rx == 0) {
> _setStrandIsDying();
> throw CORBA::COMM_FAILURE(0,CORBA::COMPLETED_MAYBE);
> }
> break;
> }
> return (size_t)rx;
> }
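ll_recv() treats EINTR as a reason to retry and a zero-byte read as the peer closing the connection. Both are standard recv() semantics. The sketch below keeps those rules, with a hypothetical CommFailure exception in place of CORBA::COMM_FAILURE. One deliberate difference: it captures errno immediately. The original calls _setStrandIsDying() before GETERRNO(), so in principle the reported error code could be overwritten in between.

```cpp
#include <sys/socket.h>
#include <sys/types.h>
#include <cerrno>
#include <cstddef>
#include <stdexcept>

// Hypothetical stand-in for CORBA::COMM_FAILURE, carrying the errno value.
struct CommFailure : std::runtime_error {
    explicit CommFailure(int err)
        : std::runtime_error("COMM_FAILURE"), error(err) {}
    int error;
};

// Receive at most `sz` bytes, following the same rules as ll_recv().
std::size_t recv_some(int sock, void* buf, std::size_t sz) {
    for (;;) {
        ssize_t rx = ::recv(sock, buf, sz, 0);
        if (rx < 0) {
            int err = errno;             // capture before anything else runs
            if (err == EINTR) continue;  // interrupted by a signal: retry
            throw CommFailure(err);
        }
        if (rx == 0) throw CommFailure(0);  // orderly close by the peer
        return static_cast<std::size_t>(rx);
    }
}
```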
> void
> tcpSocketStrand::ll_send(void* buf,size_t sz)
> {
> if (pd_delay_connect) {
> // We have not connected to the remote host yet. Do the connect now.
> // Note: May block on connect for some time if the remote host is down
> //
> if ((pd_socket = realConnect(pd_delay_connect)) == RC_INVALID_SOCKET) {
> _setStrandIsDying();
> throw CORBA::COMM_FAILURE(GETERRNO(),CORBA::COMPLETED_NO);
> }
> delete pd_delay_connect;
> pd_delay_connect = 0;
> }
> int tx;
> char *p = (char *)buf;
> while (sz) {
> if ((tx = ::send(pd_socket,p,sz,0)) == RC_SOCKET_ERROR) {
> if (GETERRNO() == EINTR)
> continue;
> else {
> _setStrandIsDying();
> throw CORBA::COMM_FAILURE(GETERRNO(),CORBA::COMPLETED_MAYBE);
> }
> }
> else
> if (tx == 0) {
> _setStrandIsDying();
> throw CORBA::COMM_FAILURE(0,CORBA::COMPLETED_MAYBE);
> }
> sz -= tx;
> p += tx;
> }
> return;
> }
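ll_send() loops because send() on a stream socket may accept only part of the buffer. The pointer and the remaining count are advanced by whatever was actually written. Below is the same loop in isolation, again with a hypothetical CommFailure exception. On typical Unix systems, sending to a peer that has closed its end also raises SIGPIPE unless that signal is ignored, so a server usually has to arrange for that separately.

```cpp
#include <sys/socket.h>
#include <sys/types.h>
#include <cerrno>
#include <cstddef>
#include <stdexcept>

// Hypothetical stand-in for CORBA::COMM_FAILURE, carrying the errno value.
struct CommFailure : std::runtime_error {
    explicit CommFailure(int err)
        : std::runtime_error("COMM_FAILURE"), error(err) {}
    int error;
};

// Write all `sz` bytes, resuming after each partial send().
void send_all(int sock, const void* buf, std::size_t sz) {
    const char* p = static_cast<const char*>(buf);
    while (sz > 0) {
        ssize_t tx = ::send(sock, p, sz, 0);
        if (tx < 0) {
            int err = errno;             // capture before anything else runs
            if (err == EINTR) continue;  // interrupted by a signal: retry
            throw CommFailure(err);
        }
        if (tx == 0) throw CommFailure(0);
        sz -= static_cast<std::size_t>(tx);  // bytes still to send
        p += tx;                             // resume where send() stopped
    }
}
```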
> void
> tcpSocketStrand::shutdown()
> {
> if (pd_send_giop_closeConnection)
> {
> // XXX This is a bit ugly. GIOP requires us to send a CloseConnection
> // message before shutdown. Ideally, we do not want this abstraction to
> // do any GIOP dependent stuff. If this is a problem in future, we should
> // perhaps make it a parameter to decide whether or what to send on
> // shutdown.
> static char closeConnectionMessage[12] = {
> 'G','I','O','P',
> 1,0,
> _OMNIORB_HOST_BYTE_ORDER_,
> GIOP::CloseConnection,
> 0,0,0,0
> };
> size_t sz = sizeof(closeConnectionMessage);
> char* p = closeConnectionMessage;
> while (sz) {
> fd_set wrfds;
> FD_ZERO(&wrfds);
> FD_SET(pd_socket,&wrfds);
> struct timeval t = { 5,0};
> int rc;
> if ((rc = select(pd_socket+1,0,&wrfds,0,&t)) <= 0) {
> // On any timeout or error, we just don't bother sending the message.
> break;
> }
> int tx;
> if ((tx = ::send(pd_socket,p,sz,0)) == RC_SOCKET_ERROR) {
> // On any error, we just don't bother sending any more.
> break;
> }
> sz -= tx;
> p += tx;
> }
> }
> _setStrandIsDying();
> SHUTDOWNSOCKET(pd_socket);
> return;
> }
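The 12-byte `closeConnectionMessage` above is a bare GIOP 1.0 header with no body: the magic "GIOP", version 1.0, a byte-order flag (0 = big-endian, 1 = little-endian), message type CloseConnection (5), and a zero message size. Here is a sketch that builds the same bytes. The names are hypothetical, and a runtime probe stands in for the `_OMNIORB_HOST_BYTE_ORDER_` macro:

```cpp
#include <array>
#include <cassert>
#include <cstdint>

// GIOP 1.0 header layout (12 octets):
//   magic "GIOP" | major | minor | byte_order | message_type | message_size
enum GiopMsgType : std::uint8_t { kCloseConnection = 5 };

// GIOP byte_order flag for this host: 1 if little-endian, 0 if big-endian.
inline std::uint8_t host_byte_order() {
  const std::uint16_t probe = 1;
  return *reinterpret_cast<const std::uint8_t*>(&probe) == 1 ? 1 : 0;
}

// CloseConnection carries no body, so message_size is 0 and its four
// octets are zero whatever the byte order.
std::array<std::uint8_t, 12> make_close_connection() {
  return {{'G', 'I', 'O', 'P',
           1, 0,  // GIOP version 1.0
           host_byte_order(),
           kCloseConnection,
           0, 0, 0, 0}};
}
```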
> static
> tcpSocketHandle_t
> realConnect(tcpSocketEndpoint* r)
> {
> struct sockaddr_in raddr;
> LibcWrapper::hostent_var h;
> int rc;
> tcpSocketHandle_t sock;
> // Set the destination we are supposed to send the GIOP data to.
> CORBA::UShort port_relay = r->port();
> char *host_relay = (char*)r->host();
> #ifdef IOP_HTTP
> CORBA::Boolean relay = 0;
> char *def_host_proxy = "localhost"; /* Hostname of the proxy */
> /* Real destination of data (aka ORB hostname). */
> char *host_to_send = host_relay;
> CORBA::UShort port_to_send = port_relay;
> /* Check if we need to perform a relay */
> if (IopHttpManager().need_to_relay(host_relay)) {
> relay = 1;
> /* Start the proxy, if has not been already started. */
> IopHttpManager().start_proxy();
> // In the case we have to relay data, change the
> // port and the hostname of the socket destination.
> host_relay = def_host_proxy;
> port_relay = IopHttpManager().get_port_bound();
> if (omniORB::traceLevel >= 5) {
> omniORB::log << "Proxy activated " << (char*)host_relay << " " << port_relay << "\n";
> omniORB::log.flush();
> }
> }
> #endif
> #ifdef IP_TRANSLATING
> ip_translator *default_translator = ip_translator::get_default_translator();
> char *translated_host = default_translator->translate_ip(host_relay);
> if (translated_host != NULL) {
> host_relay = translated_host;
> }
> #endif
> if (! LibcWrapper::isipaddr(host_relay)) {
> if (LibcWrapper::gethostbyname(host_relay,h,rc) < 0) {
> // XXX look at rc to decide what to do or if to give up what errno
> // XXX to return EINVAL.
> //
> return RC_INVALID_SOCKET;
> }
> // We just pick the first address in the list; maybe we should go
> // through the list and if possible pick the one that is on the same
> // subnet.
> memcpy((void*)&raddr.sin_addr, (void*)h.hostent()->h_addr_list[0],
> sizeof(raddr.sin_addr));
> } else {
> // The machine name is already an IP address
> CORBA::ULong ip_p;
> if ( (ip_p = inet_addr( host_relay)) == RC_INADDR_NONE) {
> return RC_INVALID_SOCKET;
> }
> memcpy((void*) &raddr.sin_addr, (void*) &ip_p, sizeof(raddr.sin_addr));
> }
> #ifdef IP_TRANSLATING
> if (translated_host != NULL) {
> CORBA::string_free(translated_host);
> translated_host = NULL;
> }
> #endif
> raddr.sin_port = htons(port_relay);
> raddr.sin_family = INETSOCKET;
> if ((sock = socket(INETSOCKET,SOCK_STREAM,0)) == RC_INVALID_SOCKET) {
> return RC_INVALID_SOCKET;
> }
> #if defined(__sunos__) && defined(__sparc__) && __OSVERSION__ >= 5
> // Use non-blocking connect.
> int fl = O_NONBLOCK;
> if (fcntl(sock,F_SETFL,fl) == RC_SOCKET_ERROR) {
> CLOSESOCKET(sock);
> return RC_INVALID_SOCKET;
> }
> if (connect(sock,(struct sockaddr *)&raddr,
> sizeof(struct sockaddr_in)) == RC_SOCKET_ERROR)
> {
> if (GETERRNO() != EINPROGRESS) {
> CLOSESOCKET(sock);
> return RC_INVALID_SOCKET;
> }
> fd_set wrfds;
> FD_ZERO(&wrfds);
> FD_SET(sock,&wrfds);
> struct timeval t = { 30,0 };
> int rc;
> if ((rc = select(sock+1,0,&wrfds,0,&t)) <= 0) {
> // Timeout or error: do not bother trying again.
> CLOSESOCKET(sock);
> return RC_INVALID_SOCKET;
> }
> }
> // Set the socket back to blocking
> fl = 0;
> if (fcntl(sock,F_SETFL,fl) == RC_SOCKET_ERROR) {
> CLOSESOCKET(sock);
> return RC_INVALID_SOCKET;
> }
> #else
> if (connect(sock,(struct sockaddr *)&raddr,
> sizeof(struct sockaddr_in)) == RC_SOCKET_ERROR)
> {
> CLOSESOCKET(sock);
> return RC_INVALID_SOCKET;
> }
> #endif
> #ifdef IOP_HTTP
> if (relay) {
> /* Handshake protocol. These data will be sent to the HTTP server */
> iop_http_handshake handshake;
> memset(&handshake, 0,sizeof(iop_http_handshake));
> /* First, data to send directly to the HTTP Gateway */
> handshake.pd_magic = htonl(IOP_HTTP_MAGIC_HANDSHAKE);
> handshake.pd_version_proto = htons(IOP_HTTP_PROTOCOL_VERSION);
> strcpy((char *)handshake.pd_hostname, host_to_send);
> handshake.pd_port = htons(port_to_send);
> // Never pass a struct through a socket.
> if ((::send(sock, (const char*)&handshake.pd_magic, sizeof(handshake.pd_magic),0) == RC_SOCKET_ERROR) ||
> (::send(sock, (const char*)&handshake.pd_version_proto, sizeof(handshake.pd_version_proto),0) == RC_SOCKET_ERROR) ||
> (::send(sock, (const char*)&handshake.pd_hostname, sizeof(CORBA::Char)*128,0) == RC_SOCKET_ERROR) ||
> (::send(sock, (const char*)&handshake.pd_port, sizeof(handshake.pd_port),0) == RC_SOCKET_ERROR)) {
> CLOSESOCKET(sock);
> throw CORBA::COMM_FAILURE(GETERRNO(),CORBA::COMPLETED_MAYBE);
> } /* endif */
> }
> #endif /* IOP_HTTP */
> return sock;
> }
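The Solaris branch of `realConnect()` uses a non-blocking `connect()` bounded by a 30-second `select()`. When `select()` reports the socket writable, that only means the connect attempt has finished, not that it succeeded. A common way to get the outcome is `getsockopt(SO_ERROR)`. The following is a minimal POSIX sketch under that assumption. `connect_with_timeout` is a hypothetical helper, not omniORB code. Unlike the patch, it reads the existing file flags with `F_GETFL` instead of overwriting them:

```cpp
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <cassert>
#include <cerrno>

// Put the socket back in its original mode without clobbering errno.
static void restore_flags(int sock, int flags) {
  int saved = errno;
  ::fcntl(sock, F_SETFL, flags);
  errno = saved;
}

// Connect sock to addr, waiting at most timeout_sec seconds.
// Returns 0 on success, or -1 on failure with errno set.
int connect_with_timeout(int sock, const sockaddr_in& addr, long timeout_sec) {
  int flags = ::fcntl(sock, F_GETFL, 0);
  if (flags == -1) return -1;
  if (::fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1) return -1;

  if (::connect(sock, reinterpret_cast<const sockaddr*>(&addr), sizeof addr) == -1) {
    if (errno != EINPROGRESS) { restore_flags(sock, flags); return -1; }

    fd_set wrfds;
    FD_ZERO(&wrfds);
    FD_SET(sock, &wrfds);
    timeval t;
    t.tv_sec = timeout_sec;
    t.tv_usec = 0;
    int rc = ::select(sock + 1, 0, &wrfds, 0, &t);
    if (rc <= 0) {
      if (rc == 0) errno = ETIMEDOUT;  // select() timed out
      restore_flags(sock, flags);
      return -1;
    }
    // Writable means the attempt finished; SO_ERROR says whether it worked.
    int err = 0;
    socklen_t len = sizeof err;
    if (::getsockopt(sock, SOL_SOCKET, SO_ERROR, &err, &len) == -1) {
      restore_flags(sock, flags);
      return -1;
    }
    if (err != 0) {
      restore_flags(sock, flags);
      errno = err;
      return -1;
    }
  }
  // Back to the original (normally blocking) mode, as realConnect() does.
  if (::fcntl(sock, F_SETFL, flags) == -1) return -1;
  return 0;
}
```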
> /////////////////////////////////////////////////////////////////////////////
> #if defined(__sunos__) && defined(__sparc__) && __OSVERSION__ >= 5
> #include <signal.h>
> void omni_abort()
> {
> kill (getpid(),SIGABRT);
> while (1) {
> sleep(1000000);
> }
> }
> #include <exception.h>
> #endif
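Sai-Lai's reply notes that the terminate handler only needs to be installed once, not once per thread as in this patch. In standard C++ that could be sketched as follows. `install_terminate_handler` and `omni_abort_handler` are hypothetical names. `std::call_once` is a C++11 facility that Sun C++ 5.0 lacked; it is used here only as a modern substitute for placing the call in `CORBA::ORB_init()`:

```cpp
#include <cassert>
#include <cstdlib>
#include <exception>
#include <mutex>

// Terminate handler: abort so the process dumps core at the failure point.
[[noreturn]] void omni_abort_handler() {
  std::abort();
}

// Install the handler exactly once, however many threads call this.
void install_terminate_handler() {
  static std::once_flag once;
  std::call_once(once, [] { std::set_terminate(omni_abort_handler); });
}
```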
> void*
> tcpSocketRendezvouser::run_undetached(void *arg)
> {
> tcpSocketIncomingRope* r = (tcpSocketIncomingRope*) arg;
> #if defined(__sunos__) && defined(__sparc__) && __OSVERSION__ >= 5
> set_terminate(omni_abort);
> #endif
> if (omniORB::traceLevel >= 5) {
> omniORB::log << "tcpSocketMT Rendezvouser thread: starts.\n";
> omniORB::log.flush();
> }
> tcpSocketStrand *newSt = 0;
> tcpSocketWorker *newthr = 0;
> CORBA::Boolean die = 0;
> while (r->pd_shutdown == tcpSocketIncomingRope::ACTIVE && !die) {
> try {
> tcpSocketHandle_t new_sock;
> struct sockaddr_in raddr;
> #if (defined(__GLIBC__) && __GLIBC__ >= 2)
> // The GNU C library uses socklen_t* instead of int* in accept().
> // This is supposed to be compatible with the upcoming POSIX standard.
> socklen_t l;
> #elif defined(__aix__) || defined(__VMS) || defined(__SINIX__)
> size_t l;
> #else
> int l;
> #endif
> l = sizeof(struct sockaddr_in);
> if (omniORB::traceLevel >= 15) {
> omniORB::log << "tcpSocketMT Rendezvouser thread: block on accept().\n";
> omniORB::log.flush();
> }
> if ((new_sock = ::accept(r->pd_rendezvous,(struct sockaddr *)&raddr,&l))
> == RC_INVALID_SOCKET) {
> throw CORBA::COMM_FAILURE(GETERRNO(),CORBA::COMPLETED_NO);
> }
> if (omniORB::traceLevel >= 15) {
> omniORB::log << "tcpSocketMT Rendezvouser thread: unblock from accept().\n";
> omniORB::log.flush();
> }
> {
> omni_mutex_lock sync(r->pd_lock);
> if (r->pd_shutdown != tcpSocketIncomingRope::ACTIVE) {
> // It has been indicated that this thread should stop
> // accepting connection request.
> CLOSESOCKET(new_sock);
> continue;
> }
> newSt = new tcpSocketStrand(r,new_sock,1);
> newSt->incrRefCount(1);
> }
> if (omniORB::traceLevel >= 5) {
> omniORB::log << "tcpSocketMT Rendezvouser thread: accept new strand.\n";
> omniORB::log.flush();
> }
> try {
> newthr = new tcpSocketWorker(newSt);
> }
> catch(...) {
> newthr = 0;
> }
> if (!newthr) {
> // Cannot create a new thread to serve the strand
> // We have no choice but to shutdown the strand.
> // The long term solutions are: start multiplexing the new strand
> // and the rendezvous; close down idle connections; reassign
> // threads to strands; etc.
> newSt->decrRefCount();
> newSt->shutdown();
> }
> }
> catch(const CORBA::COMM_FAILURE &) {
> // XXX accept failed. The probable cause is that the number of
> // file descriptors opened has exceeded the limit.
> // On Unix, this limit can be read and set using the
> // ulimit command.
> // On NT, if this is part of a DLL, the limit is 256(?)
> // else the limit is 16(?)
> // The following is a temporary fix: this thread just waits for a while
> // and tries again. Hopefully, some connections will have been freed by then.
> if (omniORB::traceLevel >= 5) {
> omniORB::log << "tcpSocketMT Rendezvouser thread: accept fails. Too many file descriptors opened?\n";
> omniORB::log.flush();
> }
> omni_thread::sleep(1,0);
> continue;
> }
> catch(const omniORB::fatalException &ex) {
> if (omniORB::traceLevel > 0) {
> omniORB::log << "#### You have caught an omniORB2 bug, details are as follows:\n"
> << ex.file() << " " << ex.line() << ":" << ex.errmsg()
> << "\n"
> << "tcpSocketMT Rendezvouser thread will not accept new connection.\n";
> omniORB::log.flush();
> }
> die = 1;
> }
> catch(...) {
> if (omniORB::traceLevel > 0) {
> omniORB::log << "######## Unexpected exception caught by tcpSocketMT Rendezvouser\n"
> << "tcpSocketMT Rendezvouser thread will not accept new connection.\n";
> omniORB::log.flush();
> }
> die = 1;
> }
> if (die && newSt) {
> newSt->decrRefCount();
> newSt->shutdown();
> if (!newthr) {
> if (omniORB::traceLevel >= 5) {
> omniORB::log << "tcpSocketMT Rendezvouser thread cannot spawn a new server thread.\n";
> omniORB::log.flush();
> }
> }
> }
> }
> if (die) {
> // Something very wrong has happened, before we quit, we have to
> // make sure that a future call by another thread to
> // tcpSocketIncomingRope::cancelThreads() would not wait indefinitely
> // on this thread to respond to its connect() request to the
> // rendezvous socket.
> // Shutdown (not close) the rendezvous socket to make sure that the
> // connect() in cancelThreads() would fail.
> SHUTDOWNSOCKET(r->pd_rendezvous);
> }
> while (r->pd_shutdown != tcpSocketIncomingRope::NO_THREAD) {
> // We keep on accepting connection requests but close the new sockets
> // immediately until the state of pd_shutdown changes to NO_THREAD.
> // This is to make sure that the thread calling cancelThreads()
> // will be unblocked from the connect() call.
> {
> fd_set rdfds;
> FD_ZERO(&rdfds);
> FD_SET(r->pd_rendezvous,&rdfds);
> struct timeval t = { 1,0};
> int rc;
> if ((rc = select(r->pd_rendezvous+1,&rdfds,0,0,&t)) <= 0) {
> if (rc < 0 && GETERRNO() != EINTR) {
> die = 1;
> }
> if (omniORB::traceLevel >= 15) {
> omniORB::log << "tcpSocketMT Rendezvouser thread: waiting on shutdown state to change to NO_THREAD.\n";
> omniORB::log.flush();
> }
> continue;
> }
> }
> tcpSocketHandle_t new_sock;
> struct sockaddr_in raddr;
> #if (defined(__GLIBC__) && __GLIBC__ >= 2)
> // The GNU C library uses socklen_t* instead of int* in accept().
> // This is supposed to be compatible with the upcoming POSIX standard.
> socklen_t l;
> #elif defined(__aix__) || defined(__VMS) || defined(__SINIX__)
> size_t l;
> #else
> int l;
> #endif
> l = sizeof(struct sockaddr_in);
> if ((new_sock = ::accept(r->pd_rendezvous,(struct sockaddr *)&raddr,&l))
> == RC_INVALID_SOCKET)
> {
> die = 1;
> continue;
> }
> CLOSESOCKET(new_sock);
> }
> if (omniORB::traceLevel >= 5) {
> omniORB::log << "tcpSocketMT Rendezvouser thread: exits.\n";
> omniORB::log.flush();
> }
> return 0;
> }
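While it waits for `pd_shutdown` to reach `NO_THREAD`, the rendezvouser polls the listening socket with a one-second `select()` and closes every connection it accepts, so that `cancelThreads()` is not left blocked in `connect()`. One iteration of that loop might be sketched as follows. This is POSIX only, and `drain_one` is a hypothetical helper:

```cpp
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cassert>
#include <cerrno>

// Wait up to timeout_sec seconds for a pending connection on listen_fd.
// Returns 1 if one was accepted and immediately closed, 0 on timeout or
// EINTR, and -1 on any other error (the caller should give up).
int drain_one(int listen_fd, long timeout_sec) {
  fd_set rdfds;
  FD_ZERO(&rdfds);
  FD_SET(listen_fd, &rdfds);
  timeval t;
  t.tv_sec = timeout_sec;
  t.tv_usec = 0;
  int rc = ::select(listen_fd + 1, &rdfds, 0, 0, &t);
  if (rc == 0) return 0;
  if (rc < 0) return errno == EINTR ? 0 : -1;

  sockaddr_in raddr;
  socklen_t len = sizeof raddr;
  int s = ::accept(listen_fd, reinterpret_cast<sockaddr*>(&raddr), &len);
  if (s < 0) return -1;
  ::close(s);  // no service: just unblock the connecting peer
  return 1;
}
```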
> void
> tcpSocketWorker::run(void *arg)
> {
> omniORB::giopServerThreadWrapper::
> getGiopServerThreadWrapper()->run(tcpSocketWorker::_realRun,arg);
> // the wrapper run() method will pass back control to tcpSocketWorker
> // by calling _realRun(arg) when it is ready.
> }
> void
> tcpSocketWorker::_realRun(void *arg)
> {
> tcpSocketStrand* s = (tcpSocketStrand*)arg;
> #if defined(__sunos__) && defined(__sparc__) && __OSVERSION__ >= 5
> set_terminate(omni_abort);
> #endif
> if (omniORB::traceLevel >= 5) {
> omniORB::log << "tcpSocketMT Worker thread: starts.\n";
> omniORB::log.flush();
> }
> if (!gateKeeper::checkConnect(s)) {
> s->shutdown();
> }
> else {
> while (1) {
> try {
> GIOP_S::dispatcher(s);
> }
> catch (const CORBA::COMM_FAILURE &) {
> if (omniORB::traceLevel >= 5) {
> omniORB::log << "#### Communication failure. Connection closed.\n";
> omniORB::log.flush();
> }
> break;
> }
> catch(const omniORB::fatalException &ex) {
> if (omniORB::traceLevel > 0) {
> omniORB::log << "#### You have caught an omniORB2 bug, details are as follows:\n"
> << ex.file() << " " << ex.line() << ":" << ex.errmsg() << "\n";
> omniORB::log.flush();
> }
> break;
> }
> catch (...) {
> if (omniORB::traceLevel > 0) {
> omniORB::log << "#### A system exception has occurred and was caught by tcpSocketMT Worker thread.\n";
> omniORB::log.flush();
> }
> break;
> }
> }
> }
> if (omniORB::traceLevel >= 5) {
> omniORB::log << "tcpSocketMT Worker thread: exits.\n";
> omniORB::log.flush();
> }
> }
> #!/bin/sh
> StartEchoServer() {
> tmp_pid=`GetProcessPid eg2_impl`
> if [ ! -z "$tmp_pid" ]; then
> echo "killing server $tmp_pid"
> kill -9 $tmp_pid
> fi
> echo "starting a new server"
> rm -f ~/echo_ior.txt
> ./eg2_impl 2>&1 | tee $HOME/echo_ior.txt &
> sleep 2
> tmp_ior=`cat $HOME/echo_ior.txt | grep IOR | xargs | cut -d" " -f1`
> echo $tmp_ior > $HOME/echo_ior2.txt
> tmp_ior=`cat $HOME/echo_ior2.txt`
> if [ -z "$tmp_ior" ]; then
> echo "IOR is not set, exit"
> exit
> fi
> }
> GetProcessPid() {
> ps -ef | grep "$1" | grep -v grep | awk '{ print $2 }' | xargs
> }
> TrueFunction() {
> return 0
> }
> count=1
> MAXIMUM_SERVER_COUNT=1
> MAXIMUM_CLIENT_COUNT=1
> server_count=$MAXIMUM_SERVER_COUNT
> LD_LIBRARY_PATH=/opt/oracle816/lib:/usr/X/lib:/usr/openwin/lib:/usr/dt/lib:/data/PostaTahiti/External/omniORB_2.6.1_http-sun50/lib/sun4_sosV_5.6_debug
> export LD_LIBRARY_PATH
> while TrueFunction; do
> if [ "$server_count" -eq $MAXIMUM_SERVER_COUNT ]; then
> StartEchoServer
> server_count=1
> else
> server_count=`expr $server_count + 1`
> fi
> tmp_pids=""
> client_count=0
> while [ "$client_count" -lt "$MAXIMUM_CLIENT_COUNT" ]; do
> TestClt $tmp_ior > /dev/null 2>&1 &
> tmp_pids="$tmp_pids $!"
> client_count=`expr $client_count + 1`
> done
> wait $tmp_pids
> echo "ran the test $count times"
> count=`expr $count + 1`
> tmp_server_pid=`GetProcessPid eg2_impl`
> if [ -z "$tmp_server_pid" ]; then
> echo "Echo server aborted"
> echo "End test."
> exit
> fi
> done
--
Sai-Lai Lo S.Lo@uk.research.att.com
AT&T Laboratories Cambridge WWW: http://www.uk.research.att.com
24a Trumpington Street Tel: +44 1223 343000
Cambridge CB2 1QA Fax: +44 1223 313542
ENGLAND