#!/usr/bin/python3
"""Address Database Builder.

Imports address data from several sources (own CSV dumps, the US National
Address Database, OpenAddresses, OSM Overpass exports, StatsCan NAR),
normalizes it, and writes a combined CSV / SQLite database.
"""
if __name__ == "__main__":
    print("Address Database Builder 2025")
    print("Starting up...")
import argparse, csv, zipfile, gzip, os, re, json, traceback, sys, multiprocessing
import concurrent.futures
from collections import deque
import pandas as pd
import dask.dataframe as dd
import gc
from multiprocessing import get_context
import sqlite3
from src.addressfunctions import normalizeAddress
from src.constants import ValidationException
import src.config

# One worker process per CPU; cap the number of chunks queued at once so the
# reader can't run arbitrarily far ahead of the workers.
maxthreads = multiprocessing.cpu_count()
MAX_IN_FLIGHT = maxthreads * 2
os.environ["OPENBLAS_MAIN_FREE"] = "1"
# NOTE(review): with a "spawn" mp context (used by some pools below) this
# module-level lock is re-created in every worker, so it only serializes
# CSV appends made within a single process — confirm this is acceptable.
writelock = multiprocessing.Lock()
badcount = 0
skippedcount = 0
countrycode = "US"
def init_worker(cfg: src.config.AppConfig):
    """Process-pool initializer: install the shared AppConfig in this worker."""
    src.config.set_config(cfg)
def fixLatLon(filepath):
    """Repair latitude/longitude pairs that were written in the wrong order.

    Reads the CSV at ``filepath``, swaps coordinate pairs that are clearly
    flipped, and appends the whole (repaired) frame to
    ``<filepath>.coordfix.csv``.
    """
    cfg = src.config.get_config()
    print("Repairing flipped latitude/longitude pairs in " + filepath)
    repaired = 0
    frame = pd.read_csv(filepath, keep_default_na=False, dtype="str")
    # States/territories whose coordinates legitimately fall outside the
    # continental-US bounding box used below.
    outside_conus = ("VI", "AK", "HI", "PR")
    for idx, rec in frame.iterrows():
        lat = float(rec.latitude)
        lon = float(rec.longitude)
        # A latitude beyond +/-90 degrees is impossible: the pair is flipped.
        flipped = lat < -90 or lat > 90
        if not flipped and cfg.countryCode == "US" and rec.state not in outside_conus:
            # CONUS bounding box check: longitude in [-171.79..., -66.96...]
            # and latitude in [18.91..., 71.35...].
            flipped = (lon < -171.791110603 or lon > -66.96466) or (lat < 18.91619 or lat > 71.3577635769)
        if flipped:
            frame.at[idx, "latitude"], frame.at[idx, "longitude"] = lon, lat
            repaired += 1
    outpath = filepath + ".coordfix.csv"
    frame.to_csv(outpath, mode='a', index=False, header=not os.path.exists(outpath))
    print("\nDone flipping " + filepath + "! Fixed " + str(repaired) + " records.")
def normalize(number, street, street2, city, state, zipcode, latitude, longitude, zipprefix=False, plus4="", county=False):
    """Normalize one raw address into the canonical dict returned by
    normalizeAddress, retrying with progressively more aggressive strategies
    until a full ZIP+4 is resolved (or all options are exhausted).

    Strategies, in order:
      1. plain normalizeAddress on the cleaned input;
      2. strip letters from the house number and drop the city field;
      3. (advanced mode only) libpostal-based deep parsing, followed by a
         final normalizeAddress pass.

    Returns the normalized address dict (keys include number, street, unit,
    city, state, zip, plus4, latitude, longitude).
    """
    cfg = src.config.get_config()
    street1 = street
    if len(city) > 4 and street1.endswith(" " + city):
        # City name leaked into street field (Albany County Wyoming, for one)
        street1 = street1.removesuffix(" " + city)
    if cfg.verbose:
        print("Starting to normalize address:")
        print(" ", number, street1, street2, city, state, zipcode, round(float(latitude), 7), round(float(longitude), 7), plus4, county)
    addr = normalizeAddress(number, street1, street2, city, state, zipcode, round(float(latitude), 7), round(float(longitude), 7), zipprefix, plus4, county)
    if len(addr['zip'] or "") < 5 or len(addr['plus4'] or "") != 4:
        if cfg.verbose:
            print("Address didn't match to a full ZIP+4 code. Trying more things.")
            print(" ", re.sub("[^0-9]", "", addr['number']), addr['street'], addr['unit'], "", addr['state'], addr['zip'])
        # Try removing letters from address numbers, and ignore city field
        addrstrip = normalizeAddress(re.sub("[^0-9]", "", addr['number']), addr['street'], addr['unit'], "", addr['state'], addr['zip'], addr['latitude'], addr['longitude'], False, addr['plus4'], county)
        # Use libpostal to analyze address deeper.
        # BUG FIX: the original condition read
        #     cfg.advancedMode and len(...) < 5 or len(...) != 4
        # Since "and" binds tighter than "or", the libpostal path was taken
        # whenever the +4 was missing, even with advancedMode disabled.
        if cfg.advancedMode and (len(addrstrip['zip'] or "") < 5 or len(addrstrip['plus4'] or "") != 4):
            try:
                if cfg.verbose:
                    print("Using libpostal to break down and analyze address.")
                    print(" ", addrstrip)
                from src.advancedparsing import advancedNormalize
                addr = advancedNormalize(addr['number'], addr['street'], addr['unit'], addr['city'], addr['state'], addr['zip'], addr['latitude'], addr['longitude'], False, addr['plus4'], county)
            except Exception as e:
                # In verbose mode re-raise so the crash is debuggable;
                # otherwise swallow it and keep the best address so far.
                if cfg.verbose:
                    print("libpostal crashed.")
                    raise e
                pass
            # Do another normalize pass for good luck (maybe the previous one got the ZIP and now we can get the +4)
            if len(addr['zip'] or "") < 5 or len(addr['plus4'] or "") != 4:
                if cfg.verbose:
                    print("Doing a final normalization attempt after libpostal.")
                    print(" ", addr)
                addr = normalizeAddress(addr['number'], addr['street'], addr['unit'], addr['city'], addr['state'], addr['zip'], addr['latitude'], addr['longitude'], False, addr['plus4'], county)
        else:
            addr = addrstrip
    if cfg.verbose:
        print("Final result after normalization:")
        print(" ", addr)
    return addr
