LogoKnex Dart
knex-dartkartikey321/knex-dart 999999

Streaming

Stream large result sets row-by-row with streamQuery() for memory-efficient processing

Streaming#

For large result sets, loading all rows into memory at once with db.select() may be impractical. Use streamQuery() (or the driver's .stream() method) to process rows one at a time as they arrive from the database.

Basic Usage#

final stream = db.streamQuery(
  db('events').where('processed', '=', false).orderBy('id'),
);

await for (final row in stream) {
  await processEvent(row);
}

Each row is a Map<String, dynamic> — the same type returned by db.select().

Why Stream Instead of Select?#

db.select()streamQuery()
Loads all rows into a ListYields rows one at a time
Simple for small result setsMemory-efficient for large sets
Easier to use with await Requires await for or Stream APIs

For a table with millions of rows, db.select() allocates a large list up-front. streamQuery() holds only the current row in memory.

Supported Drivers#

DriverStreaming support
PostgreSQLstreamQuery() via cursor
MySQLstreamQuery() via row streaming
SQLitestreamQuery() via statement cursor
DuckDBstreamQuery() via fetchAllStream()
MSSQLNot supported
BigQueryNot supported
SnowflakeNot supported
TursoNot supported
D1Not supported

PostgreSQL Example#

import 'package:knex_dart_postgres/knex_dart_postgres.dart';

final db = await KnexPostgres.connect(
  host: 'localhost',
  database: 'analytics',
  username: 'user',
  password: 'pass',
);

final stream = db.streamQuery(
  db('events')
    .select(['id', 'type', 'payload', 'created_at'])
    .where('created_at', '>', '2024-01-01')
    .orderBy('id'),
);

int count = 0;
await for (final row in stream) {
  count++;
  if (count % 10000 == 0) print('Processed $count rows...');
  await handleEvent(row);
}

print('Done — $count events processed');
await db.destroy();

SQLite Example#

import 'package:knex_dart_sqlite/knex_dart_sqlite.dart';

final db = await KnexSQLite.connect(filename: 'app.db');

final stream = db.streamQuery(
  db('logs').orderBy('timestamp'),
);

await for (final row in stream) {
  print('[${row['timestamp']}] ${row['message']}');
}

await db.destroy();

DuckDB Example#

DuckDB is particularly well-suited for streaming large analytical queries:

import 'package:knex_dart_duckdb/knex_dart_duckdb.dart';

final db = await KnexDuckDB.file('/data/warehouse.db');

final stream = db.streamQuery(
  db.queryBuilder()
    .from('sales')
    .select(['region', 'product_id'])
    .sum('amount as total')
    .groupBy(['region', 'product_id'])
    .orderBy('total', 'desc'),
);

await for (final row in stream) {
  await writeSummary(row);
}

await db.close();

Error Handling#

try {
  await for (final row in db.streamQuery(db('events').orderBy('id'))) {
    await process(row);
  }
} catch (e) {
  print('Stream error: $e');
}

Any database error during streaming will throw inside the await for loop and can be caught normally.

Streaming Inside a Transaction#

The driver-level .stream() method works inside transactions:

await db.trx((trx) async {
  final stream = trx.stream(
    trx('jobs').where('status', '=', 'pending').orderBy('id'),
  );

  await for (final row in stream) {
    await trx.update(
      trx('jobs')
        .where('id', '=', row['id'])
        .update({'status': 'processing'}),
    );
    await doWork(row);
  }
});

Next Steps#

  • Connection Pooling — Configure pool size for high-throughput streaming workloads
  • Transactions — Streaming inside atomic operations
  • DuckDB — OLAP driver optimized for large analytical result sets