import datetime
import time
from typing import Any, Dict, List, Optional

import requests
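

# A small helper for reporting long-running task progress to MCP Hive over a
# webhook callback. It sends the two event shapes used in this example:
# "status-update" for progress messages and "artifact-update" for results.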
class MCPHiveProgressUpdater:
def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

def send_status_update(
self,
message: str,
state: str,
is_final: bool = False,
metadata: Optional[Dict[str, Any]] = None
) -> bool:
"""Send a status update to MCP Hive"""
event = {
"kind": "status-update",
"final": is_final,
"metadata": metadata or {},
"status": {
"message": {
"parts": [
{
"kind": "text",
"text": message
}
]
},
"state": state,
"timestamp": datetime.datetime.utcnow().isoformat() + "Z"
}
}
        return self._post_event(event)

    def _post_event(self, event: Dict[str, Any]) -> bool:
        """POST a single event to the webhook; True on a non-error response."""
        try:
            response = requests.post(
                self.webhook_url,
                json=event,  # requests sets the Content-Type header for us
                timeout=30
            )
            return response.ok
        except requests.RequestException as e:
            print(f"Failed to send event: {e}")
            return False

def send_artifact(
self,
name: str,
        parts: List[Dict[str, Any]],
description: Optional[str] = None,
is_last_chunk: bool = True,
append: bool = False,
metadata: Optional[Dict[str, Any]] = None
) -> bool:
"""Send an artifact update to MCP Hive"""
        artifact: Dict[str, Any] = {
            "name": name,
            "parts": parts
        }
        if description:
            artifact["description"] = description
        if metadata:
            artifact["metadata"] = metadata
event = {
"kind": "artifact-update",
"append": append,
"lastChunk": is_last_chunk,
"artifact": artifact
}
        return self._post_event(event)

# Usage Example
def process_large_dataset(data_file: str, webhook_url: str):
"""Example long-running data processing function"""
updater = MCPHiveProgressUpdater(webhook_url)
# Initial status
updater.send_status_update("Starting data processing...", "working")
try:
# Simulate processing with progress updates
total_records = 10000
batch_size = 1000
batches = total_records // batch_size
for i in range(batches):
# Your actual processing logic here
time.sleep(2) # Simulate work
# Send progress update
progress = (i + 1) / batches * 100
updater.send_status_update(
f"Processed {(i+1)*batch_size} of {total_records} records ({progress:.1f}%)",
"working",
metadata={
"progress": progress,
"recordsProcessed": (i+1)*batch_size,
"totalRecords": total_records
}
)
# Send results as artifact
results = {
"summary": {
"totalProcessed": total_records,
"anomaliesDetected": 42,
"processingTime": 20.5
},
"findings": [
{"kind": "anomaly", "severity": "high", "count": 15},
{"kind": "warning", "severity": "medium", "count": 27}
]
}
updater.send_artifact(
name="Processing Results",
description=f"Analysis results for {data_file}",
parts=[
{
"kind": "text",
"text": f"Successfully processed {total_records} records. Found 42 anomalies."
},
{
"kind": "data",
"data": results
}
]
)
# Final status
updater.send_status_update(
"Data processing completed successfully",
"completed",
is_final=True,
metadata={"totalProcessingTime": 20.5}
)
    except Exception as e:
        # Report the failure to MCP Hive, then re-raise so the caller sees it
        updater.send_status_update(
            f"Processing failed: {e}",
            "failed",
            is_final=True,
            metadata={"error": str(e)}
        )
        raise
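

# A minimal way to run the example end to end. The file name and webhook URL
# below are placeholders; substitute the callback URL MCP Hive provides for
# your task.
if __name__ == "__main__":
    process_large_dataset(
        data_file="dataset.csv",  # hypothetical input file
        webhook_url="https://hive.example.com/webhooks/task-123"  # placeholder
    )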