API 调用示例
概述
本文档提供了 AutoBoxAPI 的详细调用示例,包括多种编程语言的实现方式、返回结果解析以及常见错误处理。
基础配置
在开始之前,请确保您已经:
- 获得有效的 API_KEY
- 了解 API 基础 URL: https://your-domain.com/v1
- 准备好要调用的应用 ID
CURL 示例
1. AutoBox 同步任务
# Basic synchronous call.
# ak-your32characterapikeyhere is a placeholder: substitute your own API key
# and never commit or publish a real key in documentation or scripts.
curl -X 'POST' \
  'https://app.leafmove.com/v1/autobox/' \
  -H 'accept: application/json' \
  -H 'Authorization: Bearer ak-your32characterapikeyhere' \
  -H 'Content-Type: application/json' \
  -d '{
"app_id": "B202408261119",
"api_key": "ak-your32characterapikeyhere",
"wait_response": false,
"call_params": {}
}'
成功响应:
{
"id": "chatcmpl-8abc123def456ghi789jkl",
"object": "chat.completion",
"created": 1677652288,
"model": "gpt-3.5-turbo",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "你好!我是AutoBox AI助手,很高兴为您服务。我可以帮助您处理各种任务,包括文本生成、问答、翻译等。有什么我可以帮助您的吗?"
},
"finish_reason": "stop"
}
],
"usage": {
"prompt_tokens": 15,
"completion_tokens": 45,
"total_tokens": 60
}
}
2. 流式响应调用
# Streaming call: POST to the /sync endpoint with "stream": true in the body.
# --no-buffer turns off curl's output buffering.
curl --request POST "https://your-domain.com/v1/autobox/your_app_id/sync" \
  --header "Authorization: Bearer ak-your32characterapikeyhere" \
  --header "Content-Type: application/json" \
  --no-buffer \
  --data '{
"messages": [
{"role": "user", "content": "写一首关于春天的诗"}
],
"model": "gpt-3.5-turbo",
"stream": true,
"temperature": 0.8
}'
流式响应:
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-3.5-turbo","choices":[{"index":0,"delta":{"content":"春"},"finish_reason":null}]}
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-3.5-turbo","choices":[{"index":0,"delta":{"content":"风"},"finish_reason":null}]}
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-3.5-turbo","choices":[{"index":0,"delta":{"content":"轻"},"finish_reason":null}]}
data: [DONE]
3. 异步任务调用
# Asynchronous call: POST the chat payload to the /async endpoint.
curl --request POST "https://your-domain.com/v1/autobox/your_app_id/async" \
  --header "Authorization: Bearer ak-your32characterapikeyhere" \
  --header "Content-Type: application/json" \
  --data '{
"messages": [
{"role": "user", "content": "分析这份长文档的主要内容..."}
],
"model": "gpt-4",
"temperature": 0.3
}'
异步响应:
{
"detail": "任务已提交",
"task_number": "TASK_20240101120000_ABC123",
"status": "等待中",
"estimated_time": "预计2-5分钟完成"
}
Python 示例
1. 基础同步调用
import requests
import json
class AutoNavClient:
    """Small client for the synchronous AutoBox chat endpoint.

    Sends ``POST {base_url}/autobox/{app_id}/sync`` with a bearer-token
    ``Authorization`` header and returns the decoded JSON response.
    """

    def __init__(self, api_key, base_url="https://your-domain.com/v1", timeout=60):
        """Store the credentials and build the shared request headers.

        Args:
            api_key: API key, sent as ``Authorization: Bearer <api_key>``.
            base_url: API root that endpoint paths are appended to.
            timeout: Timeout in seconds handed to ``requests`` for each call.
        """
        self.api_key = api_key
        self.base_url = base_url
        # Without a timeout, requests waits forever on an unresponsive server.
        self.timeout = timeout
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }

    def chat_completion(self, app_id, messages, model="gpt-3.5-turbo", **kwargs):
        """Run a synchronous chat completion and return the parsed JSON body.

        Args:
            app_id: Application ID placed in the URL path.
            messages: Chat messages, sent under the ``messages`` key.
            model: Model name, sent under the ``model`` key.
            **kwargs: Extra payload fields (they override the keys above).

        Raises:
            requests.exceptions.RequestException: On network failure, timeout
                or a non-success HTTP status; details are printed first.
        """
        url = f"{self.base_url}/autobox/{app_id}/sync"
        payload = {
            "messages": messages,
            "model": model,
            **kwargs,
        }
        try:
            response = requests.post(
                url, headers=self.headers, json=payload, timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"请求失败: {e}")
            # Connection-level failures carry no response object.
            if getattr(e, "response", None) is not None:
                print(f"错误详情: {e.response.text}")
            raise
