(a) Streaming aggregation. Flink reads from the Kafka "clicks" topic, groups by (campaign_id, ad_id, geo, minute), and counts clicks per window. Exactly-once semantics via Flink checkpoints + Kafka transactional producer.
// Flink pseudocode: 1-minute tumbling window aggregation
clickStream
    .filter(click -> click.fraudScore < THRESHOLD)            // post-fraud-filter
    .keyBy(click -> (click.campaignId, click.adId, click.geo))
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .allowedLateness(Time.minutes(5))                         // re-fire window for late events
    .sideOutputLateData(deadLetterTag)                        // events later than 5 min
    .aggregate(new ClickCounter())                            // {key, minute, count}
    .addSink(clickHouseSink);                                 // write aggregate row
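Each 1-minute window produces one row per active (campaign, ad, geo) combination. With ~1M unique key combinations, that is at most ~1M rows per minute, or ~1.44B rows/day (1M × 1,440 minutes) if every key receives a click in every minute, so 10B clicks/day shrink by roughly 7× or more. Late-event updates add further row versions on top. At ~17K rows/sec, this is a comfortable load for ClickHouse with batched inserts.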
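The grouping step can be sketched outside Flink as a plain Python function. This is only an illustration of tumbling-window counting, assuming click events are dicts with hypothetical fields `campaign_id`, `ad_id`, `geo`, and `ts` (epoch seconds); watermarks, state, and checkpointing, which Flink provides, are left out.

```python
from collections import Counter

WINDOW_SECONDS = 60  # 1-minute tumbling windows


def window_start(ts: int) -> int:
    """Align an epoch-second timestamp to the start of its 1-minute window."""
    return ts - (ts % WINDOW_SECONDS)


def aggregate(clicks):
    """Count clicks per (campaign_id, ad_id, geo, window start)."""
    counts = Counter()
    for c in clicks:
        key = (c["campaign_id"], c["ad_id"], c["geo"], window_start(c["ts"]))
        counts[key] += 1
    return counts


# Three clicks on the same ad: two fall in window [120, 180), one in [180, 240).
clicks = [
    {"campaign_id": 1, "ad_id": 10, "geo": "US", "ts": 120},
    {"campaign_id": 1, "ad_id": 10, "geo": "US", "ts": 179},
    {"campaign_id": 1, "ad_id": 10, "geo": "US", "ts": 180},
]
rows = aggregate(clicks)
```

Each entry in `rows` corresponds to one aggregate row that would be written to ClickHouse.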
(b) Fraud detection. An inline ML scorer evaluates each click before it enters the aggregation window:
- Bot fingerprint: headless browser detection (missing JS APIs), datacenter IP ranges, impossibly fast click timing (a human can't click 50 ads in 1 second).
- Click farm: same device_id clicking many different ads; geographic anomalies (IP in country A, timezone in country B).
- Click injection / SDK spoofing: mobile attribution fraud — fake click events generated by malware to steal credit for organic installs.
Score above the discard threshold: drop the click and log it to a "fraud" Kafka topic for analysis. Score at or below it: pass the click to aggregation. The model is retrained weekly on labeled fraud data. The false-positive rate must stay below 0.1%, because dropping real clicks costs advertisers money and generates disputes.
Fraud scoring pipeline detail:
// Fraud scoring pseudocode — runs per click in < 5 ms
features = {
is_datacenter_ip: ipLookup(click.ip), // known datacenter ranges
is_headless: click.user_agent lacks WebGL/Canvas APIs,
click_velocity: countClicksLast60s(click.device_id),
device_ad_diversity: distinctAdsLast1h(click.device_id),
geo_mismatch: click.ip_country != click.tz_country,
sdk_signature: validateSDKSignature(click.sig)
}
score = model.predict(features) // 0.0 (legit) to 1.0 (fraud)
if score > 0.85: DISCARD + log to fraud topic
else if score > 0.60: FLAG for manual review
else: PASS to aggregation
The model runs in a sidecar container co-located with the Flink task manager. Feature lookups (IP ranges, device history) use a local Redis cache refreshed every minute. Inference latency: < 5 ms per click, well within the streaming budget.
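The three-way decision and the `click_velocity` feature can be sketched as follows. This is a minimal sketch, not the production scorer: the names `Decision`, `decide`, and `VelocityCounter` are hypothetical, the thresholds come from the pseudocode above, and the counter keeps state in process memory where the article uses Redis. It also assumes timestamps arrive in non-decreasing order per device.

```python
from collections import defaultdict, deque
from enum import Enum

DISCARD_THRESHOLD = 0.85  # thresholds taken from the pseudocode above
REVIEW_THRESHOLD = 0.60


class Decision(Enum):
    DISCARD = "discard"  # drop and log to the fraud topic
    FLAG = "flag"        # queue for manual review
    PASS = "pass"        # forward to aggregation


def decide(score: float) -> Decision:
    """Map a model score in [0.0, 1.0] to exactly one decision."""
    if not 0.0 <= score <= 1.0:
        raise ValueError(f"score out of range: {score}")
    if score > DISCARD_THRESHOLD:
        return Decision.DISCARD
    if score > REVIEW_THRESHOLD:
        return Decision.FLAG
    return Decision.PASS


class VelocityCounter:
    """Clicks per device over a sliding window (in-memory stand-in for Redis)."""

    def __init__(self, window_seconds: int = 60):
        self.window = window_seconds
        self.events = defaultdict(deque)

    def record(self, device_id: str, ts: int) -> int:
        """Record a click and return the count within (ts - window, ts]."""
        q = self.events[device_id]
        q.append(ts)
        while q and q[0] <= ts - self.window:
            q.popleft()  # evict clicks that fell out of the window
        return len(q)


vc = VelocityCounter()
velocities = [vc.record("device-1", ts) for ts in (0, 30, 60, 61)]
```

Returning early from each branch guarantees a click gets a single outcome, so a score above 0.85 is never both discarded and flagged.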
Fraud transparency report. All discarded clicks are logged to a separate "fraud" Kafka topic with the full click payload + fraud score + feature values. This data feeds a daily transparency report per advertiser: "We filtered X clicks as fraudulent for your campaign. Breakdown: 40% bot, 35% click farm, 25% SDK spoof." Advertisers can appeal — if a pattern of false positives is detected, the model threshold is adjusted for that advertiser's traffic profile.
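The per-advertiser breakdown could be computed from the fraud topic roughly like this. It is a sketch under one loud assumption: each logged event carries a `category` label (for example derived from the dominant feature), which the article does not specify. The field names and the `fraud_breakdown` function are hypothetical.

```python
from collections import Counter, defaultdict


def fraud_breakdown(fraud_events):
    """Summarize discarded clicks per campaign as counts and percentages."""
    per_campaign = defaultdict(Counter)
    for e in fraud_events:
        per_campaign[e["campaign_id"]][e["category"]] += 1

    report = {}
    for campaign_id, cats in per_campaign.items():
        total = sum(cats.values())
        report[campaign_id] = {
            "filtered": total,
            "breakdown_pct": {
                cat: round(100 * n / total, 1) for cat, n in cats.items()
            },
        }
    return report


# 20 discarded clicks for campaign 1: 8 bot, 7 click farm, 5 SDK spoof.
events = (
    [{"campaign_id": 1, "category": "bot"}] * 8
    + [{"campaign_id": 1, "category": "click_farm"}] * 7
    + [{"campaign_id": 1, "category": "sdk_spoof"}] * 5
)
report = fraud_breakdown(events)
```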
(c) Late-arriving events. Clicks can arrive late due to network delays, mobile app batching, or ad server retries. Flink uses watermark-based windowing:
- Events up to 5 minutes late: Flink reprocesses the window (allowed lateness). The ClickHouse row is updated via a ReplacingMergeTree — last version wins.
- Events beyond 5 minutes: routed to a dead-letter topic. A nightly batch job reconciles dead-letter events against ClickHouse aggregates and applies corrections.
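The two cases above depend on where the watermark stands relative to the event's window, not on wall-clock arrival time. The routing rule can be approximated as below. This is a simplified sketch in whole seconds that ignores Flink's millisecond-level boundary handling; the function name `route` and the labels it returns are hypothetical.

```python
WINDOW_SECONDS = 60       # 1-minute tumbling window
ALLOWED_LATENESS = 300    # 5 minutes, as in the pipeline above


def route(event_ts: int, watermark: int) -> str:
    """Classify an event by comparing the watermark with its window's end."""
    window_end = event_ts - (event_ts % WINDOW_SECONDS) + WINDOW_SECONDS
    if watermark < window_end:
        return "on_time"        # window has not fired yet
    if watermark < window_end + ALLOWED_LATENESS:
        return "late_update"    # window re-fires with an updated count
    return "dead_letter"        # window state is gone; send to dead-letter topic


# An event at t=30 belongs to window [0, 60).
examples = [route(30, wm) for wm in (50, 60, 359, 360)]
```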
(d) Billing pipeline. An hourly batch job reads ClickHouse aggregates for the past hour, computes per-advertiser spend (click_count × CPC bid), and writes billing rows to Postgres. Monthly invoice generation reads Postgres. Separation of concerns: ClickHouse for fast analytics, Postgres for transactional billing integrity.
Billing reconciliation detail:
- Hourly batch reads ClickHouse:
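SELECT campaign_id, SUM(click_count) FROM agg_clicks FINAL WHERE minute >= :start AND minute < :end GROUP BY campaign_id. The half-open range keeps the boundary minute from being billed in two consecutive hourly runs, and FINAL collapses row versions that ReplacingMergeTree has not merged yet.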
- Join with campaign metadata (CPC bid, advertiser_id, budget cap) from a campaign config DB.
- Compute spend:
click_count * cpc_bid. Apply budget cap — if spend exceeds daily budget, mark campaign as paused.
- Write billing row to Postgres:
{advertiser_id, hour, campaign_id, clicks, spend, created_at}.
- Monthly invoice job aggregates Postgres billing rows:
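SELECT advertiser_id, SUM(spend) FROM billing_rows WHERE hour >= '2026-04-01' AND hour < '2026-05-01' GROUP BY advertiser_id. (billing_rows is a placeholder name for the Postgres billing table.)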
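The spend computation and budget cap from the steps above can be sketched as follows. The article only says a campaign is paused when spend exceeds the daily budget; capping the billed amount at the remaining budget is an assumption made here, and `bill_hour`, `spent_today`, and the config field names are hypothetical. `Decimal` is used because binary floats are a poor fit for money.

```python
from decimal import Decimal


def bill_hour(clicks_by_campaign, campaigns, spent_today):
    """Compute hourly billing rows and the campaigns to pause."""
    rows, paused = [], []
    for campaign_id, clicks in clicks_by_campaign.items():
        cfg = campaigns[campaign_id]
        spend = clicks * cfg["cpc_bid"]
        remaining = cfg["daily_budget"] - spent_today.get(campaign_id, Decimal("0"))
        if spend > remaining:
            # Assumption: never bill past the daily budget.
            spend = max(remaining, Decimal("0"))
            paused.append(campaign_id)
        rows.append({
            "advertiser_id": cfg["advertiser_id"],
            "campaign_id": campaign_id,
            "clicks": clicks,
            "spend": spend,
        })
    return rows, paused


campaigns = {
    1: {"advertiser_id": 7, "cpc_bid": Decimal("0.50"), "daily_budget": Decimal("100")},
    2: {"advertiser_id": 8, "cpc_bid": Decimal("2.00"), "daily_budget": Decimal("100")},
}
rows, paused = bill_hour(
    clicks_by_campaign={1: 100, 2: 10},
    campaigns=campaigns,
    spent_today={2: Decimal("90")},
)
```

Campaign 1 is billed 100 × 0.50 = 50.00. Campaign 2 would owe 20.00 but has only 10.00 of budget left, so it is billed 10.00 and paused.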
ClickHouse schema design. The aggregation table uses ReplacingMergeTree to handle late-arriving window updates:
CREATE TABLE agg_clicks (
campaign_id UInt64,
ad_id UInt64,
geo LowCardinality(String),
minute DateTime,
click_count UInt64,
fraud_count UInt64,
version UInt64 -- Flink checkpoint ID for dedup
) ENGINE = ReplacingMergeTree(version)
ORDER BY (campaign_id, ad_id, geo, minute)
PARTITION BY toYYYYMM(minute);
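ReplacingMergeTree keeps only the latest version per sorting key, but it does so when parts are merged in the background, at a time ClickHouse chooses. When Flink reprocesses a window due to late events, it writes a new row carrying the full updated count and a higher version; until the merge runs, both rows are visible. Queries must therefore read with FINAL, or select the latest version explicitly with argMax(click_count, version), to avoid double-counting.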
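The replacement rule can be illustrated with a small simulation. This is a model of the documented behavior (highest version wins per sorting key, with the most recently inserted row winning ties), not ClickHouse code; the `merge` function and the sample rows are illustrative.

```python
def merge(rows):
    """Keep one row per sorting key: highest version, last inserted on ties."""
    latest = {}
    for row in rows:  # rows are in insertion order
        key = (row["campaign_id"], row["ad_id"], row["geo"], row["minute"])
        kept = latest.get(key)
        if kept is None or row["version"] >= kept["version"]:
            latest[key] = row
    return list(latest.values())


base = {"campaign_id": 1, "ad_id": 10, "geo": "US", "minute": "2026-04-01 12:00:00"}
rows = [
    {**base, "click_count": 100, "version": 41},  # first window firing
    {**base, "click_count": 103, "version": 42},  # re-fired after 3 late clicks
]

# Reading unmerged parts without FINAL sums both versions.
unmerged_total = sum(r["click_count"] for r in rows)
# After a merge (or with FINAL) only the latest version remains.
merged_total = sum(r["click_count"] for r in merge(rows))
```

The difference between `unmerged_total` (203) and `merged_total` (103) is the double count that a plain `SUM` over unmerged parts would produce. Note that the scheme only works because each row carries the full window total, not a delta.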
Kafka topic design. The clicks topic is partitioned by campaign_id to ensure all clicks for a campaign land on the same partition — preserving ordering per campaign. With ~500K clicks/sec peak, we need ~100 partitions across a 10-broker cluster (each broker handles ~50K msg/sec). Retention: 7 days, giving ample replay window for Flink recovery. Compression: LZ4 for throughput over ratio.
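The sizing figures can be checked with a few lines of arithmetic. All inputs are the numbers quoted in this section (500K clicks/sec peak, 50K msg/sec per broker, 100 partitions, 10B clicks/day); only the variable names are new.

```python
import math

PEAK_MSGS_PER_SEC = 500_000
BROKER_MSGS_PER_SEC = 50_000
PARTITIONS = 100
CLICKS_PER_DAY = 10_000_000_000

brokers_needed = math.ceil(PEAK_MSGS_PER_SEC / BROKER_MSGS_PER_SEC)  # 10
per_partition_peak = PEAK_MSGS_PER_SEC / PARTITIONS                  # 5,000 msg/sec
average_per_sec = CLICKS_PER_DAY / 86_400                            # ~115,741 msg/sec
peak_to_average = PEAK_MSGS_PER_SEC / average_per_sec                # ~4.3x
```

These are averages across partitions. Because the topic is keyed by campaign_id, a very large campaign can push its partition well above the 5,000 msg/sec mean.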
Exactly-once end-to-end. The chain: Kafka consumer offsets are stored in Flink checkpoints, so offsets and operator state are restored together (Flink's Kafka connector handles this). Flink checkpoints to S3 every 30 seconds. The ClickHouse sink writes to a ReplacingMergeTree table with the Flink checkpoint ID as the version column, so replayed writes are idempotent. The remaining gap: if ClickHouse acknowledges a write but Flink crashes before the next checkpoint, the same window is re-sent after recovery, and ReplacingMergeTree collapses the duplicate at merge time (or at query time with FINAL). Strictly speaking, delivery to ClickHouse is at-least-once; the idempotent sink makes the aggregates come out as if each click were counted exactly once.
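The recovery argument can be demonstrated with a toy model. This is a sketch, not Flink: `process` is a hypothetical stand-in for the job, the checkpoint is just an (offset, state) pair, and the sink is a dict keyed by window key so that a replayed write overwrites the earlier one.

```python
from collections import Counter


def process(events, offset, state, sink, stop=None):
    """Consume events[offset:stop] and upsert the running total per key."""
    counts = Counter(state)  # copy, so the checkpointed state is not mutated
    end = len(events) if stop is None else stop
    for i in range(offset, end):
        key = events[i]
        counts[key] += 1
        sink[key] = counts[key]  # full total, not a delta: safe to rewrite
    return end, counts


events = ["a", "b", "a", "a", "b", "a"]  # each item is a window key
sink = {}

# 1. Process three events, then take a checkpoint (offset + state).
ckpt_offset, ckpt_state = process(events, 0, {}, sink, stop=3)

# 2. Process two more; the sink acknowledges them, then the job crashes
#    before the next checkpoint.
process(events, ckpt_offset, ckpt_state, sink, stop=5)

# 3. Recover from the checkpoint and replay everything after it.
process(events, ckpt_offset, ckpt_state, sink)
```

After recovery the sink holds the same totals as a crash-free run, because the replayed events rewrite rows with identical or newer totals. A sink that appended deltas would have counted events 3 and 4 twice.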
Click Lifecycle: Ingest to Billing
sequenceDiagram
participant AS as Ad Server
participant K as Kafka
participant FF as Fraud Filter
participant FL as Flink
participant CH as ClickHouse
participant PG as Postgres (billing)
participant D as Dashboard
AS->>K: batch of click events
K->>FF: consume from topic
FF->>FF: ML score each click
FF-->>K: log discarded clicks to fraud topic (score > threshold)
FF->>FL: pass clean clicks
FL->>FL: group by (campaign, ad, geo, minute)
FL->>CH: write aggregate row per window
D->>CH: SELECT campaign_id, hour, SUM(clicks) WHERE last 24h
CH-->>D: results in < 1 sec
Note over PG: Hourly billing batch
CH->>PG: read hour aggregates, compute spend
PG->>PG: write billing rows, generate invoice
Interview answer
"Clicks flow from ad servers into Kafka. An inline ML fraud scorer filters bots, click farms, and SDK spoofing — score above threshold is discarded. Clean clicks enter Flink, which aggregates by (campaign, ad, geo, minute) in tumbling windows with exactly-once checkpoints. Aggregates write to ClickHouse for sub-second OLAP queries. Late events up to 5 min are handled via allowed lateness; beyond 5 min go to a dead-letter for batch reconciliation. Hourly billing batch reads ClickHouse, computes advertiser spend, writes to Postgres for invoicing. Raw clicks archived to S3 for audit."