2012年9月24日 星期一

Refactoring - Economicstotal

Project Name: Economicstotal


Source code preview:


1. Sourcing manager
sourcing_manager.py

import logging
import sys

import src.common.logger as logger

def _run_sourcing(module_name):
    """Import *module_name* and run its ``Sourcing().source()`` pipeline.

    Every data set lives in a ``src.<data_set>.sourcing`` module exposing a
    ``Sourcing`` class. The import happens only when the job runs, so a
    disabled job never loads its module (or that module's dependencies).
    """
    import importlib
    sourcing = importlib.import_module(module_name)
    sourcing.Sourcing().source()

def source_capacity_utilization():
    """Run the Federal Reserve capacity-utilization sourcing job."""
    _run_sourcing('src.capacity_utilization.sourcing')

def source_industrial_capacity():
    """Run the Federal Reserve industrial-capacity sourcing job."""
    _run_sourcing('src.industrial_capacity.sourcing')

def source_ism_report_on_business():
    """Run the ISM Report On Business sourcing job."""
    _run_sourcing('src.ism_report_on_business.sourcing')

def source_wholesale_trade():
    """Run the Census monthly wholesale trade sourcing job."""
    _run_sourcing('src.wholesale_trade.sourcing')

def source_cpi_us():
    """Run the BLS CPI (US) sourcing job."""
    _run_sourcing('src.cpi_us.sourcing')

def source_sales_retail():
    """Run the Census advance monthly retail sales sourcing job."""
    _run_sourcing('src.sales_retail.sourcing')

def source_twse_market_capitalization():
    """Run the TWSE market-capitalization sourcing job."""
    _run_sourcing('src.twse_market_capitalization.sourcing')
   
def main():
    """Entry point: enable DEBUG logging, then run each enabled job in turn.

    To switch a data set on or off, (un)comment its entry in ``jobs``.
    """
    logger.config_root(level=logging.DEBUG)
    jobs = [
        # source_capacity_utilization,
        # source_industrial_capacity,
        # source_ism_report_on_business,
        # source_wholesale_trade,
        # source_cpi_us,
        # source_sales_retail,
        source_twse_market_capitalization,
    ]
    for job in jobs:
        job()

if __name__ == '__main__':
    sys.exit(main())


2. Put all Python source code in the ./src subdirectory
src/__init__.py (empty file)


3. Common modules
src/common/__init__.py (empty file)
src/common/logger.py
import logging
import sys

# Default record layout: time, source file name, level, message.
FORMAT = "%(asctime)s %(filename)s [%(levelname)s] %(message)s"
# Default time-of-day stamp used for %(asctime)s.
DATEFMT = "%H:%M:%S"

def config_root(level=logging.INFO,
                threshold=logging.WARNING,
                format=FORMAT,
                datefmt=DATEFMT):
    """Attach a stdout handler to the root logger and set its level.

    Args:
        level: level applied to both the root logger and the new handler.
        threshold: currently unused; accepted so existing callers that pass
            it keep working.
        format: ``logging.Formatter`` format string for the handler.
        datefmt: ``time.strftime``-style format for ``%(asctime)s``.

    Returns:
        The ``logging.StreamHandler`` that was added, so a caller can remove
        it again. Every call adds a new handler, so calling this twice
        duplicates each log line.
    """
    root = logging.getLogger()
    root.setLevel(level)

    formatter = logging.Formatter(format, datefmt)
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(level)
    stdout_handler.setFormatter(formatter)
    root.addHandler(stdout_handler)
    return stdout_handler

src/common/xlrd_util.py
import datetime
import xlrd

def cell_value(cell, datemode):
    """Convert an xlrd cell into the closest native Python value.

    Args:
        cell: an xlrd cell (anything exposing ``ctype`` and ``value``).
        datemode: the workbook's ``book.datemode``, needed to decode Excel
            date serial numbers.

    Returns:
        ``datetime.date`` for a date cell whose time of day is midnight,
        ``datetime.datetime`` for a date cell carrying a time of day,
        ``None`` for an empty cell, ``bool`` for a boolean cell, and the raw
        ``cell.value`` for every other cell type.
    """
    if cell.ctype == xlrd.XL_CELL_DATE:
        year, month, day, hour, minute, second = \
            xlrd.xldate_as_tuple(cell.value, datemode)
        if (hour, minute, second) == (0, 0, 0):
            return datetime.date(year, month, day)
        # datetime.date() only accepts (year, month, day); a value with a
        # time component must be built as datetime.datetime.
        return datetime.datetime(year, month, day, hour, minute, second)
    if cell.ctype == xlrd.XL_CELL_EMPTY:
        return None
    if cell.ctype == xlrd.XL_CELL_BOOLEAN:
        return cell.value == 1
    return cell.value

src/common/sourcing_template.py
import os
import re
import shutil
import sys

