Rate Limiting Best Practices
Strategies and patterns for working effectively with Audian's rate limits.
Monitoring and Tracking
Implement Rate Limit Tracking
Track your usage to stay within limits:
class RateLimitTracker:
    """Track Audian RateLimit-* response headers across API calls.

    Call update() after every response, then consult
    get_usage_percentage() / should_slow_down() to pace requests.
    """

    def __init__(self):
        # All None until the first update() — no response seen yet.
        self.limit = None
        self.remaining = None
        self.reset_time = None

    def update(self, response):
        """Record the RateLimit-* headers from an API response."""
        self.limit = int(response.headers.get('RateLimit-Limit', 0))
        self.remaining = int(response.headers.get('RateLimit-Remaining', 0))
        self.reset_time = int(response.headers.get('RateLimit-Reset', 0))

    def get_usage_percentage(self):
        """Return the percentage of the quota consumed (0 if unknown).

        Guards against both a zero limit and the pre-update() state where
        limit is still None (the original raised TypeError on None here).
        """
        if not self.limit:
            return 0
        return ((self.limit - self.remaining) / self.limit) * 100

    def should_slow_down(self):
        """True once more than 75% of the quota has been consumed."""
        return self.get_usage_percentage() > 75


tracker = RateLimitTracker()
Log Rate Limit Information​
import logging

logger = logging.getLogger('audian_api')


def log_rate_limit_status(response):
    """Log remaining/limit quota read from a response's RateLimit-* headers."""
    remaining = response.headers.get('RateLimit-Remaining')
    limit = response.headers.get('RateLimit-Limit')
    # Lazy %-style args: the message is only formatted if INFO is enabled.
    logger.info("Rate limit: %s/%s", remaining, limit)
Implementing Backoff Strategies
Exponential Backoff
Implement exponential backoff with jitter:
import time
import random
import requests
def make_request_with_backoff(url, max_retries=3):
    """GET *url*, retrying on 429 with exponential backoff and jitter.

    Honors the server's Retry-After header when present; otherwise waits
    2**attempt seconds plus up to 1s of random jitter (jitter avoids
    thundering-herd retries from many clients).

    Raises the last requests.RequestException after *max_retries*
    failures, or Exception("Max retries exceeded") if every attempt
    was rate-limited.
    """
    for attempt in range(max_retries):
        try:
            response = requests.get(url)
            if response.status_code == 429:
                retry_after = response.headers.get('Retry-After')
                if retry_after:
                    # Server told us exactly how long to wait — respect it.
                    wait_time = float(retry_after)
                else:
                    wait_time = (2 ** attempt) + random.uniform(0, 1)
                logger.warning(
                    f"Rate limited. Waiting {wait_time:.1f}s"
                )
                time.sleep(wait_time)
                continue
            response.raise_for_status()
            return response
        except requests.RequestException:
            # Network/HTTP error: back off, re-raising on the final attempt.
            if attempt == max_retries - 1:
                raise
            wait_time = (2 ** attempt) + random.uniform(0, 1)
            time.sleep(wait_time)
    raise Exception("Max retries exceeded")
Respect Retry-After Header
Always check the Retry-After header before retrying a rate-limited request:
def make_request_with_retry_after(url, max_retries=5):
    """GET *url*, waiting out 429 responses per the Retry-After header.

    Iterative rather than recursive so a persistently rate-limited
    endpoint cannot exhaust the stack; gives up after *max_retries*
    retries and returns the last response (possibly still a 429).

    NOTE(review): Retry-After may also be an HTTP-date per RFC 9110;
    this handles only the delta-seconds integer form — confirm what
    the Audian API actually sends.
    """
    response = requests.get(url)
    for _ in range(max_retries):
        if response.status_code != 429:
            break
        retry_after = response.headers.get('Retry-After')
        if retry_after:
            wait_time = int(retry_after)
            logger.info(f"Rate limited. Waiting {wait_time}s")
            time.sleep(wait_time)
        # Retry request
        response = requests.get(url)
    return response
