Building an FX Liquidity Stress Analysis Workflow with QuestDB
Liquidity dry-ups in FX markets can sharply raise execution costs for large institutional orders. During stressed conditions, bid–ask spreads widen and market depth thins out, making it harder to execute large trades efficiently. Traditional indicators often react only after the fact, so traders have little time to adjust.
In this tutorial, we build a compact FX liquidity stress analysis pipeline using QuestDB and Python. We’ll show how to stream and aggregate high-frequency order book data, engineer microstructure features, label stress periods, and train an XGBoost model for early warnings. The focus is on reproducible data science, not black-box trading: everything runs end-to-end with open tools and SQL-based time-series processing.
The Liquidity Challenge in FX Markets
Episodes of stress in FX markets often emerge without warning. The IMF reports that during periods of uncertainty,
currency bid-ask spreads widen, and foreign funding and hedging costs rise.
These conditions typically appear around macro announcements, flash crashes, or when major trading sessions overlap and liquidity providers withdraw. As order book depth evaporates, execution costs can spike within seconds.
Conventional monitoring tools only register these shifts once spreads have already widened. Understanding and anticipating such moments requires data with fine time resolution, showing how spreads, volumes, and imbalance evolve in the seconds before stress appears. In this workflow we use second-level aggregated data for simplicity, but the same ideas apply to sub-second or microsecond feeds.
Detecting stress ahead of time involves two main ingredients:
- Computing liquidity measures such as spread, imbalance, depth, and volatility on streaming market data.
- Training a model on historical patterns to identify conditions that tend to precede stress events.
QuestDB Architecture for High-Frequency FX Data
QuestDB is built for this kind of workload, sustaining ingestion rates of over 5 million rows per second. In practice, this means FX order book snapshots or trade ticks can stream into QuestDB with minimal latency.
Its columnar, multi-tier storage keeps recent data in memory and older data on disk, allowing fast time-ordered scans across large ranges without complex ETL or duplication. Aggregations and historical queries remain interactive, even over billions of rows.
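To make the ingestion side concrete, here is a minimal sketch using the official questdb Python client over ILP/HTTP. The table and column names are illustrative rather than the demo dataset's schema, and it assumes a QuestDB instance listening on localhost:9000.

```python
# Minimal ingestion sketch (illustrative table/column names), using the
# official `questdb` Python client over ILP/HTTP.
from questdb.ingress import Sender, TimestampNanos

conf = "http::addr=localhost:9000;"  # assumes a local QuestDB instance

with Sender.from_conf(conf) as sender:
    # One top-of-book snapshot per row; a real feed would loop over updates.
    sender.row(
        "market_data_top",              # hypothetical table name
        symbols={"symbol": "EURUSD"},
        columns={
            "bid_price": 1.15761,
            "ask_price": 1.15801,
            "bid_size": 75_000.0,
            "ask_size": 80_000.0,
        },
        at=TimestampNanos.now(),
    )
    sender.flush()
```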
For Level 2 (order book) data, QuestDB supports array columns.
Instead of
storing each price level in separate rows, bid and ask ladders are stored as
arrays per timestamp (for example, bid_prices, bid_sizes, ask_prices,
ask_sizes). This compact design allows fast per-tick updates and SQL array
functions to extract top-of-book or deeper levels on demand.
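As a rough sketch of what such a schema can look like (the demo table's actual DDL may differ), a Level 2 table with two-dimensional array columns and a top-of-book query might be:

```sql
-- Sketch of an L2 table using array columns; the demo dataset's real DDL may differ.
CREATE TABLE market_data_l2 (
  timestamp TIMESTAMP,
  symbol SYMBOL,
  bids DOUBLE[][],   -- bids[1] = prices, bids[2] = sizes
  asks DOUBLE[][]    -- asks[1] = prices, asks[2] = sizes
) TIMESTAMP(timestamp) PARTITION BY DAY WAL;

-- Extract top-of-book levels on demand (QuestDB arrays are 1-indexed)
SELECT timestamp, symbol,
       bids[1][1] AS best_bid,
       asks[1][1] AS best_ask
FROM market_data_l2
LIMIT 5;
```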
QuestDB’s SQL extensions include finance-oriented functions such as `l2price(volume, sizes, prices)` for volume-weighted execution prices and `spread_bps(bid, ask)` for bid–ask spreads in basis points.
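Using the signatures above, a query that estimates the cost of sweeping one million units against the stored ask ladder might look like the sketch below. It assumes the array-valued overloads shown above and the asks/bids array layout described later in the Dataset section (first element prices, second element sizes).

```sql
-- Sketch only: estimate top-of-book spread and execution cost using the
-- array-based signatures described above.
SELECT
  timestamp,
  symbol,
  spread_bps(bids[1][1], asks[1][1]) AS top_spread_bps,
  l2price(1000000, asks[2], asks[1]) AS vwap_buy_1m
FROM market_data
WHERE symbol = 'EURUSD'
LIMIT 10;
```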
Finally, QuestDB fully supports open formats. Any table or query can be exported to Apache Parquet via SQL or REST, and older partitions can be converted in-place. Parquet integrates natively with Python, Pandas, Spark, and ML frameworks, providing a compressed, portable dataset for modeling.
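For instance, converting older partitions in place could look like the following sketch; the exact syntax and availability depend on the QuestDB version and configuration.

```sql
-- Sketch: convert historical partitions to Parquet in place
-- (availability depends on QuestDB version and configuration).
ALTER TABLE market_data CONVERT PARTITION TO PARQUET
WHERE timestamp < dateadd('d', -7, now());
```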
Example Machine Learning Workflow
To illustrate how QuestDB fits into an ML pipeline, we built a prototype workflow for predicting liquidity stress. The goal is to show how data and features flow from QuestDB into a modeling pipeline. The main steps are: labeling stress periods, engineering features from market data, exporting those features, training a classification model, and finally running inference on new data.
Setup
The full code for the example is available in this GitHub repository. The implementation was developed and tested using Python 3.13, though any Python version ≥3.10 should work reliably.
You can clone the blog examples repository and enter this post's directory:
```bash
git clone https://github.com/questdb/blog-examples.git
cd blog-examples/2025-11-fx-liquidity-stress-analysis
```
Although you can install the dependencies directly, it is advisable to use a virtual environment. To set
up the environment, install uv. We are also using Poe the Poet
to manage multi-step workflows. Each pipeline stage is defined as a separate task within
pyproject.toml.
```toml
[project]
name = "fx-liquidity-stress"
version = "0.1.0"
requires-python = ">=3.10"
dependencies = [
    "pandas==2.3.3",
    "poethepoet==0.37.0",
    "pyarrow==21.0.0",
    "requests==2.32.5",
    "scikit-learn==1.7.2",
    "xgboost==3.1.0",
]

[tool.poe.tasks]
# if you have access to a QuestDB instance with Parquet export enabled, you can use
# the convert/export_to_parquet.py script instead, as it is more efficient. To be
# conservative, we are defaulting to CSV export plus Parquet conversion via Pandas
convert = { cmd = "python steps/convert_to_parquet.py", env = { PYTHONPATH = "." } }
train = { cmd = "python steps/train.py", env = { PYTHONPATH = "." } }
infer = { cmd = "python steps/inference.py", env = { PYTHONPATH = "." } }
pipeline = ["convert", "train", "infer"]
```
You can create a virtual environment and install the dependencies by running:
```bash
uv venv
uv sync
```
This creates an isolated environment under .venv/ and synchronizes all dependencies using the
lockfile (uv.lock), ensuring deterministic builds across systems. If the uv.lock file doesn’t
exist, uv sync will generate it automatically.
The directory structure for this example will be:
```text
├── config
│   └── settings.py
├── dataset
│   └── output.parquet
├── model_registry
│   └── fx_liquidity_model.json
├── pyproject.toml
├── results
│   └── inference_latest.csv
├── src
│   ├── data_loader.py
│   ├── feature_extraction.py
│   ├── label_creator.py
│   └── model.py
├── steps
│   ├── convert_to_parquet.py
│   ├── export_to_parquet.py
│   ├── inference.py
│   └── train.py
└── uv.lock
```
Once you have read the rest of the post, which explains every step, you will be able to run the pipeline in order via:
```bash
uv run poe pipeline
```
Dataset
For this demo, we used an FX order-book dataset exported from the QuestDB public demo.
Each row in the market_data table represents an order book snapshot for a given currency pair and includes the
top-of-book prices and sizes. We query up to 2,000,000 rows from a one-day window ending 24 hours ago, average the derived fields at 1-second intervals, and fill any potential gaps so we have a dense dataset for training. Note that in this dataset asks and bids are stored as two-dimensional arrays, where the first element is an array of prices and the second an array of sizes, so asks[1][1] is the top-of-book ask price and asks[2][1] is the top-of-book ask size.
This is the query we will execute:
```sql
WITH training_data AS (
  SELECT timestamp, symbol,
         asks[1][1] AS ask_price, bids[1][1] AS bid_price,
         asks[2][1] AS ask_volume, bids[2][1] AS bid_volume
  FROM market_data
  WHERE symbol = 'EURUSD'
    AND timestamp BETWEEN
        dateadd('d', -2, now()) AND
        dateadd('d', -1, now())
  LIMIT 2000000
)
SELECT
  timestamp,
  symbol,
  avg(ask_price - bid_price) AS spread,
  avg((ask_price - bid_price) /
      ((ask_price + bid_price) / 2) * 10000) AS spread_bps,
  avg(bid_volume + ask_volume) AS total_volume,
  avg((bid_volume - ask_volume) /
      (bid_volume + ask_volume)) AS imbalance,
  avg(bid_price) AS bid_price,
  avg(ask_price) AS ask_price,
  avg((bid_price + ask_price) / 2) AS mid_price
FROM training_data
SAMPLE BY 1s FILL(prev);
```
| timestamp | symbol | spread | spread_bps | total_volume | imbalance | bid_price | ask_price | mid_price |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 2025-11-03T18:18:53.000000Z | EURUSD | 0.00039999999999995595 | 3.4547869802372113 | 152603.3103448276 | -0.03717683856855787 | 1.1576137931034478 | 1.1580137931034487 | 1.1578137931034482 |
| 2025-11-03T18:18:54.000000Z | EURUSD | 0.00039999999999995595 | 3.455489918465511 | 151580.80434782608 | 0.01167956282073817 | 1.157378260869566 | 1.1577782608695653 | 1.1575782608695657 |
| 2025-11-03T18:18:55.000000Z | EURUSD | 0.00039999999999997817 | 3.4546730079518153 | 149224.9 | -0.022907699937196654 | 1.1576519999999995 | 1.1580519999999996 | 1.157852 |
| 2025-11-03T18:18:56.000000Z | EURUSD | 0.00040000000000014097 | 3.453232474702322 | 152220.23333333334 | -0.005828367764725703 | 1.158135 | 1.158535000000001 | 1.1583350000000003 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 2025-11-03T18:19:54.000000Z | EURUSD | 0.00039999999999995595 | 3.4607301777196686 | 151476.27272727274 | -0.006196169043030609 | 1.1556254545454552 | 1.1560254545454547 | 1.155825454545455 |
| 2025-11-03T18:19:55.000000Z | EURUSD | 0.00039999999999995595 | 3.461426439417276 | 147406.2394366197 | -0.004053108913267634 | 1.155392957746478 | 1.1557929577464778 | 1.1555929577464799 |
| 2025-11-03T18:19:56.000000Z | EURUSD | 0.00039999999999995595 | 3.462081559721717 | 154457.58571428573 | -0.021747505131181462 | 1.155174285714284 | 1.1555742857142879 | 1.1553742857142848 |
QuestDB can export data both as CSV and as Parquet. Since Parquet export can be disabled via a configuration flag, we keep this demo more universal by exporting over the REST API and converting the result to Parquet with Pandas. The example repository also includes the script export_to_parquet.py, which exports directly to Parquet without any extra processing. The following script downloads the query results and converts them to Parquet:
```python
# steps/convert_to_parquet.py
import requests
import pandas as pd

from config.settings import QUESTDB_PATH, PARQUET_OUTPUT_PATH, TABLE_NAME, ROWS_LIMIT

# Use the same SAMPLE BY query shown in the Dataset section
query = """<same SQL query as above>"""

response = requests.get(
    url=f"{QUESTDB_PATH.rstrip('/')}/exec",
    params={'query': query}
)
response.raise_for_status()

data = response.json()
columns = [col['name'] for col in data['columns']]
df = pd.DataFrame(data['dataset'], columns=columns)
df.to_parquet(PARQUET_OUTPUT_PATH, index=False)
```
Key columns extracted and derived:

- Timestamp: Datetime index marking each snapshot.
- Symbol: Currency pair identifier (e.g., `EURUSD`).
- Bid_price: Best bid price at the top of the book.
- Ask_price: Best ask price at the top of the book.
- Mid_price: Midpoint of bid and ask prices (`(bid_price + ask_price) / 2`).

Derived microstructure metrics:

- Spread: Difference between ask and bid prices (`ask_price - bid_price`).
- Spread_bps: Spread expressed in basis points (`spread / mid_price` × 10,000).
- Total_volume: Combined visible volume at top levels (`bid_size + ask_size`).
- Imbalance: Book imbalance ratio (`(bid_size - ask_size) / (bid_size + ask_size)`).
Data Loading
The data loading process uses a lightweight DataLoader utility to read and preprocess the Parquet dataset. It automatically converts timestamps to datetime objects, sorts the records chronologically, and prepares the data for downstream feature engineering and modeling.
```python
# src/data_loader.py
import pandas as pd


class DataLoader:
    def __init__(self, file_path):
        self.file_path = file_path

    def load_data(self):
        """Load and prepare raw Parquet data"""
        print("Loading data...")
        df = pd.read_parquet(self.file_path)
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df = df.sort_values('timestamp').reset_index(drop=True)
        return df
```
Feature Extraction
From the raw columns, we engineer a set of predictive features that capture
short-term trends and volatility. Our Python FeatureEngineer class applies
rolling-window and differencing operations (at a 5-minute window by default).
For example, we compute:
- Rolling metrics (5-minute): `spread_ma_5m` and `spread_std_5m` (mean and std of spread over the past 5 minutes), `volume_ma_5m` (mean top-of-book size), and `imbalance_ma_5m` (mean order imbalance). These capture recent liquidity conditions.
- Momentum/trend: `price_change_1m` and `price_change_5m`, the change in mid-price over the past 1 and 5 minutes, respectively. Large moves may precede stress.
- Volatility: `volatility_5m`, the rolling standard deviation of 1-second returns (percent changes) over the last 5 minutes. This measures market uncertainty.
- Spread trend: `spread_trend`, the 1-minute difference in spread (is the spread widening or tightening).
- Volume surge: `volume_surge`, defined as the current L1 size divided by the 5-minute rolling mean L1 size. This highlights sudden volume spikes.
These features are computed in Pandas by rolling windows and diff operations. We
ensure missing values are forward-filled so that no NaNs remain. In code, the
create_features method
looks like:
```python
# src/feature_extraction.py
import pandas as pd


class FeatureEngineer:
    def __init__(self, window_seconds=300):
        """window_seconds: rolling window size (default = 5 minutes at 1-second intervals)"""
        self.window = window_seconds

    def create_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Generate rolling, trend, and volatility features"""
        # Rolling averages & std (5 min window)
        df['spread_ma_5m'] = df['spread_bps'].rolling(self.window, min_periods=1).mean()
        df['spread_std_5m'] = df['spread_bps'].rolling(self.window, min_periods=1).std()
        df['volume_ma_5m'] = df['total_volume'].rolling(self.window, min_periods=1).mean()
        df['imbalance_ma_5m'] = df['imbalance'].rolling(self.window, min_periods=1).mean()

        # Price change trends
        df['price_change_1m'] = df['mid_price'].diff(60)    # 1 min = 60 sec
        df['price_change_5m'] = df['mid_price'].diff(300)   # 5 min = 300 sec

        # Volatility (rolling std of pct changes)
        df['volatility_5m'] = df['mid_price'].pct_change().rolling(self.window, min_periods=1).std()

        # Spread trend (1 min difference)
        df['spread_trend'] = df['spread_bps'].diff(60)

        # Volume surge (current / avg of last 5 min)
        df['volume_surge'] = df['total_volume'] / df['volume_ma_5m']

        # Fill any NaNs
        df = df.ffill().fillna(0)

        return df
```
Label Creation
Next, we need target labels indicating whether a given time is or will become a
liquidity-stress event. We define stress heuristically: if the spread is
extremely wide or the top-of-book size is extremely low, that suggests illiquidity.
Concretely, we take the 95th percentile of spread_bps and the 5th percentile
of total_volume as thresholds.
Any snapshot above the spread threshold or below the size threshold is marked as a stress event. Then we shift these labels forward by a prediction horizon (e.g., 10 minutes) so that the model is learning to predict a future stress event from current features. Rows at the very end (with no future data to look ahead) are dropped. In code form:
```python
# src/label_creator.py
import pandas as pd


class LabelCreator:
    def __init__(self, horizon_minutes=10):
        """horizon_minutes: how far ahead to predict stress (default = 10 minutes)"""
        self.horizon_minutes = horizon_minutes

    def create_labels(self, df: pd.DataFrame) -> pd.DataFrame:
        """Create binary stress labels"""
        # Thresholds
        spread_threshold = df['spread_bps'].quantile(0.95)
        volume_threshold = df['total_volume'].quantile(0.05)

        # Stress conditions
        stress_events = (
            (df['spread_bps'] > spread_threshold) |
            (df['total_volume'] < volume_threshold)
        ).astype(int)

        # Shift labels forward in time
        horizon_rows = self.horizon_minutes * 60  # assuming 1s intervals
        df['stress_label'] = stress_events.shift(-horizon_rows)

        # Drop last rows where future labels don't exist
        df = df[:-horizon_rows].copy()

        stress_count = df['stress_label'].sum()
        total_count = len(df)

        return df
```
Applying this to a subset of our data yielded about 97,049 stress events out of 999,400 rows (~9.71%), with the remaining ~90.3% labeled normal. This class imbalance (≈10:1) means we must account for it during training.
Model Training
With features (X) and labels (y) ready, we train a binary classifier. For
simplicity, we used an XGBoost gradient boosting classifier. We handle the class
imbalance by setting scale_pos_weight = (#normal / #stress) so that the
minority class is up-weighted. We also evaluate via walk-forward (time-series)
cross-validation to respect the temporal ordering. In each fold, we train on
earlier data and test on the next segment, computing precision, recall, and F1.
We observed modest scores (e.g., average F1 ~0.36), reflecting the difficulty of
the problem. The final model is then retrained on all data.
Our Python class for modeling looks like this:
```python
# src/model.py
import numpy as np
import pandas as pd
import xgboost as xgb
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import precision_score, recall_score, f1_score


class LiquidityStressModel:
    def __init__(self):
        self.model = None
        self.feature_columns = None

    def prepare_data(self, df, feature_cols):
        """Prepare features and labels for training"""
        self.feature_columns = feature_cols
        X = df[feature_cols].values
        y = df['stress_label'].values
        return X, y

    def walk_forward_validation(self, X, y, n_splits=5):
        """Perform walk-forward validation for time series data"""
        tscv = TimeSeriesSplit(n_splits=n_splits)
        scores = []

        for fold, (train_idx, test_idx) in enumerate(tscv.split(X)):
            print(f"\nFold {fold+1}/{n_splits}")
            X_train, X_test = X[train_idx], X[test_idx]
            y_train, y_test = y[train_idx], y[test_idx]

            # Compute scale_pos_weight for imbalanced classes
            neg = np.sum(y_train == 0)
            pos = np.sum(y_train == 1)
            scale_pos_weight = neg / pos if pos > 0 else 1.0

            # Train XGBoost model
            model = xgb.XGBClassifier(
                n_estimators=500,
                max_depth=6,
                learning_rate=0.05,
                scale_pos_weight=scale_pos_weight,
                eval_metric='logloss',
                random_state=42
            )
            model.fit(X_train, y_train)

            preds = model.predict(X_test)
            precision = precision_score(y_test, preds, zero_division=0)
            recall = recall_score(y_test, preds, zero_division=0)
            f1 = f1_score(y_test, preds, zero_division=0)
            print(f"Precision: {precision:.3f}, Recall: {recall:.3f}, F1: {f1:.3f}")
            scores.append({'precision': precision, 'recall': recall, 'f1': f1})

        # Average metrics across folds
        avg_precision = np.mean([s['precision'] for s in scores])
        avg_recall = np.mean([s['recall'] for s in scores])
        avg_f1 = np.mean([s['f1'] for s in scores])
        print(f"\nAverage Performance → Precision: {avg_precision:.3f}, Recall: {avg_recall:.3f}, F1: {avg_f1:.3f}")

        return scores

    def predict_stress(self, current_features):
        """Predict stress for a single feature row or 1D array."""
        if self.model is None:
            raise ValueError("Model not trained yet!")
        arr = np.asarray(current_features)
        if arr.ndim == 1:
            arr = arr.reshape(1, -1)
        prob = float(self.model.predict_proba(arr)[0, 1])
        pred = bool(self.model.predict(arr)[0])
        return {
            "stress_probability": prob,
            "stress_prediction": pred,
            "alert_level": (
                "HIGH" if prob > 0.75 else "MEDIUM" if prob > 0.5 else "LOW"
            ),
        }

    def save_model(self, path: str):
        """Persist model to disk."""
        import os
        # MODEL_PATH should include path and file, as in model_registry/fx_liquidity_model.json
        os.makedirs(os.path.dirname(path), exist_ok=True)
        self.model.save_model(path)

    def load_model(self, path: str):
        """Load previously saved model."""
        import xgboost as xgb
        self.model = xgb.XGBClassifier()
        self.model.load_model(path)

    def train_final_model(self, X, y):
        """Train final model on all data"""
        print("Training final model on full dataset...")
        neg = np.sum(y == 0)
        pos = np.sum(y == 1)
        scale_pos_weight = neg / pos if pos > 0 else 1.0

        self.model = xgb.XGBClassifier(
            n_estimators=500,
            max_depth=6,
            learning_rate=0.05,
            scale_pos_weight=scale_pos_weight,
            eval_metric='logloss',
            random_state=42
        )
        self.model.fit(X, y)
        print("Final model trained!")

        # Display feature importance
        importance = self.model.feature_importances_
        feature_importance = pd.DataFrame({
            'feature': self.feature_columns,
            'importance': importance
        }).sort_values('importance', ascending=False)
        print("\nTop 5 features by importance:")
        print(feature_importance.head())
```
To run the training step, we execute the following script:
```python
# steps/train.py
from src.data_loader import DataLoader
from src.feature_extraction import FeatureEngineer
from src.label_creator import LabelCreator
from src.model import LiquidityStressModel
from config.settings import PARQUET_OUTPUT_PATH, MODEL_PATH

if __name__ == "__main__":
    file = PARQUET_OUTPUT_PATH

    feature_cols = [
        "spread_bps", "spread_ma_5m", "spread_std_5m",
        "total_volume", "volume_ma_5m",
        "imbalance", "imbalance_ma_5m",
        "price_change_1m", "price_change_5m",
        "volatility_5m", "spread_trend", "volume_surge",
    ]

    loader = DataLoader(file)
    df = loader.load_data()

    engineer = FeatureEngineer()
    df = engineer.create_features(df)

    labeler = LabelCreator(horizon_minutes=10)
    df = labeler.create_labels(df)

    label_counts = df['stress_label'].value_counts()
    total = len(df)
    print("\nLabel distribution:")
    print(label_counts)

    model = LiquidityStressModel()
    X, y = model.prepare_data(df, feature_cols)

    scores = model.walk_forward_validation(X, y, n_splits=5)
    model.train_final_model(X, y)

    latest_features = X[-1]
    prediction = model.predict_stress(latest_features)

    model.save_model(MODEL_PATH)
```
Here’s a summary of the results:
| Metric    | Score |
|----------:|:-----:|
| Precision | 0.331 |
| Recall    | 0.451 |
| F1        | 0.363 |
Inference
In a live setting, we would load the saved model and process new order-book data
with the same feature calculations. For each new row, the model predicts a
stress probability and a binary label. For example, we classify the output
probability into alert levels (LOW, MEDIUM, HIGH) based on thresholds
(e.g., above 0.75 = HIGH).
In practice, because liquidity crises are rare, one would require several consecutive HIGH alerts before triggering an actual alarm.
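A minimal sketch of such a debouncing rule is shown below; the choice of three consecutive alerts is arbitrary and not part of the demo code.

```python
# Sketch of a simple alert debouncer: escalate only after N consecutive
# HIGH alerts. N = 3 is an arbitrary choice that would need tuning.
class AlertDebouncer:
    def __init__(self, required_consecutive: int = 3):
        self.required = required_consecutive
        self.streak = 0

    def update(self, alert_level: str) -> bool:
        """Return True only once enough consecutive HIGH alerts have been seen."""
        self.streak = self.streak + 1 if alert_level == "HIGH" else 0
        return self.streak >= self.required


# Usage with the dictionaries returned by predict_stress():
# debouncer = AlertDebouncer()
# if debouncer.update(prediction["alert_level"]):
#     trigger_alarm()  # hypothetical downstream action
```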
The core prediction logic is encapsulated in our model class:
```python
def predict_stress(self, current_features):
    """Predict stress for a single row of features"""
    if self.model is None:
        raise ValueError("Model not trained yet!")

    if len(current_features.shape) == 1:
        current_features = current_features.reshape(1, -1)

    probability = self.model.predict_proba(current_features)[0, 1]
    prediction = self.model.predict(current_features)[0]

    return {
        'stress_probability': probability,
        'stress_prediction': bool(prediction),
        'alert_level': (
            'HIGH' if probability > 0.75 else
            'MEDIUM' if probability > 0.5 else
            'LOW'
        )
    }
```
At runtime, one would take new real-time features (e.g., from the latest QuestDB
snapshot export), call model.predict_stress(), and then emit alerts or log
them to a monitoring system. In this example
we are just querying the 10,000 most
recent rows from the live demo dataset, and outputting the results into a file.
```python
# steps/inference.py
from io import BytesIO
from pathlib import Path

import requests
import pandas as pd

from src.feature_extraction import FeatureEngineer
from src.model import LiquidityStressModel
from config.settings import QUESTDB_PATH, TABLE_NAME, MODEL_PATH, INFERENCE_OUTPUT

# 1) Fetch the latest 10K rows (CSV via /exp), same columns as in training
EXP_URL = f"{QUESTDB_PATH.rstrip('/')}/exp"
sql = f"""
SELECT
  timestamp,
  symbol,
  asks[1][1] - bids[1][1] AS spread,
  (asks[1][1] - bids[1][1]) / ((asks[1][1] + bids[1][1]) / 2) * 10000 AS spread_bps,
  bids[2][1] + asks[2][1] AS total_volume,
  (bids[2][1] - asks[2][1]) / (bids[2][1] + asks[2][1]) AS imbalance,
  bids[1][1] AS bid_price,
  asks[1][1] AS ask_price,
  (bids[1][1] + asks[1][1]) / 2 AS mid_price
FROM {TABLE_NAME}
WHERE symbol = 'EURUSD'
LIMIT -10000
"""

r = requests.get(EXP_URL, params={"fmt": "csv", "query": sql})
r.raise_for_status()
df = pd.read_csv(BytesIO(r.content))

# 2) Features
df["timestamp"] = pd.to_datetime(df["timestamp"])
engineer = FeatureEngineer()
df = engineer.create_features(df)

feature_cols = [
    "spread_bps", "spread_ma_5m", "spread_std_5m",
    "total_volume", "volume_ma_5m",
    "imbalance", "imbalance_ma_5m",
    "price_change_1m", "price_change_5m",
    "volatility_5m", "spread_trend", "volume_surge",
]

# 3) Model + predict
model = LiquidityStressModel()
model.load_model(MODEL_PATH)
model.feature_columns = feature_cols

def align_features(df, cols):
    aligned = {c: (df[c] if c in df.columns else 0) for c in cols}
    return pd.DataFrame(aligned)[cols]

X = align_features(df, feature_cols).values
preds = [model.predict_stress(row) for row in X]
pred_df = pd.DataFrame(preds)

# 4) Save results
Path(INFERENCE_OUTPUT).parent.mkdir(parents=True, exist_ok=True)
result = pd.concat([df.reset_index(drop=True), pred_df], axis=1)
result.to_csv(INFERENCE_OUTPUT, index=False)
print(f"Wrote inference results → {INFERENCE_OUTPUT}")

# Display sample
print(result[["timestamp", "stress_probability", "alert_level"]])
```
This is a sample of the inference results:
```text
Wrote inference results → results/inference_latest.csv
                        timestamp  stress_probability alert_level
2025-11-05 13:15:57.821179+00:00            0.516641      MEDIUM
2025-11-05 13:15:57.846104+00:00            0.595590      MEDIUM
2025-11-05 13:15:57.873982+00:00            0.102265         LOW
2025-11-05 13:15:57.884185+00:00            0.102265         LOW
2025-11-05 13:15:57.908093+00:00            0.102265         LOW
2025-11-05 13:15:57.912651+00:00            0.355158         LOW
2025-11-05 13:15:57.917040+00:00            0.312996         LOW
2025-11-05 13:15:57.933019+00:00            0.595590      MEDIUM
2025-11-05 13:15:57.942532+00:00            0.265077         LOW
2025-11-05 13:15:57.947014+00:00            0.102265         LOW
2025-11-05 13:15:57.963684+00:00            0.355158         LOW
2025-11-05 13:15:57.976234+00:00            0.709796      MEDIUM
2025-11-05 13:15:57.999977+00:00            0.088707         LOW
2025-11-05 13:15:58.039618+00:00            0.598207      MEDIUM
2025-11-05 13:15:58.040689+00:00            0.753148        HIGH
2025-11-05 13:15:58.044313+00:00            0.172697         LOW
2025-11-05 13:15:58.046626+00:00            0.172697         LOW
2025-11-05 13:15:58.051901+00:00            0.604241      MEDIUM
```
Implementation Pipeline
Putting it all together, our pipeline looks like this:
- Ingestion: Stream FX order-book arrays into QuestDB. QuestDB can ingest these at nanosecond timestamp precision. For our demo, we simply exported historical data from QuestDB to CSV/Parquet and loaded it for offline processing.
- Feature calculation: Use SQL window and array functions in QuestDB for live metrics. One could, for instance, create a view or use a `WINDOW` clause to compute moving averages, or use expressions like `(ask_prices[1] - bid_prices[1])` for spread. Our prototype instead did feature calculation in Python, but the same logic could be implemented in SQL on streaming data, ensuring that all time-based aggregations happen in the database (see the sketch after this list).
- Data export: Export historical features to Parquet for ML training. After generating features and labels in QuestDB (or in Python), we store them in Parquet files. QuestDB’s native Parquet export makes this trivial. The Parquet files can then be loaded into any ML toolkit (we used Pandas/XGBoost).
- Prediction demo: Infer on live features. We can run the model as a service, receive the new data rows by streaming from QuestDB, and apply the feature logic plus `model.predict_stress`. This demonstrates the end-to-end flow.
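As an illustration of the in-database variant of the feature-calculation step, the 5-minute rolling spread average could be computed with a window function over the 1-second sampled data. This is only a sketch: sampled_features is a hypothetical view holding the output of the SAMPLE BY query, and the exact frame syntax may vary across QuestDB versions.

```sql
-- Sketch: 5-minute rolling average of spread_bps computed in the database.
-- The data is already sampled at 1-second intervals, so 300 rows ≈ 5 minutes.
SELECT
  timestamp,
  symbol,
  spread_bps,
  avg(spread_bps) OVER (
    PARTITION BY symbol
    ORDER BY timestamp
    ROWS BETWEEN 299 PRECEDING AND CURRENT ROW
  ) AS spread_ma_5m
FROM sampled_features;   -- hypothetical view over the SAMPLE BY output
```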
Throughout this pipeline, QuestDB sits at the center of data movement. It handles the high-frequency ingestion, while handing off data via Parquet to the Python ML components. No custom data preprocessing was needed outside of what's shown.
Results and Validation
On the modeling side, as noted, performance was modest: Precision ≈0.33, Recall ≈0.45, F1 ≈0.36. Well-tuned research models often only reach F1 in the 0.6–0.7 range for this problem, so our ~0.36 underscores how noisy the data is and how much headroom remains for a more refined approach. The purpose of the demo is to show how QuestDB can seamlessly feed such an analysis.
The key takeaway is that QuestDB’s capabilities (high-throughput ingest, time-series SQL, and Parquet integration) let data scientists iterate quickly on feature and model ideas with tick-level financial data.
Conclusion
This prototype demonstrates a full research loop: ingesting dense FX data, computing real-time liquidity metrics, labeling stress events, and training a predictive model. Even with simple heuristics, it shows how to move from raw ticks to stress probabilities in a few lines of SQL and Python.
While the model is intentionally simple, the pipeline highlights how fast, structured data access can accelerate experimentation. QuestDB’s array columns and time-series SQL let us work directly with order-book snapshots without custom preprocessing. The same pattern, live data in, features out, can be reused for risk monitoring, trade execution, or market microstructure research.
Full code available at our blog examples demo repository.