Files

58 lines
2.1 KiB
Python
Raw Permalink Normal View History

import requests
import pandas as pd
from datetime import datetime
import os
from .base_source import DataSource
from binance_sdk_derivatives_trading_usds_futures.derivatives_trading_usds_futures import (
DerivativesTradingUsdsFutures,
ConfigurationRestAPI,
DERIVATIVES_TRADING_USDS_FUTURES_REST_API_PROD_URL,
)
from binance_sdk_derivatives_trading_usds_futures.rest_api.models import (
KlineCandlestickDataIntervalEnum, ContinuousContractKlineCandlestickDataContractTypeEnum,
)
class BinanceSource(DataSource):
    """Data source that talks to Binance through the USDS futures SDK client.

    Builds a ``DerivativesTradingUsdsFutures`` client on construction and
    exposes a connectivity check plus a continuous-contract kline query.
    """

    def __init__(self, api_key: str, api_secret: str, base_url: str, timeout: int):
        """Store the connection settings and create the SDK client.

        Args:
            api_key: API key, used when the ``API_KEY`` environment variable
                is not set.
            api_secret: API secret, used when the ``API_SECRET`` environment
                variable is not set.
            base_url: REST base URL, used when the ``BASE_PATH`` environment
                variable is not set.
            timeout: Timeout forwarded unchanged to ``ConfigurationRestAPI``
                (units are defined by the SDK -- TODO confirm).
        """
        self.api_key = api_key
        self.api_secret = api_secret
        self.base_url = base_url
        self.timeout = timeout

        # NOTE(review): the environment variables win over the constructor
        # arguments here; the arguments only act as fallbacks. Confirm this
        # precedence is intended before relying on per-instance credentials.
        configuration_rest_api = ConfigurationRestAPI(
            api_key=os.getenv("API_KEY", self.api_key),
            api_secret=os.getenv("API_SECRET", self.api_secret),
            base_path=os.getenv("BASE_PATH", self.base_url),
            timeout=timeout,
        )
        self.client = DerivativesTradingUsdsFutures(
            config_rest_api=configuration_rest_api
        )

    def try_connection(self):
        """Run the SDK connectivity check and return its payload.

        Returns:
            The value of ``response.data()`` on success, or ``None`` when any
            exception is raised; the error is printed rather than propagated.
        """
        try:
            response = self.client.rest_api.test_connectivity()
            rate_limits = response.rate_limits
            # Label names the endpoint actually called, matching the error
            # message below.
            print(f"test_connectivity() rate limits: {rate_limits}")
            return response.data()
        except Exception as e:
            # Deliberate best-effort: callers get None instead of an exception.
            print(f"test_connectivity() error: {e}")
            return None

    def get_realtime_data(self, symbol: str):
        """Placeholder: realtime data is not implemented and always yields ``None``."""
        return None

    def get_historical_data(
        self,
        symbol: str,
        interval: str = "1d",
        limit: int = None,
        start_time: int = None,
        end_time: int = None,
    ):
        """Fetch continuous-contract klines for a perpetual pair.

        Args:
            symbol: Passed to the SDK as the ``pair`` argument.
            interval: Suffix of a ``KlineCandlestickDataIntervalEnum`` member
                name; ``"1d"`` selects ``INTERVAL_1d``.
            limit: Forwarded unchanged to the SDK (``None`` included).
            start_time: Forwarded unchanged to the SDK (presumably epoch
                milliseconds -- TODO confirm against the SDK).
            end_time: Forwarded unchanged to the SDK (same assumption as
                ``start_time``).

        Returns:
            The value of ``response.data()`` for the kline request.

        Raises:
            KeyError: Presumably, when ``interval`` does not name an enum
                member (standard ``Enum`` lookup behaviour). Errors raised by
                the SDK call are not caught here.
        """
        # The enum is indexed by member name, so an unknown interval fails
        # here before any request is sent.
        interval_value = KlineCandlestickDataIntervalEnum[f"INTERVAL_{interval}"].value
        response = self.client.rest_api.continuous_contract_kline_candlestick_data(
            pair=symbol,
            start_time=start_time,
            end_time=end_time,
            contract_type=ContinuousContractKlineCandlestickDataContractTypeEnum.PERPETUAL,
            interval=interval_value,
            limit=limit,
        )
        return response.data()