diff --git a/docs/API_Server.md b/docs/API_Server.md index 3837f737ed99..bc0e3f5569ac 100644 --- a/docs/API_Server.md +++ b/docs/API_Server.md @@ -396,6 +396,22 @@ and might later produce asynchronous messages such as: `{"params":{"data":[[3292.432935, 562534, 0.067059278], [3292.4394937, 5625322, 0.670590639]]}}` +### load_cell/dump_force + +This endpoint is used to subscribe to force data produced by a load_cell. +Using this endpoint may increase Klipper's system load. + +A request may look like: +`{"id": 123, "method":"load_cell/dump_force", +"params": {"sensor": "load_cell", "response_template": {}}}` +and might return: +`{"id": 123,"result":{"header":["time", "force (g)", "counts", "tare_counts"]}}` +and might later produce asynchronous messages such as: +`{"params":{"data":[[3292.432935, 40.65, 562534, -234467]]}}` + +The "header" field in the initial query response is used to describe +the fields found in later "data" responses. + ### pause_resume/cancel This endpoint is similar to running the "PRINT_CANCEL" G-Code command. diff --git a/docs/Config_Reference.md b/docs/Config_Reference.md index 1489670e4a13..761a0373f6cb 100644 --- a/docs/Config_Reference.md +++ b/docs/Config_Reference.md @@ -4758,6 +4758,18 @@ scale. [load_cell] sensor_type: # This must be one of the supported sensor types, see below. +#counts_per_gram: +# The floating point number of sensor counts that indicates 1 gram of force. +# This value is calculated by the CALIBRATE_LOAD_CELL command. +#reverse: +# Reverse the polarity of the force reported by the load cell. The default +# is False. +#reference_tare_counts: +# The integer tare value, in raw sensor counts, taken when CALIBRATE_LOAD_CELL +# is run. This is the default tare value when klipper starts up. +#reverse: +# Reverses the polarity of the load cell. This is a boolean value, the +# default is False. ``` #### HX711 diff --git a/docs/G-Codes.md b/docs/G-Codes.md index 78c1c74b14a5..f302adf8d7eb 100644 --- a/docs/G-Codes.md +++ b/docs/G-Codes.md @@ -721,6 +721,40 @@ and RAW sensor value for calibration points. #### DISABLE_FILAMENT_WIDTH_LOG `DISABLE_FILAMENT_WIDTH_LOG`: Turn off diameter logging. +### [load_cell] + +The following commands are enabled if a +[load_cell config section](Config_Reference.md#load_cell) has been enabled. + +### LOAD_CELL_DIAGNOSTIC +`LOAD_CELL_DIAGNOSTIC [LOAD_CELL=]`: This command collects 10 +seconds of load cell data and reports statistics that can help you verify proper +operation of the load cell. This command can be run on both calibrated and +uncalibrated load cells. + +### CALIBRATE_LOAD_CELL +`CALIBRATE_LOAD_CELL [LOAD_CELL=]`: Start the guided calibration +utility. Calibration is a 3 step process: +1. First you remove all load from the load cell and run the `TARE` command +1. Next you apply a known load to the load cell and run the +`CALIBRATE GRAMS=nnn` command +1. Finally use the `ACCEPT` command to save the results + +You can cancel the calibration process at any time with `ABORT`. + +### TARE_LOAD_CELL +`TARE_LOAD_CELL [LOAD_CELL=]`: This works just like the tare button +on digital scale. It sets the current raw reading of the load cell to be the +zero point reference value. The response is the percentage of the sensors range +that was read and the raw value in counts. + +### READ_LOAD_CELL load_cell="name" +`READ_LOAD_CELL [LOAD_CELL=]`: +This command takes a reading from the load cell. The response is the percentage +of the sensors range that was read and the raw value in counts. If the load cell +is calibrated a force in grams is also reported. + + ### [heaters] The heaters module is automatically loaded if a heater is defined in diff --git a/docs/Load_Cell.md b/docs/Load_Cell.md new file mode 100644 index 000000000000..b0c42208088a --- /dev/null +++ b/docs/Load_Cell.md @@ -0,0 +1,102 @@ +# Load Cells + +This document describes Klipper's support for load cells. Basic load cell +functionality can be used to read force data and to weigh things like filament. +A calibrated force sensor is an important part of a load cell based probe. + +## Related Documentation + +* [load_cell Config Reference](Config_Reference.md#load_cell) +* [load_cell G-Code Commands](G-Codes.md#load_cell) +* [load_cell Status Reference](Status_Reference.md#load_cell) + +## Using `LOAD_CELL_DIAGNOSTIC` + +When you first connect a load cell its good practice to check for issues by +running `LOAD_CELL_DIAGNOSTIC`. This tool collects 10 seconds of data from the +load cell and resport statistics: + +``` +$ LOAD_CELL_DIAGNOSTIC +// Collecting load cell data for 10 seconds... +// Samples Collected: 3211 +// Measured samples per second: 332.0 +// Good samples: 3211, Saturated samples: 0, Unique values: 900 +// Sample range: [4.01% to 4.02%] +// Sample range / sensor capacity: 0.00524% +``` + +Things you can check with this data: +* The configured sample rate of the sensor should be close to the 'Measured +samples per second' value. If it is not you may have a configuration or wiring +issue. +* 'Saturated samples' should be 0. If you have saturated samples it means the +load sell is seeing more force than it can measure. +* 'Unique values' should be a large percentage of the 'Samples +Collected' value. If 'Unique values' is 1 it is very likely a wiring issue. +* Tap or push on the sensor while `LOAD_CELL_DIAGNOSTIC` runs. If +things are working correctly ths should increase the 'Sample range'. + +## Calibrating a Load Cell + +Load cells are calibrated using the `CALIBRATE_LOAD_CELL` command. This is an +interactive calibration utility that walks you though a 3 step process: +1. First use the `TARE` command to establish the zero force value. This is the +`reference_tare_counts` config value. +2. Next you apply a known load or force to the load cell and run the +`CALIBRATE GRAMS=nnn` command. From this the `counts_per_gram` value is +calculated. See [the next section](#applying-a-known-force-or-load) for some +suggestions on how to do this. +3. Finally, use the `ACCEPT` command to save the results. + +You can cancel the calibration process at any time with `ABORT`. + +### Applying a Known Force or Load + +The `CALIBRATE GRAMS=nnn` step can be accomplished in a number of ways. If your +load cell is under a platform like a bed or filament holder it might be easiest +to put a known mass on the platform. E.g. you could use a couple of 1KG filament +spools. + +If your load cell is in the printer's toolhead a different approach is easier. +Put a digital scale on the printers bed and gently lower the toolhead onto the +scale (or raise the bed into the toolhead if your bed moves). You may be able to +do this using the `FORCE_MOVE` command. But more likely you will have to +manually moving the z axis with the motors off until the toolhead presses on the +scale. + +A good calibration force would ideally be a large percentage of the load cell's +rated capacity. E.g. if you have a 5Kg load cell you would ideally calibrate it +with a 5kg mass. This might work well with under-bed sensors that have to +support a lot of weight. For toolhead probes this may not be a load that your +printer bed or toolhead can tolerate without damage. Do try to use at least 1Kg +of force, most printers should tolerate this without issue. + +When calibrating make careful note of the values reported: +``` +$ CALIBRATE GRAMS=555 +// Calibration value: -2.78% (-59803108), Counts/gram: 73039.78739, +Total capacity: +/- 29.14Kg +``` +The `Total capacity` should be close to the rating of the load cell itself. If +it is much larger you could have used a higher gain setting in the sensor or a +more sensitive load cell. This isn't as critical for 32bit and 24bit sensors but +is much more critical for low bit width sensors. + +## Reading Force Data +Force data can be read with a GCode command: + +``` +READ_LOAD_CELL +// 10.6g (1.94%) +``` + +Data is also continuously read and can be consumed from the load_cell printer +object in a macro: + +``` +{% set grams = printer.load_cell.force_g %} +``` + +This provides an average force over the last 1 second, similar to how +temperature sensors work. diff --git a/docs/Status_Reference.md b/docs/Status_Reference.md index ee8099025c95..532954e1703d 100644 --- a/docs/Status_Reference.md +++ b/docs/Status_Reference.md @@ -303,6 +303,17 @@ The following information is available for each `[led led_name]`, chain could be accessed at `printer["neopixel "].color_data[1][2]`. +## load_cell + +The following information is available for each `[load_cell name]`: +- 'is_calibrated': True/False is the load cell calibrated +- 'counts_per_gram': The number of raw sensor counts that equals 1 gram of force +- 'reference_tare_counts': The reference number of raw sensor counts for 0 force +- 'tare_counts': The current number of raw sensor counts for 0 force +- 'force_g': The force in grams, averaged over the last polling period. +- 'min_force_g': The minimum force in grams, over the last polling period. +- 'max_force_g': The maximum force in grams, over the last polling period. + ## manual_probe The following information is available in the diff --git a/klippy/extras/load_cell.py b/klippy/extras/load_cell.py index 14f3c2983f99..91319236469f 100644 --- a/klippy/extras/load_cell.py +++ b/klippy/extras/load_cell.py @@ -3,21 +3,549 @@ # Copyright (C) 2024 Gareth Farrington # # This file may be distributed under the terms of the GNU GPLv3 license. + from . import hx71x from . import ads1220 +from .bulk_sensor import BatchWebhooksClient +import logging, collections, itertools +# We want either Python 3's zip() or Python 2's izip() but NOT 2's zip(): +zip_impl = zip +try: + from itertools import izip as zip_impl # python 2.x izip +except ImportError: # will be Python 3.x + pass + +# Helper for event driven webhooks (i.e. non polling based data source) +class WebhooksHelper(object): + def __init__(self, printer): + self.printer = printer + self.client_cbs = [] + self.webhooks_start_resp = {} + + # send data to clients + def send(self, msg): + for client_cb in list(self.client_cbs): + res = client_cb(msg) + if not res: + # This client no longer needs updates - unregister it + self.client_cbs.remove(client_cb) + + # Client registration + def add_client(self, client_cb): + self.client_cbs.append(client_cb) + + # Webhooks registration + def _add_api_client(self, web_request): + whbatch = BatchWebhooksClient(web_request) + self.add_client(whbatch.handle_batch) + web_request.send(self.webhooks_start_resp) + + def add_mux_endpoint(self, path, key, value, webhooks_start_resp): + self.webhooks_start_resp = webhooks_start_resp + wh = self.printer.lookup_object('webhooks') + wh.register_mux_endpoint(path, key, value, self._add_api_client) + +# Adapter for WebhooksHelper that transforms the response using a function +# Anything that implements the add_client contract can be a message source +# Outputs to its own clients +class WebhooksTransformer(WebhooksHelper): + def __init__(self, printer, msg_source, transform_fn): + super(WebhooksTransformer, self).__init__(printer) + self.msg_source = msg_source + self.transform_fn = transform_fn + self.is_started = False + + def _start(self): + if self.is_started: + return + self.is_started = True + self.msg_source.add_client(self._transform_batch) + + def _stop(self): + self.is_started = False + del self.client_cbs[:] + + def _transform_batch(self, msg): + try: + msg_transformed = self.transform_fn(msg) + except self.printer.command_error: + logging.exception("BatchBulkTransformer transform_batch error") + self._stop() + return self.is_started + if not msg_transformed: + return self.is_started + self.send(msg_transformed) + if len(self.client_cbs) == 0: + self._stop() + return self.is_started + + def add_client(self, client_cb): + self.client_cbs.append(client_cb) + self._start() + +# alternative to numpy's column selection: +def select_column(data, column_idx): + return list(zip_impl(*data))[column_idx] + +def avg(data): + return sum(data) / len(data) + +# Class for handling commands related ot load cells +class LoadCellCommandHelper: + def __init__(self, config, load_cell): + self.printer = config.get_printer() + self.load_cell = load_cell + name_parts = config.get_name().split() + self.name = name_parts[-1] + self.register_commands(self.name) + if len(name_parts) == 1: + self.register_commands(None) + + def register_commands(self, name): + # Register commands + gcode = self.printer.lookup_object('gcode') + gcode.register_mux_command("TARE_LOAD_CELL", "LOAD_CELL", name, + self.cmd_TARE_LOAD_CELL, + desc=self.cmd_TARE_LOAD_CELL_help) + gcode.register_mux_command("CALIBRATE_LOAD_CELL", "LOAD_CELL", name, + self.cmd_CALIBRATE_LOAD_CELL, + desc=self.cmd_CALIBRATE_LOAD_CELL_help) + gcode.register_mux_command("READ_LOAD_CELL", "LOAD_CELL", name, + self.cmd_READ_LOAD_CELL, + desc=self.cmd_READ_LOAD_CELL_help) + gcode.register_mux_command("LOAD_CELL_DIAGNOSTIC", "LOAD_CELL", name, + self.cmd_LOAD_CELL_DIAGNOSTIC, + desc=self.cmd_LOAD_CELL_DIAGNOSTIC_help) + + cmd_TARE_LOAD_CELL_help = "Set the Zero point of the load cell" + def cmd_TARE_LOAD_CELL(self, gcmd): + tare_counts = self.load_cell.avg_counts() + tare_percent = self.load_cell.counts_to_percent(tare_counts) + self.load_cell.tare(tare_counts) + gcmd.respond_info("Load cell tare value: %.2f%% (%i)" + % (tare_percent, tare_counts)) + + cmd_CALIBRATE_LOAD_CELL_help = "Start interactive calibration tool" + def cmd_CALIBRATE_LOAD_CELL(self, gcmd): + LoadCellGuidedCalibrationHelper(self.printer, self.load_cell) + + cmd_READ_LOAD_CELL_help = "Take a reading from the load cell" + def cmd_READ_LOAD_CELL(self, gcmd): + counts = self.load_cell.avg_counts() + percent = self.load_cell.counts_to_percent(counts) + force = self.load_cell.counts_to_grams(counts) + if percent >= 100 or percent <= -100: + gcmd.respond_info("Err (%.2f%%)" % (percent,)) + if force is None: + gcmd.respond_info("---.-g (%.2f%%)" % (percent,)) + else: + gcmd.respond_info("%.1fg (%.2f%%)" % (force, percent)) + + cmd_LOAD_CELL_DIAGNOSTIC_help = "Check the health of the load cell" + def cmd_LOAD_CELL_DIAGNOSTIC(self, gcmd): + gcmd.respond_info("Collecting load cell data for 10 seconds...") + collector = self.load_cell.get_collector() + reactor = self.printer.get_reactor() + collector.start_collecting() + reactor.pause(reactor.monotonic() + 10.) + samples, errors = collector.stop_collecting() + if errors: + gcmd.respond_info("Sensor reported errors: %i errors," + " %i overflows" % (errors[0], errors[1])) + else: + gcmd.respond_info("Sensor reported no errors") + if not samples: + raise gcmd.error("No samples returned from sensor!") + counts = select_column(samples, 2) + range_min, range_max = self.load_cell.saturation_range() + good_count = 0 + saturation_count = 0 + for sample in counts: + if sample >= range_max or sample <= range_min: + saturation_count += 1 + else: + good_count += 1 + gcmd.respond_info("Samples Collected: %i" % (len(samples))) + if len(samples) > 2: + sensor_sps = self.load_cell.sensor.get_samples_per_second() + sps = float(len(samples)) / (samples[-1][0] - samples[0][0]) + gcmd.respond_info("Measured samples per second: %.1f, " + "configured: %.1f" % (sps, sensor_sps)) + gcmd.respond_info("Good samples: %i, Saturated samples: %i, Unique" + " values: %i" % (good_count, saturation_count, + len(set(counts)))) + max_pct = self.load_cell.counts_to_percent(max(counts)) + min_pct = self.load_cell.counts_to_percent(min(counts)) + gcmd.respond_info("Sample range: [%.2f%% to %.2f%%]" + % (min_pct, max_pct)) + gcmd.respond_info("Sample range / sensor capacity: %.5f%%" + % ((max_pct - min_pct) / 2.)) + +# Class to guide the user through calibrating a load cell +class LoadCellGuidedCalibrationHelper: + def __init__(self, printer, load_cell): + self.printer = printer + self.gcode = printer.lookup_object('gcode') + self.load_cell = load_cell + self._tare_counts = self._counts_per_gram = None + self.tare_percent = 0. + self.register_commands() + self.gcode.respond_info( + "Starting load cell calibration. \n" + "1.) Remove all load and run TARE. \n" + "2.) Apply a known load, run CALIBRATE GRAMS=nnn. \n" + "Complete calibration with the ACCEPT command.\n" + "Use the ABORT command to quit.") + + def verify_no_active_calibration(self,): + try: + self.gcode.register_command('TARE', 'dummy') + except self.printer.config_error as e: + raise self.gcode.error( + "Already Calibrating a Load Cell. Use ABORT to quit.") + self.gcode.register_command('TARE', None) + + def register_commands(self): + self.verify_no_active_calibration() + register_command = self.gcode.register_command + register_command("ABORT", self.cmd_ABORT, desc=self.cmd_ABORT_help) + register_command("ACCEPT", self.cmd_ACCEPT, desc=self.cmd_ACCEPT_help) + register_command("TARE", self.cmd_TARE, desc=self.cmd_TARE_help) + register_command("CALIBRATE", self.cmd_CALIBRATE, + desc=self.cmd_CALIBRATE_help) + + # convert the delta of counts to a counts/gram metric + def counts_per_gram(self, grams, cal_counts): + return float(abs(int(self._tare_counts - cal_counts))) / grams + + # calculate max force that the load cell can register + # given tare bias, at saturation in kilograms + def capacity_kg(self, counts_per_gram): + range_min, range_max = self.load_cell.saturation_range() + return (int((range_max - abs(self._tare_counts)) / counts_per_gram) + / 1000.) + + def finalize(self, save_results=False): + for name in ['ABORT', 'ACCEPT', 'TARE', 'CALIBRATE']: + self.gcode.register_command(name, None) + if not save_results: + self.gcode.respond_info("Load cell calibration aborted") + return + if self._counts_per_gram is None or self._tare_counts is None: + self.gcode.respond_info("Calibration process is incomplete, " + "aborting") + self.load_cell.set_calibration(self._counts_per_gram, self._tare_counts) + self.gcode.respond_info("Load cell calibration settings:\n\n" + "counts_per_gram: %.6f\n" + "reference_tare_counts: %i\n\n" + "The SAVE_CONFIG command will update the printer config file" + " with the above and restart the printer." + % (self._counts_per_gram, self._tare_counts)) + self.load_cell.tare(self._tare_counts) + + cmd_ABORT_help = "Abort load cell calibration tool" + def cmd_ABORT(self, gcmd): + self.finalize(False) + + cmd_ACCEPT_help = "Accept calibration results and apply to load cell" + def cmd_ACCEPT(self, gcmd): + self.finalize(True) + + cmd_TARE_help = "Tare the load cell" + def cmd_TARE(self, gcmd): + self._tare_counts = self.load_cell.avg_counts() + self._counts_per_gram = None # require re-calibration on tare + self.tare_percent = self.load_cell.counts_to_percent(self._tare_counts) + gcmd.respond_info("Load cell tare value: %.2f%% (%i)" + % (self.tare_percent, self._tare_counts)) + if self.tare_percent > 2.: + gcmd.respond_info( + "WARNING: tare value is more than 2% away from 0!\n" + "The load cell's range will be impacted.\n" + "Check for external force on the load cell.") + gcmd.respond_info("Now apply a known force to the load cell and enter \ + the force value with:\n CALIBRATE GRAMS=nnn") -# Printer class that controls a load cell + cmd_CALIBRATE_help = "Enter the load cell value in grams" + def cmd_CALIBRATE(self, gcmd): + if self._tare_counts is None: + gcmd.respond_info("You must use TARE first.") + return + grams = gcmd.get_float("GRAMS", minval=50., maxval=25000.) + cal_counts = self.load_cell.avg_counts() + cal_percent = self.load_cell.counts_to_percent(cal_counts) + c_per_g = self.counts_per_gram(grams, cal_counts) + cap_kg = self.capacity_kg(c_per_g) + gcmd.respond_info("Calibration value: %.2f%% (%i), Counts/gram: %.5f, \ + Total capacity: +/- %0.2fKg" + % (cal_percent, cal_counts, c_per_g, cap_kg)) + range_min, range_max = self.load_cell.saturation_range() + if cal_counts >= range_max or cal_counts <= range_min: + raise self.printer.command_error( + "ERROR: Sensor is saturated with too much load!\n" + "Use less force to calibrate the load cell.") + if cal_counts == self._tare_counts: + raise self.printer.command_error( + "ERROR: Tare and Calibration readings are the same!\n" + "Check wiring and validate sensor with READ_LOAD_CELL command.") + if (abs(cal_percent - self.tare_percent)) < 1.: + raise self.printer.command_error( + "ERROR: Tare and Calibration readings are less than 1% " + "different!\n" + "Use more force when calibrating or a higher sensor gain.") + # only set _counts_per_gram after all errors are raised + self._counts_per_gram = c_per_g + if cap_kg < 1.: + gcmd.respond_info("WARNING: Load cell capacity is less than 1kg!\n" + "Check wiring and consider using a lower sensor gain.") + if cap_kg > 25.: + gcmd.respond_info("WARNING: Load cell capacity is more than 25Kg!\n" + "Check wiring and consider using a higher sensor gain.") + gcmd.respond_info("Accept calibration with the ACCEPT command.") + + +# Utility to collect some samples from the LoadCell for later analysis +# Optionally blocks execution while collecting with reactor.pause() +# can collect a minimum n samples or collect until a specific print_time +# samples returned in [[time],[force],[counts]] arrays for easy processing +RETRY_DELAY = 0.05 # 20Hz +class LoadCellSampleCollector: + def __init__(self, printer, load_cell): + self._printer = printer + self._load_cell = load_cell + self._reactor = printer.get_reactor() + self._mcu = load_cell.sensor.get_mcu() + self.min_time = 0. + self.max_time = float("inf") + self.min_count = float("inf") # In Python 3.5 math.inf is better + self.is_started = False + self._samples = [] + self._errors = 0 + self._overflows = 0 + + def _on_samples(self, msg): + if not self.is_started: + return False # already stopped, ignore + self._errors += msg['errors'] + self._overflows += msg['overflows'] + samples = msg['data'] + for sample in samples: + time = sample[0] + if self.min_time <= time <= self.max_time: + self._samples.append(sample) + if time > self.max_time: + self.is_started = False + if len(self._samples) >= self.min_count: + self.is_started = False + return self.is_started + + def _finish_collecting(self): + self.is_started = False + self.min_time = 0. + self.max_time = float("inf") + self.min_count = float("inf") # In Python 3.5 math.inf is better + samples = self._samples + self._samples = [] + errors = self._errors + self._errors = 0 + overflows = self._overflows + self._overflows = 0 + return samples, (errors, overflows) if errors or overflows else 0 + + def _collect_until(self, timeout): + self.start_collecting() + while self.is_started: + now = self._reactor.monotonic() + if self._mcu.estimated_print_time(now) > timeout: + self._finish_collecting() + raise self._printer.command_error( + "LoadCellSampleCollector timed out! Errors: %i," + " Overflows: %i" % (self._errors, self._overflows)) + self._reactor.pause(now + RETRY_DELAY) + return self._finish_collecting() + + # start collecting with no automatic end to collection + def start_collecting(self, min_time=None): + if self.is_started: + return + self.min_time = min_time if min_time is not None else self.min_time + self.is_started = True + self._load_cell.add_client(self._on_samples) + + # stop collecting immediately and return results + def stop_collecting(self): + return self._finish_collecting() + + # block execution until at least min_count samples are collected + # will return all samples collected, not just up to min_count + def collect_min(self, min_count=1): + self.min_count = min_count + if len(self._samples) >= min_count: + return self._finish_collecting() + print_time = self._mcu.estimated_print_time(self._reactor.monotonic()) + start_time = max(print_time, self.min_time) + sps = self._load_cell.sensor.get_samples_per_second() + return self._collect_until(start_time + 1. + (min_count / sps)) + + # returns when a sample is collected with a timestamp after print_time + def collect_until(self, print_time=None): + self.max_time = print_time + if len(self._samples) and self._samples[-1][0] >= print_time: + return self._finish_collecting() + return self._collect_until(self.max_time + 1.) + +# Printer class that controls the load cell +MIN_COUNTS_PER_GRAM = 1. class LoadCell: def __init__(self, config, sensor): self.printer = printer = config.get_printer() - self.sensor = sensor # must implement BulkAdcSensor + self.config_name = config.get_name() + self.name = config.get_name().split()[-1] + self.sensor = sensor # must implement BulkSensorAdc + buffer_size = sensor.get_samples_per_second() // 2 + self._force_buffer = collections.deque(maxlen=buffer_size) + self.reference_tare_counts = config.getint('reference_tare_counts', + default=None) + self.tare_counts = self.reference_tare_counts + self.counts_per_gram = config.getfloat('counts_per_gram', + minval=MIN_COUNTS_PER_GRAM, default=None) + self.is_reversed = config.getboolean('reverse', default=False) + self.reverse = -1 if self.is_reversed else 1 + LoadCellCommandHelper(config, self) + # webhooks support + self.wh_transformer = WebhooksTransformer(printer, sensor, + self._sensor_data_event) + header = {"header": ["time", "force (g)", "counts", "tare_counts"]} + self.wh_transformer.add_mux_endpoint("load_cell/dump_force", + "load_cell", self.name, header) + # startup, when klippy is ready, start capturing data + printer.register_event_handler("klippy:ready", self._handle_ready) + + def _handle_ready(self): + self.add_client(self._on_sample) + # announce calibration status on ready + if self.is_calibrated(): + self.printer.send_event("load_cell:calibrate", self) + if self.is_tared(): + self.printer.send_event("load_cell:tare", self) + + # convert raw counts to grams and broadcast to clients + def _sensor_data_event(self, msg): + data = msg.get("data") + errors = msg.get("errors") + overflows = msg.get("overflows") + if data is None: + return None + samples = [] + for row in data: + # [time, grams, counts, tare_counts] + samples.append([row[0], self.counts_to_grams(row[1]), row[1], + self.tare_counts]) + return {'data': samples, 'errors': errors, 'overflows': overflows} + + # get internal events of force data + def add_client(self, callback): + self.wh_transformer.add_client(callback) + + def tare(self, tare_counts): + self.tare_counts = int(tare_counts) + self.printer.send_event("load_cell:tare", self) + + def set_calibration(self, counts_per_gram, tare_counts): + if (counts_per_gram is None + or abs(counts_per_gram) < MIN_COUNTS_PER_GRAM): + raise self.printer.command_error("Invalid counts per gram value") + if tare_counts is None: + raise self.printer.command_error("Missing tare counts") + self.counts_per_gram = counts_per_gram + self.reference_tare_counts = int(tare_counts) + configfile = self.printer.lookup_object('configfile') + configfile.set(self.config_name, 'counts_per_gram', + "%.5f" % (self.counts_per_gram,)) + configfile.set(self.config_name, 'reference_tare_counts', + "%i" % (self.reference_tare_counts,)) + self.printer.send_event("load_cell:calibrate", self) + + def counts_to_grams(self, sample): + if not self.is_calibrated() or not self.is_tared(): + return None + sample_delta = float(sample - self.tare_counts) + return self.reverse * (sample_delta / self.counts_per_gram) + + # The maximum range of the sensor based on its bit width + def saturation_range(self): + return self.sensor.get_range() + + # convert raw counts to a +/- percentage of the sensors range + def counts_to_percent(self, counts): + range_min, range_max = self.saturation_range() + return (float(counts) / float(range_max)) * 100. + + # read 1 second of load cell data and average it + # performs safety checks for saturation + def avg_counts(self, num_samples=None): + if num_samples is None: + num_samples = self.sensor.get_samples_per_second() + samples, errors = self.get_collector().collect_min(num_samples) + if errors: + raise self.printer.command_error( + "Sensor reported %i errors while sampling" + % (errors[0] + errors[1])) + # check samples for saturated readings + range_min, range_max = self.saturation_range() + for sample in samples: + if sample[2] >= range_max or sample[2] <= range_min: + raise self.printer.command_error( + "Some samples are saturated (+/-100%)") + return avg(select_column(samples, 2)) def _on_sample(self, msg): + if not (self.is_calibrated() and self.is_tared()): + return True + samples = msg['data'] + for sample in samples: + self._force_buffer.append(sample[1]) return True + def _force_g(self): + if (self.is_calibrated() and self.is_tared() + and len(self._force_buffer) > 0): + return {"force_g": round(avg(self._force_buffer), 1), + "min_force_g": round(min(self._force_buffer), 1), + "max_force_g": round(max(self._force_buffer), 1)} + return {} + + def is_tared(self): + return self.tare_counts is not None + + def is_calibrated(self): + return (self.counts_per_gram is not None + and self.reference_tare_counts is not None) + def get_sensor(self): return self.sensor + def get_reference_tare_counts(self): + return self.reference_tare_counts + + def get_tare_counts(self): + return self.tare_counts + + def get_counts_per_gram(self): + return self.counts_per_gram + + def get_collector(self): + return LoadCellSampleCollector(self.printer, self) + + def get_status(self, eventtime): + status = self._force_g() + status.update({'is_calibrated': self.is_calibrated(), + 'counts_per_gram': self.counts_per_gram, + 'reference_tare_counts': self.reference_tare_counts, + 'tare_counts': self.tare_counts}) + return status + + def load_config(config): # Sensor types sensors = {}