首页 > 编程 > Python > 正文

神经网络python源码分享

2020-02-16 11:09:02
字体:
来源:转载
供稿:网友

神经网络的逻辑应该都是熟知的了,在这里想说明一下交叉验证

交叉验证方法:

看图大概就能理解了,大致就是先将数据集平均分成K份,每次取其中一份作为测试集,其余K-1份作为训练集,如此重复K次(每一份都恰好做一次测试集),得出K个误差,再将这K个误差取平均得到最终误差

这第一个部分是BP神经网络的建立

参数选取参照论文:基于数据挖掘技术的股价指数分析与预测研究_胡林林

import math
import random

import pandas as pd

try:
    import tushare as ts
except ImportError:
    # Only the data loaders (getData / getDataR / BPNeuralNetwork.test) need
    # tushare; the network itself is pure Python and stays importable.
    ts = None

random.seed(0)

# Row lags used by each of the rate / pos / amt feature families.
_LAGS = (1, 2, 3)


def _fetch_history(code, start, end):
    """Return ``ts.get_hist_data(code, start, end)``.

    Raises:
        ImportError: if tushare could not be imported.
    """
    if ts is None:
        raise ImportError("tushare is required to download price history")
    return ts.get_hist_data(code, start, end)


def _build_samples(df, shift_ma):
    """Turn a price-history frame into network inputs and targets.

    *df* must provide the columns ``open``, ``high``, ``low``, ``close``,
    ``volume``, ``ma5`` and ``ma20``.  For every row the features are, in
    this order:

    * ``rate1..3`` - (close - open) / open of the row 1, 2, 3 positions back;
    * ``pos1..3``  - (close - low) / (high - low) of those same rows;
    * ``amt1..3``  - volume of that row divided by the mean volume of it and
      the two rows before it;
    * ``MA20``, ``MA5`` - the moving-average columns, taken from the previous
      row when *shift_ma* is true and from the current row otherwise.

    The target is ``sigmoid(100 * (close - prev_close) / prev_close)``.
    Rows containing any NaN (at least the first five, because of the lags)
    are dropped.

    Returns:
        ``(features, labels)``: two lists of equal length, holding one
        11-element list and one 1-element list per remaining row.
    """
    close = df['close']
    volume = df['volume']
    data = pd.DataFrame(index=df.index)

    for lag in _LAGS:
        opened = df['open'].shift(lag)
        data['rate%d' % lag] = (close.shift(lag) - opened) / opened
    for lag in _LAGS:
        low = df['low'].shift(lag)
        data['pos%d' % lag] = (close.shift(lag) - low) / (df['high'].shift(lag) - low)
    for lag in _LAGS:
        mean_volume = (volume.shift(lag) + volume.shift(lag + 1) + volume.shift(lag + 2)) / 3
        data['amt%d' % lag] = volume.shift(lag) / mean_volume

    # NOTE(review): with shift_ma=False the moving averages come from the same
    # row as the target; confirm this is intended and not a look-ahead leak.
    ma_lag = 1 if shift_ma else 0
    data['MA20'] = df['ma20'].shift(ma_lag)
    data['MA5'] = df['ma5'].shift(ma_lag)

    previous_close = close.shift(1)
    change = (close - previous_close) / previous_close
    data['r'] = [sigmoid(value * 100) for value in change]

    data = data.dropna(axis=0)
    labels = [[value] for value in data['r']]
    del data['r']
    return data.values.tolist(), labels


def getData(id, start, end):
    """Download history for stock *id* and return the input feature rows.

    Returns a list with one 11-element list per usable row (see
    ``_build_samples``); the moving averages are taken from the current row.
    """
    features, _ = _build_samples(_fetch_history(id, start, end), shift_ma=False)
    return features


def getDataR(id, start, end):
    """Download history for stock *id* and return the training targets.

    Returns a list with one single-element list per usable row, aligned with
    the rows returned by ``getData`` for the same arguments.
    """
    # shift_ma=True mirrors the original implementation; it only affects the
    # (discarded) features and which rows survive dropna.
    _, labels = _build_samples(_fetch_history(id, start, end), shift_ma=True)
    return labels


def rand(a, b):
    """Return a pseudo-random float scaled from ``random.random()`` into a..b."""
    return (b - a) * random.random() + a


def make_matrix(m, n, fill=0.0):
    """Return an m-by-n list of lists filled with *fill* (rows are independent)."""
    return [[fill] * n for _ in range(m)]


def sigmoid(x):
    """Return the logistic function 1 / (1 + e**-x).

    For very negative *x*, ``math.exp(-x)`` overflows; the limit value 0.0 is
    returned instead of propagating ``OverflowError``.
    """
    try:
        return 1.0 / (1.0 + math.exp(-x))
    except OverflowError:
        return 0.0


def sigmod_derivate(x):
    """Return the sigmoid derivative expressed via the sigmoid *output* x."""
    return x * (1 - x)


