Index: src/build.c ================================================================== --- src/build.c +++ src/build.c @@ -2761,11 +2761,12 @@ sqlite3UniqueConstraint(pParse, OE_Abort, pIndex); }else{ addr2 = sqlite3VdbeCurrentAddr(v); } sqlite3VdbeAddOp3(v, OP_SorterData, iSorter, regRecord, iIdx); - sqlite3VdbeAddOp3(v, OP_IdxInsert, iIdx, regRecord, 1); + sqlite3VdbeAddOp3(v, OP_Last, iIdx, 0, -1); + sqlite3VdbeAddOp3(v, OP_IdxInsert, iIdx, regRecord, 0); sqlite3VdbeChangeP5(v, OPFLAG_USESEEKRESULT); sqlite3ReleaseTempReg(pParse, regRecord); sqlite3VdbeAddOp2(v, OP_SorterNext, iSorter, addr2); VdbeCoverage(v); sqlite3VdbeJumpHere(v, addr1); Index: src/insert.c ================================================================== --- src/insert.c +++ src/insert.c @@ -336,10 +336,27 @@ Table *pDest, /* The table we are inserting into */ Select *pSelect, /* A SELECT statement to use as the data source */ int onError, /* How to handle constraint errors */ int iDbDest /* The database of pDest */ ); + +/* +** Return the conflict handling mode that should be used for index pIdx +** if the statement specified conflict mode overrideError. +** +** If the index is not a UNIQUE index, then the conflict handling mode is +** always OE_None. Otherwise, it is one of OE_Abort, OE_Rollback, OE_Fail, +** OE_Ignore or OE_Replace. +*/ +static u8 idxConflictMode(Index *pIdx, u8 overrideError){ + u8 ret = pIdx->onError; + if( ret!=OE_None ){ + if( overrideError!=OE_Default ) ret = overrideError; + if( ret==OE_Default ) ret = OE_Abort; + } + return ret; +} /* ** This routine is called to handle SQL of the following forms: ** ** insert into TABLE (IDLIST) values(EXPRLIST) @@ -449,10 +466,11 @@ Index *pIdx; /* For looping over indices of the table */ int nColumn; /* Number of columns in the data */ int nHidden = 0; /* Number of hidden columns if TABLE is virtual */ int iDataCur = 0; /* VDBE cursor that is the main data repository */ int iIdxCur = 0; /* First index cursor */ + int iSortCur = 0; /* First sorter cursor (for INSERT INTO ... SELECT) */ int ipkColumn = -1; /* Column that is the INTEGER PRIMARY KEY */ int endOfLoop; /* Label for the end of the insertion loop */ int srcTab = 0; /* Data comes from this temporary cursor if >=0 */ int addrInsTop = 0; /* Jump to label "D" */ int addrCont = 0; /* Top of insert loop. Label "C" in templates 3 and 4 */ @@ -753,10 +771,46 @@ goto insert_cleanup; } for(i=0; inMem; } + + /* If this is an INSERT INTO ... SELECT statement on a non-virtual table, + ** check if it is possible to defer updating any indexes until after + ** all rows have been processed. If it is, the index keys can be sorted + ** before they are inserted into the index b-tree, which is more efficient + ** for large inserts. It is possible to defer updating the indexes if: + ** + ** * there are no triggers to fire, and + ** * no foreign key processing to perform, and + ** * the on-conflict mode used for all UNIQUE and PRIMARY KEY indexes, + ** including INTEGER PRIMARY KEYs, is either ROLLBACK or ABORT. + */ + if( pSelect + && 0==(pSelect->selFlags & SF_Values) + && onError!=OE_Fail && onError!=OE_Replace && onError!=OE_Ignore + && !IsVirtual(pTab) + && pTrigger==0 + && 0==sqlite3FkRequired(pParse, pTab, 0, 0) + ){ + for(pIdx=pTab->pIndex; pIdx; pIdx=pIdx->pNext){ + u8 oe = idxConflictMode(pIdx, onError); + if( oe==OE_Fail || oe==OE_Replace || oe==OE_Ignore ) break; + assert( oe==OE_None || oe==OE_Abort || oe==OE_Rollback ); + } + if( pIdx==0 ){ + /* This statement can sort the set of new keys for each index before + ** writing them into the b-tree on disk. So open a sorter for each + ** index on the table. */ + iSortCur = pParse->nTab; + for(pIdx=pTab->pIndex; pIdx; pIdx=pIdx->pNext){ + sqlite3VdbeAddOp1(v, OP_SorterOpen, pParse->nTab++); + sqlite3VdbeSetP4KeyInfo(pParse, pIdx); + } + assert( iSortCur>0 ); + } + } } /* This is the top of the main insertion loop */ if( useTempTable ){ /* This block codes the top of loop only. The complete loop is the @@ -954,16 +1008,24 @@ sqlite3MayAbort(pParse); }else #endif { int isReplace; /* Set to true if constraints may cause a replace */ + int iIdxBase = iIdxCur; + int op = OP_IdxInsert; sqlite3GenerateConstraintChecks(pParse, pTab, aRegIdx, iDataCur, iIdxCur, - regIns, 0, ipkColumn>=0, onError, endOfLoop, &isReplace + regIns, 0, ipkColumn>=0, onError, endOfLoop, iSortCur!=0, &isReplace ); + if( iSortCur ){ + iIdxBase = iSortCur; + isReplace = 1; + op = OP_SorterInsert; + } sqlite3FkCheck(pParse, pTab, 0, regIns, 0, 0); - sqlite3CompleteInsertion(pParse, pTab, iDataCur, iIdxCur, - regIns, aRegIdx, 0, appendFlag, isReplace==0); + sqlite3CompleteInsertion(pParse, pTab, + iDataCur, iIdxBase, regIns, op, aRegIdx, 0, appendFlag, isReplace==0 + ); } } /* Update the count of rows that are inserted */ @@ -989,10 +1051,36 @@ sqlite3VdbeAddOp2(v, OP_Goto, 0, addrCont); sqlite3VdbeJumpHere(v, addrInsTop); } if( !IsVirtual(pTab) && !isView ){ + /* If new index keys were written into sorter objects instead of + ** directly to the index b-trees, copy them from the sorters into the + ** indexes now. And close all the sorters. */ + if( iSortCur ){ + int iTmp = sqlite3GetTempReg(pParse); + for(idx=0, pIdx=pTab->pIndex; pIdx; pIdx=pIdx->pNext, idx++){ + int oe = idxConflictMode(pIdx, onError); + int iCur = iSortCur + idx; + int iIdx = iIdxCur + idx; + int addr = sqlite3VdbeAddOp1(v, OP_SorterSort, iCur); + sqlite3VdbeAddOp3(v, OP_SorterData, iCur, iTmp, iIdx); + if( oe!=OE_None ){ + int nField = -1 * pIdx->nKeyCol; + int jmp = sqlite3VdbeCurrentAddr(v)+2; + sqlite3VdbeAddOp4Int(v, OP_NoConflict, iIdx, jmp, iTmp, nField); + sqlite3UniqueConstraint(pParse, oe, pIdx); + } + sqlite3VdbeAddOp2(v, OP_IdxInsert, iIdx, iTmp); + if( oe!=OE_None ) sqlite3VdbeChangeP5(v, OPFLAG_USESEEKRESULT); + sqlite3VdbeAddOp2(v, OP_SorterNext, iCur, addr+1); VdbeCoverage(v); + sqlite3VdbeJumpHere(v, addr); + sqlite3VdbeAddOp1(v, OP_Close, iCur); + } + sqlite3ReleaseTempReg(pParse, iTmp); + } + /* Close all tables opened */ if( iDataCurpIndex; pIdx; pIdx=pIdx->pNext, idx++){ sqlite3VdbeAddOp1(v, OP_Close, idx+iIdxCur); } @@ -1131,10 +1219,11 @@ int regNewData, /* First register in a range holding values to insert */ int regOldData, /* Previous content. 0 for INSERTs */ u8 pkChng, /* Non-zero if the rowid or PRIMARY KEY changed */ u8 overrideError, /* Override onError to this if not OE_Default */ int ignoreDest, /* Jump to this label on an OE_Ignore resolution */ + int ignoreUnique, /* Do not enforce UNIQUE constraints */ int *pbMayReplace /* OUT: Set to true if constraint may cause a replace */ ){ Vdbe *v; /* VDBE under constrution */ Index *pIdx; /* Pointer to one of the indices */ Index *pPk = 0; /* The PRIMARY KEY index */ @@ -1411,21 +1500,16 @@ sqlite3VdbeResolveLabel(v, addrUniqueOk); continue; } /* Find out what action to take in case there is a uniqueness conflict */ - onError = pIdx->onError; - if( onError==OE_None ){ + onError = idxConflictMode(pIdx, overrideError); + if( onError==OE_None || ignoreUnique ){ sqlite3ReleaseTempRange(pParse, regIdx, pIdx->nColumn); sqlite3VdbeResolveLabel(v, addrUniqueOk); continue; /* pIdx is not a UNIQUE index */ } - if( overrideError!=OE_Default ){ - onError = overrideError; - }else if( onError==OE_Default ){ - onError = OE_Abort; - } /* Check to see if the new index entry will be unique */ sqlite3VdbeAddOp4Int(v, OP_NoConflict, iThisCur, addrUniqueOk, regIdx, pIdx->nKeyCol); VdbeCoverage(v); @@ -1536,10 +1620,11 @@ Parse *pParse, /* The parser context */ Table *pTab, /* the table into which we are inserting */ int iDataCur, /* Cursor of the canonical data source */ int iIdxCur, /* First index cursor */ int regNewData, /* Range of content */ + int idxop, /* Opcode to use to write to "indexes" */ int *aRegIdx, /* Register used by each index. 0 for unused indices */ int isUpdate, /* True for UPDATE, False for INSERT */ int appendBias, /* True if this is likely to be an append */ int useSeekResult /* True to set the USESEEKRESULT flag on OP_[Idx]Insert */ ){ @@ -1549,10 +1634,12 @@ int regData; /* Content registers (after the rowid) */ int regRec; /* Register holding assembled record for the table */ int i; /* Loop counter */ u8 bAffinityDone = 0; /* True if OP_Affinity has been run already */ + assert( idxop==OP_IdxInsert || idxop==OP_SorterInsert ); + v = sqlite3GetVdbe(pParse); assert( v!=0 ); assert( pTab->pSelect==0 ); /* This table is not a VIEW */ for(i=0, pIdx=pTab->pIndex; pIdx; pIdx=pIdx->pNext, i++){ if( aRegIdx[i]==0 ) continue; @@ -1559,11 +1646,11 @@ bAffinityDone = 1; if( pIdx->pPartIdxWhere ){ sqlite3VdbeAddOp2(v, OP_IsNull, aRegIdx[i], sqlite3VdbeCurrentAddr(v)+2); VdbeCoverage(v); } - sqlite3VdbeAddOp2(v, OP_IdxInsert, iIdxCur+i, aRegIdx[i]); + sqlite3VdbeAddOp2(v, idxop, iIdxCur+i, aRegIdx[i]); pik_flags = 0; if( useSeekResult ) pik_flags = OPFLAG_USESEEKRESULT; if( IsPrimaryKeyIndex(pIdx) && !HasRowid(pTab) ){ assert( pParse->nested==0 ); pik_flags |= OPFLAG_NCHANGE; @@ -1763,10 +1850,11 @@ Table *pDest, /* The table we are inserting into */ Select *pSelect, /* A SELECT statement to use as the data source */ int onError, /* How to handle constraint errors */ int iDbDest /* The database of pDest */ ){ + sqlite3 *db = pParse->db; ExprList *pEList; /* The result set of the SELECT */ Table *pSrc; /* The table in the FROM clause of SELECT */ Index *pSrcIdx, *pDestIdx; /* Source and destination indices */ struct SrcList_item *pItem; /* An element of pSelect->pSrc */ int i; /* Loop counter */ @@ -1910,15 +1998,15 @@ ** But the main beneficiary of the transfer optimization is the VACUUM ** command, and the VACUUM command disables foreign key constraints. So ** the extra complication to make this rule less restrictive is probably ** not worth the effort. Ticket [6284df89debdfa61db8073e062908af0c9b6118e] */ - if( (pParse->db->flags & SQLITE_ForeignKeys)!=0 && pDest->pFKey!=0 ){ + if( (db->flags & SQLITE_ForeignKeys)!=0 && pDest->pFKey!=0 ){ return 0; } #endif - if( (pParse->db->flags & SQLITE_CountRows)!=0 ){ + if( (db->flags & SQLITE_CountRows)!=0 ){ return 0; /* xfer opt does not play well with PRAGMA count_changes */ } /* If we get this far, it means that the xfer optimization is at ** least a possibility, though it might only work if the destination @@ -1925,28 +2013,32 @@ ** table (tab1) is initially empty. */ #ifdef SQLITE_TEST sqlite3_xferopt_count++; #endif - iDbSrc = sqlite3SchemaToIndex(pParse->db, pSrc->pSchema); + iDbSrc = sqlite3SchemaToIndex(db, pSrc->pSchema); v = sqlite3GetVdbe(pParse); sqlite3CodeVerifySchema(pParse, iDbSrc); iSrc = pParse->nTab++; iDest = pParse->nTab++; regAutoinc = autoIncBegin(pParse, iDbDest, pDest); regData = sqlite3GetTempReg(pParse); regRowid = sqlite3GetTempReg(pParse); sqlite3OpenTable(pParse, iDest, iDbDest, pDest, OP_OpenWrite); assert( HasRowid(pDest) || destHasUniqueIdx ); - if( (pDest->iPKey<0 && pDest->pIndex!=0) /* (1) */ + if( (db->flags & SQLITE_Vacuum)==0 && ( + (pDest->iPKey<0 && pDest->pIndex!=0) /* (1) */ || destHasUniqueIdx /* (2) */ || (onError!=OE_Abort && onError!=OE_Rollback) /* (3) */ - ){ + )){ /* In some circumstances, we are able to run the xfer optimization - ** only if the destination table is initially empty. This code makes - ** that determination. Conditions under which the destination must - ** be empty: + ** only if the destination table is initially empty. Unless the + ** SQLITE_Vacuum flag is set, this block generates code to make + ** that determination. If SQLITE_Vacuum is set, then the destination + ** table is always empty. + ** + ** Conditions under which the destination must be empty: ** ** (1) There is no INTEGER PRIMARY KEY but there are indices. ** (If the destination is not initially empty, the rowid fields ** of index entries might need to change.) ** @@ -1985,10 +2077,11 @@ }else{ sqlite3TableLock(pParse, iDbDest, pDest->tnum, 1, pDest->zName); sqlite3TableLock(pParse, iDbSrc, pSrc->tnum, 0, pSrc->zName); } for(pDestIdx=pDest->pIndex; pDestIdx; pDestIdx=pDestIdx->pNext){ + u8 useSeekResult = 0; for(pSrcIdx=pSrc->pIndex; ALWAYS(pSrcIdx); pSrcIdx=pSrcIdx->pNext){ if( xferCompatibleIndex(pDestIdx, pSrcIdx) ) break; } assert( pSrcIdx ); sqlite3VdbeAddOp3(v, OP_OpenRead, iSrc, pSrcIdx->tnum, iDbSrc); @@ -1998,11 +2091,37 @@ sqlite3VdbeSetP4KeyInfo(pParse, pDestIdx); sqlite3VdbeChangeP5(v, OPFLAG_BULKCSR); VdbeComment((v, "%s", pDestIdx->zName)); addr1 = sqlite3VdbeAddOp2(v, OP_Rewind, iSrc, 0); VdbeCoverage(v); sqlite3VdbeAddOp2(v, OP_RowKey, iSrc, regData); + if( db->flags & SQLITE_Vacuum ){ + /* This INSERT command is part of a VACUUM operation, which guarantees + ** that the destination table is empty. If all indexed columns use + ** collation sequence BINARY, then it can also be assumed that the + ** index will be populated by inserting keys in strictly sorted + ** order. In this case, instead of seeking within the b-tree as part + ** of every OP_IdxInsert opcode, an OP_Last is added before the + ** OP_IdxInsert to seek to the point within the b-tree where each key + ** should be inserted. This is faster. + ** + ** If any of the indexed columns use a collation sequence other than + ** BINARY, this optimization is disabled. This is because the user + ** might change the definition of a collation sequence and then run + ** a VACUUM command. In that case keys may not be written in strictly + ** sorted order. */ + int i; + for(i=0; inColumn; i++){ + char *zColl = pSrcIdx->azColl[i]; + if( zColl && sqlite3_stricmp("BINARY", zColl) ) break; + } + if( i==pSrcIdx->nColumn ){ + useSeekResult = OPFLAG_USESEEKRESULT; + sqlite3VdbeAddOp3(v, OP_Last, iDest, 0, -1); + } + } sqlite3VdbeAddOp3(v, OP_IdxInsert, iDest, regData, 1); + sqlite3VdbeChangeP5(v, useSeekResult); sqlite3VdbeAddOp2(v, OP_Next, iSrc, addr1+1); VdbeCoverage(v); sqlite3VdbeJumpHere(v, addr1); sqlite3VdbeAddOp2(v, OP_Close, iSrc, 0); sqlite3VdbeAddOp2(v, OP_Close, iDest, 0); } Index: src/sqliteInt.h ================================================================== --- src/sqliteInt.h +++ src/sqliteInt.h @@ -1224,10 +1224,11 @@ #define SQLITE_LoadExtension 0x00400000 /* Enable load_extension */ #define SQLITE_EnableTrigger 0x00800000 /* True to enable triggers */ #define SQLITE_DeferFKs 0x01000000 /* Defer all FK constraints */ #define SQLITE_QueryOnly 0x02000000 /* Disable database changes */ #define SQLITE_VdbeEQP 0x04000000 /* Debug EXPLAIN QUERY PLAN */ +#define SQLITE_Vacuum 0x08000000 /* Currently in a VACUUM */ /* ** Bits of the sqlite3.dbOptFlags field that are used by the ** sqlite3_test_control(SQLITE_TESTCTRL_OPTIMIZATIONS,...) interface to @@ -3351,12 +3352,12 @@ void sqlite3GenerateRowDelete(Parse*,Table*,Trigger*,int,int,int,i16,u8,u8,u8); void sqlite3GenerateRowIndexDelete(Parse*, Table*, int, int, int*); int sqlite3GenerateIndexKey(Parse*, Index*, int, int, int, int*,Index*,int); void sqlite3ResolvePartIdxLabel(Parse*,int); void sqlite3GenerateConstraintChecks(Parse*,Table*,int*,int,int,int,int, - u8,u8,int,int*); -void sqlite3CompleteInsertion(Parse*,Table*,int,int,int,int*,int,int,int); + u8,u8,int,int,int*); +void sqlite3CompleteInsertion(Parse*,Table*,int,int,int,int,int*,int,int,int); int sqlite3OpenTableAndIndices(Parse*, Table*, int, int, u8*, int*, int*); void sqlite3BeginWriteOperation(Parse*, int, int); void sqlite3MultiWrite(Parse*); void sqlite3MayAbort(Parse*); void sqlite3HaltConstraint(Parse*, int, int, char*, i8, u8); Index: src/update.c ================================================================== --- src/update.c +++ src/update.c @@ -565,11 +565,11 @@ int bReplace = 0; /* True if REPLACE conflict resolution might happen */ /* Do constraint checks. */ assert( regOldRowid>0 ); sqlite3GenerateConstraintChecks(pParse, pTab, aRegIdx, iDataCur, iIdxCur, - regNewRowid, regOldRowid, chngKey, onError, labelContinue, &bReplace); + regNewRowid, regOldRowid, chngKey, onError, labelContinue, 0,&bReplace); /* Do FK constraint checks. */ if( hasFK ){ sqlite3FkCheck(pParse, pTab, regOldRowid, 0, aXRef, chngKey); } @@ -597,11 +597,11 @@ sqlite3FkCheck(pParse, pTab, 0, regNewRowid, aXRef, chngKey); } /* Insert the new index entries and the new record. */ sqlite3CompleteInsertion(pParse, pTab, iDataCur, iIdxCur, - regNewRowid, aRegIdx, 1, 0, 0); + regNewRowid, OP_IdxInsert, aRegIdx, 1, 0, 0); /* Do any ON CASCADE, SET NULL or SET DEFAULT operations required to ** handle rows (possibly in other tables) that refer via a foreign key ** to the row just updated. */ if( hasFK ){ Index: src/vacuum.c ================================================================== --- src/vacuum.c +++ src/vacuum.c @@ -248,17 +248,21 @@ /* Loop through the tables in the main database. For each, do ** an "INSERT INTO vacuum_db.xxx SELECT * FROM main.xxx;" to copy ** the contents to the temporary database. */ + assert( (db->flags & SQLITE_Vacuum)==0 ); + db->flags |= SQLITE_Vacuum; rc = execExecSql(db, pzErrMsg, "SELECT 'INSERT INTO vacuum_db.' || quote(name) " "|| ' SELECT * FROM main.' || quote(name) || ';'" "FROM main.sqlite_master " "WHERE type = 'table' AND name!='sqlite_sequence' " " AND coalesce(rootpage,1)>0" ); + assert( (db->flags & SQLITE_Vacuum)!=0 ); + db->flags &= ~SQLITE_Vacuum; if( rc!=SQLITE_OK ) goto end_of_vacuum; /* Copy over the sequence table */ rc = execExecSql(db, pzErrMsg, Index: src/vdbe.c ================================================================== --- src/vdbe.c +++ src/vdbe.c @@ -162,11 +162,11 @@ */ #define Deephemeralize(P) \ if( ((P)->flags&MEM_Ephem)!=0 \ && sqlite3VdbeMemMakeWriteable(P) ){ goto no_mem;} -/* Return true if the cursor was opened using the OP_OpenSorter opcode. */ +/* Return true if the cursor was opened using the OP_SorterOpen opcode. */ #define isSorter(x) ((x)->pSorter!=0) /* ** Allocate VdbeCursor number iCur. Return a pointer to it. Return NULL ** if we run out of memory. @@ -3790,11 +3790,12 @@ /* Opcode: NoConflict P1 P2 P3 P4 * ** Synopsis: key=r[P3@P4] ** ** If P4==0 then register P3 holds a blob constructed by MakeRecord. If ** P4>0 then register P3 is the first of P4 registers that form an unpacked -** record. +** record. If P4<0, then P3 holds a blob constructed by MakeRecord, but +** only the first |P4| fields should be considered. ** ** Cursor P1 is on an index btree. If the record identified by P3 and P4 ** contains any NULL value, jump immediately to P2. If all terms of the ** record are not-NULL then a check is done to determine if any row in the ** P1 index btree has a matching key prefix. If there are no matches, jump @@ -3855,10 +3856,13 @@ ); if( pIdxKey==0 ) goto no_mem; assert( pIn3->flags & MEM_Blob ); ExpandBlob(pIn3); sqlite3VdbeRecordUnpack(pC->pKeyInfo, pIn3->n, pIn3->z, pIdxKey); + if( pOp->p4.i<0 ){ + pIdxKey->nField = pOp->p4.i * -1; + } } pIdxKey->default_rc = 0; if( pOp->opcode==OP_NoConflict ){ /* For the OP_NoConflict opcode, take the jump if any of the ** input fields are NULL, since any key with a NULL will not @@ -3869,11 +3873,11 @@ break; } } } rc = sqlite3BtreeMovetoUnpacked(pC->pCursor, pIdxKey, 0, 0, &res); - if( pOp->p4.i==0 ){ + if( pOp->p4.i<=0 ){ sqlite3DbFree(db, pFree); } if( rc!=SQLITE_OK ){ break; } @@ -4484,11 +4488,11 @@ sqlite3BtreeClearCursor(pC->pCursor); } break; } -/* Opcode: Last P1 P2 * * * +/* Opcode: Last P1 P2 P3 * * ** ** The next use of the Rowid or Column or Prev instruction for P1 ** will refer to the last entry in the database table or index. ** If the table or index is empty and P2>0, then jump immediately to P2. ** If P2 is 0 or if the table or index is not empty, fall through @@ -4511,10 +4515,11 @@ assert( pCrsr!=0 ); rc = sqlite3BtreeLast(pCrsr, &res); pC->nullRow = (u8)res; pC->deferredMoveto = 0; pC->cacheStatus = CACHE_STALE; + pC->seekResult = pOp->p3; #ifdef SQLITE_DEBUG pC->seekOp = OP_Last; #endif if( pOp->p2>0 ){ VdbeBranchTaken(res!=0,2); Index: src/vdbe.h ================================================================== --- src/vdbe.h +++ src/vdbe.h @@ -211,10 +211,11 @@ #endif int sqlite3MemCompare(const Mem*, const Mem*, const CollSeq*); void sqlite3VdbeRecordUnpack(KeyInfo*,int,const void*,UnpackedRecord*); int sqlite3VdbeRecordCompare(int,const void*,UnpackedRecord*); +int sqlite3VdbeRecordCompareWithSkip(int, const void *, UnpackedRecord *, int); UnpackedRecord *sqlite3VdbeAllocUnpackedRecord(KeyInfo *, char *, int, char **); typedef int (*RecordCompare)(int,const void*,UnpackedRecord*); RecordCompare sqlite3VdbeFindCompare(UnpackedRecord*); Index: src/vdbeaux.c ================================================================== --- src/vdbeaux.c +++ src/vdbeaux.c @@ -3583,11 +3583,11 @@ ** If database corruption is discovered, set pPKey2->errCode to ** SQLITE_CORRUPT and return 0. If an OOM error is encountered, ** pPKey2->errCode is set to SQLITE_NOMEM and, if it is not NULL, the ** malloc-failed flag set on database handle (pPKey2->pKeyInfo->db). */ -static int vdbeRecordCompareWithSkip( +int sqlite3VdbeRecordCompareWithSkip( int nKey1, const void *pKey1, /* Left key */ UnpackedRecord *pPKey2, /* Right key */ int bSkip /* If true, skip the first field */ ){ u32 d1; /* Offset into aKey[] of next data element */ @@ -3769,11 +3769,11 @@ } int sqlite3VdbeRecordCompare( int nKey1, const void *pKey1, /* Left key */ UnpackedRecord *pPKey2 /* Right key */ ){ - return vdbeRecordCompareWithSkip(nKey1, pKey1, pPKey2, 0); + return sqlite3VdbeRecordCompareWithSkip(nKey1, pKey1, pPKey2, 0); } /* ** This function is an optimized version of sqlite3VdbeRecordCompare() @@ -3857,11 +3857,11 @@ }else if( vr2; }else if( pPKey2->nField>1 ){ /* The first fields of the two keys are equal. Compare the trailing ** fields. */ - res = vdbeRecordCompareWithSkip(nKey1, pKey1, pPKey2, 1); + res = sqlite3VdbeRecordCompareWithSkip(nKey1, pKey1, pPKey2, 1); }else{ /* The first fields of the two keys are equal and there are no trailing ** fields. Return pPKey2->default_rc in this case. */ res = pPKey2->default_rc; } @@ -3905,11 +3905,11 @@ if( res==0 ){ res = nStr - pPKey2->aMem[0].n; if( res==0 ){ if( pPKey2->nField>1 ){ - res = vdbeRecordCompareWithSkip(nKey1, pKey1, pPKey2, 1); + res = sqlite3VdbeRecordCompareWithSkip(nKey1, pKey1, pPKey2, 1); }else{ res = pPKey2->default_rc; } }else if( res>0 ){ res = pPKey2->r2; Index: src/vdbesort.c ================================================================== --- src/vdbesort.c +++ src/vdbesort.c @@ -289,20 +289,23 @@ ** ** In both cases, the effects of the main thread seeing (bDone==0) even ** after the thread has finished are not dire. So we don't worry about ** memory barriers and such here. */ +typedef int (*SorterCompare)(SortSubtask*,int*,const void*,int,const void*,int); struct SortSubtask { SQLiteThread *pThread; /* Background thread, if any */ int bDone; /* Set if thread is finished but not joined */ VdbeSorter *pSorter; /* Sorter that owns this sub-task */ UnpackedRecord *pUnpacked; /* Space to unpack a record */ SorterList list; /* List for thread to write to a PMA */ int nPMA; /* Number of PMAs currently in file */ + SorterCompare xCompare; /* Compare function to use */ SorterFile file; /* Temp file for level-0 PMAs */ SorterFile file2; /* Space for other PMAs */ }; + /* ** Main sorter structure. A single instance of this is allocated for each ** sorter cursor created by the VDBE. ** @@ -326,12 +329,16 @@ int nMemory; /* Size of list.aMemory allocation in bytes */ u8 bUsePMA; /* True if one or more PMAs created */ u8 bUseThreads; /* True to use background threads */ u8 iPrev; /* Previous thread used to flush PMA */ u8 nTask; /* Size of aTask[] array */ + u8 typeMask; SortSubtask aTask[1]; /* One or more subtasks */ }; + +#define SORTER_TYPE_INTEGER 0x01 +#define SORTER_TYPE_TEXT 0x02 /* ** An instance of the following object is used to read records out of a ** PMA, in sorted order. The next key to be read is cached in nKey/aKey. ** aKey might point into aMap or into aBuffer. If neither of those locations @@ -740,35 +747,164 @@ rc = vdbePmaReaderNext(pReadr); } return rc; } +/* +** A version of vdbeSorterCompare() that assumes that it has already been +** determined that the first field of key1 is equal to the first field of +** key2. +*/ +static int vdbeSorterCompareTail( + SortSubtask *pTask, /* Subtask context (for pKeyInfo) */ + int *pbKey2Cached, /* True if pTask->pUnpacked is pKey2 */ + const void *pKey1, int nKey1, /* Left side of comparison */ + const void *pKey2, int nKey2 /* Right side of comparison */ +){ + UnpackedRecord *r2 = pTask->pUnpacked; + if( *pbKey2Cached==0 ){ + sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2); + *pbKey2Cached = 1; + } + return sqlite3VdbeRecordCompareWithSkip(nKey1, pKey1, r2, 1); +} /* ** Compare key1 (buffer pKey1, size nKey1 bytes) with key2 (buffer pKey2, ** size nKey2 bytes). Use (pTask->pKeyInfo) for the collation sequences ** used by the comparison. Return the result of the comparison. ** -** Before returning, object (pTask->pUnpacked) is populated with the -** unpacked version of key2. Or, if pKey2 is passed a NULL pointer, then it -** is assumed that the (pTask->pUnpacked) structure already contains the -** unpacked key to use as key2. +** If IN/OUT parameter *pbKey2Cached is true when this function is called, +** it is assumed that (pTask->pUnpacked) contains the unpacked version +** of key2. If it is false, (pTask->pUnpacked) is populated with the unpacked +** version of key2 and *pbKey2Cached set to true before returning. ** ** If an OOM error is encountered, (pTask->pUnpacked->error_rc) is set ** to SQLITE_NOMEM. */ static int vdbeSorterCompare( SortSubtask *pTask, /* Subtask context (for pKeyInfo) */ + int *pbKey2Cached, /* True if pTask->pUnpacked is pKey2 */ const void *pKey1, int nKey1, /* Left side of comparison */ const void *pKey2, int nKey2 /* Right side of comparison */ ){ UnpackedRecord *r2 = pTask->pUnpacked; - if( pKey2 ){ + if( !*pbKey2Cached ){ sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2); + *pbKey2Cached = 1; } return sqlite3VdbeRecordCompare(nKey1, pKey1, r2); } + +/* +** A specially optimized version of vdbeSorterCompare() that assumes that +** the first field of each key is a TEXT value and that the collation +** sequence to compare them with is BINARY. +*/ +static int vdbeSorterCompareText( + SortSubtask *pTask, /* Subtask context (for pKeyInfo) */ + int *pbKey2Cached, /* True if pTask->pUnpacked is pKey2 */ + const void *pKey1, int nKey1, /* Left side of comparison */ + const void *pKey2, int nKey2 /* Right side of comparison */ +){ + const u8 * const p1 = (const u8 * const)pKey1; + const u8 * const p2 = (const u8 * const)pKey2; + const u8 * const v1 = &p1[ p1[0] ]; /* Pointer to value 1 */ + const u8 * const v2 = &p2[ p2[0] ]; /* Pointer to value 2 */ + + int n1; + int n2; + int res; + + getVarint32(&p1[1], n1); n1 = (n1 - 13) / 2; + getVarint32(&p2[1], n2); n2 = (n2 - 13) / 2; + res = memcmp(v1, v2, MIN(n1, n2)); + if( res==0 ){ + res = n1 - n2; + } + + if( res==0 ){ + if( pTask->pSorter->pKeyInfo->nField>1 ){ + res = vdbeSorterCompareTail( + pTask, pbKey2Cached, pKey1, nKey1, pKey2, nKey2 + ); + } + }else{ + if( pTask->pSorter->pKeyInfo->aSortOrder[0] ){ + res = res * -1; + } + } + + return res; +} + +/* +** A specially optimized version of vdbeSorterCompare() that assumes that +** the first field of each key is an INTEGER value. +*/ +static int vdbeSorterCompareInt( + SortSubtask *pTask, /* Subtask context (for pKeyInfo) */ + int *pbKey2Cached, /* True if pTask->pUnpacked is pKey2 */ + const void *pKey1, int nKey1, /* Left side of comparison */ + const void *pKey2, int nKey2 /* Right side of comparison */ +){ + const u8 * const p1 = (const u8 * const)pKey1; + const u8 * const p2 = (const u8 * const)pKey2; + const int s1 = p1[1]; /* Left hand serial type */ + const int s2 = p2[1]; /* Right hand serial type */ + const u8 * const v1 = &p1[ p1[0] ]; /* Pointer to value 1 */ + const u8 * const v2 = &p2[ p2[0] ]; /* Pointer to value 2 */ + int res; /* Return value */ + + assert( (s1>0 && s1<7) || s1==8 || s1==9 ); + assert( (s2>0 && s2<7) || s2==8 || s2==9 ); + + if( s1>7 && s2>7 ){ + res = s1 - s2; + }else{ + if( s1==s2 ){ + if( (*v1 ^ *v2) & 0x80 ){ + /* The two values have different signs */ + res = (*v1 & 0x80) ? -1 : +1; + }else{ + /* The two values have the same sign. Compare using memcmp(). */ + static const u8 aLen[] = {0, 1, 2, 3, 4, 6, 8 }; + int i; + res = 0; + for(i=0; i7 ){ + res = +1; + }else if( s1>7 ){ + res = -1; + }else{ + res = s1 - s2; + } + + if( res>0 ){ + if( *v1 & 0x80 ) res = -1; + }else if( res<0 ){ + if( *v2 & 0x80 ) res = +1; + } + } + } + + if( res==0 ){ + if( pTask->pSorter->pKeyInfo->nField>1 ){ + res = vdbeSorterCompareTail( + pTask, pbKey2Cached, pKey1, nKey1, pKey2, nKey2 + ); + } + }else if( pTask->pSorter->pKeyInfo->aSortOrder[0] ){ + res = res * -1; + } + + return res; +} /* ** Initialize the temporary index cursor just opened as a sorter cursor. ** ** Usually, the sorter module uses the value of (pCsr->pKeyInfo->nField) @@ -833,13 +969,17 @@ rc = SQLITE_NOMEM; }else{ pSorter->pKeyInfo = pKeyInfo = (KeyInfo*)((u8*)pSorter + sz); memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo); pKeyInfo->db = 0; - if( nField && nWorker==0 ) pKeyInfo->nField = nField; + if( nField && nWorker==0 ){ + pKeyInfo->nXField += (pKeyInfo->nField - nField); + pKeyInfo->nField = nField; + } pSorter->pgsz = pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt); pSorter->nTask = nWorker + 1; + pSorter->iPrev = nWorker-1; pSorter->bUseThreads = (pSorter->nTask>1); pSorter->db = db; for(i=0; inTask; i++){ SortSubtask *pTask = &pSorter->aTask[i]; pTask->pSorter = pSorter; @@ -861,10 +1001,16 @@ pSorter->nMemory = pgsz; pSorter->list.aMemory = (u8*)sqlite3Malloc(pgsz); if( !pSorter->list.aMemory ) rc = SQLITE_NOMEM; } } + + if( (pKeyInfo->nField+pKeyInfo->nXField)<13 + && (pKeyInfo->aColl[0]==0 || pKeyInfo->aColl[0]==db->pDfltColl) + ){ + pSorter->typeMask = SORTER_TYPE_INTEGER | SORTER_TYPE_TEXT; + } } return rc; } #undef nWorker /* Defined at the top of this function */ @@ -1197,31 +1343,45 @@ SorterRecord *p2, /* Second list to merge */ SorterRecord **ppOut /* OUT: Head of merged list */ ){ SorterRecord *pFinal = 0; SorterRecord **pp = &pFinal; - void *pVal2 = p2 ? SRVAL(p2) : 0; + int bCached = 0; while( p1 && p2 ){ int res; - res = vdbeSorterCompare(pTask, SRVAL(p1), p1->nVal, pVal2, p2->nVal); + res = pTask->xCompare( + pTask, &bCached, SRVAL(p1), p1->nVal, SRVAL(p2), p2->nVal + ); + if( res<=0 ){ *pp = p1; pp = &p1->u.pNext; p1 = p1->u.pNext; - pVal2 = 0; }else{ *pp = p2; - pp = &p2->u.pNext; + pp = &p2->u.pNext; p2 = p2->u.pNext; - if( p2==0 ) break; - pVal2 = SRVAL(p2); + bCached = 0; } } *pp = p1 ? p1 : p2; *ppOut = pFinal; } + +/* +** Return the SorterCompare function to compare values collected by the +** sorter object passed as the only argument. +*/ +static SorterCompare vdbeSorterGetCompare(VdbeSorter *p){ + if( p->typeMask==SORTER_TYPE_INTEGER ){ + return vdbeSorterCompareInt; + }else if( p->typeMask==SORTER_TYPE_TEXT ){ + return vdbeSorterCompareText; + } + return vdbeSorterCompare; +} /* ** Sort the linked list of records headed at pTask->pList. Return ** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if ** an error occurs. @@ -1233,16 +1393,18 @@ int rc; rc = vdbeSortAllocUnpacked(pTask); if( rc!=SQLITE_OK ) return rc; + p = pList->pList; + pTask->xCompare = vdbeSorterGetCompare(pTask->pSorter); + aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *)); if( !aSlot ){ return SQLITE_NOMEM; } - p = pList->pList; while( p ){ SorterRecord *pNext; if( pList->aMemory ){ if( (u8*)p==pList->aMemory ){ pNext = 0; @@ -1452,28 +1614,27 @@ /* Update contents of aTree[] */ if( rc==SQLITE_OK ){ int i; /* Index of aTree[] to recalculate */ PmaReader *pReadr1; /* First PmaReader to compare */ PmaReader *pReadr2; /* Second PmaReader to compare */ - u8 *pKey2; /* To pReadr2->aKey, or 0 if record cached */ + int bCached = 0; /* Find the first two PmaReaders to compare. The one that was just ** advanced (iPrev) and the one next to it in the array. */ pReadr1 = &pMerger->aReadr[(iPrev & 0xFFFE)]; pReadr2 = &pMerger->aReadr[(iPrev | 0x0001)]; - pKey2 = pReadr2->aKey; for(i=(pMerger->nTree+iPrev)/2; i>0; i=i/2){ /* Compare pReadr1 and pReadr2. Store the result in variable iRes. */ int iRes; if( pReadr1->pFd==0 ){ iRes = +1; }else if( pReadr2->pFd==0 ){ iRes = -1; }else{ - iRes = vdbeSorterCompare(pTask, - pReadr1->aKey, pReadr1->nKey, pKey2, pReadr2->nKey + iRes = pTask->xCompare(pTask, &bCached, + pReadr1->aKey, pReadr1->nKey, pReadr2->aKey, pReadr2->nKey ); } /* If pReadr1 contained the smaller value, set aTree[i] to its index. ** Then set pReadr2 to the next PmaReader to compare to pReadr1. In this @@ -1491,13 +1652,13 @@ ** is sorted from oldest to newest, so pReadr1 contains older values ** than pReadr2 iff (pReadr1aTree[i] = (int)(pReadr1 - pMerger->aReadr); pReadr2 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ]; - pKey2 = pReadr2->aKey; + bCached = 0; }else{ - if( pReadr1->pFd ) pKey2 = 0; + if( pReadr1->pFd ) bCached = 0; pMerger->aTree[i] = (int)(pReadr2 - pMerger->aReadr); pReadr1 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ]; } } *pbEof = (pMerger->aReadr[pMerger->aTree[1]].pFd==0); @@ -1600,10 +1761,20 @@ SorterRecord *pNew; /* New list element */ int bFlush; /* True to flush contents of memory to PMA */ int nReq; /* Bytes of memory required */ int nPMA; /* Bytes of PMA space required */ + int t; /* serial type of first record field */ + + getVarint32((const u8*)&pVal->z[1], t); + if( t>0 && t<10 && t!=7 ){ + pSorter->typeMask &= SORTER_TYPE_INTEGER; + }else if( t>10 && (t & 0x01) ){ + pSorter->typeMask &= SORTER_TYPE_TEXT; + }else{ + pSorter->typeMask = 0; + } assert( pSorter ); /* Figure out whether or not the current contents of memory should be ** flushed to a PMA before continuing. If so, do so. @@ -1865,14 +2036,16 @@ if( p1->pFd==0 ){ iRes = i2; }else if( p2->pFd==0 ){ iRes = i1; }else{ + SortSubtask *pTask = pMerger->pTask; + int bCached = 0; int res; - assert( pMerger->pTask->pUnpacked!=0 ); /* from vdbeSortSubtaskMain() */ - res = vdbeSorterCompare( - pMerger->pTask, p1->aKey, p1->nKey, p2->aKey, p2->nKey + assert( pTask->pUnpacked!=0 ); /* from vdbeSortSubtaskMain() */ + res = pTask->xCompare( + pTask, &bCached, p1->aKey, p1->nKey, p2->aKey, p2->nKey ); if( res<=0 ){ iRes = i1; }else{ iRes = i2; @@ -2286,10 +2459,15 @@ int rc; /* Return code */ SortSubtask *pTask0 = &pSorter->aTask[0]; MergeEngine *pMain = 0; #if SQLITE_MAX_WORKER_THREADS sqlite3 *db = pTask0->pSorter->db; + int i; + SorterCompare xCompare = vdbeSorterGetCompare(pSorter); + for(i=0; inTask; i++){ + pSorter->aTask[i].xCompare = xCompare; + } #endif rc = vdbeSorterMergeTreeBuild(pSorter, &pMain); if( rc==SQLITE_OK ){ #if SQLITE_MAX_WORKER_THREADS Index: test/e_vacuum.test ================================================================== --- test/e_vacuum.test +++ test/e_vacuum.test @@ -27,20 +27,20 @@ execsql { PRAGMA page_size = 1024; } execsql $sql execsql { CREATE TABLE t1(a PRIMARY KEY, b UNIQUE); INSERT INTO t1 VALUES(1, randomblob(400)); - INSERT INTO t1 SELECT a+1, randomblob(400) FROM t1; - INSERT INTO t1 SELECT a+2, randomblob(400) FROM t1; - INSERT INTO t1 SELECT a+4, randomblob(400) FROM t1; - INSERT INTO t1 SELECT a+8, randomblob(400) FROM t1; - INSERT INTO t1 SELECT a+16, randomblob(400) FROM t1; - INSERT INTO t1 SELECT a+32, randomblob(400) FROM t1; - INSERT INTO t1 SELECT a+64, randomblob(400) FROM t1; + INSERT OR FAIL INTO t1 SELECT a+1, randomblob(400) FROM t1; + INSERT OR FAIL INTO t1 SELECT a+2, randomblob(400) FROM t1; + INSERT OR FAIL INTO t1 SELECT a+4, randomblob(400) FROM t1; + INSERT OR FAIL INTO t1 SELECT a+8, randomblob(400) FROM t1; + INSERT OR FAIL INTO t1 SELECT a+16, randomblob(400) FROM t1; + INSERT OR FAIL INTO t1 SELECT a+32, randomblob(400) FROM t1; + INSERT OR FAIL INTO t1 SELECT a+64, randomblob(400) FROM t1; CREATE TABLE t2(a PRIMARY KEY, b UNIQUE); - INSERT INTO t2 SELECT * FROM t1; + INSERT OR FAIL INTO t2 SELECT * FROM t1; } } return [expr {[file size test.db] / 1024}] } @@ -125,11 +125,11 @@ ifcapable vtab&&compound { create_db register_dbstat_vtab db do_execsql_test e_vacuum-1.2.1 { DELETE FROM t1 WHERE a%2; - INSERT INTO t1 SELECT b, a FROM t2 WHERE a%2; + INSERT OR REPLACE INTO t1 SELECT b, a FROM t2 WHERE a%2; UPDATE t1 SET b=randomblob(600) WHERE (a%2)==0; } {} do_test e_vacuum-1.2.2.1 { expr [fragment_count t1]>100 } 1 do_test e_vacuum-1.2.2.2 { expr [fragment_count sqlite_autoindex_t1_1]>100 } 1 ADDED test/insert6.test Index: test/insert6.test ================================================================== --- /dev/null +++ test/insert6.test @@ -0,0 +1,409 @@ +# 2015 March 20 +# +# The author disclaims copyright to this source code. In place of +# a legal notice, here is a blessing: +# +# May you do good and not evil. +# May you find forgiveness for yourself and forgive others. +# May you share freely, never taking more than you give. +# +#*********************************************************************** +# +# The tests in this file ensure that sorter objects are used by +# "INSERT INTO ... SELECT ..." statements when possible. +# + +set testdir [file dirname $argv0] +source $testdir/tester.tcl +set testprefix insert6 + +# Return the number of OP_SorterOpen instructions in the SQL passed as +# the only argument if it is compiled using connection [db]. +# +proc sorter_count {sql} { + set res 0 + db cache flush + db eval "EXPLAIN $sql" x { + if {$x(opcode) == "SorterOpen"} { incr res } + } + return $res +} + + +#------------------------------------------------------------------------- +# Warm body test. This verifies that the simplest case works for both +# regular and WITHOUT ROWID tables. +# +do_execsql_test 1.1 { + CREATE TABLE t2(x UNIQUE ON CONFLICT IGNORE, y, z); + WITH cnt(x) AS ( SELECT 0 UNION ALL SELECT x+1 FROM cnt WHERE x<99 ) + INSERT INTO t2 SELECT abs(random()), abs(random()), abs(random()) FROM cnt; +} + +foreach {tn nSort schema} { + 1 3 { CREATE TABLE t1(a, b, c) } + 2 4 { CREATE TABLE t1(a PRIMARY KEY, b, c) WITHOUT ROWID } +} { + + do_test 1.$tn.1 { + execsql { DROP TABLE IF EXISTS t1 } + execsql $schema + } {} + + do_execsql_test 1.$tn.2 { + CREATE INDEX t1a ON t1(a); + CREATE INDEX t1b ON t1(b); + CREATE INDEX t1c ON t1(c); + } + + do_execsql_test 1.$tn.3 { + INSERT INTO t1 SELECT x, y, z FROM t2; + PRAGMA integrity_check; + SELECT count(*) FROM t1; + } {ok 100} + + do_execsql_test 1.$tn.4 { + INSERT INTO t1 SELECT -x, y, z FROM t2; + PRAGMA integrity_check; + } {ok} + + do_execsql_test 1.$tn.5 { + SELECT count(*) FROM t1; + } {200} + + do_test 1.$tn.6 { + sorter_count { INSERT INTO t1 SELECT * FROM t2 } + } $nSort +} + +#------------------------------------------------------------------------- +# The following test cases check that the sorters are disabled if any +# of the following are true: +# +# 2.1: The statement specifies "ON CONFLICT FAIL", "IGNORE" or "REPLACE". +# +# 2.2: The statement does not explicitly specify a conflict mode and +# there are one or more PRIMARY KEY or UNIQUE constraints with +# "OR FAIL", "OR IGNORE" or "OR REPLACE" as the conflict handling +# mode. +# +# 2.3: There are one or more INSERT triggers on the target table. +# +# 2.4: The target table is the parent or child of an FK constraint. +# + +do_execsql_test 2.1.1 { + CREATE TABLE x1(a, b, c); + CREATE INDEX x1a ON x1(a); + + CREATE TABLE x2(a, b, c); + CREATE UNIQUE INDEX x2a ON x2(a); + + CREATE TABLE x3(a PRIMARY KEY, b, c); + CREATE TABLE x4(a PRIMARY KEY, b, c) WITHOUT ROWID; +} + +do_test 2.1.2 { sorter_count { INSERT OR REPLACE INTO x1 SELECT * FROM t2 } } 0 +do_test 2.1.3 { sorter_count { INSERT OR REPLACE INTO x2 SELECT * FROM t2 } } 0 +do_test 2.1.4 { sorter_count { INSERT OR REPLACE INTO x3 SELECT * FROM t2 } } 0 +do_test 2.1.5 { sorter_count { INSERT OR REPLACE INTO x4 SELECT * FROM t2 } } 0 + +do_test 2.1.6 { sorter_count { INSERT OR IGNORE INTO x1 SELECT * FROM t2 } } 0 +do_test 2.1.7 { sorter_count { INSERT OR IGNORE INTO x2 SELECT * FROM t2 } } 0 +do_test 2.1.8 { sorter_count { INSERT OR IGNORE INTO x3 SELECT * FROM t2 } } 0 +do_test 2.1.9 { sorter_count { INSERT OR IGNORE INTO x4 SELECT * FROM t2 } } 0 + +do_test 2.1.10 { sorter_count { INSERT OR FAIL INTO x1 SELECT * FROM t2 } } 0 +do_test 2.1.11 { sorter_count { INSERT OR FAIL INTO x2 SELECT * FROM t2 } } 0 +do_test 2.1.12 { sorter_count { INSERT OR FAIL INTO x3 SELECT * FROM t2 } } 0 +do_test 2.1.13 { sorter_count { INSERT OR FAIL INTO x4 SELECT * FROM t2 } } 0 + +do_test 2.1.14 { sorter_count { INSERT OR ROLLBACK INTO x1 SELECT * FROM t2} } 1 +do_test 2.1.15 { sorter_count { INSERT OR ROLLBACK INTO x2 SELECT * FROM t2} } 1 +do_test 2.1.16 { sorter_count { INSERT OR ROLLBACK INTO x3 SELECT * FROM t2} } 1 +do_test 2.1.17 { sorter_count { INSERT OR ROLLBACK INTO x4 SELECT * FROM t2} } 1 + +do_test 2.1.18 { sorter_count { INSERT OR ABORT INTO x1 SELECT * FROM t2 } } 1 +do_test 2.1.19 { sorter_count { INSERT OR ABORT INTO x2 SELECT * FROM t2 } } 1 +do_test 2.1.20 { sorter_count { INSERT OR ABORT INTO x3 SELECT * FROM t2 } } 1 +do_test 2.1.21 { sorter_count { INSERT OR ABORT INTO x4 SELECT * FROM t2 } } 1 + + +foreach {tn scount schema} { + 2.1 0 { CREATE TABLE t1(a UNIQUE ON CONFLICT FAIL, b, c) } + 2.2 0 { CREATE TABLE t1(a, b UNIQUE ON CONFLICT IGNORE, c) } + 2.3 0 { CREATE TABLE t1(a, b, c UNIQUE ON CONFLICT REPLACE) } + 2.4 0 { CREATE TABLE t1(a PRIMARY KEY ON CONFLICT FAIL, b, c) } + 2.5 0 { CREATE TABLE t1(a, b PRIMARY KEY ON CONFLICT IGNORE, c) } + 2.6 0 { CREATE TABLE t1(a, b, c PRIMARY KEY ON CONFLICT REPLACE) } + 2.7 0 { + CREATE TABLE t1(a PRIMARY KEY ON CONFLICT FAIL, b, c) WITHOUT ROWID + } + 2.8 0 { + CREATE TABLE t1(a, b PRIMARY KEY ON CONFLICT IGNORE, c) WITHOUT ROWID + } + 2.9 0 { + CREATE TABLE t1(a, b, c PRIMARY KEY ON CONFLICT REPLACE) WITHOUT ROWID + } + + 3.1 1 { + CREATE TABLE t1(a, b, c); + CREATE INDEX i1 ON t1(a); + } + 3.2 0 { + CREATE TABLE t1(a, b, c); + CREATE INDEX i1 ON t1(a); + CREATE TRIGGER tr1 AFTER INSERT ON t1 BEGIN SELECT 1; END; + } + 3.3 0 { + CREATE TABLE t1(a, b, c); + CREATE INDEX i1 ON t1(a); + CREATE TRIGGER tr2 BEFORE INSERT ON t1 BEGIN SELECT 1; END; + } + + 4.1 2 { + CREATE TABLE t1(a PRIMARY KEY, b, c); + CREATE INDEX i1 ON t1(a); + CREATE TABLE c1(x, y REFERENCES t1 DEFERRABLE INITIALLY DEFERRED); + PRAGMA foreign_keys = 0; + } + 4.2 0 { + CREATE TABLE t1(a PRIMARY KEY, b, c); + CREATE INDEX i1 ON t1(a); + CREATE TABLE c1(x, y REFERENCES t1 DEFERRABLE INITIALLY DEFERRED); + PRAGMA foreign_keys = 1; + } + + 4.3 1 { + CREATE TABLE p1(x, y UNIQUE); + CREATE TABLE t1(a, b, c REFERENCES p1(y)); + CREATE INDEX i1 ON t1(a); + PRAGMA foreign_keys = 0; + } + 4.4 0 { + CREATE TABLE p1(x, y UNIQUE); + CREATE TABLE t1(a, b, c REFERENCES p1(y)); + CREATE INDEX i1 ON t1(a); + PRAGMA foreign_keys = 1; + } + +} { + execsql { + DROP TABLE IF EXISTS t1; + DROP TABLE IF EXISTS c1; + DROP TABLE IF EXISTS p1; + } + + do_test 2.2.$tn { + execsql $schema + sorter_count { INSERT INTO t1 SELECT * FROM t2 } + } $scount +} + +#------------------------------------------------------------------------- +# Test that if a UNIQUE constraint is violated and the on conflict mode +# is either ABORT or ROLLBACK, the conflict is handled correctly. +# +# 3.2: Check that conflicts are actually detected. +# 3.3: Check that OR ROLLBACK really does rollback the transaction. +# 3.4: Check that OR ABORT does not. +# +do_execsql_test 3.1 { + DROP TABLE IF EXISTS t1; + CREATE TABLE t1(a PRIMARY KEY, b, c, UNIQUE(b, c)); + INSERT INTO t1 VALUES(1, 2, 3); + INSERT INTO t1 VALUES(4, 5, 6); + INSERT INTO t1 VALUES(7, 8, 9); + CREATE TABLE src(a, b, c); +} + +do_catchsql_test 3.2.1 { + INSERT INTO src VALUES (10, 11, 12), (7, 14, 12); + INSERT INTO t1 SELECT * FROM src; +} {1 {UNIQUE constraint failed: t1.a}} + +do_catchsql_test 3.2.2 { + DELETE FROM src; + INSERT INTO src VALUES (10, 11, 12), (13, 5, 6); + INSERT INTO t1 SELECT * FROM src; +} {1 {UNIQUE constraint failed: t1.b, t1.c}} + +do_catchsql_test 3.2.3.1 { + CREATE TABLE t3(a); + CREATE UNIQUE INDEX t3a ON t3(a); + + CREATE TABLE t3src(a); + WITH cnt(x) AS ( SELECT 0 UNION ALL SELECT x+1 FROM cnt WHERE x<10 ) + INSERT INTO t3src SELECT 'abc' FROM cnt; +} {0 {}} + +# execsql { PRAGMA vdbe_trace = 1 } +do_catchsql_test 3.2.3.2 { + INSERT INTO t3 SELECT * FROM t3src; +} {1 {UNIQUE constraint failed: t3.a}} + +do_catchsql_test 3.3.1 { + DELETE FROM src; + BEGIN; + INSERT INTO src VALUES (10, 11, 12), (7, 13, 14); + INSERT OR ROLLBACK INTO t1 SELECT * FROM src; +} {1 {UNIQUE constraint failed: t1.a}} +do_catchsql_test 3.3.2 { + DELETE FROM src; + BEGIN; + INSERT INTO src VALUES (10, 11, 12), (13, 5, 6); + INSERT OR ROLLBACK INTO t1 SELECT * FROM src; +} {1 {UNIQUE constraint failed: t1.b, t1.c}} +do_test 3.3.3 { + sqlite3_get_autocommit db +} 1 + +do_catchsql_test 3.4.1 { + DELETE FROM src; + BEGIN; + INSERT INTO src VALUES (10, 11, 12), (7, 14, 12); + INSERT OR ABORT INTO t1 SELECT * FROM src; +} {1 {UNIQUE constraint failed: t1.a}} +do_catchsql_test 3.4.2 { + ROLLBACK; + DELETE FROM src; + BEGIN; + INSERT INTO src VALUES (10, 11, 12), (13, 5, 6); + INSERT OR ABORT INTO t1 SELECT * FROM src; +} {1 {UNIQUE constraint failed: t1.b, t1.c}} +do_test 3.4.3 { + sqlite3_get_autocommit db +} 0 +do_execsql_test 3.4.4 { ROLLBACK } + +#------------------------------------------------------------------------- +# The following tests - 4.* - check that this optimization is actually +# doing something helpful. They do this by executing a big +# "INSERT INTO SELECT" statement in wal mode with a small pager cache. +# Once with "OR FAIL" (so that the sorters are not used) and once with +# the default "OR ABORT" (so that they are). +# +# If the sorters are doing their job, the wal file generated by the +# "OR ABORT" case should be much smaller than the "OR FAIL" trial. +# + +proc odd_collate {lhs rhs} { + string compare [string range $lhs 6 end] [string range $rhs 6 end] +} + +proc do_insert6_4_test {tn sql} { + + reset_db + db collate odd_collate odd_collate + execsql $sql + db_save_and_close + + foreach {tn2 ::onerror ::var} { + 1 "OR ABORT" ::sz1 + 2 "OR FAIL" ::sz2 + } { + do_test $tn.$tn2 { + db_restore_and_reopen + db collate odd_collate odd_collate + execsql " + PRAGMA journal_mode = wal; + PRAGMA cache_size = 5; + PRAGMA wal_autocheckpoint = 0; + INSERT $onerror INTO t1 SELECT * FROM src; + " + set $var [file size test.db-wal] + db close + } {} + } + + do_test $tn.3.($::sz1<$::sz2) { + expr {$sz1 < ($sz2/2)} + } 1 + + sqlite3 db test.db + db collate odd_collate odd_collate + integrity_check $tn.4 +} + +do_insert6_4_test 4.1 { + CREATE TABLE t1(a, b, c); + CREATE UNIQUE INDEX t1a ON t1(a); + CREATE UNIQUE INDEX t1bc ON t1(b, c); + + CREATE TABLE src(x, y, z); + WITH cnt(x) AS ( SELECT 0 UNION ALL SELECT x+1 FROM cnt WHERE x<2999 ) + INSERT INTO src + SELECT randomblob(50), randomblob(50), randomblob(50) FROM cnt; +} + +do_insert6_4_test 4.2 { + CREATE TABLE t1(a INTEGER PRIMARY KEY, b, x); + CREATE UNIQUE INDEX t1b ON t1(b); + CREATE INDEX t1x1 ON t1(x); + CREATE INDEX t1x2 ON t1(x); + CREATE INDEX t1x3 ON t1(x); + CREATE INDEX t1x4 ON t1(x); + + CREATE TABLE src(a, b, x); + WITH cnt(x) AS ( SELECT 0 UNION ALL SELECT x+1 FROM cnt WHERE x<2999 ) + INSERT INTO src + SELECT random(), x, zeroblob(50) FROM cnt; +} + +do_insert6_4_test 4.3 { + CREATE TABLE t1(a, b, c); + CREATE UNIQUE INDEX t1ab ON t1(a, b); + CREATE UNIQUE INDEX t1ac ON t1(a, c); + + CREATE TABLE src(a, b, c); + WITH cnt(x) AS ( SELECT 0 UNION ALL SELECT x+1 FROM cnt WHERE x<2999 ) + INSERT INTO src + SELECT zeroblob(50), randomblob(50), randomblob(50) FROM cnt; +} + +db collate odd_collate odd_collate +do_insert6_4_test 4.5 { + CREATE TABLE t1(t COLLATE odd_collate, v COLLATE odd_collate); + CREATE UNIQUE INDEX t1t ON t1(t); + CREATE UNIQUE INDEX t1v ON t1(v); + + CREATE TABLE src(t, v); + WITH cnt(x) AS ( SELECT 0 UNION ALL SELECT x+1 FROM cnt WHERE x<2999 ) + INSERT INTO src + SELECT hex(randomblob(50)), hex(randomblob(50)) FROM cnt; +} + +db collate odd_collate odd_collate +do_insert6_4_test 4.6 { + CREATE TABLE t1(t COLLATE odd_collate PRIMARY KEY) WITHOUT ROWID; + CREATE TABLE src(t); + WITH cnt(x) AS ( SELECT 0 UNION ALL SELECT x+1 FROM cnt WHERE x<2999 ) + INSERT INTO src + SELECT hex(randomblob(50)) FROM cnt; +} + +#------------------------------------------------------------------------- +# At one point the sorters were used for INSERT statements that specify +# "OR FAIL", "REPLACE" or "IGNORE" if there were no PRIMARY KEY or +# UNIQUE indexes. This is incorrect, as all such tables have an implicit +# IPK column. So using the sorters can cause corruption. This test checks +# that that problem no longer exists. +# +reset_db +do_execsql_test 5.1 { + CREATE TABLE t1(a INTEGER PRIMARY KEY, b, c); + CREATE INDEX t1b ON t1(b); + INSERT INTO t1 VALUES(1, 2, 3); + INSERT INTO t1 VALUES(4, 5, 6); +} + +do_catchsql_test 5.2 { + INSERT OR FAIL INTO t1 + SELECT 2, 'x', 'x' UNION ALL SELECT 3, 'x', 'x' UNION ALL SELECT 4, 'x', 'x'; +} {1 {UNIQUE constraint failed: t1.a}} + +integrity_check 5.3 + + +finish_test + Index: test/stat.test ================================================================== --- test/stat.test +++ test/stat.test @@ -74,19 +74,19 @@ } {} do_execsql_test stat-2.1 { CREATE TABLE t3(a PRIMARY KEY, b); INSERT INTO t3(rowid, a, b) VALUES(2, a_string(111), a_string(222)); - INSERT INTO t3 SELECT a_string(110+rowid), a_string(221+rowid) FROM t3 - ORDER BY rowid; - INSERT INTO t3 SELECT a_string(110+rowid), a_string(221+rowid) FROM t3 - ORDER BY rowid; - INSERT INTO t3 SELECT a_string(110+rowid), a_string(221+rowid) FROM t3 - ORDER BY rowid; - INSERT INTO t3 SELECT a_string(110+rowid), a_string(221+rowid) FROM t3 - ORDER BY rowid; - INSERT INTO t3 SELECT a_string(110+rowid), a_string(221+rowid) FROM t3 + INSERT OR FAIL INTO t3 SELECT a_string(110+rowid), a_string(221+rowid) FROM t3 + ORDER BY rowid; + INSERT OR FAIL INTO t3 SELECT a_string(110+rowid), a_string(221+rowid) FROM t3 + ORDER BY rowid; + INSERT OR FAIL INTO t3 SELECT a_string(110+rowid), a_string(221+rowid) FROM t3 + ORDER BY rowid; + INSERT OR FAIL INTO t3 SELECT a_string(110+rowid), a_string(221+rowid) FROM t3 + ORDER BY rowid; + INSERT OR FAIL INTO t3 SELECT a_string(110+rowid), a_string(221+rowid) FROM t3 ORDER BY rowid; SELECT name, path, pageno, pagetype, ncell, payload, unused, mx_payload FROM stat WHERE name != 'sqlite_master'; } [list \ sqlite_autoindex_t3_1 / 3 internal 3 368 623 125 \ Index: test/wal.test ================================================================== --- test/wal.test +++ test/wal.test @@ -693,14 +693,14 @@ } [list 3 [wal_file_size 4 1024]] do_test wal-11.4 { execsql { BEGIN; - INSERT INTO t1 SELECT blob(900) FROM t1; -- 2 - INSERT INTO t1 SELECT blob(900) FROM t1; -- 4 - INSERT INTO t1 SELECT blob(900) FROM t1; -- 8 - INSERT INTO t1 SELECT blob(900) FROM t1; -- 16 + INSERT OR FAIL INTO t1 SELECT blob(900) FROM t1; -- 2 + INSERT OR FAIL INTO t1 SELECT blob(900) FROM t1; -- 4 + INSERT OR FAIL INTO t1 SELECT blob(900) FROM t1; -- 8 + INSERT OR FAIL INTO t1 SELECT blob(900) FROM t1; -- 16 } list [expr [file size test.db]/1024] [file size test.db-wal] } [list 3 [wal_file_size 32 1024]] do_test wal-11.5 { execsql { @@ -732,11 +732,11 @@ ifcapable !mmap {set nWal 37} do_test wal-11.10 { execsql { PRAGMA cache_size = 10; BEGIN; - INSERT INTO t1 SELECT blob(900) FROM t1; -- 32 + INSERT OR FAIL INTO t1 SELECT blob(900) FROM t1; -- 32 SELECT count(*) FROM t1; } list [expr [file size test.db]/1024] [file size test.db-wal] } [list 37 [wal_file_size $nWal 1024]] do_test wal-11.11 {