From 3bb9e481e997757c65fc4dbe9c9c3f0a4903ac41 Mon Sep 17 00:00:00 2001 From: Paul Andrew <60190428+mrpaulandrew@users.noreply.github.com> Date: Sat, 22 Oct 2022 11:36:33 +0100 Subject: [PATCH] Adding dataflow: MappingOrderAggregationWithParam --- .../MappingOrderAggregationWithParam.json | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 Code/DataFactory/dataflow/MappingOrderAggregationWithParam.json diff --git a/Code/DataFactory/dataflow/MappingOrderAggregationWithParam.json b/Code/DataFactory/dataflow/MappingOrderAggregationWithParam.json new file mode 100644 index 0000000..d584639 --- /dev/null +++ b/Code/DataFactory/dataflow/MappingOrderAggregationWithParam.json @@ -0,0 +1,45 @@ +{ + "name": "MappingOrderAggregationWithParam", + "properties": { + "type": "MappingDataFlow", + "typeProperties": { + "sources": [ + { + "dataset": { + "referenceName": "LakeFileOrderHeaderParquet", + "type": "DatasetReference" + }, + "name": "OrderHeader" + }, + { + "dataset": { + "referenceName": "LakeFileOrderDetailLinesParquet", + "type": "DatasetReference" + }, + "name": "OrderLineDetails" + } + ], + "sinks": [ + { + "dataset": { + "referenceName": "TableOrderSummary", + "type": "DatasetReference" + }, + "name": "OrderSummary" + } + ], + "transformations": [ + { + "name": "JoinHeaderToLineDetails" + }, + { + "name": "OrderLineCount" + }, + { + "name": "AddAuditColum" + } + ], + "script": "parameters{\n\tAuditColumn as string\n}\nsource(output(\n\t\tSalesOrderID as integer,\n\t\tRevisionNumber as integer,\n\t\tOrderDate as timestamp,\n\t\tDueDate as timestamp,\n\t\tShipDate as timestamp,\n\t\tStatus as integer,\n\t\tOnlineOrderFlag as boolean,\n\t\tSalesOrderNumber as string,\n\t\tPurchaseOrderNumber as string,\n\t\tAccountNumber as string,\n\t\tCustomerID as integer,\n\t\tShipToAddressID as integer,\n\t\tBillToAddressID as integer,\n\t\tShipMethod as string,\n\t\tCreditCardApprovalCode as string,\n\t\tSubTotal as decimal(19,4),\n\t\tTaxAmt as decimal(19,4),\n\t\tFreight as decimal(19,4),\n\t\tTotalDue as decimal(19,4),\n\t\tComment as string,\n\t\trowguid as string,\n\t\tModifiedDate as timestamp\n\t),\n\tallowSchemaDrift: false,\n\tvalidateSchema: false,\n\tignoreNoFilesFound: false,\n\tformat: 'parquet',\n\tpartitionBy('hash', 1)) ~> OrderHeader\nsource(output(\n\t\tSalesOrderID as integer,\n\t\tSalesOrderDetailID as integer,\n\t\tOrderQty as integer,\n\t\tProductID as integer,\n\t\tUnitPrice as decimal(19,4),\n\t\tUnitPriceDiscount as decimal(19,4),\n\t\tLineTotal as decimal(38,6),\n\t\trowguid as string,\n\t\tModifiedDate as timestamp\n\t),\n\tallowSchemaDrift: false,\n\tvalidateSchema: false,\n\tignoreNoFilesFound: false,\n\tformat: 'parquet',\n\tpartitionBy('hash', 1)) ~> OrderLineDetails\nOrderHeader, OrderLineDetails join(OrderHeader@SalesOrderID == OrderLineDetails@SalesOrderID,\n\tjoinType:'inner',\n\tmatchType:'exact',\n\tignoreSpaces: false,\n\tpartitionBy('hash', 1),\n\tbroadcast: 'both')~> JoinHeaderToLineDetails\nJoinHeaderToLineDetails aggregate(groupBy(SalesOrderNumber),\n\tRecordCount = count(SalesOrderDetailID),\n\tpartitionBy('roundRobin', 4)) ~> OrderLineCount\nOrderLineCount derive(AuditValue = $AuditColumn) ~> AddAuditColum\nAddAuditColum sink(allowSchemaDrift: false,\n\tvalidateSchema: false,\n\tinput(\n\t\tSalesOrderNumber as string,\n\t\tRecordCount as integer\n\t),\n\tdeletable:false,\n\tinsertable:true,\n\tupdateable:false,\n\tupsertable:false,\n\ttruncate:true,\n\tformat: 'table',\n\tskipDuplicateMapInputs: true,\n\tskipDuplicateMapOutputs: true,\n\terrorHandlingOption: 'stopOnFirstError',\n\tmapColumn(\n\t\tSalesOrderNumber,\n\t\tRecordCount\n\t),\n\tpartitionBy('roundRobin', 4)) ~> OrderSummary" + } + } +} \ No newline at end of file