Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Changes In Branch btree-opt2 Excluding Merge-Ins
This is equivalent to a diff from 4df852ce26 to eed6a33145
2015-06-24
| ||
00:05 | Performance optimization on balance_nonroot() and related routines. 2.6% faster overall with a size increase of less than 750 bytes. (check-in: 25131e7062 user: drh tags: trunk) | |
2015-06-23
| ||
23:31 | Mark some branches as unreachable after the recent change that recognizes mismatch result set sizes on compound SELECT statements sooner. (check-in: c8d1f305b6 user: drh tags: trunk) | |
21:35 | Testability improvement. (Closed-Leaf check-in: eed6a33145 user: drh tags: btree-opt2) | |
18:24 | Multiple overflow cells are always adjacent and sequential. Exploit this invariant for a small size reduction and performance increase and add assert()s to prove the invariant. (check-in: f77f2f48f4 user: drh tags: btree-opt2) | |
15:06 | Merge latest trunk changes with this branch. Add tests for columnsize=0. (check-in: ef44c71a22 user: dan tags: fts5) | |
13:02 | Merge the compound SELECT operator fix from trunk. (check-in: a7be554f4b user: drh tags: btree-opt2) | |
12:19 | Test that the left and right sides of a compound SELECT operator have the same number of expressions in the expanded expression list before beginning to generate code. (check-in: 4df852ce26 user: dan tags: trunk) | |
2015-06-20
| ||
14:11 | Update the fuzztest data using the latest test vectors discovered by AFL. (check-in: b97f9cf73e user: drh tags: trunk) | |
Changes to src/btree.c.
952 952 ** the page, 1 means the second cell, and so forth) return a pointer 953 953 ** to the cell content. 954 954 ** 955 955 ** This routine works only for pages that do not contain overflow cells. 956 956 */ 957 957 #define findCell(P,I) \ 958 958 ((P)->aData + ((P)->maskPage & get2byte(&(P)->aCellIdx[2*(I)]))) 959 -#define findCellv2(D,M,O,I) (D+(M&get2byte(D+(O+2*(I))))) 960 - 961 - 962 -/* 963 -** This a more complex version of findCell() that works for 964 -** pages that do contain overflow cells. 965 -*/ 966 -static u8 *findOverflowCell(MemPage *pPage, int iCell){ 967 - int i; 968 - assert( sqlite3_mutex_held(pPage->pBt->mutex) ); 969 - for(i=pPage->nOverflow-1; i>=0; i--){ 970 - int k; 971 - k = pPage->aiOvfl[i]; 972 - if( k<=iCell ){ 973 - if( k==iCell ){ 974 - return pPage->apOvfl[i]; 975 - } 976 - iCell--; 977 - } 978 - } 979 - return findCell(pPage, iCell); 980 -} 981 959 982 960 /* 983 961 ** This is common tail processing for btreeParseCellPtr() and 984 962 ** btreeParseCellPtrIndex() for the case when the cell does not fit entirely 985 963 ** on a single B-tree page. Make necessary adjustments to the CellInfo 986 964 ** structure. 987 965 */ ................................................................................ 1571 1549 ** 1572 1550 ** Check to see if iFreeBlk should be coalesced onto the end of iStart. 1573 1551 */ 1574 1552 if( iFreeBlk && iEnd+3>=iFreeBlk ){ 1575 1553 nFrag = iFreeBlk - iEnd; 1576 1554 if( iEnd>iFreeBlk ) return SQLITE_CORRUPT_BKPT; 1577 1555 iEnd = iFreeBlk + get2byte(&data[iFreeBlk+2]); 1578 - if( iEnd > pPage->pBt->usableSize ) return SQLITE_CORRUPT_BKPT; 1556 + if( NEVER(iEnd > pPage->pBt->usableSize) ) return SQLITE_CORRUPT_BKPT; 1579 1557 iSize = iEnd - iStart; 1580 1558 iFreeBlk = get2byte(&data[iFreeBlk]); 1581 1559 } 1582 1560 1583 1561 /* If iPtr is another freeblock (that is, if iPtr is not the freelist 1584 1562 ** pointer in the page header) then check to see if iStart should be 1585 1563 ** coalesced onto the end of iPtr. ................................................................................ 6241 6219 if( iChild ){ 6242 6220 put4byte(pCell, iChild); 6243 6221 } 6244 6222 j = pPage->nOverflow++; 6245 6223 assert( j<(int)(sizeof(pPage->apOvfl)/sizeof(pPage->apOvfl[0])) ); 6246 6224 pPage->apOvfl[j] = pCell; 6247 6225 pPage->aiOvfl[j] = (u16)i; 6226 + 6227 + /* When multiple overflows occur, they are always sequential and in 6228 + ** sorted order. This invariants arise because multiple overflows can 6229 + ** only occur when inserting divider cells into the parent page during 6230 + ** balancing, and the dividers are adjacent and sorted. 6231 + */ 6232 + assert( j==0 || pPage->aiOvfl[j-1]<(u16)i ); /* Overflows in sorted order */ 6233 + assert( j==0 || i==pPage->aiOvfl[j-1]+1 ); /* Overflows are sequential */ 6248 6234 }else{ 6249 6235 int rc = sqlite3PagerWrite(pPage->pDbPage); 6250 6236 if( rc!=SQLITE_OK ){ 6251 6237 *pRC = rc; 6252 6238 return; 6253 6239 } 6254 6240 assert( sqlite3PagerIswriteable(pPage->pDbPage) ); ................................................................................ 6277 6263 ** the entry for the overflow page into the pointer map. 6278 6264 */ 6279 6265 ptrmapPutOvflPtr(pPage, pCell, pRC); 6280 6266 } 6281 6267 #endif 6282 6268 } 6283 6269 } 6270 + 6271 +/* 6272 +** A CellArray object contains a cache of pointers and sizes for a 6273 +** consecutive sequence of cells that might be held multiple pages. 6274 +*/ 6275 +typedef struct CellArray CellArray; 6276 +struct CellArray { 6277 + int nCell; /* Number of cells in apCell[] */ 6278 + MemPage *pRef; /* Reference page */ 6279 + u8 **apCell; /* All cells begin balanced */ 6280 + u16 *szCell; /* Local size of all cells in apCell[] */ 6281 +}; 6282 + 6283 +/* 6284 +** Make sure the cell sizes at idx, idx+1, ..., idx+N-1 have been 6285 +** computed. 6286 +*/ 6287 +static void populateCellCache(CellArray *p, int idx, int N){ 6288 + assert( idx>=0 && idx+N<=p->nCell ); 6289 + while( N>0 ){ 6290 + assert( p->apCell[idx]!=0 ); 6291 + if( p->szCell[idx]==0 ){ 6292 + p->szCell[idx] = p->pRef->xCellSize(p->pRef, p->apCell[idx]); 6293 + }else{ 6294 + assert( CORRUPT_DB || 6295 + p->szCell[idx]==p->pRef->xCellSize(p->pRef, p->apCell[idx]) ); 6296 + } 6297 + idx++; 6298 + N--; 6299 + } 6300 +} 6301 + 6302 +/* 6303 +** Return the size of the Nth element of the cell array 6304 +*/ 6305 +static SQLITE_NOINLINE u16 computeCellSize(CellArray *p, int N){ 6306 + assert( N>=0 && N<p->nCell ); 6307 + assert( p->szCell[N]==0 ); 6308 + p->szCell[N] = p->pRef->xCellSize(p->pRef, p->apCell[N]); 6309 + return p->szCell[N]; 6310 +} 6311 +static u16 cachedCellSize(CellArray *p, int N){ 6312 + assert( N>=0 && N<p->nCell ); 6313 + if( p->szCell[N] ) return p->szCell[N]; 6314 + return computeCellSize(p, N); 6315 +} 6284 6316 6285 6317 /* 6286 6318 ** Array apCell[] contains pointers to nCell b-tree page cells. The 6287 6319 ** szCell[] array contains the size in bytes of each cell. This function 6288 6320 ** replaces the current contents of page pPg with the contents of the cell 6289 6321 ** array. 6290 6322 ** ................................................................................ 6291 6323 ** Some of the cells in apCell[] may currently be stored in pPg. This 6292 6324 ** function works around problems caused by this by making a copy of any 6293 6325 ** such cells before overwriting the page data. 6294 6326 ** 6295 6327 ** The MemPage.nFree field is invalidated by this function. It is the 6296 6328 ** responsibility of the caller to set it correctly. 6297 6329 */ 6298 -static void rebuildPage( 6330 +static int rebuildPage( 6299 6331 MemPage *pPg, /* Edit this page */ 6300 6332 int nCell, /* Final number of cells on page */ 6301 6333 u8 **apCell, /* Array of cells */ 6302 6334 u16 *szCell /* Array of cell sizes */ 6303 6335 ){ 6304 6336 const int hdr = pPg->hdrOffset; /* Offset of header on pPg */ 6305 6337 u8 * const aData = pPg->aData; /* Pointer to data for pPg */ ................................................................................ 6316 6348 pData = pEnd; 6317 6349 for(i=0; i<nCell; i++){ 6318 6350 u8 *pCell = apCell[i]; 6319 6351 if( pCell>aData && pCell<pEnd ){ 6320 6352 pCell = &pTmp[pCell - aData]; 6321 6353 } 6322 6354 pData -= szCell[i]; 6323 - memcpy(pData, pCell, szCell[i]); 6324 6355 put2byte(pCellptr, (pData - aData)); 6325 6356 pCellptr += 2; 6357 + if( pData < pCellptr ) return SQLITE_CORRUPT_BKPT; 6358 + memcpy(pData, pCell, szCell[i]); 6326 6359 assert( szCell[i]==pPg->xCellSize(pPg, pCell) || CORRUPT_DB ); 6327 - testcase( szCell[i]==pPg->xCellSize(pPg,pCell) ); 6360 + testcase( szCell[i]!=pPg->xCellSize(pPg,pCell) ); 6328 6361 } 6329 6362 6330 6363 /* The pPg->nFree field is now set incorrectly. The caller will fix it. */ 6331 6364 pPg->nCell = nCell; 6332 6365 pPg->nOverflow = 0; 6333 6366 6334 6367 put2byte(&aData[hdr+1], 0); 6335 6368 put2byte(&aData[hdr+3], pPg->nCell); 6336 6369 put2byte(&aData[hdr+5], pData - aData); 6337 6370 aData[hdr+7] = 0x00; 6371 + return SQLITE_OK; 6338 6372 } 6339 6373 6340 6374 /* 6341 6375 ** Array apCell[] contains nCell pointers to b-tree cells. Array szCell 6342 6376 ** contains the size in bytes of each such cell. This function attempts to 6343 6377 ** add the cells stored in the array to page pPg. If it cannot (because 6344 6378 ** the page needs to be defragmented before the cells will fit), non-zero ................................................................................ 6363 6397 ** cells in apCell[], then the cells do not fit and non-zero is returned. 6364 6398 */ 6365 6399 static int pageInsertArray( 6366 6400 MemPage *pPg, /* Page to add cells to */ 6367 6401 u8 *pBegin, /* End of cell-pointer array */ 6368 6402 u8 **ppData, /* IN/OUT: Page content -area pointer */ 6369 6403 u8 *pCellptr, /* Pointer to cell-pointer area */ 6404 + int iFirst, /* Index of first cell to add */ 6370 6405 int nCell, /* Number of cells to add to pPg */ 6371 - u8 **apCell, /* Array of cells */ 6372 - u16 *szCell /* Array of cell sizes */ 6406 + CellArray *pCArray /* Array of cells */ 6373 6407 ){ 6374 6408 int i; 6375 6409 u8 *aData = pPg->aData; 6376 6410 u8 *pData = *ppData; 6377 6411 const int bFreelist = aData[1] || aData[2]; 6412 + int iEnd = iFirst + nCell; 6378 6413 assert( CORRUPT_DB || pPg->hdrOffset==0 ); /* Never called on page 1 */ 6379 - for(i=0; i<nCell; i++){ 6380 - int sz = szCell[i]; 6381 - int rc; 6414 + for(i=iFirst; i<iEnd; i++){ 6415 + int sz, rc; 6382 6416 u8 *pSlot; 6417 + sz = cachedCellSize(pCArray, i); 6383 6418 if( bFreelist==0 || (pSlot = pageFindSlot(pPg, sz, &rc, 0))==0 ){ 6384 6419 pData -= sz; 6385 6420 if( pData<pBegin ) return 1; 6386 6421 pSlot = pData; 6387 6422 } 6388 - memcpy(pSlot, apCell[i], sz); 6423 + memcpy(pSlot, pCArray->apCell[i], sz); 6389 6424 put2byte(pCellptr, (pSlot - aData)); 6390 6425 pCellptr += 2; 6391 6426 } 6392 6427 *ppData = pData; 6393 6428 return 0; 6394 6429 } 6395 6430 ................................................................................ 6400 6435 ** within the body of pPg to the pPg free-list. The cell-pointers and other 6401 6436 ** fields of the page are not updated. 6402 6437 ** 6403 6438 ** This function returns the total number of cells added to the free-list. 6404 6439 */ 6405 6440 static int pageFreeArray( 6406 6441 MemPage *pPg, /* Page to edit */ 6442 + int iFirst, /* First cell to delete */ 6407 6443 int nCell, /* Cells to delete */ 6408 - u8 **apCell, /* Array of cells */ 6409 - u16 *szCell /* Array of cell sizes */ 6444 + CellArray *pCArray /* Array of cells */ 6410 6445 ){ 6411 6446 u8 * const aData = pPg->aData; 6412 6447 u8 * const pEnd = &aData[pPg->pBt->usableSize]; 6413 6448 u8 * const pStart = &aData[pPg->hdrOffset + 8 + pPg->childPtrSize]; 6414 6449 int nRet = 0; 6415 6450 int i; 6451 + int iEnd = iFirst + nCell; 6416 6452 u8 *pFree = 0; 6417 6453 int szFree = 0; 6418 6454 6419 - for(i=0; i<nCell; i++){ 6420 - u8 *pCell = apCell[i]; 6455 + for(i=iFirst; i<iEnd; i++){ 6456 + u8 *pCell = pCArray->apCell[i]; 6421 6457 if( pCell>=pStart && pCell<pEnd ){ 6422 - int sz = szCell[i]; 6458 + int sz; 6459 + /* No need to use cachedCellSize() here. The sizes of all cells that 6460 + ** are to be freed have already been computing while deciding which 6461 + ** cells need freeing */ 6462 + sz = pCArray->szCell[i]; assert( sz>0 ); 6423 6463 if( pFree!=(pCell + sz) ){ 6424 6464 if( pFree ){ 6425 6465 assert( pFree>aData && (pFree - aData)<65536 ); 6426 6466 freeSpace(pPg, (u16)(pFree - aData), szFree); 6427 6467 } 6428 6468 pFree = pCell; 6429 6469 szFree = sz; ................................................................................ 6450 6490 ** 6451 6491 ** This routine makes the necessary adjustments to pPg so that it contains 6452 6492 ** the correct cells after being balanced. 6453 6493 ** 6454 6494 ** The pPg->nFree field is invalid when this function returns. It is the 6455 6495 ** responsibility of the caller to set it correctly. 6456 6496 */ 6457 -static void editPage( 6497 +static int editPage( 6458 6498 MemPage *pPg, /* Edit this page */ 6459 6499 int iOld, /* Index of first cell currently on page */ 6460 6500 int iNew, /* Index of new first cell on page */ 6461 6501 int nNew, /* Final number of cells on page */ 6462 - u8 **apCell, /* Array of cells */ 6463 - u16 *szCell /* Array of cell sizes */ 6502 + CellArray *pCArray /* Array of cells and sizes */ 6464 6503 ){ 6465 6504 u8 * const aData = pPg->aData; 6466 6505 const int hdr = pPg->hdrOffset; 6467 6506 u8 *pBegin = &pPg->aCellIdx[nNew * 2]; 6468 6507 int nCell = pPg->nCell; /* Cells stored on pPg */ 6469 6508 u8 *pData; 6470 6509 u8 *pCellptr; ................................................................................ 6475 6514 #ifdef SQLITE_DEBUG 6476 6515 u8 *pTmp = sqlite3PagerTempSpace(pPg->pBt->pPager); 6477 6516 memcpy(pTmp, aData, pPg->pBt->usableSize); 6478 6517 #endif 6479 6518 6480 6519 /* Remove cells from the start and end of the page */ 6481 6520 if( iOld<iNew ){ 6482 - int nShift = pageFreeArray( 6483 - pPg, iNew-iOld, &apCell[iOld], &szCell[iOld] 6484 - ); 6521 + int nShift = pageFreeArray(pPg, iOld, iNew-iOld, pCArray); 6485 6522 memmove(pPg->aCellIdx, &pPg->aCellIdx[nShift*2], nCell*2); 6486 6523 nCell -= nShift; 6487 6524 } 6488 6525 if( iNewEnd < iOldEnd ){ 6489 - nCell -= pageFreeArray( 6490 - pPg, iOldEnd-iNewEnd, &apCell[iNewEnd], &szCell[iNewEnd] 6491 - ); 6526 + nCell -= pageFreeArray(pPg, iNewEnd, iOldEnd - iNewEnd, pCArray); 6492 6527 } 6493 6528 6494 6529 pData = &aData[get2byteNotZero(&aData[hdr+5])]; 6495 6530 if( pData<pBegin ) goto editpage_fail; 6496 6531 6497 6532 /* Add cells to the start of the page */ 6498 6533 if( iNew<iOld ){ 6499 6534 int nAdd = MIN(nNew,iOld-iNew); 6500 6535 assert( (iOld-iNew)<nNew || nCell==0 || CORRUPT_DB ); 6501 6536 pCellptr = pPg->aCellIdx; 6502 6537 memmove(&pCellptr[nAdd*2], pCellptr, nCell*2); 6503 6538 if( pageInsertArray( 6504 6539 pPg, pBegin, &pData, pCellptr, 6505 - nAdd, &apCell[iNew], &szCell[iNew] 6540 + iNew, nAdd, pCArray 6506 6541 ) ) goto editpage_fail; 6507 6542 nCell += nAdd; 6508 6543 } 6509 6544 6510 6545 /* Add any overflow cells */ 6511 6546 for(i=0; i<pPg->nOverflow; i++){ 6512 6547 int iCell = (iOld + pPg->aiOvfl[i]) - iNew; 6513 6548 if( iCell>=0 && iCell<nNew ){ 6514 6549 pCellptr = &pPg->aCellIdx[iCell * 2]; 6515 6550 memmove(&pCellptr[2], pCellptr, (nCell - iCell) * 2); 6516 6551 nCell++; 6517 6552 if( pageInsertArray( 6518 6553 pPg, pBegin, &pData, pCellptr, 6519 - 1, &apCell[iCell + iNew], &szCell[iCell + iNew] 6554 + iCell+iNew, 1, pCArray 6520 6555 ) ) goto editpage_fail; 6521 6556 } 6522 6557 } 6523 6558 6524 6559 /* Append cells to the end of the page */ 6525 6560 pCellptr = &pPg->aCellIdx[nCell*2]; 6526 6561 if( pageInsertArray( 6527 6562 pPg, pBegin, &pData, pCellptr, 6528 - nNew-nCell, &apCell[iNew+nCell], &szCell[iNew+nCell] 6563 + iNew+nCell, nNew-nCell, pCArray 6529 6564 ) ) goto editpage_fail; 6530 6565 6531 6566 pPg->nCell = nNew; 6532 6567 pPg->nOverflow = 0; 6533 6568 6534 6569 put2byte(&aData[hdr+3], pPg->nCell); 6535 6570 put2byte(&aData[hdr+5], pData - aData); 6536 6571 6537 6572 #ifdef SQLITE_DEBUG 6538 6573 for(i=0; i<nNew && !CORRUPT_DB; i++){ 6539 - u8 *pCell = apCell[i+iNew]; 6574 + u8 *pCell = pCArray->apCell[i+iNew]; 6540 6575 int iOff = get2byte(&pPg->aCellIdx[i*2]); 6541 6576 if( pCell>=aData && pCell<&aData[pPg->pBt->usableSize] ){ 6542 6577 pCell = &pTmp[pCell - aData]; 6543 6578 } 6544 - assert( 0==memcmp(pCell, &aData[iOff], szCell[i+iNew]) ); 6579 + assert( 0==memcmp(pCell, &aData[iOff], 6580 + pCArray->pRef->xCellSize(pCArray->pRef, pCArray->apCell[i+iNew])) ); 6545 6581 } 6546 6582 #endif 6547 6583 6548 - return; 6584 + return SQLITE_OK; 6549 6585 editpage_fail: 6550 6586 /* Unable to edit this page. Rebuild it from scratch instead. */ 6551 - rebuildPage(pPg, nNew, &apCell[iNew], &szCell[iNew]); 6587 + populateCellCache(pCArray, iNew, nNew); 6588 + return rebuildPage(pPg, nNew, &pCArray->apCell[iNew], &pCArray->szCell[iNew]); 6552 6589 } 6553 6590 6554 6591 /* 6555 6592 ** The following parameters determine how many adjacent pages get involved 6556 6593 ** in a balancing operation. NN is the number of neighbors on either side 6557 6594 ** of the page that participate in the balancing operation. NB is the 6558 6595 ** total number of pages that participate, including the target page and ................................................................................ 6616 6653 u8 *pCell = pPage->apOvfl[0]; 6617 6654 u16 szCell = pPage->xCellSize(pPage, pCell); 6618 6655 u8 *pStop; 6619 6656 6620 6657 assert( sqlite3PagerIswriteable(pNew->pDbPage) ); 6621 6658 assert( pPage->aData[0]==(PTF_INTKEY|PTF_LEAFDATA|PTF_LEAF) ); 6622 6659 zeroPage(pNew, PTF_INTKEY|PTF_LEAFDATA|PTF_LEAF); 6623 - rebuildPage(pNew, 1, &pCell, &szCell); 6660 + rc = rebuildPage(pNew, 1, &pCell, &szCell); 6661 + if( NEVER(rc) ) return rc; 6624 6662 pNew->nFree = pBt->usableSize - pNew->cellOffset - 2 - szCell; 6625 6663 6626 6664 /* If this is an auto-vacuum database, update the pointer map 6627 6665 ** with entries for the new page, and any pointer from the 6628 6666 ** cell on the page to an overflow page. If either of these 6629 6667 ** operations fails, the return code is set, but the contents 6630 6668 ** of the parent page are still manipulated by thh code below. ................................................................................ 6820 6858 MemPage *pParent, /* Parent page of siblings being balanced */ 6821 6859 int iParentIdx, /* Index of "the page" in pParent */ 6822 6860 u8 *aOvflSpace, /* page-size bytes of space for parent ovfl */ 6823 6861 int isRoot, /* True if pParent is a root-page */ 6824 6862 int bBulk /* True if this call is part of a bulk load */ 6825 6863 ){ 6826 6864 BtShared *pBt; /* The whole database */ 6827 - int nCell = 0; /* Number of cells in apCell[] */ 6828 6865 int nMaxCells = 0; /* Allocated size of apCell, szCell, aFrom. */ 6829 6866 int nNew = 0; /* Number of pages in apNew[] */ 6830 6867 int nOld; /* Number of pages in apOld[] */ 6831 6868 int i, j, k; /* Loop counters */ 6832 6869 int nxDiv; /* Next divider slot in pParent->aCell[] */ 6833 6870 int rc = SQLITE_OK; /* The return code */ 6834 6871 u16 leafCorrection; /* 4 if pPage is a leaf. 0 if not */ 6835 6872 int leafData; /* True if pPage is a leaf of a LEAFDATA tree */ 6836 6873 int usableSpace; /* Bytes in pPage beyond the header */ 6837 6874 int pageFlags; /* Value of pPage->aData[0] */ 6838 - int subtotal; /* Subtotal of bytes in cells on one page */ 6839 6875 int iSpace1 = 0; /* First unused byte of aSpace1[] */ 6840 6876 int iOvflSpace = 0; /* First unused byte of aOvflSpace[] */ 6841 6877 int szScratch; /* Size of scratch memory requested */ 6842 6878 MemPage *apOld[NB]; /* pPage and up to two siblings */ 6843 6879 MemPage *apNew[NB+2]; /* pPage and up to NB siblings after balancing */ 6844 6880 u8 *pRight; /* Location in parent of right-sibling pointer */ 6845 6881 u8 *apDiv[NB-1]; /* Divider cells in pParent */ 6846 - int cntNew[NB+2]; /* Index in aCell[] of cell after i-th page */ 6847 - int cntOld[NB+2]; /* Old index in aCell[] after i-th page */ 6882 + int cntNew[NB+2]; /* Index in b.paCell[] of cell after i-th page */ 6883 + int cntOld[NB+2]; /* Old index in b.apCell[] */ 6848 6884 int szNew[NB+2]; /* Combined size of cells placed on i-th page */ 6849 - u8 **apCell = 0; /* All cells begin balanced */ 6850 - u16 *szCell; /* Local size of all cells in apCell[] */ 6851 6885 u8 *aSpace1; /* Space for copies of dividers cells */ 6852 6886 Pgno pgno; /* Temp var to store a page number in */ 6853 6887 u8 abDone[NB+2]; /* True after i'th new page is populated */ 6854 6888 Pgno aPgno[NB+2]; /* Page numbers of new pages before shuffling */ 6855 6889 Pgno aPgOrder[NB+2]; /* Copy of aPgno[] used for sorting pages */ 6856 6890 u16 aPgFlags[NB+2]; /* flags field of new pages before shuffling */ 6891 + CellArray b; /* Parsed information on cells being balanced */ 6857 6892 6858 6893 memset(abDone, 0, sizeof(abDone)); 6894 + b.nCell = 0; 6895 + b.apCell = 0; 6859 6896 pBt = pParent->pBt; 6860 6897 assert( sqlite3_mutex_held(pBt->mutex) ); 6861 6898 assert( sqlite3PagerIswriteable(pParent->pDbPage) ); 6862 6899 6863 6900 #if 0 6864 6901 TRACE(("BALANCE: begin page %d child of %d\n", pPage->pgno, pParent->pgno)); 6865 6902 #endif ................................................................................ 6960 6997 ** alignment */ 6961 6998 nMaxCells = (nMaxCells + 3)&~3; 6962 6999 6963 7000 /* 6964 7001 ** Allocate space for memory structures 6965 7002 */ 6966 7003 szScratch = 6967 - nMaxCells*sizeof(u8*) /* apCell */ 6968 - + nMaxCells*sizeof(u16) /* szCell */ 7004 + nMaxCells*sizeof(u8*) /* b.apCell */ 7005 + + nMaxCells*sizeof(u16) /* b.szCell */ 6969 7006 + pBt->pageSize; /* aSpace1 */ 6970 7007 6971 7008 /* EVIDENCE-OF: R-28375-38319 SQLite will never request a scratch buffer 6972 7009 ** that is more than 6 times the database page size. */ 6973 7010 assert( szScratch<=6*(int)pBt->pageSize ); 6974 - apCell = sqlite3ScratchMalloc( szScratch ); 6975 - if( apCell==0 ){ 7011 + b.apCell = sqlite3ScratchMalloc( szScratch ); 7012 + if( b.apCell==0 ){ 6976 7013 rc = SQLITE_NOMEM; 6977 7014 goto balance_cleanup; 6978 7015 } 6979 - szCell = (u16*)&apCell[nMaxCells]; 6980 - aSpace1 = (u8*)&szCell[nMaxCells]; 7016 + b.szCell = (u16*)&b.apCell[nMaxCells]; 7017 + aSpace1 = (u8*)&b.szCell[nMaxCells]; 6981 7018 assert( EIGHT_BYTE_ALIGNMENT(aSpace1) ); 6982 7019 6983 7020 /* 6984 7021 ** Load pointers to all cells on sibling pages and the divider cells 6985 - ** into the local apCell[] array. Make copies of the divider cells 7022 + ** into the local b.apCell[] array. Make copies of the divider cells 6986 7023 ** into space obtained from aSpace1[]. The divider cells have already 6987 7024 ** been removed from pParent. 6988 7025 ** 6989 7026 ** If the siblings are on leaf pages, then the child pointers of the 6990 7027 ** divider cells are stripped from the cells before they are copied 6991 - ** into aSpace1[]. In this way, all cells in apCell[] are without 7028 + ** into aSpace1[]. In this way, all cells in b.apCell[] are without 6992 7029 ** child pointers. If siblings are not leaves, then all cell in 6993 - ** apCell[] include child pointers. Either way, all cells in apCell[] 7030 + ** b.apCell[] include child pointers. Either way, all cells in b.apCell[] 6994 7031 ** are alike. 6995 7032 ** 6996 7033 ** leafCorrection: 4 if pPage is a leaf. 0 if pPage is not a leaf. 6997 7034 ** leafData: 1 if pPage holds key+data and pParent holds only keys. 6998 7035 */ 6999 - leafCorrection = apOld[0]->leaf*4; 7000 - leafData = apOld[0]->intKeyLeaf; 7036 + b.pRef = apOld[0]; 7037 + leafCorrection = b.pRef->leaf*4; 7038 + leafData = b.pRef->intKeyLeaf; 7001 7039 for(i=0; i<nOld; i++){ 7002 - int limit; 7003 7040 MemPage *pOld = apOld[i]; 7041 + int limit = pOld->nCell; 7042 + u8 *aData = pOld->aData; 7043 + u16 maskPage = pOld->maskPage; 7044 + u8 *piCell = aData + pOld->cellOffset; 7045 + u8 *piEnd; 7004 7046 7005 7047 /* Verify that all sibling pages are of the same "type" (table-leaf, 7006 7048 ** table-interior, index-leaf, or index-interior). 7007 7049 */ 7008 7050 if( pOld->aData[0]!=apOld[0]->aData[0] ){ 7009 7051 rc = SQLITE_CORRUPT_BKPT; 7010 7052 goto balance_cleanup; 7011 7053 } 7012 7054 7013 - limit = pOld->nCell+pOld->nOverflow; 7055 + /* Load b.apCell[] with pointers to all cells in pOld. If pOld 7056 + ** constains overflow cells, include them in the b.apCell[] array 7057 + ** in the correct spot. 7058 + ** 7059 + ** Note that when there are multiple overflow cells, it is always the 7060 + ** case that they are sequential and adjacent. This invariant arises 7061 + ** because multiple overflows can only occurs when inserting divider 7062 + ** cells into a parent on a prior balance, and divider cells are always 7063 + ** adjacent and are inserted in order. There is an assert() tagged 7064 + ** with "NOTE 1" in the overflow cell insertion loop to prove this 7065 + ** invariant. 7066 + ** 7067 + ** This must be done in advance. Once the balance starts, the cell 7068 + ** offset section of the btree page will be overwritten and we will no 7069 + ** long be able to find the cells if a pointer to each cell is not saved 7070 + ** first. 7071 + */ 7072 + memset(&b.szCell[b.nCell], 0, sizeof(b.szCell[0])*limit); 7014 7073 if( pOld->nOverflow>0 ){ 7015 - for(j=0; j<limit; j++){ 7016 - assert( nCell<nMaxCells ); 7017 - apCell[nCell] = findOverflowCell(pOld, j); 7018 - szCell[nCell] = pOld->xCellSize(pOld, apCell[nCell]); 7019 - nCell++; 7020 - } 7021 - }else{ 7022 - u8 *aData = pOld->aData; 7023 - u16 maskPage = pOld->maskPage; 7024 - u16 cellOffset = pOld->cellOffset; 7074 + memset(&b.szCell[b.nCell+limit], 0, sizeof(b.szCell[0])*pOld->nOverflow); 7075 + limit = pOld->aiOvfl[0]; 7025 7076 for(j=0; j<limit; j++){ 7026 - assert( nCell<nMaxCells ); 7027 - apCell[nCell] = findCellv2(aData, maskPage, cellOffset, j); 7028 - szCell[nCell] = pOld->xCellSize(pOld, apCell[nCell]); 7029 - nCell++; 7077 + b.apCell[b.nCell] = aData + (maskPage & get2byte(piCell)); 7078 + piCell += 2; 7079 + b.nCell++; 7030 7080 } 7031 - } 7032 - cntOld[i] = nCell; 7081 + for(k=0; k<pOld->nOverflow; k++){ 7082 + assert( k==0 || pOld->aiOvfl[k-1]+1==pOld->aiOvfl[k] );/* NOTE 1 */ 7083 + b.apCell[b.nCell] = pOld->apOvfl[k]; 7084 + b.nCell++; 7085 + } 7086 + limit = pOld->nCell - pOld->aiOvfl[0]; 7087 + } 7088 + piEnd = aData + pOld->cellOffset + 2*pOld->nCell; 7089 + while( piCell<piEnd ){ 7090 + assert( b.nCell<nMaxCells ); 7091 + b.apCell[b.nCell] = aData + (maskPage & get2byte(piCell)); 7092 + piCell += 2; 7093 + b.nCell++; 7094 + } 7095 + 7096 + cntOld[i] = b.nCell; 7033 7097 if( i<nOld-1 && !leafData){ 7034 7098 u16 sz = (u16)szNew[i]; 7035 7099 u8 *pTemp; 7036 - assert( nCell<nMaxCells ); 7037 - szCell[nCell] = sz; 7100 + assert( b.nCell<nMaxCells ); 7101 + b.szCell[b.nCell] = sz; 7038 7102 pTemp = &aSpace1[iSpace1]; 7039 7103 iSpace1 += sz; 7040 7104 assert( sz<=pBt->maxLocal+23 ); 7041 7105 assert( iSpace1 <= (int)pBt->pageSize ); 7042 7106 memcpy(pTemp, apDiv[i], sz); 7043 - apCell[nCell] = pTemp+leafCorrection; 7107 + b.apCell[b.nCell] = pTemp+leafCorrection; 7044 7108 assert( leafCorrection==0 || leafCorrection==4 ); 7045 - szCell[nCell] = szCell[nCell] - leafCorrection; 7109 + b.szCell[b.nCell] = b.szCell[b.nCell] - leafCorrection; 7046 7110 if( !pOld->leaf ){ 7047 7111 assert( leafCorrection==0 ); 7048 7112 assert( pOld->hdrOffset==0 ); 7049 7113 /* The right pointer of the child page pOld becomes the left 7050 7114 ** pointer of the divider cell */ 7051 - memcpy(apCell[nCell], &pOld->aData[8], 4); 7115 + memcpy(b.apCell[b.nCell], &pOld->aData[8], 4); 7052 7116 }else{ 7053 7117 assert( leafCorrection==4 ); 7054 - while( szCell[nCell]<4 ){ 7118 + while( b.szCell[b.nCell]<4 ){ 7055 7119 /* Do not allow any cells smaller than 4 bytes. If a smaller cell 7056 7120 ** does exist, pad it with 0x00 bytes. */ 7057 - assert( szCell[nCell]==3 || CORRUPT_DB ); 7058 - assert( apCell[nCell]==&aSpace1[iSpace1-3] || CORRUPT_DB ); 7121 + assert( b.szCell[b.nCell]==3 || CORRUPT_DB ); 7122 + assert( b.apCell[b.nCell]==&aSpace1[iSpace1-3] || CORRUPT_DB ); 7059 7123 aSpace1[iSpace1++] = 0x00; 7060 - szCell[nCell]++; 7124 + b.szCell[b.nCell]++; 7061 7125 } 7062 7126 } 7063 - nCell++; 7127 + b.nCell++; 7064 7128 } 7065 7129 } 7066 7130 7067 7131 /* 7068 - ** Figure out the number of pages needed to hold all nCell cells. 7132 + ** Figure out the number of pages needed to hold all b.nCell cells. 7069 7133 ** Store this number in "k". Also compute szNew[] which is the total 7070 7134 ** size of all cells on the i-th page and cntNew[] which is the index 7071 - ** in apCell[] of the cell that divides page i from page i+1. 7072 - ** cntNew[k] should equal nCell. 7135 + ** in b.apCell[] of the cell that divides page i from page i+1. 7136 + ** cntNew[k] should equal b.nCell. 7073 7137 ** 7074 7138 ** Values computed by this block: 7075 7139 ** 7076 7140 ** k: The total number of sibling pages 7077 7141 ** szNew[i]: Spaced used on the i-th sibling page. 7078 - ** cntNew[i]: Index in apCell[] and szCell[] for the first cell to 7142 + ** cntNew[i]: Index in b.apCell[] and b.szCell[] for the first cell to 7079 7143 ** the right of the i-th sibling page. 7080 7144 ** usableSpace: Number of bytes of space available on each sibling. 7081 7145 ** 7082 7146 */ 7083 7147 usableSpace = pBt->usableSize - 12 + leafCorrection; 7084 - for(subtotal=k=i=0; i<nCell; i++){ 7085 - assert( i<nMaxCells ); 7086 - subtotal += szCell[i] + 2; 7087 - if( subtotal > usableSpace ){ 7088 - szNew[k] = subtotal - szCell[i] - 2; 7089 - cntNew[k] = i; 7090 - if( leafData ){ i--; } 7091 - subtotal = 0; 7092 - k++; 7093 - if( k>NB+1 ){ rc = SQLITE_CORRUPT_BKPT; goto balance_cleanup; } 7094 - } 7095 - } 7096 - szNew[k] = subtotal; 7097 - cntNew[k] = nCell; 7098 - k++; 7148 + for(i=0; i<nOld; i++){ 7149 + MemPage *p = apOld[i]; 7150 + szNew[i] = usableSpace - p->nFree; 7151 + if( szNew[i]<0 ){ rc = SQLITE_CORRUPT_BKPT; goto balance_cleanup; } 7152 + for(j=0; j<p->nOverflow; j++){ 7153 + szNew[i] += 2 + p->xCellSize(p, p->apOvfl[j]); 7154 + } 7155 + cntNew[i] = cntOld[i]; 7156 + } 7157 + k = nOld; 7158 + for(i=0; i<k; i++){ 7159 + int sz; 7160 + while( szNew[i]>usableSpace ){ 7161 + if( i+1>=k ){ 7162 + k = i+2; 7163 + if( k>NB+2 ){ rc = SQLITE_CORRUPT_BKPT; goto balance_cleanup; } 7164 + szNew[k-1] = 0; 7165 + cntNew[k-1] = b.nCell; 7166 + } 7167 + sz = 2 + cachedCellSize(&b, cntNew[i]-1); 7168 + szNew[i] -= sz; 7169 + if( !leafData ){ 7170 + if( cntNew[i]<b.nCell ){ 7171 + sz = 2 + cachedCellSize(&b, cntNew[i]); 7172 + }else{ 7173 + sz = 0; 7174 + } 7175 + } 7176 + szNew[i+1] += sz; 7177 + cntNew[i]--; 7178 + } 7179 + while( cntNew[i]<b.nCell ){ 7180 + sz = 2 + cachedCellSize(&b, cntNew[i]); 7181 + if( szNew[i]+sz>usableSpace ) break; 7182 + szNew[i] += sz; 7183 + cntNew[i]++; 7184 + if( !leafData ){ 7185 + if( cntNew[i]<b.nCell ){ 7186 + sz = 2 + cachedCellSize(&b, cntNew[i]); 7187 + }else{ 7188 + sz = 0; 7189 + } 7190 + } 7191 + szNew[i+1] -= sz; 7192 + } 7193 + if( cntNew[i]>=b.nCell ){ 7194 + k = i+1; 7195 + }else if( cntNew[i] - (i>0 ? cntNew[i-1] : 0) <= 0 ){ 7196 + rc = SQLITE_CORRUPT_BKPT; 7197 + goto balance_cleanup; 7198 + } 7199 + } 7099 7200 7100 7201 /* 7101 7202 ** The packing computed by the previous block is biased toward the siblings 7102 7203 ** on the left side (siblings with smaller keys). The left siblings are 7103 7204 ** always nearly full, while the right-most sibling might be nearly empty. 7104 7205 ** The next block of code attempts to adjust the packing of siblings to 7105 7206 ** get a better balance. ................................................................................ 7112 7213 int szRight = szNew[i]; /* Size of sibling on the right */ 7113 7214 int szLeft = szNew[i-1]; /* Size of sibling on the left */ 7114 7215 int r; /* Index of right-most cell in left sibling */ 7115 7216 int d; /* Index of first cell to the left of right sibling */ 7116 7217 7117 7218 r = cntNew[i-1] - 1; 7118 7219 d = r + 1 - leafData; 7119 - assert( d<nMaxCells ); 7120 - assert( r<nMaxCells ); 7121 - while( szRight==0 7122 - || (!bBulk && szRight+szCell[d]+2<=szLeft-(szCell[r]+2)) 7123 - ){ 7124 - szRight += szCell[d] + 2; 7125 - szLeft -= szCell[r] + 2; 7126 - cntNew[i-1]--; 7127 - r = cntNew[i-1] - 1; 7128 - d = r + 1 - leafData; 7220 + (void)cachedCellSize(&b, d); 7221 + while(1){ 7222 + assert( d<nMaxCells ); 7223 + assert( r<nMaxCells ); 7224 + (void)cachedCellSize(&b, r); 7225 + if( szRight!=0 7226 + && (bBulk || szRight+b.szCell[d]+2 > szLeft-(b.szCell[r]+2)) ){ 7227 + break; 7228 + } 7229 + szRight += b.szCell[d] + 2; 7230 + szLeft -= b.szCell[r] + 2; 7231 + cntNew[i-1] = r; 7232 + if( cntNew[i-1] <= 0 ){ 7233 + rc = SQLITE_CORRUPT_BKPT; 7234 + goto balance_cleanup; 7235 + } 7236 + r--; 7237 + d--; 7129 7238 } 7130 7239 szNew[i] = szRight; 7131 7240 szNew[i-1] = szLeft; 7132 7241 } 7133 7242 7134 7243 /* Sanity check: For a non-corrupt database file one of the follwing 7135 7244 ** must be true: ................................................................................ 7160 7269 }else{ 7161 7270 assert( i>0 ); 7162 7271 rc = allocateBtreePage(pBt, &pNew, &pgno, (bBulk ? 1 : pgno), 0); 7163 7272 if( rc ) goto balance_cleanup; 7164 7273 zeroPage(pNew, pageFlags); 7165 7274 apNew[i] = pNew; 7166 7275 nNew++; 7167 - cntOld[i] = nCell; 7276 + cntOld[i] = b.nCell; 7168 7277 7169 7278 /* Set the pointer-map entry for the new sibling page. */ 7170 7279 if( ISAUTOVACUUM ){ 7171 7280 ptrmapPut(pBt, pNew->pgno, PTRMAP_BTREE, pParent->pgno, &rc); 7172 7281 if( rc!=SQLITE_OK ){ 7173 7282 goto balance_cleanup; 7174 7283 } ................................................................................ 7265 7374 MemPage *pNew = apNew[0]; 7266 7375 u8 *aOld = pNew->aData; 7267 7376 int cntOldNext = pNew->nCell + pNew->nOverflow; 7268 7377 int usableSize = pBt->usableSize; 7269 7378 int iNew = 0; 7270 7379 int iOld = 0; 7271 7380 7272 - for(i=0; i<nCell; i++){ 7273 - u8 *pCell = apCell[i]; 7381 + for(i=0; i<b.nCell; i++){ 7382 + u8 *pCell = b.apCell[i]; 7274 7383 if( i==cntOldNext ){ 7275 7384 MemPage *pOld = (++iOld)<nNew ? apNew[iOld] : apOld[iOld]; 7276 7385 cntOldNext += pOld->nCell + pOld->nOverflow + !leafData; 7277 7386 aOld = pOld->aData; 7278 7387 } 7279 7388 if( i==cntNew[iNew] ){ 7280 7389 pNew = apNew[++iNew]; ................................................................................ 7291 7400 || pNew->pgno!=aPgno[iOld] 7292 7401 || pCell<aOld 7293 7402 || pCell>=&aOld[usableSize] 7294 7403 ){ 7295 7404 if( !leafCorrection ){ 7296 7405 ptrmapPut(pBt, get4byte(pCell), PTRMAP_BTREE, pNew->pgno, &rc); 7297 7406 } 7298 - if( szCell[i]>pNew->minLocal ){ 7407 + if( cachedCellSize(&b,i)>pNew->minLocal ){ 7299 7408 ptrmapPutOvflPtr(pNew, pCell, &rc); 7300 7409 } 7410 + if( rc ) goto balance_cleanup; 7301 7411 } 7302 7412 } 7303 7413 } 7304 7414 7305 7415 /* Insert new divider cells into pParent. */ 7306 7416 for(i=0; i<nNew-1; i++){ 7307 7417 u8 *pCell; 7308 7418 u8 *pTemp; 7309 7419 int sz; 7310 7420 MemPage *pNew = apNew[i]; 7311 7421 j = cntNew[i]; 7312 7422 7313 7423 assert( j<nMaxCells ); 7314 - pCell = apCell[j]; 7315 - sz = szCell[j] + leafCorrection; 7424 + assert( b.apCell[j]!=0 ); 7425 + pCell = b.apCell[j]; 7426 + sz = b.szCell[j] + leafCorrection; 7316 7427 pTemp = &aOvflSpace[iOvflSpace]; 7317 7428 if( !pNew->leaf ){ 7318 7429 memcpy(&pNew->aData[8], pCell, 4); 7319 7430 }else if( leafData ){ 7320 7431 /* If the tree is a leaf-data tree, and the siblings are leaves, 7321 - ** then there is no divider cell in apCell[]. Instead, the divider 7432 + ** then there is no divider cell in b.apCell[]. Instead, the divider 7322 7433 ** cell consists of the integer key for the right-most cell of 7323 7434 ** the sibling-page assembled above only. 7324 7435 */ 7325 7436 CellInfo info; 7326 7437 j--; 7327 - pNew->xParseCell(pNew, apCell[j], &info); 7438 + pNew->xParseCell(pNew, b.apCell[j], &info); 7328 7439 pCell = pTemp; 7329 7440 sz = 4 + putVarint(&pCell[4], info.nKey); 7330 7441 pTemp = 0; 7331 7442 }else{ 7332 7443 pCell -= 4; 7333 7444 /* Obscure case for non-leaf-data trees: If the cell at pCell was 7334 7445 ** previously stored on a leaf node, and its reported size was 4 ................................................................................ 7337 7448 ** any cell). But it is important to pass the correct size to 7338 7449 ** insertCell(), so reparse the cell now. 7339 7450 ** 7340 7451 ** Note that this can never happen in an SQLite data file, as all 7341 7452 ** cells are at least 4 bytes. It only happens in b-trees used 7342 7453 ** to evaluate "IN (SELECT ...)" and similar clauses. 7343 7454 */ 7344 - if( szCell[j]==4 ){ 7455 + if( b.szCell[j]==4 ){ 7345 7456 assert(leafCorrection==4); 7346 7457 sz = pParent->xCellSize(pParent, pCell); 7347 7458 } 7348 7459 } 7349 7460 iOvflSpace += sz; 7350 7461 assert( sz<=pBt->maxLocal+23 ); 7351 7462 assert( iOvflSpace <= (int)pBt->pageSize ); ................................................................................ 7395 7506 ** only after iPg+1 has already been updated. */ 7396 7507 assert( cntNew[iPg]>=cntOld[iPg] || abDone[iPg+1] ); 7397 7508 7398 7509 if( iPg==0 ){ 7399 7510 iNew = iOld = 0; 7400 7511 nNewCell = cntNew[0]; 7401 7512 }else{ 7402 - iOld = iPg<nOld ? (cntOld[iPg-1] + !leafData) : nCell; 7513 + iOld = iPg<nOld ? (cntOld[iPg-1] + !leafData) : b.nCell; 7403 7514 iNew = cntNew[iPg-1] + !leafData; 7404 7515 nNewCell = cntNew[iPg] - iNew; 7405 7516 } 7406 7517 7407 - editPage(apNew[iPg], iOld, iNew, nNewCell, apCell, szCell); 7518 + rc = editPage(apNew[iPg], iOld, iNew, nNewCell, &b); 7519 + if( rc ) goto balance_cleanup; 7408 7520 abDone[iPg]++; 7409 7521 apNew[iPg]->nFree = usableSpace-szNew[iPg]; 7410 7522 assert( apNew[iPg]->nOverflow==0 ); 7411 7523 assert( apNew[iPg]->nCell==nNewCell ); 7412 7524 } 7413 7525 } 7414 7526 ................................................................................ 7451 7563 u32 key = get4byte(&apNew[i]->aData[8]); 7452 7564 ptrmapPut(pBt, key, PTRMAP_BTREE, apNew[i]->pgno, &rc); 7453 7565 } 7454 7566 } 7455 7567 7456 7568 assert( pParent->isInit ); 7457 7569 TRACE(("BALANCE: finished: old=%d new=%d cells=%d\n", 7458 - nOld, nNew, nCell)); 7570 + nOld, nNew, b.nCell)); 7459 7571 7460 7572 /* Free any old pages that were not reused as new pages. 7461 7573 */ 7462 7574 for(i=nNew; i<nOld; i++){ 7463 7575 freePage(apOld[i], &rc); 7464 7576 } 7465 7577 ................................................................................ 7474 7586 } 7475 7587 #endif 7476 7588 7477 7589 /* 7478 7590 ** Cleanup before returning. 7479 7591 */ 7480 7592 balance_cleanup: 7481 - sqlite3ScratchFree(apCell); 7593 + sqlite3ScratchFree(b.apCell); 7482 7594 for(i=0; i<nOld; i++){ 7483 7595 releasePage(apOld[i]); 7484 7596 } 7485 7597 for(i=0; i<nNew; i++){ 7486 7598 releasePage(apNew[i]); 7487 7599 } 7488 7600 ................................................................................ 7925 8037 if( !pPage->leaf ){ 7926 8038 MemPage *pLeaf = pCur->apPage[pCur->iPage]; 7927 8039 int nCell; 7928 8040 Pgno n = pCur->apPage[iCellDepth+1]->pgno; 7929 8041 unsigned char *pTmp; 7930 8042 7931 8043 pCell = findCell(pLeaf, pLeaf->nCell-1); 7932 - if( pCell<&pLeaf->aData[4] ) return SQLITE_CORRUPT_BKPT; 8044 + if( NEVER(pCell<&pLeaf->aData[4]) ) return SQLITE_CORRUPT_BKPT; 7933 8045 nCell = pLeaf->xCellSize(pLeaf, pCell); 7934 8046 assert( MX_CELL_SIZE(pBt) >= nCell ); 7935 8047 pTmp = pBt->pTmpSpace; 7936 8048 assert( pTmp!=0 ); 7937 8049 rc = sqlite3PagerWrite(pLeaf->pDbPage); 7938 8050 insertCell(pPage, iCellIdx, pCell-4, nCell+4, pTmp, n, &rc); 7939 8051 dropCell(pLeaf, pLeaf->nCell-1, nCell, &rc);