3333logger = logging .getLogger (__name__ )
3434
3535
36- def send_trap (host , port , object_identity , mib_to_load , * var_binds ):
36+ def send_trap (
37+ host , port , object_identity , mib_to_load , community , mp_model , * var_binds
38+ ):
3739 iterator = sendNotification (
3840 SnmpEngine (),
39- CommunityData ("public" , mpModel = 0 ),
41+ CommunityData (community , mpModel = mp_model ),
4042 UdpTransportTarget ((host , port )),
4143 ContextData (),
4244 "trap" ,
@@ -73,20 +75,57 @@ def send_v3_trap(host, port, object_identity, *var_binds):
7375 logger .error (f"{ error_indication } " )
7476
7577
76- def test_integration (request , setup_splunk ):
78+ def test_trap_v1 (request , setup_splunk ):
7779 trap_external_ip = request .config .getoption ("trap_external_ip" )
7880 logger .info (f"I have: { trap_external_ip } " )
7981
8082 time .sleep (2 )
8183 # send trap
8284 varbind1 = ("1.3.6.1.6.3.1.1.4.3.0" , "1.3.6.1.4.1.20408.4.1.1.2" )
83- varbind2 = ("1.3.6.1.2.1.1.1 .0" , OctetString ("my system " ))
85+ varbind2 = ("1.3.6.1.2.1.1.4 .0" , OctetString ("my contact " ))
8486 send_trap (
85- trap_external_ip , 162 , "1.3.6.1.6.3.1.1.5.2" , "SNMPv2-MIB" , varbind1 , varbind2
87+ trap_external_ip ,
88+ 162 ,
89+ "1.3.6.1.6.3.1.1.5.2" ,
90+ "SNMPv2-MIB" ,
91+ "publicv1" ,
92+ 0 ,
93+ varbind1 ,
94+ varbind2 ,
8695 )
8796
8897 # wait for the message to be processed
98+ time .sleep (5 )
99+
100+ search_query = """search index="netops" sourcetype="sc4snmp:traps" earliest=-1m
101+ | head 1"""
102+
103+ result_count , events_count = splunk_single_search (setup_splunk , search_query )
104+
105+ assert result_count == 1
106+
107+
108+ def test_trap_v2 (request , setup_splunk ):
109+ trap_external_ip = request .config .getoption ("trap_external_ip" )
110+ logger .info (f"I have: { trap_external_ip } " )
111+
89112 time .sleep (2 )
113+ # send trap
114+ varbind1 = ("1.3.6.1.6.3.1.1.4.3.0" , "1.3.6.1.4.1.20408.4.1.1.2" )
115+ varbind2 = ("1.3.6.1.2.1.1.1.0" , OctetString ("my system" ))
116+ send_trap (
117+ trap_external_ip ,
118+ 162 ,
119+ "1.3.6.1.6.3.1.1.5.2" ,
120+ "SNMPv2-MIB" ,
121+ "homelab" ,
122+ 1 ,
123+ varbind1 ,
124+ varbind2 ,
125+ )
126+
127+ # wait for the message to be processed
128+ time .sleep (5 )
90129
91130 search_query = """search index="netops" sourcetype="sc4snmp:traps" earliest=-1m
92131 | head 1"""
@@ -103,10 +142,12 @@ def test_added_varbind(request, setup_splunk):
103142 time .sleep (2 )
104143 # send trap
105144 varbind1 = ("1.3.6.1.2.1.1.1.0" , OctetString ("test_added_varbind" ))
106- send_trap (trap_external_ip , 162 , "1.3.6.1.2.1.2.1" , "SNMPv2-MIB" , varbind1 )
145+ send_trap (
146+ trap_external_ip , 162 , "1.3.6.1.2.1.2.1" , "SNMPv2-MIB" , "public" , 1 , varbind1
147+ )
107148
108149 # wait for the message to be processed
109- time .sleep (2 )
150+ time .sleep (5 )
110151
111152 search_query = (
112153 """search index="netops" "SNMPv2-MIB.sysDescr.value"="test_added_varbind" """
@@ -125,7 +166,15 @@ def test_many_traps(request, setup_splunk):
125166 # send trap
126167 varbind1 = ("1.3.6.1.2.1.1.1.0" , OctetString ("test_many_traps" ))
127168 for _ in range (5 ):
128- send_trap (trap_external_ip , 162 , "1.3.6.1.2.1.2.1" , "SNMPv2-MIB" , varbind1 )
169+ send_trap (
170+ trap_external_ip ,
171+ 162 ,
172+ "1.3.6.1.2.1.2.1" ,
173+ "SNMPv2-MIB" ,
174+ "public" ,
175+ 1 ,
176+ varbind1 ,
177+ )
129178
130179 # wait for the message to be processed
131180 time .sleep (2 )
@@ -148,7 +197,14 @@ def test_more_than_one_varbind(request, setup_splunk):
148197 varbind1 = ("1.3.6.1.2.1.1.4.0" , OctetString ("test_more_than_one_varbind_contact" ))
149198 varbind2 = ("1.3.6.1.2.1.1.1.0" , OctetString ("test_more_than_one_varbind" ))
150199 send_trap (
151- trap_external_ip , 162 , "1.3.6.1.2.1.2.1" , "SNMPv2-MIB" , varbind1 , varbind2
200+ trap_external_ip ,
201+ 162 ,
202+ "1.3.6.1.2.1.2.1" ,
203+ "SNMPv2-MIB" ,
204+ "public" ,
205+ 1 ,
206+ varbind1 ,
207+ varbind2 ,
152208 )
153209
154210 # wait for the message to be processed
@@ -170,7 +226,13 @@ def test_loading_mibs(request, setup_splunk):
170226 # send trap
171227 varbind1 = ("1.3.6.1.6.3.1.1.4.1.0" , "1.3.6.1.4.1.15597.1.1.1.1.0.1" )
172228 send_trap (
173- trap_external_ip , 162 , "1.3.6.1.4.1.15597.1.1.1.1" , "SNMPv2-MIB" , varbind1
229+ trap_external_ip ,
230+ 162 ,
231+ "1.3.6.1.4.1.15597.1.1.1.1" ,
232+ "SNMPv2-MIB" ,
233+ "public" ,
234+ 1 ,
235+ varbind1 ,
174236 )
175237
176238 # wait for the message to be processed
0 commit comments