用布林通道做套利模型,直观又好用!附指标源码

在量化套利的工具库中,布林通道(Bollinger Bands)一直是比较主流的套利方法——逻辑直观、信号清晰,尤其适合跨品种或跨期价差套利场景。

最近我们编写了一款布林带套利指标,能让套利机会可视化呈现,操作门槛直接降低,今天就带大家详细拆解

图片[1]|用布林通道做套利模型,直观又好用!附指标源码

一、指标核心逻辑:价差+布林带,抓偏离回归机会

套利的本质是捕捉“价格偏离正常关系”的机会,布林带恰好能帮我们精准定位这种偏离。我们以螺纹钢2601(简称A)和热卷2601(简称B)为例,看看指标如何运作:

图片[2]|用布林通道做套利模型,直观又好用!附指标源码
  • 核心线条定义:白色线条代表“价差=A价格-B价格”,三条彩色线分别是布林带的上轨、中轨(价差均值)、下轨;
  • 交易信号规则:当价差向上突破上轨时,说明A相对B高估(A是“强品种”),此时做空A、做多B;当价差向下突破下轨时,说明B相对A高估(B是“强品种”),此时做多B、做空A;
  • 止盈逻辑:当价差回归到中轨(正常均值区间)时,双边平仓获利。

整个过程清晰直观,新手也能快速上手。

二、和交易所自带套利工具比,优势在哪?

很多朋友会问:交易所就有套利品种,这款自定义指标的独特性是什么?

核心在于“高度灵活+全品种覆盖”

  • 品种无限制:不仅能做同品种跨期(如螺纹2601&2605),还能做跨品种套利(如豆油&棕榈油、铜&铝),只要是你想搭配的品种组合,都能叠加计算;
  • 参数可调整:布林带的周期、标准差倍数(如默认2倍σ,可根据行情调为1.5或2.5倍)等参数,都能按自己的交易习惯修改;
  • 可视化更强:直接将价差和布林带叠加在K线幅图,信号直观不杂乱,不用在多个界面间切换。

三、Python完整代码:直接“抄作业”

这款指标的Python代码已经整理完毕,包含数据获取、价差计算、布林带绘制、信号标记等全流程。需要的朋友可以直接复制使用,也能基于此修改成自己的专属套利工具

from pydantic import BaseModel

class Params(BaseModel, validate_assignment=True):
    from pydantic import Field
    
    is_main:bool = Field(default=False, title="是否为主图")
    symbol:str = Field(default="ag2512", title="其他合约名称")
    boll_window: int = Field(default=20, title="N")
    multiplier1: float = Field(default=2.0, title="M1")
    multiplier2: float = Field(default=2.0, title="M2")


# def on_subscribe(self):
#     # 第一个参数为合约名称,后续参数为周期名称,可以订阅多个合约和周期
#     self.subscribe(self.symbol, self.period)  # 订阅当前主合约
#     other_symbol = self.params.symbol
#     if other_symbol and other_symbol != self.symbol:
#         self.subscribe(other_symbol, self.period)


def on_init(self): 
    self.calculate_type = 'last' # 声明计算方式 'all' - 全量计算,'last' - 增量计算
    self.calculate_data_type = 'list' # 声明订阅数据类型,'list' - 列表
    self.spread_line = self.Line('价差', '#f0f5ff', line_width=2) # 初始化一个线条对象

    # 初始化布林带线条
    self.upper_band_line = self.Line('上轨', '#00FFFF', line_width=1)  # 青色
    self.middle_band_line = self.Line('中轨', '#8A2BE2', line_width=1)  # 蓝色
    self.lower_band_line = self.Line('下轨', '#FFC0CB', line_width=1)  # 粉色

    self.close_price = []
    self.close_other = []
    self.datetime_array = []



