×

t_text - 翻译文本接口对接全攻略:从入门到精通

万邦科技Lex 万邦科技Lex 发表于2026-02-05 09:01:14 浏览43 评论0

抢沙发发表评论

一、接口概览

1.1 接口简介

t_text通用翻译文本接口,支持多语言、多格式、多场景的文本翻译,提供标准化、高可用、易集成的翻译服务解决方案。

1.2 核心特性

  • 多语言支持:200+种语言互译

  • 智能翻译:神经网络翻译、语境理解

  • 批量处理:支持大规模文本批量翻译

  • 格式保持:HTML/Markdown/JSON等格式

  • 质量保障:术语优化、后编辑处理

  • 成本优化:智能成本控制策略

二、准备工作

2.1 环境配置

# requirements.txt requests>=2.28.0 python-dotenv>=1.0.0 pydantic>=2.0.0 aiohttp>=3.8.0 redis>=4.5.0

2.2 认证配置

# config.py import os from dotenv import load_dotenv from typing import Dict, Any load_dotenv() class TranslationConfig:     # 翻译服务配置     TRANSLATION_SERVICE = os.getenv('TRANSLATION_SERVICE', 'baidu')  # baidu/tencent/aliyun/youdao/google          # 百度翻译配置     BAIDU_APP_ID = os.getenv('BAIDU_APP_ID')     BAIDU_APP_KEY = os.getenv('BAIDU_APP_KEY')     BAIDU_API_BASE = 'https://fanyi-api.baidu.com/api/trans/vip/translate'          # 腾讯翻译配置     TENCENT_SECRET_ID = os.getenv('TENCENT_SECRET_ID')     TENCENT_SECRET_KEY = os.getenv('TENCENT_SECRET_KEY')     TENCENT_API_BASE = 'https://tmt.tencentcloudapi.com'          # 阿里云翻译配置     ALIYUN_ACCESS_KEY_ID = os.getenv('ALIYUN_ACCESS_KEY_ID')     ALIYUN_ACCESS_KEY_SECRET = os.getenv('ALIYUN_ACCESS_KEY_SECRET')     ALIYUN_API_BASE = 'https://mt.cn-hangzhou.aliyuncs.com'          # 有道翻译配置     YOUDAO_APP_KEY = os.getenv('YOUDAO_APP_KEY')     YOUDAO_APP_SECRET = os.getenv('YOUDAO_APP_SECRET')     YOUDAO_API_BASE = 'https://openapi.youdao.com/api'          # Google翻译配置     GOOGLE_API_KEY = os.getenv('GOOGLE_API_KEY')     GOOGLE_API_BASE = 'https://translation.googleapis.com/language/translate/v2'          # 请求配置     REQUEST_TIMEOUT = 30     MAX_TEXT_LENGTH = 5000     BATCH_SIZE = 50          # 缓存配置     CACHE_TTL = 86400  # 24小时     REDIS_URL = os.getenv('REDIS_URL', 'redis://localhost:6379/0')          # 错误配置     MAX_RETRIES = 3     RETRY_DELAY = 1.0

三、接口详解

3.1 接口地址

POST /translate

3.2 请求参数详解

基础参数

参数名类型必填说明示例
qstring/array待翻译文本"Hello World"
sourcestring源语言代码"en"
targetstring目标语言代码"zh"
formatstring文本格式"text/html/json"
versionstringAPI版本"1.0"

高级参数

参数名类型必填说明示例
domainstring专业领域"general/finance/medical"
glossary_idstring术语库ID"glossary_001"
preserve_formattingbool保持格式true
fallbackbool降级处理true
retry_countint重试次数3

3.3 语言代码表

语言代码说明
中文(简体)zh/zh-CN简体中文
中文(繁体)zh-TW繁体中文
英语en英语
日语ja日语
韩语ko韩语
法语fr法语
德语de德语
俄语ru俄语
西班牙语es西班牙语
阿拉伯语ar阿拉伯语
葡萄牙语pt葡萄牙语
意大利语it意大利语

四、完整代码实现

4.1 Python完整实现

