Friday, October 12, 2012

Python Try-Except Example


Source Code Structure

.\sourcing_balance_sheet.py
.\sourcing_cash_flow_stmt.py
.\sourcing_income_stmt.py
.\src\balance_sheet\sourcing.py
.\src\cash_flow_stmt\sourcing.py
.\src\income_stmt\sourcing.py
.\src\common\sourcing_base.py
.\src\common\logger.py
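
Every entry script below follows the same try/except pattern that gives this post its title: loop over all stock codes and catch AssertionError per iteration, so a bad stock code is reported but does not abort the whole batch. Here is a minimal, self-contained sketch of that pattern; process() and the sample codes are hypothetical stand-ins for the real sourcing call and stock code getter.

def process(stock_code):
    # Hypothetical worker: assert-style validation raises AssertionError on bad input.
    assert stock_code.isdigit(), stock_code

def run_batch(stock_codes):
    failed = []
    for stock_code in stock_codes:
        try:
            process(stock_code)   # stands in for s.source_local_to_sqlite(stock_code)
        except AssertionError:
            # Report the failure and keep going with the next stock code.
            print("Assertion error: {0}".format(stock_code))
            failed.append(stock_code)
    return failed

if __name__ == '__main__':
    print(run_batch(['1101', 'bad-code', '2330']))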



Details

.\sourcing_balance_sheet.py

import logging
import sys

import src.stock_code.getter as getter
import src.balance_sheet.sourcing as sourcing
import src.common.logger as logger

def source_url_to_local():
    logger.config_root(level=logging.DEBUG)
    s = sourcing.Sourcing()
    s.source_url_to_local('1101', 2010, 4)
    
def source_local_to_sqlite():
    logger.config_root(level=logging.INFO)
    g = getter.Getter()
    s = sourcing.Sourcing()
    for stock_code in g.get():
        try:
            s.source_local_to_sqlite(stock_code)
        except AssertionError as e:
            print("Assertion error: {0}".format(stock_code))
        
if __name__ == '__main__':
    #sys.exit(source_url_to_local())
    sys.exit(source_local_to_sqlite())



.\sourcing_cash_flow_stmt.py

import logging
import sys

import src.stock_code.getter as getter
import src.cash_flow_stmt.sourcing as sourcing
import src.common.logger as logger

def source_url_to_local():
    logger.config_root(level=logging.DEBUG)
    s = sourcing.Sourcing()
    s.source_url_to_local('1101', 2010, 4)
    
def source_local_to_sqlite():
    logger.config_root(level=logging.DEBUG)
    g = getter.Getter()
    s = sourcing.Sourcing()
    for stock_code in g.get():
        try:
            s.source_local_to_sqlite(stock_code)
        except AssertionError as e:
            print("Assertion error: {0}".format(stock_code))

if __name__ == '__main__':
    #sys.exit(source_url_to_local())
    sys.exit(source_local_to_sqlite())



.\sourcing_income_stmt.py

import logging
import sys

import src.stock_code.getter as getter
import src.income_stmt.sourcing as sourcing
import src.common.logger as logger

def source_url_to_local():
    logger.config_root(level=logging.DEBUG)
    s = sourcing.Sourcing()
    s.source_url_to_local('1101', 2010, 4)
    
def source_local_to_sqlite():
    logger.config_root(level=logging.DEBUG)
    g = getter.Getter()
    s = sourcing.Sourcing()
    for stock_code in g.get():
        try:
            s.source_local_to_sqlite(stock_code)
        except AssertionError as e:
            print("Assertion error: {0}".format(stock_code))
        
if __name__ == '__main__':
    #sys.exit(source_url_to_local())
    sys.exit(source_local_to_sqlite())



.\src\balance_sheet\sourcing.py

import os

from ..common import sourcing_base

class Sourcing(sourcing_base.SourcingBase):

    def __init__(self):
        sourcing_base.SourcingBase.__init__(self)
        self.SQL_INSERT = '''insert or ignore into
            BalanceSheet(stock_code, report_type, report_date, activity_date, item, number)
            values(?, ?, ?, ?, ?, ?)
            '''

    def source_url_to_local(self, stock_code, year, season):
        self.__init_dirs(stock_code)
        self.__init_urls(stock_code, year, season)    
        sourcing_base.SourcingBase.source_url_to_local(self, self.LOCAL_DIR)
        
    def source_local_to_sqlite(self, stock_code):
        self.__init_dirs(stock_code)
        #local_file_dir = os.path.join(self.LOCAL_DIR, 'mops.twse.com.tw\mops\web')
        #sourcing_base.SourcingBase.source_local_to_deflated(self, local_file_dir, self.DEFLATED_DIR)
        #sourcing_base.SourcingBase.source_deflated_to_csv(self, self.DEFLATED_DIR, self.CSV_DIR)
        sourcing_base.SourcingBase.source_csv_to_sqlite(self, self.CSV_DIR, self.DB_FILE, self.SQL_INSERT)    
        
    def __init_dirs(self, stock_code):
        self.LOCAL_DIR = os.path.join('./dataset/balance_sheet/local/', stock_code)
        self.DEFLATED_DIR = os.path.join('./dataset/balance_sheet/deflated/', stock_code)
        self.CSV_DIR = os.path.join('./dataset/balance_sheet/csv/', stock_code)
        
    def __init_urls(self, stock_code, year, season):
        self.URLS = [
            self.URL_TEMPLATE % ('t05st31', stock_code, season, year - 1911),
            self.URL_TEMPLATE % ('t05st33', stock_code, season, year - 1911),
        ]



.\src\cash_flow_stmt\sourcing.py

import csv
import logging
import os
import shutil

from lxml import html

from ..common import logger
from ..common import sourcing_base

class Sourcing(sourcing_base.SourcingBase):

    def __init__(self):
        sourcing_base.SourcingBase.__init__(self)
        self.TEXT_DIR = ''
        self.ITEM_PREFIXES = {
            'Operating' : [
                '營業活動',
                '│營業活動'
            ],
            'Investing' : [
                '投資活動',
                '│投資活動'
            ],
            'Financing' : [
                '融資活動',
                '│融資活動',
                '理財活動',
                '不影響現金流量之融資活動'
            ],
        }
        self.SQL_INSERT = '''insert or ignore into
            CashFlowStmt(stock_code, report_type, report_date, activity_date, item, number)
            values(?, ?, ?, ?, ?, ?)
            '''

    def source_url_to_local(self, stock_code, year, season):
        self.__init_dirs(stock_code)
        self.__init_urls(stock_code, year, season)    
        sourcing_base.SourcingBase.source_url_to_local(self, self.LOCAL_DIR)
        
    def source_local_to_sqlite(self, stock_code):
        self.__init_dirs(stock_code)
        #local_file_dir = os.path.join(self.LOCAL_DIR, 'mops.twse.com.tw\mops\web')
        #sourcing_base.SourcingBase.source_local_to_deflated(self, local_file_dir, self.DEFLATED_DIR)
        #self.source_deflated_to_text(self.DEFLATED_DIR, self.TEXT_DIR)
        #self.source_text_to_csv(self.TEXT_DIR, self.CSV_DIR)
        sourcing_base.SourcingBase.source_csv_to_sqlite(self, self.CSV_DIR, self.DB_FILE, self.SQL_INSERT)    
       
    def source_deflated_to_text(self, src_dir, dest_dir):
        assert os.path.isdir(src_dir)
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir)

        for file in os.listdir(src_dir):
            file_name, file_ext = os.path.splitext(file)
            txt_file = os.path.join(dest_dir, file_name + '.txt')
            self.source_deflated_to_text_single(os.path.join(src_dir, file), txt_file)

    def source_deflated_to_text_single(self, src_file, dest_file):
        self.LOGGER.debug('''%s => %s''' % (src_file, dest_file))
        if os.path.getsize(src_file) == 0:
            shutil.copy(src_file, dest_file)
            return
        
        src_file_fd = open(src_file, 'rb')
        content = src_file_fd.read()
        src_file_fd.close()

        table = b''
        try:
            table = html.fromstring(content.decode('utf-8'))
        except UnicodeDecodeError as e:
            self.LOGGER.debug(e)
            table = html.fromstring(content.decode('big5'))

        xpath_stmt = table.xpath('//body/table[@class="hasBorder"]/tr/td/pre/text()')
        if len(xpath_stmt) == 1:
            with open(dest_file, 'w', encoding='utf-8') as fd:
                fd.write(xpath_stmt[0].strip())
            return

        xpath_no_record = table.xpath('//body/center/h3/text()')
        if len(xpath_no_record) == 1:
            with open(dest_file, 'w', encoding='utf-8') as fd:
                fd.write(xpath_no_record[0].strip())
            return

    def source_text_to_csv(self, src_dir, dest_dir):
        assert os.path.isdir(src_dir)
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir)

        for file in os.listdir(src_dir):
            file_name, file_ext = os.path.splitext(file)
            csv_file = os.path.join(dest_dir, file_name + '.csv')
            self.source_text_to_csv_single(os.path.join(src_dir, file), csv_file)

    def source_text_to_csv_single(self, src_txt, dest_csv):
        self.LOGGER.debug('''%s => %s''' % (src_txt, dest_csv))

        fd = open(src_txt, 'rb')
        content = fd.read()
        fd.close()
        lines = content.decode('utf-8').split('\n')
        
        # No record
        if len(lines) == 1:
            msg = lines[0]
            if msg in self.WHITE_MSG:
                self.LOGGER.info('''%s => %s => No record''' % (src_txt, msg))
            else:
                self.LOGGER.error('''%s => %s''' % (src_txt, msg))
        # Has record
        else:
            items = self.__fetch_items(lines)
            rows = self.__build_records(src_txt, items)
            with open(dest_csv, 'w', newline='') as fd:
                csv.writer(fd).writerows(rows)

    def __init_dirs(self, stock_code):
        self.LOCAL_DIR = os.path.join('./dataset/cash_flow_stmt/local/', stock_code)
        self.DEFLATED_DIR = os.path.join('./dataset/cash_flow_stmt/deflated/', stock_code)
        self.TEXT_DIR = os.path.join('./dataset/cash_flow_stmt/text/', stock_code)
        self.CSV_DIR = os.path.join('./dataset/cash_flow_stmt/csv/', stock_code)
        
    def __init_urls(self, stock_code, year, season):
        self.URLS = [
            self.URL_TEMPLATE % ('t05st36', stock_code, season, year - 1911),
            self.URL_TEMPLATE % ('t05st39', stock_code, season, year - 1911),
        ]

    def __fetch_items(self, lines):
        items = {
            'Operating' : [],
            'Investing' : [],
            'Financing' : [],
        }
        for line in lines:
            line_strip = line.strip()
            for key in items:
                for prefix in self.ITEM_PREFIXES[key]:
                    if line_strip.startswith(prefix):
                        items[key].append(line)
        for key in items:
            self.LOGGER.debug('''%s: %s''', key, items[key])
        return items

    def __build_records(self, src_txt, items):
        records = []
        for item in items:
            for line in items[item]:
                words = self.__split_words(line)
                if len(words) > 2:
                    number = self.__get_number(words[1])
                    last_number = self.__get_number(words[2])
                    record = [item, number, last_number]
                    records.append(record)
                    self.LOGGER.info('''record: %s''', record)
        return records        

    def __split_words(self, line):
        words = line.split()
        word_num = len(words)
        for i, word in enumerate(words):
            if (word == '(') or (word == '($'):
                next_i = i + 1
                if next_i < word_num:
                    words[next_i] = '(' + words[next_i]

        fixed_words = []
        for word in words:
            if (word != '') and (word != '(') and (word != '($') and (word != '$'): 
                fixed_words.append(word)
        return fixed_words

    def __get_number(self, number):
        number = number.strip()
        number = number.replace('$', '').replace(',', '')
        if (number[0] == '(') and (number[-1] == ')'):
            number = '-' + number[1:-1]
        return number
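
Cash flow numbers are kept as strings; negative amounts appear in the MOPS text as accounting-style parenthesized figures. The following standalone sketch mirrors what __get_number above does; the name normalize_number is only for illustration.

def normalize_number(raw):
    # Strip '$' and thousands separators, then turn "(1,234)" into "-1234".
    raw = raw.strip().replace('$', '').replace(',', '')
    if raw and raw[0] == '(' and raw[-1] == ')':
        raw = '-' + raw[1:-1]
    return raw

assert normalize_number('(1,234)') == '-1234'
assert normalize_number('$5,678') == '5678'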
        


.\src\income_stmt\sourcing.py

import os

from ..common import sourcing_base

class Sourcing(sourcing_base.SourcingBase):

    def __init__(self):
        sourcing_base.SourcingBase.__init__(self)
        self.SQL_INSERT = '''insert or ignore into
            IncomeStmt(stock_code, report_type, report_date, activity_date, item, number)
            values(?, ?, ?, ?, ?, ?)
            '''

    def source_url_to_local(self, stock_code, year, season):
        self.__init_dirs(stock_code)
        self.__init_urls(stock_code, year, season)    
        sourcing_base.SourcingBase.source_url_to_local(self, self.LOCAL_DIR)
        
    def source_local_to_sqlite(self, stock_code):
        self.__init_dirs(stock_code)
        #local_file_dir = os.path.join(self.LOCAL_DIR, 'mops.twse.com.tw\mops\web')
        #sourcing_base.SourcingBase.source_local_to_deflated(self, local_file_dir, self.DEFLATED_DIR)
        #sourcing_base.SourcingBase.source_deflated_to_csv(self, self.DEFLATED_DIR, self.CSV_DIR)
        sourcing_base.SourcingBase.source_csv_to_sqlite(self, self.CSV_DIR, self.DB_FILE, self.SQL_INSERT)    
        
    def __init_dirs(self, stock_code):
        self.LOCAL_DIR = os.path.join('./dataset/income_stmt/local/', stock_code)
        self.DEFLATED_DIR = os.path.join('./dataset/income_stmt/deflated/', stock_code)
        self.CSV_DIR = os.path.join('./dataset/income_stmt/csv/', stock_code)
        
    def __init_urls(self, stock_code, year, season):
        self.URLS = [
            self.URL_TEMPLATE % ('t05st32', stock_code, season, year - 1911),
            self.URL_TEMPLATE % ('t05st34', stock_code, season, year - 1911),
        ]



.\src\common\sourcing_base.py

import csv
import logging
import os
import re
import shutil
import sqlite3

from lxml import etree
from lxml import html

from ..common import logger

class SourcingBase():

    def __init__(self):
        self.LOGGER = logging.getLogger()
        self.URL_TEMPLATE = \
            '''http://mops.twse.com.tw/mops/web/ajax_%s?TYPEK=all&TYPEK2=&checkbtn=&co_id=%s&code1=&encodeURIComponent=1&firstin=1&isnew=false&keyword4=&off=1&queryName=co_id&season=%02d&step=1&year=%d'''
        self.URLS = []
        self.LOCAL_DIR = './'
        self.DEFLATED_DIR = './'
        self.CSV_DIR = './'
        self.DB_FILE = './db/stocktotal.db'
        self.SQL_INSERT = ''
        self.WHITE_MSG = [
            '資料庫中查無需求資料 !',
            '資料庫中查無需求資料',
            '無應編製合併財報之子公司',
            '外國發行人免申報個別財務報表資訊,請至合併財務報表查詢',
        ]
        self.SEASON_STR_MAP = {
            '01' : '-03-31',
            '02' : '-06-30',
            '03' : '-09-30',
            '04' : '-12-31'
        }
        self.REPORT_TYPE_MAP = {
            't05st32' : 'I', # Individual Income Statement
            't05st34' : 'C', # Consolidated Income Statement
            't05st36' : 'I', # Individual Cash Flow Statement
            't05st39' : 'C', # Consolidated Cash Flow Statement
            't05st31' : 'I', # Individual Balance Sheet
            't05st33' : 'C', # Consolidated Balance Sheet
        }

    def source_url_to_local(self, dest_dir):
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir)
        for url in self.URLS:
            self.__wget(url, dest_dir)
            
    def source_local_to_deflated(self, src_dir, dest_dir):
        assert os.path.isdir(src_dir)
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir)

        for file in os.listdir(src_dir):
            prog_name = file[5:12]
            args = self.__parse_args(file)
            html_file = '''%s_%s_%s_%s.html''' % \
                    (prog_name, args['co_id'], args['year'], args['season'])
            shutil.copy(os.path.join(src_dir, file), os.path.join(dest_dir, html_file))  

    def source_deflated_to_csv(self, src_dir, dest_dir):
        assert os.path.isdir(src_dir)
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir)

        for file in os.listdir(src_dir):
            file_name, file_ext = os.path.splitext(file)
            dest_file = os.path.join(dest_dir, file_name + '.csv')
            self.source_deflated_to_csv_single(os.path.join(src_dir, file), dest_file)

    def source_deflated_to_csv_single(self, src_file, dest_file):
        self.LOGGER.debug('''%s => %s''' % (src_file, dest_file))
        src_file_fd = open(src_file, 'rb')
        content = src_file_fd.read()
        src_file_fd.close()

        # wget timeout => 0 filesize web content => should be source again.
        if content == b'':
            self.LOGGER.error('''%s => 0 filesize''' % src_file)
            return
        
        table = b''
        try:
            table = html.fromstring(content.decode('utf-8').replace('&nbsp;', ' '))
        except UnicodeDecodeError as e:
            self.LOGGER.debug(e)
            table = html.fromstring(content.decode('big5').replace('&nbsp;', ' '))
        except Exception as e:
            self.LOGGER.error(e)
            return
            
        xpath_no_record = table.xpath('//body/center/h3/text()')
        if len(xpath_no_record) == 1:
            with open(dest_file, 'w') as fd:
                fd.write(xpath_no_record[0].strip())
            return
        
        with open(dest_file, 'w', newline='') as fd:
            csv_writer = csv.writer(fd)
            for tr in table.xpath('//tr'):
                tds = tr.xpath('./td/text()')
                if len(tds) == 5:
                    csv_record = [tds[0].strip(), tds[1].strip(), tds[3].strip()]
                    csv_writer.writerow(csv_record)

    def source_csv_to_sqlite(self, src_dir, dest_db, sql_insert):
        assert os.path.isdir(src_dir)
        for file in os.listdir(src_dir):
            self.source_csv_to_sqlite_single(os.path.join(src_dir, file), dest_db, sql_insert)
            
    def source_csv_to_sqlite_single(self, src_file, dest_db, sql_insert):
        self.LOGGER.debug('''%s => %s''' % (src_file, dest_db))
        assert os.path.isfile(src_file)
        assert os.path.isfile(dest_db)
        
        file_name, file_ext = os.path.splitext(os.path.basename(src_file))
        report_code, stock_code, year, season = file_name.split('_')
        report_type = self.REPORT_TYPE_MAP[report_code]
        date = self.__get_date(year, season)
        
        conn = sqlite3.connect(dest_db)
        cursor = conn.cursor()        
        with open(src_file, 'r') as csv_fd:
            csv_reader = csv.reader(csv_fd)
            for row in csv_reader:
                if len(row) == 1:
                    msg = row[0]
                    if msg in self.WHITE_MSG:
                        self.LOGGER.info('''%s => %s => No record''' % (src_file, msg))
                    else:
                        self.LOGGER.error('''%s => %s''' % (src_file, msg))
                elif len(row) in (2, 3):
                    cursor.execute(sql_insert, \
                            (stock_code, report_type, date, date, row[0], row[1]))
                if len(row) == 3:
                    last_date = self.__get_last_date(year, season)
                    cursor.execute(sql_insert, \
                            (stock_code, report_type, date, last_date, row[0], row[2]))
        conn.commit()
        cursor.close()
        conn.close()
                
    def __get_date(self, year, season):
        return str(int(year) + 1911) + self.SEASON_STR_MAP[season]

    def __get_last_date(self, year, season):
        return str(int(year) + 1910) + self.SEASON_STR_MAP[season]        
        
    def __wget(self, url, dest_dir):
        url_to_filepath = re.compile('https?://|ftp://').sub('', url).replace(':', '_')
        dest_file = os.path.join(dest_dir, url_to_filepath)
        dest_file_dir = os.path.dirname(dest_file)
        if not os.path.exists(dest_file_dir):
            os.makedirs(dest_file_dir)

        wget = os.path.abspath('./src/thirdparty/wget/wget.exe')
        assert os.path.isfile(wget)  
        wget_cmdline = '''%s -N \"%s\" --waitretry=3 -P %s''' % (wget, url, dest_file_dir)
        os.system(wget_cmdline)
        
    def __parse_args(self, args_line):
        args = {}
        for kvs in args_line.split('&'):
            kv = kvs.split('=')
            args[kv[0]] = kv[1]
        return args
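
For reference, URL_TEMPLATE takes an ajax report name, a stock code, a season, and a ROC-era year (the Gregorian year minus 1911), which is why every caller passes year - 1911. A quick illustration with the same values used in source_url_to_local above ('1101', 2010, season 4):

URL_TEMPLATE = (
    'http://mops.twse.com.tw/mops/web/ajax_%s?TYPEK=all&TYPEK2=&checkbtn=&'
    'co_id=%s&code1=&encodeURIComponent=1&firstin=1&isnew=false&keyword4=&'
    'off=1&queryName=co_id&season=%02d&step=1&year=%d'
)

stock_code, year, season = '1101', 2010, 4
print(URL_TEMPLATE % ('t05st32', stock_code, season, year - 1911))
# ...ajax_t05st32?...&co_id=1101&...&season=04&step=1&year=99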



.\src\common\logger.py

import logging
import sys

FORMAT = "%(asctime)s %(filename)s [%(levelname)s] %(message)s"
DATEFMT = "%H:%M:%S"

def config_root(level=logging.INFO,
                threshold=logging.WARNING,
                format=FORMAT,
                datefmt=DATEFMT):
    root = logging.getLogger()
    root.setLevel(level)
    formatter = logging.Formatter(format, datefmt)

    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(level)
    stdout_handler.setFormatter(formatter)
    root.addHandler(stdout_handler)

    #stderr_handler = logging.StreamHandler(sys.stderr)
    #stderr_handler.setLevel(logging.ERROR)
    #stderr_handler.setFormatter(logging.Formatter(format, datefmt))
    #root.addHandler(stderr_handler)



Mission accomplished.



King Lear
I cannot heave my heart into my mouth.
Cordelia is doomed from the start.

I yet beseech your majesty,--
If for I want that glib and oily art,
To speak and purpose not; since what I well intend,
I'll do't before I speak,--that you make known
It is no vicious blot, murder, or foulness,
No unchaste action, or dishonour'd step,
That hath deprived me of your grace and favour;
But even for want of that for which I am richer,
A still-soliciting eye, and such a tongue
As I am glad I have not, though not to have it
Hath lost me in your liking.
Everyone loves to hear flattery, especially a reigning monarch or a president: they crave approving looks and pleasing words. They say they choose people for ability and character alike, but in practice they are just picking lapdogs.

Better thou
Hadst not been born than not to have pleased me better.
You people who refuse to flatter me can go to hell. Shakespeare was a prophet: he described the weak, incompetent, flattery-craving president clearly, wrote it down ahead of time for later events to confirm, and look, it plays out exactly this way every time.

Time shall unfold what plaited cunning hides:
Who cover faults, at last shame them derides.
If love means holding the other person as more important than yourself, then let me give it a try.

