MCP Best Practices: Güvenli Uygulama Geliştirme Rehberi


Giriş

Model Context Protocol (MCP), LLM’lerin harici kaynaklara güvenli bir şekilde erişmesini sağlayan bir protokol. Ancak kötü uygulamalar, tasarım hataları veya eksik kontroller ciddi güvenlik açıklarına yol açabiliyor.

Bu rehberde MCP’yi güvenli ve verimli kullanmanın yollarını, gerçek implementasyon örnekleriyle birlikte inceleyeceğiz.

1. Prensip: En Az Yetki (Principle of Least Privilege)

Temel Kavram

Her araç, her kaynak yalnızca ihtiyacı olan izinleri almalıdır. Hiçbir zaman fazla yetki vermeyin.

Uygulama

# ❌ YANLIŞ: Tüm dosya sistemi erişimi
mcp_server = MCPServer({
    "tools": [
        {
            "name": "read_file",
            "allowed_paths": ["/"]  # Tüm dosya sistemi!
        }
    ]
})

# ✅ DOĞRU: Sınırlı erişim
mcp_server = MCPServer({
    "tools": [
        {
            "name": "read_file",
            "allowed_paths": [
                "/app/documents",
                "/app/config"
            ],
            "denied_patterns": [
                "*.key",
                "*.secret",
                "*password*"
            ]
        }
    ]
})

Pratik Örnekler

# Veritabanı aracı için en az yetki
database_tool = {
    "name": "query_database",
    "allowed_tables": ["users", "products"],  # Sadece gerekli tablolar
    "allowed_operations": ["SELECT"],  # Yazma işlemleri yok
    "row_limit": 100,  # Veri sızıntısı riski azalt
    "timeout": 5  # DOS saldırılarından koru
}

# E-posta gönderme aracı
email_tool = {
    "name": "send_email",
    "allowed_recipients": ["admin@company.com"],
    "rate_limit": 5,  # Saat başına 5 e-posta
    "subject_filter": True,  # Farklı başlık kontrol et
    "attachment_disabled": True  # Dosya eki kabul etme
}

2. Input Validation ve Sanitization

Temel Validasyon Katmanı

from typing import Any, Dict
import re

class MCPInputValidator:
    """MCP araçları için giriş doğrulaması"""
    
    def __init__(self):
        self.patterns = {
            "sql_injection": r"(DROP|DELETE|INSERT|UPDATE|UNION|SELECT.*--|;)",
            "path_traversal": r"(\.\./|\.\.\\|%2e%2e)",
            "command_injection": r"([;&|`$(){}\\])",
        }
    
    def validate_file_path(self, path: str) -> str:
        """Dosya yolunu doğrula"""
        # Normalize et
        normalized = path.replace("\\", "/")
        
        # Path traversal kontrol et
        if re.search(self.patterns["path_traversal"], normalized):
            raise ValueError("Geçersiz dosya yolu: Path traversal denemesi")
        
        # Mutlak yola dönüştür ve sınırları kontrol et
        absolute_path = os.path.abspath(normalized)
        allowed_base = "/app/documents"
        
        if not absolute_path.startswith(allowed_base):
            raise ValueError("Dosya erişim sınırı dışında")
        
        return absolute_path
    
    def validate_sql_query(self, query: str) -> bool:
        """SQL sorgusu doğrula"""
        # Tehlikeli pattern'ler kontrol et
        if re.search(self.patterns["sql_injection"], query, re.IGNORECASE):
            raise ValueError("Geçersiz SQL sorgusu")
        
        # Parameterized query'i zorla
        if not query.count("?") > 0:
            raise ValueError("Parameterized queries kullanmalısınız")
        
        return True
    
    def validate_email(self, email: str) -> str:
        """E-posta adresini doğrula"""
        pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
        
        if not re.match(pattern, email):
            raise ValueError("Geçersiz e-posta adresi")
        
        # Whitelist kontrolü
        domain = email.split("@")[1]
        allowed_domains = ["company.com", "trusted.org"]
        
        if domain not in allowed_domains:
            raise ValueError(f"E-posta alanı {domain} izin verilen listede değil")
        
        return email

# Kullanım
validator = MCPInputValidator()

# Dosya oku
file_path = validator.validate_file_path("/app/documents/report.pdf")

# SQL sorgusu
sql = "SELECT * FROM users WHERE id = ?"
validator.validate_sql_query(sql)

# E-posta
email = validator.validate_email("user@company.com")

3. Tool Isolation ve Sandboxing

Docker’da Araçları Çalıştır

# Dockerfile: Isolated MCP Tool
FROM python:3.11-slim

# Güvenli olmayan paketler yükleme
RUN useradd -m -u 1000 mcp_user && \
    chmod 755 /tmp

WORKDIR /app