import requests import time import hashlib import hmac import json import random from typing import Dict, Any, List, Optional, Union from datetime import datetime, timedelta from dataclasses import dataclass from urllib.parse import urlencode import redis import asyncio import aiohttp @dataclass class TranslationRequest:     """翻译请求"""     text: Union[str, List[str]]     source_lang: str     target_lang: str     format: str = "text"     domain: Optional[str] = None     glossary_id: Optional[str] = None     preserve_formatting: bool = True     fallback: bool = True     retry_count: int = 3     custom_params: Dict[str, Any] = None @dataclass class TranslationResult:     """翻译结果"""     success: bool     code: int     message: str     data: Dict[str, Any]     source_text: Union[str, List[str]]     translated_text: Union[str, List[str]]     source_lang: str     target_lang: str     service: str     translation_time: float     confidence: Optional[float] = None     glossary_matches: Optional[List[str]] = None @dataclass class BatchTranslationResult:     """批量翻译结果"""     success: bool     code: int     message: str     data: Dict[str, Any]     results: List[TranslationResult]     total_count: int     success_count: int     failed_count: int     total_time: float class TranslationAPI:     """通用翻译API客户端"""          def __init__(self, config: Dict[str, Any], redis_client=None):         self.config = config         self.redis = redis_client         self.session = requests.Session()         self.session.headers.update({             'User-Agent': 'Translation-API-Client/1.0',             'Accept': 'application/json'         })          def translate(         self,         text: Union[str, List[str]],         source_lang: str,         target_lang: str,         service: Optional[str] = None,         **kwargs     ) -> TranslationResult:         """         翻译文本                  Args:             text: 待翻译文本(字符串或列表)             source_lang: 源语言代码             target_lang: 目标语言代码             service: 翻译服务(baidu/tencent/aliyun/youdao/google)             **kwargs: 其他参数                  Returns:             翻译结果         """         if service is None:             service = self.config.get('TRANSLATION_SERVICE', 'baidu')                  # 构建请求         request = TranslationRequest(             text=text,             source_lang=source_lang,             target_lang=target_lang,             **kwargs         )                  # 检查缓存         cache_key = self._get_cache_key(request, service)         if self.redis:             cached = self.redis.get(cache_key)             if cached:                 data = json.loads(cached)                 return TranslationResult(**data)                  # 执行翻译         start_time = time.time()                  try:             if service == 'baidu':                 result = self._translate_baidu(request)             elif service == 'tencent':                 result = self._translate_tencent(request)             elif service == 'aliyun':                 result = self._translate_aliyun(request)             elif service == 'youdao':                 result = self._translate_youdao(request)             elif service == 'google':                 result = self._translate_google(request)             else:                 raise ValueError(f"不支持的翻译服务: {service}")                          result.translation_time = time.time() - start_time                          # 缓存结果             if self.redis and result.success:                 self.redis.setex(                     cache_key,                     self.config.get('CACHE_TTL', 86400),                     json.dumps(result.__dict__)                 )                          return result                      except Exception as e:             translation_time = time.time() - start_time             return TranslationResult(                 success=False,                 code=500,                 message=f"翻译失败: {str(e)}",                 data={},                 source_text=text,                 translated_text=[] if isinstance(text, list) else "",                 source_lang=source_lang,                 target_lang=target_lang,                 service=service,                 translation_time=translation_time             )          def _translate_baidu(self, request: TranslationRequest) -> TranslationResult:         """百度翻译"""         app_id = self.config.get('BAIDU_APP_ID')         app_key = self.config.get('BAIDU_APP_KEY')         api_base = self.config.get('BAIDU_API_BASE')                  if not app_id or not app_key:             raise ValueError("百度翻译配置缺失")                  # 构建请求参数         if isinstance(request.text, str):             query = request.text         else:             query = '\n'.join(request.text)                  salt = str(random.randint(32768, 65536))         sign_str = app_id + query + salt + app_key         sign = hashlib.md5(sign_str.encode('utf-8')).hexdigest()                  params = {             'q': query,             'from': request.source_lang,             'to': request.target_lang,             'appid': app_id,             'salt': salt,             'sign': sign         }                  # 发送请求         response = self.session.post(api_base, data=params, timeout=self.config.get('REQUEST_TIMEOUT', 30))                  if response.status_code == 200:             data = response.json()                          if 'trans_result' in data:                 trans_result = data['trans_result']                                  if isinstance(request.text, str):                     translated_text = trans_result[0]['dst']                 else:                     translated_text = [item['dst'] for item in trans_result]                                  return TranslationResult(                     success=True,                     code=200,                     message="成功",                     data=data,                     source_text=request.text,                     translated_text=translated_text,                     source_lang=request.source_lang,                     target_lang=request.target_lang,                     service="baidu",                     translation_time=0                 )             else:                 error_code = data.get('error_code', 0)                 error_msg = data.get('error_msg', '未知错误')                 return TranslationResult(                     success=False,                     code=error_code,                     message=error_msg,                     data=data,                     source_text=request.text,                     translated_text=[] if isinstance(request.text, list) else "",                     source_lang=request.source_lang,                     target_lang=request.target_lang,                     service="baidu",                     translation_time=0                 )         else:             return TranslationResult(                 success=False,                 code=response.status_code,                 message=f"HTTP {response.status_code}",                 data={},                 source_text=request.text,                 translated_text=[] if isinstance(request.text, list) else "",                 source_lang=request.source_lang,                 target_lang=request.target_lang,                 service="baidu",                 translation_time=0             )          def _translate_google(self, request: TranslationRequest) -> TranslationResult:         """Google翻译"""         api_key = self.config.get('GOOGLE_API_KEY')         api_base = self.config.get('GOOGLE_API_BASE')                  if not api_key:             raise ValueError("Google翻译配置缺失")                  # 构建请求参数         params = {             'q': request.text if isinstance(request.text, str) else request.text,             'source': request.source_lang,             'target': request.target_lang,             'key': api_key,             'format': request.format         }                  # 发送请求         headers = {             'Content-Type': 'application/json',             'X-Goog-Api-Key': api_key         }                  if isinstance(request.text, str):             data = {                 'q': [request.text],                 'source': request.source_lang,                 'target': request.target_lang             }         else:             data = {                 'q': request.text,                 'source': request.source_lang,                 'target': request.target_lang             }                  response = self.session.post(             api_base,             params=params,             json=data,             headers=headers,             timeout=self.config.get('REQUEST_TIMEOUT', 30)         )                  if response.status_code == 200:             data = response.json()                          if 'data' in data and 'translations' in data['data']:                 translations = data['data']['translations']                                  if isinstance(request.text, str):                     translated_text = translations[0]['translatedText']                 else:                     translated_text = [item['translatedText'] for item in translations]                                  return TranslationResult(                     success=True,                     code=200,                     message="成功",                     data=data,                     source_text=request.text,                     translated_text=translated_text,                     source_lang=request.source_lang,                     target_lang=request.target_lang,                     service="google",                     translation_time=0                 )             else:                 return TranslationResult(                     success=False,                     code=500,                     message="Google翻译返回数据格式错误",                     data=data,                     source_text=request.text,                     translated_text=[] if isinstance(request.text, list) else "",                     source_lang=request.source_lang,                     target_lang=request.target_lang,                     service="google",                     translation_time=0                 )         else:             return TranslationResult(                 success=False,                 code=response.status_code,                 message=f"HTTP {response.status_code}",                 data={},                 source_text=request.text,                 translated_text=[] if isinstance(request.text, list) else "",                 source_lang=request.source_lang,                 target_lang=request.target_lang,                 service="google",                 translation_time=0             )          def _translate_youdao(self, request: TranslationRequest) -> TranslationResult:         """有道翻译"""         app_key = self.config.get('YOUDAO_APP_KEY')         app_secret = self.config.get('YOUDAO_APP_SECRET')         api_base = self.config.get('YOUDAO_API_BASE')                  if not app_key or not app_secret:             raise ValueError("有道翻译配置缺失")                  # 构建请求参数         if isinstance(request.text, str):             query = request.text         else:             query = ' '.join(request.text)  # 有道不支持多文本,用空格连接                  salt = str(random.randint(1, 100000))         sign_str = app_key + query + salt + app_secret         sign = hashlib.md5(sign_str.encode('utf-8')).hexdigest()                  params = {             'q': query,             'from': request.source_lang,             'to': request.target_lang,             'appKey': app_key,             'salt': salt,             'sign': sign         }                  # 发送请求         response = self.session.post(api_base, data=params, timeout=self.config.get('REQUEST_TIMEOUT', 30))                  if response.status_code == 200:             data = response.json()                          if 'translation' in data and len(data['translation']) > 0:                 if isinstance(request.text, str):                     translated_text = data['translation'][0]                 else:                     # 有道返回单个字符串,需要分割                     translated_text = [data['translation'][0]]                                  return TranslationResult(                     success=True,                     code=200,                     message="成功",                     data=data,                     source_text=request.text,                     translated_text=translated_text,                     source_lang=request.source_lang,                     target_lang=request.target_lang,                     service="youdao",                     translation_time=0                 )             else:                 error_code = data.get('errorCode', 0)                 return TranslationResult(                     success=False,                     code=error_code,                     message=data.get('msg', '未知错误'),                     data=data,                     source_text=request.text,                     translated_text=[] if isinstance(request.text, list) else "",                     source_lang=request.source_lang,                     target_lang=request.target_lang,                     service="youdao",                     translation_time=0                 )         else:             return TranslationResult(                 success=False,                 code=response.status_code,                 message=f"HTTP {response.status_code}",                 data={},                 source_text=request.text,                 translated_text=[] if isinstance(request.text, list) else "",                 source_lang=request.source_lang,                 target_lang=request.target_lang,                 service="youdao",                 translation_time=0             )          def _translate_tencent(self, request: TranslationRequest) -> TranslationResult:         """腾讯翻译"""         # 腾讯翻译需要复杂的签名计算,这里简化实现         # 实际使用时建议使用腾讯云SDK         secret_id = self.config.get('TENCENT_SECRET_ID')         secret_key = self.config.get('TENCENT_SECRET_KEY')         api_base = self.config.get('TENCENT_API_BASE')                  if not secret_id or not secret_key:             raise ValueError("腾讯翻译配置缺失")                  try:             from tencentcloud.common import credential             from tencentcloud.common.profile.client_profile import ClientProfile             from tencentcloud.common.profile.http_profile import HttpProfile             from tencentcloud.tmt.v20180321 import tmt_client, models                          # 初始化客户端             cred = credential.Credential(secret_id, secret_key)             http_profile = HttpProfile()             http_profile.endpoint = "tmt.tencentcloudapi.com"             client_profile = ClientProfile()             client_profile.httpProfile = http_profile             client = tmt_client.TmtClient(cred, "ap-beijing", client_profile)                          # 构建请求             req = models.TextTranslateRequest()             req.SourceText = request.text if isinstance(request.text, str) else ' '.join(request.text)             req.Source = request.source_lang             req.Target = request.target_lang             req.ProjectId = 0                          # 发送请求             resp = client.TextTranslate(req)                          if isinstance(request.text, str):                 translated_text = resp.TargetText             else:                 translated_text = [resp.TargetText]                          return TranslationResult(                 success=True,                 code=200,                 message="成功",                 data=resp.to_json_string(),                 source_text=request.text,                 translated_text=translated_text,                 source_lang=request.source_lang,                 target_lang=request.target_lang,                 service="tencent",                 translation_time=0             )                      except Exception as e:             return TranslationResult(                 success=False,                 code=500,                 message=f"腾讯翻译失败: {str(e)}",                 data={},                 source_text=request.text,                 translated_text=[] if isinstance(request.text, list) else "",                 source_lang=request.source_lang,                 target_lang=request.target_lang,                 service="tencent",                 translation_time=0             )          def _translate_aliyun(self, request: TranslationRequest) -> TranslationResult:         """阿里云翻译"""         # 阿里云翻译需要复杂的签名计算,这里简化实现         # 实际使用时建议使用阿里云SDK         access_key_id = self.config.get('ALIYUN_ACCESS_KEY_ID')         access_key_secret = self.config.get('ALIYUN_ACCESS_KEY_SECRET')         api_base = self.config.get('ALIYUN_API_BASE')                  if not access_key_id or not access_key_secret:             raise ValueError("阿里云翻译配置缺失")                  try:             from alibabacloud_tea_openapi import models as aliyun_models             from alibabacloud_mt20180807.client import Client as AliyunClient             from alibabacloud_mt20180807 import models as mt_models                          # 初始化客户端             config = aliyun_models.Config(                 access_key_id=access_key_id,                 access_key_secret=access_key_secret             )             config.endpoint = 'mt.cn-hangzhou.aliyuncs.com'             client = AliyunClient(config)                          # 构建请求             req = mt_models.TranslateGeneralRequest()             req.source_text = request.text if isinstance(request.text, str) else ' '.join(request.text)             req.source_language = request.source_lang             req.target_language = request.target_lang             req.format_type = 'text'                          # 发送请求             resp = client.translate_general(req)                          if isinstance(request.text, str):                 translated_text = resp.data.translated             else:                 translated_text = [resp.data.translated]                          return TranslationResult(                 success=True,                 code=200,                 message="成功",                 data=resp.to_map(),                 source_text=request.text,                 translated_text=translated_text,                 source_lang=request.source_lang,                 target_lang=request.target_lang,                 service="aliyun",                 translation_time=0             )                      except Exception as e:             return TranslationResult(                 success=False,                 code=500,                 message=f"阿里云翻译失败: {str(e)}",                 data={},                 source_text=request.text,                 translated_text=[] if isinstance(request.text, list) else "",                 source_lang=request.source_lang,                 target_lang=request.target_lang,                 service="aliyun",                 translation_time=0             )          def batch_translate(         self,         texts: List[str],         source_lang: str,         target_lang: str,         service: Optional[str] = None,         batch_size: Optional[int] = None,         **kwargs     ) -> BatchTranslationResult:         """         批量翻译文本                  Args:             texts: 待翻译文本列表             source_lang: 源语言代码             target_lang: 目标语言代码             service: 翻译服务             batch_size: 批次大小             **kwargs: 其他参数                  Returns:             批量翻译结果         """         if batch_size is None:             batch_size = self.config.get('BATCH_SIZE', 50)                  start_time = time.time()         results = []         success_count = 0         failed_count = 0                  # 分批处理         for i in range(0, len(texts), batch_size):             batch_texts = texts[i:i+batch_size]                          try:                 result = self.translate(                     batch_texts,                     source_lang,                     target_lang,                     service=service,                     **kwargs                 )                                  results.append(result)                                  if result.success:                     success_count += len(batch_texts)                 else:                     failed_count += len(batch_texts)                                  # 避免请求过于频繁                 time.sleep(0.1)                              except Exception as e:                 failed_count += len(batch_texts)                 error_result = TranslationResult(                     success=False,                     code=500,                     message=str(e),                     data={},                     source_text=batch_texts,                     translated_text=[],                     source_lang=source_lang,                     target_lang=target_lang,                     service=service or self.config.get('TRANSLATION_SERVICE', 'baidu'),                     translation_time=0                 )                 results.append(error_result)                  total_time = time.time() - start_time                  return BatchTranslationResult(             success=success_count > 0,             code=200 if success_count > 0 else 500,             message=f"批量翻译完成,成功{success_count}条,失败{failed_count}条",             data={},             results=results,             total_count=len(texts),             success_count=success_count,             failed_count=failed_count,             total_time=total_time         )          def _get_cache_key(self, request: TranslationRequest, service: str) -> str:         """生成缓存键"""         import hashlib                  # 构建唯一标识         text_str = request.text if isinstance(request.text, str) else '|'.join(request.text)         key_data = f"{service}:{text_str}:{request.source_lang}:{request.target_lang}"                  return f"translation:{hashlib.md5(key_data.encode()).hexdigest()}"          def translate_with_retry(         self,         text: Union[str, List[str]],         source_lang: str,         target_lang: str,         service: Optional[str] = None,         max_retries: Optional[int] = None,         **kwargs     ) -> TranslationResult:         """         带重试的翻译                  Args:             text: 待翻译文本             source_lang: 源语言             target_lang: 目标语言             service: 翻译服务             max_retries: 最大重试次数             **kwargs: 其他参数                  Returns:             翻译结果         """         if max_retries is None:             max_retries = self.config.get('MAX_RETRIES', 3)                  last_exception = None                  for attempt in range(max_retries + 1):             try:                 result = self.translate(text, source_lang, target_lang, service, **kwargs)                                  if result.success:                     return result                 elif attempt < max_retries:                     # 等待后重试                     delay = self.config.get('RETRY_DELAY', 1.0) * (2 ** attempt)  # 指数退避                     time.sleep(delay)                     continue                 else:                     return result                                  except Exception as e:                 last_exception = e                 if attempt < max_retries:                     delay = self.config.get('RETRY_DELAY', 1.0) * (2 ** attempt)                     time.sleep(delay)                 else:                     return TranslationResult(                         success=False,                         code=500,                         message=f"翻译重试失败: {str(e)}",                         data={},                         source_text=text,                         translated_text=[] if isinstance(text, list) else "",                         source_lang=source_lang,                         target_lang=target_lang,                         service=service or self.config.get('TRANSLATION_SERVICE', 'baidu'),                         translation_time=0                     )                  return TranslationResult(             success=False,             code=500,             message=f"翻译失败: {str(last_exception)}",             data={},             source_text=text,             translated_text=[] if isinstance(text, list) else "",             source_lang=source_lang,             target_lang=target_lang,             service=service or self.config.get('TRANSLATION_SERVICE', 'baidu'),             translation_time=0         ) # 使用示例 def demo_translation_api():     """翻译API使用演示"""          # 初始化客户端     api = TranslationAPI(TranslationConfig)          print("=== 示例1:简单翻译 ===")     result = api.translate(         text="Hello, world! This is a translation example.",         source_lang="en",         target_lang="zh",         service="baidu"     )          if result.success:         print(f"原文: {result.source_text}")         print(f"译文: {result.translated_text}")         print(f"耗时: {result.translation_time:.3f}s")     else:         print(f"翻译失败: {result.message}")          print("\n=== 示例2:批量翻译 ===")     texts = [         "Good morning!",         "How are you today?",         "Have a nice day!",         "Thank you very much!"     ]          batch_result = api.batch_translate(texts, "en", "zh", service="baidu")          if batch_result.success:         for i, result in enumerate(batch_result.results):             if result.success:                 print(f"批次{i+1}: 成功翻译{len(result.translated_text)}条")             else:                 print(f"批次{i+1}: 翻译失败 - {result.message}")          print("\n=== 示例3:多服务对比 ===")     services = ['baidu', 'google']     text = "The quick brown fox jumps over the lazy dog."          for service in services:         result = api.translate(text, "en", "zh", service=service)         if result.success:             print(f"{service}: {result.translated_text}") if __name__ == "__main__":     demo_translation_api()

