Index: src/shell.c ================================================================== --- src/shell.c +++ src/shell.c @@ -3533,11 +3533,11 @@ sqlite3_config(SQLITE_CONFIG_URI, 1); sqlite3_config(SQLITE_CONFIG_LOG, shellLog, data); sqlite3_snprintf(sizeof(mainPrompt), mainPrompt,"sqlite> "); sqlite3_snprintf(sizeof(continuePrompt), continuePrompt," ...> "); sqlite3_config(SQLITE_CONFIG_MULTITHREAD); - sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, 3); + sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, 4); } /* ** Output text to the console in a font that attracts extra attention. */ Index: src/vdbesort.c ================================================================== --- src/vdbesort.c +++ src/vdbesort.c @@ -86,26 +86,71 @@ ** enough to fit entirely in memory, everything happens on the main thread. */ #include "sqliteInt.h" #include "vdbeInt.h" +/* +** If SQLITE_DEBUG_SORTER_THREADS is defined, this module outputs various +** messages to stderr that may be helpful in understanding the performance +** characteristics of the sorter in multi-threaded mode. +*/ +#if 0 +# define SQLITE_DEBUG_SORTER_THREADS 1 +#endif + /* ** Private objects used by the sorter */ typedef struct MergeEngine MergeEngine; /* Merge PMAs together */ typedef struct PmaReader PmaReader; /* Incrementally read one PMA */ typedef struct PmaWriter PmaWriter; /* Incrementally write on PMA */ typedef struct SorterRecord SorterRecord; /* A record being sorted */ typedef struct SortSubtask SortSubtask; /* A sub-task in the sort process */ +typedef struct SorterFile SorterFile; +typedef struct SorterThread SorterThread; +typedef struct SorterList SorterList; +typedef struct IncrMerger IncrMerger; +/* +** A container for a temp file handle and the current amount of data +** stored in the file. 
+*/ +struct SorterFile { + sqlite3_file *pFd; /* File handle */ + i64 iEof; /* Bytes of data stored in pFd */ +}; /* -** Candidate values for SortSubtask.eWork +** An object of this type is used to store the thread handle for each +** background thread launched by the sorter. Before the thread is launched, +** variable bDone is set to 0. Then, right before it exits, the thread +** itself sets bDone to 1. +** +** This is then used for two purposes: +** +** 1. When flushing the contents of memory to a level-0 PMA on disk, to +** attempt to select a SortSubtask for which there is not already an +** active background thread (since doing so causes the main thread +** to block until it finishes). +** +** 2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call +** to sqlite3ThreadJoin() is likely to block. +** +** In both cases, the effects of the main thread seeing (bDone==0) even +** after the thread has finished are not dire. So we don't worry about +** memory barriers and such here. */ -#define SORT_SUBTASK_SORT 1 /* Sort records on pList */ -#define SORT_SUBTASK_TO_PMA 2 /* Xfer pList to Packed-Memory-Array pTemp1 */ -#define SORT_SUBTASK_CONS 3 /* Consolidate multiple PMAs */ +struct SorterThread { + SQLiteThread *pThread; + int bDone; +}; + +struct SorterList { + SorterRecord *pList; /* Linked list of records */ + u8 *aMemory; /* If non-NULL, blob of memory for pList */ + int szPMA; /* Size of pList as PMA in bytes */ +}; /* ** Sorting is divided up into smaller subtasks. Each subtask is controlled ** by an instance of this object. A Subtask might run in either the main thread ** or in a background thread. @@ -139,27 +184,20 @@ ** SORT_SUBTASK_CONS: ** Merge existing PMAs until SortSubtask.nConsolidate or fewer ** remain in temp file SortSubtask.pTemp1. 
*/ struct SortSubtask { - SQLiteThread *pThread; /* Thread handle, or NULL */ - int bDone; /* Set to true by pTask when finished */ - + SorterThread thread; sqlite3 *db; /* Database connection */ + VdbeSorter *pSorter; /* Sorter */ KeyInfo *pKeyInfo; /* How to compare records */ UnpackedRecord *pUnpacked; /* Space to unpack a record */ int pgsz; /* Main database page size */ - - u8 eWork; /* One of the SORT_SUBTASK_* constants */ - int nConsolidate; /* For SORT_SUBTASK_CONS, max final PMAs */ - SorterRecord *pList; /* List of records for pTask to sort */ - int nInMemory; /* Expected size of PMA based on pList */ - u8 *aListMemory; /* Records memory (or NULL) */ - - int nPMA; /* Number of PMAs currently in pTemp1 */ - i64 iTemp1Off; /* Offset to write to in pTemp1 */ - sqlite3_file *pTemp1; /* File to write PMAs to, or NULL */ + SorterList list; /* List for thread to write to a PMA */ + int nPMA; /* Number of PMAs currently in file */ + SorterFile file; /* Temp file for level-0 PMAs */ + SorterFile file2; /* Space for other PMAs */ }; /* ** The MergeEngine object is used to combine two or more smaller PMAs into @@ -231,23 +269,30 @@ }; /* ** Main sorter structure. A single instance of this is allocated for each ** sorter cursor created by the VDBE. +** +** mxKeysize: +** As records are added to the sorter by calls to sqlite3VdbeSorterWrite(), +** this variable is updated so as to be set to the size on disk of the +** largest record in the sorter. */ struct VdbeSorter { - int nInMemory; /* Current size of pRecord list as PMA */ int mnPmaSize; /* Minimum PMA size, in bytes */ int mxPmaSize; /* Maximum PMA size, in bytes. 
0==no limit */ - int bUsePMA; /* True if one or more PMAs created */ - SorterRecord *pRecord; /* Head of in-memory record list */ - MergeEngine *pMerger; /* For final merge of PMAs (by caller) */ - u8 *aMemory; /* Block of memory to alloc records from */ - int iMemory; /* Offset of first free byte in aMemory */ - int nMemory; /* Size of aMemory allocation in bytes */ - int iPrev; /* Previous thread used to flush PMA */ - int nTask; /* Size of aTask[] array */ + PmaReader *pReader; /* Read data from here after Rewind() */ + MergeEngine *pMerger; /* Or here, if bUseThreads==0 */ + int mxKeysize; /* Largest serialized key seen so far */ + UnpackedRecord *pUnpacked; /* Used by VdbeSorterCompare() */ + SorterList list; /* List of in-memory records */ + int iMemory; /* Offset of free space in list.aMemory */ + int nMemory; /* Size of list.aMemory allocation in bytes */ + u8 bUsePMA; /* True if one or more PMAs created */ + u8 bUseThreads; /* True to use background threads */ + u8 iPrev; /* Previous thread used to flush PMA */ + u8 nTask; /* Size of aTask[] array */ SortSubtask aTask[1]; /* One or more subtasks */ }; /* ** An instance of the following object is used to read records out of a @@ -263,10 +308,28 @@ u8 *aAlloc; /* Allocated space */ u8 *aKey; /* Pointer to current key */ u8 *aBuffer; /* Current read buffer */ int nBuffer; /* Size of read buffer in bytes */ u8 *aMap; /* Pointer to mapping of entire file */ + IncrMerger *pIncr; /* Incremental merger */ +}; + +/* +** Normally, a PmaReader object iterates through an existing PMA stored +** within a temp file. However, if the PmaReader.pIncr variable points to +** an object of the following type, it may be used to iterate/merge through +** multiple PMAs simultaneously. 
+*/ +struct IncrMerger { + SortSubtask *pTask; /* Task that owns this merger */ + SorterThread thread; /* Thread for populating aFile[1] */ + MergeEngine *pMerger; /* Merge engine thread reads data from */ + i64 iStartOff; /* Offset to start writing file at */ + int mxSz; /* Maximum bytes of data to store */ + int bEof; /* Set to true when merge is finished */ + int bUseThread; /* True to use a bg thread for this object */ + SorterFile aFile[2]; /* aFile[0] for reading, [1] for writing */ }; /* ** An instance of this object is used for writing a PMA. ** @@ -324,18 +387,22 @@ #define SORTER_MIN_WORKING 10 /* Maximum number of PMAs that a single MergeEngine can merge */ #define SORTER_MAX_MERGE_COUNT 16 +static int vdbeIncrSwap(IncrMerger*); +static void vdbeIncrFree(IncrMerger*); + /* ** Free all memory belonging to the PmaReader object passed as the second ** argument. All structure fields are set to zero before returning. */ static void vdbePmaReaderClear(PmaReader *pIter){ sqlite3_free(pIter->aAlloc); sqlite3_free(pIter->aBuffer); if( pIter->aMap ) sqlite3OsUnfetch(pIter->pFile, 0, pIter->aMap); + if( pIter->pIncr ) vdbeIncrFree(pIter->pIncr); memset(pIter, 0, sizeof(PmaReader)); } /* ** Read nByte bytes of data from the stream of data iterated by object p. @@ -398,11 +465,11 @@ int nRem; /* Bytes remaining to copy */ /* Extend the p->aAlloc[] allocation if required. 
*/ if( p->nAllocnAlloc*2; + int nNew = MAX(128, p->nAlloc*2); while( nByte>nNew ) nNew = nNew*2; aNew = sqlite3Realloc(p->aAlloc, nNew); if( !aNew ) return SQLITE_NOMEM; p->nAlloc = nNew; p->aAlloc = aNew; @@ -462,26 +529,88 @@ } return SQLITE_OK; } +static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){ + int rc = SQLITE_OK; + if( pFile->iEof<=(i64)(pTask->db->nMaxSorterMmap) ){ + rc = sqlite3OsFetch(pFile->pFd, 0, pFile->iEof, (void**)pp); + } + return rc; +} + +static int vdbePmaReaderReinit(PmaReader *pIter){ + IncrMerger *pIncr = pIter->pIncr; + SortSubtask *pTask = pIncr->pTask; + int rc = SQLITE_OK; + + assert( pIncr->bEof==0 ); + + if( pIter->aMap ){ + sqlite3OsUnfetch(pIter->pFile, 0, pIter->aMap); + pIter->aMap = 0; + } + pIter->iReadOff = pIncr->iStartOff; + pIter->iEof = pIncr->aFile[0].iEof; + pIter->pFile = pIncr->aFile[0].pFd; + + rc = vdbeSorterMapFile(pTask, &pIncr->aFile[0], &pIter->aMap); + if( rc==SQLITE_OK ){ + if( pIter->aMap==0 ){ + /* TODO: Combine this code with similar code in vdbePmaReaderInit() */ + int iBuf = pIter->iReadOff % pTask->pgsz; + if( pIter->aBuffer==0 ){ + pIter->aBuffer = (u8*)sqlite3Malloc(pTask->pgsz); + if( pIter->aBuffer==0 ) rc = SQLITE_NOMEM; + pIter->nBuffer = pTask->pgsz; + } + if( rc==SQLITE_OK && iBuf ){ /* aBuffer is NULL if the malloc above failed */ + int nRead = pTask->pgsz - iBuf; + if( (pIter->iReadOff + nRead) > pIter->iEof ){ + nRead = (int)(pIter->iEof - pIter->iReadOff); + } + rc = sqlite3OsRead( + pIter->pFile, &pIter->aBuffer[iBuf], nRead, pIter->iReadOff + ); + assert( rc!=SQLITE_IOERR_SHORT_READ ); + } + } + } + + return rc; +} + /* ** Advance iterator pIter to the next key in its PMA. Return SQLITE_OK if ** no error occurs, or an SQLite error code if one does.
*/ static int vdbePmaReaderNext(PmaReader *pIter){ - int rc; /* Return Code */ + int rc = SQLITE_OK; /* Return Code */ u64 nRec = 0; /* Size of record in bytes */ if( pIter->iReadOff>=pIter->iEof ){ - /* This is an EOF condition */ - vdbePmaReaderClear(pIter); - return SQLITE_OK; + int bEof = 1; + if( pIter->pIncr ){ + rc = vdbeIncrSwap(pIter->pIncr); + if( rc==SQLITE_OK && pIter->pIncr->bEof==0 ){ + rc = vdbePmaReaderReinit(pIter); + bEof = 0; + } + } + + if( bEof ){ + /* This is an EOF condition */ + vdbePmaReaderClear(pIter); + return rc; + } } - rc = vdbePmaReadVarint(pIter, &nRec); + if( rc==SQLITE_OK ){ + rc = vdbePmaReadVarint(pIter, &nRec); + } if( rc==SQLITE_OK ){ pIter->nKey = (int)nRec; rc = vdbePmaReadBlob(pIter, (int)nRec, &pIter->aKey); } @@ -491,65 +620,62 @@ /* ** Initialize iterator pIter to scan through the PMA stored in file pFile ** starting at offset iStart and ending at offset iEof-1. This function ** leaves the iterator pointing to the first key in the PMA (or EOF if the ** PMA is empty). +** +** If the pnByte parameter is NULL, then it is assumed that the file +** contains a single PMA, and that that PMA omits the initial length varint. */ static int vdbePmaReaderInit( - SortSubtask *pTask, /* Thread context */ - i64 iStart, /* Start offset in pTask->pTemp1 */ + SortSubtask *pTask, /* Task context */ + SorterFile *pFile, /* Sorter file to read from */ + i64 iStart, /* Start offset in pFile */ PmaReader *pIter, /* Iterator to populate */ i64 *pnByte /* IN/OUT: Increment this value by PMA size */ ){ int rc = SQLITE_OK; int nBuf = pTask->pgsz; - void *pMap = 0; /* Mapping of temp file */ - assert( pTask->iTemp1Off>iStart ); + assert( pFile->iEof>iStart ); assert( pIter->aAlloc==0 ); assert( pIter->aBuffer==0 ); - pIter->pFile = pTask->pTemp1; + pIter->pFile = pFile->pFd; pIter->iReadOff = iStart; pIter->nAlloc = 128; pIter->aAlloc = (u8*)sqlite3Malloc(pIter->nAlloc); if( pIter->aAlloc ){ /* Try to xFetch() a mapping of the entire temp file. 
If this is possible, ** the PMA will be read via the mapping. Otherwise, use xRead(). */ - if( pTask->iTemp1Off<=(i64)(pTask->db->nMaxSorterMmap) ){ - rc = sqlite3OsFetch(pIter->pFile, 0, pTask->iTemp1Off, &pMap); - } + rc = vdbeSorterMapFile(pTask, pFile, &pIter->aMap); }else{ rc = SQLITE_NOMEM; } - if( rc==SQLITE_OK ){ - if( pMap ){ - pIter->aMap = (u8*)pMap; - }else{ - pIter->nBuffer = nBuf; - pIter->aBuffer = (u8*)sqlite3Malloc(nBuf); - if( !pIter->aBuffer ){ - rc = SQLITE_NOMEM; - }else{ - int iBuf = iStart % nBuf; - if( iBuf ){ - int nRead = nBuf - iBuf; - if( (iStart + nRead) > pTask->iTemp1Off ){ - nRead = (int)(pTask->iTemp1Off - iStart); - } - rc = sqlite3OsRead( - pTask->pTemp1, &pIter->aBuffer[iBuf], nRead, iStart - ); - assert( rc!=SQLITE_IOERR_SHORT_READ ); - } + if( rc==SQLITE_OK && pIter->aMap==0 ){ + pIter->nBuffer = nBuf; + pIter->aBuffer = (u8*)sqlite3Malloc(nBuf); + if( !pIter->aBuffer ){ + rc = SQLITE_NOMEM; + }else{ + int iBuf = iStart % nBuf; + if( iBuf ){ + int nRead = nBuf - iBuf; + if( (iStart + nRead) > pFile->iEof ){ + nRead = (int)(pFile->iEof - iStart); + } + rc = sqlite3OsRead( + pIter->pFile, &pIter->aBuffer[iBuf], nRead, iStart + ); + assert( rc!=SQLITE_IOERR_SHORT_READ ); } } } if( rc==SQLITE_OK ){ u64 nByte; /* Size of PMA in bytes */ - pIter->iEof = pTask->iTemp1Off; + pIter->iEof = pFile->iEof; rc = vdbePmaReadVarint(pIter, &nByte); pIter->iEof = pIter->iReadOff + nByte; *pnByte += nByte; } @@ -667,15 +793,17 @@ pKeyInfo->db = 0; if( nField && nWorker==0 ) pKeyInfo->nField = nField; pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt); pSorter->nTask = nWorker + 1; + pSorter->bUseThreads = (pSorter->nTask>1); for(i=0; inTask; i++){ SortSubtask *pTask = &pSorter->aTask[i]; pTask->pKeyInfo = pKeyInfo; pTask->pgsz = pgsz; pTask->db = db; + pTask->pSorter = pSorter; } if( !sqlite3TempInMemory(db) ){ pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz; mxCache = db->aDb[0].pSchema->cache_size; @@ -686,12 +814,12 @@ ** allocation for each 
sort-key in memory. Otherwise, use a single big ** allocation at pSorter->aMemory for all sort-keys. */ if( sqlite3GlobalConfig.pHeap==0 ){ assert( pSorter->iMemory==0 ); pSorter->nMemory = pgsz; - pSorter->aMemory = (u8*)sqlite3Malloc(pgsz); - if( !pSorter->aMemory ) rc = SQLITE_NOMEM; + pSorter->list.aMemory = (u8*)sqlite3Malloc(pgsz); + if( !pSorter->list.aMemory ) rc = SQLITE_NOMEM; } } } return rc; @@ -714,45 +842,119 @@ ** fields of *pTask are zeroed before returning. */ static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){ sqlite3DbFree(db, pTask->pUnpacked); pTask->pUnpacked = 0; - if( pTask->aListMemory==0 ){ - vdbeSorterRecordFree(0, pTask->pList); + if( pTask->list.aMemory==0 ){ + vdbeSorterRecordFree(0, pTask->list.pList); }else{ - sqlite3_free(pTask->aListMemory); - pTask->aListMemory = 0; + sqlite3_free(pTask->list.aMemory); + pTask->list.aMemory = 0; + } + pTask->list.pList = 0; + if( pTask->file.pFd ){ + sqlite3OsCloseFree(pTask->file.pFd); + pTask->file.pFd = 0; + pTask->file.iEof = 0; + } + if( pTask->file2.pFd ){ + sqlite3OsCloseFree(pTask->file2.pFd); + pTask->file2.pFd = 0; + pTask->file2.iEof = 0; } - pTask->pList = 0; - if( pTask->pTemp1 ){ - sqlite3OsCloseFree(pTask->pTemp1); - pTask->pTemp1 = 0; +} + +#ifdef SQLITE_DEBUG_SORTER_THREADS +static void vdbeSorterWorkDebug(SortSubtask *pTask, const char *zEvent){ + i64 t; + int iTask = (pTask - pTask->pSorter->aTask); + sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t); + fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent); +} +static void vdbeSorterRewindDebug(sqlite3 *db, const char *zEvent){ + i64 t; + sqlite3OsCurrentTimeInt64(db->pVfs, &t); + fprintf(stderr, "%lld:X %s\n", t, zEvent); +} +static void vdbeSorterPopulateDebug( + SortSubtask *pTask, + const char *zEvent +){ + i64 t; + int iTask = (pTask - pTask->pSorter->aTask); + sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t); + fprintf(stderr, "%lld:bg%d %s\n", t, iTask, zEvent); +} +static void vdbeSorterBlockDebug( + SortSubtask 
*pTask, + int bBlocked, + const char *zEvent +){ + if( bBlocked ){ + i64 t; + sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t); + fprintf(stderr, "%lld:main %s\n", t, zEvent); } } +#else +# define vdbeSorterWorkDebug(x,y) +# define vdbeSorterRewindDebug(x,y) +# define vdbeSorterPopulateDebug(x,y) +# define vdbeSorterBlockDebug(x,y,z) +#endif -/* -** Join all threads. -*/ #if SQLITE_MAX_WORKER_THREADS>0 +/* +** Join thread p. +*/ +static int vdbeSorterJoinThread(SortSubtask *pTask, SorterThread *p){ + int rc = SQLITE_OK; + if( p->pThread ){ +#ifdef SQLITE_DEBUG_SORTER_THREADS + int bDone = p->bDone; +#endif + void *pRet; + vdbeSorterBlockDebug(pTask, !bDone, "enter"); + rc = sqlite3ThreadJoin(p->pThread, &pRet); + vdbeSorterBlockDebug(pTask, !bDone, "exit"); + if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet); + assert( p->bDone==1 ); + p->bDone = 0; + p->pThread = 0; + } + return rc; +} + +/* +** Launch a background thread to run xTask(pIn). +*/ +static int vdbeSorterCreateThread( + SorterThread *p, /* Thread object to populate */ + void *(*xTask)(void*), /* Routine to run in a separate thread */ + void *pIn /* Argument passed into xTask() */ +){ + assert( p->pThread==0 && p->bDone==0 ); + return sqlite3ThreadCreate(&p->pThread, xTask, pIn); +} + +/* +** Join all outstanding threads launched by SorterWrite() to create +** level-0 PMAs. 
+*/ static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){ int rc = rcin; int i; for(i=0; inTask; i++){ SortSubtask *pTask = &pSorter->aTask[i]; - if( pTask->pThread ){ - void *pRet; - int rc2 = sqlite3ThreadJoin(pTask->pThread, &pRet); - pTask->pThread = 0; - pTask->bDone = 0; - if( rc==SQLITE_OK ) rc = rc2; - if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet); - } + int rc2 = vdbeSorterJoinThread(pTask, &pTask->thread); + if( rc==SQLITE_OK ) rc = rc2; } return rc; } #else # define vdbeSorterJoinAll(x,rcin) (rcin) +# define vdbeSorterJoinThread(pTask,p) SQLITE_OK #endif /* ** Allocate a new MergeEngine object with space for nIter iterators. */ @@ -760,10 +962,11 @@ int N = 2; /* Smallest power of two >= nIter */ int nByte; /* Total bytes of space to allocate */ MergeEngine *pNew; /* Pointer to allocated object to return */ assert( nIter<=SORTER_MAX_MERGE_COUNT ); + while( NpReader ){ + vdbePmaReaderClear(pSorter->pReader); + sqlite3DbFree(db, pSorter->pReader); + pSorter->pReader = 0; + } vdbeMergeEngineFree(pSorter->pMerger); pSorter->pMerger = 0; for(i=0; inTask; i++){ SortSubtask *pTask = &pSorter->aTask[i]; vdbeSortSubtaskCleanup(db, pTask); } - if( pSorter->aMemory==0 ){ - vdbeSorterRecordFree(0, pSorter->pRecord); + if( pSorter->list.aMemory==0 ){ + vdbeSorterRecordFree(0, pSorter->list.pList); } - pSorter->pRecord = 0; - pSorter->nInMemory = 0; + pSorter->list.pList = 0; + pSorter->list.szPMA = 0; pSorter->bUsePMA = 0; pSorter->iMemory = 0; + pSorter->mxKeysize = 0; + sqlite3DbFree(db, pSorter->pUnpacked); + pSorter->pUnpacked = 0; } /* ** Free any cursor components allocated by sqlite3VdbeSorterXXX routines. 
*/ void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){ VdbeSorter *pSorter = pCsr->pSorter; if( pSorter ){ sqlite3VdbeSorterReset(db, pSorter); - vdbeMergeEngineFree(pSorter->pMerger); - sqlite3_free(pSorter->aMemory); + sqlite3_free(pSorter->list.aMemory); sqlite3DbFree(db, pSorter); pCsr->pSorter = 0; } } @@ -838,10 +1048,25 @@ i64 max = SQLITE_MAX_MMAP_SIZE; sqlite3OsFileControlHint( *ppFile, SQLITE_FCNTL_MMAP_SIZE, (void*)&max); } return rc; } + +static int vdbeSortAllocUnpacked(SortSubtask *pTask){ + if( pTask->pUnpacked==0 ){ + char *pFree; + pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord( + pTask->pKeyInfo, 0, 0, &pFree + ); + assert( pTask->pUnpacked==(UnpackedRecord*)pFree ); + if( pFree==0 ) return SQLITE_NOMEM; + pTask->pUnpacked->nField = pTask->pKeyInfo->nField; + pTask->pUnpacked->errCode = 0; + } + return SQLITE_OK; +} + /* ** Merge the two sorted lists p1 and p2 into a single list. ** Set *ppOut to the head of the new list. */ @@ -878,29 +1103,33 @@ /* ** Sort the linked list of records headed at pTask->pList. Return ** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if ** an error occurs. 
*/ -static int vdbeSorterSort(SortSubtask *pTask){ +static int vdbeSorterSort(SortSubtask *pTask, SorterList *pList){ int i; SorterRecord **aSlot; SorterRecord *p; + int rc; + + rc = vdbeSortAllocUnpacked(pTask); + if( rc!=SQLITE_OK ) return rc; aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *)); if( !aSlot ){ return SQLITE_NOMEM; } - p = pTask->pList; + p = pList->pList; while( p ){ SorterRecord *pNext; - if( pTask->aListMemory ){ - if( (u8*)p==pTask->aListMemory ){ + if( pList->aMemory ){ + if( (u8*)p==pList->aMemory ){ pNext = 0; }else{ - assert( p->u.iNextaListMemory) ); - pNext = (SorterRecord*)&pTask->aListMemory[p->u.iNext]; + assert( p->u.iNextaMemory) ); + pNext = (SorterRecord*)&pList->aMemory[p->u.iNext]; } }else{ pNext = p->u.pNext; } @@ -915,13 +1144,17 @@ p = 0; for(i=0; i<64; i++){ vdbeSorterMerge(pTask, p, aSlot[i], &p); } - pTask->pList = p; + pList->pList = p; sqlite3_free(aSlot); + if( pTask->pUnpacked->errCode ){ + assert( pTask->pUnpacked->errCode==SQLITE_NOMEM ); + return SQLITE_NOMEM; + } return SQLITE_OK; } /* ** Initialize a PMA-writer object. @@ -1031,12 +1264,13 @@ # define vdbeSorterExtendFile(x,y,z) SQLITE_OK #endif /* -** Write the current contents of the in-memory linked-list to a PMA. Return -** SQLITE_OK if successful, or an SQLite error code otherwise. +** Write the current contents of in-memory linked-list pList to a level-0 +** PMA in the temp file belonging to sub-task pTask. Return SQLITE_OK if +** successful, or an SQLite error code otherwise. ** ** The format of a PMA is: ** ** * A varint. This varint contains the total number of bytes of content ** in the PMA (not including the varint itself). @@ -1043,51 +1277,65 @@ ** ** * One or more records packed end-to-end in order of ascending keys. ** Each record consists of a varint followed by a blob of data (the ** key). The varint is the number of bytes in the blob of data. 
*/ -static int vdbeSorterListToPMA(SortSubtask *pTask){ +static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){ int rc = SQLITE_OK; /* Return code */ PmaWriter writer; /* Object used to write to the file */ +#ifdef SQLITE_DEBUG + /* Set iSz to the expected size of file pTask->file after writing the PMA. + ** This is used by an assert() statement at the end of this function. */ + i64 iSz = pList->szPMA + sqlite3VarintLen(pList->szPMA) + pTask->file.iEof; +#endif + + vdbeSorterWorkDebug(pTask, "enter"); memset(&writer, 0, sizeof(PmaWriter)); - assert( pTask->nInMemory>0 ); + assert( pList->szPMA>0 ); /* If the first temporary PMA file has not been opened, open it now. */ - if( pTask->pTemp1==0 ){ - rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->pTemp1); - assert( rc!=SQLITE_OK || pTask->pTemp1 ); - assert( pTask->iTemp1Off==0 ); + if( pTask->file.pFd==0 ){ + rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->file.pFd); + assert( rc!=SQLITE_OK || pTask->file.pFd ); + assert( pTask->file.iEof==0 ); assert( pTask->nPMA==0 ); } /* Try to get the file to memory map */ if( rc==SQLITE_OK ){ vdbeSorterExtendFile(pTask->db, - pTask->pTemp1, pTask->iTemp1Off + pTask->nInMemory + 9 + pTask->file.pFd, pTask->file.iEof + pList->szPMA + 9 ); } + + /* Sort the list */ + if( rc==SQLITE_OK ){ + rc = vdbeSorterSort(pTask, pList); + } if( rc==SQLITE_OK ){ SorterRecord *p; SorterRecord *pNext = 0; - vdbePmaWriterInit(pTask->pTemp1, &writer, pTask->pgsz, - pTask->iTemp1Off); + vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pgsz, + pTask->file.iEof); pTask->nPMA++; - vdbePmaWriteVarint(&writer, pTask->nInMemory); - for(p=pTask->pList; p; p=pNext){ + vdbePmaWriteVarint(&writer, pList->szPMA); + for(p=pList->pList; p; p=pNext){ pNext = p->u.pNext; vdbePmaWriteVarint(&writer, p->nVal); vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal); - if( pTask->aListMemory==0 ) sqlite3_free(p); + if( pList->aMemory==0 ) sqlite3_free(p); } - pTask->pList = p; - rc = 
vdbePmaWriterFinish(&writer, &pTask->iTemp1Off); + pList->pList = p; + rc = vdbePmaWriterFinish(&writer, &pTask->file.iEof); } - assert( pTask->pList==0 || rc!=SQLITE_OK ); + vdbeSorterWorkDebug(pTask, "exit"); + assert( rc!=SQLITE_OK || pList->pList==0 ); + assert( rc!=SQLITE_OK || pTask->file.iEof==iSz ); return rc; } /* ** Advance the MergeEngine iterator passed as the second argument to @@ -1165,217 +1413,85 @@ } /* ** The main routine for sorter-thread operations. */ -static void *vdbeSortSubtaskMain(void *pCtx){ - int rc = SQLITE_OK; +static void *vdbeSorterFlushThread(void *pCtx){ SortSubtask *pTask = (SortSubtask*)pCtx; - - assert( pTask->eWork==SORT_SUBTASK_SORT - || pTask->eWork==SORT_SUBTASK_TO_PMA - || pTask->eWork==SORT_SUBTASK_CONS - ); - assert( pTask->bDone==0 ); - - if( pTask->pUnpacked==0 ){ - char *pFree; - pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord( - pTask->pKeyInfo, 0, 0, &pFree - ); - assert( pTask->pUnpacked==(UnpackedRecord*)pFree ); - if( pFree==0 ){ - rc = SQLITE_NOMEM; - goto thread_out; - } - pTask->pUnpacked->nField = pTask->pKeyInfo->nField; - pTask->pUnpacked->errCode = 0; - } - - if( pTask->eWork==SORT_SUBTASK_CONS ){ - assert( pTask->pList==0 ); - while( pTask->nPMA>pTask->nConsolidate && rc==SQLITE_OK ){ - int nIter = MIN(pTask->nPMA, SORTER_MAX_MERGE_COUNT); - sqlite3_file *pTemp2 = 0; /* Second temp file to use */ - MergeEngine *pMerger; /* Object for reading/merging PMA data */ - i64 iReadOff = 0; /* Offset in pTemp1 to read from */ - i64 iWriteOff = 0; /* Offset in pTemp2 to write to */ - int i; - - /* Allocate a merger object to merge PMAs together. 
*/ - pMerger = vdbeMergeEngineNew(nIter); - if( pMerger==0 ){ - rc = SQLITE_NOMEM; - break; - } - - /* Open a second temp file to write merged data to */ - rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTemp2); - if( rc==SQLITE_OK ){ - vdbeSorterExtendFile(pTask->db, pTemp2, pTask->iTemp1Off); - }else{ - vdbeMergeEngineFree(pMerger); - break; - } - - /* This loop runs once for each output PMA. Each output PMA is made - ** of data merged from up to SORTER_MAX_MERGE_COUNT input PMAs. */ - for(i=0; rc==SQLITE_OK && inPMA; i+=SORTER_MAX_MERGE_COUNT){ - PmaWriter writer; /* Object for writing data to pTemp2 */ - i64 nOut = 0; /* Bytes of data in output PMA */ - int bEof = 0; - int rc2; - - /* Configure the merger object to read and merge data from the next - ** SORTER_MAX_MERGE_COUNT PMAs in pTemp1 (or from all remaining PMAs, - ** if that is fewer). */ - int iIter; - for(iIter=0; iIteraIter[iIter]; - rc = vdbePmaReaderInit(pTask, iReadOff, pIter, &nOut); - iReadOff = pIter->iEof; - if( iReadOff>=pTask->iTemp1Off || rc!=SQLITE_OK ) break; - } - for(iIter=pMerger->nTree-1; rc==SQLITE_OK && iIter>0; iIter--){ - rc = vdbeSorterDoCompare(pTask, pMerger, iIter); - } - - vdbePmaWriterInit(pTemp2, &writer, pTask->pgsz, iWriteOff); - vdbePmaWriteVarint(&writer, nOut); - while( rc==SQLITE_OK && bEof==0 ){ - PmaReader *pIter = &pMerger->aIter[ pMerger->aTree[1] ]; - assert( pIter->pFile!=0 ); /* pIter is not at EOF */ - vdbePmaWriteVarint(&writer, pIter->nKey); - vdbePmaWriteBlob(&writer, pIter->aKey, pIter->nKey); - rc = vdbeSorterNext(pTask, pMerger, &bEof); - } - rc2 = vdbePmaWriterFinish(&writer, &iWriteOff); - if( rc==SQLITE_OK ) rc = rc2; - } - - vdbeMergeEngineFree(pMerger); - sqlite3OsCloseFree(pTask->pTemp1); - pTask->pTemp1 = pTemp2; - pTask->nPMA = (i / SORTER_MAX_MERGE_COUNT); - pTask->iTemp1Off = iWriteOff; - } - }else{ - /* Sort the pTask->pList list */ - rc = vdbeSorterSort(pTask); - - /* If required, write the list out to a PMA. 
*/ - if( rc==SQLITE_OK && pTask->eWork==SORT_SUBTASK_TO_PMA ){ -#ifdef SQLITE_DEBUG - i64 nExpect = pTask->nInMemory - + sqlite3VarintLen(pTask->nInMemory) - + pTask->iTemp1Off; -#endif - rc = vdbeSorterListToPMA(pTask); - assert( rc!=SQLITE_OK || (nExpect==pTask->iTemp1Off) ); - } - } - - thread_out: - pTask->bDone = 1; - if( rc==SQLITE_OK && pTask->pUnpacked->errCode ){ - assert( pTask->pUnpacked->errCode==SQLITE_NOMEM ); - rc = SQLITE_NOMEM; - } + int rc; /* Return code */ + assert( pTask->thread.bDone==0 ); + rc = vdbeSorterListToPMA(pTask, &pTask->list); + pTask->thread.bDone = 1; return SQLITE_INT_TO_PTR(rc); } /* -** Run the activity scheduled by the object passed as the only argument -** in the current thread. -*/ -static int vdbeSorterRunTask(SortSubtask *pTask){ - int rc = SQLITE_PTR_TO_INT( vdbeSortSubtaskMain((void*)pTask) ); - assert( pTask->bDone ); - pTask->bDone = 0; - return rc; -} - -/* -** Flush the current contents of VdbeSorter.pRecord to a new PMA, possibly +** Flush the current contents of VdbeSorter.list to a new PMA, possibly ** using a background thread. -** -** If argument bFg is non-zero, the operation always uses the calling thread. */ -static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){ - VdbeSorter *pSorter = pCsr->pSorter; +static int vdbeSorterFlushPMA(VdbeSorter *pSorter){ +#if SQLITE_MAX_WORKER_THREADS==0 + pSorter->bUsePMA = 1; + return vdbeSorterListToPMA(&pSorter->aTask[0], &pSorter->list); +#else int rc = SQLITE_OK; int i; SortSubtask *pTask = 0; /* Thread context used to create new PMA */ int nWorker = (pSorter->nTask-1); + /* Set the flag to indicate that at least one PMA has been written. + ** Or will be, anyhow. */ pSorter->bUsePMA = 1; + + /* Select a sub-task to sort and flush the current list of in-memory + ** records to disk. If the sorter is running in multi-threaded mode, + ** round-robin between the first (pSorter->nTask-1) tasks. 
Except, if + ** the background thread from a sub-task's previous turn is still running, + ** skip it. If the first (pSorter->nTask-1) sub-tasks are all still busy, + ** fall back to using the final sub-task. The first (pSorter->nTask-1) + ** sub-tasks are preferred as they use background threads - the final + ** sub-task uses the main thread. */ for(i=0; iiPrev + i + 1) % nWorker; pTask = &pSorter->aTask[iTest]; -#if SQLITE_MAX_WORKER_THREADS>0 - if( pTask->bDone ){ - void *pRet; - assert( pTask->pThread ); - rc = sqlite3ThreadJoin(pTask->pThread, &pRet); - pTask->pThread = 0; - pTask->bDone = 0; - if( rc==SQLITE_OK ){ - rc = SQLITE_PTR_TO_INT(pRet); - } - } -#endif - if( pTask->pThread==0 ) break; - pTask = 0; - } - if( pTask==0 ){ - pTask = &pSorter->aTask[nWorker]; - } - pSorter->iPrev = (pTask - pSorter->aTask); - - if( rc==SQLITE_OK ){ - assert( pTask->pThread==0 && pTask->bDone==0 ); - pTask->eWork = SORT_SUBTASK_TO_PMA; - pTask->pList = pSorter->pRecord; - pTask->nInMemory = pSorter->nInMemory; - pSorter->nInMemory = 0; - pSorter->pRecord = 0; - - if( pSorter->aMemory ){ - u8 *aMem = pTask->aListMemory; - pTask->aListMemory = pSorter->aMemory; - pSorter->aMemory = aMem; - } - -#if SQLITE_MAX_WORKER_THREADS>0 - if( !bFg && pTask!=&pSorter->aTask[nWorker] ){ - /* Launch a background thread for this operation */ - void *pCtx = (void*)pTask; - assert( pSorter->aMemory==0 || pTask->aListMemory!=0 ); - if( pTask->aListMemory ){ - if( pSorter->aMemory==0 ){ - pSorter->aMemory = sqlite3Malloc(pSorter->nMemory); - if( pSorter->aMemory==0 ) return SQLITE_NOMEM; - }else{ - pSorter->nMemory = sqlite3MallocSize(pSorter->aMemory); - } - } - rc = sqlite3ThreadCreate(&pTask->pThread, vdbeSortSubtaskMain, pCtx); - }else -#endif - { - /* Use the foreground thread for this operation */ - rc = vdbeSorterRunTask(pTask); - if( rc==SQLITE_OK ){ - u8 *aMem = pTask->aListMemory; - pTask->aListMemory = pSorter->aMemory; - pSorter->aMemory = aMem; - assert( pTask->pList==0 ); - } - } -
} - - return rc; + if( pTask->thread.bDone ){ + rc = vdbeSorterJoinThread(pTask, &pTask->thread); + } + if( pTask->thread.pThread==0 || rc!=SQLITE_OK ) break; + } + + if( rc==SQLITE_OK ){ + if( i==nWorker ){ + /* Use the foreground thread for this operation */ + rc = vdbeSorterListToPMA(&pSorter->aTask[nWorker], &pSorter->list); + }else{ + /* Launch a background thread for this operation */ + u8 *aMem = pTask->list.aMemory; + void *pCtx = (void*)pTask; + + assert( pTask->thread.pThread==0 && pTask->thread.bDone==0 ); + assert( pTask->list.pList==0 ); + assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 ); + + pSorter->iPrev = (pTask - pSorter->aTask); + pTask->list = pSorter->list; + pSorter->list.pList = 0; + pSorter->list.szPMA = 0; + if( aMem ){ + pSorter->list.aMemory = aMem; + pSorter->nMemory = sqlite3MallocSize(aMem); + }else if( pSorter->list.aMemory ){ /* 0 when records are sqlite3Malloc()ed one by one */ + pSorter->list.aMemory = sqlite3Malloc(pSorter->nMemory); + if( !pSorter->list.aMemory ) return SQLITE_NOMEM; + } + + rc = vdbeSorterCreateThread(&pTask->thread, vdbeSorterFlushThread, pCtx); + } + } + + return rc; +#endif } /* ** Add a record to the sorter. */ @@ -1411,76 +1527,483 @@ ** than (page-size * 10) and sqlite3HeapNearlyFull() returns true.
*/ nReq = pVal->n + sizeof(SorterRecord); nPMA = pVal->n + sqlite3VarintLen(pVal->n); if( pSorter->mxPmaSize ){ - if( pSorter->aMemory ){ + if( pSorter->list.aMemory ){ bFlush = pSorter->iMemory && (pSorter->iMemory+nReq) > pSorter->mxPmaSize; }else{ bFlush = ( - (pSorter->nInMemory > pSorter->mxPmaSize) - || (pSorter->nInMemory > pSorter->mnPmaSize && sqlite3HeapNearlyFull()) + (pSorter->list.szPMA > pSorter->mxPmaSize) + || (pSorter->list.szPMA > pSorter->mnPmaSize && sqlite3HeapNearlyFull()) ); } if( bFlush ){ - rc = vdbeSorterFlushPMA(db, pCsr, 0); - pSorter->nInMemory = 0; + rc = vdbeSorterFlushPMA(pSorter); + pSorter->list.szPMA = 0; pSorter->iMemory = 0; - assert( rc!=SQLITE_OK || pSorter->pRecord==0 ); + assert( rc!=SQLITE_OK || pSorter->list.pList==0 ); } } - pSorter->nInMemory += nPMA; + pSorter->list.szPMA += nPMA; + if( nPMA>pSorter->mxKeysize ){ + pSorter->mxKeysize = nPMA; + } - if( pSorter->aMemory ){ + if( pSorter->list.aMemory ){ int nMin = pSorter->iMemory + nReq; if( nMin>pSorter->nMemory ){ u8 *aNew; int nNew = pSorter->nMemory * 2; while( nNew < nMin ) nNew = nNew*2; if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize; if( nNew < nMin ) nNew = nMin; - aNew = sqlite3Realloc(pSorter->aMemory, nNew); + aNew = sqlite3Realloc(pSorter->list.aMemory, nNew); if( !aNew ) return SQLITE_NOMEM; - pSorter->pRecord = (SorterRecord*)( - aNew + ((u8*)pSorter->pRecord - pSorter->aMemory) + pSorter->list.pList = (SorterRecord*)( + aNew + ((u8*)pSorter->list.pList - pSorter->list.aMemory) ); - pSorter->aMemory = aNew; + pSorter->list.aMemory = aNew; pSorter->nMemory = nNew; } - pNew = (SorterRecord*)&pSorter->aMemory[pSorter->iMemory]; + pNew = (SorterRecord*)&pSorter->list.aMemory[pSorter->iMemory]; pSorter->iMemory += ROUND8(nReq); - pNew->u.iNext = (u8*)(pSorter->pRecord) - pSorter->aMemory; + pNew->u.iNext = (u8*)(pSorter->list.pList) - pSorter->list.aMemory; }else{ pNew = (SorterRecord *)sqlite3Malloc(nReq); if( pNew==0 ){ return SQLITE_NOMEM; } - 
pNew->u.pNext = pSorter->pRecord; + pNew->u.pNext = pSorter->list.pList; } memcpy(SRVAL(pNew), pVal->z, pVal->n); pNew->nVal = pVal->n; - pSorter->pRecord = pNew; + pSorter->list.pList = pNew; + + return rc; +} + +/* +** Read keys from pIncr->pMerger and populate pIncr->aFile[1]. The format +** of the data stored in aFile[1] is the same as that used by regular PMAs, +** except that the number-of-bytes varint is omitted from the start. +*/ +static int vdbeIncrPopulate(IncrMerger *pIncr){ + int rc = SQLITE_OK; + int rc2; + i64 iStart = pIncr->iStartOff; + SorterFile *pOut = &pIncr->aFile[1]; + MergeEngine *pMerger = pIncr->pMerger; + PmaWriter writer; + assert( pIncr->bEof==0 ); + + vdbeSorterPopulateDebug(pIncr->pTask, "enter"); + + vdbePmaWriterInit(pOut->pFd, &writer, pIncr->pTask->pgsz, iStart); + while( rc==SQLITE_OK ){ + int dummy; + PmaReader *pReader = &pMerger->aIter[ pMerger->aTree[1] ]; + int nKey = pReader->nKey; + i64 iEof = writer.iWriteOff + writer.iBufEnd; + + /* Check if the output file is full or if the input has been exhausted. + ** In either case exit the loop. */ + if( pReader->pFile==0 ) break; + if( (iEof + nKey + sqlite3VarintLen(nKey))>(iStart + pIncr->mxSz) ) break; + + /* Write the next key to the output. 
*/ + vdbePmaWriteVarint(&writer, nKey); + vdbePmaWriteBlob(&writer, pReader->aKey, nKey); + rc = vdbeSorterNext(pIncr->pTask, pIncr->pMerger, &dummy); + } + + rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof); + if( rc==SQLITE_OK ) rc = rc2; + vdbeSorterPopulateDebug(pIncr->pTask, "exit"); + return rc; +} + +static void *vdbeIncrPopulateThread(void *pCtx){ + IncrMerger *pIncr = (IncrMerger*)pCtx; + void *pRet = SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) ); + pIncr->thread.bDone = 1; + return pRet; +} + +#if SQLITE_MAX_WORKER_THREADS>0 +static int vdbeIncrBgPopulate(IncrMerger *pIncr){ + void *pCtx = (void*)pIncr; + assert( pIncr->bUseThread ); + return vdbeSorterCreateThread(&pIncr->thread, vdbeIncrPopulateThread, pCtx); +} +#endif + +static int vdbeIncrSwap(IncrMerger *pIncr){ + int rc = SQLITE_OK; + +#if SQLITE_MAX_WORKER_THREADS>0 + if( pIncr->bUseThread ){ + rc = vdbeSorterJoinThread(pIncr->pTask, &pIncr->thread); + + if( rc==SQLITE_OK ){ + SorterFile f0 = pIncr->aFile[0]; + pIncr->aFile[0] = pIncr->aFile[1]; + pIncr->aFile[1] = f0; + } + + if( rc==SQLITE_OK ){ + if( pIncr->aFile[0].iEof==pIncr->iStartOff ){ + pIncr->bEof = 1; + }else{ + rc = vdbeIncrBgPopulate(pIncr); + } + } + }else +#endif + { + rc = vdbeIncrPopulate(pIncr); + pIncr->aFile[0] = pIncr->aFile[1]; + if( pIncr->aFile[0].iEof==pIncr->iStartOff ){ + pIncr->bEof = 1; + } + } + + return rc; +} + +static void vdbeIncrFree(IncrMerger *pIncr){ + if( pIncr ){ +#if SQLITE_MAX_WORKER_THREADS>0 + vdbeSorterJoinThread(pIncr->pTask, &pIncr->thread); + if( pIncr->bUseThread ){ + if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd); + if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd); + } +#endif + vdbeMergeEngineFree(pIncr->pMerger); + sqlite3_free(pIncr); + } +} + +static IncrMerger *vdbeIncrNew(SortSubtask *pTask, MergeEngine *pMerger){ + IncrMerger *pIncr = sqlite3_malloc(sizeof(IncrMerger)); + if( pIncr ){ + memset(pIncr, 0, sizeof(IncrMerger)); + pIncr->pMerger = pMerger; + 
pIncr->pTask = pTask; + pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2); + pTask->file2.iEof += pIncr->mxSz; + } + return pIncr; +} + +static void vdbeIncrSetThreads(IncrMerger *pIncr, int bUseThread){ + if( bUseThread ){ + pIncr->bUseThread = 1; + pIncr->pTask->file2.iEof -= pIncr->mxSz; + } +} + +#define INCRINIT2_NORMAL 0 +#define INCRINIT2_TASK 1 +#define INCRINIT2_ROOT 2 + +static int vdbeIncrInit2(PmaReader *pIter, int eMode); + +static int vdbeIncrInitMerger( + SortSubtask *pTask, + MergeEngine *pMerger, + int eMode +){ + int i; + int rc = SQLITE_OK; + + for(i=0; rc==SQLITE_OK && inTree; i++){ + if( eMode==INCRINIT2_ROOT ){ + rc = vdbePmaReaderNext(&pMerger->aIter[i]); + }else{ + rc = vdbeIncrInit2(&pMerger->aIter[i], INCRINIT2_NORMAL); + } + } + + for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){ + rc = vdbeSorterDoCompare(pTask, pMerger, i); + } + + return rc; +} + +static int vdbeIncrInit2(PmaReader *pIter, int eMode){ + int rc = SQLITE_OK; + IncrMerger *pIncr = pIter->pIncr; + if( pIncr ){ + SortSubtask *pTask = pIncr->pTask; + + rc = vdbeIncrInitMerger(pTask, pIncr->pMerger, eMode); + + /* Set up the required files for pIncr */ + if( rc==SQLITE_OK ){ + if( pIncr->bUseThread==0 ){ + if( pTask->file2.pFd==0 ){ + rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->file2.pFd); + assert( pTask->file2.iEof>0 ); + if( rc==SQLITE_OK ){ + vdbeSorterExtendFile(pTask->db,pTask->file2.pFd,pTask->file2.iEof); + pTask->file2.iEof = 0; + } + } + if( rc==SQLITE_OK ){ + pIncr->aFile[1].pFd = pTask->file2.pFd; + pIncr->iStartOff = pTask->file2.iEof; + pTask->file2.iEof += pIncr->mxSz; + } + }else{ + rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[0].pFd); + if( rc==SQLITE_OK ){ + rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[1].pFd); + } + } + } + + if( rc==SQLITE_OK && pIncr->bUseThread ){ + /* Use the current thread */ + assert( eMode==INCRINIT2_ROOT || eMode==INCRINIT2_TASK ); + rc = vdbeIncrPopulate(pIncr); + } + 
+ if( rc==SQLITE_OK && eMode!=INCRINIT2_TASK ){ + rc = vdbePmaReaderNext(pIter); + } + } + return rc; +} + +#if SQLITE_MAX_WORKER_THREADS>0 +static void *vdbeIncrInit2Thread(void *pCtx){ + PmaReader *pReader = (PmaReader*)pCtx; + void *pRet = SQLITE_INT_TO_PTR( vdbeIncrInit2(pReader, INCRINIT2_TASK) ); + pReader->pIncr->thread.bDone = 1; + return pRet; +} + +static int vdbeIncrBgInit2(PmaReader *pIter){ + void *pCtx = (void*)pIter; + return vdbeSorterCreateThread( + &pIter->pIncr->thread, vdbeIncrInit2Thread, pCtx + ); +} +#endif + +/* +** Allocate a new MergeEngine object to merge the contents of nPMA level-0 +** PMAs from pTask->file. If no error occurs, set *ppOut to point to +** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut +** to NULL and return an SQLite error code. +** +** When this function is called, *piOffset is set to the offset of the +** first PMA to read from pTask->file. Assuming no error occurs, it is +** set to the offset immediately following the last byte of the last +** PMA before returning. If an error does occur, then the final value of +** *piOffset is undefined. +*/ +static int vdbeMergeEngineLevel0( + SortSubtask *pTask, /* Sorter task to read from */ + int nPMA, /* Number of PMAs to read */ + i64 *piOffset, /* IN/OUT: Read offset in pTask->file */ + MergeEngine **ppOut /* OUT: New merge-engine */ +){ + MergeEngine *pNew; /* Merge engine to return */ + i64 iOff = *piOffset; + int i; + int rc = SQLITE_OK; + + *ppOut = pNew = vdbeMergeEngineNew(nPMA); + if( pNew==0 ) rc = SQLITE_NOMEM; + + for(i=0; iaIter[i]; + rc = vdbePmaReaderInit(pTask, &pTask->file, iOff, pIter, &nDummy); + iOff = pIter->iEof; + } + + if( rc!=SQLITE_OK ){ + vdbeMergeEngineFree(pNew); + *ppOut = 0; + } + *piOffset = iOff; + return rc; +} + +typedef struct IncrBuilder IncrBuilder; +struct IncrBuilder { + int nPMA; /* Number of iterators used so far */ + MergeEngine *pMerger; /* Merge engine to populate. 
*/ +}; + +static int vdbeAddToBuilder( + SortSubtask *pTask, + IncrBuilder *pBuilder, + MergeEngine *pMerger +){ + int rc = SQLITE_OK; + IncrMerger *pIncr; + + assert( pMerger ); + if( pBuilder->nPMA==SORTER_MAX_MERGE_COUNT ){ + rc = vdbeAddToBuilder(pTask, &pBuilder[1], pBuilder->pMerger); + pBuilder->pMerger = 0; + pBuilder->nPMA = 0; + } + + if( rc==SQLITE_OK && pBuilder->pMerger==0 ){ + pBuilder->pMerger = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT); + if( pBuilder->pMerger==0 ) rc = SQLITE_NOMEM; + } + + if( rc==SQLITE_OK ){ + pIncr = vdbeIncrNew(pTask, pMerger); + if( pIncr==0 ) rc = SQLITE_NOMEM; + pBuilder->pMerger->aIter[pBuilder->nPMA++].pIncr = pIncr; + } + + if( rc!=SQLITE_OK ){ + vdbeMergeEngineFree(pMerger); + } return rc; } /* -** Return the total number of PMAs in all temporary files. +** Populate iterator *pIter so that it may be used to iterate through all +** keys stored in all PMAs created by this sorter. */ -static int vdbeSorterCountPMA(VdbeSorter *pSorter){ - int nPMA = 0; - int i; - for(i=0; inTask; i++){ - nPMA += pSorter->aTask[i].nPMA; - } - return nPMA; -} +static int vdbePmaReaderIncrInit(VdbeSorter *pSorter){ + SortSubtask *pTask0 = &pSorter->aTask[0]; + MergeEngine *pMain = 0; + sqlite3 *db = pTask0->db; + int rc = SQLITE_OK; + int iTask; + + IncrBuilder *aMerge; + const int nMerge = 32; + aMerge = sqlite3DbMallocZero(db, sizeof(aMerge[0])*nMerge); + if( aMerge==0 ) return SQLITE_NOMEM; + + if( pSorter->nTask>1 ){ + pMain = vdbeMergeEngineNew(pSorter->nTask); + if( pMain==0 ) rc = SQLITE_NOMEM; + } + + for(iTask=0; iTasknTask && rc==SQLITE_OK; iTask++){ + MergeEngine *pRoot = 0; + int iPMA; + i64 iReadOff = 0; + SortSubtask *pTask = &pSorter->aTask[iTask]; + if( pTask->nPMA==0 ) continue; + for(iPMA=0; iPMAnPMA; iPMA += SORTER_MAX_MERGE_COUNT){ + MergeEngine *pMerger = 0; + int nReader = MIN(pTask->nPMA - iPMA, SORTER_MAX_MERGE_COUNT); + + rc = vdbeMergeEngineLevel0(pTask, nReader, &iReadOff, &pMerger); + if( rc!=SQLITE_OK ) break; + + 
if( iPMA==0 ){ + pRoot = pMerger; + }else{ + if( pRoot ){ + rc = vdbeAddToBuilder(pTask, &aMerge[0], pRoot); + pRoot = 0; + if( rc!=SQLITE_OK ){ + vdbeMergeEngineFree(pMerger); + break; + } + } + rc = vdbeAddToBuilder(pTask, &aMerge[0], pMerger); + } + } + + if( pRoot==0 ){ + int i; + for(i=0; rc==SQLITE_OK && iaIter[iTask].pIncr = pNew; + if( pNew==0 ) rc = SQLITE_NOMEM; + } + memset(aMerge, 0, nMerge*sizeof(aMerge[0])); + } + } + + if( rc==SQLITE_OK ){ +#if SQLITE_MAX_WORKER_THREADS + if( pSorter->bUseThreads ){ + PmaReader *pIter; + SortSubtask *pLast = &pSorter->aTask[pSorter->nTask-1]; + rc = vdbeSortAllocUnpacked(pLast); + if( rc==SQLITE_OK ){ + pIter = (PmaReader*)sqlite3DbMallocZero(db, sizeof(PmaReader)); + pSorter->pReader = pIter; + } + if( rc==SQLITE_OK ){ + pIter->pIncr = vdbeIncrNew(pLast, pMain); + if( pIter->pIncr==0 ){ + rc = SQLITE_NOMEM; + } + else{ + vdbeIncrSetThreads(pIter->pIncr, pSorter->bUseThreads); + for(iTask=0; iTask<(pSorter->nTask-1); iTask++){ + IncrMerger *pIncr; + if( (pIncr = pMain->aIter[iTask].pIncr) ){ + vdbeIncrSetThreads(pIncr, pSorter->bUseThreads); + assert( pIncr->pTask!=pLast ); + } + } + if( pSorter->nTask>1 ){ + for(iTask=0; rc==SQLITE_OK && iTasknTask; iTask++){ + PmaReader *p = &pMain->aIter[iTask]; + assert( p->pIncr==0 || p->pIncr->pTask==&pSorter->aTask[iTask] ); + if( p->pIncr ){ rc = vdbeIncrBgInit2(p); } + } + } + } + } + if( rc==SQLITE_OK ){ + int eMode = (pSorter->nTask>1 ? INCRINIT2_ROOT : INCRINIT2_NORMAL); + rc = vdbeIncrInit2(pIter, eMode); + } + }else +#endif + { + pSorter->pMerger = pMain; + rc = vdbeIncrInitMerger(pTask0, pMain, INCRINIT2_NORMAL); + } + } + + sqlite3_free(aMerge); + return rc; +} + /* ** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite, ** this function is called to prepare for iterating through the records ** in sorted order. @@ -1493,99 +2016,38 @@ /* If no data has been written to disk, then do not do so now. Instead, ** sort the VdbeSorter.pRecord list. 
The vdbe layer will read data directly ** from the in-memory list. */ if( pSorter->bUsePMA==0 ){ - if( pSorter->pRecord ){ - SortSubtask *pTask = &pSorter->aTask[0]; + if( pSorter->list.pList ){ *pbEof = 0; - pTask->pList = pSorter->pRecord; - pTask->eWork = SORT_SUBTASK_SORT; - assert( pTask->aListMemory==0 ); - pTask->aListMemory = pSorter->aMemory; - rc = vdbeSorterRunTask(pTask); - pTask->aListMemory = 0; - pSorter->pRecord = pTask->pList; - pTask->pList = 0; + rc = vdbeSorterSort(&pSorter->aTask[0], &pSorter->list); }else{ *pbEof = 1; } return rc; } /* Write the current in-memory list to a PMA. */ - if( pSorter->pRecord ){ - rc = vdbeSorterFlushPMA(db, pCsr, 1); - } - - /* Join all threads */ - rc = vdbeSorterJoinAll(pSorter, rc); - - /* If there are more than SORTER_MAX_MERGE_COUNT PMAs on disk, merge - ** some of them together so that this is no longer the case. */ - if( vdbeSorterCountPMA(pSorter)>SORTER_MAX_MERGE_COUNT ){ - int i; - for(i=0; rc==SQLITE_OK && inTask; i++){ - SortSubtask *pTask = &pSorter->aTask[i]; - if( pTask->pTemp1 ){ - pTask->nConsolidate = SORTER_MAX_MERGE_COUNT / pSorter->nTask; - pTask->eWork = SORT_SUBTASK_CONS; - -#if SQLITE_MAX_WORKER_THREADS>0 - if( i<(pSorter->nTask-1) ){ - void *pCtx = (void*)pTask; - rc = sqlite3ThreadCreate(&pTask->pThread, vdbeSortSubtaskMain, pCtx); - }else -#endif - { - rc = vdbeSorterRunTask(pTask); - } - } - } - } - - /* Join all threads */ - rc = vdbeSorterJoinAll(pSorter, rc); - - /* Assuming no errors have occurred, set up a merger structure to read - ** and merge all remaining PMAs. 
*/ - assert( pSorter->pMerger==0 ); - if( rc==SQLITE_OK ){ - int nIter = 0; /* Number of iterators used */ - int i; - MergeEngine *pMerger; - for(i=0; inTask; i++){ - nIter += pSorter->aTask[i].nPMA; - } - - pSorter->pMerger = pMerger = vdbeMergeEngineNew(nIter); - if( pMerger==0 ){ - rc = SQLITE_NOMEM; - }else{ - int iIter = 0; - int iThread = 0; - for(iThread=0; iThreadnTask; iThread++){ - int iPMA; - i64 iReadOff = 0; - SortSubtask *pTask = &pSorter->aTask[iThread]; - for(iPMA=0; iPMAnPMA && rc==SQLITE_OK; iPMA++){ - i64 nDummy = 0; - PmaReader *pIter = &pMerger->aIter[iIter++]; - rc = vdbePmaReaderInit(pTask, iReadOff, pIter, &nDummy); - iReadOff = pIter->iEof; - } - } - - for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){ - rc = vdbeSorterDoCompare(&pSorter->aTask[0], pMerger, i); - } - } - } - - if( rc==SQLITE_OK ){ - *pbEof = (pSorter->pMerger->aIter[pSorter->pMerger->aTree[1]].pFile==0); - } + if( pSorter->list.pList ){ + rc = vdbeSorterFlushPMA(pSorter); + } + + /* Join all threads */ + rc = vdbeSorterJoinAll(pSorter, rc); + + vdbeSorterRewindDebug(db, "rewind"); + + /* Assuming no errors have occurred, set up a merger structure to + ** incrementally read and merge all remaining PMAs. */ + assert( pSorter->pReader==0 ); + if( rc==SQLITE_OK ){ + rc = vdbePmaReaderIncrInit(pSorter); + *pbEof = 0; + } + + vdbeSorterRewindDebug(db, "rewinddone"); return rc; } /* ** Advance to the next element in the sorter. 
@@ -1592,18 +2054,27 @@ */ int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){ VdbeSorter *pSorter = pCsr->pSorter; int rc; /* Return code */ - if( pSorter->pMerger ){ - rc = vdbeSorterNext(&pSorter->aTask[0], pSorter->pMerger, pbEof); + assert( pSorter->bUsePMA || (pSorter->pReader==0 && pSorter->pMerger==0) ); + if( pSorter->bUsePMA ){ + assert( pSorter->pReader==0 || pSorter->pMerger==0 ); + assert( pSorter->bUseThreads==0 || pSorter->pReader ); + assert( pSorter->bUseThreads==1 || pSorter->pMerger ); + if( pSorter->bUseThreads ){ + rc = vdbePmaReaderNext(pSorter->pReader); + *pbEof = (pSorter->pReader->pFile==0); + }else{ + rc = vdbeSorterNext(&pSorter->aTask[0], pSorter->pMerger, pbEof); + } }else{ - SorterRecord *pFree = pSorter->pRecord; - pSorter->pRecord = pFree->u.pNext; + SorterRecord *pFree = pSorter->list.pList; + pSorter->list.pList = pFree->u.pNext; pFree->u.pNext = 0; - if( pSorter->aMemory==0 ) vdbeSorterRecordFree(db, pFree); - *pbEof = !pSorter->pRecord; + if( pSorter->list.aMemory==0 ) vdbeSorterRecordFree(db, pFree); + *pbEof = !pSorter->list.pList; rc = SQLITE_OK; } return rc; } @@ -1614,18 +2085,19 @@ static void *vdbeSorterRowkey( const VdbeSorter *pSorter, /* Sorter object */ int *pnKey /* OUT: Size of current key in bytes */ ){ void *pKey; - if( pSorter->pMerger ){ - PmaReader *pIter; - pIter = &pSorter->pMerger->aIter[ pSorter->pMerger->aTree[1] ]; - *pnKey = pIter->nKey; - pKey = pIter->aKey; + if( pSorter->bUsePMA ){ + PmaReader *pReader = (pSorter->bUseThreads ? 
+ pSorter->pReader : &pSorter->pMerger->aIter[pSorter->pMerger->aTree[1]] + ); + *pnKey = pReader->nKey; + pKey = pReader->aKey; }else{ - *pnKey = pSorter->pRecord->nVal; - pKey = SRVAL(pSorter->pRecord); + *pnKey = pSorter->list.pList->nVal; + pKey = SRVAL(pSorter->list.pList); } return pKey; } /* @@ -1667,17 +2139,23 @@ Mem *pVal, /* Value to compare to current sorter key */ int nIgnore, /* Ignore this many fields at the end */ int *pRes /* OUT: Result of comparison */ ){ VdbeSorter *pSorter = pCsr->pSorter; - UnpackedRecord *r2 = pSorter->aTask[0].pUnpacked; + UnpackedRecord *r2 = pSorter->pUnpacked; KeyInfo *pKeyInfo = pCsr->pKeyInfo; int i; void *pKey; int nKey; /* Sorter key to compare pVal with */ + if( r2==0 ){ + char *p; + r2 = pSorter->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pKeyInfo,0,0,&p); + assert( pSorter->pUnpacked==(UnpackedRecord*)p ); + if( r2==0 ) return SQLITE_NOMEM; + r2->nField = pKeyInfo->nField-nIgnore; + } assert( r2->nField>=pKeyInfo->nField-nIgnore ); - r2->nField = pKeyInfo->nField-nIgnore; pKey = vdbeSorterRowkey(pSorter, &nKey); sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, r2); for(i=0; inField; i++){ if( r2->aMem[i].flags & MEM_Null ){ Index: test/sort2.test ================================================================== --- test/sort2.test +++ test/sort2.test @@ -13,43 +13,73 @@ set testdir [file dirname $argv0] source $testdir/tester.tcl set testprefix sort2 -db close -sqlite3_shutdown -sqlite3_config_worker_threads 7 -reset_db - -do_execsql_test 1 { - PRAGMA cache_size = 5; - WITH r(x,y) AS ( - SELECT 1, randomblob(100) - UNION ALL - SELECT x+1, randomblob(100) FROM r - LIMIT 100000 - ) - SELECT count(x), length(y) FROM r GROUP BY (x%5) -} { - 20000 100 20000 100 20000 100 20000 100 20000 100 -} - -do_execsql_test 2.1 { - CREATE TABLE t1(a, b); - WITH r(x,y) AS ( - SELECT 1, randomblob(100) - UNION ALL - SELECT x+1, randomblob(100) FROM r - LIMIT 10000 - ) INSERT INTO t1 SELECT * FROM r; -} - -do_execsql_test 2.2 { - CREATE 
UNIQUE INDEX i1 ON t1(b, a); -} - -db close -sqlite3_shutdown -sqlite3_config_worker_threads 0 -sqlite3_initialize +foreach {tn script} { + 1 { } + 2 { + catch { db close } + sqlite3_shutdown + sqlite3_config_worker_threads 7 + reset_db + } +} { + + eval $script + + do_execsql_test $tn.1 { + PRAGMA cache_size = 5; + WITH r(x,y) AS ( + SELECT 1, randomblob(100) + UNION ALL + SELECT x+1, randomblob(100) FROM r + LIMIT 100000 + ) + SELECT count(x), length(y) FROM r GROUP BY (x%5) + } { + 20000 100 20000 100 20000 100 20000 100 20000 100 + } + + do_execsql_test $tn.2.1 { + CREATE TABLE t1(a, b); + WITH r(x,y) AS ( + SELECT 1, randomblob(100) + UNION ALL + SELECT x+1, randomblob(100) FROM r + LIMIT 10000 + ) INSERT INTO t1 SELECT * FROM r; + } + + do_execsql_test $tn.2.2 { + CREATE UNIQUE INDEX i1 ON t1(b, a); + } + + do_execsql_test $tn.2.3 { + CREATE UNIQUE INDEX i2 ON t1(a); + } + + do_execsql_test $tn.2.4 { PRAGMA integrity_check } {ok} + + breakpoint + do_execsql_test $tn.3 { + PRAGMA cache_size = 5; + WITH r(x,y) AS ( + SELECT 1, randomblob(100) + UNION ALL + SELECT x+1, randomblob(100) FROM r + LIMIT 1000000 + ) + SELECT count(x), length(y) FROM r GROUP BY (x%5) + } { + 200000 100 200000 100 200000 100 200000 100 200000 100 + } + + db close + sqlite3_shutdown + sqlite3_config_worker_threads 0 + sqlite3_initialize + +} + finish_test