diff --git a/abletonosc/arrangement.py b/abletonosc/arrangement.py new file mode 100644 index 0000000..73ee308 --- /dev/null +++ b/abletonosc/arrangement.py @@ -0,0 +1,210 @@ +from typing import Tuple, Any +from .handler import AbletonOSCHandler +import Live +import logging + +logger = logging.getLogger("abletonosc") + + +class ArrangementHandler(AbletonOSCHandler): + def __init__(self, manager): + super().__init__(manager) + self.class_identifier = "arrangement" + + def init_api(self): + def create_midi_clip(params: Tuple[Any]): + """ + Create an empty MIDI clip in the arrangement view. + /live/arrangement/create_midi_clip (track, start_time, length) + Requires Live 12.2+ + """ + track, track_id = self._resolve_track(params[0]) + start_time = float(params[1]) + length = float(params[2]) + + track.create_midi_clip(start_time, length) + + logger.info("Created arrangement MIDI clip on track %s at beat %.1f, length %.1f" % + (track_id, start_time, length)) + return (track_id, start_time, length) + + def duplicate_to_arrangement(params: Tuple[Any]): + """ + Copy a session clip to the arrangement at a specific beat position. + /live/arrangement/duplicate_to_arrangement (track, clip_slot_index, destination_time) + """ + track, track_id = self._resolve_track(params[0]) + clip_slot_index = int(params[1]) + destination_time = float(params[2]) + + if not hasattr(track, 'clip_slots'): + raise ValueError("Track %s has no clip slots" % track_id) + + clip_slot = track.clip_slots[clip_slot_index] + + if not clip_slot.has_clip: + raise ValueError("No clip in slot %d on track %s" % (clip_slot_index, track_id)) + + clip = clip_slot.clip + track.duplicate_clip_to_arrangement(clip, destination_time) + + logger.info("Duplicated session clip (track %s, slot %d) to arrangement at beat %.1f" % + (track_id, clip_slot_index, destination_time)) + return (track_id, clip_slot_index, destination_time) + + def delete_clip(params: Tuple[Any]): + """ + Delete an arrangement clip. + /live/arrangement/delete_clip (track, clip_index) + """ + track, track_id = self._resolve_track(params[0]) + clip_index = int(params[1]) + + clips = track.arrangement_clips + + if clip_index >= len(clips): + raise ValueError("Clip index %d out of range (track has %d arrangement clips)" % + (clip_index, len(clips))) + + clip = clips[clip_index] + track.delete_clip(clip) + + logger.info("Deleted arrangement clip %d on track %s" % (clip_index, track_id)) + return (track_id, clip_index) + + def get_clips(params: Tuple[Any]): + """ + List arrangement clips for a track. + /live/arrangement/get/clips (track) + Returns: (track_id, name, start_time, length, is_midi, ...) + """ + track, track_id = self._resolve_track(params[0]) + clips = track.arrangement_clips + + result = [track_id] + for clip in clips: + try: + result.append(clip.name) + result.append(clip.start_time) + result.append(clip.length) + result.append(1 if clip.is_midi_clip else 0) + except Exception as e: + logger.warning("Error reading arrangement clip: %s" % e) + continue + + return tuple(result) + + def get_notes(params: Tuple[Any]): + """ + Get MIDI notes from an arrangement clip. + /live/arrangement/get/notes (track, clip_index, [start_pitch, pitch_span, start_time, time_span]) + """ + track, track_id = self._resolve_track(params[0]) + clip_index = int(params[1]) + clips = track.arrangement_clips + + if clip_index >= len(clips): + raise ValueError("Clip index %d out of range" % clip_index) + + clip = clips[clip_index] + if not clip.is_midi_clip: + raise ValueError("Clip at index %d is not a MIDI clip" % clip_index) + + if len(params) >= 6: + from_pitch = int(params[2]) + pitch_span = int(params[3]) + from_time = float(params[4]) + time_span = float(params[5]) + else: + from_pitch = 0 + pitch_span = 128 + from_time = 0.0 + time_span = clip.length + + notes = clip.get_notes_extended(from_pitch, pitch_span, from_time, time_span) + + result = [track_id, clip_index] + for note in notes: + result.append(note.pitch) + result.append(note.start_time) + result.append(note.duration) + result.append(note.velocity) + result.append(1 if note.mute else 0) + + return tuple(result) + + def add_notes(params: Tuple[Any]): + """ + Add MIDI notes to an arrangement clip. + /live/arrangement/add/notes (track, clip_index, pitch, start, duration, velocity, mute, ...) + """ + track, track_id = self._resolve_track(params[0]) + clip_index = int(params[1]) + clips = track.arrangement_clips + + if clip_index >= len(clips): + raise ValueError("Clip index %d out of range" % clip_index) + + clip = clips[clip_index] + if not clip.is_midi_clip: + raise ValueError("Clip at index %d is not a MIDI clip" % clip_index) + + note_params = params[2:] + notes = [] + for i in range(0, len(note_params), 5): + if i + 4 >= len(note_params): + break + spec = Live.Clip.MidiNoteSpecification( + pitch=int(note_params[i]), + start_time=float(note_params[i + 1]), + duration=float(note_params[i + 2]), + velocity=float(note_params[i + 3]), + mute=bool(int(note_params[i + 4])) + ) + notes.append(spec) + + if notes: + clip.add_new_notes(tuple(notes)) + logger.info("Added %d notes to arrangement clip %d on track %s" % + (len(notes), clip_index, track_id)) + + return (track_id, clip_index, len(notes)) + + def remove_notes(params: Tuple[Any]): + """ + Remove notes from an arrangement clip by range. + /live/arrangement/remove/notes (track, clip_index, [from_pitch, pitch_span, from_time, time_span]) + """ + track, track_id = self._resolve_track(params[0]) + clip_index = int(params[1]) + clips = track.arrangement_clips + + if clip_index >= len(clips): + raise ValueError("Clip index %d out of range" % clip_index) + + clip = clips[clip_index] + if not clip.is_midi_clip: + raise ValueError("Clip at index %d is not a MIDI clip" % clip_index) + + if len(params) >= 6: + from_pitch = int(params[2]) + pitch_span = int(params[3]) + from_time = float(params[4]) + time_span = float(params[5]) + else: + from_pitch = 0 + pitch_span = 128 + from_time = 0.0 + time_span = clip.length + + clip.remove_notes_extended(from_pitch, pitch_span, from_time, time_span) + logger.info("Removed notes from arrangement clip %d on track %s" % (clip_index, track_id)) + return (track_id, clip_index) + + self.osc_server.add_handler("/live/arrangement/create_midi_clip", create_midi_clip) + self.osc_server.add_handler("/live/arrangement/duplicate_to_arrangement", duplicate_to_arrangement) + self.osc_server.add_handler("/live/arrangement/delete_clip", delete_clip) + self.osc_server.add_handler("/live/arrangement/get/clips", get_clips) + self.osc_server.add_handler("/live/arrangement/get/notes", get_notes) + self.osc_server.add_handler("/live/arrangement/add/notes", add_notes) + self.osc_server.add_handler("/live/arrangement/remove/notes", remove_notes) diff --git a/abletonosc/device.py b/abletonosc/device.py index 19c0681..24b5161 100644 --- a/abletonosc/device.py +++ b/abletonosc/device.py @@ -9,21 +9,23 @@ def __init__(self, manager): def init_api(self): def create_device_callback(func, *args, include_ids: bool = False): def device_callback(params: Tuple[Any]): - track_index, device_index = int(params[0]), int(params[1]) - device = self.song.tracks[track_index].devices[device_index] + track, track_id = self._resolve_track(params[0]) + device_index = int(params[1]) + device = track.devices[device_index] if (include_ids): - rv = func(device, *args, params[0:]) + rv = func(device, *args, tuple([track_id, device_index] + list(params[2:]))) else: rv = func(device, *args, params[2:]) if rv is not None: - return (track_index, device_index, *rv) + return (track_id, device_index, *rv) return device_callback methods = [ ] properties_r = [ + "can_have_chains", "class_name", "name", "type" @@ -140,3 +142,14 @@ def device_get_parameter_name(device, params: Tuple[Any] = ()): self.osc_server.add_handler("/live/device/get/parameter/name", create_device_callback(device_get_parameter_name)) self.osc_server.add_handler("/live/device/start_listen/parameter/value", create_device_callback(device_get_parameter_value_listener, include_ids = True)) self.osc_server.add_handler("/live/device/stop_listen/parameter/value", create_device_callback(device_get_parameter_remove_value_listener, include_ids = True)) + + #-------------------------------------------------------------------------------- + # Device: Chain discovery (bridge to ChainHandler) + #-------------------------------------------------------------------------------- + def device_get_num_chains(device, params: Tuple[Any] = ()): + try: + return (len(device.chains) if hasattr(device, 'chains') else 0,) + except Exception: + return (0,) + + self.osc_server.add_handler("/live/device/get/num_chains", create_device_callback(device_get_num_chains)) diff --git a/abletonosc/handler.py b/abletonosc/handler.py index 55d4913..ae76c94 100644 --- a/abletonosc/handler.py +++ b/abletonosc/handler.py @@ -106,6 +106,53 @@ def _stop_listen(self, target, prop, params: Optional[Tuple[Any]] = ()) -> None: else: self.logger.warning("No listener function found for property: %s (%s)" % (prop, str(params))) + def _has_chains(self, device): + """Check if device has chains (works for Drum Racks where can_have_chains is False).""" + try: + return hasattr(device, 'chains') and len(device.chains) > 0 + except Exception: + return False + + def _resolve_track(self, track_param): + """ + Resolve a track parameter to (track_object, identifier). + + Accepts: + - int or numeric string: self.song.tracks[index] → (track, int) + - "master": self.song.master_track → (track, "master") + - "return_0", "return_1", ...: self.song.return_tracks[n] → (track, "return_N") + - "return_A", "return_B", ...: letter mapped to index → (track, "return_N") + + Returns: (track_object, identifier) + Raises: ValueError if track_param cannot be resolved + """ + param = track_param + if isinstance(param, (int, float)): + index = int(param) + return self.song.tracks[index], index + + param_str = str(param) + + if param_str.lower() in ("master", "main"): + return self.song.master_track, "master" + + if param_str.lower().startswith("return_"): + suffix = param_str[7:] + if suffix.isdigit(): + index = int(suffix) + elif len(suffix) == 1 and suffix.isalpha(): + index = ord(suffix.upper()) - ord("A") + else: + raise ValueError("Invalid return track identifier: %s" % param_str) + return self.song.return_tracks[index], "return_%d" % index + + # Fallback: try as numeric index + try: + index = int(param_str) + return self.song.tracks[index], index + except (ValueError, IndexError): + raise ValueError("Cannot resolve track: %s" % param_str) + def _clear_listeners(self): """ Clears all listener functions, to prevent listeners continuing to report after a reload. diff --git a/abletonosc/song.py b/abletonosc/song.py index d9d7e01..35d49ab 100644 --- a/abletonosc/song.py +++ b/abletonosc/song.py @@ -162,50 +162,79 @@ def song_get_track_data(params): return tuple(rv) self.osc_server.add_handler("/live/song/get/track_data", song_get_track_data) + #-------------------------------------------------------------------------------- + # Master and return track info + #-------------------------------------------------------------------------------- + self.osc_server.add_handler("/live/song/get/num_return_tracks", + lambda _: (len(self.song.return_tracks),)) + self.osc_server.add_handler("/live/song/get/return_track_names", + lambda _: tuple(rt.name for rt in self.song.return_tracks)) + self.osc_server.add_handler("/live/song/get/master_track_name", + lambda _: (self.song.master_track.name,)) - def song_export_structure(params): - tracks = [] - for track_index, track in enumerate(self.song.tracks): - group_track = None - if track.group_track is not None: - group_track = list(self.song.tracks).index(track.group_track) - track_data = { - "index": track_index, - "name": track.name, - "is_foldable": track.is_foldable, - "group_track": group_track, - "clips": [], - "devices": [] - } + + def serialize_device(device): + """Recursively serialize a device, including chains for rack devices.""" + device_data = { + "class_name": device.class_name, + "type": device.type, + "name": device.name, + "can_have_chains": hasattr(device, 'chains') and len(device.chains) > 0 if hasattr(device, 'chains') else False, + "parameters": [] + } + for parameter in device.parameters: + device_data["parameters"].append({ + "name": parameter.name, + "value": parameter.value, + "min": parameter.min, + "max": parameter.max, + "is_quantized": parameter.is_quantized, + }) + if hasattr(device, 'chains') and len(device.chains) > 0: + device_data["chains"] = [] + try: + for chain in device.chains: + chain_data = { + "name": chain.name, + "devices": [serialize_device(d) for d in chain.devices] + } + device_data["chains"].append(chain_data) + except Exception as e: + self.logger.warning("Error serializing chains: %s" % e) + return device_data + + def serialize_track(track, track_id): + """Serialize a track with its clips and devices.""" + track_data = { + "index": track_id, + "name": track.name, + "devices": [serialize_device(d) for d in track.devices] + } + if hasattr(track, 'clip_slots'): + track_data["clips"] = [] for clip_index, clip_slot in enumerate(track.clip_slots): if clip_slot.clip: - clip_data = { + track_data["clips"].append({ "index": clip_index, "name": clip_slot.clip.name, "length": clip_slot.clip.length, - } - track_data["clips"].append(clip_data) - - for device_index, device in enumerate(track.devices): - device_data = { - "class_name": device.class_name, - "type": device.type, - "name": device.name, - "parameters": [] - } - for parameter in device.parameters: - device_data["parameters"].append({ - "name": parameter.name, - "value": parameter.value, - "min": parameter.min, - "max": parameter.max, - "is_quantized": parameter.is_quantized, }) - track_data["devices"].append(device_data) + try: + track_data["is_foldable"] = track.is_foldable + if track.group_track is not None: + track_data["group_track"] = list(self.song.tracks).index(track.group_track) + except Exception: + pass + return track_data + + def song_export_structure(params): + tracks = [serialize_track(track, i) for i, track in enumerate(self.song.tracks)] - tracks.append(track_data) song = { - "tracks": tracks + "tracks": tracks, + "master_track": serialize_track(self.song.master_track, "master"), + "return_tracks": [serialize_track(rt, "return_%d" % i) + for i, rt in enumerate(self.song.return_tracks)], } if sys.platform == "darwin": diff --git a/abletonosc/track.py b/abletonosc/track.py index 5e21353..f62bed2 100644 --- a/abletonosc/track.py +++ b/abletonosc/track.py @@ -13,23 +13,28 @@ def create_track_callback(func: Callable, include_track_id: bool = False): def track_callback(params: Tuple[Any]): if params[0] == "*": - track_indices = list(range(len(self.song.tracks))) + targets = [(self.song.tracks[i], i) for i in range(len(self.song.tracks))] + targets.append((self.song.master_track, "master")) + for i, rt in enumerate(self.song.return_tracks): + targets.append((rt, "return_%d" % i)) else: - track_indices = [int(params[0])] + track, track_id = self._resolve_track(params[0]) + targets = [(track, track_id)] - for track_index in track_indices: - track = self.song.tracks[track_index] + for track, track_id in targets: if include_track_id: - rv = func(track, *args, tuple([track_index] + params[1:])) + rv = func(track, *args, tuple([track_id] + list(params[1:]))) else: rv = func(track, *args, tuple(params[1:])) if rv is not None: - return (track_index, *rv) + return (track_id, *rv) return track_callback methods = [ + "create_audio_clip", + "create_midi_clip", "delete_device", "stop_all_clips" ] @@ -103,18 +108,26 @@ def track_set_send(track, params: Tuple[Any] = ()): self.osc_server.add_handler("/live/track/set/send", create_track_callback(track_set_send)) def track_delete_clip(track, params: Tuple[Any]): + if not hasattr(track, 'clip_slots'): + return clip_index, = params track.clip_slots[clip_index].delete_clip() self.osc_server.add_handler("/live/track/delete_clip", create_track_callback(track_delete_clip)) def track_get_clip_names(track, _): + if not hasattr(track, 'clip_slots'): + return () return tuple(clip_slot.clip.name if clip_slot.clip else None for clip_slot in track.clip_slots) def track_get_clip_lengths(track, _): + if not hasattr(track, 'clip_slots'): + return () return tuple(clip_slot.clip.length if clip_slot.clip else None for clip_slot in track.clip_slots) def track_get_clip_colors(track, _): + if not hasattr(track, 'clip_slots'): + return () return tuple(clip_slot.clip.color if clip_slot.clip else None for clip_slot in track.clip_slots) def track_get_arrangement_clip_names(track, _): @@ -149,7 +162,12 @@ def track_get_device_class_names(track, _): return tuple(device.class_name for device in track.devices) def track_get_device_can_have_chains(track, _): - return tuple(device.can_have_chains for device in track.devices) + def has_chains(d): + try: + return 1 if (hasattr(d, 'chains') and len(d.chains) > 0) else 0 + except Exception: + return 0 + return tuple(has_chains(d) for d in track.devices) """ - name: the device's human-readable name