4.2 异步实现

import aiohttp from typing import List, Dict, Any, Union class AsyncTranslationAPI:    
 """异步翻译API客户端"""          def __init__(self, config: Dict[str, Any], redis_client=None):         
 self.config = config         self.redis = redis_client        
 self.session = None          async def __aenter__(self):         
 self.session = aiohttp.ClientSession()         return self          
 async def __aexit__(self, exc_type, exc_val, exc_tb):         await self.session.close()          
 async def translate_async(         self,         text: Union[str, List[str]],         source_lang: str,       
   target_lang: str,         service: Optional[str] = None,         **kwargs     ) -> TranslationResult:       
     """         异步翻译文本                  Args:             
     text: 待翻译文本             source_lang: 源语言             
     target_lang: 目标语言             service: 翻译服务         
     # 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex    
     **kwargs: 其他参数                  Returns:             翻译结果         """         if service is None:             service = self.config.get('TRANSLATION_SERVICE', 'baidu')                  # 检查缓存         cache_key = self._get_cache_key(text, source_lang, target_lang, service)         if self.redis:             cached = await self.redis.get(cache_key)             if cached:                 data = json.loads(cached)                 return TranslationResult(**data)                  start_time = time.time()                  try:             if service == 'baidu':                 result = await self._translate_baidu_async(text, source_lang, target_lang)             elif service == 'google':                 result = await self._translate_google_async(text, source_lang, target_lang)             else:                 raise ValueError(f"不支持的异步翻译服务: {service}")                          result.translation_time = time.time() - start_time                          # 缓存结果             if self.redis and result.success:                 await self.redis.setex(                     cache_key,                     self.config.get('CACHE_TTL', 86400),                     json.dumps(result.__dict__)                 )                          return result                      except Exception as e:             translation_time = time.time() - start_time             return TranslationResult(                 success=False,                 code=500,                 message=f"翻译失败: {str(e)}",                 data={},                 source_text=text,                 translated_text=[] if isinstance(text, list) else "",                 source_lang=source_lang,                 target_lang=target_lang,                 service=service,                 translation_time=translation_time             )          async def _translate_baidu_async(         self,         text: Union[str, List[str]],         source_lang: str,         target_lang: str     ) -> TranslationResult:         """异步百度翻译"""         app_id = self.config.get('BAIDU_APP_ID')         app_key = self.config.get('BAIDU_APP_KEY')         api_base = self.config.get('BAIDU_API_BASE')                  if not app_id or not app_key:             raise ValueError("百度翻译配置缺失")                  # 构建请求参数         if isinstance(text, str):             query = text         else:             query = '\n'.join(text)                  salt = str(random.randint(32768, 65536))         sign_str = app_id + query + salt + app_key         sign = hashlib.md5(sign_str.encode('utf-8')).hexdigest()                  params = {             'q': query,             'from': source_lang,             'to': target_lang,             'appid': app_id,             'salt': salt,             'sign': sign         }                  # 发送请求         async with self.session.post(api_base, data=params) as response:             if response.status == 200:                 data = await response.json()                                  if 'trans_result' in data:                     trans_result = data['trans_result']                                          if isinstance(text, str):                         translated_text = trans_result[0]['dst']                     else:                         translated_text = [item['dst'] for item in trans_result]                                          return TranslationResult(                         success=True,                         code=200,                         message="成功",                         data=data,                         source_text=text,                         translated_text=translated_text,                         source_lang=source_lang,                         target_lang=target_lang,                         service="baidu",                         translation_time=0                     )                 else:                     error_code = data.get('error_code', 0)                     error_msg = data.get('error_msg', '未知错误')                     return TranslationResult(                         success=False,                         code=error_code,                         message=error_msg,                         data=data,                         source_text=text,                         translated_text=[] if isinstance(text, list) else "",                         source_lang=source_lang,                         target_lang=target_lang,                         service="baidu",                         translation_time=0                     )             else:                 return TranslationResult(                     success=False,                     code=response.status,                     message=f"HTTP {response.status}",                     data={},                     source_text=text,                     translated_text=[] if isinstance(text, list) else "",                     source_lang=source_lang,                     target_lang=target_lang,                     service="baidu",                     translation_time=0                 )          async def batch_translate_async(         self,         texts: List[str],         source_lang: str,         target_lang: str,         service: Optional[str] = None,         batch_size: Optional[int] = None,         **kwargs     ) -> BatchTranslationResult:         """         异步批量翻译         """         if batch_size is None:             batch_size = self.config.get('BATCH_SIZE', 50)                  start_time = time.time()         tasks = []                  # 创建异步任务         for i in range(0, len(texts), batch_size):             batch_texts = texts[i:i+batch_size]             task = self.translate_async(batch_texts, source_lang, target_lang, service, **kwargs)             tasks.append(task)                  # 并行执行         results = await asyncio.gather(*tasks)                  # 统计结果         success_count = sum(1 for r in results if r.success)         failed_count = len(results) - success_count         total_time = time.time() - start_time                  return BatchTranslationResult(             success=success_count > 0,             code=200 if success_count > 0 else 500,             message=f"批量翻译完成,成功{success_count}条,失败{failed_count}条",             data={},             results=results,             total_count=len(texts),             success_count=success_count,             failed_count=failed_count,             total_time=total_time         ) # 异步使用示例 async def demo_async_translation():     """异步翻译演示"""     async with AsyncTranslationAPI(TranslationConfig) as api:         texts = [             "Hello, world!",             "How are you?",             "Good morning!",             "Have a nice day!"         ]                  result = await api.batch_translate_async(texts, "en", "zh", "baidu")                  if result.success:             for i, translation in enumerate(result.results):                 if translation.success:                     print(f"文本{i+1}: {translation.translated_text}")