# Uygulamayı kopyala
COPY --chown=mcp_user:mcp_user . .

# Düşük ayrıcalıklı kullanıcı olarak çalıştır
USER mcp_user

# Ağ ve kaynak kısıtlamaları
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
  CMD python -c "import socket; s = socket.socket(); s.connect(('localhost', 8000))"

ENTRYPOINT ["python", "-u", "mcp_tool.py"]

Process Isolation Konfigürasyonu

# docker-compose.yml
version: '3.8'
services:
  mcp_tool_1:
    image: mcp-isolated-tool:latest
    container_name: mcp_tool_1
    networks:
      - mcp_network
    resources:
      limits:
        cpus: '0.5'
        memory: 512M
      reservations:
        cpus: '0.25'
        memory: 256M
    environment:
      - MCP_TIMEOUT=5
      - LOG_LEVEL=INFO
    security_opt:
      - no-new-privileges:true
    read_only: true
    tmpfs:
      - /tmp
      - /var/tmp
    volumes:
      - /app/safe_data:/data:ro
    cap_drop:
      - ALL
    cap_add:
      - NET_BIND_SERVICE

  mcp_tool_2:
    image: mcp-isolated-tool:latest
    container_name: mcp_tool_2
    networks:
      - mcp_network
    # ... benzer konfigürasyon

networks:
  mcp_network:
    driver: bridge
    ipam:
      config:
        - subnet: 172.20.0.0/16

4. Rate Limiting ve DoS Koruması

from functools import wraps
from datetime import datetime, timedelta
from collections import defaultdict
import threading

class RateLimiter:
    """Araç çağrılarını sınırla"""
    
    def __init__(self):
        self.requests = defaultdict(list)
        self.lock = threading.Lock()
    
    def is_allowed(self, user_id: str, tool_name: str, 
                   max_requests: int = 10, 
                   window_seconds: int = 60) -> bool:
        """Rate limit kontrolü"""
        with self.lock:
            now = datetime.now()
            key = f"{user_id}:{tool_name}"
            
            # Eski istekleri temizle
            cutoff = now - timedelta(seconds=window_seconds)
            self.requests[key] = [
                req_time for req_time in self.requests[key]
                if req_time > cutoff
            ]
            
            # Sınırı kontrol et
            if len(self.requests[key]) >= max_requests:
                return False
            
            # Yeni isteği kaydet
            self.requests[key].append(now)
            return True

# Decorator kullanımı
limiter = RateLimiter()

def rate_limited(max_requests=10, window_seconds=60):
    def decorator(func):
        @wraps(func)
        def wrapper(user_id: str, *args, **kwargs):
            tool_name = func.__name__
            
            if not limiter.is_allowed(user_id, tool_name, 
                                     max_requests, window_seconds):
                raise RateLimitError(
                    f"Rate limit exceeded for {tool_name}. "
                    f"Max {max_requests} requests per {window_seconds}s"
                )
            
            return func(*args, **kwargs)
        return wrapper
    return decorator

# Uygulama
@rate_limited(max_requests=5, window_seconds=60)
def query_database(user_id: str, query: str):
    return execute_query(query)

5. Audit Logging ve Monitoring

import json
import logging
from datetime import datetime
from dataclasses import dataclass, asdict

@dataclass
class AuditLog:
    """MCP araçı audit logu"""
    timestamp: str
    user_id: str
    tool_name: str
    action: str
    status: str
    duration_ms: float
    ip_address: str
    input_hash: str
    output_summary: str
    error_message: str = None

class MCPAuditLogger:
    """Kapsamlı audit logging"""
    
    def __init__(self, log_file: str):
        self.logger = logging.getLogger("mcp_audit")
        handler = logging.FileHandler(log_file)
        handler.setFormatter(
            logging.Formatter('%(message)s')
        )
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)
    
    def log_tool_execution(self, 
                          user_id: str,
                          tool_name: str,
                          action: str,
                          status: str,
                          duration_ms: float,
                          ip_address: str,
                          user_input: str,
                          output: str,
                          error: str = None):
        """Araç yürütmesini kaydıt et"""
        
        import hashlib
        input_hash = hashlib.sha256(user_input.encode()).hexdigest()[:16]
        output_summary = output[:100] + "..." if len(output) > 100 else output
        
        audit_log = AuditLog(
            timestamp=datetime.utcnow().isoformat(),
            user_id=user_id,
            tool_name=tool_name,
            action=action,
            status=status,
            duration_ms=duration_ms,
            ip_address=ip_address,
            input_hash=input_hash,
            output_summary=output_summary,
            error_message=error
        )
        
        self.logger.info(json.dumps(asdict(audit_log)))

# Kullanım
audit_logger = MCPAuditLogger("/var/log/mcp_audit.log")

