- Source web contents
- Parse web contents
- Insert database
1. Source Web Contents
Link: http://estockweb.standardchartered.com.tw/z/zc/zcc/zcc_1101.djhtm
我們稍微把 1101 改成 2498,就變成 2498的股利政策,表示我們可以透過這些網路資料來擷取個股股利政策。
擷取方式採用 third-party 軟體:wget。wget 有 Windows 版本及 Mac 版本,移植性不用擔心。那麼該怎麼用 Python 寫出跨平台的程式呢?這個網路上有很多資源,我也是抄別人的做法加以改良。
首先,先寫 interface,wget.py:
import platform
os = platform.system()
if os == 'Windows':
from . import wget_win as wget_private
elif os == 'Darwin':
from . import wget_mac as wget_private
else:
raise Exception('Please add support for your platform')
def wget(cmdline):
wget_private.wget(cmdline)
就是一個 wget method,舉例來說:url --waitretry=3 -O dest_file抓哪個網址,waitretry 設為 3 ((這是很重要的設定,畢竟網路環境不穩,無法無限制的等待,也不能不等待,我們得選中庸之道)),存在哪個目的檔。
程式中,我們利用 platform module 得到目前跑什麼平台。例如是 Windows 平台,那我就寫 wget_win.py,有點 d-pointer 的味道。
import os
def wget(cmdline):
wget = os.path.abspath('./core/thirdparty/wget/wget.exe')
assert os.path.isfile(wget)
wget_cmdline = '''{wget} {cmdline}'''.format(wget=wget, cmdline=cmdline)
os.system(wget_cmdline)
wget_mac.py
import os
def wget(cmdline):
wget_cmdline = '''wget {cmdline}'''.format(cmdline=cmdline)
os.system(wget_cmdline)
就是有那麼微微的不一樣。
2. Parse Web Contents
這是最複雜的部分,每個 web contents 可能差不多,也可能差很多。因為 web contents 多是 HTML 格式,關於取資料,lxml module 功能算十分強大,當然還有更強大的。
但中文網頁實在是有夠混亂的,特別是 encoding,我們得多試幾種 decoder。舉例來說,讀取 html content 的程式片斷如下
src_fd = open(src_file, 'rb')
src_content = src_fd.read()
src_fd.close()
content = None
try:
content = html.fromstring(src_content.decode('big5-hkscs'))
except UnicodeDecodeError as e:
self.LOGGER.debug(e)
content = html.fromstring(src_content.decode('gb18030'))
如果還沒辦法 decode,那可就麻煩了,目前網頁這兩個 decoders 就夠用了。
如果熟悉 xpath 的用法,接下來取資料就很容易了。通常這些網頁是用程式生出來的,格式自然比較規矩。xpath 程式片斷
for table in content.xpath('//html/body/div/table/tr/td[@width="99%"]/table/tr/td/table/tr/td/table'):
for yearly_dataset in table.xpath('./tr'):
yearly_data = yearly_dataset.xpath('./td/text()')
if len(yearly_data) is 7:
activity_date = self.get_date(yearly_data[0])
if not activity_date:
continue
record = [
self.STOCK_CODE,
activity_date,
self.get_double(yearly_data[1]),
self.get_double(yearly_data[2]),
self.get_double(yearly_data[3]),
self.get_double(yearly_data[4]),
self.get_double(yearly_data[5]),
self.get_double(yearly_data[6]),
]
總之知道基本技巧後,其它就是自由發揮,不會就 google 唄。
3. Insert Database
以 PostgreSQL 為例,我們得先準備好 schema
用 pgAdmin III 執行 SQL statement,並且留意執行身分,記得把權限開給適當的人。接著就是用 py-postgresql module 執行 PostgreSQL 操作。
這邊又有點小東西可以講。
Program to an interface, not an implementation
直接操作 PostgreSQL 或許方便,但將來想改用 MySQL 或是 SQL Server,得考慮程式好不好改,因為當初我也從 SQLite 改到 PostgreSQL。
db_config.py
insertion/insertion_factory.py
附錄:
standardchartered_source.py
stock_dividend_source.py
3. Insert Database
以 PostgreSQL 為例,我們得先準備好 schema
create table if not exists StockDividend
(
creation_dt timestamp default current_timestamp,
stock_code text not null,
activity_date date not null,
cash_dividend double precision,
stock_dividend_from_retained_earnings double precision,
stock_dividend_from_capital_reserve double precision,
stock_dividend double precision,
total_dividend double precision,
profit_sharing_percentage double precision,
unique (stock_code, activity_date)
);
用 pgAdmin III 執行 SQL statement,並且留意執行身分,記得把權限開給適當的人。接著就是用 py-postgresql module 執行 PostgreSQL 操作。
這邊又有點小東西可以講。
Program to an interface, not an implementation
直接操作 PostgreSQL 或許方便,但將來想改用 MySQL 或是 SQL Server,得考慮程式好不好改,因為當初我也從 SQLite 改到 PostgreSQL。
db_config.py
DB_TYPE = 'postgres'如果將來想換 database,直接在這邊設定,加入適當的 implementation 就可以了。
insertion/insertion_factory.py
from .. import db_config
db_type = db_config.DB_TYPE
if db_type == 'sqlite':
from .sqlite import insertion_factory as factory_private
elif db_type == 'postgres':
from .postgres import insertion_factory as factory_private
class InsertionFactory():
@staticmethod
def insertion():
return factory_private.InsertionFactory().insertion()
對於 insert,我們利用 factory pattern,根據 DB_TYPE,生成適當的 Insertion object。這裡我比較龜毛,最外面的 factory 只會生出 PostgreSQL 專屬的 factory,最後怎麼生出來,由專屬的 factory 來煩惱。
insertion/postgresql/insertion_factory.py
insertion/postgresql/insertion_factory.py
from . import insertion
class InsertionFactory():
@staticmethod
def insertion():
return insertion.Insertion()
insertion/postgresql/insertion.py
import postgresql
class Insertion():
def __init__(self):
self.CONN_STRING = 'pq://stocktotal:stocktotal@localhost:5432/stocktotal'
self.DB_CONN = None
def open(self):
self.DB_CONN = postgresql.open(self.CONN_STRING)
def close(self):
self.DB_CONN.close()
self.DB_CONN = None
def insert(self, sql_cmd, record):
try:
fixed_record = [None if _.strip() == '' else _ for _ in record]
prepared_stmt = self.DB_CONN.prepare(sql_cmd)
prepared_stmt(*fixed_record)
except postgresql.exceptions.UniqueError:
pass
def insert_stock_dividend(self, record):
sql_cmd = \
'''
insert into StockDividend(
stock_code,
activity_date,
cash_dividend,
stock_dividend_from_retained_earnings,
stock_dividend_from_capital_reserve,
stock_dividend,
total_dividend,
profit_sharing_percentage
) values(
$1,
$2::text::date,
$3::text::float8,
$4::text::float8,
$5::text::float8,
$6::text::float8,
$7::text::float8,
$8::text::float8
)
'''
self.insert(sql_cmd, record)
外面 client user 只需傳 list 進來就可以了。當然這樣的 interface 有點死板,第一個是股號,第二個是日期,第三個是等等之類的,不過好用就好了,將來要改,也不會改的太疲憊。
附錄:
standardchartered_source.py
import csv
import logging
import os
from datetime import date
class StandardcharteredSource():
def __init__(self):
from ..db.insertion import insertion_factory
self.LOGGER = logging.getLogger()
self.URL_TEMPLATE = ''
self.STOCK_CODE = None
self.SOURCE_TYPE = None
self.HTML_DIR = ''
self.CSV_DIR = ''
self.DB_INSERTION = insertion_factory.InsertionFactory().insertion()
def source(self, stock_code):
self.STOCK_CODE = stock_code
self.source_url_to_html(self.HTML_DIR)
self.source_html_to_csv(self.HTML_DIR, self.CSV_DIR)
self.source_csv_to_db(self.SOURCE_TYPE, self.CSV_DIR, self.DB_INSERTION)
def source_url_to_html(self, dest_dir):
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
url = self.__get_url()
dest_file = self.get_filename(dest_dir, 'html')
self.__wget(url, dest_file)
def source_html_to_csv(self, src_dir, dest_dir):
pass
def source_csv_to_db(self, source_type, src_dir, db_insertion):
src_file = self.get_filename(src_dir, 'csv')
if not os.path.isfile(src_file):
return
self.LOGGER.debug('''{src_file} => db'''.format(src_file=src_file))
fd = open(src_file, 'r')
csv_reader = csv.reader(fd)
INSERT_SOURCE_TYPE_MAP = {
'capital_structure_summary': db_insertion.insert_capital_structure_summary,
'stock_dividend': db_insertion.insert_stock_dividend,
}
db_insertion.open()
for r in csv_reader:
INSERT_SOURCE_TYPE_MAP[source_type](r)
self.LOGGER.debug(r)
db_insertion.close()
fd.close()
def get_filename(self, src_dir, ext):
return os.path.join(src_dir, self.STOCK_CODE + '.' + ext)
# Get date from ROC year to date (Python data type)
def get_date(self, literal):
try:
return date(int(literal) + 1911, 1, 1)
except ValueError:
return None
def get_double(self, literal):
literal = literal.replace(',','')
try:
return float(literal)
except ValueError:
return None
def __get_url(self):
return self.URL_TEMPLATE % self.STOCK_CODE
def __wget(self, url, dest_file):
from ..base import wget
cmdline = '''\"{url}\" --waitretry=3 -O \"{dest_file}\"'''.format(url=url, dest_file=dest_file)
wget.wget(cmdline)
import csv
import os
from lxml import html
from . import standardchartered_source
class StockDividendSource(standardchartered_source.StandardcharteredSource):
def __init__(self):
standardchartered_source.StandardcharteredSource.__init__(self)
self.URL_TEMPLATE = '''http://estockweb.standardchartered.com.tw/z/zc/zcc/zcc_%s.djhtm'''
self.SOURCE_TYPE = 'stock_dividend'
self.HTML_DIR = '../dataset/stock_dividend/html/'
self.CSV_DIR = '../dataset/stock_dividend/csv/'
def source_html_to_csv(self, src_dir, dest_dir):
assert os.path.isdir(src_dir)
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
src_file = self.get_filename(src_dir, 'html')
dest_file = self.get_filename(dest_dir, 'csv')
self.LOGGER.debug('''{src_file} => {dest_file}'''.format(src_file=src_file, dest_file=dest_file))
assert os.path.isfile(src_file)
dest_fd = open(dest_file, 'w', newline='')
csv_writer = csv.writer(dest_fd)
src_fd = open(src_file, 'rb')
src_content = src_fd.read()
src_fd.close()
content = None
try:
content = html.fromstring(src_content.decode('big5-hkscs').replace(' ', ' ').replace('<BR>', ''))
except UnicodeDecodeError as e:
self.LOGGER.debug(e)
content = html.fromstring(src_content.decode('gb18030').replace(' ', ' ').replace('<BR>', ''))
for table in content.xpath('//html/body/div/table/tr/td[@width="99%"]/table/tr/td/table/tr/td/table'):
for yearly_dataset in table.xpath('./tr'):
yearly_data = yearly_dataset.xpath('./td/text()')
if len(yearly_data) is 7:
activity_date = self.get_date(yearly_data[0])
if not activity_date:
continue
record = [
self.STOCK_CODE,
activity_date,
self.get_double(yearly_data[1]),
self.get_double(yearly_data[2]),
self.get_double(yearly_data[3]),
self.get_double(yearly_data[4]),
self.get_double(yearly_data[5]),
self.get_double(yearly_data[6]),
]
csv_writer.writerow(record)
dest_fd.close()
沒有留言:
張貼留言