Rabbit WIP.

This commit is contained in:
Matthew Blissett 2017-09-04 16:03:17 +02:00
parent 6aaef03c5e
commit dc12747c0f
3 changed files with 157 additions and 68 deletions

113
queue.py Normal file
View File

@ -0,0 +1,113 @@
import server
import pika
from msgpack import packb
import signal
from json import loads as json_loads
import sys
from sys import stderr
mq_i = 0
exiting = False
to_queue_name = os.getenv('TO_QUEUE', 'made-tiles')
failed_queue_name = os.getenv('FAILED_QUEUE', 'failed-tiles')
class Coordinate:
def __init__(self, row, column, zoom):
self.row = row
self.column = column
self.zoom = zoom
def makeTile(ch, method, properties, data):
body = json_loads(data)
coord = Coordinate(body['coord']['row'], body['coord']['col'], body['coord']['zoom'])
path = '%d/%d/%d' % (coord.zoom, coord.column, coord.row)
# Fetch a tile.
rendered = False
global mq_i
global exiting
while not rendered:
try:
content = server.get_mvt(coord.zoom, coord.column, coord.row)
mq_i = mq_i + 1
if (mq_i % 100 == 0):
print('ToMQ', mq_i, coord)
msg = packb({
"zoom": coord.zoom,
"column": coord.column,
"row": coord.row,
"tile": content
})
mq_done_channel.basic_publish(exchange='', routing_key=to_queue_name, body=msg)
except:
# Something went wrong: try again? Log the error?
print('Exception')
global exiting
if exiting:
print('Exiting.')
ch.close()
break
print('Failed', coord, file=sys.stderr)
global dud_channel
dud_channel.basic_publish(exchange='', routing_key=failed_queue_name, body=data)
ch.basic_nack(delivery_tag = method.delivery_tag, requeue=False)
# MAX_ERRORS?
#if not error_list:
raise
#break
else:
# Successfully got the tile.
rendered = True
ch.basic_ack(delivery_tag = method.delivery_tag)
def exit_handler(signal, frame):
global exiting
exiting = True
global channel
global consumer_tag
print('You pressed Ctrl+C!')
channel.basic_cancel(consumer_tag=consumer_tag)
channel.close()
sys.exit(0)
def m():
if __name__ == "__main__":
mq_credentials = pika.PlainCredentials(os.getenv('MQ_USER','mblissett'), os.getenv('MQ_PASSWORD','mblissett'))
mq_connection = pika.BlockingConnection(pika.ConnectionParameters(os.getenv('MQ_HOST','mq.gbif.org'), os.getenv('MQ_PORT','5672'), os.getenv('MQ_VHOST','/users/mblissett'), mq_credentials))
global mq_done_channel
mq_done_channel = mq_connection.channel()
mq_done_channel.queue_declare(queue=os.getenv('TO_QUEUE','made-tiles'))
global mq_dud_channel
mq_dud_channel = mq_connection.channel()
mq_dud_channel.queue_declare(queue=os.getenv('FAILED_QUEUE','failed-tiles'))
global mq_channel
mq_channel = mq_connection.channel()
mq_channel.queue_declare(queue=os.getenv('FROM_QUEUE','do-tiles'))
mq_channel.basic_qos(prefetch_count=50)
global consumer_tag
consumer_tag = mq_channel.basic_consume(makeTile, queue=os.getenv('FROM_QUEUE','do-tiles'))
signal.signal(signal.SIGINT, exit_handler)
print('Waiting for messages. To exit press CTRL+C')
mq_channel.start_consuming()
m()

View File

@ -4,3 +4,5 @@ mercantile==0.9.0
pyproj==1.9.5.1
pyyaml==3.12
psycopg2==2.6.2
pika
msgpack-python

110
server.py
View File