def calculate_all(self, data):
    self.init_graphics()
    self.spread_line = self.Line('价差', '#f0f5ff', line_width=2) # 初始化一个线条对象

    # 初始化布林带线条
    self.upper_band_line = self.Line('上轨', '#00FFFF', line_width=1)  # 青色
    self.middle_band_line = self.Line('中轨', '#8A2BE2', line_width=1)  # 蓝色
    self.lower_band_line = self.Line('下轨', '#FFC0CB', line_width=1)  # 粉色
    class DataProvider:
        import pandas as pd
        @staticmethod
        def get_kline_by_qmf(symbol: str, period: str = "M1", length: int = 600) -> dict:
            import requests
            base_url = "http://127.0.0.1:11118/api/data/getkdata"
            params = {"symbol": symbol, "lenth": length, "sequence": period, "first": 1}
            try:
                response = requests.get(base_url, params=params)
                if response.status_code == 200:
                    return response.json()
                return {}
            except Exception:
                return {}

        @staticmethod
        def load_kline(symbol: str, period: str, length: int = 600) -> list:
            # 直接使用传入的周期参数
            data = DataProvider.get_kline_by_qmf(symbol, period, length)
            data = data.get("data")
            if not data:
                return []
            return DataProvider.filter_kline(symbol, data)

        @staticmethod
        def filter_kline(symbol: str, data: list) -> list:
            from datetime import datetime
            result_data = []
            keys = ["date", "_1", "open", "high", "low", "close", "volume", "amount", "time", "_2", "_3"]
            for item in data:
                result = dict(zip(keys, item))
                if result["time"]:
                    # 数据为分钟线
                    result["time"] = str(result["time"]).zfill(4)
                    result["datetime"] = datetime.strptime(f"{result['date']} {result['time']}", "%Y%m%d %H%M")
                else:
                    result["datetime"] = datetime.strptime(f"{result['date']}", "%Y%m%d")
                result["symbol"] = symbol
                for k in ["date", "time", "_1", "_2", "_3", "amount"]:
                    result.pop(k, None)
                result_data.append(result)
            return result_data

        @staticmethod
        def get_data(symbol: str, period: str, length: int = 300) -> pd.DataFrame:
            import pandas as pd
            df = pd.DataFrame(DataProvider.load_kline(symbol, period, length))
            if len(df.index) > 0 and "datetime" in df.columns:
                df["datetime_str"] = df["datetime"]
                return df
            return pd.DataFrame()

        @staticmethod
        def dataframe_to_kline(df: pd.DataFrame) -> dict:
            klines = {
                "close": df["close"].tolist(),
                "open": df["open"].tolist(),
                "high": df["high"].tolist(),
                "low": df["low"].tolist(),
                "datetime": df["datetime"].tolist(),
                "volume": df["volume"].tolist(),
            }
            return klines

    length, datetime, open, high, low, close, volume = data

    lines = DataProvider.dataframe_to_kline(DataProvider.get_data(self.params.symbol , self.period, 1000))

    length_1 = length
    other_length = len(lines['datetime'])
    self.close_price = close
    self.close_other = lines['close']
    self.datetime_array = datetime

    # 计算价差
    self.deltas = []
    min_length = min(length_1, other_length)
    price = self.close_price[-min_length:]
    other_close = self.close_other[-min_length:]
    for j in range(min_length):
        delta = price[j] - other_close[j]
        self.deltas.append(delta)
        self.spread_line.set_point(self.datetime_array[-min_length + j], delta)

    # 计算布林带
    import pandas as pd

    window = self.params.boll_window
    multiplier1 = self.params.multiplier1
    multiplier2 = self.params.multiplier2

    deltas_series = pd.Series(self.deltas)
    rolling_mean = deltas_series.rolling(window=window).mean()
    rolling_std = deltas_series.rolling(window=window).std()
    upper_band = rolling_mean + (rolling_std * multiplier1)
    lower_band = rolling_mean - (rolling_std * multiplier2)

    # 设置布林带线条的点
    # 计算
    for j in range(min_length):
        if j >= window - 1:
            self.upper_band_line.set_point(self.datetime_array[-min_length + j], upper_band.iloc[j])
            self.middle_band_line.set_point(self.datetime_array[-min_length + j], rolling_mean.iloc[j])
            self.lower_band_line.set_point(self.datetime_array[-min_length + j], lower_band.iloc[j])


