Make it exit more cleanly on Ctrl-C

This commit is contained in:
Skylar Ittner 2025-11-28 23:39:56 -07:00
parent 8cd7e5c6f6
commit b37313adb4

319
main.py
View File

@@ -130,6 +130,8 @@ def processOwnChunk(chunk, chunkcount, outfilename, ignorestates, keeponlystates
data.append([addr['number'], addr['street'], addr['unit'], addr['city'], addr['state'], addr['zip'], addr['plus4'], addr['latitude'], addr['longitude'], row.source]) data.append([addr['number'], addr['street'], addr['unit'], addr['city'], addr['state'], addr['zip'], addr['plus4'], addr['latitude'], addr['longitude'], row.source])
except ValidationException as e: except ValidationException as e:
badcount = badcount + 1 badcount = badcount + 1
except KeyboardInterrupt:
os._exit(0)
except Exception as e: except Exception as e:
print("W: Couldn't ingest address:") print("W: Couldn't ingest address:")
print(row) print(row)
@@ -152,24 +154,29 @@ def importOwnFile(filename, outfilename, ignorestates, keeponlystates):
chunksize = 1000 chunksize = 1000
in_flight = set() in_flight = set()
with concurrent.futures.ProcessPoolExecutor(max_workers=maxthreads, max_tasks_per_child=100, initializer=init_worker, initargs=(cfg,)) as executor: with concurrent.futures.ProcessPoolExecutor(max_workers=maxthreads, max_tasks_per_child=100, initializer=init_worker, initargs=(cfg,)) as executor:
for chunk in pd.read_csv(file, chunksize=chunksize, usecols=columns, keep_default_na=False, dtype={ try:
"number":"string","street":"string", for chunk in pd.read_csv(file, chunksize=chunksize, usecols=columns, keep_default_na=False, dtype={
"street2":"string","city":"string", "number":"string","street":"string",
"state":"category", "zip":"string", "street2":"string","city":"string",
"plus4": "string", "state":"category", "zip":"string",
"latitude":"float32", "longitude":"float32", "plus4": "string",
"source":"category"}, dtype_backend="pyarrow"): "latitude":"float32", "longitude":"float32",
"source":"category"}, dtype_backend="pyarrow"):
while len(in_flight) >= MAX_IN_FLIGHT: while len(in_flight) >= MAX_IN_FLIGHT:
done, in_flight = concurrent.futures.wait(in_flight, return_when=concurrent.futures.FIRST_COMPLETED) done, in_flight = concurrent.futures.wait(in_flight, return_when=concurrent.futures.FIRST_COMPLETED)
for fut in done: for fut in done:
fut.result() fut.result()
fut = executor.submit(processOwnChunk, chunk, chunkcount * chunksize, outfilename, ignorestates, keeponlystates) fut = executor.submit(processOwnChunk, chunk, chunkcount * chunksize, outfilename, ignorestates, keeponlystates)
in_flight.add(fut) in_flight.add(fut)
chunkcount = chunkcount + 1 chunkcount = chunkcount + 1
for fut in concurrent.futures.as_completed(in_flight): for fut in concurrent.futures.as_completed(in_flight):
fut.result() fut.result()
except KeyboardInterrupt:
print("\nCtrl-C, exiting!")
executor.shutdown(cancel_futures=True)
sys.exit(0)
print("\nDone processing! Parsed " + str(chunkcount) + " chunks.") print("\nDone processing! Parsed " + str(chunkcount) + " chunks.")
print("There were " + str(badcount) + " unprocessable addresses.") print("There were " + str(badcount) + " unprocessable addresses.")
@@ -647,154 +654,158 @@ def tosqlite(addressfile, dbfile):
return rowschanged return rowschanged
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser( try:
description="Tools to build a standardized U.S. address database from free source data." parser = argparse.ArgumentParser(
) description="Tools to build a standardized U.S. address database from free source data."
parser.add_argument("file", help="Address file(s) to process.", nargs='+') )
parser.add_argument("--outputfile", help="Filename to output address data to. If unspecified, set to \"./data/out.csv\" or \"./data/out.sqlite\", depending on options set.") parser.add_argument("file", help="Address file(s) to process.", nargs='+')
parser.add_argument( parser.add_argument("--outputfile", help="Filename to output address data to. If unspecified, set to \"./data/out.csv\" or \"./data/out.sqlite\", depending on options set.")
"--filetype", parser.add_argument(
help="Type of address file to ingest. nad=National Address Database, oa=OpenAddresses, adb=CSV created by this script, osm=OpenStreetMap Overpass API (see main.py source for query to use), nar=Statistics Canada National Address Register", "--filetype",
choices=["nad", "oa", "adb", "osm", "nar"], help="Type of address file to ingest. nad=National Address Database, oa=OpenAddresses, adb=CSV created by this script, osm=OpenStreetMap Overpass API (see main.py source for query to use), nar=Statistics Canada National Address Register",
) choices=["nad", "oa", "adb", "osm", "nar"],
parser.add_argument("--state", help="Some OpenAddresses files don't have the state field set. Do it manually here.") )
parser.add_argument("--ignorestates", help="Comma-separated two-letter state names. Addresses in these states will be skipped over.") parser.add_argument("--state", help="Some OpenAddresses files don't have the state field set. Do it manually here.")
parser.add_argument("--onlystates", help="Comma-separated two-letter state names. Addresses NOT in these states will be skipped over.") parser.add_argument("--ignorestates", help="Comma-separated two-letter state names. Addresses in these states will be skipped over.")
parser.add_argument("--source", help="Set the data source name (OpenAddresses only). Autodetected based on filename if not set.") parser.add_argument("--onlystates", help="Comma-separated two-letter state names. Addresses NOT in these states will be skipped over.")
parser.add_argument("--dedup", help="Remove duplicate records in an already-ingested address file, and saves it to folder/file.dedup.csv. Only catches \"nearby\" duplicates; processes 20,000,000 records at a time.", action='store_true') parser.add_argument("--source", help="Set the data source name (OpenAddresses only). Autodetected based on filename if not set.")
parser.add_argument("--fixlatlon", help="Detect and repair flipped latitude/longitude pairs in an already-ingested address file, and saves it to [filename].coordfix.csv.", action='store_true') parser.add_argument("--dedup", help="Remove duplicate records in an already-ingested address file, and saves it to folder/file.dedup.csv. Only catches \"nearby\" duplicates; processes 20,000,000 records at a time.", action='store_true')
parser.add_argument("--tosqlite", help="Output to a SQLite3 database. Only works on output CSV data from this script.", action='store_true') parser.add_argument("--fixlatlon", help="Detect and repair flipped latitude/longitude pairs in an already-ingested address file, and saves it to [filename].coordfix.csv.", action='store_true')
parser.add_argument("--appendplus4", help="Append ZIP+4 data to all records. Fairly slow.", action='store_true') parser.add_argument("--tosqlite", help="Output to a SQLite3 database. Only works on output CSV data from this script.", action='store_true')
parser.add_argument("--appendunitlabel", help="Append unit label (APT, STE, etc) to unit numbers using ZIP+4 data.", action='store_true') parser.add_argument("--appendplus4", help="Append ZIP+4 data to all records. Fairly slow.", action='store_true')
parser.add_argument("--zipprefix", help="When searching for a ZIP, assume it starts with the digits provided for faster lookups.") parser.add_argument("--appendunitlabel", help="Append unit label (APT, STE, etc) to unit numbers using ZIP+4 data.", action='store_true')
parser.add_argument("-a", help="Allow appending to existing output file.", action='store_true') parser.add_argument("--zipprefix", help="When searching for a ZIP, assume it starts with the digits provided for faster lookups.")
parser.add_argument("--cpu", help="Number of CPU cores to use for parallel processing.") parser.add_argument("-a", help="Allow appending to existing output file.", action='store_true')
parser.add_argument("--country", help="Two-letter country code. Default is US.") parser.add_argument("--cpu", help="Number of CPU cores to use for parallel processing.")
parser.add_argument("--city", help="City name to assume when there's no city or postal code in the source data. Useful for OpenAddresses city_of_ data files.") parser.add_argument("--country", help="Two-letter country code. Default is US.")
parser.add_argument("--startat", help="Skip to this line number in the input file (NAD)") parser.add_argument("--city", help="City name to assume when there's no city or postal code in the source data. Useful for OpenAddresses city_of_ data files.")
parser.add_argument("--census", help="Enable looking up missing ZIP codes in the U.S. Census Geocoder when we have a full address, city, and state but no ZIP.", action='store_true') parser.add_argument("--startat", help="Skip to this line number in the input file (NAD)")
parser.add_argument("--libpostal", help="Use libpostal address parsing and expansions to match bad addresses to a ZIP+4. Automatically enables --appendplus4.", action='store_true') parser.add_argument("--census", help="Enable looking up missing ZIP codes in the U.S. Census Geocoder when we have a full address, city, and state but no ZIP.", action='store_true')
parser.add_argument("--noskip4", help="When processing own file format, don't skip normalizing records that have a ZIP+4 already.", action="store_true") parser.add_argument("--libpostal", help="Use libpostal address parsing and expansions to match bad addresses to a ZIP+4. Automatically enables --appendplus4.", action='store_true')
parser.add_argument("-v", help="Verbose output (for development)", action="store_true") parser.add_argument("--noskip4", help="When processing own file format, don't skip normalizing records that have a ZIP+4 already.", action="store_true")
parser.add_argument("-v", help="Verbose output (for development)", action="store_true")
args = parser.parse_args() args = parser.parse_args()
startAtLine = 0 startAtLine = 0
appendPlus4 = False appendPlus4 = False
appendUnitLabel = False appendUnitLabel = False
useCensusToFillEmptyZIPs = False
countryCode = "US"
citySuggestion = False
advancedMode = False
noSkip4 = False
verbose = False
if args.v:
verbose = True
print("Verbose mode engaged!")
if args.libpostal:
advancedMode = True
appendPlus4 = True
if advancedMode:
from src.advancedparsing import advancedNormalize
print("Using libpostal to work harder on bad addresses.")
if args.appendplus4:
appendPlus4 = True
if appendPlus4:
print("Trying to match to ZIP+4 codes for every address!")
if args.noskip4:
noSkip4 = True
if noSkip4:
print("Also normalizing records that have a +4 in the input data.")
if args.appendunitlabel:
appendUnitLabel = True
if args.census:
useCensusToFillEmptyZIPs = True
else:
useCensusToFillEmptyZIPs = False useCensusToFillEmptyZIPs = False
if useCensusToFillEmptyZIPs: countryCode = "US"
print("Census geocoder enabled! RIP your network maybe") citySuggestion = False
advancedMode = False
noSkip4 = False
verbose = False
statesToIgnore = [] if args.v:
if args.ignorestates: verbose = True
statesToIgnore = re.sub(r"[^a-zA-Z,]+", "", args.ignorestates.upper()).split(",") print("Verbose mode engaged!")
statesToKeep = []
if args.onlystates:
statesToKeep = re.sub(r"[^a-zA-Z,]+", "", args.onlystates.upper()).split(",")
zipprefix = False if args.libpostal:
if args.zipprefix: advancedMode = True
zipprefix = args.zipprefix appendPlus4 = True
if advancedMode:
from src.advancedparsing import advancedNormalize
print("Using libpostal to work harder on bad addresses.")
if args.cpu: if args.appendplus4:
maxthreads = int(args.cpu) appendPlus4 = True
if appendPlus4:
print("Trying to match to ZIP+4 codes for every address!")
if args.country: if args.noskip4:
if len(args.country) != 2: noSkip4 = True
print("Invalid country code " + args.country + ", exiting.") if noSkip4:
sys.exit(1) print("Also normalizing records that have a +4 in the input data.")
countrycode = args.country.upper()
countryCode = countrycode
if args.startat and args.startat.isdigit():
startAtLine = int(args.startat)
if args.city: if args.appendunitlabel:
citySuggestion = args.city.strip().toUpper() appendUnitLabel = True
cfg = src.config.AppConfig(appendPlus4=appendPlus4, appendUnitLabel=appendUnitLabel, countryCode=countryCode, citySuggestion=citySuggestion, useCensusToFillEmptyZIPs=useCensusToFillEmptyZIPs, advancedMode=advancedMode, noSkip4=noSkip4, verbose=verbose) if args.census:
useCensusToFillEmptyZIPs = True
else:
useCensusToFillEmptyZIPs = False
if useCensusToFillEmptyZIPs:
print("Census geocoder enabled! RIP your network maybe")
src.config.set_config(cfg) statesToIgnore = []
if args.ignorestates:
statesToIgnore = re.sub(r"[^a-zA-Z,]+", "", args.ignorestates.upper()).split(",")
statesToKeep = []
if args.onlystates:
statesToKeep = re.sub(r"[^a-zA-Z,]+", "", args.onlystates.upper()).split(",")
if args.dedup: zipprefix = False
with concurrent.futures.ProcessPoolExecutor(max_workers=maxthreads) as executor: if args.zipprefix:
zipprefix = args.zipprefix
if args.cpu:
maxthreads = int(args.cpu)
if args.country:
if len(args.country) != 2:
print("Invalid country code " + args.country + ", exiting.")
sys.exit(1)
countrycode = args.country.upper()
countryCode = countrycode
if args.startat and args.startat.isdigit():
startAtLine = int(args.startat)
if args.city:
citySuggestion = args.city.strip().toUpper()
cfg = src.config.AppConfig(appendPlus4=appendPlus4, appendUnitLabel=appendUnitLabel, countryCode=countryCode, citySuggestion=citySuggestion, useCensusToFillEmptyZIPs=useCensusToFillEmptyZIPs, advancedMode=advancedMode, noSkip4=noSkip4, verbose=verbose)
src.config.set_config(cfg)
if args.dedup:
with concurrent.futures.ProcessPoolExecutor(max_workers=maxthreads) as executor:
for file in args.file:
executor.submit(removeDupes, file)
elif args.fixlatlon:
with concurrent.futures.ProcessPoolExecutor(max_workers=maxthreads) as executor:
for file in args.file:
executor.submit(fixLatLon, file)
elif args.tosqlite:
outputfile = "./data/out.sqlite"
if args.outputfile:
outputfile = args.outputfile
if args.a != True and os.path.exists(args.outputfile):
print("Output file already exists, exiting!")
sys.exit()
rowschanged = 0
filesimported = 0
for file in args.file: for file in args.file:
executor.submit(removeDupes, file) rowschanged = rowschanged + tosqlite(file, outputfile)
elif args.fixlatlon: filesimported = filesimported + 1
with concurrent.futures.ProcessPoolExecutor(max_workers=maxthreads) as executor: print("\nDone importing " + str(filesimported) + " files. " + str(rowschanged) + " records inserted.")
for file in args.file: elif args.file:
executor.submit(fixLatLon, file) outputfile = "./data/out.csv"
elif args.tosqlite: if args.outputfile:
outputfile = "./data/out.sqlite" outputfile = args.outputfile
if args.outputfile: if args.a != True and os.path.exists(args.outputfile):
outputfile = args.outputfile print("Output file already exists, exiting!")
if args.a != True and os.path.exists(args.outputfile): sys.exit()
print("Output file already exists, exiting!") if args.filetype == "nad":
sys.exit() for file in args.file:
rowschanged = 0 importNadFile(file, outputfile, statesToIgnore, statesToKeep, startAtLine)
filesimported = 0 elif args.filetype == "adb":
for file in args.file: for file in args.file:
rowschanged = rowschanged + tosqlite(file, outputfile) importOwnFile(file, outputfile, statesToIgnore, statesToKeep)
filesimported = filesimported + 1 elif args.filetype == "osm":
print("\nDone importing " + str(filesimported) + " files. " + str(rowschanged) + " records inserted.") for file in args.file:
elif args.file: importOSMFile(file, outputfile)
outputfile = "./data/out.csv" elif args.filetype == "nar":
if args.outputfile: countrycode = "CA"
outputfile = args.outputfile for file in args.file:
if args.a != True and os.path.exists(args.outputfile): importNARFile(file, outputfile)
print("Output file already exists, exiting!") elif args.filetype == "oa":
sys.exit() source = ""
if args.filetype == "nad": if args.source:
for file in args.file: source = args.source
importNadFile(file, outputfile, statesToIgnore, statesToKeep, startAtLine) for file in args.file:
elif args.filetype == "adb": importOpenAddressFile(file, outputfile, statesToIgnore, source, args.state, zipprefix)
for file in args.file: except KeyboardInterrupt:
importOwnFile(file, outputfile, statesToIgnore, statesToKeep) os._exit(0)
elif args.filetype == "osm":
for file in args.file:
importOSMFile(file, outputfile)
elif args.filetype == "nar":
countrycode = "CA"
for file in args.file:
importNARFile(file, outputfile)
elif args.filetype == "oa":
source = ""
if args.source:
source = args.source
for file in args.file:
importOpenAddressFile(file, outputfile, statesToIgnore, source, args.state, zipprefix)