2.source基类实现 3.时间转换工具datetime convert实现 4.使用binance restapi获取数据实现 5.binance获取数据单元测试
68 lines
2.1 KiB
Python
68 lines
2.1 KiB
Python
# data_manager/data_structure/kline_base.py
|
||
|
||
from abc import ABC, abstractmethod
|
||
import pandas as pd
|
||
from datetime import datetime
|
||
|
||
class KlineBase(ABC):
|
||
"""
|
||
K线数据基类,定义所有K线必须具备的核心属性和方法。
|
||
所有具体的K线类都应继承此基类。
|
||
"""
|
||
|
||
def __init__(self, open_time: int, close_time: int, open_price: float, high_price: float,
|
||
low_price: float, close_price: float, volume: float):
|
||
"""
|
||
初始化K线基础信息。
|
||
|
||
Args:
|
||
open_time: 开盘时间戳 (毫秒)
|
||
close_time: 收盘时间戳 (毫秒)
|
||
open_price: 开盘价
|
||
high_price: 最高价
|
||
low_price: 最低价
|
||
close_price: 收盘价
|
||
volume: 成交量
|
||
"""
|
||
self.open_time = open_time
|
||
self.close_time = close_time
|
||
self.open_price = open_price
|
||
self.high_price = high_price
|
||
self.low_price = low_price
|
||
self.close_price = close_price
|
||
self.volume = volume
|
||
|
||
@property
|
||
def open_datetime(self) -> datetime:
|
||
"""获取开盘时间的datetime对象"""
|
||
return pd.to_datetime(self.open_time, unit='ms')
|
||
|
||
@property
|
||
def close_datetime(self) -> datetime:
|
||
"""获取收盘时间的datetime对象"""
|
||
return pd.to_datetime(self.close_time, unit='ms')
|
||
|
||
@abstractmethod
|
||
def to_dict(self) -> dict:
|
||
"""
|
||
将K线数据转换为字典格式。
|
||
子类必须实现此方法。
|
||
"""
|
||
pass
|
||
|
||
@abstractmethod
|
||
def to_pandas_series(self) -> pd.Series:
|
||
"""
|
||
将K线数据转换为pandas Series。
|
||
子类必须实现此方法。
|
||
"""
|
||
pass
|
||
|
||
def __str__(self):
|
||
"""字符串表示"""
|
||
return f"Kline({self.open_datetime.strftime('%Y-%m-%d %H:%M:%S')} - {self.close_datetime.strftime('%Y-%m-%d %H:%M:%S')}, " \
|
||
f"Open: {self.open_price}, Close: {self.close_price}, High: {self.high_price}, Low: {self.low_price}, Vol: {self.volume})"
|
||
|
||
def __repr__(self):
|
||
"""详细字符串表示"""
|
||
return self.__str__() |