五、返回结果解析

5.1 成功响应示例

{   "success": true,   "code": 200,   "message": "成功",   "data": {     "from": "en",     "to": "zh",   
  "trans_result": [       {         "src": "Hello, world!",         "dst": "你好,世界!"       }     ]   },  
   "source_text": "Hello, world!",   "translated_text": "你好,世界!",   "source_lang": "en",   "target_lang": "zh",   "service": "baidu",  
    "translation_time": 0.345,   "confidence": 0.95 }

5.2 批量响应示例

{   "success": true,   "code": 200,   
"message": "批量翻译完成,成功2条,失败0条",   "data": {},   
"results": [     {       "success": true,       "code": 200,       "message": "成功",      
 "data": {         "trans_result": [           {             "src": "Good morning!",           
  # 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
   "dst": "早上好!"           }         ]       },       "source_text": "Good morning!",     
     "translated_text": "早上好!",       "source_lang": "en",       "target_lang": "zh",       "service": "baidu",    
        "translation_time": 0.234     },     {       "success": true,       "code": 200,       "message": "成功",     
          "data": {         "trans_result": [           {             "src": "Thank you!",             
          "dst": "谢谢你!"           }         ]       },       "source_text": "Thank you!",      
          "translated_text": "谢谢你!",       "source_lang": "en",       "target_lang": "zh",       "service": "baidu",       "translation_time": 0.256     }   ],   "total_count": 2,   "success_count": 2,   "failed_count": 0,   "total_time": 0.512 }