# Usage example
client = AutoNavClient("ak-your32characterapikeyhere")
messages = [{"role": "user", "content": "请解释什么是机器学习"}]

try:
    result = client.chat_completion(
        "your_app_id",
        messages,
        temperature=0.7,
        max_tokens=1000,
    )
    reply = result["choices"][0]["message"]["content"]
    print("AI回复:", reply)
    print("Token使用:", result["usage"])
except Exception as e:
    print(f"调用失败: {e}")
2. 流式响应处理
import requests
import json
def stream_chat_completion(api_key, app_id, messages,
                           base_url="https://your-domain.com/v1", timeout=60):
    """Stream a chat completion, echoing text as it arrives.

    Posts to ``{base_url}/autobox/{app_id}/sync`` with ``"stream": True`` and
    reads the response line by line. Lines starting with ``data: `` carry a
    JSON chunk; ``data: [DONE]`` ends the stream.

    Args:
        api_key: API key, sent as ``Authorization: Bearer <api_key>``.
        app_id: Application ID placed in the URL path.
        messages: Chat messages, sent under the ``messages`` key.
        base_url: API root that the endpoint path is appended to.
        timeout: Timeout in seconds handed to ``requests``.

    Returns:
        The concatenation of every ``delta.content`` fragment received.

    Raises:
        requests.exceptions.RequestException: On network failure, timeout or
            a non-success HTTP status; the error is printed first.
    """
    url = f"{base_url}/autobox/{app_id}/sync"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    payload = {
        "messages": messages,
        "model": "gpt-3.5-turbo",
        "stream": True,
        "temperature": 0.7,
    }
    parts = []
    try:
        # The context manager releases the connection even when we stop
        # reading early (on [DONE]) or an exception is raised mid-stream.
        with requests.post(url, headers=headers, json=payload,
                           stream=True, timeout=timeout) as response:
            response.raise_for_status()
            for raw_line in response.iter_lines():
                if not raw_line:
                    continue
                line = raw_line.decode('utf-8')
                if not line.startswith('data: '):
                    continue
                data = line[6:]  # strip the 'data: ' prefix
                if data == '[DONE]':
                    break
                try:
                    chunk = json.loads(data)
                except json.JSONDecodeError:
                    continue
                # Skip chunks that carry no choices instead of crashing.
                choices = chunk.get('choices') if isinstance(chunk, dict) else None
                if not choices:
                    continue
                content = choices[0].get('delta', {}).get('content')
                if content:
                    parts.append(content)
                    print(content, end='', flush=True)
        print()  # finish the echoed line
        return "".join(parts)
    except requests.exceptions.RequestException as e:
        print(f"流式请求失败: {e}")
        raise
