chart.py
import os
import numpy as np
import matplotlib.pyplot as plt
from ..stocktotal import stocktotal
from ..common import finance_util
class Chart():
    """Render operating-income trend charts as PNG files."""

    def generate_operating_income_chart(self, stock_code, dest_dir):
        """Plot operating income for one stock and save it as a PNG.

        The chart shows the raw income series (gray) together with its
        3-period (blue) and 12-period (red) moving averages, and is written
        to ``<dest_dir>/<stock_code>.png``.

        Args:
            stock_code: Stock code passed to
                ``Stocktotal.select_operating_income``; also used in the
                chart title and the output file name.
            dest_dir: Directory the PNG is written into.
        """
        s = stocktotal.Stocktotal()
        s_records = s.select_operating_income(stock_code)

        # ``np.rec`` is the public home of ``fromrecords``; the private
        # ``np.core`` namespace is deprecated as of NumPy 2.0.
        records = np.rec.fromrecords(s_records, names='activity_date,income')

        # Hand Matplotlib the datetime objects themselves. Converting them
        # with ``toordinal()`` only matches Matplotlib's date numbers under
        # the pre-3.3 epoch; with the current 1970 epoch the x axis would be
        # shifted by almost two thousand years.
        dates = list(records.activity_date)
        incomes = records.income
        ma3 = finance_util.moving_average(incomes, 3)
        ma12 = finance_util.moving_average(incomes, 12)

        # Reuse (and wipe) the current pyplot figure, as before.
        plt.clf()
        # ``plt.plot`` replaces the deprecated ``plt.plot_date``; datetime
        # x values make Matplotlib pick a date axis automatically.
        plt.plot(dates, incomes, '-', color='gray')
        plt.plot(dates, ma3, '-', color='blue', linewidth=3)
        plt.plot(dates, ma12, '-', color='red', linewidth=6)
        plt.title('''Operating Income Trend: %s''' % stock_code)
        plt.ylabel('Operating Income')
        plt.grid(True)
        plt.savefig(os.path.join(dest_dir, '''%s.png''' % stock_code))
finance_util.py
import numpy as np
def moving_average(sequence, period):
    """Return the trailing simple moving average of *sequence*.

    Element ``i`` of the result is the mean of the last ``period`` samples
    ending at ``i``. For the first ``period - 1`` positions, where fewer
    samples exist, the mean is taken over the samples available so far
    instead of padding with zeros (which would drag the start of the curve
    toward zero).

    Args:
        sequence: Sized sequence of numbers (list, tuple or ndarray).
        period: Window length; must be a positive integer.

    Returns:
        A float ndarray with the same length as *sequence*. If *sequence*
        is shorter than *period* it is returned unchanged.

    Raises:
        ValueError: If *period* is smaller than 1.
    """
    if period < 1:
        raise ValueError('period must be a positive integer, got %r' % (period,))
    if len(sequence) < period:
        return sequence

    array = np.asarray(sequence)
    length = len(array)
    # Trailing window sums; the 'full' convolution is cut back to the input
    # length so that entry i only covers samples up to and including i.
    window_sums = np.convolve(array, np.ones(period), mode='full')[:length]
    # Number of real samples inside each window: 1, 2, ..., period, period, ...
    counts = np.minimum(np.arange(1, length + 1), period)
    return window_sums / counts
stocktotal.py
import os
import sqlite3
from datetime import date
from datetime import datetime
class Stocktotal():
    """Query helper for the stocktotal SQLite database."""

    def __init__(self, db_file='./db/stocktotal.db'):
        """Remember the database path and check that the file exists.

        Args:
            db_file: Path to the SQLite database file. Defaults to
                ``./db/stocktotal.db``.

        Raises:
            AssertionError: If *db_file* is not an existing file.
        """
        self.DB_FILE = db_file
        # Raised explicitly instead of through ``assert`` so the check is
        # not stripped by ``python -O``; the exception type is unchanged.
        if not os.path.isfile(self.DB_FILE):
            raise AssertionError(
                'database file not found: %s' % (self.DB_FILE,))

    def _fetch_all(self, sql, params=()):
        """Run *sql* with *params* on a fresh connection and return all rows.

        The cursor and the connection are closed even when the query fails.
        """
        conn = sqlite3.connect(self.DB_FILE)
        try:
            cursor = conn.cursor()
            try:
                cursor.execute(sql, params)
                return cursor.fetchall()
            finally:
                cursor.close()
        finally:
            conn.close()

    def select_operating_income_stock_code(self):
        """Return the codes in StockCode that have OperatingIncome rows."""
        sql = '''
            SELECT code from StockCode where code in
            (
                SELECT distinct(stock_code) FROM OperatingIncome
            )
        '''
        return [row[0] for row in self._fetch_all(sql)]

    def select_operating_income(self, stock_code):
        """Return the operating income history of one stock.

        Rows are grouped by ``activity_date``; pairing the bare columns with
        ``max(report_date)`` relies on SQLite taking them from the row that
        holds the maximum, i.e. the most recently reported figure.

        Args:
            stock_code: Value matched against ``OperatingIncome.stock_code``.

        Returns:
            A list of ``(activity_date, income)`` tuples ordered by activity
            date, where ``activity_date`` is a ``datetime`` parsed from the
            stored ``YYYY-MM-DD`` text.
        """
        # ORDER BY sits on the outer query: row order of a SELECT without
        # its own ORDER BY is undefined, even if its sub-select is sorted.
        sql = '''
            SELECT activity_date, income from
            (
                SELECT *, max(report_date) FROM OperatingIncome
                where stock_code = ?
                group by activity_date
            )
            order by activity_date
        '''
        return [
            (datetime.strptime(row[0], '%Y-%m-%d'), row[1])
            for row in self._fetch_all(sql, [stock_code])
        ]

    def select_stock_code(self):
        """Return every code stored in the StockCode table."""
        sql = '''select code from StockCode'''
        return [row[0] for row in self._fetch_all(sql)]
沒有留言:
張貼留言