Phase 1: Download remote content to local files
import os
import sys
class MigratePhase1WebToContent():
    """Download remote web pages to local files by running a bundled wget.

    The wget executable is looked up at ``../thirdparty/wget/wget.exe``,
    resolved against the current working directory when the object is
    constructed.
    """

    def __init__(self):
        self._wget = os.path.abspath('../thirdparty/wget/wget.exe')
        assert os.path.isfile(self._wget), \
            'wget executable not found: %s' % self._wget

    def migrate(self, src_uri, dest_content):
        """Fetch ``src_uri`` with wget and store the response in ``dest_content``.

        Args:
            src_uri: Address of the remote resource to download.
            dest_content: Path of the local file to write. Missing parent
                directories are created first.

        Returns:
            The exit status reported by the wget process (0 means success).
        """
        # Function-scope import so this class does not depend on a
        # module-level import that the surrounding script does not have.
        import subprocess

        dest_dir = os.path.dirname(dest_content)
        # dirname() is '' for a bare file name; os.makedirs('') would raise.
        if dest_dir and not os.path.exists(dest_dir):
            os.makedirs(dest_dir)

        # Pass an argument list instead of a shell string: the URI and the
        # destination path reach wget verbatim, so spaces, '&', quotes, etc.
        # can neither split the arguments nor be interpreted by a shell.
        return subprocess.call(
            [self._wget, src_uri, '--waitretry=10', '-O', dest_content])
def main():
    """Download the listed-company and OTC-company pages into ./content."""
    downloader = MigratePhase1WebToContent()
    # (remote address, local file) pairs, fetched in this order.
    jobs = (
        ('http://brk.tse.com.tw:8000/isin/C_public.jsp?strMode=2',
         './content/listed_company.html'),
        ('http://brk.tse.com.tw:8000/isin/C_public.jsp?strMode=4',
         './content/toc_company.html'),
    )
    for remote_uri, local_path in jobs:
        downloader.migrate(remote_uri, local_path)


if __name__ == '__main__':
    exit_status = main()
    sys.exit(exit_status)
Phase 2: Parse local content to CSV
"""
Requires the third-party lxml package; Windows installers are available at: http://www.lfd.uci.edu/~gohlke/pythonlibs/#lxml
"""
import csv
import os
import sys
from lxml import html
class MigratePhase2ContentToCsv():
    """Convert a downloaded HTML listing page into a fully quoted CSV file."""

    def __init__(self):
        pass

    def migrate(self, src_content, dest_csvfile):
        """Extract table rows from ``src_content`` and write them to ``dest_csvfile``.

        Every ``<tr>`` of the ``<table class="h4">`` under ``<body>`` is
        examined. Rows with five or six non-nested text cells are kept and
        the first cell is split on whitespace into separate columns. For
        five-cell rows an empty string is inserted before the last cell so
        that both kinds of row share one layout. Rows with any other cell
        count are skipped.

        Args:
            src_content: Path of the local HTML file to parse; must exist.
            dest_csvfile: Path of the CSV file to write. Missing parent
                directories are created first.
        """
        assert os.path.isfile(src_content), \
            'source content not found: %s' % src_content

        dest_dir = os.path.dirname(dest_csvfile)
        # dirname() is '' for a bare file name; os.makedirs('') would raise.
        if dest_dir and not os.path.exists(dest_dir):
            os.makedirs(dest_dir)

        with open(src_content) as src_file_h:
            content = src_file_h.read()

        rows = []
        table = html.fromstring(content)
        for row in table.xpath('//body/table[@class="h4"]/tr'):
            columns = [text.strip() for text in row.xpath('./td/text()')]
            # Compare by value: 'is' tests object identity and only appears
            # to work for small ints because of an interpreter detail.
            if len(columns) == 5:
                rows.append(
                    columns[0].split() + columns[1:4] + [''] + columns[4:])
            elif len(columns) == 6:
                rows.append(columns[0].split() + columns[1:])

        dest_file_h = self._open_csv_for_writing(dest_csvfile)
        try:
            csv_writer = csv.writer(
                dest_file_h, delimiter=',', quoting=csv.QUOTE_ALL)
            csv_writer.writerows(rows)
        finally:
            # Close explicitly so the data is flushed to disk on return.
            dest_file_h.close()

    @staticmethod
    def _open_csv_for_writing(path):
        """Open ``path`` so the csv module controls the line endings itself."""
        # The csv writer emits '\r\n' on its own; a translating text-mode
        # file would turn that into '\r\r\n' on Windows.
        if sys.version_info[0] >= 3:
            return open(path, 'w', newline='')
        return open(path, 'wb')