# Usage example
messages = [
    {"role": "user", "content": "写一个Python快速排序的实现"},
]
content = stream_chat_completion(
    "ak-your32characterapikeyhere",
    "your_app_id",
    messages,
)
3. 异步任务处理
import time
import requests
class AsyncTaskHandler:
    """Submit asynchronous AutoBox tasks and poll them until they finish."""

    def __init__(self, api_key, base_url="https://your-domain.com/v1", timeout=30):
        """Store the credentials and build the shared request headers.

        Args:
            api_key: API key, sent as ``Authorization: Bearer <api_key>``.
            base_url: API root that endpoint paths are appended to.
            timeout: Timeout in seconds handed to ``requests`` for each call.
        """
        self.api_key = api_key
        self.base_url = base_url
        # Without a timeout, one stalled request would block polling forever.
        self.timeout = timeout
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }

    def submit_async_task(self, app_id, messages, **kwargs):
        """POST a task to ``/autobox/{app_id}/async`` and return the JSON reply.

        Extra keyword arguments are merged into the request payload.
        Raises ``requests.exceptions.HTTPError`` on a non-success status.
        """
        url = f"{self.base_url}/autobox/{app_id}/async"
        payload = {
            "messages": messages,
            **kwargs,
        }
        response = requests.post(
            url, headers=self.headers, json=payload, timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def get_task_status(self, task_number):
        """GET ``/task-query/{task_number}`` and return the JSON reply.

        Raises ``requests.exceptions.HTTPError`` on a non-success status.
        """
        url = f"{self.base_url}/task-query/{task_number}"
        response = requests.get(url, headers=self.headers, timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def wait_for_completion(self, task_number, max_wait=300, poll_interval=5):
        """Poll the task until it completes, fails, or ``max_wait`` elapses.

        Args:
            task_number: Identifier passed to ``get_task_status``.
            max_wait: Maximum number of seconds to keep polling.
            poll_interval: Seconds to sleep between polls.

        Returns:
            The status dict whose ``status`` is ``"已完成"``.

        Raises:
            Exception: When ``status`` is ``"已报错"``; the message includes
                the ``comments`` field if present.
            TimeoutError: When the task is still unfinished after ``max_wait``.
        """
        # A monotonic clock keeps the deadline correct if the wall clock jumps.
        deadline = time.monotonic() + max_wait
        while time.monotonic() < deadline:
            status = self.get_task_status(task_number)
            state = status["status"]
            if state == "已完成":
                return status
            if state == "已报错":
                raise Exception(f"任务执行失败: {status.get('comments', '未知错误')}")
            print(f"任务状态: {state}, 进度: {status.get('task_progress', 'N/A')}")
            time.sleep(poll_interval)
        raise TimeoutError("任务等待超时")
# Usage example
handler = AsyncTaskHandler("ak-your32characterapikeyhere")

# Submit a long-running task
messages = [{"role": "user", "content": "请详细分析这个复杂的数据集..."}]
task_result = handler.submit_async_task("your_app_id", messages, model="gpt-4")
task_number = task_result["task_number"]
print(f"任务已提交: {task_number}")

# Block until the task finishes
try:
    final_result = handler.wait_for_completion(task_number)
    print("任务完成:", final_result["task_result"])
except Exception as e:
    print(f"任务失败: {e}")
JavaScript 示例
1. 基础调用 (Node.js)
const axios = require('axios');
/**
 * Node.js client for the AutoBox /sync endpoint (regular and streaming calls).
 */
class AutoNavClient {
  /**
   * @param {string} apiKey  API key, sent as "Authorization: Bearer <apiKey>".
   * @param {string} [baseUrl]  API root that endpoint paths are appended to.
   */
  constructor(apiKey, baseUrl = 'https://your-domain.com/v1') {
    this.apiKey = apiKey;
    this.baseUrl = baseUrl;
    this.headers = {
      'Authorization': `Bearer ${apiKey}`,
      'Content-Type': 'application/json'
    };
  }

  /**
   * Run a non-streaming chat completion.
   * @param {string} appId  Application ID placed in the URL path.
   * @param {Array<object>} messages  Chat messages.
   * @param {object} [options]  Extra payload fields (override the defaults).
   * @returns {Promise<object>} The response body returned by axios.
   * @throws Re-throws the axios error after logging it.
   */
  async chatCompletion(appId, messages, options = {}) {
    const url = `${this.baseUrl}/autobox/${appId}/sync`;
    const payload = {
      messages,
      model: 'gpt-3.5-turbo',
      ...options
    };
    try {
      const response = await axios.post(url, payload, { headers: this.headers });
      return response.data;
    } catch (error) {
      console.error('API调用失败:', error.response?.data || error.message);
      throw error;
    }
  }

  /**
   * Run a streaming chat completion, calling onChunk for each text fragment.
   * @param {string} appId  Application ID placed in the URL path.
   * @param {Array<object>} messages  Chat messages.
   * @param {(text: string) => void} onChunk  Called with each delta.content.
   * @param {object} [options]  Extra payload fields (override the defaults).
   * @returns {Promise<string>} All fragments concatenated.
   * @throws Re-throws request, stream or onChunk errors after logging them.
   */
  async streamChatCompletion(appId, messages, onChunk, options = {}) {
    const url = `${this.baseUrl}/autobox/${appId}/sync`;
    const payload = {
      messages,
      model: 'gpt-3.5-turbo',
      stream: true,
      ...options
    };
    try {
      const response = await axios.post(url, payload, {
        headers: this.headers,
        responseType: 'stream'
      });
      const stream = response.data;
      // Decode on the stream so a multi-byte UTF-8 character split across
      // two network chunks is not turned into replacement characters.
      stream.setEncoding('utf8');

      let fullContent = '';
      let pending = ''; // trailing partial line carried over to the next chunk
      let finished = false; // set once "data: [DONE]" has been seen

      const handleLine = (rawLine) => {
        const line = rawLine.replace(/\r$/, '');
        if (finished || !line.startsWith('data: ')) return;
        const data = line.slice(6);
        if (data === '[DONE]') {
          finished = true;
          return;
        }
        let parsed;
        try {
          parsed = JSON.parse(data);
        } catch (e) {
          return; // ignore lines that are not valid JSON
        }
        const content = parsed?.choices?.[0]?.delta?.content;
        if (content) {
          fullContent += content;
          onChunk(content);
        }
      };

      // Awaiting here routes stream errors through the catch block below.
      return await new Promise((resolve, reject) => {
        stream.on('data', (chunk) => {
          try {
            pending += chunk;
            const lines = pending.split('\n');
            pending = lines.pop(); // last piece may be an incomplete line
            lines.forEach(handleLine);
          } catch (err) {
            // An onChunk failure must reject the call, not crash the process.
            stream.destroy();
            reject(err);
          }
        });
        stream.on('end', () => {
          try {
            if (pending) handleLine(pending);
            resolve(fullContent);
          } catch (err) {
            reject(err);
          }
        });
        stream.on('error', reject);
      });
    } catch (error) {
      console.error('流式调用失败:', error.response?.data || error.message);
      throw error;
    }
  }
}
// Usage example
/**
 * Runs one regular call and one streaming call against the same prompt,
 * logging the reply, the token usage and the streamed text.
 */
async function main() {
  const client = new AutoNavClient('ak-your32characterapikeyhere');
  const messages = [{ role: 'user', content: '请介绍一下JavaScript的异步编程' }];
  const requestOptions = { temperature: 0.7, max_tokens: 1000 };
  const writeChunk = (chunk) => {
    process.stdout.write(chunk);
  };

  try {
    // Regular call
    const result = await client.chatCompletion('your_app_id', messages, requestOptions);
    console.log('AI回复:', result.choices[0].message.content);
    console.log('Token使用:', result.usage);

    // Streaming call
    console.log('\n流式响应:');
    await client.streamChatCompletion('your_app_id', messages, writeChunk);
  } catch (error) {
    console.error('调用失败:', error.message);
  }
}

main();
2. 浏览器端调用
/**
 * Browser client for the AutoBox /sync endpoint, built on fetch().
 */
class AutoNavWebClient {
  /**
   * @param {string} apiKey  API key, sent as "Authorization: Bearer <apiKey>".
   * @param {string} [baseUrl]  API root that endpoint paths are appended to.
   */
  constructor(apiKey, baseUrl = 'https://your-domain.com/v1') {
    this.apiKey = apiKey;
    this.baseUrl = baseUrl;
  }

  /**
   * POST a JSON payload to /autobox/{appId}/sync.
   * @returns {Promise<Response>} The response when response.ok is true.
   * @throws {Error} With the server's "detail" (or '请求失败') otherwise.
   */
  async _post(appId, payload) {
    const response = await fetch(`${this.baseUrl}/autobox/${appId}/sync`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.apiKey}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify(payload)
    });
    if (!response.ok) {
      let detail;
      try {
        detail = (await response.json()).detail;
      } catch (e) {
        // The error body was not JSON; use the generic message below.
      }
      // "detail" may be a string or a structured list of validation errors.
      const message = typeof detail === 'string'
        ? detail
        : (detail ? JSON.stringify(detail) : '');
      throw new Error(message || '请求失败');
    }
    return response;
  }

  /**
   * Run a non-streaming chat completion.
   * @param {string} appId  Application ID placed in the URL path.
   * @param {Array<object>} messages  Chat messages.
   * @param {object} [options]  Extra payload fields (override the defaults).
   * @returns {Promise<object>} The parsed JSON response.
   * @throws Re-throws any failure after logging it.
   */
  async chatCompletion(appId, messages, options = {}) {
    const payload = {
      messages,
      model: 'gpt-3.5-turbo',
      ...options
    };
    try {
      const response = await this._post(appId, payload);
      return await response.json();
    } catch (error) {
      console.error('API调用失败:', error.message);
      throw error;
    }
  }

  /**
   * Run a streaming chat completion, calling onChunk for each text fragment.
   * @param {string} appId  Application ID placed in the URL path.
   * @param {Array<object>} messages  Chat messages.
   * @param {(text: string) => void} onChunk  Called with each delta.content.
   * @param {object} [options]  Extra payload fields (override the defaults).
   * @returns {Promise<string>} All fragments concatenated.
   * @throws Re-throws request, stream or onChunk errors after logging them.
   */
  async streamChatCompletion(appId, messages, onChunk, options = {}) {
    const payload = {
      messages,
      model: 'gpt-3.5-turbo',
      stream: true,
      ...options
    };
    try {
      const response = await this._post(appId, payload);
      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let fullContent = '';
      let pending = ''; // trailing partial line carried over to the next read

      while (true) {
        const { done, value } = await reader.read();
        // stream:true keeps a multi-byte character that is split across two
        // reads intact; the final argument-less decode() flushes the decoder.
        pending += done ? decoder.decode() : decoder.decode(value, { stream: true });
        const lines = pending.split('\n');
        pending = done ? '' : lines.pop();

        for (const rawLine of lines) {
          const line = rawLine.replace(/\r$/, '');
          if (!line.startsWith('data: ')) continue;
          const data = line.slice(6);
          if (data === '[DONE]') {
            if (!done) await reader.cancel(); // stop reading the body
            return fullContent;
          }
          let parsed;
          try {
            parsed = JSON.parse(data);
          } catch (e) {
            continue; // ignore lines that are not valid JSON
          }
          const content = parsed?.choices?.[0]?.delta?.content;
          if (content) {
            fullContent += content;
            onChunk(content);
          }
        }
        if (done) break;
      }
      return fullContent;
    } catch (error) {
      console.error('流式调用失败:', error.message);
      throw error;
    }
  }
}
// Usage example
const client = new AutoNavWebClient('ak-your32characterapikeyhere');