def processOwnChunk(chunk, chunkcount, outfilename, ignorestates, keeponlystates):
    """Worker: normalize one DataFrame chunk of our own-format address CSV
    and append the results to the shared output CSV.

    chunk          -- pandas DataFrame with our canonical columns
    chunkcount     -- row offset, used only for the progress display
    outfilename    -- CSV file the normalized rows are appended to
    ignorestates   -- state codes to drop
    keeponlystates -- if non-empty, keep only these state codes
    """
    # NOTE(review): these counters are per-process when run in a pool;
    # the parent's totals won't reflect worker increments under spawn.
    global badcount, skippedcount, writelock
    cfg = src.config.get_config()
    data = []
    print(" " + str(chunkcount) + " ", end="\r", flush=True)
    for index, row in chunk.iterrows():
        if row.state in ignorestates:
            skippedcount = skippedcount + 1
            continue
        if keeponlystates != [] and row.state not in keeponlystates:
            skippedcount = skippedcount + 1
            continue
        try:
            if not cfg.noSkip4 and len(row.plus4 or "") == 4:
                # Row already carries a full ZIP+4: pass it through unchanged.
                addr = {
                    "number": row.number,
                    "street": row.street,
                    "unit": row.street2,
                    "city": row.city,
                    "state": row.state,
                    "zip": row.zip,
                    "plus4": row.plus4,
                    "latitude": round(float(row.latitude), 7),
                    "longitude": round(float(row.longitude), 7)
                }
            else:
                addr = normalize(row.number, row.street, row.street2, row.city, row.state, row.zip, round(float(row.latitude), 7), round(float(row.longitude), 7), False, row.plus4)
            # Normalization may have corrected the state; re-check the filter.
            if addr["state"] in ignorestates:
                skippedcount = skippedcount + 1
                continue
            data.append([addr['number'], addr['street'], addr['unit'], addr['city'], addr['state'], addr['zip'], addr['plus4'], addr['latitude'], addr['longitude'], row.source])
        except ValidationException as e:
            badcount = badcount + 1
        except KeyboardInterrupt:
            # Hard-exit the worker so the pool doesn't hang on Ctrl-C.
            os._exit(0)
        except Exception as e:
            print("W: Couldn't ingest address:")
            print(row)
            traceback.print_exc()
            badcount = badcount + 1
    out = pd.DataFrame(data=data, columns=["number", "street", "street2", "city", "state", "zip", "plus4", "latitude", "longitude", "source"])
    with writelock:
        out.to_csv(outfilename, mode='a', index=False, header=not os.path.exists(outfilename), columns=["number", "street", "street2", "city", "state", "zip", "plus4", "latitude", "longitude", "source"])
    gc.collect()
def importOwnFile(filename, outfilename, ignorestates, keeponlystates):
    """Stream an own-format address CSV through a process pool of
    processOwnChunk workers, appending normalized rows to outfilename.
    """
    global badcount, skippedcount, writelock
    # BUG FIX: cfg was passed to the pool initializer below but was never
    # defined in this function (NameError at runtime).
    cfg = src.config.get_config()
    print("Processing addresses from " + filename)
    columns = ["number", "street", "street2", "city", "state", "zip", "plus4", "latitude", "longitude", "source"]
    file = filename
    chunkcount = 0
    badcount = 0
    skippedcount = 0
    chunksize = 1000
    in_flight = set()
    # NOTE(review): unlike the NAD/OpenAddresses importers this pool uses the
    # platform-default mp context (fork on Linux) — confirm that's intended.
    with concurrent.futures.ProcessPoolExecutor(max_workers=maxthreads, max_tasks_per_child=100, initializer=init_worker, initargs=(cfg,)) as executor:
        try:
            for chunk in pd.read_csv(file, chunksize=chunksize, usecols=columns, keep_default_na=False, dtype={
                    "number": "string", "street": "string",
                    "street2": "string", "city": "string",
                    "state": "category", "zip": "string",
                    "plus4": "string",
                    "latitude": "float32", "longitude": "float32",
                    "source": "category"}, dtype_backend="pyarrow"):
                # Back-pressure: don't queue more than MAX_IN_FLIGHT chunks.
                while len(in_flight) >= MAX_IN_FLIGHT:
                    done, in_flight = concurrent.futures.wait(in_flight, return_when=concurrent.futures.FIRST_COMPLETED)
                    for fut in done:
                        fut.result()
                fut = executor.submit(processOwnChunk, chunk, chunkcount * chunksize, outfilename, ignorestates, keeponlystates)
                in_flight.add(fut)
                chunkcount = chunkcount + 1
            # Drain remaining work, surfacing any worker exceptions.
            for fut in concurrent.futures.as_completed(in_flight):
                fut.result()
        except KeyboardInterrupt:
            print("\nCtrl-C, exiting!")
            executor.shutdown(cancel_futures=True)
            sys.exit(0)
    print("\nDone processing! Parsed " + str(chunkcount) + " chunks.")
    print("There were " + str(badcount) + " unprocessable addresses.")
    if ignorestates:
        print("There were " + str(skippedcount) + " addresses ignored due to your --ignorestates setting.")
    print("Saved to output file " + outfilename)