Request Queuing​
Simple Queue System​
import queue
import threading
import time
import requests
class APIRequestQueue:
    """FIFO queue of GET requests throttled to a fixed global rate.

    All workers share one lock-protected timestamp, so the combined
    request rate stays at or below requests_per_minute even when
    start() launches several workers (a per-worker timer would
    multiply the effective rate by num_workers).
    """

    def __init__(self, requests_per_minute=50):
        self.queue = queue.Queue()
        self.requests_per_minute = requests_per_minute
        self.min_interval = 60 / requests_per_minute
        # Shared pacing state — see class docstring.
        self._lock = threading.Lock()
        self._last_request_time = 0.0

    def add_request(self, url, headers=None):
        """Enqueue a GET for *url* with optional *headers*."""
        self.queue.put((url, headers))

    def worker(self):
        """Worker loop: block for the next job, pace it, then issue it."""
        while True:
            # Blocking get() replaces the original empty()/sleep busy-wait,
            # which also raced other workers between empty() and get().
            url, headers = self.queue.get()
            with self._lock:
                elapsed = time.time() - self._last_request_time
                if elapsed < self.min_interval:
                    time.sleep(self.min_interval - elapsed)
                self._last_request_time = time.time()
            try:
                requests.get(url, headers=headers)
            except Exception as e:
                logger.error(f"Request failed: {e}")
            self.queue.task_done()

    def start(self, num_workers=1):
        """Spawn *num_workers* daemon worker threads."""
        for _ in range(num_workers):
            thread = threading.Thread(target=self.worker, daemon=True)
            thread.start()


# Usage — named request_queue so the stdlib `queue` module isn't shadowed.
request_queue = APIRequestQueue(requests_per_minute=40)
request_queue.start()
request_queue.add_request('https://api.audian.com:8443/v2/numbers')
Priority Queue​
Process important requests first:
import heapq
import threading
class PriorityAPIQueue:
    """Priority queue of pending API requests (lower number = more urgent)."""

    def __init__(self):
        self.queue = []
        # Monotonic tie-breaker: without it, heapq compares the url and
        # then the headers dict on equal priorities, and dicts are
        # unorderable — TypeError on the second same-priority push.
        self._counter = 0

    def add_request(self, url, priority=5, headers=None):
        # Lower priority number = higher importance
        heapq.heappush(
            self.queue,
            (priority, self._counter, url, headers)
        )
        self._counter += 1

    def get_next_request(self):
        """Pop and return (url, headers) of the most urgent request, or None."""
        if not self.queue:
            return None
        priority, _, url, headers = heapq.heappop(self.queue)
        return url, headers


# Usage — named priority_queue to avoid clashing with other queue examples.
priority_queue = PriorityAPIQueue()
priority_queue.add_request('https://api.audian.com:8443/v2/numbers', priority=1)  # High priority
priority_queue.add_request('https://api.audian.com:8443/v2/billing/usage', priority=5)  # Low priority
Caching Strategies​
Simple Cache​
import time
class APICache:
    """In-memory key/value cache whose entries expire after a fixed TTL."""

    def __init__(self, ttl_seconds=300):
        # key -> (value, insertion timestamp)
        self.cache = {}
        self.ttl = ttl_seconds

    def get(self, key):
        """Return the cached value, or None if absent or expired."""
        if key not in self.cache:
            return None
        value, timestamp = self.cache[key]
        if time.time() - timestamp > self.ttl:
            # Expired: evict so the dict doesn't accumulate dead entries.
            del self.cache[key]
            return None
        return value

    def set(self, key, value):
        """Store *value* under *key*, resetting its TTL."""
        self.cache[key] = (value, time.time())


# Usage
cache = APICache(ttl_seconds=600)  # 10 minute cache


