Index: src/vdbe.c ================================================================== --- src/vdbe.c +++ src/vdbe.c @@ -4148,17 +4148,15 @@ ** If, excluding the rowid fields at the end, the two records are a match, ** fall through to the next instruction. Otherwise, jump to instruction P2. */ case OP_SorterCompare: { VdbeCursor *pC; - int res; pC = p->apCsr[pOp->p1]; assert( isSorter(pC) ); pIn3 = &aMem[pOp->p3]; - rc = sqlite3VdbeSorterCompare(pC, pIn3, &res); - if( res ){ + if( sqlite3VdbeSorterCompare(pC, pIn3) ){ pc = pOp->p2-1; } break; }; Index: src/vdbeInt.h ================================================================== --- src/vdbeInt.h +++ src/vdbeInt.h @@ -425,19 +425,19 @@ # define sqlite3VdbeSorterWrite(X,Y,Z) SQLITE_OK # define sqlite3VdbeSorterClose(Y,Z) # define sqlite3VdbeSorterRowkey(Y,Z) SQLITE_OK # define sqlite3VdbeSorterRewind(X,Y,Z) SQLITE_OK # define sqlite3VdbeSorterNext(X,Y,Z) SQLITE_OK -# define sqlite3VdbeSorterCompare(X,Y,Z) SQLITE_OK +# define sqlite3VdbeSorterCompare(X,Y) 0 #else int sqlite3VdbeSorterInit(sqlite3 *, VdbeCursor *); void sqlite3VdbeSorterClose(sqlite3 *, VdbeCursor *); int sqlite3VdbeSorterRowkey(const VdbeCursor *, Mem *); int sqlite3VdbeSorterNext(sqlite3 *, const VdbeCursor *, int *); int sqlite3VdbeSorterRewind(sqlite3 *, const VdbeCursor *, int *); int sqlite3VdbeSorterWrite(sqlite3 *, const VdbeCursor *, Mem *); -int sqlite3VdbeSorterCompare(const VdbeCursor *, Mem *, int *); +int sqlite3VdbeSorterCompare(const VdbeCursor *, Mem *); #endif #if !defined(SQLITE_OMIT_SHARED_CACHE) && SQLITE_THREADSAFE>0 void sqlite3VdbeEnter(Vdbe*); void sqlite3VdbeLeave(Vdbe*); Index: src/vdbesort.c ================================================================== --- src/vdbesort.c +++ src/vdbesort.c @@ -37,11 +37,11 @@ ** operations. ** ** The aIter[] array contains an iterator for each of the PMAs being merged. ** An aIter[] iterator either points to a valid key or else is at EOF. 
For ** the purposes of the paragraphs below, we assume that the array is actually -** N elements in size, where N is the smallest power of 2 greater to or equal +** N elements in size, where N is the smallest power of 2 greater to or equal ** to the number of iterators being merged. The extra aIter[] elements are ** treated as if they are empty (always at EOF). ** ** The aTree[] array is also N elements in size. The value of N is stored in ** the VdbeSorter.nTree variable. @@ -102,13 +102,24 @@ int mnPmaSize; /* Minimum PMA size, in bytes */ int mxPmaSize; /* Maximum PMA size, in bytes. 0==no limit */ VdbeSorterIter *aIter; /* Array of iterators to merge */ int *aTree; /* Current state of incremental merge */ sqlite3_file *pTemp1; /* PMA file 1 */ - SorterRecord *pRecord; /* Head of in-memory record list */ + SorterRecord *aRec[9]; /* Nine different types of records */ + SorterRecord **aLastRec[9]; /* Locations to write the next pointers to */ UnpackedRecord *pUnpacked; /* Used to unpack keys */ }; + +#define SORTER_NULL 0 +#define SORTER_INT_NEG 1 +#define SORTER_INT_ZERO 2 +#define SORTER_INT_ONE 3 +#define SORTER_INT_POS 4 +#define SORTER_DOUBLE 5 +#define SORTER_TEXT 6 +#define SORTER_BLOB 7 +#define SORTER_LARGE 8 /* ** The following type is an iterator for a PMA. It caches the current key in ** variables nKey/aKey. If the iterator is at EOF, pFile==0. */ @@ -364,55 +375,85 @@ rc = vdbeSorterIterNext(db, pIter); } return rc; } - /* ** Compare key1 (buffer pKey1, size nKey1 bytes) with key2 (buffer pKey2, ** size nKey2 bytes). Argument pKeyInfo supplies the collation functions ** used by the comparison. If an error occurs, return an SQLite error code. ** Otherwise, return SQLITE_OK and set *pRes to a negative, zero or positive ** value, depending on whether key1 is smaller, equal to or larger than key2. ** -** If the bOmitRowid argument is non-zero, assume both keys end in a rowid -** field. For the purposes of the comparison, ignore it. 
Also, if bOmitRowid -** is true and key1 contains even a single NULL value, it is considered to -** be less than key2. Even if key2 also contains NULL values. -** ** If pKey2 is passed a NULL pointer, then it is assumed that the pCsr->aSpace ** has been allocated and contains an unpacked record that is used as key2. */ -static void vdbeSorterCompare( - const VdbeCursor *pCsr, /* Cursor object (for pKeyInfo) */ - int bOmitRowid, /* Ignore rowid field at end of keys */ +static int vdbeSorterCompareRec( + void *p, /* VdbeCursor object */ const void *pKey1, int nKey1, /* Left side of comparison */ - const void *pKey2, int nKey2, /* Right side of comparison */ - int *pRes /* OUT: Result of comparison */ + const void *pKey2, int nKey2 /* Right side of comparison */ ){ + const VdbeCursor *pCsr = (VdbeCursor *)p; KeyInfo *pKeyInfo = pCsr->pKeyInfo; - VdbeSorter *pSorter = pCsr->pSorter; - UnpackedRecord *r2 = pSorter->pUnpacked; - int i; + UnpackedRecord *r2 = pCsr->pSorter->pUnpacked; if( pKey2 ){ sqlite3VdbeRecordUnpack(pKeyInfo, nKey2, pKey2, r2); } - - if( bOmitRowid ){ - r2->nField = pKeyInfo->nField; - assert( r2->nField>0 ); - for(i=0; i<r2->nField; i++){ - if( r2->aMem[i].flags & MEM_Null ){ - *pRes = -1; - return; - } - } - r2->flags |= UNPACKED_PREFIX_MATCH; - } - - *pRes = sqlite3VdbeRecordCompare(nKey1, pKey1, r2); + return sqlite3VdbeRecordCompare(nKey1, pKey1, r2); +} + +/* +** Buffers pKey1 and pKey2 both contain encoded records. The first elements +** of each are both either negative integers (if p!=0) or positive integers +** greater than 1 (if p==0). Return a value less than, equal to or greater +** than zero if the first field in pKey1 is less than, equal to or greater +** than the first field in pKey2, respectively. 
+*/ +static int vdbeSorterCompareInt( + void *p, + const void *pKey1, int nKey1, /* Left side of comparison */ + const void *pKey2, int nKey2 /* Right side of comparison */ +){ + static const int aLen[] = {0, 1, 2, 3, 4, 6, 8}; + const u8 *aKey1 = (u8 *)pKey1; + const u8 *aKey2 = (u8 *)pKey2; + int res = (int)aKey1[1] - (int)aKey2[1]; + + if( res==0 ){ + res = memcmp(&aKey1[aKey1[0]], &aKey2[aKey2[0]], aLen[aKey1[1]]); + }else if( aKey1[aKey1[0]] & 0x80 ){ + res = res * -1; + } + return res; +} + +/* +** Buffers pKey1 and pKey2 both contain encoded records. The first elements +** of each are both either text or blob values. +** +** Argument p points to a CollSeq structure. If the pKey1 and pKey2 buffers +** contain blobs, then this is always the BINARY collation sequence. Either +** way, compare the contents of the two buffers and return an integer less +** than, equal to or greater than zero if the value in pKey1 is less than, +** equal to or greater than that in pKey2, respectively. +*/ +static int vdbeSorterCompareString( + void *p, /* Pointer to CollSeq object */ + const void *pKey1, int nKey1, /* Left side of comparison */ + const void *pKey2, int nKey2 /* Right side of comparison */ +){ + CollSeq *pColl = (CollSeq *)p; + const u8 *aKey1 = (u8 *)pKey1; + const u8 *aKey2 = (u8 *)pKey2; + int n1, n2; + int res; + + n1 = (aKey1[1] - 12) / 2; + n2 = (aKey2[1] - 12) / 2; + res = pColl->xCmp(pColl->pUser, n1, &aKey1[aKey1[0]], n2, &aKey2[aKey2[0]]); + return res; } /* ** This function is called to compare two iterator keys when merging ** multiple b-tree segments. 
Parameter iOut is the index of the aTree[] @@ -444,12 +485,12 @@ }else if( p2->pFile==0 ){ iRes = i1; }else{ int res; assert( pCsr->pSorter->pUnpacked!=0 ); /* allocated in vdbeSorterMerge() */ - vdbeSorterCompare( - pCsr, 0, p1->aKey, p1->nKey, p2->aKey, p2->nKey, &res + res = vdbeSorterCompareRec( + (void *)pCsr, p1->aKey, p1->nKey, p2->aKey, p2->nKey ); if( res<=0 ){ iRes = i1; }else{ iRes = i2; @@ -458,10 +499,22 @@ pSorter->aTree[iOut] = iRes; return SQLITE_OK; } + +/* +** Set each entry of the aLastRec[] array to point to the corresponding entry +** in the aRec[] array. +*/ +static void vdbeSorterSetLastRec(VdbeSorter *pSorter){ + int i; + for(i=0; i<ArraySize(pSorter->aLastRec); i++){ + pSorter->aLastRec[i] = &pSorter->aRec[i]; + } +} + /* ** Initialize the temporary index cursor just opened as a sorter cursor. */ int sqlite3VdbeSorterInit(sqlite3 *db, VdbeCursor *pCsr){ int pgsz; /* Page size of main database */ @@ -485,10 +538,11 @@ mxCache = db->aDb[0].pSchema->cache_size; if( mxCache<SORTER_MIN_WORKING ) mxCache = SORTER_MIN_WORKING; pSorter->mxPmaSize = mxCache * pgsz; } + vdbeSorterSetLastRec(pSorter); return SQLITE_OK; } /* ** Free the list of sorted records starting at pRecord. @@ -506,21 +560,25 @@ ** Free any cursor components allocated by sqlite3VdbeSorterXXX routines. */ void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){ VdbeSorter *pSorter = pCsr->pSorter; if( pSorter ){ + int i; if( pSorter->aIter ){ - int i; for(i=0; i<pSorter->nTree; i++){ vdbeSorterIterZero(db, &pSorter->aIter[i]); } sqlite3DbFree(db, pSorter->aIter); } if( pSorter->pTemp1 ){ sqlite3OsCloseFree(pSorter->pTemp1); } - vdbeSorterRecordFree(db, pSorter->pRecord); + + for(i=0; i<ArraySize(pSorter->aRec); i++){ + vdbeSorterRecordFree(db, pSorter->aRec[i]); + } + sqlite3DbFree(db, pSorter->pUnpacked); sqlite3DbFree(db, pSorter); pCsr->pSorter = 0; } } @@ -543,10 +601,12 @@ ** Merge the two sorted lists p1 and p2 into a single list. ** Set *ppOut to the head of the new list. 
*/ static void vdbeSorterMerge( const VdbeCursor *pCsr, /* For pKeyInfo */ + int (*xCmp)(void*,const void*,int,const void*,int), + void *pCtx, /* First argument to pass to xCmp() */ SorterRecord *p1, /* First list to merge */ SorterRecord *p2, /* Second list to merge */ SorterRecord **ppOut /* OUT: Head of merged list */ ){ SorterRecord *pFinal = 0; @@ -553,16 +613,19 @@ SorterRecord **pp = &pFinal; void *pVal2 = p2 ? p2->pVal : 0; while( p1 && p2 ){ int res; - vdbeSorterCompare(pCsr, 0, p1->pVal, p1->nVal, pVal2, p2->nVal, &res); + res = xCmp(pCtx, p1->pVal, p1->nVal, pVal2, p2->nVal); + if( xCmp!=vdbeSorterCompareRec && pCsr->pKeyInfo->aSortOrder[0] ){ + res = res * -1; + } if( res<=0 ){ *pp = p1; pp = &p1->pNext; p1 = p1->pNext; - pVal2 = 0; + if( xCmp==vdbeSorterCompareRec ) pVal2 = 0; }else{ *pp = p2; pp = &p2->pNext; p2 = p2->pNext; if( p2==0 ) break; @@ -570,45 +633,133 @@ } } *pp = p1 ? p1 : p2; *ppOut = pFinal; } + +/* +** Concatenate the linked lists headed at elements iStart through iEnd +** (inclusive) of the pSorter->aRec[] array. Store the result in +** pSorter->aRec[iEnd]. Set entries iStart through iEnd-1 to zero. +** +** If parameter bReverse is false, the lists are concatenated so that +** all the elements of list iStart occur before those of iStart+1, and +** so on. Or, if bReverse is true, the original content of iEnd is at +** the start of the result, followed by the content of iEnd-1, etc. +*/ +static void vdbeSorterConcatLists( + VdbeSorter *pSorter, + int bReverse, /* True to concatenate in reverse order */ + int iStart, + int iEnd +){ + int i; + for(i=iStart; i<iEnd; i++){ + if( bReverse ){ + *pSorter->aLastRec[i] = pSorter->aRec[iEnd]; + pSorter->aRec[iEnd] = pSorter->aRec[i]; + }else{ + *pSorter->aLastRec[iEnd] = pSorter->aRec[i]; + if( pSorter->aRec[i] ) pSorter->aLastRec[iEnd] = pSorter->aLastRec[i]; + } + pSorter->aRec[i] = 0; + pSorter->aLastRec[i] = &pSorter->aRec[i]; + } +} /* ** Sort the linked list of records headed at pCsr->pRecord. 
Return SQLITE_OK ** if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if an error ** occurs. */ static int vdbeSorterSort(const VdbeCursor *pCsr){ int i; + int iRec; SorterRecord **aSlot; SorterRecord *p; + KeyInfo *pKeyInfo = pCsr->pKeyInfo; VdbeSorter *pSorter = pCsr->pSorter; + int bReverse = pCsr->pKeyInfo->aSortOrder[0]; aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *)); if( !aSlot ){ return SQLITE_NOMEM; } - p = pSorter->pRecord; - while( p ){ - SorterRecord *pNext = p->pNext; - p->pNext = 0; - for(i=0; aSlot[i]; i++){ - vdbeSorterMerge(pCsr, p, aSlot[i], &p); + /* If there are one or more SORTER_LARGE records, or if there are + ** SORTER_TEXT records that must be converted to a different encoding + ** before they can be compared, move everything to the SORTER_LARGE slot. */ + if( pSorter->aRec[SORTER_LARGE] + || (pSorter->aRec[SORTER_TEXT] && pKeyInfo->enc!=pKeyInfo->aColl[0]->enc) + ){ + vdbeSorterConcatLists(pSorter, 0, 0, SORTER_LARGE); + } + + /* If there are one or more SORTER_DOUBLE records, move all numeric + ** records to the SORTER_DOUBLE slot. 
*/ + if( pSorter->aRec[SORTER_DOUBLE] ){ + vdbeSorterConcatLists(pSorter, 0, SORTER_INT_NEG, SORTER_DOUBLE); + } + + for(iRec=0; iRec<ArraySize(pSorter->aRec); iRec++){ + void *pCtx = 0; + int (*xCmp)(void*,const void*,int,const void*,int); + switch( iRec ){ + case SORTER_NULL: + case SORTER_INT_ZERO: + case SORTER_INT_ONE: + xCmp = 0; + break; + + case SORTER_INT_NEG: + case SORTER_INT_POS: + xCmp = vdbeSorterCompareInt; + break; + + case SORTER_BLOB: + pCtx = (void *)(pCsr->pKeyInfo->db->pDfltColl); + xCmp = vdbeSorterCompareString; + break; + + case SORTER_TEXT: + pCtx = (void *)(pCsr->pKeyInfo->aColl[0]); + xCmp = vdbeSorterCompareString; + break; + + default: + pCtx = (void *)pCsr; + xCmp = vdbeSorterCompareRec; + break; + } + if( !xCmp ) continue; + + p = pSorter->aRec[iRec]; + while( p ){ + SorterRecord *pNext = p->pNext; + p->pNext = 0; + for(i=0; aSlot[i]; i++){ + vdbeSorterMerge(pCsr, xCmp, pCtx, aSlot[i], p, &p); + aSlot[i] = 0; + } + aSlot[i] = p; + p = pNext; + } + p = 0; + for(i=0; i<64; i++){ + vdbeSorterMerge(pCsr, xCmp, pCtx, aSlot[i], p, &p); aSlot[i] = 0; } - aSlot[i] = p; - p = pNext; + pSorter->aRec[iRec] = p; + if( p ){ + SorterRecord *pRec; + for(pRec=pSorter->aRec[iRec]; pRec->pNext; pRec=pRec->pNext); + pSorter->aLastRec[iRec] = &pRec->pNext; + } } - p = 0; - for(i=0; i<64; i++){ - vdbeSorterMerge(pCsr, p, aSlot[i], &p); - } - pSorter->pRecord = p; - + vdbeSorterConcatLists(pSorter, bReverse, 0, SORTER_LARGE); + vdbeSorterSetLastRec(pSorter); sqlite3_free(aSlot); return SQLITE_OK; } /* @@ -709,19 +860,18 @@ ** * One or more records packed end-to-end in order of ascending keys. ** Each record consists of a varint followed by a blob of data (the ** key). The varint is the number of bytes in the blob of data. 
*/ static int vdbeSorterListToPMA(sqlite3 *db, const VdbeCursor *pCsr){ - int rc = SQLITE_OK; /* Return code */ + int rc; /* Return code */ VdbeSorter *pSorter = pCsr->pSorter; FileWriter writer; memset(&writer, 0, sizeof(FileWriter)); if( pSorter->nInMemory==0 ){ - assert( pSorter->pRecord==0 ); - return rc; + return SQLITE_OK; } rc = vdbeSorterSort(pCsr); /* If the first temporary PMA file has not been opened, open it now. */ @@ -737,17 +887,17 @@ SorterRecord *pNext = 0; fileWriterInit(db, pSorter->pTemp1, &writer, pSorter->iWriteOff); pSorter->nPMA++; fileWriterWriteVarint(&writer, pSorter->nInMemory); - for(p=pSorter->pRecord; p; p=pNext){ + for(p=pSorter->aRec[SORTER_LARGE]; rc==SQLITE_OK && p; p=pNext){ pNext = p->pNext; fileWriterWriteVarint(&writer, p->nVal); fileWriterWrite(&writer, p->pVal, p->nVal); sqlite3DbFree(db, p); } - pSorter->pRecord = p; + pSorter->aRec[SORTER_LARGE] = p; rc = fileWriterFinish(db, &writer, &pSorter->iWriteOff); } return rc; } @@ -769,15 +919,52 @@ pNew = (SorterRecord *)sqlite3DbMallocRaw(db, pVal->n + sizeof(SorterRecord)); if( pNew==0 ){ rc = SQLITE_NOMEM; }else{ - pNew->pVal = (void *)&pNew[1]; + int iSlot; + u8 *aVal = pNew->pVal = (void *)&pNew[1]; memcpy(pNew->pVal, pVal->z, pVal->n); pNew->nVal = pVal->n; - pNew->pNext = pSorter->pRecord; - pSorter->pRecord = pNew; + + u8 n = aVal[0]; + u8 t = aVal[1]; + + if( pCsr->pKeyInfo->nField!=1 || (t & 0x80) || (n & 0x80) ){ + iSlot = SORTER_LARGE; + }else{ + u8 t = aVal[1]; + switch( t ){ + case 0: + iSlot = SORTER_NULL; + break; + + case 1: case 2: case 3: case 4: case 5: case 6: + iSlot = (aVal[n] & 0x80) ? 
SORTER_INT_NEG : SORTER_INT_POS; + break; + + case 7: + iSlot = SORTER_DOUBLE; + break; + + case 8: + iSlot = SORTER_INT_ZERO; + break; + + case 9: + iSlot = SORTER_INT_ONE; + break; + + default: + iSlot = SORTER_BLOB - (t & 0x01); + break; + } + } + + pNew->pNext = 0; + *pSorter->aLastRec[iSlot] = pNew; + pSorter->aLastRec[iSlot] = &(pNew->pNext); } /* See if the contents of the sorter should now be written out. They ** are written out when either of the following are true: ** @@ -852,11 +1039,11 @@ /* If no data has been written to disk, then do not do so now. Instead, ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly ** from the in-memory list. */ if( pSorter->nPMA==0 ){ - *pbEof = !pSorter->pRecord; + *pbEof = (pSorter->nInMemory==0); assert( pSorter->aTree==0 ); return vdbeSorterSort(pCsr); } /* Write the current in-memory list to a PMA. */ @@ -961,15 +1148,15 @@ rc = vdbeSorterDoCompare(pCsr, i); } *pbEof = (pSorter->aIter[pSorter->aTree[1]].pFile==0); }else{ - SorterRecord *pFree = pSorter->pRecord; - pSorter->pRecord = pFree->pNext; + SorterRecord *pFree = pSorter->aRec[SORTER_LARGE]; + pSorter->aRec[SORTER_LARGE] = pFree->pNext; pFree->pNext = 0; vdbeSorterRecordFree(db, pFree); - *pbEof = !pSorter->pRecord; + *pbEof = !pSorter->aRec[SORTER_LARGE]; rc = SQLITE_OK; } return rc; } @@ -986,12 +1173,12 @@ VdbeSorterIter *pIter; pIter = &pSorter->aIter[ pSorter->aTree[1] ]; *pnKey = pIter->nKey; pKey = pIter->aKey; }else{ - *pnKey = pSorter->pRecord->nVal; - pKey = pSorter->pRecord->pVal; + *pnKey = pSorter->aRec[SORTER_LARGE]->nVal; + pKey = pSorter->aRec[SORTER_LARGE]->pVal; } return pKey; } /* @@ -1006,11 +1193,10 @@ return SQLITE_NOMEM; } pOut->n = nKey; MemSetTypeFlag(pOut, MEM_Blob); memcpy(pOut->z, pKey, nKey); - return SQLITE_OK; } /* ** Compare the key in memory cell pVal with the key that the sorter cursor @@ -1022,17 +1208,28 @@ ** key in pVal is smaller than, equal to or larger than the current sorter ** key. 
*/ int sqlite3VdbeSorterCompare( const VdbeCursor *pCsr, /* Sorter cursor */ - Mem *pVal, /* Value to compare to current sorter key */ - int *pRes /* OUT: Result of comparison */ + Mem *pVal /* Value to compare to current sorter key */ ){ + KeyInfo *pKeyInfo = pCsr->pKeyInfo; VdbeSorter *pSorter = pCsr->pSorter; + UnpackedRecord *r2 = pSorter->pUnpacked; + int i; void *pKey; int nKey; /* Sorter key to compare pVal with */ pKey = vdbeSorterRowkey(pSorter, &nKey); - vdbeSorterCompare(pCsr, 1, pVal->z, pVal->n, pKey, nKey, pRes); - return SQLITE_OK; + assert( pKey && pVal->z ); + sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, r2); + + r2->nField = pKeyInfo->nField; + assert( r2->nField>0 ); + for(i=0; i<r2->nField; i++){ + if( r2->aMem[i].flags & MEM_Null ) return -1; + } + r2->flags |= UNPACKED_PREFIX_MATCH; + + return sqlite3VdbeRecordCompare(pVal->n, pVal->z, r2); } #endif /* #ifndef SQLITE_OMIT_MERGE_SORT */ Index: test/sort.test ================================================================== --- test/sort.test +++ test/sort.test @@ -461,7 +461,22 @@ insert into b values (3, 1, 'yyy'); select a.id, b.id, b.text from a join b on (a.id = b.aId) order by a.id, b.text; } } {1 2 xxx 1 3 yyy 1 1 zzz} + +foreach {tn shuffle} { + 1 {1 2 3 4} 2 {1 3 2 4} + 3 {4 3 2 1} 4 {1 3 2 4} +} { + do_test sort-13.$tn { + execsql { DROP TABLE IF EXISTS w1 } + execsql { CREATE TABLE w1(x) } + foreach i $shuffle { + execsql { INSERT INTO w1 VALUES($i) } + } + execsql { SELECT x FROM w1 ORDER BY x } + } {1 2 3 4} +} finish_test +