API Security & Authentication: Modern Web Güvenliği


Giriş

Günümüzde neredeyse her web uygulaması API’lar üzerinden çalışıyor. Frontend’den backend’e, mikroservisler arasında, mobil uygulamalardan sunuculara… Her yerde API’lar var.

Peki bu kadar kritik bir bileşeni nasıl güvenli hale getiriyoruz? Bu yazıda API güvenliğinin temellerinden - authentication ve authorization’dan - best practices’lere kadar her şeyi inceleyeceğiz.

1. Authentication (Kimlik Doğrulama)

1.1 JWT (JSON Web Tokens)

from datetime import datetime, timedelta
import jwt
import secrets
from typing import Optional, Dict

class JWTManager:
    """JWT token yönetimi"""
    
    def __init__(self, secret_key: str, algorithm: str = "HS256"):
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.token_expiry = timedelta(hours=1)
    
    def create_token(self, 
                    user_id: str,
                    username: str,
                    permissions: list) -> str:
        """Access token oluştur"""
        
        payload = {
            "user_id": user_id,
            "username": username,
            "permissions": permissions,
            "exp": datetime.utcnow() + self.token_expiry,
            "iat": datetime.utcnow(),
            "jti": secrets.token_hex(16)  # Token ID (revocation için)
        }
        
        token = jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
        return token
    
    def create_refresh_token(self, user_id: str) -> str:
        """Refresh token oluştur (daha uzun ömre sahip)"""
        
        payload = {
            "user_id": user_id,
            "exp": datetime.utcnow() + timedelta(days=7),
            "iat": datetime.utcnow(),
            "type": "refresh"
        }
        
        token = jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
        return token
    
    def verify_token(self, token: str) -> Optional[Dict]:
        """Token'ı doğrula"""
        
        try:
            payload = jwt.decode(
                token, 
                self.secret_key, 
                algorithms=[self.algorithm]
            )
            return payload
        except jwt.ExpiredSignatureError:
            raise ValueError("Token süresi dolmuş")
        except jwt.InvalidTokenError:
            raise ValueError("Geçersiz token")
    
    def revoke_token(self, token_jti: str):
        """Token'ı iptal et (revocation listesine ekle)"""
        # Redis veya veritabanında sakla
        # self.revocation_store.add(token_jti)
        pass

# Kullanım
jwt_manager = JWTManager(secret_key="your-super-secret-key")

# Token oluştur
access_token = jwt_manager.create_token(
    user_id="123",
    username="johndoe",
    permissions=["read:documents", "write:documents"]
)

# Token doğrula
payload = jwt_manager.verify_token(access_token)
print(f"User: {payload['username']}, Permissions: {payload['permissions']}")

1.2 OAuth 2.0 Implementasyonu

from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
import hashlib

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

class User(BaseModel):
    username: str
    email: str
    disabled: bool = False

class UserInDB(User):
    hashed_password: str

# Şifreli veritabanı
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "email": "john@example.com",
        "hashed_password": hashlib.sha256(b"password123").hexdigest(),
        "disabled": False,
    }
}

def hash_password(password: str) -> str:
    """Şifreyi hash'le (gerçekte bcrypt kullan)"""
    return hashlib.sha256(password.encode()).hexdigest()

def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Şifreyi doğrula"""
    return hash_password(plain_password) == hashed_password

def authenticate_user(username: str, password: str) -> Optional[UserInDB]:
    """Kullanıcıyı doğrula"""
    
    if username not in fake_users_db:
        return None
    
    user = UserInDB(**fake_users_db[username])
    
    if not verify_password(password, user.hashed_password):
        return None
    
    return user

async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Mevcut kullanıcıyı al"""
    
    try:
        payload = jwt_manager.verify_token(token)
    except ValueError as e:
        raise HTTPException(status_code=401, detail=str(e))
    
    username = payload.get("username")
    
    if username not in fake_users_db:
        raise HTTPException(status_code=401, detail="Kullanıcı bulunamadı")
    
    return UserInDB(**fake_users_db[username])

@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """Token elde et"""
    
    user = authenticate_user(form_data.username, form_data.password)
    
    if not user:
        raise HTTPException(
            status_code=401,
            detail="Yanlış kullanıcı adı veya şifre"
        )
    
    access_token = jwt_manager.create_token(
        user_id=form_data.username,
        username=user.username,
        permissions=["read:user_data"]
    )
    
    return {
        "access_token": access_token,
        "token_type": "bearer"
    }