// On click, send the input text and show the streamed reply as it arrives.
document.getElementById('sendButton').addEventListener('click', async () => {
  const input = document.getElementById('messageInput');
  const output = document.getElementById('output');
  const messages = [
    { role: 'user', content: input.value }
  ];
  try {
    // textContent (not innerHTML): model output and error text are untrusted
    // and must be displayed as plain text, never parsed as HTML.
    output.textContent = '正在思考...';
    // Show the reply incrementally
    let content = '';
    await client.streamChatCompletion('your_app_id', messages, (chunk) => {
      content += chunk;
      output.textContent = content;
    });
  } catch (error) {
    output.textContent = `错误: ${error.message}`;
  }
});
Java 示例
import java.io.*;
import java.net.http.*;
import java.net.URI;
import java.time.Duration;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.JsonNode;
/**
 * Client for the synchronous AutoBox chat endpoint, built on
 * {@link java.net.http.HttpClient} and Jackson.
 */
public class AutoNavClient {
    private final String apiKey;
    private final String baseUrl;
    private final HttpClient httpClient;
    private final ObjectMapper objectMapper;

    /**
     * @param apiKey  API key, sent as {@code Authorization: Bearer <apiKey>}
     * @param baseUrl API root; {@code null} selects the default
     *                {@code https://your-domain.com/v1}
     */
    public AutoNavClient(String apiKey, String baseUrl) {
        this.apiKey = apiKey;
        this.baseUrl = baseUrl != null ? baseUrl : "https://your-domain.com/v1";
        this.httpClient = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(30))
                .build();
        this.objectMapper = new ObjectMapper();
    }

    /**
     * Sends a non-streaming chat completion request.
     *
     * @param appId       application ID placed in the URL path
     * @param messages    JSON array of chat messages
     * @param model       model name
     * @param temperature sampling temperature
     * @param maxTokens   value sent as {@code max_tokens}
     * @return the parsed JSON response body
     * @throws IOException          on I/O or JSON processing failure
     * @throws InterruptedException if the request is interrupted
     * @throws RuntimeException     if the HTTP status is not 200
     */
    public JsonNode chatCompletion(String appId, JsonNode messages, String model, double temperature, int maxTokens)
            throws IOException, InterruptedException {
        String url = baseUrl + "/autobox/" + appId + "/sync";

        // Build the body with Jackson rather than String.format: "%f" uses the
        // default locale (a decimal comma yields invalid JSON) and "%s" does
        // not JSON-escape the model name.
        var body = objectMapper.createObjectNode();
        body.set("messages", messages);
        body.put("model", model);
        body.put("temperature", temperature);
        body.put("max_tokens", maxTokens);
        body.put("stream", false);
        String requestBody = objectMapper.writeValueAsString(body);

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(url))
                // Bound the whole exchange, not only the connect phase.
                .timeout(Duration.ofSeconds(60))
                .header("Authorization", "Bearer " + apiKey)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(requestBody))
                .build();

        HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new RuntimeException("API调用失败: " + response.statusCode() + " - " + response.body());
        }
        return objectMapper.readTree(response.body());
    }

    // Usage example
    public static void main(String[] args) {
        try {
            AutoNavClient client = new AutoNavClient("ak-your32characterapikeyhere", null);

            // Build the message list
            ObjectMapper mapper = new ObjectMapper();
            JsonNode messages = mapper.readTree("""
                    [
                      {"role": "user", "content": "请用Java写一个简单的HTTP客户端"}
                    ]
                    """);

            JsonNode result = client.chatCompletion("your_app_id", messages, "gpt-3.5-turbo", 0.7, 1000);
            System.out.println("AI回复: " + result.get("choices").get(0).get("message").get("content").asText());
            System.out.println("Token使用: " + result.get("usage"));
        } catch (Exception e) {
            System.err.println("调用失败: " + e.getMessage());
            e.printStackTrace();
        }
    }
}
PHP 示例
<?php
/**
 * cURL-based client for the AutoBox /sync and /async endpoints.
 */
