-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgeocoding.py
More file actions
196 lines (165 loc) · 6.37 KB
/
geocoding.py
File metadata and controls
196 lines (165 loc) · 6.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
__copyright__ = "Copyright 2025 TU Dresden / KOMET Project"
__author__ = "Daniel Nüst & KOMET Team"
__license__ = "AGPL v3"
import re
from geopy.extra.rate_limiter import RateLimiter
from geopy.geocoders import GeoNames, Nominatim, Photon
from utils.logger import get_logger
logger = get_logger(__name__)
# Registry of supported geocoding backends: maps the lowercase provider name
# accepted by ``GeocodingService(provider=...)`` to the geopy geocoder class
# that gets instantiated for it.
PROVIDERS = {
    "nominatim": Nominatim,
    "geonames": GeoNames,
    "photon": Photon,
}
class GeocodingService:
    """Reverse-geocode WKT geometries via geopy.

    Wraps one geopy geocoder (selected by name from ``PROVIDERS``) behind a
    rate limiter and offers helpers to pull coordinates out of a WKT string,
    reverse-geocode them, and reduce the results to a single description
    shared by all points.
    """

    # Name of the keyword argument that selects the response language in the
    # provider's ``reverse()``. Nominatim and Photon call it ``language``;
    # GeoNames calls it ``lang``. ``__init__`` overrides this per instance.
    _language_kwarg = "language"

    # One WKT ordinate: optional sign, decimal digits (a leading or trailing
    # dot is tolerated) and an optional exponent.
    _NUMBER = r"[-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?"
    # One coordinate tuple: captures X and Y, and consumes any further
    # whitespace-separated ordinates (Z / M) so they are never mistaken for
    # the start of another lng-lat pair.
    _COORD_RE = re.compile(rf"({_NUMBER})\s+({_NUMBER})(?:\s+{_NUMBER})*")

    def __init__(
        self,
        provider="nominatim",
        user_agent="janeway-geometadata",
        geonames_username="",
    ):
        """Create the geocoder for *provider* and wrap it in a rate limiter.

        Args:
            provider: Key into ``PROVIDERS`` (case-insensitive).
            user_agent: User agent passed to Nominatim and Photon.
            geonames_username: Account name; required when *provider* is
                ``"geonames"``.

        Raises:
            ValueError: If *provider* is unknown, or GeoNames is selected
                without a username.
        """
        provider = provider.lower()
        if provider not in PROVIDERS:
            raise ValueError(f"Unknown geocoding provider: {provider}")

        kwargs = {}
        if provider in ("nominatim", "photon"):
            kwargs["user_agent"] = user_agent
        elif provider == "geonames":
            if not geonames_username:
                raise ValueError(
                    "GeoNames requires a username. "
                    "Register at https://www.geonames.org/login"
                )
            kwargs["username"] = geonames_username
            # GeoNames.reverse() takes ``lang``; passing ``language`` raises
            # TypeError, which would silently turn every lookup into a miss.
            self._language_kwarg = "lang"

        self.geocoder = PROVIDERS[provider](**kwargs)
        # At least 1.1 s between calls -- presumably to stay within the
        # public services' one-request-per-second usage policies.
        self.reverse = RateLimiter(self.geocoder.reverse, min_delay_seconds=1.1)

    def extract_coordinates_from_wkt(self, wkt):
        """Extract deduplicated (lat, lng) pairs from a WKT string.

        WKT uses lng-lat order; this method flips to lat-lng for geopy.
        Only the first two ordinates of each coordinate tuple are used, so
        Z / M values are ignored. Exponent notation is understood.

        Args:
            wkt: The WKT text; ``None`` or an empty string yields ``[]``.

        Returns:
            List of ``(lat, lng)`` float tuples in order of first appearance.
        """
        if not wkt:
            return []
        points = (
            (float(match.group(2)), float(match.group(1)))
            for match in self._COORD_RE.finditer(wkt)
        )
        # dict.fromkeys removes duplicates (e.g. the closing point of a
        # polygon ring) while keeping first-seen order.
        return list(dict.fromkeys(points))

    def reverse_geocode_coordinates(self, coords, max_points=10):
        """Reverse-geocode a list of (lat, lng) pairs.

        Samples evenly if more than *max_points* coordinates.
        Lookups are best-effort: a failing coordinate is logged at debug
        level (with traceback) and skipped.

        Returns a list of geopy Location objects (None entries filtered out).
        """
        if len(coords) > max_points:
            coords = self._sample_coordinates(coords, max_points)

        language = {self._language_kwarg: "en"}
        results = []
        for lat, lng in coords:
            try:
                result = self.reverse((lat, lng), exactly_one=True, **language)
            except Exception:
                # Deliberately broad: one bad point or a provider outage must
                # not abort the remaining lookups.
                logger.debug(
                    "Reverse geocoding failed for (%s, %s)",
                    lat,
                    lng,
                    exc_info=True,
                )
                continue
            if result:
                results.append(result)
        return results

    def find_common_location_description(self, results):
        """Derive a common place_name and admin_units from geocoded results.

        With a single usable result its full hierarchy is used; with several,
        only the trailing (coarsest) levels they all share.

        Returns ``{"place_name": "...", "admin_units": "..."}``; both values
        are empty strings when nothing usable or nothing common is found.
        """
        hierarchies = []
        for result in results or ():
            hierarchy = self._extract_admin_hierarchy(result)
            if hierarchy:
                hierarchies.append(hierarchy)
        if not hierarchies:
            return {"place_name": "", "admin_units": ""}

        if len(hierarchies) == 1:
            common = hierarchies[0]
        else:
            common = self._find_common_suffix(hierarchies)
        description = ", ".join(common)
        return {"place_name": description, "admin_units": description}

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _sample_coordinates(coords, max_points):
        """Return an evenly-spaced sample including first and last.

        *coords* is returned unchanged when it already fits. For
        ``max_points`` of 1 only the first coordinate is returned, and for
        0 or less an empty list, instead of dividing by zero.
        """
        if len(coords) <= max_points:
            return coords
        if max_points < 2:
            # No room for both endpoints; the step below would divide by 0.
            return list(coords[:max(max_points, 0)])

        last = len(coords) - 1
        step = last / (max_points - 1)
        indices = {0, last}
        indices.update(round(i * step) for i in range(1, max_points - 1))
        return [coords[i] for i in sorted(indices)]

    @staticmethod
    def _extract_admin_hierarchy(result):
        """Extract [city, state, country] from a geopy Location's raw dict.

        Missing levels are left out; returns ``None`` when no level is found.
        """
        raw = getattr(result, "raw", {}) or {}
        # Nominatim nests address details under "address"; Photon returns a
        # GeoJSON feature whose fields live under "properties".
        address = raw.get("address") or raw.get("properties") or {}
        if not address:
            # GeoNames uses a flat structure
            parts = [
                raw.get(key)
                for key in ("name", "adminName1", "countryName")
                if raw.get(key)
            ]
            return parts if parts else None

        # Fall back from the most specific settlement type to the county.
        city = (
            address.get("city")
            or address.get("town")
            or address.get("village")
            or address.get("municipality")
            or address.get("county")
        )
        state = address.get("state") or address.get("region") or address.get("province")
        country = address.get("country")
        parts = [p for p in [city, state, country] if p]
        return parts if parts else None

    @staticmethod
    def _find_common_suffix(hierarchies):
        """Find the longest common suffix across lists of strings."""
        if not hierarchies:
            return []
        common = []
        # Walk all lists from the back in lockstep; zip stops at the
        # shortest list.
        for items in zip(*(reversed(h) for h in hierarchies)):
            if len(set(items)) != 1:
                break
            common.append(items[0])
        common.reverse()
        return common
def reverse_geocode_wkt(
    wkt,
    provider="nominatim",
    user_agent="janeway-geometadata",
    geonames_username="",
    max_points=10,
):
    """One-shot helper: turn a WKT string into a location description.

    Builds a ``GeocodingService`` from the given provider settings, extracts
    the coordinates from *wkt*, reverse-geocodes at most *max_points* of them
    and condenses the results.

    Returns ``{"place_name": "...", "admin_units": "..."}``; both values are
    empty strings when *wkt* contains no coordinates.
    """
    geocoding = GeocodingService(
        provider=provider,
        user_agent=user_agent,
        geonames_username=geonames_username,
    )
    points = geocoding.extract_coordinates_from_wkt(wkt)
    if points:
        locations = geocoding.reverse_geocode_coordinates(
            points, max_points=max_points
        )
        return geocoding.find_common_location_description(locations)
    # Nothing to look up: skip the network round-trips entirely.
    return {"place_name": "", "admin_units": ""}