2012年9月9日 星期日

上市櫃證券國際證券辨識號碼 => SQLite


Phase 1: Download remote content to local files

import os
import sys

class MigratePhase1WebToContent():
    """Download a remote page to a local file with the bundled wget.

    The wget executable is looked up at ``../thirdparty/wget/wget.exe``,
    resolved against the current working directory at construction time.
    """

    def __init__(self):
        """Resolve the wget executable path.

        Raises:
            AssertionError: if the wget executable is not an existing file.
        """
        self._wget = os.path.abspath('../thirdparty/wget/wget.exe')
        assert os.path.isfile(self._wget)

    def _build_command(self, src_uri, dest_content):
        """Return the wget argument list that saves src_uri to dest_content."""
        return [self._wget, src_uri, '--waitretry=10', '-O', dest_content]

    def migrate(self, src_uri, dest_content):
        """Download ``src_uri`` and store it as ``dest_content``.

        The destination directory is created when it does not exist yet.

        Args:
            src_uri: URI handed to wget.
            dest_content: path of the output file (wget ``-O`` argument).

        Returns:
            The exit status reported by wget. A non-zero status is returned
            to the caller rather than raised.

        Raises:
            OSError: if the wget executable cannot be started.
        """
        # Imported here so the block does not depend on the script's
        # top-level import list.
        import subprocess

        dest_dir = os.path.dirname(dest_content)
        # dirname() is '' for a bare file name; makedirs('') would raise.
        if dest_dir and not os.path.exists(dest_dir):
            os.makedirs(dest_dir)

        # An argument list (no shell) keeps '&', spaces and quotes inside
        # the URI or the destination path from being interpreted by a shell.
        return subprocess.call(self._build_command(src_uri, dest_content))

def main():
    """Run phase 1: fetch the two ISIN pages into ``./content``."""
    downloads = (
        ('http://brk.tse.com.tw:8000/isin/C_public.jsp?strMode=2',
         './content/listed_company.html'),
        ('http://brk.tse.com.tw:8000/isin/C_public.jsp?strMode=4',
         './content/toc_company.html'),
    )
    downloader = MigratePhase1WebToContent()
    for src_uri, dest_content in downloads:
        downloader.migrate(src_uri, dest_content)


if __name__ == '__main__':
    exit_status = main()
    sys.exit(exit_status)



Phase 2: Parse local content to CSV

"""
Requires lxml. Install it from: http://www.lfd.uci.edu/~gohlke/pythonlibs/#lxml
"""
import csv
import os
import sys
from lxml import html

class MigratePhase2ContentToCsv():
    """Extract table rows from a downloaded HTML page into a CSV file."""

    def __init__(self):
        pass

    @staticmethod
    def _normalize_row(columns):
        """Return the CSV fields for one table row, or None to skip it.

        The first cell is split on whitespace and the remaining cells are
        appended. A five-cell row gets an empty field inserted before its
        last cell so that it lines up with a six-cell row. Rows with any
        other number of cells are skipped.

        NOTE(review): the first cell presumably holds "<code> <name>"; a
        name containing whitespace would yield extra fields - confirm
        against the real page.
        """
        cell_count = len(columns)
        if cell_count == 5:
            return columns[0].split() + columns[1:4] + [''] + columns[4:]
        if cell_count == 6:
            return columns[0].split() + columns[1:]
        return None

    @staticmethod
    def _open_csv_for_writing(path):
        """Open path for csv.writer without newline translation."""
        # The csv module emits '\r\n' itself; a translating text-mode file
        # would turn that into '\r\r\n' on Windows.
        if sys.version_info[0] >= 3:
            return open(path, 'w', newline='')
        return open(path, 'wb')

    def migrate(self, src_content, dest_csvfile):
        """Parse ``src_content`` (HTML) and write its rows to ``dest_csvfile``.

        Rows are taken from ``//body/table[@class="h4"]/tr``; each cell's
        text is stripped before the row is normalized. Every field is
        written quoted. The destination directory is created on demand.

        Args:
            src_content: path of the HTML file to read.
            dest_csvfile: path of the CSV file to (over)write.

        Raises:
            AssertionError: if ``src_content`` is not an existing file.
        """
        assert os.path.isfile(src_content)
        dest_dir = os.path.dirname(dest_csvfile)
        # dirname() is '' for a bare file name; makedirs('') would raise.
        if dest_dir and not os.path.exists(dest_dir):
            os.makedirs(dest_dir)

        with open(src_content) as src_file_h:
            content = src_file_h.read()

        rows = []
        document = html.fromstring(content)
        for row in document.xpath('//body/table[@class="h4"]/tr'):
            columns = [text.strip() for text in row.xpath('./td/text()')]
            fields = self._normalize_row(columns)
            if fields is not None:
                rows.append(fields)

        # Close the output explicitly so every row is flushed to disk.
        dest_file_h = self._open_csv_for_writing(dest_csvfile)
        try:
            csv_writer = csv.writer(dest_file_h, delimiter=',',
                                    quoting=csv.QUOTE_ALL)
            csv_writer.writerows(rows)
        finally:
            dest_file_h.close()

