[omniORB] push_events filterable_data, header and remainder_of_body
Anna Patil
patilanna at hotmail.com
Tue Jan 4 19:12:30 UTC 2022
Hi All,
tried push_event scripts with the help of you old posts and mail mailing list. I am stuck at defining filterable_data, header and remainder_of_body. How to proceed further, please suggest. Below is the my code.
class NmsSession_I(nmsSession__POA.NmsSession_I):
pass
class StructuredPushConsumerImpl(CosNotifyComm__POA.StructuredPushConsumer):
def __init__(self, orb):
self.orb = orb
class psconsumer(CosEventComm__POA.PushConsumer):
def __init__(self):
pass
def main():
argv = ["-ORBInitRef", "NameService=corbaloc:iiop:10.10.118.3:12001/NameService"]
#
# Start orb.
global orb
orb=CORBA.ORB_init(argv, CORBA.ORB_ID)
# Process Options
verbose=0
needNameService=0
channelName="EventChannel"
factoryName="EventChannelFactory"
criteria=[]
action="start"
try:
#
# Get Name Service root context.
rootContext=None
try:
action="resolve initial reference 'NameService'"
obj=orb.resolve_initial_references("NameService")
rootContext=obj._narrow(CosNaming.NamingContext)
if rootContext is None:
raise CORBA.OBJECT_NOT_EXIST(0,CORBA.COMPLETED_NO)
except CORBA.Exception as ex:
if needNameService:
raise ex
else:
sys.stderr.write("Warning - failed to %s\n"%action)
### Initilizing POA ###
name = [CosNaming.NameComponent("TMF_MTNM","Class"),
CosNaming.NameComponent("HUAWEI","Vendor"),
CosNaming.NameComponent("Huawei/NCE","EmsInstance"),
CosNaming.NameComponent("2.0","Version"),
CosNaming.NameComponent("Huawei/NCE","EmsSessionFactory_I")]
try:
obj = rootContext.resolve(name)
except CosNaming.NamingContext.NotFound as ex:
print("Except->Name not found")
print(ex)
try:
poa = orb.resolve_initial_references("RootPOA")
except Exception as ex:
print('POA initial References Error: {}'.format(ex))
try:
poaManager = poa._get_the_POAManager()
except Exception as ex:
print('POA get manager error : {}'.format(ex))
try:
poaManager.activate()
except Exception as ex:
print('POA manager Activation Error: {}'.format(ex))
### Narrow the object
ems_session = obj._narrow(emsSessionFactory.EmsSessionFactory_I)
if ems_session is None:
print("Object reference is not an EmsSessionFactory_I")
### Get NMS session
nms_session_i = NmsSession_I()
nms_session_o = nms_session_i._this()
if nms_session_o is None:
print("Object reference is not an NmsSession_I")
sys.exit(1)
##Get ems Session ###
try:
session = ems_session.getEmsSession("user1", "passwd", nms_session_o)
#print(dir(session))
except Exception as ex:
print('System Error: {}'.format(ex))
sys.exit(1)
#print(dir(session))
try:
eventChannel = session.getEventChannel()
except globaldefs.ProcessingFailureException as ex:
print('System Error: {}'.format(ex))
eventChannel.destroy()
session.endSession()
sys.exit(1)
#
#impl = StructuredPushConsumerImpl(orb)
#obj = impl._this()
eventTypeAdd = [ CosNotification.EventType("tmf_mtnm", "NT_ALARM") ]
eventTypeRemove = [ CosNotification.EventType("tmf_mtnm", "NT_HEARTBEAT") ]
try:
consumerAdmin, consID = eventChannel.new_for_consumers(CosNotifyChannelAdmin.AND_OP)
#print(consumerAdmin[0])
#print(dir(InterFilterGroupOperator))
except Exception as e:
print(e)
try:
consumerAdmin.subscription_change(eventTypeAdd, eventTypeRemove)
except Exception as e:
print(e)
try:
consumerAdmin.remove_all_filters()
except Exception as e:
print(e)
subFilter="1==1"
ffp = eventChannel._get_default_filter_factory()
exp = [CosNotifyFilter.ConstraintExp ([ CosNotification.EventType("tmf_mtnm", "NT_ALARM") ] , subFilter)]
exp1 = [CosNotifyFilter.ConstraintExp ([ CosNotification.EventType("tmf_mtnm", "NT_HEARTBEAT") ] , subFilter)]
try:
alarmFilter = eventChannel._get_default_filter_factory()
channel_filter = alarmFilter.create_filter("TCL")
channel_filter.add_constraints(exp)
consumerAdmin.add_filter(channel_filter)
except Exception as e:
print(e)
impl = StructuredPushConsumerImpl(orb)
obj = impl._this()
consumer_admin = eventChannel._get_default_consumer_admin()
supplier_admin = eventChannel._get_default_supplier_admin()
(proxy_consumer, proxy_id) = supplier_admin.obtain_notification_push_consumer(CosNotifyChannelAdmin.STRUCTURED_EVENT)
proxy_consumer = proxy_consumer._narrow(CosNotifyChannelAdmin.StructuredProxyPushConsumer)
proxy_consumer.connect_structured_push_supplier(CosNotifyComm__POA.StructuredPushSupplier()._this())
if proxy_consumer != None:
# Build the filterable data array.
AlarmSerialNo = 0
nativeEMSName = ''
nativeProbableCause = ''
SlaveShelf = ''
whatsay = ''
#
filterable_data = [
CosNotification.Property ( "AlarmSerialNo",CORBA.Any(CORBA._tc_ulong, AlarmSerialNo)),
CosNotification.Property ( "nativeEMSName",CORBA.Any(CORBA._tc_string, nativeEMSName)),
CosNotification.Property ( "nativeProbableCause",CORBA.Any(CORBA._tc_string, nativeProbableCause)),
CosNotification.Property ( "SlaveShelf",CORBA.Any(CORBA._tc_string, SlaveShelf))
]
while True:
try:
event = CosNotification.StructuredEvent (header, filterable_data, remainder_of_body)
events = proxy_consumer.push_structured_event(event)
print(events)
except Exception as e:
print(e)
except CORBA.Exception as ex:
sys.stderr.write("CORBA exception, unable to %s.\n"%action)
################################################################################
# If this file is executed directly, then we start here.
if(__name__=="__main__"):
main()
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://www.omniorb-support.com/pipermail/omniorb-list/attachments/20220104/56475af1/attachment.html>
More information about the omniORB-list
mailing list