def processNadChunk(chunk, chunkcount, outfilename, ignorestates, keeponlystates):
    """Worker: normalize one chunk of National Address Database rows and
    append the results to the shared output CSV."""
    global badcount, skippedcount, writelock
    print(" " + str(chunkcount) + " ", end="\r", flush=True)
    collected = []
    for _, rec in chunk.iterrows():
        statecode = rec.State.upper()
        if statecode in ignorestates:
            skippedcount += 1
            continue
        if keeponlystates != [] and statecode not in keeponlystates:
            skippedcount += 1
            continue
        try:
            # Pick the best available town name: incorporated municipality,
            # then postal city, then unincorporated community.
            # NOTE(review): `town` is computed but normalize() below is called
            # with rec.Inc_Muni directly — confirm whether town was meant to
            # be passed instead.
            town = rec.Inc_Muni
            if town == "Unincorporated":
                town = ""
            town = town or rec.Post_City or rec.Uninc_Comm
            addr = normalize(rec.AddNo_Full, rec.StNam_Full, rec.SubAddress, rec.Inc_Muni, rec.State, rec.Zip_Code, round(float(rec.Latitude), 7), round(float(rec.Longitude), 7))
            # For example, AR's data claims to have MO addresses but the ZIP
            # says they're in AR, so the first pass of this won't catch those.
            if addr["state"] in ignorestates:
                skippedcount += 1
                continue
            origin = "NAD " + rec.NAD_Source.replace("State of ", "")
            collected.append([addr['number'], addr['street'], addr['unit'], addr['city'], addr['state'], addr['zip'], addr['plus4'], addr['latitude'], addr['longitude'], origin])
        except ValidationException:
            badcount += 1
        except Exception:
            print("W: Couldn't ingest address:")
            print(rec)
            traceback.print_exc()
            badcount += 1
    frame = pd.DataFrame(data=collected, columns=["number", "street", "street2", "city", "state", "zip", "plus4", "latitude", "longitude", "source"])
    with writelock:
        frame.to_csv(outfilename, mode='a', index=False, header=not os.path.exists(outfilename), columns=["number", "street", "street2", "city", "state", "zip", "plus4", "latitude", "longitude", "source"])
    gc.collect()
def importNadFile(filename, outfilename, ignorestates, keeponlystates, startatline):
    """Stream the National Address Database (plain text or its .zip
    distribution) through a pool of processNadChunk workers.

    startatline -- skip this many data lines before processing (resume
                   support); 0 starts from the beginning.
    """
    global skippedcount, badcount
    # BUG FIX: cfg was passed to the pool initializer below but was never
    # defined in this function (NameError at runtime).
    cfg = src.config.get_config()
    print("Importing National Address Database addresses from " + filename)
    if startatline > 0:
        print("Skipping to line number " + str(startatline))
    columns = [
        "AddNo_Full",
        "StNam_Full",
        "St_PreMod",
        "St_PreDir",
        "St_Name",
        "SubAddress",
        "Inc_Muni",
        "Post_City",
        "Uninc_Comm",
        "Urbnztn_PR",
        "State",
        "Zip_Code",
        "UUID",
        "Longitude",
        "Latitude",
        "DateUpdate",
        "NAD_Source",
    ]
    file = filename
    if filename.endswith(".zip"):
        # The NAD zip contains TXT/NAD_*.txt; stream the first match.
        zf = zipfile.ZipFile(filename, mode="r")
        zipFiles = zf.namelist()
        for fname in zipFiles:
            if fname.upper().startswith("TXT/NAD") and fname.upper().endswith(".TXT"):
                file = zf.open(fname, mode="r", force_zip64=True)
                break
    chunkcount = 0
    chunksize = 1000
    in_flight = set()
    with concurrent.futures.ProcessPoolExecutor(max_workers=maxthreads, mp_context=get_context("spawn"), max_tasks_per_child=100, initializer=init_worker, initargs=(cfg,)) as executor:
        for chunk in pd.read_csv(file, chunksize=chunksize, header=0, skiprows=lambda i: 1 <= i <= startatline, usecols=columns, keep_default_na=False, dtype={
                "State": "category", "NAD_Source": "category",
                "Zip_Code": "string", "UUID": "string",
                "AddNo_Full": "string", "StNam_Full": "string", "St_PreMod": "string",
                "St_PreDir": "string", "St_Name": "string", "SubAddress": "string",
                "Inc_Muni": "string", "Post_City": "string", "Uninc_Comm": "string",
                "Urbnztn_PR": "string", "Longitude": "float32", "Latitude": "float32",
                "DateUpdate": "string"}, dtype_backend="pyarrow"):
            # Back-pressure: don't queue more than MAX_IN_FLIGHT chunks.
            while len(in_flight) >= MAX_IN_FLIGHT:
                done, in_flight = concurrent.futures.wait(in_flight, return_when=concurrent.futures.FIRST_COMPLETED)
                for fut in done:
                    fut.result()
            fut = executor.submit(processNadChunk, chunk, chunkcount * chunksize, outfilename, ignorestates, keeponlystates)
            in_flight.add(fut)
            chunkcount = chunkcount + 1
        for fut in concurrent.futures.as_completed(in_flight):
            fut.result()
    print("\nDone importing NAD! Processed " + str(chunkcount) + " chunks of " + str(chunksize) + " rows.")
    print("There were " + str(badcount) + " unprocessable addresses.")
    if ignorestates:
        print("There were " + str(skippedcount) + " addresses ignored due to your --ignorestates setting.")
    print("Saved to output file " + outfilename)
