[omniORB] Fwd: StructuredPushConsumer
Kyle Dunn
kdunn926 at gmail.com
Mon Apr 6 14:37:00 BST 2009
I am trying to craft a generic NotifyStructuredPushConsumer and am
having trouble doing: obj._narrow(CosNaming.NamingContextExt). At
runtime I receive this:
File "notifyConsumer.py", line 33, in initCorbaOrb
self.rootContextExt = obj._narrow(CosNaming.NamingContextExt)
TypeError: CORBA.Object._narrow() argument 1 must be type, not classobj
There is a good chance this is a trivial mistake, however it is
imperative for my application to be able to distinguish the naming
context.
The relevant code looks as follows:
class CorbaManager:
def initCorbaOrb(self, paramDict):
paramList = ['eventMan']
if "orbInitRef" in paramDict:
paramList.append('-ORBInitRef')
paramList.append(paramDict['orbInitRef'])
if "endpoint" in paramDict:
paramList.append('-ORBendPoint')
paramList.append(paramDict['endpoint'])
print paramList
self.corbaParamDict = paramDict
orb = CORBA.ORB_init(paramList)
#orb = CORBA.ORB_init(paramList, CORBA.ORB_ID) ORB_ID is set by CORBA
obj = orb.resolve_initial_references("NameService")
self.rootContextExt = obj._narrow(CosNaming.NamingContextExt)
obj = orb.resolve_initial_references("RootPOA")
self.poaManager = obj._get_the_POAManager()
self.poaManager.activate()
print "POA State: ", self.poaManager.get_state()
def registerEventListeners(self, listenerList):
for lInfo in listenerList:
domain = lInfo['domain']
channelName = lInfo['channelName']
eventName = lInfo['eventName']
obj = self.rootContextExt.resolve_str(domain + "/" + channelName)
eventChannel = obj._narrow(CosNotifyChannelAdmin.EventChannel)
adid = eventChannel.new_for_consumers(CosNotifyChannelAdmin.\
InterFilterGroupOperator._item(1))
eventType = CosNotification.EventType(domain, eventName)
eventAddList = [eventType]
eventRemoveList = []
adid[0].subscription_change(eventAddList, eventRemoveList)
proxySupplierArray = adid[0].obtain_notification_push_supplier(\
CosNotifyChannelAdmin.ClientType._items[1])
proxySupplier = proxySupplierArray[0]
structProxySupplier = (proxySupplier._narrow
(CosNotifyChannelAdmin.
StructuredProxyPushSupplier))
self.listener = EventListener(lInfo)
pushConsumer = self.listener._this()
structProxySupplier.connect_structured_push_consumer(pushConsumer)
More information about the omniORB-list
mailing list