import os
from datetime import datetime
from typing import Optional

import pandas as pd
import requests
from binance_sdk_derivatives_trading_usds_futures.derivatives_trading_usds_futures import (
    DerivativesTradingUsdsFutures,
    ConfigurationRestAPI,
    DERIVATIVES_TRADING_USDS_FUTURES_REST_API_PROD_URL,
)
from binance_sdk_derivatives_trading_usds_futures.rest_api.models import (
    KlineCandlestickDataIntervalEnum,
    ContinuousContractKlineCandlestickDataContractTypeEnum,
)

from .base_source import DataSource


class BinanceSource(DataSource):
    """Data source backed by the Binance USDS-margined futures REST client."""

    def __init__(self, api_key: str, api_secret: str, base_url: str, timeout: int):
        """Store the connection settings and build the REST client.

        Args:
            api_key: API key used when the ``API_KEY`` environment variable
                is not set.
            api_secret: API secret used when the ``API_SECRET`` environment
                variable is not set.
            base_url: REST base path used when the ``BASE_PATH`` environment
                variable is not set.
            timeout: Timeout value handed to the REST configuration.
        """
        self.api_key = api_key
        self.api_secret = api_secret
        self.base_url = base_url
        self.timeout = timeout

        # NOTE: environment variables take precedence over the constructor
        # arguments; the arguments only act as fallbacks.
        configuration_rest_api = ConfigurationRestAPI(
            api_key=os.getenv("API_KEY", self.api_key),
            api_secret=os.getenv("API_SECRET", self.api_secret),
            base_path=os.getenv("BASE_PATH", self.base_url),
            timeout=timeout,
        )
        self.client = DerivativesTradingUsdsFutures(
            config_rest_api=configuration_rest_api
        )

    def try_connection(self):
        """Call the connectivity endpoint and return its payload.

        Prints the rate limits reported with the response. This is a
        best-effort check: any exception is printed and swallowed.

        Returns:
            The value of ``response.data()`` on success, or ``None`` when the
            call raised.
        """
        try:
            response = self.client.rest_api.test_connectivity()
            rate_limits = response.rate_limits
            # The label names the endpoint actually called (it previously
            # said exchange_information(), a copy-paste leftover).
            print(f"test_connectivity() rate limits: {rate_limits}")
            return response.data()
        except Exception as e:
            print(f"test_connectivity() error: {e}")
            return None

    def get_realtime_data(self, symbol: str):
        """Placeholder for real-time data; currently always returns ``None``."""
        return None

    def get_historical_data(
        self,
        symbol: str,
        interval: str = "1d",
        limit: Optional[int] = None,
        start_time: Optional[int] = None,
        end_time: Optional[int] = None,
    ):
        """Fetch continuous-contract klines for a perpetual pair.

        Args:
            symbol: Passed to the endpoint as ``pair``.
            interval: Interval suffix; it is resolved through the
                ``INTERVAL_<interval>`` member of
                ``KlineCandlestickDataIntervalEnum``.
            limit: Optional ``limit`` forwarded to the endpoint.
            start_time: Optional ``start_time`` forwarded to the endpoint.
            end_time: Optional ``end_time`` forwarded to the endpoint.

        Returns:
            The value of ``response.data()`` for the request.

        Raises:
            KeyError: If no ``INTERVAL_<interval>`` member exists
                (assuming the SDK type is a standard ``Enum``).
        """
        # Resolve the interval first so an unsupported value fails before any
        # request is sent.
        interval_value = KlineCandlestickDataIntervalEnum[
            f"INTERVAL_{interval}"
        ].value
        response = self.client.rest_api.continuous_contract_kline_candlestick_data(
            pair=symbol,
            start_time=start_time,
            end_time=end_time,
            contract_type=ContinuousContractKlineCandlestickDataContractTypeEnum.PERPETUAL,
            interval=interval_value,
            limit=limit,
        )
        return response.data()