Index: src/shell.c ================================================================== --- src/shell.c +++ src/shell.c @@ -3533,11 +3533,11 @@ sqlite3_config(SQLITE_CONFIG_URI, 1); sqlite3_config(SQLITE_CONFIG_LOG, shellLog, data); sqlite3_snprintf(sizeof(mainPrompt), mainPrompt,"sqlite> "); sqlite3_snprintf(sizeof(continuePrompt), continuePrompt," ...> "); sqlite3_config(SQLITE_CONFIG_MULTITHREAD); - sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, 3); + sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, 4); } /* ** Output text to the console in a font that attracts extra attention. */ Index: src/vdbesort.c ================================================================== --- src/vdbesort.c +++ src/vdbesort.c @@ -95,17 +95,46 @@ typedef struct PmaReader PmaReader; /* Incrementally read one PMA */ typedef struct PmaWriter PmaWriter; /* Incrementally write on PMA */ typedef struct SorterRecord SorterRecord; /* A record being sorted */ typedef struct SortSubtask SortSubtask; /* A sub-task in the sort process */ +typedef struct SortList SortList; +typedef struct SortFile SortFile; +typedef struct SortLevel SortLevel; + + +/* +** A file containing zero or more PMAs. +*/ +struct SortFile { + sqlite3_file *pFd; /* File handle */ + i64 iOff; /* Current write offset */ + i64 nByte; /* Actual size of file */ + int nPMA; /* Number of PMA currently in file */ +}; /* -** Candidate values for SortSubtask.eWork +** A list of records. 
*/ -#define SORT_SUBTASK_SORT 1 /* Sort records on pList */ -#define SORT_SUBTASK_TO_PMA 2 /* Xfer pList to Packed-Memory-Array pTemp1 */ -#define SORT_SUBTASK_CONS 3 /* Consolidate multiple PMAs */ +struct SortList { + SorterRecord *pRecord; /* List of records for pTask to sort */ + int nInMemory; /* Expected size of PMA based on pList */ + u8 *aMemory; /* Records memory (or NULL) */ +}; + +struct SortLevel { + SortSubtask *pTask; /* Sorter task this level is a part of */ + SQLiteThread *pThread; /* Thread handle, or NULL */ + int bDone; /* Set to true by pThread when finished */ + union { + SortFile f; /* Input for level 1 and greater */ + SortList l; /* Input for level 0 */ + } in; + SortFile out; /* Level storage */ + SortLevel *pNext; /* Next level (containing larger PMAs) */ + UnpackedRecord *pUnpacked; /* Space to unpack a record */ +}; /* ** Sorting is divided up into smaller subtasks. Each subtask is controlled ** by an instance of this object. A Subtask might run in either the main thread ** or in a background thread. @@ -139,27 +168,17 @@ ** SORT_SUBTASK_CONS: ** Merge existing PMAs until SortSubtask.nConsolidate or fewer ** remain in temp file SortSubtask.pTemp1. 
*/ struct SortSubtask { - SQLiteThread *pThread; /* Thread handle, or NULL */ - int bDone; /* Set to true by pTask when finished */ - + int iId; /* Sub-task id */ sqlite3 *db; /* Database connection */ + VdbeSorter *pSorter; /* Sorter that owns this object */ KeyInfo *pKeyInfo; /* How to compare records */ - UnpackedRecord *pUnpacked; /* Space to unpack a record */ int pgsz; /* Main database page size */ - - u8 eWork; /* One of the SORT_SUBTASK_* constants */ - int nConsolidate; /* For SORT_SUBTASK_CONS, max final PMAs */ - SorterRecord *pList; /* List of records for pTask to sort */ - int nInMemory; /* Expected size of PMA based on pList */ - u8 *aListMemory; /* Records memory (or NULL) */ - - int nPMA; /* Number of PMAs currently in pTemp1 */ - i64 iTemp1Off; /* Offset to write to in pTemp1 */ - sqlite3_file *pTemp1; /* File to write PMAs to, or NULL */ + int nConsolidate; /* For consolidation, max final PMAs */ + SortLevel *pLevel; /* PMA level 0 */ }; /* ** The MergeEngine object is used to combine two or more smaller PMAs into @@ -233,20 +252,19 @@ /* ** Main sorter structure. A single instance of this is allocated for each ** sorter cursor created by the VDBE. */ struct VdbeSorter { - int nInMemory; /* Current size of pRecord list as PMA */ int mnPmaSize; /* Minimum PMA size, in bytes */ int mxPmaSize; /* Maximum PMA size, in bytes. 
0==no limit */ int bUsePMA; /* True if one or more PMAs created */ - SorterRecord *pRecord; /* Head of in-memory record list */ MergeEngine *pMerger; /* For final merge of PMAs (by caller) */ - u8 *aMemory; /* Block of memory to alloc records from */ - int iMemory; /* Offset of first free byte in aMemory */ - int nMemory; /* Size of aMemory allocation in bytes */ - int iPrev; /* Previous thread used to flush PMA */ + UnpackedRecord *pUnpacked; /* Used by sqlite3VdbeSorterCompare */ + int iMemory; /* Offset of free byte in list.aMemory */ + int nMemory; /* Size of list.aMemory allocation in bytes */ + SortList list; /* In memory records */ + int iPrev; /* Previous PMA flushed via task iPrev */ int nTask; /* Size of aTask[] array */ SortSubtask aTask[1]; /* One or more subtasks */ }; /* @@ -494,30 +512,31 @@ ** leaves the iterator pointing to the first key in the PMA (or EOF if the ** PMA is empty). */ static int vdbePmaReaderInit( SortSubtask *pTask, /* Thread context */ + SortFile *pFile, /* File to read from */ i64 iStart, /* Start offset in pTask->pTemp1 */ PmaReader *pIter, /* Iterator to populate */ i64 *pnByte /* IN/OUT: Increment this value by PMA size */ ){ int rc = SQLITE_OK; int nBuf = pTask->pgsz; void *pMap = 0; /* Mapping of temp file */ - assert( pTask->iTemp1Off>iStart ); + assert( pFile->iOff>iStart ); assert( pIter->aAlloc==0 ); assert( pIter->aBuffer==0 ); - pIter->pFile = pTask->pTemp1; + pIter->pFile = pFile->pFd; pIter->iReadOff = iStart; pIter->nAlloc = 128; pIter->aAlloc = (u8*)sqlite3Malloc(pIter->nAlloc); if( pIter->aAlloc ){ /* Try to xFetch() a mapping of the entire temp file. If this is possible, ** the PMA will be read via the mapping. Otherwise, use xRead(). 
*/ - if( pTask->iTemp1Off<=(i64)(pTask->db->nMaxSorterMmap) ){ - rc = sqlite3OsFetch(pIter->pFile, 0, pTask->iTemp1Off, &pMap); + if( pFile->iOff<=(i64)(pTask->db->nMaxSorterMmap) ){ + rc = sqlite3OsFetch(pIter->pFile, 0, pFile->iOff, &pMap); } }else{ rc = SQLITE_NOMEM; } @@ -531,25 +550,25 @@ rc = SQLITE_NOMEM; }else{ int iBuf = iStart % nBuf; if( iBuf ){ int nRead = nBuf - iBuf; - if( (iStart + nRead) > pTask->iTemp1Off ){ - nRead = (int)(pTask->iTemp1Off - iStart); + if( (iStart + nRead) > pFile->iOff ){ + nRead = (int)(pFile->iOff - iStart); } rc = sqlite3OsRead( - pTask->pTemp1, &pIter->aBuffer[iBuf], nRead, iStart - ); + pIter->pFile, &pIter->aBuffer[iBuf], nRead, iStart + ); assert( rc!=SQLITE_IOERR_SHORT_READ ); } } } } if( rc==SQLITE_OK ){ u64 nByte; /* Size of PMA in bytes */ - pIter->iEof = pTask->iTemp1Off; + pIter->iEof = pFile->iOff; rc = vdbePmaReadVarint(pIter, &nByte); pIter->iEof = pIter->iReadOff + nByte; *pnByte += nByte; } @@ -560,29 +579,29 @@ } /* ** Compare key1 (buffer pKey1, size nKey1 bytes) with key2 (buffer pKey2, -** size nKey2 bytes). Use (pTask->pKeyInfo) for the collation sequences +** size nKey2 bytes). Use pKeyInfo for the collation sequences ** used by the comparison. Return the result of the comparison. ** -** Before returning, object (pTask->pUnpacked) is populated with the +** Before returning, object pUnpacked is populated with the ** unpacked version of key2. Or, if pKey2 is passed a NULL pointer, then it -** is assumed that the (pTask->pUnpacked) structure already contains the +** is assumed that the pUnpacked structure already contains the ** unpacked key to use as key2. ** -** If an OOM error is encountered, (pTask->pUnpacked->error_rc) is set +** If an OOM error is encountered, (pUnpacked->error_rc) is set ** to SQLITE_NOMEM. 
*/ static int vdbeSorterCompare( - SortSubtask *pTask, /* Subtask context (for pKeyInfo) */ + KeyInfo *pKeyInfo, + UnpackedRecord *r2, const void *pKey1, int nKey1, /* Left side of comparison */ const void *pKey2, int nKey2 /* Right side of comparison */ ){ - UnpackedRecord *r2 = pTask->pUnpacked; if( pKey2 ){ - sqlite3VdbeRecordUnpack(pTask->pKeyInfo, nKey2, pKey2, r2); + sqlite3VdbeRecordUnpack(pKeyInfo, nKey2, pKey2, r2); } return sqlite3VdbeRecordCompare(nKey1, pKey1, r2, 0); } /* @@ -589,11 +608,11 @@ ** This function is called to compare two iterator keys when merging ** multiple b-tree segments. Parameter iOut is the index of the aTree[] ** value to recalculate. */ static int vdbeSorterDoCompare( - SortSubtask *pTask, + SortLevel *pLvl, MergeEngine *pMerger, int iOut ){ int i1; int i2; @@ -618,13 +637,13 @@ iRes = i2; }else if( p2->pFile==0 ){ iRes = i1; }else{ int res; - assert( pTask->pUnpacked!=0 ); /* allocated in vdbeSortSubtaskMain() */ - res = vdbeSorterCompare( - pTask, p1->aKey, p1->nKey, p2->aKey, p2->nKey + assert( pLvl->pUnpacked!=0 ); /* allocated in vdbeSorterThread() */ + res = vdbeSorterCompare(pLvl->pTask->pKeyInfo, pLvl->pUnpacked, + p1->aKey, p1->nKey, p2->aKey, p2->nKey ); if( res<=0 ){ iRes = i1; }else{ iRes = i2; @@ -672,10 +691,11 @@ for(i=0; i<pSorter->nTask; i++){ SortSubtask *pTask = &pSorter->aTask[i]; pTask->pKeyInfo = pKeyInfo; pTask->pgsz = pgsz; pTask->db = db; + pTask->pSorter = pSorter; } if( !sqlite3TempInMemory(db) ){ pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz; mxCache = db->aDb[0].pSchema->cache_size; @@ -686,12 +706,12 @@ ** allocation for each sort-key in memory. Otherwise, use a single big ** allocation at pSorter->aMemory for all sort-keys. 
*/ if( sqlite3GlobalConfig.pHeap==0 ){ assert( pSorter->iMemory==0 ); pSorter->nMemory = pgsz; - pSorter->aMemory = (u8*)sqlite3Malloc(pgsz); - if( !pSorter->aMemory ) rc = SQLITE_NOMEM; + pSorter->list.aMemory = (u8*)sqlite3Malloc(pgsz); + if( !pSorter->list.aMemory ) rc = SQLITE_NOMEM; } } } return rc; @@ -708,27 +728,35 @@ sqlite3DbFree(db, p); } } /* -** Free all resources owned by the object indicated by argument pTask. All -** fields of *pTask are zeroed before returning. +** Free all resources owned by the object indicated by argument pTask. +** This does not include joining any outstanding threads. All fields of +** *pTask are zeroed before returning. */ static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){ - sqlite3DbFree(db, pTask->pUnpacked); - pTask->pUnpacked = 0; - if( pTask->aListMemory==0 ){ - vdbeSorterRecordFree(0, pTask->pList); - }else{ - sqlite3_free(pTask->aListMemory); - pTask->aListMemory = 0; - } - pTask->pList = 0; - if( pTask->pTemp1 ){ - sqlite3OsCloseFree(pTask->pTemp1); - pTask->pTemp1 = 0; - } + SortLevel *pLvl; + SortLevel *pNext; + for(pLvl=pTask->pLevel; pLvl; pLvl=pNext){ + pNext = pLvl->pNext; + assert( pLvl->pThread==0 ); + if( pLvl==pTask->pLevel ){ + if( pLvl->in.l.aMemory==0 ){ + vdbeSorterRecordFree(0, pLvl->in.l.pRecord); + }else{ + sqlite3_free(pLvl->in.l.aMemory); + } + }else{ + if( pLvl->in.f.pFd ) sqlite3OsCloseFree(pLvl->in.f.pFd); + } + if( pLvl->out.pFd ) sqlite3OsCloseFree(pLvl->out.pFd); + sqlite3DbFree(db, pLvl->pUnpacked); + sqlite3_free(pLvl); + } + pTask->pLevel = 0; + pTask->nConsolidate = 0; } /* ** Join all threads. 
*/ @@ -736,17 +764,20 @@ static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){ int rc = rcin; int i; for(i=0; i<pSorter->nTask; i++){ SortSubtask *pTask = &pSorter->aTask[i]; - if( pTask->pThread ){ - void *pRet; - int rc2 = sqlite3ThreadJoin(pTask->pThread, &pRet); - pTask->pThread = 0; - pTask->bDone = 0; - if( rc==SQLITE_OK ) rc = rc2; - if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet); + SortLevel *pLvl; + for(pLvl=pTask->pLevel; pLvl; pLvl=pLvl->pNext){ + if( pLvl->pThread ){ + void *pRet; + int rc2 = sqlite3ThreadJoin(pLvl->pThread, &pRet); + pLvl->pThread = 0; + pLvl->bDone = 0; + if( rc==SQLITE_OK ) rc = rc2; + if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet); + } } } return rc; } #else @@ -797,15 +828,15 @@ pSorter->pMerger = 0; for(i=0; i<pSorter->nTask; i++){ SortSubtask *pTask = &pSorter->aTask[i]; vdbeSortSubtaskCleanup(db, pTask); } - if( pSorter->aMemory==0 ){ - vdbeSorterRecordFree(0, pSorter->pRecord); + if( pSorter->list.aMemory==0 ){ + vdbeSorterRecordFree(0, pSorter->list.pRecord); } - pSorter->pRecord = 0; - pSorter->nInMemory = 0; + pSorter->list.pRecord = 0; + pSorter->list.nInMemory = 0; pSorter->bUsePMA = 0; pSorter->iMemory = 0; } /* @@ -814,11 +845,12 @@ void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){ VdbeSorter *pSorter = pCsr->pSorter; if( pSorter ){ sqlite3VdbeSorterReset(db, pSorter); vdbeMergeEngineFree(pSorter->pMerger); - sqlite3_free(pSorter->aMemory); + sqlite3_free(pSorter->list.aMemory); + sqlite3DbFree(db, pSorter->pUnpacked); sqlite3DbFree(db, pSorter); pCsr->pSorter = 0; } } @@ -844,11 +876,12 @@ /* ** Merge the two sorted lists p1 and p2 into a single list. ** Set *ppOut to the head of the new list. 
*/ static void vdbeSorterMerge( - SortSubtask *pTask, /* Calling thread context */ + KeyInfo *pKeyInfo, + UnpackedRecord *r2, SorterRecord *p1, /* First list to merge */ SorterRecord *p2, /* Second list to merge */ SorterRecord **ppOut /* OUT: Head of merged list */ ){ SorterRecord *pFinal = 0; @@ -855,11 +888,11 @@ SorterRecord **pp = &pFinal; void *pVal2 = p2 ? SRVAL(p2) : 0; while( p1 && p2 ){ int res; - res = vdbeSorterCompare(pTask, SRVAL(p1), p1->nVal, pVal2, p2->nVal); + res = vdbeSorterCompare(pKeyInfo, r2, SRVAL(p1), p1->nVal, pVal2, p2->nVal); if( res<=0 ){ *pp = p1; pp = &p1->u.pNext; p1 = p1->u.pNext; pVal2 = 0; @@ -874,52 +907,56 @@ *pp = p1 ? p1 : p2; *ppOut = pFinal; } /* -** Sort the linked list of records headed at pTask->pList. Return +** Sort the linked list of records headed at pLvl->in.l.pRecord. Return ** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if ** an error occurs. */ -static int vdbeSorterSort(SortSubtask *pTask){ +static int vdbeSorterSort( + SortList *pList, + KeyInfo *pKeyInfo, + UnpackedRecord *pUnpacked +){ int i; SorterRecord **aSlot; SorterRecord *p; aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *)); if( !aSlot ){ return SQLITE_NOMEM; } - p = pTask->pList; + p = pList->pRecord; while( p ){ SorterRecord *pNext; - if( pTask->aListMemory ){ - if( (u8*)p==pTask->aListMemory ){ + if( pList->aMemory ){ + if( (u8*)p==pList->aMemory ){ pNext = 0; }else{ - assert( p->u.iNext<sqlite3MallocSize(pTask->aListMemory) ); - pNext = (SorterRecord*)&pTask->aListMemory[p->u.iNext]; + assert( p->u.iNext<sqlite3MallocSize(pList->aMemory) ); + pNext = (SorterRecord*)&pList->aMemory[p->u.iNext]; } }else{ pNext = p->u.pNext; } p->u.pNext = 0; for(i=0; aSlot[i]; i++){ - vdbeSorterMerge(pTask, p, aSlot[i], &p); + vdbeSorterMerge(pKeyInfo, pUnpacked, p, aSlot[i], &p); aSlot[i] = 0; } aSlot[i] = p; p = pNext; } p = 0; for(i=0; i<64; i++){ - vdbeSorterMerge(pTask, p, aSlot[i], &p); + vdbeSorterMerge(pKeyInfo, pUnpacked, p, aSlot[i], &p); } - pTask->pList = p; + 
pList->pRecord = p; sqlite3_free(aSlot); return SQLITE_OK; } @@ -1015,96 +1052,46 @@ ** the VFS has memory mapped it. ** ** Whether or not the file does end up memory mapped of course depends on ** the specific VFS implementation. */ -static void vdbeSorterExtendFile(sqlite3 *db, sqlite3_file *pFile, i64 nByte){ - if( nByte<=(i64)(db->nMaxSorterMmap) ){ - int rc = sqlite3OsTruncate(pFile, nByte); +static void vdbeSorterExtendFile( + sqlite3 *db, + SortFile *pFile, + i64 nByte +){ + if( nByte<=(i64)(db->nMaxSorterMmap) && nByte>pFile->nByte ){ + sqlite3_file *pFd = pFile->pFd; + int rc = sqlite3OsTruncate(pFd, nByte); if( rc==SQLITE_OK ){ void *p = 0; - sqlite3OsFetch(pFile, 0, nByte, &p); - sqlite3OsUnfetch(pFile, 0, p); + sqlite3OsFetch(pFd, 0, nByte, &p); + sqlite3OsUnfetch(pFd, 0, p); + pFile->nByte = nByte; } } } #else # define vdbeSorterExtendFile(x,y,z) SQLITE_OK #endif - -/* -** Write the current contents of the in-memory linked-list to a PMA. Return -** SQLITE_OK if successful, or an SQLite error code otherwise. -** -** The format of a PMA is: -** -** * A varint. This varint contains the total number of bytes of content -** in the PMA (not including the varint itself). -** -** * One or more records packed end-to-end in order of ascending keys. -** Each record consists of a varint followed by a blob of data (the -** key). The varint is the number of bytes in the blob of data. -*/ -static int vdbeSorterListToPMA(SortSubtask *pTask){ - int rc = SQLITE_OK; /* Return code */ - PmaWriter writer; /* Object used to write to the file */ - - memset(&writer, 0, sizeof(PmaWriter)); - assert( pTask->nInMemory>0 ); - - /* If the first temporary PMA file has not been opened, open it now. 
*/ - if( pTask->pTemp1==0 ){ - rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->pTemp1); - assert( rc!=SQLITE_OK || pTask->pTemp1 ); - assert( pTask->iTemp1Off==0 ); - assert( pTask->nPMA==0 ); - } - - /* Try to get the file to memory map */ - if( rc==SQLITE_OK ){ - vdbeSorterExtendFile(pTask->db, - pTask->pTemp1, pTask->iTemp1Off + pTask->nInMemory + 9 - ); - } - - if( rc==SQLITE_OK ){ - SorterRecord *p; - SorterRecord *pNext = 0; - - vdbePmaWriterInit(pTask->pTemp1, &writer, pTask->pgsz, - pTask->iTemp1Off); - pTask->nPMA++; - vdbePmaWriteVarint(&writer, pTask->nInMemory); - for(p=pTask->pList; p; p=pNext){ - pNext = p->u.pNext; - vdbePmaWriteVarint(&writer, p->nVal); - vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal); - if( pTask->aListMemory==0 ) sqlite3_free(p); - } - pTask->pList = p; - rc = vdbePmaWriterFinish(&writer, &pTask->iTemp1Off); - } - - assert( pTask->pList==0 || rc!=SQLITE_OK ); - return rc; -} - /* ** Advance the MergeEngine iterator passed as the second argument to ** the next entry. Set *pbEof to true if this means the iterator has ** reached EOF. ** ** Return SQLITE_OK if successful or an error code if an error occurs. */ static int vdbeSorterNext( - SortSubtask *pTask, - MergeEngine *pMerger, + SortLevel *pLvl, + MergeEngine *pMerger, int *pbEof ){ int rc; int iPrev = pMerger->aTree[1];/* Index of iterator to advance */ + KeyInfo *pKeyInfo = pLvl->pTask->pKeyInfo; + UnpackedRecord *r2 = pLvl->pUnpacked; /* Advance the current iterator */ rc = vdbePmaReaderNext(&pMerger->aIter[iPrev]); /* Update contents of aTree[] */ @@ -1126,23 +1113,23 @@ if( pIter1->pFile==0 ){ iRes = +1; }else if( pIter2->pFile==0 ){ iRes = -1; }else{ - iRes = vdbeSorterCompare(pTask, + iRes = vdbeSorterCompare(pKeyInfo, r2, pIter1->aKey, pIter1->nKey, pKey2, pIter2->nKey ); } /* If pIter1 contained the smaller value, set aTree[i] to its index. ** Then set pIter2 to the next iterator to compare to pIter1. 
In this - ** case there is no cache of pIter2 in pTask->pUnpacked, so set + ** case there is no cache of pIter2 in pLvl->pUnpacked, so set ** pKey2 to point to the record belonging to pIter2. ** ** Alternatively, if pIter2 contains the smaller of the two values, ** set aTree[i] to its index and update pIter1. If vdbeSorterCompare() - ** was actually called above, then pTask->pUnpacked now contains + ** was actually called above, then pLvl->pUnpacked now contains ** a value equivalent to pIter2. So set pKey2 to NULL to prevent ** vdbeSorterCompare() from decoding pIter2 again. ** ** If the two values were equal, then the value from the oldest ** PMA should be considered smaller. The VdbeSorter.aIter[] array @@ -1162,139 +1149,237 @@ } return rc; } -/* -** The main routine for sorter-thread operations. -*/ -static void *vdbeSortSubtaskMain(void *pCtx){ - int rc = SQLITE_OK; - SortSubtask *pTask = (SortSubtask*)pCtx; - - assert( pTask->eWork==SORT_SUBTASK_SORT - || pTask->eWork==SORT_SUBTASK_TO_PMA - || pTask->eWork==SORT_SUBTASK_CONS - ); - assert( pTask->bDone==0 ); - - if( pTask->pUnpacked==0 ){ - char *pFree; - pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord( - pTask->pKeyInfo, 0, 0, &pFree - ); - assert( pTask->pUnpacked==(UnpackedRecord*)pFree ); - if( pFree==0 ){ - rc = SQLITE_NOMEM; - goto thread_out; - } - pTask->pUnpacked->nField = pTask->pKeyInfo->nField; - pTask->pUnpacked->errCode = 0; - } - - if( pTask->eWork==SORT_SUBTASK_CONS ){ - assert( pTask->pList==0 ); - while( pTask->nPMA>pTask->nConsolidate && rc==SQLITE_OK ){ - int nIter = MIN(pTask->nPMA, SORTER_MAX_MERGE_COUNT); - sqlite3_file *pTemp2 = 0; /* Second temp file to use */ - MergeEngine *pMerger; /* Object for reading/merging PMA data */ - i64 iReadOff = 0; /* Offset in pTemp1 to read from */ - i64 iWriteOff = 0; /* Offset in pTemp2 to write to */ - int i; - - /* Allocate a merger object to merge PMAs together. 
*/ - pMerger = vdbeMergeEngineNew(nIter); - if( pMerger==0 ){ - rc = SQLITE_NOMEM; - break; - } - - /* Open a second temp file to write merged data to */ - rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTemp2); - if( rc==SQLITE_OK ){ - vdbeSorterExtendFile(pTask->db, pTemp2, pTask->iTemp1Off); - }else{ - vdbeMergeEngineFree(pMerger); - break; - } - - /* This loop runs once for each output PMA. Each output PMA is made - ** of data merged from up to SORTER_MAX_MERGE_COUNT input PMAs. */ - for(i=0; rc==SQLITE_OK && inPMA; i+=SORTER_MAX_MERGE_COUNT){ - PmaWriter writer; /* Object for writing data to pTemp2 */ - i64 nOut = 0; /* Bytes of data in output PMA */ - int bEof = 0; - int rc2; - - /* Configure the merger object to read and merge data from the next - ** SORTER_MAX_MERGE_COUNT PMAs in pTemp1 (or from all remaining PMAs, - ** if that is fewer). */ - int iIter; - for(iIter=0; iIteraIter[iIter]; - rc = vdbePmaReaderInit(pTask, iReadOff, pIter, &nOut); - iReadOff = pIter->iEof; - if( iReadOff>=pTask->iTemp1Off || rc!=SQLITE_OK ) break; - } - for(iIter=pMerger->nTree-1; rc==SQLITE_OK && iIter>0; iIter--){ - rc = vdbeSorterDoCompare(pTask, pMerger, iIter); - } - - vdbePmaWriterInit(pTemp2, &writer, pTask->pgsz, iWriteOff); - vdbePmaWriteVarint(&writer, nOut); - while( rc==SQLITE_OK && bEof==0 ){ - PmaReader *pIter = &pMerger->aIter[ pMerger->aTree[1] ]; - assert( pIter->pFile!=0 ); /* pIter is not at EOF */ - vdbePmaWriteVarint(&writer, pIter->nKey); - vdbePmaWriteBlob(&writer, pIter->aKey, pIter->nKey); - rc = vdbeSorterNext(pTask, pMerger, &bEof); - } - rc2 = vdbePmaWriterFinish(&writer, &iWriteOff); - if( rc==SQLITE_OK ) rc = rc2; - } - - vdbeMergeEngineFree(pMerger); - sqlite3OsCloseFree(pTask->pTemp1); - pTask->pTemp1 = pTemp2; - pTask->nPMA = (i / SORTER_MAX_MERGE_COUNT); - pTask->iTemp1Off = iWriteOff; - } - }else{ - /* Sort the pTask->pList list */ - rc = vdbeSorterSort(pTask); - - /* If required, write the list out to a PMA. 
*/ - if( rc==SQLITE_OK && pTask->eWork==SORT_SUBTASK_TO_PMA ){ -#ifdef SQLITE_DEBUG - i64 nExpect = pTask->nInMemory - + sqlite3VarintLen(pTask->nInMemory) - + pTask->iTemp1Off; -#endif - rc = vdbeSorterListToPMA(pTask); - assert( rc!=SQLITE_OK || (nExpect==pTask->iTemp1Off) ); - } - } - - thread_out: - pTask->bDone = 1; - if( rc==SQLITE_OK && pTask->pUnpacked->errCode ){ - assert( pTask->pUnpacked->errCode==SQLITE_NOMEM ); - rc = SQLITE_NOMEM; - } - return SQLITE_INT_TO_PTR(rc); +static UnpackedRecord *vdbeSorterAllocUnpackedRecord(KeyInfo *pKeyInfo){ + char *pFree; + UnpackedRecord *pRet; + pRet = sqlite3VdbeAllocUnpackedRecord(pKeyInfo, 0, 0, &pFree); + assert( pRet==(UnpackedRecord*)pFree ); + if( pRet ){ + pRet->nField = pKeyInfo->nField; + pRet->errCode = 0; + } + return pRet; +} + +#if 0 +static void vdbeSorterWorkDebug(SortLevel *pLvl, const char *zEvent){ + i64 t; + SortLevel *p; + SortSubtask *pTask = pLvl->pTask; + int iTask = (pTask - pTask->pSorter->aTask); + int iLvl = 0; + for(p=pTask->pLevel; p!=pLvl; p=p->pNext) iLvl++; + sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t); + fprintf(stderr, "%lld:%d.%d %s\n", t, iTask, iLvl, zEvent); +} +static void vdbeSorterRewindDebug(sqlite3 *db, const char *zEvent){ + i64 t; + sqlite3OsCurrentTimeInt64(db->pVfs, &t); + fprintf(stderr, "%lld:X %s\n", t, zEvent); +} +#else +# define vdbeSorterWorkDebug(x,y) +# define vdbeSorterRewindDebug(x,y) +#endif + +/* +** Merge the data currently stored in (pLevel->in), if any, into a new PMA +** stored within (pLevel->out). 
+*/ +static int vdbeSorterWorkLevel(SortLevel *pLvl){ + int rc = SQLITE_OK; /* Return code */ + SortSubtask *pTask = pLvl->pTask; + MergeEngine *pMerger = 0; + SortFile *pOut = &pLvl->out; /* Write new PMA here */ + i64 nOut = 0; /* Expected size of new PMA */ + PmaWriter writer; /* Used to write new PMA to pOut */ + int bEof = 0; + + vdbeSorterWorkDebug(pLvl, "enter"); + + if( pLvl->pUnpacked==0 ){ + pLvl->pUnpacked = vdbeSorterAllocUnpackedRecord(pTask->pKeyInfo); + if( pLvl->pUnpacked==0 ){ + rc = SQLITE_NOMEM; + goto work_level_out; + } + } + + if( pLvl->out.pFd==0 ){ + assert( pLvl->out.iOff==0 ); + assert( pLvl->out.nByte==0 ); + assert( pLvl->out.nPMA==0 ); + rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pLvl->out.pFd); + if( rc!=SQLITE_OK ) goto work_level_out; + } + + if( pLvl==pTask->pLevel ){ + if( pLvl->in.l.pRecord==0 ){ + bEof = 1; + }else{ + rc = vdbeSorterSort(&pLvl->in.l, pTask->pKeyInfo, pLvl->pUnpacked); + nOut = pLvl->in.l.nInMemory; + } + }else{ + int nPMA = pLvl->in.f.nPMA; + if( nPMA==0 ){ + bEof = 1; + }else{ + pMerger = vdbeMergeEngineNew(nPMA); + if( pMerger==0 ){ + rc = SQLITE_NOMEM; + }else{ + /* Configure the merger object to read and merge data from all + ** PMAs at pLvl. */ + int iIter; + i64 iReadOff = 0; + for(iIter=0; iIteraIter[iIter]; + rc = vdbePmaReaderInit(pTask, &pLvl->in.f, iReadOff, pIter, &nOut); + iReadOff = pIter->iEof; + } + + for(iIter=pMerger->nTree-1; rc==SQLITE_OK && iIter>0; iIter--){ + rc = vdbeSorterDoCompare(pLvl, pMerger, iIter); + } + } + } + } + if( rc!=SQLITE_OK ) goto work_level_out; + + /* If mmap is to be used, pre-extend and map the temp file. 
*/ + vdbeSorterExtendFile(pTask->db, &pLvl->out, pLvl->out.iOff + nOut + 9); + + if( bEof==0 ){ + vdbePmaWriterInit(pOut->pFd, &writer, pTask->pgsz, pOut->iOff); + vdbePmaWriteVarint(&writer, nOut); + + while( rc==SQLITE_OK && bEof==0 ){ + u8 *aKey; /* Next key to write to output */ + int nKey; /* Size of aKey[] in bytes */ + if( pMerger==0 ){ + aKey = SRVAL(pLvl->in.l.pRecord); + nKey = pLvl->in.l.pRecord->nVal; + }else{ + PmaReader *pIter = &pMerger->aIter[ pMerger->aTree[1] ]; + assert( pIter->pFile ); /* pIter is not at EOF */ + aKey = pIter->aKey; + nKey = pIter->nKey; + } + + vdbePmaWriteVarint(&writer, nKey); + vdbePmaWriteBlob(&writer, aKey, nKey); + + if( pMerger==0 ){ + SorterRecord *pNext = pLvl->in.l.pRecord->u.pNext; + if( pLvl->in.l.aMemory==0 ) sqlite3_free(pLvl->in.l.pRecord); + pLvl->in.l.pRecord = pNext; + bEof = (pNext==0); + }else{ + rc = vdbeSorterNext(pLvl, pMerger, &bEof); + } + } + rc = vdbePmaWriterFinish(&writer, &pOut->iOff); + pOut->nPMA++; + + if( rc==SQLITE_OK && pMerger ){ + sqlite3OsCloseFree(pLvl->in.f.pFd); + pLvl->in.f.pFd = 0; + } + vdbeMergeEngineFree(pMerger); + } + + if( rc==SQLITE_OK && ( + (pOut->nPMA>=SORTER_MAX_MERGE_COUNT) + || (pTask->nConsolidate && pLvl->pNext) + || (pTask->nConsolidate && pTask->nConsolidate<pOut->nPMA) + )){ + SortLevel *pNext = pLvl->pNext; + if( pNext==0 ){ + pNext = (SortLevel*)sqlite3_malloc(sizeof(SortLevel)); + if( pNext==0 ){ + rc = SQLITE_NOMEM; + goto work_level_out; + } + memset(pNext, 0, sizeof(SortLevel)); + pLvl->pNext = pNext; + pNext->pTask = pTask; + } + + /* If there is a thread running on the next level, block on it. 
*/ +#if SQLITE_MAX_WORKER_THREADS>0 + if( pNext->pThread ){ + void *pRet; + rc = sqlite3ThreadJoin(pNext->pThread, &pRet); + pNext->pThread = 0; + pNext->bDone = 0; + if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet); + if( rc!=SQLITE_OK ) goto work_level_out; + } +#endif + + pNext->in.f = pLvl->out; + memset(&pLvl->out, 0, sizeof(pLvl->out)); + } + + work_level_out: + vdbeSorterWorkDebug(pLvl, "exit"); + if( rc==SQLITE_OK && pLvl->pUnpacked->errCode ){ + assert( pLvl->pUnpacked->errCode==SQLITE_NOMEM ); + rc = SQLITE_NOMEM; + } + return rc; } /* ** Run the activity scheduled by the object passed as the only argument ** in the current thread. */ -static int vdbeSorterRunTask(SortSubtask *pTask){ - int rc = SQLITE_PTR_TO_INT( vdbeSortSubtaskMain((void*)pTask) ); - assert( pTask->bDone ); - pTask->bDone = 0; +static int vdbeSorterRun(SortLevel *pLvl){ + int rc; + + assert( pLvl->bDone==0 ); + assert( pLvl->pThread==0 ); + while( 1 ){ + rc = vdbeSorterWorkLevel(pLvl); + if( rc==SQLITE_OK && pLvl->pTask->pLevel==pLvl && pLvl->in.l.aMemory ){ + assert( pLvl->pTask->pSorter->list.aMemory==0 ); + assert( pLvl->in.l.pRecord==0 ); + pLvl->pTask->pSorter->list.aMemory = pLvl->in.l.aMemory; + pLvl->in.l.aMemory = 0; + } + + if( rc!=SQLITE_OK || pLvl->out.nPMA>0 ) break; + pLvl = pLvl->pNext; + assert( pLvl->bDone==0 ); + assert( pLvl->pThread==0 ); + } + + pLvl->bDone = 0; return rc; } + +#if SQLITE_MAX_WORKER_THREADS>0 +static void *vdbeSorterThread(void *pCtx){ + int rc; + SortLevel *pLvl = (SortLevel*)pCtx; + + rc = vdbeSorterWorkLevel(pLvl); + if( rc==SQLITE_OK && pLvl->out.nPMA==0 ){ + SortLevel *pNext = pLvl->pNext; + void *pCtx = (void*)pNext; + assert( pNext->pThread==0 ); + rc = sqlite3ThreadCreate(&pNext->pThread, vdbeSorterThread, pCtx); + } + + pLvl->bDone = 1; + return SQLITE_INT_TO_PTR(rc); +} +#endif /* ** Flush the current contents of VdbeSorter.pRecord to a new PMA, possibly ** using a background thread. 
** @@ -1302,75 +1387,78 @@ */ static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){ VdbeSorter *pSorter = pCsr->pSorter; int rc = SQLITE_OK; int i; - SortSubtask *pTask = 0; /* Thread context used to create new PMA */ - int nWorker = (pSorter->nTask-1); - - pSorter->bUsePMA = 1; - for(i=0; i<nWorker; i++){ - int iTest = (pSorter->iPrev + i + 1) % nWorker; - pTask = &pSorter->aTask[iTest]; -#if SQLITE_MAX_WORKER_THREADS>0 - if( pTask->bDone ){ - void *pRet; - assert( pTask->pThread ); - rc = sqlite3ThreadJoin(pTask->pThread, &pRet); - pTask->pThread = 0; - pTask->bDone = 0; - if( rc==SQLITE_OK ){ - rc = SQLITE_PTR_TO_INT(pRet); - } - } -#endif - if( pTask->pThread==0 ) break; - pTask = 0; - } - if( pTask==0 ){ - pTask = &pSorter->aTask[nWorker]; - } - pSorter->iPrev = (pTask - pSorter->aTask); - - if( rc==SQLITE_OK ){ - assert( pTask->pThread==0 && pTask->bDone==0 ); - pTask->eWork = SORT_SUBTASK_TO_PMA; - pTask->pList = pSorter->pRecord; - pTask->nInMemory = pSorter->nInMemory; - pSorter->nInMemory = 0; - pSorter->pRecord = 0; - - if( pSorter->aMemory ){ - u8 *aMem = pTask->aListMemory; - pTask->aListMemory = pSorter->aMemory; - pSorter->aMemory = aMem; - } - -#if SQLITE_MAX_WORKER_THREADS>0 - if( !bFg && pTask!=&pSorter->aTask[nWorker] ){ - /* Launch a background thread for this operation */ - void *pCtx = (void*)pTask; - assert( pSorter->aMemory==0 || pTask->aListMemory!=0 ); - if( pTask->aListMemory ){ - if( pSorter->aMemory==0 ){ - pSorter->aMemory = sqlite3Malloc(pSorter->nMemory); - if( pSorter->aMemory==0 ) return SQLITE_NOMEM; - }else{ - pSorter->nMemory = sqlite3MallocSize(pSorter->aMemory); - } - } - rc = sqlite3ThreadCreate(&pTask->pThread, vdbeSortSubtaskMain, pCtx); - }else -#endif - { - /* Use the foreground thread for this operation */ - rc = vdbeSorterRunTask(pTask); - if( rc==SQLITE_OK ){ - u8 *aMem = pTask->aListMemory; - pTask->aListMemory = pSorter->aMemory; - pSorter->aMemory = aMem; - assert( pTask->pList==0 ); + SortSubtask *pTask = 0; /* Sub-task new PMA is 
written to */ + SortLevel *pLevel; /* Level to write to */ + + /* Set the use-temp-files flag. */ + pSorter->bUsePMA = 1; + + /* Select one of the sub-tasks to flush this PMA. In single threaded + ** mode (pSorter->nTask==1), this is always aTask[0]. In multi-threaded mode, + ** it may be any of the pSorter->nTask sub-tasks. */ + for(i=0; i<pSorter->nTask; i++){ + pTask = &pSorter->aTask[i]; + if( pTask->pLevel==0 + || pTask->pLevel->pThread==0 + || pTask->pLevel->bDone + ){ + break; + } + } + + /* If the first level for this task has not been allocated, allocate it. */ + if( pTask->pLevel==0 ){ + SortLevel *pNew = (SortLevel*)sqlite3_malloc(sizeof(SortLevel)); + if( pNew==0 ){ + rc = SQLITE_NOMEM; + }else{ + memset(pNew, 0, sizeof(SortLevel)); + pNew->pTask = pTask; + pTask->pLevel = pNew; + } + } + pLevel = pTask->pLevel; + + /* If there is a background thread using the selected task, wait for + ** it to finish. */ +#if SQLITE_MAX_WORKER_THREADS>0 + if( rc==SQLITE_OK && pLevel->pThread ){ + void *pRet = 0; + rc = sqlite3ThreadJoin(pLevel->pThread, &pRet); + pLevel->pThread = 0; + pLevel->bDone = 0; + if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet); + } +#endif + + if( rc==SQLITE_OK ){ + u8 *aNewMem = 0; + if( pSorter->list.aMemory && pSorter->nTask>1 ){ + aNewMem = pLevel->in.l.aMemory; + if( aNewMem==0 ){ + aNewMem = sqlite3_malloc(pSorter->mxPmaSize); + if( aNewMem==0 ) rc = SQLITE_NOMEM; + } + } + assert( pLevel->in.l.pRecord==0 ); + pLevel->in.l = pSorter->list; + pSorter->list.pRecord = 0; + pSorter->list.nInMemory = 0; + pSorter->list.aMemory = aNewMem; + if( rc==SQLITE_OK ){ +#if SQLITE_MAX_WORKER_THREADS>0 + if( pSorter->nTask>1 ){ + void *pCtx = (void*)pLevel; + rc = sqlite3ThreadCreate(&pLevel->pThread, vdbeSorterThread, pCtx); + pSorter->nMemory = aNewMem ? sqlite3MallocSize(aNewMem) : 0; + }else +#endif + { + rc = vdbeSorterRun(pLevel); } } } return rc; @@ -1411,136 +1499,125 @@ ** than (page-size * 10) and sqlite3HeapNearlyFull() returns true. 
*/ nReq = pVal->n + sizeof(SorterRecord); nPMA = pVal->n + sqlite3VarintLen(pVal->n); if( pSorter->mxPmaSize ){ - if( pSorter->aMemory ){ + if( pSorter->list.aMemory ){ bFlush = pSorter->iMemory && (pSorter->iMemory+nReq) > pSorter->mxPmaSize; }else{ + int nInMemory = pSorter->list.nInMemory; bFlush = ( - (pSorter->nInMemory > pSorter->mxPmaSize) - || (pSorter->nInMemory > pSorter->mnPmaSize && sqlite3HeapNearlyFull()) + (nInMemory > pSorter->mxPmaSize) + || (nInMemory > pSorter->mnPmaSize && sqlite3HeapNearlyFull()) ); } if( bFlush ){ rc = vdbeSorterFlushPMA(db, pCsr, 0); - pSorter->nInMemory = 0; + pSorter->list.nInMemory = 0; pSorter->iMemory = 0; - assert( rc!=SQLITE_OK || pSorter->pRecord==0 ); + assert( rc!=SQLITE_OK || pSorter->list.pRecord==0 ); } } - pSorter->nInMemory += nPMA; + pSorter->list.nInMemory += nPMA; - if( pSorter->aMemory ){ + if( pSorter->list.aMemory ){ int nMin = pSorter->iMemory + nReq; if( nMin>pSorter->nMemory ){ u8 *aNew; int nNew = pSorter->nMemory * 2; while( nNew < nMin ) nNew = nNew*2; if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize; if( nNew < nMin ) nNew = nMin; - aNew = sqlite3Realloc(pSorter->aMemory, nNew); + aNew = sqlite3Realloc(pSorter->list.aMemory, nNew); if( !aNew ) return SQLITE_NOMEM; - pSorter->pRecord = (SorterRecord*)( - aNew + ((u8*)pSorter->pRecord - pSorter->aMemory) + pSorter->list.pRecord = (SorterRecord*)( + aNew + ((u8*)pSorter->list.pRecord - pSorter->list.aMemory) ); - pSorter->aMemory = aNew; + pSorter->list.aMemory = aNew; pSorter->nMemory = nNew; } - pNew = (SorterRecord*)&pSorter->aMemory[pSorter->iMemory]; + pNew = (SorterRecord*)&pSorter->list.aMemory[pSorter->iMemory]; pSorter->iMemory += ROUND8(nReq); - pNew->u.iNext = (u8*)(pSorter->pRecord) - pSorter->aMemory; + pNew->u.iNext = (u8*)(pSorter->list.pRecord) - pSorter->list.aMemory; }else{ pNew = (SorterRecord *)sqlite3Malloc(nReq); if( pNew==0 ){ return SQLITE_NOMEM; } - pNew->u.pNext = pSorter->pRecord; + pNew->u.pNext = 
pSorter->list.pRecord; } memcpy(SRVAL(pNew), pVal->z, pVal->n); pNew->nVal = pVal->n; - pSorter->pRecord = pNew; + pSorter->list.pRecord = pNew; return rc; } -/* -** Return the total number of PMAs in all temporary files. -*/ -static int vdbeSorterCountPMA(VdbeSorter *pSorter){ - int nPMA = 0; - int i; - for(i=0; i<pSorter->nTask; i++){ - nPMA += pSorter->aTask[i].nPMA; - } - return nPMA; -} /* ** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite, ** this function is called to prepare for iterating through the records ** in sorted order. */ int sqlite3VdbeSorterRewind(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){ VdbeSorter *pSorter = pCsr->pSorter; int rc = SQLITE_OK; /* Return code */ + int nTask = 0; + int i; assert( pSorter ); /* If no data has been written to disk, then do not do so now. Instead, - ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly - ** from the in-memory list. */ + ** sort the VdbeSorter.list.pRecord list. The vdbe layer will read data + ** directly from the in-memory list. */ + *pbEof = 0; if( pSorter->bUsePMA==0 ){ - if( pSorter->pRecord ){ + if( pSorter->list.pRecord ){ SortSubtask *pTask = &pSorter->aTask[0]; - *pbEof = 0; - pTask->pList = pSorter->pRecord; - pTask->eWork = SORT_SUBTASK_SORT; - assert( pTask->aListMemory==0 ); - pTask->aListMemory = pSorter->aMemory; - rc = vdbeSorterRunTask(pTask); - pTask->aListMemory = 0; - pSorter->pRecord = pTask->pList; - pTask->pList = 0; + UnpackedRecord *pUnpack = vdbeSorterAllocUnpackedRecord(pTask->pKeyInfo); + if( pUnpack==0 ) return SQLITE_NOMEM; + rc = vdbeSorterSort(&pSorter->list, pTask->pKeyInfo, pUnpack); + sqlite3DbFree(db, pUnpack); }else{ *pbEof = 1; } return rc; } /* Write the current in-memory list to a PMA.
*/ - if( pSorter->pRecord ){ - rc = vdbeSorterFlushPMA(db, pCsr, 1); + if( pSorter->list.pRecord ){ + rc = vdbeSorterFlushPMA(db, pCsr, 0); } /* Join all threads */ rc = vdbeSorterJoinAll(pSorter, rc); - /* If there are more than SORTER_MAX_MERGE_COUNT PMAs on disk, merge - ** some of them together so that this is no longer the case. */ - if( vdbeSorterCountPMA(pSorter)>SORTER_MAX_MERGE_COUNT ){ - int i; - for(i=0; rc==SQLITE_OK && i<pSorter->nTask; i++){ - SortSubtask *pTask = &pSorter->aTask[i]; - if( pTask->pTemp1 ){ - pTask->nConsolidate = SORTER_MAX_MERGE_COUNT / pSorter->nTask; - pTask->eWork = SORT_SUBTASK_CONS; - + vdbeSorterRewindDebug(db, "rewind"); + + for(i=0; i<pSorter->nTask; i++){ + if( pSorter->aTask[i].pLevel ) nTask++; + } + + for(i=0; rc==SQLITE_OK && i<pSorter->nTask; i++){ + SortSubtask *pTask = &pSorter->aTask[i]; + if( pTask->pLevel ){ + SortLevel *pLvl = pTask->pLevel; + pTask->nConsolidate = (SORTER_MAX_MERGE_COUNT / nTask); #if SQLITE_MAX_WORKER_THREADS>0 - if( i<(pSorter->nTask-1) ){ - void *pCtx = (void*)pTask; - rc = sqlite3ThreadCreate(&pTask->pThread, vdbeSortSubtaskMain, pCtx); - }else + if( i<(pSorter->nTask-1) ){ + void *pCtx = (void*)pLvl; + rc = sqlite3ThreadCreate(&pLvl->pThread, vdbeSorterThread, pCtx); + }else #endif - { - rc = vdbeSorterRunTask(pTask); - } + { + assert( pLvl->pThread==0 ); + rc = vdbeSorterRun(pLvl); } } } /* Join all threads */ @@ -1552,40 +1629,52 @@ if( rc==SQLITE_OK ){ int nIter = 0; /* Number of iterators used */ int i; MergeEngine *pMerger; for(i=0; i<pSorter->nTask; i++){ - nIter += pSorter->aTask[i].nPMA; + SortSubtask *pTask = &pSorter->aTask[i]; + if( pTask->pLevel ){ + SortLevel *pLvl; + for(pLvl=pTask->pLevel; pLvl->pNext; pLvl=pLvl->pNext){ + assert( pLvl->out.nPMA==0 ); + } + nIter += pLvl->out.nPMA; + } } pSorter->pMerger = pMerger = vdbeMergeEngineNew(nIter); if( pMerger==0 ){ rc = SQLITE_NOMEM; }else{ int iIter = 0; - int iThread = 0; - for(iThread=0; iThread<pSorter->nTask; iThread++){ - int iPMA; - i64 iReadOff = 0; - SortSubtask *pTask =
&pSorter->aTask[iThread]; - for(iPMA=0; iPMA<pTask->nPMA && rc==SQLITE_OK; iPMA++){ - i64 nDummy = 0; - PmaReader *pIter = &pMerger->aIter[iIter++]; - rc = vdbePmaReaderInit(pTask, iReadOff, pIter, &nDummy); - iReadOff = pIter->iEof; + for(i=0; i<pSorter->nTask; i++){ + SortSubtask *pTask = &pSorter->aTask[i]; + if( pTask->pLevel ){ + int iPMA; + i64 iReadOff = 0; + SortLevel *pLvl; + for(pLvl=pTask->pLevel; pLvl->pNext; pLvl=pLvl->pNext); + + for(iPMA=0; iPMA<pLvl->out.nPMA && rc==SQLITE_OK; iPMA++){ + i64 nDummy = 0; + PmaReader *pIter = &pMerger->aIter[iIter++]; + rc = vdbePmaReaderInit(pTask, &pLvl->out, iReadOff, pIter, &nDummy); + iReadOff = pIter->iEof; + } } } for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){ - rc = vdbeSorterDoCompare(&pSorter->aTask[0], pMerger, i); + rc = vdbeSorterDoCompare(pSorter->aTask[0].pLevel, pMerger, i); } } } if( rc==SQLITE_OK ){ *pbEof = (pSorter->pMerger->aIter[pSorter->pMerger->aTree[1]].pFile==0); } + vdbeSorterRewindDebug(db, "rewinddone"); return rc; } /* ** Advance to the next element in the sorter.
@@ -1593,17 +1682,17 @@ int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){ VdbeSorter *pSorter = pCsr->pSorter; int rc; /* Return code */ if( pSorter->pMerger ){ - rc = vdbeSorterNext(&pSorter->aTask[0], pSorter->pMerger, pbEof); + rc = vdbeSorterNext(pSorter->aTask[0].pLevel, pSorter->pMerger, pbEof); }else{ - SorterRecord *pFree = pSorter->pRecord; - pSorter->pRecord = pFree->u.pNext; + SorterRecord *pFree = pSorter->list.pRecord; + pSorter->list.pRecord = pFree->u.pNext; pFree->u.pNext = 0; - if( pSorter->aMemory==0 ) vdbeSorterRecordFree(db, pFree); - *pbEof = !pSorter->pRecord; + if( pSorter->list.aMemory==0 ) vdbeSorterRecordFree(db, pFree); + *pbEof = !pSorter->list.pRecord; rc = SQLITE_OK; } return rc; } @@ -1620,12 +1709,12 @@ PmaReader *pIter; pIter = &pSorter->pMerger->aIter[ pSorter->pMerger->aTree[1] ]; *pnKey = pIter->nKey; pKey = pIter->aKey; }else{ - *pnKey = pSorter->pRecord->nVal; - pKey = SRVAL(pSorter->pRecord); + *pnKey = pSorter->list.pRecord->nVal; + pKey = SRVAL(pSorter->list.pRecord); } return pKey; } /* @@ -1667,17 +1756,23 @@ Mem *pVal, /* Value to compare to current sorter key */ int nIgnore, /* Ignore this many fields at the end */ int *pRes /* OUT: Result of comparison */ ){ VdbeSorter *pSorter = pCsr->pSorter; - UnpackedRecord *r2 = pSorter->aTask[0].pUnpacked; + UnpackedRecord *r2 = pSorter->pUnpacked; KeyInfo *pKeyInfo = pCsr->pKeyInfo; int i; void *pKey; int nKey; /* Sorter key to compare pVal with */ - assert( r2->nField>=pKeyInfo->nField-nIgnore ); - r2->nField = pKeyInfo->nField-nIgnore; + if( r2==0 ){ + r2 = vdbeSorterAllocUnpackedRecord(pSorter->aTask[0].pKeyInfo); + if( r2==0 ) return SQLITE_NOMEM; + pSorter->pUnpacked = r2; + assert( r2->nField>=pKeyInfo->nField-nIgnore ); + r2->nField = pKeyInfo->nField-nIgnore; + } + assert( r2->nField==pKeyInfo->nField-nIgnore ); pKey = vdbeSorterRowkey(pSorter, &nKey); sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, r2); for(i=0; i<r2->nField; i++){ if(
r2->aMem[i].flags & MEM_Null ){