[omniORB] Using omniNotify with omniOrbPy
Robert E. Gruber
gruber@research.att.com
Wed, 13 Jun 2001 19:57:34 -0400
I have written a new test suite for omniNotify in python using omniORBpy. I
am quite happy with how much I was able to do in relatively little time.
I will probably release it at some point (separately from omniNotify since
it should be able to test any Notification Service).
In answer to your specific questions:
STUBS:
I run the following (mkcosstubs.py) in a Stubs dir, and put
the Stubs dir on my PYTHONPATH. Then I can say, e.g.,
import CosNotification
in python
----------------------------------------------------------------------------
------
#!/usr/bin/env python
"""Generate Python stubs for the COS IDL files by running omniidl on each.

Run it from the directory that should receive the stubs.  An optional first
command-line argument overrides the default IDL directory `d`.
"""
import os
import sys

idlfiles = ["TimeBase", "CosTime", "CosEventComm", "CosEventChannelAdmin",
            "CosTypedEventComm", "CosTypedEventChannelAdmin", "CosTimerEvent",
            "CosNotification", "CosNotifyComm", "CosNotifyFilter",
            "CosNotifyChannelAdmin", "CosTypedNotifyComm",
            "CosTypedNotifyChannelAdmin", "AttNotifyChannelAdmin"]

# d is the top-level idl directory from an omniORB release
d = "/home/alcfp/project/omni/omni3rel/idl"


def build_command(idl_dir, name):
    """Return the omniidl command line for <idl_dir>/COS/<name>.idl."""
    return "omniidl -bpython -I%s -I%s/COS -DNOLONGLONG %s/COS/%s" % (
        idl_dir, idl_dir, idl_dir, name + ".idl")


def main(idl_dir=d):
    """Print and run the omniidl command for every entry in idlfiles."""
    for name in idlfiles:
        print(name)
        cmd = build_command(idl_dir, name)
        print(cmd)
        # The exit status is not checked, so a failing file does not stop
        # the remaining ones from being processed.
        os.system(cmd)


if __name__ == "__main__":
    main(sys.argv[1] if len(sys.argv) > 1 else d)
----------------------------------------------------------------------------
----
The following helper file, ChanUtils.py, has a function that initializes
the orb and poa, another that looks up the default factory in the
naming service, 3 flavors of functions that get event channels from
the factory, and a cleanup function.
------------------------------------------------------------------------
######################################################################
# Module ChanUtils:
import sys
import omniORB
from omniORB import CORBA, PortableServer
import CosNaming,CosNotification,CosNotifyChannelAdmin
from Utils import PropUtils
##############################################################
def setup_orb_and_poa(verbose):
    """Initialise the ORB, activate the root POA's manager, return both.

    verbose is currently unused.

    Returns the tuple (orb, poa).
    """
    # Set the per-server TCP connection limit before the ORB is initialised.
    omniORB.maxTcpConnectionPerServer(1000)
    # Initialise the ORB
    orb = CORBA.ORB_init(sys.argv, CORBA.ORB_ID)
    # Find the root POA
    poa = orb.resolve_initial_references("RootPOA")
    # Activate the POA
    poaManager = poa._get_the_POAManager()
    poaManager.activate()
    return orb, poa
##############################################################
def get_factory(orb, verbose):
    """Look up the channel factory bound in the naming service.

    Resolves the name ("ChannelFactory", "ChannelFactory") in the root
    naming context obtained from orb and narrows the result to
    CosNotifyChannelAdmin.EventChannelFactory.

    verbose > 0 prints the name being resolved.

    Returns the narrowed factory, or None (after printing a message) when
    the root context cannot be narrowed, the name is not found, or the
    object is not an EventChannelFactory.
    """
    # Obtain a reference to the root naming context
    obj = orb.resolve_initial_references("NameService")
    rootContext = obj._narrow(CosNaming.NamingContext)
    if rootContext is None:
        print("ChanUtils.get_factory: Failed to narrow the root naming context")
        return None  # error
    if verbose > 0:
        print('ChanUtils.get_factory: Resolve the name '
              '"ChannelFactory.ChannelFactory"')
    name = [CosNaming.NameComponent("ChannelFactory", "ChannelFactory")]
    try:
        obj = rootContext.resolve(name)
    except CosNaming.NamingContext.NotFound:
        print("ChanUtils.get_factory: Name not found")
        return None  # error
    # Narrow the object to EventChannelFactory
    fact = obj._narrow(CosNotifyChannelAdmin.EventChannelFactory)
    if fact is None:
        print("ChanUtils.factory: Object reference is not a "
              "CosNotifyChannelAdmin.EventChannelFactory")
        return None  # error
    return fact