5.3 错误响应示例

{   "success": false,   "code": 54003,   "message": "访问频率受限",   "data": {},   "source_text": "Hello, world!",   "translated_text": "",   "source_lang": "en",   "target_lang": "zh",   "service": "baidu",   "translation_time": 0.123 }

5.4 错误码说明

错误码说明处理建议
200成功-
52001请求超时重试请求
52002系统错误稍后重试
52003未授权用户检查API密钥
54000必填参数为空检查请求参数
54001签名错误检查签名算法
54003访问频率受限降低请求频率
54004账户余额不足充值账户
54005长查询请求频繁减少长文本请求
58000客户端IP非法检查IP白名单
58001语言方向错误检查语言代码
90107认证未通过重新认证

六、高级功能实现

6.1 智能服务选择

class IntelligentTranslationService:     """智能翻译服务选择器"""          def __init__(self, config: Dict[str, Any]):         self.config = config         self.service_performance = {}          def select_best_service(         self,         source_lang: str,         target_lang: str,         text_length: int,         strategy: str = "auto"     ) -> str:         """         智能选择最佳翻译服务                  Args:             source_lang: 源语言             target_lang: 目标语言             text_length: 文本长度             strategy: 选择策略(auto/cost/quality/speed)                  Returns:             最佳翻译服务         """         available_services = self._get_available_services(source_lang, target_lang, text_length)                  if len(available_services) == 0:             return "baidu"  # 默认服务                  if strategy == "auto":             # 自动选择:综合考虑质量、成本、速度             scored_services = []                          for service in available_services:                 score = self._calculate_service_score(service, text_length)                 scored_services.append((service, score))                          scored_services.sort(key=lambda x: x[1], reverse=True)             return scored_services[0][0]                  elif strategy == "cost":             # 成本优先             cost_config = {                 'baidu': 0.001,                 'tencent': 0.002,                 'aliyun': 0.0015,                 'youdao': 0.002,                 'google': 0.003             }                          min_cost_service = min(available_services, key=lambda x: cost_config.get(x, 1.0))             return min_cost_service                  elif strategy == "quality":             # 质量优先             quality_config = {                 'baidu': 0.85,                 'tencent': 0.82,                 'aliyun': 0.83,                 'youdao': 0.80,                 'google': 0.92             }                          max_quality_service = max(available_services, key=lambda x: quality_config.get(x, 0.0))             return max_quality_service                  elif strategy == "speed":             # 速度优先             speed_config = {                 'baidu': 0.8,                 'tencent': 0.85,                 'aliyun': 0.8,                 'youdao': 0.75,                 'google': 0.9             }                          max_speed_service = max(available_services, key=lambda x: speed_config.get(x, 0.0))             return max_speed_service                  else:             return available_services[0]          def _get_available_services(         self,         source_lang: str,         target_lang: str,         text_length: int     ) -> List[str]:         """获取可用的翻译服务"""         available = []                  # 检查各服务的语言支持和长度限制         service_limits = {             'baidu': {'max_length': 5000, 'languages': ['zh', 'en', 'ja', 'ko', 'fr', 'de', 'ru', 'es']},             'tencent': {'max_length': 2000, 'languages': ['zh', 'en', 'ja', 'ko', 'fr', 'de', 'ru', 'es']},             'aliyun': {'max_length': 5000, 'languages': ['zh', 'en', 'ja', 'ko', 'fr', 'de', 'ru', 'es']},             'youdao': {'max_length': 5000, 'languages': ['zh', 'en', 'ja', 'ko', 'fr', 'de', 'ru', 'es']},             'google': {'max_length': 30000, 'languages': ['zh', 'en', 'ja', 'ko', 'fr', 'de', 'ru', 'es', 'ar', 'pt']}         }                  for service, limits in service_limits.items():             if (source_lang in limits['languages'] and                  target_lang in limits['languages'] and                 text_length <= limits['max_length']):                 available.append(service)                  return available          def _calculate_service_score(self, service: str, text_length: int) -> float:         """计算服务综合得分"""         # 质量权重0.5,成本权重0.3,速度权重0.2         quality_scores = {             'baidu': 0.85,             'tencent': 0.82,             'aliyun': 0.83,             'youdao': 0.80,             'google': 0.92         }                  cost_scores = {             'baidu': 0.001,             'tencent': 0.002,             'aliyun': 0.0015,             'youdao': 0.002,             'google': 0.003         }                  speed_scores = {             'baidu': 0.8,             'tencent': 0.85,             'aliyun': 0.8,             'youdao': 0.75,             'google': 0.9         }                  quality = quality_scores.get(service, 0.0)         cost = 1.0 / cost_scores.get(service, 1.0)  # 成本越低分数越高         speed = speed_scores.get(service, 0.0)                  return quality * 0.5 + cost * 0.3 + speed * 0.2