class AutoNavClient {
    private $apiKey;
    private $baseUrl;

    /**
     * @param string $apiKey  API key, sent as "Authorization: Bearer <apiKey>".
     * @param string $baseUrl API root that endpoint paths are appended to.
     */
    public function __construct($apiKey, $baseUrl = 'https://your-domain.com/v1') {
        $this->apiKey = $apiKey;
        $this->baseUrl = $baseUrl;
    }

    /**
     * Run a non-streaming chat completion (60-second timeout).
     *
     * @param string $appId    Application ID placed in the URL path.
     * @param array  $messages Chat messages.
     * @param array  $options  Extra payload fields (override the defaults).
     * @return mixed Decoded JSON response (associative array).
     * @throws Exception On a cURL error or a non-200 HTTP status.
     */
    public function chatCompletion($appId, $messages, $options = []) {
        $payload = array_merge([
            'messages' => $messages,
            'model' => 'gpt-3.5-turbo',
            'stream' => false
        ], $options);

        return $this->postJson($this->baseUrl . "/autobox/{$appId}/sync", $payload, 60, 'API调用失败');
    }

    /**
     * Submit an asynchronous task (30-second timeout).
     *
     * @param string $appId    Application ID placed in the URL path.
     * @param array  $messages Chat messages.
     * @param array  $options  Extra payload fields (override the defaults).
     * @return mixed Decoded JSON response (associative array).
     * @throws Exception On a cURL error or a non-200 HTTP status.
     */
    public function asyncTask($appId, $messages, $options = []) {
        $payload = array_merge([
            'messages' => $messages,
            'model' => 'gpt-4'
        ], $options);

        return $this->postJson($this->baseUrl . "/autobox/{$appId}/async", $payload, 30, '异步任务提交失败');
    }