class SourcingTemplate():
    """Base class that downloads remote files and stages them locally.

    Subclasses set ``URLS``, ``LOCAL_DIR`` and ``DEFLATED_DIR`` in their
    ``__init__``. ``source()`` then mirrors every URL below ``LOCAL_DIR``
    with the bundled wget and copies the downloaded files into
    ``DEFLATED_DIR``.
    """

    # Scheme prefixes stripped from a URL to build its local mirror path.
    _SCHEME_PATTERN = re.compile('https?://|ftp://')

    def __init__(self):
        self.URLS = []
        self.LOCAL_DIR = './'
        self.DEFLATED_DIR = './'

    @classmethod
    def _local_path(cls, base_dir, url):
        """Return the mirror path of *url* below *base_dir*.

        ``http://host/a/b.txt`` maps to ``base_dir/host/a/b.txt``.
        """
        return os.path.join(base_dir, cls._SCHEME_PATTERN.sub('', url))

    def source(self):
        """Download every URL, then copy each download directory to DEFLATED_DIR."""
        self.source_url_to_local(self.LOCAL_DIR)
        for url in self.URLS:
            src_dir = os.path.dirname(self._local_path(self.LOCAL_DIR, url))
            self.source_local_to_deflated_batch(src_dir, self.DEFLATED_DIR)

    def source_url_to_local(self, dest_dir):
        """Download every URL in ``URLS`` into its mirror path below *dest_dir*."""
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir)
        for url in self.URLS:
            self.__wget(url, dest_dir)

    def source_local_to_deflated_batch(self, src_dir, dest_dir):
        """Copy every regular file directly inside *src_dir* into *dest_dir*.

        Subdirectories are skipped because ``shutil.copy`` cannot copy them.
        """
        assert os.path.isdir(src_dir)
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir)

        for name in os.listdir(src_dir):
            src_file = os.path.join(src_dir, name)
            if os.path.isfile(src_file):
                shutil.copy(src_file, os.path.join(dest_dir, name))

    def __wget(self, url, dest_dir):
        """Fetch *url* with the bundled wget into its mirror directory.

        ``-N`` makes wget skip the download when the local copy is not older
        than the remote one.
        """
        import subprocess

        dest_file_dir = os.path.dirname(self._local_path(dest_dir, url))
        if not os.path.exists(dest_file_dir):
            os.makedirs(dest_file_dir)

        wget = os.path.abspath('./src/thirdparty/wget/wget.exe')
        assert os.path.isfile(wget)
        # An argument list (no shell) keeps URLs containing '&'/'?' and
        # directories containing spaces intact.
        subprocess.call([wget, '-N', url, '--waitretry=3', '-P', dest_file_dir])


4. Sourcing capacity utilization
src/capacity_utilization/__init__.py (empty file)
src/capacity_utilization/sourcing.py
"""
Web Content
===========
Data from January 1986 to present 
Capacity Utilization: Manufacturing, Mining, and Utilities
<http://www.federalreserve.gov/releases/g17/ipdisk/utl_sa.txt>

Data through 1985 
Capacity Utilization: Manufacturing, Mining, and Utilities
<http://www.federalreserve.gov/releases/g17/iphist/utlhist_sa.txt>

Record format:
<http://www.federalreserve.gov/releases/g17/download.htm>

The format for each line in the files consists of an industry code,
a year, and 12 months of data (to four decimal places) when available.
The data have various start dates; the earliest is 1919. 
"""
import logging
import os
import sqlite3

from ..common import logger
from ..common import sourcing_template

class Sourcing(sourcing_template.SourcingTemplate):
    """Source series "B50001" of the Federal Reserve G.17 capacity utilization data.

    Downloads the current (1986 onward) and historical (through 1985) files,
    stages them in DEFLATED_DIR and writes one ``(period, number)`` row per
    month into the ``CapacityUtilization`` table of DB_FILE.
    """

    def __init__(self):
        sourcing_template.SourcingTemplate.__init__(self)
        self.__logger = logging.getLogger()
        self.URLS = [
            'http://www.federalreserve.gov/releases/g17/ipdisk/utl_sa.txt',
            'http://www.federalreserve.gov/releases/g17/iphist/utlhist_sa.txt',
        ]
        self.LOCAL_DIR = './dataset/capacity_utilization/local/'
        self.DEFLATED_DIR = './dataset/capacity_utilization/deflated/'
        self.DB_FILE = './db/economicstotal.db'
        # Industry code (quotes included) that starts every line to keep.
        self.ITEM_PREFIX = '''\"B50001\"'''
        # Qmark placeholders: sqlite3 binds the values itself, so nothing
        # read from the download is spliced into the SQL text.
        self.SQL_INSERT = '''replace into
            CapacityUtilization(period, number) values(?, ?)'''

    def source(self):
        """Download and stage the files, then load them into DB_FILE."""
        sourcing_template.SourcingTemplate.source(self)
        self.source_deflated_to_sqlite_batch(self.DEFLATED_DIR, self.DB_FILE)

    def source_deflated_to_sqlite_batch(self, src_dir, dest_db):
        """Load every file in *src_dir* into the SQLite file *dest_db*."""
        assert os.path.isdir(src_dir)
        for name in os.listdir(src_dir):
            self.source_deflated_to_sqlite(os.path.join(src_dir, name), dest_db)

    def source_deflated_to_sqlite(self, src_file, dest_db):
        """Parse one G.17 text file and write its records to *dest_db*."""
        self.__logger.debug('''%s => %s''' % (src_file, dest_db))
        records = self.__build_db_records(src_file)
        self.__write_db(records, dest_db)

    def __build_db_records(self, src_file):
        """Return ``[period, number]`` pairs for the ITEM_PREFIX lines.

        Each kept line is ``code year v1 v2 ... vN``; value ``vK`` becomes
        the record ``['year-KK-01', vK]``.
        """
        with open(src_file, 'rb') as src_file_fd:
            content = src_file_fd.read().decode('utf-8')

        records = []
        for line in content.split('\n'):
            if not line.startswith(self.ITEM_PREFIX):
                continue
            tokens = line.split()
            assert len(tokens) > 1
            year = tokens[1]
            # Tokens after the year are the monthly values, January first.
            for month, number in enumerate(tokens[2:], 1):
                records.append(['%s-%02d-01' % (year, month), number])
        return records

    def __write_db(self, records, dest_db):
        """Insert *records* into *dest_db*, committing once after all rows."""
        conn = sqlite3.connect(dest_db)
        try:
            cursor = conn.cursor()
            for period, number in records:
                cursor.execute(self.SQL_INSERT, (period, number))
                self.__logger.debug('''(period, number): (%s, %s) => %s'''
                                    % (period, number, dest_db))
            conn.commit()
            cursor.close()
        finally:
            # Release the database file even if an insert fails.
            conn.close()

