1.kline基类实现
2.source基类实现 3.时间转换工具datetime convert实现 4.使用binance restapi获取数据实现 5.binance获取数据单元测试
This commit is contained in:
128
common/convert/datetime_convert.py
Normal file
128
common/convert/datetime_convert.py
Normal file
@@ -0,0 +1,128 @@
|
||||
# datetime_convert.py
|
||||
|
||||
import pytz
|
||||
from datetime import datetime, timezone
|
||||
import pandas as pd
|
||||
|
||||
def datetime_to_int(dt: datetime) -> int:
|
||||
"""
|
||||
将 datetime 对象转换为毫秒级时间戳 (int)。
|
||||
|
||||
Args:
|
||||
dt (datetime): 要转换的 datetime 对象。
|
||||
|
||||
Returns:
|
||||
int: 毫秒级时间戳。
|
||||
"""
|
||||
# 如果 datetime 对象没有时区信息,则默认为 UTC
|
||||
if dt.tzinfo is None:
|
||||
# 假设输入是 UTC 时间
|
||||
dt = dt.replace(tzinfo=timezone.utc)
|
||||
# 转换为 UTC 时间戳 (秒),再乘以 1000 得到毫秒
|
||||
return int(dt.timestamp() * 1000)
|
||||
|
||||
def int_to_datetime(ts_ms: int, tz_info: str = 'UTC') -> datetime:
|
||||
"""
|
||||
将毫秒级时间戳转换为 datetime 对象。
|
||||
|
||||
Args:
|
||||
ts_ms (int): 毫秒级时间戳。
|
||||
tz_info (str): 目标时区名称 (如 'UTC', 'Asia/Shanghai')。默认为 'UTC'。
|
||||
|
||||
Returns:
|
||||
datetime: 转换后的 datetime 对象。
|
||||
"""
|
||||
# 先转换为秒级时间戳
|
||||
ts_s = ts_ms / 1000.0
|
||||
# 使用 pandas 转换,方便处理时区
|
||||
dt_utc = pd.to_datetime(ts_s, unit='s', utc=True)
|
||||
# 转换为目标时区
|
||||
if tz_info != 'UTC':
|
||||
target_tz = pytz.timezone(tz_info)
|
||||
dt_target = dt_utc.tz_convert(target_tz)
|
||||
return dt_target.to_pydatetime()
|
||||
else:
|
||||
# 如果目标是 UTC,直接返回 UTC 时间
|
||||
return dt_utc.to_pydatetime()
|
||||
|
||||
def utc_to_beijing(utc_dt: datetime) -> datetime:
|
||||
"""
|
||||
将 UTC 时间转换为北京时间 (东八区)。
|
||||
|
||||
Args:
|
||||
utc_dt (datetime): UTC 时间的 datetime 对象。
|
||||
|
||||
Returns:
|
||||
datetime: 北京时间的 datetime 对象。
|
||||
"""
|
||||
# 确保输入是 UTC 时区
|
||||
if utc_dt.tzinfo is None:
|
||||
utc_dt = utc_dt.replace(tzinfo=pytz.UTC)
|
||||
elif utc_dt.tzinfo != pytz.UTC:
|
||||
# 如果不是 UTC,先转换为 UTC
|
||||
utc_dt = utc_dt.astimezone(pytz.UTC)
|
||||
# 转换为北京时间
|
||||
beijing_tz = pytz.timezone('Asia/Shanghai')
|
||||
beijing_dt = utc_dt.astimezone(beijing_tz)
|
||||
return beijing_dt
|
||||
|
||||
def beijing_to_utc(beijing_dt: datetime) -> datetime:
|
||||
"""
|
||||
将北京时间 (东八区) 转换为 UTC 时间。
|
||||
|
||||
Args:
|
||||
beijing_dt (datetime): 北京时间的 datetime 对象。
|
||||
|
||||
Returns:
|
||||
datetime: UTC 时间的 datetime 对象。
|
||||
"""
|
||||
# 确保输入是北京时间时区
|
||||
if beijing_dt.tzinfo is None:
|
||||
# 如果没有时区信息,默认为北京时间 (Asia/Shanghai)
|
||||
beijing_dt = pytz.timezone('Asia/Shanghai').localize(beijing_dt)
|
||||
elif beijing_dt.tzinfo != pytz.timezone('Asia/Shanghai'):
|
||||
# 如果不是北京时间时区,先本地化为北京时间
|
||||
beijing_dt = pytz.timezone('Asia/Shanghai').localize(beijing_dt)
|
||||
# 转换为 UTC
|
||||
utc_dt = beijing_dt.astimezone(pytz.UTC)
|
||||
return utc_dt
|
||||
|
||||
# --- 示例用法 (可选,放在文件末尾或单独的测试脚本中) ---
|
||||
if __name__ == "__main__":
|
||||
print("=== datetime 转换测试 ===")
|
||||
|
||||
# 1. datetime 转 int (时间戳)
|
||||
now_utc = datetime.now(timezone.utc)
|
||||
timestamp = datetime_to_int(now_utc)
|
||||
print(f"当前UTC时间: {now_utc}")
|
||||
print(f"时间戳 (毫秒): {timestamp}")
|
||||
|
||||
# 2. int 转 datetime
|
||||
dt_from_ts = int_to_datetime(timestamp, 'UTC')
|
||||
print(f"从时间戳还原 UTC 时间: {dt_from_ts}")
|
||||
dt_from_ts_beijing = int_to_datetime(timestamp, 'Asia/Shanghai')
|
||||
print(f"从时间戳还原 北京时间: {dt_from_ts_beijing}")
|
||||
|
||||
# 3. UTC 转 北京时间
|
||||
utc_time = datetime(2023, 10, 15, 12, 0, 0, tzinfo=pytz.UTC)
|
||||
beijing_time = utc_to_beijing(utc_time)
|
||||
print(f"UTC时间: {utc_time}")
|
||||
print(f"北京时间: {beijing_time}")
|
||||
|
||||
# 4. 北京时间 转 UTC
|
||||
beijing_time_2 = datetime(2023, 10, 15, 20, 0, 0, tzinfo=pytz.timezone('Asia/Shanghai'))
|
||||
utc_time_2 = beijing_to_utc(beijing_time_2)
|
||||
print(f"北京时间: {beijing_time_2}")
|
||||
print(f"UTC时间: {utc_time_2}")
|
||||
|
||||
# 5. 无时区信息的 datetime (假设为 UTC)
|
||||
naive_dt = datetime(2023, 10, 15, 12, 0, 0)
|
||||
timestamp_naive = datetime_to_int(naive_dt)
|
||||
print(f"无时区信息的 datetime: {naive_dt}")
|
||||
print(f"假设为 UTC 转换的时间戳: {timestamp_naive}")
|
||||
|
||||
# 6. 无时区信息的 datetime 转 北京时间
|
||||
# 通常这种情况下,你可能想把它当作 UTC 处理
|
||||
naive_dt_utc = naive_dt.replace(tzinfo=pytz.UTC) # 显式设置为 UTC
|
||||
beijing_from_naive = utc_to_beijing(naive_dt_utc)
|
||||
print(f"假设为 UTC 的 datetime 转北京时间: {beijing_from_naive}")
|
||||
Reference in New Issue
Block a user