def processOpenAddressRows(rows, startindex, outfilename, ignorestates, source, stateOverride, zipprefix, citySuggestion, county=False):
    """Worker: parse a batch of OpenAddresses GeoJSON lines, normalize each
    address, and append the good ones to the shared output CSV."""
    global badcount, skippedcount, writelock
    print(" " + str(startindex) + " ", end="\r", flush=True)
    parsed = 0
    collected = []
    empties = 0
    for raw in rows:
        parsed += 1
        try:
            feature = json.loads(raw)
            props = feature["properties"]
            # Track how much of this batch has no usable address/geometry.
            if not props["number"] and not props["street"]:
                empties += 1
            if not feature["geometry"] or not feature["geometry"]["coordinates"][0] or not feature["geometry"]["coordinates"][1]:
                empties += 1
            state = props["region"].upper()
            city = props["city"].upper().strip()
            if stateOverride:
                state = stateOverride
            if state in ignorestates:
                skippedcount += 1
                continue
            if feature["geometry"] is None:
                badcount += 1
                continue
            if not props["number"] or not props["street"] or props["number"] == "0":
                badcount += 1
                continue
            if citySuggestion and not city:
                city = citySuggestion
            if source == "OA/hawaii" and re.match(r"^[1-9][1-9][0-9]{4}", props["number"]):
                # Source is broken/missing, and the last good version has the house numbers without dashes
                # Hawaii has a specific and unique address numbering system
                props["number"] = props["number"][:2] + "-" + props["number"][2:]
            addr = normalize(props["number"], props["street"], props["unit"], city, state, props["postcode"], feature["geometry"]["coordinates"][1], feature["geometry"]["coordinates"][0], zipprefix, "", county)
            if addr["state"] in ignorestates:
                skippedcount += 1
                continue
            if addr["street"] == "":
                badcount += 1
                continue
            if not source:
                source = "OA/" + addr["state"]
            collected.append([addr['number'], addr['street'], addr['unit'], addr['city'], addr['state'], addr['zip'], addr['plus4'], addr['latitude'], addr['longitude'], source])
        except ValidationException:
            badcount += 1
        except Exception:
            traceback.print_exc()
            print("Error encountered while processing", raw)
            badcount += 1
    if parsed > 0 and empties / parsed > .95:
        print("\nWarning: Empty chunk! " + str(empties) + " of " + str(parsed) + " rows had no address.")
    frame = pd.DataFrame(data=collected, columns=["number", "street", "street2", "city", "state", "zip", "plus4", "latitude", "longitude", "source"])
    with writelock:
        frame.to_csv(outfilename, mode='a', index=False, header=not os.path.exists(outfilename), columns=["number", "street", "street2", "city", "state", "zip", "plus4", "latitude", "longitude", "source"])
    gc.collect()
def importOpenAddressFile(filepath, outfilename, ignorestates, source, stateOverride, zipprefix):
    """Stream an OpenAddresses GeoJSON(.gz) file and fan batches of lines out
    to processOpenAddressRows workers."""
    global badcount, skippedcount
    cfg = src.config.get_config()
    print("Importing OpenAddresses data from " + filepath)
    batchsize = 1000
    total = 0
    if stateOverride:
        stateOverride = stateOverride.strip().upper()
    if filepath.endswith(".gz"):
        handle = gzip.open(filepath, 'rb')
    else:
        handle = open(filepath, 'r')
    county = False
    basename = filepath.split("/")[-1]
    if not source or source == "":
        # Derive the source tag from the filename, e.g. "hawaii-....geojson".
        source = "OA/" + basename.split("-")[0]
        if source.startswith("OA/statewide"):
            if stateOverride:
                source = source.replace("statewide", stateOverride)
            else:
                # Let the workers fill in "OA/<state>" per address.
                source = False
    citySuggestion = False
    if not cfg.citySuggestion and basename.startswith("city_of_"):
        # Set city suggestion using filename
        citySuggestion = re.sub(r'\d+', '', basename.split("-")[0].replace("city_of_", "").replace("_", " ").upper().strip())
    if basename.endswith("-addresses-county.geojson"):
        county = basename.split("-")[0].replace("_", " ").upper().strip()
        print("Detected county from filename: " + county + ", will use for ZIP Code hinting")
    batch = []
    in_flight = set()
    with concurrent.futures.ProcessPoolExecutor(max_workers=maxthreads, mp_context=get_context("spawn"), max_tasks_per_child=1000, initializer=init_worker, initargs=(cfg,)) as executor:
        for raw in handle:
            batch.append(raw)
            total += 1
            if len(batch) >= batchsize:
                # Back-pressure: don't queue more than MAX_IN_FLIGHT batches.
                while len(in_flight) >= MAX_IN_FLIGHT:
                    done, in_flight = concurrent.futures.wait(in_flight, return_when=concurrent.futures.FIRST_COMPLETED)
                    for fut in done:
                        fut.result()
                in_flight.add(executor.submit(processOpenAddressRows, batch, total, outfilename, ignorestates, source, stateOverride, zipprefix, citySuggestion, county))
                batch = []
        # Flush the final (possibly empty) partial batch.
        in_flight.add(executor.submit(processOpenAddressRows, batch, total, outfilename, ignorestates, source, stateOverride, zipprefix, citySuggestion, county))
        for fut in concurrent.futures.as_completed(in_flight):
            fut.result()
    handle.close()
    print("\nDone importing OpenAddresses! Processed " + str(total) + " entries.")
    print("There were " + str(badcount) + " unprocessable addresses.")
    if ignorestates:
        print("There were " + str(skippedcount) + " addresses ignored due to your --ignorestates setting.")
    print("Saved to output file " + outfilename)
    return