# Araç yürütülürken
start_time = time.time()
try:
    result = execute_tool(user_input)
    duration = (time.time() - start_time) * 1000
    
    audit_logger.log_tool_execution(
        user_id="user123",
        tool_name="read_file",
        action="READ",
        status="SUCCESS",
        duration_ms=duration,
        ip_address=request.remote_addr,
        user_input=user_input,
        output=result
    )
except Exception as e:
    duration = (time.time() - start_time) * 1000
    
    audit_logger.log_tool_execution(
        user_id="user123",
        tool_name="read_file",
        action="READ",
        status="FAILED",
        duration_ms=duration,
        ip_address=request.remote_addr,
        user_input=user_input,
        output="",
        error=str(e)
    )

6. Tool Manifest Güvenliği

{
  "name": "safe_mcp_tools",
  "version": "1.0.0",
  "tools": [
    {
      "name": "read_document",
      "description": "Belirtilen belgeyi oku",
      "type": "file_reader",
      "requiredArgs": ["file_path"],
      "optionalArgs": ["encoding"],
      "timeout": 5000,
      "maxResourcesUsage": {
        "memory": "100MB",
        "cpu": "50%"
      },
      "accessControl": {
        "allowedUsers": ["group:analysts"],
        "deniedUsers": [],
        "requiresApproval": false
      },
      "security": {
        "validateInput": true,
        "sanitizeOutput": true,
        "logExecution": true
      },
      "rateLimit": {
        "requestsPerMinute": 60,
        "requestsPerHour": 1000
      },
      "allowedPaths": [
        "/documents/safe",
        "/reports"
      ],
      "blockedPatterns": [
        "*.secret",
        "*.key",
        "*password*"
      ]
    }
  ],
  "integrations": {
    "llmModel": "gpt-4",
    "contextWindow": 8192,
    "maxToolsPerRequest": 5
  }
}

7. Credential Management

from abc import ABC, abstractmethod
import os
from cryptography.fernet import Fernet

class CredentialStore(ABC):
    """Kimlik bilgileri için soyut depo"""
    
    @abstractmethod
    def get(self, key: str) -> str:
        pass
    
    @abstractmethod
    def set(self, key: str, value: str) -> None:
        pass

class EncryptedCredentialStore(CredentialStore):
    """Şifreli kimlik bilgileri deposu"""
    
    def __init__(self, key_file: str = "/etc/mcp/encryption.key"):
        self.cipher = self._load_cipher(key_file)
        self.store = {}
    
    def _load_cipher(self, key_file: str) -> Fernet:
        if not os.path.exists(key_file):
            raise FileNotFoundError(f"Encryption key not found: {key_file}")
        
        with open(key_file, 'rb') as f:
            key = f.read()
        
        return Fernet(key)
    
    def get(self, key: str) -> str:
        if key not in self.store:
            raise KeyError(f"Credential not found: {key}")
        
        encrypted = self.store[key]
        return self.cipher.decrypt(encrypted).decode()
    
    def set(self, key: str, value: str) -> None:
        encrypted = self.cipher.encrypt(value.encode())
        self.store[key] = encrypted

# ❌ YANLIŞ: Kimlik bilgilerini kodda
database_url = "postgresql://user:password@localhost/db"

# ✅ DOĞRU: Güvenli depodan oku
cred_store = EncryptedCredentialStore()
database_url = cred_store.get("db_connection_string")

# Çevre değişkenlerinden oku
api_key = os.getenv("API_KEY")  # .env dosyasından veya sistem ayarlarından

8. Dependency Management

# requirements.txt - Tam sürüm pinleme
python-dotenv==1.0.0
requests==2.31.0
cryptography==41.0.0
pydantic==2.5.0

# Güvenlik açığı kontrolü
# pip install safety
# safety check

# Düzenli güncellemeler
# pip list --outdated

Best Practices Özet

PratikAçıklamaÖncelik
En Az YetkiSadece ihtiyaç duyulan izinlerKritik
Input ValidationTüm girdileri kontrol etKritik
Logging & AuditHer şeyi kaydıt etKritik
Rate LimitingDoS saldırılarından koruYüksek
IsolationAraçları izole etYüksek
Credential ManagementSırları güvenli tutYüksek
Regular AuditsPeriyodik kontrolOrta
Security HeadersHTTP başlıklarıOrta

Sonuç

MCP güvenliği tek bir özellik ya da kontrol listesi değil. Gerçek güvenlik, tasarımdan implementasyona kadar tüm süreçte katmanlı bir yaklaşım gerektirir.

Önemli olan gözlemlemek, doğrulamak, izole etmek ve kayıt tutmak. Bu prensipler üzerine inşa ettiğiniz sistemler hem güvenli hem de sürdürülebilir olacaktır.