2012年10月5日 星期五

爬《金瓶梅》


完全制霸!!!((之前標題錯了))



還是會忍不住抱怨事情越來越多,弄到心情不太美麗。真正想改的東西尚未測試完畢,無所謂,一天二十四小時,就那麼多時間,多的事以後再做。



import logging
import os
import re

from lxml import html

from ..common import logger

class Sourcing:
    """Crawl the 100 chapters of the novel "Jin Ping Mei" from a
    wikisource mirror and merge them into one plain-text file.

    Pipeline: URL -> local HTML -> per-chapter txt -> amalgamated txt.
    """

    def __init__(self):
        self.__logger = logging.getLogger()
        # Filled lazily by __init_urls(); one URL per chapter.
        self.URLS = []
        self.LOCAL_DIR = './dataset/jin_ping_mei/local/'
        self.TXT_DIR = './dataset/jin_ping_mei/txt/'
        self.TXT_AMALGAMATION = './dataset/jin_ping_mei/txt/AMALGAMATION.txt'

    def source(self):
        """Run the whole pipeline end to end."""
        self.__init_urls()
        self.source_url_to_local(self.LOCAL_DIR)
        self.source_local_to_txt_batch(self.LOCAL_DIR, self.TXT_DIR)
        self.source_txt_to_amalgamation(self.TXT_DIR, self.TXT_AMALGAMATION)

    def source_url_to_local(self, dest_dir):
        """Download every URL in self.URLS into dest_dir via wget."""
        os.makedirs(dest_dir, exist_ok=True)
        for url in self.URLS:
            self.__wget(url, dest_dir)

    def source_local_to_txt_batch(self, src_dir, dest_dir):
        """Convert every saved HTML file in src_dir to a .txt in dest_dir."""
        assert os.path.isdir(src_dir)
        os.makedirs(dest_dir, exist_ok=True)
        for file in os.listdir(src_dir):
            file_name, file_ext = os.path.splitext(file)
            dest_file = os.path.join(dest_dir, file_name + ".txt")
            self.source_local_to_txt(os.path.join(src_dir, file), dest_file)

    def source_local_to_txt(self, src_file, dest_file):
        """Extract the chapter title and paragraph text from one saved
        HTML page and write them to dest_file.
        """
        # 'with' guarantees the handles are closed even if parsing fails;
        # the original left both files open on error.
        with open(src_file, encoding='utf-8') as src_fd:
            raw = src_fd.read()

        # <br> tags would split the text() nodes, so drop them before parsing.
        page = html.fromstring(raw.replace('<br>', ''))
        with open(dest_file, 'w', encoding='utf-8') as dest_fd:
            headers = page.xpath(
                '//html/body/div/div/div[@class="articleHeader"]/h1/text()')
            if headers:  # only the first <h1> is the chapter title
                dest_fd.write(headers[0] + '\n\n')
            for body in page.xpath(
                    '//html/body/div/div/div[@id="content"]/div[@id="mw-content-text"]'):
                for row in body.xpath('./p/text()'):
                    dest_fd.write(row + '\n\n')

    def source_txt_to_amalgamation(self, src_dir, dest_file):
        """Concatenate chapter files 01.txt .. 100.txt, in order, into
        dest_file, separating chapters with a blank line.
        """
        with open(dest_file, 'w', encoding='utf-8') as dest_fd:
            for i in range(1, 101):
                src_file = os.path.join(src_dir, '%02d.txt' % i)
                assert os.path.isfile(src_file)
                # Close each chapter file promptly instead of leaking it.
                with open(src_file, encoding='utf-8') as src_fd:
                    dest_fd.write(src_fd.read())
                dest_fd.write('\n\n')

    def __init_urls(self):
        # %% escapes the percent-encoded UTF-8 bytes of the title;
        # %02d is the chapter number (zero-padded to at least 2 digits).
        URL_TEMPLATE = ('http://zh.wikisource.7val.com/wiki/'
                        '%%E9%%87%%91%%E7%%93%%B6%%E6%%A2%%85/'
                        '%%E7%%AC%%AC%02d%%E5%%9B%%9E')
        self.URLS = [URL_TEMPLATE % _ for _ in range(1, 101)]

    def __wget(self, url, dest_dir):
        # Characters from index 72 up to the next '%' are the chapter
        # number ("01" .. "100"); use it as the local file name.
        dest_file = os.path.join(dest_dir, '%s.html' % url[72: url.index('%', 72)])
        wget = os.path.abspath('./src/thirdparty/wget/wget.exe')
        assert os.path.isfile(wget)
        # NOTE(review): os.system builds a shell command from the URL;
        # acceptable here only because the URLs are generated locally.
        wget_cmdline = '%s -U firefox -N "%s" --waitretry=3 -O %s' % (wget, url, dest_file)
        os.system(wget_cmdline)



