Sourcing Phase 1 - Download:
"""
Example:
http://www2.census.gov/wholesale/xls/mwts/historic1.xls
"""
import os
import re
import sys
class MigrateWebToContent():
def __init__(self):
self.url = '''http://www2.census.gov/wholesale/xls/mwts/historic1.xls'''
self.wget = os.path.abspath('../thirdparty/wget/wget.exe')
assert os.path.isfile(self.wget)
def migrate(self, dest_dir):
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
self.__wget(self.url, dest_dir)
def __wget(self, url, dest_dir):
dest_file = os.path.join(dest_dir, re.compile('https?://').sub('', url))
dest_file_dir = os.path.dirname(dest_file)
if not os.path.exists(dest_file_dir):
os.makedirs(dest_file_dir)
wget_cmdline = '''%s \"%s\" --waitretry=3 -P %s''' % (self.wget, url, dest_file_dir)
os.system(wget_cmdline)
def main():
m = MigrateWebToContent()
m.migrate('./content/')
if __name__ == '__main__':
sys.exit(main())
Sourcing Phase 2 - Rename:
import logging
import os
import shutil
import sys
import logger
class MigrateContentToRenamed():
def __init__(self):
self.__logger = logging.getLogger()
def migrate_batch(self, src_dir, dest_dir):
assert os.path.isdir(src_dir)
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
for file in os.listdir(src_dir):
shutil.copy(os.path.join(src_dir, file), os.path.join(dest_dir, file))
def main():
logger.config_root(level=logging.INFO)
m = MigrateContentToRenamed()
m.migrate_batch('./content/www2.census.gov/wholesale/xls/mwts',
'./renamed/')
if __name__ == '__main__':
sys.exit(main())
Sourcing Phase 3 - Write to SQLite:
"""
Schema:
drop table if exists MonthlyWholesaleTradeReport;
create table if not exists MonthlyWholesaleTradeReport
(
creation_dt datetime default current_timestamp,
period datetime not null,
item text,
number text,
revision int default 0,
unique (period, item, number, revision) on conflict ignore
);
Schema:
drop table if exists MonthlyWholesaleTradeReport;
create table if not exists MonthlyWholesaleTradeReport
(
creation_dt datetime default current_timestamp,
period datetime not null,
item text,
number text,
revision int default 0,
unique (period, item, number, revision) on conflict ignore
);
Record format: (period_date, item, number)
Reference: Package xlrd - https://github.com/takluyver/xlrd/zipball/py3
"""
import logging
import os
import sqlite3
import sys
import xlrd
import logger
class MigrateRenamedToSqlite():
def __init__(self):
self.__logger = logging.getLogger()
self.items = [
'sales',
'inventories',
'inventories_sales_ratios',
]
self.sql_insert = '''replace into
MonthlyWholesaleTradeReport(period, item, number)
values('%s', '%s', '%s')
'''
def migrate_batch(self, src_dir, dest_db):
assert os.path.isdir(src_dir)
for file in os.listdir(src_dir):
src_xls = os.path.join(src_dir, file)
self.migrate(src_xls, dest_db)
def migrate(self, src_xls, dest_db):
self.__logger.debug('''%s => %s''' % (src_xls, dest_db))
records = self.__fetch_records(src_xls)
self.__write_db(records, dest_db)
def __fetch_records(self, src_xls):
book = xlrd.open_workbook(src_xls)
sheet = book.sheet_by_index(0)
data_rows = {}
for i in range(sheet.nrows):
if sheet.cell_value(i, 0) == 'NAICS Code':
data_rows[i] = 'P' # Period
elif sheet.cell_value(i, 0) == 42:
data_rows[i] = 'N' # Number
records = []
curr_year = None
curr_item = -1
for key in sorted(data_rows):
if data_rows[key] == 'P':
curr_year = sheet.cell_value(key, 3)
if data_rows[key] == 'N':
curr_item = (curr_item + 1) % 3
for month in range(12):
r = [
'''%s-%02d-01''' % (curr_year, month + 1),
self.items[curr_item],
sheet.cell_value(key, month + 3),
]
records.append(r)
return records
def __write_db(self, records, dest_db):
conn = sqlite3.connect(dest_db)
cursor = conn.cursor()
for r in records:
if r[2] != 'NA':
sql_cmd = self.sql_insert % (r[0], r[1], r[2])
cursor.execute(sql_cmd)
#print(sql_cmd)
self.__logger.debug('''(period, item, number): (%s, %s, %s) => %s'''
% (r[0], r[1], r[2], dest_db))
conn.commit()
cursor.close()
conn.close()
def main():
logger.config_root(level=logging.DEBUG)
m = MigrateRenamedToSqlite()
m.migrate_batch('./renamed', '../db/economicstotal.db')
if __name__ == '__main__':
sys.exit(main())
沒有留言:
張貼留言