Files
quantitative_trading/common/convert/datetime_convert.py
Msi-147K\Shi.Ji 9a3769f862 1.kline基类实现
2.source基类实现
3.时间转换工具datetime convert实现
4.使用binance restapi获取数据实现
5.binance获取数据单元测试
2025-12-05 17:13:20 +08:00

128 lines
4.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# datetime_convert.py
import pytz
from datetime import datetime, timezone
import pandas as pd
def datetime_to_int(dt: datetime) -> int:
"""
将 datetime 对象转换为毫秒级时间戳 (int)。
Args:
dt (datetime): 要转换的 datetime 对象。
Returns:
int: 毫秒级时间戳。
"""
# 如果 datetime 对象没有时区信息,则默认为 UTC
if dt.tzinfo is None:
# 假设输入是 UTC 时间
dt = dt.replace(tzinfo=timezone.utc)
# 转换为 UTC 时间戳 (秒),再乘以 1000 得到毫秒
return int(dt.timestamp() * 1000)
def int_to_datetime(ts_ms: int, tz_info: str = 'UTC') -> datetime:
"""
将毫秒级时间戳转换为 datetime 对象。
Args:
ts_ms (int): 毫秒级时间戳。
tz_info (str): 目标时区名称 (如 'UTC', 'Asia/Shanghai')。默认为 'UTC'
Returns:
datetime: 转换后的 datetime 对象。
"""
# 先转换为秒级时间戳
ts_s = ts_ms / 1000.0
# 使用 pandas 转换,方便处理时区
dt_utc = pd.to_datetime(ts_s, unit='s', utc=True)
# 转换为目标时区
if tz_info != 'UTC':
target_tz = pytz.timezone(tz_info)
dt_target = dt_utc.tz_convert(target_tz)
return dt_target.to_pydatetime()
else:
# 如果目标是 UTC直接返回 UTC 时间
return dt_utc.to_pydatetime()
def utc_to_beijing(utc_dt: datetime) -> datetime:
"""
将 UTC 时间转换为北京时间 (东八区)。
Args:
utc_dt (datetime): UTC 时间的 datetime 对象。
Returns:
datetime: 北京时间的 datetime 对象。
"""
# 确保输入是 UTC 时区
if utc_dt.tzinfo is None:
utc_dt = utc_dt.replace(tzinfo=pytz.UTC)
elif utc_dt.tzinfo != pytz.UTC:
# 如果不是 UTC先转换为 UTC
utc_dt = utc_dt.astimezone(pytz.UTC)
# 转换为北京时间
beijing_tz = pytz.timezone('Asia/Shanghai')
beijing_dt = utc_dt.astimezone(beijing_tz)
return beijing_dt
def beijing_to_utc(beijing_dt: datetime) -> datetime:
"""
将北京时间 (东八区) 转换为 UTC 时间。
Args:
beijing_dt (datetime): 北京时间的 datetime 对象。
Returns:
datetime: UTC 时间的 datetime 对象。
"""
# 确保输入是北京时间时区
if beijing_dt.tzinfo is None:
# 如果没有时区信息,默认为北京时间 (Asia/Shanghai)
beijing_dt = pytz.timezone('Asia/Shanghai').localize(beijing_dt)
elif beijing_dt.tzinfo != pytz.timezone('Asia/Shanghai'):
# 如果不是北京时间时区,先本地化为北京时间
beijing_dt = pytz.timezone('Asia/Shanghai').localize(beijing_dt)
# 转换为 UTC
utc_dt = beijing_dt.astimezone(pytz.UTC)
return utc_dt
# --- 示例用法 (可选,放在文件末尾或单独的测试脚本中) ---
if __name__ == "__main__":
print("=== datetime 转换测试 ===")
# 1. datetime 转 int (时间戳)
now_utc = datetime.now(timezone.utc)
timestamp = datetime_to_int(now_utc)
print(f"当前UTC时间: {now_utc}")
print(f"时间戳 (毫秒): {timestamp}")
# 2. int 转 datetime
dt_from_ts = int_to_datetime(timestamp, 'UTC')
print(f"从时间戳还原 UTC 时间: {dt_from_ts}")
dt_from_ts_beijing = int_to_datetime(timestamp, 'Asia/Shanghai')
print(f"从时间戳还原 北京时间: {dt_from_ts_beijing}")
# 3. UTC 转 北京时间
utc_time = datetime(2023, 10, 15, 12, 0, 0, tzinfo=pytz.UTC)
beijing_time = utc_to_beijing(utc_time)
print(f"UTC时间: {utc_time}")
print(f"北京时间: {beijing_time}")
# 4. 北京时间 转 UTC
beijing_time_2 = datetime(2023, 10, 15, 20, 0, 0, tzinfo=pytz.timezone('Asia/Shanghai'))
utc_time_2 = beijing_to_utc(beijing_time_2)
print(f"北京时间: {beijing_time_2}")
print(f"UTC时间: {utc_time_2}")
# 5. 无时区信息的 datetime (假设为 UTC)
naive_dt = datetime(2023, 10, 15, 12, 0, 0)
timestamp_naive = datetime_to_int(naive_dt)
print(f"无时区信息的 datetime: {naive_dt}")
print(f"假设为 UTC 转换的时间戳: {timestamp_naive}")
# 6. 无时区信息的 datetime 转 北京时间
# 通常这种情况下,你可能想把它当作 UTC 处理
naive_dt_utc = naive_dt.replace(tzinfo=pytz.UTC) # 显式设置为 UTC
beijing_from_naive = utc_to_beijing(naive_dt_utc)
print(f"假设为 UTC 的 datetime 转北京时间: {beijing_from_naive}")