diff --git a/main.py b/main.py index 738551e..640e37e 100755 --- a/main.py +++ b/main.py @@ -130,6 +130,8 @@ def processOwnChunk(chunk, chunkcount, outfilename, ignorestates, keeponlystates data.append([addr['number'], addr['street'], addr['unit'], addr['city'], addr['state'], addr['zip'], addr['plus4'], addr['latitude'], addr['longitude'], row.source]) except ValidationException as e: badcount = badcount + 1 + except KeyboardInterrupt: + os._exit(0) except Exception as e: print("W: Couldn't ingest address:") print(row) @@ -152,24 +154,29 @@ def importOwnFile(filename, outfilename, ignorestates, keeponlystates): chunksize = 1000 in_flight = set() with concurrent.futures.ProcessPoolExecutor(max_workers=maxthreads, max_tasks_per_child=100, initializer=init_worker, initargs=(cfg,)) as executor: - for chunk in pd.read_csv(file, chunksize=chunksize, usecols=columns, keep_default_na=False, dtype={ - "number":"string","street":"string", - "street2":"string","city":"string", - "state":"category", "zip":"string", - "plus4": "string", - "latitude":"float32", "longitude":"float32", - "source":"category"}, dtype_backend="pyarrow"): + try: + for chunk in pd.read_csv(file, chunksize=chunksize, usecols=columns, keep_default_na=False, dtype={ + "number":"string","street":"string", + "street2":"string","city":"string", + "state":"category", "zip":"string", + "plus4": "string", + "latitude":"float32", "longitude":"float32", + "source":"category"}, dtype_backend="pyarrow"): + + while len(in_flight) >= MAX_IN_FLIGHT: + done, in_flight = concurrent.futures.wait(in_flight, return_when=concurrent.futures.FIRST_COMPLETED) + for fut in done: + fut.result() + fut = executor.submit(processOwnChunk, chunk, chunkcount * chunksize, outfilename, ignorestates, keeponlystates) + in_flight.add(fut) + chunkcount = chunkcount + 1 - while len(in_flight) >= MAX_IN_FLIGHT: - done, in_flight = concurrent.futures.wait(in_flight, return_when=concurrent.futures.FIRST_COMPLETED) - for fut in done: - fut.result() - fut = executor.submit(processOwnChunk, chunk, chunkcount * chunksize, outfilename, ignorestates, keeponlystates) - in_flight.add(fut) - chunkcount = chunkcount + 1 - - for fut in concurrent.futures.as_completed(in_flight): - fut.result() + for fut in concurrent.futures.as_completed(in_flight): + fut.result() + except KeyboardInterrupt: + print("\nCtrl-C, exiting!") + executor.shutdown(cancel_futures=True) + sys.exit(0) print("\nDone processing! Parsed " + str(chunkcount) + " chunks.") print("There were " + str(badcount) + " unprocessable addresses.") @@ -647,154 +654,158 @@ def tosqlite(addressfile, dbfile): return rowschanged if __name__ == "__main__": - parser = argparse.ArgumentParser( - description="Tools to build a standardized U.S. address database from free source data." - ) - parser.add_argument("file", help="Address file(s) to process.", nargs='+') - parser.add_argument("--outputfile", help="Filename to output address data to. If unspecified, set to \"./data/out.csv\" or \"./data/out.sqlite\", depending on options set.") - parser.add_argument( - "--filetype", - help="Type of address file to ingest. nad=National Address Database, oa=OpenAddresses, adb=CSV created by this script, osm=OpenStreetMap Overpass API (see main.py source for query to use), nar=Statistics Canada National Address Register", - choices=["nad", "oa", "adb", "osm", "nar"], - ) - parser.add_argument("--state", help="Some OpenAddresses files don't have the state field set. Do it manually here.") - parser.add_argument("--ignorestates", help="Comma-separated two-letter state names. Addresses in these states will be skipped over.") - parser.add_argument("--onlystates", help="Comma-separated two-letter state names. Addresses NOT in these states will be skipped over.") - parser.add_argument("--source", help="Set the data source name (OpenAddresses only). Autodetected based on filename if not set.") - parser.add_argument("--dedup", help="Remove duplicate records in an already-ingested address file, and saves it to folder/file.dedup.csv. Only catches \"nearby\" duplicates; processes 20,000,000 records at a time.", action='store_true') - parser.add_argument("--fixlatlon", help="Detect and repair flipped latitude/longitude pairs in an already-ingested address file, and saves it to [filename].coordfix.csv.", action='store_true') - parser.add_argument("--tosqlite", help="Output to a SQLite3 database. Only works on output CSV data from this script.", action='store_true') - parser.add_argument("--appendplus4", help="Append ZIP+4 data to all records. Fairly slow.", action='store_true') - parser.add_argument("--appendunitlabel", help="Append unit label (APT, STE, etc) to unit numbers using ZIP+4 data.", action='store_true') - parser.add_argument("--zipprefix", help="When searching for a ZIP, assume it starts with the digits provided for faster lookups.") - parser.add_argument("-a", help="Allow appending to existing output file.", action='store_true') - parser.add_argument("--cpu", help="Number of CPU cores to use for parallel processing.") - parser.add_argument("--country", help="Two-letter country code. Default is US.") - parser.add_argument("--city", help="City name to assume when there's no city or postal code in the source data. Useful for OpenAddresses city_of_ data files.") - parser.add_argument("--startat", help="Skip to this line number in the input file (NAD)") - parser.add_argument("--census", help="Enable looking up missing ZIP codes in the U.S. Census Geocoder when we have a full address, city, and state but no ZIP.", action='store_true') - parser.add_argument("--libpostal", help="Use libpostal address parsing and expansions to match bad addresses to a ZIP+4. Automatically enables --appendplus4.", action='store_true') - parser.add_argument("--noskip4", help="When processing own file format, don't skip normalizing records that have a ZIP+4 already.", action="store_true") - parser.add_argument("-v", help="Verbose output (for development)", action="store_true") + try: + parser = argparse.ArgumentParser( + description="Tools to build a standardized U.S. address database from free source data." + ) + parser.add_argument("file", help="Address file(s) to process.", nargs='+') + parser.add_argument("--outputfile", help="Filename to output address data to. If unspecified, set to \"./data/out.csv\" or \"./data/out.sqlite\", depending on options set.") + parser.add_argument( + "--filetype", + help="Type of address file to ingest. nad=National Address Database, oa=OpenAddresses, adb=CSV created by this script, osm=OpenStreetMap Overpass API (see main.py source for query to use), nar=Statistics Canada National Address Register", + choices=["nad", "oa", "adb", "osm", "nar"], + ) + parser.add_argument("--state", help="Some OpenAddresses files don't have the state field set. Do it manually here.") + parser.add_argument("--ignorestates", help="Comma-separated two-letter state names. Addresses in these states will be skipped over.") + parser.add_argument("--onlystates", help="Comma-separated two-letter state names. Addresses NOT in these states will be skipped over.") + parser.add_argument("--source", help="Set the data source name (OpenAddresses only). Autodetected based on filename if not set.") + parser.add_argument("--dedup", help="Remove duplicate records in an already-ingested address file, and saves it to folder/file.dedup.csv. Only catches \"nearby\" duplicates; processes 20,000,000 records at a time.", action='store_true') + parser.add_argument("--fixlatlon", help="Detect and repair flipped latitude/longitude pairs in an already-ingested address file, and saves it to [filename].coordfix.csv.", action='store_true') + parser.add_argument("--tosqlite", help="Output to a SQLite3 database. Only works on output CSV data from this script.", action='store_true') + parser.add_argument("--appendplus4", help="Append ZIP+4 data to all records. Fairly slow.", action='store_true') + parser.add_argument("--appendunitlabel", help="Append unit label (APT, STE, etc) to unit numbers using ZIP+4 data.", action='store_true') + parser.add_argument("--zipprefix", help="When searching for a ZIP, assume it starts with the digits provided for faster lookups.") + parser.add_argument("-a", help="Allow appending to existing output file.", action='store_true') + parser.add_argument("--cpu", help="Number of CPU cores to use for parallel processing.") + parser.add_argument("--country", help="Two-letter country code. Default is US.") + parser.add_argument("--city", help="City name to assume when there's no city or postal code in the source data. Useful for OpenAddresses city_of_ data files.") + parser.add_argument("--startat", help="Skip to this line number in the input file (NAD)") + parser.add_argument("--census", help="Enable looking up missing ZIP codes in the U.S. Census Geocoder when we have a full address, city, and state but no ZIP.", action='store_true') + parser.add_argument("--libpostal", help="Use libpostal address parsing and expansions to match bad addresses to a ZIP+4. Automatically enables --appendplus4.", action='store_true') + parser.add_argument("--noskip4", help="When processing own file format, don't skip normalizing records that have a ZIP+4 already.", action="store_true") + parser.add_argument("-v", help="Verbose output (for development)", action="store_true") - args = parser.parse_args() + args = parser.parse_args() - startAtLine = 0 - - appendPlus4 = False - appendUnitLabel = False - useCensusToFillEmptyZIPs = False - countryCode = "US" - citySuggestion = False - advancedMode = False - noSkip4 = False - verbose = False - - if args.v: - verbose = True - print("Verbose mode engaged!") - - if args.libpostal: - advancedMode = True - appendPlus4 = True - if advancedMode: - from src.advancedparsing import advancedNormalize - print("Using libpostal to work harder on bad addresses.") - - if args.appendplus4: - appendPlus4 = True - if appendPlus4: - print("Trying to match to ZIP+4 codes for every address!") + startAtLine = 0 - if args.noskip4: - noSkip4 = True - if noSkip4: - print("Also normalizing records that have a +4 in the input data.") - - if args.appendunitlabel: - appendUnitLabel = True - - if args.census: - useCensusToFillEmptyZIPs = True - else: + appendPlus4 = False + appendUnitLabel = False useCensusToFillEmptyZIPs = False - if useCensusToFillEmptyZIPs: - print("Census geocoder enabled! RIP your network maybe") - - statesToIgnore = [] - if args.ignorestates: - statesToIgnore = re.sub(r"[^a-zA-Z,]+", "", args.ignorestates.upper()).split(",") - statesToKeep = [] - if args.onlystates: - statesToKeep = re.sub(r"[^a-zA-Z,]+", "", args.onlystates.upper()).split(",") - - zipprefix = False - if args.zipprefix: - zipprefix = args.zipprefix - - if args.cpu: - maxthreads = int(args.cpu) - - if args.country: - if len(args.country) != 2: - print("Invalid country code " + args.country + ", exiting.") - sys.exit(1) - countrycode = args.country.upper() - countryCode = countrycode - if args.startat and args.startat.isdigit(): - startAtLine = int(args.startat) - - if args.city: - citySuggestion = args.city.strip().toUpper() + countryCode = "US" + citySuggestion = False + advancedMode = False + noSkip4 = False + verbose = False - cfg = src.config.AppConfig(appendPlus4=appendPlus4, appendUnitLabel=appendUnitLabel, countryCode=countryCode, citySuggestion=citySuggestion, useCensusToFillEmptyZIPs=useCensusToFillEmptyZIPs, advancedMode=advancedMode, noSkip4=noSkip4, verbose=verbose) - - src.config.set_config(cfg) + if args.v: + verbose = True + print("Verbose mode engaged!") + + if args.libpostal: + advancedMode = True + appendPlus4 = True + if advancedMode: + from src.advancedparsing import advancedNormalize + print("Using libpostal to work harder on bad addresses.") - if args.dedup: - with concurrent.futures.ProcessPoolExecutor(max_workers=maxthreads) as executor: + if args.appendplus4: + appendPlus4 = True + if appendPlus4: + print("Trying to match to ZIP+4 codes for every address!") + + if args.noskip4: + noSkip4 = True + if noSkip4: + print("Also normalizing records that have a +4 in the input data.") + + if args.appendunitlabel: + appendUnitLabel = True + + if args.census: + useCensusToFillEmptyZIPs = True + else: + useCensusToFillEmptyZIPs = False + if useCensusToFillEmptyZIPs: + print("Census geocoder enabled! RIP your network maybe") + + statesToIgnore = [] + if args.ignorestates: + statesToIgnore = re.sub(r"[^a-zA-Z,]+", "", args.ignorestates.upper()).split(",") + statesToKeep = [] + if args.onlystates: + statesToKeep = re.sub(r"[^a-zA-Z,]+", "", args.onlystates.upper()).split(",") + + zipprefix = False + if args.zipprefix: + zipprefix = args.zipprefix + + if args.cpu: + maxthreads = int(args.cpu) + + if args.country: + if len(args.country) != 2: + print("Invalid country code " + args.country + ", exiting.") + sys.exit(1) + countrycode = args.country.upper() + countryCode = countrycode + if args.startat and args.startat.isdigit(): + startAtLine = int(args.startat) + + if args.city: + citySuggestion = args.city.strip().toUpper() + + cfg = src.config.AppConfig(appendPlus4=appendPlus4, appendUnitLabel=appendUnitLabel, countryCode=countryCode, citySuggestion=citySuggestion, useCensusToFillEmptyZIPs=useCensusToFillEmptyZIPs, advancedMode=advancedMode, noSkip4=noSkip4, verbose=verbose) + + src.config.set_config(cfg) + + if args.dedup: + with concurrent.futures.ProcessPoolExecutor(max_workers=maxthreads) as executor: + for file in args.file: + executor.submit(removeDupes, file) + elif args.fixlatlon: + with concurrent.futures.ProcessPoolExecutor(max_workers=maxthreads) as executor: + for file in args.file: + executor.submit(fixLatLon, file) + elif args.tosqlite: + outputfile = "./data/out.sqlite" + if args.outputfile: + outputfile = args.outputfile + if args.a != True and os.path.exists(args.outputfile): + print("Output file already exists, exiting!") + sys.exit() + rowschanged = 0 + filesimported = 0 for file in args.file: - executor.submit(removeDupes, file) - elif args.fixlatlon: - with concurrent.futures.ProcessPoolExecutor(max_workers=maxthreads) as executor: - for file in args.file: - executor.submit(fixLatLon, file) - elif args.tosqlite: - outputfile = "./data/out.sqlite" - if args.outputfile: - outputfile = args.outputfile - if args.a != True and os.path.exists(args.outputfile): - print("Output file already exists, exiting!") - sys.exit() - rowschanged = 0 - filesimported = 0 - for file in args.file: - rowschanged = rowschanged + tosqlite(file, outputfile) - filesimported = filesimported + 1 - print("\nDone importing " + str(filesimported) + " files. " + str(rowschanged) + " records inserted.") - elif args.file: - outputfile = "./data/out.csv" - if args.outputfile: - outputfile = args.outputfile - if args.a != True and os.path.exists(args.outputfile): - print("Output file already exists, exiting!") - sys.exit() - if args.filetype == "nad": - for file in args.file: - importNadFile(file, outputfile, statesToIgnore, statesToKeep, startAtLine) - elif args.filetype == "adb": - for file in args.file: - importOwnFile(file, outputfile, statesToIgnore, statesToKeep) - elif args.filetype == "osm": - for file in args.file: - importOSMFile(file, outputfile) - elif args.filetype == "nar": - countrycode = "CA" - for file in args.file: - importNARFile(file, outputfile) - elif args.filetype == "oa": - source = "" - if args.source: - source = args.source - for file in args.file: - importOpenAddressFile(file, outputfile, statesToIgnore, source, args.state, zipprefix) + rowschanged = rowschanged + tosqlite(file, outputfile) + filesimported = filesimported + 1 + print("\nDone importing " + str(filesimported) + " files. " + str(rowschanged) + " records inserted.") + elif args.file: + outputfile = "./data/out.csv" + if args.outputfile: + outputfile = args.outputfile + if args.a != True and os.path.exists(args.outputfile): + print("Output file already exists, exiting!") + sys.exit() + if args.filetype == "nad": + for file in args.file: + importNadFile(file, outputfile, statesToIgnore, statesToKeep, startAtLine) + elif args.filetype == "adb": + for file in args.file: + importOwnFile(file, outputfile, statesToIgnore, statesToKeep) + elif args.filetype == "osm": + for file in args.file: + importOSMFile(file, outputfile) + elif args.filetype == "nar": + countrycode = "CA" + for file in args.file: + importNARFile(file, outputfile) + elif args.filetype == "oa": + source = "" + if args.source: + source = args.source + for file in args.file: + importOpenAddressFile(file, outputfile, statesToIgnore, source, args.state, zipprefix) + except KeyboardInterrupt: + os._exit(0) +