    /**
     * POST $payload as JSON with the bearer token and decode the JSON reply.
     *
     * TLS certificate verification is left at cURL's default (enabled);
     * turning it off would expose the API key to man-in-the-middle attacks.
     *
     * @param string $url          Full endpoint URL.
     * @param array  $payload      Request body, JSON-encoded before sending.
     * @param int    $timeout      CURLOPT_TIMEOUT in seconds.
     * @param string $failureLabel Prefix of the message for non-200 replies.
     * @return mixed Decoded JSON response (associative array).
     * @throws Exception On a cURL error or a non-200 HTTP status.
     */
    private function postJson($url, array $payload, $timeout, $failureLabel) {
        $headers = [
            'Authorization: Bearer ' . $this->apiKey,
            'Content-Type: application/json'
        ];

        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $url,
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => json_encode($payload),
            CURLOPT_HTTPHEADER => $headers,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_TIMEOUT => $timeout
        ]);
        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        $error = curl_error($ch);
        curl_close($ch);

        // Report transport failures as such instead of as "HTTP 0".
        if ($response === false || $error) {
            throw new Exception("CURL错误: " . $error);
        }
        if ($httpCode !== 200) {
            throw new Exception("{$failureLabel}: HTTP {$httpCode} - {$response}");
        }
        return json_decode($response, true);
    }
}
// Usage example
try {
    $client = new AutoNavClient('ak-your32characterapikeyhere');
    $messages = [
        ['role' => 'user', 'content' => '请用PHP写一个简单的API客户端'],
    ];

    // Synchronous call
    $options = ['temperature' => 0.7, 'max_tokens' => 1000];
    $result = $client->chatCompletion('your_app_id', $messages, $options);
    echo "AI回复: ", $result['choices'][0]['message']['content'], "\n";
    echo "Token使用: ", json_encode($result['usage']), "\n";

    // Asynchronous call
    $asyncResult = $client->asyncTask('your_app_id', $messages);
    echo "异步任务已提交: ", $asyncResult['task_number'], "\n";
} catch (Exception $e) {
    echo "调用失败: ", $e->getMessage(), "\n";
}
?>
常见错误处理示例
1. 认证错误
{
"detail": "api_key验证失败:该APIKEY不存在,验证失败"
}
处理方式:
# Handle authentication failures (HTTP 401 / 403) from chat_completion.
try:
    result = client.chat_completion(app_id, messages)