我應該停止抱怨。星期五晚上跟同事分享爬網頁的心得,喪失的動力又回來了!來看剛剛改好的程式:


sourcing_base.py ((共用部分抽出來))

import csv
import logging
import os
import re
import shutil
import sqlite3

from lxml import html

from ..common import logger

class SourcingBase:
    """Shared plumbing for crawling financial statements from TWSE MOPS.

    Subclasses set SQL_INSERT and the per-statement directories/URLs;
    this base handles download, file renaming ("deflating"), HTML->CSV
    extraction, and CSV->SQLite loading.
    """

    def __init__(self):
        self.LOGGER = logging.getLogger()
        # %s: ajax program name, %s: stock code, %02d: season, %d: ROC year.
        self.URL_TEMPLATE = \
            '''http://mops.twse.com.tw/mops/web/ajax_%s?TYPEK=all&TYPEK2=&checkbtn=&co_id=%s&code1=&encodeURIComponent=1&firstin=1&isnew=false&keyword4=&off=1&queryName=co_id&season=%02d&step=1&year=%d'''
        self.URLS = []
        self.LOCAL_DIR = './'
        self.DEFLATED_DIR = './'
        self.CSV_DIR = './'
        self.DB_FILE = './db/stocktotal.db'
        self.SQL_INSERT = ''
        # Server messages that legitimately mean "no data" (not errors).
        self.WHITE_MSG = [
            '資料庫中查無需求資料',
            '無應編製合併財報之子公司',
            '外國發行人免申報個別財務報表資訊,請至合併財務報表查詢',
        ]
        # Season code -> quarter-end month/day suffix.
        self.SEASON_STR_MAP = {
            '01' : '-03-31',
            '02' : '-06-30',
            '03' : '-09-30',
            '04' : '-12-31'
        }
        # MOPS ajax program name -> report type (I: individual, C: consolidated).
        self.REPORT_TYPE_MAP = {
            't05st32' : 'I', # Individual Income Statement
            't05st34' : 'C', # Consolidated Income Statement
            't05st36' : 'I', # Individual Cash Flow Statement
            't05st39' : 'C', # Consolidated Cash Flow Statement
            't05st31' : 'I', # Individual Balance Sheet
            't05st33' : 'C', # Consolidated Balance Sheet
        }

    def source_url_to_local(self, dest_dir):
        """Download every URL in self.URLS into dest_dir via wget."""
        os.makedirs(dest_dir, exist_ok=True)
        for url in self.URLS:
            self.__wget(url, dest_dir)

    def source_local_to_deflated(self, src_dir, dest_dir):
        """Rename the wget-mirrored query-string file names into the
        canonical '<prog>_<co_id>_<year>_<season>.html' form."""
        assert os.path.isdir(src_dir)
        os.makedirs(dest_dir, exist_ok=True)

        for file in os.listdir(src_dir):
            # File names look like 'ajax_t05st32?...': characters 5..11
            # hold the 7-character program name.
            prog_name = file[5:12]
            args = self.__parse_args(file)
            html_file = '%s_%s_%s_%s.html' % \
                    (prog_name, args['co_id'], args['year'], args['season'])
            shutil.copy(os.path.join(src_dir, file), os.path.join(dest_dir, html_file))

    def source_deflated_to_csv(self, src_dir, dest_dir):
        """Convert every deflated HTML file in src_dir to CSV in dest_dir."""
        assert os.path.isdir(src_dir)
        os.makedirs(dest_dir, exist_ok=True)

        for file in os.listdir(src_dir):
            file_name, file_ext = os.path.splitext(file)
            dest_file = os.path.join(dest_dir, file_name + '.csv')
            self.source_deflated_to_csv_single(os.path.join(src_dir, file), dest_file)

    def source_deflated_to_csv_single(self, src_file, dest_file):
        """Extract the statement table of one HTML page into CSV.

        5-cell table rows become 3-column CSV rows (cells 0, 1 and 3);
        "no record" / "server busy" pages become a single-message file.
        """
        with open(src_file, 'rb') as src_fd:
            content = src_fd.read()

        table = html.fromstring(content.decode('utf-8').replace('&nbsp;', ' '))

        # '==', not 'is': small-int identity is a CPython accident and a
        # SyntaxWarning on modern Python.
        xpath_no_record = table.xpath('//body/center/h3/text()')
        if len(xpath_no_record) == 1:
            with open(dest_file, 'w') as fd:
                fd.write(xpath_no_record[0].strip())
            return

        xpath_busy = table.xpath('//html/body/center/table/tr/td/font/center/text()')
        if len(xpath_busy) == 1:
            with open(dest_file, 'w') as fd:
                fd.write(xpath_busy[0].strip())
            return

        with open(dest_file, 'w', newline='') as fd:
            csv_writer = csv.writer(fd)
            for tr in table.xpath('//tr'):
                tds = tr.xpath('./td/text()')
                if len(tds) == 5:
                    # Keep cells 0, 1 and 3 of each 5-cell row.
                    csv_record = [tds[0].strip(), tds[1].strip(), tds[3].strip()]
                    csv_writer.writerow(csv_record)

    def source_csv_to_sqlite(self, src_dir, dest_db, sql_insert):
        """Load every CSV file in src_dir into dest_db."""
        assert os.path.isdir(src_dir)
        for file in os.listdir(src_dir):
            self.source_csv_to_sqlite_single(os.path.join(src_dir, file), dest_db, sql_insert)

    def source_csv_to_sqlite_single(self, src_file, dest_db, sql_insert):
        """Load one CSV file into dest_db using sql_insert.

        BUG FIX: the original ignored the sql_insert parameter and used
        self.SQL_INSERT directly; all known callers pass self.SQL_INSERT,
        so honoring the parameter is behavior-compatible and correct.
        """
        self.LOGGER.debug('''%s => %s''' % (src_file, dest_db))
        assert os.path.isfile(src_file)
        assert os.path.isfile(dest_db)

        file_name, file_ext = os.path.splitext(os.path.basename(src_file))
        report_code, stock_code, year, season = file_name.split('_')
        report_type = self.REPORT_TYPE_MAP[report_code]
        date = self.__get_date(year, season)

        conn = sqlite3.connect(dest_db)
        try:
            cursor = conn.cursor()
            with open(src_file, 'r') as csv_fd:
                for row in csv.reader(csv_fd):
                    if len(row) == 1:
                        # Single-cell rows carry a server message, not data.
                        msg = row[0]
                        if msg in self.WHITE_MSG:
                            self.LOGGER.info('''%s => %s => No record''' % (src_file, msg))
                        else:
                            self.LOGGER.error('''%s => %s''' % (src_file, msg))
                    elif len(row) in (2, 3):
                        cursor.execute(sql_insert,
                                (stock_code, report_type, date, date, row[0], row[1]))
                    if len(row) == 3:
                        # A 3-column row also carries the previous year's figure.
                        last_date = self.__get_last_date(year, season)
                        cursor.execute(sql_insert,
                                (stock_code, report_type, date, last_date, row[0], row[2]))
            conn.commit()
            cursor.close()
        finally:
            # Always release the connection, even if an insert fails.
            conn.close()

    def __get_date(self, year, season):
        """ROC year + season code -> ISO quarter-end date (this year)."""
        return str(int(year) + 1911) + self.SEASON_STR_MAP[season]

    def __get_last_date(self, year, season):
        """ROC year + season code -> ISO quarter-end date (previous year)."""
        return str(int(year) + 1910) + self.SEASON_STR_MAP[season]

    def __wget(self, url, dest_dir):
        """Mirror url under dest_dir, mapping the URL path to directories."""
        # Strip the scheme; ':' is not a legal Windows file-name character.
        url_to_filepath = re.sub('https?://|ftp://', '', url).replace(':', '_')
        dest_file = os.path.join(dest_dir, url_to_filepath)
        dest_file_dir = os.path.dirname(dest_file)
        os.makedirs(dest_file_dir, exist_ok=True)

        wget = os.path.abspath('./src/thirdparty/wget/wget.exe')
        assert os.path.isfile(wget)
        # NOTE(review): os.system builds a shell command from the URL;
        # acceptable here only because the URLs are generated locally.
        wget_cmdline = '%s -N "%s" --waitretry=3 -P %s' % (wget, url, dest_file_dir)
        os.system(wget_cmdline)

    def __parse_args(self, args_line):
        """Parse a '&'-separated 'key=value' string into a dict."""
        args = {}
        for kvs in args_line.split('&'):
            kv = kvs.split('=')
            args[kv[0]] = kv[1]
        return args



