2012年9月18日 星期二

Monthly Wholesale Trade Report

Link: http://www.census.gov/wholesale/



Sourcing Phase 1 - Download:


"""
Example:
    http://www2.census.gov/wholesale/xls/mwts/historic1.xls
"""
import os
import re
import sys

class MigrateWebToContent():

    def __init__(self):
        self.url = '''http://www2.census.gov/wholesale/xls/mwts/historic1.xls'''
        self.wget = os.path.abspath('../thirdparty/wget/wget.exe')
        assert os.path.isfile(self.wget)

    def migrate(self, dest_dir):
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir)
        self.__wget(self.url, dest_dir)

    def __wget(self, url, dest_dir):
        dest_file = os.path.join(dest_dir, re.compile('https?://').sub('', url))
        dest_file_dir = os.path.dirname(dest_file)
        if not os.path.exists(dest_file_dir):
            os.makedirs(dest_file_dir)
     
        wget_cmdline = '''%s \"%s\" --waitretry=3 -P %s''' % (self.wget, url, dest_file_dir)
        os.system(wget_cmdline)

def main():
    m = MigrateWebToContent()
    m.migrate('./content/')

if __name__ == '__main__':
    sys.exit(main())



Sourcing Phase 2 - Rename:

import logging
import os
import shutil
import sys

import logger

class MigrateContentToRenamed():

    def __init__(self):
        self.__logger = logging.getLogger()
    
    def migrate_batch(self, src_dir, dest_dir):
        assert os.path.isdir(src_dir)
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir)

        for file in os.listdir(src_dir):
            shutil.copy(os.path.join(src_dir, file), os.path.join(dest_dir, file))

def main():
    logger.config_root(level=logging.INFO)
    m = MigrateContentToRenamed()
    m.migrate_batch('./content/www2.census.gov/wholesale/xls/mwts',
                    './renamed/')
if __name__ == '__main__':
    sys.exit(main())



Sourcing Phase 3 - Write to SQLite:

"""
Schema:

drop table if exists MonthlyWholesaleTradeReport;

create table if not exists MonthlyWholesaleTradeReport 
(
    creation_dt datetime default current_timestamp,
    period datetime not null,
    item text,
    number text,
    revision int default 0,
    unique (period, item, number, revision) on conflict ignore
);


Record format: (period_date, item, number)
Reference: Package xlrd - https://github.com/takluyver/xlrd/zipball/py3
"""
import logging
import os
import sqlite3
import sys
import xlrd

import logger

class MigrateRenamedToSqlite():

    def __init__(self):
        self.__logger = logging.getLogger()
        self.items = [
            'sales',
            'inventories',
            'inventories_sales_ratios',
        ]
        self.sql_insert = '''replace into
            MonthlyWholesaleTradeReport(period, item, number)
            values('%s', '%s', '%s')
            '''
        
    def migrate_batch(self, src_dir, dest_db):
        assert os.path.isdir(src_dir)
        for file in os.listdir(src_dir):
            src_xls = os.path.join(src_dir, file)
            self.migrate(src_xls, dest_db)

    def migrate(self, src_xls, dest_db):
        self.__logger.debug('''%s => %s''' % (src_xls, dest_db))
        records = self.__fetch_records(src_xls)
        self.__write_db(records, dest_db)

    def __fetch_records(self, src_xls):
        book = xlrd.open_workbook(src_xls)
        sheet = book.sheet_by_index(0)

        data_rows = {}
        for i in range(sheet.nrows):
            if sheet.cell_value(i, 0) == 'NAICS Code':
                data_rows[i] = 'P' # Period
            elif sheet.cell_value(i, 0) == 42:
                data_rows[i] = 'N' # Number

        records = []
        curr_year = None
        curr_item = -1        
        for key in sorted(data_rows):
            if data_rows[key] == 'P':
                curr_year = sheet.cell_value(key, 3)
            if data_rows[key] == 'N':
                curr_item = (curr_item + 1) % 3
                for month in range(12):
                    r = [
                        '''%s-%02d-01''' % (curr_year, month + 1),
                        self.items[curr_item],
                        sheet.cell_value(key, month + 3),
                    ]
                    records.append(r)
        return records     

    def __write_db(self, records, dest_db):
        conn = sqlite3.connect(dest_db)
        cursor = conn.cursor()
      
        for r in records:
            if r[2] != 'NA':
                sql_cmd = self.sql_insert % (r[0], r[1], r[2])
                cursor.execute(sql_cmd)
                #print(sql_cmd)
                self.__logger.debug('''(period, item, number): (%s, %s, %s) => %s'''
                                    % (r[0], r[1], r[2], dest_db))
        
        conn.commit()
        cursor.close()
        conn.close()

def main():
    logger.config_root(level=logging.DEBUG)
    m = MigrateRenamedToSqlite()
    m.migrate_batch('./renamed', '../db/economicstotal.db')
if __name__ == '__main__':
    sys.exit(main())

沒有留言:

張貼留言