def get_phone_numbers(api_key):
    """Return the account's phone numbers, served from cache when fresh.

    Uses `is not None` rather than truthiness: an account with an empty
    number list would otherwise re-hit the API on every single call.
    """
    cache_key = f"numbers_{api_key}"
    cached = cache.get(cache_key)
    if cached is not None:
        return cached
    response = requests.get(
        'https://api.audian.com:8443/v2/numbers',
        headers={'X-Auth-Token': api_key}
    )
    numbers = response.json()
    cache.set(cache_key, numbers)
    return numbers
LRU Cache​
from functools import lru_cache
import requests
@lru_cache(maxsize=128)
def get_number_details(number_id, api_key):
    """Fetch details for one phone number, memoized per (number_id, api_key).

    The bounded LRU cache means repeated lookups of the same number
    cost no additional API quota until the entry is evicted.
    """
    url = f'https://api.audian.com:8443/v2/numbers/{number_id}'
    auth_headers = {'X-Auth-Token': api_key}
    return requests.get(url, headers=auth_headers).json()
Batch Operations​
Use Batch Endpoints​
Use batch endpoints to reduce request count:
# Instead of making 100 individual requests
# (one POST per recipient — each call consumes rate-limit quota).
# NOTE(review): assumes `numbers` and `msg` are defined by the surrounding
# context — this is an illustrative snippet, not runnable as-is.
for number in numbers:
    requests.post(
        f'https://api.audian.com:8443/v2/sms/send',
        json={'to': number, 'message': msg}
    )  # 100 requests!
# Make 1 batch request
# Same payload shape, but every recipient goes in one `messages` list.
requests.post(
    'https://api.audian.com:8443/v2/sms/batch',
    json={
        'messages': [
            {'to': number, 'message': msg}
            for number in numbers
        ]
    }
)  # 1 request!
Batch Size Optimization​
def send_sms_batch(numbers, message, batch_size=100):
    """Send *message* to *numbers* via the batch endpoint, *batch_size* at a time.

    Returns the list of per-batch responses (empty for an empty
    *numbers* list) so callers can inspect each batch's status —
    the original discarded every response.
    """
    responses = []
    for i in range(0, len(numbers), batch_size):
        batch = numbers[i:i + batch_size]
        response = requests.post(
            'https://api.audian.com:8443/v2/sms/batch',
            json={
                'messages': [
                    {'to': num, 'message': message}
                    for num in batch
                ]
            }
        )
        responses.append(response)
        print(f"Sent batch {i // batch_size + 1}")
    return responses
Webhook Strategy​
Use Webhooks Instead of Polling​
Instead of polling frequently:
# DON'T: Poll every 5 seconds
# NOTE(review): intentional anti-pattern shown for contrast — every
# iteration spends quota whether or not anything changed (~12 req/min).
while True:
    response = requests.get('https://api.audian.com:8443/v2/calls')
    time.sleep(5)  # Rate limit killer!
Set up webhooks:
# DO: Receive webhooks
# NOTE(review): presumably a Flask app — `app` and `request` are not
# defined in this snippet; confirm against the surrounding project.
@app.route('/webhook/call-status', methods=['POST'])
def handle_call_status():
    """Receive a call-status event pushed by the API instead of polling."""
    event = request.json
    # Process call status update
    return {'status': 'ok'}, 200
Multiple API Keys​
Distribute Load​
Use multiple API keys to increase total capacity:
api_keys = [
    'key_1',
    'key_2',
    'key_3'
]
current_key_index = 0


def get_next_api_key():
    """Hand out API keys round-robin, wrapping back to the first key."""
    global current_key_index
    index = current_key_index
    # Advance the cursor before returning so the next call moves on.
    current_key_index = (index + 1) % len(api_keys)
    return api_keys[index]
def make_request(url):
    """GET *url*, authenticating with the next key in the rotation."""
    auth = {'X-Auth-Token': get_next_api_key()}
    return requests.get(url, headers=auth)