src/capacity_utilization/schema.sql
-- CapacityUtilization: monthly values loaded by
-- src/capacity_utilization/sourcing.py.
-- NOTE: the drop makes this script destructive; re-running it discards
-- every previously sourced row.
drop table if exists CapacityUtilization;

create table if not exists CapacityUtilization 
(
    creation_dt datetime default current_timestamp, -- filled in when a row is inserted
    period datetime not null, -- the loader writes 'YYYY-MM-01' strings
    number text, -- observed value, stored as text
    revision int default 0, -- the loader never sets it, so it stays 0
    -- Table default for duplicates; the loader's explicit `replace into`
    -- overrides it, so re-sourcing a period replaces the stored row.
    unique (period, revision) on conflict ignore
);


5. Sourcing CPI US
src/cpi_us/__init__.py
src/cpi_us/sourcing.py
"""
<http://www.bls.gov/cpi/>
<ftp://ftp.bls.gov/pub/time.series/cu/>

U.S. All items, 1982-84=100 - CUUR0000SA0

Record example:
series_id year period value footnote_codes
CUUR0000SA0       2012 M06     229.478          
CUUR0000SA0       2012 M07     229.104          
CUUR0000SA0       2012 M08     230.379
"""
import logging
import os
import sqlite3

from ..common import logger
from ..common import sourcing_template

class Sourcing(sourcing_template.SourcingTemplate):
    """Source the BLS series CUUR0000SA0 (U.S. All items, 1982-84=100).

    Downloads ``cu.data.1.AllItems`` and writes one ``(period, number)`` row
    per monthly observation into the ``CpiUs`` table of DB_FILE.
    """

    def __init__(self):
        sourcing_template.SourcingTemplate.__init__(self)
        self.__logger = logging.getLogger()
        self.URLS = ['ftp://ftp.bls.gov/pub/time.series/cu/cu.data.1.AllItems']
        self.LOCAL_DIR = './dataset/cpi_us/local/'
        self.DEFLATED_DIR = './dataset/cpi_us/deflated/'
        self.DB_FILE = './db/economicstotal.db'
        # Series id to keep; compared against the whole first column.
        self.ITEM_PREFIX = 'CUUR0000SA0'
        # Qmark placeholders: sqlite3 binds the values itself, so nothing
        # read from the download is spliced into the SQL text.
        self.SQL_INSERT = '''replace into
            CpiUs(period, number) values(?, ?)
            '''

    def source(self):
        """Download and stage the BLS file, then load it into DB_FILE."""
        sourcing_template.SourcingTemplate.source(self)
        self.source_deflated_to_sqlite_batch(self.DEFLATED_DIR, self.DB_FILE)

    def source_deflated_to_sqlite_batch(self, src_dir, dest_db):
        """Load every file in *src_dir* into the SQLite file *dest_db*."""
        assert os.path.isdir(src_dir)
        for name in os.listdir(src_dir):
            self.source_deflated_to_sqlite(os.path.join(src_dir, name), dest_db)

    def source_deflated_to_sqlite(self, src_file, dest_db):
        """Parse one BLS data file and write its records to *dest_db*."""
        self.__logger.debug('''%s => %s''' % (src_file, dest_db))
        records = self.__build_db_records(src_file)
        self.__write_db(records, dest_db)

    def __build_db_records(self, src_file):
        """Return ``[period, value]`` pairs for the ITEM_PREFIX series.

        Data lines are ``series_id year period value [footnote_codes]``;
        a period ``Mnn`` becomes ``'year-nn-01'``.
        """
        with open(src_file, 'rb') as src_file_fd:
            content = src_file_fd.read().decode('utf-8')

        records = []
        for line in content.split('\n'):
            tokens = line.split()
            # Compare the whole series id: a prefix test would also accept
            # longer ids such as CUUR0000SA0E.
            if not tokens or tokens[0] != self.ITEM_PREFIX:
                continue
            # footnote_codes is normally blank but may be present.
            assert len(tokens) >= 4
            year, period, value = tokens[1], tokens[2], tokens[3]

            # M13 indicates annual averages => Ignore
            if period == 'M13':
                continue

            records.append(['%s-%s-01' % (year, period[1:]), value])
        return records

    def __write_db(self, records, dest_db):
        """Insert *records* into *dest_db*, committing once after all rows."""
        conn = sqlite3.connect(dest_db)
        try:
            cursor = conn.cursor()
            for period, number in records:
                cursor.execute(self.SQL_INSERT, (period, number))
                self.__logger.debug('''(period, number): (%s, %s) => %s'''
                                    % (period, number, dest_db))
            conn.commit()
            cursor.close()
        finally:
            # Release the database file even if an insert fails.
            conn.close()

src/cpi_us/schema.sql
-- CpiUs: monthly CPI values loaded by src/cpi_us/sourcing.py.
-- NOTE: the drop makes this script destructive; re-running it discards
-- every previously sourced row.
drop table if exists CpiUs;