except requests.exceptions.HTTPError as e:
    status_code = e.response.status_code
    if status_code == 401:
        print("API_KEY无效,请检查密钥是否正确")
    elif status_code == 403:
        print("访问被拒绝,请检查账户状态")
    else:
        # Not an authentication problem: propagate instead of hiding it.
        raise
2. 配额不足错误
{
"detail": "配额不足,无法继续调用"
}
处理方式:
# Handle HTTP 429: either the quota is exhausted or the caller is rate-limited.
try:
    result = client.chat_completion(app_id, messages)
except requests.exceptions.HTTPError as e:
    if e.response.status_code != 429:
        # Not a quota / rate-limit problem: propagate instead of hiding it.
        raise
    try:
        error_detail = e.response.json().get('detail', '')
    except ValueError:
        # The error body was not JSON; treat it as a plain rate limit.
        error_detail = ''
    if '配额不足' in error_detail:
        print("配额已用完,请充值后继续使用")
    else:
        print("请求过于频繁,请稍后重试")
3. 参数验证错误
{
"detail": [
{
"loc": ["body", "temperature"],
"msg": "ensure this value is less than or equal to 2",
"type": "value_error.number.not_le"
}
]
}
处理方式:
# Handle HTTP 422: print one line per invalid parameter.
try:
    result = client.chat_completion(app_id, messages, temperature=3.0)  # deliberately invalid value