sourcing.py ((爬現金流量表))

import csv
import logging
import os
import shutil

from lxml import html

from ..common import logger
from ..common import sourcing_base

class Sourcing(sourcing_base.SourcingBase):
    """Crawl cash-flow statements (individual t05st36 / consolidated
    t05st39) from TWSE MOPS into the CashFlowStmt table.

    Unlike the other statements, the cash-flow page is a fixed-layout
    <pre> text block, so there is an extra HTML -> text -> CSV stage.
    """

    def __init__(self):
        sourcing_base.SourcingBase.__init__(self)
        self.TEXT_DIR = ''
        # Stripped-line prefixes that identify each activity section.
        self.ITEM_PREFIXES = {
            'Operating' : [
                '營業活動',
                '│營業活動'
            ],
            'Investing' : [
                '投資活動',
                '│投資活動'
            ],
            'Financing' : [
                '融資活動',
                '│融資活動',
                '理財活動',
                '不影響現金流量之融資活動'
            ],
        }
        self.SQL_INSERT = '''insert or ignore into
            CashFlowStmt(stock_code, report_type, report_date, activity_date, item, number)
            values(?, ?, ?, ?, ?, ?)
            '''

    def source_url_to_local(self, stock_code, year, season):
        """Download the two cash-flow pages for one stock/period."""
        self.__init_dirs(stock_code)
        self.__init_urls(stock_code, year, season)
        sourcing_base.SourcingBase.source_url_to_local(self, self.LOCAL_DIR)

    def source_local_to_sqlite(self, stock_code):
        """Process already-downloaded pages: deflate, text, CSV, SQLite."""
        self.__init_dirs(stock_code)
        # Raw string: the wget mirror tree uses Windows path separators
        # (the original '\m'/'\w' were invalid escape sequences).
        local_file_dir = os.path.join(self.LOCAL_DIR, r'mops.twse.com.tw\mops\web')
        sourcing_base.SourcingBase.source_local_to_deflated(self, local_file_dir, self.DEFLATED_DIR)
        self.source_deflated_to_text(self.DEFLATED_DIR, self.TEXT_DIR)
        self.source_text_to_csv(self.TEXT_DIR, self.CSV_DIR)
        sourcing_base.SourcingBase.source_csv_to_sqlite(self, self.CSV_DIR, self.DB_FILE, self.SQL_INSERT)

    def source_deflated_to_text(self, src_dir, dest_dir):
        """Extract the <pre> statement text of every HTML file in src_dir."""
        assert os.path.isdir(src_dir)
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir)

        for file in os.listdir(src_dir):
            file_name, file_ext = os.path.splitext(file)
            txt_file = os.path.join(dest_dir, file_name + '.txt')
            self.source_deflated_to_text_single(os.path.join(src_dir, file), txt_file)

    def source_deflated_to_text_single(self, src_file, dest_txt):
        """Write the statement body (or the server's message) of one HTML
        page to dest_txt; an empty source file is copied verbatim."""
        # '==', not 'is': small-int identity is a CPython accident.
        if os.path.getsize(src_file) == 0:
            shutil.copy(src_file, dest_txt)
            return

        with open(src_file, 'rb') as src_fd:
            content = src_fd.read()

        table = html.fromstring(content.decode('utf-8'))

        # Normal case: the whole statement is one <pre> text block.
        xpath_stmt = table.xpath('//body/table[@class="hasBorder"]/tr/td/pre/text()')
        if len(xpath_stmt) == 1:
            with open(dest_txt, 'w', encoding='utf-8') as fd:
                fd.write(xpath_stmt[0].strip())
            return

        # "No record" page: a single <h3> message.
        xpath_no_record = table.xpath('//body/center/h3/text()')
        if len(xpath_no_record) == 1:
            with open(dest_txt, 'w', encoding='utf-8') as fd:
                fd.write(xpath_no_record[0].strip())
            return

        # "Server busy" page.
        xpath_busy = table.xpath('//html/body/center/table/tr/td/font/center/text()')
        if len(xpath_busy) == 1:
            with open(dest_txt, 'w', encoding='utf-8') as fd:
                fd.write(xpath_busy[0].strip())
            return
        # NOTE(review): if none of the XPaths match, no dest file is
        # written — later stages silently skip the period.

    def source_text_to_csv(self, src_dir, dest_dir):
        """Convert every extracted text file in src_dir to CSV records."""
        assert os.path.isdir(src_dir)
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir)

        for file in os.listdir(src_dir):
            file_name, file_ext = os.path.splitext(file)
            csv_file = os.path.join(dest_dir, file_name + '.csv')
            self.source_text_to_csv_single(os.path.join(src_dir, file), csv_file)

    def source_text_to_csv_single(self, src_txt, dest_csv):
        """Parse one statement text file into (item, number, last_number)
        CSV rows; single-line files carry a server message instead."""
        with open(src_txt, 'rb') as fd:
            content = fd.read()
        lines = content.decode('utf-8').split('\n')

        # No record: the file is exactly one message line.
        if len(lines) == 1:
            msg = lines[0]
            if msg in self.WHITE_MSG:
                self.LOGGER.info('''%s => %s => No record''' % (src_txt, msg))
            else:
                self.LOGGER.error('''%s => %s''' % (src_txt, msg))
        # Has record
        else:
            items = self.__fetch_items(lines)
            rows = self.__build_records(src_txt, items)
            with open(dest_csv, 'w', newline='') as csv_fd:
                csv.writer(csv_fd).writerows(rows)
        self.LOGGER.debug('''%s => %s''' % (src_txt, dest_csv))

    def __init_dirs(self, stock_code):
        """Derive the per-stock working directories."""
        self.LOCAL_DIR = os.path.join('./dataset/cash_flow_stmt/local/', stock_code)
        self.DEFLATED_DIR = os.path.join('./dataset/cash_flow_stmt/deflated/', stock_code)
        self.TEXT_DIR = os.path.join('./dataset/cash_flow_stmt/text/', stock_code)
        self.CSV_DIR = os.path.join('./dataset/cash_flow_stmt/csv/', stock_code)

    def __init_urls(self, stock_code, year, season):
        """Build the individual (t05st36) and consolidated (t05st39)
        cash-flow URLs; MOPS expects the ROC year (AD year - 1911)."""
        self.URLS = [
            self.URL_TEMPLATE % ('t05st36', stock_code, season, year - 1911),
            self.URL_TEMPLATE % ('t05st39', stock_code, season, year - 1911),
        ]

    def __fetch_items(self, lines):
        """Group the raw lines by activity section, matching on the
        configured prefixes of the stripped line."""
        items = {
            'Operating' : [],
            'Investing' : [],
            'Financing' : [],
        }
        for line in lines:
            line_strip = line.strip()
            for key in items:
                for prefix in self.ITEM_PREFIXES[key]:
                    if line_strip.startswith(prefix):
                        # Keep the original (unstripped) line; the numbers
                        # are extracted from it later.
                        items[key].append(line)
        for key in items:
            self.LOGGER.debug('''%s: %s''', key, items[key])
        return items

    def __build_records(self, src_txt, items):
        """Turn each matched line into [activity, number, last_number];
        lines with fewer than three tokens are skipped."""
        records = []
        for item in items:
            for line in items[item]:
                words = self.__split_words(line)
                if len(words) > 2:
                    number = self.__get_number(words[1])
                    last_number = self.__get_number(words[2])
                    record = [item, number, last_number]
                    records.append(record)
                    self.LOGGER.info('''record: %s''', record)
        return records

    def __split_words(self, line):
        """Whitespace-split a statement line, re-attaching a detached
        '('/'($' to the following token and dropping currency markers."""
        words = line.split()
        word_num = len(words)
        for i, word in enumerate(words):
            if (word == '(') or (word == '($'):
                next_i = i + 1
                if next_i < word_num:
                    words[next_i] = '(' + words[next_i]

        fixed_words = []
        for word in words:
            if (word != '') and (word != '(') and (word != '($') and (word != '$'):
                fixed_words.append(word)
        return fixed_words

    def __get_number(self, number):
        """Normalize an accounting number string: drop '$' and ',', and
        convert the (parenthesized) negative notation into a '-' sign."""
        number = number.strip()
        number = number.replace('$', '').replace(',', '')
        # startswith/endswith also handles the empty string safely
        # (the original indexed number[0] and crashed on '').
        if number.startswith('(') and number.endswith(')'):
            number = '-' + number[1:-1]
        return number



