@@ -22,31 +22,31 @@ public class FileServiceImpl implements FileService, FileStreamer {
22
22
23
23
private static final Logger LOGGER = LoggerFactory .getLogger (FileServiceImpl .class );
24
24
25
- private static final int DEFAULT_MAX_CHUNK_SIZE = 64 * 1024 ;
26
- private static final String TEMP_DIR = System .getProperty ("java.io.tmpdir" );
25
+ private static final int DEFAULT_CHUNK_SIZE = 64 * 1024 ;
26
+ private static final String DEFAULT_DIR = System .getProperty ("java.io.tmpdir" );
27
+ private static final Duration DEFAULT_TTL = Duration .ofSeconds (600 );
27
28
28
29
private final Path baseDir ;
29
- private final int maxChunkSize ;
30
+ private final int chunkSize ;
31
+ private final Duration fileTtl ;
30
32
31
33
private String serviceEndpointId ;
32
34
33
- /**
34
- * Default constructor. {@code baseDir} will be {@code java.io.tmpdir}, {@code maxChunkSize} will
35
- * be {@code 64k}.
36
- */
37
35
public FileServiceImpl () {
38
- this (new File (TEMP_DIR ), DEFAULT_MAX_CHUNK_SIZE );
36
+ this (new File (DEFAULT_DIR ), DEFAULT_CHUNK_SIZE , DEFAULT_TTL );
39
37
}
40
38
41
39
/**
42
40
* Constructor.
43
41
*
44
42
* @param baseDir baseDir for storing files
45
- * @param maxChunkSize maximum buffer size for reading file and publishing by flux
43
+ * @param chunkSize maximum buffer size for reading file and publishing by flux
44
+ * @param fileTtl file lifetime, {@code null} means forever
46
45
*/
47
- public FileServiceImpl (File baseDir , int maxChunkSize ) {
46
+ public FileServiceImpl (File baseDir , int chunkSize , Duration fileTtl ) {
48
47
this .baseDir = baseDir .toPath ();
49
- this .maxChunkSize = maxChunkSize ;
48
+ this .chunkSize = chunkSize ;
49
+ this .fileTtl = fileTtl ;
50
50
}
51
51
52
52
@ AfterConstruct
@@ -73,15 +73,17 @@ public Mono<String> addFile(AddFileRequest request) {
73
73
throw new IllegalArgumentException ("Wrong file: " + file );
74
74
}
75
75
76
- if (ttl != null && ttl != Duration .ZERO ) {
76
+ final var actualTtl = ttl != null ? ttl : fileTtl ;
77
+
78
+ if (actualTtl != null ) {
77
79
final var scheduler = Schedulers .single ();
78
80
scheduler .schedule (
79
81
() -> {
80
82
if (!file .delete ()) {
81
83
LOGGER .warn ("Cannot delete file: {}" , file );
82
84
}
83
85
},
84
- ttl .toMillis (),
86
+ actualTtl .toMillis (),
85
87
TimeUnit .MILLISECONDS );
86
88
}
87
89
@@ -100,7 +102,7 @@ public Flux<byte[]> streamFile() {
100
102
if (!isPathValid (filePath )) {
101
103
return Flux .error (new FileNotFoundException ("File not found: " + name ));
102
104
} else {
103
- return fluxFrom (filePath , ByteBuffer .allocate (maxChunkSize ));
105
+ return fluxFrom (filePath , ByteBuffer .allocate (chunkSize ));
104
106
}
105
107
});
106
108
}
@@ -134,8 +136,8 @@ private static Flux<byte[]> fluxFrom(Path filePath, ByteBuffer chunkBuffer) {
134
136
channel -> {
135
137
try {
136
138
channel .close ();
137
- } catch (Throwable e ) {
138
- LOGGER .warn ("Cannot close file: {}" , filePath );
139
+ } catch (Exception e ) {
140
+ LOGGER .warn ("Cannot close file: {}" , filePath , e );
139
141
}
140
142
});
141
143
}
0 commit comments