Friday, August 2, 2013

Using a webMethods Java service (StAX parser) to split a huge XML file into multiple small files.

I implemented this same requirement using a SAX parser in my previous post. The limitation of the SAX parser approach was that if the XML file was larger than 300 MB, the service would run out of memory while trying to split it. Because of these out-of-memory issues, I finally had to opt for the StAX parser, since it is more efficient than SAX (and definitely more efficient than DOM). I have tested this code with files of up to 2 GB and it works with no issues. For a 1 GB file it took about 38 minutes to execute.

File:
<?xml version="1.0" encoding="UTF-8"?>
<rootNode>
       <childNode>
               <ordernumber>12354</ordernumber>
       </childNode>
       <childNode>
               <ordernumber>12355</ordernumber>
       </childNode>
       <childNode>
               <ordernumber>12356</ordernumber>
       </childNode>
</rootNode>
With numOfRecordsPerFile set to 2, the result should look like this:

file1:
<?xml version="1.0" encoding="UTF-8"?>
<rootNode>
       <childNode>
               <ordernumber>12354</ordernumber>
       </childNode>
       <childNode>
               <ordernumber>12355</ordernumber>
       </childNode>
</rootNode>
file2:
<?xml version="1.0" encoding="UTF-8"?>
<rootNode>
       <childNode>
               <ordernumber>12356</ordernumber>
       </childNode>
</rootNode>
Imports:
import java.io.*;
import javax.xml.namespace.QName;
import javax.xml.stream.*;
import javax.xml.stream.events.*;
import javax.xml.transform.stream.StreamSource;
Inputs: xmlFileAbsoluteName, rootNode, repeatingNode, splitFilesTargetDirectory, splitFilePrefixName, numOfRecordsPerFile.
Java Service Code:
/*
 * Service body: splits a large XML file into smaller files.
 *
 * Pipeline inputs (all read as strings):
 *   xmlFileAbsoluteName       - path of the XML file to split
 *   rootNode                  - root element name, forwarded to mergeXMLFiles
 *   repeatingNode             - name of the element that is written out one-per-temp-file
 *   splitFilesTargetDirectory - directory part of the merged output file name
 *   splitFilePrefixName       - file-name prefix of the merged output files
 *   numOfRecordsPerFile       - chunk size forwarded to mergeXMLFiles (must parse as an int)
 *
 * Pipeline output:
 *   splitStatus - "success", or the toString() of the exception that stopped the split
 *
 * Phase 1 streams the source and writes every repeatingNode element to its own
 * "tempNodeFile<N>.xml" next to the source file. Phase 2 renames the source to
 * "<name>_processed" and hands the directory to mergeXMLFiles.
 */
IDataCursor pipelineCursor = pipeline.getCursor();
String	xmlFileAbsoluteName = IDataUtil.getString( pipelineCursor, "xmlFileAbsoluteName" );
String	rootNode = IDataUtil.getString( pipelineCursor, "rootNode" );
String	repeatingNode = IDataUtil.getString( pipelineCursor, "repeatingNode" );
String	splitFilesTargetDirectory = IDataUtil.getString( pipelineCursor, "splitFilesTargetDirectory" );
String	splitFilePrefixName = IDataUtil.getString( pipelineCursor, "splitFilePrefixName" );
String	numOfRecordsPerFile = IDataUtil.getString( pipelineCursor, "numOfRecordsPerFile" );
// The cursor is intentionally NOT destroyed here: it is still needed to write
// "splitStatus" below and is released exactly once in the outer finally.

writeToApplicationLogFile("START: Start executing the Java code to split large XML to small XML files.", logFileName, project);
XMLInputFactory inputFactory = XMLInputFactory.newInstance();