6.2 术语库管理

class GlossaryManager:     """术语库管理器"""          def __init__(self, redis_client):         self.redis = redis_client         self.glossary_prefix = "glossary:"          def create_glossary(self, glossary_id: str, terms: Dict[str, str]) -> bool:         """创建术语库"""         try:             glossary_key = f"{self.glossary_prefix}{glossary_id}"             self.redis.hset(glossary_key, mapping=terms)             return True         except Exception as e:             print(f"创建术语库失败: {e}")             return False          def get_glossary_terms(self, glossary_id: str) -> Dict[str, str]:         """获取术语库"""         glossary_key = f"{self.glossary_prefix}{glossary_id}"         terms = self.redis.hgetall(glossary_key)         return {k.decode(): v.decode() for k, v in terms.items()} if terms else {}          def apply_glossary(self, text: str, glossary_id: str) -> str:         """应用术语库"""         terms = self.get_glossary_terms(glossary_id)                  for source, target in terms.items():             # 使用正则表达式进行精确匹配             import re             text = re.sub(rf'\b{re.escape(source)}\b', target, text, flags=re.IGNORECASE)                  return text          def batch_apply_glossary(self, texts: List[str], glossary_id: str) -> List[str]:         """批量应用术语库"""         terms = self.get_glossary_terms(glossary_id)         results = []                  for text in texts:             processed_text = self.apply_glossary(text, glossary_id)             results.append(processed_text)                  return results

6.3 翻译质量评估

