SQLite3 Schema:
create table if not exists OperatingIncome
(
creation_dt datetime default current_timestamp,
report_date datetime not null,
stock_code text not null,
activity_date datetime not null,
income real,
unique (report_date, stock_code, activity_date) on conflict ignore
);
Python sourcing.py:
import csv
import logging
import os
import xlrd
import datetime
from lxml import html
from ..common import sourcing_mops
from ..common import date_util as date_util
class Sourcing(sourcing_mops.SourcingMops):
def __init__(self):
self.LOGGER = logging.getLogger()
self.URL_TEMPLATE = '''http://mops.twse.com.tw/t21/sii/t21sc03_%s_%s.html'''
self.DATES = []
self.HTML_DIR = '''./dataset/operating_income/html/'''
self.CSV_DIR = '''./dataset/operating_income/csv/'''
self.DB_FILE = './db/stocktotal.db'
self.SQL_INSERT = '''insert or ignore into OperatingIncome(
report_date,
stock_code,
activity_date,
income
) values(?, ?, ?, ?)'''
def source(self, begin_date, end_date):
sourcing_mops.SourcingMops.init_dates(self, begin_date, end_date)
sourcing_mops.SourcingMops.source_url_to_html(self, self.HTML_DIR)
self.source_html_to_csv(self.HTML_DIR, self.CSV_DIR)
sourcing_mops.SourcingMops.source_csv_to_sqlite(self, self.CSV_DIR, self.DB_FILE, self.SQL_INSERT)
def source_html_to_csv(self, src_dir, dest_dir):
assert os.path.isdir(src_dir)
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
for date in reversed(self.DATES):
self.source_html_to_csv_single(src_dir, dest_dir, date)
def source_html_to_csv_single(self, src_dir, dest_dir, date):
src_file = self.get_filename(src_dir, date, 'html')
dest_file = self.get_filename(dest_dir, date, 'csv')
self.LOGGER.debug('''%s => %s''' % (src_file, dest_file))
assert os.path.isfile(src_file)
dest_fd = open(dest_file, 'w', newline='')
csv_writer = csv.writer(dest_fd)
src_fd = open(src_file, 'rb')
src_content = src_fd.read()
src_fd.close()
content = None
try:
content = html.fromstring(src_content.decode('big5-hkscs').replace(' ', ' '))
except UnicodeDecodeError as e:
self.LOGGER.debug(e)
content = html.fromstring(src_content.decode('gb18030').replace(' ', ' '))
for category in content.xpath('//html/body/center/table'):
for co_list in category.xpath('./tr/td[@colspan="2"]/table'):
for co in co_list.xpath('./tr[@align="right"]'):
# Ignore summary of this category
summary = co.xpath('./th/text()')
if len(summary) is 1:
continue
items = co.xpath('./td/text()')
assert len(items) is 10
stock_code = items[0]
this_month_record = [
date,
stock_code,
date_util.get_this_month_by(date),
items[2].strip().replace(',','')
]
last_month_record = [
date,
stock_code,
date_util.get_last_month_by(date),
items[3].strip().replace(',','')
]
last_year_record = [
date,
stock_code,
date_util.get_last_year_by(date),
items[4].strip().replace(',','')
]
csv_writer.writerow(this_month_record)
csv_writer.writerow(last_month_record)
csv_writer.writerow(last_year_record)
dest_fd.close()
def get_url(self, date):
return self.URL_TEMPLATE % (date.year - 1911, date.month)
def get_filename(self, src_dir, date, ext):
return os.path.join(src_dir, date.strftime('%Y-%m') + '.' + ext)
sourcing_mops.py
((後來想想,還是得用 date => url,然後把 date 記起來,這樣才方便。之後再來想怎麼 refactoring 吧。))
# coding: big5
import csv
import logging
import os
import shutil
import sqlite3
from datetime import date
from datetime import datetime
class SourcingMops():
def __init__(self):
self.LOGGER = logging.getLogger()
self.DATES = []
self.HTML_DIR = ''
self.XLS_DIR = ''
self.CSV_DIR = ''
self.DB_FILE = './db/stocktotal.db'
self.SQL_INSERT = ''
def init_dates(self, begin_date, end_date):
begin = datetime.strptime(begin_date, '%Y-%m-%d')
end = datetime.strptime(end_date, '%Y-%m-%d')
monthly_begin = 12 * begin.year + begin.month - 1
monthly_end = 12 * end.year + end.month
for monthly in range(monthly_begin, monthly_end):
year, month = divmod(monthly, 12)
self.DATES.append(date(year, month + 1, 1))
def source_url_to_html(self, dest_dir):
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
for date in self.DATES:
url = self.get_url(date)
dest_file = self.get_filename(dest_dir, date, 'html')
self.__wget(url, dest_file)
def source_zip_to_xls(self, src_dir, dest_dir):
assert os.path.isdir(src_dir)
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
for date in self.DATES:
src_file = self.get_filename(src_dir, date, 'zip')
dest_file = self.get_filename(dest_dir, date, 'xls')
self.source_zip_to_xls_single(src_file, dest_dir, dest_file)
def source_zip_to_xls_single(self, src_file, dest_dir, dest_file):
assert os.path.isfile(src_file)
assert os.path.isdir(dest_dir)
sevenzip_output_dir = os.path.join(dest_dir, 'sevenzip_output_dir')
self.__sevenzip_extract(src_file, sevenzip_output_dir)
if not os.path.exists(sevenzip_output_dir):
self.LOGGER.info('''%s => Failure to extract''' % src_file)
return
file_list = os.listdir(sevenzip_output_dir)
assert len(file_list) is 1
sevenzip_output_file = os.path.join(sevenzip_output_dir, file_list[0])
shutil.copy(sevenzip_output_file, dest_file)
shutil.rmtree(sevenzip_output_dir)
def source_csv_to_sqlite(self, src_dir, dest_db, sql_insert):
assert os.path.isdir(src_dir)
assert os.path.isfile(dest_db)
for date in self.DATES:
src_file = self.get_filename(src_dir, date, 'csv')
if os.path.isfile(src_file):
self.source_csv_to_sqlite_single(src_file, dest_db, sql_insert)
def source_csv_to_sqlite_single(self, src_file, dest_db, sql_insert):
self.LOGGER.debug('''%s => %s''' % (src_file, dest_db))
fd = open(src_file, 'r')
csv_reader = csv.reader(fd)
conn = sqlite3.connect(dest_db)
cursor = conn.cursor()
for row in csv_reader:
cursor.execute(sql_insert, row)
self.LOGGER.debug(row)
conn.commit()
cursor.close()
conn.close()
fd.close()
def get_url(self, date):
pass
def get_filename(self, src_dir, date, ext):
pass
def __wget(self, url, dest_file):
wget = os.path.abspath('./src/thirdparty/wget/wget.exe')
assert os.path.isfile(wget)
wget_cmdline = '''%s -N \"%s\" --waitretry=3 -O \"%s\"''' % (wget, url, dest_file)
os.system(wget_cmdline)
def __sevenzip_extract(self, src_file, dest_dir):
sevenzip = os.path.abspath('./src/thirdparty/sevenzip/7z.exe')
assert os.path.isfile(sevenzip)
sevenzip_cmdline = '''%s e %s -y -o%s''' % (sevenzip, src_file, dest_dir)
os.system(sevenzip_cmdline)
date_util.py
import datetime
def get_last_month():
today = datetime.date.today()
first = datetime.date(day=1, month=today.month, year=today.year)
last_month = first - datetime.timedelta(days=1)
return datetime.date(day=1, month=last_month.month, year=last_month.year)
def get_this_month():
today = datetime.date.today()
return datetime.date(day=1, month=today.month, year=today.year)
def get_yesterday():
return datetime.date.today() - datetime.timedelta(days=1)
def get_last_month_by(someday):
first = datetime.date(day=1, month=someday.month, year=someday.year)
last_month = first - datetime.timedelta(days=1)
return datetime.date(day=1, month=last_month.month, year=last_month.year)
def get_this_month_by(someday):
return datetime.date(day=1, month=someday.month, year=someday.year)
def get_last_year_by(someday):
first = datetime.date(day=1, month=someday.month, year=someday.year)
assert first.day is 1
return datetime.date(day=1, month=first.month, year=first.year-1)
沒有留言:
張貼留言