int count = 0;
File f = new File(xmlFileAbsoluteName);
String sourceFilePath = f.getParent();
String tempOutputFilePrefix=sourceFilePath+"\\"+"tempNodeFile";
QName name = new QName(repeatingNode);
try {
	// Parse the chunk size first so an invalid value fails before any temp file
	// is written and before the source file is renamed. A NumberFormatException
	// still propagates to the caller, as it did before.
	int chunkSize=Integer.parseInt(numOfRecordsPerFile);

	// NOTE(review): FileReader decodes with the platform default charset, not the
	// encoding declared in the XML prolog. writeToFile writes with the same default,
	// so the two must be changed together if this is ever switched to UTF-8.
	FileReader tempFileReader=new FileReader(xmlFileAbsoluteName);
	XMLEventReader reader = null;
	try {
		reader = inputFactory.createXMLEventReader(tempFileReader);
		while (reader.hasNext()) {
			XMLEvent event = reader.nextEvent();
			if (event.getEventType() == XMLStreamConstants.START_ELEMENT) {
				StartElement startElement = event.asStartElement();
				if (startElement.getName().equals(name)) {
					// writeToFile consumes events up to and including the matching end tag.
					writeToFile(reader, event, tempOutputFilePrefix+ (count++) + ".xml");
				}
			}
			if (event.isEndDocument()) {
				break;
			}
		}
	} finally {
		// Close both readers on every path (they used to leak on exceptions). They
		// must be closed before the rename below, which can fail on an open file.
		try {
			if (reader != null) {
				reader.close();
			}
		} finally {
			tempFileReader.close();
		}
	}

	File renameOrigianlFile=new File(xmlFileAbsoluteName+"_processed");
	if (!f.renameTo(renameOrigianlFile)) {
		// Processing continues as before; the failed rename is only made visible in the log.
		writeToApplicationLogFile("WARN: Could not rename "+xmlFileAbsoluteName+" to "+renameOrigianlFile.getPath(), logFileName, project);
	}
	mergeXMLFiles(sourceFilePath, splitFilesTargetDirectory+"\\"+splitFilePrefixName, chunkSize, rootNode);
	IDataUtil.put(pipelineCursor, "splitStatus", "success");
	writeToApplicationLogFile("END: Finished executing the Java code to split large XML Files. ", logFileName, project);
} catch (FileNotFoundException e) {
	IDataUtil.put(pipelineCursor, "splitStatus", e.toString());
	writeToApplicationLogFile("ERROR: "+e.toString(), logFileName, project);
} catch (XMLStreamException e) {
	IDataUtil.put(pipelineCursor, "splitStatus", e.toString());
	writeToApplicationLogFile("ERROR: "+e.toString(), logFileName, project);
} catch (IOException e) {
	IDataUtil.put(pipelineCursor, "splitStatus", e.toString());
	writeToApplicationLogFile("ERROR: "+e.toString(), logFileName, project);
} finally {
	// Single release point for the pipeline cursor, reached on every path.
	pipelineCursor.destroy();
}
	}

Shared Code:


// Relative path of the log file; passed as "logFileName" to the logging service by writeToApplicationLogFile.
static String logFileName="..\\IntegrationServer\\logs\\LargeFileHandling.log";
// Project label; passed as "projectName" to the logging service with every log entry.
static String project="Test Calling application";

/**
 * Writes one complete element, starting at {@code startEvent}, to its own XML file.
 *
 * The start element has already been consumed from {@code reader} by the caller.
 * This method consumes further events up to and including the end tag that closes
 * it. Nested elements with the same name are tracked by a depth counter so that the
 * copy stops at the correct end tag.
 *
 * @param reader     event reader positioned just after {@code startEvent}
 * @param startEvent the start-element event of the element to copy
 * @param filename   path of the file to create (an existing file is overwritten)
 * @throws XMLStreamException if reading or writing XML events fails; the error is
 *                            logged and then rethrown so the caller does not keep
 *                            reading from the middle of a record
 * @throws IOException        if the output file cannot be opened or closed
 */
