-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathproxy
More file actions
executable file
·116 lines (102 loc) · 4.46 KB
/
proxy
File metadata and controls
executable file
·116 lines (102 loc) · 4.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#!/usr/bin/env python
from anthemproxy.device import AnthemDevice
from anthemproxy.protocol import AnthemProtocol
from anthemproxy.discovery import AnthemDiscovery
from anthemproxy.proxy import AnthemProxy
import argparse
import asyncio
import ctypes
import logging
import multiprocessing
import os
async def discover(bind, port, broadcast, forward = False):
seen = {}
def show_device(packet):
output = packet.json
if not seen.get(output):
print(output)
seen[output] = True
return AnthemDiscovery.CONTINUE
discovery = AnthemDiscovery(host = bind, port = port, broadcast = broadcast, on_receive = show_device, forward = forward)
return await discovery.run()
async def proxy(bind, listen, host, port, alias, name, model, serial, forward = False):
def query_device(packet):
if packet.discover:
return AnthemDiscovery.CONTINUE
serialised.value = packet.device.json
return AnthemDiscovery.STOP
target_host = multiprocessing.Value(ctypes.c_wchar_p, '')
target_port = multiprocessing.Value('i', 0)
target = AnthemDevice(host = host, port = port, alias = alias, name = name, model = model, serial = serial)
announced = False
while not target.name or not target.usable:
if target.valid:
target_host.value = target.host
target_port.value = target.port
serialised = multiprocessing.Value(ctypes.c_wchar_p, '')
if not announced:
logging.info('Discovering target device details.')
announced = True
discovery = AnthemDiscovery(host = bind, port = listen, broadcast = host, on_receive = query_device, forward = forward)
await discovery.run()
canon = AnthemDevice.from_json(serialised.value)
if not target.host or target.host != canon.host:
target.host = canon.host
target.port = canon.port
logging.debug('Discovered host: %s', target.host)
if not target.name:
target.name = canon.name
logging.debug('Discovered name: %s', target.name)
if not target.model:
target.model = canon.model
logging.debug('Discovered model: %s', target.model)
if not target.serial:
target.serial = canon.serial
logging.debug('Discovered serial: %s', target.serial)
logging.info('Proxying: %s', target.json)
proxy = AnthemProxy(target, bind, listen, forward = forward)
return await proxy.run()
async def main(args):
host = args.bind
port = args.listen
action = args.action[0]
if action == 'discover':
return await discover(bind = args.bind, port = port, broadcast = args.host, forward = args.forward)
elif action == 'proxy':
return await proxy(bind = args.bind, listen = args.listen, host = args.host, port = args.port, alias = args.alias, name = args.name, model = args.model, serial = args.serial, forward = args.forward)
def env_or_arg(parser, *args, **kwargs):
canon = { k: v for k, v in kwargs.items() }
for arg in args:
if arg.startswith('--'):
key = '_'.join(['ANTHEMPROXY', arg[2:].upper()])
value = os.environ.get(key, kwargs.get('default'))
coerce = kwargs.get('type', str)
if coerce is bool:
del(canon['type'])
if value is not None:
try:
canon['default'] = coerce(value)
except:
pass
parser.add_argument(*args, **canon)
parser = argparse.ArgumentParser('anthem_proxy')
env_or_arg(parser, '-a', '--alias', help = 'Proxy name')
env_or_arg(parser, '-b', '--bind', default = AnthemProtocol.LISTEN, help = 'Bind address')
env_or_arg(parser, '-d', '--debug', type = bool, action = 'store_true', help = 'Debug mode')
env_or_arg(parser, '-f', '--forward', type = bool, action = 'store_true', help = 'Accept requests from other proxies')
env_or_arg(parser, '-l', '--listen', type = int, default = AnthemProtocol.PORT, help = 'Bind port')
env_or_arg(parser, '-m', '--model', help = 'Device model')
env_or_arg(parser, '-n', '--name', help = 'Device name')
env_or_arg(parser, '-p', '--port', type = int, default = AnthemProtocol.PORT, help = 'Device port')
env_or_arg(parser, '-s', '--serial', help = 'Device serial number')
env_or_arg(parser, '-t', '--host', help = 'Target device IP or hostname')
parser.add_argument('action', choices = ['discover', 'proxy'], nargs = 1)
args = parser.parse_args()
logging.basicConfig(level = logging.DEBUG if args.debug or os.getenv('DEBUG') else logging.INFO, format = r'%(asctime)s %(module)s.%(funcName)s:%(lineno)d %(levelname)s: %(message)s', datefmt = r'%Y-%m-%dT%H:%M:%SZ')
try:
asyncio.run(main(args))
except KeyboardInterrupt:
# XXX
pass
except:
raise