def main():
    """Run phase 2: convert both downloaded pages in ``./content`` to CSV."""
    conversions = (
        ('./content/listed_company.html', './csv/listed_company.csv'),
        ('./content/toc_company.html', './csv/toc_company.csv'),
    )
    converter = MigratePhase2ContentToCsv()
    for src_content, dest_csvfile in conversions:
        converter.migrate(src_content, dest_csvfile)


if __name__ == '__main__':
    exit_status = main()
    sys.exit(exit_status)


Phase 3: Import CSV into SQLite

SQLite schema (without normalization):

-- Start from a clean slate: any existing StockCode table and its rows are removed.
drop table if exists StockCode;

-- Single flat table, deliberately not normalized.
-- code, name and isin_code each carry their own UNIQUE constraint, so a row
-- that repeats any one of them conflicts with the existing row.
create table if not exists StockCode
(
    -- Set by SQLite to the insertion time when no value is supplied.
    creation_dt datetime default current_timestamp,
    code text unique,
    name text unique,
    isin_code text unique,
    listing_date datetime,
    market_category text,
    industry_category text,
    cfi_code text
);


Python script:

import csv
import os
import sqlite3
import sys

class MigratePhase3CsvToSqlite():
    """Import CSV rows into the ``StockCode`` table of an SQLite database."""

    # Number of leading CSV fields bound to the insert statement.
    _FIELD_COUNT = 7

    def __init__(self):
        # '?' placeholders let sqlite3 bind the values itself, so a quote
        # character inside a field can neither break nor alter the statement.
        self.sql_insert = '''insert or ignore into
            StockCode(code, name, isin_code, listing_date,
                      market_category, industry_category, cfi_code)
            values(?, ?, ?, ?, ?, ?, ?)
            '''

    def migrate(self, src_csvfile, dest_dbfile):
        """Insert every row of ``src_csvfile`` into ``dest_dbfile``.

        The first seven fields of each row are inserted with
        ``insert or ignore``, so rows that violate a constraint are skipped
        by SQLite. Blank CSV lines are ignored. All inserts are committed
        together after the last row; on error nothing is committed.

        Args:
            src_csvfile: path of the CSV file to read.
            dest_dbfile: path of an existing SQLite database file that
                already contains the ``StockCode`` table.

        Raises:
            AssertionError: if either path is not an existing file.
            IndexError: if a non-blank row has fewer than seven fields.
        """
        assert os.path.isfile(src_csvfile)
        assert os.path.isfile(dest_dbfile)

        conn = sqlite3.connect(dest_dbfile)
        try:
            cursor = conn.cursor()
            try:
                with open(src_csvfile, "r") as src_file_h:
                    csv_reader = csv.reader(src_file_h)
                    for row in csv_reader:
                        if not row:
                            # Blank line, e.g. from doubled line endings.
                            continue
                        if len(row) < self._FIELD_COUNT:
                            raise IndexError(
                                'row %d of %s has %d fields; expected at least %d'
                                % (csv_reader.line_num, src_csvfile,
                                   len(row), self._FIELD_COUNT))
                        cursor.execute(self.sql_insert,
                                       row[:self._FIELD_COUNT])
                conn.commit()
            finally:
                cursor.close()
        finally:
            # Always release the database file, even when a row fails.
            conn.close()

def main():
    """Run phase 3: load both CSV files into ``./db/stock_code.db``."""
    dest_dbfile = './db/stock_code.db'
    importer = MigratePhase3CsvToSqlite()
    for src_csvfile in ('./csv/listed_company.csv', './csv/toc_company.csv'):
        importer.migrate(src_csvfile, dest_dbfile)


if __name__ == '__main__':
    exit_status = main()
    sys.exit(exit_status)


沒有留言:

張貼留言