Hello,
try following graph:
Code:
<?xml version="1.0" encoding="UTF-8"?>
<Graph id="1279265421952" name="OIA" revision="1.46">
<Global>
<Metadata id="Metadata0" >
<Record name="data" recordDelimiter="\n" type="delimited">
<Field eofAsDelimiter="true" name="data" type="string"/>
<Field auto_filling="source_name" delimiter="." name="input_file_name" type="string"/>
</Record>
</Metadata>
<Metadata id="Metadata1" >
<Record fieldDelimiter="," name="files" recordDelimiter="\n" type="delimited">
<Field name="input" type="string"/>
<Field name="output" type="string"/>
</Record>
</Metadata>
<Property fileURL="workspace.prm" id="GraphParameter0"/>
<LookupTable id="LookupTable0" initialSize="512" key="input" metadata="Metadata1" name="files" type="simpleLookup"/>
<Dictionary/>
</Global>
<Phase number="0">
<Node fileURL="${DATAIN_DIR}/*.txt" id="DATA_READER0" trim="false" type="DATA_READER"/>
<Node id="DENORMALIZER0" key="input_file_name" type="DENORMALIZER">
<attr name="denormalize"><![CDATA[//#CTL1
// This transformation defines the way in which multiple input records
// (with the same key) are denormalized into one output record.
// This function is called for each input record from a group of records
// with the same key.
function append() {
}
// This function is called once after the append() function was called for all records
// of a group of input records defined by the key.
// It creates a single output record for the whole group.
function transform() {
list parts = split($input_file_name,"/");
$input := $input_file_name;
$output := replace(parts[length(parts) - 1],"in","out");
return OK
}
// Called during component initialization.
// function boolean init() {}
// Called during each graph run before the transform is executed. May be used to allocate and initialize resources
// required by the transform. All resources allocated within this method should be released
// by the postExecute() method.
// function void preExecute() {}
// Called only if append() throws an exception.
// function integer appendOnError(string errorMessage, string stackTrace) {
// }
// Called only if transform() throws an exception.
//function integer transformOnError(string errorMessage, string stackTrace) {
//}
// Called after transform() to return the resources that have been used to their initial state
// so that next group of records with different key may be parsed.
// function void clean() {}
// Called during each graph run after the entire transform was executed. Should be used to free any resources
// allocated within the preExecute() method.
// function void postExecute() {}
// Called to return a user-defined error message when an error occurs.
// function string getMessage() {}
]]></attr>
</Node>
<Node id="LOOKUP_TABLE_READER_WRITER0" lookupTable="LookupTable0" type="LOOKUP_TABLE_READER_WRITER"/>
<Node id="SIMPLE_COPY0" type="SIMPLE_COPY"/>
<Edge fromNode="DATA_READER0:0" id="Edge2" inPort="Port 0 (in)" metadata="Metadata0" outPort="Port 0 (output)" toNode="SIMPLE_COPY0:0"/>
<Edge debugMode="true" fromNode="DENORMALIZER0:0" inPort="Port 0 (in)" metadata="Metadata1" outPort="Port 0 (out)" toNode="LOOKUP_TABLE_READER_WRITER0:0"/>
<Edge fromNode="SIMPLE_COPY0:0" id="Edge3" inPort="Port 0 (in)" metadata="Metadata0" outPort="Port 0 (out)" toNode="DATA_WRITER0:0"/>
<Edge fromNode="SIMPLE_COPY0:1" id="Edge4" inPort="Port 0 (in)" metadata="Metadata0" outPort="Port 1 (out)" toNode="DENORMALIZER0:0"/>
</Phase>
<Phase number="1">
<Node append="false" excludeFields="input_file_name" fileURL="${DATAOUT_DIR}/#" id="DATA_WRITER0" partition="LookupTable0" partitionFileTag="keyNameFileTag" partitionKey="input_file_name" partitionOutFields="output" type="DATA_WRITER"/>
</Phase>
</Graph>