def importOSMFile(filename, outfilename):
    """
    Import an OSM Overpass CSV export and append normalized rows to outfilename.

    Overpass API query for data input (replace name=Montana with the region you want):
    [out:csv(::"lat", ::"lon", "addr:housenumber", "addr:street", "addr:city", "addr:state", "addr:postcode")][timeout:120];
    area["name"="Montana"]->.boundaryarea;
    node["addr:housenumber"]["addr:street"](area.boundaryarea);
    out;
    way["addr:housenumber"]["addr:street"](area.boundaryarea);
    out center;
    relation["addr:housenumber"]["addr:street"](area.boundaryarea);
    out center;
    """
    print("Importing OSM Overpass data from " + filename)
    columns = [
        "@lat",
        "@lon",
        "addr:housenumber",
        "addr:street",
        "addr:city",
        "addr:state",
        "addr:postcode"
    ]
    chunkcount = 0
    badcount = 0
    skippedcount = 0
    source = "OpenStreetMap.org. License: ODbL"
    # Overpass CSV output is tab-separated.
    for chunk in pd.read_csv(filename, sep='\t', chunksize=100, usecols=columns, keep_default_na=False, dtype="str"):
        print(" " + str(chunkcount * 100) + " ", end="\r", flush=True)
        collected = []
        for _, rec in chunk.iterrows():
            try:
                addr = normalize(rec["addr:housenumber"], rec["addr:street"], "", rec["addr:city"], rec["addr:state"], rec["addr:postcode"], rec["@lat"], rec["@lon"])
                collected.append([addr['number'], addr['street'], addr['unit'], addr['city'], addr['state'], addr['zip'], addr['plus4'], addr['latitude'], addr['longitude'], source])
            except ValidationException:
                badcount += 1
            except Exception:
                print("W: Couldn't ingest address:")
                print(rec)
                traceback.print_exc()
                badcount += 1
        frame = pd.DataFrame(data=collected, columns=["number", "street", "street2", "city", "state", "zip", "plus4", "latitude", "longitude", "source"])
        frame.to_csv(outfilename, mode='a', index=False, header=not os.path.exists(outfilename), columns=["number", "street", "street2", "city", "state", "zip", "plus4", "latitude", "longitude", "source"])
        chunkcount += 1
    print("\nDone importing OSM! Processed " + str(chunkcount) + " chunks.")
    print("There were " + str(badcount) + " unprocessable addresses.")
    print("Saved to output file " + outfilename)
def importNARFile(filename, outfilename):
    """Import the Statistics Canada National Address Register from its
    distribution zip: join the per-province Address and Location tables on
    LOC_GUID (via dask), then stream the merged rows into outfilename."""
    print("Importing Statistics Canada data from " + filename)
    zf = zipfile.ZipFile(filename, mode="r")
    # Bucket the per-province CSV members by their province code prefix.
    provinceCodes = [10, 11, 12, 13, 24, 35, 46, 47, 48, 59, 60, 61, 62]
    addressFileList = {str(c): [] for c in provinceCodes}
    locationFileList = {str(c): [] for c in provinceCodes}
    for member in zf.namelist():
        if member.startswith("Addresses/Address_") and member.endswith(".csv"):
            code = member.replace("Addresses/Address_", "").replace(".csv", "").split("_")[0]
            addressFileList[code].append(member)
        elif member.startswith("Locations/Location_") and member.endswith(".csv"):
            code = member.replace("Locations/Location_", "").replace(".csv", "").split("_")[0]
            locationFileList[code].append(member)
    print("\nMerging address and location tables...")
    mergecount = 0
    dataframes = []
    addrcols = ["LOC_GUID", "APT_NO_LABEL", "CIVIC_NO", "CIVIC_NO_SUFFIX", "MAIL_STREET_NAME", "MAIL_STREET_TYPE", "MAIL_STREET_DIR", "MAIL_MUN_NAME", "MAIL_PROV_ABVN", "MAIL_POSTAL_CODE", "BU_N_CIVIC_ADD"]
    loccols = ["LOC_GUID", "BG_LATITUDE", "BG_LONGITUDE"]
    for provinceId in provinceCodes:
        print(" " + str(mergecount + 1) + " ", end="\r", flush=True)
        addrframes = [dd.read_csv("zip://" + af, storage_options={'fo': filename}, usecols=addrcols, keep_default_na=False, dtype="str") for af in addressFileList[str(provinceId)]]
        locframes = [dd.read_csv("zip://" + lf, storage_options={'fo': filename}, usecols=loccols, keep_default_na=False, dtype="str") for lf in locationFileList[str(provinceId)]]
        addressFrame = dd.concat(addrframes, ignore_index=False)
        locationFrame = dd.concat(locframes, ignore_index=False)
        dataframes.append(dd.merge(addressFrame, locationFrame, on=["LOC_GUID"]))
        mergecount += 1
    print("\nProcessing addresses...")
    alladdrcount = 0
    skippedcount = 0
    source = "StatsCan NAR"
    outcols = ["number", "street", "street2", "city", "state", "zip", "plus4", "latitude", "longitude", "source"]

    def _flush(pending):
        # Append the buffered rows to the output CSV (header only once).
        frame = pd.DataFrame(data=pending, columns=outcols)
        frame.to_csv(outfilename, mode='a', index=False, header=not os.path.exists(outfilename), columns=outcols)

    for provinceIndex, df in enumerate(dataframes):
        print("\nProcessing province ID " + str(provinceCodes[provinceIndex]))
        pending = []
        addrcount = 0
        for _, row in df.iterrows():
            if addrcount % 100 == 0:
                print(" " + str(addrcount) + " ", end="\r", flush=True)
            # NOTE(review): components are joined with a single space — confirm
            # the civic number + suffix separator against the NAR spec.
            housenumber = (" ".join(filter(None, [row["CIVIC_NO"], row["CIVIC_NO_SUFFIX"]]))).strip().upper()
            streetname = (" ".join(filter(None, [row["MAIL_STREET_NAME"], row["MAIL_STREET_TYPE"], row["MAIL_STREET_DIR"]]))).strip().upper()
            unit = row["APT_NO_LABEL"].strip().upper()
            if streetname == "":
                # PO BOX probably
                if row["BU_N_CIVIC_ADD"].startswith("PO BOX"):
                    pending.append([row["BU_N_CIVIC_ADD"].replace("PO BOX", "").strip(), "PO BOX", "", row["MAIL_MUN_NAME"], row["MAIL_PROV_ABVN"], row["MAIL_POSTAL_CODE"], "", row["BG_LATITUDE"], row["BG_LONGITUDE"], source])
                else:
                    skippedcount += 1
            else:
                pending.append([housenumber, streetname, unit, row["MAIL_MUN_NAME"], row["MAIL_PROV_ABVN"], row["MAIL_POSTAL_CODE"], "", row["BG_LATITUDE"], row["BG_LONGITUDE"], source])
            addrcount += 1
            if len(pending) >= 1000:  # Dump to file so we don't use tons of RAM
                _flush(pending)
                pending = []
        _flush(pending)
        alladdrcount += addrcount
    print("\nDone importing NAR! Processed " + str(alladdrcount) + " addresses.")
    print("Skipped " + str(skippedcount) + " invalid mailing addresses.")
    print("Saved to output file " + outfilename)
