Source Code Structure
.\sourcing_balance_sheet.py
.\sourcing_cash_flow_stmt.py
.\sourcing_income_stmt.py
.\src\balance_sheet\sourcing.py
.\src\cash_flow_stmt\sourcing.py
.\src\income_stmt\sourcing.py
.\src\common\sourcing_base.py
.\src\common\logger.py
Details
.\sourcing_balance_sheet.py
import logging
import sys

import src.stock_code.getter as getter
import src.balance_sheet.sourcing as sourcing
import src.common.logger as logger

def source_url_to_local():
    logger.config_root(level=logging.DEBUG)
    s = sourcing.Sourcing()
    s.source_url_to_local('1101', 2010, 4)

def source_local_to_sqlite():
    logger.config_root(level=logging.INFO)
    g = getter.Getter()
    s = sourcing.Sourcing()
    for stock_code in g.get():
        try:
            s.source_local_to_sqlite(stock_code)
        except AssertionError as e:
            print("Assertion error: {0}".format(stock_code))

if __name__ == '__main__':
    #sys.exit(source_url_to_local())
    sys.exit(source_local_to_sqlite())
.\sourcing_cash_flow_stmt.py
import logging
import sys

import src.stock_code.getter as getter
import src.cash_flow_stmt.sourcing as sourcing
import src.common.logger as logger

def source_url_to_local():
    logger.config_root(level=logging.DEBUG)
    s = sourcing.Sourcing()
    s.source_url_to_local('1101', 2010, 4)

def source_local_to_sqlite():
    logger.config_root(level=logging.DEBUG)
    g = getter.Getter()
    s = sourcing.Sourcing()
    for stock_code in g.get():
        try:
            s.source_local_to_sqlite(stock_code)
        except AssertionError as e:
            print("Assertion error: {0}".format(stock_code))

if __name__ == '__main__':
    #sys.exit(source_url_to_local())
    sys.exit(source_local_to_sqlite())
.\sourcing_income_stmt.py
import logging
import sys

import src.stock_code.getter as getter
import src.income_stmt.sourcing as sourcing
import src.common.logger as logger

def source_url_to_local():
    logger.config_root(level=logging.DEBUG)
    s = sourcing.Sourcing()
    s.source_url_to_local('1101', 2010, 4)

def source_local_to_sqlite():
    logger.config_root(level=logging.DEBUG)
    g = getter.Getter()
    s = sourcing.Sourcing()
    for stock_code in g.get():
        try:
            s.source_local_to_sqlite(stock_code)
        except AssertionError as e:
            print("Assertion error: {0}".format(stock_code))

if __name__ == '__main__':
    #sys.exit(source_url_to_local())
    sys.exit(source_local_to_sqlite())
.\src\balance_sheet\sourcing.py
import os

from ..common import sourcing_base

class Sourcing(sourcing_base.SourcingBase):
    def __init__(self):
        sourcing_base.SourcingBase.__init__(self)
        self.SQL_INSERT = '''insert or ignore into
            BalanceSheet(stock_code, report_type, report_date, activity_date, item, number)
            values(?, ?, ?, ?, ?, ?)
            '''

    def source_url_to_local(self, stock_code, year, season):
        self.__init_dirs(stock_code)
        self.__init_urls(stock_code, year, season)
        sourcing_base.SourcingBase.source_url_to_local(self, self.LOCAL_DIR)

    def source_local_to_sqlite(self, stock_code):
        self.__init_dirs(stock_code)
        #local_file_dir = os.path.join(self.LOCAL_DIR, 'mops.twse.com.tw\mops\web')
        #sourcing_base.SourcingBase.source_local_to_deflated(self, local_file_dir, self.DEFLATED_DIR)
        #sourcing_base.SourcingBase.source_deflated_to_csv(self, self.DEFLATED_DIR, self.CSV_DIR)
        sourcing_base.SourcingBase.source_csv_to_sqlite(self, self.CSV_DIR, self.DB_FILE, self.SQL_INSERT)

    def __init_dirs(self, stock_code):
        self.LOCAL_DIR = os.path.join('./dataset/balance_sheet/local/', stock_code)
        self.DEFLATED_DIR = os.path.join('./dataset/balance_sheet/deflated/', stock_code)
        self.CSV_DIR = os.path.join('./dataset/balance_sheet/csv/', stock_code)

    def __init_urls(self, stock_code, year, season):
        self.URLS = [
            self.URL_TEMPLATE % ('t05st32', stock_code, season, year - 1911),
            self.URL_TEMPLATE % ('t05st34', stock_code, season, year - 1911),
        ]
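URL_TEMPLATE is defined in src/common/sourcing_base.py (listed further down). As a quick sanity check of what __init_urls builds, here is a minimal sketch using the sample arguments '1101', 2010, 4 from sourcing_balance_sheet.py; note the Gregorian year is converted to the ROC calendar (2010 - 1911 = 99):

# Illustration only: expand URL_TEMPLATE the same way __init_urls does.
# The template string is copied from src/common/sourcing_base.py.
URL_TEMPLATE = 'http://mops.twse.com.tw/mops/web/ajax_%s?TYPEK=all&TYPEK2=&checkbtn=&co_id=%s&code1=&encodeURIComponent=1&firstin=1&isnew=false&keyword4=&off=1&queryName=co_id&season=%02d&step=1&year=%d'
print(URL_TEMPLATE % ('t05st32', '1101', 4, 2010 - 1911))
# => .../ajax_t05st32?...&co_id=1101&...&season=04&step=1&year=99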
.\src\cash_flow_stmt\sourcing.py
import csv
import logging
import os
import shutil

from lxml import html

from ..common import logger
from ..common import sourcing_base

class Sourcing(sourcing_base.SourcingBase):
    def __init__(self):
        sourcing_base.SourcingBase.__init__(self)
        self.TEXT_DIR = ''
        self.ITEM_PREFIXES = {
            'Operating' : [
                '營業活動',
                '│營業活動'
            ],
            'Investing' : [
                '投資活動',
                '│投資活動'
            ],
            'Financing' : [
                '融資活動',
                '│融資活動',
                '理財活動',
                '不影響現金流量之融資活動'
            ],
        }
        self.SQL_INSERT = '''insert or ignore into
            CashFlowStmt(stock_code, report_type, report_date, activity_date, item, number)
            values(?, ?, ?, ?, ?, ?)
            '''

    def source_url_to_local(self, stock_code, year, season):
        self.__init_dirs(stock_code)
        self.__init_urls(stock_code, year, season)
        sourcing_base.SourcingBase.source_url_to_local(self, self.LOCAL_DIR)

    def source_local_to_sqlite(self, stock_code):
        self.__init_dirs(stock_code)
        #local_file_dir = os.path.join(self.LOCAL_DIR, 'mops.twse.com.tw\mops\web')
        #sourcing_base.SourcingBase.source_local_to_deflated(self, local_file_dir, self.DEFLATED_DIR)
        #self.source_deflated_to_text(self.DEFLATED_DIR, self.TEXT_DIR)
        #self.source_text_to_csv(self.TEXT_DIR, self.CSV_DIR)
        sourcing_base.SourcingBase.source_csv_to_sqlite(self, self.CSV_DIR, self.DB_FILE, self.SQL_INSERT)

    def source_deflated_to_text(self, src_dir, dest_dir):
        assert os.path.isdir(src_dir)
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir)
        for file in os.listdir(src_dir):
            file_name, file_ext = os.path.splitext(file)
            txt_file = os.path.join(dest_dir, file_name + '.txt')
            self.source_deflated_to_text_single(os.path.join(src_dir, file), txt_file)

    def source_deflated_to_text_single(self, src_file, dest_file):
        self.LOGGER.debug('''%s => %s''' % (src_file, dest_file))
        if os.path.getsize(src_file) == 0:
            shutil.copy(src_file, dest_file)
            return
        src_file_fd = open(src_file, 'rb')
        content = src_file_fd.read()
        src_file_fd.close()
        table = b''
        try:
            table = html.fromstring(content.decode('utf-8'))
        except UnicodeDecodeError as e:
            self.LOGGER.debug(e)
            table = html.fromstring(content.decode('big5'))
        xpath_stmt = table.xpath('//body/table[@class="hasBorder"]/tr/td/pre/text()')
        if len(xpath_stmt) == 1:
            with open(dest_file, 'w', encoding='utf-8') as fd:
                fd.write(xpath_stmt[0].strip())
            return
        xpath_no_record = table.xpath('//body/center/h3/text()')
        if len(xpath_no_record) == 1:
            with open(dest_file, 'w', encoding='utf-8') as fd:
                fd.write(xpath_no_record[0].strip())
            return

    def source_text_to_csv(self, src_dir, dest_dir):
        assert os.path.isdir(src_dir)
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir)
        for file in os.listdir(src_dir):
            file_name, file_ext = os.path.splitext(file)
            csv_file = os.path.join(dest_dir, file_name + '.csv')
            self.source_text_to_csv_single(os.path.join(src_dir, file), csv_file)

    def source_text_to_csv_single(self, src_txt, dest_csv):
        self.LOGGER.debug('''%s => %s''' % (src_txt, dest_csv))
        fd = open(src_txt, 'rb')
        content = fd.read()
        fd.close()
        lines = content.decode('utf-8').split('\n')
        # No record
        if len(lines) == 1:
            msg = lines[0]
            if msg in self.WHITE_MSG:
                self.LOGGER.info('''%s => %s => No record''' % (src_txt, msg))
            else:
                self.LOGGER.error('''%s => %s''' % (src_txt, msg))
        # Has record
        else:
            items = self.__fetch_items(lines)
            rows = self.__build_records(src_txt, items)
            csv_writer = csv.writer(open(dest_csv, 'w', newline=''))
            csv_writer.writerows(rows)

    def __init_dirs(self, stock_code):
        self.LOCAL_DIR = os.path.join('./dataset/cash_flow_stmt/local/', stock_code)
        self.DEFLATED_DIR = os.path.join('./dataset/cash_flow_stmt/deflated/', stock_code)
        self.TEXT_DIR = os.path.join('./dataset/cash_flow_stmt/text/', stock_code)
        self.CSV_DIR = os.path.join('./dataset/cash_flow_stmt/csv/', stock_code)

    def __init_urls(self, stock_code, year, season):
        self.URLS = [
            self.URL_TEMPLATE % ('t05st36', stock_code, season, year - 1911),
            self.URL_TEMPLATE % ('t05st39', stock_code, season, year - 1911),
        ]

    def __fetch_items(self, lines):
        items = {
            'Operating' : [],
            'Investing' : [],
            'Financing' : [],
        }
        for line in lines:
            line_strip = line.strip()
            for key in items:
                for prefix in self.ITEM_PREFIXES[key]:
                    if line_strip.startswith(prefix):
                        items[key].append(line)
        for key in items:
            self.LOGGER.debug('''%s: %s''', key, items[key])
        return items

    def __build_records(self, src_txt, items):
        records = []
        for item in items:
            for line in items[item]:
                words = self.__split_words(line)
                if len(words) > 2:
                    number = self.__get_number(words[1])
                    last_number = self.__get_number(words[2])
                    record = [item, number, last_number]
                    records.append(record)
                    self.LOGGER.info('''record: %s''', record)
        return records

    def __split_words(self, line):
        words = line.split()
        word_num = len(words)
        for i, word in enumerate(words):
            if (word == '(') or (word == '($'):
                next_i = i + 1
                if next_i < word_num:
                    words[next_i] = '(' + words[next_i]
        fixed_words = []
        for word in words:
            if (word != '') and (word != '(') and (word != '($') and (word != '$'):
                fixed_words.append(word)
        return fixed_words

    def __get_number(self, number):
        number = number.strip()
        number = number.replace('$', '').replace(',', '')
        if (number[0] == '(') and (number[-1] == ')'):
            number = '-' + number[1:-1]
        return number
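The parsing helpers above are easiest to follow with a concrete line. The sample below is only an illustration; the exact spacing of a MOPS pre-formatted cash-flow line is an assumption, but it shows how __split_words glues a detached '($' onto the following token and how __get_number turns a parenthesized figure into a negative number:

# Hypothetical input line (layout assumed); not taken from a real report.
line = '融資活動之淨現金流出 ($ 123,456) (78,910)'
words = line.split()
# => ['融資活動之淨現金流出', '($', '123,456)', '(78,910)']
# __split_words() => ['融資活動之淨現金流出', '(123,456)', '(78,910)']
# __get_number('(123,456)') => '-123456'
# __get_number('(78,910)')  => '-78910'
# __build_records() would then emit ['Financing', '-123456', '-78910'].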
.\src\income_stmt\sourcing.py
import os

from ..common import sourcing_base

class Sourcing(sourcing_base.SourcingBase):
    def __init__(self):
        sourcing_base.SourcingBase.__init__(self)
        self.SQL_INSERT = '''insert or ignore into
            IncomeStmt(stock_code, report_type, report_date, activity_date, item, number)
            values(?, ?, ?, ?, ?, ?)
            '''

    def source_url_to_local(self, stock_code, year, season):
        self.__init_dirs(stock_code)
        self.__init_urls(stock_code, year, season)
        sourcing_base.SourcingBase.source_url_to_local(self, self.LOCAL_DIR)

    def source_local_to_sqlite(self, stock_code):
        self.__init_dirs(stock_code)
        #local_file_dir = os.path.join(self.LOCAL_DIR, 'mops.twse.com.tw\mops\web')
        #sourcing_base.SourcingBase.source_local_to_deflated(self, local_file_dir, self.DEFLATED_DIR)
        #sourcing_base.SourcingBase.source_deflated_to_csv(self, self.DEFLATED_DIR, self.CSV_DIR)
        sourcing_base.SourcingBase.source_csv_to_sqlite(self, self.CSV_DIR, self.DB_FILE, self.SQL_INSERT)

    def __init_dirs(self, stock_code):
        self.LOCAL_DIR = os.path.join('./dataset/income_stmt/local/', stock_code)
        self.DEFLATED_DIR = os.path.join('./dataset/income_stmt/deflated/', stock_code)
        self.CSV_DIR = os.path.join('./dataset/income_stmt/csv/', stock_code)

    def __init_urls(self, stock_code, year, season):
        self.URLS = [
            self.URL_TEMPLATE % ('t05st32', stock_code, season, year - 1911),
            self.URL_TEMPLATE % ('t05st34', stock_code, season, year - 1911),
        ]
.\src\common\sourcing_base.py
import csv
import logging
import os
import re
import shutil
import sqlite3

from lxml import etree
from lxml import html

from ..common import logger

class SourcingBase():
    def __init__(self):
        self.LOGGER = logging.getLogger()
        self.URL_TEMPLATE = \
            '''http://mops.twse.com.tw/mops/web/ajax_%s?TYPEK=all&TYPEK2=&checkbtn=&co_id=%s&code1=&encodeURIComponent=1&firstin=1&isnew=false&keyword4=&off=1&queryName=co_id&season=%02d&step=1&year=%d'''
        self.URLS = []
        self.LOCAL_DIR = './'
        self.DEFLATED_DIR = './'
        self.CSV_DIR = './'
        self.DB_FILE = './db/stocktotal.db'
        self.SQL_INSERT = ''
        self.WHITE_MSG = [
            '資料庫中查無需求資料 !',
            '資料庫中查無需求資料',
            '無應編製合併財報之子公司',
            '外國發行人免申報個別財務報表資訊,請至合併財務報表查詢',
        ]
        self.SEASON_STR_MAP = {
            '01' : '-03-31',
            '02' : '-06-30',
            '03' : '-09-30',
            '04' : '-12-31'
        }
        self.REPORT_TYPE_MAP = {
            't05st32' : 'I', # Individual Income Statement
            't05st34' : 'C', # Consolidated Income Statement
            't05st36' : 'I', # Individual Cash Flow Statement
            't05st39' : 'C', # Consolidated Cash Flow Statement
            't05st31' : 'I', # Individual Balance Sheet
            't05st33' : 'C', # Consolidated Balance Sheet
        }

    def source_url_to_local(self, dest_dir):
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir)
        for url in self.URLS:
            self.__wget(url, dest_dir)

    def source_local_to_deflated(self, src_dir, dest_dir):
        assert os.path.isdir(src_dir)
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir)
        for file in os.listdir(src_dir):
            prog_name = file[5:12]
            args = self.__parse_args(file)
            html_file = '''%s_%s_%s_%s.html''' % \
                (prog_name, args['co_id'], args['year'], args['season'])
            shutil.copy(os.path.join(src_dir, file), os.path.join(dest_dir, html_file))

    def source_deflated_to_csv(self, src_dir, dest_dir):
        assert os.path.isdir(src_dir)
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir)
        for file in os.listdir(src_dir):
            file_name, file_ext = os.path.splitext(file)
            dest_file = os.path.join(dest_dir, file_name + '.csv')
            self.source_deflated_to_csv_single(os.path.join(src_dir, file), dest_file)

    def source_deflated_to_csv_single(self, src_file, dest_file):
        self.LOGGER.debug('''%s => %s''' % (src_file, dest_file))
        src_file_fd = open(src_file, 'rb')
        content = src_file_fd.read()
        src_file_fd.close()
        # wget timeout => 0 filesize web content => should be sourced again.
        if content == b'':
            self.LOGGER.error('''%s => 0 filesize''' % src_file)
            return
        table = b''
        try:
            table = html.fromstring(content.decode('utf-8').replace(' ', ' '))
        except UnicodeDecodeError as e:
            self.LOGGER.debug(e)
            table = html.fromstring(content.decode('big5').replace(' ', ' '))
        except Exception as e:
            self.LOGGER.error(e)
            return
        xpath_no_record = table.xpath('//body/center/h3/text()')
        if len(xpath_no_record) == 1:
            with open(dest_file, 'w') as fd:
                fd.write(xpath_no_record[0].strip())
            return
        csv_writer = csv.writer(open(dest_file, 'w', newline=''))
        for tr in table.xpath('//tr'):
            tds = tr.xpath('./td/text()')
            if len(tds) == 5:
                csv_record = [tds[0].strip(), tds[1].strip(), tds[3].strip()]
                csv_writer.writerow(csv_record)

    def source_csv_to_sqlite(self, src_dir, dest_db, sql_insert):
        assert os.path.isdir(src_dir)
        for file in os.listdir(src_dir):
            self.source_csv_to_sqlite_single(os.path.join(src_dir, file), dest_db, sql_insert)

    def source_csv_to_sqlite_single(self, src_file, dest_db, sql_insert):
        self.LOGGER.debug('''%s => %s''' % (src_file, dest_db))
        assert os.path.isfile(src_file)
        assert os.path.isfile(dest_db)
        file_name, file_ext = os.path.splitext(os.path.basename(src_file))
        report_code, stock_code, year, season = file_name.split('_')
        report_type = self.REPORT_TYPE_MAP[report_code]
        date = self.__get_date(year, season)
        conn = sqlite3.connect(dest_db)
        cursor = conn.cursor()
        csv_reader = csv.reader(open(src_file, 'r'))
        for row in csv_reader:
            if len(row) == 1:
                msg = row[0]
                if msg in self.WHITE_MSG:
                    self.LOGGER.info('''%s => %s => No record''' % (src_file, msg))
                else:
                    self.LOGGER.error('''%s => %s''' % (src_file, msg))
            elif len(row) in (2, 3):
                cursor.execute(sql_insert, \
                    (stock_code, report_type, date, date, row[0], row[1]))
                if len(row) == 3:
                    last_date = self.__get_last_date(year, season)
                    cursor.execute(sql_insert, \
                        (stock_code, report_type, date, last_date, row[0], row[2]))
        conn.commit()
        cursor.close()
        conn.close()

    def __get_date(self, year, season):
        return str(int(year) + 1911) + self.SEASON_STR_MAP[season]

    def __get_last_date(self, year, season):
        return str(int(year) + 1910) + self.SEASON_STR_MAP[season]

    def __wget(self, url, dest_dir):
        url_to_filepath = re.compile('https?://|ftp://').sub('', url).replace(':', '_')
        dest_file = os.path.join(dest_dir, url_to_filepath)
        dest_file_dir = os.path.dirname(dest_file)
        if not os.path.exists(dest_file_dir):
            os.makedirs(dest_file_dir)
        wget = os.path.abspath('./src/thirdparty/wget/wget.exe')
        assert os.path.isfile(wget)
        wget_cmdline = '''%s -N \"%s\" --waitretry=3 -P %s''' % (wget, url, dest_file_dir)
        os.system(wget_cmdline)

    def __parse_args(self, args_line):
        args = {}
        for kvs in args_line.split('&'):
            kv = kvs.split('=')
            args[kv[0]] = kv[1]
        return args
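source_csv_to_sqlite_single expects the tables in ./db/stocktotal.db to exist already ('insert or ignore' plus the os.path.isfile assertion). The snippet below is only a sketch of the kind of schema those insert statements imply; the column types and the unique constraint are assumptions inferred from SQL_INSERT and REPORT_TYPE_MAP, not the project's actual DDL:

# Hypothetical bootstrap script (not part of the repository).
import sqlite3

SQL_CREATE = '''create table if not exists %s
    (
        stock_code    text not null,
        report_type   text not null,  -- 'I' individual, 'C' consolidated
        report_date   text not null,  -- e.g. '2010-12-31'
        activity_date text not null,
        item          text not null,
        number        text,
        unique (stock_code, report_type, report_date, activity_date, item)
    )'''

conn = sqlite3.connect('./db/stocktotal.db')
for table in ('BalanceSheet', 'IncomeStmt', 'CashFlowStmt'):
    conn.execute(SQL_CREATE % table)
conn.commit()
conn.close()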
.\src\common\logger.py
import logging
import sys

FORMAT = "%(asctime)s %(filename)s [%(levelname)s] %(message)s"
DATEFMT = "%H:%M:%S"

def config_root(level=logging.INFO,
                threshold=logging.WARNING,
                format=FORMAT,
                datefmt=DATEFMT):
    root = logging.getLogger()
    root.setLevel(level)
    formatter = logging.Formatter(format, datefmt)
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(level)
    stdout_handler.setFormatter(logging.Formatter(format, datefmt))
    root.addHandler(stdout_handler)
    #stderr_handler = logging.StreamHandler(sys.stderr)
    #stderr_handler.setLevel(logging.ERROR)
    #stderr_handler.setFormatter(logging.Formatter(format, datefmt))
    #root.addHandler(stderr_handler)
All done.
《King Lear》
I cannot heave my heart into my mouth.
Cordelia is doomed to tragedy.
I yet beseech your majesty, -- If for I want that glib and oily art, To speak and purpose not; since what I well intend, I'll do't before I speak, -- that you make known It is no vicious blot, murder, or foulness, No unchaste action, or dishonour'd step, That hath deprived me of your grace and favour; But even for want of that for which I am richer, A still-soliciting eye, and such a tongue As I am glad I have not, though not to have it Hath lost me in your liking.
Everyone loves to hear flattery. Reigning monarchs and presidents in particular crave approving looks and sweet words. They say the people they appoint must excel in both ability and character; in practice they are just picking lapdogs.
Better thou Hadst not been born than not to have pleased me better.
In other words: those of you who will not sing my praises can go to hell. Shakespeare was a prophet. Long ago he described, in plain terms, the weak, incompetent president who only wants to hear praise; he wrote it down first and events keep confirming it afterwards. Look, it plays out the same way every time.
Time shall unfold what plaited cunning hides: Who cover faults, at last shame them derides.
If love means holding the other person as more important than yourself, then let me give it a try.