# Syncing Supabase with Typesense
Supabase is an open-source development platform built on PostgreSQL. It offers secure, direct database access to app users along with authentication management, TypeScript edge functions, logging, and storage, making it a popular choice among developers. However, its search functionality, based on PostgreSQL fuzzy search, is not as powerful as a specialized search engine such as Typesense.
In this guide, we'll walk you through the process of syncing your user data with a Typesense search instance to enhance search capabilities and provide a more robust search experience for your users.
# Step 1: Configuring Supabase
# Creating a Products Table
Supabase offers a browser-based SQL and GUI editor for executing queries. If you prefer a database manager like PGAdmin, refer to Supabase's tutorial for setting up external connections. The upcoming examples use a hypothetical products table, which you can create using your preferred interface.
# Products table
CREATE TABLE public.products (
id UUID NOT NULL DEFAULT uuid_generate_v4 (),
product_name TEXT NULL,
updated_at TIMESTAMPTZ NULL DEFAULT now(),
user_id UUID NULL, -- References an authenticated user's id in Supabase managed auth.users database
CONSTRAINT products_pkey PRIMARY KEY (id),
CONSTRAINT products_user_id_fkey FOREIGN KEY (user_id) REFERENCES auth.users (id) ON DELETE CASCADE
);
# Enabling Row Level Security
Row Level Security (RLS) restricts the actions a user can perform on rows within a table. This feature will be used in the Syncing Deletes portion of the tutorial to support soft deleting. Supabase uses two main app user roles:
- anon: anonymous users
- authenticated: registered users
Using RLS, it is possible to restrict access to only authenticated users with the appropriate user_id and session key.
# Example: Creating an RLS policy
CREATE POLICY "only an authenticated user can view their respective rows in the products table"
ON public.products
FOR SELECT
TO authenticated
USING (
-- auth.uid is a helper function provided by Supabase
auth.uid() = user_id
);
It's important to note that once RLS is enabled for a table, by default it will prevent all non-administrative roles (all app users) from accessing the table unless policies are written that explicitly grant permissions.
To enable RLS on the products table, you can either enable it in Supabase's Table Editor or execute the following query:
# Enabling RLS
ALTER TABLE products ENABLE ROW LEVEL SECURITY;
The policies that will be used for the walkthrough are:
# Products Table RLS Policies
CREATE POLICY "only an authenticated user can view their products in PostgreSQL"
ON public.products
FOR SELECT
TO authenticated
USING (
auth.uid() = user_id
);
CREATE POLICY "only authenticated users can insert into the products table"
ON public.products
FOR INSERT
TO authenticated
WITH CHECK (true);
CREATE POLICY "only an authenticated user is allowed to update their product offerings"
ON public.products
FOR UPDATE
TO authenticated
USING (
auth.uid() = user_id
) WITH CHECK (
auth.uid() = user_id
);
CREATE POLICY "only an authenticated user is allowed to remove their products from the database"
ON public.products
FOR DELETE
TO authenticated
USING (
auth.uid() = user_id
);
# Enabling Relevant PostgreSQL Extensions
The three extensions this tutorial will be using are:
- PG_NET: allows the database to make asynchronous http/https requests with JSON
- HTTP: allows the database to make synchronous http/https requests with all data formats
- PG_CRON: gives the database the ability to double as a CRON server
All of these extensions can be found and enabled in Supabase by clicking on the Database icon in the nav menu, and then clicking on Extensions. For more guidance, you can check out Supabase's documentation.
The PG_NET extension will be used to sync PostgreSQL with Typesense in real time. The HTTP and PG_CRON extensions will be used together to schedule and execute bulk syncing.
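If you prefer to stay in the SQL editor, the same extensions can usually be enabled with plain SQL. This is only a sketch; Supabase typically installs extensions into its dedicated extensions schema, so the dashboard toggle remains the most reliable route.
# Enabling the Extensions with SQL
CREATE EXTENSION IF NOT EXISTS pg_net;
CREATE EXTENSION IF NOT EXISTS http;
CREATE EXTENSION IF NOT EXISTS pg_cron;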
NOTE: Although most of this tutorial is written in PL/pgSQL, Supabase also supports the PLV8 and PLJAVA extensions, which let you write routines in JavaScript and Java, respectively.
# Step 2: Configuring Typesense
If your Typesense instance is already running and connected to the internet, skip to the Setup API Keys section. This guide refers to the Docker portion of Typesense's installation documentation with some modifications.
# Install with Docker
Docker offers a consistent configuration process across Windows, Linux, and macOS. First, install Docker Desktop if you haven't already. With Docker running, execute the command from Typesense's Docker page:
# Deploy Docker Image and Container
mkdir /tmp/typesense-data
docker run -p 8108:8108 -v/tmp/typesense-data:/data typesense/typesense:0.24.1 --data-dir /data --api-key=Hu52dwsas2AdxdE
The following URL can be used to ping your instance. If it is up and running, you will receive back {"ok":true}:
# Testing Connection in the Browser
http://localhost:8108/health
# Setup API Keys
The Typesense instance is deployed with a default API key: Hu52dwsas2AdxdE. For better security, we'll generate new keys. First, execute the following shell command to create a new administrator key:
NOTE: consider formatting cURL responses with json_pp, jq, or another JSON prettifier.
# Creating a New Administrator API Key
curl 'http://localhost:8108/keys' \
-X POST \
-H "X-Typesense-API-KEY: Hu52dwsas2AdxdE" \
-H 'Content-Type: application/json' \
-d '{"description":"Admin key.","actions": ["*"], "collections": ["*"]}'
Save the returned admin key value. It should be the only active key:
# Listing All Active API Keys
curl 'http://localhost:8108/keys' \
-X GET \
-H "X-Typesense-API-KEY: ${Typesense_API_KEY}"
With the new admin key in hand, you can use it to create a products collection.
# Creating Product Collection
curl "http://localhost:8108/collections" \
-X POST \
-H "Content-Type: application/json" \
-H "X-Typesense-API-KEY: ${Typesense_API_KEY}" \
-d '{
"name": "products",
"fields": [
{"name": "id", "type": "string" },
{"name": "product_name", "type": "string" },
{"name": "user_id", "type": "string" },
{"name": "updated_at", "type": "float" }
],
"default_sorting_field": "updated_at"
}'
We will need to create two more keys: a search-only key and an admin key scoped to the products collection.
# Creating Products Search-Only API Key
curl 'http://localhost:8108/keys' \
-X POST \
-H "X-Typesense-API-KEY: ${Typesense_API_KEY}" \
-H 'Content-Type: application/json' \
-d '{"description":"Search-only products key.","actions": ["documents:search"], "collections": ["products"]}'
# Creating Products Admin API Key
curl 'http://localhost:8108/keys' \
-X POST \
-H "X-Typesense-API-KEY: ${Typesense_API_KEY}" \
-H 'Content-Type: application/json' \
-d '{"description":"Admin products key","actions": ["*"], "collections": ["products"]}'
# Tunneling to Connect to the Web
This Supabase walkthrough does not cover deploying a self-hosted instance on a dedicated server. Instead, you can use services such as Ngrok or Cloudflare Tunnel to securely forward traffic between your local machine and the web. Ngrok is slightly easier to use and offers a generous free tier. After setting up an account, follow the guide to install Ngrok. Once finished, use a terminal command to forward your Typesense instance to the web.
# Activating Ngrok connection
ngrok http 8108
To test your tunnel, you can use your Ngrok URL in the browser:
# Testing Connection
<NGROK URL>/health
If you receive the response {"ok":true}, then you are connected.
# Step 3: Introduction to PostgreSQL Connectivity
The PG_NET extension allows Supabase to communicate asynchronously through HTTP/HTTPS requests.
The following example requests data from the postman-echo API.
# Requesting Data from an Endpoint
SELECT net.http_get('https://postman-echo.com/get?foo1=bar1&foo2=bar2') AS request_id;
When the query completes, you can use the returned request_id to find the response in Supabase's net._http_response table. You can also view the response with the following query:
# Viewing HTTP/HTTPS Response Message
SELECT
*
FROM net._http_response
WHERE id = <request_id>;
NOTE: At the end of STEP 5.2, we will discuss how this table can be used to retry failed syncs.
The net.http_post function is one way to post to Typesense directly. However, it has a significant limitation: it only supports JSON as the Content-Type, whereas Typesense's bulk import endpoint expects NDJSON. Fortunately, JSON and NDJSON are functionally equivalent when dealing with a single row. As a result, the code below works when it only has to send one row from PostgreSQL to Typesense.
If you go to the Authentication tab in the Supabase side nav, you can create a new user. Using the new user profile, go to the Supabase Table Editor and manually add new rows to the products table.
# Naive Attempt to Connect to Typesense
SELECT net.http_post(
-- ADD TYPESENSE URL
url := '<TYPESENSE URL>/collections/products/documents/import?action=upsert'::TEXT,
-- Formats the products table's rows into JSONB and also converts updated_at from type TIMESTAMPTZ to type FLOAT
-- The body must be formatted as JSONB data
body := (
SELECT
to_jsonb(rows)
FROM (
SELECT
-- Converting type TIMESTAMPTZ to type float
EXTRACT(EPOCH FROM updated_at)::float AS updated_at,
id,
product_name,
user_id
FROM products
-- UNCOMMENT THE BELOW LINE TO MAKE THE QUERY WORK
-- LIMIT 1
) rows
)::JSONB,
headers := json_build_object(
'Content-Type', 'application/json',
'X-Typesense-API-KEY', '<API KEY>' -- ADD API KEY
)::JSONB,
timeout_milliseconds := 4000
) AS request_id;
If you try to upsert multiple rows, you will receive the following error message:
Failed to run sql query: more than one row returned by a subquery used as an expression
As noted above, PG_NET functions cannot handle multi-row imports, but they are powerful for real-time syncing. Direct PostgreSQL-to-Typesense real-time syncs rely on triggers, which can block user transactions. Because PG_NET requests are asynchronous, transactions are not delayed. This also makes PG_NET the preferred choice for calling edge functions. The extension is robust; in fact, it powers Supabase's webhook feature under the hood.
On the other hand, the HTTP extension supports NDJSON, making it suitable for direct bulk updates. It is also synchronous, so it will wait for a success or fail response, which makes handling errors simpler. It is compatible with PG_CRON cron jobs, which run on separate threads and do not interfere with the main database operations, minimizing impacts on user experience.
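To see the difference in practice, here is a minimal synchronous request against the same postman-echo endpoint used earlier. It is a sketch that assumes the HTTP extension's http_get helper; unlike net.http_get, the query blocks until the status and body are returned.
# Example: Synchronous Request with the HTTP Extension
SELECT
status,
content
FROM http_get('https://postman-echo.com/get?foo1=bar1&foo2=bar2');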
To perform bulk updates, rows must be converted into NDJSON. This can be accomplished using a PL/pgSQL function within PostgreSQL.
# Formatting Table Rows into NDJSON
CREATE OR REPLACE FUNCTION get_products_ndjson()
RETURNS TEXT
AS $$
DECLARE
ndjson TEXT := '';
BEGIN
SELECT
string_agg(row_to_json(row_data)::text, E'\n')
INTO ndjson
FROM (
SELECT
p.product_name,
p.id,
CAST(EXTRACT(epoch FROM p.updated_at) AS FLOAT) AS updated_at,
p.user_id
FROM products p
) AS row_data;
RETURN ndjson;
END;
$$ LANGUAGE plpgSQL;
Utilizing the above function, you can use the HTTP extension to make bulk upserts:
# Bulk Approach to Sync with Typesense
SELECT
status AS response_status,
content AS response_body,
(unnest(headers)).*
FROM http((
-- Method
'POST'::http_method,
-- URL (ADD TYPESENSE URL)
'<TYPESENSE URL>/collections/products/documents/import?action=upsert',
-- Headers
ARRAY[
-- ADD API KEY
http_header('X-Typesense-API-KEY', '<API KEY>')
]::http_header[],
-- Content type
'application/x-ndjson',
-- Payload
(SELECT get_products_ndjson())
)::http_request);
You can check if Typesense was updated with the following cURL request:
# Searching Typesense
curl -H "X-TYPESENSE-API-KEY: <API KEY>" \
"<TYPESENSE URL>/collections/products/documents/search?q=*"
# Step 4: Understanding Scheduling in Supabase
Using the PG_CRON extension, you can schedule cron jobs to coordinate bulk syncs with Typesense, periodically retry failed requests, and clear stale data from logging tables.
# Scheduling Cron Job: Calling Edge Function Every Minute
SELECT
cron.schedule(
'cron-job-name',
'* * * * *', -- Executes every minute (cron syntax)
$$
-- SQL query
SELECT net.http_get(
-- URL of Supabase Edge function
url:='https://<reference id>.functions.Supabase.co/Typesense-example',
headers:='{
"Content-Type": "application/json",
"Authorization": "Bearer <TOKEN>"
}'::JSONB
) as request_id;
$$
);
# Unscheduling an Active Cron Job
SELECT cron.unschedule('cron-job-name');
Cron jobs in PostgreSQL are timed using cron syntax. Each job can run at most once per minute; unfortunately, Supabase does not offer in-house support for shorter, sub-minute intervals. If that is problematic for your use case, you should use a more precise external cron or messaging server for better control.
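For reference, here are a couple of other schedules expressed in cron syntax. The job names and the placeholder SELECT 1 queries are purely illustrative; substitute the sync functions defined later in the guide.
# Example: Other Cron Schedules
SELECT cron.schedule('every-five-minutes', '*/5 * * * *', $$ SELECT 1; $$);
SELECT cron.schedule('daily-at-3am-utc', '0 3 * * *', $$ SELECT 1; $$);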
In Supabase, cron job details are recorded in the cron schema with two tables:
- job
- job_run_details
It is helpful to reference these tables for monitoring and debugging. Execution logs can be checked either through the Supabase table editor or by querying the tables directly.
# Query to Find Most Recent 10 Cron Execution Failures
SELECT
*
FROM cron.job_run_details
INNER JOIN cron.job ON cron.job.jobid = cron.job_run_details.jobid
WHERE
cron.job.jobname = 'cron-job-name'
AND
cron.job_run_details.status = 'failed'
ORDER BY start_time DESC
LIMIT 10;
Cron jobs are a huge part of managing Supabase and will be featured heavily throughout the remainder of the guide.
# Step 5.1: Bulk Syncing Inserts/Updates Natively in PostgreSQL
Numerous methods exist for tracking unsynced rows in PostgreSQL, each offering its own advantages and disadvantages. This guide will explore various strategies for both real-time and bulk syncing. Ultimately, it's essential to determine which methods are best suited for your database design and use case.
Creating a log table for tracking unsynced rows is the first strategy that will be demonstrated. It is relatively straightforward to set up and provides the most control over the number of rows that are synced at any given time.
# Creating a Logging Table to Track Unsynced Rows
CREATE TABLE public.products_sync_tracker (
product_id UUID NOT NULL PRIMARY KEY,
is_synced BOOLEAN DEFAULT FALSE,
CONSTRAINT product_id_fkey FOREIGN KEY (product_id) REFERENCES public.products (id) ON DELETE CASCADE
);
Triggers can be used to populate the above table with unsynced rows.
# Creating a Trigger to Monitor Product Inserts
CREATE OR REPLACE FUNCTION insert_products_trigger_func()
RETURNS TRIGGER AS $$
BEGIN
INSERT INTO products_sync_tracker (product_id)
VALUES (NEW.id);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER insert_products_trigger
AFTER INSERT ON public.products
FOR EACH ROW
EXECUTE FUNCTION insert_products_trigger_func();
# Creating a Trigger to Monitor Product Updates
CREATE OR REPLACE FUNCTION update_products_trigger_func()
RETURNS TRIGGER AS $$
BEGIN
-- Update products_sync_tracker table
UPDATE products_sync_tracker
SET is_synced = FALSE
WHERE product_id = NEW.id;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER update_products_trigger
BEFORE UPDATE ON public.products
FOR EACH ROW
EXECUTE FUNCTION update_products_trigger_func();
# Scheduling Bulk Updates/Inserts with Cron Jobs
The following PL/pgSQL function converts unsynced rows into NDJSON and then upserts them into Typesense. If the upsert fails, the tracking table products_sync_tracker is reverted to reflect the failure. By incorporating the function into a cron job, syncing at intervals becomes possible natively in PostgreSQL without external servers. Though this is convenient, it does consume database resources. These functions should be set up to finish quickly, so it is important to keep the payload size relatively low. By default, requests made by the HTTP extension time out after 5 seconds. However, this can be changed by modifying the global variable http.timeout_msec.
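As a rough sketch of the timeout adjustment just mentioned, the variable can be raised for the current session before a large request; inside a PL/pgSQL function, set_config with its third argument set to true scopes the change to the current transaction. Treat the exact values as assumptions to tune for your own workload.
# Adjusting the HTTP Extension Timeout
-- Raise the default 5 second timeout to 10 seconds for this session
SET http.timeout_msec TO 10000;
-- Or, inside a function body, scope the change to the current transaction:
-- PERFORM set_config('http.timeout_msec', '10000', true);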
The function can be broken down into a 6-step process:
- Ensure no other syncing process is running; exit if one is.
- Retrieve the ids of unsynced rows from the products_sync_tracker table.
- Convert the corresponding product rows into the NDJSON (Newline Delimited JSON) format.
- Synchronize the formatted rows with Typesense.
- Examine Typesense's response to determine if any rows failed to synchronize.
- Update the products_sync_tracker table to mark any rows that failed to sync as unsynced.
# Function to Bulk Sync Rows
CREATE OR REPLACE FUNCTION sync_products_updates()
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
-- lock key: an arbitrary number that will be used as a 'key' to lock the function
-- only one instance of the function can have the key and run at any time
-- If multiple requests are sent at the same time while
-- rows are actively being updated, it is impossible to know which will be processed first.
-- This can lead to Typesense receiving stale data.
-- Locking the function prevents this negative outcome.
lock_key INTEGER := 123456;
is_locked BOOLEAN := FALSE;
-- used to tell Typesense how many rows need to be synced
total_rows INTEGER;
-- variables for converting rows to NDJSON
ndjson TEXT := '';
-- variables for referencing http response values
request_status INTEGER;
response_message TEXT;
BEGIN
-- Create lock, so that only one instance of the function can
-- be run at a time.
SELECT pg_try_advisory_xact_lock(lock_key) INTO is_locked;
IF NOT is_locked THEN
RAISE EXCEPTION 'Could not lock. Other job in process';
END IF;
-- Preemptively update 40 unsynced products as synced.
-- Create a temporary table to hold the updated rows
-- They will be synced with Typesense.
CREATE TEMPORARY TABLE updated_rows (
product_id UUID
) ON COMMIT DROP;
WITH soon_to_be_synced_rows AS (
UPDATE products_sync_tracker
SET is_synced = TRUE
WHERE product_id IN (
SELECT
product_id
FROM products_sync_tracker
WHERE is_synced = FALSE
LIMIT 40
)
RETURNING product_id
)
INSERT INTO updated_rows
SELECT * FROM soon_to_be_synced_rows;
SELECT
COUNT(product_id)
INTO total_rows
FROM updated_rows;
IF total_rows < 1 THEN
RAISE EXCEPTION 'No rows need to be synced';
END IF;
-- Cast the soon-to-be synced rows into a Typesense interpretable format
WITH row_data AS (
SELECT
p.product_name,
p.id,
CAST(EXTRACT(epoch FROM p.updated_at) AS FLOAT) AS updated_at,
p.user_id
FROM products p
JOIN updated_rows u ON p.id = u.product_id
)
SELECT
string_agg(row_to_json(row_data)::text, E'\n')
INTO ndjson
FROM row_data;
SELECT
status,
content
INTO request_status, response_message
FROM http((
'POST'::http_method,
-- ADD TYPESENSE URL
'<TYPESENSE URL>/collections/products/documents/import?action=upsert',
ARRAY[
-- ADD API KEY
http_header('X-Typesense-API-KEY', '<API KEY>')
]::http_header[],
'application/x-ndjson',
ndjson --payload
)::http_request);
-- Check if the request failed
IF request_status <> 200 THEN
-- stores error message in Supabase Postgres Logs
RAISE LOG 'Typesense Sync request failed. Request status: %. Message: %', request_status, response_message;
-- Raises Exception, which undoes the transaction
RAISE EXCEPTION 'UPSERT FAILED';
ELSE
-- A successful response will contain NDJSON of the results for each row
/* possible results:
{"success": true}
{"success": false, "error": "Bad JSON.", "document": "[bad doc]"}
*/
-- This must be processed to determine which rows synced and which did not
WITH ndjson_from_response AS (
SELECT unnest(string_to_array(response_message, E'\n')) AS ndjson_line
),
ndjson_to_json_data AS (
SELECT
ndjson_line::JSON AS json_line
FROM ndjson_from_response
),
failed_syncs AS (
SELECT
json_line
FROM ndjson_to_json_data
WHERE (json_line->>'success')::BOOLEAN = FALSE
),
unsynced_ids AS (
SELECT
((json_line->>'document')::JSON->>'id')::UUID AS ids
FROM failed_syncs
)
UPDATE public.products_sync_tracker
SET is_synced = FALSE
WHERE product_id IN (SELECT ids FROM unsynced_ids);
END IF;
END;
$$;
# Cron Job to Bulk Upsert into Typesense
SELECT cron.schedule(
'update-insert-Typesense-job',
'* * * * *',
$$
-- SQL query
SELECT sync_products_updates();
$$
);
To test if Typesense synced, manually add a row to the products table using Supabase's table editor. Check the cron schema in the same table editor to observe when your cron job executed.
# Searching Typesense for Synced Data
curl -X GET "<TYPESENSE URL>/collections/products/documents/search?q=*" \
-H "X-TYPESENSE-API-KEY: <API KEY>"
# Step 5.2: Bulk Syncing Inserts/Updates with Edge Functions
Some users may prefer using servers as an intermediary to communicate with Typesense. This has the benefit of reducing strain on the database, as well as being able to accommodate relatively high volume syncs. It is also particularly useful when it is necessary to sanitize or reformat data. Fortunately, Supabase offers serverless edge functions in Deno (TypeScript).
If an edge function fails halfway through its execution, there must be a way to capture and resolve the failure. In the previous example, row syncs were tracked in a products_sync_tracker table based on row updates. However, an alternative structure can be introduced that offers advantages for edge functions. Using the products table's updated_at column, unsynced rows can be tracked by time instead and resent when edge functions fail. For this to work, it is helpful to remove the previous triggers on the products table and replace them with one that maintains the updated_at column.
# Removing Previous Trigger
DROP TRIGGER IF EXISTS update_products_trigger
ON products;
DROP FUNCTION IF EXISTS update_products_trigger_func;
DROP TRIGGER IF EXISTS insert_products_trigger
ON products;
DROP FUNCTION IF EXISTS insert_products_trigger_func;
# Adding Trigger to Update updated_at Column
CREATE OR REPLACE FUNCTION update_products_time_func()
RETURNS TRIGGER AS $$
BEGIN
-- update products.updated_at column
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER update_at_products_trigger
BEFORE UPDATE ON public.products
FOR EACH ROW
EXECUTE FUNCTION update_products_time_func();
It is also necessary to track edge function calls so they can be monitored for failures. Create an edge_function_tracker table.
# Creating Edge Deployment Monitoring Table
CREATE TABLE edge_function_tracker(
id UUID NOT NULL DEFAULT uuid_generate_v4 (),
start_time TIMESTAMPTZ DEFAULT NOW (),
start_time_of_prev_func TIMESTAMPTZ,
attempts INTEGER DEFAULT 1,
func_status TEXT NOT NULL DEFAULT 'PENDING' CHECK (func_status IN ('SUCCEEDED', 'FAILED', 'PENDING')),
request_id BIGINT NULL,
CONSTRAINT edge_function_tracker_pkey PRIMARY KEY (id)
);
The PG_NET call that will deploy the edge function must be wrapped in a PL/pgSQL function so that function information can be logged before deployment. It can be broken down into a 4-step process:
- Create a new entry in the edge_function_tracker table.
- Gather required information for identifying unsynced rows.
- Send an edge function request with a payload containing necessary details.
- Record metadata associated with the edge deployment.
# Wrapper Function for Deploying/Tracking Edge Deployments
CREATE OR REPLACE FUNCTION edge_deployment_wrapper()
RETURNS VOID
AS $$
DECLARE
func_request_id BIGINT;
payload JSONB;
prev_start_time TIMESTAMPTZ;
BEGIN
-- Creates a new entry in the edge_function_tracker table
-- for the soon-to-be deployed edge function
WITH func_default_vals AS (
INSERT INTO edge_function_tracker
DEFAULT VALUES
RETURNING id, start_time
)
SELECT
row_to_json(func_default_vals.*)::JSONB
INTO payload
FROM func_default_vals;
-- Products that were updated after the previous deployment are unsynced.
-- Fetches prev function timestamp to help query unsynced rows for further processing.
SELECT
start_time
INTO prev_start_time
FROM edge_function_tracker
WHERE start_time < (payload->>'start_time')::TIMESTAMPTZ
ORDER BY id DESC
LIMIT 1;
-- If there was no prior start time, set the prev_start_time to 0
IF NOT FOUND THEN
prev_start_time := '1970-01-01 00:00:00+00'::TIMESTAMPTZ;
END IF;
-- Reformat payload to include prev_start_time
payload := payload || jsonb_build_object('start_time_of_prev_func', prev_start_time);
-- Send request to edge function
SELECT net.http_post(
url := '<FUNCTION URL>',
body := payload,
headers := '{
"Content-Type": "application/json",
"Authorization": "Bearer <SUPABASE ANON KEY>"
}'::JSONB,
timeout_milliseconds := 4000
) INTO func_request_id;
-- Record request_id and start_time_of_prev_func
-- This will be used for redeploying failed requests later
UPDATE edge_function_tracker
SET
request_id = func_request_id,
start_time_of_prev_func = (payload->>'start_time_of_prev_func')::TIMESTAMPTZ
WHERE id = (payload->>'id')::UUID;
END;
$$ LANGUAGE plpgsql;
The tracking PL/pgSQL function can be called by a cron job to begin the bulk syncing process.
# Cron Job to Execute Edge Function
SELECT
cron.schedule(
'update-insert-Typesense-job',
'* * * * *', -- Executes every minute (cron syntax)
$$
-- SQL query
SELECT edge_deployment_wrapper();
$$
);
It is usually better to create PL/pgSQL functions that perform many interactions in a single request than to have the edge function query the database multiple times for small amounts of data. The following function will be called by the edge function to handle sync rejections returned by otherwise successful requests.
# Function to Report Failed Syncs
CREATE OR REPLACE FUNCTION report_failed_syncs (func_id UUID, failed_rows_id UUID[])
RETURNS VOID
AS $$
BEGIN
-- Update the "edge_function_tracker" table to reflect the
-- function call's status
UPDATE edge_function_tracker
SET func_status = 'SUCCEEDED'
WHERE id = func_id;
-- This line disables the updated_at trigger for the transaction.
SET LOCAL session_replication_role = 'replica';
-- Update failed product syncs to NOW(), so that they will be included in the next
-- syncing function
UPDATE products
SET updated_at = NOW()
WHERE id = ANY(failed_rows_id);
END;
$$ LANGUAGE plpgsql;
To deploy an edge function, you must have a Node package manager, such as npm, Yarn, or pnpm. npm can be installed by downloading Node.js from the official download page.
To create your first function, create a folder with your function name and add an index.ts file inside. The code below is a modified version of the Supabase PostgreSQL demo, using the report_failed_syncs PL/pgSQL function from earlier.
NOTE: All functions and their logs can be found in the Edge Functions section of your Supabase dashboard.
The edge function that will be used can be broken down into a 7-step process:
- Parse the request body to identify the specific rows that require synchronization.
- Retrieve unsynchronized rows by querying the products table.
- Convert the retrieved rows into the NDJSON (Newline Delimited JSON) format.
- Synchronize the formatted rows with Typesense.
- Examine Typesense's response to determine if any rows failed to synchronize.
- Update the products table to indicate the synchronization status of each row, marking any unsuccessful attempts.
- Record the overall success or failure of the edge function within the edge_function_tracker table for monitoring and analysis.
# Edge Function to Update Typesense
import * as postgres from 'https://deno.land/x/postgres@v0.14.2/mod.ts';
import { serve } from 'https://deno.land/std@0.177.0/http/server.ts';
// define your types at top
type TProductRows = {
id: string;
product_name: string;
updated_at: string;
user_id: string;
}[];
type TRequestJSON = {
id: string;
start_time: string;
start_time_of_prev_func: string;
};
// Your Database's connection URL is made available by default in all edge functions
// If you have any issues, though, you can retrieve it in the dashboard by going to
// the Project Settings's Database tab
const databaseUrl = Deno.env.get('SUPABASE_DB_URL')!;
// Create a database pool with three connections that are lazily established
const pool = new postgres.Pool(databaseUrl, 3, true);
serve(async (req) => {
try {
// Grab a connection from the pool
const connection = await pool.connect();
// 1. Parse Request Body:
const reqJSON = (await req.json()) as TRequestJSON;
try {
// 2. Retrieve unsynced products from the database
const unsyncedRows = (
await connection.queryObject({
args: [reqJSON.start_time_of_prev_func, reqJSON.start_time],
text: `SELECT
products.id,
products.product_name,
CAST(EXTRACT(epoch FROM products.updated_at) AS FLOAT) AS updated_at,
products.user_id
FROM products
-- Only sync products that have been updated since the last sync
WHERE updated_at BETWEEN $1::TIMESTAMPTZ AND $2::TIMESTAMPTZ;
-- IF SOFT DELETING, FILTER THE SOFT DELETED ROWS FROM THE SYNC
`,
})
).rows as TProductRows;
// If there are no unsynced rows, the function can return early
if (!unsyncedRows.length) {
const res = await connection.queryObject({
args: [reqJSON.id],
text: `
UPDATE edge_function_tracker
SET func_status = 'SUCCEEDED'
WHERE id = $1::UUID;
`,
});
return new Response('no rows to sync', {
status: 200,
headers: {
'Content-Type': 'application/json; charset=utf-8',
},
});
}
// 3. Convert rows into NDJSON format
const unsyncedProductsNDJSON: string = unsyncedRows
.map((product) =>
JSON.stringify({
...product,
updated_at: parseFloat(product.updated_at),
})
)
.join('\n');
// 4. Sync the new products with Typesense
const response = await fetch(
// ADD YOUR TYPESENSE URL
'<TYPESENSE URL>/collections/products/documents/import?action=upsert',
{
method: 'POST',
headers: {
'Content-Type': 'application/x-ndjson',
//ADD YOUR TYPESENSE API KEY
'X-TYPESENSE-API-KEY': '<API KEY>',
},
body: unsyncedProductsNDJSON,
}
);
// The response will contain NDJSON of the results. In the case that some updates failed,
// we need to reflect this failure in the products_sync_tracker table
/* possible results
{"success": true}
{"success": false, "error": "Bad JSON.", "document": "[bad doc]"}
*/
const ndJsonResponse = await response.text();
// converting response to array of objects
let JSONStringArr = ndJsonResponse.split('\n');
const parsedJSON = JSONStringArr.map((res) => JSON.parse(res));
// 5. filtering out the failed syncs' ids
const failedSyncIds = parsedJSON
.filter((doc) => !doc.success)
.map((doc) => JSON.parse(doc.document).id) as string[];
// If there are no failed syncs, the function can return early
if (!failedSyncIds.length) {
const res = await connection.queryObject({
args: [reqJSON.id],
text: `
UPDATE edge_function_tracker
SET func_status = 'SUCCEEDED'
WHERE id = $1::UUID;
`,
});
return new Response('no rows to sync', {
status: 200,
headers: {
'Content-Type': 'application/json; charset=utf-8',
},
});
}
// 6/7. Updates the "updated_at" column in the products table to NOW().
// By doing so, the failed rows will be resynced by the next edge function
// Declare the function call to be a success.
const result = await connection.queryObject({
args: [reqJSON.id, failedSyncIds],
text: `
SELECT report_failed_syncs ($1::UUID, $2::UUID[])
`,
});
// Return the response with the correct content type header
return new Response(
`synced rows between ${reqJSON.start_time_of_prev_func} and ${reqJSON.start_time}`,
{
status: 200,
headers: {
'Content-Type': 'application/x-ndjson; charset=utf-8',
},
}
);
} finally {
// Release the connection back into the pool
connection.release();
}
} catch (err) {
console.error(err);
return new Response(String(err?.message ?? err), {
status: 500,
});
}
});
With the function set up, navigate to your function's parent directory in the terminal and execute the following command:
# Login to Supabase Command Line
npx supabase login
You will be prompted to enter your password and directed towards a link to generate an access token. After logging in, you can deploy your function.
# Deploy Edge Function To Supabase
npx supabase functions deploy <YOUR FUNCTION'S DIRECTORY NAME>
You should receive a link that will give you insight into your new function. To call it, you will also need your project's ANON KEY, which can be found in the API tab of your project's settings. You can schedule your function to sync Typesense with a cron job.
Unfortunately, not all syncs will succeed, so it's important to have a retry mechanism. Also, at some point the edge_function_tracker table may become bloated and need to be cleaned. The below PL/pgSQL function manages both of these issues with the following steps:
- Deletes successful sync records from the edge_function_tracker table.
- Updates 'PENDING' functions to 'FAILED' if certain conditions are met.
- Re-attempts a failed sync request.
# PL/pgSQL Function to Manage Edge Redeployment and Maintenance
CREATE OR REPLACE FUNCTION edge_function_maintainer()
RETURNS VOID
AS $$
DECLARE
func_request_id BIGINT;
payload JSONB;
BEGIN
-- Any successful syncs that border two 'SUCCEEDED' rows are no longer necessary to maintain.
-- To prevent the table from growing endlessly, these rows will be deleted
WITH organized_edge_function_tracker AS (
SELECT id,
func_status,
LAG(func_status) OVER (ORDER BY start_time) AS prev_func_status,
LEAD(func_status) OVER (ORDER BY start_time) AS next_func_status
FROM edge_function_tracker
),
success_conditions AS (
SELECT
id
FROM organized_edge_function_tracker
WHERE
func_status = 'SUCCEEDED'
AND
prev_func_status = 'SUCCEEDED'
AND
next_func_status = 'SUCCEEDED'
)
DELETE FROM edge_function_tracker
WHERE id IN (SELECT id FROM success_conditions);
-- Updates any !200 requests in the edge_function_tracker table to 'FAILED'
-- NOTE: the _http_response table, which holds the status for all PG_NET requests, is "unlogged".
-- This means that it will lose all its data in the case of a crash
-- As a failsafe, all functions that have not yet responded after 2 minutes will be assumed to have failed even if
-- their request row could not be found
WITH failed_rows AS (
SELECT
edge_function_tracker.id
FROM edge_function_tracker
INNER JOIN net._http_response ON net._http_response.id = edge_function_tracker.request_id
WHERE net._http_response.status_code NOT BETWEEN 200 AND 202
)
UPDATE edge_function_tracker
SET func_status = 'FAILED'
FROM failed_rows
WHERE
failed_rows.id = edge_function_tracker.id
OR
(func_status = 'PENDING' AND (NOW() - start_time > '2 minutes'::interval))
;
-- Fetch the data of a failed request, so that it can placed into a payload
-- for a retry
WITH selected_row AS (
SELECT
id,
start_time,
start_time_of_prev_func
FROM edge_function_tracker
WHERE func_status = 'FAILED' AND attempts < 3 -- if attempts are more than 3, assume the data is unsyncable for this section and stop trying
ORDER BY start_time
LIMIT 1
)
SELECT to_jsonb(selected_row.*) INTO payload
FROM selected_row;
-- test to see if there are any viable requests to retry
IF (payload->>'id')::UUID IS NOT NULL THEN
-- Send retry request to edge function
SELECT net.http_post(
url := '<EDGE FUNCTION URL>',
body := payload,
headers := '{
"Content-Type": "application/json",
"Authorization": "Bearer <SUPABASE ANON KEY>"
}'::JSONB,
timeout_milliseconds := 4000
) INTO func_request_id;
-- Record new request_id
UPDATE edge_function_tracker
SET
request_id = func_request_id,
attempts = attempts + 1
WHERE id = (payload->>'id')::UUID;
END IF;
END;
$$ LANGUAGE plpgSQL;
The maintainer function can be scheduled with a cron job:
# Retry Failed Functions Every Minute
SELECT
cron.schedule(
'edge_function_maintainer',
'* * * * *', -- Executes every minute (cron syntax)
$$
SELECT edge_function_maintainer();
$$
);
# Step 6: Realtime Updates/Inserts with Triggers
There are some circumstances where realtime syncing may be important. This can only be achieved with triggers. The below example directly syncs between Supabase and Typesense, but you could also use the trigger to call an Edge function that does the same thing.
# Syncing Data with Triggers
-- Creating a function to be used by a update/insert trigger for the products table
CREATE OR REPLACE FUNCTION public.sync_products()
RETURNS TRIGGER AS $$
BEGIN
-- Make an https request to the Typesense server
PERFORM net.http_post(
-- ADD TYPESENSE URL
url := '<TYPESENSE URL>/collections/products/documents/import?action=upsert'::TEXT,
-- The NEW keyword represents the new row data
body := (
SELECT
to_jsonb(row.*)
FROM (
SELECT
-- Converting type TIMESTAMPTZ to type float
EXTRACT(EPOCH FROM NEW.updated_at)::float AS updated_at,
NEW.id,
NEW.product_name,
NEW.user_id
) AS row
)::JSONB,
headers := json_build_object(
'Content-Type', 'application/json',
-- ADD API KEY
'X-Typesense-API-KEY', '<API KEY>'
)::JSONB
);
RETURN NEW;
END;
$$ LANGUAGE plpgSQL;
-- Trigger that runs after any insert or update in the products table
CREATE TRIGGER sync_updates_and_inserts_in_Typesense
AFTER INSERT OR UPDATE ON products
FOR EACH ROW
EXECUTE FUNCTION sync_products();
WARNING: It is important to restate that these requests are asynchronous, and they must be to avoid blocking user transactions. Once the request is sent, a background worker will listen for a response and add it to the net._http_response table. It's possible to monitor updates/inserts in the net._http_response table with cron jobs or triggers to retry failed syncs as outlined at the end of Step 5.2. Unfortunately, using triggers for immediate retries can be dangerous for this task, especially if the data is incompatible with Typesense. Without a clear breakout condition, they can enter an infinite loop.
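As a starting point for that kind of monitoring, the sketch below lists recent PG_NET requests that did not return a 2xx status. The column names follow the net._http_response table queried earlier, and the 200-202 range mirrors the success check used in Step 5.2; adapt it to your own retry logic.
# Example: Finding Failed PG_NET Requests
SELECT
id,
status_code,
content
FROM net._http_response
WHERE
status_code NOT BETWEEN 200 AND 202
OR status_code IS NULL -- timed out or never received a response
ORDER BY id DESC
LIMIT 20;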
# Step 7: Bulk Syncing Deletes
# Scheduling Bulk Deletes with Cron Jobs
Bulk syncing deletions between Supabase and Typesense can be done with one of two approaches:
- Temporarily preserving deleted rows' ids in an intermediary table until they can be removed from Typesense
- Soft deleting rows from a table until they are removed from Typesense
NOTE: Because the deletion process does not require NDJSON, both the HTTP and PG_NET extensions can work either for direct PostgreSQL-to-Typesense deletions or for triggering edge functions.
This section predominantly uses the HTTP extension. If you plan on using PG_NET, though, it is necessary to delay actually deleting rows until the net._http_response table records a 200 status for the request. Step 5.2 has examples of how to plan around asynchronous recovery.
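For example, if the deleted_rows table introduced below also stored the PG_NET request_id for each delete request (a hypothetical extra column, not part of the table definition in this guide), the copies could be removed only after a confirmed 200 response, roughly like this:
# Sketch: Deleting Copies Only After a Confirmed Sync
DELETE FROM deleted_rows
USING net._http_response
WHERE
net._http_response.id = deleted_rows.request_id
AND
net._http_response.status_code = 200;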
# Approach 1: Intermediary Tables
To achieve the first approach, you need to create a table to store the deleted rows' ids until Typesense can be synced.
# deleted_rows Table
CREATE TABLE deleted_rows(
table_name TEXT, --assumes table and Typesense collection share a name
deleted_row_id UUID,
CONSTRAINT deleted_rows_pkey PRIMARY KEY (deleted_row_id)
);
Whenever a row is deleted in the main table, a trigger should record a copy of its id.
# Trigger to Save id Column
-- Create the function to copy id to deleted_rows on delete
CREATE OR REPLACE FUNCTION copy_deleted_product_id()
RETURNS TRIGGER
LANGUAGE plpgSQL
AS $$
BEGIN
INSERT INTO deleted_rows (table_name, deleted_row_id)
VALUES ('products', OLD.id);
RETURN OLD;
END $$;
-- Create the trigger that calls the function when a record is deleted from the products table
CREATE TRIGGER copy_id_on_delete
AFTER DELETE ON public.products
FOR EACH ROW
EXECUTE FUNCTION copy_deleted_product_id();
In Typesense, bulk deletions can be performed with DELETE requests that include the conditions for deletion as query parameters within the URL.
# Example: Delete Request
curl -g -H "X-TYPESENSE-API-KEY: ${TYPESENSE_API_KEY}" -X DELETE \
"http://localhost:8108/collections/companies/documents?filter_by=id:[id1, id2, id3]"
NOTE: it may be necessary to use the URL-encoded characters for the square brackets "[" and "]": %5B and %5D, respectively.
A PL/pgSQL function can be written to sync the deletions with Typesense before removing the copies from the deleted_rows table.
# Bulk Sync Deleted Rows
-- Create the function to delete the record from Typesense
CREATE OR REPLACE FUNCTION bulk_delete_products()
RETURNS VOID
LANGUAGE plpgSQL
AS $$
DECLARE
id_arr UUID[];
query_params TEXT;
request_status INTEGER;
response_message TEXT;
BEGIN
-- Select up to 40 ids from deleted_rows
-- (the LIMIT must live in a subquery; it has no effect on an aggregate over the whole table)
SELECT
-- store values into an array so they can be deleted later
array_agg(deleted_row_id),
-- Format the rows to be used in the filter_by parameter
string_agg(deleted_row_id::text, ',')
INTO id_arr, query_params
FROM (
SELECT deleted_row_id
FROM deleted_rows
LIMIT 40
) AS batch;
-- Exit early if there is nothing to delete
IF query_params IS NULL THEN
RETURN;
END IF;
-- The params must be formatted within square brackets "[ ]", which are URL encoded as %5B and %5D, respectively.
query_params := '%5B' || query_params || '%5D';
SELECT
status,
content
INTO request_status, response_message
FROM http((
'DELETE'::http_method,
-- ADD TYPESENSE URL
'<TYPESENSE URL>/collections/products/documents?filter_by=id:' || query_params,
ARRAY[
-- ADD API KEY
http_header('X-Typesense-API-KEY', '<API KEY>')
]::http_header[],
'application/json',
NULL
)::http_request);
-- Check if the request failed
IF request_status <> 200 THEN
-- stores error message in Supabase Postgres Logs
RAISE LOG 'HTTP DELETE request failed. Message: %', response_message;
-- Raises Exception, which undoes the transaction
RAISE EXCEPTION 'DELETE FAILED';
ELSE
DELETE FROM deleted_rows WHERE deleted_row_id = ANY(id_arr);
END IF;
END;
$$;
The function can be called every minute by a Cron Job.
# Sync Deleted Rows Every Minute
SELECT
cron.schedule(
'bulk-delete-from-typesese',
'* * * * *', -- Executes every minute (cron syntax)
$$
-- SQL query
SELECT bulk_delete_products();
$$
);
# Approach 2: Soft Deleting
In this approach, users must be restricted from directly deleting rows from the products table. To ensure this, it is essential to modify Row Level Security, revoking their deletion privileges. Without this modification, data inconsistencies between Supabase and Typesense are guaranteed to occur.
# Modifying RLS to Prevent Users from Deleting Rows
ALTER POLICY "only an authenticated user is allowed to remove their products "
ON public.products
TO authenticated
USING(
FALSE
);
Instead of directly deleting rows, users will have to modify a row in a way that essentially tags it as unusable. In this case, setting the user_id column to NULL will make the row inaccessible to all app users.
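One wrinkle worth noting: the UPDATE policy from Step 1 uses WITH CHECK (auth.uid() = user_id), which would stop a client from nulling user_id directly. A hypothetical SECURITY DEFINER helper like the sketch below is one way to allow the soft delete while still verifying ownership; the function name soft_delete_product is an assumption, not part of the guide.
# Example: Soft-Delete Helper Function
-- Hypothetical helper: lets an authenticated user soft delete their own product.
-- SECURITY DEFINER runs as the function owner, bypassing RLS, so ownership is
-- re-checked explicitly against auth.uid().
CREATE OR REPLACE FUNCTION soft_delete_product(target_product_id UUID)
RETURNS VOID
LANGUAGE plpgsql
SECURITY DEFINER
AS $$
BEGIN
UPDATE public.products
SET user_id = NULL
WHERE id = target_product_id
AND user_id = auth.uid();
END;
$$;
A client could then call this through Supabase's RPC interface (for example, supabase.rpc('soft_delete_product', ...)) instead of issuing the UPDATE directly.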
A PL/pgSQL function can be made with either the PG_NET or HTTP extension to sync rows with nullified user_id columns to Typesense and then delete them:
# Syncing Nullified Rows Before Deleting Them
-- Create the function to delete the record from Typesense
CREATE OR REPLACE FUNCTION bulk_delete_products()
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
-- stores and formats deleted row ids
deleted_id_arr UUID[];
query_params TEXT;
-- monitors Typesense's response
request_status INTEGER;
response_message TEXT;
BEGIN
-- Select and format up to 40 ids of soft-deleted products
-- (the LIMIT must live in a subquery; it has no effect on an aggregate over the whole table)
SELECT
string_agg(id::text, ','),
array_agg(id)
INTO query_params, deleted_id_arr
FROM (
SELECT id
FROM products
WHERE user_id IS NULL
LIMIT 40
) AS batch;
-- Exit early if there are no soft-deleted rows
IF query_params IS NULL THEN
RETURN;
END IF;
-- places the list of ids in URL-encoded brackets [ ], which are %5B and %5D
query_params := '%5B' || query_params || '%5D';
-- Send delete request to Typesense server
SELECT
status,
content
INTO request_status, response_message
FROM http((
'DELETE'::http_method,
-- ADD TYPESENSE URL
'<TYPESENSE URL>/collections/products/documents?filter_by=id:' || query_params,
ARRAY[
-- ADD API KEY
http_header('X-Typesense-API-KEY', '<API KEY>')
]::http_header[],
'application/json',
NULL
)::http_request);
-- Check if the request failed
IF request_status <> 200 THEN
-- stores error message in Supabase Postgres Logs
RAISE LOG 'HTTP DELETE request failed. Message: %', response_message;
-- Raises Exception, which undoes the transaction
RAISE EXCEPTION 'DELETE FAILED';
ELSE
DELETE FROM products WHERE id = ANY(deleted_id_arr);
END IF;
END;
$$;
The function can be called every minute by a cron job.
# Scheduling Bulk Deletes
SELECT
cron.schedule(
'delete-from-typesese',
'* * * * *', -- Executes every minute(cron syntax)
$$
-- SQL query
SELECT bulk_delete_products();
$$
);
# Step 8: Syncing Deletes in Realtime
# Syncing Individual Deletes
In Typesense, a single document can be deleted by making a request with the document's ID as a path parameter.
# cURL Request, Deleting document id
curl -H "X-Typesense-API-KEY: ${Typesense_API_KEY}" -X DELETE \
"http://localhost:8108/collections/products/documents/<id>"
Depending on how you organize your database, you may come across a situation where deleting one row causes a cascade of deletes in another table. That can be managed by querying a shared field, such as a reference key.
# cURL Request, Deleting many documents by shared field
curl -H "X-Typesense-API-KEY: ${Typesense_API_KEY}" -X DELETE \
"http://localhost:8108/collections/products/documents?filter_by=<shared_field>:=<value>"
Syncing deletes in realtime can be managed with a trigger.
# Single Row Delete Using Triggers
-- Create the function to delete the record from Typesense
CREATE OR REPLACE FUNCTION delete_products()
RETURNS TRIGGER
LANGUAGE plpgSQL
AS $$
BEGIN
PERFORM net.http_delete(
url := format('<TYPESENSE URL>/collections/products/documents/%s', OLD.id),
headers := '{"X-Typesense-API-KEY": "<Typesense_API_KEY>"}'::JSONB
);
RETURN OLD;
END $$;
-- Create the trigger that calls the function when a record is deleted from the products table
CREATE TRIGGER delete_products_trigger
AFTER DELETE ON public.products
FOR EACH ROW
EXECUTE FUNCTION delete_products();
Unfortunately, if the synchronization process fails for any reason, there will be no remaining data to reference for a subsequent attempt. To address this issue, you can either implement soft deletion of rows or create a temporary table to store deleted query values until the successful completion of the synchronization is confirmed. The latter option is described in detail below for deletes by shared-field.
# Store Values Until Sync is Confirmed Table
CREATE TABLE deleted_query_values (
filter_by_field TEXT NOT NULL,
shared_value TEXT NOT NULL,
request_id BIGINT,
created_at TIMESTAMPTZ NULL DEFAULT now()
);
The trigger's function can be modified to preserve the deleted values.
-- Create the function to delete the record from Typesense
CREATE OR REPLACE FUNCTION delete_products_trigger()
RETURNS TRIGGER
LANGUAGE plpgSQL
AS $$
DECLARE
func_request_id BIGINT;
BEGIN
SELECT net.http_delete(
-- ADD TYPESENSE URL
url := '<TYPESENSE URL>/collections/products/documents?filter_by=<FILTER_BY_FIELD>:=' || OLD.<SHARED_VALUE>::TEXT,
-- ADD API KEY
headers := '{"X-Typesense-API-KEY": "<API KEY>"}'
) INTO func_request_id;
-- populate table
INSERT INTO deleted_query_values (filter_by_field, shared_value, request_id)
VALUES ('<FILTER_BY_FIELD>', OLD.<SHARED_VALUE>::TEXT, func_request_id);
RETURN OLD;
END $$;
The deleted_query_values and net._http_response tables can be referenced periodically with a cron job to retry failed attempts.
# Function to Retry Failed Delete Syncs
CREATE OR REPLACE FUNCTION retry_bulk_deletes()
RETURNS VOID
LANGUAGE plpgSQL
AS $$
DECLARE
value TEXT;
field TEXT;
func_request_id BIGINT;
BEGIN
-- clear synced rows from deleted_query_values table
DELETE FROM deleted_query_values
USING net._http_response
WHERE
net._http_response.status_code = 200
AND
net._http_response.id = deleted_query_values.request_id;
-- get oldest delete query that failed
SELECT
shared_value,
filter_by_field
INTO value, field
FROM deleted_query_values
INNER JOIN net._http_response ON net._http_response.id = deleted_query_values.request_id
WHERE net._http_response.status_code NOT BETWEEN 200 AND 202
ORDER BY created_at
LIMIT 1;
-- Exit if there is nothing to retry
IF value IS NULL THEN
RETURN;
END IF;
-- reattempt deletion sync
SELECT net.http_delete(
-- ADD TYPESENSE URL
url := FORMAT('<TYPESENSE URL>/collections/products/documents?filter_by=%s:=%s', field, value),
-- ADD API KEY
headers := '{"X-Typesense-API-KEY": "<API KEY>"}'
) INTO func_request_id;
-- update deleted_query_values with new values
UPDATE deleted_query_values
SET
request_id = func_request_id,
created_at = NOW()
WHERE filter_by_field = field AND shared_value = value;
END $$;
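The guide does not show a schedule for this last function, but following the same pattern as the earlier cron jobs, a sketch might look like this (the job name is arbitrary):
# Retrying Failed Delete Syncs Every Minute
SELECT
cron.schedule(
'retry-failed-delete-syncs',
'* * * * *', -- Executes every minute (cron syntax)
$$
SELECT retry_bulk_deletes();
$$
);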
# Conclusion
In this tutorial, we explored different methods to synchronize data between Supabase and Typesense, ensuring that our search engine stays up-to-date with the latest changes in our database. We covered syncing inserts, updates, and deletes through interval-based and real-time strategies, using triggers, functions, and cron jobs.
By implementing these techniques, you can create a robust, efficient, and responsive search system for your application, providing users with a seamless and accurate search experience.
If we can make any improvements to this guide, click on the "Edit page" link below and send us a pull request.