Build AI agents using OpenAI's Realtime API
In this tutorial, we will show you how to build an app in Databutton that leverages OpenAI's Realtime API.
The Realtime API is a WebSocket API, and it is not easy to set up securely from the front end: the connection has to be authenticated with your OpenAI API key, which should never be exposed in the browser. Instead, we will proxy the API through the Databutton backend, which holds the key as a secret and relays messages in both directions.
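To make the flow concrete, here is a minimal sketch (not part of the tutorial code) of how the browser will talk to that proxy once it exists. It assumes the backend exposes its WebSocket at the app's API URL plus "/ws", which is exactly what the API and Home page below set up; the instructions string is just an example.

// Minimal sketch: the browser connects to our backend proxy, never to OpenAI directly.
// Assumes the backend added later in this tutorial exposes its WebSocket at `${API_URL}/ws`.
import { API_URL } from "app";

const websocketUrl = `${API_URL.replace(/^http/, "ws")}/ws`;
const ws = new WebSocket(websocketUrl);

ws.onopen = () => {
  // Configure the session once connected; the proxy forwards this to OpenAI.
  ws.send(
    JSON.stringify({
      type: "session.update",
      session: { instructions: "You are a helpful assistant." },
    }),
  );
};

ws.onmessage = (event) => {
  // Realtime events (audio deltas, transcripts, function calls) arrive as JSON.
  console.log("Realtime event:", JSON.parse(event.data));
};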
First, create a new UI component called RealtimeAPI and paste in the following code:
import { Button } from "@/components/ui/button";
import { Switch } from "@/components/ui/switch";
import { useToast } from "@/hooks/use-toast";
import { cn } from "@/lib/utils";
import brain from "brain";
import { Bug, Mic, MicOff, Power } from "lucide-react";
import { useCallback } from "react";
import React, { useEffect, useRef, useState, useImperativeHandle } from "react";
import { ErrorBoundary } from "react-error-boundary";

class AudioQueueManager {
  private audioQueue: Int16Array[] = [];
  private isPlaying: boolean = false;
  private pitchFactor: number = 1.0;
  private sampleRate: number;
  private worker: Worker | null = null;
  private fftSize: number = 1024;
  private fftAnalyser: AnalyserNode;
  private noiseGateThreshold: number;
  private crossfadeDuration: number;
  private lowPassFilterCutoff: number;

  constructor(
    sampleRate: number = 24000,
    noiseGateThreshold: number = 0.001,
    crossfadeDuration: number = 0.05,
    lowPassFilterCutoff: number = 0.3,
  ) {
    this.sampleRate = sampleRate;
    this.noiseGateThreshold = noiseGateThreshold;
    this.crossfadeDuration = crossfadeDuration;
    this.lowPassFilterCutoff = lowPassFilterCutoff;
    const audioContext = new (
      window.AudioContext || (window as any).webkitAudioContext
    )();
    this.fftAnalyser = audioContext.createAnalyser();
    this.fftAnalyser.fftSize = this.fftSize;
  }

  setNoiseGateThreshold(threshold: number): void {
    this.noiseGateThreshold = threshold;
  }

  setCrossfadeDuration(duration: number): void {
    this.crossfadeDuration = duration;
  }

  setLowPassFilterCutoff(cutoff: number): void {
    this.lowPassFilterCutoff = cutoff;
  }

  setWorker(worker: Worker) {
    this.worker = worker;
  }

  isQueueEmpty(): boolean {
    return this.audioQueue.length === 0;
  }

  getQueueLength(): number {
    return this.audioQueue.length;
  }

  setPitchFactor(factor: number): void {
    this.pitchFactor = factor;
  }

  addAudioToQueue(audioData: Int16Array): void {
    this.audioQueue.push(audioData);
    this.performFFT(audioData);
    this.playNext();
  }

  private performFFT(audioData: Int16Array): void {
    const bufferLength = this.fftAnalyser.frequencyBinCount;
    const dataArray = new Uint8Array(bufferLength);
    this.fftAnalyser.getByteFrequencyData(dataArray);
    if (this.worker) {
      this.worker.postMessage({
        type: "fftResult",
        data: { fftResult: Array.from(dataArray) },
      });
    }
  }

  private async playNext(): Promise<void> {
    if (this.isPlaying || this.audioQueue.length === 0) return;
    this.isPlaying = true;
    const audioData = this.audioQueue.shift();
    if (audioData) {
      await this.playAudio(audioData);
    }
    this.isPlaying = false;
    this.playNext();
  }

  private applyNoiseGate(audioData: Float32Array): Float32Array {
    return audioData.map((sample) =>
      Math.abs(sample) < this.noiseGateThreshold ? 0 : sample,
    );
  }

  private applyLowPassFilter(audioData: Float32Array): Float32Array {
    let lastOutput = 0;
    return audioData.map((sample) => {
      lastOutput = lastOutput + this.lowPassFilterCutoff * (sample - lastOutput);
      return lastOutput;
    });
  }

  private playAudio(audioBuffer: Int16Array): Promise<void> {
    return new Promise((resolve) => {
      const audioContext = new (
        (window.AudioContext ||
          (window as any).webkitAudioContext) as typeof AudioContext
      )({
        sampleRate: this.sampleRate,
      });
      const float32Array = new Float32Array(audioBuffer.length);
      for (let i = 0; i < audioBuffer.length; i++) {
        float32Array[i] = audioBuffer[i] / 0x7fff;
      }
      // Apply noise gate and low-pass filter
      const processedAudio = this.applyLowPassFilter(
        this.applyNoiseGate(float32Array),
      );
      const audioBufferObj = audioContext.createBuffer(
        1,
        processedAudio.length,
        audioContext.sampleRate,
      );
      audioBufferObj.copyToChannel(processedAudio, 0);
      const source = audioContext.createBufferSource();
      source.buffer = audioBufferObj;
      source.playbackRate.value = this.pitchFactor;
      // Implement crossfading
      const gainNode = audioContext.createGain();
      gainNode.gain.setValueAtTime(0, audioContext.currentTime);
      gainNode.gain.linearRampToValueAtTime(
        1,
        audioContext.currentTime + this.crossfadeDuration,
      );
      gainNode.gain.setValueAtTime(
        1,
        audioContext.currentTime + audioBufferObj.duration - this.crossfadeDuration,
      );
      gainNode.gain.linearRampToValueAtTime(
        0,
        audioContext.currentTime + audioBufferObj.duration,
      );
      source.connect(gainNode);
      gainNode.connect(audioContext.destination);
      // Send playback data to Web Worker in smaller chunks
      const chunkSize = 1024;
      for (let i = 0; i < processedAudio.length; i += chunkSize) {
        const chunk = processedAudio.slice(i, i + chunkSize);
        this.sendPlaybackDataToWorker(chunk);
      }
      source.onended = () => {
        resolve();
      };
      source.start(0);
    });
  }

  private sendPlaybackDataToWorker(data: Float32Array): void {
    if (this.worker) {
      this.worker.postMessage({
        type: "playbackChunk",
        data: data,
      });
    } else {
      console.warn("Worker not set in AudioQueueManager");
    }
  }
}

const SAMPLE_RATE = 24000;

// Add this function at the top level of your component file
const jsonReplacer = (key: string, value: any) => {
  if (typeof value === "bigint") {
    return value.toString();
  }
  return value;
};

const defaultTools = [
  {
    type: "function",
    name: "fetch_latest_messages",
    description: "Fetch the latest messages from the webhook data",
    parameters: {
      type: "object",
      properties: {
        n: {
          type: "integer",
          description: "Number of latest messages to fetch",
        },
      },
      required: ["n"],
    },
  },
];

interface ContentItem {
  type: string;
  text?: string;
  audio?: string;
}

interface OutputItem {
  content: ContentItem[];
}

const base64ToUint8Array = (base64: string): Uint8Array => {
  const binaryString = atob(base64);
  const uint8Array = new Uint8Array(binaryString.length);
  for (let i = 0; i < binaryString.length; i++) {
    uint8Array[i] = binaryString.charCodeAt(i);
  }
  return uint8Array;
};

const int16ArrayToFloat32Array = (int16Array: Int16Array): Float32Array => {
  const float32Array = new Float32Array(int16Array.length);
  for (let i = 0; i < int16Array.length; i++) {
    float32Array[i] = int16Array[i] / 32768.0;
  }
  return float32Array;
};

export type Props = {
  websocketUrl: string;
  instructions?: string;
  tools?: Array<{
    type: string;
    function: {
      name: string;
      description: string;
      parameters: {
        type: string;
        properties: Record<string, unknown>;
        required: string[];
      };
    };
  }>;
  ref?: React.Ref<{
    toggleMic: () => void;
    connect: () => void;
    disconnect: () => void;
  }>;
};

export const RealtimeAPI = React.forwardRef<
  {
    toggleMic: () => void;
    connect: () => void;
    disconnect: () => void;
  },
  Props
>(
  (
    {
      websocketUrl,
      instructions = "You are a helpful assistant that helps users with their messages and how to follow. Make sure to not read entire messages unless explicitly asked to. Give brief overviews and key points. Suggest action items and follow-ups. Don't spend a lot of time reading up all content.",
      tools = defaultTools,
    },
    ref,
  ) => {
    const [isMicrophoneOn, setIsMicrophoneOn] = useState(false);
    const [wsStatus, setWsStatus] = useState<
      "disconnected" | "connecting" | "connected"
    >("disconnected");
    const [isProcessing, setIsProcessing] = useState(false);
    const [transcription, setTranscription] = useState("");
    const [response, setResponse] = useState("");
    const [isLoading, setIsLoading] = useState(false);
    const [currentFunctionName, setCurrentFunctionName] = useState<
      string | null
    >(null);
    const [currentCallId, setCurrentCallId] = useState<string | null>(null);
    const [fftData, setFftData] = useState<Uint8Array | null>(null);
    const [isMicLoading, setIsMicLoading] = useState(false);
    const analyserRef = useRef<AnalyserNode | null>(null);
    // Use a ref to store the accumulated function call buffer
    // This ensures we have the most up-to-date data when processing function call arguments
    const functionCallBufferRef = useRef("");
    const websocketRef = useRef<WebSocket | null>(null);
    const audioContextRef = useRef<AudioContext | null>(null);
    const audioWorkletNodeRef = useRef<AudioWorkletNode>();
    const mediaStreamRef = useRef<MediaStream>();
    const workerRef = useRef<Worker | null>(null);
    const [debugMode, setDebugMode] = useState(false);
    const [audioQueueManager, setAudioQueueManager] =
      useState<AudioQueueManager | null>(null);
    const { toast } = useToast();

    useEffect(() => {
      const newAudioQueueManager = new AudioQueueManager(SAMPLE_RATE);
      setAudioQueueManager(newAudioQueueManager);

      // Initialize Web Worker
      const worker = new Worker(
        URL.createObjectURL(
          new Blob(
            [
              `
              let DEBUG = false; // Set to false to disable detailed logging

              function log(...args) {
                if (DEBUG) {
                  console.log(...args);
                }
              }

              self.onmessage = function(e) {
                const { type, data } = e.data;
                if (type === 'audioChunk') {
                  const { chunk } = data;
                  if (!chunk || chunk.length === 0) {
                    log('Received empty or invalid chunk');
                    self.postMessage({ type: 'processedAudio', base64Chunk: '' });
                    return;
                  }
                  const pcmData = new Int16Array(chunk.length);
                  for (let i = 0; i < chunk.length; i++) {
                    const s = Math.max(-1, Math.min(1, chunk[i]));
                    pcmData[i] = s < 0 ? s * 0x8000 : s * 0x7fff;
                  }
                  const uint8Array = new Uint8Array(pcmData.buffer);
                  const base64Chunk = btoa(String.fromCharCode.apply(null, uint8Array));
                  log('Processed audioChunk, base64Chunk length:', base64Chunk.length);
                  self.postMessage({ type: 'processedAudio', base64Chunk });
                } else if (type === 'setDebug') {
                  DEBUG = data;
                  log('Debug mode set to:', DEBUG);
                }
              }
              `,
            ],
            { type: "application/javascript" },
          ),
        ),
      );

      if (worker) {
        workerRef.current = worker;
        newAudioQueueManager.setWorker(worker);
        worker.onmessage = (e) => {
          const { type, data } = e.data;
          if (type === "fftResult") {
            if (data && data.fftResult) {
              console.log(
                "Received FFT result, length:",
                data.fftResult.length,
              );
              setFftData(new Uint8Array(data.fftResult));
            } else {
              console.error(
                "Received fftResult message with invalid or missing data:",
                data,
              );
              toast({
                title: "Error",
                description: "Received invalid FFT data",
                variant: "destructive",
              });
            }
          } else if (type === "error") {
            console.error("Error in Web Worker:", data.message, data.stack);
            toast({
              title: "Error",
              description: `An error occurred while processing audio data: ${data.message}`,
              variant: "destructive",
            });
          }
        };
      } else {
        console.error("Failed to initialize Web Worker");
        toast({
          title: "Error",
          description:
            "Failed to initialize audio processing. Please try again.",
          variant: "destructive",
        });
      }

      return () => {
        stopMicrophone();
        // Cleanup Web Worker
        if (workerRef.current) {
          workerRef.current.terminate();
          workerRef.current = null;
        }
        // Cleanup WebSocket
        if (websocketRef.current) {
          websocketRef.current.close();
          websocketRef.current = null;
        }
      };
    }, [toast]);

    useEffect(() => {
      if (wsStatus === "disconnected") {
        websocketRef.current = null;
      }
    }, [wsStatus]);

    const setupAudioProcessing = async () => {
      console.log("Setting up audio processing...");
      if (audioContextRef.current) {
        console.log("Audio context already exists. Reusing...");
        return {
          stream: mediaStreamRef.current!,
          audioContext: audioContextRef.current,
          source: audioContextRef.current.createMediaStreamSource(
            mediaStreamRef.current!,
          ),
          workletNode: audioWorkletNodeRef.current!,
          analyser: analyserRef.current!,
        };
      }
      const stream = await navigator.mediaDevices.getUserMedia({ audio: true });
      const audioContext = new AudioContext({ sampleRate: SAMPLE_RATE });
      const source = audioContext.createMediaStreamSource(stream);
      console.log("Created new AudioContext");

      // Create and connect the AnalyserNode
      const analyser = audioContext.createAnalyser();
      analyser.fftSize = 2048;
      analyser.smoothingTimeConstant = 0.1;
      source.connect(analyser);

      // Create and load the AudioWorklet
      await audioContext.audioWorklet.addModule(
        URL.createObjectURL(
          new Blob(
            [
              `
              class AudioProcessor extends AudioWorkletProcessor {
                constructor() {
                  super();
                  this.chunks = [];
                }

                process(inputs, outputs, parameters) {
                  const input = inputs[0];
                  if (input.length > 0) {
                    const inputData = input[0];
                    this.chunks.push(new Float32Array(inputData));
                    if (this.chunks.length >= 10) {
                      // Send chunks every 10 frames
                      this.port.postMessage(this.chunks);
                      this.chunks = [];
                    }
                  }
                  return true;
                }
              }

              registerProcessor('audio-processor', AudioProcessor);
              `,
            ],
            { type: "application/javascript" },
          ),
        ),
      );

      const workletNode = new AudioWorkletNode(audioContext, "audio-processor");
      workletNode.port.onmessage = (event) => {
        const chunks: Float32Array[] = event.data;
        chunks.forEach((chunk) => sendAudioChunk(chunk));
      };
      source.connect(workletNode);
      workletNode.connect(audioContext.destination);
      console.log("Audio processing setup complete");
      return { stream, audioContext, source, workletNode, analyser };
    };

    const sendAudioChunk = (chunk: Float32Array) => {
      if (websocketRef.current?.readyState === WebSocket.OPEN) {
        // Convert Float32Array to Int16Array
        const pcmData = new Int16Array(chunk.length);
        for (let i = 0; i < chunk.length; i++) {
          const s = Math.max(-1, Math.min(1, chunk[i]));
          pcmData[i] = s < 0 ? s * 0x8000 : s * 0x7fff;
        }
        // Convert Int16Array to Uint8Array
        const uint8Array = new Uint8Array(pcmData.buffer);
        // Convert to base64
        const base64Chunk = btoa(String.fromCharCode.apply(null, uint8Array));
        websocketRef.current.send(
          JSON.stringify({
            event_id: `event_${Date.now()}`,
            type: "input_audio_buffer.append",
            audio: base64Chunk,
          }),
        );
      }
    };

    useEffect(() => {
      if (isMicrophoneOn) {
        console.log(
          "Microphone is on, FFT data will be updated through Web Worker",
        );
      }
    }, [isMicrophoneOn]);

    const startMicrophone = useCallback(async () => {
      try {
        const { stream, audioContext, source, workletNode, analyser } =
          await setupAudioProcessing();
        if (audioQueueManager && workerRef.current) {
          audioQueueManager.setWorker(workerRef.current);
        }
        if (!mediaStreamRef.current) {
          mediaStreamRef.current = stream;
        }
        if (!audioContextRef.current) {
          audioContextRef.current = audioContext;
        }
        if (!audioWorkletNodeRef.current) {
          audioWorkletNodeRef.current = workletNode;
        }
        if (!analyserRef.current) {
          analyserRef.current = analyser;
        }
        setIsMicrophoneOn(true);
        console.log("Microphone started successfully");
      } catch (error) {
        console.error("Error setting up audio:", error);
        toast({
          title: "Error",
          description:
            (error as Error).message || "Failed to set up audio processing.",
          variant: "destructive",
        });
      }
    }, [toast]);

    const stopMicrophone = useCallback(() => {
      console.log("Stopping microphone...");
      if (audioWorkletNodeRef.current) {
        audioWorkletNodeRef.current.disconnect();
        audioWorkletNodeRef.current = undefined;
      }
      if (audioContextRef.current) {
        audioContextRef.current.close();
        audioContextRef.current = null;
      }
      if (mediaStreamRef.current) {
        mediaStreamRef.current.getTracks().forEach((track) => track.stop());
        mediaStreamRef.current = undefined;
      }
      setIsMicrophoneOn(false);
      console.log("Microphone stopped");
      // Send end of stream message
      // if (websocketRef.current) {
      //   websocketRef.current.send(
      //     JSON.stringify({
      //       event_id: `event_${Date.now()}`,
      //       type: "input_audio_buffer.end",
      //     }),
      //   );
      // }
    }, []);

    const connectWebSocket = () => {
      if (websocketRef.current?.readyState === WebSocket.OPEN) {
        console.log("WebSocket is already connected");
        return;
      }
      console.log("Attempting to connect to WebSocket:", websocketUrl);
      setIsLoading(true);
      setWsStatus("connecting");
      websocketRef.current = new WebSocket(websocketUrl);

      websocketRef.current.onopen = () => {
        setWsStatus("connected");
        setIsLoading(false);
        console.log("WebSocket connection established");
        // Send the new message structure to the server
        if (websocketRef.current?.readyState === WebSocket.OPEN) {
          websocketRef.current.send(
            JSON.stringify({
              event_id: `event_${Date.now()}`,
              type: "session.update",
              session: {
                instructions: instructions,
                tools: tools,
              },
            }),
          );
        }
        toast({
          title: "Connected",
          description: "WebSocket connection established.",
        });
      };

      websocketRef.current.onclose = (event) => {
        setWsStatus("disconnected");
        setIsLoading(false);
        console.log("WebSocket connection closed", event);
        toast({
          title: "Disconnected",
          description: "WebSocket connection closed.",
        });
      };

      websocketRef.current.onerror = (error) => {
        console.error("WebSocket error:", error);
        setIsLoading(false);
        toast({
          title: "Error",
          description: "WebSocket connection error. Please try reconnecting.",
          variant: "destructive",
        });
      };

      websocketRef.current.onmessage = (event) => {
        try {
          const data = JSON.parse(event.data);
          console.log("Received message:", data);

          // Handle different message types
          switch (data.type) {
            case "conversation.item.created":
              if (data.item?.type === "function_call") {
                setCurrentFunctionName(data.item.name);
                setCurrentCallId(data.item.call_id);
                console.log(`Function call started: ${JSON.stringify(data)}`);
              }
              break;
            case "response.function_call_arguments.delta":
              console.log("Received delta:", data.delta);
              functionCallBufferRef.current += data.delta;
              console.log(
                "Updated functionCallBuffer:",
                functionCallBufferRef.current,
              );
              break;
            case "response.function_call_arguments.done": {
              console.log(
                "Final functionCallBuffer:",
                functionCallBufferRef.current,
              );
              try {
                const functionArgs = JSON.parse(functionCallBufferRef.current);
                console.log(`Calling function: ${currentFunctionName}`);
                console.log(`Arguments: ${JSON.stringify(functionArgs)}`);
                // Call the appropriate function
                fetchLatestMessages(functionArgs.n);
              } catch (error) {
                console.error("Error parsing function call arguments:", error);
                console.error(
                  "Function call buffer content:",
                  functionCallBufferRef.current,
                );
                toast({
                  title: "Error",
                  description: "Failed to process function call arguments.",
                  variant: "destructive",
                });
              }
              // Reset function call related states
              setCurrentFunctionName(null);
              functionCallBufferRef.current = "";
              break;
            }
            case "response.content_part.done":
              // Update transcription for text or audio content
              handleContentPart(data.part);
              break;
            case "response.output_item.done":
              // Process audio content and update response
              handleOutputItem(data.item);
              break;
            case "response.audio.delta":
              // Process audio delta and add to queue
              handleAudioDelta(data.delta);
              break;
            case "response.audio.done":
              console.log("Audio response completed:", data);
              break;
            case "response.done":
              // Mark processing as complete
              handleResponseDone();
              break;
            default:
              console.log("Unhandled message type:", data.type);
          }
        } catch (error) {
          console.error("Error processing WebSocket message:", error);
          toast({
            title: "Error",
            description: "Failed to process server message.",
            variant: "destructive",
          });
        }
      };
    };

    const fetchLatestMessages = async (n: number) => {
      try {
        const response = await brain.get_latest_messages({ n });
        const messages = await response.json();
        console.log(`Fetched ${messages.length} latest messages:`, messages);
        // Log the result
        console.log("THE RESULT::", messages);
        // Format the result as specified
        const formatted_result = {
          type: "conversation.item.create",
          item: {
            type: "function_call_output",
            call_id: currentCallId,
            output: JSON.stringify(messages, jsonReplacer),
          },
        };
        // Send the formatted result back through the WebSocket
        if (websocketRef.current?.readyState === WebSocket.OPEN) {
          websocketRef.current.send(JSON.stringify(formatted_result));
        }
        // Send the function call result
        if (websocketRef.current?.readyState === WebSocket.OPEN) {
          websocketRef.current.send(
            JSON.stringify({
              event_id: `event_${Date.now()}`,
              type: "function_call_result",
              result: messages,
            }),
          );
        }
        // Reset the currentCallId
        setCurrentCallId(null);
      } catch (error) {
        console.error("Error fetching latest messages:", error);
        toast({
          title: "Error",
          description: "Failed to fetch latest messages.",
          variant: "destructive",
        });
      }
    };

    // Helper functions for message handling
    const handleContentPart = (part: {
      type: string;
      text?: string;
      transcript?: string;
    }) => {
      console.log("Processing content part:", part);
      if (part.type === "text" && part.text) {
        console.log("Updating transcription with text:", part.text);
        setTranscription((prev) => prev + part.text);
      } else if (part.type === "audio" && part.transcript) {
        console.log(
          "Updating transcription with audio transcript:",
          part.transcript,
        );
        setTranscription(part.transcript);
      }
    };

    const handleOutputItem = (item: OutputItem) => {
      console.log("Processing output item:", item);
      const audioContent = item.content.find(
        (c: ContentItem) => c.type === "audio",
      );
      if (audioContent && audioContent.audio) {
        processAudioContent(audioContent.audio);
      } else {
        console.log("No audio content found in the output item.");
      }
      // Update response state with text content
      const textContent = item.content.find(
        (c: ContentItem) => c.type === "text",
      );
      if (textContent && textContent.text) {
        setResponse((prev) => prev + textContent.text);
      }
    };

    const processAudioContent = (audioData: string) => {
      console.log("Processing audio content, length:", audioData.length);
      try {
        const audioArray = base64ToUint8Array(audioData);
        console.log("Created Uint8Array, length:", audioArray.length);
        const int16Array = new Int16Array(audioArray.buffer);
        if (audioQueueManager) {
          audioQueueManager.addAudioToQueue(int16Array);
        } else {
          throw new Error("AudioQueueManager is not initialized");
        }
      } catch (error) {
        console.error("Error processing audio data:", error);
        if (error instanceof Error) {
          console.error("Error details:", error.message);
          console.error("Error stack:", error.stack);
        }
        toast({
          title: "Audio Processing Error",
          description: "Failed to process audio data. Please try again.",
          variant: "destructive",
        });
      }
    };

    const handleAudioDelta = (delta: string) => {
      console.log("Received audio delta, length:", delta ? delta.length : 0);
      if (delta && delta.length > 0) {
        try {
          const audioArray = base64ToUint8Array(delta);
          console.log(
            "Created Uint8Array from delta, length:",
            audioArray.length,
          );
          const int16Array = new Int16Array(audioArray.buffer);
          if (audioQueueManager) {
            audioQueueManager.addAudioToQueue(int16Array);
            console.log("Added audio to queue, length:", int16Array.length);
          } else {
            console.warn(
              "AudioQueueManager is not initialized. Skipping audio chunk.",
            );
          }
        } catch (error) {
          console.error("Error processing audio delta data:", error);
          toast({
            title: "Error",
            description: "Failed to process audio data. Please try again.",
            variant: "destructive",
          });
        }
      } else {
        console.log("Received empty audio delta. Skipping processing.");
      }
    };

    const handleResponseDone = () => {
      console.log("Response completed");
      setIsProcessing(false);
      setTranscription((prev) => prev + "\n[Response completed]");
    };

    const disconnectWebSocket = () => {
      setIsLoading(true);
      if (websocketRef.current) {
        websocketRef.current.close();
      }
      setWsStatus("disconnected");
      setIsLoading(false);
      toast({
        title: "Disconnected",
        description: "WebSocket connection closed manually.",
      });
    };

    const toggleMicrophone = useCallback(async () => {
      setIsMicLoading(true);
      try {
        if (isMicrophoneOn) {
          await stopMicrophone();
        } else {
          await startMicrophone();
        }
      } catch (error) {
        console.error("Error toggling microphone:", error);
        toast({
          title: "Error",
          description: "Failed to toggle microphone. Please try again.",
          variant: "destructive",
        });
      } finally {
        setIsMicLoading(false);
      }
    }, [isMicrophoneOn, startMicrophone, stopMicrophone, toast]);

    const toggleDebugMode = () => {
      setDebugMode(!debugMode);
      if (workerRef.current) {
        workerRef.current.postMessage({ type: "setDebug", data: !debugMode });
      }
    };

    useImperativeHandle(ref, () => ({
      toggleMic: toggleMicrophone,
      connect: connectWebSocket,
      disconnect: disconnectWebSocket,
      toggleDebug: toggleDebugMode,
    }));

    const MicrophoneButton = () => (
      <Button
        onClick={toggleMicrophone}
        disabled={isMicLoading}
        className="w-16 h-16 rounded-full bg-primary hover:bg-primary/90 text-primary-foreground"
      >
        {isMicLoading ? (
          <span className="loading loading-spinner loading-md"></span>
        ) : isMicrophoneOn ? (
          <MicOff className="h-8 w-8" />
        ) : (
          <Mic className="h-8 w-8" />
        )}
      </Button>
    );

    const ConnectionToggle = () => (
      <div className="flex items-center space-x-2">
        <Switch
          checked={wsStatus === "connected"}
          onCheckedChange={() => {
            if (wsStatus === "connected") {
              disconnectWebSocket();
            } else {
              connectWebSocket();
            }
          }}
        />
        <Power
          className={cn(
            "h-4 w-4",
            wsStatus === "connected" ? "text-green-500" : "text-gray-400",
          )}
        />
      </div>
    );

    const DebugToggle = () => (
      <Button
        onClick={toggleDebugMode}
        className={cn(
          "w-8 h-8 rounded-full",
          debugMode
            ? "bg-yellow-500 hover:bg-yellow-600"
            : "bg-gray-300 hover:bg-gray-400",
        )}
      >
        <Bug className="h-4 w-4" />
      </Button>
    );

    return (
      <div className="flex flex-col items-center space-y-4 p-4 bg-background rounded-lg shadow-md">
        <ErrorBoundary
          fallback={<div>Something went wrong. Please refresh the page.</div>}
          onError={(error, errorInfo) => {
            console.error("Error in RealtimeAPI component:", error, errorInfo);
            toast({
              title: "Error",
              description: "An unexpected error occurred. Please try again.",
              variant: "destructive",
            });
          }}
        >
          <div className="flex justify-between items-center w-full">
            <span className="text-sm font-medium">
              {wsStatus === "connected"
                ? "Connected"
                : wsStatus === "connecting"
                  ? "Connecting..."
                  : "Disconnected"}
            </span>
            <ConnectionToggle />
          </div>
          <MicrophoneButton />
          <div className="flex items-center space-x-2">
            <span className="text-sm font-medium">
              Microphone: {isMicrophoneOn ? "On" : "Off"}
            </span>
          </div>
        </ErrorBoundary>
      </div>
    );
  },
);

RealtimeAPI.displayName = "RealtimeAPI";
Next, create a new API called realtime_api and paste in the following code:
import asyncio
import json
from json import JSONEncoder
import websockets
import databutton as db
from fastapi import APIRouter, WebSocket, HTTPException, Request
import logging
from datetime import datetime, timedelta
import pandas as pd
from pydantic import BaseModel

logging.basicConfig(level=logging.INFO)

router = APIRouter()

OPENAI_API_KEY = db.secrets.get("OPENAI_API_KEY")


class WebhookData(BaseModel):
    message: str
    sender: str
    date: str


def store_webhook_data(data: WebhookData):
    df = db.storage.dataframes.get(
        "webhook_data",
        default=pd.DataFrame(columns=["message", "sender", "date", "added"]),
    )
    new_row = data.dict()
    new_row["added"] = datetime.now()
    df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)
    db.storage.dataframes.put("webhook_data", df)


class CustomJSONEncoder(JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        return JSONEncoder.default(self, obj)


def fetch_latest_messages(n: int):
    """Fetch the latest N messages from the webhook-data table."""
    df = db.storage.dataframes.get("webhook_data")
    df["added"] = pd.to_datetime(df["added"])
    sorted_df = df.sort_values("added", ascending=False)
    latest_messages = sorted_df.head(n).to_dict("records")
    for message in latest_messages:
        message["added"] = message["added"].isoformat()
    return latest_messages


def find_messages_by_date_range(start_date: str, end_date: str):
    """Find messages within a specific date range from the webhook-data table."""
    df = db.storage.dataframes.get("webhook_data")
    df["added"] = pd.to_datetime(df["added"])
    start = pd.to_datetime(start_date)
    end = pd.to_datetime(end_date)
    filtered_df = df[(df["added"] >= start) & (df["added"] <= end)]
    result = filtered_df.to_dict("records")
    for message in result:
        message["added"] = message["added"].isoformat()
    return result


async def connect_to_openai():
    url = "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01"
    headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
        "OpenAI-Beta": "realtime=v1",
    }
    return await websockets.connect(url, extra_headers=headers)


async def forward(source, destination):
    function_call_buffer = ""
    current_function_name = None
    try:
        while True:
            if isinstance(source, WebSocket):
                message = await source.receive_text()
            else:
                message = await source.recv()
            try:
                json_message = json.loads(message)
                if (
                    json_message["type"] == "conversation.item.created"
                    and json_message["item"]["type"] == "function_call"
                ):
                    current_function_name = json_message["item"]["name"]
                    call_id = json_message["item"]["call_id"]
                    logging.info(f"Function call started: {json_message}")
                elif json_message["type"] == "response.function_call_arguments.delta":
                    function_call_buffer += json_message["delta"]
                elif json_message["type"] == "response.function_call_arguments.done":
                    # Parse the complete function call arguments
                    function_args = json.loads(function_call_buffer)
                    logging.info(f"Calling function: {current_function_name}")
                    logging.info(f"Arguments: {function_args}")
                    # Call the appropriate function
                    if current_function_name == "fetch_latest_messages":
                        result = fetch_latest_messages(function_args["n"])
                    elif current_function_name == "find_messages_by_date_range":
                        result = find_messages_by_date_range(
                            function_args["start_date"], function_args["end_date"]
                        )
                    else:
                        raise ValueError(f"Unknown function: {current_function_name}")

                    print("THE RESULT::", result)

                    # Format the result as specified
                    formatted_result = {
                        "type": "conversation.item.create",
                        "item": {
                            "type": "function_call_output",
                            "call_id": call_id,
                            "output": json.dumps(result, cls=CustomJSONEncoder),
                        },
                    }

                    # Send the formatted result back to the client
                    if isinstance(source, WebSocket):
                        await source.send_text(json.dumps(formatted_result))
                    else:
                        await source.send(json.dumps(formatted_result))
                        await source.send(json.dumps({"type": "response.create"}))

                    # Reset the function call buffer and current function name
                    function_call_buffer = ""
                    current_function_name = None

                    # Don't forward function call messages to the client
                    continue

                # Forward other messages as usual
                if isinstance(destination, WebSocket):
                    await destination.send_text(json.dumps(json_message))
                else:
                    await destination.send(json.dumps(json_message))
            except json.JSONDecodeError:
                # If it's not valid JSON, send it as plain text
                if isinstance(destination, WebSocket):
                    await destination.send_text(message)
                else:
                    await destination.send(message)
            except Exception as e:
                logging.error(f"Error processing message: {str(e)}\nMessage: {message}")
                # Continue processing other messages even if one fails
                continue
    except websockets.exceptions.ConnectionClosed:
        logging.info("Connection closed normally")
    except Exception as e:
        logging.error(f"Error in forwarding: {str(e)}")


@router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        openai_ws = await connect_to_openai()

        # Create tasks for handling messages in both directions
        client_to_openai = asyncio.create_task(forward(websocket, openai_ws))
        openai_to_client = asyncio.create_task(forward(openai_ws, websocket))

        # Wait for either connection to close
        done, pending = await asyncio.wait(
            [client_to_openai, openai_to_client],
            return_when=asyncio.FIRST_COMPLETED,
        )

        # Cancel pending task
        for task in pending:
            task.cancel()
    except websockets.exceptions.InvalidStatusCode as e:
        logging.error(f"Failed to connect to OpenAI: {str(e)}")
        await websocket.send_text(
            json.dumps({"type": "error", "message": "Failed to connect to OpenAI"})
        )
        await websocket.close(code=1008, reason="Failed to connect to OpenAI")
    except Exception as e:
        logging.error(f"Error in WebSocket connection: {str(e)}")
        await websocket.send_text(
            json.dumps({"type": "error", "message": "Unexpected error occurred"})
        )
        await websocket.close(code=1011, reason="Unexpected error occurred")
    finally:
        if "openai_ws" in locals():
            await openai_ws.close()


@router.post("/webhook")
async def webhook(request: Request):
    try:
        data = await request.json()
        webhook_data = WebhookData(
            message=data.get("message"),
            sender=data.get("sender"),
            date=data.get("date"),
        )
        store_webhook_data(webhook_data)
        return {"status": "success", "message": "Data stored successfully"}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))


@router.get("/fetch_latest_messages")
async def get_latest_messages(n: int = 10):
    try:
        messages = fetch_latest_messages(n)
        return {"status": "success", "messages": messages}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
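Before wiring up the page, you can seed and inspect the webhook data store that the fetch_latest_messages tool reads from. Here is a hedged sketch of a smoke test run from the browser console or a small script: the API_BASE value is a placeholder for your app's API URL, and the exact path prefix depends on how Databutton mounts the router.

// Hypothetical smoke test for the backend endpoints added above.
// API_BASE is a placeholder; substitute your app's API base URL. The /webhook and
// /fetch_latest_messages paths come from the router definitions in realtime_api.
const API_BASE = "https://your-app.example.com/api";

// Store one message via the webhook endpoint.
await fetch(`${API_BASE}/webhook`, {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({
    message: "Quarterly report is ready for review.",
    sender: "Alice",
    date: "2024-11-01",
  }),
});

// Read the most recent messages back.
const res = await fetch(`${API_BASE}/fetch_latest_messages?n=5`);
console.log(await res.json()); // { status: "success", messages: [...] }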
Finally, create a page that uses the RealtimeAPI component. Here is an example Home page:
import { Button } from "@/components/ui/button";
import { Card } from "@/components/ui/card";
import { useTheme } from "@/hooks/use-theme";
import { API_URL } from "app";
import { RealtimeAPI } from "components/RealtimeAPI";
import { MoonIcon, SunIcon } from "lucide-react";
import React from "react";

export default function App() {
  const websocketUrl = `${API_URL.replace(/^http/, "ws")}/ws`;
  const { theme, setTheme } = useTheme();

  const toggleTheme = () => {
    setTheme(theme === "light" ? "dark" : "light");
  };

  return (
    <div className="min-h-screen bg-background text-foreground flex flex-col items-center justify-center p-4">
      <Card className="w-full max-w-3xl bg-card text-card-foreground p-8 rounded-2xl shadow-lg border-2 border-primary">
        <div className="flex justify-between items-center mb-8">
          <h1 className="text-4xl font-bold bg-gradient-to-r from-primary to-accent bg-clip-text text-transparent">
            OpenAI Realtime Chat
          </h1>
          <Button
            variant="outline"
            size="icon"
            onClick={toggleTheme}
            className="rounded-full p-2 hover:bg-muted"
          >
            {theme === "light" ? (
              <MoonIcon className="h-6 w-6" />
            ) : (
              <SunIcon className="h-6 w-6" />
            )}
          </Button>
        </div>
        <div className="bg-muted p-6 rounded-xl">
          <p className="text-lg mb-4">
            Welcome to your AI-powered chat assistant. Click the button to
            start speaking!
          </p>
          <RealtimeAPI websocketUrl={websocketUrl} />
        </div>
        <footer className="mt-8 text-center text-sm text-muted-foreground">
          <p>Powered by OpenAI and Zapier • Your data is securely processed</p>
        </footer>
      </Card>
    </div>
  );
}
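The Home page above lets the component's built-in switch and microphone button drive the session. Because the component also exposes toggleMic, connect, and disconnect through its ref, a page can control it programmatically instead. A minimal sketch, assuming the same websocketUrl derivation as above (the page name and button layout are placeholders):

// Hypothetical sketch: drive RealtimeAPI imperatively via its ref.
// The handle shape matches the component's useImperativeHandle call.
import { useRef } from "react";
import { Button } from "@/components/ui/button";
import { API_URL } from "app";
import { RealtimeAPI } from "components/RealtimeAPI";

export default function VoicePage() {
  const realtimeRef = useRef<{
    toggleMic: () => void;
    connect: () => void;
    disconnect: () => void;
  }>(null);
  const websocketUrl = `${API_URL.replace(/^http/, "ws")}/ws`;

  return (
    <div className="p-4 space-y-4">
      <RealtimeAPI ref={realtimeRef} websocketUrl={websocketUrl} />
      <div className="flex gap-2">
        <Button onClick={() => realtimeRef.current?.connect()}>Connect</Button>
        <Button onClick={() => realtimeRef.current?.toggleMic()}>Talk</Button>
        <Button onClick={() => realtimeRef.current?.disconnect()}>Disconnect</Button>
      </div>
    </div>
  );
}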