@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
    """Mevcut kullanıcı bilgisini al"""
    return current_user

2. Authorization (Yetkilendirme)

2.1 Role-Based Access Control (RBAC)

from enum import Enum
from typing import Set, Callable

class Role(Enum):
    ADMIN = "admin"
    MANAGER = "manager"
    USER = "user"
    GUEST = "guest"

class Permission(Enum):
    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"

# Role-Permission mapping
ROLE_PERMISSIONS = {
    Role.ADMIN: {
        Permission.READ,
        Permission.WRITE,
        Permission.DELETE,
        Permission.ADMIN
    },
    Role.MANAGER: {
        Permission.READ,
        Permission.WRITE
    },
    Role.USER: {
        Permission.READ
    },
    Role.GUEST: set()
}

class RBACManager:
    """Role-Based Access Control yöneticisi"""
    
    def __init__(self):
        self.user_roles = {}  # user_id -> Set[Role]
    
    def assign_role(self, user_id: str, role: Role):
        """Kullanıcıya rol ata"""
        if user_id not in self.user_roles:
            self.user_roles[user_id] = set()
        self.user_roles[user_id].add(role)
    
    def has_permission(self, user_id: str, permission: Permission) -> bool:
        """Kullanıcının izni var mı?"""
        
        if user_id not in self.user_roles:
            return False
        
        for role in self.user_roles[user_id]:
            if permission in ROLE_PERMISSIONS.get(role, set()):
                return True
        
        return False
    
    def has_any_role(self, user_id: str, roles: Set[Role]) -> bool:
        """Kullanıcı belirtilen rollerden birine sahip mi?"""
        
        user_roles = self.user_roles.get(user_id, set())
        return bool(user_roles & roles)

# FastAPI'da kullanım
rbac = RBACManager()

def require_permission(permission: Permission):
    """Permission'ı gerektiren dependency"""
    async def permission_checker(current_user: User = Depends(get_current_user)):
        # Kullanıcının izni var mı kontrol et
        if not rbac.has_permission(current_user.username, permission):
            raise HTTPException(
                status_code=403,
                detail=f"{permission.value} izni gerekli"
            )
        return current_user
    
    return permission_checker

@app.delete("/documents/{doc_id}")
async def delete_document(
    doc_id: str,
    current_user: User = Depends(require_permission(Permission.DELETE))
):
    """Belgeyi sil (DELETE izni gerekli)"""
    return {"message": f"Belge {doc_id} silindi"}

2.2 Attribute-Based Access Control (ABAC)

from dataclasses import dataclass

@dataclass
class Resource:
    """Kaynak"""
    id: str
    owner_id: str
    resource_type: str  # "document", "file", vb.
    is_public: bool = False

class ABACManager:
    """Attribute-Based Access Control"""
    
    def __init__(self):
        self.policies = []
    
    def add_policy(self, policy: Callable):
        """Erişim politikası ekle"""
        self.policies.append(policy)
    
    def can_access(self, 
                  user_id: str,
                  action: str,
                  resource: Resource) -> bool:
        """Kullanıcı kaynağa erişebilir mi?"""
        
        # Tüm politikaları kontrol et
        for policy in self.policies:
            if not policy(user_id, action, resource):
                return False
        
        return True

# Politikalar tanımla
def owner_policy(user_id: str, action: str, resource: Resource) -> bool:
    """Sahibi sınırsız erişimi olabilir"""
    if user_id == resource.owner_id:
        return True
    return False

def public_read_policy(user_id: str, action: str, resource: Resource) -> bool:
    """Herkese açık kaynaklar okunabilir"""
    if resource.is_public and action == "read":
        return True
    return False

def organization_policy(user_id: str, action: str, resource: Resource, 
                        user_org_id: str, resource_org_id: str) -> bool:
    """Aynı organizasyon içinde okuma izni"""
    if user_org_id == resource_org_id and action == "read":
        return True
    return False

# Kullanım
abac = ABACManager()
abac.add_policy(owner_policy)
abac.add_policy(public_read_policy)

resource = Resource(
    id="doc1",
    owner_id="user2",
    resource_type="document",
    is_public=False
)

# user1 kaynağa erişebilir mi?
can_access = abac.can_access("user1", "read", resource)
print(f"Can user1 access: {can_access}")  # False

