2012年10月22日 星期一

Operating Income

((抓交替))



SQLite3 Schema:

-- One row per (report, company, activity month) income figure.
-- The loader in sourcing.py writes three rows per company per report:
-- the report month itself, the previous month, and the same month one year earlier.
create table if not exists OperatingIncome
(
    -- Row insertion time; filled in by SQLite when the insert omits it.
    creation_dt datetime default current_timestamp,
    -- First day of the month of the report the figure was read from.
    report_date datetime not null,
    stock_code text not null,
    -- First day of the month the income figure refers to.
    activity_date datetime not null,
    -- Income figure with thousands separators already removed by the loader.
    -- NOTE(review): unit/currency is not visible here -- confirm against the source report.
    income real,
    -- Re-loading the same report is a no-op: conflicting rows are silently skipped.
    unique (report_date, stock_code, activity_date) on conflict ignore
);



Python sourcing.py:

import csv
import logging
import os
import xlrd

import datetime
from lxml import html

from ..common import sourcing_mops
from ..common import date_util as date_util

class Sourcing(sourcing_mops.SourcingMops):
    """Source monthly operating-income reports into the OperatingIncome table.

    The pipeline run by :meth:`source` is: URL -> HTML file -> CSV file ->
    SQLite rows.  Downloading and loading are inherited from
    ``SourcingMops``; this class supplies the HTML -> CSV step plus the URL
    and file-name schemes.
    """

    def __init__(self):
        # Let the base class create every attribute it knows about first,
        # then override the ones specific to operating income.
        sourcing_mops.SourcingMops.__init__(self)
        self.LOGGER = logging.getLogger()
        self.URL_TEMPLATE = '''http://mops.twse.com.tw/t21/sii/t21sc03_%s_%s.html'''
        self.DATES = []
        self.HTML_DIR = '''./dataset/operating_income/html/'''
        self.CSV_DIR = '''./dataset/operating_income/csv/'''
        self.DB_FILE = './db/stocktotal.db'
        self.SQL_INSERT = '''insert or ignore into OperatingIncome(
                report_date,
                stock_code,
                activity_date,
                income
            ) values(?, ?, ?, ?)'''

    def source(self, begin_date, end_date):
        """Run the whole pipeline for every month in [begin_date, end_date].

        Both arguments are ``'YYYY-MM-DD'`` strings (parsed by
        ``SourcingMops.init_dates``).
        """
        sourcing_mops.SourcingMops.init_dates(self, begin_date, end_date)
        sourcing_mops.SourcingMops.source_url_to_html(self, self.HTML_DIR)
        self.source_html_to_csv(self.HTML_DIR, self.CSV_DIR)
        sourcing_mops.SourcingMops.source_csv_to_sqlite(
            self, self.CSV_DIR, self.DB_FILE, self.SQL_INSERT)

    def source_html_to_csv(self, src_dir, dest_dir):
        """Convert the HTML file of every date in ``self.DATES`` to CSV.

        ``src_dir`` must already exist; ``dest_dir`` is created on demand.
        Dates are processed newest first.
        """
        assert os.path.isdir(src_dir)
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir)
        for date in reversed(self.DATES):
            self.source_html_to_csv_single(src_dir, dest_dir, date)

    def source_html_to_csv_single(self, src_dir, dest_dir, date):
        """Convert one monthly HTML report into a CSV file.

        For every company row three CSV records are written, each shaped
        ``[report date, stock code, activity date, income]``: one for the
        report month, one for the previous month and one for the same month
        a year earlier (table cells 2, 3 and 4 respectively).

        Raises AssertionError when the HTML file is missing or a company row
        does not yield exactly ten text cells.
        """
        src_file = self.get_filename(src_dir, date, 'html')
        dest_file = self.get_filename(dest_dir, date, 'csv')
        self.LOGGER.debug('%s => %s', src_file, dest_file)
        assert os.path.isfile(src_file)

        with open(src_file, 'rb') as src_fd:
            src_content = src_fd.read()
        # Parse before touching the destination so that a parse failure does
        # not leave a truncated, empty CSV behind.
        content = self._parse_html(src_content)

        # The activity dates depend only on the report date: compute once.
        this_month = date_util.get_this_month_by(date)
        last_month = date_util.get_last_month_by(date)
        last_year = date_util.get_last_year_by(date)

        # The context manager closes the CSV even when a row assertion fails.
        with open(dest_file, 'w', newline='') as dest_fd:
            csv_writer = csv.writer(dest_fd)
            for category in content.xpath('//html/body/center/table'):
                for co_list in category.xpath('./tr/td[@colspan="2"]/table'):
                    for co in co_list.xpath('./tr[@align="right"]'):
                        # Ignore summary of this category
                        summary = co.xpath('./th/text()')
                        if len(summary) == 1:
                            continue

                        items = co.xpath('./td/text()')
                        assert len(items) == 10
                        stock_code = items[0]

                        # (activity date, index of the matching income cell)
                        for activity_date, index in ((this_month, 2),
                                                     (last_month, 3),
                                                     (last_year, 4)):
                            csv_writer.writerow([
                                date,
                                stock_code,
                                activity_date,
                                items[index].strip().replace(',', ''),
                            ])

    def _parse_html(self, raw):
        """Decode the raw report bytes and parse them into an lxml tree.

        big5-hkscs is tried first; on a decode error the error is logged and
        gb18030 is used instead.
        """
        # NOTE(review): as pasted, the replace() below swaps a space for a
        # space.  Presumably the original replaced a non-breaking space or
        # an '&nbsp;' entity -- confirm against the real file.
        try:
            text = raw.decode('big5-hkscs')
        except UnicodeDecodeError as e:
            self.LOGGER.debug(e)
            text = raw.decode('gb18030')
        return html.fromstring(text.replace(' ', ' '))

    def get_url(self, date):
        """Return the report URL for the month of ``date``.

        The year placed in the URL is ``date.year - 1911`` (presumably the
        ROC calendar year expected by the site -- confirm).
        """
        return self.URL_TEMPLATE % (date.year - 1911, date.month)

    def get_filename(self, src_dir, date, ext):
        """Return ``<src_dir>/<YYYY-MM>.<ext>`` for the month of ``date``."""
        return os.path.join(src_dir, date.strftime('%Y-%m') + '.' + ext)



