openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

1619 lines
52 KiB

from collections import OrderedDict
from datetime import datetime
from decimal import Decimal
from xml.sax import handler, make_parser
import json
import re
import sys
import time
import requests
from overpy import exception
from overpy.__about__ import (
__author__, __copyright__, __email__, __license__, __summary__, __title__,
__uri__, __version__
)
PY2 = sys.version_info[0] == 2
PY3 = sys.version_info[0] == 3
XML_PARSER_DOM = 1
XML_PARSER_SAX = 2
# Try to convert some common attributes
# http://wiki.openstreetmap.org/wiki/Elements#Common_attributes
GLOBAL_ATTRIBUTE_MODIFIERS = {
"changeset": int,
"timestamp": lambda ts: datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ"),
"uid": int,
"version": int,
"visible": lambda v: v.lower() == "true"
}
def is_valid_type(element, cls):
"""
Test if an element is of a given type.
:param Element() element: The element instance to test
:param Element cls: The element class to test
:return: False or True
:rtype: Boolean
"""
return isinstance(element, cls) and element.id is not None
class Overpass(object):
"""
Class to access the Overpass API
:cvar default_max_retry_count: Global max number of retries (Default: 0)
:cvar default_retry_timeout: Global time to wait between tries (Default: 1.0s)
"""
default_max_retry_count = 0
default_read_chunk_size = 4096
default_retry_timeout = 1.0
default_url = "http://overpass-api.de/api/interpreter"
def __init__(self, read_chunk_size=None, url=None, xml_parser=XML_PARSER_SAX, max_retry_count=None, retry_timeout=None, timeout=5.0, headers=None):
"""
:param read_chunk_size: Max size of each chunk read from the server response
:type read_chunk_size: Integer
:param url: Optional URL of the Overpass server. Defaults to http://overpass-api.de/api/interpreter
:type url: str
:param xml_parser: The xml parser to use
:type xml_parser: Integer
:param max_retry_count: Max number of retries (Default: default_max_retry_count)
:type max_retry_count: Integer
:param retry_timeout: Time to wait between tries (Default: default_retry_timeout)
:type retry_timeout: float
:param timeout: HTTP request timeout
:type timeout: float
:param headers: HTTP request headers
:type headers: dict
"""
self.url = self.default_url
if url is not None:
self.url = url
self._regex_extract_error_msg = re.compile(b"\<p\>(?P<msg>\<strong\s.*?)\</p\>")
self._regex_remove_tag = re.compile(b"<[^>]*?>")
if read_chunk_size is None:
read_chunk_size = self.default_read_chunk_size
self.read_chunk_size = read_chunk_size
if max_retry_count is None:
max_retry_count = self.default_max_retry_count
self.max_retry_count = max_retry_count
if retry_timeout is None:
retry_timeout = self.default_retry_timeout
self.retry_timeout = retry_timeout
self.xml_parser = xml_parser
self.timeout = timeout
self.headers = headers
def _handle_remark_msg(self, msg):
"""
Try to parse the message provided with the remark tag or element.
:param str msg: The message
:raises overpy.exception.OverpassRuntimeError: If message starts with 'runtime error:'
:raises overpy.exception.OverpassRuntimeRemark: If message starts with 'runtime remark:'
:raises overpy.exception.OverpassUnknownError: If we are unable to identify the error
"""
msg = msg.strip()
if msg.startswith("runtime error:"):
raise exception.OverpassRuntimeError(msg=msg)
elif msg.startswith("runtime remark:"):
raise exception.OverpassRuntimeRemark(msg=msg)
raise exception.OverpassUnknownError(msg=msg)
def query(self, query):
"""
Query the Overpass API
:param String|Bytes query: The query string in Overpass QL
:return: The parsed result
:rtype: overpy.Result
"""
if not isinstance(query, bytes):
query = query.encode("utf-8")
retry_num = 0
retry_exceptions = []
do_retry = True if self.max_retry_count > 0 else False
while retry_num <= self.max_retry_count:
if retry_num > 0:
time.sleep(self.retry_timeout)
retry_num += 1
try:
if self.headers is not None:
r = requests.post(self.url, query, timeout=self.timeout, headers=self.headers)
else:
r = requests.post(self.url, query, timeout=self.timeout)
response = r.content
except (requests.exceptions.BaseHTTPError, requests.exceptions.RequestException) as e:
if not do_retry:
raise e
retry_exceptions.append(e)
continue
if r.status_code == 200:
content_type = r.headers["Content-Type"]
if content_type == "application/json":
return self.parse_json(response)
if content_type == "application/osm3s+xml":
return self.parse_xml(response)
e = exception.OverpassUnknownContentType(content_type)
if not do_retry:
raise e
retry_exceptions.append(e)
continue
elif r.status_code == 400:
msgs = []
for msg in self._regex_extract_error_msg.finditer(response):
tmp = self._regex_remove_tag.sub(b"", msg.group("msg"))
try:
tmp = tmp.decode("utf-8")
except UnicodeDecodeError:
tmp = repr(tmp)
msgs.append(tmp)
e = exception.OverpassBadRequest(
query,
msgs=msgs
)
if not do_retry:
raise e
retry_exceptions.append(e)
continue
elif r.status_code == 429:
e = exception.OverpassTooManyRequests
if not do_retry:
raise e
retry_exceptions.append(e)
continue
elif r.status_code == 504:
e = exception.OverpassGatewayTimeout
if not do_retry:
raise e
retry_exceptions.append(e)
continue
# No valid response code
e = exception.OverpassUnknownHTTPStatusCode(r.status_code)
if not do_retry:
raise e
retry_exceptions.append(e)
continue
raise exception.MaxRetriesReached(retry_count=retry_num, exceptions=retry_exceptions)
def parse_json(self, data, encoding="utf-8"):
"""
Parse raw response from Overpass service.
:param data: Raw JSON Data
:type data: String or Bytes
:param encoding: Encoding to decode byte string
:type encoding: String
:return: Result object
:rtype: overpy.Result
"""
if isinstance(data, bytes):
data = data.decode(encoding)
data = json.loads(data, parse_float=Decimal)
if "remark" in data:
self._handle_remark_msg(msg=data.get("remark"))
return Result.from_json(data, api=self)
def parse_xml(self, data, encoding="utf-8", parser=None):
"""
:param data: Raw XML Data
:type data: String or Bytes
:param encoding: Encoding to decode byte string
:type encoding: String
:return: Result object
:rtype: overpy.Result
"""
if parser is None:
parser = self.xml_parser
if isinstance(data, bytes):
data = data.decode(encoding)
if PY2 and not isinstance(data, str):
# Python 2.x: Convert unicode strings
data = data.encode(encoding)
m = re.compile("<remark>(?P<msg>[^<>]*)</remark>").search(data)
if m:
self._handle_remark_msg(m.group("msg"))
return Result.from_xml(data, api=self, parser=parser)
class Result(object):
"""
Class to handle the result.
"""
def __init__(self, elements=None, api=None):
"""
:param List elements:
:param api:
:type api: overpy.Overpass
"""
if elements is None:
elements = []
self._areas = OrderedDict((element.id, element) for element in elements if is_valid_type(element, Area))
self._nodes = OrderedDict((element.id, element) for element in elements if is_valid_type(element, Node))
self._ways = OrderedDict((element.id, element) for element in elements if is_valid_type(element, Way))
self._relations = OrderedDict((element.id, element)
for element in elements if is_valid_type(element, Relation))
self._class_collection_map = {Node: self._nodes, Way: self._ways, Relation: self._relations, Area: self._areas}
self.api = api
def expand(self, other):
"""
Add all elements from an other result to the list of elements of this result object.
It is used by the auto resolve feature.
:param other: Expand the result with the elements from this result.
:type other: overpy.Result
:raises ValueError: If provided parameter is not instance of :class:`overpy.Result`
"""
if not isinstance(other, Result):
raise ValueError("Provided argument has to be instance of overpy:Result()")
other_collection_map = {Node: other.nodes, Way: other.ways, Relation: other.relations, Area: other.areas}
for element_type, own_collection in self._class_collection_map.items():
for element in other_collection_map[element_type]:
if is_valid_type(element, element_type) and element.id not in own_collection:
own_collection[element.id] = element
def append(self, element):
"""
Append a new element to the result.
:param element: The element to append
:type element: overpy.Element
"""
if is_valid_type(element, Element):
self._class_collection_map[element.__class__].setdefault(element.id, element)
def get_elements(self, filter_cls, elem_id=None):
"""
Get a list of elements from the result and filter the element type by a class.
:param filter_cls:
:param elem_id: ID of the object
:type elem_id: Integer
:return: List of available elements
:rtype: List
"""
result = []
if elem_id is not None:
try:
result = [self._class_collection_map[filter_cls][elem_id]]
except KeyError:
result = []
else:
for e in self._class_collection_map[filter_cls].values():
result.append(e)
return result
def get_ids(self, filter_cls):
"""
:param filter_cls:
:return:
"""
return list(self._class_collection_map[filter_cls].keys())
def get_node_ids(self):
return self.get_ids(filter_cls=Node)
def get_way_ids(self):
return self.get_ids(filter_cls=Way)
def get_relation_ids(self):
return self.get_ids(filter_cls=Relation)
def get_area_ids(self):
return self.get_ids(filter_cls=Area)
@classmethod
def from_json(cls, data, api=None):
"""
Create a new instance and load data from json object.
:param data: JSON data returned by the Overpass API
:type data: Dict
:param api:
:type api: overpy.Overpass
:return: New instance of Result object
:rtype: overpy.Result
"""
result = cls(api=api)
for elem_cls in [Node, Way, Relation, Area]:
for element in data.get("elements", []):
e_type = element.get("type")
if hasattr(e_type, "lower") and e_type.lower() == elem_cls._type_value:
result.append(elem_cls.from_json(element, result=result))
return result
@classmethod
def from_xml(cls, data, api=None, parser=None):
"""
Create a new instance and load data from xml data or object.
.. note::
If parser is set to None, the functions tries to find the best parse.
By default the SAX parser is chosen if a string is provided as data.
The parser is set to DOM if an xml.etree.ElementTree.Element is provided as data value.
:param data: Root element
:type data: str | xml.etree.ElementTree.Element
:param api: The instance to query additional information if required.
:type api: Overpass
:param parser: Specify the parser to use(DOM or SAX)(Default: None = autodetect, defaults to SAX)
:type parser: Integer | None
:return: New instance of Result object
:rtype: Result
"""
if parser is None:
if isinstance(data, str):
parser = XML_PARSER_SAX
else:
parser = XML_PARSER_DOM
result = cls(api=api)
if parser == XML_PARSER_DOM:
import xml.etree.ElementTree as ET
if isinstance(data, str):
root = ET.fromstring(data)
elif isinstance(data, ET.Element):
root = data
else:
raise exception.OverPyException("Unable to detect data type.")
for elem_cls in [Node, Way, Relation, Area]:
for child in root:
if child.tag.lower() == elem_cls._type_value:
result.append(elem_cls.from_xml(child, result=result))
elif parser == XML_PARSER_SAX:
if PY2:
from StringIO import StringIO
else:
from io import StringIO
source = StringIO(data)
sax_handler = OSMSAXHandler(result)
parser = make_parser()
parser.setContentHandler(sax_handler)
parser.parse(source)
else:
# ToDo: better exception
raise Exception("Unknown XML parser")
return result
def get_area(self, area_id, resolve_missing=False):
"""
Get an area by its ID.
:param area_id: The area ID
:type area_id: Integer
:param resolve_missing: Query the Overpass API if the area is missing in the result set.
:return: The area
:rtype: overpy.Area
:raises overpy.exception.DataIncomplete: The requested way is not available in the result cache.
:raises overpy.exception.DataIncomplete: If resolve_missing is True and the area can't be resolved.
"""
areas = self.get_areas(area_id=area_id)
if len(areas) == 0:
if resolve_missing is False:
raise exception.DataIncomplete("Resolve missing area is disabled")
query = ("\n"
"[out:json];\n"
"area({area_id});\n"
"out body;\n"
)
query = query.format(
area_id=area_id
)
tmp_result = self.api.query(query)
self.expand(tmp_result)
areas = self.get_areas(area_id=area_id)
if len(areas) == 0:
raise exception.DataIncomplete("Unable to resolve requested areas")
return areas[0]
def get_areas(self, area_id=None, **kwargs):
"""
Alias for get_elements() but filter the result by Area
:param area_id: The Id of the area
:type area_id: Integer
:return: List of elements
"""
return self.get_elements(Area, elem_id=area_id, **kwargs)
def get_node(self, node_id, resolve_missing=False):
"""
Get a node by its ID.
:param node_id: The node ID
:type node_id: Integer
:param resolve_missing: Query the Overpass API if the node is missing in the result set.
:return: The node
:rtype: overpy.Node
:raises overpy.exception.DataIncomplete: At least one referenced node is not available in the result cache.
:raises overpy.exception.DataIncomplete: If resolve_missing is True and at least one node can't be resolved.
"""
nodes = self.get_nodes(node_id=node_id)
if len(nodes) == 0:
if not resolve_missing:
raise exception.DataIncomplete("Resolve missing nodes is disabled")
query = ("\n"
"[out:json];\n"
"node({node_id});\n"
"out body;\n"
)
query = query.format(
node_id=node_id
)
tmp_result = self.api.query(query)
self.expand(tmp_result)
nodes = self.get_nodes(node_id=node_id)
if len(nodes) == 0:
raise exception.DataIncomplete("Unable to resolve all nodes")
return nodes[0]
def get_nodes(self, node_id=None, **kwargs):
"""
Alias for get_elements() but filter the result by Node()
:param node_id: The Id of the node
:type node_id: Integer
:return: List of elements
"""
return self.get_elements(Node, elem_id=node_id, **kwargs)
def get_relation(self, rel_id, resolve_missing=False):
"""
Get a relation by its ID.
:param rel_id: The relation ID
:type rel_id: Integer
:param resolve_missing: Query the Overpass API if the relation is missing in the result set.
:return: The relation
:rtype: overpy.Relation
:raises overpy.exception.DataIncomplete: The requested relation is not available in the result cache.
:raises overpy.exception.DataIncomplete: If resolve_missing is True and the relation can't be resolved.
"""
relations = self.get_relations(rel_id=rel_id)
if len(relations) == 0:
if resolve_missing is False:
raise exception.DataIncomplete("Resolve missing relations is disabled")
query = ("\n"
"[out:json];\n"
"relation({relation_id});\n"
"out body;\n"
)
query = query.format(
relation_id=rel_id
)
tmp_result = self.api.query(query)
self.expand(tmp_result)
relations = self.get_relations(rel_id=rel_id)
if len(relations) == 0:
raise exception.DataIncomplete("Unable to resolve requested reference")
return relations[0]
def get_relations(self, rel_id=None, **kwargs):
"""
Alias for get_elements() but filter the result by Relation
:param rel_id: Id of the relation
:type rel_id: Integer
:return: List of elements
"""
return self.get_elements(Relation, elem_id=rel_id, **kwargs)
def get_way(self, way_id, resolve_missing=False):
"""
Get a way by its ID.
:param way_id: The way ID
:type way_id: Integer
:param resolve_missing: Query the Overpass API if the way is missing in the result set.
:return: The way
:rtype: overpy.Way
:raises overpy.exception.DataIncomplete: The requested way is not available in the result cache.
:raises overpy.exception.DataIncomplete: If resolve_missing is True and the way can't be resolved.
"""
ways = self.get_ways(way_id=way_id)
if len(ways) == 0:
if resolve_missing is False:
raise exception.DataIncomplete("Resolve missing way is disabled")
query = ("\n"
"[out:json];\n"
"way({way_id});\n"
"out body;\n"
)
query = query.format(
way_id=way_id
)
tmp_result = self.api.query(query)
self.expand(tmp_result)
ways = self.get_ways(way_id=way_id)
if len(ways) == 0:
raise exception.DataIncomplete("Unable to resolve requested way")
return ways[0]
def get_ways(self, way_id=None, **kwargs):
"""
Alias for get_elements() but filter the result by Way
:param way_id: The Id of the way
:type way_id: Integer
:return: List of elements
"""
return self.get_elements(Way, elem_id=way_id, **kwargs)
area_ids = property(get_area_ids)
areas = property(get_areas)
node_ids = property(get_node_ids)
nodes = property(get_nodes)
relation_ids = property(get_relation_ids)
relations = property(get_relations)
way_ids = property(get_way_ids)
ways = property(get_ways)
class Element(object):
"""
Base element
"""
def __init__(self, attributes=None, result=None, tags=None):
"""
:param attributes: Additional attributes
:type attributes: Dict
:param result: The result object this element belongs to
:param tags: List of tags
:type tags: Dict
"""
self._result = result
self.attributes = attributes
# ToDo: Add option to modify attribute modifiers
attribute_modifiers = dict(GLOBAL_ATTRIBUTE_MODIFIERS.items())
for n, m in attribute_modifiers.items():
if n in self.attributes:
self.attributes[n] = m(self.attributes[n])
self.id = None
self.tags = tags
@classmethod
def get_center_from_json(cls, data):
"""
Get center information from json data
:param data: json data
:return: tuple with two elements: lat and lon
:rtype: tuple
"""
center_lat = None
center_lon = None
center = data.get("center")
if isinstance(center, dict):
center_lat = center.get("lat")
center_lon = center.get("lon")
if center_lat is None or center_lon is None:
raise ValueError("Unable to get lat or lon of way center.")
center_lat = Decimal(center_lat)
center_lon = Decimal(center_lon)
return (center_lat, center_lon)
@classmethod
def get_center_from_xml_dom(cls, sub_child):
center_lat = sub_child.attrib.get("lat")
center_lon = sub_child.attrib.get("lon")
if center_lat is None or center_lon is None:
raise ValueError("Unable to get lat or lon of way center.")
center_lat = Decimal(center_lat)
center_lon = Decimal(center_lon)
return center_lat, center_lon
class Area(Element):
"""
Class to represent an element of type area
"""
_type_value = "area"
def __init__(self, area_id=None, **kwargs):
"""
:param area_id: Id of the area element
:type area_id: Integer
:param kwargs: Additional arguments are passed directly to the parent class
"""
Element.__init__(self, **kwargs)
#: The id of the way
self.id = area_id
def __repr__(self):
return "<overpy.Area id={}>".format(self.id)
@classmethod
def from_json(cls, data, result=None):
"""
Create new Area element from JSON data
:param data: Element data from JSON
:type data: Dict
:param result: The result this element belongs to
:type result: overpy.Result
:return: New instance of Way
:rtype: overpy.Area
:raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match.
"""
if data.get("type") != cls._type_value:
raise exception.ElementDataWrongType(
type_expected=cls._type_value,
type_provided=data.get("type")
)
tags = data.get("tags", {})
area_id = data.get("id")
attributes = {}
ignore = ["id", "tags", "type"]
for n, v in data.items():
if n in ignore:
continue
attributes[n] = v
return cls(area_id=area_id, attributes=attributes, tags=tags, result=result)
@classmethod
def from_xml(cls, child, result=None):
"""
Create new way element from XML data
:param child: XML node to be parsed
:type child: xml.etree.ElementTree.Element
:param result: The result this node belongs to
:type result: overpy.Result
:return: New Way oject
:rtype: overpy.Way
:raises overpy.exception.ElementDataWrongType: If name of the xml child node doesn't match
:raises ValueError: If the ref attribute of the xml node is not provided
:raises ValueError: If a tag doesn't have a name
"""
if child.tag.lower() != cls._type_value:
raise exception.ElementDataWrongType(
type_expected=cls._type_value,
type_provided=child.tag.lower()
)
tags = {}
for sub_child in child:
if sub_child.tag.lower() == "tag":
name = sub_child.attrib.get("k")
if name is None:
raise ValueError("Tag without name/key.")
value = sub_child.attrib.get("v")
tags[name] = value
area_id = child.attrib.get("id")
if area_id is not None:
area_id = int(area_id)
attributes = {}
ignore = ["id"]
for n, v in child.attrib.items():
if n in ignore:
continue
attributes[n] = v
return cls(area_id=area_id, attributes=attributes, tags=tags, result=result)
class Node(Element):
"""
Class to represent an element of type node
"""
_type_value = "node"
def __init__(self, node_id=None, lat=None, lon=None, **kwargs):
"""
:param lat: Latitude
:type lat: Decimal or Float
:param lon: Longitude
:type long: Decimal or Float
:param node_id: Id of the node element
:type node_id: Integer
:param kwargs: Additional arguments are passed directly to the parent class
"""
Element.__init__(self, **kwargs)
self.id = node_id
self.lat = lat
self.lon = lon
def __repr__(self):
return "<overpy.Node id={} lat={} lon={}>".format(self.id, self.lat, self.lon)
@classmethod
def from_json(cls, data, result=None):
"""
Create new Node element from JSON data
:param data: Element data from JSON
:type data: Dict
:param result: The result this element belongs to
:type result: overpy.Result
:return: New instance of Node
:rtype: overpy.Node
:raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match.
"""
if data.get("type") != cls._type_value:
raise exception.ElementDataWrongType(
type_expected=cls._type_value,
type_provided=data.get("type")
)
tags = data.get("tags", {})
node_id = data.get("id")
lat = data.get("lat")
lon = data.get("lon")
attributes = {}
ignore = ["type", "id", "lat", "lon", "tags"]
for n, v in data.items():
if n in ignore:
continue
attributes[n] = v
return cls(node_id=node_id, lat=lat, lon=lon, tags=tags, attributes=attributes, result=result)
@classmethod
def from_xml(cls, child, result=None):
"""
Create new way element from XML data
:param child: XML node to be parsed
:type child: xml.etree.ElementTree.Element
:param result: The result this node belongs to
:type result: overpy.Result
:return: New Way oject
:rtype: overpy.Node
:raises overpy.exception.ElementDataWrongType: If name of the xml child node doesn't match
:raises ValueError: If a tag doesn't have a name
"""
if child.tag.lower() != cls._type_value:
raise exception.ElementDataWrongType(
type_expected=cls._type_value,
type_provided=child.tag.lower()
)
tags = {}
for sub_child in child:
if sub_child.tag.lower() == "tag":
name = sub_child.attrib.get("k")
if name is None:
raise ValueError("Tag without name/key.")
value = sub_child.attrib.get("v")
tags[name] = value
node_id = child.attrib.get("id")
if node_id is not None:
node_id = int(node_id)
lat = child.attrib.get("lat")
if lat is not None:
lat = Decimal(lat)
lon = child.attrib.get("lon")
if lon is not None:
lon = Decimal(lon)
attributes = {}
ignore = ["id", "lat", "lon"]
for n, v in child.attrib.items():
if n in ignore:
continue
attributes[n] = v
return cls(node_id=node_id, lat=lat, lon=lon, tags=tags, attributes=attributes, result=result)
class Way(Element):
"""
Class to represent an element of type way
"""
_type_value = "way"
def __init__(self, way_id=None, center_lat=None, center_lon=None, node_ids=None, **kwargs):
"""
:param node_ids: List of node IDs
:type node_ids: List or Tuple
:param way_id: Id of the way element
:type way_id: Integer
:param kwargs: Additional arguments are passed directly to the parent class
"""
Element.__init__(self, **kwargs)
#: The id of the way
self.id = way_id
#: List of Ids of the associated nodes
self._node_ids = node_ids
#: The lat/lon of the center of the way (optional depending on query)
self.center_lat = center_lat
self.center_lon = center_lon
def __repr__(self):
return "<overpy.Way id={} nodes={}>".format(self.id, self._node_ids)
@property
def nodes(self):
"""
List of nodes associated with the way.
"""
return self.get_nodes()
def get_nodes(self, resolve_missing=False):
"""
Get the nodes defining the geometry of the way
:param resolve_missing: Try to resolve missing nodes.
:type resolve_missing: Boolean
:return: List of nodes
:rtype: List of overpy.Node
:raises overpy.exception.DataIncomplete: At least one referenced node is not available in the result cache.
:raises overpy.exception.DataIncomplete: If resolve_missing is True and at least one node can't be resolved.
"""
result = []
resolved = False
for node_id in self._node_ids:
try:
node = self._result.get_node(node_id)
except exception.DataIncomplete:
node = None
if node is not None:
result.append(node)
continue
if not resolve_missing:
raise exception.DataIncomplete("Resolve missing nodes is disabled")
# We tried to resolve the data but some nodes are still missing
if resolved:
raise exception.DataIncomplete("Unable to resolve all nodes")
query = ("\n"
"[out:json];\n"
"way({way_id});\n"
"node(w);\n"
"out body;\n"
)
query = query.format(
way_id=self.id
)
tmp_result = self._result.api.query(query)
self._result.expand(tmp_result)
resolved = True
try:
node = self._result.get_node(node_id)
except exception.DataIncomplete:
node = None
if node is None:
raise exception.DataIncomplete("Unable to resolve all nodes")
result.append(node)
return result
@classmethod
def from_json(cls, data, result=None):
"""
Create new Way element from JSON data
:param data: Element data from JSON
:type data: Dict
:param result: The result this element belongs to
:type result: overpy.Result
:return: New instance of Way
:rtype: overpy.Way
:raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match.
"""
if data.get("type") != cls._type_value:
raise exception.ElementDataWrongType(
type_expected=cls._type_value,
type_provided=data.get("type")
)
tags = data.get("tags", {})
way_id = data.get("id")
node_ids = data.get("nodes")
(center_lat, center_lon) = cls.get_center_from_json(data=data)
attributes = {}
ignore = ["center", "id", "nodes", "tags", "type"]
for n, v in data.items():
if n in ignore:
continue
attributes[n] = v
return cls(
attributes=attributes,
center_lat=center_lat,
center_lon=center_lon,
node_ids=node_ids,
tags=tags,
result=result,
way_id=way_id
)
@classmethod
def from_xml(cls, child, result=None):
"""
Create new way element from XML data
:param child: XML node to be parsed
:type child: xml.etree.ElementTree.Element
:param result: The result this node belongs to
:type result: overpy.Result
:return: New Way oject
:rtype: overpy.Way
:raises overpy.exception.ElementDataWrongType: If name of the xml child node doesn't match
:raises ValueError: If the ref attribute of the xml node is not provided
:raises ValueError: If a tag doesn't have a name
"""
if child.tag.lower() != cls._type_value:
raise exception.ElementDataWrongType(
type_expected=cls._type_value,
type_provided=child.tag.lower()
)
tags = {}
node_ids = []
center_lat = None
center_lon = None
for sub_child in child:
if sub_child.tag.lower() == "tag":
name = sub_child.attrib.get("k")
if name is None:
raise ValueError("Tag without name/key.")
value = sub_child.attrib.get("v")
tags[name] = value
if sub_child.tag.lower() == "nd":
ref_id = sub_child.attrib.get("ref")
if ref_id is None:
raise ValueError("Unable to find required ref value.")
ref_id = int(ref_id)
node_ids.append(ref_id)
if sub_child.tag.lower() == "center":
(center_lat, center_lon) = cls.get_center_from_xml_dom(sub_child=sub_child)
way_id = child.attrib.get("id")
if way_id is not None:
way_id = int(way_id)
attributes = {}
ignore = ["id"]
for n, v in child.attrib.items():
if n in ignore:
continue
attributes[n] = v
return cls(way_id=way_id, center_lat=center_lat, center_lon=center_lon,
attributes=attributes, node_ids=node_ids, tags=tags, result=result)
class Relation(Element):
"""
Class to represent an element of type relation
"""
_type_value = "relation"
def __init__(self, rel_id=None, center_lat=None, center_lon=None, members=None, **kwargs):
"""
:param members:
:param rel_id: Id of the relation element
:type rel_id: Integer
:param kwargs:
:return:
"""
Element.__init__(self, **kwargs)
self.id = rel_id
self.members = members
#: The lat/lon of the center of the way (optional depending on query)
self.center_lat = center_lat
self.center_lon = center_lon
def __repr__(self):
return "<overpy.Relation id={}>".format(self.id)
@classmethod
def from_json(cls, data, result=None):
"""
Create new Relation element from JSON data
:param data: Element data from JSON
:type data: Dict
:param result: The result this element belongs to
:type result: overpy.Result
:return: New instance of Relation
:rtype: overpy.Relation
:raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match.
"""
if data.get("type") != cls._type_value:
raise exception.ElementDataWrongType(
type_expected=cls._type_value,
type_provided=data.get("type")
)
tags = data.get("tags", {})
rel_id = data.get("id")
(center_lat, center_lon) = cls.get_center_from_json(data=data)
members = []
supported_members = [RelationNode, RelationWay, RelationRelation]
for member in data.get("members", []):
type_value = member.get("type")
for member_cls in supported_members:
if member_cls._type_value == type_value:
members.append(
member_cls.from_json(
member,
result=result
)
)
attributes = {}
ignore = ["id", "members", "tags", "type"]
for n, v in data.items():
if n in ignore:
continue
attributes[n] = v
return cls(
rel_id=rel_id,
attributes=attributes,
center_lat=center_lat,
center_lon=center_lon,
members=members,
tags=tags,
result=result
)
@classmethod
def from_xml(cls, child, result=None):
"""
Create new way element from XML data
:param child: XML node to be parsed
:type child: xml.etree.ElementTree.Element
:param result: The result this node belongs to
:type result: overpy.Result
:return: New Way oject
:rtype: overpy.Relation
:raises overpy.exception.ElementDataWrongType: If name of the xml child node doesn't match
:raises ValueError: If a tag doesn't have a name
"""
if child.tag.lower() != cls._type_value:
raise exception.ElementDataWrongType(
type_expected=cls._type_value,
type_provided=child.tag.lower()
)
tags = {}
members = []
center_lat = None
center_lon = None
supported_members = [RelationNode, RelationWay, RelationRelation, RelationArea]
for sub_child in child:
if sub_child.tag.lower() == "tag":
name = sub_child.attrib.get("k")
if name is None:
raise ValueError("Tag without name/key.")
value = sub_child.attrib.get("v")
tags[name] = value
if sub_child.tag.lower() == "member":
type_value = sub_child.attrib.get("type")
for member_cls in supported_members:
if member_cls._type_value == type_value:
members.append(
member_cls.from_xml(
sub_child,
result=result
)
)
if sub_child.tag.lower() == "center":
(center_lat, center_lon) = cls.get_center_from_xml_dom(sub_child=sub_child)
rel_id = child.attrib.get("id")
if rel_id is not None:
rel_id = int(rel_id)
attributes = {}
ignore = ["id"]
for n, v in child.attrib.items():
if n in ignore:
continue
attributes[n] = v
return cls(
rel_id=rel_id,
attributes=attributes,
center_lat=center_lat,
center_lon=center_lon,
members=members,
tags=tags,
result=result
)
class RelationMember(object):
"""
Base class to represent a member of a relation.
"""
def __init__(self, attributes=None, geometry=None, ref=None, role=None, result=None):
"""
:param ref: Reference Id
:type ref: Integer
:param role: The role of the relation member
:type role: String
:param result:
"""
self.ref = ref
self._result = result
self.role = role
self.attributes = attributes
self.geometry = geometry
@classmethod
def from_json(cls, data, result=None):
"""
Create new RelationMember element from JSON data
:param child: Element data from JSON
:type child: Dict
:param result: The result this element belongs to
:type result: overpy.Result
:return: New instance of RelationMember
:rtype: overpy.RelationMember
:raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match.
"""
if data.get("type") != cls._type_value:
raise exception.ElementDataWrongType(
type_expected=cls._type_value,
type_provided=data.get("type")
)
ref = data.get("ref")
role = data.get("role")
attributes = {}
ignore = ["geometry", "type", "ref", "role"]
for n, v in data.items():
if n in ignore:
continue
attributes[n] = v
geometry = data.get("geometry")
if isinstance(geometry, list):
geometry_orig = geometry
geometry = []
for v in geometry_orig:
geometry.append(
RelationWayGeometryValue(
lat=v.get("lat"),
lon=v.get("lon")
)
)
else:
geometry = None
return cls(
attributes=attributes,
geometry=geometry,
ref=ref,
role=role,
result=result
)
@classmethod
def from_xml(cls, child, result=None):
"""
Create new RelationMember from XML data
:param child: XML node to be parsed
:type child: xml.etree.ElementTree.Element
:param result: The result this element belongs to
:type result: overpy.Result
:return: New relation member oject
:rtype: overpy.RelationMember
:raises overpy.exception.ElementDataWrongType: If name of the xml child node doesn't match
"""
if child.attrib.get("type") != cls._type_value:
raise exception.ElementDataWrongType(
type_expected=cls._type_value,
type_provided=child.tag.lower()
)
ref = child.attrib.get("ref")
if ref is not None:
ref = int(ref)
role = child.attrib.get("role")
attributes = {}
ignore = ["geometry", "ref", "role", "type"]
for n, v in child.attrib.items():
if n in ignore:
continue
attributes[n] = v
geometry = None
for sub_child in child:
if sub_child.tag.lower() == "nd":
if geometry is None:
geometry = []
geometry.append(
RelationWayGeometryValue(
lat=Decimal(sub_child.attrib["lat"]),
lon=Decimal(sub_child.attrib["lon"])
)
)
return cls(
attributes=attributes,
geometry=geometry,
ref=ref,
role=role,
result=result
)
class RelationNode(RelationMember):
_type_value = "node"
def resolve(self, resolve_missing=False):
return self._result.get_node(self.ref, resolve_missing=resolve_missing)
def __repr__(self):
return "<overpy.RelationNode ref={} role={}>".format(self.ref, self.role)
class RelationWay(RelationMember):
_type_value = "way"
def resolve(self, resolve_missing=False):
return self._result.get_way(self.ref, resolve_missing=resolve_missing)
def __repr__(self):
return "<overpy.RelationWay ref={} role={}>".format(self.ref, self.role)
class RelationWayGeometryValue(object):
def __init__(self, lat, lon):
self.lat = lat
self.lon = lon
def __repr__(self):
return "<overpy.RelationWayGeometryValue lat={} lon={}>".format(self.lat, self.lon)
class RelationRelation(RelationMember):
_type_value = "relation"
def resolve(self, resolve_missing=False):
return self._result.get_relation(self.ref, resolve_missing=resolve_missing)
def __repr__(self):
return "<overpy.RelationRelation ref={} role={}>".format(self.ref, self.role)
class RelationArea(RelationMember):
_type_value = "area"
def resolve(self, resolve_missing=False):
return self._result.get_area(self.ref, resolve_missing=resolve_missing)
def __repr__(self):
return "<overpy.RelationArea ref={} role={}>".format(self.ref, self.role)
class OSMSAXHandler(handler.ContentHandler):
"""
SAX parser for Overpass XML response.
"""
#: Tuple of opening elements to ignore
ignore_start = ('osm', 'meta', 'note', 'bounds', 'remark')
#: Tuple of closing elements to ignore
ignore_end = ('osm', 'meta', 'note', 'bounds', 'remark', 'tag', 'nd', 'center')
def __init__(self, result):
"""
:param result: Append results to this result set.
:type result: overpy.Result
"""
handler.ContentHandler.__init__(self)
self._result = result
self._curr = {}
#: Current relation member object
self.cur_relation_member = None
def startElement(self, name, attrs):
"""
Handle opening elements.
:param name: Name of the element
:type name: String
:param attrs: Attributes of the element
:type attrs: Dict
"""
if name in self.ignore_start:
return
try:
handler = getattr(self, '_handle_start_%s' % name)
except AttributeError:
raise KeyError("Unknown element start '%s'" % name)
handler(attrs)
def endElement(self, name):
"""
Handle closing elements
:param name: Name of the element
:type name: String
"""
if name in self.ignore_end:
return
try:
handler = getattr(self, '_handle_end_%s' % name)
except AttributeError:
raise KeyError("Unknown element end '%s'" % name)
handler()
def _handle_start_center(self, attrs):
"""
Handle opening center element
:param attrs: Attributes of the element
:type attrs: Dict
"""
center_lat = attrs.get("lat")
center_lon = attrs.get("lon")
if center_lat is None or center_lon is None:
raise ValueError("Unable to get lat or lon of way center.")
self._curr["center_lat"] = Decimal(center_lat)
self._curr["center_lon"] = Decimal(center_lon)
def _handle_start_tag(self, attrs):
"""
Handle opening tag element
:param attrs: Attributes of the element
:type attrs: Dict
"""
try:
tag_key = attrs['k']
except KeyError:
raise ValueError("Tag without name/key.")
self._curr['tags'][tag_key] = attrs.get('v')
def _handle_start_node(self, attrs):
"""
Handle opening node element
:param attrs: Attributes of the element
:type attrs: Dict
"""
self._curr = {
'attributes': dict(attrs),
'lat': None,
'lon': None,
'node_id': None,
'tags': {}
}
if attrs.get('id', None) is not None:
self._curr['node_id'] = int(attrs['id'])
del self._curr['attributes']['id']
if attrs.get('lat', None) is not None:
self._curr['lat'] = Decimal(attrs['lat'])
del self._curr['attributes']['lat']
if attrs.get('lon', None) is not None:
self._curr['lon'] = Decimal(attrs['lon'])
del self._curr['attributes']['lon']
def _handle_end_node(self):
"""
Handle closing node element
"""
self._result.append(Node(result=self._result, **self._curr))
self._curr = {}
def _handle_start_way(self, attrs):
"""
Handle opening way element
:param attrs: Attributes of the element
:type attrs: Dict
"""
self._curr = {
'center_lat': None,
'center_lon': None,
'attributes': dict(attrs),
'node_ids': [],
'tags': {},
'way_id': None
}
if attrs.get('id', None) is not None:
self._curr['way_id'] = int(attrs['id'])
del self._curr['attributes']['id']
def _handle_end_way(self):
"""
Handle closing way element
"""
self._result.append(Way(result=self._result, **self._curr))
self._curr = {}
def _handle_start_area(self, attrs):
"""
Handle opening area element
:param attrs: Attributes of the element
:type attrs: Dict
"""
self._curr = {
'attributes': dict(attrs),
'tags': {},
'area_id': None
}
if attrs.get('id', None) is not None:
self._curr['area_id'] = int(attrs['id'])
del self._curr['attributes']['id']
def _handle_end_area(self):
"""
Handle closing area element
"""
self._result.append(Area(result=self._result, **self._curr))
self._curr = {}
def _handle_start_nd(self, attrs):
"""
Handle opening nd element
:param attrs: Attributes of the element
:type attrs: Dict
"""
if isinstance(self.cur_relation_member, RelationWay):
if self.cur_relation_member.geometry is None:
self.cur_relation_member.geometry = []
self.cur_relation_member.geometry.append(
RelationWayGeometryValue(
lat=Decimal(attrs["lat"]),
lon=Decimal(attrs["lon"])
)
)
else:
try:
node_ref = attrs['ref']
except KeyError:
raise ValueError("Unable to find required ref value.")
self._curr['node_ids'].append(int(node_ref))
def _handle_start_relation(self, attrs):
"""
Handle opening relation element
:param attrs: Attributes of the element
:type attrs: Dict
"""
self._curr = {
'attributes': dict(attrs),
'members': [],
'rel_id': None,
'tags': {}
}
if attrs.get('id', None) is not None:
self._curr['rel_id'] = int(attrs['id'])
del self._curr['attributes']['id']
def _handle_end_relation(self):
"""
Handle closing relation element
"""
self._result.append(Relation(result=self._result, **self._curr))
self._curr = {}
def _handle_start_member(self, attrs):
"""
Handle opening member element
:param attrs: Attributes of the element
:type attrs: Dict
"""
params = {
# ToDo: Parse attributes
'attributes': {},
'ref': None,
'result': self._result,
'role': None
}
if attrs.get('ref', None):
params['ref'] = int(attrs['ref'])
if attrs.get('role', None):
params['role'] = attrs['role']
cls_map = {
"area": RelationArea,
"node": RelationNode,
"relation": RelationRelation,
"way": RelationWay
}
cls = cls_map.get(attrs["type"])
if cls is None:
raise ValueError("Undefined type for member: '%s'" % attrs['type'])
self.cur_relation_member = cls(**params)
self._curr['members'].append(self.cur_relation_member)
def _handle_end_member(self):
self.cur_relation_member = None