From 050102016bb4015f8c7ac61a270d7d5af17d1884 Mon Sep 17 00:00:00 2001 From: Gisli Magnusson Date: Tue, 19 May 2026 19:00:29 +0000 Subject: [PATCH 1/4] fix(ENGKNOW-3353): prevent empty gorz files in pgor gord partition writes pgor rewrites each partition's write target from `ref_af.gord/` (folder mode) to a specific `ref_af.gord/{file}.gorz` (single-file mode). ForkWrite was creating empty gorz files for every chromosome partition that produced no rows (e.g. gorrows with a fixed range hits only 2 of ~48 partitions). These ~46 unnecessary S3 multipart uploads caused random throttling failures. Fix 1 (ForkWrite): skip empty-file creation when the write target is inside a gord folder. Empty partitions in pgor writes should produce no file; the GORDICTFOLDER step builds the dictionary from meta files and correctly excludes empty entries. Fix 2 (S3MultipartOutputStream): add retry loop to initiateMultipartUpload() matching the existing uploadWithRetry pattern, so transient S3 errors during upload initiation don't cause permanent failures. 
Co-Authored-By: Claude Sonnet 4.6 --- .../s3/driver/S3MultipartOutputStream.java | 19 ++++++++--- .../scala/gorsat/Analysis/ForkWrite.scala | 7 +++- .../test/java/gorsat/UTestGorWriteFolder.java | 34 +++++++++++++++++++ 3 files changed, 55 insertions(+), 5 deletions(-) diff --git a/drivers/src/main/java/org/gorpipe/s3/driver/S3MultipartOutputStream.java b/drivers/src/main/java/org/gorpipe/s3/driver/S3MultipartOutputStream.java index 65f4a7c5..6defc70d 100644 --- a/drivers/src/main/java/org/gorpipe/s3/driver/S3MultipartOutputStream.java +++ b/drivers/src/main/java/org/gorpipe/s3/driver/S3MultipartOutputStream.java @@ -64,11 +64,22 @@ private String initiateMultipartUpload() throws IOException { .bucket(bucket) .key(key) .build(); - try { - return sendCreateMultipartUploadRequest(req).uploadId(); - } catch (Exception e) { - throw new IOException("Failed to initiate multipart upload", e); + for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) { + try { + return sendCreateMultipartUploadRequest(req).uploadId(); + } catch (Exception e) { + String errorMsg = String.format("Failed to initiate multipart upload for %s/%s on attempt %d/%d: %s", + bucket, key, attempt, MAX_RETRIES, e.getMessage()); + logger.warn(errorMsg, e); + if (attempt == MAX_RETRIES) throw new IOException("Failed to initiate multipart upload after retries", e); + try { + Thread.sleep(RETRY_SLEEP_BASE_MS * attempt); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } } + throw new IOException("Failed to initiate multipart upload after retries"); } @Override diff --git a/gortools/src/main/scala/gorsat/Analysis/ForkWrite.scala b/gortools/src/main/scala/gorsat/Analysis/ForkWrite.scala index efa2103e..9de487e1 100644 --- a/gortools/src/main/scala/gorsat/Analysis/ForkWrite.scala +++ b/gortools/src/main/scala/gorsat/Analysis/ForkWrite.scala @@ -316,7 +316,12 @@ case class ForkWrite(forkCol: Int, sh.fileOpen = false } }) - if (options.useFolder.isEmpty && !somethingToWrite && 
!useFork) { + // Skip empty-file creation when writing to a specific gorz inside a gord folder + // (pgor partition writes). Those partitions should produce no file when they have no + // data; creating empty gorz files causes unnecessary writes (and S3 throttling) for + // every chromosome partition that falls outside the source data range. + val isInsideGordFolder = DataUtil.isGord(PathUtils.getParent(fullFileName)) + if (options.useFolder.isEmpty && !somethingToWrite && !useFork && !isInsideGordFolder) { val out = createOutFile(fullFileName, false) out.setup() out.finish() diff --git a/gortools/src/test/java/gorsat/UTestGorWriteFolder.java b/gortools/src/test/java/gorsat/UTestGorWriteFolder.java index b85d98e8..8c8d13db 100644 --- a/gortools/src/test/java/gorsat/UTestGorWriteFolder.java +++ b/gortools/src/test/java/gorsat/UTestGorWriteFolder.java @@ -184,4 +184,38 @@ public void testPartgorWriteOverGordFolder() throws IOException { Assert.assertEquals(UTestGorWriteExplicit.WRONG_RESULT, "chrom\tbpStart\tbpStop\tx\n" + "chr1\t1\t1\t'PN1'\n" , results); } + + @Test + public void testPgorWriteGordFolderWithGorrowsMerge() throws IOException { + // Regression test for ENGKNOW-3353: pgor write randomly fails when inner query + // uses gorrows with fixed chromosome ranges. pgor creates partitions for ALL + // chromosomes; most produce 0 rows. In single-file mode (write to specific .gorz), + // ForkWrite was creating empty gorz files for every empty partition, causing + // excessive S3 writes and random failures. 
+ var folderpath = workDirPath.resolve("ref_af.gord"); + + TestUtils.runGorPipe( + "create #x# = gorrows -p chr2:1000-16001|merge <(gorrows -p chr19:910100-920102);\n" + + "pgor [#x#]|write " + folderpath, + "-gorroot", workDirPath.toAbsolutePath().toString(), + "-cachedir", cachePath.toString()); + + // Dictionary must exist + Assert.assertTrue("Dictionary file must be created", Files.exists(folderpath.resolve(DEFAULT_FOLDER_DICTIONARY_NAME))); + + // Only gorz files with actual data should be in the folder — no empty gorz files. + List gorzFiles = Files.list(folderpath) + .filter(p -> p.toString().endsWith(".gorz")) + .collect(Collectors.toList()); + for (Path gorzFile : gorzFiles) { + long rowCount = TestUtils.runGorPipeCount("gor " + gorzFile); + Assert.assertTrue("No empty gorz files should exist in gord folder: " + gorzFile.getFileName(), rowCount > 0); + } + + // Reading the dict must return all chr2 and chr19 rows + long rowCount = TestUtils.runGorPipeCount("gor " + folderpath, + "-gorroot", workDirPath.toAbsolutePath().toString(), + "-cachedir", cachePath.toString()); + Assert.assertEquals("pgor write gord should contain chr2 + chr19 rows", 15001L + 10002L, rowCount); + } } From 5d1a09e03c0db24631ed3f57c713f2dec4f3c427 Mon Sep 17 00:00:00 2001 From: Gisli Magnusson Date: Mon, 1 Jun 2026 23:49:39 +0000 Subject: [PATCH 2/4] fix(ENGKNOW-3353): Fix pgor write. 
--- .../src/main/scala/gorsat/Analysis/ForkWrite.scala | 10 ++++------ .../gorsat/QueryHandlers/GeneralQueryHandler.scala | 2 +- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/gortools/src/main/scala/gorsat/Analysis/ForkWrite.scala b/gortools/src/main/scala/gorsat/Analysis/ForkWrite.scala index 9de487e1..8c219360 100644 --- a/gortools/src/main/scala/gorsat/Analysis/ForkWrite.scala +++ b/gortools/src/main/scala/gorsat/Analysis/ForkWrite.scala @@ -189,7 +189,7 @@ case class ForkWrite(forkCol: Int, def inferFileName(inFileName: String, forkValue: String): String = { var inferredFileName: String = "" - if(options.useFolder.nonEmpty) { + if(options.useFolder.nonEmpty /*&& forkCol == -1*/) { val folder = options.useFolder.get val fn = if (inFileName.isEmpty || DataUtil.isGord(folder)) { val uuid = UUID.randomUUID().toString @@ -316,11 +316,9 @@ case class ForkWrite(forkCol: Int, sh.fileOpen = false } }) - // Skip empty-file creation when writing to a specific gorz inside a gord folder - // (pgor partition writes). Those partitions should produce no file when they have no - // data; creating empty gorz files causes unnecessary writes (and S3 throttling) for - // every chromosome partition that falls outside the source data range. 
+ val isInsideGordFolder = DataUtil.isGord(PathUtils.getParent(fullFileName)) + if (options.useFolder.isEmpty && !somethingToWrite && !useFork && !isInsideGordFolder) { val out = createOutFile(fullFileName, false) out.setup() @@ -345,7 +343,7 @@ case class ForkWrite(forkCol: Int, } // Only write links for files that are NOT inside gord - if (options.useFolder.isEmpty && !singleFileHolder.fileName.contains(".gord/")) { + if (options.useFolder.isEmpty && !isInsideGordFolder) { if (useFork) { forkMap.values.foreach(sh => { val linkData = LinkFileUtil.extractLink(session.getProjectContext.getFileReader, sh.fileName, diff --git a/gortools/src/main/scala/gorsat/QueryHandlers/GeneralQueryHandler.scala b/gortools/src/main/scala/gorsat/QueryHandlers/GeneralQueryHandler.scala index cc4d4f7b..91471fc4 100644 --- a/gortools/src/main/scala/gorsat/QueryHandlers/GeneralQueryHandler.scala +++ b/gortools/src/main/scala/gorsat/QueryHandlers/GeneralQueryHandler.scala @@ -138,7 +138,7 @@ class GeneralQueryHandler(context: GorContext, header: Boolean) extends GorParal var cacheFile = fileCache.lookupFile(commandSignature) cacheFile = GorJavaUtilities.verifyLinkFileLastModified(context.getSession.getProjectContext,cacheFile) // Do this if we have result cache active or if we are running locally and the local cacheFile does not exist. - fileNames(i) = if (cacheFile == null) { + fileNames(i) = if (cacheFile == null || !fileReader.exists(cacheFile)) { val writeLocationPath = cacheFiles(i) if (writeLocationPath != null) { runAndStoreLinkFileInCache(nested, writeLocationPath, fileCache, useMd5) From c0d9da8a3f42dc42ec0b67394642038d5ea6aec4 Mon Sep 17 00:00:00 2001 From: Gisli Magnusson Date: Mon, 1 Jun 2026 23:53:58 +0000 Subject: [PATCH 3/4] fix(ENGKNOW-3353): Fix pgor write. 
--- gortools/src/main/scala/gorsat/Analysis/ForkWrite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gortools/src/main/scala/gorsat/Analysis/ForkWrite.scala b/gortools/src/main/scala/gorsat/Analysis/ForkWrite.scala index 8c219360..6731855c 100644 --- a/gortools/src/main/scala/gorsat/Analysis/ForkWrite.scala +++ b/gortools/src/main/scala/gorsat/Analysis/ForkWrite.scala @@ -189,7 +189,7 @@ case class ForkWrite(forkCol: Int, def inferFileName(inFileName: String, forkValue: String): String = { var inferredFileName: String = "" - if(options.useFolder.nonEmpty /*&& forkCol == -1*/) { + if(options.useFolder.nonEmpty) { val folder = options.useFolder.get val fn = if (inFileName.isEmpty || DataUtil.isGord(folder)) { val uuid = UUID.randomUUID().toString From b93dd5d31ada622f9d7df7047260376be959d879 Mon Sep 17 00:00:00 2001 From: Gisli Magnusson Date: Tue, 2 Jun 2026 00:07:57 +0000 Subject: [PATCH 4/4] fix(ENGKNOW-3353): Fix pgor write. --- .../test/java/gorsat/UTestGorWriteFolder.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/gortools/src/test/java/gorsat/UTestGorWriteFolder.java b/gortools/src/test/java/gorsat/UTestGorWriteFolder.java index 8c8d13db..b0a1be6e 100644 --- a/gortools/src/test/java/gorsat/UTestGorWriteFolder.java +++ b/gortools/src/test/java/gorsat/UTestGorWriteFolder.java @@ -186,17 +186,19 @@ public void testPartgorWriteOverGordFolder() throws IOException { } @Test - public void testPgorWriteGordFolderWithGorrowsMerge() throws IOException { - // Regression test for ENGKNOW-3353: pgor write randomly fails when inner query - // uses gorrows with fixed chromosome ranges. pgor creates partitions for ALL - // chromosomes; most produce 0 rows. In single-file mode (write to specific .gorz), - // ForkWrite was creating empty gorz files for every empty partition, causing - // excessive S3 writes and random failures. 
+ public void testOverwritePgorWriteGordFolderWithGorrowsMerge() throws IOException { var folderpath = workDirPath.resolve("ref_af.gord"); + var query = "create #x# = gorrows -p chr2:1000-16001|merge <(gorrows -p chr19:910100-920102);\n" + + "pgor [#x#]|write " + folderpath; TestUtils.runGorPipe( - "create #x# = gorrows -p chr2:1000-16001|merge <(gorrows -p chr19:910100-920102);\n" + - "pgor [#x#]|write " + folderpath, + query, + "-gorroot", workDirPath.toAbsolutePath().toString(), + "-cachedir", cachePath.toString()); + + // Overwrite + TestUtils.runGorPipe( + query, "-gorroot", workDirPath.toAbsolutePath().toString(), "-cachedir", cachePath.toString());