sourcing.py ((爬資產負債表,以後會改名字,財務狀況表))

import os

from ..common import sourcing_base

class Sourcing(sourcing_base.SourcingBase):
    """Crawl balance sheets (to be renamed: statement of financial
    position) from TWSE MOPS into the BalanceSheet table."""

    def __init__(self):
        sourcing_base.SourcingBase.__init__(self)
        self.SQL_INSERT = '''insert or ignore into
            BalanceSheet(stock_code, report_type, report_date, activity_date, item, number)
            values(?, ?, ?, ?, ?, ?)
            '''

    def source_url_to_local(self, stock_code, year, season):
        """Download the two balance-sheet pages for one stock/period."""
        self.__init_dirs(stock_code)
        self.__init_urls(stock_code, year, season)
        sourcing_base.SourcingBase.source_url_to_local(self, self.LOCAL_DIR)

    def source_local_to_sqlite(self, stock_code):
        """Convert the downloaded pages to CSV and load them into the DB."""
        self.__init_dirs(stock_code)
        # Raw string: the wget mirror tree uses Windows path separators
        # (the original '\m'/'\w' were invalid escape sequences).
        local_file_dir = os.path.join(self.LOCAL_DIR, r'mops.twse.com.tw\mops\web')
        sourcing_base.SourcingBase.source_local_to_deflated(self, local_file_dir, self.DEFLATED_DIR)
        sourcing_base.SourcingBase.source_deflated_to_csv(self, self.DEFLATED_DIR, self.CSV_DIR)
        sourcing_base.SourcingBase.source_csv_to_sqlite(self, self.CSV_DIR, self.DB_FILE, self.SQL_INSERT)

    def __init_dirs(self, stock_code):
        """Derive the per-stock working directories."""
        self.LOCAL_DIR = os.path.join('./dataset/balance_sheet/local/', stock_code)
        self.DEFLATED_DIR = os.path.join('./dataset/balance_sheet/deflated/', stock_code)
        self.CSV_DIR = os.path.join('./dataset/balance_sheet/csv/', stock_code)

    def __init_urls(self, stock_code, year, season):
        """Build the individual and consolidated balance-sheet URLs.

        BUG FIX: the original requested t05st32/t05st34, which are the
        income-statement programs per SourcingBase.REPORT_TYPE_MAP; the
        balance-sheet programs are t05st31 (individual) and t05st33
        (consolidated). MOPS expects the ROC year (AD year - 1911).
        """
        self.URLS = [
            self.URL_TEMPLATE % ('t05st31', stock_code, season, year - 1911),
            self.URL_TEMPLATE % ('t05st33', stock_code, season, year - 1911),
        ]



