Build AI agents using OpenAI's Realtime API

In this tutorial, we will show you how to build an app in Databutton that leverages OpenAI's Realtime API.

The Realtime API is a WebSocket API and is not easy to set up directly from the front-end. Instead, we will proxy it through the Databutton backend.
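
Concretely, the browser never holds your OpenAI API key; it only opens a WebSocket to your own backend endpoint, and the backend maintains the authenticated connection to OpenAI. Here is a minimal sketch of the client side of that handshake (the API_URL import comes from the Databutton-generated app module, and the payload mirrors the session.update event used later in the tutorial):

    import { API_URL } from "app";

    // The backend proxy (the realtime_api API below) exposes the WebSocket at /ws;
    // we only need to swap the http(s) scheme for ws(s).
    const ws = new WebSocket(`${API_URL.replace(/^http/, "ws")}/ws`);

    ws.onopen = () => {
      // Configure the session once connected; the backend forwards this to OpenAI.
      ws.send(
        JSON.stringify({
          type: "session.update",
          session: { instructions: "You are a helpful assistant." },
        }),
      );
    };

    ws.onmessage = (event) => {
      // Responses stream back as JSON events (text deltas, audio deltas, etc.).
      console.log(JSON.parse(event.data));
    };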

  1. First, create a new UI component called RealtimeAPI and paste in the following code (a brief usage sketch follows the code block):

    import { Button } from "@/components/ui/button";
    import { Switch } from "@/components/ui/switch";
    import { useToast } from "@/hooks/use-toast";
    import { cn } from "@/lib/utils";
    import brain from "brain";
    import { Bug, Mic, MicOff, Power } from "lucide-react";
    import { useCallback } from "react";
    import React, { useEffect, useRef, useState, useImperativeHandle } from "react";
    import { ErrorBoundary } from "react-error-boundary";
    
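    // Buffers incoming 16-bit PCM chunks and plays them back one at a time,
    // applying a noise gate, a simple one-pole low-pass filter, and short gain
    // crossfades to smooth the boundaries between chunks.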
    class AudioQueueManager {
      private audioQueue: Int16Array[] = [];
      private isPlaying: boolean = false;
      private pitchFactor: number = 1.0;
      private sampleRate: number;
      private worker: Worker | null = null;
      private fftSize: number = 1024;
      private fftAnalyser: AnalyserNode;
      private noiseGateThreshold: number;
      private crossfadeDuration: number;
      private lowPassFilterCutoff: number;
    
      constructor(
        sampleRate: number = 24000,
        noiseGateThreshold: number = 0.001,
        crossfadeDuration: number = 0.05,
        lowPassFilterCutoff: number = 0.3,
      ) {
        this.sampleRate = sampleRate;
        this.noiseGateThreshold = noiseGateThreshold;
        this.crossfadeDuration = crossfadeDuration;
        this.lowPassFilterCutoff = lowPassFilterCutoff;
        const audioContext = new (
          window.AudioContext || (window as any).webkitAudioContext
        )();
        this.fftAnalyser = audioContext.createAnalyser();
        this.fftAnalyser.fftSize = this.fftSize;
      }
    
      setNoiseGateThreshold(threshold: number): void {
        this.noiseGateThreshold = threshold;
      }
    
      setCrossfadeDuration(duration: number): void {
        this.crossfadeDuration = duration;
      }
    
      setLowPassFilterCutoff(cutoff: number): void {
        this.lowPassFilterCutoff = cutoff;
      }
      setWorker(worker: Worker) {
        this.worker = worker;
      }
      isQueueEmpty(): boolean {
        return this.audioQueue.length === 0;
      }
    
      getQueueLength(): number {
        return this.audioQueue.length;
      }
    
      setPitchFactor(factor: number): void {
        this.pitchFactor = factor;
      }
    
      addAudioToQueue(audioData: Int16Array): void {
        this.audioQueue.push(audioData);
        this.performFFT(audioData);
        this.playNext();
      }
    
      private performFFT(audioData: Int16Array): void {
        const bufferLength = this.fftAnalyser.frequencyBinCount;
        const dataArray = new Uint8Array(bufferLength);
    
        this.fftAnalyser.getByteFrequencyData(dataArray);
    
        if (this.worker) {
          this.worker.postMessage({
            type: "fftResult",
            data: { fftResult: Array.from(dataArray) },
          });
        }
      }
      private async playNext(): Promise<void> {
        if (this.isPlaying || this.audioQueue.length === 0) return;
    
        this.isPlaying = true;
    
        const audioData = this.audioQueue.shift();
        if (audioData) {
          await this.playAudio(audioData);
        }
    
        this.isPlaying = false;
        this.playNext();
      }
    
      private applyNoiseGate(audioData: Float32Array): Float32Array {
        return audioData.map((sample) =>
          Math.abs(sample) < this.noiseGateThreshold ? 0 : sample,
        );
      }
    
      private applyLowPassFilter(audioData: Float32Array): Float32Array {
        let lastOutput = 0;
        return audioData.map((sample) => {
          lastOutput =
            lastOutput + this.lowPassFilterCutoff * (sample - lastOutput);
          return lastOutput;
        });
      }
    
      private playAudio(audioBuffer: Int16Array): Promise<void> {
        return new Promise((resolve) => {
          const audioContext = new (
            (window.AudioContext ||
              (window as any).webkitAudioContext) as typeof AudioContext
          )({
            sampleRate: this.sampleRate,
          });
    
          const float32Array = new Float32Array(audioBuffer.length);
          for (let i = 0; i < audioBuffer.length; i++) {
            float32Array[i] = audioBuffer[i] / 0x7fff;
          }
    
          // Apply noise gate and low-pass filter
          const processedAudio = this.applyLowPassFilter(
            this.applyNoiseGate(float32Array),
          );
    
          const audioBufferObj = audioContext.createBuffer(
            1,
            processedAudio.length,
            audioContext.sampleRate,
          );
          audioBufferObj.copyToChannel(processedAudio, 0);
    
          const source = audioContext.createBufferSource();
          source.buffer = audioBufferObj;
          source.playbackRate.value = this.pitchFactor;
    
          // Implement crossfading
          const gainNode = audioContext.createGain();
          gainNode.gain.setValueAtTime(0, audioContext.currentTime);
          gainNode.gain.linearRampToValueAtTime(
            1,
            audioContext.currentTime + this.crossfadeDuration,
          );
          gainNode.gain.setValueAtTime(
            1,
            audioContext.currentTime +
            audioBufferObj.duration -
            this.crossfadeDuration,
          );
          gainNode.gain.linearRampToValueAtTime(
            0,
            audioContext.currentTime + audioBufferObj.duration,
          );
    
          source.connect(gainNode);
          gainNode.connect(audioContext.destination);
    
          // Send playback data to Web Worker in smaller chunks
          const chunkSize = 1024;
          for (let i = 0; i < processedAudio.length; i += chunkSize) {
            const chunk = processedAudio.slice(i, i + chunkSize);
            this.sendPlaybackDataToWorker(chunk);
          }
    
          source.onended = () => {
            resolve();
          };
    
          source.start(0);
        });
      }
    
      private sendPlaybackDataToWorker(data: Float32Array): void {
        if (this.worker) {
          this.worker.postMessage({
            type: "playbackChunk",
            data: data,
          });
        } else {
          console.warn("Worker not set in AudioQueueManager");
        }
      }
    }
    
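    // The Realtime API sends and expects 16-bit PCM audio at 24 kHz.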
    const SAMPLE_RATE = 24000;
    
    // Add this function at the top level of your component file
    const jsonReplacer = (key: string, value: any) => {
      if (typeof value === "bigint") {
        return value.toString();
      }
      return value;
    };
    
    const defaultTools = [
      {
        type: "function",
        name: "fetch_latest_messages",
        description: "Fetch the latest messages from the webhook data",
        parameters: {
          type: "object",
          properties: {
            n: {
              type: "integer",
              description: "Number of latest messages to fetch",
            },
          },
          required: ["n"],
        },
      },
    ];
    
    interface ContentItem {
      type: string;
      text?: string;
      audio?: string;
    }
    
    interface OutputItem {
      content: ContentItem[];
    }
    
    const base64ToUint8Array = (base64: string): Uint8Array => {
      const binaryString = atob(base64);
      const uint8Array = new Uint8Array(binaryString.length);
      for (let i = 0; i < binaryString.length; i++) {
        uint8Array[i] = binaryString.charCodeAt(i);
      }
      return uint8Array;
    };
    
    const int16ArrayToFloat32Array = (int16Array: Int16Array): Float32Array => {
      const float32Array = new Float32Array(int16Array.length);
      for (let i = 0; i < int16Array.length; i++) {
        float32Array[i] = int16Array[i] / 32768.0;
      }
      return float32Array;
    };
    
    export type Props = {
      websocketUrl: string;
      instructions?: string;
      tools?: Array<{
        type: string;
        name: string;
        description: string;
        parameters: {
          type: string;
          properties: Record<string, unknown>;
          required: string[];
        };
      }>;
      ref?: React.Ref<{
        toggleMic: () => void;
        connect: () => void;
        disconnect: () => void;
      }>;
    };
    
    export const RealtimeAPI = React.forwardRef<
      {
        toggleMic: () => void;
        connect: () => void;
        disconnect: () => void;
      },
      Props
    >(
      (
        {
          websocketUrl,
          instructions = "You are a helpful assistant that helps users with their messages and how to follow. Make sure to not read entire message unless explicitely asked so. Give brief overviews and key points. Suggest action items and follow-ups. Don't spend a lot of time reading up all content.",
          tools = defaultTools,
        },
        ref,
      ) => {
        const [isMicrophoneOn, setIsMicrophoneOn] = useState(false);
        const [wsStatus, setWsStatus] = useState<
          "disconnected" | "connecting" | "connected"
        >("disconnected");
        const [isProcessing, setIsProcessing] = useState(false);
        const [transcription, setTranscription] = useState("");
        const [response, setResponse] = useState("");
        const [isLoading, setIsLoading] = useState(false);
        const [currentFunctionName, setCurrentFunctionName] = useState<
          string | null
        >(null);
        const [currentCallId, setCurrentCallId] = useState<string | null>(null);
        const [fftData, setFftData] = useState<Uint8Array | null>(null);
        const [isMicLoading, setIsMicLoading] = useState(false);
        const analyserRef = useRef<AnalyserNode | null>(null);
        // Use a ref to store the accumulated function call buffer
        // This ensures we have the most up-to-date data when processing function call arguments
        const functionCallBufferRef = useRef("");
        const websocketRef = useRef<WebSocket | null>(null);
        const audioContextRef = useRef<AudioContext | null>(null);
        const audioWorkletNodeRef = useRef<AudioWorkletNode>();
        const mediaStreamRef = useRef<MediaStream>();
        const workerRef = useRef<Worker | null>(null);
        const [debugMode, setDebugMode] = useState(false);
        const [audioQueueManager, setAudioQueueManager] =
          useState<AudioQueueManager | null>(null);
        const { toast } = useToast();
    
        useEffect(() => {
          const newAudioQueueManager = new AudioQueueManager(SAMPLE_RATE);
          setAudioQueueManager(newAudioQueueManager);
          // Initialize Web Worker
          const worker = new Worker(
            URL.createObjectURL(
              new Blob(
                [
                  `
                  let DEBUG = false; // Set to false to disable detailed logging
    
                  function log(...args) {
                    if (DEBUG) {
                      console.log(...args);
                    }
                  }
    
                  self.onmessage = function(e) {
                    const { type, data } = e.data;
                    if (type === 'audioChunk') {
                      const { chunk } = data;
                      if (!chunk || chunk.length === 0) {
                        log('Received empty or invalid chunk');
                        self.postMessage({ type: 'processedAudio', base64Chunk: '' });
                        return;
                      }
                      const pcmData = new Int16Array(chunk.length);
                      for (let i = 0; i < chunk.length; i++) {
                        const s = Math.max(-1, Math.min(1, chunk[i]));
                        pcmData[i] = s < 0 ? s * 0x8000 : s * 0x7fff;
                      }
                      const uint8Array = new Uint8Array(pcmData.buffer);
                      const base64Chunk = btoa(String.fromCharCode.apply(null, uint8Array));
                      log('Processed audioChunk, base64Chunk length:', base64Chunk.length);
                      self.postMessage({ type: 'processedAudio', base64Chunk });
                    } else if (type === 'setDebug') {
                      DEBUG = data;
                      log('Debug mode set to:', DEBUG);
                    }
                  }
    
                  `,
                ],
                { type: "application/javascript" },
              ),
            ),
          );
    
          if (worker) {
            workerRef.current = worker;
            newAudioQueueManager.setWorker(worker);
    
            worker.onmessage = (e) => {
              const { type, data } = e.data;
              if (type === "fftResult") {
                if (data && data.fftResult) {
                  console.log(
                    "Received FFT result, length:",
                    data.fftResult.length,
                  );
                  setFftData(new Uint8Array(data.fftResult));
                } else {
                  console.error(
                    "Received fftResult message with invalid or missing data:",
                    data,
                  );
                  toast({
                    title: "Error",
                    description: "Received invalid FFT data",
                    variant: "destructive",
                  });
                }
              } else if (type === "error") {
                console.error("Error in Web Worker:", data.message, data.stack);
                toast({
                  title: "Error",
                  description: `An error occurred while processing audio data: ${data.message}`,
                  variant: "destructive",
                });
              }
            };
          } else {
            console.error("Failed to initialize Web Worker");
            toast({
              title: "Error",
              description:
                "Failed to initialize audio processing. Please try again.",
              variant: "destructive",
            });
          }
    
          return () => {
            stopMicrophone();
            // Cleanup Web Worker
            if (workerRef.current) {
              workerRef.current.terminate();
              workerRef.current = null;
            }
            // Cleanup WebSocket
            if (websocketRef.current) {
              websocketRef.current.close();
              websocketRef.current = null;
            }
          };
        }, [toast]);
    
        useEffect(() => {
          if (wsStatus === "disconnected") {
            websocketRef.current = null;
          }
        }, [wsStatus]);
    
        const setupAudioProcessing = async () => {
          console.log("Setting up audio processing...");
          if (audioContextRef.current) {
            console.log("Audio context already exists. Reusing...");
            return {
              stream: mediaStreamRef.current!,
              audioContext: audioContextRef.current,
              source: audioContextRef.current.createMediaStreamSource(
                mediaStreamRef.current!,
              ),
              workletNode: audioWorkletNodeRef.current!,
              analyser: analyserRef.current!,
            };
          }
    
          const stream = await navigator.mediaDevices.getUserMedia({ audio: true });
          const audioContext = new AudioContext({ sampleRate: SAMPLE_RATE });
          const source = audioContext.createMediaStreamSource(stream);
    
          console.log("Created new AudioContext");
          // Create and connect the AnalyserNode
          const analyser = audioContext.createAnalyser();
          analyser.fftSize = 2048;
          analyser.smoothingTimeConstant = 0.1;
          source.connect(analyser);
    
          // Create and load the AudioWorklet
          await audioContext.audioWorklet.addModule(
            URL.createObjectURL(
              new Blob(
                [
                  `
          class AudioProcessor extends AudioWorkletProcessor {
            constructor() {
              super();
              this.chunks = [];
            }
    
            process(inputs, outputs, parameters) {
              const input = inputs[0];
              if (input.length > 0) {
                const inputData = input[0];
                this.chunks.push(new Float32Array(inputData));
                if (this.chunks.length >= 10) { // Send chunks every 10 frames
                  this.port.postMessage(this.chunks);
                  this.chunks = [];
                }
              }
              return true;
            }
          }
    
          registerProcessor('audio-processor', AudioProcessor);
        `,
                ],
                { type: "application/javascript" },
              ),
            ),
          );
    
          const workletNode = new AudioWorkletNode(audioContext, "audio-processor");
    
          workletNode.port.onmessage = (event) => {
            const chunks: Float32Array[] = event.data;
            chunks.forEach((chunk) => sendAudioChunk(chunk));
          };
    
          source.connect(workletNode);
          workletNode.connect(audioContext.destination);
    
          console.log("Audio processing setup complete");
          return { stream, audioContext, source, workletNode, analyser };
        };
    
        const sendAudioChunk = (chunk: Float32Array) => {
          if (websocketRef.current?.readyState === WebSocket.OPEN) {
            // Convert Float32Array to Int16Array
            const pcmData = new Int16Array(chunk.length);
            for (let i = 0; i < chunk.length; i++) {
              const s = Math.max(-1, Math.min(1, chunk[i]));
              pcmData[i] = s < 0 ? s * 0x8000 : s * 0x7fff;
            }
    
            // Convert Int16Array to Uint8Array
            const uint8Array = new Uint8Array(pcmData.buffer);
    
            // Convert to base64
            const base64Chunk = btoa(String.fromCharCode.apply(null, uint8Array));
    
            websocketRef.current.send(
              JSON.stringify({
                event_id: `event_${Date.now()}`,
                type: "input_audio_buffer.append",
                audio: base64Chunk,
              }),
            );
          }
        };
    
        useEffect(() => {
          if (isMicrophoneOn) {
            console.log(
              "Microphone is on, FFT data will be updated through Web Worker",
            );
          }
        }, [isMicrophoneOn]);
    
        const startMicrophone = useCallback(async () => {
          try {
            const { stream, audioContext, source, workletNode, analyser } =
              await setupAudioProcessing();
            if (audioQueueManager && workerRef.current) {
              audioQueueManager.setWorker(workerRef.current);
            }
            if (!mediaStreamRef.current) {
              mediaStreamRef.current = stream;
            }
            if (!audioContextRef.current) {
              audioContextRef.current = audioContext;
            }
            if (!audioWorkletNodeRef.current) {
              audioWorkletNodeRef.current = workletNode;
            }
            if (!analyserRef.current) {
              analyserRef.current = analyser;
            }
            setIsMicrophoneOn(true);
            console.log("Microphone started successfully");
          } catch (error) {
            console.error("Error setting up audio:", error);
            toast({
              title: "Error",
              description:
                (error as Error).message || "Failed to set up audio processing.",
              variant: "destructive",
            });
          }
        }, [toast]);
    
        const stopMicrophone = useCallback(() => {
          console.log("Stopping microphone...");
          if (audioWorkletNodeRef.current) {
            audioWorkletNodeRef.current.disconnect();
            audioWorkletNodeRef.current = undefined;
          }
          if (audioContextRef.current) {
            audioContextRef.current.close();
            audioContextRef.current = null;
          }
          if (mediaStreamRef.current) {
            mediaStreamRef.current.getTracks().forEach((track) => track.stop());
            mediaStreamRef.current = undefined;
          }
          setIsMicrophoneOn(false);
          console.log("Microphone stopped");
    
          // Send end of stream message
          // if (websocketRef.current) {
          //   websocketRef.current.send(
          //     JSON.stringify({
          //       event_id: `event_${Date.now()}`,
          //       type: "input_audio_buffer.end",
          //     }),
          //   );
          // }
        }, []);
    
        const connectWebSocket = () => {
          if (websocketRef.current?.readyState === WebSocket.OPEN) {
            console.log("WebSocket is already connected");
            return;
          }
          console.log("Attempting to connect to WebSocket:", websocketUrl);
          setIsLoading(true);
          setWsStatus("connecting");
          websocketRef.current = new WebSocket(websocketUrl);
    
          websocketRef.current.onopen = () => {
            setWsStatus("connected");
            setIsLoading(false);
            console.log("WebSocket connection established");
            // Send the new message structure to the server
            if (websocketRef.current?.readyState === WebSocket.OPEN) {
              websocketRef.current.send(
                JSON.stringify({
                  event_id: `event_${Date.now()}`,
                  type: "session.update",
                  session: {
                    instructions: instructions,
                    tools: tools,
                  },
                }),
              );
            }
            toast({
              title: "Connected",
              description: "WebSocket connection established.",
            });
          };
    
          websocketRef.current.onclose = (event) => {
            setWsStatus("disconnected");
            setIsLoading(false);
            console.log("WebSocket connection closed", event);
            toast({
              title: "Disconnected",
              description: "WebSocket connection closed.",
            });
          };
    
          websocketRef.current.onerror = (error) => {
            console.error("WebSocket error:", error);
            setIsLoading(false);
            toast({
              title: "Error",
              description: "WebSocket connection error. Please try reconnecting.",
              variant: "destructive",
            });
          };
    
          websocketRef.current.onmessage = (event) => {
            try {
              const data = JSON.parse(event.data);
              console.log("Received message:", data);
    
              // Handle different message types
              switch (data.type) {
                case "conversation.item.created":
                  if (data.item?.type === "function_call") {
                    setCurrentFunctionName(data.item.name);
                    setCurrentCallId(data.item.call_id);
                    console.log(`Function call started: ${JSON.stringify(data)}`);
                  }
                  break;
                case "response.function_call_arguments.delta":
                  console.log("Received delta:", data.delta);
                  functionCallBufferRef.current += data.delta;
                  console.log(
                    "Updated functionCallBuffer:",
                    functionCallBufferRef.current,
                  );
                  break;
                case "response.function_call_arguments.done": {
                  console.log(
                    "Final functionCallBuffer:",
                    functionCallBufferRef.current,
                  );
                  try {
                    const functionArgs = JSON.parse(functionCallBufferRef.current);
                    console.log(`Calling function: ${currentFunctionName}`);
                    console.log(`Arguments: ${JSON.stringify(functionArgs)}`);
    
                    // Call the appropriate function
    
                    fetchLatestMessages(functionArgs.n);
                  } catch (error) {
                    console.error("Error parsing function call arguments:", error);
                    console.error(
                      "Function call buffer content:",
                      functionCallBufferRef.current,
                    );
                    toast({
                      title: "Error",
                      description: "Failed to process function call arguments.",
                      variant: "destructive",
                    });
                  }
    
                  // Reset function call related states
                  setCurrentFunctionName(null);
                  functionCallBufferRef.current = "";
                  break;
                }
                case "response.content_part.done":
                  // Update transcription for text or audio content
                  handleContentPart(data.part);
                  break;
                case "response.output_item.done":
                  // Process audio content and update response
                  handleOutputItem(data.item);
                  break;
                case "response.audio.delta":
                  // Process audio delta and add to queue
                  handleAudioDelta(data.delta);
                  break;
                case "response.audio.done":
                  console.log("Audio response completed:", data);
                  break;
                case "response.done":
                  // Mark processing as complete
                  handleResponseDone();
                  break;
                default:
                  console.log("Unhandled message type:", data.type);
              }
            } catch (error) {
              console.error("Error processing WebSocket message:", error);
              toast({
                title: "Error",
                description: "Failed to process server message.",
                variant: "destructive",
              });
            }
          };
        };
    
        const fetchLatestMessages = async (n: number) => {
          try {
            const response = await brain.get_latest_messages({ n });
            const messages = await response.json();
            console.log(`Fetched ${messages.length} latest messages:`, messages);
    
            // Log the result
            console.log("THE RESULT::", messages);
    
            // Format the result as specified
            const formatted_result = {
              type: "conversation.item.create",
              item: {
                type: "function_call_output",
                call_id: currentCallId,
                output: JSON.stringify(messages, jsonReplacer),
              },
            };
    
            // Send the formatted result back through the WebSocket
            if (websocketRef.current?.readyState === WebSocket.OPEN) {
              websocketRef.current.send(JSON.stringify(formatted_result));
            }
    
            // Send the function call result
            if (websocketRef.current?.readyState === WebSocket.OPEN) {
              websocketRef.current.send(
                JSON.stringify({
                  event_id: `event_${Date.now()}`,
                  type: "function_call_result",
                  result: messages,
                }),
              );
            }
    
            // Reset the currentCallId
            setCurrentCallId(null);
          } catch (error) {
            console.error("Error fetching latest messages:", error);
            toast({
              title: "Error",
              description: "Failed to fetch latest messages.",
              variant: "destructive",
            });
          }
        };
    
        // Helper functions for message handling
        const handleContentPart = (part: {
          type: string;
          text?: string;
          transcript?: string;
        }) => {
          console.log("Processing content part:", part);
          if (part.type === "text" && part.text) {
            console.log("Updating transcription with text:", part.text);
            setTranscription((prev) => prev + part.text);
          } else if (part.type === "audio" && part.transcript) {
            console.log(
              "Updating transcription with audio transcript:",
              part.transcript,
            );
            setTranscription(part.transcript);
          }
        };
    
        const handleOutputItem = (item: OutputItem) => {
          console.log("Processing output item:", item);
          const audioContent = item.content.find(
            (c: ContentItem) => c.type === "audio",
          );
          if (audioContent && audioContent.audio) {
            processAudioContent(audioContent.audio);
          } else {
            console.log("No audio content found in the output item.");
          }
          // Update response state with text content
          const textContent = item.content.find(
            (c: ContentItem) => c.type === "text",
          );
          if (textContent && textContent.text) {
            setResponse((prev) => prev + textContent.text);
          }
        };
    
        const processAudioContent = (audioData: string) => {
          console.log("Processing audio content, length:", audioData.length);
          try {
            const audioArray = base64ToUint8Array(audioData);
            console.log("Created Uint8Array, length:", audioArray.length);
            const int16Array = new Int16Array(audioArray.buffer);
            if (audioQueueManager) {
              audioQueueManager.addAudioToQueue(int16Array);
            } else {
              throw new Error("AudioQueueManager is not initialized");
            }
          } catch (error) {
            console.error("Error processing audio data:", error);
            if (error instanceof Error) {
              console.error("Error details:", error.message);
              console.error("Error stack:", error.stack);
            }
            toast({
              title: "Audio Processing Error",
              description: "Failed to process audio data. Please try again.",
              variant: "destructive",
            });
          }
        };
    
        const handleAudioDelta = (delta: string) => {
          console.log("Received audio delta, length:", delta ? delta.length : 0);
          if (delta && delta.length > 0) {
            try {
              const audioArray = base64ToUint8Array(delta);
              console.log(
                "Created Uint8Array from delta, length:",
                audioArray.length,
              );
              const int16Array = new Int16Array(audioArray.buffer);
              if (audioQueueManager) {
                audioQueueManager.addAudioToQueue(int16Array);
                console.log("Added audio to queue, length:", int16Array.length);
              } else {
                console.warn(
                  "AudioQueueManager is not initialized. Skipping audio chunk.",
                );
              }
            } catch (error) {
              console.error("Error processing audio delta data:", error);
              toast({
                title: "Error",
                description: "Failed to process audio data. Please try again.",
                variant: "destructive",
              });
            }
          } else {
            console.log("Received empty audio delta. Skipping processing.");
          }
        };
    
        const handleResponseDone = () => {
          console.log("Response completed");
          setIsProcessing(false);
          setTranscription((prev) => prev + "\n[Response completed]");
        };
    
        const disconnectWebSocket = () => {
          setIsLoading(true);
          if (websocketRef.current) {
            websocketRef.current.close();
          }
          setWsStatus("disconnected");
          setIsLoading(false);
          toast({
            title: "Disconnected",
            description: "WebSocket connection closed manually.",
          });
        };
    
        const toggleMicrophone = useCallback(async () => {
          setIsMicLoading(true);
          try {
            if (isMicrophoneOn) {
              await stopMicrophone();
            } else {
              await startMicrophone();
            }
          } catch (error) {
            console.error("Error toggling microphone:", error);
            toast({
              title: "Error",
              description: "Failed to toggle microphone. Please try again.",
              variant: "destructive",
            });
          } finally {
            setIsMicLoading(false);
          }
        }, [isMicrophoneOn, startMicrophone, stopMicrophone, toast]);
    
        const toggleDebugMode = () => {
          setDebugMode(!debugMode);
          if (workerRef.current) {
            workerRef.current.postMessage({ type: "setDebug", data: !debugMode });
          }
        };
    
        useImperativeHandle(ref, () => ({
          toggleMic: toggleMicrophone,
          connect: connectWebSocket,
          disconnect: disconnectWebSocket,
          toggleDebug: toggleDebugMode,
        }));
    
        const MicrophoneButton = () => (
          <Button
            onClick={toggleMicrophone}
            disabled={isMicLoading}
            className="w-16 h-16 rounded-full bg-primary hover:bg-primary/90 text-primary-foreground"
          >
            {isMicLoading ? (
              <span className="loading loading-spinner loading-md"></span>
            ) : isMicrophoneOn ? (
              <MicOff className="h-8 w-8" />
            ) : (
              <Mic className="h-8 w-8" />
            )}
          </Button>
        );
    
        const ConnectionToggle = () => (
          <div className="flex items-center space-x-2">
            <Switch
              checked={wsStatus === "connected"}
              onCheckedChange={() => {
                if (wsStatus === "connected") {
                  disconnectWebSocket();
                } else {
                  connectWebSocket();
                }
              }}
            />
            <Power
              className={cn(
                "h-4 w-4",
                wsStatus === "connected" ? "text-green-500" : "text-gray-400",
              )}
            />
          </div>
        );
    
        const DebugToggle = () => (
          <Button
            onClick={toggleDebugMode}
            className={cn(
              "w-8 h-8 rounded-full",
              debugMode
                ? "bg-yellow-500 hover:bg-yellow-600"
                : "bg-gray-300 hover:bg-gray-400",
            )}
          >
            <Bug className="h-4 w-4" />
          </Button>
        );
    
        return (
          <div className="flex flex-col items-center space-y-4 p-4 bg-background rounded-lg shadow-md">
            <ErrorBoundary
              fallback={<div>Something went wrong. Please refresh the page.</div>}
              onError={(error, errorInfo) => {
                console.error("Error in RealtimeAPI component:", error, errorInfo);
                toast({
                  title: "Error",
                  description: "An unexpected error occurred. Please try again.",
                  variant: "destructive",
                });
              }}
            >
              <div className="flex justify-between items-center w-full">
                <span className="text-sm font-medium">
                  {wsStatus === "connected"
                    ? "Connected"
                    : wsStatus === "connecting"
                      ? "Connecting..."
                      : "Disconnected"}
                </span>
                <ConnectionToggle />
              </div>
              <MicrophoneButton />
              <div className="flex items-center space-x-2">
                <span className="text-sm font-medium">
                  Microphone: {isMicrophoneOn ? "On" : "Off"}
                </span>
              </div>
            </ErrorBoundary>
          </div>
        );
      },
    );
    
    RealtimeAPI.displayName = "RealtimeAPI";
    

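The component renders its own connect toggle and microphone button, but it also exposes toggleMic, connect, and disconnect through useImperativeHandle, so a parent page can drive it programmatically. Here is a minimal usage sketch (the AgentControls name and the button wiring are illustrative, not part of the tutorial code):

    import { API_URL } from "app";
    import { RealtimeAPI } from "components/RealtimeAPI";
    import { useRef } from "react";

    export default function AgentControls() {
      // Matches the handle type RealtimeAPI exposes via useImperativeHandle.
      const agentRef = useRef<{
        toggleMic: () => void;
        connect: () => void;
        disconnect: () => void;
      }>(null);

      const websocketUrl = `${API_URL.replace(/^http/, "ws")}/ws`;

      return (
        <div className="space-y-2">
          <RealtimeAPI ref={agentRef} websocketUrl={websocketUrl} />
          {/* Drive the component from your own controls. */}
          <button onClick={() => agentRef.current?.connect()}>Connect</button>
          <button onClick={() => agentRef.current?.toggleMic()}>Toggle mic</button>
        </div>
      );
    }
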
  2. Create a new API called realtime_api and paste in the following code (a quick way to test the webhook endpoint follows the block):

    import asyncio
    import json
    from json import JSONEncoder
    import websockets
    import databutton as db
    from fastapi import APIRouter, WebSocket, HTTPException, Request
    import logging
    from datetime import datetime, timedelta
    import pandas as pd
    from pydantic import BaseModel
    
    logging.basicConfig(level=logging.INFO)
    
    router = APIRouter()
    
    OPENAI_API_KEY = db.secrets.get("OPENAI_API_KEY")
    
    
    class WebhookData(BaseModel):
        message: str
        sender: str
        date: str
    
    
    def store_webhook_data(data: WebhookData):
        df = db.storage.dataframes.get(
            "webhook_data",
            default=pd.DataFrame(columns=["message", "sender", "date", "added"]),
        )
        new_row = data.dict()
        new_row["added"] = datetime.now()
        df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)
        db.storage.dataframes.put("webhook_data", df)
    
    
    class CustomJSONEncoder(JSONEncoder):
        def default(self, obj):
            if isinstance(obj, datetime):
                return obj.isoformat()
            return JSONEncoder.default(self, obj)
    
    
    def fetch_latest_messages(n: int):
        """Fetch the latest N messages from the webhook-data table."""
        df = db.storage.dataframes.get("webhook_data")
        df["added"] = pd.to_datetime(df["added"])
        sorted_df = df.sort_values("added", ascending=False)
        latest_messages = sorted_df.head(n).to_dict("records")
        for message in latest_messages:
            message["added"] = message["added"].isoformat()
        return latest_messages
    
    
    def find_messages_by_date_range(start_date: str, end_date: str):
        """Find messages within a specific date range from the webhook-data table."""
        df = db.storage.dataframes.get("webhook_data")
        df["added"] = pd.to_datetime(df["added"])
        start = pd.to_datetime(start_date)
        end = pd.to_datetime(end_date)
        filtered_df = df[(df["added"] >= start) & (df["added"] <= end)]
        result = filtered_df.to_dict("records")
        for message in result:
            message["added"] = message["added"].isoformat()
        return result
    
    
    async def connect_to_openai():
        url = "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01"
        headers = {
            "Authorization": f"Bearer {OPENAI_API_KEY}",
            "OpenAI-Beta": "realtime=v1",
        }
        return await websockets.connect(url, extra_headers=headers)
    
    
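    # Relays messages in one direction (client -> OpenAI, or OpenAI -> client).
    # Function-call events arriving from OpenAI are intercepted here: the matching
    # Python function is executed and its output is sent back to OpenAI rather
    # than being forwarded to the browser.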
    async def forward(source, destination):
        function_call_buffer = ""
        current_function_name = None
    
        try:
            while True:
                if isinstance(source, WebSocket):
                    message = await source.receive_text()
                else:
                    message = await source.recv()
    
                try:
                    json_message = json.loads(message)
                    if (
                        json_message["type"] == "conversation.item.created"
                        and json_message["item"]["type"] == "function_call"
                    ):
                        current_function_name = json_message["item"]["name"]
                        call_id = json_message["item"]["call_id"]
                        logging.info(f"Function call started: {json_message}")
                    elif json_message["type"] == "response.function_call_arguments.delta":
                        function_call_buffer += json_message["delta"]
                    elif json_message["type"] == "response.function_call_arguments.done":
                        # Parse the complete function call arguments
                        function_args = json.loads(function_call_buffer)
    
                        logging.info(f"Calling function: {current_function_name}")
                        logging.info(f"Arguments: {function_args}")
    
                        # Call the appropriate function
                        if current_function_name == "fetch_latest_messages":
                            result = fetch_latest_messages(function_args["n"])
                        elif current_function_name == "find_messages_by_date_range":
                            result = find_messages_by_date_range(function_args["start_date"], function_args["end_date"])
                        else:
                            raise ValueError(f"Unknown function: {current_function_name}")
    
                        print("THE RESULT::", result)
                        # Format the result as specified
                        formatted_result = {
                            "type": "conversation.item.create",
                            "item": {
                                "type": "function_call_output",
                                "call_id": call_id,
                                "output": json.dumps(result, cls=CustomJSONEncoder),
                            },
                        }
    
                        # Send the function call output back to the source socket.
                        # For tool calls this is the OpenAI connection, since the
                        # function-call events originate there.
                        if isinstance(source, WebSocket):
                            await source.send_text(json.dumps(formatted_result))
                        else:
                            await source.send(json.dumps(formatted_result))

                        # Ask OpenAI to generate a response that uses the tool output.
                        await source.send(json.dumps({"type": "response.create"}))
                        # Reset the function call buffer and current function name
                        function_call_buffer = ""
                        current_function_name = None
    
                        # Don't forward function call messages to the client
                        continue
                    # Forward other messages as usual
                    if isinstance(destination, WebSocket):
                        await destination.send_text(json.dumps(json_message))
                    else:
                        await destination.send(json.dumps(json_message))
    
                except json.JSONDecodeError:
                    # If it's not valid JSON, send it as plain text
                    if isinstance(destination, WebSocket):
                        await destination.send_text(message)
                    else:
                        await destination.send(message)
                except Exception as e:
                    logging.error(f"Error processing message: {str(e)}\nMessage: {message}")
                    # Continue processing other messages even if one fails
                    continue
        except websockets.exceptions.ConnectionClosed:
            logging.info("Connection closed normally")
        except Exception as e:
            logging.error(f"Error in forwarding: {str(e)}")
    
    
    @router.websocket("/ws")
    async def websocket_endpoint(websocket: WebSocket):
        await websocket.accept()
    
        try:
            openai_ws = await connect_to_openai()
    
            # Create tasks for handling messages in both directions
            client_to_openai = asyncio.create_task(forward(websocket, openai_ws))
            openai_to_client = asyncio.create_task(forward(openai_ws, websocket))
    
            # Wait for either connection to close
            done, pending = await asyncio.wait([client_to_openai, openai_to_client], return_when=asyncio.FIRST_COMPLETED)
    
            # Cancel pending task
            for task in pending:
                task.cancel()
        except websockets.exceptions.InvalidStatusCode as e:
            logging.error(f"Failed to connect to OpenAI: {str(e)}")
            await websocket.send_text(json.dumps({"type": "error", "message": "Failed to connect to OpenAI"}))
            await websocket.close(code=1008, reason="Failed to connect to OpenAI")
        except Exception as e:
            logging.error(f"Error in WebSocket connection: {str(e)}")
            await websocket.send_text(json.dumps({"type": "error", "message": "Unexpected error occurred"}))
            await websocket.close(code=1011, reason="Unexpected error occurred")
        finally:
            if "openai_ws" in locals():
                await openai_ws.close()
    
    
    @router.post("/webhook")
    async def webhook(request: Request):
        try:
            data = await request.json()
            webhook_data = WebhookData(
                message=data.get("message"),
                sender=data.get("sender"),
                date=data.get("date"),
            )
            store_webhook_data(webhook_data)
            return {"status": "success", "message": "Data stored successfully"}
        except Exception as e:
            raise HTTPException(status_code=400, detail=str(e))
    
    
    @router.get("/fetch_latest_messages")
    async def get_latest_messages(n: int = 10):
        try:
            messages = fetch_latest_messages(n)
            return {"status": "success", "messages": messages}
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e))
    

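The webhook endpoint is how message data enters the app (for example from Zapier), and fetch_latest_messages reads it back when the agent calls the tool. To sanity-check the flow, you can post a test message yourself. Here is a hedged sketch using fetch from the frontend (the payload fields match the WebhookData model above; the exact path depends on how Databutton mounts the realtime_api router):

    import { API_URL } from "app";

    // Post one sample message so fetch_latest_messages has something to return.
    const res = await fetch(`${API_URL}/webhook`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        message: "Hello from a test",
        sender: "tester@example.com",
        date: new Date().toISOString(),
      }),
    });
    console.log(await res.json()); // { status: "success", message: "Data stored successfully" }
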
  3. Create a page where you use the Realtime API. Here is an example Home page (a variant that overrides the default instructions follows the block):

    import { Button } from "@/components/ui/button";
    import { Card } from "@/components/ui/card";
    import { useTheme } from "@/hooks/use-theme";
    import { API_URL } from "app";
    import { RealtimeAPI } from "components/RealtimeAPI";
    import { MoonIcon, SunIcon } from "lucide-react";
    import React from "react";
    
    export default function App() {
      const websocketUrl = `${API_URL.replace(/^http/, "ws")}/ws`;
      const { theme, setTheme } = useTheme();
    
      const toggleTheme = () => {
        setTheme(theme === "light" ? "dark" : "light");
      };
    
      return (
        <div className="min-h-screen bg-background text-foreground flex flex-col items-center justify-center p-4">
          <Card className="w-full max-w-3xl bg-card text-card-foreground p-8 rounded-2xl shadow-lg border-2 border-primary">
            <div className="flex justify-between items-center mb-8">
              <h1 className="text-4xl font-bold bg-gradient-to-r from-primary to-accent bg-clip-text text-transparent">
                OpenAI Realtime Chat
              </h1>
              <Button
                variant="outline"
                size="icon"
                onClick={toggleTheme}
                className="rounded-full p-2 hover:bg-muted"
              >
                {theme === "light" ? (
                  <MoonIcon className="h-6 w-6" />
                ) : (
                  <SunIcon className="h-6 w-6" />
                )}
              </Button>
            </div>
    
            <div className="bg-muted p-6 rounded-xl">
              <p className="text-lg mb-4">
                Welcome to your AI-powered chat assistant. Click the button to start
                speaking!
              </p>
              <RealtimeAPI websocketUrl={websocketUrl} />
            </div>
    
            <footer className="mt-8 text-center text-sm text-muted-foreground">
              <p>Powered by OpenAI and Zapier • Your data is securely processed</p>
            </footer>
          </Card>
        </div>
      );
    }
    
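The RealtimeAPI component also accepts instructions and tools props, so the agent's behavior can be tuned per page instead of editing the component. A brief sketch overriding only the instructions (the prompt text here is just an example):

    import { API_URL } from "app";
    import { RealtimeAPI } from "components/RealtimeAPI";

    export default function SupportAgentPage() {
      const websocketUrl = `${API_URL.replace(/^http/, "ws")}/ws`;
      return (
        <RealtimeAPI
          websocketUrl={websocketUrl}
          // Replaces the default prompt baked into the component.
          instructions="You are a concise voice assistant for our support team."
        />
      );
    }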