except requests.exceptions.HTTPError as e:
    if e.response.status_code != 422:
        # Not a validation problem: propagate instead of hiding it.
        raise
    detail = e.response.json().get('detail', [])
    # "detail" is a list of error objects here, but other error responses in
    # this guide use a plain string; normalise so both shapes are reported.
    errors = detail if isinstance(detail, list) else [{"msg": str(detail)}]
    for error in errors:
        loc = error.get('loc') or ['未知']  # guard against a missing/empty loc
        field = loc[-1]
        message = error.get('msg', '')
        print(f"参数 {field} 错误: {message}")
性能优化建议
1. 连接复用
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# One Session reuses TCP/TLS connections across requests.
session = requests.Session()

# Retry policy.
# urllib3 only retries idempotent methods on status codes by default, so POST
# must be listed explicitly or this policy never applies to session.post().
# NOTE: a retried POST may be processed twice by the server; only keep POST
# here if a duplicate submission is acceptable.
retry_strategy = Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[429, 500, 502, 503, 504],
    allowed_methods=["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE", "POST"],
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)

# Send requests through the session; url, headers and payload are built the
# same way as in the Python examples earlier in this guide.
response = session.post(url, headers=headers, json=payload, timeout=60)
2. 异步处理
import asyncio
import aiohttp
# Placeholder key; replace with your own. Defined here because the functions
# below previously referenced an api_key name that was never assigned.
API_KEY = "ak-your32characterapikeyhere"


async def async_chat_completion(session, app_id, messages, api_key=API_KEY):
    """POST one chat request on a shared aiohttp session; return parsed JSON.

    Args:
        session: An open ``aiohttp.ClientSession``.
        app_id: Application ID placed in the URL path.
        messages: Chat messages, sent under the ``messages`` key.
        api_key: API key, sent as ``Authorization: Bearer <api_key>``.
    """
    url = f"https://your-domain.com/v1/autobox/{app_id}/sync"
    payload = {
        "messages": messages,
        "model": "gpt-3.5-turbo",
    }
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    async with session.post(url, json=payload, headers=headers) as response:
        return await response.json()


async def batch_requests(api_key=API_KEY):
    """Send ten requests concurrently and return their results in order."""
    async with aiohttp.ClientSession() as session:
        tasks = [
            async_chat_completion(
                session,
                "your_app_id",
                [{"role": "user", "content": f"请求 {i}"}],
                api_key,
            )
            for i in range(10)
        ]
        return await asyncio.gather(*tasks)


# Run the batch
results = asyncio.run(batch_requests())
这些示例涵盖了 AutoBoxAPI 的主要使用场景和编程语言实现。根据您的具体需求,可以参考相应的示例进行集成和开发。
