Building an FX Liquidity Stress Analysis Workflow with QuestDB

QuestDB is the open-source time-series database for demanding workloads—from trading floors to mission control. It delivers ultra-low latency, high ingestion throughput, and a multi-tier storage engine. Native support for Parquet and SQL keeps your data portable and AI-ready—no vendor lock-in.

Liquidity dry-ups in FX markets can sharply raise execution costs for large institutional orders. During stressed conditions, bid–ask spreads widen and market depth thins out, making it harder to execute large trades efficiently. Traditional indicators often react only after the fact, so traders have little time to adjust.

In this tutorial, we build a compact FX liquidity stress analysis pipeline using QuestDB and Python. We’ll show how to stream and aggregate high-frequency order book data, engineer microstructure features, label stress periods, and train an XGBoost model for early warnings. The focus is on reproducible data science, not black-box trading: everything runs end-to-end with open tools and SQL-based time-series processing.

The Liquidity Challenge in FX Markets

Episodes of stress in FX markets often emerge without warning. The IMF reports that during periods of uncertainty, "currency bid-ask spreads widen, and foreign funding and hedging costs rise."

These conditions typically appear around macro announcements, flash crashes, or when major trading sessions overlap and liquidity providers withdraw. As order book depth evaporates, execution costs can spike within seconds.

Conventional monitoring tools only register these shifts once spreads have already widened. Understanding and anticipating such moments requires data with fine time resolution, showing how spreads, volumes, and imbalance evolve in the seconds before stress appears. In this workflow we use second-level aggregated data for simplicity, but the same ideas apply to sub-second or microsecond feeds.

Detecting stress ahead of time involves two main ingredients:

  • Computing liquidity measures such as spread, imbalance, depth, and volatility on streaming market data.
  • Training a model on historical patterns to identify conditions that tend to precede stress events.
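
To make the first ingredient concrete, here is a minimal Python sketch of the two measures we rely on most, the spread in basis points and the order book imbalance. The function names are illustrative; the same formulas appear later in SQL.

# illustrative only: the two core top-of-book liquidity measures,
# written out in plain Python; the Dataset section computes the same
# quantities in SQL
def spread_bps(bid_price: float, ask_price: float) -> float:
    """Bid-ask spread in basis points, relative to the mid price."""
    mid = (bid_price + ask_price) / 2
    return (ask_price - bid_price) / mid * 10_000

def imbalance(bid_size: float, ask_size: float) -> float:
    """Order book imbalance in [-1, 1]; positive means bid-heavy."""
    return (bid_size - ask_size) / (bid_size + ask_size)

print(spread_bps(1.1576, 1.1580))  # ~3.45 bps for a 0.0004 spread
print(imbalance(80_000, 70_000))   # ~0.067, slightly bid-heavy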

QuestDB Architecture for High-Frequency FX Data

QuestDB is built for this kind of workload, sustaining ingestion rates of over 5 million rows per second. In practice, this means FX order book snapshots or trade ticks can stream into QuestDB with minimal latency.
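
As a sketch of what live ingestion could look like, the snippet below streams top-of-book quotes with the official questdb Python client. The host address, table name, and column names are assumptions; the tutorial's pipeline itself works from data already stored in QuestDB.

# minimal ingestion sketch using the official `questdb` Python client
# (pip install questdb); addresses and names are assumptions
from questdb.ingress import Sender, TimestampNanos

conf = "http::addr=localhost:9000;"
with Sender.from_conf(conf) as sender:
    sender.row(
        "market_data_top",               # hypothetical table name
        symbols={"symbol": "EURUSD"},
        columns={
            "bid_price": 1.1576, "ask_price": 1.1580,
            "bid_volume": 80_000.0, "ask_volume": 70_000.0,
        },
        at=TimestampNanos.now(),
    )
    # rows are batched and flushed automatically when the block exits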

Its columnar, multi-tier storage keeps recent data in memory and older data on disk, allowing fast time-ordered scans across large ranges without complex ETL or duplication. Aggregations and historical queries remain interactive, even over billions of rows.

For Level 2 (order book) data, QuestDB supports array columns. Instead of storing each price level in separate rows, bid and ask ladders are stored as arrays per timestamp (for example, bid_prices, bid_sizes, ask_prices, ask_sizes). This compact design allows fast per-tick updates and SQL array functions to extract top-of-book or deeper levels on demand.
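
For reference, a schema along these lines could back such a table. This is a sketch; the public demo's actual DDL may differ.

-- hypothetical DDL sketch for an order-book table with array columns
CREATE TABLE market_data (
    timestamp TIMESTAMP,
    symbol SYMBOL,
    bids DOUBLE[][],   -- bids[1] = prices, bids[2] = sizes
    asks DOUBLE[][]    -- asks[1] = prices, asks[2] = sizes
) TIMESTAMP(timestamp) PARTITION BY DAY WAL;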

QuestDB’s SQL extensions include finance-oriented functions such as l2price(volume, sizes, prices) for volume-weighted execution prices and spread_bps(bid, ask) for bid–ask spreads in basis points.
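
As an illustration, a query along these lines computes both on the array columns; this is a sketch, with argument order following the signatures above.

-- illustrative sketch: finance functions over array columns
SELECT
    timestamp,
    spread_bps(bids[1][1], asks[1][1]) AS spread_bps,   -- top-of-book spread in bps
    l2price(100000, asks[2], asks[1]) AS vwap_buy_100k  -- avg price to lift 100k units
FROM market_data
WHERE symbol = 'EURUSD'
LIMIT 10;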

Finally, QuestDB fully supports open formats. Any table or query can be exported to Apache Parquet via SQL or REST, and older partitions can be converted in-place. Parquet integrates natively with Python, Pandas, Spark, and ML frameworks, providing a compressed, portable dataset for modeling.

Example Machine Learning Workflow

To illustrate how QuestDB fits into an ML pipeline, we built a prototype workflow for predicting liquidity stress. The goal is to show how data and features flow from QuestDB into a modeling pipeline. The main steps are: labeling stress periods, engineering features from market data, exporting those features, training a classification model, and finally running inference on new data.

Setup

The full code for the example is available in this GitHub repository. The implementation was developed and tested using Python 3.13, though any Python version ≥3.10 should work reliably.

You can clone the blog examples repository and enter this post's directory:

git clone https://github.com/questdb/blog-examples.git
cd blog-examples/2025-11-fx-liquidity-stress-analysis

Although you can install the dependencies directly, it is advisable to use a virtual environment. To set up the environment, install uv. We also use Poe the Poet to manage the multi-step workflow; each pipeline stage is defined as a separate task in pyproject.toml.

[project]
name = "fx-liquidity-stress"
version = "0.1.0"
requires-python = ">=3.10"
dependencies = [
    "pandas==2.3.3",
    "poethepoet==0.37.0",
    "pyarrow==21.0.0",
    "requests==2.32.5",
    "scikit-learn==1.7.2",
    "xgboost==3.1.0",
]

[tool.poe.tasks]
# If you have access to a QuestDB instance with Parquet export enabled, you can
# use the steps/export_to_parquet.py script instead, as it is more efficient.
# To be conservative, we default to a plain HTTP query plus Parquet conversion
# via Pandas.
convert = { cmd = "python steps/convert_to_parquet.py", env = { PYTHONPATH = "." } }
train = { cmd = "python steps/train.py", env = { PYTHONPATH = "." } }
infer = { cmd = "python steps/inference.py", env = { PYTHONPATH = "." } }
pipeline = ["convert", "train", "infer"]

You can create a virtual environment and install the dependencies by running:

uv venv
uv sync

This creates an isolated environment under .venv/ and synchronizes all dependencies using the lockfile (uv.lock), ensuring deterministic builds across systems. If the uv.lock file doesn’t exist, uv sync will generate it automatically.

The directory structure for this example will be:

├── config
│   └── settings.py
├── dataset
│   └── output.parquet
├── model_registry
│   └── fx_liquidity_model.json
├── pyproject.toml
├── results
│   └── inference_latest.csv
├── src
│   ├── data_loader.py
│   ├── feature_extraction.py
│   ├── label_creator.py
│   └── model.py
├── steps
│   ├── convert_to_parquet.py
│   ├── export_to_parquet.py
│   ├── inference.py
│   └── train.py
└── uv.lock

Once you have read the rest of the post, which explains each step, you can run the full pipeline in order via:

uv run poe pipeline
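
Each stage can also be run on its own, which is handy while iterating on a single step:

uv run poe convert   # query QuestDB and write dataset/output.parquet
uv run poe train     # feature engineering, labeling, training, model export
uv run poe infer     # load the model and score recent rows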

Dataset

For this demo, we used an FX order-book dataset exported from the QuestDB public demo. Each row in the market_data table represents an order book snapshot for a given currency pair and includes the top-of-book prices and sizes. We query up to 2,000,000 rows from the 24-hour window ending one day ago and average the derived fields at 1-second intervals, filling any gaps so we have a dense dataset for training. Note that this dataset stores asks and bids as two-dimensional arrays: the first element is an array of prices and the second an array of sizes, so asks[1][1] is the top-of-book ask price and asks[2][1] is the top-of-book ask size.

This is the query we will execute:

WITH training_data AS (
    SELECT
        timestamp, symbol,
        asks[1][1] AS ask_price, bids[1][1] AS bid_price,
        asks[2][1] AS ask_volume, bids[2][1] AS bid_volume
    FROM market_data
    WHERE symbol = 'EURUSD'
      AND timestamp BETWEEN
          dateadd('d', -2, now()) AND
          dateadd('d', -1, now())
    LIMIT 2000000
)
SELECT
    timestamp,
    symbol,
    avg(ask_price - bid_price) AS spread,
    avg((ask_price - bid_price) /
        ((ask_price + bid_price) / 2) * 10000) AS spread_bps,
    avg(bid_volume + ask_volume) AS total_volume,
    avg((bid_volume - ask_volume) /
        (bid_volume + ask_volume)) AS imbalance,
    avg(bid_price) AS bid_price,
    avg(ask_price) AS ask_price,
    avg((bid_price + ask_price) / 2) AS mid_price
FROM training_data
SAMPLE BY 1s FILL(prev);

A sample of the output:

| timestamp | symbol | spread | spread_bps | total_volume | imbalance | bid_price | ask_price | mid_price |
| --------------------------- | ------ | ---------------------- | ------------------ | ------------------ | ----------------------- | ------------------ | ------------------ | ------------------ |
| 2025-11-03T18:18:53.000000Z | EURUSD | 0.00039999999999995595 | 3.4547869802372113 | 152603.3103448276 | -0.03717683856855787 | 1.1576137931034478 | 1.1580137931034487 | 1.1578137931034482 |
| 2025-11-03T18:18:54.000000Z | EURUSD | 0.00039999999999995595 | 3.455489918465511 | 151580.80434782608 | 0.01167956282073817 | 1.157378260869566 | 1.1577782608695653 | 1.1575782608695657 |
| 2025-11-03T18:18:55.000000Z | EURUSD | 0.00039999999999997817 | 3.4546730079518153 | 149224.9 | -0.022907699937196654 | 1.1576519999999995 | 1.1580519999999996 | 1.157852 |
| 2025-11-03T18:18:56.000000Z | EURUSD | 0.00040000000000014097 | 3.453232474702322 | 152220.23333333334 | -0.005828367764725703 | 1.158135 | 1.158535000000001 | 1.1583350000000003 |
...
| 2025-11-03T18:19:54.000000Z | EURUSD | 0.00039999999999995595 | 3.4607301777196686 | 151476.27272727274 | -0.006196169043030609 | 1.1556254545454552 | 1.1560254545454547 | 1.155825454545455 |
| 2025-11-03T18:19:55.000000Z | EURUSD | 0.00039999999999995595 | 3.461426439417276 | 147406.2394366197 | -0.004053108913267634 | 1.155392957746478 | 1.1557929577464778 | 1.1555929577464799 |
| 2025-11-03T18:19:56.000000Z | EURUSD | 0.00039999999999995595 | 3.462081559721717 | 154457.58571428573 | -0.021747505131181462 | 1.155174285714284 | 1.1555742857142879 | 1.1553742857142848 |

QuestDB can export data both as CSV and as Parquet. Since Parquet export can be disabled via a configuration flag, to make this demo more universal we fetch the query results over plain HTTP and convert them to Parquet with Pandas. The example repository also includes export_to_parquet.py, which exports directly to Parquet with no extra processing required. Here is the script that downloads the data and converts it to Parquet:

# steps/convert_to_parquet.py
import requests
import pandas as pd

from config.settings import QUESTDB_PATH, PARQUET_OUTPUT_PATH, TABLE_NAME, ROWS_LIMIT

# Use the same SAMPLE BY query shown in the Dataset section
query = """<same SQL query as above>"""

# Run the query via QuestDB's HTTP /exec endpoint (JSON response)
response = requests.get(
    url=f"{QUESTDB_PATH.rstrip('/')}/exec",
    params={'query': query}
)
response.raise_for_status()

# Rebuild a DataFrame from the JSON payload and persist it as Parquet
data = response.json()
columns = [col['name'] for col in data['columns']]
df = pd.DataFrame(data['dataset'], columns=columns)
df.to_parquet(PARQUET_OUTPUT_PATH, index=False)
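
If your instance allows Parquet export (the repository's export_to_parquet.py takes this route), the download step can skip the conversion entirely. A minimal sketch, assuming the /exp endpoint accepts fmt=parquet on your QuestDB version:

# sketch of a direct Parquet download via /exp; assumes Parquet export is
# enabled on the server and supported by your QuestDB version
import requests
from config.settings import QUESTDB_PATH, PARQUET_OUTPUT_PATH

query = """<same SQL query as above>"""
r = requests.get(
    url=f"{QUESTDB_PATH.rstrip('/')}/exp",
    params={"query": query, "fmt": "parquet"},
)
r.raise_for_status()
with open(PARQUET_OUTPUT_PATH, "wb") as f:
    f.write(r.content)  # raw Parquet bytes, no Pandas round-trip needed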

Key columns extracted and derived:

  • timestamp: Datetime index marking each snapshot.

  • symbol: Currency pair identifier (e.g., EURUSD).

  • bid_price: Best bid price at the top of the book.

  • ask_price: Best ask price at the top of the book.

  • mid_price: Midpoint of bid and ask prices ((bid_price + ask_price) / 2).

Derived microstructure metrics:

  • spread: Difference between ask and bid prices (ask_price - bid_price).

  • spread_bps: Spread expressed in basis points (spread / mid_price × 10,000).

  • total_volume: Combined visible volume at the top of the book (bid_volume + ask_volume).

  • imbalance: Book imbalance ratio ((bid_volume - ask_volume) / (bid_volume + ask_volume)).

Data Loading

The data loading process uses a lightweight DataLoader utility to read and preprocess the Parquet dataset. It automatically converts timestamps to datetime objects, sorts the records chronologically, and prepares the data for downstream feature engineering and modeling.

# src/data_loader.py
import pandas as pd

class DataLoader:
    def __init__(self, file_path):
        self.file_path = file_path

    def load_data(self):
        """Load and prepare raw Parquet data"""
        print("Loading data...")
        df = pd.read_parquet(self.file_path)
        # Parse timestamps and sort chronologically for rolling features
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df = df.sort_values('timestamp').reset_index(drop=True)
        return df

Feature Extraction

From the raw columns, we engineer a set of predictive features that capture short-term trends and volatility. Our Python FeatureEngineer class applies rolling-window and differencing operations (at a 5-minute window by default). For example, we compute:

  • Rolling metrics (5-minute): spread_ma_5m and spread_std_5m (mean and std of spread over past 5 minutes), volume_ma_5m (mean top-of-book size), and imbalance_ma_5m (mean order imbalance). These capture recent liquidity conditions.

  • Momentum/trend: price_change_1m and price_change_5m, which are the change in mid-price over the past 1 and 5 minutes, respectively. Large moves may precede stress.

  • Volatility: volatility_5m, the rolling standard deviation of 1-second returns (percent changes) over the last 5 minutes. This measures market uncertainty.

  • Spread trend: spread_trend, the 1-minute difference in spread (is the spread widening or tightening).

  • Volume surge: volume_surge, defined as the current L1 size divided by the 5-minute rolling mean L1 size. This highlights sudden volume spikes.

These features are computed in Pandas by rolling windows and diff operations. We ensure missing values are forward-filled so that no NaNs remain. In code, the create_features method looks like:

# src/feature_extraction.py
import pandas as pd

class FeatureEngineer:
    def __init__(self, window_seconds=300):
        """
        window_seconds: rolling window size (default = 5 minutes at 1-second intervals)
        """
        self.window = window_seconds

    def create_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Generate rolling, trend, and volatility features"""
        # Rolling averages & std (5 min window)
        df['spread_ma_5m'] = df['spread_bps'].rolling(self.window, min_periods=1).mean()
        df['spread_std_5m'] = df['spread_bps'].rolling(self.window, min_periods=1).std()
        df['volume_ma_5m'] = df['total_volume'].rolling(self.window, min_periods=1).mean()
        df['imbalance_ma_5m'] = df['imbalance'].rolling(self.window, min_periods=1).mean()
        # Price change trends
        df['price_change_1m'] = df['mid_price'].diff(60)   # 1 min = 60 sec
        df['price_change_5m'] = df['mid_price'].diff(300)  # 5 min = 300 sec
        # Volatility (rolling std of pct changes)
        df['volatility_5m'] = df['mid_price'].pct_change().rolling(self.window, min_periods=1).std()
        # Spread trend (1 min difference)
        df['spread_trend'] = df['spread_bps'].diff(60)
        # Volume surge (current / avg of last 5 min)
        df['volume_surge'] = df['total_volume'] / df['volume_ma_5m']
        # Fill any NaNs
        df = df.ffill().fillna(0)
        return df
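
A quick way to sanity-check the feature logic is to run it on a tiny synthetic frame. The values below are illustrative only:

# illustrative sanity check for FeatureEngineer on synthetic data
import numpy as np
import pandas as pd
from src.feature_extraction import FeatureEngineer

n = 600  # 10 minutes of 1-second samples
df = pd.DataFrame({
    "timestamp": pd.date_range("2025-11-03", periods=n, freq="s"),
    "spread_bps": np.random.uniform(3.0, 4.0, n),
    "total_volume": np.random.uniform(140_000, 160_000, n),
    "imbalance": np.random.uniform(-0.05, 0.05, n),
    "mid_price": 1.1578 + np.cumsum(np.random.normal(0, 1e-5, n)),
})
feats = FeatureEngineer().create_features(df)
# no NaNs should survive the forward-fill
assert feats[["spread_ma_5m", "volatility_5m", "volume_surge"]].notna().all().all()
print(feats.tail(3)[["spread_ma_5m", "price_change_5m", "volume_surge"]])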

Label Creation

Next, we need target labels indicating whether a given time is or will become a liquidity-stress event. We define stress heuristically: if the spread is extremely wide or the top-of-book size is extremely low, that suggests illiquidity. Concretely, we take the 95th percentile of spread_bps and the 5th percentile of total_volume as thresholds.

Any snapshot above the spread threshold or below the size threshold is marked as a stress event. Then we shift these labels forward by a prediction horizon (e.g., 10 minutes) so that the model is learning to predict a future stress event from current features. Rows at the very end (with no future data to look ahead) are dropped. In code form:

# src/label_creator.py
import pandas as pd

class LabelCreator:
    def __init__(self, horizon_minutes=10):
        """
        horizon_minutes: how far ahead to predict stress (default = 10 minutes)
        """
        self.horizon_minutes = horizon_minutes

    def create_labels(self, df: pd.DataFrame) -> pd.DataFrame:
        """Create binary stress labels"""
        # Thresholds: extreme spread (95th pct) or extremely thin book (5th pct)
        spread_threshold = df['spread_bps'].quantile(0.95)
        volume_threshold = df['total_volume'].quantile(0.05)
        # Stress conditions
        stress_events = (
            (df['spread_bps'] > spread_threshold) |
            (df['total_volume'] < volume_threshold)
        ).astype(int)
        # Shift labels forward so features at t predict stress at t + horizon
        horizon_rows = self.horizon_minutes * 60  # assuming 1s intervals
        df['stress_label'] = stress_events.shift(-horizon_rows)
        # Drop last rows where future labels don't exist
        df = df[:-horizon_rows].copy()
        df['stress_label'] = df['stress_label'].astype(int)  # shift() produced floats
        stress_count = df['stress_label'].sum()
        total_count = len(df)
        print(f"Labeled {stress_count} stress rows out of {total_count}")
        return df

Applying this to a subset of our data yielded about 97,049 stress events out of 999,400 rows (~9.71%), with the remaining ~90.3% labeled normal. This class imbalance (≈10:1) means we must account for it during training.

Model Training

With features (X) and labels (y) ready, we train a binary classifier. For simplicity, we used an XGBoost gradient boosting classifier. We handle the class imbalance by setting scale_pos_weight = (#normal / #stress) so that the minority class is up-weighted. We also evaluate via walk-forward (time-series) cross-validation to respect the temporal ordering. In each fold, we train on earlier data and test on the next segment, computing precision, recall, and F1. We observed modest scores (e.g., average F1 ~0.36), reflecting the difficulty of the problem. The final model is then retrained on all data.

Our Python class for modeling looks like this:

# src/model.py
import os

import numpy as np
import pandas as pd
import xgboost as xgb
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import precision_score, recall_score, f1_score

class LiquidityStressModel:
    def __init__(self):
        self.model = None
        self.feature_columns = None

    def prepare_data(self, df, feature_cols):
        """Prepare features and labels for training"""
        self.feature_columns = feature_cols
        X = df[feature_cols].values
        y = df['stress_label'].values
        return X, y

    def walk_forward_validation(self, X, y, n_splits=5):
        """Perform walk-forward validation for time series data"""
        tscv = TimeSeriesSplit(n_splits=n_splits)
        scores = []
        for fold, (train_idx, test_idx) in enumerate(tscv.split(X)):
            print(f"\nFold {fold+1}/{n_splits}")
            X_train, X_test = X[train_idx], X[test_idx]
            y_train, y_test = y[train_idx], y[test_idx]
            # Compute scale_pos_weight for imbalanced classes
            neg = np.sum(y_train == 0)
            pos = np.sum(y_train == 1)
            scale_pos_weight = neg / pos if pos > 0 else 1.0
            # Train XGBoost model
            model = xgb.XGBClassifier(
                n_estimators=500,
                max_depth=6,
                learning_rate=0.05,
                scale_pos_weight=scale_pos_weight,
                eval_metric='logloss',
                random_state=42
            )
            model.fit(X_train, y_train)
            preds = model.predict(X_test)
            precision = precision_score(y_test, preds, zero_division=0)
            recall = recall_score(y_test, preds, zero_division=0)
            f1 = f1_score(y_test, preds, zero_division=0)
            print(f"Precision: {precision:.3f}, Recall: {recall:.3f}, F1: {f1:.3f}")
            scores.append({'precision': precision, 'recall': recall, 'f1': f1})
        # Average metrics across folds
        avg_precision = np.mean([s['precision'] for s in scores])
        avg_recall = np.mean([s['recall'] for s in scores])
        avg_f1 = np.mean([s['f1'] for s in scores])
        print(f"\nAverage Performance → Precision: {avg_precision:.3f}, Recall: {avg_recall:.3f}, F1: {avg_f1:.3f}")
        return scores

    def predict_stress(self, current_features):
        """Predict stress for a single feature row or 1D array."""
        if self.model is None:
            raise ValueError("Model not trained yet!")
        arr = np.asarray(current_features)
        if arr.ndim == 1:
            arr = arr.reshape(1, -1)
        prob = float(self.model.predict_proba(arr)[0, 1])
        pred = bool(self.model.predict(arr)[0])
        return {
            "stress_probability": prob,
            "stress_prediction": pred,
            "alert_level": ("HIGH" if prob > 0.75 else "MEDIUM" if prob > 0.5 else "LOW"),
        }

    def save_model(self, path: str):
        """Persist model to disk."""
        # MODEL_PATH should include path and file, as in model_registry/fx_liquidity_model.json
        os.makedirs(os.path.dirname(path), exist_ok=True)
        self.model.save_model(path)

    def load_model(self, path: str):
        """Load previously saved model."""
        self.model = xgb.XGBClassifier()
        self.model.load_model(path)

    def train_final_model(self, X, y):
        """Train final model on all data"""
        print("Training final model on full dataset...")
        neg = np.sum(y == 0)
        pos = np.sum(y == 1)
        scale_pos_weight = neg / pos if pos > 0 else 1.0
        self.model = xgb.XGBClassifier(
            n_estimators=500,
            max_depth=6,
            learning_rate=0.05,
            scale_pos_weight=scale_pos_weight,
            eval_metric='logloss',
            random_state=42
        )
        self.model.fit(X, y)
        print("Final model trained!")
        # Display feature importance
        importance = self.model.feature_importances_
        feature_importance = pd.DataFrame({
            'feature': self.feature_columns,
            'importance': importance
        }).sort_values('importance', ascending=False)
        print("\nTop 5 features by importance:")
        print(feature_importance.head())

To tie these pieces together, we run the following training script:

# steps/train.py
from src.data_loader import DataLoader
from src.feature_extraction import FeatureEngineer
from src.label_creator import LabelCreator
from src.model import LiquidityStressModel
from config.settings import PARQUET_OUTPUT_PATH, MODEL_PATH

if __name__ == "__main__":
    file = PARQUET_OUTPUT_PATH
    feature_cols = [
        "spread_bps", "spread_ma_5m", "spread_std_5m",
        "total_volume", "volume_ma_5m",
        "imbalance", "imbalance_ma_5m",
        "price_change_1m", "price_change_5m",
        "volatility_5m", "spread_trend", "volume_surge",
    ]
    # Load, engineer features, and label
    loader = DataLoader(file)
    df = loader.load_data()
    engineer = FeatureEngineer()
    df = engineer.create_features(df)
    labeler = LabelCreator(horizon_minutes=10)
    df = labeler.create_labels(df)
    label_counts = df['stress_label'].value_counts()
    print("\nLabel distribution:")
    print(label_counts)
    # Validate, train, sanity-check one prediction, and persist the model
    model = LiquidityStressModel()
    X, y = model.prepare_data(df, feature_cols)
    scores = model.walk_forward_validation(X, y, n_splits=5)
    model.train_final_model(X, y)
    latest_features = X[-1]
    prediction = model.predict_stress(latest_features)
    model.save_model(MODEL_PATH)

Here’s a summary of the results:

| Metric | Score |
|-----------:|:-----:|
| Precision | 0.331 |
| Recall | 0.451 |
| F1 | 0.363 |

Inference

In a live setting, we would load the saved model and process new order-book data with the same feature calculations. For each new row, the model predicts a stress probability and a binary label. For example, we classify the output probability into alert levels (LOW, MEDIUM, HIGH) based on thresholds (e.g., above 0.75 = HIGH).

In practice, because liquidity crises are rare, one would require several consecutive HIGH alerts before triggering an actual alarm.
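
One simple way to do that, sketched below and not part of the example repository, is a small debouncer that only fires after N consecutive HIGH predictions:

# hypothetical alert debouncer, not part of the example repository:
# only raise an alarm after `threshold` consecutive HIGH alerts
class AlertDebouncer:
    def __init__(self, threshold: int = 5):
        self.threshold = threshold
        self.consecutive_high = 0

    def update(self, alert_level: str) -> bool:
        """Feed one prediction; returns True when an alarm should fire."""
        if alert_level == "HIGH":
            self.consecutive_high += 1
        else:
            self.consecutive_high = 0
        return self.consecutive_high >= self.threshold

# usage: fire = debouncer.update(model.predict_stress(row)["alert_level"])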

The core prediction logic is encapsulated in the predict_stress method of our model class, shown again here for reference:

def predict_stress(self, current_features):
    """Predict stress for a single row of features"""
    if self.model is None:
        raise ValueError("Model not trained yet!")
    if len(current_features.shape) == 1:
        current_features = current_features.reshape(1, -1)
    probability = self.model.predict_proba(current_features)[0, 1]
    prediction = self.model.predict(current_features)[0]
    return {
        'stress_probability': probability,
        'stress_prediction': bool(prediction),
        'alert_level': (
            'HIGH' if probability > 0.75 else
            'MEDIUM' if probability > 0.5 else
            'LOW'
        )
    }

At runtime, one would take new real-time features (e.g., from the latest QuestDB snapshot export), call model.predict_stress(), and then emit alerts or log them to a monitoring system. In this example we are just querying the 10,000 most recent rows from the live demo dataset, and outputting the results into a file.

# steps/inference.py
from io import BytesIO
from pathlib import Path

import requests
import pandas as pd

from src.feature_extraction import FeatureEngineer
from src.model import LiquidityStressModel
from config.settings import QUESTDB_PATH, TABLE_NAME, MODEL_PATH, INFERENCE_OUTPUT

# 1) Fetch the latest 10K rows (CSV via /exp), same columns as in training
EXP_URL = f"{QUESTDB_PATH.rstrip('/')}/exp"
sql = f"""
SELECT
    timestamp,
    symbol,
    asks[1][1] - bids[1][1] AS spread,
    (asks[1][1] - bids[1][1]) / ((asks[1][1] + bids[1][1]) / 2) * 10000 AS spread_bps,
    bids[2][1] + asks[2][1] AS total_volume,
    (bids[2][1] - asks[2][1]) / (bids[2][1] + asks[2][1]) AS imbalance,
    bids[1][1] AS bid_price,
    asks[1][1] AS ask_price,
    (bids[1][1] + asks[1][1]) / 2 AS mid_price
FROM {TABLE_NAME}
WHERE symbol = 'EURUSD'
LIMIT -10000
"""
r = requests.get(EXP_URL, params={"fmt": "csv", "query": sql})
r.raise_for_status()
df = pd.read_csv(BytesIO(r.content))

# 2) Features
df["timestamp"] = pd.to_datetime(df["timestamp"])
engineer = FeatureEngineer()
df = engineer.create_features(df)
feature_cols = [
    "spread_bps", "spread_ma_5m", "spread_std_5m",
    "total_volume", "volume_ma_5m",
    "imbalance", "imbalance_ma_5m",
    "price_change_1m", "price_change_5m",
    "volatility_5m", "spread_trend", "volume_surge",
]

# 3) Model + predict
model = LiquidityStressModel()
model.load_model(MODEL_PATH)
model.feature_columns = feature_cols

def align_features(df, cols):
    """Reorder columns to match training; fill any missing feature with 0"""
    aligned = {c: (df[c] if c in df.columns else 0) for c in cols}
    return pd.DataFrame(aligned)[cols]

X = align_features(df, feature_cols).values
preds = [model.predict_stress(row) for row in X]
pred_df = pd.DataFrame(preds)

# 4) Save results
Path(INFERENCE_OUTPUT).parent.mkdir(parents=True, exist_ok=True)
result = pd.concat([df.reset_index(drop=True), pred_df], axis=1)
result.to_csv(INFERENCE_OUTPUT, index=False)
print(f"Wrote inference results → {INFERENCE_OUTPUT}")

# Display sample
print(result[["timestamp", "stress_probability", "alert_level"]])

This is a sample of the inference results:

Wrote inference results → results/inference_latest.csv
timestamp stress_probability alert_level
2025-11-05 13:15:57.821179+00:00 0.516641 MEDIUM
2025-11-05 13:15:57.846104+00:00 0.595590 MEDIUM
2025-11-05 13:15:57.873982+00:00 0.102265 LOW
2025-11-05 13:15:57.884185+00:00 0.102265 LOW
2025-11-05 13:15:57.908093+00:00 0.102265 LOW
2025-11-05 13:15:57.912651+00:00 0.355158 LOW
2025-11-05 13:15:57.917040+00:00 0.312996 LOW
2025-11-05 13:15:57.933019+00:00 0.595590 MEDIUM
2025-11-05 13:15:57.942532+00:00 0.265077 LOW
2025-11-05 13:15:57.947014+00:00 0.102265 LOW
2025-11-05 13:15:57.963684+00:00 0.355158 LOW
2025-11-05 13:15:57.976234+00:00 0.709796 MEDIUM
2025-11-05 13:15:57.999977+00:00 0.088707 LOW
2025-11-05 13:15:58.039618+00:00 0.598207 MEDIUM
2025-11-05 13:15:58.040689+00:00 0.753148 HIGH
2025-11-05 13:15:58.044313+00:00 0.172697 LOW
2025-11-05 13:15:58.046626+00:00 0.172697 LOW
2025-11-05 13:15:58.051901+00:00 0.604241 MEDIUM

Implementation Pipeline

Putting it all together, our pipeline looks like this:

  • Ingestion: Stream FX order-book arrays into QuestDB. QuestDB can ingest these at nanosecond timestamp precision. For our demo, we simply exported historical data from QuestDB to CSV/Parquet and loaded it for offline processing.

  • Feature calculation: Use SQL window and array functions in QuestDB for live metrics. One could, for instance, create a view or use a WINDOW clause to compute moving averages, or use expressions like (ask_prices[1] - bid_prices[1]) for spread. Our prototype instead did the feature calculation in Python, but the same logic could be implemented in SQL on streaming data (see the sketch after this list), ensuring that all time-based aggregations happen in the database.

  • Data export: Export historical features to Parquet for ML training. After generating features and labels in QuestDB (or in Python), we store them in Parquet files. QuestDB’s native Parquet export makes this trivial. The Parquet files can then be loaded into any ML toolkit (we used Pandas/XGBoost).

  • Prediction demo: Infer on live features. We can run the model as a service, receive the new data rows by streaming from QuestDB, and apply the feature logic plus model.predict_stress. This demonstrates the end-to-end flow.
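
As a sketch of the in-database variant, rolling features could be expressed with window functions. This assumes 1-second sampling, so 300 rows approximate a 5-minute window, and a hypothetical sampled_features table holding the 1s aggregates:

-- illustrative sketch: rolling 5-minute features in SQL, assuming one
-- row per second (300 rows ≈ 5 minutes); column names match the tutorial
SELECT
    timestamp,
    spread_bps,
    avg(spread_bps) OVER (
        ORDER BY timestamp ROWS BETWEEN 299 PRECEDING AND CURRENT ROW
    ) AS spread_ma_5m,
    avg(total_volume) OVER (
        ORDER BY timestamp ROWS BETWEEN 299 PRECEDING AND CURRENT ROW
    ) AS volume_ma_5m
FROM sampled_features;  -- hypothetical table of the 1s aggregates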

Throughout this pipeline, QuestDB sits at the center of data movement. It handles the high-frequency ingestion, while handing off data via Parquet to the Python ML components. No custom data preprocessing was needed outside of what's shown.

Results and Validation

On the modeling side, as noted, performance was modest: precision ≈0.33, recall ≈0.45, F1 ≈0.36. Well-tuned research models often only reach F1 in the 0.6–0.7 range for this problem, so our ~0.36 underscores how noisy the data is and how much headroom a more refined approach would have. The purpose of the demo is to show how QuestDB can seamlessly feed such an analysis.

The key takeaway is that QuestDB’s capabilities (high-throughput ingest, time-series SQL, and Parquet integration) let data scientists iterate quickly on feature and model ideas with tick-level financial data.

Conclusion

This prototype demonstrates a full research loop: ingesting dense FX data, computing real-time liquidity metrics, labeling stress events, and training a predictive model. Even with simple heuristics, it shows how to move from raw ticks to stress probabilities in a few lines of SQL and Python.

While the model is intentionally simple, the pipeline highlights how fast, structured data access can accelerate experimentation. QuestDB’s array columns and time-series SQL let us work directly with order-book snapshots without custom preprocessing. The same pattern (live data in, features out) can be reused for risk monitoring, trade execution, or market microstructure research.

The full code is available in our blog examples repository.
