Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature Request - Add Bulk Data Load Endpoints #53

Open
bakes82 opened this issue Oct 21, 2022 · 2 comments
Open

Feature Request - Add Bulk Data Load Endpoints #53

bakes82 opened this issue Oct 21, 2022 · 2 comments

Comments

@bakes82
Copy link

bakes82 commented Oct 21, 2022

It would be nice to have these added.

https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/bulk_api_2_0.htm

@senj
Copy link

senj commented Oct 28, 2022

Just for reference, I did something like this:

/// <summary>
/// Upserts products into Salesforce through the Bulk API 2.0 ingest endpoints:
/// creates an ingest job, uploads the products as CSV, then marks the upload as complete.
/// </summary>
public class SalesforceJobRepository : ISalesforceJobRepository
    {
        // Relative path of the ingest job collection; shared by all three calls so the
        // API version cannot drift between them.
        private const string IngestJobsPath = "/services/data/v53.0/jobs/ingest";

        private readonly ILogger<SalesforceJobRepository> _logger;
        private readonly SalesforceFaultHandler _salesforceFaultHandler;
        private readonly IResilientSalesforceClient _resilientSalesforceClient;
        private readonly HttpClient _httpClient;
        private readonly string _instanceUrl;

        /// <summary>
        /// Creates the repository and captures the instance URL of the current force client.
        /// </summary>
        /// <exception cref="ArgumentNullException">Thrown when any dependency is <c>null</c>.</exception>
        public SalesforceJobRepository(
            ILogger<SalesforceJobRepository> logger,
            SalesforceFaultHandler salesforceFaultHandler,
            IResilientSalesforceClient resilientSalesforceClient,
            HttpClient httpClient)
        {
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _salesforceFaultHandler = salesforceFaultHandler ?? throw new ArgumentNullException(nameof(salesforceFaultHandler));
            _resilientSalesforceClient = resilientSalesforceClient ?? throw new ArgumentNullException(nameof(resilientSalesforceClient));
            _httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient));

            _instanceUrl = _resilientSalesforceClient.GetForceClient().InstanceUrl;
        }

        /// <summary>
        /// Uploads the given products in a single ingest job (create, upload, close).
        /// </summary>
        /// <param name="products">Products to upsert. An empty sequence is a no-op.</param>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="products"/> is <c>null</c>.</exception>
        /// <exception cref="HttpRequestException">Thrown when Salesforce answers any step with a non-success status.</exception>
        /// <exception cref="InvalidOperationException">Thrown when the create-job response contains no job id.</exception>
        public async Task WriteProducts(IEnumerable<SalesforceProductModel> products)
        {
            if (products is null)
            {
                throw new ArgumentNullException(nameof(products));
            }

            // Materialize once: the sequence is serialized and counted, and a lazy
            // source would otherwise be evaluated twice.
            IReadOnlyCollection<SalesforceProductModel> productList =
                products as IReadOnlyCollection<SalesforceProductModel> ?? products.ToList();

            if (productList.Count == 0)
            {
                // Nothing to upload, so no job is created.
                _logger.LogInformation("No products to write, skipping salesforce job creation.");
                return;
            }

            string jobId = await CreateJobAsync();
            _logger.LogInformation("Created salesforce job with id: {id}", jobId);

            await AddDataToJobAsync(jobId, productList);
            _logger.LogInformation("Added {count} products to job", productList.Count);

            await CloseJobAsync(jobId);
            _logger.LogInformation("Closed job");
        }

        /// <summary>
        /// Creates a CSV upsert ingest job and returns its id.
        /// </summary>
        private async Task<string> CreateJobAsync()
        {
            // only for updating access token
            int count = await _salesforceFaultHandler.ExecuteAsync(() => _resilientSalesforceClient.CountQueryAsync("SELECT COUNT() FROM Product2", false));
            _logger.LogInformation("Before starting the job, there are {count} products in salesforce.", count);

            string instanceUrl = $"{_instanceUrl}{IngestJobsPath}/";
            _logger.LogDebug("Working on: {url}", instanceUrl);

            using HttpRequestMessage request = CreateAuthorizedRequest(HttpMethod.Post, instanceUrl);
            request.Content = new StringContent(JsonConvert.SerializeObject(new CreateJobRequest
            {
                ContentType = "CSV",
                Operation = "upsert",
                ExternalIdFieldName = "MaterialNumberExternalId__c",
                ObjectTypeName = TtsSfProduct2.SObjectTypeName,
                LineEnding = "CRLF"
            }), Encoding.UTF8, "application/json");

            using HttpResponseMessage response = await _httpClient.SendAsync(request);
            string content = await ReadBodyAsync(response);

            if (!response.IsSuccessStatusCode)
            {
                _logger.LogError("Unable to create salesforce job: {statusCode} {message}", response.StatusCode, content);
            }
            else
            {
                _logger.LogInformation("Creating salesforce job returned with: {statusCode} {message}", response.StatusCode, content);
            }

            // Fail with the HTTP error instead of trying to deserialize an error payload
            // as a job; this matches the behavior of the upload and close steps.
            response.EnsureSuccessStatusCode();

            string jobId = JsonConvert.DeserializeObject<CreateJobResponse>(content)?.Id;
            if (string.IsNullOrEmpty(jobId))
            {
                throw new InvalidOperationException("Salesforce did not return a job id for the created ingest job.");
            }

            return jobId;
        }

        /// <summary>
        /// Uploads the products as one CSV batch to the given job.
        /// </summary>
        /// <remarks>
        /// NOTE(review): Salesforce limits the size of a single upload; very large product
        /// sets may need to be split across jobs - confirm against the Bulk API 2.0 limits.
        /// </remarks>
        private async Task AddDataToJobAsync(string jobId, IEnumerable<SalesforceProductModel> products)
        {
            string instanceUrl = $"{_instanceUrl}{IngestJobsPath}/{jobId}/batches";

            string content = JsonToCsv(JsonConvert.SerializeObject(products), ",");

            using HttpRequestMessage request = CreateAuthorizedRequest(HttpMethod.Put, instanceUrl);
            request.Content = new StringContent(content, Encoding.UTF8, "text/csv");

            using HttpResponseMessage response = await _httpClient.SendAsync(request);
            string responseContent = await ReadBodyAsync(response);
            if (!response.IsSuccessStatusCode)
            {
                _logger.LogError("Unable to update salesforce job: {statusCode} {message}", response.StatusCode, responseContent);
            }
            else
            {
                _logger.LogInformation("Updating salesforce job returned with: {statusCode} {message}", response.StatusCode, responseContent);
            }

            response.EnsureSuccessStatusCode();
        }

        /// <summary>
        /// Sets the job state to <c>UploadComplete</c>.
        /// </summary>
        private async Task CloseJobAsync(string jobId)
        {
            string instanceUrl = $"{_instanceUrl}{IngestJobsPath}/{jobId}";

            using HttpRequestMessage request = CreateAuthorizedRequest(HttpMethod.Patch, instanceUrl);
            request.Content = new StringContent(JsonConvert.SerializeObject(new Models.Salesforce.JobModels.UpdateJobRequest
            {
                State = "UploadComplete"
            }), Encoding.UTF8, "application/json");

            using HttpResponseMessage response = await _httpClient.SendAsync(request);
            string content = await ReadBodyAsync(response);
            if (!response.IsSuccessStatusCode)
            {
                _logger.LogError("Unable to close salesforce job: {statusCode} {message}", response.StatusCode, content);
            }
            else
            {
                _logger.LogInformation("Closing salesforce job returned with: {statusCode} {message}", response.StatusCode, content);
            }

            response.EnsureSuccessStatusCode();
        }

        /// <summary>
        /// Builds a request carrying the force client's current access token as a bearer token.
        /// The caller owns (and must dispose) the returned request.
        /// </summary>
        private HttpRequestMessage CreateAuthorizedRequest(HttpMethod method, string url)
        {
            HttpRequestMessage request = new(method, url);
            request.Headers.Add("Authorization", $"Bearer {_resilientSalesforceClient.GetForceClient().AccessToken}");
            return request;
        }

        /// <summary>
        /// Reads the response body as text; returns an empty string when there is no content,
        /// rather than awaiting a null task.
        /// </summary>
        private static async Task<string> ReadBodyAsync(HttpResponseMessage response)
        {
            if (response.Content is null)
            {
                return string.Empty;
            }

            return await response.Content.ReadAsStringAsync();
        }

        /// <summary>
        /// Converts a JSON array of flat objects to CSV text using the given delimiter.
        /// </summary>
        /// <param name="jsonContent">JSON array; each element becomes one CSV record.</param>
        /// <param name="delimiter">Field delimiter to use.</param>
        /// <returns>The CSV text; empty when the array is empty or the JSON is <c>null</c>.</returns>
        private static string JsonToCsv(string jsonContent, string delimiter)
        {
            // Invariant culture: the CSV is parsed by a machine, so number and date
            // formatting must not depend on the host's regional settings.
            CsvConfiguration config = new(CultureInfo.InvariantCulture)
            {
                Delimiter = delimiter
            };

            ExpandoObject[] expandos =
                JsonConvert.DeserializeObject<ExpandoObject[]>(jsonContent) ?? Array.Empty<ExpandoObject>();

            // NOTE(review): the job is created with LineEnding "CRLF"; confirm the CSV
            // writer emits CRLF on every platform this runs on.
            using StringWriter writer = new();
            using CsvWriter csv = new(writer, config);
            csv.WriteRecords((IEnumerable<dynamic>)expandos);

            // Push anything still buffered in the CSV writer into the StringWriter before reading it.
            csv.Flush();

            return writer.ToString();
        }
    }

@bakes82
Copy link
Author

bakes82 commented Oct 31, 2022

Yup, I did roughly the same ;) Figured it would be a fairly straightforward ask, plus it's way better on API calls than trying to load thousands of records one by one. The only gotcha is in AddDataToJob: you can't exceed 150MB of data once it is base64 encoded, so they recommend staying under 100MB of un-encoded data. It will throw an error if you try to exceed that, if I remember correctly from my testing ;)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants