[omniORB] Notification Service with omniorbPy
Mark Zimmerman
markzimm at frii.com
Wed Jan 25 11:54:21 GMT 2006
On Wed, Jan 25, 2006 at 06:39:41PM +0100, Niko Sünderhauf wrote:
> Hello,
>
> I try to use the Notification Service with omniorbPy. I was looking for
> some examples, but all I found was how to use the Event Service. Could
> somebody please post a short example or a link?
>
> So here is my scenario:
> We already have a naming-service and some structured-push-suppliers
> running. (They use ACE/TAO as ORB, but that shouldn't matter. However, I
> want my clients written in Python, so I have to use omniorbPy, right?)
> Now I want to register something (say class myConsumer) to all the
> events with eventname "TestEvent". As far as I saw from some C++
> examples, myConsumer has to be derived from structuredPushConsumer and
> has a method called "push" or something, which is called every time an
> event with name "TestEvent" is broadcasted by the Notification Service.
>
> I have already written a few functions to get references to all the
> objects in the naming service. So I can call
>
> ec=getInterface("EventChannel",CosNotifyChannelAdmin.EventChannel)
>
I have attached a short example that might get you started.
-- Mark
-------------- next part --------------
#!/usr/bin/env python
from omniORB import CORBA
import CosNaming
import CosNotification
import CosNotifyComm
import CosNotifyComm__POA
import CosNotifyChannelAdmin
import CosNotifyFilter
import time
import sys
import signal
class NotChanListen(CosNotifyComm__POA.StructuredPushConsumer):
    """Structured push consumer subscribed to a single event name.

    The constructor looks up the notification channel in the naming
    service, creates a consumer admin and a structured proxy push
    supplier on it, installs a filter for the requested event name and
    finally connects this servant as the push consumer.  Subclasses
    override handleEvent() to process each delivered event.
    """

    def __init__(self, orb, evName, subFilter="1==1"):
        """Subscribe this consumer to events named evName.

        orb       -- an initialised ORB, used to reach the naming service.
        evName    -- event name placed in the filter's EventType
                     (paired with the fixed string "SPW").
        subFilter -- constraint expression handed to the filter; the
                     default "1==1" is used when the caller gives none.
        """
        # Root naming context.
        naming_obj = orb.resolve_initial_references("NameService")
        root_context = naming_obj._narrow(CosNaming.NamingContext)

        # The channel is bound under id "EventChannel", kind "EventChannel".
        channel_name = [CosNaming.NameComponent("EventChannel", "EventChannel")]
        channel_obj = root_context.resolve(channel_name)
        channel = channel_obj._narrow(CosNotifyChannelAdmin.EventChannel)

        # A fresh consumer admin, kept so disconnect() can destroy it.
        # The admin id returned alongside it is not needed.
        self.cadmin, _admin_id = channel.new_for_consumers(
            CosNotifyChannelAdmin.AND_OP)

        # Proxy supplier for structured events, narrowed to its concrete
        # interface; the proxy id is not needed either.
        proxy, _proxy_id = self.cadmin.obtain_notification_push_supplier(
            CosNotifyChannelAdmin.STRUCTURED_EVENT)
        self.psupp = proxy._narrow(
            CosNotifyChannelAdmin.StructuredProxyPushSupplier)

        # Build a filter from the channel's default factory holding one
        # constraint: the requested event type plus the sub-filter.
        factory = channel._get_default_filter_factory()
        self.filter = factory.create_filter("EXTENDED_TCL")
        event_types = [CosNotification.EventType("SPW", evName)]
        constraint = CosNotifyFilter.ConstraintExp(event_types, subFilter)
        self.cis1 = self.filter.add_constraints([constraint])

        # The filter is attached to the consumer admin (not to the proxy);
        # then this servant is registered as the push consumer.
        self.cadmin.add_filter(self.filter)
        self.psupp.connect_structured_push_consumer(self._this())

    def disconnect(self):
        """Undo the subscription made by the constructor.

        Removes the filters from the consumer admin, destroys the filter,
        disconnects the proxy supplier and destroys the consumer admin,
        in that order.
        """
        admin = self.cadmin
        admin.remove_all_filters()
        self.filter.destroy()
        self.psupp.disconnect_structured_push_supplier()
        admin.destroy()

    def handleEvent(self, event):
        """Process one delivered event; subclasses are expected to override."""
        # Parenthesised single-string form prints identically on Python 2.
        print('You forgot to override handleEvent.')

    def push_structured_event(self, event):
        """Entry point for a pushed structured event; delegates to handleEvent()."""
        self.handleEvent(event)

    def disconnect_structured_push_consumer(self):
        """Consumer-side disconnect notification; intentionally a no-op."""
        pass

    def offer_change(self, added, removed):
        """Offer-change notification; intentionally ignored."""
        pass
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
#
class LogListen(NotChanListen):
    """Listener for 'Log' events that prints each one as a formatted line."""

    def __init__(self, orb):
        """Subscribe to events named 'Log' with NotChanListen's default subFilter.

        orb -- an initialised ORB, passed through to NotChanListen.
        """
        NotChanListen.__init__(self, orb, 'Log')

    def handleEvent(self, event):
        """Print one log event as a single formatted line.

        Fields found in event.filterable_data override the defaults
        below; the message text is taken from event.remainder_of_body.
        A 'Time' value greater than zero is rendered in UTC via
        time.gmtime(); otherwise the timestamp is shown as 'unknown'.
        """
        # Defaults for fields the supplier did not send.
        evdata = {
            'Severity': 9,
            'HostName': 'unknown',
            'UserName': 'unknown',
            'PID': 0,
            'PPID': 0,
            'ProgramName': 'unknown',
            'Time': -1,
        }
        for field in event.filterable_data:
            evdata[field.name] = field.value.value()
        message = event.remainder_of_body.value()

        if evdata['Time'] > 0:
            when = time.gmtime(evdata['Time'])
            # %b and %H:%M:%S are the portable spellings of the
            # platform-specific %h and %T directives; the output is the
            # same where those were supported.
            tstr = time.strftime("%d-%b-%Y:%H:%M:%S", when)
        else:
            tstr = 'unknown'

        # Parenthesised single-string form prints identically on Python 2.
        print('%1d %20s %8s %8s %5d %5d %10s %s' % (
            evdata['Severity'], tstr, evdata['UserName'],
            evdata['HostName'], evdata['PID'], evdata['PPID'],
            evdata['ProgramName'], message))
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
#
def handler(signum, frame):
    """Signal handler: disconnect the listener, then shut the ORB down.

    signum, frame -- standard signal-handler arguments; both unused.

    Relies on the module-level names `listener` and `orb`.  Any exception
    raised by listener.disconnect() still propagates to the caller, but
    only after the ORB shutdown has been requested.
    """
    try:
        listener.disconnect()
    finally:
        # Always request shutdown: if the disconnect fails, skipping this
        # call would leave the ORB running with no shutdown ever requested.
        orb.shutdown(1)
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
#
# Script entry point.  `orb` and `listener` must stay module-level names:
# the SIGINT handler defined above looks them up as globals.

# Initialise the ORB, passing the command line through to it.
orb = CORBA.ORB_init(sys.argv, CORBA.ORB_ID)
poa = orb.resolve_initial_references("RootPOA")
# Constructing the listener subscribes it to the event channel and
# connects it as a push consumer (see NotChanListen.__init__).
listener = LogListen(orb)
# Activate the POA manager after the listener has been set up.
poaManager = poa._get_the_POAManager()
poaManager.activate()
# On SIGINT, handler() disconnects the listener and calls orb.shutdown().
signal.signal(signal.SIGINT, handler)
# Hand control to the ORB.
# NOTE(review): presumably blocks until orb.shutdown() is called - confirm.
orb.run()
More information about the omniORB-list
mailing list