Build AI agents using OpenAI's Realtime API

In this tutorial, we will show you how to build an app in Databutton that leverages OpenAI's Realtime API.

The Realtime API is a WebSocket API and is not easy to set up securely from the frontend, since the browser would need direct access to your OpenAI API key. Instead, we will proxy the API through the Databutton backend.
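
Concretely, the browser opens a WebSocket against a route on your own backend instead of against api.openai.com, and the backend relays traffic to OpenAI using your API key. Here is a minimal sketch of the client side; the exact host and path prefix depend on how your Databutton app exposes its APIs, so treat the URL as a placeholder:

    // Hypothetical example: connect to the backend proxy route rather than to
    // OpenAI directly. Adjust the path to match your own app.
    const websocketUrl = `${window.location.origin.replace(/^http/, "ws")}/routes/realtime_api/ws`;
    const socket = new WebSocket(websocketUrl);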

  1. First, create a new UI component called RealtimeAPI and paste in the following code:

    import AudioQueueManager from "@/components/AudioQueueManager";
    import { Button } from "@/components/ui/button";
    import { Switch } from "@/components/ui/switch";
    import { useToast } from "@/hooks/use-toast";
    import { cn } from "@/lib/utils";
    import { Loader2, Mic, MicOff, Power } from "lucide-react";
    import React, {
      useCallback,
      useEffect,
      useRef,
      useState,
      useImperativeHandle,
    } from "react";

    declare global {
      interface Window {
        webkitAudioContext: typeof AudioContext;
      }
    }

    // Shape of the AudioQueueManager dependency used by this component.
    type AudioQueueManagerType = {
      addAudioToQueue: (content: Int16Array) => void;
      isQueueEmpty: () => boolean;
      getQueueLength: () => number;
    };
    
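    // The Realtime API streams 16-bit PCM audio at 24 kHz, so both capture and
    // playback use this sample rate.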
    const SAMPLE_RATE = 24000;
    
    interface ContentItem {
      type: string;
      text?: string;
      audio?: string;
    }
    
    interface OutputItem {
      content: ContentItem[];
    }
    
    const base64ToUint8Array = (base64: string): Uint8Array => {
      const binaryString = atob(base64);
      const uint8Array = new Uint8Array(binaryString.length);
      for (let i = 0; i < binaryString.length; i++) {
        uint8Array[i] = binaryString.charCodeAt(i);
      }
      return uint8Array;
    };
    
    export type Props = {
      websocketUrl: string;
      instructions?: string;
      tools?: any[];
      ref?: React.Ref<{
        toggleMic: () => void;
        connect: () => void;
        disconnect: () => void;
      }>;
    };
    
    export const RealtimeAPI = React.forwardRef<
      {
        toggleMic: () => void;
        connect: () => void;
        disconnect: () => void;
      },
      Props
    >(
      (
        { websocketUrl, instructions = "You are a helpful assistant", tools = [] },
        ref,
      ) => {
        const [isMicrophoneOn, setIsMicrophoneOn] = useState(false);
        const [wsStatus, setWsStatus] = useState<
          "disconnected" | "connecting" | "connected"
        >("disconnected");
        const [isProcessing, setIsProcessing] = useState(false);
        const [transcription, setTranscription] = useState("");
        const [response, setResponse] = useState("");
        const [isPlaying, setIsPlaying] = useState(false);
        const [isLoading, setIsLoading] = useState(false);
        const websocketRef = useRef<WebSocket | null>(null);
        const audioContextRef = useRef<AudioContext | null>(null);
        const audioBufferSourceRef = useRef<AudioBufferSourceNode | null>(null);
        const audioWorkletNodeRef = useRef<AudioWorkletNode>();
        const mediaStreamRef = useRef<MediaStream>();
        const [audioQueueManager, setAudioQueueManager] =
          useState<AudioQueueManager | null>(null);
        const { toast } = useToast();
    
        useEffect(() => {
          setAudioQueueManager(new AudioQueueManager(SAMPLE_RATE));
          return () => {
            stopMicrophone();
            //disconnectWebSocket();
          };
        }, []);
    
        useEffect(() => {
          if (wsStatus === "disconnected") {
            websocketRef.current = null;
          }
        }, [wsStatus]);
    
        const setupAudioProcessing = async () => {
          const stream = await navigator.mediaDevices.getUserMedia({ audio: true });
          const audioContext = new AudioContext({ sampleRate: SAMPLE_RATE });
          const source = audioContext.createMediaStreamSource(stream);
    
          // Create and load the AudioWorklet
          await audioContext.audioWorklet.addModule(
            URL.createObjectURL(
              new Blob(
                [
                  `
          class AudioProcessor extends AudioWorkletProcessor {
            constructor() {
              super();
              this.chunks = [];
            }
    
            process(inputs, outputs, parameters) {
              const input = inputs[0];
              if (input.length > 0) {
                const inputData = input[0];
                this.chunks.push(new Float32Array(inputData));
                if (this.chunks.length >= 10) { // Send chunks every 10 frames
                  this.port.postMessage(this.chunks);
                  this.chunks = [];
                }
              }
              return true;
            }
          }
    
          registerProcessor('audio-processor', AudioProcessor);
        `,
                ],
                { type: "application/javascript" },
              ),
            ),
          );
    
          const workletNode = new AudioWorkletNode(audioContext, "audio-processor");
    
          workletNode.port.onmessage = (event) => {
            const chunks: Float32Array[] = event.data;
            chunks.forEach((chunk) => sendAudioChunk(chunk));
          };
    
          source.connect(workletNode);
          workletNode.connect(audioContext.destination);
    
          return { stream, audioContext, source, workletNode };
        };
    
        const sendAudioChunk = (chunk: Float32Array) => {
          if (websocketRef.current?.readyState === WebSocket.OPEN) {
            // Convert Float32Array to Int16Array
            const pcmData = new Int16Array(chunk.length);
            for (let i = 0; i < chunk.length; i++) {
              const s = Math.max(-1, Math.min(1, chunk[i]));
              pcmData[i] = s < 0 ? s * 0x8000 : s * 0x7fff;
            }
    
            // Convert Int16Array to Uint8Array
            const uint8Array = new Uint8Array(pcmData.buffer);
    
            // Convert to base64
            const base64Chunk = btoa(String.fromCharCode.apply(null, uint8Array));
    
            websocketRef.current.send(
              JSON.stringify({
                event_id: `event_${Date.now()}`,
                type: "input_audio_buffer.append",
                audio: base64Chunk,
              }),
            );
          }
        };
    
        const startMicrophone = useCallback(async () => {
          try {
            const { stream, audioContext, source, workletNode } =
              await setupAudioProcessing();
            mediaStreamRef.current = stream;
            audioContextRef.current = audioContext;
            audioWorkletNodeRef.current = workletNode;
            setIsMicrophoneOn(true);
          } catch (error) {
            console.error("Error setting up audio:", error);
            toast({
              title: "Error",
              description:
                (error as Error).message || "Failed to set up audio processing.",
              variant: "destructive",
            });
          }
        }, [toast]);
    
        const stopMicrophone = useCallback(() => {
          if (audioWorkletNodeRef.current) {
            audioWorkletNodeRef.current.disconnect();
            audioWorkletNodeRef.current = undefined;
          }
          if (audioContextRef.current) {
            audioContextRef.current.close();
            audioContextRef.current = null;
          }
          if (mediaStreamRef.current) {
            mediaStreamRef.current.getTracks().forEach((track) => track.stop());
            mediaStreamRef.current = undefined;
          }
          setIsMicrophoneOn(false);
    
          // Send end of stream message
          // if (websocketRef.current) {
          //   websocketRef.current.send(
          //     JSON.stringify({
          //       event_id: `event_${Date.now()}`,
          //       type: "input_audio_buffer.end",
          //     }),
          //   );
          // }
        }, []);
    
        const connectWebSocket = () => {
          if (websocketRef.current?.readyState === WebSocket.OPEN) {
            console.log("WebSocket is already connected");
            return;
          }
          console.log("Attempting to connect to WebSocket:", websocketUrl);
          setIsLoading(true);
          setWsStatus("connecting");
          websocketRef.current = new WebSocket(websocketUrl);
    
          websocketRef.current.onopen = () => {
            setWsStatus("connected");
            setIsLoading(false);
            console.log("WebSocket connection established");
            // Configure the Realtime session (instructions and tools) once the connection is open
            if (websocketRef.current?.readyState === WebSocket.OPEN) {
              websocketRef.current.send(
                JSON.stringify({
                  event_id: `event_${Date.now()}`,
                  type: "session.update",
                  session: {
                    instructions: instructions,
                    tools: tools,
                    temperature: 0.7,
                  },
                }),
              );
            }
            toast({
              title: "Connected",
              description: "WebSocket connection established.",
            });
          };
    
          websocketRef.current.onclose = (event) => {
            setWsStatus("disconnected");
            setIsLoading(false);
            console.log("WebSocket connection closed", event);
            toast({
              title: "Disconnected",
              description: "WebSocket connection closed.",
            });
          };
    
          websocketRef.current.onerror = (error) => {
            console.error("WebSocket error:", error);
            setIsLoading(false);
            toast({
              title: "Error",
              description: "WebSocket connection error. Please try reconnecting.",
              variant: "destructive",
            });
          };
    
          websocketRef.current.onmessage = (event) => {
            try {
              const data = JSON.parse(event.data);
              console.log("Received message:", data);
    
              // Handle different message types
              switch (data.type) {
                case "response.content_part.done":
                  // Update transcription for text or audio content
                  handleContentPart(data.part);
                  break;
                case "response.output_item.done":
                  // Process audio content and update response
                  handleOutputItem(data.item);
                  break;
                case "response.audio.delta":
                  // Process audio delta and add to queue
                  handleAudioDelta(data.delta);
                  break;
                case "response.audio.done":
                  console.log("Audio response completed:", data);
                  break;
                case "response.done":
                  // Mark processing as complete
                  handleResponseDone();
                  break;
                default:
                  console.log("Unhandled message type:", data.type);
              }
            } catch (error) {
              console.error("Error processing WebSocket message:", error);
              toast({
                title: "Error",
                description: "Failed to process server message.",
                variant: "destructive",
              });
            }
          };
        };
    
        // Helper functions for message handling
        const handleContentPart = (part: {
          type: string;
          text?: string;
          transcript?: string;
        }) => {
          console.log("Processing content part:", part);
          if (part.type === "text" && part.text) {
            console.log("Updating transcription with text:", part.text);
            setTranscription((prev) => prev + part.text);
          } else if (part.type === "audio" && part.transcript) {
            console.log(
              "Updating transcription with audio transcript:",
              part.transcript,
            );
            setTranscription(part.transcript);
          }
        };
    
        const handleOutputItem = (item: OutputItem) => {
          console.log("Processing output item:", item);
          const audioContent = item.content.find(
            (c: ContentItem) => c.type === "audio",
          );
          if (audioContent && audioContent.audio) {
            processAudioContent(audioContent.audio);
          } else {
            console.log("No audio content found in the output item.");
          }
          // Update response state with text content
          const textContent = item.content.find(
            (c: ContentItem) => c.type === "text",
          );
          if (textContent && textContent.text) {
            setResponse((prev) => prev + textContent.text);
          }
        };
    
        const processAudioContent = (audioData: string) => {
          console.log("Processing audio content, length:", audioData.length);
          try {
            const audioArray = base64ToUint8Array(audioData);
            console.log("Created Uint8Array, length:", audioArray.length);
            const int16Array = new Int16Array(audioArray.buffer);
            if (audioQueueManager) {
              audioQueueManager.addAudioToQueue(int16Array);
            } else {
              throw new Error("AudioQueueManager is not initialized");
            }
          } catch (error) {
            console.error("Error processing audio data:", error);
            if (error instanceof Error) {
              console.error("Error details:", error.message);
              console.error("Error stack:", error.stack);
            }
            toast({
              title: "Audio Processing Error",
              description: "Failed to process audio data. Please try again.",
              variant: "destructive",
            });
          }
        };
    
        const handleAudioDelta = (delta: string) => {
          console.log("Processing audio delta");
          if (delta) {
            try {
              const audioArray = base64ToUint8Array(delta);
              console.log(
                "Created Uint8Array from delta, length:",
                audioArray.length,
              );
              const int16Array = new Int16Array(audioArray.buffer);
              if (audioQueueManager) {
                audioQueueManager.addAudioToQueue(int16Array);
              } else {
                console.warn(
                  "AudioQueueManager is not initialized. Skipping audio chunk.",
                );
              }
            } catch (error) {
              console.error("Error processing audio delta data:", error);
            }
          } else {
            console.log("No audio data found in the audio delta.");
          }
        };
    
        const handleResponseDone = () => {
          console.log("Response completed");
          setIsProcessing(false);
          setTranscription((prev) => prev + "\n[Response completed]");
        };
    
        const disconnectWebSocket = () => {
          setIsLoading(true);
          if (websocketRef.current) {
            websocketRef.current.close();
          }
          setWsStatus("disconnected");
          setIsLoading(false);
          toast({
            title: "Disconnected",
            description: "WebSocket connection closed manually.",
          });
        };
    
        const toggleMicrophone = useCallback(() => {
          if (isMicrophoneOn) {
            stopMicrophone();
          } else {
            // startMicrophone updates isMicrophoneOn itself once audio is ready
            startMicrophone();
          }
        }, [isMicrophoneOn, startMicrophone, stopMicrophone]);
    
        useImperativeHandle(ref, () => ({
          toggleMic: toggleMicrophone,
          connect: connectWebSocket,
          disconnect: disconnectWebSocket,
        }));
    
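        // Note: playResponseAudio below is not wired into the UI; it sketches a
        // manual playback loop over the audio queue.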
        const playResponseAudio = useCallback(() => {
          if (isPlaying) {
            setIsPlaying(false);
            if (audioContextRef.current) {
              audioContextRef.current.close();
              audioContextRef.current = null;
            }
          } else {
            setIsPlaying(true);
            // Implement a polling mechanism to check for new audio data and play it
            const playAudio = async () => {
              const AudioContextClass =
                window.AudioContext || window.webkitAudioContext;
              audioContextRef.current = new AudioContextClass();
    
              // Function to play a single chunk of audio
              const playChunk = async (audioData: Int16Array) => {
                const audioBuffer = audioContextRef.current!.createBuffer(
                  1,
                  audioData.length,
                  SAMPLE_RATE,
                );
                audioBuffer.getChannelData(0).set(audioData);
    
                const source = audioContextRef.current!.createBufferSource();
                source.buffer = audioBuffer;
                source.connect(audioContextRef.current!.destination);
                source.start();
    
                await new Promise((resolve) => {
                  source.onended = resolve;
                });
              };
    
              // Polling loop to check for new audio data
              while (audioQueueManager && isPlaying) {
                if (!audioQueueManager.isQueueEmpty()) {
                  // Note: We don't have direct access to the audio data, so we're simulating it
                  // In a real scenario, you'd need a method to retrieve the audio data from the queue
                  const simulatedAudioData = new Int16Array(1024).fill(0);
                  await playChunk(simulatedAudioData);
                } else {
                  // Wait a short time before checking again
                  await new Promise((resolve) => setTimeout(resolve, 100));
                }
              }
    
              if (audioContextRef.current) {
                audioContextRef.current.close();
                audioContextRef.current = null;
              }
              setIsPlaying(false);
            };
    
            playAudio();
          }
        }, [isPlaying, audioQueueManager]);
    
        const MicrophoneButton = () => (
          <Button
            onClick={toggleMicrophone}
            className="w-16 h-16 rounded-full bg-primary hover:bg-primary/90 text-primary-foreground"
          >
            {isMicrophoneOn ? (
              <MicOff className="h-8 w-8" />
            ) : (
              <Mic className="h-8 w-8" />
            )}
          </Button>
        );
    
        const ConnectionToggle = () => (
          <div className="flex items-center space-x-2">
            <Switch
              checked={wsStatus === "connected"}
              onCheckedChange={() => {
                if (wsStatus === "connected") {
                  disconnectWebSocket();
                } else {
                  connectWebSocket();
                }
              }}
            />
            <Power
              className={cn(
                "h-4 w-4",
                wsStatus === "connected" ? "text-green-500" : "text-gray-400",
              )}
            />
          </div>
        );
    
        return (
          <div className="flex flex-col items-center space-y-4 p-4 bg-background rounded-lg shadow-md">
            <div className="flex justify-between items-center w-full">
              <span className="text-sm font-medium">
                {wsStatus === "connected"
                  ? "Connected"
                  : wsStatus === "connecting"
                    ? "Connecting..."
                    : "Disconnected"}
              </span>
              <ConnectionToggle />
            </div>
            <MicrophoneButton />
            <div className="flex items-center space-x-2">
              <span className="text-sm font-medium">
                Microphone: {isMicrophoneOn ? "On" : "Off"}
              </span>
              {isProcessing && (
                <Loader2 className="h-4 w-4 animate-spin text-primary" />
              )}
            </div>
          </div>
        );
      },
    );
    
    RealtimeAPI.displayName = "RealtimeAPI";
    

  2. Next, create a new API called realtime_api and paste in the following code (a sketch showing how to use the component together with this endpoint follows after this step):

    import asyncio
    import json
    from json import JSONEncoder
    import websockets
    import databutton as db
    from fastapi import APIRouter, WebSocket
    import logging
    from datetime import datetime
    import pandas as pd
    
    logging.basicConfig(level=logging.INFO)
    
    router = APIRouter()
    
    OPENAI_API_KEY = db.secrets.get("OPENAI_API_KEY")
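    # db.secrets keeps the key on the server, which is the point of proxying
    # the Realtime API rather than connecting to it from the browser.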
    
    
    class CustomJSONEncoder(JSONEncoder):
        def default(self, obj):
            if isinstance(obj, datetime):
                return obj.isoformat()
            return JSONEncoder.default(self, obj)
    
    
    def fetch_latest_messages(n: int):
        """Fetch the latest N messages from the webhook-data table."""
        df = db.storage.dataframes.get("webhook_data")
        df["added"] = pd.to_datetime(df["added"])
        sorted_df = df.sort_values("added", ascending=False)
        latest_messages = sorted_df.head(n).to_dict("records")
        for message in latest_messages:
            message["added"] = message["added"].isoformat()
        return latest_messages
    
    
    def find_messages_by_date_range(start_date: str, end_date: str):
        """Find messages within a specific date range from the webhook-data table."""
        df = db.storage.dataframes.get("webhook_data")
        df["added"] = pd.to_datetime(df["added"])
        start = pd.to_datetime(start_date)
        end = pd.to_datetime(end_date)
        filtered_df = df[(df["added"] >= start) & (df["added"] <= end)]
        result = filtered_df.to_dict("records")
        for message in result:
            message["added"] = message["added"].isoformat()
        return result
    
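    # Note: these two helpers are not invoked anywhere in this file; they appear
    # to be intended as server-side implementations for function tools that the
    # frontend exposes to the model via its tools prop.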
    
    async def connect_to_openai():
        url = "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01"
        headers = {
            "Authorization": f"Bearer {OPENAI_API_KEY}",
            "OpenAI-Beta": "realtime=v1",
        }
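        # Note: newer releases of the websockets library rename this keyword to
        # additional_headers; use whichever matches your installed version.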
        return await websockets.connect(url, extra_headers=headers)
    
    
    async def forward(source, destination):
        try:
            while True:
                if isinstance(source, WebSocket):
                    message = await source.receive_text()
                else:
                    message = await source.recv()
    
                try:
                    json_message = json.loads(message)
    
                    # Forward other messages as usual
                    if isinstance(destination, WebSocket):
                        await destination.send_text(json.dumps(json_message))
                    else:
                        await destination.send(json.dumps(json_message))
    
                except json.JSONDecodeError:
                    # If it's not valid JSON, send it as plain text
                    if isinstance(destination, WebSocket):
                        await destination.send_text(message)
                    else:
                        await destination.send(message)
                except Exception as e:
                    logging.error(f"Error processing message: {str(e)}\nMessage: {message}")
                    # Continue processing other messages even if one fails
                    continue
        except websockets.exceptions.ConnectionClosed:
            logging.info("Connection closed normally")
        except Exception as e:
            logging.error(f"Error in forwarding: {str(e)}")
    
    
    @router.websocket("/ws")
    async def websocket_endpoint(websocket: WebSocket):
        await websocket.accept()
    
        try:
            openai_ws = await connect_to_openai()
    
            # Create tasks for handling messages in both directions
            client_to_openai = asyncio.create_task(forward(websocket, openai_ws))
            openai_to_client = asyncio.create_task(forward(openai_ws, websocket))
    
            # Wait for either connection to close
            done, pending = await asyncio.wait([client_to_openai, openai_to_client], return_when=asyncio.FIRST_COMPLETED)
    
            # Cancel pending task
            for task in pending:
                task.cancel()
        except websockets.exceptions.InvalidStatusCode as e:
            logging.error(f"Failed to connect to OpenAI: {str(e)}")
            await websocket.send_text(json.dumps({"type": "error", "message": "Failed to connect to OpenAI"}))
            await websocket.close(code=1008, reason="Failed to connect to OpenAI")
        except Exception as e:
            logging.error(f"Error in WebSocket connection: {str(e)}")
            await websocket.send_text(json.dumps({"type": "error", "message": "Unexpected error occurred"}))
            await websocket.close(code=1011, reason="Unexpected error occurred")
        finally:
            if "openai_ws" in locals():
                await openai_ws.close()

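With both pieces in place, you can drop the component into a page and point it at the backend route. The sketch below is illustrative: the page name and WebSocket URL are placeholders you will need to adapt, and the tool definition simply mirrors the fetch_latest_messages helper from the backend (actually executing tool calls is not covered in this tutorial).

    import { RealtimeAPI } from "@/components/RealtimeAPI";

    // Placeholder: replace with the public URL of the realtime_api /ws route
    // exposed by your Databutton backend.
    const WEBSOCKET_URL = "wss://your-app.example.com/routes/realtime_api/ws";

    // Hypothetical tool definition mirroring fetch_latest_messages on the backend.
    const tools = [
      {
        type: "function",
        name: "fetch_latest_messages",
        description: "Fetch the latest N messages from the webhook_data table.",
        parameters: {
          type: "object",
          properties: {
            n: { type: "integer", description: "Number of messages to fetch" },
          },
          required: ["n"],
        },
      },
    ];

    export default function VoiceAgentPage() {
      return (
        <RealtimeAPI
          websocketUrl={WEBSOCKET_URL}
          instructions="You are a helpful voice assistant."
          tools={tools}
        />
      );
    }

The component also exposes connect, disconnect, and toggleMic through a ref, so you can drive it from your own UI instead of the built-in toggle and microphone button. Because the backend forwards every message verbatim in both directions, any tool-call events the model emits will reach the frontend unchanged, where you can handle them and send the results back over the same socket.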