Thread - network down detector

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# -*- coding: utf-8 -*-
"""
threaded.network_down_detector.py
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

A fully-functional connection-observer using socket & threading.
Works on Python 2.7 as well as on 3.6

This example demonstrates basic concept of connection observer - entity
that is fully responsible for:
- observing data coming from connection till it catches what it is waiting for
- parsing that data to have "caught event" stored in expected form
- storing that result internally for later retrieval

Please note that this example is LAYER-1 usage which means:
- observer can't run on its own, must be fed with data (passive observer)
- observer can't be awaited, must be queried for status before asking for data
In other words - low level manual combining of all the pieces.
"""

__author__ = 'Grzegorz Latuszek'
__copyright__ = 'Copyright (C) 2018, Nokia'
__email__ = 'grzegorz.latuszek@nokia.com'

import logging
import socket
import sys
import os
import threading
import time
from contextlib import closing

from moler.threaded_moler_connection import ThreadedMolerConnection

sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))  # allow finding modules in examples/

from network_toggle_observers import NetworkDownDetector


# ===================== Moler's connection-observer usage ======================


def ping_observing_task(address, ping_ip='10.0.2.15'):
    """
    Feed data read from tcp connection into network-down observer till it is done.

    Lowest layer of Moler's usage - all elements are glued manually here.

    :param address: (host, port) tuple passed to tcp_connection()
    :param ping_ip: IP address handed to NetworkDownDetector; the default
                    is the value that used to be hard-coded here
    :return: None (moment of network-down is only logged)
    """
    logger = logging.getLogger('moler.user.app-code')

    # Lowest layer of Moler's usage (you manually glue all elements):
    # 1. create observer
    net_down_detector = NetworkDownDetector(ping_ip)
    # 2. ThreadedMolerConnection is a proxy-glue between observer (speaks str)
    #                                   and threaded-connection (speaks bytes)
    moler_conn = ThreadedMolerConnection(decoder=lambda data: data.decode("utf-8"))
    # 3a. glue from proxy to observer
    moler_conn.subscribe(net_down_detector.data_received)

    logger.debug('waiting for data to observe')
    # closing() finalizes the generator (and so the socket it owns) right when
    # we leave the loop via break - without it the release of the socket
    # depends on when the suspended generator gets garbage-collected
    with closing(tcp_connection(address)) as incoming_data:
        for connection_data in incoming_data:
            # 3b. glue to proxy from external-IO (threaded tcp client connection)
            #   (client code has to pass its received data into Moler's connection)
            moler_conn.data_received(connection_data)
            # 4. Moler's client code must manually check status of observer ...
            if net_down_detector.done():
                # 5. ... to know when it can ask for result
                net_down_time = net_down_detector.result()
                timestamp = time.strftime("%H:%M:%S", time.localtime(net_down_time))
                logger.debug('Network is down from {}'.format(timestamp))
                break


# ==============================================================================
def tcp_connection(address):
    """
    Generator reading from tcp network transport layer

    Yields every non-empty chunk (up to 128 bytes) received from the socket
    and ends when recv() returns empty data.

    :param address: (host, port) tuple to connect to
    """
    logger = logging.getLogger('threaded.tcp-connection')
    logger.debug('... connecting to tcp://{}:{}'.format(*address))
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # connect() is done inside the with-block so that the socket is closed
    # also when connecting fails (e.g. connection refused), not only after
    # a successful session
    with closing(client_socket):
        client_socket.connect(address)
        while True:
            data = client_socket.recv(128)
            if data:
                logger.debug('<<< {!r}'.format(data))
                yield data
            else:
                logger.debug("... closed")
                break


def main(address):
    """Run ping_observing_task() for given address in its own thread and wait for it."""
    task_args = (address,)
    # Starting the client
    observing_thread = threading.Thread(target=ping_observing_task, args=task_args)
    observing_thread.start()
    # block here till the observing job is over
    observing_thread.join()


# ==============================================================================
if __name__ == '__main__':
    from threaded_ping_server import start_ping_servers, stop_ping_servers

    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s |%(name)25s |%(message)s',
        datefmt='%H:%M:%S',
        stream=sys.stderr,
    )
    local_address = ('localhost', 5670)
    servers = start_ping_servers([(local_address, '10.0.2.15')])
    try:
        main(local_address)
    finally:
        # stop the started servers even when observing was interrupted
        # (e.g. Ctrl-C while waiting for the client thread)
        stop_ping_servers(servers)

