You should do some test before going on, and also it depends on the number of cores etc... My suggestion is create a crontab task for it and use several thread for importing documents. The logic might be:
starting from a local folder:
-> Create folders always in the main thread
-> Look for documents into the folder ( list ) and execute the import in a queue.
You can find a crontab sample into
https://sourceforge.net/projects/openkmportabledev/ or take a look here
https://wiki.openkm.com/index.php/Crontab_xml_importer ( implementation of a crontab task )
Here some classes what can helping you ( obviously you must work on it, it's only an starting point ).
// Approach for document import
Code: Select all....
public class MigrationParallel {
public static final int SLEEP_TIME = 100; // 100 milisecconds.
public static final String FILE_LOG_NAME = "migration-process";
public static final String FILE_ERROR_LOG_NAME = "migration-error";
public static final String FILE_LOG_PARALLEL_NAME = "migration-process-parallel";
public static final String FILE_LOG_PARALLEL_SLEEPING_NAME = "migration-process-sleeping";
public static Map<String, String> mapUsuarios;
public static Map<String, String> mapActoNuevaInscripcion;
public static Map<String, String> mapActoCancelacionJudicial;
public static Map<String, String> mapActoCancelacion;
public static int numMaxObjectsQueue = 16;
public static int parallelThreads = 4; // if you have 4 cores I suggest same number or 4*2 ( you should do some tests to find the good number for your hardware )
public static long threadExecutionTimeout = 2; // 2 minutes
...
long id = ExecutionQueue.getInstance().getId();
CorrectionMigrationThread thread = new CorrectionMigrationThread(file, okmDst, id);
ExecutionQueue.getInstance().doAction(ExecutionQueue.ACTION_ADD, id, thread);
// Sleep always after adding processes in queue
int iterations = 0;
while (ExecutionQueue.getInstance().threadListSize() > MigrationParallel.numMaxObjectsQueue) {
// Stop until the number of object in queue is less than..
try {
Thread.sleep(MigrationParallel.SLEEP_TIME);
iterations++;
if (iterations % 10==0) {
FileLogger.info(MigrationParallel.FILE_LOG_PARALLEL_SLEEPING_NAME, "Durmiendo "+(iterations*MigrationParallel.SLEEP_TIME)+" milisegundos y en la cola esperando " + ExecutionQueue.getInstance().threadListSize() + " threads");
}
} catch (Exception e) {
}
}
....
Approach for import document into thread
Code: Select allpackage com.openkm.migracion.logic;
import java.io.File;
import java.util.concurrent.TimeUnit;
import com.openkm.migracion.crontab.MigrationParallel;
import com.openkm.migracion.util.ExecutionQueue;
import com.openkm.util.FileLogger;
/**
* CorrectionMigrationThread
*
* @author jllort
*/
public class CorrectionMigrationThread implements Runnable {
private File file;
private String okmDst;
private long id = 0;
private String appUrl;
/**
* MigrationThread
*/
public CorrectionMigrationThread(File file, String okmDst, long id) {
this.file = file;
this.okmDst = okmDst;
this.id = id;
}
public long getId() {
return id;
}
// Migrate document
@Override
public void run() {
migration(file, okmDst);
}
/*
* Migrate document
*/
private void migration(File file, String okmDst) {
try {
long startTime = System.currentTimeMillis();
// Do something here
String timing = "["+ TimeUnit.SECONDS.convert((System.currentTimeMillis()-startTime), TimeUnit.MILLISECONDS) + " seconds]";
FileLogger.info(MigrationParallel.FILE_LOG_NAME, String.format("%010d", id) + " Processed: , queue="+ExecutionQueue.getInstance().getThreadListIds());
} catch (Exception e) {
// Log Something
FileLogger.info(MigrationParallel.FILE_ERROR_LOG_NAME, String.format("%010d", MigrationParallel.incrementErrors()) + " Error:XXXX");
} finally {
try {
ExecutionQueue.getInstance().doAction(ExecutionQueue.ACTION_REMOVE, id, null);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
Document import queue
Code: Select allpackage com.openkm.migracion.util;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.openkm.core.DatabaseException;
import com.openkm.migracion.crontab.MigrationParallel;
/**
* ExecutionQueue
*
* @author jllort
*/
public class ExecutionQueue {
public static final int ACTION_ADD = 1;
public static final int ACTION_REMOVE = 2;
public static final int ACTION_CLEAN = 3;
private static Logger log = LoggerFactory.getLogger(ExecutionQueue.class);
private Map<Long, Runnable> threadList = new HashMap<Long, Runnable>();
private static ExecutionQueue dcq = new ExecutionQueue();
private ExecutorService executor;
private long id = 1;
/**
* getInstance
*/
public static synchronized ExecutionQueue getInstance() {
return dcq;
}
/**
* getId
*/
public synchronized long getId() {
return id++;
}
/**
* getId
*/
public synchronized long threadListSize() {
return threadList.size();
}
/**
* getThreadListIds
*/
public String getThreadListIds() {
return threadList.keySet().toString();
}
/**
* add
*/
public synchronized void doAction(int action, long id, Runnable runnable) throws DatabaseException {
switch (action) {
case ACTION_ADD:
if (executor == null) {
executor = Executors.newFixedThreadPool(MigrationParallel.parallelThreads);
}
threadList.put(id, runnable);
executor.execute(runnable);
break;
case ACTION_REMOVE:
threadList.remove(id);
if (threadList.size() == 0) {
if (executor != null) {
Thread clean = new Thread() {
public void run() {
try {
doAction(ACTION_CLEAN, 0, null);
} catch (DatabaseException e) {
e.printStackTrace();
}
}
};
clean.start();
}
}
break;
case ACTION_CLEAN:
if (threadList.size() == 0) {
if (executor != null) {
executor.shutdown();
log.info("### All threads shutdown requested ###");
try {
for (int i = 0; !executor.awaitTermination(MigrationParallel.threadExecutionTimeout, TimeUnit.MINUTES); i++) {
log.info("### Awaiting for execution pool tasks termination... ({}) ###", i);
}
} catch (InterruptedException e) {
log.warn("### Exception awaiting for dominion pool tasks termination: {} ###", e.getMessage());
}
//log.info("### All threads have finished ###");
threadList.clear();
executor = null;
}
}
break;
}
}
}