@ -49,6 +49,13 @@ def GeneratePrepared(layers):
layers = GetTM2Source("/mapping/data.yml")
prepared = GeneratePrepared(layers)
connection_string = 'postgresql://'+os.getenv('POSTGRES_USER','openmaptiles')+':'+os.getenv('POSTGRES_PASSWORD','openmaptiles')+'@'+os.getenv('POSTGRES_HOST','postgres')+':'+os.getenv('POSTGRES_PORT','5432')+'/'+os.getenv('POSTGRES_DB','openmaptiles')
engine = create_engine(connection_string)
inspector = inspect(engine)
DBSession = sessionmaker(bind=engine)
session = DBSession()
print("Running prepare statement")
session.execute(prepared)
def bounds(zoom,x,y,buff):
#inProj = pyproj.Proj(init='epsg:3575')
@ -111,79 +118,45 @@ def replace_tokens(query,tilebounds,buffered_tilebounds,scale_denom,z):
return start
def get_mvt(zoom,x,y):
try: # Sanitize the inputs
sani_zoom,sani_x,sani_y = float(zoom),float(x),float(y)
del zoom,x,y
except:
print('suspicious')
return 1
scale_denom = zoom_to_scale_denom(sani_zoom)
tilebounds = bounds(sani_zoom,sani_x,sani_y,0)
buffered_tilebounds = []
chunk = ""
for buffer_size in buffer_sizes:
t = bounds(sani_zoom,sani_x,sani_y,int(buffer_size)/512.0)
buffered_tilebounds.append(t)
chunk = chunk + ", !bbox_"+buffer_size+"_buffer!"
final_query = "EXECUTE gettile(!bbox!, !scale_denominator!, !pixel_width!, !pixel_height!"+chunk+");"
sent_query = replace_tokens(final_query,tilebounds,buffered_tilebounds,scale_denom,sani_zoom)
print('Final query', sent_query)
response = list(session.execute(sent_query))
print (sani_zoom, sani_x, sani_y)
print(sent_query)
layers = filter(None,list(itertools.chain.from_iterable(response)))
final_tile = b''
for layer in layers:
final_tile = final_tile + io.BytesIO(layer).getvalue()
return final_tile
class GetTile(tornado.web.RequestHandler):
def get(self, zoom,x,y):
engine = create_engine('postgresql://'+os.getenv('POSTGRES_USER','openmaptiles')+':'+os.getenv('POSTGRES_PASSWORD','openmaptiles')+'@'+os.getenv('POSTGRES_HOST','postgres')+':'+os.getenv('POSTGRES_PORT','5432')+'/'+os.getenv('POSTGRES_DB','openmaptiles'))
inspector = inspect(engine)
DBSession = sessionmaker(bind=engine)
self.session = DBSession()
print("Running prepare statement")
self.session.execute(prepared)
self.set_header("Content-Type", "application/x-protobuf")
self.set_header("Content-Disposition", "attachment")
self.set_header("Access-Control-Allow-Origin", "*")
response = self.get_mvt(zoom,x,y)
self.session.close()
#self.write(response)
def get_mvt(self, zoom,x,y):
try: # Sanitize the inputs
sani_zoom,sani_x,sani_y = float(zoom),float(x),float(y)
del zoom,x,y
except:
print('suspicious')
return 1
scale_denom = zoom_to_scale_denom(sani_zoom)
tilebounds = bounds(sani_zoom,sani_x,sani_y,0)
#s_nobuffer,w_nobuffer,n_nobuffer,e_nobuffer = str(tilebounds_nobuffer['s']),str(tilebounds_nobuffer['w']),str(tilebounds_nobuffer['n']),str(tilebounds_nobuffer['e'])
#tilebounds = bounds(sani_zoom,sani_x,sani_y,0.50)
#s,w,n,e = str(tilebounds['s']),str(tilebounds['w']),str(tilebounds['n']),str(tilebounds['e'])
buffered_tilebounds = []
chunk = ""
for buffer_size in buffer_sizes:
t = bounds(sani_zoom,sani_x,sani_y,int(buffer_size)/512.0)
buffered_tilebounds.append(t)
chunk = chunk + ", !bbox_"+buffer_size+"_buffer!"
final_query = "EXECUTE gettile(!bbox!, !scale_denominator!, !pixel_width!, !pixel_height!"+chunk+");"
sent_query = replace_tokens(final_query,tilebounds,buffered_tilebounds,scale_denom,sani_zoom)
print('Final query', sent_query)
response = list(self.session.execute(sent_query))
print (sani_zoom, sani_x, sani_y)
print(sent_query)
layers = filter(None,list(itertools.chain.from_iterable(response)))
final_tile = b''
for layer in layers:
final_tile = final_tile + io.BytesIO(layer).getvalue()
self.write(final_tile)
#return final_tile
# Buffers
# aeroway.yaml: buffer_size: 4
# boundary.yaml: buffer_size: 4
# building.yaml: buffer_size: 4
# landcover.yaml: buffer_size: 4
# landuse.yaml: buffer_size: 4
# park.yaml: buffer_size: 4
# transportation.yaml: buffer_size: 4
# water.yaml: buffer_size: 4
# waterway.yaml: buffer_size: 4
# housenumber.yaml: buffer_size: 8
# transportation_name.yaml: buffer_size: 8
# graticules.yaml: buffer_size: 64
# mountain_peak.yaml: buffer_size: 64
# poi.yaml: buffer_size: 64
# place.yaml: buffer_size: 256
# water_name.yaml: buffer_size: 256
# Need 4/256, 8/256, 64/256 and 256/256
response = get_mvt(zoom,x,y)
self.write(response)
def m():
if __name__ == "__main__":
@ -195,9 +168,10 @@ def m():
server = tornado.httpserver.HTTPServer(application)
server.bind(8080)
server.start(0)
server.start(1)
print("Postserve started..")
#application.listen(8080)
tornado.ioloop.IOLoop.instance().start()
m()