'''
LOG OUTPUT

16:58:04 | threaded.ping.tcp-server |Ping Sim started at tcp://localhost:5670
16:58:04 | threaded.ping.tcp-server |WARNING - I'll be tired too much just after first client!
16:58:04 |      moler.user.app-code |waiting for data to observe
16:58:04 |  threaded.tcp-connection |... connecting to tcp://localhost:5670
16:58:04 | threaded.ping.tcp-server |connection accepted - client at tcp://127.0.0.1:56582
16:58:04 |  threaded.tcp-connection |<<< b'\n'
16:58:05 |  threaded.tcp-connection |<<< b'greg@debian:~$ ping 10.0.2.15\n'
16:58:06 |  threaded.tcp-connection |<<< b'PING 10.0.2.15 (10.0.2.15) 56(84) bytes of data.\n'
16:58:07 |  threaded.tcp-connection |<<< b'64 bytes from 10.0.2.15: icmp_req=1 ttl=64 time=0.080 ms\n'
16:58:08 |  threaded.tcp-connection |<<< b'64 bytes from 10.0.2.15: icmp_req=2 ttl=64 time=0.037 ms\n'
16:58:09 |  threaded.tcp-connection |<<< b'64 bytes from 10.0.2.15: icmp_req=3 ttl=64 time=0.045 ms\n'
16:58:10 |  threaded.tcp-connection |<<< b'ping: sendmsg: Network is unreachable\n'
16:58:10 |  moler.net-down-detector |Network is down!
16:58:10 |      moler.user.app-code |Network is down from 16:58:10
16:58:10 | threaded.ping.tcp-server |Ping Sim: I'm tired after this client ... bye
16:58:12 | threaded.ping.tcp-server |Connection closed
'''
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# -*- coding: utf-8 -*-
"""
threaded.network_down_multi-detectors.py
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

A fully-functional connection-observer using socket & threading.
Works on Python 2.7 as well as on 3.6

This example demonstrates multiple connection observers working
on multiple connections.
Shows following concepts:
- multiple observers may observe single connection
- each one is focused on different data (processing decomposition)
- client code may run observers on different connections
- client code may "start" observers in sequence
- external-IO-connection must be given Moler's connection for data forwarding
"""

__author__ = 'Grzegorz Latuszek'
__copyright__ = 'Copyright (C) 2018, Nokia'
__email__ = 'grzegorz.latuszek@nokia.com'

import logging
import socket
import sys
import os
import threading
import time
from contextlib import closing

from moler.threaded_moler_connection import ThreadedMolerConnection

sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))  # allow finding modules in examples/

from network_toggle_observers import NetworkDownDetector, NetworkUpDetector


# ===================== Moler's connection-observer usage ======================


def ping_observing_task(address, ping_ip):
    """
    Observe one tcp connection: first wait for network-down, then for network-up.

    Observers are "started"/"stopped" by subscribing/unsubscribing their
    data_received() on Moler's connection; tcp_connection() forwards incoming
    data into that connection.

    :param address: (host, port) tuple passed to tcp_connection()
    :param ping_ip: IP address handed to both detectors and used in log messages
    :return: None (moments of network down/up are only logged)
    """
    logger = logging.getLogger('moler.user.app-code')
    net_addr = 'tcp://{}:{}'.format(*address)

    # Lowest layer of Moler's usage (you manually glue all elements):
    # 1. create observers
    net_down_detector = NetworkDownDetector(ping_ip)
    net_drop_found = False
    net_up_detector = NetworkUpDetector(ping_ip)
    moler_conn = ThreadedMolerConnection(decoder=lambda data: data.decode("utf-8"))
    # 2. virtually "start" observer by making it data-listener
    moler_conn.subscribe(net_down_detector.data_received)

    info = '{} on {} using {}'.format(ping_ip, net_addr, net_down_detector)
    logger.debug('observe ' + info)
    # closing() finalizes the generator (and so the socket it owns) right when
    # we leave the loop via break - without it the release of the socket
    # depends on when the suspended generator gets garbage-collected
    with closing(tcp_connection(address, moler_conn)) as incoming_data:
        for _ in incoming_data:
            # anytime new data comes it may change status of observer
            if not net_drop_found and net_down_detector.done():
                net_drop_found = True
                net_down_time = net_down_detector.result()
                timestamp = time.strftime("%H:%M:%S", time.localtime(net_down_time))
                logger.debug('Network {} is down from {}'.format(ping_ip, timestamp))
                # 3. virtually "stop" that observer
                moler_conn.unsubscribe(net_down_detector.data_received)
                # 4. and start subsequent one (to know when net is back "up")
                info = '{} on {} using {}'.format(ping_ip, net_addr, net_up_detector)
                logger.debug('observe ' + info)
                moler_conn.subscribe(net_up_detector.data_received)
            if net_up_detector.done():
                net_up_time = net_up_detector.result()
                timestamp = time.strftime("%H:%M:%S", time.localtime(net_up_time))
                logger.debug('Network {} is back "up" from {}'.format(ping_ip, timestamp))
                # 5. virtually "stop" that observer
                moler_conn.unsubscribe(net_up_detector.data_received)
                break


