diff --git a/pyextra/overpy/__about__.py b/pyextra/overpy/__about__.py
deleted file mode 100644
index 33c6c493c8..0000000000
--- a/pyextra/overpy/__about__.py
+++ /dev/null
@@ -1,22 +0,0 @@
-__all__ = [
- "__author__",
- "__copyright__",
- "__email__",
- "__license__",
- "__summary__",
- "__title__",
- "__uri__",
- "__version__",
-]
-
-__title__ = "overpy"
-__summary__ = "Python Wrapper to access the OpenStreepMap Overpass API"
-__uri__ = "https://github.com/DinoTools/python-overpy"
-
-__version__ = "0.4"
-
-__author__ = "PhiBo (DinoTools)"
-__email__ = ""
-
-__license__ = "MIT"
-__copyright__ = "Copyright 2014-2016 %s" % __author__
diff --git a/pyextra/overpy/__init__.py b/pyextra/overpy/__init__.py
deleted file mode 100644
index 2836080ab7..0000000000
--- a/pyextra/overpy/__init__.py
+++ /dev/null
@@ -1,1619 +0,0 @@
-from collections import OrderedDict
-from datetime import datetime
-from decimal import Decimal
-from xml.sax import handler, make_parser
-import json
-import re
-import sys
-import time
-import requests
-
-from overpy import exception
-from overpy.__about__ import (
- __author__, __copyright__, __email__, __license__, __summary__, __title__,
- __uri__, __version__
-)
-
-PY2 = sys.version_info[0] == 2
-PY3 = sys.version_info[0] == 3
-
-XML_PARSER_DOM = 1
-XML_PARSER_SAX = 2
-
-# Try to convert some common attributes
-# http://wiki.openstreetmap.org/wiki/Elements#Common_attributes
-GLOBAL_ATTRIBUTE_MODIFIERS = {
- "changeset": int,
- "timestamp": lambda ts: datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ"),
- "uid": int,
- "version": int,
- "visible": lambda v: v.lower() == "true"
-}
-
-
-def is_valid_type(element, cls):
- """
- Test if an element is of a given type.
-
- :param Element() element: The element instance to test
- :param Element cls: The element class to test
- :return: False or True
- :rtype: Boolean
- """
- return isinstance(element, cls) and element.id is not None
-
-
-class Overpass(object):
- """
- Class to access the Overpass API
-
- :cvar default_max_retry_count: Global max number of retries (Default: 0)
- :cvar default_retry_timeout: Global time to wait between tries (Default: 1.0s)
- """
- default_max_retry_count = 0
- default_read_chunk_size = 4096
- default_retry_timeout = 1.0
- default_url = "http://overpass-api.de/api/interpreter"
-
- def __init__(self, read_chunk_size=None, url=None, xml_parser=XML_PARSER_SAX, max_retry_count=None, retry_timeout=None, timeout=5.0, headers=None):
- """
- :param read_chunk_size: Max size of each chunk read from the server response
- :type read_chunk_size: Integer
- :param url: Optional URL of the Overpass server. Defaults to http://overpass-api.de/api/interpreter
- :type url: str
- :param xml_parser: The xml parser to use
- :type xml_parser: Integer
- :param max_retry_count: Max number of retries (Default: default_max_retry_count)
- :type max_retry_count: Integer
- :param retry_timeout: Time to wait between tries (Default: default_retry_timeout)
- :type retry_timeout: float
- :param timeout: HTTP request timeout
- :type timeout: float
- :param headers: HTTP request headers
- :type headers: dict
- """
- self.url = self.default_url
- if url is not None:
- self.url = url
-
- self._regex_extract_error_msg = re.compile(b"\
(?P\")
- self._regex_remove_tag = re.compile(b"<[^>]*?>")
- if read_chunk_size is None:
- read_chunk_size = self.default_read_chunk_size
- self.read_chunk_size = read_chunk_size
-
- if max_retry_count is None:
- max_retry_count = self.default_max_retry_count
- self.max_retry_count = max_retry_count
-
- if retry_timeout is None:
- retry_timeout = self.default_retry_timeout
- self.retry_timeout = retry_timeout
-
- self.xml_parser = xml_parser
- self.timeout = timeout
- self.headers = headers
-
- def _handle_remark_msg(self, msg):
- """
- Try to parse the message provided with the remark tag or element.
-
- :param str msg: The message
- :raises overpy.exception.OverpassRuntimeError: If message starts with 'runtime error:'
- :raises overpy.exception.OverpassRuntimeRemark: If message starts with 'runtime remark:'
- :raises overpy.exception.OverpassUnknownError: If we are unable to identify the error
- """
- msg = msg.strip()
- if msg.startswith("runtime error:"):
- raise exception.OverpassRuntimeError(msg=msg)
- elif msg.startswith("runtime remark:"):
- raise exception.OverpassRuntimeRemark(msg=msg)
- raise exception.OverpassUnknownError(msg=msg)
-
- def query(self, query):
- """
- Query the Overpass API
-
- :param String|Bytes query: The query string in Overpass QL
- :return: The parsed result
- :rtype: overpy.Result
- """
- if not isinstance(query, bytes):
- query = query.encode("utf-8")
-
- retry_num = 0
- retry_exceptions = []
- do_retry = True if self.max_retry_count > 0 else False
- while retry_num <= self.max_retry_count:
- if retry_num > 0:
- time.sleep(self.retry_timeout)
- retry_num += 1
-
- try:
- if self.headers is not None:
- r = requests.post(self.url, query, timeout=self.timeout, headers=self.headers)
- else:
- r = requests.post(self.url, query, timeout=self.timeout)
- response = r.content
- except (requests.exceptions.BaseHTTPError, requests.exceptions.RequestException) as e:
- if not do_retry:
- raise e
- retry_exceptions.append(e)
- continue
-
- if r.status_code == 200:
- content_type = r.headers["Content-Type"]
-
- if content_type == "application/json":
- return self.parse_json(response)
-
- if content_type == "application/osm3s+xml":
- return self.parse_xml(response)
-
- e = exception.OverpassUnknownContentType(content_type)
- if not do_retry:
- raise e
- retry_exceptions.append(e)
- continue
- elif r.status_code == 400:
- msgs = []
- for msg in self._regex_extract_error_msg.finditer(response):
- tmp = self._regex_remove_tag.sub(b"", msg.group("msg"))
- try:
- tmp = tmp.decode("utf-8")
- except UnicodeDecodeError:
- tmp = repr(tmp)
- msgs.append(tmp)
-
- e = exception.OverpassBadRequest(
- query,
- msgs=msgs
- )
- if not do_retry:
- raise e
- retry_exceptions.append(e)
- continue
- elif r.status_code == 429:
- e = exception.OverpassTooManyRequests
- if not do_retry:
- raise e
- retry_exceptions.append(e)
- continue
- elif r.status_code == 504:
- e = exception.OverpassGatewayTimeout
- if not do_retry:
- raise e
- retry_exceptions.append(e)
- continue
-
- # No valid response code
- e = exception.OverpassUnknownHTTPStatusCode(r.status_code)
- if not do_retry:
- raise e
- retry_exceptions.append(e)
- continue
-
- raise exception.MaxRetriesReached(retry_count=retry_num, exceptions=retry_exceptions)
-
- def parse_json(self, data, encoding="utf-8"):
- """
- Parse raw response from Overpass service.
-
- :param data: Raw JSON Data
- :type data: String or Bytes
- :param encoding: Encoding to decode byte string
- :type encoding: String
- :return: Result object
- :rtype: overpy.Result
- """
- if isinstance(data, bytes):
- data = data.decode(encoding)
-
- data = json.loads(data, parse_float=Decimal)
- if "remark" in data:
- self._handle_remark_msg(msg=data.get("remark"))
- return Result.from_json(data, api=self)
-
- def parse_xml(self, data, encoding="utf-8", parser=None):
- """
-
- :param data: Raw XML Data
- :type data: String or Bytes
- :param encoding: Encoding to decode byte string
- :type encoding: String
- :return: Result object
- :rtype: overpy.Result
- """
- if parser is None:
- parser = self.xml_parser
- if isinstance(data, bytes):
- data = data.decode(encoding)
- if PY2 and not isinstance(data, str):
- # Python 2.x: Convert unicode strings
- data = data.encode(encoding)
-
- m = re.compile("(?P[^<>]*)").search(data)
- if m:
- self._handle_remark_msg(m.group("msg"))
-
- return Result.from_xml(data, api=self, parser=parser)
-
-
-class Result(object):
- """
- Class to handle the result.
- """
-
- def __init__(self, elements=None, api=None):
- """
-
- :param List elements:
- :param api:
- :type api: overpy.Overpass
- """
- if elements is None:
- elements = []
- self._areas = OrderedDict((element.id, element) for element in elements if is_valid_type(element, Area))
- self._nodes = OrderedDict((element.id, element) for element in elements if is_valid_type(element, Node))
- self._ways = OrderedDict((element.id, element) for element in elements if is_valid_type(element, Way))
- self._relations = OrderedDict((element.id, element)
- for element in elements if is_valid_type(element, Relation))
- self._class_collection_map = {Node: self._nodes, Way: self._ways, Relation: self._relations, Area: self._areas}
- self.api = api
-
- def expand(self, other):
- """
- Add all elements from an other result to the list of elements of this result object.
-
- It is used by the auto resolve feature.
-
- :param other: Expand the result with the elements from this result.
- :type other: overpy.Result
- :raises ValueError: If provided parameter is not instance of :class:`overpy.Result`
- """
- if not isinstance(other, Result):
- raise ValueError("Provided argument has to be instance of overpy:Result()")
-
- other_collection_map = {Node: other.nodes, Way: other.ways, Relation: other.relations, Area: other.areas}
- for element_type, own_collection in self._class_collection_map.items():
- for element in other_collection_map[element_type]:
- if is_valid_type(element, element_type) and element.id not in own_collection:
- own_collection[element.id] = element
-
- def append(self, element):
- """
- Append a new element to the result.
-
- :param element: The element to append
- :type element: overpy.Element
- """
- if is_valid_type(element, Element):
- self._class_collection_map[element.__class__].setdefault(element.id, element)
-
- def get_elements(self, filter_cls, elem_id=None):
- """
- Get a list of elements from the result and filter the element type by a class.
-
- :param filter_cls:
- :param elem_id: ID of the object
- :type elem_id: Integer
- :return: List of available elements
- :rtype: List
- """
- result = []
- if elem_id is not None:
- try:
- result = [self._class_collection_map[filter_cls][elem_id]]
- except KeyError:
- result = []
- else:
- for e in self._class_collection_map[filter_cls].values():
- result.append(e)
- return result
-
- def get_ids(self, filter_cls):
- """
-
- :param filter_cls:
- :return:
- """
- return list(self._class_collection_map[filter_cls].keys())
-
- def get_node_ids(self):
- return self.get_ids(filter_cls=Node)
-
- def get_way_ids(self):
- return self.get_ids(filter_cls=Way)
-
- def get_relation_ids(self):
- return self.get_ids(filter_cls=Relation)
-
- def get_area_ids(self):
- return self.get_ids(filter_cls=Area)
-
- @classmethod
- def from_json(cls, data, api=None):
- """
- Create a new instance and load data from json object.
-
- :param data: JSON data returned by the Overpass API
- :type data: Dict
- :param api:
- :type api: overpy.Overpass
- :return: New instance of Result object
- :rtype: overpy.Result
- """
- result = cls(api=api)
- for elem_cls in [Node, Way, Relation, Area]:
- for element in data.get("elements", []):
- e_type = element.get("type")
- if hasattr(e_type, "lower") and e_type.lower() == elem_cls._type_value:
- result.append(elem_cls.from_json(element, result=result))
-
- return result
-
- @classmethod
- def from_xml(cls, data, api=None, parser=None):
- """
- Create a new instance and load data from xml data or object.
-
- .. note::
- If parser is set to None, the functions tries to find the best parse.
- By default the SAX parser is chosen if a string is provided as data.
- The parser is set to DOM if an xml.etree.ElementTree.Element is provided as data value.
-
- :param data: Root element
- :type data: str | xml.etree.ElementTree.Element
- :param api: The instance to query additional information if required.
- :type api: Overpass
- :param parser: Specify the parser to use(DOM or SAX)(Default: None = autodetect, defaults to SAX)
- :type parser: Integer | None
- :return: New instance of Result object
- :rtype: Result
- """
- if parser is None:
- if isinstance(data, str):
- parser = XML_PARSER_SAX
- else:
- parser = XML_PARSER_DOM
-
- result = cls(api=api)
- if parser == XML_PARSER_DOM:
- import xml.etree.ElementTree as ET
- if isinstance(data, str):
- root = ET.fromstring(data)
- elif isinstance(data, ET.Element):
- root = data
- else:
- raise exception.OverPyException("Unable to detect data type.")
-
- for elem_cls in [Node, Way, Relation, Area]:
- for child in root:
- if child.tag.lower() == elem_cls._type_value:
- result.append(elem_cls.from_xml(child, result=result))
-
- elif parser == XML_PARSER_SAX:
- if PY2:
- from StringIO import StringIO
- else:
- from io import StringIO
- source = StringIO(data)
- sax_handler = OSMSAXHandler(result)
- parser = make_parser()
- parser.setContentHandler(sax_handler)
- parser.parse(source)
- else:
- # ToDo: better exception
- raise Exception("Unknown XML parser")
- return result
-
- def get_area(self, area_id, resolve_missing=False):
- """
- Get an area by its ID.
-
- :param area_id: The area ID
- :type area_id: Integer
- :param resolve_missing: Query the Overpass API if the area is missing in the result set.
- :return: The area
- :rtype: overpy.Area
- :raises overpy.exception.DataIncomplete: The requested way is not available in the result cache.
- :raises overpy.exception.DataIncomplete: If resolve_missing is True and the area can't be resolved.
- """
- areas = self.get_areas(area_id=area_id)
- if len(areas) == 0:
- if resolve_missing is False:
- raise exception.DataIncomplete("Resolve missing area is disabled")
-
- query = ("\n"
- "[out:json];\n"
- "area({area_id});\n"
- "out body;\n"
- )
- query = query.format(
- area_id=area_id
- )
- tmp_result = self.api.query(query)
- self.expand(tmp_result)
-
- areas = self.get_areas(area_id=area_id)
-
- if len(areas) == 0:
- raise exception.DataIncomplete("Unable to resolve requested areas")
-
- return areas[0]
-
- def get_areas(self, area_id=None, **kwargs):
- """
- Alias for get_elements() but filter the result by Area
-
- :param area_id: The Id of the area
- :type area_id: Integer
- :return: List of elements
- """
- return self.get_elements(Area, elem_id=area_id, **kwargs)
-
- def get_node(self, node_id, resolve_missing=False):
- """
- Get a node by its ID.
-
- :param node_id: The node ID
- :type node_id: Integer
- :param resolve_missing: Query the Overpass API if the node is missing in the result set.
- :return: The node
- :rtype: overpy.Node
- :raises overpy.exception.DataIncomplete: At least one referenced node is not available in the result cache.
- :raises overpy.exception.DataIncomplete: If resolve_missing is True and at least one node can't be resolved.
- """
- nodes = self.get_nodes(node_id=node_id)
- if len(nodes) == 0:
- if not resolve_missing:
- raise exception.DataIncomplete("Resolve missing nodes is disabled")
-
- query = ("\n"
- "[out:json];\n"
- "node({node_id});\n"
- "out body;\n"
- )
- query = query.format(
- node_id=node_id
- )
- tmp_result = self.api.query(query)
- self.expand(tmp_result)
-
- nodes = self.get_nodes(node_id=node_id)
-
- if len(nodes) == 0:
- raise exception.DataIncomplete("Unable to resolve all nodes")
-
- return nodes[0]
-
- def get_nodes(self, node_id=None, **kwargs):
- """
- Alias for get_elements() but filter the result by Node()
-
- :param node_id: The Id of the node
- :type node_id: Integer
- :return: List of elements
- """
- return self.get_elements(Node, elem_id=node_id, **kwargs)
-
- def get_relation(self, rel_id, resolve_missing=False):
- """
- Get a relation by its ID.
-
- :param rel_id: The relation ID
- :type rel_id: Integer
- :param resolve_missing: Query the Overpass API if the relation is missing in the result set.
- :return: The relation
- :rtype: overpy.Relation
- :raises overpy.exception.DataIncomplete: The requested relation is not available in the result cache.
- :raises overpy.exception.DataIncomplete: If resolve_missing is True and the relation can't be resolved.
- """
- relations = self.get_relations(rel_id=rel_id)
- if len(relations) == 0:
- if resolve_missing is False:
- raise exception.DataIncomplete("Resolve missing relations is disabled")
-
- query = ("\n"
- "[out:json];\n"
- "relation({relation_id});\n"
- "out body;\n"
- )
- query = query.format(
- relation_id=rel_id
- )
- tmp_result = self.api.query(query)
- self.expand(tmp_result)
-
- relations = self.get_relations(rel_id=rel_id)
-
- if len(relations) == 0:
- raise exception.DataIncomplete("Unable to resolve requested reference")
-
- return relations[0]
-
- def get_relations(self, rel_id=None, **kwargs):
- """
- Alias for get_elements() but filter the result by Relation
-
- :param rel_id: Id of the relation
- :type rel_id: Integer
- :return: List of elements
- """
- return self.get_elements(Relation, elem_id=rel_id, **kwargs)
-
- def get_way(self, way_id, resolve_missing=False):
- """
- Get a way by its ID.
-
- :param way_id: The way ID
- :type way_id: Integer
- :param resolve_missing: Query the Overpass API if the way is missing in the result set.
- :return: The way
- :rtype: overpy.Way
- :raises overpy.exception.DataIncomplete: The requested way is not available in the result cache.
- :raises overpy.exception.DataIncomplete: If resolve_missing is True and the way can't be resolved.
- """
- ways = self.get_ways(way_id=way_id)
- if len(ways) == 0:
- if resolve_missing is False:
- raise exception.DataIncomplete("Resolve missing way is disabled")
-
- query = ("\n"
- "[out:json];\n"
- "way({way_id});\n"
- "out body;\n"
- )
- query = query.format(
- way_id=way_id
- )
- tmp_result = self.api.query(query)
- self.expand(tmp_result)
-
- ways = self.get_ways(way_id=way_id)
-
- if len(ways) == 0:
- raise exception.DataIncomplete("Unable to resolve requested way")
-
- return ways[0]
-
- def get_ways(self, way_id=None, **kwargs):
- """
- Alias for get_elements() but filter the result by Way
-
- :param way_id: The Id of the way
- :type way_id: Integer
- :return: List of elements
- """
- return self.get_elements(Way, elem_id=way_id, **kwargs)
-
- area_ids = property(get_area_ids)
- areas = property(get_areas)
- node_ids = property(get_node_ids)
- nodes = property(get_nodes)
- relation_ids = property(get_relation_ids)
- relations = property(get_relations)
- way_ids = property(get_way_ids)
- ways = property(get_ways)
-
-
-class Element(object):
- """
- Base element
- """
-
- def __init__(self, attributes=None, result=None, tags=None):
- """
- :param attributes: Additional attributes
- :type attributes: Dict
- :param result: The result object this element belongs to
- :param tags: List of tags
- :type tags: Dict
- """
-
- self._result = result
- self.attributes = attributes
- # ToDo: Add option to modify attribute modifiers
- attribute_modifiers = dict(GLOBAL_ATTRIBUTE_MODIFIERS.items())
- for n, m in attribute_modifiers.items():
- if n in self.attributes:
- self.attributes[n] = m(self.attributes[n])
- self.id = None
- self.tags = tags
-
- @classmethod
- def get_center_from_json(cls, data):
- """
- Get center information from json data
-
- :param data: json data
- :return: tuple with two elements: lat and lon
- :rtype: tuple
- """
- center_lat = None
- center_lon = None
- center = data.get("center")
- if isinstance(center, dict):
- center_lat = center.get("lat")
- center_lon = center.get("lon")
- if center_lat is None or center_lon is None:
- raise ValueError("Unable to get lat or lon of way center.")
- center_lat = Decimal(center_lat)
- center_lon = Decimal(center_lon)
- return (center_lat, center_lon)
-
- @classmethod
- def get_center_from_xml_dom(cls, sub_child):
- center_lat = sub_child.attrib.get("lat")
- center_lon = sub_child.attrib.get("lon")
- if center_lat is None or center_lon is None:
- raise ValueError("Unable to get lat or lon of way center.")
- center_lat = Decimal(center_lat)
- center_lon = Decimal(center_lon)
- return center_lat, center_lon
-
-
-class Area(Element):
- """
- Class to represent an element of type area
- """
-
- _type_value = "area"
-
- def __init__(self, area_id=None, **kwargs):
- """
- :param area_id: Id of the area element
- :type area_id: Integer
- :param kwargs: Additional arguments are passed directly to the parent class
-
- """
-
- Element.__init__(self, **kwargs)
- #: The id of the way
- self.id = area_id
-
- def __repr__(self):
- return "".format(self.id)
-
- @classmethod
- def from_json(cls, data, result=None):
- """
- Create new Area element from JSON data
-
- :param data: Element data from JSON
- :type data: Dict
- :param result: The result this element belongs to
- :type result: overpy.Result
- :return: New instance of Way
- :rtype: overpy.Area
- :raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match.
- """
- if data.get("type") != cls._type_value:
- raise exception.ElementDataWrongType(
- type_expected=cls._type_value,
- type_provided=data.get("type")
- )
-
- tags = data.get("tags", {})
-
- area_id = data.get("id")
-
- attributes = {}
- ignore = ["id", "tags", "type"]
- for n, v in data.items():
- if n in ignore:
- continue
- attributes[n] = v
-
- return cls(area_id=area_id, attributes=attributes, tags=tags, result=result)
-
- @classmethod
- def from_xml(cls, child, result=None):
- """
- Create new way element from XML data
-
- :param child: XML node to be parsed
- :type child: xml.etree.ElementTree.Element
- :param result: The result this node belongs to
- :type result: overpy.Result
- :return: New Way oject
- :rtype: overpy.Way
- :raises overpy.exception.ElementDataWrongType: If name of the xml child node doesn't match
- :raises ValueError: If the ref attribute of the xml node is not provided
- :raises ValueError: If a tag doesn't have a name
- """
- if child.tag.lower() != cls._type_value:
- raise exception.ElementDataWrongType(
- type_expected=cls._type_value,
- type_provided=child.tag.lower()
- )
-
- tags = {}
-
- for sub_child in child:
- if sub_child.tag.lower() == "tag":
- name = sub_child.attrib.get("k")
- if name is None:
- raise ValueError("Tag without name/key.")
- value = sub_child.attrib.get("v")
- tags[name] = value
-
- area_id = child.attrib.get("id")
- if area_id is not None:
- area_id = int(area_id)
-
- attributes = {}
- ignore = ["id"]
- for n, v in child.attrib.items():
- if n in ignore:
- continue
- attributes[n] = v
-
- return cls(area_id=area_id, attributes=attributes, tags=tags, result=result)
-
-
-class Node(Element):
- """
- Class to represent an element of type node
- """
-
- _type_value = "node"
-
- def __init__(self, node_id=None, lat=None, lon=None, **kwargs):
- """
- :param lat: Latitude
- :type lat: Decimal or Float
- :param lon: Longitude
- :type long: Decimal or Float
- :param node_id: Id of the node element
- :type node_id: Integer
- :param kwargs: Additional arguments are passed directly to the parent class
- """
-
- Element.__init__(self, **kwargs)
- self.id = node_id
- self.lat = lat
- self.lon = lon
-
- def __repr__(self):
- return "".format(self.id, self.lat, self.lon)
-
- @classmethod
- def from_json(cls, data, result=None):
- """
- Create new Node element from JSON data
-
- :param data: Element data from JSON
- :type data: Dict
- :param result: The result this element belongs to
- :type result: overpy.Result
- :return: New instance of Node
- :rtype: overpy.Node
- :raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match.
- """
- if data.get("type") != cls._type_value:
- raise exception.ElementDataWrongType(
- type_expected=cls._type_value,
- type_provided=data.get("type")
- )
-
- tags = data.get("tags", {})
-
- node_id = data.get("id")
- lat = data.get("lat")
- lon = data.get("lon")
-
- attributes = {}
- ignore = ["type", "id", "lat", "lon", "tags"]
- for n, v in data.items():
- if n in ignore:
- continue
- attributes[n] = v
-
- return cls(node_id=node_id, lat=lat, lon=lon, tags=tags, attributes=attributes, result=result)
-
- @classmethod
- def from_xml(cls, child, result=None):
- """
- Create new way element from XML data
-
- :param child: XML node to be parsed
- :type child: xml.etree.ElementTree.Element
- :param result: The result this node belongs to
- :type result: overpy.Result
- :return: New Way oject
- :rtype: overpy.Node
- :raises overpy.exception.ElementDataWrongType: If name of the xml child node doesn't match
- :raises ValueError: If a tag doesn't have a name
- """
- if child.tag.lower() != cls._type_value:
- raise exception.ElementDataWrongType(
- type_expected=cls._type_value,
- type_provided=child.tag.lower()
- )
-
- tags = {}
-
- for sub_child in child:
- if sub_child.tag.lower() == "tag":
- name = sub_child.attrib.get("k")
- if name is None:
- raise ValueError("Tag without name/key.")
- value = sub_child.attrib.get("v")
- tags[name] = value
-
- node_id = child.attrib.get("id")
- if node_id is not None:
- node_id = int(node_id)
- lat = child.attrib.get("lat")
- if lat is not None:
- lat = Decimal(lat)
- lon = child.attrib.get("lon")
- if lon is not None:
- lon = Decimal(lon)
-
- attributes = {}
- ignore = ["id", "lat", "lon"]
- for n, v in child.attrib.items():
- if n in ignore:
- continue
- attributes[n] = v
-
- return cls(node_id=node_id, lat=lat, lon=lon, tags=tags, attributes=attributes, result=result)
-
-
-class Way(Element):
- """
- Class to represent an element of type way
- """
-
- _type_value = "way"
-
- def __init__(self, way_id=None, center_lat=None, center_lon=None, node_ids=None, **kwargs):
- """
- :param node_ids: List of node IDs
- :type node_ids: List or Tuple
- :param way_id: Id of the way element
- :type way_id: Integer
- :param kwargs: Additional arguments are passed directly to the parent class
-
- """
-
- Element.__init__(self, **kwargs)
- #: The id of the way
- self.id = way_id
-
- #: List of Ids of the associated nodes
- self._node_ids = node_ids
-
- #: The lat/lon of the center of the way (optional depending on query)
- self.center_lat = center_lat
- self.center_lon = center_lon
-
- def __repr__(self):
- return "".format(self.id, self._node_ids)
-
- @property
- def nodes(self):
- """
- List of nodes associated with the way.
- """
- return self.get_nodes()
-
- def get_nodes(self, resolve_missing=False):
- """
- Get the nodes defining the geometry of the way
-
- :param resolve_missing: Try to resolve missing nodes.
- :type resolve_missing: Boolean
- :return: List of nodes
- :rtype: List of overpy.Node
- :raises overpy.exception.DataIncomplete: At least one referenced node is not available in the result cache.
- :raises overpy.exception.DataIncomplete: If resolve_missing is True and at least one node can't be resolved.
- """
- result = []
- resolved = False
-
- for node_id in self._node_ids:
- try:
- node = self._result.get_node(node_id)
- except exception.DataIncomplete:
- node = None
-
- if node is not None:
- result.append(node)
- continue
-
- if not resolve_missing:
- raise exception.DataIncomplete("Resolve missing nodes is disabled")
-
- # We tried to resolve the data but some nodes are still missing
- if resolved:
- raise exception.DataIncomplete("Unable to resolve all nodes")
-
- query = ("\n"
- "[out:json];\n"
- "way({way_id});\n"
- "node(w);\n"
- "out body;\n"
- )
- query = query.format(
- way_id=self.id
- )
- tmp_result = self._result.api.query(query)
- self._result.expand(tmp_result)
- resolved = True
-
- try:
- node = self._result.get_node(node_id)
- except exception.DataIncomplete:
- node = None
-
- if node is None:
- raise exception.DataIncomplete("Unable to resolve all nodes")
-
- result.append(node)
-
- return result
-
- @classmethod
- def from_json(cls, data, result=None):
- """
- Create new Way element from JSON data
-
- :param data: Element data from JSON
- :type data: Dict
- :param result: The result this element belongs to
- :type result: overpy.Result
- :return: New instance of Way
- :rtype: overpy.Way
- :raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match.
- """
- if data.get("type") != cls._type_value:
- raise exception.ElementDataWrongType(
- type_expected=cls._type_value,
- type_provided=data.get("type")
- )
-
- tags = data.get("tags", {})
-
- way_id = data.get("id")
- node_ids = data.get("nodes")
- (center_lat, center_lon) = cls.get_center_from_json(data=data)
-
- attributes = {}
- ignore = ["center", "id", "nodes", "tags", "type"]
- for n, v in data.items():
- if n in ignore:
- continue
- attributes[n] = v
-
- return cls(
- attributes=attributes,
- center_lat=center_lat,
- center_lon=center_lon,
- node_ids=node_ids,
- tags=tags,
- result=result,
- way_id=way_id
- )
-
- @classmethod
- def from_xml(cls, child, result=None):
- """
- Create new way element from XML data
-
- :param child: XML node to be parsed
- :type child: xml.etree.ElementTree.Element
- :param result: The result this node belongs to
- :type result: overpy.Result
- :return: New Way oject
- :rtype: overpy.Way
- :raises overpy.exception.ElementDataWrongType: If name of the xml child node doesn't match
- :raises ValueError: If the ref attribute of the xml node is not provided
- :raises ValueError: If a tag doesn't have a name
- """
- if child.tag.lower() != cls._type_value:
- raise exception.ElementDataWrongType(
- type_expected=cls._type_value,
- type_provided=child.tag.lower()
- )
-
- tags = {}
- node_ids = []
- center_lat = None
- center_lon = None
-
- for sub_child in child:
- if sub_child.tag.lower() == "tag":
- name = sub_child.attrib.get("k")
- if name is None:
- raise ValueError("Tag without name/key.")
- value = sub_child.attrib.get("v")
- tags[name] = value
- if sub_child.tag.lower() == "nd":
- ref_id = sub_child.attrib.get("ref")
- if ref_id is None:
- raise ValueError("Unable to find required ref value.")
- ref_id = int(ref_id)
- node_ids.append(ref_id)
- if sub_child.tag.lower() == "center":
- (center_lat, center_lon) = cls.get_center_from_xml_dom(sub_child=sub_child)
-
- way_id = child.attrib.get("id")
- if way_id is not None:
- way_id = int(way_id)
-
- attributes = {}
- ignore = ["id"]
- for n, v in child.attrib.items():
- if n in ignore:
- continue
- attributes[n] = v
-
- return cls(way_id=way_id, center_lat=center_lat, center_lon=center_lon,
- attributes=attributes, node_ids=node_ids, tags=tags, result=result)
-
-
-class Relation(Element):
- """
- Class to represent an element of type relation
- """
-
- _type_value = "relation"
-
- def __init__(self, rel_id=None, center_lat=None, center_lon=None, members=None, **kwargs):
- """
- :param members:
- :param rel_id: Id of the relation element
- :type rel_id: Integer
- :param kwargs:
- :return:
- """
-
- Element.__init__(self, **kwargs)
- self.id = rel_id
- self.members = members
-
- #: The lat/lon of the center of the way (optional depending on query)
- self.center_lat = center_lat
- self.center_lon = center_lon
-
- def __repr__(self):
- return "".format(self.id)
-
- @classmethod
- def from_json(cls, data, result=None):
- """
- Create new Relation element from JSON data
-
- :param data: Element data from JSON
- :type data: Dict
- :param result: The result this element belongs to
- :type result: overpy.Result
- :return: New instance of Relation
- :rtype: overpy.Relation
- :raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match.
- """
- if data.get("type") != cls._type_value:
- raise exception.ElementDataWrongType(
- type_expected=cls._type_value,
- type_provided=data.get("type")
- )
-
- tags = data.get("tags", {})
-
- rel_id = data.get("id")
- (center_lat, center_lon) = cls.get_center_from_json(data=data)
-
- members = []
-
- supported_members = [RelationNode, RelationWay, RelationRelation]
- for member in data.get("members", []):
- type_value = member.get("type")
- for member_cls in supported_members:
- if member_cls._type_value == type_value:
- members.append(
- member_cls.from_json(
- member,
- result=result
- )
- )
-
- attributes = {}
- ignore = ["id", "members", "tags", "type"]
- for n, v in data.items():
- if n in ignore:
- continue
- attributes[n] = v
-
- return cls(
- rel_id=rel_id,
- attributes=attributes,
- center_lat=center_lat,
- center_lon=center_lon,
- members=members,
- tags=tags,
- result=result
- )
-
- @classmethod
- def from_xml(cls, child, result=None):
- """
- Create new way element from XML data
-
- :param child: XML node to be parsed
- :type child: xml.etree.ElementTree.Element
- :param result: The result this node belongs to
- :type result: overpy.Result
- :return: New Way oject
- :rtype: overpy.Relation
- :raises overpy.exception.ElementDataWrongType: If name of the xml child node doesn't match
- :raises ValueError: If a tag doesn't have a name
- """
- if child.tag.lower() != cls._type_value:
- raise exception.ElementDataWrongType(
- type_expected=cls._type_value,
- type_provided=child.tag.lower()
- )
-
- tags = {}
- members = []
- center_lat = None
- center_lon = None
-
- supported_members = [RelationNode, RelationWay, RelationRelation, RelationArea]
- for sub_child in child:
- if sub_child.tag.lower() == "tag":
- name = sub_child.attrib.get("k")
- if name is None:
- raise ValueError("Tag without name/key.")
- value = sub_child.attrib.get("v")
- tags[name] = value
- if sub_child.tag.lower() == "member":
- type_value = sub_child.attrib.get("type")
- for member_cls in supported_members:
- if member_cls._type_value == type_value:
- members.append(
- member_cls.from_xml(
- sub_child,
- result=result
- )
- )
- if sub_child.tag.lower() == "center":
- (center_lat, center_lon) = cls.get_center_from_xml_dom(sub_child=sub_child)
-
- rel_id = child.attrib.get("id")
- if rel_id is not None:
- rel_id = int(rel_id)
-
- attributes = {}
- ignore = ["id"]
- for n, v in child.attrib.items():
- if n in ignore:
- continue
- attributes[n] = v
-
- return cls(
- rel_id=rel_id,
- attributes=attributes,
- center_lat=center_lat,
- center_lon=center_lon,
- members=members,
- tags=tags,
- result=result
- )
-
-
-class RelationMember(object):
- """
- Base class to represent a member of a relation.
- """
-
- def __init__(self, attributes=None, geometry=None, ref=None, role=None, result=None):
- """
- :param ref: Reference Id
- :type ref: Integer
- :param role: The role of the relation member
- :type role: String
- :param result:
- """
- self.ref = ref
- self._result = result
- self.role = role
- self.attributes = attributes
- self.geometry = geometry
-
- @classmethod
- def from_json(cls, data, result=None):
- """
- Create new RelationMember element from JSON data
-
- :param child: Element data from JSON
- :type child: Dict
- :param result: The result this element belongs to
- :type result: overpy.Result
- :return: New instance of RelationMember
- :rtype: overpy.RelationMember
- :raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match.
- """
- if data.get("type") != cls._type_value:
- raise exception.ElementDataWrongType(
- type_expected=cls._type_value,
- type_provided=data.get("type")
- )
-
- ref = data.get("ref")
- role = data.get("role")
-
- attributes = {}
- ignore = ["geometry", "type", "ref", "role"]
- for n, v in data.items():
- if n in ignore:
- continue
- attributes[n] = v
-
- geometry = data.get("geometry")
- if isinstance(geometry, list):
- geometry_orig = geometry
- geometry = []
- for v in geometry_orig:
- geometry.append(
- RelationWayGeometryValue(
- lat=v.get("lat"),
- lon=v.get("lon")
- )
- )
- else:
- geometry = None
-
- return cls(
- attributes=attributes,
- geometry=geometry,
- ref=ref,
- role=role,
- result=result
- )
-
- @classmethod
- def from_xml(cls, child, result=None):
- """
- Create new RelationMember from XML data
-
- :param child: XML node to be parsed
- :type child: xml.etree.ElementTree.Element
- :param result: The result this element belongs to
- :type result: overpy.Result
- :return: New relation member oject
- :rtype: overpy.RelationMember
- :raises overpy.exception.ElementDataWrongType: If name of the xml child node doesn't match
- """
- if child.attrib.get("type") != cls._type_value:
- raise exception.ElementDataWrongType(
- type_expected=cls._type_value,
- type_provided=child.tag.lower()
- )
-
- ref = child.attrib.get("ref")
- if ref is not None:
- ref = int(ref)
- role = child.attrib.get("role")
-
- attributes = {}
- ignore = ["geometry", "ref", "role", "type"]
- for n, v in child.attrib.items():
- if n in ignore:
- continue
- attributes[n] = v
-
- geometry = None
- for sub_child in child:
- if sub_child.tag.lower() == "nd":
- if geometry is None:
- geometry = []
- geometry.append(
- RelationWayGeometryValue(
- lat=Decimal(sub_child.attrib["lat"]),
- lon=Decimal(sub_child.attrib["lon"])
- )
- )
-
- return cls(
- attributes=attributes,
- geometry=geometry,
- ref=ref,
- role=role,
- result=result
- )
-
-
-class RelationNode(RelationMember):
- _type_value = "node"
-
- def resolve(self, resolve_missing=False):
- return self._result.get_node(self.ref, resolve_missing=resolve_missing)
-
- def __repr__(self):
- return "".format(self.ref, self.role)
-
-
-class RelationWay(RelationMember):
- _type_value = "way"
-
- def resolve(self, resolve_missing=False):
- return self._result.get_way(self.ref, resolve_missing=resolve_missing)
-
- def __repr__(self):
- return "".format(self.ref, self.role)
-
-
-class RelationWayGeometryValue(object):
- def __init__(self, lat, lon):
- self.lat = lat
- self.lon = lon
-
- def __repr__(self):
- return "".format(self.lat, self.lon)
-
-
-class RelationRelation(RelationMember):
- _type_value = "relation"
-
- def resolve(self, resolve_missing=False):
- return self._result.get_relation(self.ref, resolve_missing=resolve_missing)
-
- def __repr__(self):
- return "".format(self.ref, self.role)
-
-
-class RelationArea(RelationMember):
- _type_value = "area"
-
- def resolve(self, resolve_missing=False):
- return self._result.get_area(self.ref, resolve_missing=resolve_missing)
-
- def __repr__(self):
- return "".format(self.ref, self.role)
-
-
-class OSMSAXHandler(handler.ContentHandler):
- """
- SAX parser for Overpass XML response.
- """
- #: Tuple of opening elements to ignore
- ignore_start = ('osm', 'meta', 'note', 'bounds', 'remark')
- #: Tuple of closing elements to ignore
- ignore_end = ('osm', 'meta', 'note', 'bounds', 'remark', 'tag', 'nd', 'center')
-
- def __init__(self, result):
- """
- :param result: Append results to this result set.
- :type result: overpy.Result
- """
- handler.ContentHandler.__init__(self)
- self._result = result
- self._curr = {}
- #: Current relation member object
- self.cur_relation_member = None
-
- def startElement(self, name, attrs):
- """
- Handle opening elements.
-
- :param name: Name of the element
- :type name: String
- :param attrs: Attributes of the element
- :type attrs: Dict
- """
- if name in self.ignore_start:
- return
- try:
- handler = getattr(self, '_handle_start_%s' % name)
- except AttributeError:
- raise KeyError("Unknown element start '%s'" % name)
- handler(attrs)
-
- def endElement(self, name):
- """
- Handle closing elements
-
- :param name: Name of the element
- :type name: String
- """
- if name in self.ignore_end:
- return
- try:
- handler = getattr(self, '_handle_end_%s' % name)
- except AttributeError:
- raise KeyError("Unknown element end '%s'" % name)
- handler()
-
- def _handle_start_center(self, attrs):
- """
- Handle opening center element
-
- :param attrs: Attributes of the element
- :type attrs: Dict
- """
- center_lat = attrs.get("lat")
- center_lon = attrs.get("lon")
- if center_lat is None or center_lon is None:
- raise ValueError("Unable to get lat or lon of way center.")
- self._curr["center_lat"] = Decimal(center_lat)
- self._curr["center_lon"] = Decimal(center_lon)
-
- def _handle_start_tag(self, attrs):
- """
- Handle opening tag element
-
- :param attrs: Attributes of the element
- :type attrs: Dict
- """
- try:
- tag_key = attrs['k']
- except KeyError:
- raise ValueError("Tag without name/key.")
- self._curr['tags'][tag_key] = attrs.get('v')
-
- def _handle_start_node(self, attrs):
- """
- Handle opening node element
-
- :param attrs: Attributes of the element
- :type attrs: Dict
- """
- self._curr = {
- 'attributes': dict(attrs),
- 'lat': None,
- 'lon': None,
- 'node_id': None,
- 'tags': {}
- }
- if attrs.get('id', None) is not None:
- self._curr['node_id'] = int(attrs['id'])
- del self._curr['attributes']['id']
- if attrs.get('lat', None) is not None:
- self._curr['lat'] = Decimal(attrs['lat'])
- del self._curr['attributes']['lat']
- if attrs.get('lon', None) is not None:
- self._curr['lon'] = Decimal(attrs['lon'])
- del self._curr['attributes']['lon']
-
- def _handle_end_node(self):
- """
- Handle closing node element
- """
- self._result.append(Node(result=self._result, **self._curr))
- self._curr = {}
-
- def _handle_start_way(self, attrs):
- """
- Handle opening way element
-
- :param attrs: Attributes of the element
- :type attrs: Dict
- """
- self._curr = {
- 'center_lat': None,
- 'center_lon': None,
- 'attributes': dict(attrs),
- 'node_ids': [],
- 'tags': {},
- 'way_id': None
- }
- if attrs.get('id', None) is not None:
- self._curr['way_id'] = int(attrs['id'])
- del self._curr['attributes']['id']
-
- def _handle_end_way(self):
- """
- Handle closing way element
- """
- self._result.append(Way(result=self._result, **self._curr))
- self._curr = {}
-
- def _handle_start_area(self, attrs):
- """
- Handle opening area element
-
- :param attrs: Attributes of the element
- :type attrs: Dict
- """
- self._curr = {
- 'attributes': dict(attrs),
- 'tags': {},
- 'area_id': None
- }
- if attrs.get('id', None) is not None:
- self._curr['area_id'] = int(attrs['id'])
- del self._curr['attributes']['id']
-
- def _handle_end_area(self):
- """
- Handle closing area element
- """
- self._result.append(Area(result=self._result, **self._curr))
- self._curr = {}
-
- def _handle_start_nd(self, attrs):
- """
- Handle opening nd element
-
- :param attrs: Attributes of the element
- :type attrs: Dict
- """
- if isinstance(self.cur_relation_member, RelationWay):
- if self.cur_relation_member.geometry is None:
- self.cur_relation_member.geometry = []
- self.cur_relation_member.geometry.append(
- RelationWayGeometryValue(
- lat=Decimal(attrs["lat"]),
- lon=Decimal(attrs["lon"])
- )
- )
- else:
- try:
- node_ref = attrs['ref']
- except KeyError:
- raise ValueError("Unable to find required ref value.")
- self._curr['node_ids'].append(int(node_ref))
-
- def _handle_start_relation(self, attrs):
- """
- Handle opening relation element
-
- :param attrs: Attributes of the element
- :type attrs: Dict
- """
- self._curr = {
- 'attributes': dict(attrs),
- 'members': [],
- 'rel_id': None,
- 'tags': {}
- }
- if attrs.get('id', None) is not None:
- self._curr['rel_id'] = int(attrs['id'])
- del self._curr['attributes']['id']
-
- def _handle_end_relation(self):
- """
- Handle closing relation element
- """
- self._result.append(Relation(result=self._result, **self._curr))
- self._curr = {}
-
- def _handle_start_member(self, attrs):
- """
- Handle opening member element
-
- :param attrs: Attributes of the element
- :type attrs: Dict
- """
-
- params = {
- # ToDo: Parse attributes
- 'attributes': {},
- 'ref': None,
- 'result': self._result,
- 'role': None
- }
- if attrs.get('ref', None):
- params['ref'] = int(attrs['ref'])
- if attrs.get('role', None):
- params['role'] = attrs['role']
-
- cls_map = {
- "area": RelationArea,
- "node": RelationNode,
- "relation": RelationRelation,
- "way": RelationWay
- }
- cls = cls_map.get(attrs["type"])
- if cls is None:
- raise ValueError("Undefined type for member: '%s'" % attrs['type'])
-
- self.cur_relation_member = cls(**params)
- self._curr['members'].append(self.cur_relation_member)
-
- def _handle_end_member(self):
- self.cur_relation_member = None
diff --git a/pyextra/overpy/exception.py b/pyextra/overpy/exception.py
deleted file mode 100644
index 3d8416a125..0000000000
--- a/pyextra/overpy/exception.py
+++ /dev/null
@@ -1,166 +0,0 @@
-class OverPyException(BaseException):
- """OverPy base exception"""
- pass
-
-
-class DataIncomplete(OverPyException):
- """
- Raised if the requested data isn't available in the result.
- Try to improve the query or to resolve the missing data.
- """
- def __init__(self, *args, **kwargs):
- OverPyException.__init__(
- self,
- "Data incomplete try to improve the query to resolve the missing data",
- *args,
- **kwargs
- )
-
-
-class ElementDataWrongType(OverPyException):
- """
- Raised if the provided element does not match the expected type.
-
- :param type_expected: The expected element type
- :type type_expected: String
- :param type_provided: The provided element type
- :type type_provided: String|None
- """
- def __init__(self, type_expected, type_provided=None):
- self.type_expected = type_expected
- self.type_provided = type_provided
-
- def __str__(self):
- return "Type expected '%s' but '%s' provided" % (
- self.type_expected,
- str(self.type_provided)
- )
-
-
-class MaxRetriesReached(OverPyException):
- """
- Raised if max retries reached and the Overpass server didn't respond with a result.
- """
- def __init__(self, retry_count, exceptions):
- self.exceptions = exceptions
- self.retry_count = retry_count
-
- def __str__(self):
- return "Unable get any result from the Overpass API server after %d retries." % self.retry_count
-
-
-class OverpassBadRequest(OverPyException):
- """
- Raised if the Overpass API service returns a syntax error.
-
- :param query: The encoded query how it was send to the server
- :type query: Bytes
- :param msgs: List of error messages
- :type msgs: List
- """
- def __init__(self, query, msgs=None):
- self.query = query
- if msgs is None:
- msgs = []
- self.msgs = msgs
-
- def __str__(self):
- tmp_msgs = []
- for tmp_msg in self.msgs:
- if not isinstance(tmp_msg, str):
- tmp_msg = str(tmp_msg)
- tmp_msgs.append(tmp_msg)
-
- return "\n".join(tmp_msgs)
-
-
-class OverpassError(OverPyException):
- """
- Base exception to report errors if the response returns a remark tag or element.
-
- .. note::
- If you are not sure which of the subexceptions you should use, use this one and try to parse the message.
-
- For more information have a look at https://github.com/DinoTools/python-overpy/issues/62
-
- :param str msg: The message from the remark tag or element
- """
- def __init__(self, msg=None):
- #: The message from the remark tag or element
- self.msg = msg
-
- def __str__(self):
- if self.msg is None:
- return "No error message provided"
- if not isinstance(self.msg, str):
- return str(self.msg)
- return self.msg
-
-
-class OverpassGatewayTimeout(OverPyException):
- """
- Raised if load of the Overpass API service is too high and it can't handle the request.
- """
- def __init__(self):
- OverPyException.__init__(self, "Server load too high")
-
-
-class OverpassRuntimeError(OverpassError):
- """
- Raised if the server returns a remark-tag(xml) or remark element(json) with a message starting with
- 'runtime error:'.
- """
- pass
-
-
-class OverpassRuntimeRemark(OverpassError):
- """
- Raised if the server returns a remark-tag(xml) or remark element(json) with a message starting with
- 'runtime remark:'.
- """
- pass
-
-
-class OverpassTooManyRequests(OverPyException):
- """
- Raised if the Overpass API service returns a 429 status code.
- """
- def __init__(self):
- OverPyException.__init__(self, "Too many requests")
-
-
-class OverpassUnknownContentType(OverPyException):
- """
- Raised if the reported content type isn't handled by OverPy.
-
- :param content_type: The reported content type
- :type content_type: None or String
- """
- def __init__(self, content_type):
- self.content_type = content_type
-
- def __str__(self):
- if self.content_type is None:
- return "No content type returned"
- return "Unknown content type: %s" % self.content_type
-
-
-class OverpassUnknownError(OverpassError):
- """
- Raised if the server returns a remark-tag(xml) or remark element(json) and we are unable to find any reason.
- """
- pass
-
-
-class OverpassUnknownHTTPStatusCode(OverPyException):
- """
- Raised if the returned HTTP status code isn't handled by OverPy.
-
- :param code: The HTTP status code
- :type code: Integer
- """
- def __init__(self, code):
- self.code = code
-
- def __str__(self):
- return "Unknown/Unhandled status code: %d" % self.code
\ No newline at end of file
diff --git a/pyextra/overpy/helper.py b/pyextra/overpy/helper.py
deleted file mode 100644
index e3ac0170bc..0000000000
--- a/pyextra/overpy/helper.py
+++ /dev/null
@@ -1,64 +0,0 @@
-__author__ = 'mjob'
-
-import overpy
-
-
-def get_street(street, areacode, api=None):
- """
- Retrieve streets in a given bounding area
-
- :param overpy.Overpass api: First street of intersection
- :param String street: Name of street
- :param String areacode: The OSM id of the bounding area
- :return: Parsed result
- :raises overpy.exception.OverPyException: If something bad happens.
- """
- if api is None:
- api = overpy.Overpass()
-
- query = """
- area(%s)->.location;
- (
- way[highway][name="%s"](area.location);
- - (
- way[highway=service](area.location);
- way[highway=track](area.location);
- );
- );
- out body;
- >;
- out skel qt;
- """
-
- data = api.query(query % (areacode, street))
-
- return data
-
-
-def get_intersection(street1, street2, areacode, api=None):
- """
- Retrieve intersection of two streets in a given bounding area
-
- :param overpy.Overpass api: First street of intersection
- :param String street1: Name of first street of intersection
- :param String street2: Name of second street of intersection
- :param String areacode: The OSM id of the bounding area
- :return: List of intersections
- :raises overpy.exception.OverPyException: If something bad happens.
- """
- if api is None:
- api = overpy.Overpass()
-
- query = """
- area(%s)->.location;
- (
- way[highway][name="%s"](area.location); node(w)->.n1;
- way[highway][name="%s"](area.location); node(w)->.n2;
- );
- node.n1.n2;
- out meta;
- """
-
- data = api.query(query % (areacode, street1, street2))
-
- return data.get_nodes()