Adaptive Rate Limiting​
Adjust Request Rate Based on Remaining Quota​
class AdaptiveRequester:
    """Self-tuning client: widens or narrows the inter-request delay
    based on how fast the RateLimit-Remaining quota is dropping.
    """

    # Bounds keep the delay sane: the original could shrink toward zero
    # or grow without limit under sustained adjustment.
    MIN_SLEEP = 0.05
    MAX_SLEEP = 30.0

    def __init__(self):
        self.last_remaining = None
        self.sleep_time = 0.1  # Start with 100ms between requests

    def adjust_rate(self, response):
        """Update sleep_time from the response's RateLimit-Remaining header."""
        remaining = int(response.headers.get('RateLimit-Remaining', 0))
        if self.last_remaining is not None:
            decrease = self.last_remaining - remaining
            # If we're consuming quota faster than expected, slow down
            if decrease > 1:
                self.sleep_time *= 1.5
            # Quota flat or replenished: speed up. (Headers parse to ints,
            # so this is the original `decrease < 0.5` made explicit.)
            elif decrease <= 0:
                self.sleep_time *= 0.9
            self.sleep_time = min(max(self.sleep_time, self.MIN_SLEEP),
                                  self.MAX_SLEEP)
        self.last_remaining = remaining

    def make_request(self, url, headers):
        """GET *url*, then sleep the current adaptive interval."""
        response = requests.get(url, headers=headers)
        self.adjust_rate(response)
        time.sleep(self.sleep_time)
        return response
Monitoring and Alerting​
Rate Limit Alerts​
import smtplib
class RateLimitAlertManager:
    """Fire a single alert when remaining quota drops below 10%,
    re-arming once it recovers above 25% (hysteresis prevents spam).
    """

    def __init__(self, email_recipient):
        self.recipient = email_recipient
        self.warning_sent = False  # latched True after an alert fires

    def check_and_alert(self, response):
        """Inspect RateLimit-* headers and alert if quota is critical."""
        remaining = int(response.headers.get('RateLimit-Remaining', 0))
        limit = int(response.headers.get('RateLimit-Limit', 0))
        if limit == 0:
            # Headers absent or zero — no percentage can be computed
            # (the original raised ZeroDivisionError here).
            return
        percentage = (remaining / limit) * 100
        if percentage < 10 and not self.warning_sent:
            self.send_alert(f"Rate limit critical: {percentage:.0f}%")
            self.warning_sent = True
        elif percentage > 25:
            # Quota recovered: re-arm so the next dip alerts again.
            self.warning_sent = False

    def send_alert(self, message):
        # Send email alert
        print(f"ALERT: {message}")
Performance Tips​
- Batch Operations: Combine multiple operations into one request
- Cache Aggressively: Store data locally when possible
- Use Webhooks: React to events instead of polling
- Implement Backoff: Exponential backoff on 429 errors
- Monitor Headers: Track remaining quota
- Plan Ahead: Schedule heavy operations off-peak
- Use Multiple Keys: Distribute load across API keys
Production Checklist​
Before deploying to production:
- Implement exponential backoff
- Respect Retry-After headers
- Monitor rate limit usage
- Set up alerting for quota warnings
- Cache frequently accessed data
- Use batch operations where possible
- Implement request queuing
- Test with rate limit headers
- Document rate limit expectations
- Plan scaling strategy
FAQ​
Q: Should I use multiple API keys? A: Only if you have legitimate load distribution needs. Abusing multiple keys is against ToS.
Q: What's a good request per minute to target? A: Aim for 50-75% of your limit. This leaves room for bursts.
Q: How long should I cache data? A: Depends on your use case. SMS templates: 24h. Usage data: 1h. Phone numbers: 30m.
Q: Should I retry on 429? A: Yes, always respect Retry-After and retry after waiting.
Q: Can I predict when I'll hit the limit? A: Yes, track your usage rate and extrapolate.