# ==============================================================================
def tcp_connection(address, moler_conn):
    """
    Forwarder reading from tcp network transport layer

    Every non-empty chunk (up to 128 bytes) received from the socket is first
    passed to moler_conn.data_received() and then yielded, so the caller can
    react after each portion of data. Ends when recv() returns empty data.

    :param address: (host, port) tuple to connect to
    :param moler_conn: object whose data_received(data) gets each received chunk
    """
    logger = logging.getLogger('threaded.tcp-connection({}:{})'.format(*address))
    logger.debug('... connecting to tcp://{}:{}'.format(*address))
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # connect() is done inside the with-block so that the socket is closed
    # also when connecting fails (e.g. connection refused), not only after
    # a successful session
    with closing(client_socket):
        client_socket.connect(address)
        while True:
            data = client_socket.recv(128)
            if data:
                logger.debug('<<< {!r}'.format(data))
                # Forward received data into Moler's connection
                moler_conn.data_received(data)
                yield data
            else:
                logger.debug("... closed")
                break


def main(connections2observe4ip):
    """
    Run one ping_observing_task() thread per (address, ping_ip) pair.

    :param connections2observe4ip: iterable of (address, ping_ip) pairs
    :return: None, after all started threads have finished
    """
    workers = []
    # Starting the clients
    for target_address, observed_ip in connections2observe4ip:
        worker = threading.Thread(target=ping_observing_task,
                                  args=(target_address, observed_ip))
        worker.start()
        workers.append(worker)
    # await observers job to be done
    for worker in workers:
        worker.join()


# ==============================================================================
if __name__ == '__main__':
    from threaded_ping_server import start_ping_servers, stop_ping_servers

    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s |%(name)-40s |%(message)s',
        datefmt='%H:%M:%S',
        stream=sys.stderr,
    )
    connections2observe = [(('localhost', 5671), '10.0.2.15'),
                           (('localhost', 5672), '10.0.2.16')]
    servers = start_ping_servers(connections2observe)
    try:
        main(connections2observe)
    finally:
        # stop the started servers even when observing was interrupted
        # (e.g. Ctrl-C while waiting for the client threads)
        stop_ping_servers(servers)

