From dc12747c0f94cb421a2dea298b813d30306c82d7 Mon Sep 17 00:00:00 2001 From: Matthew Blissett Date: Mon, 4 Sep 2017 16:03:17 +0200 Subject: [PATCH] Rabbit WIP. --- queue.py | 113 +++++++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 2 + server.py | 110 ++++++++++++++++++--------------------------- 3 files changed, 157 insertions(+), 68 deletions(-) create mode 100644 queue.py diff --git a/queue.py b/queue.py new file mode 100644 index 0000000..0932778 --- /dev/null +++ b/queue.py @@ -0,0 +1,113 @@ +import server + +import pika +from msgpack import packb +import signal + +from json import loads as json_loads + +import sys +from sys import stderr + +mq_i = 0 +exiting = False + +to_queue_name = os.getenv('TO_QUEUE', 'made-tiles') +failed_queue_name = os.getenv('FAILED_QUEUE', 'failed-tiles') + +class Coordinate: + def __init__(self, row, column, zoom): + self.row = row + self.column = column + self.zoom = zoom + +def makeTile(ch, method, properties, data): + body = json_loads(data) + coord = Coordinate(body['coord']['row'], body['coord']['col'], body['coord']['zoom']) + path = '%d/%d/%d' % (coord.zoom, coord.column, coord.row) + + # Fetch a tile. + rendered = False + + global mq_i + global exiting + + while not rendered: + try: + content = server.get_mvt(coord.zoom, coord.column, coord.row) + + mq_i = mq_i + 1 + if (mq_i % 100 == 0): + print('ToMQ', mq_i, coord) + + msg = packb({ + "zoom": coord.zoom, + "column": coord.column, + "row": coord.row, + "tile": content + }) + + mq_done_channel.basic_publish(exchange='', routing_key=to_queue_name, body=msg) + + except: + # Something went wrong: try again? Log the error? + + print('Exception') + global exiting + if exiting: + print('Exiting.') + ch.close() + break + + print('Failed', coord, file=sys.stderr) + global dud_channel + dud_channel.basic_publish(exchange='', routing_key=failed_queue_name, body=data) + ch.basic_nack(delivery_tag = method.delivery_tag, requeue=False) + # MAX_ERRORS? + #if not error_list: + raise + #break + + else: + # Successfully got the tile. + rendered = True + ch.basic_ack(delivery_tag = method.delivery_tag) + +def exit_handler(signal, frame): + global exiting + exiting = True + global channel + global consumer_tag + print('You pressed Ctrl+C!') + channel.basic_cancel(consumer_tag=consumer_tag) + channel.close() + + sys.exit(0) + + +def m(): + if __name__ == "__main__": + mq_credentials = pika.PlainCredentials(os.getenv('MQ_USER','mblissett'), os.getenv('MQ_PASSWORD','mblissett')) + mq_connection = pika.BlockingConnection(pika.ConnectionParameters(os.getenv('MQ_HOST','mq.gbif.org'), os.getenv('MQ_PORT','5672'), os.getenv('MQ_VHOST','/users/mblissett'), mq_credentials)) + + global mq_done_channel + mq_done_channel = mq_connection.channel() + mq_done_channel.queue_declare(queue=os.getenv('TO_QUEUE','made-tiles')) + + global mq_dud_channel + mq_dud_channel = mq_connection.channel() + mq_dud_channel.queue_declare(queue=os.getenv('FAILED_QUEUE','failed-tiles')) + + global mq_channel + mq_channel = mq_connection.channel() + mq_channel.queue_declare(queue=os.getenv('FROM_QUEUE','do-tiles')) + mq_channel.basic_qos(prefetch_count=50) + global consumer_tag + consumer_tag = mq_channel.basic_consume(makeTile, queue=os.getenv('FROM_QUEUE','do-tiles')) + + signal.signal(signal.SIGINT, exit_handler) + + print('Waiting for messages. To exit press CTRL+C') + mq_channel.start_consuming() + +m() diff --git a/requirements.txt b/requirements.txt index bb95b17..fc0f2f4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,5 @@ mercantile==0.9.0 pyproj==1.9.5.1 pyyaml==3.12 psycopg2==2.6.2 +pika +msgpack-python diff --git a/server.py b/server.py index 13a6e20..b4e7334 100644 --- a/server.py +++ b/server.py @@ -49,6 +49,13 @@ def GeneratePrepared(layers): layers = GetTM2Source("/mapping/data.yml") prepared = GeneratePrepared(layers) +connection_string = 'postgresql://'+os.getenv('POSTGRES_USER','openmaptiles')+':'+os.getenv('POSTGRES_PASSWORD','openmaptiles')+'@'+os.getenv('POSTGRES_HOST','postgres')+':'+os.getenv('POSTGRES_PORT','5432')+'/'+os.getenv('POSTGRES_DB','openmaptiles') +engine = create_engine(connection_string) +inspector = inspect(engine) +DBSession = sessionmaker(bind=engine) +session = DBSession() +print("Running prepare statement") +session.execute(prepared) def bounds(zoom,x,y,buff): #inProj = pyproj.Proj(init='epsg:3575') @@ -111,79 +118,45 @@ def replace_tokens(query,tilebounds,buffered_tilebounds,scale_denom,z): return start +def get_mvt(zoom,x,y): + try: # Sanitize the inputs + sani_zoom,sani_x,sani_y = float(zoom),float(x),float(y) + del zoom,x,y + except: + print('suspicious') + return 1 + + scale_denom = zoom_to_scale_denom(sani_zoom) + tilebounds = bounds(sani_zoom,sani_x,sani_y,0) + + buffered_tilebounds = [] + chunk = "" + for buffer_size in buffer_sizes: + t = bounds(sani_zoom,sani_x,sani_y,int(buffer_size)/512.0) + buffered_tilebounds.append(t) + + chunk = chunk + ", !bbox_"+buffer_size+"_buffer!" + + final_query = "EXECUTE gettile(!bbox!, !scale_denominator!, !pixel_width!, !pixel_height!"+chunk+");" + sent_query = replace_tokens(final_query,tilebounds,buffered_tilebounds,scale_denom,sani_zoom) + print('Final query', sent_query) + response = list(session.execute(sent_query)) + print (sani_zoom, sani_x, sani_y) + print(sent_query) + layers = filter(None,list(itertools.chain.from_iterable(response))) + final_tile = b'' + for layer in layers: + final_tile = final_tile + io.BytesIO(layer).getvalue() + return final_tile + class GetTile(tornado.web.RequestHandler): def get(self, zoom,x,y): - engine = create_engine('postgresql://'+os.getenv('POSTGRES_USER','openmaptiles')+':'+os.getenv('POSTGRES_PASSWORD','openmaptiles')+'@'+os.getenv('POSTGRES_HOST','postgres')+':'+os.getenv('POSTGRES_PORT','5432')+'/'+os.getenv('POSTGRES_DB','openmaptiles')) - inspector = inspect(engine) - DBSession = sessionmaker(bind=engine) - self.session = DBSession() - print("Running prepare statement") - self.session.execute(prepared) - self.set_header("Content-Type", "application/x-protobuf") self.set_header("Content-Disposition", "attachment") self.set_header("Access-Control-Allow-Origin", "*") - response = self.get_mvt(zoom,x,y) - - self.session.close() - - #self.write(response) - - def get_mvt(self, zoom,x,y): - try: # Sanitize the inputs - sani_zoom,sani_x,sani_y = float(zoom),float(x),float(y) - del zoom,x,y - except: - print('suspicious') - return 1 - - scale_denom = zoom_to_scale_denom(sani_zoom) - tilebounds = bounds(sani_zoom,sani_x,sani_y,0) - #s_nobuffer,w_nobuffer,n_nobuffer,e_nobuffer = str(tilebounds_nobuffer['s']),str(tilebounds_nobuffer['w']),str(tilebounds_nobuffer['n']),str(tilebounds_nobuffer['e']) - - #tilebounds = bounds(sani_zoom,sani_x,sani_y,0.50) - #s,w,n,e = str(tilebounds['s']),str(tilebounds['w']),str(tilebounds['n']),str(tilebounds['e']) - - buffered_tilebounds = [] - chunk = "" - for buffer_size in buffer_sizes: - t = bounds(sani_zoom,sani_x,sani_y,int(buffer_size)/512.0) - buffered_tilebounds.append(t) - - chunk = chunk + ", !bbox_"+buffer_size+"_buffer!" - - final_query = "EXECUTE gettile(!bbox!, !scale_denominator!, !pixel_width!, !pixel_height!"+chunk+");" - sent_query = replace_tokens(final_query,tilebounds,buffered_tilebounds,scale_denom,sani_zoom) - print('Final query', sent_query) - response = list(self.session.execute(sent_query)) - print (sani_zoom, sani_x, sani_y) - print(sent_query) - layers = filter(None,list(itertools.chain.from_iterable(response))) - final_tile = b'' - for layer in layers: - final_tile = final_tile + io.BytesIO(layer).getvalue() - self.write(final_tile) - #return final_tile - -# Buffers -# aeroway.yaml: buffer_size: 4 -# boundary.yaml: buffer_size: 4 -# building.yaml: buffer_size: 4 -# landcover.yaml: buffer_size: 4 -# landuse.yaml: buffer_size: 4 -# park.yaml: buffer_size: 4 -# transportation.yaml: buffer_size: 4 -# water.yaml: buffer_size: 4 -# waterway.yaml: buffer_size: 4 -# housenumber.yaml: buffer_size: 8 -# transportation_name.yaml: buffer_size: 8 -# graticules.yaml: buffer_size: 64 -# mountain_peak.yaml: buffer_size: 64 -# poi.yaml: buffer_size: 64 -# place.yaml: buffer_size: 256 -# water_name.yaml: buffer_size: 256 -# Need 4/256, 8/256, 64/256 and 256/256 + response = get_mvt(zoom,x,y) + self.write(response) def m(): if __name__ == "__main__": @@ -195,9 +168,10 @@ def m(): server = tornado.httpserver.HTTPServer(application) server.bind(8080) - server.start(0) + server.start(1) print("Postserve started..") #application.listen(8080) + tornado.ioloop.IOLoop.instance().start() m()