2012年10月23日 星期二

Python 好用!numpy 及 matplotlib

廢話不多說,直接看 Operating Income Trend Chart Generator:

chart.py

import os
import numpy as np
import matplotlib.pyplot as plt

from ..stocktotal import stocktotal
from ..common import finance_util

class Chart():
    """Render operating-income trend charts as PNG files."""

    def generate_operating_income_chart(self, stock_code, dest_dir):
        """Plot the operating income of one stock and save it as a PNG.

        The chart contains the raw income series (gray) plus its 3-period
        (blue) and 12-period (red) moving averages, and is written to
        ``<dest_dir>/<stock_code>.png``.

        Args:
            stock_code: Code of the stock to chart; also used in the chart
                title and as the output file name.
            dest_dir: Directory the PNG file is written into.
        """
        s = stocktotal.Stocktotal()
        s_records = s.select_operating_income(stock_code)
        # ``np.rec`` is the public home of ``fromrecords``; ``np.core`` is a
        # private namespace that is deprecated in NumPy 2.
        records = np.rec.fromrecords(s_records, names='activity_date,income')

        # Hand the date values to matplotlib unchanged. Converting them with
        # ``toordinal()`` yields day counts from year 1, which matplotlib >= 3.3
        # (epoch 1970-01-01 by default) would draw at the wrong dates.
        dates = list(records.activity_date)
        incomes = records.income
        ma3 = finance_util.moving_average(incomes, 3)
        ma12 = finance_util.moving_average(incomes, 12)

        # Start from a clean current figure so successive calls do not
        # accumulate lines from previously charted stocks.
        plt.clf()
        # ``plt.plot`` handles date values natively; ``plot_date`` is deprecated.
        plt.plot(dates, incomes, '-', color='gray')
        plt.plot(dates, ma3, '-', color='blue', linewidth=3)
        plt.plot(dates, ma12, '-', color='red', linewidth=6)
        plt.title('''Operating Income Trend: %s''' % stock_code)
        plt.ylabel('Operating Income')
        plt.grid(True)
        plt.savefig(os.path.join(dest_dir, '''%s.png''' % stock_code))



finance_util.py

import numpy as np

def moving_average(sequence, period):
    """Return the simple moving average of ``sequence`` over ``period`` items.

    The result has the same length as the input. Each output element is the
    sum of the current value and the ``period - 1`` values before it, divided
    by ``period``; positions with fewer than ``period`` values available are
    still divided by ``period`` (missing values count as zero), so the
    leading ``period - 1`` outputs ramp up rather than being true averages.

    If ``sequence`` is shorter than ``period`` it is returned unchanged
    (the very same object, with no averaging applied).
    """
    if period > len(sequence):
        return sequence

    values = np.asarray(sequence)
    # Uniform kernel: every element equals 1 / period.
    kernel = np.ones(period)
    kernel = kernel / kernel.sum()

    # The 'full' convolution is longer than the input; keep only the leading
    # part so the output lines up index-for-index with the input.
    smoothed = np.convolve(values, kernel, mode='full')
    return smoothed[:len(values)]



stocktotal.py

import os
import sqlite3

from datetime import date
from datetime import datetime

class Stocktotal():
    """Query helper for the ``stocktotal`` SQLite database.

    Every query opens its own connection to ``self.DB_FILE`` and closes it
    again before returning, even when the query fails.
    """

    def __init__(self):
        """Set the database path and fail fast if the file is missing.

        The path is relative, so it is resolved against the current working
        directory.

        Raises:
            IOError: if ``./db/stocktotal.db`` is not an existing file.
        """
        self.DB_FILE = './db/stocktotal.db'
        # Raise explicitly instead of using ``assert`` so the check is not
        # stripped when Python runs with ``-O``.
        if not os.path.isfile(self.DB_FILE):
            raise IOError('database file not found: %s' % self.DB_FILE)

    def _fetch_all(self, sql, params=()):
        """Run ``sql`` with ``params`` and return every result row."""
        conn = sqlite3.connect(self.DB_FILE)
        try:
            cursor = conn.cursor()
            try:
                cursor.execute(sql, params)
                return cursor.fetchall()
            finally:
                cursor.close()
        finally:
            # Always release the connection, even if the query raised.
            conn.close()

    def select_operating_income_stock_code(self):
        """Return the codes in StockCode that also appear in OperatingIncome.

        Returns:
            A list with the ``code`` value of each matching StockCode row.
        """
        SQL_SELECT = \
        '''
            SELECT code from StockCode where code in
            (
                SELECT distinct(stock_code) FROM OperatingIncome
            )
        '''
        return [row[0] for row in self._fetch_all(SQL_SELECT)]

    def select_operating_income(self, stock_code):
        """Return the operating income history of one stock.

        For each ``activity_date`` only the row with the greatest
        ``report_date`` is used (this relies on SQLite's handling of bare
        columns in a ``max()`` aggregate query).

        Args:
            stock_code: Value matched against ``OperatingIncome.stock_code``.

        Returns:
            A list of ``(activity_date, income)`` tuples ordered by activity
            date, where ``activity_date`` is a ``datetime`` parsed from the
            stored ``YYYY-MM-DD`` text.
        """
        # The outer ORDER BY makes the row order explicit; an ORDER BY that
        # exists only inside the subquery is not guaranteed to carry over to
        # the outer result.
        SQL_SELECT = \
        '''
            SELECT activity_date, income from
            (
                SELECT *, max(report_date) FROM OperatingIncome where stock_code = ?
                group by activity_date
                order by activity_date
            )
            order by activity_date
        '''
        rows = self._fetch_all(SQL_SELECT, [stock_code])
        return [(datetime.strptime(row[0], '%Y-%m-%d'), row[1]) for row in rows]

    def select_stock_code(self):
        """Return the ``code`` value of every row in StockCode as a list."""
        SQL_SELECT = '''select code from StockCode'''
        return [row[0] for row in self._fetch_all(SQL_SELECT)]

沒有留言:

張貼留言