create table if not exists CpiUs 
(
    creation_dt datetime default current_timestamp, -- filled in when a row is inserted
    period datetime not null, -- the loader writes 'YYYY-MM-01' strings
    number text, -- index value, stored as text
    revision int default 0, -- the loader never sets it, so it stays 0
    -- Table default for duplicates; the loader's explicit `replace into`
    -- overrides it, so re-sourcing a period replaces the stored row.
    unique (period, revision) on conflict ignore
);


6. Sourcing industrial capacity
src/industrial_capacity/__init__.py
src/industrial_capacity/sourcing.py
"""
Web Content
===========
Data from January 1986 to present 
Industrial Capacity: Manufacturing, Mining, and Utilities
<http://www.federalreserve.gov/releases/g17/ipdisk/cap_sa.txt>

Data through 1985 
Industrial Capacity: Manufacturing, Mining, and Utilities
<http://www.federalreserve.gov/releases/g17/iphist/caphist_sa.txt>

Record format:
<http://www.federalreserve.gov/releases/g17/download.htm>

The format for each line in the files consists of an industry code,
a year, and 12 months of data (to four decimal places) when available.
The data have various start dates; the earliest is 1919. 
"""
import logging
import os
import sqlite3

from ..common import logger
from ..common import sourcing_template

class Sourcing(sourcing_template.SourcingTemplate):
    """Source series "B50001" of the Federal Reserve G.17 industrial capacity data.

    Downloads the current (1986 onward) and historical (through 1985) files,
    stages them in DEFLATED_DIR and writes one ``(period, number)`` row per
    month into the ``IndustrialCapacity`` table of DB_FILE.
    """

    def __init__(self):
        sourcing_template.SourcingTemplate.__init__(self)
        self.__logger = logging.getLogger()
        self.URLS = [
            'http://www.federalreserve.gov/releases/g17/ipdisk/cap_sa.txt',
            'http://www.federalreserve.gov/releases/g17/iphist/caphist_sa.txt',
        ]
        self.LOCAL_DIR = './dataset/industrial_capacity/local/'
        self.DEFLATED_DIR = './dataset/industrial_capacity/deflated/'
        self.DB_FILE = './db/economicstotal.db'
        # Industry code (quotes included) that starts every line to keep.
        self.ITEM_PREFIX = '''\"B50001\"'''
        # Qmark placeholders: sqlite3 binds the values itself, so nothing
        # read from the download is spliced into the SQL text.
        self.SQL_INSERT = '''replace into
            IndustrialCapacity(period, number) values(?, ?)'''

    def source(self):
        """Download and stage the files, then load them into DB_FILE."""
        sourcing_template.SourcingTemplate.source(self)
        self.source_deflated_to_sqlite_batch(self.DEFLATED_DIR, self.DB_FILE)

    def source_deflated_to_sqlite_batch(self, src_dir, dest_db):
        """Load every file in *src_dir* into the SQLite file *dest_db*."""
        assert os.path.isdir(src_dir)
        for name in os.listdir(src_dir):
            self.source_deflated_to_sqlite(os.path.join(src_dir, name), dest_db)

    def source_deflated_to_sqlite(self, src_file, dest_db):
        """Parse one G.17 text file and write its records to *dest_db*."""
        self.__logger.debug('''%s => %s''' % (src_file, dest_db))
        records = self.__build_db_records(src_file)
        self.__write_db(records, dest_db)

    def __build_db_records(self, src_file):
        """Return ``[period, number]`` pairs for the ITEM_PREFIX lines.

        Each kept line is ``code year v1 v2 ... vN``; value ``vK`` becomes
        the record ``['year-KK-01', vK]``.
        """
        with open(src_file, 'rb') as src_file_fd:
            content = src_file_fd.read().decode('utf-8')

        records = []
        for line in content.split('\n'):
            if not line.startswith(self.ITEM_PREFIX):
                continue
            tokens = line.split()
            assert len(tokens) > 1
            year = tokens[1]
            # Tokens after the year are the monthly values, January first.
            for month, number in enumerate(tokens[2:], 1):
                records.append(['%s-%02d-01' % (year, month), number])
        return records

    def __write_db(self, records, dest_db):
        """Insert *records* into *dest_db*, committing once after all rows."""
        conn = sqlite3.connect(dest_db)
        try:
            cursor = conn.cursor()
            for period, number in records:
                cursor.execute(self.SQL_INSERT, (period, number))
                self.__logger.debug('''(period, number): (%s, %s) => %s'''
                                    % (period, number, dest_db))
            conn.commit()
            cursor.close()
        finally:
            # Release the database file even if an insert fails.
            conn.close()

src/industrial_capacity/schema.sql
-- IndustrialCapacity: monthly values loaded by
-- src/industrial_capacity/sourcing.py.
-- NOTE: the drop makes this script destructive; re-running it discards
-- every previously sourced row.
drop table if exists IndustrialCapacity;

create table if not exists IndustrialCapacity 
(
    creation_dt datetime default current_timestamp, -- filled in when a row is inserted
    period datetime not null, -- the loader writes 'YYYY-MM-01' strings
    number text, -- observed value, stored as text
    revision int default 0, -- the loader never sets it, so it stays 0
    -- Table default for duplicates; the loader's explicit `replace into`
    -- overrides it, so re-sourcing a period replaces the stored row.
    unique (period,  revision) on conflict ignore
);