def removeDupes(filepath):
    """Drop duplicate and incomplete address rows from a combined CSV,
    streaming the survivors into <filepath>.dedup.csv.

    A row is incomplete if any of zip/number/street/city/state/latitude/
    longitude is empty.  Among duplicates (same number/street/street2/city/
    state/zip), the row that carries a ZIP+4 is preferred.
    """
    print("Removing duplicate and incomplete addresses from " + filepath)
    chunkcount = 0
    chunksize = 20000000
    outpath = filepath + ".dedup.csv"
    # BUG FIX: "plus4" must be included in usecols — the sort below uses it,
    # and the original column list omitted it, raising a KeyError.
    for chunk in pd.read_csv(filepath, chunksize=chunksize, keep_default_na=False, dtype="str", usecols=["number", "street", "street2", "city", "state", "zip", "plus4", "latitude", "longitude", "source"]):
        print(".", end="", flush=True)
        # Empty strings become NaN so dropna can discard incomplete rows.
        chunk.replace('', None, inplace=True)
        chunk.dropna(subset=['zip', 'number', 'street', 'city', 'state', 'latitude', 'longitude'], inplace=True)
        chunk.sort_values(by="plus4", ascending=False, inplace=True, na_position="last")  # Make sure the address duplicate with a +4 is kept
        chunk.drop_duplicates(subset=["number", "street", "street2", "city", "state", "zip"], keep="first", inplace=True)
        # NOTE(review): plus4 is intentionally not written to the dedup output
        # (matches the original column list) — confirm downstream consumers
        # don't expect it.
        chunk.to_csv(outpath, mode='a', index=False, header=not os.path.exists(outpath), columns=["number", "street", "street2", "city", "state", "zip", "latitude", "longitude", "source"])
        chunkcount = chunkcount + 1
    print("\nDone removing duplicates from " + filepath + "! Processed " + str(chunkcount) + " chunks of " + str(chunksize) + " records.")
