11import { Readable , Transform } from 'node:stream'
22import { promisify } from 'node:util'
3+ import { DBPatch } from '@naturalcycles/db-lib'
34import {
45 BaseCommonDB ,
56 CommonDB ,
@@ -15,6 +16,7 @@ import {
1516 _mapKeys ,
1617 _mapValues ,
1718 _Memo ,
19+ _omit ,
1820 AnyObjectWithId ,
1921 CommonLogger ,
2022 commonLoggerPrefix ,
@@ -34,7 +36,7 @@ import {
3436 TypeCast ,
3537} from 'mysql'
3638import * as mysql from 'mysql'
37- import { dbQueryToSQLDelete , dbQueryToSQLSelect , insertSQL } from './query.util'
39+ import { dbQueryToSQLDelete , dbQueryToSQLSelect , dbQueryToSQLUpdate , insertSQL } from './query.util'
3840import {
3941 jsonSchemaToMySQLDDL ,
4042 mapNameFromMySQL ,
@@ -306,7 +308,7 @@ export class MysqlDB extends BaseCommonDB implements CommonDB {
306308
307309 const verb = opt . saveMethod === 'insert' ? 'INSERT' : 'REPLACE'
308310
309- if ( opt . assignGeneratedIds ) {
311+ if ( opt . assignGeneratedIds && opt . saveMethod !== 'update' ) {
310312 // Insert rows one-by-one, to get their auto-generated id
311313
312314 let i = - 1
@@ -323,10 +325,18 @@ export class MysqlDB extends BaseCommonDB implements CommonDB {
323325 }
324326
325327 // inserts are split into multiple sentenses to respect the max_packet_size (1Mb usually)
326- const sqls = insertSQL ( table , rows , verb , this . cfg . logger )
328+ if ( opt . saveMethod === 'update' ) {
329+ for await ( const row of rows ) {
330+ _assert ( row . id , 'Cannot update without providing an id' )
331+ const query = new DBQuery ( table ) . filterEq ( 'id' , row . id )
332+ await this . updateByQuery ( query , _omit ( row , [ 'id' ] ) )
333+ }
334+ } else {
335+ const sqls = insertSQL ( table , rows , verb , this . cfg . logger )
327336
328- for await ( const sql of sqls ) {
329- await this . runSQL ( { sql } )
337+ for await ( const sql of sqls ) {
338+ await this . runSQL ( { sql } )
339+ }
330340 }
331341 }
332342
@@ -393,4 +403,15 @@ export class MysqlDB extends BaseCommonDB implements CommonDB {
393403
394404 return mysqlTableStatsToJsonSchemaField < ROW > ( table , stats , this . cfg . logger )
395405 }
406+
407+ override async updateByQuery < ROW extends ObjectWithId > (
408+ q : DBQuery < ROW > ,
409+ patch : DBPatch < ROW > ,
410+ ) : Promise < number > {
411+ const sql = dbQueryToSQLUpdate ( q , patch )
412+ if ( ! sql ) return 0
413+
414+ const { affectedRows } = await this . runSQL < OkPacket > ( { sql } )
415+ return affectedRows
416+ }
396417}
0 commit comments