7. Sourcing ISM report on business
src/ism_report_on_business/__init__.py
src/ism_report_on_business/sourcing.py
"""
ISM Manufacturing Report On Business Historical Information
<http://www.ism.ws/ISMReport/content.cfm?ItemNumber=13339&navItemNumber=12958>

All Manufacturing Indexes in one file
<http://www.ism.ws/files/ISMReport/MfgAllIndexes12.xls>
"""
import datetime
import logging
import os
import sqlite3
import xlrd

from ..common import logger
from ..common import sourcing_template
from ..common import xlrd_util

class Sourcing(sourcing_template.SourcingTemplate):
    """Source the ISM Manufacturing Report On Business indexes.

    Downloads the all-indexes workbook and writes one
    ``(period, item, number)`` row per dated row of every sheet into the
    ``IsmReportOnBusiness`` table of DB_FILE; ``item`` is the sheet name.
    """

    def __init__(self):
        sourcing_template.SourcingTemplate.__init__(self)
        self.__logger = logging.getLogger()
        self.URLS = ['http://www.ism.ws/files/ISMReport/MfgAllIndexes12.xls']
        self.LOCAL_DIR = './dataset/ism_report_on_business/local/'
        self.DEFLATED_DIR = './dataset/ism_report_on_business/deflated/'
        self.DB_FILE = './db/economicstotal.db'
        # Qmark placeholders: sqlite3 binds the values itself, so nothing
        # read from the workbook is spliced into the SQL text.
        self.SQL_INSERT = '''replace into
            IsmReportOnBusiness(period, item, number) values(?, ?, ?)
            '''

    def source(self):
        """Download and stage the workbook, then load it into DB_FILE."""
        sourcing_template.SourcingTemplate.source(self)
        self.source_deflated_to_sqlite_batch(self.DEFLATED_DIR, self.DB_FILE)

    def source_deflated_to_sqlite_batch(self, src_dir, dest_db):
        """Load every workbook in *src_dir* into the SQLite file *dest_db*."""
        assert os.path.isdir(src_dir)
        for name in os.listdir(src_dir):
            self.source_deflated_to_sqlite(os.path.join(src_dir, name), dest_db)

    def source_deflated_to_sqlite(self, src_file, dest_db):
        """Parse one workbook and write its records to *dest_db*."""
        self.__logger.debug('''%s => %s''' % (src_file, dest_db))
        records = self.__build_db_records(src_file)
        self.__write_db(records, dest_db)

    def __build_db_records(self, src_file):
        """Return ``[period, item, number]`` triples from every sheet.

        Only rows whose first column is an Excel date are used; the period
        is snapped to the first day of its month.
        """
        book = xlrd.open_workbook(src_file)

        records = []
        for sheet_name in book.sheet_names():
            sheet = book.sheet_by_name(sheet_name)
            item = sheet_name.replace('\'', '')
            # PMI index on column 1. Others' index on column 5
            number_col = 1 if item == 'PMI' else 5
            for i in range(sheet.nrows):
                cell = sheet.cell(i, 0)
                if cell.ctype != xlrd.XL_CELL_DATE:
                    continue
                period = self.__align_date(
                    xlrd_util.cell_value(cell, book.datemode))
                number = sheet.cell_value(i, number_col)
                # xlrd reads an empty cell as ''; a numeric 0 is still kept.
                if number != '':
                    records.append([str(period), item, number])
        return records

    def __write_db(self, records, dest_db):
        """Insert *records* into *dest_db*, committing once after all rows."""
        conn = sqlite3.connect(dest_db)
        try:
            cursor = conn.cursor()
            for period, item, number in records:
                # str() keeps the stored text identical to '%s' formatting.
                cursor.execute(self.SQL_INSERT, (period, item, str(number)))
                self.__logger.debug('''(%s, %s, %s) => %s''' % (period, item, number, dest_db))
            conn.commit()
            cursor.close()
        finally:
            # Release the database file even if an insert fails.
            conn.close()

    # ISM Report on Business Excel has some non-aligned date
    def __align_date(self, date):
        """Return the first day of *date*'s month as a ``datetime.date``."""
        return datetime.date(date.year, date.month, 1)

src/ism_report_on_business/schema.sql
-- IsmReportOnBusiness: monthly index values per ISM sheet, loaded by
-- src/ism_report_on_business/sourcing.py.
-- NOTE: the drop makes this script destructive; re-running it discards
-- every previously sourced row.
drop table if exists IsmReportOnBusiness;

create table if not exists IsmReportOnBusiness 
(
    creation_dt datetime default current_timestamp, -- filled in when a row is inserted
    period datetime not null, -- the loader writes 'YYYY-MM-01' (date aligned to month start)
    item text, -- workbook sheet name with single quotes removed, e.g. 'PMI'
    number text, -- index value, stored as text
    revision int default 0, -- the loader never sets it, so it stays 0
    -- Table default for duplicates; the loader's explicit `replace into`
    -- overrides it, so re-sourcing a (period, item) replaces the stored row.
    unique (period, item, revision) on conflict ignore
);

8. Sourcing Advance Monthly Sales for Retail and Food Services
src/sales_retail/__init__.py
src/sales_retail/sourcing.py
"""
<http://www.census.gov/econ/currentdata/export/csv?programCode=MARTS&timeSlotType=12&startYear=1992&endYear=2012&categoryCode=44X72&dataTypeCode=SM&geoLevelCode=US&adjusted=yes&errorData=no&internal=false>

CSV Content
===========
"U.S. Census Bureau"
"Source: Advance Monthly Sales for Retail and Food Services"
"44X72: Retail Trade and Food Services: U.S. Total"
"Seasonally Adjusted Sales - Monthly [Millions of Dollars]"
"Period: 1992 to 2012"
"Data Extracted on: September 17, 2012 (9:52 pm)"

Period,Value
Jan-1992,164083
Feb-1992,164260
Mar-1992,163747
Apr-1992,164759
May-1992,165610
Jun-1992,166089
"""
import csv
import datetime
import logging
import os
import sqlite3

