-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathPingSensor.py
More file actions
67 lines (53 loc) · 1.63 KB
/
PingSensor.py
File metadata and controls
67 lines (53 loc) · 1.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import socket
import icmplib
import struct
import PowerControllerSensor
import os
import time
import logging
# Upper bound, in bytes, on a single read from the ICMP socket.
BUFSIZE = 1500
# PID of this process, captured once at import time; it is packed into every
# ping payload as the identifier field.
process_id = os.getpid()
class PingSensor(PowerControllerSensor.PowerControllerSensor):
    """Sensor that reports whether a host answers ICMP echo requests.

    An echo request is sent when the sensor is created and again on every
    ``do_poll`` call.  ``do_read`` consumes one packet from the raw socket
    and records its arrival time; ``get_state`` reports ``"<addr>_alive"``
    while the most recent answer is no older than two poll intervals.
    """

    # Payload carried by every echo packet: 16-bit identifier, 16-bit
    # sequence number and the send time as a double, network byte order.
    _PAYLOAD_FORMAT = "!HHd"
    _PAYLOAD_SIZE = struct.calcsize(_PAYLOAD_FORMAT)
    # Smallest legal IPv4 header (no options), in bytes.
    _MIN_IP_HEADER = 20
    # Identifier and sequence number are 16-bit fields on the wire.
    _U16_MASK = 0xFFFF

    def __init__(self, addr, interval=1):
        """Open a raw ICMP socket to *addr* and send the first ping.

        addr     -- host name or IPv4 address to probe
        interval -- seconds between polls (stored as a float)

        Any error raised while creating or connecting the socket, or while
        sending the first ping, propagates to the caller.
        """
        PowerControllerSensor.PowerControllerSensor.__init__(self)
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_RAW,
                                  socket.getprotobyname('icmp'))
        try:
            # connect() needs an (address, port) pair; ICMP itself has no
            # ports, so the number is kept only for the call signature.
            self.sock.connect((addr, 22))
        except Exception:
            # Do not leak the descriptor when the address cannot be used.
            self.sock.close()
            raise
        self.poll_interval = float(interval)
        self.last_response_time = 0
        self.addr = addr
        # Echo request template (type 8, code 0); only its data changes
        # from one ping to the next.
        self.base_packet = icmplib.Packet((8, 0))
        self.seq_num = 0
        self.send_ping()

    def send_ping(self):
        """Send one echo request stamped with the current time."""
        # Both header fields are unsigned 16-bit: wrap the sequence number
        # and truncate the PID so struct.pack() can never overflow.
        self.seq_num = (self.seq_num + 1) & self._U16_MASK
        self.base_packet.data = struct.pack(self._PAYLOAD_FORMAT,
                                            process_id & self._U16_MASK,
                                            self.seq_num,
                                            time.time())
        self.sock.send(self.base_packet.packet)

    def get_state(self):
        """Return ``set(["<addr>_alive"])`` if a reply is recent, else ``set()``.

        A reply counts as recent when it arrived within two poll intervals.
        """
        silence = time.time() - self.last_response_time
        if silence > self.poll_interval * 2:
            return set()
        return set([self.addr + "_alive"])

    def get_selectable_fds(self):
        """Return the file descriptors to watch for readability."""
        return [self.sock.fileno()]

    def get_poll_interval(self):
        """Return the number of seconds between ``do_poll`` calls."""
        return self.poll_interval

    def do_poll(self):
        """Periodic action: send the next echo request."""
        self.send_ping()

    def do_read(self):
        """Read one packet from the socket and record a matching reply.

        Packets that are too short, that do not carry this sensor's payload
        layout, or whose identifier belongs to another process are logged at
        debug level and otherwise ignored, so stray ICMP traffic cannot
        raise ``struct.error`` or be mistaken for an answer.
        """
        buf = self.sock.recv(BUFSIZE)
        current_time = time.time()

        if len(buf) < self._MIN_IP_HEADER:
            logging.debug("ignoring short packet (%d bytes) from %s",
                          len(buf), self.addr)
            return

        # The low nibble of the first byte is the IP header length in 32-bit
        # words; honour it instead of assuming an option-less 20-byte header.
        ip_header_len = (struct.unpack_from("!B", buf)[0] & 0x0F) * 4
        if ip_header_len < self._MIN_IP_HEADER or len(buf) <= ip_header_len:
            logging.debug("ignoring packet with bad IP header from %s",
                          self.addr)
            return

        r = icmplib.Packet.parse(buf[ip_header_len:])

        # Only payloads with exactly our layout can be unpacked; ICMP error
        # messages and foreign pings carry different amounts of data.
        if len(r.data) != self._PAYLOAD_SIZE:
            logging.debug("ignoring ICMP packet with %d-byte payload from %s",
                          len(r.data), self.addr)
            return
        (ident, seq, timestamp) = struct.unpack(self._PAYLOAD_FORMAT, r.data)

        if ident != (process_id & self._U16_MASK):
            logging.debug("ignoring ICMP packet for id=%s from %s",
                          ident, self.addr)
            return

        # Round-trip time in milliseconds, from the timestamp echoed back.
        rtt = (current_time - timestamp) * 1000
        logging.debug("%d bytes from %s: id=%s, seq=%u, rtt=%.3f ms",
                      len(buf), self.addr, ident, seq, rtt)
        self.last_response_time = current_time