_IMPROVING KERMIT PERFORMANCE_ by Tim Kientzle STATIC int KSendPacketFromCache(KERMIT_PRIVATE *pK, long sequence, int addToList) { int slot = sequence & 63; long prev, next; if ((pK->exchange[slot].myPacket.type == 0) || (pK->exchange[slot].myPacket.data == NULL)) return kOK; prev = pK->exchange[slot].previousPacket; /* Unlink from list */ next = pK->exchange[slot].nextPacket; if ((pK->lastPacket & 63) == slot) pK->lastPacket = prev; if (prev >= 0) pK->exchange[prev & 63].nextPacket = next; if (next >= 0) pK->exchange[next & 63].previousPacket = prev; pK->exchange[slot].nextPacket = -1; pK->exchange[slot].previousPacket = addToList ? pK->lastPacket : -1; if (addToList) { if (pK->lastPacket >= 0) /* Add to end of list */ pK->exchange[pK->lastPacket & 63].nextPacket = pK->exchange[slot].sequence; pK->lastPacket = pK->exchange[slot].sequence; } pK->exchange[slot].tries++; /* Count number of sends */ pK->exchange[slot].sendTime = SerialTime (pK->initTime); /* Stamp time of send */ return StsWarn (KSendPacket (pK, slot, pK->exchange[slot].myPacket.type, /* Send it */ pK->exchange[slot].myPacket.data, pK->exchange[slot].myPacket.length)); } STATIC int KSendPacketReliable(KERMIT_PRIVATE *pK, BYTE type, const BYTE *pSendData, unsigned long sendDataLength, unsigned long rawDataLength) { int blocked = FALSE; int err; int slot = pK->sequence & 63; int timeout = pK->my.timeout; { /* Put packet into cache */ EXCHANGE *pThisExchange = &(pK->exchange[slot]); if (pThisExchange->myPacket.data == NULL) { if (pK->minCache < pK->minUsed) { SwapSlots (pK->minCache, slot); /* Move free exchange to end of window */ pK->minCache++; pThisExchange->yourPacket.type = 0; } else return StsWarn (kFail); /* Internal consistency failure */ } if (pSendData == pK->spareExchange.myPacket.data) { /* In the reserved slot ? */ BYTE *pTmp = pThisExchange->myPacket.data; /* Just swap it in */ pThisExchange->myPacket.data = pK->spareExchange.myPacket.data; pK->spareExchange.myPacket.data = pTmp; } else /* copy it */ memcpy (pThisExchange->myPacket.data, pSendData, sendDataLength); if (pK->sequence > pK->maxUsed) pK->maxUsed = pK->sequence; /* Update end of window */ pThisExchange->sequence = pK->sequence; /* Finish initializing this exchange */ pThisExchange->myPacket.length = sendDataLength; pThisExchange->myPacket.type = type; pThisExchange->rawLength = rawDataLength; pThisExchange->tries = 0; pK->txPacket.data = pK->spareExchange.myPacket.data; pK->txPacket.length = 0; } StsRet (KSendPacketFromCache (pK, pK->sequence, TRUE)); /* Send this packet */ if (pK->minUsed <= pK->minCache) blocked = 1; /* Are we blocked? */ if (pK->maxUsed - pK->minUsed + 1 >= pK->currentWindowSize) /* How blocked are we? */ blocked = (pK->maxUsed - pK->minUsed + 1) - pK->currentWindowSize + 1; err = KReceivePacketCache (pK, 0); /* Get a packet if one's ready */ do { /* Until we're not blocked and there are no more packets pending */ switch (err) { case kBadPacket: /* Didn't get a packet */ case kTimeout: break; default: /* Unrecognized error, pass up to caller */ return StsWarn (err); case kOK: /* Got one! */ { EXCHANGE *pThisExchange = &(pK->exchange[pK->rxPacketSequence & 63]); switch (pK->rxPacket.type) { case 'N': /* Got a NAK */ if (pThisExchange->myPacket.type != 0) /* Resend packet */ StsRet (KSendPacketFromCache (pK, pK->rxPacketSequence, FALSE)); if ((pK->currentWindowSize > 1) || (pK->maxUsed > pK->minUsed)) break; /* Don't generate implicit ACKs for large windows */ pThisExchange = &(pK->exchange[(pK->rxPacketSequence - 1) & 63]); pThisExchange->yourPacket.type = 'Y'; case 'Y': /* Got an ACK */ if (pThisExchange->rawLength > 0) { /* Has this been ACKed before ? */ if (pThisExchange->tries == 1) { /* Update round-trip stats */ long now = SerialTime (pK->initTime); long thisDelay = now - pThisExchange->sendTime; if (pK->roundTripSamples++ == 0) { /* Treat first sample specially */ pK->roundTripDelay = thisDelay; pK->roundTripDelayVariance = 0; } else { long oldAverage = pK->roundTripDelay; long diffSquared; if (pK->roundTripSamples > 30) /* Straight average for first 30 */ pK->roundTripSamples = 30; /* Then decaying average */ pK->roundTripDelay += (thisDelay - pK->roundTripDelay) / pK->roundTripSamples; diffSquared = (thisDelay - oldAverage) * (thisDelay - oldAverage); pK->roundTripDelayVariance += (diffSquared - pK->roundTripDelayVariance) / pK->roundTripSamples; pK->roundTripDelaySD = (pK->roundTripDelaySD + pK->roundTripDelayVariance / pK->roundTripDelaySD) / 2; } } if (pK->sending) /* Update file progress only on receipt of ACK */ pK->filePosition += pThisExchange->rawLength; pThisExchange->rawLength = 0; /* Make sure this packet isn't counted again */ } { long j, i = pThisExchange->previousPacket; while (i >= 0) { /* Resend packets based on send order*/ j = pK->exchange[i & 63].previousPacket; if (pK->exchange[i & 63].yourPacket.type != 'Y') StsRet (KSendPacketFromCache (pK, i, TRUE)); i = j; } } while ((pK->exchange[pK->minUsed & 63].yourPacket.type == 'Y') && (pK->exchange[pK->minUsed & 63].sequence == pK->minUsed)) { { /* Free up slots that have been acknowledged */ long prev, next; prev = pK->exchange[pK->minUsed & 63].previousPacket; /* Unlink */ next = pK->exchange[pK->minUsed & 63].nextPacket; if ((pK->lastPacket & 63) == (pK->minUsed & 63)) pK->lastPacket = prev; if (prev >= 0) pK->exchange[prev & 63].nextPacket = next; if (next >= 0) pK->exchange[next & 63].previousPacket = prev; pK->exchange[pK->minUsed & 63].previousPacket = -1; pK->exchange[pK->minUsed & 63].nextPacket = -1; } pK->minUsed++; /* Mark this exchange as free */ if (blocked > 0) blocked--; /* Reduce count needed to unblock window*/ } break; case 'E': /* Received error packet, terminate transfer */ return StsWarn (kFailed); default: /* Received unrecognized packet type, terminate transfer */ return StsWarn (kFail); } } break; } { /* Resend timed-out packets */ long i; unsigned long now = SerialTime (pK->initTime); unsigned long packetTimeout = pK->roundTripDelay + 3 * pK->roundTripDelaySD + 2 * SERIAL_TIME_SCALE; /* Avg + 3 standard deviations + 2 seconds */ long oldest = -1; unsigned long oldestTime = ULONG_MAX; long firstOld = -1; for (i = pK->minUsed; i <= pK->maxUsed; i++) { if (pK->exchange[i & 63].yourPacket.type != 'Y') { if ((firstOld == -1) && (pK->exchange[i & 63].sendTime + packetTimeout < now)) firstOld = i; /* Find first timed-out packet */ else if (pK->exchange[i & 63].sendTime < oldestTime) { /* Find oldest packet */ oldestTime = pK->exchange[i & 63].sendTime; oldest = i; } } } if (firstOld != -1) /* Resend first timed-out packet */ StsRet (KSendPacketFromCache (pK, firstOld, FALSE)); if (oldestTime == ULONG_MAX) timeout = pK->my.timeout; else if (packetTimeout + oldestTime < now) /* Next oldest already timed-out? */ timeout = 0; else /* Compute interval until next timeout */ timeout = (packetTimeout - (now - oldestTime)) / SERIAL_TIME_SCALE; if (timeout < 1) timeout = 1; /* Minimum is one second */ if (timeout > pK->my.timeout) timeout = pK->my.timeout; /* Max is negotiated value */ } err = KReceivePacketCache (pK, blocked ? timeout : 0); /* Try to receive a packet */ } while (blocked || (err == kOK)); if (pK->exchange[pK->sequence & 63].yourPacket.type && (pK->exchange[pK->sequence & 63].sequence == pK->sequence)) pK->rxPacket = pK->exchange[pK->sequence & 63].yourPacket; pK->sequence++; if (err == kTimeout) return kOK; if (err == kBadPacket) return kOK; return StsWarn (err); } Example 1: /* Step 1: Unlink this packet from doubly-linked list */ slot = sequence number of this packet prev = exchange[slot].previousPacket; next = exchange[slot].nextPacket; if (slot == lastPacket) lastPacket=prev; if (prev >= 0) exchange[prev].nextPacket = next; if (next >= 0) exchange[next].previousPacket = prev; /* Step 2: Link at end of list */ exchange[slot].previousPacket = lastPacket; exchange[slot].nextPacket = -1; exchange[lastPacket].nextPacket = slot; Example 2: thisDelay = now -