from ..common import logger
from ..common import sourcing_template

class Sourcing(sourcing_template.SourcingTemplate):
    """Source Census "Advance Monthly Sales for Retail and Food Services".

    Downloads the MARTS CSV export (category 44X72, seasonally adjusted,
    1992 through the current year) and writes one ``(period, number)`` row
    per month into the ``AdvanceMonthlyRetailTradeReport`` table of DB_FILE.
    """

    def __init__(self):
        sourcing_template.SourcingTemplate.__init__(self)
        self.__logger = logging.getLogger()
        self.URL_TEMPLATE = '''http://www.census.gov/econ/currentdata/export/csv?programCode=MARTS&timeSlotType=12&startYear=%d&endYear=%d&categoryCode=44X72&dataTypeCode=SM&geoLevelCode=US&adjusted=yes&errorData=no&internal=false'''
        self.LOCAL_DIR = './dataset/sales_retail/local/'
        self.DEFLATED_DIR = './dataset/sales_retail/deflated/'
        self.DB_FILE = './db/economicstotal.db'
        # Qmark placeholders: sqlite3 binds the values itself, so nothing
        # read from the CSV is spliced into the SQL text.
        self.SQL_INSERT = '''insert or ignore into
            AdvanceMonthlyRetailTradeReport(period, number) values(?, ?)
            '''
        self.__init_url()

    def source(self):
        """Download and stage the CSV export, then load it into DB_FILE."""
        sourcing_template.SourcingTemplate.source(self)
        self.source_deflated_to_sqlite_batch(self.DEFLATED_DIR, self.DB_FILE)

    def source_deflated_to_sqlite_batch(self, src_dir, dest_db):
        """Load every CSV file in *src_dir* into the SQLite file *dest_db*."""
        assert os.path.isdir(src_dir)
        for name in os.listdir(src_dir):
            self.source_deflated_to_sqlite(os.path.join(src_dir, name), dest_db)

    def source_deflated_to_sqlite(self, src_file, dest_db):
        """Load the data rows of one MARTS CSV export into *dest_db*.

        The export starts with free-text preamble lines; data begins after
        the first two-column row (the ``Period,Value`` header). Rows whose
        value is ``NA`` and blank/short rows are skipped.
        """
        self.__logger.debug('''%s => %s''' % (src_file, dest_db))
        assert os.path.isfile(src_file)
        assert os.path.isfile(dest_db)

        conn = sqlite3.connect(dest_db)
        try:
            cursor = conn.cursor()
            # newline='' is what the csv module expects from its file object.
            with open(src_file, 'r', newline='') as src_file_fd:
                is_began = False
                for row in csv.reader(src_file_fd):
                    if not is_began:
                        if len(row) == 2:
                            is_began = True
                        continue
                    if len(row) < 2 or row[1] == 'NA':
                        continue
                    period = self.__period_str(row[0])
                    number = row[1]
                    cursor.execute(self.SQL_INSERT, (period, number))

                    self.__logger.debug('''(period, value): (%s, %s) => %s'''
                                        % (period, number, dest_db))
            conn.commit()
            cursor.close()
        finally:
            # Release the database file even if parsing or an insert fails.
            conn.close()

    def __init_url(self):
        """Append the export URL covering 1992 through the current year."""
        start_year = 1992
        end_year = datetime.datetime.now().year
        url = self.URL_TEMPLATE % (start_year, end_year)
        self.URLS.append(url)

    def __period_str(self, period):
        """Convert a ``'Jan-1992'`` period label to ``'1992-01-01'``."""
        return datetime.datetime.strptime(period, '%b-%Y').strftime('%Y-%m-%d')

src/sales_retail/schema.sql
-- AdvanceMonthlyRetailTradeReport: monthly retail and food services sales,
-- loaded by src/sales_retail/sourcing.py.
-- NOTE: the drop makes this script destructive; re-running it discards
-- every previously sourced row.
drop table if exists AdvanceMonthlyRetailTradeReport;

create table if not exists AdvanceMonthlyRetailTradeReport 
(
    creation_dt datetime default current_timestamp, -- filled in when a row is inserted
    period datetime not null, -- the loader writes 'YYYY-MM-01' strings
    number text, -- sales value as read from the CSV, stored as text
    revision int default 0, -- the loader never sets it, so it stays 0
    -- Duplicates are ignored; the loader's `insert or ignore` agrees, so the
    -- first row stored for a period is kept on re-sourcing.
    unique (period, revision) on conflict ignore
);

9. Sourcing TWSE market capitalization
src/twse_market_capitalization/__init__.py
src/twse_market_capitalization/sourcing.py
"""
<http://www.twse.com.tw/ch/statistics/statistics_week.php>
<http://www.twse.com.tw/ch/statistics/download/week.zip>
"""
import logging
import os
import sqlite3
import xlrd

from ..common import logger
from ..common import sourcing_template
from ..common import xlrd_util