class BPNeuralNetwork:
    """Three-layer (input / hidden / output) back-propagation network.

    Call ``setup`` before ``predict`` or ``train``; ``__init__`` only creates
    empty placeholders.
    """

    def __init__(self):
        self.input_n = 0
        self.hidden_n = 0
        self.output_n = 0
        self.input_cells = []
        self.hidden_cells = []
        self.output_cells = []
        self.input_weights = []
        self.output_weights = []
        self.input_correction = []
        self.output_correction = []

    def setup(self, ni, nh, no):
        """Size the network for *ni* inputs, *nh* hidden and *no* output cells.

        One extra input cell is added and left at 1.0 so it acts as a bias.
        Weights are drawn with ``rand``: input->hidden in [-0.2, 0.2) and
        hidden->output in [-2.0, 2.0).
        """
        self.input_n = ni + 1
        self.hidden_n = nh
        self.output_n = no
        # init cells
        self.input_cells = [1.0] * self.input_n
        self.hidden_cells = [1.0] * self.hidden_n
        self.output_cells = [1.0] * self.output_n
        # init weights (generation order matters for seeded reproducibility)
        self.input_weights = [
            [rand(-0.2, 0.2) for _ in range(self.hidden_n)]
            for _ in range(self.input_n)
        ]
        self.output_weights = [
            [rand(-2.0, 2.0) for _ in range(self.output_n)]
            for _ in range(self.hidden_n)
        ]
        # init correction matrices (previous weight changes, used as momentum)
        self.input_correction = make_matrix(self.input_n, self.hidden_n)
        self.output_correction = make_matrix(self.hidden_n, self.output_n)

    def predict(self, inputs):
        """Run a forward pass and return a copy of the output activations.

        Only the first ``input_n - 1`` values of *inputs* are read; the last
        input cell is the bias and is never overwritten.
        """
        # activate input layer
        for i in range(self.input_n - 1):
            self.input_cells[i] = inputs[i]
        # activate hidden layer
        for j in range(self.hidden_n):
            total = 0.0
            for i in range(self.input_n):
                total += self.input_cells[i] * self.input_weights[i][j]
            self.hidden_cells[j] = sigmoid(total)
        # activate output layer
        for k in range(self.output_n):
            total = 0.0
            for j in range(self.hidden_n):
                total += self.hidden_cells[j] * self.output_weights[j][k]
            self.output_cells[k] = sigmoid(total)
        return self.output_cells[:]

    def back_propagate(self, case, label, learn, correct):
        """Do one forward/backward pass on a single sample.

        Args:
            case: input values for the sample.
            label: target values, one per output cell.
            learn: learning rate applied to the current weight change.
            correct: factor applied to the previous weight change.

        Returns:
            Half the summed squared error of the outputs computed before the
            weights were updated.
        """
        # feed forward
        self.predict(case)
        # get output layer error
        output_deltas = [0.0] * self.output_n
        for o in range(self.output_n):
            error = label[o] - self.output_cells[o]
            output_deltas[o] = sigmod_derivate(self.output_cells[o]) * error
        # get hidden layer error (uses the output weights before their update)
        hidden_deltas = [0.0] * self.hidden_n
        for h in range(self.hidden_n):
            error = 0.0
            for o in range(self.output_n):
                error += output_deltas[o] * self.output_weights[h][o]
            hidden_deltas[h] = sigmod_derivate(self.hidden_cells[h]) * error
        # update output weights
        for h in range(self.hidden_n):
            for o in range(self.output_n):
                change = output_deltas[o] * self.hidden_cells[h]
                self.output_weights[h][o] += (
                    learn * change + correct * self.output_correction[h][o]
                )
                self.output_correction[h][o] = change
        # update input weights
        for i in range(self.input_n):
            for h in range(self.hidden_n):
                change = hidden_deltas[h] * self.input_cells[i]
                self.input_weights[i][h] += (
                    learn * change + correct * self.input_correction[i][h]
                )
                self.input_correction[i][h] = change
        # get global error
        error = 0.0
        for o in range(len(label)):
            error += 0.5 * (label[o] - self.output_cells[o]) ** 2
        return error

    def train(self, cases, labels, limit=10000, learn=0.05, correct=0.1):
        """Train for *limit* passes over the samples.

        Args:
            cases: sequence of input rows.
            labels: sequence of target rows; ``labels[k]`` belongs to
                ``cases[k]``.
            limit: number of passes over the whole sample set.
            learn: learning rate, passed to ``back_propagate``.
            correct: previous-change factor, passed to ``back_propagate``.

        Returns:
            The summed sample error of the last pass, or 0.0 when *limit*
            is 0 (the original computed this value but discarded it).
        """
        error = 0.0
        for _ in range(limit):
            error = 0.0
            for index, case in enumerate(cases):
                error += self.back_propagate(case, labels[index], learn, correct)
        return error

    def test(self, id, start="2015-01-05", end="2015-01-09"):
        """Train on the history of stock *id* and print the fitted outputs.

        The history between *start* and *end* is downloaded once, an
        11-5-1 network is trained on it and the prediction for every
        training row is printed.

        NOTE(review): the features look back up to five rows, so the window
        must contain more than five rows; the default window may be too
        short - confirm the intended dates.

        Returns:
            The list of predictions that were printed.

        Raises:
            ValueError: if no usable sample remains after feature building.
        """
        cases, labels = _build_samples(_fetch_history(id, start, end), shift_ma=False)
        if not cases:
            raise ValueError(
                "no usable samples between %s and %s; widen the date range"
                % (start, end)
            )
        self.setup(11, 5, 1)
        self.train(cases, labels, 10000, 0.05, 0.1)
        # The original iterated an undefined `resulttest`; no separate test
        # set exists here, so the training inputs are evaluated instead.
        predictions = []
        for case in cases:
            output = self.predict(case)
            print(output)
            predictions.append(output)
        return predictions
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表