在聚宽平台上编写鳄鱼交易法则
# 导⼊函数库
import jqdata
import numpy as np
# 初始化函数,设定基准等等
def initialize(context):
t_option('u_real_price', True)
t_order_cost(OrderCost(clo_tax=0.001, open_commission=0.0003, clo_commission=0.0003, min_commission=5), type='stock') t_benchmark('000300.XSHG')
g.up_price = {} #向上碎形最⾼价
g.low_price = {} #向下碎形最低价
g.up_fractal_exists = {} #判断有效向上碎形
g.down_fractal_exists = {} #判断有效向下碎形
g.AO_index = {} #存放连续的AO指标数据
g.cal_AC_index = {} #计算AC指标中转存储
g.AC_index = {} #存放连续的AC指标数据
g.amount = {} #满仓仓位
g.stock = get_index_stocks('000300.XSHG')
幼儿园作息时间g.buy_stock = []
run_monthly(lect_univer,1,'open')
#重置全局变量
def ret_global():
g.up_price = {} #向上碎形最⾼价
g.low_price = {} #向下碎形最低价
g.up_fractal_exists = {} #判断有效向上碎形
g.down_fractal_exists = {} #判断有效向下碎形
叨唠g.AO_index = {} #存放连续的AO指标数据
g.cal_AC_index = {} #计算AC指标中转存储
g.AC_index = {} #存放连续的AC指标数据
g.amount = {} #满仓仓位
g.buy_stock = []
def initial_stock_global(stock):
g.up_price[stock] = 0
g.low_price[stock] = 0
g.up_fractal_exists[stock] = Fal
g.down_fractal_exists[stock] = Fal #判断有效向下碎形
g.AO_index[stock] = [0] #存放连续的AO指标数据
g.cal_AC_index[stock] = [0] #计算AC指标中转存储
g.AC_index[stock] = [0] #存放连续的AC指标数据
g.amount[stock] = 0 #满仓仓位
#轮换选股后清空持仓
def ret_position(context):
for stock in g.buy_stock:
order_target(stock,0)
log.info("ll %s for ret position"%stock)
#选股
def lect_univer(context):
#每三个⽉操作⼀次
month = context.h
if month%6 != g.month%6:
return
#清空全局变量
ret_position(context)
ret_global()
hist = history(30,'1d','clo',g.stock,df = Fal)
for stock in g.stock:
if is_sleeping_alligator(stock,hist,20):
g.buy_stock.append(stock)
#初始化该股票全局变量
initial_stock_global(stock)
print g.buy_stock
return None
#睡着的鳄鱼
def is_sleeping_alligator(stock,hist,nday):
for i in range(nday):
if is_struggle(stock,hist,i) == Fal:
return Fal
悦纳自己return True
#均线纠缠,BRG三线⾮常接近
def is_struggle(stock,hist,delta):
固肾功的正确做法
blue_line = hist[stock][-21-delta:-8-delta].mean()
red_line = hist[stock][-13-delta:-5-delta].mean()
green_line = hist[stock][-8-delta:-3-delta].mean()
if abs(blue_line/red_line-1)<0.02 and abs(red_line/green_line-1)<0.02:
学骑自行车的作文return True
el:
return Fal
#判断向上或向下碎形
def is_fractal(stock,direction):
hist = attribute_history(stock, 5, fields=[direction])
if direction == 'high':
if np.all(hist.iloc[:2] < hist.iloc[2]) and np.all(hist.iloc[3:] < hist.iloc[2]):
g.up_price[stock] = hist.iloc[2].values
return True
elif direction == 'low':
if np.all(hist.iloc[:2] > hist.iloc[2]) and np.all(hist.iloc[3:] > hist.iloc[2]):
g.low_price[stock] = hist.iloc[2].values
return True
抓螃蟹return Fal
#通过⽐较碎形与红线位置,判断碎形是否有效
def is_effective_fractal(stock, direction):
if is_fractal(stock,direction):
hist = attribute_history(stock, 11)
red_line = hist['clo'][:-3].mean()
clo_price = hist['clo'][-1]
if direction == 'high':
if clo_price > red_line:
g.up_fractal_exists[stock] = True
el:
g.up_fractal_exists[stock] = Fal
elif direction == 'low':
if clo_price < red_line:
g.down_fractal_exists[stock] = True
el:
操作系统的作用是g.down_fractal_exists[stock] = Fal
#N⽇内最⾼价格的N⽇线
def nday_high_point(stock,n):
hist = history(2*n,'1d','high',[stock],df = Fal)[stock]
high_point = []
for i in range(n):
high_point.append(max(hist[-5-i:-1-i]))
return np.array(high_point).mean()
#N⽇内最低价格的N⽇线
def nday_low_point(stock,n):
hist = history(2*n,'1d','low',[stock],df = Fal)[stock]
low_point = []
for i in range(n):
low_point.append(max(hist[-5-i:-1-i]))
return np.array(low_point).mean()
#AO=5⽇内(最⾼-最低)/2的5⽇移动平均-34⽇内(最⾼-最低)/2的34⽇移动平均def AO_index(stock):
g.AO_index[stock].append(nday_high_point(stock,5)/2 + nday_low_point(stock,5)/2\
- nday_high_point(stock,34)/2 - nday_low_point(stock,34)/2)
return None
#AO-AO的5⽇平均值的5⽇平均
def AC_index(stock):
AO_index(stock)
if len(g.AO_index[stock]) >= 5:
g.cal_AC_index[stock].append(g.AO_index[stock][-1] - np.array(g.AO_index[stock][-5:]).mean()) if len(g.cal_AC_index[stock]) >=5:
g.AC_index[stock].append(np.array(g.cal_AC_index[stock][-5:]).mean())
#判断序列n⽇上⾏
def is_up_going(alist,n):
if len(alist) < n:
return Fal
for i in range(n-1):
if alist[-(1+i)] <= alist[-(2+i)]:
return Fal
return True
#判断序列n⽇下⾏
def is_down_going(alist,n):
if len(alist) < n:
return Fal
for i in range(n-1):
if alist[-(1+i)] >= alist[-(2+i)]:
return Fal
return True
#碎形被突破
def active_fractal(stock,direction):
clo_price = history(1,'1d','clo',[stock],df=Fal)[stock][0]
if direction == 'up' and clo_price > g.up_price[stock]:
return True
elif direction == 'down' and clo_price < g.low_price[stock]:
return True
return Fal
#进场,初始仓位
def t_initial_position(stock,context):
clo_price = history(1,'1d','clo',[stock],df=Fal)[stock][0]
g.amount[stock] = context.portfolio.cash/clo_price/len(g.buy_stock)*3
order(stock, g.amount[stock])
log.info("buying %s 股数为 %s"%(stock,g.amount[stock]))
g.down_fractal_exists[stock] = Fal
#卖出
def ll_all_stock(stock,context):
order_target(stock,0)
log.info("lling %s"%stock)
g.up_fractal_exists[stock] = Fal
#加仓
def adjust_position(stock,context,position):
order(stock,g.amount[stock]*position)
log.info("adjust position buying %s 股数为 %s"%(stock,g.amount[stock]*position))
# 计算股票前n⽇收益率
行书图片def curity_return(days,curity_code):
hist1 = attribute_history(curity_code, days + 1, '1d', 'clo',df=Fal)
curity_returns = (hist1['clo'][-1]-hist1['clo'][0])/hist1['clo'][0]
return curity_returns
# ⽌损,根据前n⽇收益率
def conduct_nday_stoploss(context,curity_code,days,bench):
if curity_return(days,curity_code)<= bench:
for stock in g.buy_stock:
order_target_value(stock,0)
log.info("Sell %s for stoploss" %stock)
return True
el:
return Fal
# 计算股票累计收益率(从建仓⾄今)
def curity_accumulate_return(context,data,stock): current_price = data[stock].price
cost = context.portfolio.positions[stock].avg_cost
if cost != 0:
return (current_price-cost)/cost
el:
return None
# 个股⽌损,根据累计收益
def conduct_accumulate_stoploss(context,data,stock,bench): if curity_accumulate_return(context,data,stock) != None\ and curity_accumulate_return(context,data,stock) < bench: order_target_value(stock,0)
log.info("Sell %s for stoploss" %stock)
return True
el:
return Fal
# 个股⽌盈,根据累计收益
def conduct_accumulate_stopwin(context,data,stock,bench): if curity_accumulate_return(context,data,stock) != None\ and curity_accumulate_return(context,data,stock) > bench: order_target_value(stock,0)
log.info("Sell %s for stopwin" %stock)
return True
el:
return Fal
def handle_data(context,data):
#⼤盘⽌损
if conduct_nday_stoploss(context,'000300.XSHG',3,-0.03): return
for stock in g.buy_stock:
#个股⽌损
if conduct_accumulate_stopwin(context,data,stock,0.3)\
or conduct_accumulate_stoploss(context,data,stock,-0.1): return
#计算AO,AC指标
AC_index(stock)
#空仓时,寻找机会⼊场
if context.portfolio.positions[stock].amount == 0:
#计算向上碎形
is_effective_fractal(stock,'high')
#有效向上碎形存在,并被突破,买⼊
if g.up_fractal_exists and active_fractal(stock,'up'):
clo_price = history(5, '1d', 'clo', [stock],df = Fal)
if is_up_going(g.AO_index[stock],5)\
and is_up_going(g.AC_index[stock],3)\
and is_up_going(clo_price[stock],2):
t_initial_position(stock,context)
#有持仓时,加仓或离场
el:
#计算向下碎形
is_effective_fractal(stock,'low')
#出场条件1:有效向下碎形存在,并被突破,卖出
if g.down_fractal_exists and active_fractal(stock,'down'): ll_all_stock(stock,context)
return