class TranslationQualityEvaluator:     """翻译质量评估器"""          def __init__(self):         self.metrics = ['accuracy', 'fluency', 'terminology', 'style']          def evaluate(         self,         source_text: str,         translated_text: str,         source_lang: str,         target_lang: str     ) -> float:         """         评估翻译质量                  Args:             source_text: 源文本             translated_text: 译文             source_lang: 源语言             target_lang: 目标语言                  Returns:             质量评分(0-1)         """         if len(source_text.strip()) == 0 or len(translated_text.strip()) == 0:             return 0.0                  # 计算多个维度的评分         accuracy_score = self._calculate_accuracy_score(source_text, translated_text)         fluency_score = self._calculate_fluency_score(translated_text, target_lang)         terminology_score = self._calculate_terminology_score(source_text, translated_text)         style_score = self._calculate_style_score(source_text, translated_text)                  # 加权平均         weights = [0.4, 0.3, 0.2, 0.1]  # 准确性权重最高         scores = [accuracy_score, fluency_score, terminology_score, style_score]                  return sum(score * weight for score, weight in zip(scores, weights))          def _calculate_accuracy_score(self, source: str, target: str) -> float:         """计算准确性评分"""         import re                  # 分词         source_words = re.findall(r'\w+', source.lower())         target_words = re.findall(r'\w+', target.lower())                  if len(source_words) == 0 or len(target_words) == 0:             return 0.0                  # 计算词重叠度         common_words = set(source_words) & set(target_words)         precision = len(common_words) / len(target_words) if len(target_words) > 0 else 0.0         recall = len(common_words) / len(source_words) if len(source_words) > 0 else 0.0                  # F1分数         if precision + recall > 0:             f1_score = 2 * precision * recall / (precision + recall)         else:             f1_score = 0.0                  return min(f1_score * 2, 1.0)  # 归一化到0-1          def _calculate_fluency_score(self, text: str, lang: str) -> float:         """计算流畅性评分"""         if len(text.strip()) == 0:             return 0.0                  import re                  # 检查句子完整性         sentence_endings = ['.', '!', '?', '。', '!', '?']         has_ending = any(text.endswith(char) for char in sentence_endings)                  # 检查句子长度合理性         word_count = len(re.findall(r'\w+', text))         if word_count < 3:             length_score = 0.6         elif word_count < 20:             length_score = 0.8         else:             length_score = 0.9                  # 检查标点符号使用         punctuation_count = len(re.findall(r'[.!?,;:,。!?;:]', text))         if word_count > 0:             punctuation_score = min(punctuation_count / (word_count / 10), 1.0)         else:             punctuation_score = 0.0                  # 综合评分         ending_score = 0.8 if has_ending else 0.6         fluency_score = (ending_score + length_score + punctuation_score) / 3                  return min(fluency_score, 1.0)

七、实战应用场景

7.1 国际化网站翻译

class WebsiteTranslator:     """网站翻译器"""          def __init__(self, api_client, glossary_manager):         self.client = api_client         self.glossary = glossary_manager         self.page_cache = {}          def translate_webpage(         self,         content: str,         source_lang: str,         target_lang: str,         content_type: str = "html"     ) -> str:         """         翻译网页内容                  Args:             content: 网页内容             source_lang: 源语言             target_lang: 目标语言             content_type: 内容类型(html/markdown/text)                  Returns:             翻译后的内容         """         # 检查缓存         cache_key = f"webpage:{hashlib.md5(content.encode()).hexdigest()}:{target_lang}"         if cache_key in self.page_cache:             return self.page_cache[cache_key]                  # 提取文本内容         if content_type == "html":             texts = self._extract_text_from_html(content)         elif content_type == "markdown":             texts = self._extract_text_from_markdown(content)         else:             texts = [content]                  # 批量翻译         translated_texts = []         for text in texts:             if len(text.strip()) > 0:                 # 应用术语库                 text = self.glossary.apply_glossary(text, f"{source_lang}_{target_lang}")                                  # 翻译文本                 result = self.client.translate(text, source_lang, target_lang, "baidu")                 if result.success:                     translated_texts.append(result.translated_text)                 else:                     translated_texts.append(text)  # 翻译失败保留原文             else:                 translated_texts.append(text)                  # 重建内容         if content_type == "html":             translated_content = self._rebuild_html_content(content, texts, translated_texts)         elif content_type == "markdown":             translated_content = self._rebuild_markdown_content(content, texts, translated_texts)         else:             translated_content = '\n'.join(translated_texts)                  # 缓存结果         self.page_cache[cache_key] = translated_content                  return translated_content          def _extract_text_from_html(self, html: str) -> List[str]:         """从HTML中提取文本"""         import re         # 移除script和style标签         clean_html = re.sub(r'<script[^>]*>.*?</script>', '', html, flags=re.DOTALL)         clean_html = re.sub(r'<style[^>]*>.*?</style>', '', clean_html, flags=re.DOTALL)                  # 提取文本内容         texts = []         # 匹配标签内的文本         pattern = r'>([^<]+)<'         matches = re.findall(pattern, clean_html)         for match in matches:             text = match.strip()             if text and len(text) > 0:                 texts.append(text)                  return texts

7.2 文档翻译系统

class DocumentTranslator:     """文档翻译系统"""          def __init__(self, api_client, glossary_manager):         self.client = api_client         self.glossary = glossary_manager          def translate_document(         self,         file_path: str,         source_lang: str,         target_lang: str,         output_path: str = None     ) -> bool:         """         翻译文档                  Args:             file_path: 文档路径             source_lang: 源语言             target_lang: 目标语言             output_path: 输出路径                  Returns:             是否成功         """         import os         from pathlib import Path                  file_ext = Path(file_path).suffix.lower()                  try:             if file_ext == '.txt':                 return self._translate_text_file(file_path, source_lang, target_lang, output_path)             elif file_ext == '.md':                 return self._translate_markdown_file(file_path, source_lang, target_lang, output_path)             else:                 print(f"不支持的文件格式: {file_ext}")                 return False         except Exception as e:             print(f"文档翻译失败: {e}")             return False          def _translate_text_file(         self,         file_path: str,         source_lang: str,         target_lang: str,         output_path: str     ) -> bool:         """翻译文本文件"""         with open(file_path, 'r', encoding='utf-8') as f:             content = f.read()                  # 按段落分割         paragraphs = content.split('\n\n')         translated_paragraphs = []                  for paragraph in paragraphs:             if paragraph.strip():                 result = self.client.translate(paragraph, source_lang, target_lang, "baidu")                 if result.success:                     translated_paragraphs.append(result.translated_text)                 else:                     translated_paragraphs.append(paragraph)  # 翻译失败保留原文             else:                 translated_paragraphs.append('')                  # 写入文件         output_path = output_path or file_path.replace('.txt', f'_{target_lang}.txt')         with open(output_path, 'w', encoding='utf-8') as f:             f.write('\n\n'.join(translated_paragraphs))                  return True

八、故障排查与优化

8.1 常见问题解决

问题1:API限流处理

