From 3e45faccc6bafe06a2dd4c8cad6f55cec8dcf397 Mon Sep 17 00:00:00 2001 From: jodavies Date: Thu, 7 May 2026 19:41:12 +0100 Subject: [PATCH] perf: improve parallelism of SortBotMerge Currently, the sortbot stage of tform sorting does not achieve good parallelism. Primarily, this is because the "sortblock" size has grown with default SmallSize and LargeSize adjustments, such that in many cases the sortbot levels do not run in parallel at all, because each thread's output fits in a single block. This commit adjusts the logic for filling and unlocking the blocks such that sortbot threads can start work as soon as possible: - Only put complete terms into the blocks, no splitting over the blocks. - Track the number of terms in each block, and use this when reading the data to determine when a block is complete, rather than waiting for a term to overlap the "stop" pointer. - When filling blocks (in PutToMaster), if a certain (small) minimum number of terms has been written, probe if a reading thread is waiting on the current block by attempting to lock the previous block. If a thread is waiting (so the lock fails), unlock the current block immediately and write the term into the next. Also reduce the MINIMUMNUMBEROFTERMS parameter from 10 to 1, so that the small+large buffer does not scale (so much) with large MaxTermSize and many threads, and similarly reduce NUMBEROFBLOCKSINSORT to its minimal value of 8. --- sources/ftypes.h | 20 ++++- sources/structs.h | 1 + sources/threads.c | 217 ++++++++++++++++++++++++---------------------- 3 files changed, 129 insertions(+), 109 deletions(-) diff --git a/sources/ftypes.h b/sources/ftypes.h index c2273c13..73d10a3d 100644 --- a/sources/ftypes.h +++ b/sources/ftypes.h @@ -971,9 +971,6 @@ typedef int (*TFUN1)(UBYTE *,int); #define BUCKETTERMINATED 3 #define BUCKETRELEASED 4 -#define NUMBEROFBLOCKSINSORT 10 -#define MINIMUMNUMBEROFTERMS 10 - #define BUCKETDOINGTERM 1 #define BUCKETASSIGNED -1 #define BUCKETTOBERELEASED -2 @@ -981,6 +978,23 @@ typedef int (*TFUN1)(UBYTE *,int); #define BUCKETDOINGTERMS 0 #define BUCKETDOINGBRACKET 1 + +/* + Sortblock config +*/ +// The number of blocks in the ring-buffer for data transfer between +// workers or sortbots and the master thread or other sortbots. With +// sortbots, each thread has half of this number. +// The merging algorithm requires at least 4 blocks, so this must be +// at least 8, with sortbots. +#define NUMBEROFBLOCKSINSORT 8 +// The minimum number of terms which must fit in a block, i.e. they +// must be at least this number times MaxTermSize. +#define MINIMUMNUMBEROFTERMS 1 +// The minimum number of terms which will be written in a block, +// before potentially yielding the block to a waiting reading thread. +#define MINWRITENUMBEROFTERMS 1 + #endif /* diff --git a/sources/structs.h b/sources/structs.h index 4c2fe359..82e2cbbd 100644 --- a/sources/structs.h +++ b/sources/structs.h @@ -1184,6 +1184,7 @@ typedef struct SoRtBlOcK { WORD **MasterStart; WORD **MasterFill; WORD **MasterStop; + LONG *BlockTerms; int MasterNumBlocks; int MasterBlock; int FillBlock; diff --git a/sources/threads.c b/sources/threads.c index eabd35eb..b0d0a3d7 100644 --- a/sources/threads.c +++ b/sources/threads.c @@ -3537,6 +3537,7 @@ intercepted:; int PutToMaster(PHEAD WORD *term) { int i,j,nexti,ret = 0; + int urgent = 0; WORD *t, *fill, *top, zero = 0; if ( term == 0 ) { /* Mark the end of the expression */ t = &zero; j = 1; @@ -3548,29 +3549,48 @@ int PutToMaster(PHEAD WORD *term) i = AT.SB.FillBlock; /* The block we are working at */ fill = AT.SB.MasterFill[i]; /* Where we are filling */ top = AT.SB.MasterStop[i]; /* End of the block */ - while ( j > 0 ) { - LONG copy = MiN(top - fill, j); - j -= copy; - NCOPY(fill, t, copy); - if ( j > 0 ) { -/* - We reached the end of the block. - Get the next block and release this block. - The order is important. This is why there should be at least - 4 blocks or deadlocks can occur. -*/ - nexti = i+1; - if ( nexti > AT.SB.MasterNumBlocks ) { - nexti = 1; - } - LOCK(AT.SB.MasterBlockLock[nexti]); - UNLOCK(AT.SB.MasterBlockLock[i]); - AT.SB.MasterFill[i] = AT.SB.MasterStart[i]; - AT.SB.FillBlock = i = nexti; - fill = AT.SB.MasterStart[i]; - top = AT.SB.MasterStop[i]; + + // If there is space in the block, and we have already written MINWRITENUMBEROFTERMS, + // determine if the reading thread is waiting for us by trying to lock the previous + // block. If we manage to lock it, then we still have time to continue filling this + // block. If we can't lock it, the reading thread is waiting for us and we should + // move to the next block ASAP. + if ( j < top - fill && AT.SB.BlockTerms[i] > MINWRITENUMBEROFTERMS ) { + const int prev = ( i == 1 ? AT.SB.MasterNumBlocks : i-1 ); + if ( ! pthread_mutex_trylock(&(AT.SB.MasterBlockLock[prev])) ) { + UNLOCK(AT.SB.MasterBlockLock[prev]); + } + else { + urgent = 1; + } + } + + // If the term doesn't fit in the current block, or a thread is waiting for us + // (and we've already written at least MINWRITENUMBEROFTERMS), move to the next: + if ( ( j >= top - fill ) || urgent ) { + nexti = i+1; + if ( nexti > AT.SB.MasterNumBlocks ) { + nexti = 1; + } + LOCK(AT.SB.MasterBlockLock[nexti]); + UNLOCK(AT.SB.MasterBlockLock[i]); + AT.SB.MasterFill[i] = AT.SB.MasterStart[i]; + AT.SB.FillBlock = i = nexti; + fill = AT.SB.MasterStart[i]; + top = AT.SB.MasterStop[i]; + if ( AT.SB.BlockTerms[i] != 0 ) { + // In this case, there has been an accounting error in a previous use + // of this block. Blocks that have been read from and unlocked, should + // have BlockTerms == 0. + MLOCK(ErrorMessageLock); + MesPrint("Error in PutToMaster, starting a block with BlockTerms != 0"); + MUNLOCK(ErrorMessageLock); + Terminate(-1); } } + + NCOPY(fill, t, j); + AT.SB.BlockTerms[i]++; AT.SB.MasterFill[i] = fill; return(ret); } @@ -3773,7 +3793,15 @@ int MasterMerge(void) k = S->used[level]; i = k + lpat - 1; if ( !*(poin[k]) ) { - do { if ( !( i >>= 1 ) ) goto EndOfMerge; } while ( !S->tree[i] ); + // Stream k has hit the end-of-stream "0". We still need to decrement + // BlockTerms, which includes the marker in the count. + ki = S->ktoi[k]; + AB[ki+1]->T.SB.BlockTerms[AB[ki+1]->T.SB.MasterBlock]--; + do { + if ( !( i >>= 1 ) ) { + goto EndOfMerge; + } + } while ( !S->tree[i] ); if ( S->tree[i] == -1 ) { S->tree[i] = 0; level--; @@ -3851,6 +3879,12 @@ int MasterMerge(void) poin[S->tree[i]] = m1; } else { + // Here we are writing the new merged term *before* the original start of term1. + // We can always do this, since before term1 there is previous term data of this + // block, or the previous block, for which we are holding a lock. This requires + // the existence of "block 0", if term1 is the first term of block 1! + // It also requires the blocks to be contiguous in memory; we can't allocate + // separate memory regions for each block without larger-scale changes. r2 = r1 - m1[1]; m2 = tt1 - r2; r1 = S->PolyWise; @@ -3914,9 +3948,8 @@ int MasterMerge(void) im = *poin2[ul]; poin[ul] = poin2[ul]; ki = S->ktoi[ul]; - if ( (poin[ul] + im + COMPINC) >= - AB[ki+1]->T.SB.MasterStop[AB[ki+1]->T.SB.MasterBlock] - && im > 0 ) { + AB[ki+1]->T.SB.BlockTerms[AB[ki+1]->T.SB.MasterBlock]--; + if ( AB[ki+1]->T.SB.BlockTerms[AB[ki+1]->T.SB.MasterBlock] == 0 ) { /* We made it to the end of the block. We have to release the previous block and claim the next. @@ -3930,19 +3963,13 @@ int MasterMerge(void) UNLOCK(AT.SB.MasterBlockLock[i-1]); } if ( i == AT.SB.MasterNumBlocks ) { -/* - Move the remainder down into block 0 -*/ - WORD *from, *to; - to = AT.SB.MasterStart[1]; - from = AT.SB.MasterStop[i]; - while ( from > poin[ul] ) *--to = *--from; - poin[ul] = to; i = 1; } else { i++; } LOCK(AT.SB.MasterBlockLock[i]); AT.SB.MasterBlock = i; + poin[ul] = AT.SB.MasterStart[i]; + im = *poin[ul]; poin2[ul] = poin[ul] + im; } else { @@ -3994,9 +4021,8 @@ int MasterMerge(void) im = poin2[k][0]; poin[k] = poin2[k]; ki = S->ktoi[k]; - if ( (poin[k] + im + COMPINC) >= - AB[ki+1]->T.SB.MasterStop[AB[ki+1]->T.SB.MasterBlock] - && im > 0 ) { + AB[ki+1]->T.SB.BlockTerms[AB[ki+1]->T.SB.MasterBlock]--; + if ( AB[ki+1]->T.SB.BlockTerms[AB[ki+1]->T.SB.MasterBlock] == 0 ) { /* We made it to the end of the block. We have to release the previous block and claim the next. @@ -4010,19 +4036,13 @@ int MasterMerge(void) UNLOCK(AT.SB.MasterBlockLock[i-1]); } if ( i == AT.SB.MasterNumBlocks ) { -/* - Move the remainder down into block 0 -*/ - WORD *from, *to; - to = AT.SB.MasterStart[1]; - from = AT.SB.MasterStop[i]; - while ( from > poin[k] ) *--to = *--from; - poin[k] = to; i = 1; } else { i++; } LOCK(AT.SB.MasterBlockLock[i]); AT.SB.MasterBlock = i; + poin[k] = AT.SB.MasterStart[i]; + im = *poin[k]; poin2[k] = poin[k] + im; } else { @@ -4238,7 +4258,7 @@ int SortBotMerge(PHEAD0) { GETBIDENTITY ALLPRIVATES *Bin1 = AB[AT.SortBotIn1],*Bin2 = AB[AT.SortBotIn2]; - WORD *term1, *term2, *next, *wp; + WORD *term1, *term2, *wp; int blin1, blin2; /* Current block numbers */ int error = 0; WORD l1, l2, *m1, *m2, *w, r1, r2, r3, r33, r31, *tt1, ii; @@ -4279,6 +4299,7 @@ int SortBotMerge(PHEAD0) /* #[ One is smallest : */ + Bin1->T.SB.BlockTerms[blin1]--; if ( SortBotOut(BHEAD term1) < 0 ) { MLOCK(ErrorMessageLock); MesPrint("Called from SortBotMerge with thread = %d",AT.identity); @@ -4286,10 +4307,8 @@ int SortBotMerge(PHEAD0) error = -1; goto ReturnError; } - im = *term1; - next = term1 + im; - if ( next >= Bin1->T.SB.MasterStop[blin1] || ( *next && - next+*next+COMPINC > Bin1->T.SB.MasterStop[blin1] ) ) { + term1 += *term1; + if ( Bin1->T.SB.BlockTerms[blin1] == 0 ) { if ( blin1 == 1 ) { UNLOCK(Bin1->T.SB.MasterBlockLock[Bin1->T.SB.MasterNumBlocks]); } @@ -4297,13 +4316,6 @@ int SortBotMerge(PHEAD0) UNLOCK(Bin1->T.SB.MasterBlockLock[blin1-1]); } if ( blin1 == Bin1->T.SB.MasterNumBlocks ) { -/* - Move the remainder down into block 0 -*/ - to = Bin1->T.SB.MasterStart[1]; - from = Bin1->T.SB.MasterStop[Bin1->T.SB.MasterNumBlocks]; - while ( from > next ) *--to = *--from; - next = to; blin1 = 1; } else { @@ -4311,8 +4323,8 @@ int SortBotMerge(PHEAD0) } LOCK(Bin1->T.SB.MasterBlockLock[blin1]); Bin1->T.SB.MasterBlock = blin1; + term1 = Bin1->T.SB.MasterStart[blin1]; } - term1 = next; /* #] One is smallest : */ @@ -4321,6 +4333,7 @@ int SortBotMerge(PHEAD0) /* #[ Two is smallest : */ + Bin2->T.SB.BlockTerms[blin2]--; if ( SortBotOut(BHEAD term2) < 0 ) { MLOCK(ErrorMessageLock); MesPrint("Called from SortBotMerge with thread = %d",AT.identity); @@ -4328,10 +4341,9 @@ int SortBotMerge(PHEAD0) error = -1; goto ReturnError; } -next2: im = *term2; - next = term2 + im; - if ( next >= Bin2->T.SB.MasterStop[blin2] || ( *next - && next+*next+COMPINC > Bin2->T.SB.MasterStop[blin2] ) ) { +next2: + term2 += *term2; + if ( Bin2->T.SB.BlockTerms[blin2] == 0 ) { if ( blin2 == 1 ) { UNLOCK(Bin2->T.SB.MasterBlockLock[Bin2->T.SB.MasterNumBlocks]); } @@ -4339,13 +4351,6 @@ next2: im = *term2; UNLOCK(Bin2->T.SB.MasterBlockLock[blin2-1]); } if ( blin2 == Bin2->T.SB.MasterNumBlocks ) { -/* - Move the remainder down into block 0 -*/ - to = Bin2->T.SB.MasterStart[1]; - from = Bin2->T.SB.MasterStop[Bin2->T.SB.MasterNumBlocks]; - while ( from > next ) *--to = *--from; - next = to; blin2 = 1; } else { @@ -4353,8 +4358,8 @@ next2: im = *term2; } LOCK(Bin2->T.SB.MasterBlockLock[blin2]); Bin2->T.SB.MasterBlock = blin2; + term2 = Bin2->T.SB.MasterStart[blin2]; } - term2 = next; /* #] Two is smallest : */ @@ -4363,6 +4368,8 @@ next2: im = *term2; /* #[ Equal : */ + Bin1->T.SB.BlockTerms[blin1]--; + Bin2->T.SB.BlockTerms[blin2]--; l1 = *( m1 = term1 ); l2 = *( m2 = term2 ); if ( S->PolyWise ) { /* Here we work with PolyFun */ @@ -4417,6 +4424,12 @@ next2: im = *term2; term1 = m1; } else { + // Here we are writing the new merged term *before* the original start of term1. + // We can always do this, since before term1 there is previous term data of this + // block, or the previous block, for which we are holding a lock. This requires + // the existence of "block 0", if term1 is the first term of block 1! + // It also requires the blocks to be contiguous in memory; we can't allocate + // separate memory regions for each block without larger-scale changes. r2 = r1 - m1[1]; m2 = tt1 - r2; r1 = S->PolyWise; @@ -4580,10 +4593,9 @@ next2: im = *term2; goto ReturnError; } cancelled:; /* Now we need two new terms */ - im = *term1; - next = term1 + im; - if ( next >= Bin1->T.SB.MasterStop[blin1] || ( *next && - next+*next+COMPINC > Bin1->T.SB.MasterStop[blin1] ) ) { + term1 += *term1; + if ( Bin1->T.SB.BlockTerms[blin1] == 0 ) { + if ( blin1 == 1 ) { UNLOCK(Bin1->T.SB.MasterBlockLock[Bin1->T.SB.MasterNumBlocks]); } @@ -4591,13 +4603,6 @@ cancelled:; /* Now we need two new terms */ UNLOCK(Bin1->T.SB.MasterBlockLock[blin1-1]); } if ( blin1 == Bin1->T.SB.MasterNumBlocks ) { -/* - Move the remainder down into block 0 -*/ - to = Bin1->T.SB.MasterStart[1]; - from = Bin1->T.SB.MasterStop[Bin1->T.SB.MasterNumBlocks]; - while ( from > next ) *--to = *--from; - next = to; blin1 = 1; } else { @@ -4605,8 +4610,8 @@ cancelled:; /* Now we need two new terms */ } LOCK(Bin1->T.SB.MasterBlockLock[blin1]); Bin1->T.SB.MasterBlock = blin1; + term1 = Bin1->T.SB.MasterStart[blin1]; } - term1 = next; goto next2; /* #] Equal : @@ -4621,6 +4626,7 @@ cancelled:; /* Now we need two new terms */ #[ Tail in one : */ while ( *term1 ) { + Bin1->T.SB.BlockTerms[blin1]--; if ( SortBotOut(BHEAD term1) < 0 ) { MLOCK(ErrorMessageLock); MesPrint("Called from SortBotMerge with thread = %d",AT.identity); @@ -4628,10 +4634,8 @@ cancelled:; /* Now we need two new terms */ error = -1; goto ReturnError; } - im = *term1; - next = term1 + im; - if ( next >= Bin1->T.SB.MasterStop[blin1] || ( *next && - next+*next+COMPINC > Bin1->T.SB.MasterStop[blin1] ) ) { + if ( Bin1->T.SB.BlockTerms[blin1] == 0 ) { + if ( blin1 == 1 ) { UNLOCK(Bin1->T.SB.MasterBlockLock[Bin1->T.SB.MasterNumBlocks]); } @@ -4639,13 +4643,6 @@ cancelled:; /* Now we need two new terms */ UNLOCK(Bin1->T.SB.MasterBlockLock[blin1-1]); } if ( blin1 == Bin1->T.SB.MasterNumBlocks ) { -/* - Move the remainder down into block 0 -*/ - to = Bin1->T.SB.MasterStart[1]; - from = Bin1->T.SB.MasterStop[Bin1->T.SB.MasterNumBlocks]; - while ( from > next ) *--to = *--from; - next = to; blin1 = 1; } else { @@ -4653,8 +4650,11 @@ cancelled:; /* Now we need two new terms */ } LOCK(Bin1->T.SB.MasterBlockLock[blin1]); Bin1->T.SB.MasterBlock = blin1; + term1 = Bin1->T.SB.MasterStart[blin1]; + } + else { + term1 += *term1; } - term1 = next; } /* #] Tail in one : @@ -4665,6 +4665,7 @@ cancelled:; /* Now we need two new terms */ #[ Tail in two : */ while ( *term2 ) { + Bin2->T.SB.BlockTerms[blin2]--; if ( SortBotOut(BHEAD term2) < 0 ) { MLOCK(ErrorMessageLock); MesPrint("Called from SortBotMerge with thread = %d",AT.identity); @@ -4672,10 +4673,8 @@ cancelled:; /* Now we need two new terms */ error = -1; goto ReturnError; } - im = *term2; - next = term2 + im; - if ( next >= Bin2->T.SB.MasterStop[blin2] || ( *next - && next+*next+COMPINC > Bin2->T.SB.MasterStop[blin2] ) ) { + if ( Bin2->T.SB.BlockTerms[blin2] == 0 ) { + if ( blin2 == 1 ) { UNLOCK(Bin2->T.SB.MasterBlockLock[Bin2->T.SB.MasterNumBlocks]); } @@ -4683,13 +4682,6 @@ cancelled:; /* Now we need two new terms */ UNLOCK(Bin2->T.SB.MasterBlockLock[blin2-1]); } if ( blin2 == Bin2->T.SB.MasterNumBlocks ) { -/* - Move the remainder down into block 0 -*/ - to = Bin2->T.SB.MasterStart[1]; - from = Bin2->T.SB.MasterStop[Bin2->T.SB.MasterNumBlocks]; - while ( from > next ) *--to = *--from; - next = to; blin2 = 1; } else { @@ -4697,17 +4689,27 @@ cancelled:; /* Now we need two new terms */ } LOCK(Bin2->T.SB.MasterBlockLock[blin2]); Bin2->T.SB.MasterBlock = blin2; + term2 = Bin2->T.SB.MasterStart[blin2]; + } + else { + term2 += *term2; } - term2 = next; } /* #] Tail in two : */ } + + // Both streams have hit the end-of-stream marker "0". We still need to + // decrement the BlockTerms counters a final time, the marker is included + // in the count. + Bin1->T.SB.BlockTerms[blin1]--; + Bin2->T.SB.BlockTerms[blin2]--; + SortBotOut(BHEAD 0); ReturnError:; /* - Release all locks + Release all locks. */ UNLOCK(Bin1->T.SB.MasterBlockLock[blin1]); if ( blin1 > 1 ) { @@ -4799,14 +4801,17 @@ int IniSortBlocks(int numworkers) AT.SB.MasterFill = AT.SB.MasterStart + (numberofblocks+1); AT.SB.MasterStop = AT.SB.MasterFill + (numberofblocks+1); AT.SB.MasterNumBlocks = numberofblocks; + AT.SB.BlockTerms = (LONG*)Malloc1(sizeof(LONG)*(numberofblocks+1),"BlockTerms"); AT.SB.MasterBlock = 0; AT.SB.FillBlock = 0; AT.SB.MasterFill[0] = AT.SB.MasterStart[0] = w; + AT.SB.BlockTerms[0] = 0; w += maxter; AT.SB.MasterStop[0] = w; AT.SB.MasterBlockLock[0] = dummylock; for ( j = 1; j <= numberofblocks; j++ ) { AT.SB.MasterFill[j] = AT.SB.MasterStart[j] = w; + AT.SB.BlockTerms[j] = 0; w += blocksize; AT.SB.MasterStop[j] = w; AT.SB.MasterBlockLock[j] = dummylock;