def main():
    """Convert both downloaded HTML pages in ./content to CSV files in ./csv."""
    converter = MigratePhase2ContentToCsv()
    # (HTML input, CSV output) pairs, processed in this order.
    jobs = (
        ('./content/listed_company.html', './csv/listed_company.csv'),
        ('./content/toc_company.html', './csv/toc_company.csv'),
    )
    for html_path, csv_path in jobs:
        converter.migrate(html_path, csv_path)


if __name__ == '__main__':
    exit_status = main()
    sys.exit(exit_status)
Phase 3: Import CSV into SQLite
SQLite schema (without normalization):
-- Single flat table of stock codes (no normalization).
-- NOTE: running this script drops any existing StockCode table together
-- with its rows before recreating it.
drop table if exists StockCode;
create table if not exists StockCode
(
-- Filled in by SQLite at insert time when no value is supplied.
creation_dt datetime default current_timestamp,
-- code, name and isin_code are each unique: together with the
-- "insert or ignore" statement used by the Phase 3 script, a row that
-- repeats any of these values is silently skipped.
code text unique,
name text unique,
isin_code text unique,
listing_date datetime,
market_category text,
industry_category text,
cfi_code text
);
Python script:
import csv
import os
import sqlite3
import sys
class MigratePhase3CsvToSqlite():
    """Load stock-code CSV rows into the ``StockCode`` table of a SQLite file."""

    def __init__(self):
        # Parameterized statement: values are bound by sqlite3 rather than
        # pasted into the SQL text, so quotes in the data cannot break or
        # alter the statement.
        self.sql_insert = '''insert or ignore into
            StockCode(code, name, isin_code, listing_date,
                      market_category, industry_category, cfi_code)
            values(?, ?, ?, ?, ?, ?, ?)
            '''

    def migrate(self, src_csvfile, dest_dbfile):
        """Insert every record of ``src_csvfile`` into ``dest_dbfile``.

        The first seven columns of each record are used, in the column
        order named by ``self.sql_insert``. Blank records are skipped. All
        inserts are committed together after the whole file has been read;
        if anything fails, nothing from this call is committed.

        Args:
            src_csvfile: Path of the CSV file to read; must exist.
            dest_dbfile: Path of the SQLite database file; must already
                exist (this method does not create the table).

        Raises:
            IndexError: A non-blank record has fewer than seven columns.
        """
        assert os.path.isfile(src_csvfile), \
            'source CSV not found: %s' % src_csvfile
        assert os.path.isfile(dest_dbfile), \
            'database file not found: %s' % dest_dbfile

        conn = sqlite3.connect(dest_dbfile)
        try:
            cursor = conn.cursor()
            try:
                with open(src_csvfile, "r") as src_file_h:
                    for row in csv.reader(src_file_h):
                        if not row:
                            # csv.reader yields [] for an empty line.
                            continue
                        cursor.execute(
                            self.sql_insert,
                            (row[0], row[1], row[2], row[3],
                             row[4], row[5], row[6]))
                conn.commit()
            finally:
                cursor.close()
        finally:
            # Closing without a commit discards the pending inserts.
            conn.close()
def main():
    """Import both CSV files from ./csv into ./db/stock_code.db."""
    importer = MigratePhase3CsvToSqlite()
    database_path = './db/stock_code.db'
    # Both files go into the same database, in this order.
    for csv_path in ('./csv/listed_company.csv', './csv/toc_company.csv'):
        importer.migrate(csv_path, database_path)


if __name__ == '__main__':
    exit_status = main()
    sys.exit(exit_status)
沒有留言:
張貼留言