# Sahibi erişebilir
can_access = abac.can_access("user2", "read", resource)
print(f"Can user2 access: {can_access}")  # True

3. API Security Best Practices

3.1 Rate Limiting

from fastapi import FastAPI
from fastapi_limiter import FastAPILimiter
from fastapi_limiter.util import get_remote_address

app = FastAPI()

@app.on_event("startup")
async def startup():
    await FastAPILimiter.init(redis_client)

@app.get("/api/data")
@limiter.limit("10/minute")
async def get_data(request: Request):
    """1 dakikada maksimum 10 istek"""
    return {"data": "..."}

# Global rate limiting
RATE_LIMITS = {
    "login": "5/minute",
    "api_call": "100/hour",
    "file_upload": "10/hour"
}

3.2 CORS (Cross-Origin Resource Sharing)

from fastapi.middleware.cors import CORSMiddleware

# ❌ YANLIŞ: Tüm originleri izin ver
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Güvensiz!
    allow_methods=["*"],
    allow_headers=["*"],
)

# ✅ DOĞRU: Spesifik originleri izin ver
ALLOWED_ORIGINS = [
    "https://example.com",
    "https://app.example.com",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["*"],
    max_age=3600,
)

3.3 HTTPS ve Security Headers

from fastapi.middleware.trustedhost import TrustedHostMiddleware

# Trusted host
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=["example.com", "www.example.com"]
)

# Security headers
@app.middleware("http")
async def add_security_headers(request, call_next):
    response = await call_next(request)
    
    # HSTS: HTTPS'yi zorla
    response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
    
    # Content Security Policy
    response.headers["Content-Security-Policy"] = "default-src 'self'"
    
    # XFrame Options
    response.headers["X-Frame-Options"] = "DENY"
    
    # XContent Type Options
    response.headers["X-Content-Type-Options"] = "nosniff"
    
    # Referrer Policy
    response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
    
    return response

3.4 Input Validation (Fastpydantic)

from pydantic import BaseModel, Field, validator, EmailStr
from typing import Optional

class CreateUserRequest(BaseModel):
    username: str = Field(..., min_length=3, max_length=50)
    email: EmailStr
    password: str = Field(..., min_length=8)
    age: int = Field(..., ge=18, le=120)
    
    @validator('username')
    def username_alphanumeric(cls, v):
        """Username alfanumerik olmalı"""
        if not v.isalnum():
            raise ValueError('Username yalnızca harf ve rakam içerebilir')
        return v
    
    @validator('password')
    def password_strength(cls, v):
        """Güçlü şifre kontrolü"""
        if not any(c.isupper() for c in v):
            raise ValueError('Şifre büyük harf içermeli')
        if not any(c.isdigit() for c in v):
            raise ValueError('Şifre rakam içermeli')
        return v

@app.post("/users")
async def create_user(user: CreateUserRequest):
    """Kullanıcı oluştur (otomatik validasyon)"""
    return {"username": user.username, "email": user.email}

4. API Versioning ve Deprecation

from enum import Enum

class APIVersion(Enum):
    V1 = "v1"
    V2 = "v2"

# Versioning strategies

# 1. URL Path
@app.get("/api/v1/users")
async def get_users_v1():
    return {"version": "v1"}

@app.get("/api/v2/users")
async def get_users_v2():
    return {"version": "v2", "enhanced": True}

# 2. Header
@app.get("/users")
async def get_users(x_api_version: str = Header(default="v1")):
    if x_api_version == "v1":
        return {"version": "v1"}
    elif x_api_version == "v2":
        return {"version": "v2"}

# 3. Query parameter
@app.get("/users")
async def get_users(api_version: str = Query(default="v1")):
    if api_version == "v1":
        return {"version": "v1"}
    elif api_version == "v2":
        return {"version": "v2"}

# Deprecation warning
def deprecated_endpoint(version: str, recommended: str):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            print(f"⚠️ {version} deprecated. Use {recommended} instead")
            return await func(*args, **kwargs)
        return wrapper
    return decorator

@app.get("/api/v1/old-endpoint")
@deprecated_endpoint("v1", "v2")
async def old_endpoint():
    return {"message": "Bu endpoint deprecated"}

5. API Monitoring ve Security Logging

import logging
import json
from datetime import datetime