'''
LOG OUTPUT

16:37:44 |threaded.ping.tcp-server(5671)           |Ping Sim started at tcp://localhost:5671
16:37:44 |threaded.ping.tcp-server(5672)           |Ping Sim started at tcp://localhost:5672
16:37:44 |moler.user.app-code                      |observe 10.0.2.15 on tcp://localhost:5671 using NetworkDownDetector(id:46109472)
16:37:44 |threaded.tcp-connection(localhost:5671)  |... connecting to tcp://localhost:5671
16:37:44 |moler.user.app-code                      |observe 10.0.2.16 on tcp://localhost:5672 using NetworkDownDetector(id:46184544)
16:37:44 |threaded.tcp-connection(localhost:5672)  |... connecting to tcp://localhost:5672
16:37:44 |threaded.ping.tcp-server(5671 -> 61044)  |connection accepted - client at tcp://127.0.0.1:61044
16:37:44 |threaded.tcp-connection(localhost:5671)  |<<< b'\n'
16:37:44 |threaded.ping.tcp-server(5672 -> 61045)  |connection accepted - client at tcp://127.0.0.1:61045
16:37:44 |threaded.tcp-connection(localhost:5672)  |<<< b'\n'
16:37:45 |threaded.tcp-connection(localhost:5671)  |<<< b'greg@debian:~$ ping 10.0.2.15\n'
16:37:45 |threaded.tcp-connection(localhost:5672)  |<<< b'greg@debian:~$ ping 10.0.2.16\n'
16:37:46 |threaded.tcp-connection(localhost:5671)  |<<< b'PING 10.0.2.15 (10.0.2.15) 56(84) bytes of data.\n'
16:37:46 |threaded.tcp-connection(localhost:5672)  |<<< b'PING 10.0.2.16 (10.0.2.16) 56(84) bytes of data.\n'
16:37:47 |threaded.tcp-connection(localhost:5671)  |<<< b'64 bytes from 10.0.2.15: icmp_req=1 ttl=64 time=0.080 ms\n'
16:37:47 |threaded.tcp-connection(localhost:5672)  |<<< b'64 bytes from 10.0.2.16: icmp_req=1 ttl=64 time=0.080 ms\n'
16:37:48 |threaded.tcp-connection(localhost:5671)  |<<< b'64 bytes from 10.0.2.15: icmp_req=2 ttl=64 time=0.037 ms\n'
16:37:48 |threaded.tcp-connection(localhost:5672)  |<<< b'64 bytes from 10.0.2.16: icmp_req=2 ttl=64 time=0.037 ms\n'
16:37:49 |threaded.tcp-connection(localhost:5671)  |<<< b'64 bytes from 10.0.2.15: icmp_req=3 ttl=64 time=0.045 ms\n'
16:37:49 |threaded.tcp-connection(localhost:5672)  |<<< b'64 bytes from 10.0.2.16: icmp_req=3 ttl=64 time=0.045 ms\n'
16:37:50 |threaded.tcp-connection(localhost:5671)  |<<< b'ping: sendmsg: Network is unreachable\n'
16:37:50 |moler.NetworkDownDetector(id:46109472)   |Network 10.0.2.15 is down!
16:37:50 |moler.user.app-code                      |Network 10.0.2.15 is down from 16:37:50
16:37:50 |moler.user.app-code                      |observe 10.0.2.15 on tcp://localhost:5671 using NetworkUpDetector(id:46110368)
16:37:50 |threaded.tcp-connection(localhost:5672)  |<<< b'ping: sendmsg: Network is unreachable\n'
16:37:50 |moler.NetworkDownDetector(id:46184544)   |Network 10.0.2.16 is down!
16:37:50 |moler.user.app-code                      |Network 10.0.2.16 is down from 16:37:50
16:37:50 |moler.user.app-code                      |observe 10.0.2.16 on tcp://localhost:5672 using NetworkUpDetector(id:46184488)
16:37:51 |threaded.tcp-connection(localhost:5671)  |<<< b'ping: sendmsg: Network is unreachable\n'
16:37:51 |threaded.tcp-connection(localhost:5672)  |<<< b'ping: sendmsg: Network is unreachable\n'
16:37:52 |threaded.tcp-connection(localhost:5671)  |<<< b'ping: sendmsg: Network is unreachable\n'
16:37:52 |threaded.tcp-connection(localhost:5672)  |<<< b'ping: sendmsg: Network is unreachable\n'
16:37:53 |threaded.tcp-connection(localhost:5671)  |<<< b'64 bytes from 10.0.2.15: icmp_req=7 ttl=64 time=0.123 ms\n'
16:37:53 |moler.NetworkUpDetector(id:46110368)     |Network 10.0.2.15 is up!
16:37:53 |moler.user.app-code                      |Network 10.0.2.15 is back "up" from 16:37:53
16:37:53 |threaded.tcp-connection(localhost:5672)  |<<< b'64 bytes from 10.0.2.16: icmp_req=7 ttl=64 time=0.123 ms\n'
16:37:53 |moler.NetworkUpDetector(id:46184488)     |Network 10.0.2.16 is up!
16:37:53 |moler.user.app-code                      |Network 10.0.2.16 is back "up" from 16:37:53
16:37:53 |threaded.ping.tcp-server(5671)           |Ping Sim: ... bye
16:37:53 |threaded.ping.tcp-server(5672)           |Ping Sim: ... bye
16:37:55 |threaded.ping.tcp-server(5671 -> 61044)  |Connection closed
16:37:55 |threaded.ping.tcp-server(5672 -> 61045)  |Connection closed
'''