sourcing_mops.py

((後來想想，還是得用 date => url，然後把 date 記起來，這樣才方便。之後再來想怎麼 refactor 吧。))

# coding: big5

import csv
import logging
import os
import shutil
import sqlite3

from datetime import date
from datetime import datetime

class SourcingMops():
    """Base class for monthly sourcing jobs: download, unpack, load.

    Subclasses are expected to override :meth:`get_url` and
    :meth:`get_filename` and to fill in the directory / SQL attributes
    created in ``__init__``.
    """

    def __init__(self):
        self.LOGGER = logging.getLogger()
        self.DATES = []       # first-of-month dates filled by init_dates()
        self.HTML_DIR = ''
        self.XLS_DIR = ''
        self.CSV_DIR = ''
        self.DB_FILE = './db/stocktotal.db'
        self.SQL_INSERT = ''

    def init_dates(self, begin_date, end_date):
        """Append the first day of every month in [begin_date, end_date].

        Both arguments are ``'YYYY-MM-DD'`` strings; the day part is ignored
        and both end months are included.  Dates are appended to
        ``self.DATES`` -- the list is not cleared first, so repeated calls
        accumulate.

        Raises ValueError (from ``strptime``) on a malformed date string.
        """
        begin = datetime.strptime(begin_date, '%Y-%m-%d')
        end = datetime.strptime(end_date, '%Y-%m-%d')
        # Flatten (year, month) into one running month index so the range
        # can cross year boundaries; divmod() turns it back.
        first_index = 12 * begin.year + begin.month - 1
        last_index = 12 * end.year + end.month - 1
        for index in range(first_index, last_index + 1):
            year, month0 = divmod(index, 12)
            self.DATES.append(date(year, month0 + 1, 1))

    def source_url_to_html(self, dest_dir):
        """Download the page of every date in ``self.DATES`` into dest_dir."""
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir)
        for each_date in self.DATES:
            url = self.get_url(each_date)
            dest_file = self.get_filename(dest_dir, each_date, 'html')
            self.__wget(url, dest_file)

    def source_zip_to_xls(self, src_dir, dest_dir):
        """Extract the zip of every date in ``self.DATES`` to an .xls file."""
        assert os.path.isdir(src_dir)
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir)
        for each_date in self.DATES:
            src_file = self.get_filename(src_dir, each_date, 'zip')
            dest_file = self.get_filename(dest_dir, each_date, 'xls')
            self.source_zip_to_xls_single(src_file, dest_dir, dest_file)

    def source_zip_to_xls_single(self, src_file, dest_dir, dest_file):
        """Extract the single file inside ``src_file`` and copy it to dest_file.

        Extraction goes through a scratch directory under ``dest_dir``.  If
        that directory does not exist after extraction, the failure is
        logged and nothing is copied.

        Raises AssertionError when an input path is missing or the archive
        does not contain exactly one file.
        """
        assert os.path.isfile(src_file)
        assert os.path.isdir(dest_dir)

        sevenzip_output_dir = os.path.join(dest_dir, 'sevenzip_output_dir')
        self.__sevenzip_extract(src_file, sevenzip_output_dir)
        if not os.path.exists(sevenzip_output_dir):
            self.LOGGER.info('%s => Failure to extract', src_file)
            return

        try:
            file_list = os.listdir(sevenzip_output_dir)
            assert len(file_list) == 1
            sevenzip_output_file = os.path.join(sevenzip_output_dir, file_list[0])
            shutil.copy(sevenzip_output_file, dest_file)
        finally:
            # Always remove the scratch directory; leftovers would otherwise
            # be counted as part of the next archive.
            shutil.rmtree(sevenzip_output_dir)

    def source_csv_to_sqlite(self, src_dir, dest_db, sql_insert):
        """Load the CSV of every date in ``self.DATES`` into ``dest_db``.

        Dates without a CSV file are skipped silently.
        """
        assert os.path.isdir(src_dir)
        assert os.path.isfile(dest_db)
        for each_date in self.DATES:
            src_file = self.get_filename(src_dir, each_date, 'csv')
            if os.path.isfile(src_file):
                self.source_csv_to_sqlite_single(src_file, dest_db, sql_insert)

    def source_csv_to_sqlite_single(self, src_file, dest_db, sql_insert):
        """Execute ``sql_insert`` once per row of ``src_file``.

        Each CSV row is passed as the statement's parameters.  All rows are
        committed together at the end; if any row fails, nothing is
        committed and the exception propagates.
        """
        self.LOGGER.debug('%s => %s', src_file, dest_db)
        # newline='' lets the csv module handle line endings itself.
        with open(src_file, 'r', newline='') as fd:
            conn = sqlite3.connect(dest_db)
            try:
                cursor = conn.cursor()
                for row in csv.reader(fd):
                    cursor.execute(sql_insert, row)
                    self.LOGGER.debug(row)
                conn.commit()
                cursor.close()
            finally:
                # Close the connection on failure as well.
                conn.close()

    def get_url(self, date):
        """Return the download URL for ``date``.

        Hook for subclasses; this base implementation returns None.
        """

    def get_filename(self, src_dir, date, ext):
        """Return the file path for ``date`` with extension ``ext``.

        Hook for subclasses; this base implementation returns None.
        """

    def __wget(self, url, dest_file):
        """Download ``url`` to ``dest_file`` with the bundled wget.exe."""
        import subprocess

        wget = os.path.abspath('./src/thirdparty/wget/wget.exe')
        assert os.path.isfile(wget)
        # An argument list avoids shell quoting problems with spaces or
        # special characters in the paths and URL.
        exit_code = subprocess.call(
            [wget, '-N', url, '--waitretry=3', '-O', dest_file])
        if exit_code != 0:
            self.LOGGER.warning('wget exited with %s for %s', exit_code, url)

    def __sevenzip_extract(self, src_file, dest_dir):
        """Extract ``src_file`` into ``dest_dir`` with the bundled 7z.exe."""
        import subprocess

        sevenzip = os.path.abspath('./src/thirdparty/sevenzip/7z.exe')
        assert os.path.isfile(sevenzip)
        # 7-Zip wants the output directory glued to the -o switch.
        exit_code = subprocess.call(
            [sevenzip, 'e', src_file, '-y', '-o' + dest_dir])
        if exit_code != 0:
            self.LOGGER.warning('7z exited with %s for %s', exit_code, src_file)



