1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# -*- coding: utf-8 -*-
"""
asyncio.network_down_detector.py
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
A fully-functional connection-observer using asyncio. Requires Python 3.6+.
This example demonstrates the basic concept of a connection observer - an entity
that is fully responsible for:
- observing data coming from connection till it catches what it is waiting for
- parsing that data to have "caught event" stored in expected form
- storing that result internally for later retrieval
Please note that this example is LAYER-1 usage, which means:
- observer can't run on its own, must be fed with data (passive observer)
- observer can't be awaited, must be queried for status before asking for data
In other words - low-level manual combining of all the pieces.
"""
# Module metadata.
__author__ = 'Grzegorz Latuszek'
__copyright__ = 'Copyright (C) 2018, Nokia'
__email__ = 'grzegorz.latuszek@nokia.com'
# Standard library.
import asyncio
import logging
import sys
import os
import time
# Moler: connection object that decodes incoming bytes and forwards them to subscribers.
from moler.threaded_moler_connection import ThreadedMolerConnection
# Keep this path tweak ahead of the import below: it adds the directory two
# levels above this file to the module search path, which the following import
# presumably relies on to locate network_toggle_observers.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..")) # allow finding modules in examples/
from network_toggle_observers import NetworkDownDetector
# ===================== Moler's connection-observer usage ======================
async def ping_observing_task(address, net_ip='10.0.2.15'):
    """
    Feed ping output arriving over TCP into a NetworkDownDetector until it is done.

    This is the lowest layer of Moler's usage: the observer is passive, so this
    coroutine glues all the pieces manually - it pushes every received chunk
    into the Moler connection and polls the observer for completion. Once the
    observer is done, the detection time is logged and observation stops.
    If the connection ends before the observer is done, nothing is logged.

    :param address: (host, port) of the TCP server delivering ping output
    :param net_ip: IP address handed to NetworkDownDetector
                   (defaults to the host pinged in this example)
    :return: None
    """
    logger = logging.getLogger('moler.user.app-code')

    # Lowest layer of Moler's usage (you manually glue all elements):
    # 1. create observer
    net_down_detector = NetworkDownDetector(net_ip)
    # 2. ThreadedMolerConnection is a proxy-glue between observer (speaks str)
    #    and asyncio-connection (speaks bytes)
    # NOTE(review): every chunk is decoded on its own, so a multi-byte UTF-8
    # character split across two chunks would fail to decode; the ping output
    # used in this example appears to be ASCII-only - confirm before reuse.
    moler_conn = ThreadedMolerConnection(decoder=lambda data: data.decode("utf-8"))
    # 3a. glue from proxy to observer
    moler_conn.subscribe(net_down_detector.data_received)

    logger.debug('waiting for data to observe')
    connection = tcp_connection(address)
    try:
        async for connection_data in connection:
            # 3b. glue to proxy from external-IO (asyncio tcp client connection)
            #     (client code has to pass its received data into Moler's connection)
            moler_conn.data_received(connection_data)
            # 4. Moler's client code must manually check status of observer ...
            if net_down_detector.done():
                # 5. ... to know when it can ask for result
                net_down_time = net_down_detector.result()
                timestamp = time.strftime("%H:%M:%S", time.localtime(net_down_time))
                logger.debug('Network is down from {}'.format(timestamp))
                break
    finally:
        # Leaving `async for` via `break` does not finalize the async generator;
        # close it explicitly so its cleanup (closing the TCP connection) runs
        # here, deterministically, instead of whenever it gets garbage-collected.
        await connection.aclose()
# ==============================================================================
async def tcp_connection(address):
    """
    Async generator yielding raw byte chunks read from a TCP connection.

    Opens a connection using ``address`` as the positional arguments of
    ``asyncio.open_connection`` (i.e. host and port), then yields each
    non-empty chunk of at most 128 bytes. Iteration ends when a read returns
    no data. The writer side is closed whenever the generator finishes or is
    closed.

    :param address: (host, port) of the TCP server to read from
    """
    log = logging.getLogger('asyncio.tcp-connection')
    log.debug('... connecting to tcp://{}:{}'.format(*address))
    stream_reader, stream_writer = await asyncio.open_connection(*address)
    try:
        # priming read, then keep going for as long as reads return data
        chunk = await stream_reader.read(128)
        while chunk:
            log.debug('<<< {!r}'.format(chunk))
            yield chunk
            chunk = await stream_reader.read(128)
    finally:
        log.debug('... closing')
        stream_writer.close()
# ==============================================================================
if __name__ == '__main__':
    # Script entry point: start the simulated ping server, run the observing
    # task against it, then tear everything down.
    from threaded_ping_server import start_ping_servers, stop_ping_servers

    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s |%(name)40s |%(message)s',
        datefmt='%H:%M:%S',
        stream=sys.stderr,
    )
    # Create the loop explicitly: calling asyncio.get_event_loop() without a
    # running loop is deprecated on modern Python. Registering it as the
    # current loop keeps the behaviour get_event_loop() used to provide.
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    server_address = ('127.0.0.1', 5678)
    servers = start_ping_servers([(server_address, '10.0.2.15')])
    try:
        event_loop.run_until_complete(ping_observing_task(server_address))
    finally:
        try:
            # Let any still-open async generator run its cleanup code
            # before the loop goes away.
            event_loop.run_until_complete(event_loop.shutdown_asyncgens())
        finally:
            stop_ping_servers(servers)
            event_loop.close()
'''
LOG OUTPUT
16:56:30 | asyncio |Using selector: SelectSelector
16:56:30 | asyncio.ping.tcp-server |Ping Sim started at tcp://127.0.0.1:5678
16:56:30 | asyncio.ping.tcp-server |WARNING - I'll be tired too much just after first client!
16:56:30 | moler.user.app-code |waiting for data to observe
16:56:30 | asyncio.tcp-connection |... connecting to tcp://127.0.0.1:5678
16:56:30 | asyncio.ping.tcp-server |connection accepted - client at tcp://127.0.0.1:56556
16:56:30 | asyncio.tcp-connection |<<< b'\n'
16:56:31 | asyncio.tcp-connection |<<< b'greg@debian:~$ ping 10.0.2.15\n'
16:56:32 | asyncio.tcp-connection |<<< b'PING 10.0.2.15 (10.0.2.15) 56(84) bytes of data.\n'
16:56:33 | asyncio.tcp-connection |<<< b'64 bytes from 10.0.2.15: icmp_req=1 ttl=64 time=0.080 ms\n'
16:56:34 | asyncio.tcp-connection |<<< b'64 bytes from 10.0.2.15: icmp_req=2 ttl=64 time=0.037 ms\n'
16:56:35 | asyncio.tcp-connection |<<< b'64 bytes from 10.0.2.15: icmp_req=3 ttl=64 time=0.045 ms\n'
16:56:36 | asyncio.tcp-connection |<<< b'ping: sendmsg: Network is unreachable\n'
16:56:36 | moler.net-down-detector |Network is down!
16:56:36 | moler.user.app-code |Network is down from 16:56:36
16:56:36 | asyncio.tcp-connection |... closing
16:56:38 | asyncio.ping.tcp-server |Ping Sim: I'm tired after this client ... will do sepuku
'''