private static void writeToFile(XMLEventReader reader, XMLEvent startEvent,String filename) throws XMLStreamException, IOException {
	XMLOutputFactory outputFactory = XMLOutputFactory.newInstance();
	outputFactory.setProperty("javax.xml.stream.isRepairingNamespaces", Boolean.TRUE);
	XMLEventFactory eventFactory = XMLEventFactory.newInstance();

	StartElement element = startEvent.asStartElement();
	QName name = element.getName();
	// Number of currently open elements named `name`; the start event counts as one.
	int depth = 1;
	XMLEventWriter writer = null;

	// NOTE(review): FileWriter encodes with the platform default charset while the
	// document below declares UTF-8. The caller reads the source with the same
	// default, so both sides must be changed together.
	FileWriter nodeFileWriter = new FileWriter(new File(filename));
	try {
		writer = outputFactory.createXMLEventWriter(nodeFileWriter);
		writer.add(eventFactory.createStartDocument("UTF-8", "1.0"));
		writer.add(element);
		while (reader.hasNext()) {
			XMLEvent event = reader.nextEvent();
			if (event.isStartElement() && event.asStartElement().getName().equals(name)) {
				depth++;
			} else if (event.isEndElement() && event.asEndElement().getName().equals(name)) {
				depth--;
			}
			writer.add(event);
			if (depth == 0) {
				// The end tag matching startEvent has just been written.
				break;
			}
		}
	} catch (XMLStreamException e) {
		writeToApplicationLogFile("ERROR: "+e.toString(), logFileName, project);
		throw e;
	} catch (RuntimeException e) {
		writeToApplicationLogFile("ERROR: "+e.toString(), logFileName, project);
		throw e;
	} finally {
		// writer is still null when createXMLEventWriter itself failed; the file
		// writer must be closed on every path regardless.
		try {
			if (writer != null) {
				writer.flush();
				writer.close();
			}
		} finally {
			nodeFileWriter.close();
		}
	}
}

/**
 * Sends one log entry to the "logService" service in "test.logging.services".
 *
 * Logging is best-effort: any exception raised by the invocation is ignored so
 * that a logging failure cannot interrupt the calling code.
 *
 * @param logData     text of the log entry (sent as "data")
 * @param logFileName log file to write to (sent as "logFileName")
 * @param project     project label for the entry (sent as "projectName")
 */
private static void writeToApplicationLogFile(String logData,String logFileName, String project){
	IData input = IDataFactory.create();
	IDataCursor inputCursor = input.getCursor();
	IDataUtil.put( inputCursor, "data", logData);
	IDataUtil.put( inputCursor, "logFileName", logFileName );
	IDataUtil.put( inputCursor, "projectName", project);
	inputCursor.destroy();
	try {
		// The service's output is not used, so it is not captured.
		Service.doInvoke( "test.logging.services", "logService", input );
	} catch (Exception ignored) {
		// Deliberately ignored: there is nowhere else to report a logging failure.
	}
}

public static void mergeXMLFiles(String sourceDirectory,String destinationFileName, int chunkSize, String rootNode){
		
		writeToApplicationLogFile("START: Start executing the mergeXMLFiels static method.", logFileName, project);
		File dir = new File(sourceDirectory);
		int totalFilesCount=dir.list().length;
		int fileCount=0;
		double chunksCountDecimal=(double)totalFilesCount/(double)chunkSize;
		int chunksCount=(int) Math.ceil(chunksCountDecimal);		
		Writer outputWriter = null;
		XMLEventWriter xmlEventWriter = null;
		
		//loop to create the number of files with limited number of records.
		for (int i = 0; i < chunksCount; i++) {
			File[] rootFiles = dir.listFiles(new FilenameFilter() { 
				public boolean accept(File dir, String filename)
				     { 
				    	 return filename.endsWith(".xml"); 
				     }
				}
			);			
		int noOfFiles=rootFiles.length;		
		try {
				outputWriter = new FileWriter(destinationFileName+fileCount+".xml");
				XMLOutputFactory xmlOutFactory = XMLOutputFactory.newFactory();
				xmlEventWriter = xmlOutFactory.createXMLEventWriter(outputWriter);
				XMLEventFactory xmlEventFactory = XMLEventFactory.newFactory();
				xmlEventWriter.add(xmlEventFactory.createStartDocument("UTF-8", "1.0"));
				xmlEventWriter.add(xmlEventFactory.createStartElement("", null, rootNode));				
				XMLInputFactory xmlInFactory = XMLInputFactory.newFactory();
				for (int i1 = 0; i1 < chunkSize; i1++) {
					if(i1


No comments:

Post a Comment