class Sourcing(sourcing_template.SourcingTemplate):
    """Source TWSE weekly statistics into ``TwseMarketCapitalization``.

    Pipeline: download ``week.zip`` -> stage it in DEFLATED_DIR -> extract
    every staged archive into UNZIPPED_DIR with the bundled 7-Zip -> read
    columns 0 (period) and 1 (number) of the first sheet of each extracted
    workbook into DB_FILE.
    """

    def __init__(self):
        sourcing_template.SourcingTemplate.__init__(self)
        self.__logger = logging.getLogger()
        self.URLS = ['http://www.twse.com.tw/ch/statistics/download/week.zip']
        self.LOCAL_DIR = './dataset/twse_market_capitalization/local/'
        self.DEFLATED_DIR = './dataset/twse_market_capitalization/deflated/'
        self.UNZIPPED_DIR = './dataset/twse_market_capitalization/unzipped/'
        self.DB_FILE = './db/economicstotal.db'
        # Qmark placeholders: sqlite3 binds the values itself, so nothing
        # read from the workbook is spliced into the SQL text.
        self.SQL_INSERT = '''replace into
            TwseMarketCapitalization(period, number) values(?, ?)
            '''

    def source(self):
        """Download, stage, extract and load the TWSE data into DB_FILE."""
        sourcing_template.SourcingTemplate.source(self)
        # DEFLATED_DIR is a directory, so the batch variant is required; the
        # single-file variant would hand the directory itself to 7-Zip.
        self.source_deflated_to_unzipped_batch(self.DEFLATED_DIR, self.UNZIPPED_DIR)
        self.source_unzipped_to_sqlite_batch(self.UNZIPPED_DIR, self.DB_FILE)

    def source_deflated_to_unzipped_batch(self, src_dir, dest_dir):
        """Extract every archive in *src_dir* into *dest_dir*."""
        assert os.path.isdir(src_dir)
        for name in os.listdir(src_dir):
            self.source_deflated_to_unzipped(os.path.join(src_dir, name), dest_dir)

    def source_deflated_to_unzipped(self, src_file, dest_dir):
        """Extract the archive *src_file* into *dest_dir*, creating it if needed."""
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir)
        self.__unzip(src_file, dest_dir)

    def source_unzipped_to_sqlite_batch(self, src_dir, dest_db):
        """Load every workbook in *src_dir* into the SQLite file *dest_db*."""
        assert os.path.isdir(src_dir)
        for name in os.listdir(src_dir):
            self.source_unzipped_to_sqlite(os.path.join(src_dir, name), dest_db)

    def source_unzipped_to_sqlite(self, src_file, dest_db):
        """Parse one workbook and write its records to *dest_db*."""
        self.__logger.debug('''%s => %s''' % (src_file, dest_db))
        records = self.__build_db_records(src_file)
        self.__write_db(records, dest_db)

    def __unzip(self, src_zip, dest_dir):
        """Run the bundled 7-Zip to extract *src_zip* into *dest_dir*.

        ``e`` extracts without stored paths, ``-aoa`` overwrites existing
        files without prompting, ``-o<dir>`` sets the output directory.
        """
        import subprocess

        sevenzip = os.path.abspath('./src/thirdparty/sevenzip/7z.exe')
        assert os.path.isfile(sevenzip)
        # An argument list (no shell) keeps paths containing spaces intact.
        subprocess.call([sevenzip, 'e', src_zip, '-aoa', '-o' + dest_dir])

    def __build_db_records(self, src_file):
        """Return ``[period, number]`` pairs from the first sheet.

        Rows 0-1 are skipped (presumably headers — confirm against the
        workbook). Rows whose first column is empty are skipped too, since
        they have no period to key on.
        """
        book = xlrd.open_workbook(src_file)
        sheet = book.sheet_by_index(0)

        records = []
        for i in range(2, sheet.nrows):
            period = xlrd_util.cell_value(sheet.cell(i, 0), book.datemode)
            if period is None:
                continue
            records.append([period, sheet.cell_value(i, 1)])
        return records

    def __write_db(self, records, dest_db):
        """Insert *records* into *dest_db*, committing once after all rows."""
        conn = sqlite3.connect(dest_db)
        try:
            cursor = conn.cursor()
            for period, number in records:
                # str() keeps the stored text identical to '%s' formatting.
                cursor.execute(self.SQL_INSERT, (str(period), str(number)))
                self.__logger.debug('''(period, number): (%s, %s) => %s'''
                                    % (period, number, dest_db))
            conn.commit()
            cursor.close()
        finally:
            # Release the database file even if an insert fails.
            conn.close()

src/twse_market_capitalization/schema.sql
-- TwseMarketCapitalization: values from the TWSE weekly statistics
-- workbook, loaded by src/twse_market_capitalization/sourcing.py.
-- NOTE: the drop makes this script destructive; re-running it discards
-- every previously sourced row.
drop table if exists TwseMarketCapitalization;

create table if not exists TwseMarketCapitalization 
(
    creation_dt datetime default current_timestamp, -- filled in when a row is inserted
    period datetime not null, -- date from the workbook's first column, normally 'YYYY-MM-DD'
    number text, -- value from the workbook's second column, stored as text
    revision int default 0, -- the loader never sets it, so it stays 0
    -- Table default for duplicates; the loader's explicit `replace into`
    -- overrides it, so re-sourcing a period replaces the stored row.
    unique (period, revision) on conflict ignore
);


