
In this tutorial, we guide users through the construction of a strong, production-taiyar python SDK. This begins showing how to establish and configure the required asynchronous HTTP libraries (AIOHTP, Nest-Asyncio). This then moves through the implementation of core components, including structured response objects, token-bucket rate limiting, in-memory caching with TTL and a clean, dataklas-powered designs. We will see how to wrap these pieces in an advancedsk class that supports ASYNC reference management, automatic retrieved/weight-on-lymph behavior, JSON/Author header injection, and convenient http-VARB methods. By the way, a demo harness against jsonplaceholder shows the cashing efficiency, bringing batch with rate limits, error handling, and even shows how to expand SDK via a fluent “builder” pattern for a custom configuration.
import asyncio
import aiohttp
import time
import json
from typing import Dict, List, Optional, Any, Union
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import hashlib
import logging
!pip install aiohttp nest-asyncio
We install and configure Esinkronous runtime with timing, dataklas modeling, caching (hallib and dattime), and structured logs by importing asyncio and aiohttp. , PIP installed the AIOHTTP Nest-Asyncio line ensures that an event can original an event loop within the notebook Colab, allowing strong AsyNC http requests and rate-limiting workflows.
@dataclass
class APIResponse:
"""Structured response object"""
data: Any
status_code: int
headers: Dict[str, str]
timestamp: datetime
def to_dict(self) -> Dict:
return asdict(self)
The apiresponse dataclass http reaction details, payload (data), status code, header and a single, typed object to the recovery timist of recovery in the object. To_dict () Assistant converts the example to easy logging, serialization, or downstream processing into a plain dictionary.
class RateLimiter:
"""Token bucket rate limiter"""
def __init__(self, max_calls: int = 100, time_window: int = 60):
self.max_calls = max_calls
self.time_window = time_window
self.calls = []
def can_proceed(self) -> bool:
now = time.time()
self.calls = [call_time for call_time in self.calls if now - call_time < self.time_window]
if len(self.calls) < self.max_calls:
self.calls.append(now)
return True
return False
def wait_time(self) -> float:
if not self.calls:
return 0
return max(0, self.time_window - (time.time() - self.calls[0]))
Ratelimiter category implements a simple token-bocket policy by tracking the recent call timestamp and allowing up to max_calls within a rolling time_windo. When reaching the border, the can_prce () returns the wrong, and calculates the wait_time () how long to stay before making the next request.
class Cache:
"""Simple in-memory cache with TTL"""
def __init__(self, default_ttl: int = 300):
self.cache = {}
self.default_ttl = default_ttl
def _generate_key(self, method: str, url: str, params: Dict = None) -> str:
key_data = f"{method}:{url}:{json.dumps(params or {}, sort_keys=True)}"
return hashlib.md5(key_data.encode()).hexdigest()
def get(self, method: str, url: str, params: Dict = None) -> Optional[APIResponse]:
key = self._generate_key(method, url, params)
if key in self.cache:
response, expiry = self.cache[key]
if datetime.now() < expiry:
return response
del self.cache[key]
return None
def set(self, method: str, url: str, response: APIResponse, params: Dict = None, ttl: int = None):
key = self._generate_key(method, url, params)
expiry = datetime.now() + timedelta(seconds=ttl or self.default_ttl)
self.cache[key] = (response, expiry)
Cash class provides a mild in-memory TTL cache to the request signature (method, url, params) in a unique key (method, URL, PARAMS). This returns the valid cache eperuspons objects before the expiration and automatically eliminates stale entries after its stay from time to time.
class AdvancedSDK:
"""Advanced SDK with modern Python patterns"""
def __init__(self, base_url: str, api_key: str = None, rate_limit: int = 100):
self.base_url = base_url.rstrip('/')
self.api_key = api_key
self.session = None
self.rate_limiter = RateLimiter(max_calls=rate_limit)
self.cache = Cache()
self.logger = self._setup_logger()
def _setup_logger(self) -> logging.Logger:
logger = logging.getLogger(f"SDK-{id(self)}")
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
return logger
async def __aenter__(self):
"""Async context manager entry"""
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit"""
if self.session:
await self.session.close()
def _get_headers(self) -> Dict[str, str]:
headers = {'Content-Type': 'application/json'}
if self.api_key:
headers['Authorization'] = f'Bearer {self.api_key}'
return headers
async def _make_request(self, method: str, endpoint: str, params: Dict = None,
data: Dict = None, use_cache: bool = True) -> APIResponse:
"""Core request method with rate limiting and caching"""
if use_cache and method.upper() == 'GET':
cached = self.cache.get(method, endpoint, params)
if cached:
self.logger.info(f"Cache hit for {method} {endpoint}")
return cached
if not self.rate_limiter.can_proceed():
wait_time = self.rate_limiter.wait_time()
self.logger.warning(f"Rate limit hit, waiting {wait_time:.2f}s")
await asyncio.sleep(wait_time)
url = f"{self.base_url}/{endpoint.lstrip('/')}"
try:
async with self.session.request(
method=method.upper(),
url=url,
params=params,
json=data,
headers=self._get_headers()
) as resp:
response_data = await resp.json() if resp.content_type == 'application/json' else await resp.text()
api_response = APIResponse(
data=response_data,
status_code=resp.status,
headers=dict(resp.headers),
timestamp=datetime.now()
)
if use_cache and method.upper() == 'GET' and 200 <= resp.status < 300:
self.cache.set(method, endpoint, api_response, params)
self.logger.info(f"{method.upper()} {endpoint} - Status: {resp.status}")
return api_response
except Exception as e:
self.logger.error(f"Request failed: {str(e)}")
raise
async def get(self, endpoint: str, params: Dict = None, use_cache: bool = True) -> APIResponse:
return await self._make_request('GET', endpoint, params=params, use_cache=use_cache)
async def post(self, endpoint: str, data: Dict = None) -> APIResponse:
return await self._make_request('POST', endpoint, data=data, use_cache=False)
async def put(self, endpoint: str, data: Dict = None) -> APIResponse:
return await self._make_request('PUT', endpoint, data=data, use_cache=False)
async def delete(self, endpoint: str) -> APIResponse:
return await self._make_request('DELETE', endpoint, use_cache=False)
The Advancedsdk square wraps everything together in a clean, async-firist client: this ASYNC manages an AIOHTP session through reference managers, injects JSON and Aath Hader, and coordinates our ratelimitors and cash under the hood. Its _Make_request method takes centralized/post/pot/delete logic, cash lookup, rate-limit weight, error logging, and response packing to apiresponse objects, while Get/Post/Put/Delete helpers give us ergonomic, high-level calls.
async def demo_sdk():
"""Demonstrate SDK capabilities"""
print("🚀 Advanced SDK Demo")
print("=" * 50)
async with AdvancedSDK("https://jsonplaceholder.typicode.com") as sdk:
print("\n📥 Testing GET request with caching...")
response1 = await sdk.get("/posts/1")
print(f"First request - Status: {response1.status_code}")
print(f"Title: {response1.data.get('title', 'N/A')}")
response2 = await sdk.get("/posts/1")
print(f"Second request (cached) - Status: {response2.status_code}")
print("\n📤 Testing POST request...")
new_post = {
"title": "Advanced SDK Tutorial",
"body": "This SDK demonstrates modern Python patterns",
"userId": 1
}
post_response = await sdk.post("/posts", data=new_post)
print(f"POST Status: {post_response.status_code}")
print(f"Created post ID: {post_response.data.get('id', 'N/A')}")
print("\n⚡ Testing batch requests with rate limiting...")
tasks = []
for i in range(1, 6):
tasks.append(sdk.get(f"/posts/{i}"))
results = await asyncio.gather(*tasks)
print(f"Batch completed: {len(results)} requests")
for i, result in enumerate(results, 1):
print(f" Post {i}: {result.data.get('title', 'N/A')[:30]}...")
print("\n❌ Testing error handling...")
try:
error_response = await sdk.get("/posts/999999")
print(f"Error response status: {error_response.status_code}")
except Exception as e:
print(f"Handled error: {type(e).__name__}")
print("\n✅ Demo completed successfully!")
async def run_demo():
"""Colab-friendly demo runner"""
await demo_sdk()
Demo_sdk corouts move through the main features of SDK, issues a cashed Get request, performs a post, executing each capacity JSONPLACEHOLDER API, printing status code and against sample data, execute a batch of rates and handling a batch. Run_Demo Helper ensures that this demo moves smoothly inside the current event loop of a Colab notebook.
import nest_asyncio
nest_asyncio.apply()
if __name__ == "__main__":
try:
asyncio.run(demo_sdk())
except RuntimeError:
loop = asyncio.get_event_loop()
loop.run_until_complete(demo_sdk())
class SDKBuilder:
"""Builder pattern for SDK configuration"""
def __init__(self, base_url: str):
self.base_url = base_url
self.config = {}
def with_auth(self, api_key: str):
self.config['api_key'] = api_key
return self
def with_rate_limit(self, calls_per_minute: int):
self.config['rate_limit'] = calls_per_minute
return self
def build(self) -> AdvancedSDK:
return AdvancedSDK(self.base_url, **self.config)
Finally, we apply nest_asyncio to enable the nested event loop in Colab, then run the demo through asyncio.run (with a decline for manual loop execution if necessary). It also introduces a SDKBUILDER square that applies an fluent builder pattern to easily configure and install Advancedk with custom authentication and rate-limit settings.
Finally, this SDK tutorial offers a scalable foundation for any comfortable integration, which is from a combination of modern python idioms (Dataklas, Asink/AATAT, reference managers) with practical tooling (rate limit, cash, structured logging). Adopting the pattern shown here, especially requests to separate the concerns between orchestration, cashing and reaction modeling, teams can accelerate the growth of new API customers, ensuring predicting, observation and flexibility.
Check it CodesAll credit for this research goes to the researchers of this project. Also, feel free to follow us Twitter And don’t forget to join us 100k+ mL subredit More membership Our newspaper,
Sana Hasan, a counseling intern and double degree student at Marktekpost in IIT Madras, is emotional about implementing technology and AI to resolve real -world challenges. With a keen interest in solving practical problems, he brings a new approach to the intersection of AI and real -life solutions.