@@ -385,6 +385,7 @@ func Provider() terraform.ResourceProvider {
 		"azurerm_network_profile":                                                        resourceArmNetworkProfile(),
 		"azurerm_network_security_group":                                                 resourceArmNetworkSecurityGroup(),
 		"azurerm_network_security_rule":                                                  resourceArmNetworkSecurityRule(),
+		"azurerm_network_watcher_flow_log":                                               resourceArmNetworkWatcherFlowLog(),
 		"azurerm_network_watcher":                                                        resourceArmNetworkWatcher(),
 		"azurerm_netapp_account":                                                         resourceArmNetAppAccount(),
 		"azurerm_netapp_pool":                                                            resourceArmNetAppPool(),
@@ -374,7 +374,7 @@ func testCheckAzureRMNetworkConnectionMonitorDestroy(s *terraform.State) error {
 func testAccAzureRMNetworkConnectionMonitor_baseConfig(rInt int, location string) string {
 	return fmt.Sprintf(`
 resource "azurerm_resource_group" "test" {
-  name     = "acctestRG-%d"
+  name     = "acctestRG-watcher-%d"
   location = "%s"
@@ -204,7 +204,7 @@ func testCheckAzureRMNetworkPacketCaptureDestroy(s *terraform.State) error {
 func testAzureRMNetworkPacketCapture_base(rInt int, location string) string {
 	return fmt.Sprintf(`
 resource "azurerm_resource_group" "test" {
-  name     = "acctestRG-%d"
+  name     = "acctestRG-watcher-%d"
   location = "%s"
@@ -21,6 +21,7 @@ func resourceArmNetworkWatcher() *schema.Resource {
 		Read:   resourceArmNetworkWatcherRead,
 		Update: resourceArmNetworkWatcherCreateUpdate,
 		Delete: resourceArmNetworkWatcherDelete,
 		Importer: &schema.ResourceImporter{
 			State: schema.ImportStatePassthrough,
@@ -0,0 +1,392 @@
+package azurerm
+import (
+	"fmt"
+	"log"
+	"strings"
+	"time"
+	""
+	""
+	""
+	""
+	""
+	""
+	""
+	""
+type NetworkWatcherFlowLogAccountID struct {
+	azure.ResourceID
+	NetworkWatcherName     string
+	NetworkSecurityGroupID string
+func ParseNetworkWatcherFlowLogID(id string) (*NetworkWatcherFlowLogAccountID, error) {
+	parts := strings.Split(id, "/networkSecurityGroupId")
+	if len(parts) != 2 {
+		return nil, fmt.Errorf("Error: Network Watcher Flow Log ID could not be split on `/networkSecurityGroupId`: %s", id)
+	}
+	watcherId, err := azure.ParseAzureResourceID(parts[0])
+	if err != nil {
+		return nil, err
+	}
+	watcherName, ok := watcherId.Path["networkWatchers"]
+	if !ok {
+		return nil, fmt.Errorf("Error: Unable to parse Network Watcher Flow Log ID: networkWatchers is missing from: %s", id)
+	}
+	return &NetworkWatcherFlowLogAccountID{
+		ResourceID:             *watcherId,
+		NetworkWatcherName:     watcherName,
+		NetworkSecurityGroupID: parts[1],
+	}, nil
+func resourceArmNetworkWatcherFlowLog() *schema.Resource {
+	return &schema.Resource{
+		Create: resourceArmNetworkWatcherFlowLogCreateUpdate,
+		Read:   resourceArmNetworkWatcherFlowLogRead,
+		Update: resourceArmNetworkWatcherFlowLogCreateUpdate,
+		Delete: resourceArmNetworkWatcherFlowLogDelete,
+		Importer: &schema.ResourceImporter{
+			State: schema.ImportStatePassthrough,
+		},
+		Timeouts: &schema.ResourceTimeout{
+			Create: schema.DefaultTimeout(30 * time.Minute),
+			Read:   schema.DefaultTimeout(5 * time.Minute),
+			Update: schema.DefaultTimeout(30 * time.Minute),
+			Delete: schema.DefaultTimeout(30 * time.Minute),
+		},
+		Schema: map[string]*schema.Schema{
+			"network_watcher_name": {
+				Type:         schema.TypeString,
+				Required:     true,
+				ForceNew:     true,
+				ValidateFunc: validation.NoZeroValues,
+			},
+			"resource_group_name": azure.SchemaResourceGroupName(),
+			"network_security_group_id": {
+				Type:         schema.TypeString,
+				Required:     true,
+				ForceNew:     true,
+				ValidateFunc: azure.ValidateResourceID,
+			},
+			"storage_account_id": {
+				Type:         schema.TypeString,
+				Required:     true,
+				ValidateFunc: azure.ValidateResourceID,
+			},
+			"enabled": {
+				Type:     schema.TypeBool,
+				Required: true,
+			},
+			"retention_policy": {
+				Type:     schema.TypeList,
+				Required: true,
+				MaxItems: 1,
+				Elem: &schema.Resource{
+					Schema: map[string]*schema.Schema{
+						"enabled": {
+							Type:             schema.TypeBool,
+							Required:         true,
+							DiffSuppressFunc: azureRMSuppressFlowLogRetentionPolicyEnabledDiff,
+						},
+						"days": {
+							Type:             schema.TypeInt,
+							Required:         true,
+							DiffSuppressFunc: azureRMSuppressFlowLogRetentionPolicyDaysDiff,
+						},
+					},
+				},
+			},
+			"traffic_analytics": {
+				Type:     schema.TypeList,
+				Optional: true,
+				MaxItems: 1,
+				Elem: &schema.Resource{
+					Schema: map[string]*schema.Schema{
+						"enabled": {
+							Type:     schema.TypeBool,
+							Required: true,
+						},
+						"workspace_id": {
+							Type:         schema.TypeString,
+							Required:     true,
+							ValidateFunc: validate.UUID,
+						},
+						"workspace_region": {
+							Type:             schema.TypeString,
+							Required:         true,
+							StateFunc:        azure.NormalizeLocation,
+							DiffSuppressFunc: azure.SuppressLocationDiff,
+						},
+						"workspace_resource_id": {
+							Type:         schema.TypeString,
+							Required:     true,
+							ValidateFunc: azure.ValidateResourceIDOrEmpty,
+						},
+					},
+				},
+			},
+		},
+	}
+func azureRMSuppressFlowLogRetentionPolicyEnabledDiff(k, old, new string, d *schema.ResourceData) bool {
+	// Ignore if flow log is disabled as the returned flow log configuration
+	// returns default value `false` which may differ from config
+	return old != "" && !d.Get("enabled").(bool)
+func azureRMSuppressFlowLogRetentionPolicyDaysDiff(k, old, new string, d *schema.ResourceData) bool {
+	// Ignore if flow log is disabled as the returned flow log configuration
+	// returns default value `0` which may differ from config
+	return old != "" && !d.Get("enabled").(bool)
+func resourceArmNetworkWatcherFlowLogCreateUpdate(d *schema.ResourceData, meta interface{}) error {
+	client := meta.(*ArmClient).Network.WatcherClient
+	ctx, cancel := timeouts.ForCreateUpdate(meta.(*ArmClient).StopContext, d)
+	defer cancel()
+	networkWatcherName := d.Get("network_watcher_name").(string)
+	resourceGroupName := d.Get("resource_group_name").(string)
+	networkSecurityGroupID := d.Get("network_security_group_id").(string)
+	storageAccountID := d.Get("storage_account_id").(string)
+	enabled := d.Get("enabled").(bool)
+	parameters := network.FlowLogInformation{
+		TargetResourceID: &networkSecurityGroupID,
+		FlowLogProperties: &network.FlowLogProperties{
+			StorageID:       &storageAccountID,
+			Enabled:         &enabled,
+			RetentionPolicy: expandAzureRmNetworkWatcherFlowLogRetentionPolicy(d),
+		},
+	}
+	if _, ok := d.GetOk("traffic_analytics"); ok {
+		parameters.FlowAnalyticsConfiguration = expandAzureRmNetworkWatcherFlowLogTrafficAnalytics(d)
+	}
+	future, err := client.SetFlowLogConfiguration(ctx, resourceGroupName, networkWatcherName, parameters)
+	if err != nil {
+		return fmt.Errorf("Error setting Flow Log Configuration for target %q (Network Watcher %q / Resource Group %q): %+v", networkSecurityGroupID, networkWatcherName, resourceGroupName, err)
+	}
+	if err = future.WaitForCompletionRef(ctx, client.Client); err != nil {
+		return fmt.Errorf("Error waiting for completion of setting Flow Log Configuration for target %q (Network Watcher %q / Resource Group %q): %+v", networkSecurityGroupID, networkWatcherName, resourceGroupName, err)
+	}
+	resp, err := client.Get(ctx, resourceGroupName, networkWatcherName)
+	if err != nil {
+		return fmt.Errorf("Cannot read Network Watcher %q (Resource Group %q) err: %+v", networkWatcherName, resourceGroupName, err)
+	}
+	if resp.ID == nil {
+		return fmt.Errorf("Network Watcher %q is nil (Resource Group %q)", networkWatcherName, resourceGroupName)
+	}
+	d.SetId(*resp.ID + "/networkSecurityGroupId" + networkSecurityGroupID)
+	return resourceArmNetworkWatcherFlowLogRead(d, meta)
+func resourceArmNetworkWatcherFlowLogRead(d *schema.ResourceData, meta interface{}) error {
+	client := meta.(*ArmClient).Network.WatcherClient
+	ctx, cancel := timeouts.ForRead(meta.(*ArmClient).StopContext, d)
+	defer cancel()
+	id, err := ParseNetworkWatcherFlowLogID(d.Id())
+	if err != nil {
+		return err
+	}
+	// Get current flow log status
+	statusParameters := network.FlowLogStatusParameters{
+		TargetResourceID: &id.NetworkSecurityGroupID,
+	}
+	future, err := client.GetFlowLogStatus(ctx, id.ResourceGroup, id.NetworkWatcherName, statusParameters)
+	if err != nil {
+		if !response.WasNotFound(future.Response()) {
+			// One of storage account, NSG, or flow log is missing
+			log.Printf("[INFO] Error getting Flow Log Configuration %q for target %q - removing from state", d.Id(), id.NetworkSecurityGroupID)
+			d.SetId("")
+			return nil
+		}
+		return fmt.Errorf("Error retrieving Flow Log Configuration for target %q (Network Watcher %q / Resource Group %q): %+v", id.NetworkSecurityGroupID, id.NetworkWatcherName, id.ResourceGroup, err)
+	}
+	if err = future.WaitForCompletionRef(ctx, client.Client); err != nil {
+		return fmt.Errorf("Error waiting for retrieval of Flow Log Configuration for target %q (Network Watcher %q / Resource Group %q): %+v", id.NetworkSecurityGroupID, id.NetworkWatcherName, id.ResourceGroup, err)
+	}
+	fli, err := future.Result(*client)
+	if err != nil {
+		return fmt.Errorf("Error retrieving Flow Log Configuration for target %q (Network Watcher %q / Resource Group %q): %+v", id.NetworkSecurityGroupID, id.NetworkWatcherName, id.ResourceGroup, err)
+	}
+	d.Set("network_watcher_name", id.NetworkWatcherName)
+	d.Set("resource_group_name", id.ResourceGroup)
+	d.Set("network_security_group_id", fli.TargetResourceID)
+	if err := d.Set("traffic_analytics", flattenAzureRmNetworkWatcherFlowLogTrafficAnalytics(fli.FlowAnalyticsConfiguration)); err != nil {
+		return fmt.Errorf("Error setting `traffic_analytics`: %+v", err)
+	}
+	if props := fli.FlowLogProperties; props != nil {
+		d.Set("enabled", props.Enabled)
+		// Azure API returns "" when flow log is disabled
+		// Don't overwrite to prevent storage account ID diff when that is the case
+		if props.StorageID != nil && *props.StorageID != "" {
+			d.Set("storage_account_id", props.StorageID)
+		}
+		if err := d.Set("retention_policy", flattenAzureRmNetworkWatcherFlowLogRetentionPolicy(props.RetentionPolicy)); err != nil {
+			return fmt.Errorf("Error setting `retention_policy`: %+v", err)
+		}
+	}
+	return nil
+func resourceArmNetworkWatcherFlowLogDelete(d *schema.ResourceData, meta interface{}) error {
+	client := meta.(*ArmClient).Network.WatcherClient
+	ctx, cancel := timeouts.ForDelete(meta.(*ArmClient).StopContext, d)
+	defer cancel()
+	id, err := ParseNetworkWatcherFlowLogID(d.Id())
+	if err != nil {
+		return err
+	}
+	// Get current flow log status
+	statusParameters := network.FlowLogStatusParameters{
+		TargetResourceID: &id.NetworkSecurityGroupID,
+	}
+	future, err := client.GetFlowLogStatus(ctx, id.ResourceGroup, id.NetworkWatcherName, statusParameters)
+	if err != nil {
+		return fmt.Errorf("Error getting Flow Log Configuration for target %q (Network Watcher %q / Resource Group %q): %+v", id.NetworkSecurityGroupID, id.NetworkWatcherName, id.ResourceGroup, err)
+	}
+	if err = future.WaitForCompletionRef(ctx, client.Client); err != nil {
+		return fmt.Errorf("Error waiting for retrieval of Flow Log Configuration for target %q (Network Watcher %q / Resource Group %q): %+v", id.NetworkSecurityGroupID, id.NetworkWatcherName, id.ResourceGroup, err)
+	}
+	fli, err := future.Result(*client)
+	if err != nil {
+		return fmt.Errorf("Error retrieving Flow Log Configuration for target %q (Network Watcher %q / Resource Group %q): %+v", id.NetworkSecurityGroupID, id.NetworkWatcherName, id.ResourceGroup, err)
+	}
+	// There is no delete in Azure API. Disabling flow log is effectively a delete in Terraform.
+	if props := fli.FlowLogProperties; props != nil {
+		if props.Enabled != nil && *props.Enabled {
+			props.Enabled = utils.Bool(false)
+			setFuture, err := client.SetFlowLogConfiguration(ctx, id.ResourceGroup, id.NetworkWatcherName, fli)
+			if err != nil {
+				return fmt.Errorf("Error disabling Flow Log Configuration for target %q (Network Watcher %q / Resource Group %q): %+v", id.NetworkSecurityGroupID, id.NetworkWatcherName, id.ResourceGroup, err)
+			}
+			if err = setFuture.WaitForCompletionRef(ctx, client.Client); err != nil {
+				return fmt.Errorf("Error waiting for completion of disabling Flow Log Configuration for target %q (Network Watcher %q / Resource Group %q): %+v", id.NetworkSecurityGroupID, id.NetworkWatcherName, id.ResourceGroup, err)
+			}
+		}
+	}
+	return nil
+func expandAzureRmNetworkWatcherFlowLogRetentionPolicy(d *schema.ResourceData) *network.RetentionPolicyParameters {
+	vs := d.Get("retention_policy").([]interface{})
+	if len(vs) < 1 || vs[0] == nil {
+		return nil
+	}
+	v := vs[0].(map[string]interface{})
+	enabled := v["enabled"].(bool)
+	days := v["days"].(int)
+	return &network.RetentionPolicyParameters{
+		Enabled: utils.Bool(enabled),
+		Days:    utils.Int32(int32(days)),
+	}
+func flattenAzureRmNetworkWatcherFlowLogRetentionPolicy(input *network.RetentionPolicyParameters) []interface{} {
+	if input == nil {
+		return []interface{}{}
+	}
+	result := make(map[string]interface{})
+	if input.Enabled != nil {
+		result["enabled"] = *input.Enabled
+	}
+	if input.Days != nil {
+		result["days"] = *input.Days
+	}
+	return []interface{}{result}
+func flattenAzureRmNetworkWatcherFlowLogTrafficAnalytics(input *network.TrafficAnalyticsProperties) []interface{} {
+	if input == nil || input.NetworkWatcherFlowAnalyticsConfiguration == nil {
+		return []interface{}{}
+	}
+	result := make(map[string]interface{})
+	if cfg := input.NetworkWatcherFlowAnalyticsConfiguration; cfg != nil {
+		if cfg.Enabled != nil {
+			result["enabled"] = *cfg.Enabled
+		}
+		if cfg.WorkspaceID != nil {
+			result["workspace_id"] = *cfg.WorkspaceID
+		}
+		if cfg.WorkspaceRegion != nil {
+			result["workspace_region"] = *cfg.WorkspaceRegion
+		}
+		if cfg.WorkspaceResourceID != nil {
+			result["workspace_resource_id"] = *cfg.WorkspaceResourceID
+		}
+	}
+	return []interface{}{result}
+func expandAzureRmNetworkWatcherFlowLogTrafficAnalytics(d *schema.ResourceData) *network.TrafficAnalyticsProperties {
+	vs := d.Get("traffic_analytics").([]interface{})
+	v := vs[0].(map[string]interface{})
+	enabled := v["enabled"].(bool)
+	workspaceID := v["workspace_id"].(string)
+	workspaceRegion := v["workspace_region"].(string)
+	workspaceResourceID := v["workspace_resource_id"].(string)
+	return &network.TrafficAnalyticsProperties{
+		NetworkWatcherFlowAnalyticsConfiguration: &network.TrafficAnalyticsConfigurationProperties{
+			Enabled:             utils.Bool(enabled),
+			WorkspaceID:         utils.String(workspaceID),
+			WorkspaceRegion:     utils.String(workspaceRegion),
+			WorkspaceResourceID: utils.String(workspaceResourceID),
+		},
+	}
@@ -0,0 +1,537 @@
+package azurerm
+import (
+	"fmt"
+	"testing"
+	""
+	""
+	""
+	""
+func testAccAzureRMNetworkWatcherFlowLog_basic(t *testing.T) {
+	resourceName := "azurerm_network_watcher_flow_log.test"
+	ri := tf.AccRandTimeInt()
+	location := testLocation()
+	resource.Test(t, resource.TestCase{
+		PreCheck:     func() { testAccPreCheck(t) },
+		Providers:    testAccProviders,
+		CheckDestroy: testCheckAzureRMNetworkWatcherDestroy,
+		Steps: []resource.TestStep{
+			{
+				Config: testAccAzureRMNetworkWatcherFlowLog_basicConfig(ri, location),
+				Check: resource.ComposeTestCheckFunc(
+					testCheckAzureRMNetworkWatcherFlowLogExists(resourceName),
+					resource.TestCheckResourceAttrSet(resourceName, "network_watcher_name"),
+					resource.TestCheckResourceAttrSet(resourceName, "resource_group_name"),
+					resource.TestCheckResourceAttrSet(resourceName, "network_security_group_id"),
+					resource.TestCheckResourceAttrSet(resourceName, "storage_account_id"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.#", "1"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.0.enabled", "false"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.0.days", "0"),
+					resource.TestCheckResourceAttr(resourceName, "enabled", "true"),
+				),
+			},
+			{
+				ResourceName:      resourceName,
+				ImportState:       true,
+				ImportStateVerify: true,
+			},
+		},
+	})
+func testAccAzureRMNetworkWatcherFlowLog_disabled(t *testing.T) {
+	resourceName := "azurerm_network_watcher_flow_log.test"
+	ri := tf.AccRandTimeInt()
+	location := testLocation()
+	resource.Test(t, resource.TestCase{
+		PreCheck:     func() { testAccPreCheck(t) },
+		Providers:    testAccProviders,
+		CheckDestroy: testCheckAzureRMNetworkWatcherDestroy,
+		Steps: []resource.TestStep{
+			{
+				Config: testAccAzureRMNetworkWatcherFlowLog_disabledConfig(ri, location),
+				Check: resource.ComposeTestCheckFunc(
+					testCheckAzureRMNetworkWatcherFlowLogExists(resourceName),
+					resource.TestCheckResourceAttrSet(resourceName, "network_watcher_name"),
+					resource.TestCheckResourceAttrSet(resourceName, "resource_group_name"),
+					resource.TestCheckResourceAttrSet(resourceName, "network_security_group_id"),
+					resource.TestCheckResourceAttrSet(resourceName, "storage_account_id"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.#", "1"),
+					resource.TestCheckResourceAttrSet(resourceName, "retention_policy.0.enabled"),
+					resource.TestCheckResourceAttrSet(resourceName, "retention_policy.0.days"),
+					resource.TestCheckResourceAttr(resourceName, "enabled", "false"),
+				),
+			},
+			// disabled flow logs don't import all values
+		},
+	})
+func testAccAzureRMNetworkWatcherFlowLog_reenabled(t *testing.T) {
+	resourceName := "azurerm_network_watcher_flow_log.test"
+	ri := tf.AccRandTimeInt()
+	location := testLocation()
+	resource.Test(t, resource.TestCase{
+		PreCheck:     func() { testAccPreCheck(t) },
+		Providers:    testAccProviders,
+		CheckDestroy: testCheckAzureRMNetworkWatcherDestroy,
+		Steps: []resource.TestStep{
+			{
+				Config: testAccAzureRMNetworkWatcherFlowLog_disabledConfig(ri, location),
+				Check: resource.ComposeTestCheckFunc(
+					testCheckAzureRMNetworkWatcherFlowLogExists(resourceName),
+					resource.TestCheckResourceAttrSet(resourceName, "network_watcher_name"),
+					resource.TestCheckResourceAttrSet(resourceName, "resource_group_name"),
+					resource.TestCheckResourceAttrSet(resourceName, "network_security_group_id"),
+					resource.TestCheckResourceAttrSet(resourceName, "storage_account_id"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.#", "1"),
+					resource.TestCheckResourceAttrSet(resourceName, "retention_policy.0.enabled"),
+					resource.TestCheckResourceAttrSet(resourceName, "retention_policy.0.days"),
+					resource.TestCheckResourceAttr(resourceName, "enabled", "false"),
+				),
+			},
+			{
+				Config: testAccAzureRMNetworkWatcherFlowLog_basicConfig(ri, location),
+				Check: resource.ComposeTestCheckFunc(
+					testCheckAzureRMNetworkWatcherFlowLogExists(resourceName),
+					resource.TestCheckResourceAttrSet(resourceName, "network_watcher_name"),
+					resource.TestCheckResourceAttrSet(resourceName, "resource_group_name"),
+					resource.TestCheckResourceAttrSet(resourceName, "network_security_group_id"),
+					resource.TestCheckResourceAttrSet(resourceName, "storage_account_id"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.#", "1"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.0.enabled", "false"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.0.days", "0"),
+					resource.TestCheckResourceAttr(resourceName, "enabled", "true"),
+				),
+			},
+			{
+				ResourceName:      resourceName,
+				ImportState:       true,
+				ImportStateVerify: true,
+			},
+		},
+	})
+func testAccAzureRMNetworkWatcherFlowLog_retentionPolicy(t *testing.T) {
+	resourceName := "azurerm_network_watcher_flow_log.test"
+	ri := tf.AccRandTimeInt()
+	location := testLocation()
+	resource.Test(t, resource.TestCase{
+		PreCheck:     func() { testAccPreCheck(t) },
+		Providers:    testAccProviders,
+		CheckDestroy: testCheckAzureRMNetworkWatcherDestroy,
+		Steps: []resource.TestStep{
+			{
+				Config: testAccAzureRMNetworkWatcherFlowLog_basicConfig(ri, location),
+				Check: resource.ComposeTestCheckFunc(
+					testCheckAzureRMNetworkWatcherFlowLogExists(resourceName),
+					resource.TestCheckResourceAttrSet(resourceName, "network_watcher_name"),
+					resource.TestCheckResourceAttrSet(resourceName, "resource_group_name"),
+					resource.TestCheckResourceAttrSet(resourceName, "network_security_group_id"),
+					resource.TestCheckResourceAttrSet(resourceName, "storage_account_id"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.#", "1"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.0.enabled", "false"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.0.days", "0"),
+					resource.TestCheckResourceAttr(resourceName, "enabled", "true"),
+				),
+			},
+			{
+				ResourceName:      resourceName,
+				ImportState:       true,
+				ImportStateVerify: true,
+			},
+			{
+				Config: testAccAzureRMNetworkWatcherFlowLog_retentionPolicyConfig(ri, location),
+				Check: resource.ComposeTestCheckFunc(
+					testCheckAzureRMNetworkWatcherFlowLogExists(resourceName),
+					resource.TestCheckResourceAttrSet(resourceName, "network_watcher_name"),
+					resource.TestCheckResourceAttrSet(resourceName, "resource_group_name"),
+					resource.TestCheckResourceAttrSet(resourceName, "network_security_group_id"),
+					resource.TestCheckResourceAttrSet(resourceName, "storage_account_id"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.#", "1"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.0.enabled", "true"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.0.days", "7"),
+					resource.TestCheckResourceAttr(resourceName, "enabled", "true"),
+				),
+			},
+			{
+				ResourceName:      resourceName,
+				ImportState:       true,
+				ImportStateVerify: true,
+			},
+		},
+	})
+func testAccAzureRMNetworkWatcherFlowLog_updateStorageAccount(t *testing.T) {
+	resourceName := "azurerm_network_watcher_flow_log.test"
+	ri := tf.AccRandTimeInt()
+	location := testLocation()
+	resource.Test(t, resource.TestCase{
+		PreCheck:     func() { testAccPreCheck(t) },
+		Providers:    testAccProviders,
+		CheckDestroy: testCheckAzureRMNetworkWatcherDestroy,
+		Steps: []resource.TestStep{
+			{
+				Config: testAccAzureRMNetworkWatcherFlowLog_retentionPolicyConfig(ri, location),
+				Check: resource.ComposeTestCheckFunc(
+					testCheckAzureRMNetworkWatcherFlowLogExists(resourceName),
+					resource.TestCheckResourceAttrSet(resourceName, "network_watcher_name"),
+					resource.TestCheckResourceAttrSet(resourceName, "resource_group_name"),
+					resource.TestCheckResourceAttrSet(resourceName, "network_security_group_id"),
+					resource.TestCheckResourceAttrSet(resourceName, "storage_account_id"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.#", "1"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.0.enabled", "true"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.0.days", "7"),
+					resource.TestCheckResourceAttr(resourceName, "enabled", "true"),
+				),
+			},
+			{
+				ResourceName:      resourceName,
+				ImportState:       true,
+				ImportStateVerify: true,
+			},
+			{
+				Config: testAccAzureRMNetworkWatcherFlowLog_retentionPolicyConfigUpdateStorageAccount(ri, location),
+				Check: resource.ComposeTestCheckFunc(
+					testCheckAzureRMNetworkWatcherFlowLogExists(resourceName),
+					resource.TestCheckResourceAttrSet(resourceName, "network_watcher_name"),
+					resource.TestCheckResourceAttrSet(resourceName, "resource_group_name"),
+					resource.TestCheckResourceAttrSet(resourceName, "network_security_group_id"),
+					resource.TestCheckResourceAttrSet(resourceName, "storage_account_id"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.#", "1"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.0.enabled", "true"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.0.days", "7"),
+					resource.TestCheckResourceAttr(resourceName, "enabled", "true"),
+				),
+			},
+			{
+				ResourceName:      resourceName,
+				ImportState:       true,
+				ImportStateVerify: true,
+			},
+		},
+	})
+func testAccAzureRMNetworkWatcherFlowLog_trafficAnalytics(t *testing.T) {
+	resourceName := "azurerm_network_watcher_flow_log.test"
+	ri := tf.AccRandTimeInt()
+	location := testLocation()
+	resource.Test(t, resource.TestCase{
+		PreCheck:     func() { testAccPreCheck(t) },
+		Providers:    testAccProviders,
+		CheckDestroy: testCheckAzureRMNetworkWatcherDestroy,
+		Steps: []resource.TestStep{
+			{
+				Config: testAccAzureRMNetworkWatcherFlowLog_basicConfig(ri, location),
+				Check: resource.ComposeTestCheckFunc(
+					testCheckAzureRMNetworkWatcherFlowLogExists(resourceName),
+					resource.TestCheckResourceAttrSet(resourceName, "network_watcher_name"),
+					resource.TestCheckResourceAttrSet(resourceName, "resource_group_name"),
+					resource.TestCheckResourceAttrSet(resourceName, "network_security_group_id"),
+					resource.TestCheckResourceAttrSet(resourceName, "storage_account_id"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.#", "1"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.0.enabled", "false"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.0.days", "0"),
+					resource.TestCheckResourceAttr(resourceName, "enabled", "true"),
+				),
+			},
+			{
+				ResourceName:      resourceName,
+				ImportState:       true,
+				ImportStateVerify: true,
+			},
+			{
+				Config: testAccAzureRMNetworkWatcherFlowLog_TrafficAnalyticsDisabledConfig(ri, location),
+				Check: resource.ComposeTestCheckFunc(
+					testCheckAzureRMNetworkWatcherFlowLogExists(resourceName),
+					resource.TestCheckResourceAttrSet(resourceName, "network_watcher_name"),
+					resource.TestCheckResourceAttrSet(resourceName, "resource_group_name"),
+					resource.TestCheckResourceAttrSet(resourceName, "network_security_group_id"),
+					resource.TestCheckResourceAttrSet(resourceName, "storage_account_id"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.#", "1"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.0.enabled", "true"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.0.days", "7"),
+					resource.TestCheckResourceAttr(resourceName, "enabled", "true"),
+				),
+			},
+			{
+				Config: testAccAzureRMNetworkWatcherFlowLog_TrafficAnalyticsEnabledConfig(ri, location),
+				Check: resource.ComposeTestCheckFunc(
+					testCheckAzureRMNetworkWatcherFlowLogExists(resourceName),
+					resource.TestCheckResourceAttrSet(resourceName, "network_watcher_name"),
+					resource.TestCheckResourceAttrSet(resourceName, "resource_group_name"),
+					resource.TestCheckResourceAttrSet(resourceName, "network_security_group_id"),
+					resource.TestCheckResourceAttrSet(resourceName, "storage_account_id"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.#", "1"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.0.enabled", "true"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.0.days", "7"),
+					resource.TestCheckResourceAttr(resourceName, "enabled", "true"),
+					resource.TestCheckResourceAttr(resourceName, "traffic_analytics.#", "1"),
+					resource.TestCheckResourceAttr(resourceName, "traffic_analytics.0.enabled", "true"),
+					resource.TestCheckResourceAttrSet(resourceName, "traffic_analytics.0.workspace_id"),
+					resource.TestCheckResourceAttrSet(resourceName, "traffic_analytics.0.workspace_region"),
+					resource.TestCheckResourceAttrSet(resourceName, "traffic_analytics.0.workspace_resource_id"),
+				),
+			},
+			{
+				ResourceName:      resourceName,
+				ImportState:       true,
+				ImportStateVerify: true,
+			},
+			// flow log must be disabled before destroy
+			{
+				Config: testAccAzureRMNetworkWatcherFlowLog_TrafficAnalyticsDisabledConfig(ri, location),
+				Check: resource.ComposeTestCheckFunc(
+					testCheckAzureRMNetworkWatcherFlowLogExists(resourceName),
+					resource.TestCheckResourceAttrSet(resourceName, "network_watcher_name"),
+					resource.TestCheckResourceAttrSet(resourceName, "resource_group_name"),
+					resource.TestCheckResourceAttrSet(resourceName, "network_security_group_id"),
+					resource.TestCheckResourceAttrSet(resourceName, "storage_account_id"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.#", "1"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.0.enabled", "true"),
+					resource.TestCheckResourceAttr(resourceName, "retention_policy.0.days", "7"),
+					resource.TestCheckResourceAttr(resourceName, "enabled", "true"),
+				),
+			},
+		},
+	})
+func testCheckAzureRMNetworkWatcherFlowLogExists(name string) resource.TestCheckFunc {
+	return func(s *terraform.State) error {
+		client := testAccProvider.Meta().(*ArmClient).Network.WatcherClient
+		ctx := testAccProvider.Meta().(*ArmClient).StopContext
+		// Ensure we have enough information in state to look up in API
+		rs, ok := s.RootModule().Resources[name]
+		if !ok {
+			return fmt.Errorf("Not found: %s", name)
+		}
+		id, err := ParseNetworkWatcherFlowLogID(rs.Primary.Attributes["id"])
+		if err != nil {
+			return err
+		}
+		statusParameters := network.FlowLogStatusParameters{
+			TargetResourceID: &id.NetworkSecurityGroupID,
+		}
+		future, err := client.GetFlowLogStatus(ctx, id.ResourceGroup, id.NetworkWatcherName, statusParameters)
+		if err != nil {
+			return fmt.Errorf("Error retrieving Flow Log Configuration for target %q (Network Watcher %q / Resource Group %q): %+v", id.NetworkSecurityGroupID, id.NetworkWatcherName, id.ResourceGroup, err)
+		}
+		if err := future.WaitForCompletionRef(ctx, client.Client); err != nil {
+			return fmt.Errorf("Error waiting for retrieval of Flow Log Configuration for target %q (Network Watcher %q / Resource Group %q): %+v", id.NetworkSecurityGroupID, id.NetworkWatcherName, id.ResourceGroup, err)
+		}
+		if _, err := future.Result(*client); err != nil {
+			return fmt.Errorf("Error retrieving of Flow Log Configuration for target %q (Network Watcher %q / Resource Group %q): %+v", id.NetworkSecurityGroupID, id.NetworkWatcherName, id.ResourceGroup, err)
+		}
+		return nil
+	}
+func testAccAzureRMNetworkWatcherFlowLog_prerequisites(rInt int, location string) string {
+	return fmt.Sprintf(` 
+resource "azurerm_resource_group" "test" {
+    name     = "acctestRG-watcher-%d"
+    location = "%s"
+resource "azurerm_network_security_group" "test" {
+    name                = "acctestNSG%d"
+    location            = "${azurerm_resource_group.test.location}"
+    resource_group_name = "${}"
+resource "azurerm_network_watcher" "test" {
+    name                = "acctest-NW-%d"
+    location            = "${azurerm_resource_group.test.location}"
+    resource_group_name = "${}"
+resource "azurerm_storage_account" "test" {
+    name                = "acctestsa%d"
+    resource_group_name = "${}"
+    location            = "${azurerm_resource_group.test.location}"
+    account_tier              = "Standard"
+    account_kind              = "StorageV2"
+    account_replication_type  = "LRS"
+    enable_https_traffic_only = true
+`, rInt, location, rInt, rInt, rInt%1000000)
+func testAccAzureRMNetworkWatcherFlowLog_basicConfig(rInt int, location string) string {
+	return fmt.Sprintf(`
+resource "azurerm_network_watcher_flow_log" "test" {
+    network_watcher_name = "${}"
+    resource_group_name  = "${}"
+    network_security_group_id = "${}"
+    storage_account_id        = "${}"
+    enabled                   = true
+    retention_policy {
+        enabled = false
+        days    = 0
+    }
+`, testAccAzureRMNetworkWatcherFlowLog_prerequisites(rInt, location))
+func testAccAzureRMNetworkWatcherFlowLog_retentionPolicyConfig(rInt int, location string) string {
+	return fmt.Sprintf(`
+resource "azurerm_network_watcher_flow_log" "test" {
+    network_watcher_name = "${}"
+    resource_group_name  = "${}"
+    network_security_group_id = "${}"
+    storage_account_id        = "${}"
+    enabled                   = true
+    retention_policy {
+        enabled = true
+        days    = 7
+    }
+`, testAccAzureRMNetworkWatcherFlowLog_prerequisites(rInt, location))
+func testAccAzureRMNetworkWatcherFlowLog_retentionPolicyConfigUpdateStorageAccount(rInt int, location string) string {
+	return fmt.Sprintf(`
+resource "azurerm_storage_account" "testb" {
+    name                = "acctestsab%d"
+    resource_group_name = "${}"
+    location            = "${azurerm_resource_group.test.location}"
+    account_tier              = "Standard"
+    account_kind              = "StorageV2"
+    account_replication_type  = "LRS"
+    enable_https_traffic_only = true
+resource "azurerm_network_watcher_flow_log" "test" {
+    network_watcher_name = "${}"
+    resource_group_name  = "${}"
+    network_security_group_id = "${}"
+    storage_account_id        = "${}"
+    enabled                   = true
+    retention_policy {
+        enabled = true
+        days    = 7
+    }
+`, testAccAzureRMNetworkWatcherFlowLog_prerequisites(rInt, location), rInt%1000000+1)
+func testAccAzureRMNetworkWatcherFlowLog_disabledConfig(rInt int, location string) string {
+	return fmt.Sprintf(`
+resource "azurerm_network_watcher_flow_log" "test" {
+    network_watcher_name = "${}"
+    resource_group_name  = "${}"
+    network_security_group_id = "${}"
+    storage_account_id        = "${}"
+    enabled                   = false
+    retention_policy {
+        enabled = true
+        days    = 7
+    }
+`, testAccAzureRMNetworkWatcherFlowLog_prerequisites(rInt, location))
+func testAccAzureRMNetworkWatcherFlowLog_TrafficAnalyticsEnabledConfig(rInt int, location string) string {
+	return fmt.Sprintf(`
+resource "azurerm_log_analytics_workspace" "test" {
+    name                = "acctestLAW-%d"
+    location            = "${azurerm_resource_group.test.location}"
+    resource_group_name = "${}"
+    sku                 = "PerGB2018"
+resource "azurerm_network_watcher_flow_log" "test" {
+    network_watcher_name = "${}"
+    resource_group_name  = "${}"
+    network_security_group_id = "${}"
+    storage_account_id        = "${}"
+    enabled                   = true
+    retention_policy {
+        enabled = true
+        days    = 7
+    }
+    traffic_analytics {
+        enabled               = true
+        workspace_id          = "${azurerm_log_analytics_workspace.test.workspace_id}"
+        workspace_region      = "${azurerm_log_analytics_workspace.test.location}"
+        workspace_resource_id = "${}"
+    }
+`, testAccAzureRMNetworkWatcherFlowLog_prerequisites(rInt, location), rInt)
+func testAccAzureRMNetworkWatcherFlowLog_TrafficAnalyticsDisabledConfig(rInt int, location string) string {
+	return fmt.Sprintf(`
+resource "azurerm_log_analytics_workspace" "test" {
+    name                = "acctestLAW-%d"
+    location            = "${azurerm_resource_group.test.location}"
+    resource_group_name = "${}"
+    sku                 = "PerGB2018"
+resource "azurerm_network_watcher_flow_log" "test" {
+    network_watcher_name = "${}"
+    resource_group_name  = "${}"
+    network_security_group_id = "${}"
+    storage_account_id        = "${}"
+    enabled                   = true
+    retention_policy {
+        enabled = true
+        days    = 7
+    }
+    traffic_analytics {
+        enabled               = false
+        workspace_id          = "${azurerm_log_analytics_workspace.test.workspace_id}"
+        workspace_region      = "${azurerm_log_analytics_workspace.test.location}"
+        workspace_resource_id = "${}"
+    }
+`, testAccAzureRMNetworkWatcherFlowLog_prerequisites(rInt, location), rInt)
@@ -65,6 +65,14 @@ func TestAccAzureRMNetworkWatcher(t *testing.T) {
 			"withFilters":                testAccAzureRMNetworkPacketCapture_withFilters,
 			"requiresImport":             testAccAzureRMNetworkPacketCapture_requiresImport,
+		"FlowLog": {
+			"basic":                testAccAzureRMNetworkWatcherFlowLog_basic,
+			"disabled":             testAccAzureRMNetworkWatcherFlowLog_disabled,
+			"reenabled":            testAccAzureRMNetworkWatcherFlowLog_reenabled,
+			"retentionPolicy":      testAccAzureRMNetworkWatcherFlowLog_retentionPolicy,
+			"updateStorageAccount": testAccAzureRMNetworkWatcherFlowLog_updateStorageAccount,
+			"trafficAnalytics":     testAccAzureRMNetworkWatcherFlowLog_trafficAnalytics,
+		},
 	for group, m := range testCases {
@@ -284,12 +292,12 @@ func testCheckAzureRMNetworkWatcherDestroy(s *terraform.State) error {
 func testAccAzureRMNetworkWatcher_basicConfig(rInt int, location string) string {
 	return fmt.Sprintf(`
 resource "azurerm_resource_group" "test" {
-  name     = "acctestRG-%d"
+  name     = "acctestRG-watcher-%d"
   location = "%s"
 resource "azurerm_network_watcher" "test" {
-  name                = "acctestnw-%d"
+  name                = "acctestNW-%d"
   location            = "${azurerm_resource_group.test.location}"
   resource_group_name = "${}"
@@ -312,12 +320,12 @@ resource "azurerm_network_watcher" "import" {
 func testAccAzureRMNetworkWatcher_completeConfig(rInt int, location string) string {
 	return fmt.Sprintf(`
 resource "azurerm_resource_group" "test" {
-  name     = "acctestRG-%d"
+  name     = "acctestRG-watcher-%d"
   location = "%s"
 resource "azurerm_network_watcher" "test" {
-  name                = "acctestnw-%d"
+  name                = "acctestNW-%d"
   location            = "${azurerm_resource_group.test.location}"
   resource_group_name = "${}"
@@ -1676,6 +1676,10 @@
                   <a href="/docs/providers/azurerm/r/network_watcher.html">azurerm_network_watcher</a>
+                <li>
+                  <a href="/docs/providers/azurerm/r/network_watcher_flow_log.html">azurerm_network_watcher_flow_log</a>
+                </li>
                   <a href="/docs/providers/azurerm/r/packet_capture.html">azurerm_packet_capture</a>
@@ -0,0 +1,120 @@
+subcategory: "Network"
+layout: "azurerm"
+page_title: "Azure Resource Manager: azurerm_network_watcher_flow_log"
+sidebar_current: "docs-azurerm-resource-network-watcher-flow-log"
+description: |-
+  Manages a Network Watcher Flow Log.
+# azurerm_network_watcher_flow_log
+Manages a Network Watcher Flow Log.
+## Example Usage
+resource "azurerm_resource_group" "test" {
+  name     = "acctestRG"
+  location = "eastus"
+resource "azurerm_network_security_group" "test" {
+  name                = "acctestnsg"
+  location            = "${azurerm_resource_group.test.location}"
+  resource_group_name = "${}"
+resource "azurerm_network_watcher" "test" {
+    name                = "acctestnw"
+    location            = "${azurerm_resource_group.test.location}"
+    resource_group_name = "${}"
+resource "azurerm_storage_account" "test" {
+  name                = "acctestsa"
+  resource_group_name = "${}"
+  location            = "${azurerm_resource_group.test.location}"
+  account_tier              = "Standard"
+  account_replication_type  = "LRS"
+  enable_https_traffic_only = true
+resource "azurerm_log_analytics_workspace" "test" {
+  name                = "acctestlaw"
+  location            = "${azurerm_resource_group.test.location}"
+  resource_group_name = "${}"
+  sku                 = "PerGB2018"
+resource "azurerm_network_watcher_flow_log" "test" {
+  network_watcher_name = "${}"
+  resource_group_name  = "${}"
+  network_security_group_id = "${}"
+  storage_account_id        = "${}"
+  enabled                   = true
+  retention_policy {
+    enabled = true
+    days    = 7
+  }
+  traffic_analytics {
+    enabled               = true
+    workspace_id          = "${azurerm_log_analytics_workspace.test.workspace_id}"
+    workspace_region      = "${azurerm_log_analytics_workspace.test.location}"
+    workspace_resource_id = "${}"
+  }
+## Argument Reference
+The following arguments are supported:
+* `network_watcher_name` - (Required) The name of the Network Watcher. Changing this forces a new resource to be created.
+* `resource_group_name` - (Required) The name of the resource group in which the Network Watcher was deployed. Changing this forces a new resource to be created.
+* `network_security_group_id` - (Required) The ID of the Network Security Group for which to enable flow logs for. Changing this forces a new resource to be created.
+* `storage_account_id` - (Required) The ID of the Storage Account where flow logs are stored.
+* `enabled` - (Required) Should Network Flow Logging be Enabled?
+* `retention_policy` - (Required) A `retention_policy` block as documented below.
+* `traffic_analytics` - (Optional) A `traffic_analytics` block as documented below.
+* `retention_policy` supports the following:
+* `enabled` - (Required) Boolean flag to enable/disable retention.
+* `days` - (Required) The number of days to retain flow log records.
+* `traffic_analytics` supports the following:
+* `enabled` - (Required) Boolean flag to enable/disable traffic analytics.
+* `workspace_id` - (Required) The resource guid of the attached workspace.
+* `workspace_region` - (Required) The location of the attached workspace.
+* `workspace_resource_id` - (Required) The resource ID of the attached workspace.
+## Attributes Reference
+The following attributes are exported:
+* `id` - The Network Watcher ID.
+## Import
+Network Watcher Flow Logs can be imported using the `resource id`, e.g.
+terraform import azurerm_network_watcher_flow_log.watcher1 /subscriptions/00000000-0000-0000-0000-000000000000/resourceGroups/mygroup1/providers/Microsoft.Network/networkWatchers/watcher1