def tosqlite(addressfile, dbfile):
    """Import an ingested address CSV (optionally gzipped) into SQLite.

    Creates the `addresses` table and its indexes if needed, then streams the
    CSV in 5000-row chunks through a staging table (`addresses_temp`) and
    merges with INSERT OR IGNORE so the UNIQUE constraint de-duplicates
    across input files. Records with house number "0" are removed at the end.

    Args:
        addressfile: path to a CSV (or .gz CSV) produced by this script.
        dbfile: path of the SQLite database file to create or append to.

    Returns:
        Total number of rows changed (rows merged in plus rows deleted).
    """
    global countrycode  # read-only here: country code written into each row
    cfg = src.config.get_config()
    print("\nReading addresses from " + addressfile)
    if addressfile.endswith(".gz"):
        # pandas reads transparently from a binary gzip file object
        file = gzip.open(addressfile, 'rb')
    else:
        file = open(addressfile, 'r')
    connection = sqlite3.connect(dbfile)
    cursor = connection.cursor()
    cursor.execute("""CREATE TABLE IF NOT EXISTS `addresses` (
        `zipcode` VARCHAR(6) NOT NULL,
        `number` VARCHAR(30) NOT NULL,
        `street` VARCHAR(200) NOT NULL,
        `street2` VARCHAR(20),
        `city` VARCHAR(50) NOT NULL,
        `state` CHAR(2) NOT NULL,
        `plus4` CHAR(4),
        `country` CHAR(2) NOT NULL DEFAULT "US",
        `latitude` DECIMAL(8,6) NOT NULL,
        `longitude` DECIMAL(9,6) NOT NULL,
        `source` VARCHAR(40),
        UNIQUE(zipcode, number, street, street2, country)
    )""")
    cursor.execute("DROP TABLE IF EXISTS `addresses_temp`")
    # Staging table: identical column order to `addresses` so that
    # "INSERT ... SELECT *" below lines the columns up positionally.
    cursor.execute("""CREATE TABLE IF NOT EXISTS `addresses_temp` (
        `zipcode` CHAR(6) NOT NULL,
        `number` VARCHAR(30) NOT NULL,
        `street` VARCHAR(200) NOT NULL,
        `street2` VARCHAR(20),
        `city` VARCHAR(50) NOT NULL,
        `state` CHAR(2) NOT NULL,
        `plus4` CHAR(4),
        `country` CHAR(2) NOT NULL DEFAULT "US",
        `latitude` DECIMAL(8,6) NOT NULL,
        `longitude` DECIMAL(9,6) NOT NULL,
        `source` VARCHAR(40)
    )""")
    cursor.execute("""CREATE INDEX IF NOT EXISTS `latitude_longitude` ON `addresses` (
        `latitude`,
        `longitude`
    )""")
    cursor.execute("""CREATE INDEX IF NOT EXISTS `number_street` ON `addresses` (
        `number`,
        `street`
    )""")
    cursor.execute("""CREATE INDEX IF NOT EXISTS `state_city` ON `addresses` (
        `state`,
        `city`
    )""")
    cursor.execute("""CREATE INDEX IF NOT EXISTS `zipcode_number` ON `addresses` (
        `zipcode`,
        `number`
    )""")
    cursor.execute("""CREATE INDEX IF NOT EXISTS `country` ON `addresses` (
        `country`
    )""")
    chunksize = 5000
    chunkcount = 0
    rowschanged = 0
    columns = ["number", "street", "street2", "city", "state", "zip", "latitude", "longitude", "source"]
    if cfg.appendPlus4:
        columns.append("plus4")
    try:
        for chunk in pd.read_csv(file, chunksize=chunksize, usecols=columns, keep_default_na=False, dtype="str"):
            chunk = chunk.rename(columns={'zip': 'zipcode'})
            chunk.insert(7, "country", countrycode)
            # Replace empty values with NULL
            chunk.replace('', None, inplace=True)
            # Replace null street2 with empty string so the SQLite UNIQUE clause will work
            chunk.fillna({"street2": ""}, inplace=True)
            # Remove null values that aren't allowed
            chunk.dropna(subset=['zipcode', 'number', 'street', 'city', 'state', 'latitude', 'longitude'], inplace=True)
            print(" " + str(chunkcount * chunksize) + " ", end="\r", flush=True)
            # Stage the chunk, then merge; INSERT OR IGNORE silently skips
            # rows that violate the UNIQUE constraint in `addresses`.
            cursor.execute("DELETE FROM addresses_temp")
            chunk.to_sql("addresses_temp", connection, if_exists='append', index=False, dtype={
                "zipcode": "CHAR(6)",
                "number": "VARCHAR(30)",
                "street": "VARCHAR(200)",
                "street2": "VARCHAR(20)",
                "city": "VARCHAR(50)",
                "state": "CHAR(2)",
                "plus4": "CHAR(4)",
                "country": "CHAR(2)",
                "latitude": "DECIMAL(8,6)",
                "longitude": "DECIMAL(9,6)",
                "source": "VARCHAR(40)"
            })
            chunkcount = chunkcount + 1
            cursor.execute("INSERT OR IGNORE INTO addresses SELECT * FROM addresses_temp")
            rowschanged = rowschanged + cursor.rowcount
            if chunkcount % 5000 == 0:  # VACUUM every 25 million records (5000 chunks x 5000 rows)
                print("Optimizing database...", end="\r", flush=True)
                connection.executescript("VACUUM")
                print("                      ", end="\r", flush=True)
    finally:
        # Bug fix: the input file handle was previously never closed
        file.close()
    # executescript implicitly commits any pending transaction first
    connection.executescript("DROP TABLE addresses_temp")
    # Drop house number "0" rows (presumably placeholder/unusable records --
    # matches the prior behavior; single quotes used for a proper SQL string
    # literal instead of relying on SQLite's double-quote fallback).
    cursor.execute("DELETE FROM addresses WHERE number='0'")
    rowschanged = rowschanged + cursor.rowcount
    # Bug fix: commit explicitly. Without this, the DELETE above (and any
    # merge work since the last implicit commit) was rolled back on close
    # whenever the VACUUM branch below didn't run.
    connection.commit()
    if rowschanged > 10000000:
        print("\nOptimizing database...")
        connection.executescript("VACUUM; ANALYZE; PRAGMA optimize;")
    print("Done converting to SQLite! Processed " + str(chunkcount) + " chunks (" + str(chunksize) + " records per chunk).")
    print(str(rowschanged) + " records inserted.")
    connection.close()
    print("Saved to output file " + dbfile)
    return rowschanged