def calculate_last(self, data):
    class DataProvider:
        import pandas as pd
        @staticmethod
        def get_kline_by_qmf(symbol: str, period: str = "M1", length: int = 600) -> dict:
            import requests
            base_url = "http://127.0.0.1:11118/api/data/getkdata"
            params = {"symbol": symbol, "lenth": length, "sequence": period, "first": 1}
            try:
                response = requests.get(base_url, params=params)
                if response.status_code == 200:
                    return response.json()
                return {}
            except Exception:
                return {}

        @staticmethod
        def load_kline(symbol: str, period: str, length: int = 600) -> list:
            # 直接使用传入的周期参数
            data = DataProvider.get_kline_by_qmf(symbol, period, length)
            data = data.get("data")
            if not data:
                return []
            return DataProvider.filter_kline(symbol, data)

        @staticmethod
        def filter_kline(symbol: str, data: list) -> list:
            from datetime import datetime
            result_data = []
            keys = ["date", "_1", "open", "high", "low", "close", "volume", "amount", "time", "_2", "_3"]
            for item in data:
                result = dict(zip(keys, item))
                if result["time"]:
                    # 数据为分钟线
                    result["time"] = str(result["time"]).zfill(4)
                    result["datetime"] = datetime.strptime(f"{result['date']} {result['time']}", "%Y%m%d %H%M")
                else:
                    result["datetime"] = datetime.strptime(f"{result['date']}", "%Y%m%d")
                result["symbol"] = symbol
                for k in ["date", "time", "_1", "_2", "_3", "amount"]:
                    result.pop(k, None)
                result_data.append(result)
            return result_data

        @staticmethod
        def get_data(symbol: str, period: str, length: int = 300) -> pd.DataFrame:
            import pandas as pd
            df = pd.DataFrame(DataProvider.load_kline(symbol, period, length))
            if len(df.index) > 0 and "datetime" in df.columns:
                df["datetime_str"] = df["datetime"]
                return df
            return pd.DataFrame()

        @staticmethod
        def dataframe_to_kline(df: pd.DataFrame) -> dict:
            klines = {
                "close": df["close"].tolist(),
                "open": df["open"].tolist(),
                "high": df["high"].tolist(),
                "low": df["low"].tolist(),
                "datetime": df["datetime"].tolist(),
                "volume": df["volume"].tolist(),
            }
            return klines
    lines = DataProvider.dataframe_to_kline(DataProvider.get_data(self.params.symbol , self.period, 1000))
    datetime_str, open, high, low, close, volume = data

    if datetime_str == self.datetime_array[-1]: 
        self.close_price[-1] = close
        self.close_other[-1] = lines['close'][-1]
    else:
        self.close_price.append(close)
        self.close_other.append(lines['close'][-1])
        self.datetime_array.append(datetime_str)

    current_price = self.close_price[-1]
    current_price_1 = self.close_other[-1]

    delta = current_price - current_price_1
    
    self.spread_line.set_point(datetime_str, delta)

    if datetime_str == self.datetime_array[-1]: 
        self.deltas[-1] = delta
    else:
        self.deltas.append(delta)

    
    import pandas as pd

    window = self.params.boll_window
    multiplier1 = self.params.multiplier1
    multiplier2 = self.params.multiplier2

    deltas_series = pd.Series(self.deltas[-window:])
    if len(deltas_series.index) >= window:
        rolling_mean = deltas_series.rolling(window=window).mean()
        rolling_std = deltas_series.rolling(window=window).std()
        upper_band = rolling_mean + (rolling_std * multiplier1)
        lower_band = rolling_mean - (rolling_std * multiplier2)

        self.upper_band_line.set_point(datetime_str, upper_band.iloc[-1])
        self.middle_band_line.set_point(datetime_str, rolling_mean.iloc[-1])
        self.lower_band_line.set_point(datetime_str, lower_band.iloc[-1])

    pass

四、延伸:从“看指标”到“自动化交易”

如果觉得手动盯盘麻烦,基于这个指标做信号预警或量化交易系统也是完全可行的:比如设置价差突破轨道时自动弹窗提醒,甚至对接交易接口实现“信号触发-自动下单”的闭环。

套利的关键是“稳”和“准”,布林带的优势就在于用统计学规律降低主观判断误差,而自定义指标则让这种规律更贴合我们的交易需求。

需要完整代码的朋友,直接按提示获取即可,此代码仅是指标案例,如果运行中遇到问题,或想定制成交量验证、止损等功能,都可咨询客服

👉 技术支持,请微信扫码:

图片[3]|用布林通道做套利模型,直观又好用!附指标源码
© 版权声明
THE END
喜欢就支持一下吧
点赞559 分享
评论 抢沙发

请登录后发表评论

    请登录后查看评论内容