def translate_with_retry(     self,     text: Union[str, List[str]],     source_lang: str,     target_lang: str,     service: Optional[str] = None,     max_retries: Optional[int] = None,     **kwargs ) -> TranslationResult:     """带重试的翻译"""     if max_retries is None:         max_retries = self.config.get('MAX_RETRIES', 3)          last_exception = None          for attempt in range(max_retries + 1):         try:             result = self.translate(text, source_lang, target_lang, service, **kwargs)                          if result.success:                 return result             elif result.code in [54003, 54004]:  # 频率限制或余额不足                 if attempt < max_retries:                     delay = self.config.get('RETRY_DELAY', 1.0) * (2 ** attempt)                     time.sleep(delay)                     continue             else:                 return result                          except Exception as e:             last_exception = e             if attempt < max_retries:                 delay = self.config.get('RETRY_DELAY', 1.0) * (2 ** attempt)                 time.sleep(delay)             else:                 return TranslationResult(                     success=False,                     code=500,                     message=f"翻译重试失败: {str(e)}",                     data={},                     source_text=text,                     translated_text=[] if isinstance(text, list) else "",                     source_lang=source_lang,                     target_lang=target_lang,                     service=service or self.config.get('TRANSLATION_SERVICE', 'baidu'),                     translation_time=0                 )          return TranslationResult(         success=False,         code=500,         message=f"翻译失败: {str(last_exception)}",         data={},         source_text=text,         translated_text=[] if isinstance(text, list) else "",         source_lang=source_lang,         target_lang=target_lang,         service=service or self.config.get('TRANSLATION_SERVICE', 'baidu'),         translation_time=0     )

问题2:长文本处理

def translate_long_text(     self,     text: str,     source_lang: str,     target_lang: str,     service: Optional[str] = None,     max_length: int = 2000 ) -> TranslationResult:     """翻译长文本(自动分段)"""     if len(text) <= max_length:         return self.translate(text, source_lang, target_lang, service)          # 按句子分割     import re     sentences = re.split(r'[.!?。!?]+', text)          # 合并短句子     segments = []     current_segment = ""     for sentence in sentences:         if len(current_segment) + len(sentence) < max_length:             current_segment += sentence + ". "         else:             if current_segment:                 segments.append(current_segment.strip())             current_segment = sentence + ". "          if current_segment:         segments.append(current_segment.strip())          # 批量翻译     batch_result = self.batch_translate(segments, source_lang, target_lang, service)          if batch_result.success:         translated_text = ' '.join([             result.translated_text for result in batch_result.results              if result.success         ])                  return TranslationResult(             success=True,             code=200,             message="长文本翻译成功",             data={'segments': len(segments)},             source_text=text,             translated_text=translated_text,             source_lang=source_lang,             target_lang=target_lang,             service=service,             translation_time=batch_result.total_time         )     else:         return TranslationResult(             success=False,             code=500,             message="长文本翻译失败",             data={},             source_text=text,             translated_text="",             source_lang=source_lang,             target_lang=target_lang,             service=service,             translation_time=0         )

8.2 性能优化建议

  1. 连接池管理

from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry def create_optimized_session():     """创建优化的会话"""     session = requests.Session()          # 配置重试策略     retry_strategy = Retry(         total=3,         backoff_factor=0.5,         status_forcelist=[429, 500, 502, 503, 504],     )          adapter = HTTPAdapter(         max_retries=retry_strategy,         pool_connections=10,         pool_maxsize=100,         pool_block=False     )          session.mount("https://", adapter)     session.mount("http://", adapter)          return session
  1. 智能缓存策略

class SmartTranslationCache:     """智能翻译缓存"""          def __init__(self, redis_client):         self.redis = redis_client         self.memory_cache = {}         self.memory_ttl = 300  # 5分钟         self.redis_ttl = 86400  # 24小时          def get_translation(self, cache_key: str):         """获取翻译缓存"""         # 检查内存缓存         if cache_key in self.memory_cache:             data, expire_time = self.memory_cache[cache_key]             if time.time() < expire_time):                 return data                  # 检查Redis缓存         if self.redis:             cached = self.redis.get(cache_key)             if cached:                 data = json.loads(cached)                 # 更新内存缓存                 self.memory_cache[cache_key] = (data, time.time() + self.memory_ttl)                 return data                  return None          def set_translation(self, cache_key: str, data: Dict[str, Any], ttl: int = None):         """设置翻译缓存"""         if ttl is None:             ttl = self.redis_ttl                  # 设置内存缓存         self.memory_cache[cache_key] = (data, time.time() + self.memory_ttl)                  # 设置Redis缓存         if self.redis:             self.redis.setex(cache_key, ttl, json.dumps(data))

九、最佳实践总结

9.1 安全实践

  1. 密钥管理:使用环境变量存储API密钥

  2. HTTPS强制:确保所有请求使用HTTPS

  3. 输入验证:验证所有输入参数

  4. 错误处理:不暴露敏感错误信息

9.2 性能实践

  1. 缓存策略:根据文本内容设置合适的缓存时间

  2. 批量操作:合并多个请求减少API调用次数

  3. 异步处理:使用异步IO提高并发性能

  4. 连接复用:使用连接池减少连接建立开销

9.3 业务实践

  1. 术语库管理:维护专业术语库提高翻译质量

  2. 质量评估:定期评估翻译质量并优化策略

  3. 服务选择:根据业务场景选择合适的翻译服务

  4. 成本控制:监控翻译成本并设置限制


附录:快速开始模板

# quick_start.py from translation_api import TranslationAPI, TranslationConfig import redis # 1. 初始化客户端 redis_client = redis.Redis.from_url(TranslationConfig.REDIS_URL) api = TranslationAPI(TranslationConfig, redis_client) # 2. 简单翻译 result = api.translate(     text="Hello, world! This is a translation example.",     source_lang="en",     target_lang="zh",     service="baidu" ) if result.success:     print(f"原文: {result.source_text}")     print(f"译文: {result.translated_text}") # 3. 批量翻译 texts = [     "Good morning!",     "How are you?",     "Have a nice day!",     "Thank you very much!" ] batch_result = api.batch_translate(texts, "en", "zh", "baidu") if batch_result.success:     for i, result in enumerate(batch_result.results):         if result.success:             print(f"文本{i+1}: {result.translated_text}") # 4. 带重试的翻译 result = api.translate_with_retry(     text="The quick brown fox jumps over the lazy dog.",     source_lang="en",     target_lang="zh",     service="baidu",     max_retries=3 )

通过本攻略,您应该能够:

  • 理解翻译接口的完整功能和参数配置

  • 实现多种翻译服务的标准化对接

  • 处理各种错误情况和性能优化

  • 构建高性能的翻译应用系统

  • 在实际业务中灵活应用翻译接口

建议根据实际业务需求选择合适的实现方案,并遵循最佳实践确保系统的稳定性和可维护性。


群贤毕至

访客