##############################################################
def get_channel_qos(fact, nqos, aqos, verbose):
    """Get a new channel from fact, created with the given properties.

    nqos and aqos are passed straight to fact.create_channel().

    verbose > 0 prints details of UnsupportedAdmin / UnsupportedQoS
    failures and the id of the new channel.

    Returns the channel, or None when create_channel raises
    UnsupportedAdmin, UnsupportedQoS or COMM_FAILURE, or yields no channel.
    Any other exception propagates to the caller.
    """
    chan = None
    try:
        chan, chan_id = fact.create_channel(nqos, aqos)
    except CosNotification.UnsupportedAdmin as eseqlist:
        if verbose > 0:
            print("get_channel_qos: UnsupportedAdmin exception")
            # NOTE(review): this iterates the exception object itself;
            # confirm that yields the property-error sequences intended.
            for eseq in eseqlist:
                PropUtils.print_propseq(eseq)
        return None  # error
    except CosNotification.UnsupportedQoS as eseqlist:
        if verbose > 0:
            print("get_channel_qos: UnsupportedQoS exception")
            # NOTE(review): same iteration caveat as for UnsupportedAdmin.
            for eseq in eseqlist:
                PropUtils.print_propseq(eseq)
        return None  # error
    except CORBA.COMM_FAILURE:
        print("get_channel_qos: Caught COMM_FAILURE")
        return None  # error
    if chan is None:
        print("ChanUtils.get_channel: Could not obtain channel from factory")
        return None  # error
    if verbose > 0:
        # Two spaces after '=' reproduce the output of the original
        # statement: print "... chan_id = " , chan_id
        print("ChanUtils.get_channel: New channel has chan_id =  %s"
              % (chan_id,))
    return chan
##############################################################
def get_channel(fact, verbose):
    """Get a new channel from fact using empty nqos and aqos lists.

    Returns whatever get_channel_qos returns for those arguments.
    """
    return get_channel_qos(fact, [], [], verbose)
##############################################################
def get_channel_id(fact, id, verbose):
    """Get the channel with channel ident id from fact.

    verbose > 0 prints a confirmation on success.

    Returns the channel, or None (after printing a message) when
    fact.get_event_channel(id) raises or returns None.
    """
    try:
        chan = fact.get_event_channel(id)
    except CORBA.COMM_FAILURE:
        print("get_channel_id: Caught COMM_FAILURE")
        return None  # error
    except Exception:
        # Exception rather than a bare except, so KeyboardInterrupt and
        # SystemExit are not reported as a failed lookup.
        print("ChanUtils.get_channel_id: get_event_channel( %s ): failed"
              % (id,))
        return None  # error
    if chan is None:
        print("ChanUtils.get_channel_id: get_event_channel( %s ): failed"
              % (id,))
        return None  # error
    if verbose > 0:
        print("ChanUtils.get_channel_id: returning channel with id %s"
              % (id,))
    return chan
##############################################################
def destroy_channel(chan, verbose):
    """Destroy the channel by calling chan.destroy().

    verbose > 0 prints progress and failure messages.

    Returns 0 when destroy() completes and 1 when it raises.
    """
    try:
        if verbose > 0:
            print("ChanUtils.destroy_channel: calling chan.destroy()")
        chan.destroy()
    except Exception:
        # Exception rather than a bare except, so KeyboardInterrupt and
        # SystemExit are not swallowed and reported as a failed destroy.
        if verbose > 0:
            print("ChanUtils.destroy_channel: chan.destroy failed")
        return 1
    if verbose > 0:
        print("ChanUtils.destroy_channel: Destroyed the channel")
    return 0
----------------------------------------------------------------------------
-----Original Message-----
From: owner-omniorb-list@uk.research.att.com
[mailto:owner-omniorb-list@uk.research.att.com]On Behalf Of Mark
Zimmerman
Sent: Wednesday, June 13, 2001 2:54 PM
To: omniorb-list@uk.research.att.com
Subject: [omniORB] Using omniNotify with omniOrbPy
Greetings:
I am looking for some hints on how to post notification events from a
python script using omniOrbPy. There seems to be nothing built-in to
support this. Am I on uncharted territory here?
I have started by running omniidl -bpython on files in $TOP/idl/COS
that seem to have some relevance. That works fine.
I am currently trying to go through some working C++ code and do the
same things in python. I am stuck trying to convert
channel = CosNA_EventChannel::_narrow(echannel_ref);
to python. It seems that it should be
channel = echannel_ref._narrow(SomethingOrOther.CosNA_EventChannel)
but I have grepped through all of the idl I can find looking for who
owns CosNA_EventChannel so SomethingOrOther is still unknown.
-- Mark