2.source基类实现 3.时间转换工具datetime convert实现 4.使用binance restapi获取数据实现 5.binance获取数据单元测试
128 lines
4.4 KiB
Python
128 lines
4.4 KiB
Python
# datetime_convert.py
|
||
|
||
import pytz
|
||
from datetime import datetime, timezone
|
||
import pandas as pd
|
||
|
||
def datetime_to_int(dt: datetime) -> int:
|
||
"""
|
||
将 datetime 对象转换为毫秒级时间戳 (int)。
|
||
|
||
Args:
|
||
dt (datetime): 要转换的 datetime 对象。
|
||
|
||
Returns:
|
||
int: 毫秒级时间戳。
|
||
"""
|
||
# 如果 datetime 对象没有时区信息,则默认为 UTC
|
||
if dt.tzinfo is None:
|
||
# 假设输入是 UTC 时间
|
||
dt = dt.replace(tzinfo=timezone.utc)
|
||
# 转换为 UTC 时间戳 (秒),再乘以 1000 得到毫秒
|
||
return int(dt.timestamp() * 1000)
|
||
|
||
def int_to_datetime(ts_ms: int, tz_info: str = 'UTC') -> datetime:
|
||
"""
|
||
将毫秒级时间戳转换为 datetime 对象。
|
||
|
||
Args:
|
||
ts_ms (int): 毫秒级时间戳。
|
||
tz_info (str): 目标时区名称 (如 'UTC', 'Asia/Shanghai')。默认为 'UTC'。
|
||
|
||
Returns:
|
||
datetime: 转换后的 datetime 对象。
|
||
"""
|
||
# 先转换为秒级时间戳
|
||
ts_s = ts_ms / 1000.0
|
||
# 使用 pandas 转换,方便处理时区
|
||
dt_utc = pd.to_datetime(ts_s, unit='s', utc=True)
|
||
# 转换为目标时区
|
||
if tz_info != 'UTC':
|
||
target_tz = pytz.timezone(tz_info)
|
||
dt_target = dt_utc.tz_convert(target_tz)
|
||
return dt_target.to_pydatetime()
|
||
else:
|
||
# 如果目标是 UTC,直接返回 UTC 时间
|
||
return dt_utc.to_pydatetime()
|
||
|
||
def utc_to_beijing(utc_dt: datetime) -> datetime:
|
||
"""
|
||
将 UTC 时间转换为北京时间 (东八区)。
|
||
|
||
Args:
|
||
utc_dt (datetime): UTC 时间的 datetime 对象。
|
||
|
||
Returns:
|
||
datetime: 北京时间的 datetime 对象。
|
||
"""
|
||
# 确保输入是 UTC 时区
|
||
if utc_dt.tzinfo is None:
|
||
utc_dt = utc_dt.replace(tzinfo=pytz.UTC)
|
||
elif utc_dt.tzinfo != pytz.UTC:
|
||
# 如果不是 UTC,先转换为 UTC
|
||
utc_dt = utc_dt.astimezone(pytz.UTC)
|
||
# 转换为北京时间
|
||
beijing_tz = pytz.timezone('Asia/Shanghai')
|
||
beijing_dt = utc_dt.astimezone(beijing_tz)
|
||
return beijing_dt
|
||
|
||
def beijing_to_utc(beijing_dt: datetime) -> datetime:
|
||
"""
|
||
将北京时间 (东八区) 转换为 UTC 时间。
|
||
|
||
Args:
|
||
beijing_dt (datetime): 北京时间的 datetime 对象。
|
||
|
||
Returns:
|
||
datetime: UTC 时间的 datetime 对象。
|
||
"""
|
||
# 确保输入是北京时间时区
|
||
if beijing_dt.tzinfo is None:
|
||
# 如果没有时区信息,默认为北京时间 (Asia/Shanghai)
|
||
beijing_dt = pytz.timezone('Asia/Shanghai').localize(beijing_dt)
|
||
elif beijing_dt.tzinfo != pytz.timezone('Asia/Shanghai'):
|
||
# 如果不是北京时间时区,先本地化为北京时间
|
||
beijing_dt = pytz.timezone('Asia/Shanghai').localize(beijing_dt)
|
||
# 转换为 UTC
|
||
utc_dt = beijing_dt.astimezone(pytz.UTC)
|
||
return utc_dt
|
||
|
||
# --- 示例用法 (可选,放在文件末尾或单独的测试脚本中) ---
|
||
if __name__ == "__main__":
|
||
print("=== datetime 转换测试 ===")
|
||
|
||
# 1. datetime 转 int (时间戳)
|
||
now_utc = datetime.now(timezone.utc)
|
||
timestamp = datetime_to_int(now_utc)
|
||
print(f"当前UTC时间: {now_utc}")
|
||
print(f"时间戳 (毫秒): {timestamp}")
|
||
|
||
# 2. int 转 datetime
|
||
dt_from_ts = int_to_datetime(timestamp, 'UTC')
|
||
print(f"从时间戳还原 UTC 时间: {dt_from_ts}")
|
||
dt_from_ts_beijing = int_to_datetime(timestamp, 'Asia/Shanghai')
|
||
print(f"从时间戳还原 北京时间: {dt_from_ts_beijing}")
|
||
|
||
# 3. UTC 转 北京时间
|
||
utc_time = datetime(2023, 10, 15, 12, 0, 0, tzinfo=pytz.UTC)
|
||
beijing_time = utc_to_beijing(utc_time)
|
||
print(f"UTC时间: {utc_time}")
|
||
print(f"北京时间: {beijing_time}")
|
||
|
||
# 4. 北京时间 转 UTC
|
||
beijing_time_2 = datetime(2023, 10, 15, 20, 0, 0, tzinfo=pytz.timezone('Asia/Shanghai'))
|
||
utc_time_2 = beijing_to_utc(beijing_time_2)
|
||
print(f"北京时间: {beijing_time_2}")
|
||
print(f"UTC时间: {utc_time_2}")
|
||
|
||
# 5. 无时区信息的 datetime (假设为 UTC)
|
||
naive_dt = datetime(2023, 10, 15, 12, 0, 0)
|
||
timestamp_naive = datetime_to_int(naive_dt)
|
||
print(f"无时区信息的 datetime: {naive_dt}")
|
||
print(f"假设为 UTC 转换的时间戳: {timestamp_naive}")
|
||
|
||
# 6. 无时区信息的 datetime 转 北京时间
|
||
# 通常这种情况下,你可能想把它当作 UTC 处理
|
||
naive_dt_utc = naive_dt.replace(tzinfo=pytz.UTC) # 显式设置为 UTC
|
||
beijing_from_naive = utc_to_beijing(naive_dt_utc)
|
||
print(f"假设为 UTC 的 datetime 转北京时间: {beijing_from_naive}") |