diff --git a/common/convert/datetime_convert.py b/common/convert/datetime_convert.py new file mode 100644 index 0000000..0d554cc --- /dev/null +++ b/common/convert/datetime_convert.py @@ -0,0 +1,128 @@ +# datetime_convert.py + +import pytz +from datetime import datetime, timezone +import pandas as pd + +def datetime_to_int(dt: datetime) -> int: + """ + 将 datetime 对象转换为毫秒级时间戳 (int)。 + + Args: + dt (datetime): 要转换的 datetime 对象。 + + Returns: + int: 毫秒级时间戳。 + """ + # 如果 datetime 对象没有时区信息,则默认为 UTC + if dt.tzinfo is None: + # 假设输入是 UTC 时间 + dt = dt.replace(tzinfo=timezone.utc) + # 转换为 UTC 时间戳 (秒),再乘以 1000 得到毫秒 + return int(dt.timestamp() * 1000) + +def int_to_datetime(ts_ms: int, tz_info: str = 'UTC') -> datetime: + """ + 将毫秒级时间戳转换为 datetime 对象。 + + Args: + ts_ms (int): 毫秒级时间戳。 + tz_info (str): 目标时区名称 (如 'UTC', 'Asia/Shanghai')。默认为 'UTC'。 + + Returns: + datetime: 转换后的 datetime 对象。 + """ + # 先转换为秒级时间戳 + ts_s = ts_ms / 1000.0 + # 使用 pandas 转换,方便处理时区 + dt_utc = pd.to_datetime(ts_s, unit='s', utc=True) + # 转换为目标时区 + if tz_info != 'UTC': + target_tz = pytz.timezone(tz_info) + dt_target = dt_utc.tz_convert(target_tz) + return dt_target.to_pydatetime() + else: + # 如果目标是 UTC,直接返回 UTC 时间 + return dt_utc.to_pydatetime() + +def utc_to_beijing(utc_dt: datetime) -> datetime: + """ + 将 UTC 时间转换为北京时间 (东八区)。 + + Args: + utc_dt (datetime): UTC 时间的 datetime 对象。 + + Returns: + datetime: 北京时间的 datetime 对象。 + """ + # 确保输入是 UTC 时区 + if utc_dt.tzinfo is None: + utc_dt = utc_dt.replace(tzinfo=pytz.UTC) + elif utc_dt.tzinfo != pytz.UTC: + # 如果不是 UTC,先转换为 UTC + utc_dt = utc_dt.astimezone(pytz.UTC) + # 转换为北京时间 + beijing_tz = pytz.timezone('Asia/Shanghai') + beijing_dt = utc_dt.astimezone(beijing_tz) + return beijing_dt + +def beijing_to_utc(beijing_dt: datetime) -> datetime: + """ + 将北京时间 (东八区) 转换为 UTC 时间。 + + Args: + beijing_dt (datetime): 北京时间的 datetime 对象。 + + Returns: + datetime: UTC 时间的 datetime 对象。 + """ + # 确保输入是北京时间时区 + if beijing_dt.tzinfo is None: + # 如果没有时区信息,默认为北京时间 (Asia/Shanghai) + beijing_dt = pytz.timezone('Asia/Shanghai').localize(beijing_dt) + elif beijing_dt.tzinfo != pytz.timezone('Asia/Shanghai'): + # 如果不是北京时间时区,先本地化为北京时间 + beijing_dt = pytz.timezone('Asia/Shanghai').localize(beijing_dt) + # 转换为 UTC + utc_dt = beijing_dt.astimezone(pytz.UTC) + return utc_dt + +# --- 示例用法 (可选,放在文件末尾或单独的测试脚本中) --- +if __name__ == "__main__": + print("=== datetime 转换测试 ===") + + # 1. datetime 转 int (时间戳) + now_utc = datetime.now(timezone.utc) + timestamp = datetime_to_int(now_utc) + print(f"当前UTC时间: {now_utc}") + print(f"时间戳 (毫秒): {timestamp}") + + # 2. int 转 datetime + dt_from_ts = int_to_datetime(timestamp, 'UTC') + print(f"从时间戳还原 UTC 时间: {dt_from_ts}") + dt_from_ts_beijing = int_to_datetime(timestamp, 'Asia/Shanghai') + print(f"从时间戳还原 北京时间: {dt_from_ts_beijing}") + + # 3. UTC 转 北京时间 + utc_time = datetime(2023, 10, 15, 12, 0, 0, tzinfo=pytz.UTC) + beijing_time = utc_to_beijing(utc_time) + print(f"UTC时间: {utc_time}") + print(f"北京时间: {beijing_time}") + + # 4. 北京时间 转 UTC + beijing_time_2 = datetime(2023, 10, 15, 20, 0, 0, tzinfo=pytz.timezone('Asia/Shanghai')) + utc_time_2 = beijing_to_utc(beijing_time_2) + print(f"北京时间: {beijing_time_2}") + print(f"UTC时间: {utc_time_2}") + + # 5. 无时区信息的 datetime (假设为 UTC) + naive_dt = datetime(2023, 10, 15, 12, 0, 0) + timestamp_naive = datetime_to_int(naive_dt) + print(f"无时区信息的 datetime: {naive_dt}") + print(f"假设为 UTC 转换的时间戳: {timestamp_naive}") + + # 6. 无时区信息的 datetime 转 北京时间 + # 通常这种情况下,你可能想把它当作 UTC 处理 + naive_dt_utc = naive_dt.replace(tzinfo=pytz.UTC) # 显式设置为 UTC + beijing_from_naive = utc_to_beijing(naive_dt_utc) + print(f"假设为 UTC 的 datetime 转北京时间: {beijing_from_naive}") \ No newline at end of file diff --git a/data_manager/data_structure/binance_kline.py b/data_manager/data_structure/binance_kline.py new file mode 100644 index 0000000..4f97ca3 --- /dev/null +++ b/data_manager/data_structure/binance_kline.py @@ -0,0 +1,128 @@ +# data_manager/data_structure/binance_kline.py + +from typing import List +from .kline_base import KlineBase +import datetime +import pytz +import pandas as pd + +class BinanceKline(KlineBase): + """ + Binance K线数据类,继承自KlineBase,包含Binance特有的额外字段。 + """ + + def __init__(self, raw_data: List): + """ + 从Binance API返回的原始数据列表初始化。 + + Args: + raw_data: Binance API返回的单个K线数据列表,格式如下: + [ + 1764837300000, // 开盘时间戳 + '3180.66', // 开盘价 + '3186.50', // 最高价 + '3180.66', // 最低价 + '3182.22', // 收盘价 + '1281.633', // 成交量 + 1764837599999, // 收盘时间戳 + '4080178.49870', // 成交额 + 2411, // 成交笔数 + '730.307', // 主动买入成交量 + '2324986.16889', // 主动买入成交额 + '0' // 请忽略该参数 + ] + """ + # 解析Binance特有的字段 + open_time_ms = raw_data[0] + open_price = float(raw_data[1]) + high_price = float(raw_data[2]) + low_price = float(raw_data[3]) + close_price = float(raw_data[4]) + volume = float(raw_data[5]) # BTC + close_time_ms = raw_data[6] + quote_asset_volume = float(raw_data[7]) # USDT + trade_count = raw_data[8] # 成交笔数 + taker_buy_volume = float(raw_data[9]) # 主动买入成交量 (BTC) + taker_buy_quote_volume = float(raw_data[10]) # 主动买入成交额 (USDT) + ignore = raw_data[11] # 忽略 + + # 调用父类构造函数,传入必需的OHLCV信息 + super().__init__( + open_time=open_time_ms, + close_time=close_time_ms, + open_price=open_price, + high_price=high_price, + low_price=low_price, + close_price=close_price, + volume=volume + ) + + # Binance特有的字段 + self.quote_asset_volume = quote_asset_volume # 成交额 + self.trade_count = trade_count # 成交笔数 + self.taker_buy_volume = taker_buy_volume # 主动买入成交量 + self.taker_buy_quote_volume = taker_buy_quote_volume # 主动买入成交额 + + @property + def open_datetime(self) -> datetime: + """获取开盘时间的datetime对象 (转换为北京时间)""" + # 1. 将毫秒时间戳转换为UTC datetime对象 (pandas.to_datetime 会自动处理) + utc_dt = pd.to_datetime(self.open_time, unit='ms', utc=True) + # 2. 设置为UTC时区 (确保是UTC) + utc_dt = utc_dt.tz_convert('UTC') + # 3. 转换为北京时间 (Asia/Shanghai) + beijing_tz = pytz.timezone('Asia/Shanghai') + beijing_dt = utc_dt.tz_convert(beijing_tz) + return beijing_dt.to_pydatetime() # 转换回普通datetime对象 + + @property + def close_datetime(self) -> datetime: + """获取收盘时间的datetime对象 (转换为北京时间)""" + # 1. 将毫秒时间戳转换为UTC datetime对象 (pandas.to_datetime 会自动处理) + utc_dt = pd.to_datetime(self.close_time, unit='ms', utc=True) + # 2. 设置为UTC时区 (确保是UTC) + utc_dt = utc_dt.tz_convert('UTC') + # 3. 转换为北京时间 (Asia/Shanghai) + beijing_tz = pytz.timezone('Asia/Shanghai') + beijing_dt = utc_dt.tz_convert(beijing_tz) + return beijing_dt.to_pydatetime() # 转换回普通datetime对象 + + + + def to_dict(self) -> dict: + """ + 将K线数据转换为字典格式。 + 包含所有Binance特有的字段。 + """ + base_dict = { + 'open_time': self.open_time, + 'close_time': self.close_time, + 'open_price': self.open_price, + 'high_price': self.high_price, + 'low_price': self.low_price, + 'close_price': self.close_price, + 'volume': self.volume, + 'quote_asset_volume': self.quote_asset_volume, + 'trade_count': self.trade_count, + 'taker_buy_volume': self.taker_buy_volume, + 'taker_buy_quote_volume': self.taker_buy_quote_volume + } + return base_dict + + def to_pandas_series(self) -> pd.Series: + """ + 将K线数据转换为pandas Series。 + 索引是 `open_time`。 + """ + series = pd.Series(self.to_dict()) + series.name = self.open_time + return series + + def __str__(self): + """字符串表示""" + base_str = super().__str__() + return f"{base_str} | QuoteVol: {self.quote_asset_volume:.2f} | Trades: {self.trade_count} | TakerVol: {self.taker_buy_volume} | TakerQuoteVol: {self.taker_buy_quote_volume}" + + def __repr__(self): + """详细字符串表示""" + return self.__str__() \ No newline at end of file diff --git a/data_manager/data_structure/kline_base.py b/data_manager/data_structure/kline_base.py new file mode 100644 index 0000000..765ac93 --- /dev/null +++ b/data_manager/data_structure/kline_base.py @@ -0,0 +1,68 @@ +# data_manager/data_structure/kline_base.py + +from abc import ABC, abstractmethod +import pandas as pd +from datetime import datetime + +class KlineBase(ABC): + """ + K线数据基类,定义所有K线必须具备的核心属性和方法。 + 所有具体的K线类都应继承此基类。 + """ + + def __init__(self, open_time: int, close_time: int, open_price: float, high_price: float, + low_price: float, close_price: float, volume: float): + """ + 初始化K线基础信息。 + + Args: + open_time: 开盘时间戳 (毫秒) + close_time: 收盘时间戳 (毫秒) + open_price: 开盘价 + high_price: 最高价 + low_price: 最低价 + close_price: 收盘价 + volume: 成交量 + """ + self.open_time = open_time + self.close_time = close_time + self.open_price = open_price + self.high_price = high_price + self.low_price = low_price + self.close_price = close_price + self.volume = volume + + @property + def open_datetime(self) -> datetime: + """获取开盘时间的datetime对象""" + return pd.to_datetime(self.open_time, unit='ms') + + @property + def close_datetime(self) -> datetime: + """获取收盘时间的datetime对象""" + return pd.to_datetime(self.close_time, unit='ms') + + @abstractmethod + def to_dict(self) -> dict: + """ + 将K线数据转换为字典格式。 + 子类必须实现此方法。 + """ + pass + + @abstractmethod + def to_pandas_series(self) -> pd.Series: + """ + 将K线数据转换为pandas Series。 + 子类必须实现此方法。 + """ + pass + + def __str__(self): + """字符串表示""" + return f"Kline({self.open_datetime.strftime('%Y-%m-%d %H:%M:%S')} - {self.close_datetime.strftime('%Y-%m-%d %H:%M:%S')}, " \ + f"Open: {self.open_price}, Close: {self.close_price}, High: {self.high_price}, Low: {self.low_price}, Vol: {self.volume})" + + def __repr__(self): + """详细字符串表示""" + return self.__str__() \ No newline at end of file diff --git a/data_manager/source/base_source.py b/data_manager/source/base_source.py index e69de29..fae7334 100644 --- a/data_manager/source/base_source.py +++ b/data_manager/source/base_source.py @@ -0,0 +1,21 @@ +from abc import ABC, abstractmethod +import pandas as pd + + +class DataSource(ABC): + """数据源基类""" + + @abstractmethod + def try_connection(self): + """尝试连接行情""" + pass + + @abstractmethod + def get_historical_data(self, symbol: str, start_date: str, end_date: str, interval: str = "1d") : + """获取历史数据""" + pass + + @abstractmethod + def get_realtime_data(self, symbol: str) : + """获取实时数据""" + pass \ No newline at end of file diff --git a/data_manager/source/binance_source.py b/data_manager/source/binance_source.py new file mode 100644 index 0000000..0bd4454 --- /dev/null +++ b/data_manager/source/binance_source.py @@ -0,0 +1,58 @@ +import requests +import pandas as pd +from datetime import datetime +import os +from .base_source import DataSource +from binance_sdk_derivatives_trading_usds_futures.derivatives_trading_usds_futures import ( + DerivativesTradingUsdsFutures, + ConfigurationRestAPI, + DERIVATIVES_TRADING_USDS_FUTURES_REST_API_PROD_URL, +) +from binance_sdk_derivatives_trading_usds_futures.rest_api.models import ( + KlineCandlestickDataIntervalEnum, ContinuousContractKlineCandlestickDataContractTypeEnum, +) + +class BinanceSource(DataSource): + def __init__(self, api_key: str, api_secret: str, base_url: str, timeout: int): + self.api_key = api_key + self.api_secret = api_secret + self.base_url = base_url + self.timeout = timeout + configuration_rest_api = ConfigurationRestAPI( + api_key=os.getenv("API_KEY", self.api_key), + api_secret=os.getenv("API_SECRET", self.api_secret), + base_path=os.getenv( + "BASE_PATH", self.base_url + ), + timeout=timeout + ) + self.client = DerivativesTradingUsdsFutures(config_rest_api=configuration_rest_api) + + + + + def try_connection(self): + try: + response = self.client.rest_api.test_connectivity() + rate_limits = response.rate_limits + print(f"exchange_information() rate limits: {rate_limits}") + data = response.data() + return data + except Exception as e: + print(f"test_connectivity() error: {e}") + return None + + + def get_realtime_data(self, symbol: str): + return None + + def get_historical_data(self, symbol: str, interval: str = "1d",limit :int = None, start_time: int = None, end_time: int = None,): + response = self.client.rest_api.continuous_contract_kline_candlestick_data( + pair=symbol, + start_time=start_time, + end_time=end_time, + contract_type=ContinuousContractKlineCandlestickDataContractTypeEnum.PERPETUAL, + interval=KlineCandlestickDataIntervalEnum[f"INTERVAL_{interval}"].value, + limit=limit + ) + return response.data() \ No newline at end of file diff --git a/test/test_binance_source.py b/test/test_binance_source.py new file mode 100644 index 0000000..eaf995a --- /dev/null +++ b/test/test_binance_source.py @@ -0,0 +1,30 @@ +import datetime + +from data_manager.source.binance_source import * +from data_manager.data_structure.binance_kline import * +from common.convert.datetime_convert import * +from binance_sdk_derivatives_trading_usds_futures.derivatives_trading_usds_futures import ( + DerivativesTradingUsdsFutures, + ConfigurationRestAPI, + DERIVATIVES_TRADING_USDS_FUTURES_REST_API_PROD_URL, +) + +api_key="tl8m5dBtgsmZYblDD2jSpgmZZuag4curdLwpj3sHBlpLWOCL4Wkqc9lhfJF3zOPo" +api_secret= "1CmuGOjywTLKRZbJTRGVnP44rEj3j90IPzeiUzjyEoRc2V7fMCB7cv3FndTfXcFu" +base_url=DERIVATIVES_TRADING_USDS_FUTURES_REST_API_PROD_URL + +rest_api_client=BinanceSource(api_key,api_secret,base_url,10000) + +end_time=datetime.now() +start_time=datetime(2025,12,4,17,00) + +end_time=beijing_to_utc(end_time) +start_time=beijing_to_utc(start_time) + + + +ethusdc_klines=rest_api_client.get_historical_data("ETHUSDC","5m",start_time=datetime_to_int(start_time),end_time=datetime_to_int(end_time)) + +for ethusdc_kline in ethusdc_klines: + k=BinanceKline(ethusdc_kline) + print(k.__str__())