date_util.py

import datetime

def get_last_month():
    """Return the first day of the month before the current one."""
    month_start = datetime.date.today().replace(day=1)
    # Step back one day to land in the previous month, then snap to day 1.
    return (month_start - datetime.timedelta(days=1)).replace(day=1)

def get_this_month():
    """Return the first day of the current month."""
    return datetime.date.today().replace(day=1)

def get_yesterday():
    """Return the calendar day before today."""
    one_day = datetime.timedelta(days=1)
    return datetime.date.today() - one_day

def get_last_month_by(someday):
    """Return the first day of the month preceding the month of *someday*."""
    month_start = datetime.date(someday.year, someday.month, 1)
    # The day before the 1st always falls in the previous month.
    previous_month_end = month_start - datetime.timedelta(days=1)
    return previous_month_end.replace(day=1)

def get_this_month_by(someday):
    """Return the first day of the month that contains *someday*."""
    year, month = someday.year, someday.month
    return datetime.date(year, month, 1)
 
def get_last_year_by(someday):
    """Return the first day of the same month, one year before *someday*.

    Only ``someday.year`` and ``someday.month`` are read, so a leap day such
    as Feb 29 is handled without special-casing.

    Raises ValueError when the resulting year falls outside the range
    supported by ``datetime.date``.
    """
    # The former ``assert first.day is 1`` compared an int by identity and
    # was always true by construction, so it has been dropped.
    return datetime.date(someday.year - 1, someday.month, 1)

沒有留言:

張貼留言