sourcing.py ((爬損益表))

import os

from ..common import sourcing_base

class Sourcing(sourcing_base.SourcingBase):
    """Crawl income statements (individual t05st32 / consolidated
    t05st34) from TWSE MOPS into the IncomeStmt table."""

    def __init__(self):
        sourcing_base.SourcingBase.__init__(self)
        self.SQL_INSERT = '''insert or ignore into
            IncomeStmt(stock_code, report_type, report_date, activity_date, item, number)
            values(?, ?, ?, ?, ?, ?)
            '''

    def source_url_to_local(self, stock_code, year, season):
        """Download the two income-statement pages for one stock/period."""
        self.__init_dirs(stock_code)
        self.__init_urls(stock_code, year, season)
        sourcing_base.SourcingBase.source_url_to_local(self, self.LOCAL_DIR)

    def source_local_to_sqlite(self, stock_code):
        """Convert the downloaded pages to CSV and load them into the DB."""
        self.__init_dirs(stock_code)
        # Raw string: the wget mirror tree uses Windows path separators
        # (the original '\m'/'\w' were invalid escape sequences).
        local_file_dir = os.path.join(self.LOCAL_DIR, r'mops.twse.com.tw\mops\web')
        sourcing_base.SourcingBase.source_local_to_deflated(self, local_file_dir, self.DEFLATED_DIR)
        sourcing_base.SourcingBase.source_deflated_to_csv(self, self.DEFLATED_DIR, self.CSV_DIR)
        sourcing_base.SourcingBase.source_csv_to_sqlite(self, self.CSV_DIR, self.DB_FILE, self.SQL_INSERT)

    def __init_dirs(self, stock_code):
        """Derive the per-stock working directories."""
        self.LOCAL_DIR = os.path.join('./dataset/income_stmt/local/', stock_code)
        self.DEFLATED_DIR = os.path.join('./dataset/income_stmt/deflated/', stock_code)
        self.CSV_DIR = os.path.join('./dataset/income_stmt/csv/', stock_code)

    def __init_urls(self, stock_code, year, season):
        """Build the individual (t05st32) and consolidated (t05st34)
        income-statement URLs; MOPS expects the ROC year (AD year - 1911)."""
        self.URLS = [
            self.URL_TEMPLATE % ('t05st32', stock_code, season, year - 1911),
            self.URL_TEMPLATE % ('t05st34', stock_code, season, year - 1911),
        ]


沒有留言:

張貼留言