#!/usr/bin/env python
"""
Author   : flo
date     : 2015-12-31
purpose  : Download the ICD-10 zip file from cms.gov, extract one particular
         : text file from that package and turn the text file into SQL
         : statements that are then inserted into a MySQL DB container.
download : 2016-Code-Descriptions-in-Tabular-Order.zip
process  : icd10cm_order_2016
run      : python icd10.py
comments : Written with the PyCharm community edition editor; some of its
         : yellow-underline warnings remain unexplained.
         : woot woot, no more missing records.
"""
from __future__ import print_function

import base64
import io
import os
import re
import sys
import zipfile

# Third-party dependencies are imported defensively: a missing package is
# remembered here and reported (with the install hint and exit status 1) the
# first time the package is actually needed, instead of terminating whatever
# process merely imported this module.
try:
    import mysql.connector
    from mysql.connector import errorcode
except ImportError as err:
    mysql = None
    errorcode = None
    _MYSQL_IMPORT_ERROR = err
else:
    _MYSQL_IMPORT_ERROR = None

try:
    import requests
except ImportError as err:
    requests = None
    _REQUESTS_IMPORT_ERROR = err
else:
    _REQUESTS_IMPORT_ERROR = None


def _require_mysql():
    """Exit with status 1 and an install hint if mysql.connector is missing."""
    if _MYSQL_IMPORT_ERROR is not None:
        print(_MYSQL_IMPORT_ERROR)
        print("Missing Mysql Connector plugin.")
        print("Try: pip install mysql-connector-python")
        sys.exit(1)


def _require_requests():
    """Exit with status 1 and an install hint if requests is missing."""
    if _REQUESTS_IMPORT_ERROR is not None:
        print(_REQUESTS_IMPORT_ERROR)
        print("Missing requests plugin")
        print("Try: pip install requests")
        sys.exit(1)


def _db_config(database):
    """Return the connection settings for *database* (placeholder credentials)."""
    return {'user': 'USERNAME',
            'password': 'YOURPASSWORD',
            'host': 'MYSQLServerHostName',
            'database': database,
            'raise_on_warnings': True}


def printauthor():
    """Print the base64-encoded author contact string as text."""
    author = "YXNjaGVuYmFjaEBnbWFpbC5jb20="
    # b64decode returns bytes on Python 3; decode so the text is printed
    # rather than a b'...' representation.
    print(base64.b64decode(author).decode("ascii"))


def createicd10schema():
    """Create the ``icd10`` database and its ``icd10Code`` table.

    Connects to the server's ``mysql`` database, creates the ``icd10``
    database, switches to it and runs the table DDL.  Exits the process with
    status 1 if the database cannot be created or selected.  A table that
    already exists is reported and skipped; any other table error is printed
    and the run continues.
    """
    _require_mysql()
    DB_NAME = 'icd10'
    icd10Tables = {}
    icd10Tables['icd10Code'] = (
        "CREATE TABLE IF NOT EXISTS `icd10Code` ("
        " `ID` INT(11) NOT NULL AUTO_INCREMENT,"
        " `icd10OrderID` VARCHAR(10) NULL DEFAULT NULL,"
        " `icd10Codes` VARCHAR(10) NULL DEFAULT NULL,"
        " `icd10HeaderCode` VARCHAR(1) NULL DEFAULT NULL,"
        " `icd10ShortDesc` VARCHAR(250) NULL DEFAULT NULL,"
        " `icd10LongDesc` VARCHAR(250) NULL DEFAULT NULL,"
        " PRIMARY KEY (`ID`)"
        ") ENGINE=InnoDB")

    conn = mysql.connector.connect(**_db_config('mysql'))
    try:
        cur = conn.cursor(buffered=True)
        try:
            try:
                cur.execute(
                    "CREATE DATABASE {} DEFAULT CHARACTER SET 'latin1'".format(DB_NAME))
            except mysql.connector.Error as err:
                print("Failed Creating Database: {}".format(err))
                sys.exit(1)

            try:
                conn.database = DB_NAME
            except mysql.connector.Error as err:
                # The database was created just above, so failing to select
                # it is unrecoverable here; report it instead of re-entering
                # this function without bound.
                print(err)
                sys.exit(1)

            for name, ddl in icd10Tables.items():
                try:
                    print("Creating table {}: ".format(name), end='')
                    cur.execute(ddl)
                except mysql.connector.Error as err:
                    # With raise_on_warnings the "already exists" note from
                    # CREATE TABLE IF NOT EXISTS surfaces as an error.
                    if err.errno == errorcode.ER_TABLE_EXISTS_ERROR:
                        print("Already Exists.")
                    else:
                        print(err.msg)

            print("Table and Db Schema Created.")
            conn.commit()
        finally:
            cur.close()
    finally:
        # Runs on the sys.exit() paths as well, so the connection never leaks.
        conn.close()


