mirror of
https://github.com/System-End/cdn.git
synced 2026-04-19 15:18:15 +00:00
feat: multipart uploads, 206 partial-content downloads, FetchHttpHandler
- Large files are uploaded to the bucket via S3 multipart upload - Slack files are downloaded in chunks (HTTP 206 range requests) when downloading in one request fails - The S3 client now uses FetchHttpHandler
This commit is contained in:
parent
927e7ef9e2
commit
ab39b9cca3
4 changed files with 339 additions and 42 deletions
BIN
bun.lockb
BIN
bun.lockb
Binary file not shown.
39
package.json
39
package.json
|
|
@ -1,21 +1,22 @@
|
|||
{
|
||||
"name": "cdn-v2-hackclub",
|
||||
"version": "1.0.0",
|
||||
"description": "API to upload files to S3-compatible storage with unique URLs",
|
||||
"main": "index.js",
|
||||
"scripts": {
|
||||
"start": "bun index.js",
|
||||
"dev": "bun --watch index.js"
|
||||
},
|
||||
"dependencies": {
|
||||
"@aws-sdk/client-s3": "^3.478.0",
|
||||
"cors": "^2.8.5",
|
||||
"express": "^4.21.2",
|
||||
"multer": "^1.4.5-lts.1",
|
||||
"node-fetch": "^2.6.1",
|
||||
"p-limit": "^6.2.0",
|
||||
"winston": "^3.17.0"
|
||||
},
|
||||
"author": "",
|
||||
"license": "MIT"
|
||||
"name": "cdn-v2-hackclub",
|
||||
"version": "1.0.0",
|
||||
"description": "API to upload files to S3-compatible storage with unique URLs",
|
||||
"main": "index.js",
|
||||
"scripts": {
|
||||
"start": "bun index.js",
|
||||
"dev": "bun --watch index.js"
|
||||
},
|
||||
"dependencies": {
|
||||
"@aws-sdk/client-s3": "^3.478.0",
|
||||
"@smithy/fetch-http-handler": "^5.1.0",
|
||||
"cors": "^2.8.5",
|
||||
"express": "^4.21.2",
|
||||
"multer": "^1.4.5-lts.1",
|
||||
"node-fetch": "^2.6.1",
|
||||
"p-limit": "^6.2.0",
|
||||
"winston": "^3.17.0"
|
||||
},
|
||||
"author": "",
|
||||
"license": "MIT"
|
||||
}
|
||||
|
|
|
|||
|
|
@ -47,7 +47,7 @@ const uploadEndpoint = async (url, downloadAuth = null) => {
|
|||
|
||||
// Upload to S3 storage
|
||||
logger.debug(`Uploading: ${fileName}`);
|
||||
const uploadResult = await uploadToStorage('s/v3', fileName, buffer, response.headers.get('content-type'));
|
||||
const uploadResult = await uploadToStorage('s/v3', fileName, buffer, response.headers.get('content-type'), buffer.length);
|
||||
if (uploadResult.success === false) {
|
||||
throw new Error(`Storage upload failed: ${uploadResult.error}`);
|
||||
}
|
||||
|
|
|
|||
340
src/storage.js
340
src/storage.js
|
|
@ -1,4 +1,5 @@
|
|||
const { S3Client, PutObjectCommand } = require('@aws-sdk/client-s3');
|
||||
const { S3Client, PutObjectCommand, CreateMultipartUploadCommand, UploadPartCommand, CompleteMultipartUploadCommand, AbortMultipartUploadCommand } = require('@aws-sdk/client-s3');
|
||||
const { FetchHttpHandler } = require('@smithy/fetch-http-handler');
|
||||
const crypto = require('crypto');
|
||||
const logger = require('./config/logger');
|
||||
const {generateFileUrl} = require('./utils');
|
||||
|
|
@ -63,6 +64,178 @@ function generateUniqueFileName(fileName) {
|
|||
return uniqueFileName;
|
||||
}
|
||||
|
||||
// Pick a multipart part size (bytes) for a file of `fileSize` bytes.
// Constraints honored: R2's 5 MiB minimum part size (last part exempt),
// a 1000-part ceiling, and a 100 MiB maximum per part.
function calculatePartSize(fileSize) {
  const MIN_PSIZE = 5242880; // r2 has a 5mb min part size (except last part)
  const MAX_PSIZE = 100 * 1024 * 1024; // 100mb maximum per part
  const MAX_PARTS = 1000; // aws limit

  // Start from the smallest legal part; grow it only when the file would
  // otherwise need more than MAX_PARTS parts.
  let partSize = fileSize / MIN_PSIZE > MAX_PARTS
    ? Math.ceil(fileSize / MAX_PARTS)
    : MIN_PSIZE;

  // Hardcoded tiers: coarser parts for bigger files.
  const tiers = [
    [100 * 1024 * 1024, 10 * 1024 * 1024],  // >100mb use 10mb parts
    [500 * 1024 * 1024, 25 * 1024 * 1024],  // >500mb use 25mb parts
    [1024 * 1024 * 1024, 50 * 1024 * 1024]  // >1gb use 50mb parts
  ];
  for (const [threshold, size] of tiers) {
    if (fileSize > threshold) partSize = Math.max(partSize, size);
  }

  // Clamp into the legal [MIN_PSIZE, MAX_PSIZE] window.
  return Math.min(Math.max(partSize, MIN_PSIZE), MAX_PSIZE);
}
|
||||
|
||||
// download file using 206 partial content in chunks for slack only
// url: file URL; fileSize: expected size in bytes (corrected from the HEAD
// response when the server disagrees); authHeader: Authorization header value;
// chunkCount: number of parallel range requests (default 4 — backward compatible).
// Returns a Buffer with the whole file; throws when any chunk ultimately fails.
async function downloadFileInChunks(url, fileSize, authHeader, chunkCount = 4) {
  logger.debug('Attempting chunked download', { url, fileSize, chunks: chunkCount });

  // First, check if server supports range requests and learn the real size.
  try {
    const headResponse = await fetch(url, {
      method: 'HEAD',
      headers: { Authorization: authHeader }
    });

    if (!headResponse.ok) {
      throw new Error(`HEAD request failed: ${headResponse.status}`);
    }

    const acceptsRanges = headResponse.headers.get('accept-ranges');
    if (acceptsRanges !== 'bytes') {
      logger.warn('Server may not support range requests', { acceptsRanges });
    }

    // Verify the file size matches; prefer the server's answer.
    const contentLength = parseInt(headResponse.headers.get('content-length') || '0', 10);
    if (contentLength > 0 && contentLength !== fileSize) {
      logger.warn('File size mismatch detected', {
        expectedSize: fileSize,
        actualSize: contentLength
      });
      fileSize = contentLength;
    }
  } catch (headError) {
    // HEAD is advisory only — proceed with ranged GETs regardless.
    logger.warn('HEAD request failed, proceeding with chunked download anyway', {
      error: headError.message
    });
  }

  // Guard: without a positive, finite size we cannot compute byte ranges
  // (the original would request nonsense ranges like bytes=0--1).
  if (!Number.isFinite(fileSize) || fileSize <= 0) {
    throw new Error(`Cannot chunk download: invalid file size ${fileSize}`);
  }

  const chunkSize = Math.ceil(fileSize / chunkCount);

  try {
    // Download all chunks in parallel.
    const chunkPromises = [];
    for (let i = 0; i < chunkCount; i++) {
      const start = i * chunkSize;
      const end = Math.min(start + chunkSize - 1, fileSize - 1);
      chunkPromises.push(downloadChunk(url, start, end, authHeader, i));
    }

    const chunkResults = await Promise.all(chunkPromises);

    // Promise.all rejects on failure, but double-check for empty results.
    for (let i = 0; i < chunkResults.length; i++) {
      if (!chunkResults[i]) {
        throw new Error(`Chunk ${i} failed to download`);
      }
    }

    // Combine all chunks into a single buffer.
    const totalBuffer = Buffer.concat(chunkResults);

    logger.debug('Chunked download successful', {
      totalSize: totalBuffer.length,
      expectedSize: fileSize
    });

    return totalBuffer;
  } catch (error) {
    logger.error('Chunked download failed', { error: error.message });
    throw error;
  }
}
|
||||
|
||||
// Download a single chunk using Range header.
// Fetches bytes [start, end] of `url`, retrying up to 3 times with
// exponential backoff. If the server ignores the Range header and replies
// 200 with the full body, the wanted slice is carved out of it instead.
async function downloadChunk(url, start, end, authHeader, chunkIndex, retryCount = 0) {
  const maxRetries = 3;
  const expectedSize = end - start + 1;

  try {
    logger.debug(`Downloading chunk ${chunkIndex} (attempt ${retryCount + 1})`, {
      start,
      end,
      size: expectedSize
    });

    const response = await fetch(url, {
      headers: {
        'Authorization': authHeader,
        'Range': `bytes=${start}-${end}`
      }
    });

    if (!response.ok) {
      throw new Error(`Chunk ${chunkIndex} download failed: ${response.status} ${response.statusText}`);
    }

    if (response.status === 200) {
      // Server returned the whole file instead of partial content:
      // download it all and slice out just the range we asked for.
      logger.warn(`Chunk ${chunkIndex}: Server returned full file instead of partial content`);
      const fullBuffer = await response.buffer();
      const chunkBuffer = fullBuffer.slice(start, end + 1);

      logger.debug(`Chunk ${chunkIndex} extracted from full download`, {
        actualSize: chunkBuffer.length,
        expectedSize: expectedSize
      });

      return chunkBuffer;
    }

    if (response.status !== 206) {
      throw new Error(`Server doesn't support partial content, got status ${response.status}`);
    }

    const buffer = await response.buffer();

    // Verify chunk size before accepting it.
    if (buffer.length !== expectedSize) {
      throw new Error(`Chunk ${chunkIndex} size mismatch: expected ${expectedSize}, got ${buffer.length}`);
    }

    logger.debug(`Chunk ${chunkIndex} downloaded successfully`, {
      actualSize: buffer.length,
      expectedSize: expectedSize
    });

    return buffer;
  } catch (error) {
    logger.error(`Chunk ${chunkIndex} download failed (attempt ${retryCount + 1})`, {
      error: error.message
    });

    if (retryCount >= maxRetries) {
      throw error;
    }

    // Exponential backoff: 1s, 2s, 4s.
    const delay = Math.pow(2, retryCount) * 1000;
    logger.debug(`Retrying chunk ${chunkIndex} in ${delay}ms`);
    await new Promise(resolve => setTimeout(resolve, delay));
    return downloadChunk(url, start, end, authHeader, chunkIndex, retryCount + 1);
  }
}
|
||||
|
||||
// upload files to the /s/ directory
|
||||
async function processFiles(fileMessage, client) {
|
||||
const uploadedFiles = [];
|
||||
|
|
@ -98,21 +271,52 @@ async function processFiles(fileMessage, client) {
|
|||
url: file.url_private
|
||||
});
|
||||
|
||||
const response = await fetch(file.url_private, {
|
||||
headers: {Authorization: `Bearer ${process.env.SLACK_BOT_TOKEN}`}
|
||||
});
|
||||
let uploadData;
|
||||
const authHeader = `Bearer ${process.env.SLACK_BOT_TOKEN}`;
|
||||
|
||||
try {
|
||||
const response = await fetch(file.url_private, {
|
||||
headers: { Authorization: authHeader }
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
throw new Error(`Slack download failed: ${response.status} ${response.statusText}`);
|
||||
if (!response.ok) {
|
||||
throw new Error(`Slack download failed: ${response.status} ${response.statusText}`);
|
||||
}
|
||||
|
||||
uploadData = await response.buffer();
|
||||
logger.debug('File downloaded', {
|
||||
fileName: file.name,
|
||||
size: uploadData.length
|
||||
});
|
||||
|
||||
} catch (downloadError) {
|
||||
logger.warn('Regular download failed, trying chunked download', {
|
||||
fileName: file.name,
|
||||
error: downloadError.message
|
||||
});
|
||||
|
||||
try {
|
||||
uploadData = await downloadFileInChunks(file.url_private, file.size, authHeader);
|
||||
logger.info('Chunked download successful as fallback', {
|
||||
fileName: file.name,
|
||||
size: uploadData.length
|
||||
});
|
||||
} catch (chunkedError) {
|
||||
logger.error('Both regular and chunked downloads failed', {
|
||||
fileName: file.name,
|
||||
regularError: downloadError.message,
|
||||
chunkedError: chunkedError.message
|
||||
});
|
||||
throw new Error(`All download methods failed. Regular: ${downloadError.message}, Chunked: ${chunkedError.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
const buffer = await response.buffer();
|
||||
const contentType = file.mimetype || 'application/octet-stream';
|
||||
const uniqueFileName = generateUniqueFileName(file.name);
|
||||
const userDir = `s/${fileMessage.user}`;
|
||||
|
||||
const uploadResult = await uploadLimit(() =>
|
||||
uploadToStorage(userDir, uniqueFileName, buffer, contentType)
|
||||
uploadToStorage(userDir, uniqueFileName, uploadData, contentType, file.size)
|
||||
);
|
||||
|
||||
if (uploadResult.success === false) {
|
||||
|
|
@ -274,36 +478,128 @@ async function handleFileUpload(event, client) {
|
|||
const s3Client = new S3Client({
|
||||
region: process.env.AWS_REGION,
|
||||
endpoint: process.env.AWS_ENDPOINT,
|
||||
requestHandler: new FetchHttpHandler(),
|
||||
credentials: {
|
||||
accessKeyId: process.env.AWS_ACCESS_KEY_ID,
|
||||
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY
|
||||
}
|
||||
},
|
||||
forcePathStyle: true,
|
||||
requestTimeout: 300000,
|
||||
maxAttempts: 3
|
||||
});
|
||||
|
||||
// Upload a file to the bucket under `${userDir}/${uniqueFileName}`.
// bodyData: Buffer with the file contents.
// fileSize: size in bytes; when omitted it is derived from bodyData.length so
// legacy 4-argument callers still route large buffers through multipart
// (previously `undefined >= 10485760` was false and huge files went single-part).
// Returns { success: true } or { success: false, error }.
async function uploadToStorage(userDir, uniqueFileName, bodyData, contentType = 'application/octet-stream', fileSize) {
  try {
    const key = `${userDir}/${uniqueFileName}`;

    // Backward compatibility: derive the size from the buffer itself when
    // the caller did not supply one.
    if (fileSize == null && bodyData && typeof bodyData.length === 'number') {
      fileSize = bodyData.length;
    }

    if (fileSize >= 10485760) { // 10mb threshold
      return await uploadMultipart(key, bodyData, contentType);
    }

    const params = {
      Bucket: process.env.AWS_BUCKET_NAME,
      Key: key,
      Body: bodyData,
      ContentType: contentType,
      CacheControl: 'public, immutable, max-age=31536000'
    };

    logger.info(`Single part upload: ${key}`);
    await s3Client.send(new PutObjectCommand(params));
    return { success: true };
  } catch (error) {
    logger.error(`Upload failed: ${error.message}`, {
      path: `${userDir}/${uniqueFileName}`,
      error: error.message
    });
    return { success: false, error: error.message };
  }
}
|
||||
|
||||
// Multipart upload of `bodyData` (a Buffer) to `key`.
// Creates the upload, sends every part in parallel, then completes it.
// On any failure the upload is aborted (best effort) so the bucket is not
// left holding orphaned parts, and the original error is rethrown.
async function uploadMultipart(key, bodyData, contentType) {
  let uploadId;

  try {
    const createResult = await s3Client.send(new CreateMultipartUploadCommand({
      Bucket: process.env.AWS_BUCKET_NAME,
      Key: key,
      ContentType: contentType,
      CacheControl: 'public, immutable, max-age=31536000'
    }));
    uploadId = createResult.UploadId;

    const partSize = calculatePartSize(bodyData.length);
    const totalParts = Math.ceil(bodyData.length / partSize);

    logger.info(`multipart upload: ${key}`, {
      uploadId,
      fileSize: bodyData.length,
      partSize,
      totalParts
    });

    // One UploadPart per slice; the final slice may be shorter than partSize
    // (which is allowed — only non-final parts must meet the minimum).
    const partNumbers = Array.from({ length: totalParts }, (_, i) => i + 1);
    const parts = await Promise.all(partNumbers.map(async (partNumber) => {
      const offset = (partNumber - 1) * partSize;
      const sliceEnd = Math.min(offset + partSize, bodyData.length);
      const result = await s3Client.send(new UploadPartCommand({
        Bucket: process.env.AWS_BUCKET_NAME,
        Key: key,
        PartNumber: partNumber,
        UploadId: uploadId,
        Body: bodyData.slice(offset, sliceEnd)
      }));
      return { PartNumber: partNumber, ETag: result.ETag };
    }));

    // CompleteMultipartUpload requires parts in ascending PartNumber order.
    parts.sort((a, b) => a.PartNumber - b.PartNumber);

    await s3Client.send(new CompleteMultipartUploadCommand({
      Bucket: process.env.AWS_BUCKET_NAME,
      Key: key,
      UploadId: uploadId,
      MultipartUpload: { Parts: parts }
    }));
    logger.info(`multipart upload completed: ${key}`);

    return { success: true };
  } catch (error) {
    // Clean up the dangling upload before propagating the failure.
    if (uploadId) {
      try {
        await s3Client.send(new AbortMultipartUploadCommand({
          Bucket: process.env.AWS_BUCKET_NAME,
          Key: key,
          UploadId: uploadId
        }));
        logger.info(`aborted multipart upload: ${key}`);
      } catch (abortError) {
        logger.error(`failed to abort multipart upload: ${abortError.message}`);
      }
    }
    throw error;
  }
}
|
||||
|
||||
module.exports = {
|
||||
handleFileUpload,
|
||||
initialize,
|
||||
uploadToStorage
|
||||
uploadToStorage,
|
||||
downloadFileInChunks,
|
||||
downloadChunk
|
||||
};
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue