from collections import OrderedDict
from datetime import datetime
from decimal import Decimal
from xml . sax import handler , make_parser
import json
import re
import sys
import time
import requests
from overpy import exception
from overpy . __about__ import (
__author__ , __copyright__ , __email__ , __license__ , __summary__ , __title__ ,
__uri__ , __version__
)
PY2 = sys . version_info [ 0 ] == 2
PY3 = sys . version_info [ 0 ] == 3
XML_PARSER_DOM = 1
XML_PARSER_SAX = 2
# Try to convert some common attributes
# http://wiki.openstreetmap.org/wiki/Elements#Common_attributes
GLOBAL_ATTRIBUTE_MODIFIERS = {
" changeset " : int ,
" timestamp " : lambda ts : datetime . strptime ( ts , " % Y- % m- %d T % H: % M: % SZ " ) ,
" uid " : int ,
" version " : int ,
" visible " : lambda v : v . lower ( ) == " true "
}
def is_valid_type ( element , cls ) :
"""
Test if an element is of a given type .
: param Element ( ) element : The element instance to test
: param Element cls : The element class to test
: return : False or True
: rtype : Boolean
"""
return isinstance ( element , cls ) and element . id is not None
class Overpass ( object ) :
"""
Class to access the Overpass API
: cvar default_max_retry_count : Global max number of retries ( Default : 0 )
: cvar default_retry_timeout : Global time to wait between tries ( Default : 1.0 s )
"""
default_max_retry_count = 0
default_read_chunk_size = 4096
default_retry_timeout = 1.0
default_url = " http://overpass-api.de/api/interpreter "
def __init__ ( self , read_chunk_size = None , url = None , xml_parser = XML_PARSER_SAX , max_retry_count = None , retry_timeout = None , timeout = 5.0 , headers = None ) :
"""
: param read_chunk_size : Max size of each chunk read from the server response
: type read_chunk_size : Integer
: param url : Optional URL of the Overpass server . Defaults to http : / / overpass - api . de / api / interpreter
: type url : str
: param xml_parser : The xml parser to use
: type xml_parser : Integer
: param max_retry_count : Max number of retries ( Default : default_max_retry_count )
: type max_retry_count : Integer
: param retry_timeout : Time to wait between tries ( Default : default_retry_timeout )
: type retry_timeout : float
: param timeout : HTTP request timeout
: type timeout : float
: param headers : HTTP request headers
: type headers : dict
"""
self . url = self . default_url
if url is not None :
self . url = url
self . _regex_extract_error_msg = re . compile ( b " \ <p \ >(?P<msg> \ <strong \ s.*?) \ </p \ > " )
self . _regex_remove_tag = re . compile ( b " <[^>]*?> " )
if read_chunk_size is None :
read_chunk_size = self . default_read_chunk_size
self . read_chunk_size = read_chunk_size
if max_retry_count is None :
max_retry_count = self . default_max_retry_count
self . max_retry_count = max_retry_count
if retry_timeout is None :
retry_timeout = self . default_retry_timeout
self . retry_timeout = retry_timeout
self . xml_parser = xml_parser
self . timeout = timeout
self . headers = headers
def _handle_remark_msg ( self , msg ) :
"""
Try to parse the message provided with the remark tag or element .
: param str msg : The message
: raises overpy . exception . OverpassRuntimeError : If message starts with ' runtime error: '
: raises overpy . exception . OverpassRuntimeRemark : If message starts with ' runtime remark: '
: raises overpy . exception . OverpassUnknownError : If we are unable to identify the error
"""
msg = msg . strip ( )
if msg . startswith ( " runtime error: " ) :
raise exception . OverpassRuntimeError ( msg = msg )
elif msg . startswith ( " runtime remark: " ) :
raise exception . OverpassRuntimeRemark ( msg = msg )
raise exception . OverpassUnknownError ( msg = msg )
def query ( self , query ) :
"""
Query the Overpass API
: param String | Bytes query : The query string in Overpass QL
: return : The parsed result
: rtype : overpy . Result
"""
if not isinstance ( query , bytes ) :
query = query . encode ( " utf-8 " )
retry_num = 0
retry_exceptions = [ ]
do_retry = True if self . max_retry_count > 0 else False
while retry_num < = self . max_retry_count :
if retry_num > 0 :
time . sleep ( self . retry_timeout )
retry_num + = 1
try :
if self . headers is not None :
r = requests . post ( self . url , query , timeout = self . timeout , headers = self . headers )
else :
r = requests . post ( self . url , query , timeout = self . timeout )
response = r . content
except ( requests . exceptions . BaseHTTPError , requests . exceptions . RequestException ) as e :
if not do_retry :
raise e
retry_exceptions . append ( e )
continue
if r . status_code == 200 :
content_type = r . headers [ " Content-Type " ]
if content_type == " application/json " :
return self . parse_json ( response )
if content_type == " application/osm3s+xml " :
return self . parse_xml ( response )
e = exception . OverpassUnknownContentType ( content_type )
if not do_retry :
raise e
retry_exceptions . append ( e )
continue
elif r . status_code == 400 :
msgs = [ ]
for msg in self . _regex_extract_error_msg . finditer ( response ) :
tmp = self . _regex_remove_tag . sub ( b " " , msg . group ( " msg " ) )
try :
tmp = tmp . decode ( " utf-8 " )
except UnicodeDecodeError :
tmp = repr ( tmp )
msgs . append ( tmp )
e = exception . OverpassBadRequest (
query ,
msgs = msgs
)
if not do_retry :
raise e
retry_exceptions . append ( e )
continue
elif r . status_code == 429 :
e = exception . OverpassTooManyRequests
if not do_retry :
raise e
retry_exceptions . append ( e )
continue
elif r . status_code == 504 :
e = exception . OverpassGatewayTimeout
if not do_retry :
raise e
retry_exceptions . append ( e )
continue
# No valid response code
e = exception . OverpassUnknownHTTPStatusCode ( r . status_code )
if not do_retry :
raise e
retry_exceptions . append ( e )
continue
raise exception . MaxRetriesReached ( retry_count = retry_num , exceptions = retry_exceptions )
def parse_json ( self , data , encoding = " utf-8 " ) :
"""
Parse raw response from Overpass service .
: param data : Raw JSON Data
: type data : String or Bytes
: param encoding : Encoding to decode byte string
: type encoding : String
: return : Result object
: rtype : overpy . Result
"""
if isinstance ( data , bytes ) :
data = data . decode ( encoding )
data = json . loads ( data , parse_float = Decimal )
if " remark " in data :
self . _handle_remark_msg ( msg = data . get ( " remark " ) )
return Result . from_json ( data , api = self )
def parse_xml ( self , data , encoding = " utf-8 " , parser = None ) :
"""
: param data : Raw XML Data
: type data : String or Bytes
: param encoding : Encoding to decode byte string
: type encoding : String
: return : Result object
: rtype : overpy . Result
"""
if parser is None :
parser = self . xml_parser
if isinstance ( data , bytes ) :
data = data . decode ( encoding )
if PY2 and not isinstance ( data , str ) :
# Python 2.x: Convert unicode strings
data = data . encode ( encoding )
m = re . compile ( " <remark>(?P<msg>[^<>]*)</remark> " ) . search ( data )
if m :
self . _handle_remark_msg ( m . group ( " msg " ) )
return Result . from_xml ( data , api = self , parser = parser )
class Result ( object ) :
"""
Class to handle the result .
"""
def __init__ ( self , elements = None , api = None ) :
"""
: param List elements :
: param api :
: type api : overpy . Overpass
"""
if elements is None :
elements = [ ]
self . _areas = OrderedDict ( ( element . id , element ) for element in elements if is_valid_type ( element , Area ) )
self . _nodes = OrderedDict ( ( element . id , element ) for element in elements if is_valid_type ( element , Node ) )
self . _ways = OrderedDict ( ( element . id , element ) for element in elements if is_valid_type ( element , Way ) )
self . _relations = OrderedDict ( ( element . id , element )
for element in elements if is_valid_type ( element , Relation ) )
self . _class_collection_map = { Node : self . _nodes , Way : self . _ways , Relation : self . _relations , Area : self . _areas }
self . api = api
def expand ( self , other ) :
"""
Add all elements from an other result to the list of elements of this result object .
It is used by the auto resolve feature .
: param other : Expand the result with the elements from this result .
: type other : overpy . Result
: raises ValueError : If provided parameter is not instance of : class : ` overpy . Result `
"""
if not isinstance ( other , Result ) :
raise ValueError ( " Provided argument has to be instance of overpy:Result() " )
other_collection_map = { Node : other . nodes , Way : other . ways , Relation : other . relations , Area : other . areas }
for element_type , own_collection in self . _class_collection_map . items ( ) :
for element in other_collection_map [ element_type ] :
if is_valid_type ( element , element_type ) and element . id not in own_collection :
own_collection [ element . id ] = element
def append ( self , element ) :
"""
Append a new element to the result .
: param element : The element to append
: type element : overpy . Element
"""
if is_valid_type ( element , Element ) :
self . _class_collection_map [ element . __class__ ] . setdefault ( element . id , element )
def get_elements ( self , filter_cls , elem_id = None ) :
"""
Get a list of elements from the result and filter the element type by a class .
: param filter_cls :
: param elem_id : ID of the object
: type elem_id : Integer
: return : List of available elements
: rtype : List
"""
result = [ ]
if elem_id is not None :
try :
result = [ self . _class_collection_map [ filter_cls ] [ elem_id ] ]
except KeyError :
result = [ ]
else :
for e in self . _class_collection_map [ filter_cls ] . values ( ) :
result . append ( e )
return result
def get_ids ( self , filter_cls ) :
"""
: param filter_cls :
: return :
"""
return list ( self . _class_collection_map [ filter_cls ] . keys ( ) )
def get_node_ids ( self ) :
return self . get_ids ( filter_cls = Node )
def get_way_ids ( self ) :
return self . get_ids ( filter_cls = Way )
def get_relation_ids ( self ) :
return self . get_ids ( filter_cls = Relation )
def get_area_ids ( self ) :
return self . get_ids ( filter_cls = Area )
@classmethod
def from_json ( cls , data , api = None ) :
"""
Create a new instance and load data from json object .
: param data : JSON data returned by the Overpass API
: type data : Dict
: param api :
: type api : overpy . Overpass
: return : New instance of Result object
: rtype : overpy . Result
"""
result = cls ( api = api )
for elem_cls in [ Node , Way , Relation , Area ] :
for element in data . get ( " elements " , [ ] ) :
e_type = element . get ( " type " )
if hasattr ( e_type , " lower " ) and e_type . lower ( ) == elem_cls . _type_value :
result . append ( elem_cls . from_json ( element , result = result ) )
return result
@classmethod
def from_xml ( cls , data , api = None , parser = None ) :
"""
Create a new instance and load data from xml data or object .
. . note : :
If parser is set to None , the functions tries to find the best parse .
By default the SAX parser is chosen if a string is provided as data .
The parser is set to DOM if an xml . etree . ElementTree . Element is provided as data value .
: param data : Root element
: type data : str | xml . etree . ElementTree . Element
: param api : The instance to query additional information if required .
: type api : Overpass
: param parser : Specify the parser to use ( DOM or SAX ) ( Default : None = autodetect , defaults to SAX )
: type parser : Integer | None
: return : New instance of Result object
: rtype : Result
"""
if parser is None :
if isinstance ( data , str ) :
parser = XML_PARSER_SAX
else :
parser = XML_PARSER_DOM
result = cls ( api = api )
if parser == XML_PARSER_DOM :
import xml . etree . ElementTree as ET
if isinstance ( data , str ) :
root = ET . fromstring ( data )
elif isinstance ( data , ET . Element ) :
root = data
else :
raise exception . OverPyException ( " Unable to detect data type. " )
for elem_cls in [ Node , Way , Relation , Area ] :
for child in root :
if child . tag . lower ( ) == elem_cls . _type_value :
result . append ( elem_cls . from_xml ( child , result = result ) )
elif parser == XML_PARSER_SAX :
if PY2 :
from StringIO import StringIO
else :
from io import StringIO
source = StringIO ( data )
sax_handler = OSMSAXHandler ( result )
parser = make_parser ( )
parser . setContentHandler ( sax_handler )
parser . parse ( source )
else :
# ToDo: better exception
raise Exception ( " Unknown XML parser " )
return result
def get_area ( self , area_id , resolve_missing = False ) :
"""
Get an area by its ID .
: param area_id : The area ID
: type area_id : Integer
: param resolve_missing : Query the Overpass API if the area is missing in the result set .
: return : The area
: rtype : overpy . Area
: raises overpy . exception . DataIncomplete : The requested way is not available in the result cache .
: raises overpy . exception . DataIncomplete : If resolve_missing is True and the area can ' t be resolved.
"""
areas = self . get_areas ( area_id = area_id )
if len ( areas ) == 0 :
if resolve_missing is False :
raise exception . DataIncomplete ( " Resolve missing area is disabled " )
query = ( " \n "
" [out:json]; \n "
" area( {area_id} ); \n "
" out body; \n "
)
query = query . format (
area_id = area_id
)
tmp_result = self . api . query ( query )
self . expand ( tmp_result )
areas = self . get_areas ( area_id = area_id )
if len ( areas ) == 0 :
raise exception . DataIncomplete ( " Unable to resolve requested areas " )
return areas [ 0 ]
def get_areas ( self , area_id = None , * * kwargs ) :
"""
Alias for get_elements ( ) but filter the result by Area
: param area_id : The Id of the area
: type area_id : Integer
: return : List of elements
"""
return self . get_elements ( Area , elem_id = area_id , * * kwargs )
def get_node ( self , node_id , resolve_missing = False ) :
"""
Get a node by its ID .
: param node_id : The node ID
: type node_id : Integer
: param resolve_missing : Query the Overpass API if the node is missing in the result set .
: return : The node
: rtype : overpy . Node
: raises overpy . exception . DataIncomplete : At least one referenced node is not available in the result cache .
: raises overpy . exception . DataIncomplete : If resolve_missing is True and at least one node can ' t be resolved.
"""
nodes = self . get_nodes ( node_id = node_id )
if len ( nodes ) == 0 :
if not resolve_missing :
raise exception . DataIncomplete ( " Resolve missing nodes is disabled " )
query = ( " \n "
" [out:json]; \n "
" node( {node_id} ); \n "
" out body; \n "
)
query = query . format (
node_id = node_id
)
tmp_result = self . api . query ( query )
self . expand ( tmp_result )
nodes = self . get_nodes ( node_id = node_id )
if len ( nodes ) == 0 :
raise exception . DataIncomplete ( " Unable to resolve all nodes " )
return nodes [ 0 ]
def get_nodes ( self , node_id = None , * * kwargs ) :
"""
Alias for get_elements ( ) but filter the result by Node ( )
: param node_id : The Id of the node
: type node_id : Integer
: return : List of elements
"""
return self . get_elements ( Node , elem_id = node_id , * * kwargs )
def get_relation ( self , rel_id , resolve_missing = False ) :
"""
Get a relation by its ID .
: param rel_id : The relation ID
: type rel_id : Integer
: param resolve_missing : Query the Overpass API if the relation is missing in the result set .
: return : The relation
: rtype : overpy . Relation
: raises overpy . exception . DataIncomplete : The requested relation is not available in the result cache .
: raises overpy . exception . DataIncomplete : If resolve_missing is True and the relation can ' t be resolved.
"""
relations = self . get_relations ( rel_id = rel_id )
if len ( relations ) == 0 :
if resolve_missing is False :
raise exception . DataIncomplete ( " Resolve missing relations is disabled " )
query = ( " \n "
" [out:json]; \n "
" relation( {relation_id} ); \n "
" out body; \n "
)
query = query . format (
relation_id = rel_id
)
tmp_result = self . api . query ( query )
self . expand ( tmp_result )
relations = self . get_relations ( rel_id = rel_id )
if len ( relations ) == 0 :
raise exception . DataIncomplete ( " Unable to resolve requested reference " )
return relations [ 0 ]
def get_relations ( self , rel_id = None , * * kwargs ) :
"""
Alias for get_elements ( ) but filter the result by Relation
: param rel_id : Id of the relation
: type rel_id : Integer
: return : List of elements
"""
return self . get_elements ( Relation , elem_id = rel_id , * * kwargs )
def get_way ( self , way_id , resolve_missing = False ) :
"""
Get a way by its ID .
: param way_id : The way ID
: type way_id : Integer
: param resolve_missing : Query the Overpass API if the way is missing in the result set .
: return : The way
: rtype : overpy . Way
: raises overpy . exception . DataIncomplete : The requested way is not available in the result cache .
: raises overpy . exception . DataIncomplete : If resolve_missing is True and the way can ' t be resolved.
"""
ways = self . get_ways ( way_id = way_id )
if len ( ways ) == 0 :
if resolve_missing is False :
raise exception . DataIncomplete ( " Resolve missing way is disabled " )
query = ( " \n "
" [out:json]; \n "
" way( {way_id} ); \n "
" out body; \n "
)
query = query . format (
way_id = way_id
)
tmp_result = self . api . query ( query )
self . expand ( tmp_result )
ways = self . get_ways ( way_id = way_id )
if len ( ways ) == 0 :
raise exception . DataIncomplete ( " Unable to resolve requested way " )
return ways [ 0 ]
def get_ways ( self , way_id = None , * * kwargs ) :
"""
Alias for get_elements ( ) but filter the result by Way
: param way_id : The Id of the way
: type way_id : Integer
: return : List of elements
"""
return self . get_elements ( Way , elem_id = way_id , * * kwargs )
area_ids = property ( get_area_ids )
areas = property ( get_areas )
node_ids = property ( get_node_ids )
nodes = property ( get_nodes )
relation_ids = property ( get_relation_ids )
relations = property ( get_relations )
way_ids = property ( get_way_ids )
ways = property ( get_ways )
class Element ( object ) :
"""
Base element
"""
def __init__ ( self , attributes = None , result = None , tags = None ) :
"""
: param attributes : Additional attributes
: type attributes : Dict
: param result : The result object this element belongs to
: param tags : List of tags
: type tags : Dict
"""
self . _result = result
self . attributes = attributes
# ToDo: Add option to modify attribute modifiers
attribute_modifiers = dict ( GLOBAL_ATTRIBUTE_MODIFIERS . items ( ) )
for n , m in attribute_modifiers . items ( ) :
if n in self . attributes :
self . attributes [ n ] = m ( self . attributes [ n ] )
self . id = None
self . tags = tags
@classmethod
def get_center_from_json ( cls , data ) :
"""
Get center information from json data
: param data : json data
: return : tuple with two elements : lat and lon
: rtype : tuple
"""
center_lat = None
center_lon = None
center = data . get ( " center " )
if isinstance ( center , dict ) :
center_lat = center . get ( " lat " )
center_lon = center . get ( " lon " )
if center_lat is None or center_lon is None :
raise ValueError ( " Unable to get lat or lon of way center. " )
center_lat = Decimal ( center_lat )
center_lon = Decimal ( center_lon )
return ( center_lat , center_lon )
@classmethod
def get_center_from_xml_dom ( cls , sub_child ) :
center_lat = sub_child . attrib . get ( " lat " )
center_lon = sub_child . attrib . get ( " lon " )
if center_lat is None or center_lon is None :
raise ValueError ( " Unable to get lat or lon of way center. " )
center_lat = Decimal ( center_lat )
center_lon = Decimal ( center_lon )
return center_lat , center_lon
class Area ( Element ) :
"""
Class to represent an element of type area
"""
_type_value = " area "
def __init__ ( self , area_id = None , * * kwargs ) :
"""
: param area_id : Id of the area element
: type area_id : Integer
: param kwargs : Additional arguments are passed directly to the parent class
"""
Element . __init__ ( self , * * kwargs )
#: The id of the way
self . id = area_id
def __repr__ ( self ) :
return " <overpy.Area id= {} > " . format ( self . id )
@classmethod
def from_json ( cls , data , result = None ) :
"""
Create new Area element from JSON data
: param data : Element data from JSON
: type data : Dict
: param result : The result this element belongs to
: type result : overpy . Result
: return : New instance of Way
: rtype : overpy . Area
: raises overpy . exception . ElementDataWrongType : If type value of the passed JSON data does not match .
"""
if data . get ( " type " ) != cls . _type_value :
raise exception . ElementDataWrongType (
type_expected = cls . _type_value ,
type_provided = data . get ( " type " )
)
tags = data . get ( " tags " , { } )
area_id = data . get ( " id " )
attributes = { }
ignore = [ " id " , " tags " , " type " ]
for n , v in data . items ( ) :
if n in ignore :
continue
attributes [ n ] = v
return cls ( area_id = area_id , attributes = attributes , tags = tags , result = result )
@classmethod
def from_xml ( cls , child , result = None ) :
"""
Create new way element from XML data
: param child : XML node to be parsed
: type child : xml . etree . ElementTree . Element
: param result : The result this node belongs to
: type result : overpy . Result
: return : New Way oject
: rtype : overpy . Way
: raises overpy . exception . ElementDataWrongType : If name of the xml child node doesn ' t match
: raises ValueError : If the ref attribute of the xml node is not provided
: raises ValueError : If a tag doesn ' t have a name
"""
if child . tag . lower ( ) != cls . _type_value :
raise exception . ElementDataWrongType (
type_expected = cls . _type_value ,
type_provided = child . tag . lower ( )
)
tags = { }
for sub_child in child :
if sub_child . tag . lower ( ) == " tag " :
name = sub_child . attrib . get ( " k " )
if name is None :
raise ValueError ( " Tag without name/key. " )
value = sub_child . attrib . get ( " v " )
tags [ name ] = value
area_id = child . attrib . get ( " id " )
if area_id is not None :
area_id = int ( area_id )
attributes = { }
ignore = [ " id " ]
for n , v in child . attrib . items ( ) :
if n in ignore :
continue
attributes [ n ] = v
return cls ( area_id = area_id , attributes = attributes , tags = tags , result = result )
class Node ( Element ) :
"""
Class to represent an element of type node
"""
_type_value = " node "
def __init__ ( self , node_id = None , lat = None , lon = None , * * kwargs ) :
"""
: param lat : Latitude
: type lat : Decimal or Float
: param lon : Longitude
: type long : Decimal or Float
: param node_id : Id of the node element
: type node_id : Integer
: param kwargs : Additional arguments are passed directly to the parent class
"""
Element . __init__ ( self , * * kwargs )
self . id = node_id
self . lat = lat
self . lon = lon
def __repr__ ( self ) :
return " <overpy.Node id= {} lat= {} lon= {} > " . format ( self . id , self . lat , self . lon )
@classmethod
def from_json ( cls , data , result = None ) :
"""
Create new Node element from JSON data
: param data : Element data from JSON
: type data : Dict
: param result : The result this element belongs to
: type result : overpy . Result
: return : New instance of Node
: rtype : overpy . Node
: raises overpy . exception . ElementDataWrongType : If type value of the passed JSON data does not match .
"""
if data . get ( " type " ) != cls . _type_value :
raise exception . ElementDataWrongType (
type_expected = cls . _type_value ,
type_provided = data . get ( " type " )
)
tags = data . get ( " tags " , { } )
node_id = data . get ( " id " )
lat = data . get ( " lat " )
lon = data . get ( " lon " )
attributes = { }
ignore = [ " type " , " id " , " lat " , " lon " , " tags " ]
for n , v in data . items ( ) :
if n in ignore :
continue
attributes [ n ] = v
return cls ( node_id = node_id , lat = lat , lon = lon , tags = tags , attributes = attributes , result = result )
@classmethod
def from_xml ( cls , child , result = None ) :
"""
Create new way element from XML data
: param child : XML node to be parsed
: type child : xml . etree . ElementTree . Element
: param result : The result this node belongs to
: type result : overpy . Result
: return : New Way oject
: rtype : overpy . Node
: raises overpy . exception . ElementDataWrongType : If name of the xml child node doesn ' t match
: raises ValueError : If a tag doesn ' t have a name
"""
if child . tag . lower ( ) != cls . _type_value :
raise exception . ElementDataWrongType (
type_expected = cls . _type_value ,
type_provided = child . tag . lower ( )
)
tags = { }
for sub_child in child :
if sub_child . tag . lower ( ) == " tag " :
name = sub_child . attrib . get ( " k " )
if name is None :
raise ValueError ( " Tag without name/key. " )
value = sub_child . attrib . get ( " v " )
tags [ name ] = value
node_id = child . attrib . get ( " id " )
if node_id is not None :
node_id = int ( node_id )
lat = child . attrib . get ( " lat " )
if lat is not None :
lat = Decimal ( lat )
lon = child . attrib . get ( " lon " )
if lon is not None :
lon = Decimal ( lon )
attributes = { }
ignore = [ " id " , " lat " , " lon " ]
for n , v in child . attrib . items ( ) :
if n in ignore :
continue
attributes [ n ] = v
return cls ( node_id = node_id , lat = lat , lon = lon , tags = tags , attributes = attributes , result = result )
class Way ( Element ) :
"""
Class to represent an element of type way
"""
_type_value = " way "
def __init__ ( self , way_id = None , center_lat = None , center_lon = None , node_ids = None , * * kwargs ) :
"""
: param node_ids : List of node IDs
: type node_ids : List or Tuple
: param way_id : Id of the way element
: type way_id : Integer
: param kwargs : Additional arguments are passed directly to the parent class
"""
Element . __init__ ( self , * * kwargs )
#: The id of the way
self . id = way_id
#: List of Ids of the associated nodes
self . _node_ids = node_ids
#: The lat/lon of the center of the way (optional depending on query)
self . center_lat = center_lat
self . center_lon = center_lon
def __repr__ ( self ) :
return " <overpy.Way id= {} nodes= {} > " . format ( self . id , self . _node_ids )
@property
def nodes ( self ) :
"""
List of nodes associated with the way .
"""
return self . get_nodes ( )
def get_nodes ( self , resolve_missing = False ) :
"""
Get the nodes defining the geometry of the way
: param resolve_missing : Try to resolve missing nodes .
: type resolve_missing : Boolean
: return : List of nodes
: rtype : List of overpy . Node
: raises overpy . exception . DataIncomplete : At least one referenced node is not available in the result cache .
: raises overpy . exception . DataIncomplete : If resolve_missing is True and at least one node can ' t be resolved.
"""
result = [ ]
resolved = False
for node_id in self . _node_ids :
try :
node = self . _result . get_node ( node_id )
except exception . DataIncomplete :
node = None
if node is not None :
result . append ( node )
continue
if not resolve_missing :
raise exception . DataIncomplete ( " Resolve missing nodes is disabled " )
# We tried to resolve the data but some nodes are still missing
if resolved :
raise exception . DataIncomplete ( " Unable to resolve all nodes " )
query = ( " \n "
" [out:json]; \n "
" way( {way_id} ); \n "
" node(w); \n "
" out body; \n "
)
query = query . format (
way_id = self . id
)
tmp_result = self . _result . api . query ( query )
self . _result . expand ( tmp_result )
resolved = True
try :
node = self . _result . get_node ( node_id )
except exception . DataIncomplete :
node = None
if node is None :
raise exception . DataIncomplete ( " Unable to resolve all nodes " )
result . append ( node )
return result
@classmethod
def from_json ( cls , data , result = None ) :
"""
Create new Way element from JSON data
: param data : Element data from JSON
: type data : Dict
: param result : The result this element belongs to
: type result : overpy . Result
: return : New instance of Way
: rtype : overpy . Way
: raises overpy . exception . ElementDataWrongType : If type value of the passed JSON data does not match .
"""
if data . get ( " type " ) != cls . _type_value :
raise exception . ElementDataWrongType (
type_expected = cls . _type_value ,
type_provided = data . get ( " type " )
)
tags = data . get ( " tags " , { } )
way_id = data . get ( " id " )
node_ids = data . get ( " nodes " )
( center_lat , center_lon ) = cls . get_center_from_json ( data = data )
attributes = { }
ignore = [ " center " , " id " , " nodes " , " tags " , " type " ]
for n , v in data . items ( ) :
if n in ignore :
continue
attributes [ n ] = v
return cls (
attributes = attributes ,
center_lat = center_lat ,
center_lon = center_lon ,
node_ids = node_ids ,
tags = tags ,
result = result ,
way_id = way_id
)
@classmethod
def from_xml ( cls , child , result = None ) :
"""
Create new way element from XML data
: param child : XML node to be parsed
: type child : xml . etree . ElementTree . Element
: param result : The result this node belongs to
: type result : overpy . Result
: return : New Way oject
: rtype : overpy . Way
: raises overpy . exception . ElementDataWrongType : If name of the xml child node doesn ' t match
: raises ValueError : If the ref attribute of the xml node is not provided
: raises ValueError : If a tag doesn ' t have a name
"""
if child . tag . lower ( ) != cls . _type_value :
raise exception . ElementDataWrongType (
type_expected = cls . _type_value ,
type_provided = child . tag . lower ( )
)
tags = { }
node_ids = [ ]
center_lat = None
center_lon = None
for sub_child in child :
if sub_child . tag . lower ( ) == " tag " :
name = sub_child . attrib . get ( " k " )
if name is None :
raise ValueError ( " Tag without name/key. " )
value = sub_child . attrib . get ( " v " )
tags [ name ] = value
if sub_child . tag . lower ( ) == " nd " :
ref_id = sub_child . attrib . get ( " ref " )
if ref_id is None :
raise ValueError ( " Unable to find required ref value. " )
ref_id = int ( ref_id )
node_ids . append ( ref_id )
if sub_child . tag . lower ( ) == " center " :
( center_lat , center_lon ) = cls . get_center_from_xml_dom ( sub_child = sub_child )
way_id = child . attrib . get ( " id " )
if way_id is not None :
way_id = int ( way_id )
attributes = { }
ignore = [ " id " ]
for n , v in child . attrib . items ( ) :
if n in ignore :
continue
attributes [ n ] = v
return cls ( way_id = way_id , center_lat = center_lat , center_lon = center_lon ,
attributes = attributes , node_ids = node_ids , tags = tags , result = result )
class Relation ( Element ) :
"""
Class to represent an element of type relation
"""
_type_value = " relation "
def __init__ ( self , rel_id = None , center_lat = None , center_lon = None , members = None , * * kwargs ) :
"""
: param members :
: param rel_id : Id of the relation element
: type rel_id : Integer
: param kwargs :
: return :
"""
Element . __init__ ( self , * * kwargs )
self . id = rel_id
self . members = members
#: The lat/lon of the center of the way (optional depending on query)
self . center_lat = center_lat
self . center_lon = center_lon
def __repr__ ( self ) :
return " <overpy.Relation id= {} > " . format ( self . id )
@classmethod
def from_json ( cls , data , result = None ) :
"""
Create new Relation element from JSON data
: param data : Element data from JSON
: type data : Dict
: param result : The result this element belongs to
: type result : overpy . Result
: return : New instance of Relation
: rtype : overpy . Relation
: raises overpy . exception . ElementDataWrongType : If type value of the passed JSON data does not match .
"""
if data . get ( " type " ) != cls . _type_value :
raise exception . ElementDataWrongType (
type_expected = cls . _type_value ,
type_provided = data . get ( " type " )
)
tags = data . get ( " tags " , { } )
rel_id = data . get ( " id " )
( center_lat , center_lon ) = cls . get_center_from_json ( data = data )
members = [ ]
supported_members = [ RelationNode , RelationWay , RelationRelation ]
for member in data . get ( " members " , [ ] ) :
type_value = member . get ( " type " )
for member_cls in supported_members :
if member_cls . _type_value == type_value :
members . append (
member_cls . from_json (
member ,
result = result
)
)
attributes = { }
ignore = [ " id " , " members " , " tags " , " type " ]
for n , v in data . items ( ) :
if n in ignore :
continue
attributes [ n ] = v
return cls (
rel_id = rel_id ,
attributes = attributes ,
center_lat = center_lat ,
center_lon = center_lon ,
members = members ,
tags = tags ,
result = result
)
@classmethod
def from_xml ( cls , child , result = None ) :
"""
Create new way element from XML data
: param child : XML node to be parsed
: type child : xml . etree . ElementTree . Element
: param result : The result this node belongs to
: type result : overpy . Result
: return : New Way oject
: rtype : overpy . Relation
: raises overpy . exception . ElementDataWrongType : If name of the xml child node doesn ' t match
: raises ValueError : If a tag doesn ' t have a name
"""
if child . tag . lower ( ) != cls . _type_value :
raise exception . ElementDataWrongType (
type_expected = cls . _type_value ,
type_provided = child . tag . lower ( )
)
tags = { }
members = [ ]
center_lat = None
center_lon = None
supported_members = [ RelationNode , RelationWay , RelationRelation , RelationArea ]
for sub_child in child :
if sub_child . tag . lower ( ) == " tag " :
name = sub_child . attrib . get ( " k " )
if name is None :
raise ValueError ( " Tag without name/key. " )
value = sub_child . attrib . get ( " v " )
tags [ name ] = value
if sub_child . tag . lower ( ) == " member " :
type_value = sub_child . attrib . get ( " type " )
for member_cls in supported_members :
if member_cls . _type_value == type_value :
members . append (
member_cls . from_xml (
sub_child ,
result = result
)
)
if sub_child . tag . lower ( ) == " center " :
( center_lat , center_lon ) = cls . get_center_from_xml_dom ( sub_child = sub_child )
rel_id = child . attrib . get ( " id " )
if rel_id is not None :
rel_id = int ( rel_id )
attributes = { }
ignore = [ " id " ]
for n , v in child . attrib . items ( ) :
if n in ignore :
continue
attributes [ n ] = v
return cls (
rel_id = rel_id ,
attributes = attributes ,
center_lat = center_lat ,
center_lon = center_lon ,
members = members ,
tags = tags ,
result = result
)
class RelationMember ( object ) :
"""
Base class to represent a member of a relation .
"""
def __init__ ( self , attributes = None , geometry = None , ref = None , role = None , result = None ) :
"""
: param ref : Reference Id
: type ref : Integer
: param role : The role of the relation member
: type role : String
: param result :
"""
self . ref = ref
self . _result = result
self . role = role
self . attributes = attributes
self . geometry = geometry
@classmethod
def from_json ( cls , data , result = None ) :
"""
Create new RelationMember element from JSON data
: param child : Element data from JSON
: type child : Dict
: param result : The result this element belongs to
: type result : overpy . Result
: return : New instance of RelationMember
: rtype : overpy . RelationMember
: raises overpy . exception . ElementDataWrongType : If type value of the passed JSON data does not match .
"""
if data . get ( " type " ) != cls . _type_value :
raise exception . ElementDataWrongType (
type_expected = cls . _type_value ,
type_provided = data . get ( " type " )
)
ref = data . get ( " ref " )
role = data . get ( " role " )
attributes = { }
ignore = [ " geometry " , " type " , " ref " , " role " ]
for n , v in data . items ( ) :
if n in ignore :
continue
attributes [ n ] = v
geometry = data . get ( " geometry " )
if isinstance ( geometry , list ) :
geometry_orig = geometry
geometry = [ ]
for v in geometry_orig :
geometry . append (
RelationWayGeometryValue (
lat = v . get ( " lat " ) ,
lon = v . get ( " lon " )
)
)
else :
geometry = None
return cls (
attributes = attributes ,
geometry = geometry ,
ref = ref ,
role = role ,
result = result
)
@classmethod
def from_xml ( cls , child , result = None ) :
"""
Create new RelationMember from XML data
: param child : XML node to be parsed
: type child : xml . etree . ElementTree . Element
: param result : The result this element belongs to
: type result : overpy . Result
: return : New relation member oject
: rtype : overpy . RelationMember
: raises overpy . exception . ElementDataWrongType : If name of the xml child node doesn ' t match
"""
if child . attrib . get ( " type " ) != cls . _type_value :
raise exception . ElementDataWrongType (
type_expected = cls . _type_value ,
type_provided = child . tag . lower ( )
)
ref = child . attrib . get ( " ref " )
if ref is not None :
ref = int ( ref )
role = child . attrib . get ( " role " )
attributes = { }
ignore = [ " geometry " , " ref " , " role " , " type " ]
for n , v in child . attrib . items ( ) :
if n in ignore :
continue
attributes [ n ] = v
geometry = None
for sub_child in child :
if sub_child . tag . lower ( ) == " nd " :
if geometry is None :
geometry = [ ]
geometry . append (
RelationWayGeometryValue (
lat = Decimal ( sub_child . attrib [ " lat " ] ) ,
lon = Decimal ( sub_child . attrib [ " lon " ] )
)
)
return cls (
attributes = attributes ,
geometry = geometry ,
ref = ref ,
role = role ,
result = result
)
class RelationNode ( RelationMember ) :
_type_value = " node "
def resolve ( self , resolve_missing = False ) :
return self . _result . get_node ( self . ref , resolve_missing = resolve_missing )
def __repr__ ( self ) :
return " <overpy.RelationNode ref= {} role= {} > " . format ( self . ref , self . role )
class RelationWay ( RelationMember ) :
_type_value = " way "
def resolve ( self , resolve_missing = False ) :
return self . _result . get_way ( self . ref , resolve_missing = resolve_missing )
def __repr__ ( self ) :
return " <overpy.RelationWay ref= {} role= {} > " . format ( self . ref , self . role )
class RelationWayGeometryValue ( object ) :
def __init__ ( self , lat , lon ) :
self . lat = lat
self . lon = lon
def __repr__ ( self ) :
return " <overpy.RelationWayGeometryValue lat= {} lon= {} > " . format ( self . lat , self . lon )
class RelationRelation ( RelationMember ) :
_type_value = " relation "
def resolve ( self , resolve_missing = False ) :
return self . _result . get_relation ( self . ref , resolve_missing = resolve_missing )
def __repr__ ( self ) :
return " <overpy.RelationRelation ref= {} role= {} > " . format ( self . ref , self . role )
class RelationArea ( RelationMember ) :
_type_value = " area "
def resolve ( self , resolve_missing = False ) :
return self . _result . get_area ( self . ref , resolve_missing = resolve_missing )
def __repr__ ( self ) :
return " <overpy.RelationArea ref= {} role= {} > " . format ( self . ref , self . role )
class OSMSAXHandler ( handler . ContentHandler ) :
"""
SAX parser for Overpass XML response .
"""
#: Tuple of opening elements to ignore
ignore_start = ( ' osm ' , ' meta ' , ' note ' , ' bounds ' , ' remark ' )
#: Tuple of closing elements to ignore
ignore_end = ( ' osm ' , ' meta ' , ' note ' , ' bounds ' , ' remark ' , ' tag ' , ' nd ' , ' center ' )
def __init__ ( self , result ) :
"""
: param result : Append results to this result set .
: type result : overpy . Result
"""
handler . ContentHandler . __init__ ( self )
self . _result = result
self . _curr = { }
#: Current relation member object
self . cur_relation_member = None
def startElement ( self , name , attrs ) :
"""
Handle opening elements .
: param name : Name of the element
: type name : String
: param attrs : Attributes of the element
: type attrs : Dict
"""
if name in self . ignore_start :
return
try :
handler = getattr ( self , ' _handle_start_ %s ' % name )
except AttributeError :
raise KeyError ( " Unknown element start ' %s ' " % name )
handler ( attrs )
def endElement ( self , name ) :
"""
Handle closing elements
: param name : Name of the element
: type name : String
"""
if name in self . ignore_end :
return
try :
handler = getattr ( self , ' _handle_end_ %s ' % name )
except AttributeError :
raise KeyError ( " Unknown element end ' %s ' " % name )
handler ( )
def _handle_start_center ( self , attrs ) :
"""
Handle opening center element
: param attrs : Attributes of the element
: type attrs : Dict
"""
center_lat = attrs . get ( " lat " )
center_lon = attrs . get ( " lon " )
if center_lat is None or center_lon is None :
raise ValueError ( " Unable to get lat or lon of way center. " )
self . _curr [ " center_lat " ] = Decimal ( center_lat )
self . _curr [ " center_lon " ] = Decimal ( center_lon )
def _handle_start_tag ( self , attrs ) :
"""
Handle opening tag element
: param attrs : Attributes of the element
: type attrs : Dict
"""
try :
tag_key = attrs [ ' k ' ]
except KeyError :
raise ValueError ( " Tag without name/key. " )
self . _curr [ ' tags ' ] [ tag_key ] = attrs . get ( ' v ' )
def _handle_start_node ( self , attrs ) :
"""
Handle opening node element
: param attrs : Attributes of the element
: type attrs : Dict
"""
self . _curr = {
' attributes ' : dict ( attrs ) ,
' lat ' : None ,
' lon ' : None ,
' node_id ' : None ,
' tags ' : { }
}
if attrs . get ( ' id ' , None ) is not None :
self . _curr [ ' node_id ' ] = int ( attrs [ ' id ' ] )
del self . _curr [ ' attributes ' ] [ ' id ' ]
if attrs . get ( ' lat ' , None ) is not None :
self . _curr [ ' lat ' ] = Decimal ( attrs [ ' lat ' ] )
del self . _curr [ ' attributes ' ] [ ' lat ' ]
if attrs . get ( ' lon ' , None ) is not None :
self . _curr [ ' lon ' ] = Decimal ( attrs [ ' lon ' ] )
del self . _curr [ ' attributes ' ] [ ' lon ' ]
def _handle_end_node ( self ) :
"""
Handle closing node element
"""
self . _result . append ( Node ( result = self . _result , * * self . _curr ) )
self . _curr = { }
def _handle_start_way ( self , attrs ) :
"""
Handle opening way element
: param attrs : Attributes of the element
: type attrs : Dict
"""
self . _curr = {
' center_lat ' : None ,
' center_lon ' : None ,
' attributes ' : dict ( attrs ) ,
' node_ids ' : [ ] ,
' tags ' : { } ,
' way_id ' : None
}
if attrs . get ( ' id ' , None ) is not None :
self . _curr [ ' way_id ' ] = int ( attrs [ ' id ' ] )
del self . _curr [ ' attributes ' ] [ ' id ' ]
def _handle_end_way ( self ) :
"""
Handle closing way element
"""
self . _result . append ( Way ( result = self . _result , * * self . _curr ) )
self . _curr = { }
def _handle_start_area ( self , attrs ) :
"""
Handle opening area element
: param attrs : Attributes of the element
: type attrs : Dict
"""
self . _curr = {
' attributes ' : dict ( attrs ) ,
' tags ' : { } ,
' area_id ' : None
}
if attrs . get ( ' id ' , None ) is not None :
self . _curr [ ' area_id ' ] = int ( attrs [ ' id ' ] )
del self . _curr [ ' attributes ' ] [ ' id ' ]
def _handle_end_area ( self ) :
"""
Handle closing area element
"""
self . _result . append ( Area ( result = self . _result , * * self . _curr ) )
self . _curr = { }
def _handle_start_nd ( self , attrs ) :
"""
Handle opening nd element
: param attrs : Attributes of the element
: type attrs : Dict
"""
if isinstance ( self . cur_relation_member , RelationWay ) :
if self . cur_relation_member . geometry is None :
self . cur_relation_member . geometry = [ ]
self . cur_relation_member . geometry . append (
RelationWayGeometryValue (
lat = Decimal ( attrs [ " lat " ] ) ,
lon = Decimal ( attrs [ " lon " ] )
)
)
else :
try :
node_ref = attrs [ ' ref ' ]
except KeyError :
raise ValueError ( " Unable to find required ref value. " )
self . _curr [ ' node_ids ' ] . append ( int ( node_ref ) )
def _handle_start_relation ( self , attrs ) :
"""
Handle opening relation element
: param attrs : Attributes of the element
: type attrs : Dict
"""
self . _curr = {
' attributes ' : dict ( attrs ) ,
' members ' : [ ] ,
' rel_id ' : None ,
' tags ' : { }
}
if attrs . get ( ' id ' , None ) is not None :
self . _curr [ ' rel_id ' ] = int ( attrs [ ' id ' ] )
del self . _curr [ ' attributes ' ] [ ' id ' ]
def _handle_end_relation ( self ) :
"""
Handle closing relation element
"""
self . _result . append ( Relation ( result = self . _result , * * self . _curr ) )
self . _curr = { }
def _handle_start_member ( self , attrs ) :
"""
Handle opening member element
: param attrs : Attributes of the element
: type attrs : Dict
"""
params = {
# ToDo: Parse attributes
' attributes ' : { } ,
' ref ' : None ,
' result ' : self . _result ,
' role ' : None
}
if attrs . get ( ' ref ' , None ) :
params [ ' ref ' ] = int ( attrs [ ' ref ' ] )
if attrs . get ( ' role ' , None ) :
params [ ' role ' ] = attrs [ ' role ' ]
cls_map = {
" area " : RelationArea ,
" node " : RelationNode ,
" relation " : RelationRelation ,
" way " : RelationWay
}
cls = cls_map . get ( attrs [ " type " ] )
if cls is None :
raise ValueError ( " Undefined type for member: ' %s ' " % attrs [ ' type ' ] )
self . cur_relation_member = cls ( * * params )
self . _curr [ ' members ' ] . append ( self . cur_relation_member )
def _handle_end_member ( self ) :
self . cur_relation_member = None