if __name__ == "__main__":
    # NOTE(review): this block previously contained stray VCS timestamp lines
    # (apparently from a bad merge) that made the file a SyntaxError; removed.
    try:
        parser = argparse.ArgumentParser(
            description="Tools to build a standardized U.S. address database from free source data."
        )
        parser.add_argument("file", help="Address file(s) to process.", nargs='+')
        parser.add_argument("--outputfile", help="Filename to output address data to. If unspecified, set to \"./data/out.csv\" or \"./data/out.sqlite\", depending on options set.")
        parser.add_argument(
            "--filetype",
            help="Type of address file to ingest. nad=National Address Database, oa=OpenAddresses, adb=CSV created by this script, osm=OpenStreetMap Overpass API (see main.py source for query to use), nar=Statistics Canada National Address Register",
            choices=["nad", "oa", "adb", "osm", "nar"],
        )
        parser.add_argument("--state", help="Some OpenAddresses files don't have the state field set. Do it manually here.")
        parser.add_argument("--ignorestates", help="Comma-separated two-letter state names. Addresses in these states will be skipped over.")
        parser.add_argument("--onlystates", help="Comma-separated two-letter state names. Addresses NOT in these states will be skipped over.")
        parser.add_argument("--source", help="Set the data source name (OpenAddresses only). Autodetected based on filename if not set.")
        parser.add_argument("--dedup", help="Remove duplicate records in an already-ingested address file, and saves it to folder/file.dedup.csv. Only catches \"nearby\" duplicates; processes 20,000,000 records at a time.", action='store_true')
        parser.add_argument("--fixlatlon", help="Detect and repair flipped latitude/longitude pairs in an already-ingested address file, and saves it to [filename].coordfix.csv.", action='store_true')
        parser.add_argument("--tosqlite", help="Output to a SQLite3 database. Only works on output CSV data from this script.", action='store_true')
        parser.add_argument("--appendplus4", help="Append ZIP+4 data to all records. Fairly slow.", action='store_true')
        parser.add_argument("--appendunitlabel", help="Append unit label (APT, STE, etc) to unit numbers using ZIP+4 data.", action='store_true')
        parser.add_argument("--zipprefix", help="When searching for a ZIP, assume it starts with the digits provided for faster lookups.")
        parser.add_argument("-a", help="Allow appending to existing output file.", action='store_true')
        parser.add_argument("--cpu", help="Number of CPU cores to use for parallel processing.")
        parser.add_argument("--country", help="Two-letter country code. Default is US.")
        parser.add_argument("--city", help="City name to assume when there's no city or postal code in the source data. Useful for OpenAddresses city_of_ data files.")
        parser.add_argument("--startat", help="Skip to this line number in the input file (NAD)")
        parser.add_argument("--census", help="Enable looking up missing ZIP codes in the U.S. Census Geocoder when we have a full address, city, and state but no ZIP.", action='store_true')
        parser.add_argument("--libpostal", help="Use libpostal address parsing and expansions to match bad addresses to a ZIP+4. Automatically enables --appendplus4.", action='store_true')
        parser.add_argument("--noskip4", help="When processing own file format, don't skip normalizing records that have a ZIP+4 already.", action="store_true")
        parser.add_argument("-v", help="Verbose output (for development)", action="store_true")
        args = parser.parse_args()

        # Defaults for the run configuration captured into AppConfig below
        startAtLine = 0
        appendPlus4 = False
        appendUnitLabel = False
        useCensusToFillEmptyZIPs = False
        countryCode = "US"
        citySuggestion = False
        advancedMode = False
        noSkip4 = False
        verbose = False

        if args.v:
            verbose = True
            print("Verbose mode engaged!")
        if args.libpostal:
            advancedMode = True
            appendPlus4 = True  # --libpostal implies --appendplus4
        if advancedMode:
            # Imported lazily: libpostal is heavy and optional
            from src.advancedparsing import advancedNormalize
            print("Using libpostal to work harder on bad addresses.")
        if args.appendplus4:
            appendPlus4 = True
        if appendPlus4:
            print("Trying to match to ZIP+4 codes for every address!")
        if args.noskip4:
            noSkip4 = True
        if noSkip4:
            print("Also normalizing records that have a +4 in the input data.")
        if args.appendunitlabel:
            appendUnitLabel = True
        if args.census:
            useCensusToFillEmptyZIPs = True
        else:
            useCensusToFillEmptyZIPs = False
        if useCensusToFillEmptyZIPs:
            print("Census geocoder enabled! RIP your network maybe")
        statesToIgnore = []
        if args.ignorestates:
            # Strip everything but letters and the comma separators
            statesToIgnore = re.sub(r"[^a-zA-Z,]+", "", args.ignorestates.upper()).split(",")
        statesToKeep = []
        if args.onlystates:
            statesToKeep = re.sub(r"[^a-zA-Z,]+", "", args.onlystates.upper()).split(",")
        zipprefix = False
        if args.zipprefix:
            zipprefix = args.zipprefix
        if args.cpu:
            maxthreads = int(args.cpu)
        if args.country:
            if len(args.country) != 2:
                print("Invalid country code " + args.country + ", exiting.")
                sys.exit(1)
            countrycode = args.country.upper()
            countryCode = countrycode
        if args.startat and args.startat.isdigit():
            startAtLine = int(args.startat)
        if args.city:
            # Bug fix: was .toUpper(), which is not a Python str method
            citySuggestion = args.city.strip().upper()
        cfg = src.config.AppConfig(appendPlus4=appendPlus4, appendUnitLabel=appendUnitLabel, countryCode=countryCode, citySuggestion=citySuggestion, useCensusToFillEmptyZIPs=useCensusToFillEmptyZIPs, advancedMode=advancedMode, noSkip4=noSkip4, verbose=verbose)
        src.config.set_config(cfg)
        if args.dedup:
            with concurrent.futures.ProcessPoolExecutor(max_workers=maxthreads) as executor:
                for file in args.file:
                    executor.submit(removeDupes, file)
        elif args.fixlatlon:
            with concurrent.futures.ProcessPoolExecutor(max_workers=maxthreads) as executor:
                for file in args.file:
                    executor.submit(fixLatLon, file)
        elif args.tosqlite:
            outputfile = "./data/out.sqlite"
            if args.outputfile:
                outputfile = args.outputfile
            # Bug fix: test the resolved path; os.path.exists(args.outputfile)
            # raised TypeError whenever --outputfile was not supplied
            if args.a != True and os.path.exists(outputfile):
                print("Output file already exists, exiting!")
                sys.exit()
            rowschanged = 0
            filesimported = 0
            for file in args.file:
                rowschanged = rowschanged + tosqlite(file, outputfile)
                filesimported = filesimported + 1
            print("\nDone importing " + str(filesimported) + " files. " + str(rowschanged) + " records inserted.")
        elif args.file:
            outputfile = "./data/out.csv"
            if args.outputfile:
                outputfile = args.outputfile
            # Bug fix: test the resolved path, not args.outputfile (may be None)
            if args.a != True and os.path.exists(outputfile):
                print("Output file already exists, exiting!")
                sys.exit()
            if args.filetype == "nad":
                for file in args.file:
                    importNadFile(file, outputfile, statesToIgnore, statesToKeep, startAtLine)
            elif args.filetype == "adb":
                for file in args.file:
                    importOwnFile(file, outputfile, statesToIgnore, statesToKeep)
            elif args.filetype == "osm":
                for file in args.file:
                    importOSMFile(file, outputfile)
            elif args.filetype == "nar":
                countrycode = "CA"  # NAR data is Canadian
                for file in args.file:
                    importNARFile(file, outputfile)
            elif args.filetype == "oa":
                source = ""
                if args.source:
                    source = args.source
                for file in args.file:
                    importOpenAddressFile(file, outputfile, statesToIgnore, source, args.state, zipprefix)
    except KeyboardInterrupt:
        # Hard-exit so pool worker processes don't hang the shutdown
        os._exit(0)