class APISecurityLogger:
    """API güvenlik loggingı"""
    
    def __init__(self, log_file: str):
        self.logger = logging.getLogger("api_security")
        handler = logging.FileHandler(log_file)
        handler.setFormatter(
            logging.Formatter('%(message)s')
        )
        self.logger.addHandler(handler)
    
    def log_request(self, 
                   timestamp: str,
                   method: str,
                   endpoint: str,
                   user_id: str,
                   ip_address: str,
                   status_code: int,
                   response_time: float):
        """İsteği kayıt et"""
        
        log_entry = {
            "timestamp": timestamp,
            "method": method,
            "endpoint": endpoint,
            "user_id": user_id,
            "ip_address": ip_address,
            "status_code": status_code,
            "response_time_ms": response_time,
        }
        
        # Şüpheli aktiviteleri logla
        if status_code in [401, 403]:
            log_entry["severity"] = "HIGH"
        elif response_time > 5000:
            log_entry["severity"] = "MEDIUM"
        
        self.logger.info(json.dumps(log_entry))

# FastAPI middleware
@app.middleware("http")
async def log_requests(request: Request, call_next):
    start_time = time.time()
    
    response = await call_next(request)
    
    duration = (time.time() - start_time) * 1000
    
    security_logger.log_request(
        timestamp=datetime.utcnow().isoformat(),
        method=request.method,
        endpoint=request.url.path,
        user_id=request.headers.get("x-user-id", "anonymous"),
        ip_address=request.client.host,
        status_code=response.status_code,
        response_time=duration
    )
    
    return response

6. Secure Password Management

from passlib.context import CryptContext
import secrets

# bcrypt kullan (SHA256 değil!)
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

class PasswordManager:
    """Şifre yönetimi"""
    
    @staticmethod
    def hash_password(password: str) -> str:
        """Şifreyi hash'le"""
        return pwd_context.hash(password)
    
    @staticmethod
    def verify_password(plain_password: str, hashed_password: str) -> bool:
        """Şifreyi doğrula"""
        return pwd_context.verify(plain_password, hashed_password)
    
    @staticmethod
    def generate_secure_password(length: int = 12) -> str:
        """Güvenli şifre oluştur"""
        return secrets.token_urlsafe(length)

# Şifre resetleme (secure token ile)
def send_password_reset_email(email: str):
    """Şifre reset e-postası gönder"""
    
    # Güvenli token oluştur
    reset_token = secrets.token_urlsafe(32)
    reset_link = f"https://example.com/reset-password?token={reset_token}"
    
    # Token'ı cache/db'de sakla (15 dakika validity)
    # cache.set(f"reset_token:{reset_token}", email, ex=900)
    
    # E-postayı gönder
    # send_email(email, reset_link)

7. API Security Checklist

## Pre-Deployment Checklist

### Authentication & Authorization
- [ ] HTTPS kullanıyor mu?
- [ ] JWT token'lar güvenli mi?
- [ ] Refresh token'lar düzgün rotate ediliyor mu?
- [ ] CORS politikası spesifik origin'lere sınırlı mı?
- [ ] Authentication bypass zafiyeti var mı?

### Input & Output
- [ ] Tüm girdiler doğrulanıyor mu?
- [ ] SQL injection koruması var mı?
- [ ] XSS koruması var mı?
- [ ] Error mesajları sensitif bilgi açığa çıkarıyor mu?

### Rate Limiting & DoS
- [ ] Rate limiting uygulanmış mı?
- [ ] Large payload saldırılarına karşı korunan mı?
- [ ] Timeout'lar ayarlandı mı?

### Logging & Monitoring
- [ ] Tüm authentications kaydediliyor mu?
- [ ] Tüm errors logu alınıyor mu?
- [ ] Monitoring alertları var mı?

### Dependencies
- [ ] Bağımlılıklar güncel mi?
- [ ] Known vulnerabilities kontrol edildi mi?

Sonuç

API güvenliği bir defaya mahsus yapılan bir iş değil. Tasarımdan başlayıp üretime kadar devam eden, sonrasında da sürekli bakım gerektiren bir süreç.

Katmanlı güvenlik yaklaşımı, düzenli monitoring ve periyodik audit’ler ile sistemlerinizi güvende tutabilirsiniz. Unutmayın ki en güvenli sistem, sürekli test edilen ve güncellenen sistemdir.