/ Changes On Branch upsert-opt2
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Changes In Branch upsert-opt2 Excluding Merge-Ins

This is equivalent to a diff from d8eb9f8d9b to 131ed95e14

2018-04-20
17:02
Avoid unnecessary cursor seeking when performing an UPSERT. (check-in: 693a3dcbdd user: drh tags: trunk)
16:49
Improved VDBE comment on UPSERT code. (Closed-Leaf check-in: 131ed95e14 user: drh tags: upsert-opt2)
16:27
Minor simplification of the previous checkin. (check-in: d1906689ab user: drh tags: upsert-opt2)
15:56
Avoid unnecessary cursor seeks during upsert processing. (check-in: 7c4b6d5475 user: drh tags: upsert-opt2)
15:34
Add test cases for UPSERT. And a fix for a "REPLACE INTO ... ON CONFLICT" statement where the new row conflicts with both the IPK and the ON CONFLICT indexes. (check-in: d8eb9f8d9b user: dan tags: trunk)
13:18
Enhance UPSERT so that the UPDATE uses the same set of cursors as the INSERT. (check-in: c37f39d18d user: drh tags: trunk)

Changes to src/update.c.

208
209
210
211
212
213
214

215
216
217
218
219
220
221
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222







+







  for(nIdx=0, pIdx=pTab->pIndex; pIdx; pIdx=pIdx->pNext, nIdx++){
    if( pPk==pIdx ){
      iDataCur = pParse->nTab;
    }
    pParse->nTab++;
  }
  if( pUpsert ){
    /* On an UPSERT, reuse the same cursors already opened by INSERT */
    iDataCur = pUpsert->iDataCur;
    iIdxCur = pUpsert->iIdxCur;
    pParse->nTab = iBaseCur;
  }
  pTabList->a[0].iCursor = iDataCur;

  /* Allocate space for aXRef[], aRegIdx[], and aToOpen[].  
385
386
387
388
389
390
391




392

393
394
395
396

397
398
399
400
401
402
403
404
405
406
407
408
409

410

411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449



















































450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469

470
471
472
473
474
475
476
477
478

479
480
481
482




483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527













































528
529
530
531
532
533
534
386
387
388
389
390
391
392
393
394
395
396

397
398
399
400

401
402
403
404
405
406
407
408
409
410
411
412
413
414
415

416







































417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486

487
488
489
490
491
492
493
494
495
496
497




498
499
500
501













































502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553







+
+
+
+
-
+



-
+













+
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+



















-
+









+
-
-
-
-
+
+
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+







  if( IsVirtual(pTab) ){
    updateVirtualTable(pParse, pTabList, pTab, pChanges, pRowidExpr, aXRef,
                       pWhere, onError);
    goto update_cleanup;
  }
#endif

  /* Jump to labelBreak to abandon further processing of this UPDATE */
  labelBreak = sqlite3VdbeMakeLabel(v);

  /* Not an UPSERT.  Normal processing.  Begin by
  /* Initialize the count of updated rows */
  ** initialize the count of updated rows */
  if( (db->flags&SQLITE_CountRows)!=0
   && !pParse->pTriggerTab
   && !pParse->nested
   && !pUpsert
   && pUpsert==0
  ){
    regRowCount = ++pParse->nMem;
    sqlite3VdbeAddOp2(v, OP_Integer, 0, regRowCount);
  }

  if( HasRowid(pTab) ){
    sqlite3VdbeAddOp3(v, OP_Null, 0, regRowSet, regOldRowid);
  }else{
    assert( pPk!=0 );
    nPk = pPk->nKeyCol;
    iPk = pParse->nMem+1;
    pParse->nMem += nPk;
    regKey = ++pParse->nMem;
    if( pUpsert==0 ){
    iEph = pParse->nTab++;
      iEph = pParse->nTab++;

    sqlite3VdbeAddOp3(v, OP_Null, 0, iPk, iPk+nPk-1);
    addrOpen = sqlite3VdbeAddOp2(v, OP_OpenEphemeral, iEph, nPk);
    sqlite3VdbeSetP4KeyInfo(pParse, pPk);
  }

  /* Begin the database scan. 
  **
  ** Do not consider a single-pass strategy for a multi-row update if
  ** there are any triggers or foreign keys to process, or rows may
  ** be deleted as a result of REPLACE conflict handling. Any of these
  ** things might disturb a cursor being used to scan through the table
  ** or index, causing a single-pass approach to malfunction.  */
  flags = WHERE_ONEPASS_DESIRED|WHERE_SEEK_UNIQ_TABLE;
  if( !pParse->nested && !pTrigger && !hasFK && !chngKey && !bReplace ){
    flags |= WHERE_ONEPASS_MULTIROW;
  }
  pWInfo = sqlite3WhereBegin(pParse, pTabList, pWhere, 0, 0, flags, iIdxCur);
  if( pWInfo==0 ) goto update_cleanup;

  /* A one-pass strategy that might update more than one row may not
  ** be used if any column of the index used for the scan is being
  ** updated. Otherwise, if there is an index on "b", statements like
  ** the following could create an infinite loop:
  **
  **   UPDATE t1 SET b=b+1 WHERE b>?
  **
  ** Fall back to ONEPASS_OFF if where.c has selected a ONEPASS_MULTI
  ** strategy that uses an index for which one or more columns are being
  ** updated.  */
  eOnePass = sqlite3WhereOkOnePass(pWInfo, aiCurOnePass);
  if( eOnePass==ONEPASS_MULTI ){
    int iCur = aiCurOnePass[1];
    if( iCur>=0 && iCur!=iDataCur && aToOpen[iCur-iBaseCur] ){
      eOnePass = ONEPASS_OFF;
    }
    assert( iCur!=iDataCur || !HasRowid(pTab) );
  }
  
        sqlite3VdbeAddOp3(v, OP_Null, 0, iPk, iPk+nPk-1);
      addrOpen = sqlite3VdbeAddOp2(v, OP_OpenEphemeral, iEph, nPk);
      sqlite3VdbeSetP4KeyInfo(pParse, pPk);
    }
  }
  
  if( pUpsert ){
    /* If this is an UPSERT, then all cursors have already been opened by
    ** the outer INSERT and the data cursor should be pointing at the row
    ** that is to be updated.  So bypass the code that searches for the
    ** row(s) to be updated.
    */
    pWInfo = 0;
    eOnePass = ONEPASS_SINGLE;
    labelContinue = labelBreak;
    sqlite3ExprIfFalse(pParse, pWhere, labelBreak, SQLITE_JUMPIFNULL);
  }else{
    /* Begin the database scan. 
    **
    ** Do not consider a single-pass strategy for a multi-row update if
    ** there are any triggers or foreign keys to process, or rows may
    ** be deleted as a result of REPLACE conflict handling. Any of these
    ** things might disturb a cursor being used to scan through the table
    ** or index, causing a single-pass approach to malfunction.  */
    flags = WHERE_ONEPASS_DESIRED|WHERE_SEEK_UNIQ_TABLE;
    if( !pParse->nested && !pTrigger && !hasFK && !chngKey && !bReplace ){
      flags |= WHERE_ONEPASS_MULTIROW;
    }
    pWInfo = sqlite3WhereBegin(pParse, pTabList, pWhere, 0, 0, flags, iIdxCur);
    if( pWInfo==0 ) goto update_cleanup;
  
    /* A one-pass strategy that might update more than one row may not
    ** be used if any column of the index used for the scan is being
    ** updated. Otherwise, if there is an index on "b", statements like
    ** the following could create an infinite loop:
    **
    **   UPDATE t1 SET b=b+1 WHERE b>?
    **
    ** Fall back to ONEPASS_OFF if where.c has selected a ONEPASS_MULTI
    ** strategy that uses an index for which one or more columns are being
    ** updated.  */
    eOnePass = sqlite3WhereOkOnePass(pWInfo, aiCurOnePass);
    if( eOnePass==ONEPASS_MULTI ){
      int iCur = aiCurOnePass[1];
      if( iCur>=0 && iCur!=iDataCur && aToOpen[iCur-iBaseCur] ){
        eOnePass = ONEPASS_OFF;
      }
      assert( iCur!=iDataCur || !HasRowid(pTab) );
    }
  }

  if( HasRowid(pTab) ){
    /* Read the rowid of the current row of the WHERE scan. In ONEPASS_OFF
    ** mode, write the rowid into the FIFO. In either of the one-pass modes,
    ** leave it in register regOldRowid.  */
    sqlite3VdbeAddOp2(v, OP_Rowid, iDataCur, regOldRowid);
    if( eOnePass==ONEPASS_OFF ){
      sqlite3VdbeAddOp2(v, OP_RowSetAdd, regRowSet, regOldRowid);
    }
  }else{
    /* Read the PK of the current row into an array of registers. In
    ** ONEPASS_OFF mode, serialize the array into a record and store it in
    ** the ephemeral table. Or, in ONEPASS_SINGLE or MULTI mode, change
    ** the OP_OpenEphemeral instruction to a Noop (the ephemeral table 
    ** is not required) and leave the PK fields in the array of registers.  */
    for(i=0; i<nPk; i++){
      assert( pPk->aiColumn[i]>=0 );
      sqlite3ExprCodeGetColumnOfTable(v, pTab, iDataCur,pPk->aiColumn[i],iPk+i);
    }
    if( eOnePass ){
      sqlite3VdbeChangeToNoop(v, addrOpen);
      if( addrOpen ) sqlite3VdbeChangeToNoop(v, addrOpen);
      nKey = nPk;
      regKey = iPk;
    }else{
      sqlite3VdbeAddOp4(v, OP_MakeRecord, iPk, nPk, regKey,
                        sqlite3IndexAffinityStr(db, pPk), nPk);
      sqlite3VdbeAddOp4Int(v, OP_IdxInsert, iEph, regKey, iPk, nPk);
    }
  }

  if( pUpsert==0 ){
  if( eOnePass!=ONEPASS_MULTI ){
    sqlite3WhereEnd(pWInfo);
  }

    if( eOnePass!=ONEPASS_MULTI ){
      sqlite3WhereEnd(pWInfo);
    }
  
  labelBreak = sqlite3VdbeMakeLabel(v);
  if( !isView && pUpsert==0 ){
    int addrOnce = 0;

    /* Open every index that needs updating. */
    if( eOnePass!=ONEPASS_OFF ){
      if( aiCurOnePass[0]>=0 ) aToOpen[aiCurOnePass[0]-iBaseCur] = 0;
      if( aiCurOnePass[1]>=0 ) aToOpen[aiCurOnePass[1]-iBaseCur] = 0;
    }

    if( eOnePass==ONEPASS_MULTI && (nIdx-(aiCurOnePass[1]>=0))>0 ){
      addrOnce = sqlite3VdbeAddOp0(v, OP_Once); VdbeCoverage(v);
    }
    sqlite3OpenTableAndIndices(pParse, pTab, OP_OpenWrite, 0, iBaseCur, aToOpen,
                               0, 0);
    if( addrOnce ) sqlite3VdbeJumpHere(v, addrOnce);
  }

  /* Top of the update loop */
  if( eOnePass!=ONEPASS_OFF ){
    if( !isView && aiCurOnePass[0]!=iDataCur && aiCurOnePass[1]!=iDataCur ){
      assert( pPk );
      sqlite3VdbeAddOp4Int(v, OP_NotFound, iDataCur, labelBreak, regKey, nKey);
      VdbeCoverageNeverTaken(v);
    }
    if( eOnePass==ONEPASS_SINGLE ){
      labelContinue = labelBreak;
    }else{
      labelContinue = sqlite3VdbeMakeLabel(v);
    }
    sqlite3VdbeAddOp2(v, OP_IsNull, pPk ? regKey : regOldRowid, labelBreak);
    VdbeCoverageIf(v, pPk==0);
    VdbeCoverageIf(v, pPk!=0);
  }else if( pPk ){
    labelContinue = sqlite3VdbeMakeLabel(v);
    sqlite3VdbeAddOp2(v, OP_Rewind, iEph, labelBreak); VdbeCoverage(v);
    addrTop = sqlite3VdbeAddOp2(v, OP_RowData, iEph, regKey);
    sqlite3VdbeAddOp4Int(v, OP_NotFound, iDataCur, labelContinue, regKey, 0);
    VdbeCoverage(v);
  }else{
    labelContinue = sqlite3VdbeAddOp3(v, OP_RowSetRead, regRowSet, labelBreak,
                             regOldRowid);
    VdbeCoverage(v);
    sqlite3VdbeAddOp3(v, OP_NotExists, iDataCur, labelContinue, regOldRowid);
    VdbeCoverage(v);
    if( !isView ){
      int addrOnce = 0;
  
      /* Open every index that needs updating. */
      if( eOnePass!=ONEPASS_OFF ){
        if( aiCurOnePass[0]>=0 ) aToOpen[aiCurOnePass[0]-iBaseCur] = 0;
        if( aiCurOnePass[1]>=0 ) aToOpen[aiCurOnePass[1]-iBaseCur] = 0;
      }
  
      if( eOnePass==ONEPASS_MULTI && (nIdx-(aiCurOnePass[1]>=0))>0 ){
        addrOnce = sqlite3VdbeAddOp0(v, OP_Once); VdbeCoverage(v);
      }
      sqlite3OpenTableAndIndices(pParse, pTab, OP_OpenWrite, 0, iBaseCur,
                                 aToOpen, 0, 0);
      if( addrOnce ) sqlite3VdbeJumpHere(v, addrOnce);
    }
  
    /* Top of the update loop */
    if( eOnePass!=ONEPASS_OFF ){
      if( !isView && aiCurOnePass[0]!=iDataCur && aiCurOnePass[1]!=iDataCur ){
        assert( pPk );
        sqlite3VdbeAddOp4Int(v, OP_NotFound, iDataCur, labelBreak, regKey,nKey);
        VdbeCoverageNeverTaken(v);
      }
      if( eOnePass==ONEPASS_SINGLE ){
        labelContinue = labelBreak;
      }else{
        labelContinue = sqlite3VdbeMakeLabel(v);
      }
      sqlite3VdbeAddOp2(v, OP_IsNull, pPk ? regKey : regOldRowid, labelBreak);
      VdbeCoverageIf(v, pPk==0);
      VdbeCoverageIf(v, pPk!=0);
    }else if( pPk ){
      labelContinue = sqlite3VdbeMakeLabel(v);
      sqlite3VdbeAddOp2(v, OP_Rewind, iEph, labelBreak); VdbeCoverage(v);
      addrTop = sqlite3VdbeAddOp2(v, OP_RowData, iEph, regKey);
      sqlite3VdbeAddOp4Int(v, OP_NotFound, iDataCur, labelContinue, regKey, 0);
      VdbeCoverage(v);
    }else{
      labelContinue = sqlite3VdbeAddOp3(v, OP_RowSetRead, regRowSet,labelBreak,
                               regOldRowid);
      VdbeCoverage(v);
      sqlite3VdbeAddOp3(v, OP_NotExists, iDataCur, labelContinue, regOldRowid);
      VdbeCoverage(v);
    }
  }

  /* If the rowid value will change, set register regNewRowid to
  ** contain the new value. If the rowid is not being modified,
  ** then regNewRowid is the same register as regOldRowid, which is
  ** already populated.  */
  assert( chngKey || pTrigger || hasFK || regOldRowid==regNewRowid );

Changes to src/upsert.c.

199
200
201
202
203
204
205
206
207
208
209

210
211
212
213
214


215
216

217
218

219
220

221
222
223
224
225
226

227
228

229
230
231
232
233
234

235
236
237






238
239




240
241

242
243
244
245
246
247
248

249
250
251
252
253



254
255
256
257
258
259
260
261



262
263
264
265
199
200
201
202
203
204
205



206
207
208
209
210


211
212


213


214


215






216


217






218



219
220
221
222
223
224


225
226
227
228


229





230

231





232
233
234
235
236
237
238
239
240


241
242
243
244
245
246
247







-
-
-

+



-
-
+
+
-
-
+
-
-
+
-
-
+
-
-
-
-
-
-
+
-
-
+
-
-
-
-
-
-
+
-
-
-
+
+
+
+
+
+
-
-
+
+
+
+
-
-
+
-
-
-
-
-

-
+
-
-
-
-
-
+
+
+






-
-
+
+
+




  Upsert *pUpsert,      /* The ON CONFLICT clause for the upsert */
  Table *pTab,          /* The table being updated */
  Index *pIdx,          /* The UNIQUE constraint that failed */
  int iCur              /* Cursor for pIdx (or pTab if pIdx==NULL) */
){
  Vdbe *v = pParse->pVdbe;
  sqlite3 *db = pParse->db;
  int regKey;               /* Register(s) containing the key */
  Expr *pWhere;             /* Where clause for the UPDATE */
  Expr *pE1, *pE2;
  SrcList *pSrc;            /* FROM clause for the UPDATE */
  int iDataCur = pUpsert->iDataCur;

  assert( v!=0 );
  VdbeNoopComment((v, "Begin DO UPDATE of UPSERT"));
  pWhere = sqlite3ExprDup(db, pUpsert->pUpsertWhere, 0);
  if( pIdx==0 || HasRowid(pTab) ){
  if( pIdx && iCur!=iDataCur ){
    if( HasRowid(pTab) ){
    /* We are dealing with an IPK */
    regKey = ++pParse->nMem;
      int regRowid = sqlite3GetTempReg(pParse);
    if( pIdx ){
      sqlite3VdbeAddOp2(v, OP_IdxRowid, iCur, regKey);
      sqlite3VdbeAddOp2(v, OP_IdxRowid, iCur, regRowid);
    }else{
      sqlite3VdbeAddOp2(v, OP_Rowid, iCur, regKey);
      sqlite3VdbeAddOp3(v, OP_SeekRowid, iDataCur, 0, regRowid);
    }
    pE1 = sqlite3ExprAlloc(db, TK_COLUMN, 0, 0);
    if( pE1 ){
      pE1->pTab = pTab;
      pE1->iTable = pUpsert->iDataCur;
      pE1->iColumn = -1;
      VdbeCoverage(v);
    }
    pE2 = sqlite3ExprAlloc(db, TK_REGISTER, 0, 0);
      sqlite3ReleaseTempReg(pParse, regRowid);
    if( pE2 ){
      pE2->iTable = regKey;
      pE2->affinity = SQLITE_AFF_INTEGER;
    }
    pWhere = sqlite3ExprAnd(db,pWhere,sqlite3PExpr(pParse, TK_EQ, pE1, pE2));
  }else{
    }else{
    /* a WITHOUT ROWID table */
    int i, j;
    for(i=0; i<pIdx->nKeyCol; i++){
      Index *pPk = sqlite3PrimaryKeyIndex(pTab);
      int nPk = pPk->nKeyCol;
      int iPk = pParse->nMem+1;
      int i;
      pParse->nMem += nPk;
      for(i=0; i<nPk; i++){
      regKey = ++pParse->nMem;
      sqlite3VdbeAddOp3(v, OP_Column, iCur, i, regKey);
        int k;
        assert( pPk->aiColumn[i]>=0 );
        k = sqlite3ColumnOfIndex(pIdx, pPk->aiColumn[i]);
        sqlite3VdbeAddOp3(v, OP_Column, iCur, k, iPk+i);
      j = pIdx->aiColumn[i];
      VdbeComment((v, "%s", pTab->aCol[j].zName));
        VdbeComment((v, "%s.%s", pIdx->zName, pTab->aCol[i].zName));
      pE1 = sqlite3ExprAlloc(db, TK_COLUMN, 0, 0);
      if( pE1 ){
        pE1->pTab = pTab;
        pE1->iTable = pUpsert->iDataCur;
        pE1->iColumn = j;
      }
      pE2 = sqlite3ExprAlloc(db, TK_REGISTER, 0, 0);
      i = sqlite3VdbeAddOp4Int(v, OP_Found, iDataCur, 0, iPk, nPk);
      if( pE2 ){
        pE2->iTable = regKey;
        pE2->affinity = pTab->zColAff[j];
      }
      pWhere = sqlite3ExprAnd(db,pWhere,sqlite3PExpr(pParse, TK_EQ, pE1, pE2));
      VdbeCoverage(v);
      sqlite3VdbeAddOp2(v, OP_Halt, SQLITE_CORRUPT, OE_Abort);
      sqlite3VdbeJumpHere(v, i);
    }
  }
  /* pUpsert does not own pUpsertSrc - the outer INSERT statement does.  So
  ** we have to make a copy before passing it down into sqlite3Update() */
  pSrc = sqlite3SrcListDup(db, pUpsert->pUpsertSrc, 0);
  sqlite3Update(pParse, pSrc, pUpsert->pUpsertSet,
      pWhere, OE_Abort, 0, 0, pUpsert);
  pUpsert->pUpsertSet = 0;  /* Will have been deleted by sqlite3Update() */
      pUpsert->pUpsertWhere, OE_Abort, 0, 0, pUpsert);
  pUpsert->pUpsertSet = 0;    /* Will have been deleted by sqlite3Update() */
  pUpsert->pUpsertWhere = 0;  /* Will have been deleted by sqlite3Update() */
  VdbeNoopComment((v, "End DO UPDATE of UPSERT"));
}

#endif /* SQLITE_OMIT_UPSERT */