def getdata():
    """Download the ICD-10 zip from cms.gov and extract it into the cwd.

    Exits the process with status 1 when the HTTP response is not OK.
    """
    _require_requests()
    url = 'https://www.cms.gov/Medicare/Coding/ICD10/Downloads/2016-Code-Descriptions-in-Tabular-Order.zip'
    with requests.Session() as sess:
        data = sess.get(url, verify=True)

    if not data.ok:
        print("This ICD10 CM code file doesnt seem to be intact during this download process. " + url)
        sys.exit(1)

    # The archive is binary, so wrap it in BytesIO (works on Python 2 and 3);
    # extraction is the same on every platform.
    with zipfile.ZipFile(io.BytesIO(data.content)) as z:
        z.extractall()


def _escape_sql(value):
    """Backslash-escape backslashes and both quote styles for a SQL literal."""
    value = value.replace("\\", "\\\\")
    value = value.replace("'", "\\'")
    return value.replace('"', '\\"')


def _split_record(line):
    """Cut one fixed-width order-file line into its five trimmed fields.

    Returns [order id, code, header flag, short description, long
    description]; runs of whitespace inside the descriptions are collapsed
    to a single space.
    """
    shortdesc = re.sub(r"\s\s+", ' ', line[16:77])
    longdesc = re.sub(r"\s\s+", ' ', line[77:])
    raw = (line[:5], line[6:14], line[14:15], shortdesc, longdesc)
    return [field.strip() for field in raw]


def parsedata(icdfile='icd10cm_order_2016.txt'):
    """
    Read the extracted order file and build one INSERT statement per line.

    The field widths follow icd10OrderFiles.pdf.  Each line is sliced into
    its fixed-width fields first and only then escaped, so the backslashes
    added for quotes can never shift a column boundary.

    :param icdfile: path of the fixed-width text file to read.
    :return: list of newline-terminated SQL INSERT statements, numbered
             from 1 in file order.

    Exits the process with status 1 if any line does not yield five
    non-empty fields.
    """
    result = []
    with open(icdfile, 'r') as f:
        for count, line in enumerate(f, 1):
            fields = _split_record(line)
            if not all(fields):
                print("Danger, Will Robinson Danger ! -- What is infinity + infinity ! -- ")
                sys.exit(1)
            values = ','.join('\'' + _escape_sql(field) + '\'' for field in fields)
            result.append(
                'INSERT INTO `icd10Code` VALUES ( ' + str(count) + ',' + values + ');' + '\n')
    return result


def injectdatatodb(data):
    """
    Execute every SQL statement against the ``icd10`` database and commit.

    :param data: data holds the entire sql data.
    :return: the same ``data`` object, unchanged.
    """
    _require_mysql()
    conn = mysql.connector.connect(**_db_config('icd10'))
    try:
        cur = conn.cursor(buffered=True)
        try:
            for sql in data:
                cur.execute(sql)
            conn.commit()
        finally:
            cur.close()
    finally:
        conn.close()
    return data


def writetofile(icddata):
    """
    Append the SQL statements to ``icd10SQLData.sql`` in the current directory.

    :param icddata: icddata holds the entire sql data.
    """
    icdfile = "icd10SQLData.sql"
    # os.path.join picks the right separator for the platform.
    path = os.path.join(os.getcwd(), icdfile)
    with open(path, 'a') as f:
        f.writelines(icddata)


def main():
    """Run the pipeline: schema, download, parse, insert, save; then exit 0."""
    # Fail before doing any work when a dependency is missing.
    _require_mysql()
    _require_requests()

    createicd10schema()
    print()
    print("Downloading data file from cms.gov.")
    getdata()
    print("Done Downloading Data file from cms.gov.")
    print()
    print("Parsing Data file to escape single and double quotes and extra spacing for short and long descriptions.")
    b = parsedata()
    print("Done parsing data.")
    print()
    print("Inserting Data into Database. ")
    c = injectdatatodb(b)
    print("Done inserting Data to Database.")
    print()
    print("Saving icd10cm_order_2016.txt and icd10SQLData.sql to your computer.")
    writetofile(c)
    print("Done Saving files to your computer.")
    printauthor()
    sys.exit(0)


# added proper main function
if __name__ == "__main__":
    main()