10. Sourcing Monthly Wholesale Trade Report
src/wholesale_trade/__init__.py
src/wholesale_trade/sourcing.py
"""
<http://www.census.gov/wholesale/>
<http://www2.census.gov/wholesale/xls/mwts/historic1.xls>

Record format: (period_date, item, number)
Reference: Package xlrd - https://github.com/takluyver/xlrd/zipball/py3
"""
import logging
import os
import sqlite3
import xlrd

from ..common import logger
from ..common import sourcing_template

class Sourcing(sourcing_template.SourcingTemplate):
    """Source the Census Monthly Wholesale Trade Report (NAICS 42).

    Downloads ``historic1.xls`` and writes one ``(period, item, number)``
    row per month and item into the ``MonthlyWholesaleTradeReport`` table of
    DB_FILE, where ``item`` is one of ``ITEMS``.
    """

    def __init__(self):
        sourcing_template.SourcingTemplate.__init__(self)
        self.__logger = logging.getLogger()
        self.URLS = ['http://www2.census.gov/wholesale/xls/mwts/historic1.xls']
        self.LOCAL_DIR = './dataset/wholesale_trade/local/'
        self.DEFLATED_DIR = './dataset/wholesale_trade/deflated/'
        self.DB_FILE = './db/economicstotal.db'
        # Successive NAICS-42 rows of the sheet are assigned these items in
        # rotation.
        self.ITEMS = [
            'sales',
            'inventories',
            'inventories_sales_ratios',
        ]
        # Qmark placeholders: sqlite3 binds the values itself, so nothing
        # read from the workbook is spliced into the SQL text.
        self.SQL_INSERT = '''replace into
            MonthlyWholesaleTradeReport(period, item, number)
            values(?, ?, ?)
            '''

    def source(self):
        """Download and stage the workbook, then load it into DB_FILE."""
        sourcing_template.SourcingTemplate.source(self)
        self.source_deflated_to_sqlite_batch(self.DEFLATED_DIR, self.DB_FILE)

    def source_deflated_to_sqlite_batch(self, src_dir, dest_db):
        """Load every workbook in *src_dir* into the SQLite file *dest_db*."""
        assert os.path.isdir(src_dir)
        for name in os.listdir(src_dir):
            self.source_deflated_to_sqlite(os.path.join(src_dir, name), dest_db)

    def source_deflated_to_sqlite(self, src_file, dest_db):
        """Parse one workbook and write its records to *dest_db*."""
        self.__logger.debug('''%s => %s''' % (src_file, dest_db))
        records = self.__build_db_records(src_file)
        self.__write_db(records, dest_db)

    def __build_db_records(self, src_file):
        """Return ``[period, item, number]`` triples from the first sheet.

        A row whose first cell is ``'NAICS Code'`` supplies the year (column
        3) for the rows that follow; each row whose first cell is 42 supplies
        twelve monthly values (columns 3-14) for the next item in ``ITEMS``.
        """
        book = xlrd.open_workbook(src_file)
        sheet = book.sheet_by_index(0)

        data_rows = {}
        for i in range(sheet.nrows):
            first_cell = sheet.cell_value(i, 0)
            if first_cell == 'NAICS Code':
                data_rows[i] = 'P' # Period
            elif first_cell == 42:
                data_rows[i] = 'N' # Number

        records = []
        curr_year = None
        curr_item = -1
        for key in sorted(data_rows):
            if data_rows[key] == 'P':
                curr_year = sheet.cell_value(key, 3)
                # xlrd returns numeric cells as float; render 2012.0 as
                # 2012 so periods read 'YYYY-MM-01'.
                if isinstance(curr_year, float) and curr_year.is_integer():
                    curr_year = int(curr_year)
            if data_rows[key] == 'N':
                curr_item = (curr_item + 1) % len(self.ITEMS)
                for month in range(12):
                    records.append([
                        '%s-%02d-01' % (curr_year, month + 1),
                        self.ITEMS[curr_item],
                        sheet.cell_value(key, month + 3),
                    ])
        return records

    def __write_db(self, records, dest_db):
        """Insert the non-'NA' *records* into *dest_db*, committing once."""
        conn = sqlite3.connect(dest_db)
        try:
            cursor = conn.cursor()
            for period, item, number in records:
                if number == 'NA':
                    continue
                # str() keeps the stored text identical to '%s' formatting.
                cursor.execute(self.SQL_INSERT, (period, item, str(number)))
                self.__logger.debug('''(period, item, number): (%s, %s, %s) => %s'''
                                    % (period, item, number, dest_db))
            conn.commit()
            cursor.close()
        finally:
            # Release the database file even if an insert fails.
            conn.close()

src/wholesale_trade/schema.sql
-- MonthlyWholesaleTradeReport: monthly NAICS-42 wholesale figures, loaded
-- by src/wholesale_trade/sourcing.py.
-- NOTE: the drop makes this script destructive; re-running it discards
-- every previously sourced row.
drop table if exists MonthlyWholesaleTradeReport;

create table if not exists MonthlyWholesaleTradeReport 
(
    creation_dt datetime default current_timestamp, -- filled in when a row is inserted
    period datetime not null, -- the loader writes '<year>-MM-01' strings
    item text, -- 'sales', 'inventories' or 'inventories_sales_ratios'
    number text, -- value from the workbook, stored as text
    revision int default 0, -- the loader never sets it, so it stays 0
    -- Table default for duplicates; the loader's explicit `replace into`
    -- overrides it, so re-sourcing a (period, item) replaces the stored row.
    unique (period, item, revision) on conflict ignore
);


11. Third party tools
src/thirdparty/sevenzip/7z.exe
src/thirdparty/wget/wget.exe

沒有留言:

張貼留言