/ Check-in [ff0b5c851b]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Add an experimental multi-threaded capability to vdbesorter.c.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | threads
Files: files | file ages | folders
SHA1: ff0b5c851ba7d04d1836d7c6a3222713e7d8d357
User & Date: dan 2014-03-17 15:43:05
Context
2014-03-25
13:17
Merge all fixes and enhancements from trunk. check-in: b415dfb6cb user: drh tags: threads
2014-03-17
15:43
Add an experimental multi-threaded capability to vdbesorter.c. check-in: ff0b5c851b user: dan tags: threads
2014-03-13
15:41
Merge latest trunk changes into this branch. check-in: d17231b63d user: dan tags: threads
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to src/threads.c.

    43     43   /* Create a new thread */
    44     44   int sqlite3ThreadCreate(
    45     45     SQLiteThread **ppThread,  /* OUT: Write the thread object here */
    46     46     void *(*xTask)(void*),    /* Routine to run in a separate thread */
    47     47     void *pIn                 /* Argument passed into xTask() */
    48     48   ){
    49     49     SQLiteThread *p;
    50         -  int rc;
    51     50   
    52     51     assert( ppThread!=0 );
    53     52     assert( xTask!=0 );
    54     53     *ppThread = 0;
    55     54     p = sqlite3Malloc(sizeof(*p));
    56     55     if( p==0 ) return SQLITE_NOMEM;
    57     56     memset(p, 0, sizeof(*p));

Changes to src/vdbesort.c.

    16     16   */
    17     17   
    18     18   #include "sqliteInt.h"
    19     19   #include "vdbeInt.h"
    20     20   
    21     21   
    22     22   typedef struct VdbeSorterIter VdbeSorterIter;
           23  +typedef struct SorterThread SorterThread;
    23     24   typedef struct SorterRecord SorterRecord;
           25  +typedef struct SorterMerger SorterMerger;
    24     26   typedef struct FileWriter FileWriter;
           27  +
           28  +
           29  +/*
           30  +** Maximum number of threads to use. Setting this value to 1 forces all
           31  +** operations to be single-threaded.
           32  +*/
           33  +#ifndef SQLITE_MAX_SORTER_THREAD
           34  +# define SQLITE_MAX_SORTER_THREAD 1
           35  +#endif
           36  +
           37  +/*
           38  +** Candidate values for SorterThread.eWork
           39  +*/
           40  +#define SORTER_THREAD_SORT   1
           41  +#define SORTER_THREAD_TO_PMA 2
           42  +#define SORTER_THREAD_CONS   3
           43  +
           44  +/*
           45  +** Much of the work performed in this module to sort the list of records is 
           46  +** broken down into smaller units that may be performed in parallel. In order
           47  +** to perform such a unit of work, an instance of the following structure
           48  +** is configured and passed to vdbeSorterThreadMain() - either directly by 
           49  +** the main thread or via a background thread.
           50  +**
           51  +** Exactly SQLITE_MAX_SORTER_THREAD instances of this structure are allocated
           52  +** as part of each VdbeSorter object. Instances are never allocated any other
           53  +** way.
           54  +**
           55  +** When a background thread is launched to perform work, SorterThread.bDone
           56  +** is set to 0 and the SorterThread.pThread variable set to point to the
           57  +** thread handle. SorterThread.bDone is set to 1 (to indicate to the main
           58  +** thread that joining SorterThread.pThread will not block) before the thread
           59  +** exits. SorterThread.pThread and bDone are always cleared after the 
           60  +** background thread has been joined.
           61  +**
           62  +** One object (specifically, VdbeSorter.aThread[SQLITE_MAX_SORTER_THREAD-1])
           63  +** is reserved for the foreground thread.
           64  +**
           65  +** The nature of the work performed is determined by SorterThread.eWork,
           66  +** as follows:
           67  +**
           68  +**   SORTER_THREAD_SORT:
           69  +**     Sort the linked list of records at SorterThread.pList.
           70  +**
           71  +**   SORTER_THREAD_TO_PMA:
           72  +**     Sort the linked list of records at SorterThread.pList, and write
           73  +**     the results to a new PMA in temp file SorterThread.pTemp1. Open
           74  +**     the temp file if it is not already open.
           75  +**
           76  +**   SORTER_THREAD_CONS:
           77  +**     Merge existing PMAs until SorterThread.nConsolidate or fewer
           78  +**     remain in temp file SorterThread.pTemp1.
           79  +*/
           80  +struct SorterThread {
           81  +  SQLiteThread *pThread;          /* Thread handle, or NULL */
           82  +  int bDone;                      /* Set to true by pThread when finished */
           83  +
           84  +  sqlite3_vfs *pVfs;              /* VFS used to open temporary files */
           85  +  KeyInfo *pKeyInfo;              /* How to compare records */
           86  +  UnpackedRecord *pUnpacked;      /* Space to unpack a record */
           87  +  int pgsz;                       /* Main database page size */
           88  +
           89  +  u8 eWork;                       /* One of the SORTER_THREAD_* constants */
           90  +  int nConsolidate;               /* For THREAD_CONS, max final PMAs */
           91  +  SorterRecord *pList;            /* List of records for pThread to sort */
           92  +  int nInMemory;                  /* Expected size of PMA based on pList */
           93  +
           94  +  int nPMA;                       /* Number of PMAs currently in pTemp1 */
           95  +  i64 iTemp1Off;                  /* Offset to write to in pTemp1 */
           96  +  sqlite3_file *pTemp1;           /* File to write PMAs to, or NULL */
           97  +};
           98  +
    25     99   
    26    100   /*
    27    101   ** NOTES ON DATA STRUCTURE USED FOR N-WAY MERGES:
    28    102   **
    29    103   ** As keys are added to the sorter, they are written to disk in a series
    30    104   ** of sorted packed-memory-arrays (PMAs). The size of each PMA is roughly
    31    105   ** the same as the cache-size allowed for temporary databases. In order
................................................................................
    88    162   **
    89    163   **     aTree[] = { X, 0   0, 6    0, 3, 5, 6 }
    90    164   **
    91    165   ** In other words, each time we advance to the next sorter element, log2(N)
    92    166   ** key comparison operations are required, where N is the number of segments
    93    167   ** being merged (rounded up to the next power of 2).
    94    168   */
    95         -struct VdbeSorter {
    96         -  i64 iWriteOff;                  /* Current write offset within file pTemp1 */
    97         -  i64 iReadOff;                   /* Current read offset within file pTemp1 */
    98         -  int nInMemory;                  /* Current size of pRecord list as PMA */
          169  +struct SorterMerger {
    99    170     int nTree;                      /* Used size of aTree/aIter (power of 2) */
   100         -  int nPMA;                       /* Number of PMAs stored in pTemp1 */
          171  +  int *aTree;                     /* Current state of incremental merge */
          172  +  VdbeSorterIter *aIter;          /* Array of iterators to merge data from */
          173  +};
          174  +
          175  +/*
          176  +** Main sorter structure. A single instance of this is allocated for each 
          177  +** sorter cursor created by the VDBE.
          178  +*/
          179  +struct VdbeSorter {
          180  +  int nInMemory;                  /* Current size of pRecord list as PMA */
   101    181     int mnPmaSize;                  /* Minimum PMA size, in bytes */
   102    182     int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
   103         -  VdbeSorterIter *aIter;          /* Array of iterators to merge */
   104         -  int *aTree;                     /* Current state of incremental merge */
   105         -  sqlite3_file *pTemp1;           /* PMA file 1 */
          183  +  int bUsePMA;                    /* True if one or more PMAs created */
   106    184     SorterRecord *pRecord;          /* Head of in-memory record list */
   107         -  UnpackedRecord *pUnpacked;      /* Used to unpack keys */
          185  +  SorterMerger *pMerger;          /* For final merge of PMAs (by caller) */ 
          186  +  SorterThread aThread[SQLITE_MAX_SORTER_THREAD];
   108    187   };
   109    188   
   110    189   /*
   111    190   ** The following type is an iterator for a PMA. It caches the current key in 
   112    191   ** variables nKey/aKey. If the iterator is at EOF, pFile==0.
   113    192   */
   114    193   struct VdbeSorterIter {
................................................................................
   146    225   */
   147    226   struct SorterRecord {
   148    227     void *pVal;
   149    228     int nVal;
   150    229     SorterRecord *pNext;
   151    230   };
   152    231   
   153         -/* Minimum allowable value for the VdbeSorter.nWorking variable */
          232  +/* The minimum PMA size is set to this value multiplied by the database
          233  +** page size in bytes.  */
   154    234   #define SORTER_MIN_WORKING 10
   155    235   
   156    236   /* Maximum number of segments to merge in a single pass. */
   157    237   #define SORTER_MAX_MERGE_COUNT 16
   158    238   
   159    239   /*
   160    240   ** Free all memory belonging to the VdbeSorterIter object passed as the only
   161    241   ** argument. All structure fields are set to zero before returning.
   162    242   */
   163         -static void vdbeSorterIterZero(sqlite3 *db, VdbeSorterIter *pIter){
   164         -  sqlite3DbFree(db, pIter->aAlloc);
   165         -  sqlite3DbFree(db, pIter->aBuffer);
          243  +static void vdbeSorterIterZero(VdbeSorterIter *pIter){
          244  +  sqlite3_free(pIter->aAlloc);
          245  +  sqlite3_free(pIter->aBuffer);
   166    246     memset(pIter, 0, sizeof(VdbeSorterIter));
   167    247   }
   168    248   
   169    249   /*
   170    250   ** Read nByte bytes of data from the stream of data iterated by object p.
   171    251   ** If successful, set *ppOut to point to a buffer containing the data
   172    252   ** and return SQLITE_OK. Otherwise, if an error occurs, return an SQLite
   173    253   ** error code.
   174    254   **
   175    255   ** The buffer indicated by *ppOut may only be considered valid until the
   176    256   ** next call to this function.
   177    257   */
   178    258   static int vdbeSorterIterRead(
   179         -  sqlite3 *db,                    /* Database handle (for malloc) */
   180    259     VdbeSorterIter *p,              /* Iterator */
   181    260     int nByte,                      /* Bytes of data to read */
   182    261     u8 **ppOut                      /* OUT: Pointer to buffer containing data */
   183    262   ){
   184    263     int iBuf;                       /* Offset within buffer to read from */
   185    264     int nAvail;                     /* Bytes of data available in buffer */
   186    265     assert( p->aBuffer );
................................................................................
   218    297       /* The requested data is not all available in the in-memory buffer.
   219    298       ** In this case, allocate space at p->aAlloc[] to copy the requested
   220    299       ** range into. Then return a copy of pointer p->aAlloc to the caller.  */
   221    300       int nRem;                     /* Bytes remaining to copy */
   222    301   
   223    302       /* Extend the p->aAlloc[] allocation if required. */
   224    303       if( p->nAlloc<nByte ){
          304  +      u8 *aNew;
   225    305         int nNew = p->nAlloc*2;
   226    306         while( nByte>nNew ) nNew = nNew*2;
   227         -      p->aAlloc = sqlite3DbReallocOrFree(db, p->aAlloc, nNew);
   228         -      if( !p->aAlloc ) return SQLITE_NOMEM;
          307  +      aNew = sqlite3Realloc(p->aAlloc, nNew);
          308  +      if( !aNew ) return SQLITE_NOMEM;
   229    309         p->nAlloc = nNew;
          310  +      p->aAlloc = aNew;
   230    311       }
   231    312   
   232    313       /* Copy as much data as is available in the buffer into the start of
   233    314       ** p->aAlloc[].  */
   234    315       memcpy(p->aAlloc, &p->aBuffer[iBuf], nAvail);
   235    316       p->iReadOff += nAvail;
   236    317       nRem = nByte - nAvail;
................................................................................
   240    321       while( nRem>0 ){
   241    322         int rc;                     /* vdbeSorterIterRead() return code */
   242    323         int nCopy;                  /* Number of bytes to copy */
   243    324         u8 *aNext;                  /* Pointer to buffer to copy data from */
   244    325   
   245    326         nCopy = nRem;
   246    327         if( nRem>p->nBuffer ) nCopy = p->nBuffer;
   247         -      rc = vdbeSorterIterRead(db, p, nCopy, &aNext);
          328  +      rc = vdbeSorterIterRead(p, nCopy, &aNext);
   248    329         if( rc!=SQLITE_OK ) return rc;
   249    330         assert( aNext!=p->aAlloc );
   250    331         memcpy(&p->aAlloc[nByte - nRem], aNext, nCopy);
   251    332         nRem -= nCopy;
   252    333       }
   253    334   
   254    335       *ppOut = p->aAlloc;
................................................................................
   257    338     return SQLITE_OK;
   258    339   }
   259    340   
   260    341   /*
   261    342   ** Read a varint from the stream of data accessed by p. Set *pnOut to
   262    343   ** the value read.
   263    344   */
   264         -static int vdbeSorterIterVarint(sqlite3 *db, VdbeSorterIter *p, u64 *pnOut){
          345  +static int vdbeSorterIterVarint(VdbeSorterIter *p, u64 *pnOut){
   265    346     int iBuf;
   266    347   
   267    348     iBuf = p->iReadOff % p->nBuffer;
   268    349     if( iBuf && (p->nBuffer-iBuf)>=9 ){
   269    350       p->iReadOff += sqlite3GetVarint(&p->aBuffer[iBuf], pnOut);
   270    351     }else{
   271    352       u8 aVarint[16], *a;
   272    353       int i = 0, rc;
   273    354       do{
   274         -      rc = vdbeSorterIterRead(db, p, 1, &a);
          355  +      rc = vdbeSorterIterRead(p, 1, &a);
   275    356         if( rc ) return rc;
   276    357         aVarint[(i++)&0xf] = a[0];
   277    358       }while( (a[0]&0x80)!=0 );
   278    359       sqlite3GetVarint(aVarint, pnOut);
   279    360     }
   280    361   
   281    362     return SQLITE_OK;
................................................................................
   282    363   }
   283    364   
   284    365   
   285    366   /*
   286    367   ** Advance iterator pIter to the next key in its PMA. Return SQLITE_OK if
   287    368   ** no error occurs, or an SQLite error code if one does.
   288    369   */
   289         -static int vdbeSorterIterNext(
   290         -  sqlite3 *db,                    /* Database handle (for sqlite3DbMalloc() ) */
   291         -  VdbeSorterIter *pIter           /* Iterator to advance */
   292         -){
          370  +static int vdbeSorterIterNext(VdbeSorterIter *pIter){
   293    371     int rc;                         /* Return Code */
   294    372     u64 nRec = 0;                   /* Size of record in bytes */
   295    373   
   296    374     if( pIter->iReadOff>=pIter->iEof ){
   297    375       /* This is an EOF condition */
   298         -    vdbeSorterIterZero(db, pIter);
          376  +    vdbeSorterIterZero(pIter);
   299    377       return SQLITE_OK;
   300    378     }
   301    379   
   302         -  rc = vdbeSorterIterVarint(db, pIter, &nRec);
          380  +  rc = vdbeSorterIterVarint(pIter, &nRec);
   303    381     if( rc==SQLITE_OK ){
   304    382       pIter->nKey = (int)nRec;
   305         -    rc = vdbeSorterIterRead(db, pIter, (int)nRec, &pIter->aKey);
          383  +    rc = vdbeSorterIterRead(pIter, (int)nRec, &pIter->aKey);
   306    384     }
   307    385   
   308    386     return rc;
   309    387   }
   310    388   
   311    389   /*
   312    390   ** Initialize iterator pIter to scan through the PMA stored in file pFile
   313    391   ** starting at offset iStart and ending at offset iEof-1. This function 
   314    392   ** leaves the iterator pointing to the first key in the PMA (or EOF if the 
   315    393   ** PMA is empty).
   316    394   */
   317    395   static int vdbeSorterIterInit(
   318         -  sqlite3 *db,                    /* Database handle */
   319         -  const VdbeSorter *pSorter,      /* Sorter object */
   320         -  i64 iStart,                     /* Start offset in pFile */
          396  +  SorterThread *pThread,          /* Thread context */
          397  +  i64 iStart,                     /* Start offset in pThread->pTemp1 */
   321    398     VdbeSorterIter *pIter,          /* Iterator to populate */
   322    399     i64 *pnByte                     /* IN/OUT: Increment this value by PMA size */
   323    400   ){
   324    401     int rc = SQLITE_OK;
   325         -  int nBuf;
          402  +  int nBuf = pThread->pgsz;
   326    403   
   327         -  nBuf = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
   328         -
   329         -  assert( pSorter->iWriteOff>iStart );
          404  +  assert( pThread->iTemp1Off>iStart );
   330    405     assert( pIter->aAlloc==0 );
   331    406     assert( pIter->aBuffer==0 );
   332         -  pIter->pFile = pSorter->pTemp1;
          407  +  pIter->pFile = pThread->pTemp1;
   333    408     pIter->iReadOff = iStart;
   334    409     pIter->nAlloc = 128;
   335         -  pIter->aAlloc = (u8 *)sqlite3DbMallocRaw(db, pIter->nAlloc);
          410  +  pIter->aAlloc = (u8*)sqlite3Malloc(pIter->nAlloc);
   336    411     pIter->nBuffer = nBuf;
   337         -  pIter->aBuffer = (u8 *)sqlite3DbMallocRaw(db, nBuf);
          412  +  pIter->aBuffer = (u8*)sqlite3Malloc(nBuf);
   338    413   
   339    414     if( !pIter->aBuffer ){
   340    415       rc = SQLITE_NOMEM;
   341    416     }else{
   342    417       int iBuf;
   343    418   
   344    419       iBuf = iStart % nBuf;
   345    420       if( iBuf ){
   346    421         int nRead = nBuf - iBuf;
   347         -      if( (iStart + nRead) > pSorter->iWriteOff ){
   348         -        nRead = (int)(pSorter->iWriteOff - iStart);
          422  +      if( (iStart + nRead) > pThread->iTemp1Off ){
          423  +        nRead = (int)(pThread->iTemp1Off - iStart);
   349    424         }
   350    425         rc = sqlite3OsRead(
   351         -          pSorter->pTemp1, &pIter->aBuffer[iBuf], nRead, iStart
          426  +          pThread->pTemp1, &pIter->aBuffer[iBuf], nRead, iStart
   352    427         );
   353    428         assert( rc!=SQLITE_IOERR_SHORT_READ );
   354    429       }
   355    430   
   356    431       if( rc==SQLITE_OK ){
   357         -      u64 nByte;                       /* Size of PMA in bytes */
   358         -      pIter->iEof = pSorter->iWriteOff;
   359         -      rc = vdbeSorterIterVarint(db, pIter, &nByte);
          432  +      u64 nByte;
          433  +      pIter->iEof = pThread->iTemp1Off;
          434  +      rc = vdbeSorterIterVarint(pIter, &nByte);
   360    435         pIter->iEof = pIter->iReadOff + nByte;
   361    436         *pnByte += nByte;
   362    437       }
   363    438     }
   364    439   
   365    440     if( rc==SQLITE_OK ){
   366         -    rc = vdbeSorterIterNext(db, pIter);
          441  +    rc = vdbeSorterIterNext(pIter);
   367    442     }
   368    443     return rc;
   369    444   }
   370    445   
   371    446   
   372    447   /*
   373    448   ** Compare key1 (buffer pKey1, size nKey1 bytes) with key2 (buffer pKey2, 
................................................................................
   381    456   ** is true and key1 contains even a single NULL value, it is considered to
   382    457   ** be less than key2. Even if key2 also contains NULL values.
   383    458   **
   384    459   ** If pKey2 is passed a NULL pointer, then it is assumed that pThread->pUnpacked
   385    460   ** has been allocated and contains an unpacked record that is used as key2.
   386    461   */
   387    462   static void vdbeSorterCompare(
   388         -  const VdbeCursor *pCsr,         /* Cursor object (for pKeyInfo) */
          463  +  SorterThread *pThread,          /* Thread context (for pKeyInfo) */
   389    464     int nIgnore,                    /* Ignore the last nIgnore fields */
   390    465     const void *pKey1, int nKey1,   /* Left side of comparison */
   391    466     const void *pKey2, int nKey2,   /* Right side of comparison */
   392    467     int *pRes                       /* OUT: Result of comparison */
   393    468   ){
   394         -  KeyInfo *pKeyInfo = pCsr->pKeyInfo;
   395         -  VdbeSorter *pSorter = pCsr->pSorter;
   396         -  UnpackedRecord *r2 = pSorter->pUnpacked;
          469  +  KeyInfo *pKeyInfo = pThread->pKeyInfo;
          470  +  UnpackedRecord *r2 = pThread->pUnpacked;
   397    471     int i;
   398    472   
   399    473     if( pKey2 ){
   400    474       sqlite3VdbeRecordUnpack(pKeyInfo, nKey2, pKey2, r2);
   401    475     }
   402    476   
   403    477     if( nIgnore ){
................................................................................
   416    490   }
   417    491   
   418    492   /*
   419    493   ** This function is called to compare two iterator keys when merging 
   420    494   ** multiple b-tree segments. Parameter iOut is the index of the aTree[] 
   421    495   ** value to recalculate.
   422    496   */
   423         -static int vdbeSorterDoCompare(const VdbeCursor *pCsr, int iOut){
   424         -  VdbeSorter *pSorter = pCsr->pSorter;
          497  +static int vdbeSorterDoCompare(
          498  +  SorterThread *pThread, 
          499  +  SorterMerger *pMerger, 
          500  +  int iOut
          501  +){
   425    502     int i1;
   426    503     int i2;
   427    504     int iRes;
   428    505     VdbeSorterIter *p1;
   429    506     VdbeSorterIter *p2;
   430    507   
   431         -  assert( iOut<pSorter->nTree && iOut>0 );
          508  +  assert( iOut<pMerger->nTree && iOut>0 );
   432    509   
   433         -  if( iOut>=(pSorter->nTree/2) ){
   434         -    i1 = (iOut - pSorter->nTree/2) * 2;
          510  +  if( iOut>=(pMerger->nTree/2) ){
          511  +    i1 = (iOut - pMerger->nTree/2) * 2;
   435    512       i2 = i1 + 1;
   436    513     }else{
   437         -    i1 = pSorter->aTree[iOut*2];
   438         -    i2 = pSorter->aTree[iOut*2+1];
          514  +    i1 = pMerger->aTree[iOut*2];
          515  +    i2 = pMerger->aTree[iOut*2+1];
   439    516     }
   440    517   
   441         -  p1 = &pSorter->aIter[i1];
   442         -  p2 = &pSorter->aIter[i2];
          518  +  p1 = &pMerger->aIter[i1];
          519  +  p2 = &pMerger->aIter[i2];
   443    520   
   444    521     if( p1->pFile==0 ){
   445    522       iRes = i2;
   446    523     }else if( p2->pFile==0 ){
   447    524       iRes = i1;
   448    525     }else{
   449    526       int res;
   450         -    assert( pCsr->pSorter->pUnpacked!=0 );  /* allocated in vdbeSorterMerge() */
          527  +    assert( pThread->pUnpacked!=0 );  /* allocated in vdbeSorterMerge() */
   451    528       vdbeSorterCompare(
   452         -        pCsr, 0, p1->aKey, p1->nKey, p2->aKey, p2->nKey, &res
          529  +        pThread, 0, p1->aKey, p1->nKey, p2->aKey, p2->nKey, &res
   453    530       );
   454    531       if( res<=0 ){
   455    532         iRes = i1;
   456    533       }else{
   457    534         iRes = i2;
   458    535       }
   459    536     }
   460    537   
   461         -  pSorter->aTree[iOut] = iRes;
          538  +  pMerger->aTree[iOut] = iRes;
   462    539     return SQLITE_OK;
   463    540   }
   464    541   
   465    542   /*
   466    543   ** Initialize the temporary index cursor just opened as a sorter cursor.
   467    544   */
   468    545   int sqlite3VdbeSorterInit(sqlite3 *db, VdbeCursor *pCsr){
   469    546     int pgsz;                       /* Page size of main database */
          547  +  int i;                          /* Used to iterate through aThread[] */
   470    548     int mxCache;                    /* Cache size */
   471    549     VdbeSorter *pSorter;            /* The new sorter */
   472         -  char *d;                        /* Dummy */
          550  +  KeyInfo *pKeyInfo;              /* Copy of pCsr->pKeyInfo with db==0 */
          551  +  int szKeyInfo;                  /* Size of pCsr->pKeyInfo in bytes */
   473    552   
   474    553     assert( pCsr->pKeyInfo && pCsr->pBt==0 );
   475         -  pCsr->pSorter = pSorter = sqlite3DbMallocZero(db, sizeof(VdbeSorter));
          554  +  szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nField-1)*sizeof(CollSeq*);
          555  +  pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sizeof(VdbeSorter)+szKeyInfo);
          556  +  pCsr->pSorter = pSorter;
   476    557     if( pSorter==0 ){
   477    558       return SQLITE_NOMEM;
   478    559     }
   479         -  
   480         -  pSorter->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pCsr->pKeyInfo, 0, 0, &d);
   481         -  if( pSorter->pUnpacked==0 ) return SQLITE_NOMEM;
   482         -  assert( pSorter->pUnpacked==(UnpackedRecord *)d );
          560  +  pKeyInfo = (KeyInfo*)&pSorter[1];
          561  +  memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo);
          562  +  pKeyInfo->db = 0;
          563  +  pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
          564  +
          565  +  for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){
          566  +    SorterThread *pThread = &pSorter->aThread[i];
          567  +    pThread->pKeyInfo = pKeyInfo;
          568  +    pThread->pVfs = db->pVfs;
          569  +    pThread->pgsz = pgsz;
          570  +  }
   483    571   
   484    572     if( !sqlite3TempInMemory(db) ){
   485         -    pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
   486    573       pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz;
   487    574       mxCache = db->aDb[0].pSchema->cache_size;
   488    575       if( mxCache<SORTER_MIN_WORKING ) mxCache = SORTER_MIN_WORKING;
   489    576       pSorter->mxPmaSize = mxCache * pgsz;
   490    577     }
   491    578   
   492    579     return SQLITE_OK;
................................................................................
   499    586     SorterRecord *p;
   500    587     SorterRecord *pNext;
   501    588     for(p=pRecord; p; p=pNext){
   502    589       pNext = p->pNext;
   503    590       sqlite3DbFree(db, p);
   504    591     }
   505    592   }
          593  +
          594  +/*
          595  +** Free all resources owned by the object indicated by argument pThread. All 
          596  +** fields of *pThread are zeroed before returning.
          597  +*/
          598  +static void vdbeSorterThreadCleanup(sqlite3 *db, SorterThread *pThread){
          599  +  sqlite3DbFree(db, pThread->pUnpacked);
          600  +  vdbeSorterRecordFree(0, pThread->pList);
          601  +  if( pThread->pTemp1 ){
          602  +    sqlite3OsCloseFree(pThread->pTemp1);
          603  +  }
          604  +  memset(pThread, 0, sizeof(SorterThread));
          605  +}
          606  +
          607  +/*
          608  +** Join all threads.  
          609  +*/
          610  +static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
          611  +  int rc = rcin;
          612  +  int i;
          613  +  for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){
          614  +    SorterThread *pThread = &pSorter->aThread[i];
          615  +    if( pThread->pThread ){
          616  +      void *pRet;
          617  +      int rc2 = sqlite3ThreadJoin(pThread->pThread, &pRet);
          618  +      pThread->pThread = 0;
          619  +      pThread->bDone = 0;
          620  +      if( rc==SQLITE_OK ) rc = rc2;
          621  +      if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
          622  +    }
          623  +  }
          624  +  return rc;
          625  +}
          626  +
          627  +/*
          628  +** Allocate a new SorterMerger object with space for nIter iterators.
          629  +*/
          630  +static SorterMerger *vdbeSorterMergerNew(int nIter){
          631  +  int N = 2;                      /* Smallest power of two >= nIter */
          632  +  int nByte;                      /* Total bytes of space to allocate */
          633  +  SorterMerger *pNew;             /* Pointer to allocated object to return */
          634  +
          635  +  assert( nIter<=SORTER_MAX_MERGE_COUNT );
          636  +  while( N<nIter ) N += N;
          637  +  nByte = sizeof(SorterMerger) + N * (sizeof(int) + sizeof(VdbeSorterIter));
          638  +
          639  +  pNew = (SorterMerger*)sqlite3MallocZero(nByte);
          640  +  if( pNew ){
          641  +    pNew->nTree = N;
          642  +    pNew->aIter = (VdbeSorterIter*)&pNew[1];
          643  +    pNew->aTree = (int*)&pNew->aIter[N];
          644  +  }
          645  +
          646  +  return pNew;
          647  +}
          648  +
          649  +/*
          650  +** Free the SorterMerger object passed as the only argument.
          651  +*/
          652  +static void vdbeSorterMergerFree(SorterMerger *pMerger){
          653  +  if( pMerger ){
          654  +    int i;
          655  +    for(i=0; i<pMerger->nTree; i++){
          656  +      vdbeSorterIterZero(&pMerger->aIter[i]);
          657  +    }
          658  +    sqlite3_free(pMerger);
          659  +  }
          660  +}
   506    661   
   507    662   /*
   508    663   ** Free any cursor components allocated by sqlite3VdbeSorterXXX routines.
   509    664   */
   510    665   void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){
   511    666     VdbeSorter *pSorter = pCsr->pSorter;
   512    667     if( pSorter ){
   513         -    if( pSorter->aIter ){
   514         -      int i;
   515         -      for(i=0; i<pSorter->nTree; i++){
   516         -        vdbeSorterIterZero(db, &pSorter->aIter[i]);
   517         -      }
   518         -      sqlite3DbFree(db, pSorter->aIter);
          668  +    int i;
          669  +    vdbeSorterJoinAll(pSorter, SQLITE_OK);
          670  +    for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){
          671  +      SorterThread *pThread = &pSorter->aThread[i];
          672  +      vdbeSorterThreadCleanup(db, pThread);
   519    673       }
   520         -    if( pSorter->pTemp1 ){
   521         -      sqlite3OsCloseFree(pSorter->pTemp1);
   522         -    }
   523         -    vdbeSorterRecordFree(db, pSorter->pRecord);
   524         -    sqlite3DbFree(db, pSorter->pUnpacked);
          674  +
          675  +    vdbeSorterRecordFree(0, pSorter->pRecord);
          676  +    vdbeSorterMergerFree(pSorter->pMerger);
   525    677       sqlite3DbFree(db, pSorter);
   526    678       pCsr->pSorter = 0;
   527    679     }
   528    680   }
   529    681   
   530    682   /*
   531    683   ** Allocate space for a file-handle and open a temporary file. If successful,
   532    684   ** set *ppFile to point to the malloc'd file-handle and return SQLITE_OK.
   533    685   ** Otherwise, set *ppFile to 0 and return an SQLite error code.
   534    686   */
   535         -static int vdbeSorterOpenTempFile(sqlite3 *db, sqlite3_file **ppFile){
          687  +static int vdbeSorterOpenTempFile(sqlite3_vfs *pVfs, sqlite3_file **ppFile){
   536    688     int dummy;
   537         -  return sqlite3OsOpenMalloc(db->pVfs, 0, ppFile,
          689  +  return sqlite3OsOpenMalloc(pVfs, 0, ppFile,
   538    690         SQLITE_OPEN_TEMP_JOURNAL |
   539    691         SQLITE_OPEN_READWRITE    | SQLITE_OPEN_CREATE |
   540    692         SQLITE_OPEN_EXCLUSIVE    | SQLITE_OPEN_DELETEONCLOSE, &dummy
   541    693     );
   542    694   }
   543    695   
   544    696   /*
   545    697   ** Merge the two sorted lists p1 and p2 into a single list.
   546    698   ** Set *ppOut to the head of the new list.
   547    699   */
   548    700   static void vdbeSorterMerge(
   549         -  const VdbeCursor *pCsr,         /* For pKeyInfo */
          701  +  SorterThread *pThread,          /* Calling thread context */
   550    702     SorterRecord *p1,               /* First list to merge */
   551    703     SorterRecord *p2,               /* Second list to merge */
   552    704     SorterRecord **ppOut            /* OUT: Head of merged list */
   553    705   ){
   554    706     SorterRecord *pFinal = 0;
   555    707     SorterRecord **pp = &pFinal;
   556    708     void *pVal2 = p2 ? p2->pVal : 0;
   557    709   
   558    710     while( p1 && p2 ){
   559    711       int res;
   560         -    vdbeSorterCompare(pCsr, 0, p1->pVal, p1->nVal, pVal2, p2->nVal, &res);
          712  +    vdbeSorterCompare(pThread, 0, p1->pVal, p1->nVal, pVal2, p2->nVal, &res);
   561    713       if( res<=0 ){
   562    714         *pp = p1;
   563    715         pp = &p1->pNext;
   564    716         p1 = p1->pNext;
   565    717         pVal2 = 0;
   566    718       }else{
   567    719         *pp = p2;
................................................................................
   572    724       }
   573    725     }
   574    726     *pp = p1 ? p1 : p2;
   575    727     *ppOut = pFinal;
   576    728   }
   577    729   
   578    730   /*
   579         -** Sort the linked list of records headed at pCsr->pRecord. Return SQLITE_OK
   580         -** if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if an error
   581         -** occurs.
          731  +** Sort the linked list of records headed at pThread->pList. Return 
          732  +** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if 
          733  +** an error occurs.
   582    734   */
   583         -static int vdbeSorterSort(const VdbeCursor *pCsr){
          735  +static int vdbeSorterSort(SorterThread *pThread){
   584    736     int i;
   585    737     SorterRecord **aSlot;
   586    738     SorterRecord *p;
   587         -  VdbeSorter *pSorter = pCsr->pSorter;
   588    739   
   589    740     aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *));
   590    741     if( !aSlot ){
   591    742       return SQLITE_NOMEM;
   592    743     }
   593    744   
   594         -  p = pSorter->pRecord;
          745  +  p = pThread->pList;
   595    746     while( p ){
   596    747       SorterRecord *pNext = p->pNext;
   597    748       p->pNext = 0;
   598    749       for(i=0; aSlot[i]; i++){
   599         -      vdbeSorterMerge(pCsr, p, aSlot[i], &p);
          750  +      vdbeSorterMerge(pThread, p, aSlot[i], &p);
   600    751         aSlot[i] = 0;
   601    752       }
   602    753       aSlot[i] = p;
   603    754       p = pNext;
   604    755     }
   605    756   
   606    757     p = 0;
   607    758     for(i=0; i<64; i++){
   608         -    vdbeSorterMerge(pCsr, p, aSlot[i], &p);
          759  +    vdbeSorterMerge(pThread, p, aSlot[i], &p);
   609    760     }
   610         -  pSorter->pRecord = p;
          761  +  pThread->pList = p;
   611    762   
   612    763     sqlite3_free(aSlot);
   613    764     return SQLITE_OK;
   614    765   }
   615    766   
   616    767   /*
   617    768   ** Initialize a file-writer object.
   618    769   */
   619    770   static void fileWriterInit(
   620         -  sqlite3 *db,                    /* Database (for malloc) */
   621    771     sqlite3_file *pFile,            /* File to write to */
   622    772     FileWriter *p,                  /* Object to populate */
          773  +  int nBuf,                       /* Buffer size */
   623    774     i64 iStart                      /* Offset of pFile to begin writing at */
   624    775   ){
   625         -  int nBuf = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
   626         -
   627    776     memset(p, 0, sizeof(FileWriter));
   628         -  p->aBuffer = (u8 *)sqlite3DbMallocRaw(db, nBuf);
          777  +  p->aBuffer = (u8*)sqlite3Malloc(nBuf);
   629    778     if( !p->aBuffer ){
   630    779       p->eFWErr = SQLITE_NOMEM;
   631    780     }else{
   632    781       p->iBufEnd = p->iBufStart = (iStart % nBuf);
   633    782       p->iWriteOff = iStart - p->iBufStart;
   634    783       p->nBuffer = nBuf;
   635    784       p->pFile = pFile;
................................................................................
   669    818   ** The results of using the file-writer after this call are undefined.
   670    819   ** Return SQLITE_OK if flushing the buffered data succeeds or is not 
   671    820   ** required. Otherwise, return an SQLite error code.
   672    821   **
   673    822   ** Before returning, set *piEof to the offset immediately following the
   674    823   ** last byte written to the file.
   675    824   */
   676         -static int fileWriterFinish(sqlite3 *db, FileWriter *p, i64 *piEof){
          825  +static int fileWriterFinish(FileWriter *p, i64 *piEof){
   677    826     int rc;
   678    827     if( p->eFWErr==0 && ALWAYS(p->aBuffer) && p->iBufEnd>p->iBufStart ){
   679    828       p->eFWErr = sqlite3OsWrite(p->pFile, 
   680    829           &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart, 
   681    830           p->iWriteOff + p->iBufStart
   682    831       );
   683    832     }
   684    833     *piEof = (p->iWriteOff + p->iBufEnd);
   685         -  sqlite3DbFree(db, p->aBuffer);
          834  +  sqlite3_free(p->aBuffer);
   686    835     rc = p->eFWErr;
   687    836     memset(p, 0, sizeof(FileWriter));
   688    837     return rc;
   689    838   }
   690    839   
   691    840   /*
   692    841   ** Write value iVal encoded as a varint to the file-write object. Return 
................................................................................
   708    857   **     * A varint. This varint contains the total number of bytes of content
   709    858   **       in the PMA (not including the varint itself).
   710    859   **
   711    860   **     * One or more records packed end-to-end in order of ascending keys. 
   712    861   **       Each record consists of a varint followed by a blob of data (the 
   713    862   **       key). The varint is the number of bytes in the blob of data.
   714    863   */
   715         -static int vdbeSorterListToPMA(sqlite3 *db, const VdbeCursor *pCsr){
          864  +static int vdbeSorterListToPMA(SorterThread *pThread){
   716    865     int rc = SQLITE_OK;             /* Return code */
   717         -  VdbeSorter *pSorter = pCsr->pSorter;
   718         -  FileWriter writer;
          866  +  FileWriter writer;              /* Object used to write to the file */
   719    867   
   720    868     memset(&writer, 0, sizeof(FileWriter));
   721         -
   722         -  if( pSorter->nInMemory==0 ){
   723         -    assert( pSorter->pRecord==0 );
   724         -    return rc;
   725         -  }
   726         -
   727         -  rc = vdbeSorterSort(pCsr);
          869  +  assert( pThread->nInMemory>0 );
   728    870   
   729    871     /* If the first temporary PMA file has not been opened, open it now. */
   730         -  if( rc==SQLITE_OK && pSorter->pTemp1==0 ){
   731         -    rc = vdbeSorterOpenTempFile(db, &pSorter->pTemp1);
   732         -    assert( rc!=SQLITE_OK || pSorter->pTemp1 );
   733         -    assert( pSorter->iWriteOff==0 );
   734         -    assert( pSorter->nPMA==0 );
          872  +  if( pThread->pTemp1==0 ){
          873  +    rc = vdbeSorterOpenTempFile(pThread->pVfs, &pThread->pTemp1);
          874  +    assert( rc!=SQLITE_OK || pThread->pTemp1 );
          875  +    assert( pThread->iTemp1Off==0 );
          876  +    assert( pThread->nPMA==0 );
   735    877     }
   736    878   
   737    879     if( rc==SQLITE_OK ){
   738    880       SorterRecord *p;
   739    881       SorterRecord *pNext = 0;
   740    882   
   741         -    fileWriterInit(db, pSorter->pTemp1, &writer, pSorter->iWriteOff);
   742         -    pSorter->nPMA++;
   743         -    fileWriterWriteVarint(&writer, pSorter->nInMemory);
   744         -    for(p=pSorter->pRecord; p; p=pNext){
          883  +    fileWriterInit(pThread->pTemp1, &writer, pThread->pgsz, pThread->iTemp1Off);
          884  +    pThread->nPMA++;
          885  +    fileWriterWriteVarint(&writer, pThread->nInMemory);
          886  +    for(p=pThread->pList; p; p=pNext){
   745    887         pNext = p->pNext;
   746    888         fileWriterWriteVarint(&writer, p->nVal);
   747    889         fileWriterWrite(&writer, p->pVal, p->nVal);
   748         -      sqlite3DbFree(db, p);
          890  +      sqlite3_free(p);
          891  +    }
          892  +    pThread->pList = p;
          893  +    rc = fileWriterFinish(&writer, &pThread->iTemp1Off);
          894  +  }
          895  +
          896  +  return rc;
          897  +}
          898  +
          899  +/*
          900  +** Advance the SorterMerger iterator passed as the second argument to
          901  +** the next entry. Set *pbEof to true if this means the iterator has 
          902  +** reached EOF.
          903  +**
          904  +** Return SQLITE_OK if successful or an error code if an error occurs.
          905  +*/
          906  +static int vdbeSorterNext(
          907  +  SorterThread *pThread, 
          908  +  SorterMerger *pMerger, 
          909  +  int *pbEof
          910  +){
          911  +  int rc;
          912  +  int iPrev = pMerger->aTree[1];/* Index of iterator to advance */
          913  +  int i;                        /* Index of aTree[] to recalculate */
          914  +
          915  +  /* Advance the current iterator */
          916  +  rc = vdbeSorterIterNext(&pMerger->aIter[iPrev]);
          917  +
          918  +  /* Update contents of aTree[] */
          919  +  for(i=(pMerger->nTree+iPrev)/2; rc==SQLITE_OK && i>0; i=i/2){
          920  +    rc = vdbeSorterDoCompare(pThread, pMerger, i);
          921  +  }
          922  +
          923  +  *pbEof = (pMerger->aIter[pMerger->aTree[1]].pFile==0);
          924  +  return rc;
          925  +}
          926  +
          927  +/*
          928  +** The main routine for sorter-thread operations.
          929  +*/
          930  +static void *vdbeSorterThreadMain(void *pCtx){
          931  +  int rc = SQLITE_OK;
          932  +  SorterThread *pThread = (SorterThread*)pCtx;
          933  +
          934  +  assert( pThread->eWork==SORTER_THREAD_SORT
          935  +       || pThread->eWork==SORTER_THREAD_TO_PMA
          936  +       || pThread->eWork==SORTER_THREAD_CONS
          937  +  );
          938  +  assert( pThread->bDone==0 );
          939  +
          940  +  if( pThread->pUnpacked==0 ){
          941  +    char *pFree;
          942  +    pThread->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
          943  +        pThread->pKeyInfo, 0, 0, &pFree
          944  +    );
          945  +    assert( pThread->pUnpacked==(UnpackedRecord*)pFree );
          946  +    if( pFree==0 ){
          947  +      rc = SQLITE_NOMEM;
          948  +      goto thread_out;
          949  +    }
          950  +  }
          951  +
          952  +  if( pThread->eWork==SORTER_THREAD_CONS ){
          953  +    assert( pThread->pList==0 );
          954  +    while( pThread->nPMA>pThread->nConsolidate && rc==SQLITE_OK ){
          955  +      int nIter = MIN(pThread->nPMA, SORTER_MAX_MERGE_COUNT);
          956  +      sqlite3_file *pTemp2 = 0;     /* Second temp file to use */
          957  +      SorterMerger *pMerger;        /* Object for reading/merging PMA data */
          958  +      i64 iReadOff = 0;             /* Offset in pTemp1 to read from */
          959  +      i64 iWriteOff = 0;            /* Offset in pTemp2 to write to */
          960  +      int i;
          961  +      
          962  +      /* Allocate a merger object to merge PMAs together. */
          963  +      pMerger = vdbeSorterMergerNew(nIter);
          964  +      if( pMerger==0 ){
          965  +        rc = SQLITE_NOMEM;
          966  +        break;
          967  +      }
          968  +
          969  +      /* Open a second temp file to write merged data to */
          970  +      rc = vdbeSorterOpenTempFile(pThread->pVfs, &pTemp2);
          971  +      if( rc!=SQLITE_OK ){
          972  +        vdbeSorterMergerFree(pMerger);
          973  +        break;
          974  +      }
          975  +
          976  +      /* This loop runs once for each output PMA. Each output PMA is made
          977  +      ** of data merged from up to SORTER_MAX_MERGE_COUNT input PMAs. */
          978  +      for(i=0; i<pThread->nPMA; i+=SORTER_MAX_MERGE_COUNT){
          979  +        FileWriter writer;        /* Object for writing data to pTemp2 */
          980  +        i64 nOut = 0;             /* Bytes of data in output PMA */
          981  +        int bEof = 0;
          982  +        int rc2;
          983  +
          984  +        /* Configure the merger object to read and merge data from the next 
          985  +        ** SORTER_MAX_MERGE_COUNT PMAs in pTemp1 (or from all remaining PMAs,
          986  +        ** if that is fewer). */
          987  +        int iIter;
          988  +        for(iIter=0; iIter<SORTER_MAX_MERGE_COUNT; iIter++){
          989  +          VdbeSorterIter *pIter = &pMerger->aIter[iIter];
          990  +          rc = vdbeSorterIterInit(pThread, iReadOff, pIter, &nOut);
          991  +          iReadOff = pIter->iEof;
          992  +          if( iReadOff>=pThread->iTemp1Off || rc!=SQLITE_OK ) break;
          993  +        }
          994  +        for(iIter=pMerger->nTree-1; rc==SQLITE_OK && iIter>0; iIter--){
          995  +          rc = vdbeSorterDoCompare(pThread, pMerger, iIter);
          996  +        }
          997  +
          998  +        fileWriterInit(pTemp2, &writer, pThread->pgsz, iWriteOff);
          999  +        fileWriterWriteVarint(&writer, nOut);
         1000  +        while( rc==SQLITE_OK && bEof==0 ){
         1001  +          VdbeSorterIter *pIter = &pMerger->aIter[ pMerger->aTree[1] ];
         1002  +          assert( pIter->pFile!=0 );        /* pIter is not at EOF */
         1003  +          fileWriterWriteVarint(&writer, pIter->nKey);
         1004  +          fileWriterWrite(&writer, pIter->aKey, pIter->nKey);
         1005  +          rc = vdbeSorterNext(pThread, pMerger, &bEof);
         1006  +        }
         1007  +        rc2 = fileWriterFinish(&writer, &iWriteOff);
         1008  +        if( rc==SQLITE_OK ) rc = rc2;
         1009  +      }
         1010  +
         1011  +      vdbeSorterMergerFree(pMerger);
         1012  +      sqlite3OsCloseFree(pThread->pTemp1);
         1013  +      pThread->pTemp1 = pTemp2;
         1014  +      pThread->nPMA = (i / SORTER_MAX_MERGE_COUNT);
         1015  +      pThread->iTemp1Off = iWriteOff;
         1016  +    }
         1017  +  }else{
         1018  +    /* Sort the pThread->pList list */
         1019  +    rc = vdbeSorterSort(pThread);
         1020  +
         1021  +    /* If required, write the list out to a PMA. */
         1022  +    if( rc==SQLITE_OK && pThread->eWork==SORTER_THREAD_TO_PMA ){
         1023  +#ifdef SQLITE_DEBUG
         1024  +      i64 nExpect = pThread->nInMemory
         1025  +        + sqlite3VarintLen(pThread->nInMemory)
         1026  +        + pThread->iTemp1Off;
         1027  +#endif
         1028  +      rc = vdbeSorterListToPMA(pThread);
         1029  +      assert( rc!=SQLITE_OK || (nExpect==pThread->iTemp1Off) );
         1030  +    }
         1031  +  }
         1032  +
         1033  + thread_out:
         1034  +  pThread->bDone = 1;
         1035  +  return SQLITE_INT_TO_PTR(rc);
         1036  +}
         1037  +
         1038  +/*
         1039  +** Run the activity scheduled by the object passed as the only argument
         1040  +** in the current thread.
         1041  +*/
         1042  +static int vdbeSorterRunThread(SorterThread *pThread){
         1043  +  int rc = SQLITE_PTR_TO_INT( vdbeSorterThreadMain((void*)pThread) );
         1044  +  assert( pThread->bDone );
         1045  +  pThread->bDone = 0;
         1046  +  return rc;
         1047  +}
         1048  +
         1049  +/*
         1050  +** Flush the current contents of VdbeSorter.pRecord to a new PMA, possibly
         1051  +** using a background thread.
         1052  +**
         1053  +** If argument bFg is non-zero, the operation always uses the calling thread.
         1054  +*/
         1055  +static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){
         1056  +  VdbeSorter *pSorter = pCsr->pSorter;
         1057  +  int rc = SQLITE_OK;
         1058  +  int i;
         1059  +  SorterThread *pThread;        /* Thread context used to create new PMA */
         1060  +
         1061  +  pSorter->bUsePMA = 1;
         1062  +  for(i=0; ALWAYS( i<SQLITE_MAX_SORTER_THREAD ); i++){
         1063  +    pThread = &pSorter->aThread[i];
         1064  +    if( pThread->bDone ){
         1065  +      void *pRet;
         1066  +      assert( pThread->pThread );
         1067  +      rc = sqlite3ThreadJoin(pThread->pThread, &pRet);
         1068  +      pThread->pThread = 0;
         1069  +      pThread->bDone = 0;
         1070  +      if( rc==SQLITE_OK ){
         1071  +        rc = SQLITE_PTR_TO_INT(pRet);
         1072  +      }
   749   1073       }
   750         -    pSorter->pRecord = p;
   751         -    rc = fileWriterFinish(db, &writer, &pSorter->iWriteOff);
         1074  +    if( pThread->pThread==0 ) break;
         1075  +  }
         1076  +
         1077  +  if( rc==SQLITE_OK ){
         1078  +    assert( pThread->pThread==0 && pThread->bDone==0 );
         1079  +    pThread->eWork = SORTER_THREAD_TO_PMA;
         1080  +    pThread->pList = pSorter->pRecord;
         1081  +    pThread->nInMemory = pSorter->nInMemory;
         1082  +    pSorter->nInMemory = 0;
         1083  +    pSorter->pRecord = 0;
         1084  +
         1085  +    if( bFg || i<(SQLITE_MAX_SORTER_THREAD-1) ){
         1086  +      void *pCtx = (void*)pThread;
         1087  +      rc = sqlite3ThreadCreate(&pThread->pThread, vdbeSorterThreadMain, pCtx);
         1088  +    }else{
         1089  +      /* Use the foreground thread for this operation */
         1090  +      rc = vdbeSorterRunThread(pThread);
         1091  +    }
   752   1092     }
   753   1093   
   754   1094     return rc;
   755   1095   }
   756   1096   
   757   1097   /*
   758   1098   ** Add a record to the sorter.
................................................................................
   765   1105     VdbeSorter *pSorter = pCsr->pSorter;
   766   1106     int rc = SQLITE_OK;             /* Return Code */
   767   1107     SorterRecord *pNew;             /* New list element */
   768   1108   
   769   1109     assert( pSorter );
   770   1110     pSorter->nInMemory += sqlite3VarintLen(pVal->n) + pVal->n;
   771   1111   
   772         -  pNew = (SorterRecord *)sqlite3DbMallocRaw(db, pVal->n + sizeof(SorterRecord));
         1112  +  pNew = (SorterRecord *)sqlite3Malloc(pVal->n + sizeof(SorterRecord));
   773   1113     if( pNew==0 ){
   774   1114       rc = SQLITE_NOMEM;
   775   1115     }else{
   776   1116       pNew->pVal = (void *)&pNew[1];
   777   1117       memcpy(pNew->pVal, pVal->z, pVal->n);
   778   1118       pNew->nVal = pVal->n;
   779   1119       pNew->pNext = pSorter->pRecord;
................................................................................
   789   1129     **   * The total memory allocated for the in-memory list is greater 
   790   1130     **     than (page-size * 10) and sqlite3HeapNearlyFull() returns true.
   791   1131     */
   792   1132     if( rc==SQLITE_OK && pSorter->mxPmaSize>0 && (
   793   1133           (pSorter->nInMemory>pSorter->mxPmaSize)
   794   1134        || (pSorter->nInMemory>pSorter->mnPmaSize && sqlite3HeapNearlyFull())
   795   1135     )){
   796         -#ifdef SQLITE_DEBUG
   797         -    i64 nExpect = pSorter->iWriteOff
   798         -                + sqlite3VarintLen(pSorter->nInMemory)
   799         -                + pSorter->nInMemory;
   800         -#endif
   801         -    rc = vdbeSorterListToPMA(db, pCsr);
   802         -    pSorter->nInMemory = 0;
   803         -    assert( rc!=SQLITE_OK || (nExpect==pSorter->iWriteOff) );
         1136  +    rc = vdbeSorterFlushPMA(db, pCsr, 0);
   804   1137     }
   805   1138   
   806   1139     return rc;
   807   1140   }
   808   1141   
   809   1142   /*
   810         -** Helper function for sqlite3VdbeSorterRewind(). 
         1143  +** Return the total number of PMAs in all temporary files.
   811   1144   */
   812         -static int vdbeSorterInitMerge(
   813         -  sqlite3 *db,                    /* Database handle */
   814         -  const VdbeCursor *pCsr,         /* Cursor handle for this sorter */
   815         -  i64 *pnByte                     /* Sum of bytes in all opened PMAs */
   816         -){
   817         -  VdbeSorter *pSorter = pCsr->pSorter;
   818         -  int rc = SQLITE_OK;             /* Return code */
   819         -  int i;                          /* Used to iterator through aIter[] */
   820         -  i64 nByte = 0;                  /* Total bytes in all opened PMAs */
   821         -
   822         -  /* Initialize the iterators. */
   823         -  for(i=0; i<SORTER_MAX_MERGE_COUNT; i++){
   824         -    VdbeSorterIter *pIter = &pSorter->aIter[i];
   825         -    rc = vdbeSorterIterInit(db, pSorter, pSorter->iReadOff, pIter, &nByte);
   826         -    pSorter->iReadOff = pIter->iEof;
   827         -    assert( rc!=SQLITE_OK || pSorter->iReadOff<=pSorter->iWriteOff );
   828         -    if( rc!=SQLITE_OK || pSorter->iReadOff>=pSorter->iWriteOff ) break;
         1145  +static int vdbeSorterCountPMA(VdbeSorter *pSorter){
         1146  +  int nPMA = 0;
         1147  +  int i;
         1148  +  for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){
         1149  +    nPMA += pSorter->aThread[i].nPMA;
   829   1150     }
   830         -
   831         -  /* Initialize the aTree[] array. */
   832         -  for(i=pSorter->nTree-1; rc==SQLITE_OK && i>0; i--){
   833         -    rc = vdbeSorterDoCompare(pCsr, i);
   834         -  }
   835         -
   836         -  *pnByte = nByte;
   837         -  return rc;
         1151  +  return nPMA;
   838   1152   }
   839   1153   
   840   1154   /*
   841   1155   ** Once the sorter has been populated, this function is called to prepare
   842   1156   ** for iterating through its contents in sorted order.
   843   1157   */
   844   1158   int sqlite3VdbeSorterRewind(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){
   845   1159     VdbeSorter *pSorter = pCsr->pSorter;
   846         -  int rc;                         /* Return code */
   847         -  sqlite3_file *pTemp2 = 0;       /* Second temp file to use */
   848         -  i64 iWrite2 = 0;                /* Write offset for pTemp2 */
   849         -  int nIter;                      /* Number of iterators used */
   850         -  int nByte;                      /* Bytes of space required for aIter/aTree */
   851         -  int N = 2;                      /* Power of 2 >= nIter */
         1160  +  int rc = SQLITE_OK;             /* Return code */
   852   1161   
   853   1162     assert( pSorter );
   854   1163   
   855   1164     /* If no data has been written to disk, then do not do so now. Instead,
   856   1165     ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly
   857   1166     ** from the in-memory list.  */
   858         -  if( pSorter->nPMA==0 ){
   859         -    *pbEof = !pSorter->pRecord;
   860         -    assert( pSorter->aTree==0 );
   861         -    return vdbeSorterSort(pCsr);
         1167  +  if( pSorter->bUsePMA==0 ){
         1168  +    if( pSorter->pRecord ){
         1169  +      SorterThread *pThread = &pSorter->aThread[0];
         1170  +      *pbEof = 0;
         1171  +      pThread->pList = pSorter->pRecord;
         1172  +      pThread->eWork = SORTER_THREAD_SORT;
         1173  +      rc = vdbeSorterRunThread(pThread);
         1174  +      pSorter->pRecord = pThread->pList;
         1175  +      pThread->pList = 0;
         1176  +    }else{
         1177  +      *pbEof = 1;
         1178  +    }
         1179  +    return rc;
   862   1180     }
   863   1181   
   864   1182     /* Write the current in-memory list to a PMA. */
   865         -  rc = vdbeSorterListToPMA(db, pCsr);
   866         -  if( rc!=SQLITE_OK ) return rc;
         1183  +  if( pSorter->pRecord ){
         1184  +    rc = vdbeSorterFlushPMA(db, pCsr, 1);
         1185  +  }
   867   1186   
   868         -  /* Allocate space for aIter[] and aTree[]. */
   869         -  nIter = pSorter->nPMA;
   870         -  if( nIter>SORTER_MAX_MERGE_COUNT ) nIter = SORTER_MAX_MERGE_COUNT;
   871         -  assert( nIter>0 );
   872         -  while( N<nIter ) N += N;
   873         -  nByte = N * (sizeof(int) + sizeof(VdbeSorterIter));
   874         -  pSorter->aIter = (VdbeSorterIter *)sqlite3DbMallocZero(db, nByte);
   875         -  if( !pSorter->aIter ) return SQLITE_NOMEM;
   876         -  pSorter->aTree = (int *)&pSorter->aIter[N];
   877         -  pSorter->nTree = N;
         1187  +  /* Join all threads */
         1188  +  rc = vdbeSorterJoinAll(pSorter, rc);
         1189  +
         1190  +  /* If there are more than SORTER_MAX_MERGE_COUNT PMAs on disk, merge
         1191  +  ** some of them together so that this is no longer the case. */
         1192  +  assert( SORTER_MAX_MERGE_COUNT>=SQLITE_MAX_SORTER_THREAD );
         1193  +  if( vdbeSorterCountPMA(pSorter)>SORTER_MAX_MERGE_COUNT ){
         1194  +    int i;
         1195  +    for(i=0; rc==SQLITE_OK && i<SQLITE_MAX_SORTER_THREAD; i++){
         1196  +      SorterThread *pThread = &pSorter->aThread[i];
         1197  +      if( pThread->pTemp1 ){
         1198  +        pThread->nConsolidate = SORTER_MAX_MERGE_COUNT/SQLITE_MAX_SORTER_THREAD;
         1199  +        pThread->eWork = SORTER_THREAD_CONS;
         1200  +
         1201  +        if( i<(SQLITE_MAX_SORTER_THREAD-1) ){
         1202  +          void *pCtx = (void*)pThread;
         1203  +          rc = sqlite3ThreadCreate(&pThread->pThread,vdbeSorterThreadMain,pCtx);
         1204  +        }else{
         1205  +          rc = vdbeSorterRunThread(pThread);
         1206  +        }
         1207  +      }
         1208  +    }
         1209  +  }
         1210  +
         1211  +  /* Join all threads */
         1212  +  rc = vdbeSorterJoinAll(pSorter, rc);
   878   1213   
   879         -  do {
   880         -    int iNew;                     /* Index of new, merged, PMA */
   881         -
   882         -    for(iNew=0; 
   883         -        rc==SQLITE_OK && iNew*SORTER_MAX_MERGE_COUNT<pSorter->nPMA; 
   884         -        iNew++
   885         -    ){
   886         -      int rc2;                    /* Return code from fileWriterFinish() */
   887         -      FileWriter writer;          /* Object used to write to disk */
   888         -      i64 nWrite;                 /* Number of bytes in new PMA */
         1214  +  /* Assuming no errors have occurred, set up a merger structure to read
         1215  +  ** and merge all remaining PMAs.  */
         1216  +  assert( pSorter->pMerger==0 );
         1217  +  if( rc==SQLITE_OK ){
         1218  +    int nIter = 0;                /* Number of iterators used */
         1219  +    int i;
         1220  +    SorterMerger *pMerger;
         1221  +    for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){
         1222  +      nIter += pSorter->aThread[i].nPMA;
         1223  +    }
   889   1224   
   890         -      memset(&writer, 0, sizeof(FileWriter));
   891         -
   892         -      /* If there are SORTER_MAX_MERGE_COUNT or less PMAs in file pTemp1,
   893         -      ** initialize an iterator for each of them and break out of the loop.
   894         -      ** These iterators will be incrementally merged as the VDBE layer calls
   895         -      ** sqlite3VdbeSorterNext().
   896         -      **
   897         -      ** Otherwise, if pTemp1 contains more than SORTER_MAX_MERGE_COUNT PMAs,
   898         -      ** initialize interators for SORTER_MAX_MERGE_COUNT of them. These PMAs
   899         -      ** are merged into a single PMA that is written to file pTemp2.
   900         -      */
   901         -      rc = vdbeSorterInitMerge(db, pCsr, &nWrite);
   902         -      assert( rc!=SQLITE_OK || pSorter->aIter[ pSorter->aTree[1] ].pFile );
   903         -      if( rc!=SQLITE_OK || pSorter->nPMA<=SORTER_MAX_MERGE_COUNT ){
   904         -        break;
         1225  +    pSorter->pMerger = pMerger = vdbeSorterMergerNew(nIter);
         1226  +    if( pMerger==0 ){
         1227  +      rc = SQLITE_NOMEM;
         1228  +    }else{
         1229  +      int iIter = 0;
         1230  +      int iThread = 0;
         1231  +      for(iThread=0; iThread<SQLITE_MAX_SORTER_THREAD; iThread++){
         1232  +        int iPMA;
         1233  +        i64 iReadOff = 0;
         1234  +        SorterThread *pThread = &pSorter->aThread[iThread];
         1235  +        for(iPMA=0; iPMA<pThread->nPMA && rc==SQLITE_OK; iPMA++){
         1236  +          i64 nDummy = 0;
         1237  +          VdbeSorterIter *pIter = &pMerger->aIter[iIter++];
         1238  +          rc = vdbeSorterIterInit(pThread, iReadOff, pIter, &nDummy);
         1239  +          iReadOff = pIter->iEof;
         1240  +        }
   905   1241         }
   906   1242   
   907         -      /* Open the second temp file, if it is not already open. */
   908         -      if( pTemp2==0 ){
   909         -        assert( iWrite2==0 );
   910         -        rc = vdbeSorterOpenTempFile(db, &pTemp2);
   911         -      }
   912         -
   913         -      if( rc==SQLITE_OK ){
   914         -        int bEof = 0;
   915         -        fileWriterInit(db, pTemp2, &writer, iWrite2);
   916         -        fileWriterWriteVarint(&writer, nWrite);
   917         -        while( rc==SQLITE_OK && bEof==0 ){
   918         -          VdbeSorterIter *pIter = &pSorter->aIter[ pSorter->aTree[1] ];
   919         -          assert( pIter->pFile );
   920         -
   921         -          fileWriterWriteVarint(&writer, pIter->nKey);
   922         -          fileWriterWrite(&writer, pIter->aKey, pIter->nKey);
   923         -          rc = sqlite3VdbeSorterNext(db, pCsr, &bEof);
   924         -        }
   925         -        rc2 = fileWriterFinish(db, &writer, &iWrite2);
   926         -        if( rc==SQLITE_OK ) rc = rc2;
         1243  +      for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
         1244  +        rc = vdbeSorterDoCompare(&pSorter->aThread[0], pMerger, i);
   927   1245         }
   928   1246       }
         1247  +  }
   929   1248   
   930         -    if( pSorter->nPMA<=SORTER_MAX_MERGE_COUNT ){
   931         -      break;
   932         -    }else{
   933         -      sqlite3_file *pTmp = pSorter->pTemp1;
   934         -      pSorter->nPMA = iNew;
   935         -      pSorter->pTemp1 = pTemp2;
   936         -      pTemp2 = pTmp;
   937         -      pSorter->iWriteOff = iWrite2;
   938         -      pSorter->iReadOff = 0;
   939         -      iWrite2 = 0;
   940         -    }
   941         -  }while( rc==SQLITE_OK );
   942         -
   943         -  if( pTemp2 ){
   944         -    sqlite3OsCloseFree(pTemp2);
         1249  +  if( rc==SQLITE_OK ){
         1250  +    *pbEof = (pSorter->pMerger->aIter[pSorter->pMerger->aTree[1]].pFile==0);
   945   1251     }
   946         -  *pbEof = (pSorter->aIter[pSorter->aTree[1]].pFile==0);
   947   1252     return rc;
   948   1253   }
   949   1254   
   950   1255   /*
   951   1256   ** Advance to the next element in the sorter.
   952   1257   */
   953   1258   int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){
   954   1259     VdbeSorter *pSorter = pCsr->pSorter;
   955   1260     int rc;                         /* Return code */
   956   1261   
   957         -  if( pSorter->aTree ){
   958         -    int iPrev = pSorter->aTree[1];/* Index of iterator to advance */
   959         -    int i;                        /* Index of aTree[] to recalculate */
   960         -
   961         -    rc = vdbeSorterIterNext(db, &pSorter->aIter[iPrev]);
   962         -    for(i=(pSorter->nTree+iPrev)/2; rc==SQLITE_OK && i>0; i=i/2){
   963         -      rc = vdbeSorterDoCompare(pCsr, i);
   964         -    }
   965         -
   966         -    *pbEof = (pSorter->aIter[pSorter->aTree[1]].pFile==0);
         1262  +  if( pSorter->pMerger ){
         1263  +    rc = vdbeSorterNext(&pSorter->aThread[0], pSorter->pMerger, pbEof);
   967   1264     }else{
   968   1265       SorterRecord *pFree = pSorter->pRecord;
   969   1266       pSorter->pRecord = pFree->pNext;
   970   1267       pFree->pNext = 0;
   971   1268       vdbeSorterRecordFree(db, pFree);
   972   1269       *pbEof = !pSorter->pRecord;
   973   1270       rc = SQLITE_OK;
................................................................................
   980   1277   ** current key.
   981   1278   */
   982   1279   static void *vdbeSorterRowkey(
   983   1280     const VdbeSorter *pSorter,      /* Sorter object */
   984   1281     int *pnKey                      /* OUT: Size of current key in bytes */
   985   1282   ){
   986   1283     void *pKey;
   987         -  if( pSorter->aTree ){
         1284  +  if( pSorter->pMerger ){
   988   1285       VdbeSorterIter *pIter;
   989         -    pIter = &pSorter->aIter[ pSorter->aTree[1] ];
         1286  +    pIter = &pSorter->pMerger->aIter[ pSorter->pMerger->aTree[1] ];
   990   1287       *pnKey = pIter->nKey;
   991   1288       pKey = pIter->aKey;
   992   1289     }else{
   993   1290       *pnKey = pSorter->pRecord->nVal;
   994   1291       pKey = pSorter->pRecord->pVal;
   995   1292     }
   996   1293     return pKey;
................................................................................
  1027   1324   int sqlite3VdbeSorterCompare(
  1028   1325     const VdbeCursor *pCsr,         /* Sorter cursor */
  1029   1326     Mem *pVal,                      /* Value to compare to current sorter key */
  1030   1327     int nIgnore,                    /* Ignore this many fields at the end */
  1031   1328     int *pRes                       /* OUT: Result of comparison */
  1032   1329   ){
  1033   1330     VdbeSorter *pSorter = pCsr->pSorter;
         1331  +  SorterThread *pMain = &pSorter->aThread[0];
  1034   1332     void *pKey; int nKey;           /* Sorter key to compare pVal with */
  1035   1333   
  1036   1334     pKey = vdbeSorterRowkey(pSorter, &nKey);
  1037         -  vdbeSorterCompare(pCsr, nIgnore, pVal->z, pVal->n, pKey, nKey, pRes);
         1335  +  vdbeSorterCompare(pMain, nIgnore, pVal->z, pVal->n, pKey, nKey, pRes);
  1038   1336     return SQLITE_OK;
  1039   1337   }